I am trying to zip two observables using Vert.x and RxJava. I don't know whether I am misunderstanding something or this is a bug. Here is the code:

public class BusVerticle extends Verticle {

    public void start() {

        final RxVertx rxVertx = new RxVertx(vertx);

        Observable<RxMessage<JsonObject>> bus = rxVertx.eventBus().registerHandler("busName");

        Observable<RxHttpClientResponse> httpResponse = bus.mapMany(new Func1<RxMessage<JsonObject>, Observable<RxHttpClientResponse>>() {
            public Observable<RxHttpClientResponse> call(RxMessage<JsonObject> rxMessage) {
                RxHttpClient rxHttpClient = rxVertx.createHttpClient();
                rxHttpClient.coreHttpClient().setHost("localhost").setPort(80);
                return rxHttpClient.getNow("/uri");
            }
        });

        Observable<RxMessage<JsonObject>> zipObservable = Observable.zip(bus, httpResponse, new Func2<RxMessage<JsonObject>, RxHttpClientResponse, RxMessage<JsonObject>>() {
            public RxMessage<JsonObject> call(RxMessage<JsonObject> rxMessage, RxHttpClientResponse rxHttpClientResponse) {
                return rxMessage;
            }
        });

        zipObservable.subscribe(new Action1<RxMessage<JsonObject>>() {
            public void call(RxMessage<JsonObject> rxMessage) {
                rxMessage.reply();
            }
        });
    }
}

I want to make an HTTP request using information from the received message and then zip both observables, the event bus and the HTTP response, in order to reply to the message with information from the HTTP response.

The problem is that the sender of the message never receives a reply.

Thanks in advance!

Solution

I solved it with a workaround, a kind of mixed solution: a plain Vert.x event bus handler receives the message, and Rx is used only for the HTTP request.

public class BusVerticle extends Verticle {

    public void start() {
        final RxVertx rxVertx = new RxVertx(vertx);

        vertx.eventBus().registerHandler("busName", new Handler<Message<JsonObject>>() {
            public void handle(final Message<JsonObject> message) {
                RxHttpClient rxHttpClient = rxVertx.createHttpClient();
                rxHttpClient.coreHttpClient().setHost("localhost").setPort(80);
                Observable<RxHttpClientResponse> httpRequest = rxHttpClient.getNow("/uri");
                httpRequest.subscribe(new Action1<RxHttpClientResponse>() {
                    public void call(RxHttpClientResponse response) {
                        container.logger().error(response.statusCode());
                        message.reply(new JsonObject().putString("status", "ok"));
                    }
                });
            }
        });
    }
}
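
What makes the workaround simple is that the message stays in scope inside the handler, so the reply can be sent from the response callback without zipping anything. Here is a library-free sketch of that shape using only `java.util.concurrent.CompletableFuture`. Note that `Msg`, `fetchStatus`, and `handle` are hypothetical stand-ins made up for illustration; they are not Vert.x or RxJava APIs, and the "HTTP call" is faked with a fixed status code.

```java
import java.util.concurrent.CompletableFuture;

public class ReplySketch {

    // Hypothetical stand-in for an event bus message: a body plus a slot for the reply.
    static final class Msg {
        final String body;
        final CompletableFuture<String> reply = new CompletableFuture<>();

        Msg(String body) {
            this.body = body;
        }

        void reply(String value) {
            reply.complete(value);
        }
    }

    // Hypothetical stand-in for the HTTP request: completes asynchronously
    // with a fixed status code instead of contacting a real server.
    static CompletableFuture<Integer> fetchStatus(String uri) {
        return CompletableFuture.supplyAsync(() -> 200);
    }

    // Start the request from inside the message handler. The message is
    // captured by the callback, so it is still available when the response arrives.
    static CompletableFuture<String> handle(Msg message) {
        fetchStatus("/uri")
            .thenAccept(status -> message.reply(message.body + ":" + status));
        return message.reply;
    }

    public static void main(String[] args) {
        // Wait for the reply and print it.
        System.out.println(handle(new Msg("ping")).join());
    }
}
```

The same idea should carry over to a purely Rx version: map each response to a value that already carries its originating message, rather than combining two separate streams afterwards.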

Licensed under: CC-BY-SA with attribution