A Throttling CompletionService

by Dana H. P'Simer

Mon, Jun 8, 2009


The CompletionService interface defines a service that allows the caller to submit tasks to be completed in the future. A commonly used implementation is the ExecutorCompletionService, which uses an Executor to run the tasks that have been submitted.

In most cases the Executor used will be the ThreadPoolExecutor which creates a pool of threads to use in executing submitted tasks. ThreadPoolExecutors are very expensive things to create. They should not be created on-the-fly because of this. ThreadPoolExecutors are excellent at limiting the total number of threads running a particular type of task and therefore provide a safeguard against thread leakage or other scenarios that may result in running out of threads. CompletionServices are, on the other hand, rather inexpensive to create.

In a recent use case, I needed to create a varying number of tasks that would run in parallel, but I did not want one particularly large request to starve out other requests and make them wait for an available thread from the pool to execute their tasks. That’s when I came up with the idea that if I had a CompletionService that limited the number of tasks simultaneously submitted to the Executor, I could have one thread pool shared by all requests.

At first I thought I would extend the ExecutorCompletionService. As I wrote the code I realized I was just delegating all the behavior to the methods defined in CompletionService. So, I created the “ThrottledCompletionService” as a Decorator which takes another CompletionService. In this way I was able to add this functionality to any CompletionService, not just an ExecutorCompletionService.

Here is the code:

 package com.dhptech.utils.concurrent;
 
 import java.util.LinkedList;
 import java.util.Queue;
 import java.util.concurrent.Callable;
 import java.util.concurrent.CancellationException;
 import java.util.concurrent.CompletionService;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Executor;
 import java.util.concurrent.ExecutorCompletionService;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 
 import com.dhptech.utils.concurrent.ConcurrentUtils;
 
 /**
  * This CompletionService decorates another CompletionService and throttles the
  * number of tasks that are handed to the decorated service at any given time.
  *
  * This can be used in conjunction with an Executor to complete tasks but keep
  * larger batches of tasks from starving out smaller batches.  Each "batch" should
  * get a new ThrottledCompletionService but all of them use the same central
  * Executor.
  *
  * This object is lightweight enough to be created and destroyed for each batch
  * of tasks that need to be submitted to a common Executor.
  *
  * Notes on behavior:
  * <ul>
  * <li>Tasks submitted while fewer than "throttle" tasks are outstanding are
  *     passed straight to the decorated service and its Future is returned.
  *     Tasks submitted while the throttle is reached are queued and a
  *     placeholder Future is returned instead.</li>
  * <li>{@link #take()} and {@link #poll()} return the Futures produced by the
  *     decorated service, so for a queued task the Future returned by submit
  *     is a different object from the one later returned by take/poll.</li>
  * <li>A queued task that is cancelled before it is handed to the decorated
  *     service is never submitted and therefore never shows up in take/poll.</li>
  * <li>A throttle slot is released when the wrapper's run()/call() method
  *     finishes.  If the decorated service never invokes the wrapper (for
  *     example because its Future was cancelled before the task started) the
  *     slot is not released.</li>
  * </ul>
  *
  * The submit methods and the internal bookkeeping synchronize on this instance.
  *
  * @author danap
  */
 public class ThrottledCompletionService<V> implements CompletionService<V> {

   /**
    * Acts as a base class to implement the Future interface for the Runnable
    * and callable variants.  An instance is only handed out to callers as a
    * Future when the task had to be queued.
    *
    * @param <T> the return type of the task.
    */
   private abstract class ThrottledTask<T> implements Future<T> {
     /** Released when the task is handed to the delegate or cancelled while queued. */
     private final CountDownLatch submitted = new CountDownLatch(1);
     /** The delegate's future; null until the task has been submitted. */
     private volatile Future<T> delegateFuture = null;
     /** True only when the task was cancelled before it was submitted. */
     private volatile boolean cancelled = false;

     /**
      * Sets the delegateFuture and counts down the submitted latch.
      *
      * @param delegateFuture the future from the decorated completion service.
      */
     protected void setDelegateFuture(Future<T> delegateFuture) {
       this.delegateFuture = delegateFuture;
       submitted.countDown();
     }

     /**
      * Should submit the task to the completion service.
      */
     public abstract void submit();

     /**
      * Returns the delegate future once waiting on the latch has finished.
      *
      * @return the future from the decorated completion service.
      *
      * @throws CancellationException if the task was cancelled before it was
      *         ever submitted.
      */
     private Future<T> submittedFuture() {
       if ( cancelled ) {
         throw new CancellationException();
       }
       return delegateFuture;
     }

     /**
      * Cancels the task.  While the task is still queued it is removed from
      * the queue and any thread blocked in get is released; afterwards the
      * request is forwarded to the delegate's future.
      *
      * @see Future#cancel(boolean)
      */
     public boolean cancel(boolean mayInterruptIfRunning) {
       Future<T> future;
       // Take the same lock as submitNext() so a task cannot be cancelled
       // and submitted at the same time.
       synchronized ( ThrottledCompletionService.this ) {
         future = delegateFuture;
         if ( future == null ) {
           if ( !cancelled ) {
             cancelled = true;
             mTaskQueue.remove(this);
             // wake up anybody blocked in get(); they will see the cancelled flag.
             submitted.countDown();
           }
           return true;
         }
       }
       // Call the delegate outside of the lock.
       return future.cancel(mayInterruptIfRunning);
     }

     /**
      * @see Future#isCancelled()
      */
     public boolean isCancelled() {
       if ( cancelled ) {
         return true;
       }
       Future<T> future = delegateFuture;
       return future != null && future.isCancelled();
     }

     /**
      * A task cancelled while queued counts as done; a task still waiting in
      * the queue does not.
      *
      * @see Future#isDone()
      */
     public boolean isDone() {
       if ( cancelled ) {
         return true;
       }
       Future<T> future = delegateFuture;
       return future != null && future.isDone();
     }

     /**
      * Waits for the task to be submitted (or cancelled) and then for the
      * delegate's future.
      *
      * @see Future#get()
      *
      * @throws CancellationException if the task was cancelled.
      */
     public T get() throws InterruptedException, ExecutionException {
       submitted.await();
       return submittedFuture().get();
     }

     /**
      * Get the value from this future.
      *
      * @see Future#get(long, java.util.concurrent.TimeUnit)
      *
      * NOTE: because this operation will first wait for the task to be submitted and
      * then wait for the future returned by the decorated completion service, the
      * timeout is converted to nanoseconds and the time taken to wait for the
      * task to be submitted is subtracted from it before passing it to the get
      * method of the future returned by the decorated completion service.
      *
      * @throws TimeoutException if the timeout elapses, including while the
      *         task is still waiting to be submitted.
      * @throws CancellationException if the task was cancelled.
      */
     public T get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
       // Convert the timeout unit to nanoseconds.
       long timeoutNanos = unit.toNanos(timeout);
       // record the time we started waiting.
       long start = System.nanoTime();
       if ( !submitted.await(timeoutNanos, TimeUnit.NANOSECONDS) ) {
         // still queued: there is no delegate future to ask yet.
         throw new TimeoutException();
       }
       // remove the time we waited from the timeout, never going below zero.
       long remaining = Math.max(timeoutNanos - (System.nanoTime() - start), 0L);
       // wait the remainder of the timeout for the result.
       return submittedFuture().get(remaining, TimeUnit.NANOSECONDS);
     }
   }

   /**
    * An implementation of ThrottledTask that handles Runnables.
    */
   private class ThrottledRunnable extends ThrottledTask<V> implements Runnable {
     private final Runnable delegate;
     private final V value;

     /**
      * Constructs a new ThrottledRunnable.
      *
      * @param delegate the runnable that we will delegate to.
      *
      * @param value the value to return from this task.
      */
     public ThrottledRunnable(Runnable delegate, V value) {
       this.delegate = delegate;
       this.value = value;
     }

     /**
      * @see ThrottledTask#submit()
      */
     public void submit() {
       setDelegateFuture(mDelegate.submit(this,value));
     }

     /**
      * @see Runnable#run()
      *
      * calls {@link ThrottledCompletionService#submitNext()} when the task is done.
      */
     public void run() {
       try {
         delegate.run();
       } finally {
         submitNext();
       }
     }
   }

   /**
    * An implementation of ThrottledTask for Callables.
    */
   private class ThrottledCallable extends ThrottledTask<V> implements Callable<V> {
     private final Callable<V> delegate;

     /**
      * Construct a ThrottledCallable.
      *
      * @param delegate the callable we will delegate to.
      */
     public ThrottledCallable(Callable<V> delegate) {
       this.delegate = delegate;
     }

     /**
      * @see Callable#call()
      *
      * calls {@link ThrottledCompletionService#submitNext()} when the decorated
      * task is complete.
      *
      * @return the return value of the callable.
      *
      * @throws java.lang.Exception an exception when something bad happens.
      */
     public V call() throws Exception {
       try {
         return delegate.call();
       } finally {
         submitNext();
       }
     }

     /**
      * @see ThrottledTask#submit()
      */
     public void submit() {
       setDelegateFuture(mDelegate.submit(this));
     }
   }

   /** Maximum number of tasks handed to the delegate at one time. */
   private final int mThrottle;
   /** The decorated completion service. */
   private final CompletionService<V> mDelegate;

   /** Number of tasks currently handed to the delegate; guarded by this. */
   private int mSubmitted = 0;
   /** Tasks waiting for a free slot; guarded by this. */
   private final Queue<ThrottledTask<V>> mTaskQueue = new LinkedList<ThrottledTask<V>>();

   /**
    * Creates a new ThrottledCompletionService wrapping the given completion service
    * as a delegate, with a throttle of 1.
    *
    * @param delegate the completion service to decorate.
    */
   public ThrottledCompletionService(CompletionService<V> delegate) {
     this(delegate, 1);
   }

   /**
    * Creates a new ThrottledCompletionServices wrapping a new ExecutorCompletionService
    * that references the passed executor, with a throttle of 1.
    *
    * @param executor the executor.
    */
   public ThrottledCompletionService(Executor executor) {
     this(executor, 1);
   }

   /**
    * Creates a new ThrottledCompletionService wrapping the given completion service
    * as a delegate.
    *
    * @param delegate the completion service to decorate.
    * @param throttle the maximum number of tasks handed to the delegate at
    *        one time; must be at least 1.
    *
    * @throws NullPointerException if delegate is null.
    * @throws IllegalArgumentException if throttle is less than 1, since no
    *         task could ever be submitted with such a value.
    */
   public ThrottledCompletionService(CompletionService<V> delegate, final int throttle) {
     if ( delegate == null ) {
       throw new NullPointerException("delegate");
     }
     if ( throttle < 1 ) {
       throw new IllegalArgumentException("throttle must be at least 1: " + throttle);
     }
     this.mDelegate = delegate;
     this.mThrottle = throttle;
   }

   /**
    * Creates a new ThrottledCompletionServices wrapping a new ExecutorCompletionService
    * that references the passed executor.
    *
    * @param executor the executor.
    * @param throttle the maximum number of tasks handed to the executor at
    *        one time; must be at least 1.
    *
    * @throws IllegalArgumentException if throttle is less than 1.
    */
   public ThrottledCompletionService(Executor executor, final int throttle) {
     this(new ExecutorCompletionService<V>(executor), throttle);
   }

   /**
    * Decrements the submitted count, attempts to submit the next task.  Called
    * when a submitted task completes.  If there are no tasks to submit, simply
    * return.
    */
   private synchronized void submitNext() {
     mSubmitted -= 1;
     while ( mSubmitted < mThrottle ) {
       ThrottledTask<V> task = mTaskQueue.poll();
       if ( task == null ) {
         break;
       }
       // skip any tasks that were cancelled before we submitted them.
       if ( task.isCancelled() ) {
         continue;
       }
       // count the slot before submitting, as the public submit methods do.
       mSubmitted += 1;
       task.submit();
     }
   }

   /**
    * @see CompletionService#submit(java.util.concurrent.Callable)
    *
    * @param task the task to submit
    *
    * @return the delegate's future when the task could be submitted right
    *         away, otherwise a placeholder future for the queued task.
    */
   public synchronized Future<V> submit(Callable<V> task) {
     ThrottledCallable ttask = new ThrottledCallable(task);
     if ( mSubmitted < mThrottle ) {
       mSubmitted += 1;
       return mDelegate.submit(ttask);
     } else {
       mTaskQueue.offer(ttask);
       return ttask;
     }
   }

   /**
    * @see CompletionService#submit(java.lang.Runnable, java.lang.Object)
    *
    * @param task the task to be run.
    * @param result the result to return from the Future<V> object.
    * @return a future object that can be used to access the result of the submitted task.
    */
   public synchronized Future<V> submit(Runnable task, V result) {
     ThrottledRunnable ttask = new ThrottledRunnable(task,result);
     if ( mSubmitted < mThrottle ) {
       mSubmitted += 1;
       return mDelegate.submit(ttask,result);
     } else {
       mTaskQueue.offer(ttask);
       return ttask;
     }
   }

   /**
    * @see CompletionService#take()
    */
   public Future<V> take() throws InterruptedException {
     return mDelegate.take();
   }

   /**
    * @see CompletionService#poll(long, java.util.concurrent.TimeUnit)
    */
   public Future<V> poll(long timeout, TimeUnit unit) throws InterruptedException {
     return mDelegate.poll(timeout, unit);
   }

   /**
    * @see CompletionService#poll()
    */
   public Future<V> poll() {
     return mDelegate.poll();
   }
 }