Thread Join() causes Task.RunSynchronously not to finish

Calling _thread.Join() causes the GetConsumingEnumerable loop to be stuck on the last element. Why does this behavior occur?

  public abstract class ActorBase : IDisposable
  {
    private readonly BlockingCollection<Task> _queue = new BlockingCollection<Task>(new ConcurrentQueue<Task>());
    private readonly Thread _thread;
    private bool _isDisposed;

    protected ActorBase()
    {
      _thread = new Thread(ProcessMessages);
      _thread.Start();
    }

    protected void QueueTask(Task task)
    {
      if (_isDisposed)
      {
        throw new Exception("Actor was disposed, cannot queue task.");
      }
      _queue.Add(task);
    }

    private void ProcessMessages()
    {
      foreach (var task in _queue.GetConsumingEnumerable())
      {
        task.RunSynchronously();
      }
    }

    public void Dispose()
    {
      _isDisposed = true;
      _queue.CompleteAdding();
      _thread.Join();
    }
  }

  public class SampleActor : ActorBase
  {
    private string GetThreadStatus()
    {
      Thread.Sleep(500);
      return string.Format("Running on thread {0}", Thread.CurrentThread.ManagedThreadId);
    }

    public async Task<string> GetThreadStatusAsync()
    {
      var task = new Task<string>(GetThreadStatus);
      QueueTask(task);
      return await task;
    }
  }

  class Program
  {
    public static async Task Run()
    {
      using (var sa = new SampleActor())
      {
        for (int i = 0; i < 3; i++)
        {
          Console.WriteLine(await sa.GetThreadStatusAsync());
        }
      }
    }

    public static void Main(string[] args)
    {
      Console.WriteLine("Main thread id {0}", Thread.CurrentThread.ManagedThreadId);
      var task = Task.Run(async ()=> { await Run(); });
      task.Wait();
    }
  }

The context for this approach is that I need to make sure that all operations are executed on one OS thread, which would allow a part of the app to use different credentials than the main thread.

There are 2 answers

i3arnon (best answer)

async-await is built on continuations. To be efficient and avoid extra scheduling, these continuations usually run on the same thread that completed the previous task.

That means in your case that your special thread is not only running the tasks, it's also running all the continuations after these tasks (the for loop itself). You can see that by printing the thread id:

using (var sa = new SampleActor())
{
    for (int i = 0; i < 3; i++)
    {
        Console.WriteLine(await sa.GetThreadStatusAsync());
        Console.WriteLine("Continue on thread :" + Thread.CurrentThread.ManagedThreadId);
    }
}

When the for loop completes and the SampleActor is disposed, Dispose calls Thread.Join from the very thread you are trying to join, so you get a deadlock. Your situation boils down to this:

public static void Main()
{
    Thread thread = null;
    thread = new Thread(() =>
    {
        Thread.Sleep(100);
        thread.Join();
        Console.WriteLine("joined");
    });
    thread.Start();
}
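
To observe the same effect without hanging the process, here is a minimal sketch that uses the timed overload Thread.Join(TimeSpan) instead. The class and method names (SelfJoinDemo, TrySelfJoin) are made up for illustration and are not part of the question's code.

```csharp
using System;
using System.Threading;

public static class SelfJoinDemo
{
    // Hypothetical helper: starts a thread that tries to join itself with a timeout
    // and reports whether that self-join succeeded.
    public static bool TrySelfJoin()
    {
        Thread thread = null;
        bool joined = true;
        thread = new Thread(() =>
        {
            // The thread cannot terminate while it is blocked waiting for itself,
            // so the timed Join gives up and returns false.
            joined = thread.Join(TimeSpan.FromMilliseconds(200));
        });
        thread.Start();
        thread.Join(); // Joining from a different thread completes normally.
        return joined;
    }

    public static void Main()
    {
        Console.WriteLine("Self-join succeeded: " + TrySelfJoin());
    }
}
```

Without the timeout, the inner Join never returns, which is what happens in Dispose when it runs on the actor thread.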

In .NET 4.6 you can solve this with TaskCreationOptions.RunContinuationsAsynchronously, but in the current version you can instead run the continuation on the default TaskScheduler:

public Task<string> GetThreadStatusAsync()
{
    var task = new Task<string>(GetThreadStatus);
    QueueTask(task);
    return task.ContinueWith(task1 => task1.GetAwaiter().GetResult(), TaskScheduler.Default);
}
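
For the .NET 4.6 option mentioned above, a rough sketch could look like the following. It assumes the flag is passed to the cold task's constructor; the names ContinuationDemo and ContinuationLeavesWorkerAsync are hypothetical, and a plain thread stands in for the actor thread.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class ContinuationDemo
{
    // Hypothetical helper: returns true if the code after the await
    // ran on a thread other than the one that executed the task body.
    public static async Task<bool> ContinuationLeavesWorkerAsync()
    {
        // Cold task created with the flag (requires .NET 4.6 or later).
        var task = new Task<int>(
            () => Thread.CurrentThread.ManagedThreadId,
            TaskCreationOptions.RunContinuationsAsynchronously);

        // Stand-in for the actor thread: it only runs the task body.
        var worker = new Thread(() => task.RunSynchronously());
        worker.Start();

        int workerId = await task;
        int continuationId = Thread.CurrentThread.ManagedThreadId;

        // We are not on the worker thread here, so joining it cannot self-deadlock.
        worker.Join();
        return workerId != continuationId;
    }

    public static void Main()
    {
        Console.WriteLine(ContinuationLeavesWorkerAsync().GetAwaiter().GetResult());
    }
}
```

In the question's code, the equivalent change would be to pass the same option when constructing the task in GetThreadStatusAsync.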

noseratio

It might be tempting to put a simple check to see if the thread you're trying to Join is Thread.CurrentThread, but that would be wrong.

Furthermore, I think the whole approach - scheduling and running cold Task objects with a custom, non-TPL-compliant scheduler - is wrong. You should be using a TPL-friendly task scheduler, similar to Stephen Toub's StaTaskScheduler. Alternatively, run a custom SynchronizationContext on your actor-serving thread (like Toub's AsyncPump) and use TaskScheduler.FromCurrentSynchronizationContext with Task.Factory.StartNew to schedule tasks on it (or use Task.Start(TaskScheduler) if you have to deal with cold tasks).

This way, you'll have full control of where tasks and their continuations run, as well as of task inlining.
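
As a rough illustration of the scheduler-based approach, here is a simplified single-thread TaskScheduler. It is a sketch only, not Stephen Toub's StaTaskScheduler; the class name SingleThreadTaskScheduler is hypothetical, and the demo blocks on .Result from the main thread to keep the example deterministic.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical, simplified scheduler that runs every task on one dedicated thread.
public sealed class SingleThreadTaskScheduler : TaskScheduler, IDisposable
{
    private readonly BlockingCollection<Task> _tasks = new BlockingCollection<Task>();
    private readonly Thread _thread;

    public SingleThreadTaskScheduler()
    {
        _thread = new Thread(() =>
        {
            // Execute queued tasks one by one until CompleteAdding is called.
            foreach (var task in _tasks.GetConsumingEnumerable())
            {
                TryExecuteTask(task);
            }
        });
        _thread.IsBackground = true;
        _thread.Start();
    }

    public override int MaximumConcurrencyLevel
    {
        get { return 1; }
    }

    protected override void QueueTask(Task task)
    {
        _tasks.Add(task);
    }

    protected override bool TryExecuteTaskInline(Task task, bool taskWasPreviouslyQueued)
    {
        // Allow inlining only when already on the dedicated thread.
        return Thread.CurrentThread == _thread && TryExecuteTask(task);
    }

    protected override IEnumerable<Task> GetScheduledTasks()
    {
        return _tasks.ToArray();
    }

    // Must not be called from the dedicated thread itself, or Join would deadlock.
    public void Dispose()
    {
        _tasks.CompleteAdding();
        _thread.Join();
        _tasks.Dispose();
    }
}

public static class Program
{
    public static void Main()
    {
        using (var scheduler = new SingleThreadTaskScheduler())
        {
            var factory = new TaskFactory(scheduler);
            int first = factory.StartNew(() => Thread.CurrentThread.ManagedThreadId).Result;
            int second = factory.StartNew(() => Thread.CurrentThread.ManagedThreadId).Result;
            // Both task bodies ran on the scheduler's dedicated thread.
            Console.WriteLine(first == second);
        }
    }
}
```

Cold tasks can be started on such a scheduler with task.Start(scheduler) rather than RunSynchronously, so the TPL knows which scheduler the task belongs to.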