Domanda

I am using the LMAX disruptor in my web application which takes the http request parameters and handle them to the ringbuffer. 3 event-handlers handle and process data, the final one, saves it to the database. Initialize the ringbuffer once, when the servlet is instantiated. is this right?

public void init() throws ServletException {

    Disruptor<CampaignCode> disruptor = new Disruptor<CampaignCode>(
            CampaignCode.FACTORY, 1024, Executors.newCachedThreadPool());

    EventHandler<CampaignCode> campaignDetailsLoader = new CampaignDetailsLoaderEvent();
    EventHandler<CampaignCode> templateEvent = new TemplateBuildEvent();
    EventHandler<CampaignCode> codeGenerator = new CodeGenerationEventHandler();
    EventHandler<CampaignCode> campaignSaveEventHandler= new CampaignSaveEventHandler();

    disruptor.handleEventsWith(templateEvent, campaignDetailsLoader).then(
            codeGenerator).then(campaignSaveEventHandler);
    this.ringBuffer = disruptor.start();
}

here I put the values straight into the ringbuffer

    @Override
protected void doPost(HttpServletRequest request,
        HttpServletResponse response) throws ServletException, IOException      {
    String campaignId = request.getParameter("campaignId");
    String campaignType = request.getParameter("campaignType");
    if (campaignId != null && !campaignId.isEmpty()) {
        long sequence = ringBuffer.next();
        CampaignCode campaign = ringBuffer.get(sequence);
        campaign.setCampaignId(Long.parseLong(campaignId));
        campaign.setCampaignType(campaignType);
        ringBuffer.publish(sequence);
    }
  }

event handlers

public class CampaignDetailsLoaderEvent implements EventHandler<CampaignCode> {
     @Override
public void onEvent(CampaignCode event, long sequence, boolean endOfBatch)
        throws Exception {
         //load details from db and process
         // udpate values to the event object
  }
 }

  public class TemplateBuildEvent implements EventHandler<CampaignCode> {
     @Override
public void onEvent(CampaignCode event, long sequence, boolean endOfBatch)
        throws Exception {
         // find the template of this type
         // set template to the event object
  }
 }

 public class CodeGenerationEventHandler implements EventHandler<CampaignCode> {
     @Override
public void onEvent(CampaignCode event, long sequence, boolean endOfBatch)
        throws Exception {
         // generate custom dsl code and execute it
         // update the output to the event object 
         //next handler will save it the db
  }
 }

  public class CampaignSaveEventHandler implements EventHandler<CampaignCode> {
     @Override
public void onEvent(CampaignCode event, long sequence, boolean endOfBatch)
        throws Exception {
         // save the details to db
         // done!
  }
 }

is this the right way to publish to the ringbuffer? do I need to synchronize the "ringBuffer" object? First 2 events run parallel, then the 3rd event. How should I handle this when I have fast publishers and slow consumers? I am using the disruptor 3.1.1, I could not find good usage example of disruptor in a web environment. A simple code implementation, if you have done one, would help me understand this a lot!

È stato utile?

Soluzione

This implementation is correct given the code requirements you've stated. Best practice is to wrap your publishing code in a try-finally block to ensure that a claimed sequence is always published:

long sequence = ringBuffer.next();
  try {
  Event e = ringBuffer.get(sequence);
  // Do some work with the event.
} finally {
  ringBuffer.publish(sequence);
}

It may also be a good idea to explicitly specify in the constructor that you need a multiple-producer Disruptor, but that is already done in the default constructor you've used. You should not synchronize writes to the RingBuffer as the process of claiming and publishing the sequence number is already thread-safe. Note however that there is no guarantee that the order in which events are published to the RingBuffer in concurrent invocations of doPost() will be the same as the order they're received by your web application.

The Disruptor is just a specialized queue and is therefore subject to all the usual problems they have with unbounded growth. If there are no available slots in the buffer, your call to ringBuffer.next() will block until one becomes available. You should both provide sufficient capacity to the RingBuffer to handle bursts of traffic, and consider ways to apply back pressure in the (hopefully rare) case that the buffer is filled.

In your particular use case, if the CodeGeneration or CampaignSave steps are taking a very long time compared to the first two, and can be deferred, it may make sense to use additional Disruptors/RingBuffers to queue up events for those executions.

Autorizzato sotto: CC-BY-SA insieme a attribuzione
Non affiliato a StackOverflow
scroll top