Question

I am trying to zip two observables using Vert.x and RxJava. I don't know whether I am misunderstanding something or this is a bug. Here is the code:

public class BusVerticle extends Verticle {

    public void start() {

        final RxVertx rxVertx = new RxVertx(vertx);

        Observable<RxMessage<JsonObject>> bus = rxVertx.eventBus().registerHandler("busName");

        Observable<RxHttpClientResponse> httpResponse = bus.mapMany(new Func1<RxMessage<JsonObject>, Observable<RxHttpClientResponse>>() {
            public Observable<RxHttpClientResponse> call(RxMessage<JsonObject> rxMessage) {
                RxHttpClient rxHttpClient = rxVertx.createHttpClient();
                rxHttpClient.coreHttpClient().setHost("localhost").setPort(80);
                return rxHttpClient.getNow("/uri");
            }
        });

        Observable<RxMessage<JsonObject>> zipObservable = Observable.zip(bus, httpResponse, new Func2<RxMessage<JsonObject>, RxHttpClientResponse, RxMessage<JsonObject>>() {
            public RxMessage<JsonObject> call(RxMessage<JsonObject> rxMessage, RxHttpClientResponse rxHttpClientResponse) {
                return rxMessage;
            }
        });

        zipObservable.subscribe(new Action1<RxMessage<JsonObject>>() {
            public void call(RxMessage<JsonObject> rxMessage) {
                rxMessage.reply();
            }
        });
    }
}

I want to make an HTTP request using information from the received message and then zip both observables, the event bus and the HTTP response, in order to reply to the message with information from the HTTP response.

The problem is that the sender of the message never receives a reply.

Thanks in advance!


Solution

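I solved it with a workaround that mixes the two APIs: the message is received in a plain Vert.x event bus handler, and only the HTTP request is handled as an Observable.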

public class BusVerticle extends Verticle {

    public void start() {
        final RxVertx rxVertx = new RxVertx(vertx);

        vertx.eventBus().registerHandler("busName", new Handler<Message<JsonObject>>() {
            public void handle(final Message<JsonObject> message) {
                RxHttpClient rxHttpClient = rxVertx.createHttpClient();
                rxHttpClient.coreHttpClient().setHost("localhost").setPort(80);
                Observable<RxHttpClientResponse> httpRequest = rxHttpClient.getNow("/uri");
                httpRequest.subscribe(new Action1<RxHttpClientResponse>() {
                    public void call(RxHttpClientResponse response) {
                        container.logger().error(response.statusCode());
                        message.reply(new JsonObject().putString("status", "ok"));
                    }
                });
            }
        });
    }
}
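
The workaround works because the message stays in scope while the HTTP call completes, so no zip is needed to bring the two back together. Here is a minimal sketch of that pairing idea using only the JDK, with CompletableFuture standing in for the Observable. The names PairedReplySketch, Reply, fakeHttpGet and handle are hypothetical and not part of Vert.x or RxJava, and fakeHttpGet only simulates a request by always completing with status 200.

```java
import java.util.concurrent.CompletableFuture;

public class PairedReplySketch {

    /** Hypothetical reply type: the original message plus the HTTP status code. */
    public static final class Reply {
        public final String request;
        public final int statusCode;

        public Reply(String request, int statusCode) {
            this.request = request;
            this.statusCode = statusCode;
        }
    }

    // Assumption: stand-in for rxHttpClient.getNow(uri). No real request is made;
    // the future completes asynchronously with a fixed status code of 200.
    static CompletableFuture<Integer> fakeHttpGet(String uri) {
        return CompletableFuture.supplyAsync(() -> 200);
    }

    // The message is captured by the lambda, so it is still available when the
    // asynchronous result arrives. Each message is paired with its own response.
    public static CompletableFuture<Reply> handle(String message) {
        return fakeHttpGet("/uri?q=" + message)
                .thenApply(status -> new Reply(message, status));
    }

    public static void main(String[] args) {
        Reply reply = handle("ping").join();
        System.out.println(reply.request + " -> " + reply.statusCode);
    }
}
```

In RxJava terms, the same shape would be to build the reply inside the function passed to mapMany, where the message is still in scope, instead of zipping the bus observable with the response observable afterwards. I have not tested that variant against RxVertx.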

Licensed under: CC-BY-SA with attribution