Managing the number of concurrent tasks using the TAP

Tags: async, C#

I got a great question from a customer about controlling the number of active tasks. It's a general question about async code, so I'll answer it here.

One particular application path starts a large number of asynchronous tasks against the same service. After too many tasks have started, the service starts rejecting them. This is a common problem when you're making async calls. If you hit the same web service with too many requests, it may start queuing them rather than responding. Worse, it may erroneously conclude that your client app is part of a DoS attack.

Thankfully, the Task-based Asynchronous Pattern (TAP) has an answer. Tasks are objects you can program with, so you can throttle requests so that only a certain number of tasks are live at any given time. Stephen Toub wrote a great whitepaper (http://download.microsoft.com/download/5/B/9/5B924336-AA5D-4903-95A0-56C6336E32C9/TAP.docx) that describes this and many other combinators you can build with the TAP. Two of them really help with the problem my customer is having.

I wrote a sample program to demonstrate the issue. I didn't want to mount a DoS attack on a real service, so I simulated one in a client-side async task: a task that produces pithy quotes asynchronously. I'm using Task.Delay and a Random object to vary the delay on each task.

Finally, this service starts throwing exceptions once 5 concurrent tasks are running. That simulates the failures observed when one client starts too many tasks.

public class ThrottledService
{
    private static int currentTaskCount = 0;
    private static int totalTasks = 0;
    private static Random randomizer = new Random((int)DateTime.Now.Ticks);

    private readonly string[] sayings =
    {
        "Hello World",
        "See your future, Be your future",
        "You can't go. All the plants are gonna die",
        "so I got that going for me, which is nice",
        "It's in the hole"
    };

    public async Task<string> ProducePithySaying()
    {
        // Capture this task's number atomically, so two tasks can't report the same one.
        int taskNumber = System.Threading.Interlocked.Increment(ref totalTasks);
        if (currentTaskCount > 4)
            throw new InvalidOperationException("Can only perform 5 concurrent operations");
        System.Threading.Interlocked.Increment(ref currentTaskCount);
        string rVal = string.Format("starting task {0}, {1} now ", taskNumber, currentTaskCount);
        int delay = randomizer.Next(250, 5000);
        await Task.Delay(delay);
        System.Threading.Interlocked.Decrement(ref currentTaskCount);
        // Next's upper bound is exclusive, so pass sayings.Length to include the last saying.
        rVal += " " + sayings[randomizer.Next(sayings.Length)];
        rVal += " delayed " + delay.ToString();
        return rVal;
    }
}

My sample program tries to start and monitor 20 tasks against this service. The program entry point can't be an async method, so the Main method here uses Task.Wait to block until all the tasks have finished. Don't copy my use of Task.Wait here, except in a method that cannot be async. Notice that I've put all the async work in another method that is marked async and returns a Task. That way, I can block and wait for the task to complete without sprinkling sync-over-async all over the code.

static void Main(string[] args)
{
    var task = DoSampleWork();
    task.Wait();
}

static async Task DoSampleWork()
{
    var engine = new ThrottledService();
    var allTasks = new List<Task<string>>();
    for (int i = 0; i < 20; i++)
    {
        allTasks.Add(engine.ProducePithySaying());
        await Task.Delay(250);
    }
    foreach (var task in allTasks)
    {
        try
        {
            Console.WriteLine(await task);
        }
        catch (InvalidOperationException)
        {
            Console.WriteLine("This task failed");
        }
    }
}

When you run it, you get the following output:

starting task 1, 1 now  Hello World delayed 4289
starting task 2, 2 now  You can't go. All the plants are gonna die delayed 3054
starting task 3, 3 now  Hello World delayed 3803
starting task 4, 4 now  See your future, Be your future delayed 3782
starting task 5, 5 now  Hello World delayed 2720
This task failed
This task failed
This task failed
This task failed
This task failed
This task failed
This task failed
This task failed
starting task 14, 5 now  so I got that going for me, which is nice delayed 2768
This task failed
starting task 16, 5 now  Hello World delayed 4294
This task failed
starting task 18, 4 now  See your future, Be your future delayed 755
starting task 19, 4 now  Hello World delayed 4476
starting task 20, 5 now  so I got that going for me, which is nice delayed 3600

As soon as there are too many tasks, the remaining tasks enter the faulted state and fail. As I've written this sample, I've added a small constant delay in the loop that starts tasks, so it's likely that some tasks finish before all 20 have started. That's why a few tasks succeed even after some have failed.

Every run will be different, because every run produces output from tasks at a different rate.
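As an aside on the Task.Wait caveat above: blocking with Wait (or .Result) wraps any task exception in an AggregateException, while await rethrows the original exception directly. Here's a minimal sketch of the difference (assuming C# 7.1+ for the async Main; the Fail method is my own illustration, not part of the sample above):

```csharp
using System;
using System.Threading.Tasks;

public static class WaitVsAwaitDemo
{
    private static async Task Fail()
    {
        await Task.Delay(10);
        throw new InvalidOperationException("boom");
    }

    public static async Task Main()
    {
        // Blocking: the task's exception arrives wrapped in an AggregateException.
        try { Fail().Wait(); }
        catch (AggregateException ae)
        {
            Console.WriteLine("Wait threw: " + ae.InnerException.GetType().Name);
        }

        // Awaiting: the original exception is rethrown as-is.
        try { await Fail(); }
        catch (InvalidOperationException)
        {
            Console.WriteLine("await threw: InvalidOperationException");
        }
    }
}
// prints:
// Wait threw: InvalidOperationException
// await threw: InvalidOperationException
```

This is one more reason the catch clauses in DoSampleWork work only because the code awaits each task rather than blocking on it.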

Stephen's whitepaper has an example of how to solve this problem (throttling, in the TAP Combinators section). His solution starts the maximum allowed number of concurrent tasks, and then waits for one to finish. As each task finishes, a new one is started. When there are no more tasks to start, the remaining tasks are awaited in turn until all the work is done. I've adapted that method for my sample.

static async Task DoThrottledSampleWork()
{
    var engine = new ThrottledService();
    const int CONCURRENCY_LEVEL = 5;
    int nextIndex = 0;
    var throttledTasks = new List<Task<string>>();
    while (nextIndex < CONCURRENCY_LEVEL && nextIndex < 20)
    {
        throttledTasks.Add(engine.ProducePithySaying());
        nextIndex++;
    }

    while (throttledTasks.Count > 0)
    {
        try
        {
            Task<string> finishedTask = await Task.WhenAny(throttledTasks);
            throttledTasks.Remove(finishedTask);
            Console.WriteLine(await finishedTask);
        }
        catch (Exception exc)
        {
            Console.WriteLine(exc.ToString());
        }
        if (nextIndex < 20)
        {
            throttledTasks.Add(engine.ProducePithySaying());
            nextIndex++;
        }
    }
}

Let's walk carefully through this method to understand what it does, and how it accomplishes its work. If you've never programmed with Task objects before, this can be a difficult concept. The first part is obvious: start as many tasks as can run concurrently.

while (nextIndex < CONCURRENCY_LEVEL && nextIndex < 20)
{
    throttledTasks.Add(engine.ProducePithySaying());
    nextIndex++;
}

The next loop is where the interesting task-based work comes in. The code awaits Task.WhenAny to retrieve the first task that finishes. That task is then removed from the list of running tasks and processed. The order here is important. Remember that Task.WhenAny returns a Task&lt;Task&lt;string&gt;&gt;, so after awaiting it, finishedTask has type Task&lt;string&gt;. The code removes that task from the list before awaiting it. That means the task is removed from the running task list even if it's faulted.
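To see why that ordering is safe, note that Task.WhenAny itself never throws when the winning task has faulted; the exception only surfaces when you await the finished task. A minimal sketch of that behavior (assuming .NET 4.6+ for Task.FromException and C# 7.1+ for async Main; this is my illustration, not part of the sample):

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

public static class WhenAnyFaultDemo
{
    public static async Task Main()
    {
        var tasks = new List<Task<string>>
        {
            // An already-faulted task: it "wins" WhenAny immediately.
            Task.FromException<string>(new InvalidOperationException("boom")),
            Task.Delay(100).ContinueWith(_ => "slow but fine")
        };

        // This await completes normally; finishedTask is the faulted task.
        Task<string> finishedTask = await Task.WhenAny(tasks);
        tasks.Remove(finishedTask);   // safe to remove even though it faulted

        try
        {
            Console.WriteLine(await finishedTask);  // the exception is rethrown here
        }
        catch (InvalidOperationException e)
        {
            Console.WriteLine("Caught: " + e.Message);  // prints "Caught: boom"
        }
    }
}
```

Because the removal happens before the rethrowing await, a faulted task can never get stuck in the list and be awaited again.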

After processing the finished task, the last part of the loop starts a new task (if there is more work to be done):

while (throttledTasks.Count > 0)
{
    try
    {
        Task<string> finishedTask = await Task.WhenAny(throttledTasks);
        throttledTasks.Remove(finishedTask);
        Console.WriteLine(await finishedTask);
    }
    catch (Exception exc)
    {
        Console.WriteLine(exc.ToString());
    }
    if (nextIndex < 20)
    {
        throttledTasks.Add(engine.ProducePithySaying());
        nextIndex++;
    }
}

Now, when I run the app, I get the following output:

starting task 3, 3 now  Hello World delayed 720
starting task 5, 5 now  so I got that going for me, which is nice delayed 868
starting task 4, 4 now  See your future, Be your future delayed 1853
starting task 1, 1 now  so I got that going for me, which is nice delayed 1971
starting task 8, 5 now  Hello World delayed 1099
starting task 7, 5 now  See your future, Be your future delayed 3213
starting task 2, 2 now  so I got that going for me, which is nice delayed 4118
starting task 12, 5 now  Hello World delayed 401
starting task 6, 5 now  Hello World delayed 3785
starting task 9, 5 now  See your future, Be your future delayed 3255
starting task 14, 5 now  Hello World delayed 1848
starting task 16, 5 now  See your future, Be your future delayed 323
starting task 10, 5 now  Hello World delayed 4092
starting task 13, 5 now  Hello World delayed 2727
starting task 11, 5 now  so I got that going for me, which is nice delayed 4567
starting task 17, 5 now  Hello World delayed 2160
starting task 19, 5 now  so I got that going for me, which is nice delayed 2009
starting task 15, 5 now  Hello World delayed 4314
starting task 18, 5 now  See your future, Be your future delayed 2809
starting task 20, 5 now  so I got that going for me, which is nice delayed 3707

Notice how once the number of running tasks reaches 5, each newly started task reports that it's the fifth task. A new task starts only when some task finishes. You may wonder why the number doesn't decrease as the last tasks start: that's because when the code starts each of those tasks, there are still 5 concurrent tasks. Only when the last tasks finish are there fewer, so this code keeps the maximum number of tasks running for as long as possible.

I can now also remove the small delay from the loop that starts tasks. Even without it, too many tasks can never be running at once.
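Another way to get the same throttling, not shown in my sample, is SemaphoreSlim.WaitAsync: each caller acquires the semaphore before hitting the service, so at most five calls are ever in flight. The sketch below uses a hypothetical FakeServiceCall in place of ProducePithySaying, and tracks peak concurrency to show the limit holds (assuming C# 7.1+ for async Main):

```csharp
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public static class SemaphoreThrottleDemo
{
    private static int inFlight = 0;
    private static int peak = 0;

    // Stand-in for ProducePithySaying: records how many calls run concurrently.
    private static async Task<string> FakeServiceCall(int id)
    {
        int now = Interlocked.Increment(ref inFlight);
        int seen;
        // Record the highest concurrency ever observed (lock-free max update).
        while (now > (seen = Volatile.Read(ref peak)))
            Interlocked.CompareExchange(ref peak, now, seen);
        await Task.Delay(50);
        Interlocked.Decrement(ref inFlight);
        return "task " + id;
    }

    public static async Task Main()
    {
        var throttle = new SemaphoreSlim(5);   // at most 5 concurrent calls
        var tasks = Enumerable.Range(1, 20).Select(async id =>
        {
            await throttle.WaitAsync();        // wait for a free slot
            try { return await FakeServiceCall(id); }
            finally { throttle.Release(); }    // free the slot even on failure
        }).ToList();

        await Task.WhenAll(tasks);
        Console.WriteLine("peak concurrency: " + peak);  // prints "peak concurrency: 5"
    }
}
```

The try/finally around the call is the important part: the slot is released even if the service call throws, so a failure can't permanently shrink the pool.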

These combinators are a great way to write small utility methods to manipulate task objects. I'll explore more of those in future blog posts.

7 Comments

  • Daniel Marbach said

    Hy bill
    This blogpost shows what is bothering me since the release of TPL. Many people got the impression that TPL is a much more higher level abstraction than primitive threading. But the problem is: It's not really. It certainly solves some lower level details better bit is still too complicated for application code. Whenever you use TPL you have to worry about so many things that I question its usefuleness for the day to day development. For example tackling your problem with dataflows would be more elegant for application code and also make the code easier to understand and maintain

    Daniel

  • Bill Wagner said

    Dear readers, I believe I just fat fingered and deleted a number of good comments that I meant to approve.

    I apologize, and I'm writing notes quickly about the topics. If you don't see your comment above, that's because I did make that mistake. I'll try to follow up as best I can, but please re-post and I'll be more careful on my next approval cycle.

  • Frantisek Kaduk said

    I totally agree with Daniel Marbach, it can be clearly solved by TPL Dataflow.

    My short example showing how to emploey 2 dataflow cubes: buffer (input), action(output) and max degree of parallelism set to 5: https://gist.github.com/kadukf/7553682

  • Svick said

    @Ricardo: That can limit the number of threads used, but not the number of async operations, which is the goal here.
