Run work on specific thread

I would like to have one specific thread with a queue of Tasks, and to process those tasks on that separate thread. The application would create Tasks based on user activity and add them to the task queue; the separate thread would then process them. It is vital to keep the thread alive and keep using it for processing queued tasks, even after the queue has been empty.

I have tried several implementations of TaskScheduler with a BlockingCollection, limiting the concurrency to a single thread, but it seems the thread gets disposed when the queue becomes empty, and the next Task is processed by a different thread.

Can you please at least point me to some resources on how to achieve this?

tl;dr Trying to have one specific thread process tasks that are dynamically added to a queue.

Edit1:

This is an experimental web app that uses WCF and .NET Framework 4.6. In the WCF library, I am trying to implement this behaviour with one thread processing tasks. This one thread must initialize Prolog using an external DLL and then do the work with Prolog. If any other thread is used in the process, the library throws an AccessViolationException. I've done some research, and this is most probably caused by badly managed threading in that library. I had an implementation with locks everywhere, and it worked. I am now trying to reimplement it and make it asynchronous so that I don't block the main thread with locking.

I am not at my computer, but I will provide some code when I get home later today.

1 answer

Answer by Luaan (best answer)

Your approach seems fine, so you probably just made some tiny stupid mistake.

It's actually pretty easy to make a simple custom TaskScheduler. For your case:

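using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

// Script-style entry point (e.g. LINQPad); in a regular console app,
// put Main and Print inside a static class and make them static.
void Main()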
{
  var cts = new CancellationTokenSource();
  var myTs = new SingleThreadTaskScheduler(cts.Token);

  myTs.Schedule(() => 
   { Print("Init start"); Thread.Sleep(1000); Print("Init done"); });
  myTs.Schedule(() => Print("Work 1"));   
  myTs.Schedule(() => Print("Work 2"));
  myTs.Schedule(() => Print("Work 3"));
  var lastOne = myTs.Schedule(() => Print("Work 4"));

  Print("Starting TS");
  myTs.Start();

  // Wait for all of them to complete...
  lastOne.GetAwaiter().GetResult();

  Thread.Sleep(1000);

  // And try to schedule another
  myTs.Schedule(() => Print("After emptied")).GetAwaiter().GetResult();

  // And shutdown; it's also pretty useful to have the 
  // TaskScheduler return a "complete task" to await
  myTs.Complete();

  Print("On main thread again");
}

void Print(string str)
{
  Console.WriteLine("{0}: {1}", Thread.CurrentThread.ManagedThreadId, str);
  Thread.Sleep(100);
}

public sealed class SingleThreadTaskScheduler : TaskScheduler
{
  [ThreadStatic]
  private static bool isExecuting;
  private readonly CancellationToken cancellationToken;

  private readonly BlockingCollection<Task> taskQueue;

  public SingleThreadTaskScheduler(CancellationToken cancellationToken)
  {
      this.cancellationToken = cancellationToken;
      this.taskQueue = new BlockingCollection<Task>();
  }

  public void Start()
  {
      new Thread(RunOnCurrentThread) { Name = "STTS Thread" }.Start();
  }

  // Just a helper for the sample code
  public Task Schedule(Action action)
  {
      return 
          Task.Factory.StartNew
              (
                  action, 
                  CancellationToken.None, 
                  TaskCreationOptions.None, 
                  this
              );
  }

  // You can make this public if you want to run the loop on a thread you
  // create yourself - in that case, make sure to hide Start() instead
  private void RunOnCurrentThread()
  {
      isExecuting = true;

      try
      {
          foreach (var task in taskQueue.GetConsumingEnumerable(cancellationToken))
          {
              TryExecuteTask(task);
          }
      }
      catch (OperationCanceledException)
      { }
      finally
      {
          isExecuting = false;
      }
  }

  // Signaling this allows the task scheduler to finish after all tasks complete
  public void Complete() { taskQueue.CompleteAdding(); }   
  // Used only by the debugger; return a snapshot of the queued tasks
  protected override IEnumerable<Task> GetScheduledTasks() { return taskQueue.ToArray(); }

  protected override void QueueTask(Task task)
  {
      try
      {
          taskQueue.Add(task, cancellationToken);
      }
      catch (OperationCanceledException)
      { }
  }

  protected override bool TryExecuteTaskInline(Task task, bool taskWasPreviouslyQueued)
  {
      // We'd need to remove the task from queue if it was already queued. 
      // That would be too hard.
      if (taskWasPreviouslyQueued) return false;

      return isExecuting && TryExecuteTask(task);
  }
}

It's pretty easy to modify this to give you full control over where the task scheduler actually executes the tasks - in fact, I've adapted this from a previous task scheduler I've used, which simply had the RunOnCurrentThread method public.

For your case, where you always want to stick to just the one thread, the approach in SingleThreadTaskScheduler is probably better. That said, making RunOnCurrentThread public and calling it from a thread you set up yourself also has its merits, since the library's initialization and cleanup then happen on the same thread that executes the tasks:

// On a new thread
try
{
  InitializeProlog();

  try
  {
    myTs.RunOnCurrentThread();
  }
  finally
  {
    ReleaseProlog();
  }
}
catch (Exception ex)
{
  // The global handler
}
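
To tie the two variants together, here is a minimal, self-contained sketch of the "public run loop" version, plus how calling code could get results back without taking a lock. It is only an illustration under a few assumptions: PinnedThreadScheduler is a trimmed-down stand-in for the scheduler above (no cancellation support), and InitializeProlog / ReleaseProlog are hypothetical placeholders for whatever setup and teardown calls your native library actually needs.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

// Trimmed-down variant of the scheduler above, with a public run loop.
public sealed class PinnedThreadScheduler : TaskScheduler
{
    private readonly BlockingCollection<Task> queue = new BlockingCollection<Task>();
    private volatile int ownerThreadId = -1; // set when the loop starts

    public override int MaximumConcurrencyLevel { get { return 1; } }

    // Blocks the calling thread and executes queued tasks until Complete() is called.
    public void RunOnCurrentThread()
    {
        ownerThreadId = Thread.CurrentThread.ManagedThreadId;
        foreach (var task in queue.GetConsumingEnumerable())
            TryExecuteTask(task);
    }

    public void Complete() { queue.CompleteAdding(); }

    protected override void QueueTask(Task task) { queue.Add(task); }

    protected override bool TryExecuteTaskInline(Task task, bool taskWasPreviouslyQueued)
    {
        // Inline only on the owning thread, and never for a task still sitting in the queue.
        return !taskWasPreviouslyQueued
            && Thread.CurrentThread.ManagedThreadId == ownerThreadId
            && TryExecuteTask(task);
    }

    protected override IEnumerable<Task> GetScheduledTasks() { return queue.ToArray(); }
}

public static class Program
{
    // Hypothetical stand-ins for the native library's setup and teardown.
    static void InitializeProlog() { Console.WriteLine("Init on thread " + Thread.CurrentThread.ManagedThreadId); }
    static void ReleaseProlog() { Console.WriteLine("Release on thread " + Thread.CurrentThread.ManagedThreadId); }

    public static void Main()
    {
        var scheduler = new PinnedThreadScheduler();
        var factory = new TaskFactory(scheduler); // every StartNew goes to our scheduler

        var worker = new Thread(() =>
        {
            InitializeProlog();
            try { scheduler.RunOnCurrentThread(); }
            finally { ReleaseProlog(); }
        }) { Name = "Prolog thread", IsBackground = true };
        worker.Start();

        // Callers get a Task<int> back; in async code you would await it.
        Task<int> first = factory.StartNew(() => Thread.CurrentThread.ManagedThreadId);
        Task<int> second = factory.StartNew(() => Thread.CurrentThread.ManagedThreadId);

        Console.WriteLine(first.Result == second.Result);         // True
        Console.WriteLine(first.Result == worker.ManagedThreadId); // True

        scheduler.Complete(); // lets the loop end so ReleaseProlog runs
        worker.Join();
    }
}
```

In a WCF service you would typically keep one such scheduler and factory around for the lifetime of the host and have each operation await factory.StartNew(...), so the request thread is released while the pinned thread does the work.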