A Throttling CompletionService
by Dana H. P'Simer
Mon, Jun 8, 2009
The CompletionService interface defines a service that allows the caller to submit tasks to be completed in the future. A commonly used implementation is the ExecutorCompletionService, which uses an Executor to run the tasks that have been submitted.
In most cases the Executor used will be the ThreadPoolExecutor which creates a pool of threads to use in executing submitted tasks. ThreadPoolExecutors are very expensive things to create. They should not be created on-the-fly because of this. ThreadPoolExecutors are excellent at limiting the total number of threads running a particular type of task and therefore provide a safeguard against thread leakage or other scenarios that may result in running out of threads. CompletionServices are, on the other hand, rather inexpensive to create.
In a recent use case, I needed to create a varying number of tasks that had to run in parallel, but I did not want one particularly large request to starve out other requests and make them wait for an available thread from the pool to execute their tasks. That’s when I came up with the idea that if I had a CompletionService that limited the number of tasks simultaneously submitted to the Executor, I could have one thread pool that is shared by all requests.
At first I thought I would extend the ExecutorCompletionService. As I wrote the code, I realized I was just delegating all the behavior to the methods defined in CompletionService. So, I created the “ThrottledCompletionService” as a Decorator which takes another CompletionService. In this way I was able to add this functionality to any CompletionService, not just an ExecutorCompletionService.
Here is the code:
package com.dhptech.utils.concurrent;
import java.util.LinkedList;
import java.util.Queue;
import java.util.concurrent.Callable;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletionService;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import com.dhptech.utils.concurrent.ConcurrentUtils;
/**
 * This CompletionService decorates another CompletionService and throttles the
 * number of tasks that are handed to the decorated service at any given time.
 *
 * This can be used in conjunction with an Executor to complete tasks but keep
 * larger batches of tasks from starving out smaller batches. Each "batch" should
 * get a new ThrottledCompletionService but all of them use the same central
 * Executor.
 *
 * This object is lightweight enough to be created and destroyed for each batch
 * of tasks that need to be submitted to a common Executor.
 *
 * Submission and the slot bookkeeping are synchronized on this instance.
 *
 * NOTE: a throttle slot is released only when the wrapped task's run/call method
 * returns. If the decorated service never runs a task it accepted (for example
 * because its future was cancelled before execution started), that slot is not
 * released.
 *
 * NOTE: the futures handed out by {@link #take()} and {@link #poll()} are the
 * ones created by the decorated service. For a task that had to wait in the
 * queue they are not the same object as the future returned from submit.
 *
 * @param <V> the result type of the tasks.
 *
 * @author danap
 */
public class ThrottledCompletionService<V> implements CompletionService<V> {

    /**
     * Acts as a base class to implement the Future interface for the Runnable
     * and Callable variants. An instance is returned to the caller only when the
     * task had to be queued; it then stands in for the decorated service's future
     * until the task is actually submitted.
     *
     * @param <T> the return type of the task.
     */
    private abstract class ThrottledTask<T> implements Future<T> {

        /** Released once the task is either submitted to the delegate or cancelled while queued. */
        private final CountDownLatch settled = new CountDownLatch(1);

        /** The decorated service's future; null until the task is submitted. */
        private volatile Future<T> delegateFuture = null;

        /** True only if the task was cancelled before it was submitted. */
        private volatile boolean cancelled = false;

        /**
         * Sets the delegateFuture and releases anyone waiting in get.
         *
         * @param delegateFuture the future from the decorated completion service.
         */
        protected void setDelegateFuture(Future<T> delegateFuture) {
            this.delegateFuture = delegateFuture;
            settled.countDown();
        }

        /**
         * Should submit the task to the completion service.
         */
        public abstract void submit();

        /**
         * Cancels the task. A task that is still queued is marked cancelled and
         * will be skipped when its turn comes; a task that was already submitted
         * is cancelled through the decorated service's future.
         *
         * @see Future#cancel(boolean)
         */
        @Override
        public boolean cancel(boolean mayInterruptIfRunning) {
            Future<T> submittedFuture;
            // Hold the service lock so this cannot interleave with submitNext()
            // checking isCancelled() and then submitting this very task.
            synchronized (ThrottledCompletionService.this) {
                submittedFuture = delegateFuture;
                if (submittedFuture == null) {
                    cancelled = true;
                    // wake up callers blocked in get(); they will see the cancellation.
                    settled.countDown();
                    return true;
                }
            }
            return submittedFuture.cancel(mayInterruptIfRunning);
        }

        /**
         * @see Future#isCancelled()
         */
        @Override
        public boolean isCancelled() {
            if (cancelled) {
                return true;
            }
            Future<T> submittedFuture = delegateFuture;
            return submittedFuture != null && submittedFuture.isCancelled();
        }

        /**
         * A task cancelled while queued is done; otherwise the answer comes from
         * the decorated service's future once the task has been submitted.
         *
         * @see Future#isDone()
         */
        @Override
        public boolean isDone() {
            if (cancelled) {
                return true;
            }
            Future<T> submittedFuture = delegateFuture;
            return submittedFuture != null && submittedFuture.isDone();
        }

        /**
         * Waits for the task to be submitted and then for its result.
         *
         * @throws CancellationException if the task was cancelled while queued.
         *
         * @see Future#get()
         */
        @Override
        public T get() throws InterruptedException, ExecutionException {
            settled.await();
            if (cancelled) {
                throw new CancellationException();
            }
            return delegateFuture.get();
        }

        /**
         * Get the value from this future.
         *
         * NOTE: because this operation will first wait for the task to be submitted and
         * then wait for the future returned by the decorated completion service, the
         * timeout is converted to nanoseconds and the time taken to wait for the
         * task to be submitted is subtracted from it before passing it to the get
         * method of the future returned by the decorated completion service.
         *
         * @throws TimeoutException if the timeout expires, including while the
         *         task is still waiting to be submitted.
         * @throws CancellationException if the task was cancelled while queued.
         *
         * @see Future#get(long, java.util.concurrent.TimeUnit)
         */
        @Override
        public T get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
            long timeoutNanos = unit.toNanos(timeout);
            long start = System.nanoTime();
            // await returns false when the time ran out; the task is still queued
            // and there is no delegate future to ask.
            if (!settled.await(timeoutNanos, TimeUnit.NANOSECONDS)) {
                throw new TimeoutException();
            }
            if (cancelled) {
                throw new CancellationException();
            }
            // wait only the remainder of the timeout for the result.
            long remainingNanos = timeoutNanos - (System.nanoTime() - start);
            return delegateFuture.get(Math.max(remainingNanos, 0L), TimeUnit.NANOSECONDS);
        }
    }

    /**
     * An implementation of ThrottledTask that handles Runnables.
     */
    private class ThrottledRunnable extends ThrottledTask<V> implements Runnable {

        private final Runnable delegate;
        private final V value;

        /**
         * Constructs a new ThrottledRunnable.
         *
         * @param delegate the runnable that we will delegate to.
         *
         * @param value the value to return from this task.
         */
        public ThrottledRunnable(Runnable delegate, V value) {
            this.delegate = delegate;
            this.value = value;
        }

        /**
         * @see ThrottledTask#submit()
         */
        @Override
        public void submit() {
            setDelegateFuture(mDelegate.submit(this, value));
        }

        /**
         * Runs the decorated runnable and calls
         * {@link ThrottledCompletionService#submitNext()} when it is done,
         * whether it returned normally or threw.
         *
         * @see Runnable#run()
         */
        @Override
        public void run() {
            try {
                delegate.run();
            } finally {
                submitNext();
            }
        }
    }

    /**
     * An implementation of ThrottledTask for Callables.
     */
    private class ThrottledCallable extends ThrottledTask<V> implements Callable<V> {

        private final Callable<V> delegate;

        /**
         * Construct a ThrottledCallable.
         *
         * @param delegate the callable we will delegate to.
         */
        public ThrottledCallable(Callable<V> delegate) {
            this.delegate = delegate;
        }

        /**
         * Calls the decorated callable and calls
         * {@link ThrottledCompletionService#submitNext()} when it is complete,
         * whether it returned normally or threw.
         *
         * @see Callable#call()
         *
         * @return the return value of the callable.
         *
         * @throws java.lang.Exception whatever the decorated callable throws.
         */
        @Override
        public V call() throws Exception {
            try {
                return delegate.call();
            } finally {
                submitNext();
            }
        }

        /**
         * @see ThrottledTask#submit()
         */
        @Override
        public void submit() {
            setDelegateFuture(mDelegate.submit(this));
        }
    }

    /** Maximum number of tasks handed to the delegate at the same time. */
    private final int mThrottle;

    /** The decorated completion service. */
    private final CompletionService<V> mDelegate;

    /** Number of tasks currently counted against the throttle. */
    private int mSubmitted = 0;

    /** Tasks waiting for a free slot, in submission order. */
    private final Queue<ThrottledTask<V>> mTaskQueue = new LinkedList<ThrottledTask<V>>();

    /**
     * Creates a new ThrottledCompletionService wrapping the given completion service
     * as a delegate, with a throttle of 1.
     *
     * @param delegate the completion service to decorate.
     */
    public ThrottledCompletionService(CompletionService<V> delegate) {
        this(delegate, 1);
    }

    /**
     * Creates a new ThrottledCompletionService wrapping a new ExecutorCompletionService
     * that references the passed executor, with a throttle of 1.
     *
     * @param executor the executor.
     */
    public ThrottledCompletionService(Executor executor) {
        this(new ExecutorCompletionService<V>(executor), 1);
    }

    /**
     * Creates a new ThrottledCompletionService wrapping the given completion service
     * as a delegate.
     *
     * @param delegate the completion service to decorate.
     * @param throttle the maximum number of tasks submitted to the delegate at once.
     *
     * @throws IllegalArgumentException if throttle is less than 1; such a value
     *         would queue every task and never run any of them.
     */
    public ThrottledCompletionService(CompletionService<V> delegate, final int throttle) {
        if (throttle < 1) {
            throw new IllegalArgumentException("throttle must be at least 1: " + throttle);
        }
        this.mDelegate = delegate;
        this.mThrottle = throttle;
    }

    /**
     * Creates a new ThrottledCompletionService wrapping a new ExecutorCompletionService
     * that references the passed executor.
     *
     * @param executor the executor.
     * @param throttle the maximum number of tasks submitted to the delegate at once.
     *
     * @throws IllegalArgumentException if throttle is less than 1.
     */
    public ThrottledCompletionService(Executor executor, final int throttle) {
        this(new ExecutorCompletionService<V>(executor), throttle);
    }

    /**
     * Decrements the submitted count and attempts to submit the next queued task.
     * Called when a submitted task completes. If there are no tasks to submit,
     * simply returns.
     */
    private synchronized void submitNext() {
        mSubmitted -= 1;
        if (mSubmitted >= mThrottle) {
            return;
        }
        ThrottledTask<V> task;
        while ((task = mTaskQueue.poll()) != null) {
            // skip any tasks that were cancelled before we submitted them.
            if (task.isCancelled()) {
                continue;
            }
            task.submit();
            mSubmitted += 1;
            return;
        }
    }

    /**
     * Submits the task to the decorated service if a slot is free, otherwise
     * queues it until a running task completes.
     *
     * @see CompletionService#submit(java.util.concurrent.Callable)
     *
     * @param task the task to submit
     *
     * @return a future for the result of the task.
     *
     * @throws NullPointerException if the task is null.
     */
    @Override
    public synchronized Future<V> submit(Callable<V> task) {
        if (task == null) {
            throw new NullPointerException("task");
        }
        ThrottledCallable ttask = new ThrottledCallable(task);
        if (mSubmitted >= mThrottle) {
            mTaskQueue.offer(ttask);
            return ttask;
        }
        mSubmitted += 1;
        try {
            return mDelegate.submit(ttask);
        } catch (RuntimeException e) {
            // the delegate refused the task, so give the slot back.
            mSubmitted -= 1;
            throw e;
        }
    }

    /**
     * Submits the task to the decorated service if a slot is free, otherwise
     * queues it until a running task completes.
     *
     * @see CompletionService#submit(java.lang.Runnable, java.lang.Object)
     *
     * @param task the task to be run.
     * @param result the result to return from the Future object.
     * @return a future object that can be used to access the result of the submitted task.
     *
     * @throws NullPointerException if the task is null.
     */
    @Override
    public synchronized Future<V> submit(Runnable task, V result) {
        if (task == null) {
            throw new NullPointerException("task");
        }
        ThrottledRunnable ttask = new ThrottledRunnable(task, result);
        if (mSubmitted >= mThrottle) {
            mTaskQueue.offer(ttask);
            return ttask;
        }
        mSubmitted += 1;
        try {
            return mDelegate.submit(ttask, result);
        } catch (RuntimeException e) {
            // the delegate refused the task, so give the slot back.
            mSubmitted -= 1;
            throw e;
        }
    }

    /**
     * @see CompletionService#take()
     */
    @Override
    public Future<V> take() throws InterruptedException {
        return mDelegate.take();
    }

    /**
     * @see CompletionService#poll(long, java.util.concurrent.TimeUnit)
     */
    @Override
    public Future<V> poll(long timeout, TimeUnit unit) throws InterruptedException {
        return mDelegate.poll(timeout, unit);
    }

    /**
     * @see CompletionService#poll()
     */
    @Override
    public Future<V> poll() {
        return mDelegate.poll();
    }
}