Question

Je joue dans le cadre Perturbateur, et je suis constaté que mes gestionnaires d'événements ne sont pas invoquées.

Voici mon code de configuration:

private static final int BUFFER_SIZE = 1024 * 8;
private final ExecutorService  EXECUTOR = Executors.newSingleThreadExecutor();

private void initializeDisruptor() {
    if (disruptor != null)
        return;

    disruptor = 
            new Disruptor<TwitterStatusReceivedEvent>(TwitterStatusReceivedEvent.EVENT_FACTORY, EXECUTOR,
                    new SingleThreadedClaimStrategy(BUFFER_SIZE),
                    new SleepingWaitStrategy());
    disruptor.handleEventsWith(searchTermMatchingHandler)
        .then(appendStatusHandler,updatePriceHandler).then(persistUpdatesHandler);

    this.ringBuffer = disruptor.start();
}

Par ailleurs, je publie des événements. J'ai essayé chacune des deux approches suivantes:

Publication de l'événement approche A:

private void handleStatus(final Status status)
{

    long sequence = ringBuffer.next();
    TwitterStatusReceivedEvent event = ringBuffer.get(sequence);
    event.setStatus(status);
    event.setSearchInstruments(searchInstruments);
    ringBuffer.publish(sequence);
}

Dans ce scénario, je trouve le premier EventHandler s'invoqué, mais jamais rien au-delà.

Publication de l'événement approche B:

private void handleStatus(final Status status)
{
    disruptor.publishEvent(new EventTranslator<TwitterStatusReceivedEvent>() {

        @Override
        public TwitterStatusReceivedEvent translateTo(
                TwitterStatusReceivedEvent event, long sequence) {
            event.setStatus(status);
            event.setSearchInstruments(searchInstruments);
            return event;
        }
    });
}

Dans ce scénario, je trouve qu'aucun des gestionnaires d'événements s'invoqué à tous.

Qu'est-ce que je fais mal?

Mise à jour

Voici mon EventHandler dans son intégralité. Comment dois-je signaler que le traitement est terminé?

public class SearchTermMatchingEventHandler implements EventHandler<TwitterStatusReceivedEvent> {

    @Override
    public void onEvent(TwitterStatusReceivedEvent event, long sequence,
            boolean endOfBatch) throws Exception {
        String statusText = event.getStatus().getText();
        for (Instrument instrument : event.getSearchInstruments())
        {
            if (statusText.contains(instrument.getSearchTerm()))
            {
                event.setMatchedInstrument(instrument);
                break;
            }
        }
    }

}
Était-ce utile?

La solution

Chaque gestionnaire d'événements doit fonctionner dans son propre fil de sortie jusqu'à ce que l'habitude de vous arrêter le disrupteur. Puisque vous utilisez un seul exécuteur fileté, seul le premier gestionnaire d'événements qui se produit à exécuter sera jamais exécuté. (Les magasins de classe Disruptor chaque gestionnaire dans un hashmap afin gestionnaire qui serpente la course à pied variera)

Si vous passez à un cachedThreadPool vous devriez trouver tout commence en cours d'exécution. Vous aurez pas besoin de faire une gestion des numéros de séquence parce que est tout en charge par le EventProcessor que les ensembles de classe disrupteur et gère pour vous. Il suffit de traiter chaque événement que vous obtenez est tout à fait exact.

Autres conseils

Vous devez vous assurer que votre searchTermMatchingHandler met à jour son numéro de séquence après traite l'événement. Les gestionnaires d'événements en aval (appendStatusHandler, updatePriceHandler, persistUpdatesHandler) seront inspectant le numéro de séquence de searchTermMatchingHandler pour voir quels événements ils peuvent prendre au large de la mémoire tampon en anneau.

J'ai eu le même problème, mais c'était parce que j'instancié avec la Perturbateur en utilisant (config Java) Spring et a été instancié avec la Exécuteur dans la même méthode @Bean que le disrupteur.

I a résolu le problème en instanciant l'exécuteur dans un procédé @Bean séparé.

Licencié sous: CC-BY-SA avec attribution
Non affilié à StackOverflow
scroll top