Java Concurrency Series - Simple Executor example

Tagged:  
Average: 4.8 (4 votes)

With Java 1.5 we have a new way of managing threads or threaded tasks. We no longer need to call start() and join()on our Thread object. The new concurrent package provides us with an elegant Executor class to manage our threads.

Let's go through a simple example given below. We have created a simple ThreadPool with default pool size of 2 and max size of 4. In the method createMultipleConsumerThreads(), we have created an instance of a Runnable task, Consumer and passed it on to the execute() method.
The shutdown method will gracefully terminate all threads if there are no running tasks. Thus this Executor framework frees a programmer from thread management and allows us to focus on our application code.

Code Snippet 1

  1. public class SimplePooledExecutorSample {
  2.         private ThreadPoolExecutor executor;
  3.         private int defaultThreadCount=2;
  4.         private int maxThreadCount=4;
  5.         private int keepAliveTime=100;
  6.         private int MAX_PENDING_TASKS=100;
  7.         private BlockingQueue pendingTasksQueue= new ArrayBlockingQueue(MAX_PENDING_TASKS);
  8.  
  9.         public SimplePooledExecutorSample() {
  10.                  executor =new ThreadPoolExecutor(defaultThreadCount,maxThreadCount,
  11.                  keepAliveTime,TimeUnit.MILLISECONDS,pendingTasksQueue );
  12.                
  13.                  
  14.                  }
  15.                
  16.         public void createMultipleConsumerThreads(int consumerThreadCount )
  17.         {
  18.                 Consumer c= new Consumer();
  19.                 for (int i=0; i < consumerThreadCount; i++ ){
  20.                         executor.execute(c);
  21.                 }
  22.         }
  23.         public void shutdown(){
  24.                 //graceful shutdown.
  25.                 executor.shutdown();
  26.         }

Let's see the output of this class after we have added the following main() method.

Code Snippet 2

  1.         public static void main(String[] args ){
  2.                 SimplePooledExecutorSample sample=new SimplePooledExecutorSample();
  3.                 sample.createMultipleProducerThreads(3);
  4.                 sample.createMultipleConsumerThreads(3);
  5.                 sample.createMultipleProducerThreads(3);
  6.                
  7.                 //graceful shutdown.
  8.                 sample.shutdown();
  9.         }
  10. class Consumer implements Runnable { //some simple runnable objects
  11.         private int runCount=1;
  12.         public void run() {             System.out.println("Consuming..."+runCount++ +" ThreadId:" + Thread.currentThread().getId());   }
  13. }
  14. class Producer implements Runnable {
  15.         private int runCount=1;
  16.         public void run() {            
  17.                 for(int i=0;i<10000;i++){ //some activity to keep the CPU buzy..
  18.                         double x=(double)100.89*(double)124.99;
  19.                 }
  20.                 System.out.println("Producing..."+runCount++ +" ThreadId:"  + Thread.currentThread().getId()); 
  21.         }
  22. }

First aspect to note is the threads are being pooled. In this case we will most probably see only two thread ids indicating that the pool is being created with two threads and as the run() methods are lightweight, there is no need to create additional threads.
Output Snippet 1
  1. Producing...1 ThreadId:7
  2. Producing...3 ThreadId:7
  3. Producing...2 ThreadId:8
  4. Consuming...1 ThreadId:7
  5. Consuming...2 ThreadId:7
  6. Consuming...3 ThreadId:7
  7. Producing...1 ThreadId:7
  8. Producing...2 ThreadId:7
  9. Producing...3 ThreadId:7

If we increase the default "defaultThreadCount" (in line 3 of the Snippet 1) to 4, we might end up seeing 4 different thread ids.
Also note ( in output snippet 1) that all 9 threads finishing executing before the threadpool is shutdown. If you print out the thread groups, you will also notice that all threads belong to the same group. If you need different thread groups, we will have to provide our own ThreadFactory Implementation.

Now if we try to add more than 200 tasks in a for loop ( line 3, Snippet 2) we will get an RejectedExecution exception. The pending tasks cannot go beyond the size of the blocking array you can configured the ThreadPool with.
To handle this gracefully, we can initialize the ThreadPool with a handler as shown below.
Snippet 3

  1.  executor.setRejectedExecutionHandler(new RejectedExecutionHandler() {
  2.  
  3.                         public void rejectedExecution(Runnable arg0, ThreadPoolExecutor arg1)
  4.                         {
  5.                                 System.out.println("Rejected Execution:- Don't add tasks way beyond the size of the Blocking Que " + arg0.getClass().getName());
  6.                                
  7.                                 System.out.println("Tasks remaining : "+ arg1.getTaskCount());
  8.                                 System.out.println("Tasks executed: "+arg1.getCompletedTaskCount());
  9.                         }
  10.                        
  11.                         }  );

If we contrast this with the Thread structure of pre-Java1.5, we will end up with an implementation like this.
Snippet 4

  1. public void createMultipleProducerThreadsUsingJava1_4(int producerThreadCount )
  2.         {
  3.                 Producer p= new Producer();
  4.                 Thread consumerThread=new Thread(p);
  5.                 for (int i=0; i < producerThreadCount; i++ ){
  6.                        
  7.                         try {
  8.                                 consumerThread.start();
  9.                                 consumerThread.join(); //irrelevant whether you call join() or not
  10.                                
  11.                         } catch (InterruptedException e) {
  12.                                 e.printStackTrace();
  13.                         }
  14.                         catch (IllegalThreadStateException e) {
  15.                                 System.err.println("Cannot call Thread.start() twice "+ e.getMessage());
  16.                         }
  17.                 }
  18.         }

This will result in Illegal Thread exception, JVM doesn't allow us to call start() on a Thread object twice.
So the option would be to create a thread object per run() method invocation which causes the additional overhead of creation and management.

Snippet 5

  1. public void createMultipleConsumerThreadsUsingJava1_4(int consumerThreadCount )
  2.         {
  3.                 Consumer c= new Consumer();
  4.                
  5.                 for (int i=0; i < consumerThreadCount; i++ ){
  6.                         Thread consumerThread=new Thread(c); //will have to create multiple instances of Thread objects.
  7.                         consumerThread.start();
  8.                 }
  9.         }
  10. public static void main(String[] args ){
  11.                 SimplePooledExecutorSample sample=new                        SimplePooledExecutorSample();
  12.                
  13.                
  14.                 sample.createMultipleConsumerThreadsUsingJava1_4(5);
  15.                
  16.                
  17.         }

The output of this code is as expected,one thread per run invocation. You will five different thread ids.
  1. Consuming...1 ThreadId:7
  2. Consuming...2 ThreadId:8
  3. Consuming...3 ThreadId:9
  4. Consuming...4 ThreadId:10
  5. Consuming...5 ThreadId:11

Post new comment

CAPTCHA
This question is for testing whether you are a human visitor and to prevent automated spam submissions.
Q
Q
R
d
L
m
Enter the code without spaces and pay attention to upper/lower case.