Question

I have a chain of responsibility that consists of 3 stages.

[Figure: Stages]

My implementations of the stages look like this.

public class InitialStage implements RecordHandler {

    private RecordHandler next;

    @Override
    public void setNext(RecordHandler handler) {
        if (this.next == null) {
            this.next = handler;
        } else {
            this.next.setNext(handler);
        }
    }

    @Override
    public void handleRequest(String record) {
        System.out.println("Processing @ Initial Stage.");
        if (next != null) {
            next.handleRequest(record);
        }
    }
}

The Process Manager

public class ProcessManager {
    private RecordProcessor processor; // created in createPipeLine()

    public ProcessManager()
    {
        createPipeLine();
    }

    private void createPipeLine() {
        processor = new RecordProcessor();
        processor.addHandler(new InitialStage());
        processor.addHandler(new MiddleStage());
        processor.addHandler(new FinalStage());
    }

    public void processRecord(String record){
        processor.handleRequest(record);
    }
}

The Interface

public interface RecordHandler {
    public void setNext(RecordHandler handler);
    public void handleRequest(String record);
}

And finally, the RecordProcessor:

public class RecordProcessor {

    private RecordHandler successor;
    private RecordHandler first;

    public RecordProcessor(){
    }

    public void addHandler(RecordHandler recordHandler){
        if(this.first == null){
            this.first = recordHandler;
        }
        else{
            this.successor.setNext(recordHandler);  
        }
        this.successor = recordHandler;
    }

    public void handleRequest(String record){
        first.handleRequest(record);
    }
}

I have observed that my middle stage takes some time to complete, so I would like to use a thread pool so that the records are processed as shown below.

Here I have assumed 3 worker threads.

[Figure: Suggested Chain]

Question

How do I correctly modify my existing chain of responsibility (CoR) to accommodate this new requirement?


Solution

Answering my own question for the benefit of the community, since I found a feasible solution.

I developed a slightly different solution to this problem. Instead of having parallel pipelines, I made a single stage multithreaded (the middle stage in this case). In other words, instead of passing individual records straight through, the middle stage uses an input queue and an output queue. The worker threads start once the input queue holds a defined number of records.

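The subsequent stages may sit idle until the input queue fills up, but for my purposes this is acceptable. This approach can be used as-is when records do not need to stay in sequence. If you need to preserve the sequence, keep a sequence ID with each record and sort on it at the output queue. (Within a single batch, invokeAll already returns its futures in the order the tasks were submitted, so the sort only matters if workers publish results as they finish.)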
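The sequence-ID idea above can be sketched roughly as follows. This is only an illustration under assumptions, not the code from this answer: the class `SequencedBatchDemo`, the `Tagged` holder, and the method `processInOrder` are hypothetical names, and the "work" is a stand-in that appends " : OK". It uses `ExecutorCompletionService` so that results arrive in completion order, then sorts on the sequence ID.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical demo; none of these names come from the original answer.
class SequencedBatchDemo {

    // A processed record tagged with its position in the input.
    static final class Tagged {
        final long sequenceId;
        final String value;

        Tagged(long sequenceId, String value) {
            this.sequenceId = sequenceId;
            this.value = value;
        }
    }

    static List<String> processInOrder(List<String> records, int threads)
            throws InterruptedException, ExecutionException {
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        try {
            CompletionService<Tagged> completed = new ExecutorCompletionService<>(pool);
            long nextId = 0;
            for (String record : records) {
                long sequenceId = nextId++; // assigned in arrival order
                completed.submit(() -> new Tagged(sequenceId, record + " : OK"));
            }

            // take() hands back results in completion order, not arrival order.
            List<Tagged> outbound = new ArrayList<>();
            for (int i = 0; i < records.size(); i++) {
                outbound.add(completed.take().get());
            }

            // Sorting on the sequence ID restores arrival order.
            outbound.sort(Comparator.comparingLong((Tagged t) -> t.sequenceId));

            List<String> ordered = new ArrayList<>();
            for (Tagged t : outbound) {
                ordered.add(t.value);
            }
            return ordered;
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        List<String> records = Arrays.asList("r1", "r2", "r3", "r4");
        System.out.println(processInOrder(records, 3)); // [r1 : OK, r2 : OK, r3 : OK, r4 : OK]
    }
}
```

Sorting a whole batch is the simplest option. If records must be released as early as possible, a priority queue keyed on the sequence ID would be an alternative.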

The Middle Stage

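import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.Callable;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

public class MiddleStage implements RecordHandler {

    private static final int BATCH_SIZE = 10;

    private RecordHandler next;
    // Note: this pool is never shut down here; its threads outlive the pipeline.
    private final ExecutorService executor = Executors.newFixedThreadPool(5);
    // Not thread-safe: handleRequest is expected to be called from a single thread.
    private final Queue<String> inbound = new LinkedList<String>();

    @Override
    public void setNext(RecordHandler handler) {
        if (this.next == null) {
            this.next = handler;
        } else {
            this.next.setNext(handler);
        }
    }

    @Override
    public void handleRequest(String record) {
        System.out.println("Adding new record to Queue : " + record);
        inbound.add(record);
        System.out.println("Queue Size : " + inbound.size());

        if (inbound.size() < BATCH_SIZE) {
            return; // Wait until a full batch has accumulated.
        }

        System.out.println("Processing the batch.");
        List<Callable<String>> tasks = new ArrayList<Callable<String>>();
        for (int i = 0; i < BATCH_SIZE; i++) {
            tasks.add(new MiddleWorker(inbound.poll()));
        }

        System.out.println("Processing @ Middle Stage. " + record);
        List<Future<String>> results;
        try {
            // Blocks until every task has finished or the timeout expires.
            results = executor.invokeAll(tasks, 60, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // Preserve the interrupt status.
            return; // Previously fell through and hit a null 'results'.
        }

        // invokeAll returns the futures in the same order as the submitted tasks.
        for (Future<String> f : results) {
            try {
                String answer = f.get();
                System.out.println("Outbound : " + answer);
                if (next != null) {
                    next.handleRequest(answer);
                }
            } catch (CancellationException e) {
                // The task did not finish within the timeout and was cancelled.
                e.printStackTrace();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            } catch (ExecutionException e) {
                e.printStackTrace();
            }
        }
    }
}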

The MiddleWorker Class

import java.util.concurrent.Callable;

public class MiddleWorker implements Callable<String> {

    private final String output;

    public MiddleWorker(String record) {
        output = record + " : OK";
    }

    @Override
    public String call() throws Exception {
        // Simulate slow work. An interrupt (for example from an invokeAll timeout)
        // propagates to the caller instead of being swallowed.
        Thread.sleep(2 * 1000);
        return output;
    }
}
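One consequence of the fixed batch size in the MiddleStage above is that records left over when the input stops (fewer than 10) stay in the inbound queue. A possible remedy is an explicit flush at the end of input. The sketch below is a hypothetical variant, not the original class: `FlushableBatcher`, `flush`, and `close` are assumed names, it collects results in a list instead of calling a next handler, and the work is a stand-in.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical variant of the batching idea; not the original MiddleStage.
class FlushableBatcher {

    private final int batchSize;
    private final ExecutorService executor;
    private final List<String> inbound = new ArrayList<>();
    private final List<String> outbound = new ArrayList<>();

    FlushableBatcher(int batchSize, int threads) {
        this.batchSize = batchSize;
        this.executor = Executors.newFixedThreadPool(threads);
    }

    // Queues a record and processes the batch once it is full.
    void handleRequest(String record) throws InterruptedException, ExecutionException {
        inbound.add(record);
        if (inbound.size() >= batchSize) {
            flush();
        }
    }

    // Processes whatever is queued, even a partial batch.
    void flush() throws InterruptedException, ExecutionException {
        List<Callable<String>> tasks = new ArrayList<>();
        for (String record : inbound) {
            tasks.add(() -> record + " : OK"); // stand-in for the slow work
        }
        inbound.clear();
        for (Future<String> f : executor.invokeAll(tasks)) {
            outbound.add(f.get());
        }
    }

    // Flushes the remainder and stops the pool; call once at end of input.
    List<String> close() throws InterruptedException, ExecutionException {
        flush();
        executor.shutdown();
        return outbound;
    }

    public static void main(String[] args) throws Exception {
        FlushableBatcher batcher = new FlushableBatcher(3, 2);
        for (String r : Arrays.asList("r1", "r2", "r3", "r4")) {
            batcher.handleRequest(r);
        }
        System.out.println(batcher.close()); // [r1 : OK, r2 : OK, r3 : OK, r4 : OK]
    }
}
```

Whether an explicit flush fits depends on whether the pipeline has a clear end of input. For a continuous stream, a time-based flush might be more appropriate.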

Please leave a comment if anything in this answer is unclear and I will revise it.

Other tips

You can create an additional "stage" that is responsible for handing the workload to a thread pool.

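import java.util.concurrent.Executor;

public class PoolExecutingStage implements RecordHandler {

    private final Executor threadPool;
    private RecordHandler next;

    public PoolExecutingStage(Executor tp) {
        this.threadPool = tp;
    }

    @Override
    public void setNext(RecordHandler handler) {
        // Same delegation as in the other stages.
        if (this.next == null) {
            this.next = handler;
        } else {
            this.next.setNext(handler);
        }
    }

    @Override
    public void handleRequest(final String record) {
        if (next != null) {
            // Hand the rest of the chain to the pool and return immediately.
            threadPool.execute(new Runnable() {
                @Override
                public void run() {
                    next.handleRequest(record);
                }
            });
        }
    }
}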

Next, include this new stage in your pipeline by calling processor.addHandler(new PoolExecutingStage(configuredThreadPool)); between the calls that add the initial and middle stages.
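The resulting pipeline can be sketched end to end as follows. This is a minimal single-file illustration under assumptions: `PooledPipelineDemo`, the shared `Stage` base class, and the `run` helper are hypothetical, the stage bodies are stand-ins, and the chain is wired through setNext directly rather than through RecordProcessor to keep the example short.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical single-file demo; the stage bodies are stand-ins.
class PooledPipelineDemo {

    interface RecordHandler {
        void setNext(RecordHandler handler);
        void handleRequest(String record);
    }

    // Chaining logic shared by all stages, factored out to keep the sketch short.
    abstract static class Stage implements RecordHandler {
        private RecordHandler next;

        @Override
        public void setNext(RecordHandler handler) {
            if (next == null) {
                next = handler;
            } else {
                next.setNext(handler);
            }
        }

        void forward(String record) {
            if (next != null) {
                next.handleRequest(record);
            }
        }
    }

    static class InitialStage extends Stage {
        @Override
        public void handleRequest(String record) {
            forward(record);
        }
    }

    // Hands the rest of the chain to the pool and returns immediately.
    static class PoolExecutingStage extends Stage {
        private final Executor threadPool;

        PoolExecutingStage(Executor threadPool) {
            this.threadPool = threadPool;
        }

        @Override
        public void handleRequest(String record) {
            threadPool.execute(() -> forward(record));
        }
    }

    // Slow stage; it now runs on pool threads, several records at a time.
    static class MiddleStage extends Stage {
        @Override
        public void handleRequest(String record) {
            try {
                Thread.sleep(200); // simulated slow work
                forward(record + " : OK");
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }

    // Called concurrently, so it collects into a thread-safe list.
    static class FinalStage extends Stage {
        final List<String> seen = new CopyOnWriteArrayList<>();

        @Override
        public void handleRequest(String record) {
            seen.add(record);
        }
    }

    static List<String> run(List<String> records, int threads) throws InterruptedException {
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        FinalStage last = new FinalStage();
        RecordHandler first = new InitialStage();
        first.setNext(new PoolExecutingStage(pool)); // between initial and middle
        first.setNext(new MiddleStage());
        first.setNext(last);
        for (String record : records) {
            first.handleRequest(record);
        }
        pool.shutdown();                             // accept no new work
        pool.awaitTermination(30, TimeUnit.SECONDS); // wait for in-flight records
        return last.seen;
    }

    public static void main(String[] args) throws InterruptedException {
        List<String> out = run(Arrays.asList("r1", "r2", "r3", "r4", "r5", "r6"), 3);
        System.out.println(out.size() + " records reached the final stage");
    }
}
```

Records may reach the final stage in a different order than they entered, because each one travels through the middle stage on whichever pool thread picks it up.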


Also note that every stage after the pool stage is now called from multiple threads at once, so any shared mutable state in those stages needs proper thread synchronization.
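As a small illustration of the kind of synchronization meant here, consider a downstream stage that keeps a running count. The sketch below is hypothetical (`ThreadSafeStageDemo`, `CountingStage`, and `countConcurrently` are assumed names, and the stage is simplified so that it does not implement RecordHandler). It uses an `AtomicInteger` so that concurrent calls do not lose updates.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo of a stage that is safe to call from several pool threads.
class ThreadSafeStageDemo {

    // Simplified stand-in for a downstream stage that keeps a running total.
    static class CountingStage {
        private final AtomicInteger processed = new AtomicInteger();

        void handleRequest(String record) {
            // A plain int field with count++ could lose updates under
            // concurrent calls; incrementAndGet is atomic.
            processed.incrementAndGet();
        }

        int processedCount() {
            return processed.get();
        }
    }

    // Calls the stage from several threads at once and returns the final count.
    static int countConcurrently(int threads, int recordsPerThread) throws InterruptedException {
        CountingStage stage = new CountingStage();
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        for (int t = 0; t < threads; t++) {
            pool.execute(() -> {
                for (int i = 0; i < recordsPerThread; i++) {
                    stage.handleRequest("record");
                }
            });
        }
        pool.shutdown();
        pool.awaitTermination(30, TimeUnit.SECONDS);
        return stage.processedCount();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(countConcurrently(4, 10_000)); // prints 40000
    }
}
```

For state that is more complex than a counter, a concurrent collection or a synchronized block around the shared data would play the same role.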

License: CC-BY-SA with attribution
Not affiliated with StackOverflow