I have a massive data set that I need to load into a database. I am writing code based on the Java concurrency library (a producer-consumer model with a BlockingQueue and an ExecutorService) that keeps adding data to the queue as it arrives. The consumer keeps retrieving data until it encounters the "poison" value, at which point it stops.
The main class, with dummy data to be posted. The queue capacity is intentionally kept small (5):
```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class MessageProcessor {
    // Bounded, fair queue shared by all producers and the consumer
    private static final BlockingQueue<String> queue = new ArrayBlockingQueue<String>(
            5, true);
    // Pool that runs the producer tasks (one task per payload)
    private static final ExecutorService executor = Executors
            .newFixedThreadPool(Runtime.getRuntime().availableProcessors());
    // Pool that runs the consumer task
    private static final ExecutorService consumerExecutor = Executors
            .newFixedThreadPool(Runtime.getRuntime().availableProcessors());
    private static final String POISON = "THE_END";

    public void processMessages() throws InterruptedException {
        // Create and start the consumer
        Runnable consumer = new MessageConsumer(queue);
        consumerExecutor.execute(consumer);

        for (String payload : getPayload()) {
            // Create and start a producer for the given payload
            Runnable producer = new MessageProducer(queue, payload);
            executor.execute(producer);
        }

        executor.shutdown();
        executor.awaitTermination(1, TimeUnit.MINUTES);
        consumerExecutor.shutdown();
        consumerExecutor.awaitTermination(1, TimeUnit.MINUTES);
    }

    private List<String> getPayload() {
        List<String> payloads = new ArrayList<>();
        payloads.add("data1");
        payloads.add("data2");
        payloads.add("data3");
        payloads.add("data4");
        payloads.add("data5");
        payloads.add("data6");
        payloads.add("data7");
        payloads.add("data8");
        payloads.add("data9");
        payloads.add("data10");
        payloads.add(POISON); // the poison value is the last element of the list
        return payloads;
    }
}
```
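In processMessages() every payload, including POISON, is submitted as its own task to a pool of availableProcessors() threads, so on a multi-core machine several tasks run at the same time and the order in which they reach put() is up to the thread scheduler. A minimal sketch of one way to keep list order, assuming a single producer thread is acceptable: submit the same per-payload tasks to Executors.newSingleThreadExecutor(), which executes them one at a time in submission order. The class and method names (SingleThreadProducerDemo, produceInOrder) are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class SingleThreadProducerDemo {

    // Hypothetical helper: one put() task per payload, as in processMessages(),
    // but submitted to a single-threaded executor instead of a fixed pool.
    static List<String> produceInOrder(List<String> payloads) {
        // Capacity covers every payload, so no put() blocks in this demo
        BlockingQueue<String> queue =
                new ArrayBlockingQueue<>(Math.max(1, payloads.size()), true);
        // One worker thread: tasks execute sequentially, in submission order
        ExecutorService producer = Executors.newSingleThreadExecutor();
        for (String payload : payloads) {
            producer.execute(() -> {
                try {
                    queue.put(payload);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }
        producer.shutdown();
        try {
            producer.awaitTermination(1, TimeUnit.MINUTES);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        // ArrayBlockingQueue copies out head to tail, i.e. in FIFO order
        return new ArrayList<>(queue);
    }

    public static void main(String[] args) {
        System.out.println(produceInOrder(List.of("data1", "data2", "data3", "THE_END")));
    }
}
```

The trade-off is that the puts are no longer concurrent; only the consumer side still runs on a separate thread.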
The Producer Runnable:
```java
import java.util.concurrent.BlockingQueue;

public class MessageProducer implements Runnable {
    private final BlockingQueue<String> queue;
    private final String payload;

    public MessageProducer(BlockingQueue<String> queue, String payload) {
        this.queue = queue;
        this.payload = payload;
    }

    @Override
    public void run() {
        try {
            queue.put(payload); // blocks while the queue is full
            System.out.println("Put : " + payload);
        } catch (InterruptedException e) {
            e.printStackTrace();
            Thread.currentThread().interrupt(); // restore the interrupt flag
        }
    }
}
```
The Consumer Runnable:
```java
import java.util.concurrent.BlockingQueue;

public class MessageConsumer implements Runnable {
    private final BlockingQueue<String> queue;
    private static final String POISON = "THE_END";

    public MessageConsumer(BlockingQueue<String> queue) {
        this.queue = queue;
    }

    @Override
    public void run() {
        String payload = "";
        do {
            try {
                payload = queue.take(); // blocks while the queue is empty
                System.out.println("Got : " + payload);
            } catch (InterruptedException ie) {
                Thread.currentThread().interrupt(); // restore the flag and stop consuming
                break;
            }
        } while (!payload.equals(POISON)); // stop after taking the poison value
    }
}
```
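The do/while loop in MessageConsumer.run() exits as soon as it takes POISON, so anything that lands in the queue after the poison value is never taken. A small single-threaded sketch of that effect, with the queue filled by hand in a hypothetical arrival order where "data4" is put after "THE_END" (PoisonOrderDemo and consumeUntilPoison are illustrative names only):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class PoisonOrderDemo {
    static final String POISON = "THE_END";

    // Same loop shape as MessageConsumer.run(), but records what it took
    static List<String> consumeUntilPoison(BlockingQueue<String> queue) {
        List<String> consumed = new ArrayList<>();
        String payload = "";
        do {
            try {
                payload = queue.take();
                consumed.add(payload);
            } catch (InterruptedException ie) {
                Thread.currentThread().interrupt(); // restore the flag and stop
                break;
            }
        } while (!payload.equals(POISON));
        return consumed;
    }

    public static void main(String[] args) {
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(5, true);
        // Hypothetical arrival order: the poison value gets in before data4
        queue.add("data3");
        queue.add(POISON);
        queue.add("data4");
        System.out.println("Consumed: " + consumeUntilPoison(queue)); // [data3, THE_END]
        System.out.println("Left in queue: " + queue);                 // [data4]
    }
}
```

The queue itself stays FIFO here; the item is "lost" only because the consumer stops at the poison value that was enqueued ahead of it.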
Output:
```
Put : data1
Put : data2
Put : data3
Put : data7
Put : data6
Put : data5
Got : data1
Got : data2
Got : data3
Got : data5
Put : data10
Put : data8
Put : data9
Got : data6
Got : data7
Put : data4
Put : THE_END
Got : data8
Got : data9
Got : data10
Got : THE_END
```
When I execute new MessageProcessor().processMessages(), I observe two anomalies:
- The consumer never fetches one item, "data4". I assume this is because it took the poison value ("THE_END") before "data4" could be retrieved, but why are items not fetched in insertion order when the queue is FIFO?
- Insertion into the queue (put) does not happen in the order of the items in the list (for example, "data7" is inserted right after "data3").

Thanks!
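For comparison, a minimal sketch under the assumption that one producer thread is acceptable: a single thread puts the payloads in list order and appends POISON last, and a single consumer thread takes until it sees POISON. Because ArrayBlockingQueue is FIFO and there is only one writer, the consumer should see every payload in list order, and the poison value cannot overtake a data item. The names OrderedPipeline and run are hypothetical, and the list add stands in for the actual database insert.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class OrderedPipeline {
    static final String POISON = "THE_END";

    // Hypothetical helper: returns payloads in the order the consumer took them
    static List<String> run(List<String> payloads) throws InterruptedException {
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(5, true); // small on purpose
        List<String> consumed = new ArrayList<>(); // written only by the consumer thread

        Thread producer = new Thread(() -> {
            try {
                for (String p : payloads) {
                    queue.put(p);   // blocks while the queue is full
                }
                queue.put(POISON);  // enqueued strictly after every data item
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        Thread consumer = new Thread(() -> {
            try {
                String payload;
                while (!(payload = queue.take()).equals(POISON)) {
                    consumed.add(payload); // stand-in for the database insert
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        consumer.start();
        producer.start();
        producer.join();
        consumer.join(); // join() makes the consumer's writes to 'consumed' visible here
        return consumed;
    }

    public static void main(String[] args) throws InterruptedException {
        List<String> payloads = new ArrayList<>();
        for (int i = 1; i <= 10; i++) {
            payloads.add("data" + i);
        }
        System.out.println(run(payloads));
    }
}
```

If several producer threads are really needed, a common variation is to let the main thread wait for all producers (for example with awaitTermination) and only then put POISON itself, so the poison is always last; items from different producers would still interleave in no fixed order.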