In an earlier article on Executor, we looked at how we can create use the executor to create thread pools to manage our multi-threaded program. We shall continue to explore additional features of this class.
Here we will extend the ThreadPooledExecutor class and provide our implementation to replace the empty implementation.
ThreadPooledExecutor has 3 methods called beforeExecute(), afterExecute() and terminate()
Code snippet 1
- public class ExtendedThreadPoolExecutor extends ThreadPoolExecutor{
- protected void terminated(){
- System.out.println("---------Number of tasks completed: "+ getCompletedTaskCount());
- }
- protected void afterExecute(Runnable r, Throwable t)
- {
- System.out.println("afterExecute()--Tasks executed: "+getCompletedTaskCount());
- }
- protected void beforeExecute(Thread t, Runnable r)
- {
- System.out.println("beforeExecute()--Tasks executed: "+getCompletedTaskCount());
- }
When you run the code, you will appropriate outputs on the console. We will notice that beforeExecute() is called every time after Executor.execute() method is called and can be used to initialize threadlocal variables or set any other pre-conditions for the Runnable tasks.
Similarly afterExecute() is called after execute() is called but not after the run() is finished. I feel beforeExecute() is an useful method but afterExecute() is of limited use.
Then we have terminate() which is called after we call shutdown() and all threads are terminated. Also useful to set and test some post-conditions.
Let's take a quick look at different types of work queues we have to submit tasks to the Executor. In our earlier example we have seen an example of BoundedBlockedQueue. In this article we take a look at two other types of work queues, SynchronousQueue and unbound blocked queue. In a synchronous queue, Executor will keep allow the tasks to get backed up in a queue. If the current running threads cannot handle the task being added, a new thread will be spawned.
In the following code snippet, we have defined a simple Runnable task which prints it's thread id. We have some processing code to increase the amount of run time.
code snippet 2
- class Consumer2 implements Runnable
- { //some simple runnable objects
- private String name;
- public Consumer2(String name){
- this.name=name;
- }
- private AtomicInteger runCount=new AtomicInteger(0);
- private ThreadLocal threadLocalInteger =new ThreadLocal();
- public void initThreadLocalInt(int val) {
- threadLocalInteger.set(val);
- }
- public void run() {
- threadLocalInteger.set(1);
- for (int i=0; i<25; i++){
- runCount.addAndGet(i);
- runCount.compareAndSet(10, (Integer) threadLocalInteger.get());
- }
- threadLocalInteger.set(runCount.get());
- //System.out.println(name +" Task#"+threadLocalInteger.get() +" ThreadId:" + Thread.currentThread().getId());
- }
- }
Code snippet 3. The main method...
- public static void main(String[] args ) throws InterruptedException
- {
- Consumer2 task=new Consumer2("executorWithSynchQueue");
- int defaultThreadCount=1;int maxThreadCount=10;
- ExtendedThreadPoolExecutor executorWithSynchQueue= new ExtendedThreadPoolExecutor(defaultThreadCount,maxThreadCount,keepAliveTime,TimeUnit.MILLISECONDS,pendingTasksSynchQueue);
- for (int i=0; i< 25; i++)
- {
- task.initThreadLocalInt(i);
- executorWithSynchQueue.execute(task);
- }
- executorWithSynchQueue.shutdown();
- executorWithSynchQueue.awaitTermination(1000, TimeUnit.MILLISECONDS);
- }
In code listing above( snippet 3), we have created an instance of our executor class and initialize it with a synchronous queue. In line 5, we have set the default thread count to 1 and max to 10. When you execute this code, the thread count in the executor will expand to all possible 10 threads for these 25 tasks. The output will be something like. You would see almost 10 different threads being spawned by the Executor.
You might also see Rejected exception being thrown if the pool already contains max number of threads and they are all busy.
- executorWithSynchQueue Task#291 ThreadId:7
- executorWithSynchQueue Task#591 ThreadId:8
- executorWithSynchQueue Task#891 ThreadId:9
- executorWithSynchQueue Task#1191 ThreadId:10
- executorWithSynchQueue Task#1491 ThreadId:11
- executorWithSynchQueue Task#1791 ThreadId:11
- executorWithSynchQueue Task#2091 ThreadId:10
- executorWithSynchQueue Task#2391 ThreadId:9
- executorWithSynchQueue Task#2691 ThreadId:8
- executorWithSynchQueue Task#2991 ThreadId:7
- executorWithSynchQueue Task#3291 ThreadId:12
- executorWithSynchQueue Task#3591 ThreadId:13
- executorWithSynchQueue Task#3891 ThreadId:14
- executorWithSynchQueue Task#4191 ThreadId:15
- executorWithSynchQueue Task#4491 ThreadId:16
On the other hand, if you use the UnboundedLinkedQueue, you have an option of choosing an unbounded queue to store unexecuted tasks. But the thread count will never expand beyond the default thread count.
The output of the same code configured with an LinkedBlockingQueue will be as follows:
- executorWithUnboundedQueue Task#291 ThreadId:7
- executorWithUnboundedQueue Task#591 ThreadId:7
- executorWithUnboundedQueue Task#891 ThreadId:7
- executorWithUnboundedQueue Task#1191 ThreadId:7
- executorWithUnboundedQueue Task#1491 ThreadId:7
- executorWithUnboundedQueue Task#1791 ThreadId:7
- executorWithUnboundedQueue Task#2091 ThreadId:7
- executorWithUnboundedQueue Task#2391 ThreadId:7
- executorWithUnboundedQueue Task#2691 ThreadId:7
- executorWithUnboundedQueue Task#2991 ThreadId:7
Note, it's the same thread, with thread id 7, which is being to used to execute all the tasks. Here there is no possibility of RejectedExecutionException being thrown.













Good one
Post new comment