Answering my own question for the benefit of the community, since I found a feasible solution.
I developed slightly different solution for this problem. Instead of having parallel pipelines , I used a stage multithreaded. (middle stage in this case). In other words instead of passing the individual records, the middle stage uses an input queue and an output queue. The threads spawn up when the input queue is filled with a defined number of records.
The subsequent stages might be idling until the queues are full, but still this implementation is acceptable. And this is can be used when there is no need of sequencing. If you need to follow a sequence then you will need to keep the sequence ID and perform a sort at the output queue.
The Middle Stage
public class MiddleStage implements RecordHandler {
private RecordHandler next;
private ExecutorService executor = Executors.newFixedThreadPool(5);
private Queue<String> inbound = new LinkedList<String>();
Collection<Callable<String>> tasks = new ArrayList<Callable<String>>();
@Override
public void setNext(RecordHandler handler) {
if (this.next == null) {
this.next = handler;
} else {
this.next.setNext(handler);
}
}
@Override
public void handleRequest(String record) {
System.out.println("Adding new record to Queue : " + record);
inbound.add(record);
System.out.println("Queue Size : " + inbound.size());
if (inbound.size() >= 10)
{
System.out.println("Processing the batch.");
for (int i = 0; i < 10; i++){
tasks.add(new MiddleWorker(inbound.poll()));
}
System.out.println("Processing @ Middle Stage. " + record);
List <Future<String>> results = null;
try {
results = executor.invokeAll(tasks, 60, TimeUnit.SECONDS);
tasks.clear();
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
for (Future<String> f : results) {
try {
String answer = f.get();
System.out.println("Outbound : " + answer);
if (next != null) {
next.handleRequest(answer);
}
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
} catch (ExecutionException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
results.clear();
}
}
}
The MiddleWorker Class
public class MiddleWorker implements Callable<String> {
private String output;
public MiddleWorker(String record)
{
output = record + " : OK";
}
@Override
public String call() throws Exception {
try
{
Thread.sleep(2 * 1000);
}
catch(final InterruptedException ex)
{
ex.printStackTrace();
}
return (output);
}
}
Please make a comment if this answer is unclear and I will revise the answer.