Question

I have a massive data set that I need to load into a database. I am writing code based on the Java concurrency library (a producer-consumer model with a BlockingQueue and an ExecutorService) that keeps adding data to the queue as it arrives. The consumer keeps retrieving data until it encounters the "poison" item, and then it terminates.

Here is the main class, with dummy data to be posted. The queue size is intentionally kept small:

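import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class MessageProcessor {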
private static final BlockingQueue<String> queue = new ArrayBlockingQueue<String>(
        5, true);
private static final ExecutorService executor = Executors
        .newFixedThreadPool(Runtime.getRuntime().availableProcessors());
private static final ExecutorService consumerExecutor = Executors
        .newFixedThreadPool(Runtime.getRuntime().availableProcessors());
private static final String POISON = "THE_END";


public void processMessages() throws InterruptedException {

//Create and start consumer 
    Runnable consumer = new MessageConsumer(queue);
    consumerExecutor.execute(consumer);

    for (String payload : getPayload()) {
        //create and start producer with given payload
        Runnable producer = new MessageProducer(queue, payload);
        executor.execute(producer);
    }

    executor.shutdown();
    executor.awaitTermination(1, TimeUnit.MINUTES);

    consumerExecutor.shutdown();
    consumerExecutor.awaitTermination(1, TimeUnit.MINUTES);

}

private List<String> getPayload() {
    List<String> payloads = new ArrayList<>();
    payloads.add("data1");
    payloads.add("data2");
    payloads.add("data3");
    payloads.add("data4");
    payloads.add("data5");
    payloads.add("data6");
    payloads.add("data7");
    payloads.add("data8");
    payloads.add("data9");
    payloads.add("data10");
    payloads.add(POISON);

    return payloads;
}}

The Producer Runnable:

import java.util.concurrent.BlockingQueue;

public class MessageProducer implements Runnable {
private BlockingQueue<String> queue;
private String payload;

public MessageProducer(BlockingQueue<String> queue, String payload) {
    this();
    this.queue = queue;
    this.payload = payload;
}

private MessageProducer() {
}

public void run() {
    try {
            queue.put(payload);
            System.out.println("Put : " + payload );
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
}}

The Consumer Runnable:

import java.util.concurrent.BlockingQueue;

public class MessageConsumer implements Runnable {

private BlockingQueue<String> queue;
private static final String POISON = "THE_END";

public MessageConsumer(BlockingQueue<String> queue) {
    this();
    this.queue = queue;
}

private MessageConsumer() {
}

public void run() {

    String payload = "";
    do {
        try {
            payload = queue.take();
            System.out.println("Got : " + payload );
        } catch (InterruptedException ie) {
            // handle
            break;
        }
    } while (!payload.equals(POISON));
}}

Output:

Put : data1
Put : data2
Put : data3
Put : data7
Put : data6
Put : data5
Got : data1
Got : data2
Got : data3
Got : data5
Put : data10
Put : data8
Put : data9
Got : data6
Got : data7
Put : data4
Put : THE_END
Got : data8
Got : data9
Got : data10
Got : THE_END

When I execute new MessageProcessor().processMessages(), I observe two anomalies:

  1. The consumer never fetched one item, data4. I assume this is because it fetched the poison item ("THE_END") before "data4" could be retrieved, but why does it not fetch data in insertion order from a queue that is FIFO?
  2. Insertion into the queue (put) does not happen in the order of the items in the list (for example, "data7" is inserted right after "data3").

Thanks!!


Solution

Your two problems have the same cause.

Since several producers run in parallel, there is no guarantee that the first producer puts its element on the queue before the second one does. So the items do not enter the queue in list order, and the poison can be enqueued before data4, which the consumer therefore never consumes. Note that each "Put" line is printed after put() returns, so the printed order need not match the actual order in the queue.
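One way to get both in-order delivery and a poison item that is always last is to use a single producer that loops over the payloads itself. The following is only a minimal sketch under that assumption; the class name OrderedPipeline and its run method are hypothetical and not part of the original code:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch: one producer thread, one consumer thread.
public class OrderedPipeline {
    private static final String POISON = "THE_END";

    // Returns the items the consumer received, in the order it received them.
    static List<String> run(List<String> payloads) throws Exception {
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(5, true);
        ExecutorService pool = Executors.newFixedThreadPool(2);

        // Consumer: take items until the poison item arrives.
        Future<List<String>> consumed = pool.submit(() -> {
            List<String> seen = new ArrayList<>();
            String item;
            while (!(item = queue.take()).equals(POISON)) {
                seen.add(item);
            }
            return seen;
        });

        // Single producer: puts every payload in list order, then the poison.
        pool.submit(() -> {
            for (String payload : payloads) {
                queue.put(payload);
            }
            queue.put(POISON);
            return null;
        });

        pool.shutdown();
        return consumed.get(1, TimeUnit.MINUTES);
    }

    public static void main(String[] args) throws Exception {
        // prints [data1, data2, data3, data4]
        System.out.println(run(Arrays.asList("data1", "data2", "data3", "data4")));
    }
}
```

Because only one thread ever calls put, the queue's FIFO order is the list order, and nothing can be enqueued after the poison.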

OTHER TIPS

The order in which entries are put on the queue is non-deterministic because you submit a separate runnable for each message to a multi-threaded pool, instead of looping over the messages sequentially in a single thread.

As you say, that likely explains why some messages are never seen: they were enqueued after the poison.
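If you would rather keep one runnable per message, a possible alternative is to submit them to Executors.newSingleThreadExecutor(), which is documented to execute tasks sequentially. This is only a sketch: the class name SequentialProducers is hypothetical, and the queue is sized to hold every item so that no consumer is needed for the demonstration:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch: one runnable per message, but a single worker thread.
public class SequentialProducers {

    // Returns the queue contents, head first, after all producers have run.
    static List<String> run(List<String> payloads) throws InterruptedException {
        // Assumption for this sketch: capacity fits everything, so put never blocks.
        BlockingQueue<String> queue =
                new ArrayBlockingQueue<>(Math.max(1, payloads.size()), true);
        ExecutorService executor = Executors.newSingleThreadExecutor();

        for (String payload : payloads) {
            executor.execute(() -> {
                try {
                    queue.put(payload);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt(); // preserve interrupt status
                }
            });
        }

        executor.shutdown();
        executor.awaitTermination(1, TimeUnit.MINUTES);
        return new ArrayList<>(queue); // iterates from head to tail
    }

    public static void main(String[] args) throws InterruptedException {
        // prints [data1, data2, data3, THE_END]
        System.out.println(run(Arrays.asList("data1", "data2", "data3", "THE_END")));
    }
}
```

The trade-off is that the producers no longer run in parallel, which may or may not matter depending on how expensive it is to produce each payload.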

Your queue is FIFO, yes, but you're not pushing data onto it in list order.

If availableProcessors() returns more than 1, you have several producer threads pushing data onto the queue. The threads your executor manages are not guaranteed to run tasks in the order in which you call executor.execute(producer).
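If the producers need to stay parallel and the order of the data items does not matter, one option is to keep the poison out of the payload list and enqueue it only after every producer has finished. The sketch below assumes exactly that; the class name PoisonAfterProducers and the pool size of 4 are hypothetical choices for illustration:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch: parallel producers, poison sent only after they finish.
public class PoisonAfterProducers {
    private static final String POISON = "THE_END";

    // Returns every item the consumer received (order is not guaranteed).
    static List<String> run(List<String> payloads) throws Exception {
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(5, true);
        ExecutorService producers = Executors.newFixedThreadPool(4);
        ExecutorService consumer = Executors.newSingleThreadExecutor();

        Future<List<String>> consumed = consumer.submit(() -> {
            List<String> seen = new ArrayList<>();
            String item;
            while (!(item = queue.take()).equals(POISON)) {
                seen.add(item);
            }
            return seen;
        });

        // The payload list contains data only, no poison.
        for (String payload : payloads) {
            producers.submit(() -> {
                queue.put(payload);
                return null;
            });
        }

        // Wait until all data has been enqueued, then signal the end.
        producers.shutdown();
        if (!producers.awaitTermination(1, TimeUnit.MINUTES)) {
            throw new IllegalStateException("producers did not finish in time");
        }
        queue.put(POISON);

        consumer.shutdown();
        return consumed.get(1, TimeUnit.MINUTES);
    }

    public static void main(String[] args) throws Exception {
        // Always 10 items, though their order may vary between runs.
        System.out.println(run(Arrays.asList("data1", "data2", "data3", "data4",
                "data5", "data6", "data7", "data8", "data9", "data10")).size());
    }
}
```

With this arrangement the poison is guaranteed to be the last element in the queue, so no data item can be left behind, even though the data items themselves may still arrive out of list order.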

Licensed under: CC-BY-SA with attribution
Not affiliated with StackOverflow