I am trying to use RecordParser to process a large response from WebClient.

Vert.x documentation says:

When large response are expected, use the BodyCodec.pipe. This body codec pumps the response body buffers to a WriteStream and signals the success or the failure of the operation in the async result response

But I don't see an easy way to connect that WriteStream to a RecordParser. The simplified code below works, but hand-writing such a bridge is a potential source of bugs, since async protocols are easy to get wrong. Does Vert.x offer this integration out of the box?

RecordParser parser = RecordParser.newDelimited("\n", b -> log.info("r={}", b.toString()));
RecordParserWriteStream bridge = new RecordParserWriteStream(parser);
client
    .get(sut.actualPort(), "localhost", "/stream?file=stream2.txt")
    .as(BodyCodec.pipe(bridge))
    .send(
        ar -> {
          if (ar.succeeded()) {
            ctx.completeNow();
          } else {
            ctx.failNow(ar.cause());
          }
        });


@Slf4j
@RequiredArgsConstructor
public class RecordParserWriteStream implements WriteStream<Buffer> {

    private final RecordParser recordParser;

    @Override
    public WriteStream<Buffer> exceptionHandler(@Nullable Handler<Throwable> handler) {
        recordParser.exceptionHandler(handler);
        return this;
    }

    @Override
    public void write(Buffer data, Handler<AsyncResult<Void>> handler) {
        log.info("write {}", data.length());
        recordParser.handle(data);
        Promise<Void> promise = Promise.promise();
        promise.complete();
        handler.handle(promise.future());
    }

    @Override
    public void end(Handler<AsyncResult<Void>> handler) {
        Promise<Void> promise = Promise.promise();
        promise.complete();
        handler.handle(promise.future());
    }

    @Override
    public boolean writeQueueFull() {
        return false;
    }

    @Override
    public WriteStream<Buffer> drainHandler(@Nullable Handler<Void> handler) {
        return this;
    }

    @Override
    public Future<Void> write(Buffer data) {
        throw new UnsupportedOperationException();
    }

    @Override
    public WriteStream<Buffer> setWriteQueueMaxSize(int maxSize) {
        throw new UnsupportedOperationException();
    }
}

I found a few older SO answers suggesting HttpClient instead, but the official documentation still recommends WebClient with BodyCodec.pipe and a WriteStream.

With HttpClient it looks like this:

RecordParser parser = RecordParser.newDelimited("\n", h -> log.info("r={}", h.toString()));
client
    .request(HttpMethod.GET, sut.actualPort(), "localhost", "/stream?file=stream1.txt")
    .compose(HttpClientRequest::send)
    .onComplete(
        ar -> {
          if (ar.succeeded()) {
            HttpClientResponse response = ar.result();
            response.handler(parser);
            response.endHandler(e -> ctx.completeNow());
          } else {
            ctx.failNow(ar.cause());
          }
        });

What is the best way to call an HTTP service and pass the response to a RecordParser?

asked Nov 28, 2024 at 10:19 by kodstark
  • This question is similar to: Vert.x httpClient/webClient process response chunk by chunk or as stream. If you believe it’s different, please edit the question, make it clear how it’s different and/or how the answers on that question are not helpful for your problem. – tsegismont Commented Dec 12, 2024 at 11:43
  • @tsegismont I found that question before posting this one (and even asked a question there). A "you can do" answer sometimes relies on internal contracts, and to someone on the outside it is not obvious whether it comes with hidden bugs. So a recommendation from the Vert.x team ("do ... to connect RecordParser with HTTP streaming") is highly appreciated. Also, that SO answer is 4 years old, and the latest docs pointed me to BodyCodec.pipe. I understand that my second solution is now the preferred one for this problem. Thank you for a great framework! – kodstark Commented Mar 19 at 14:20

1 Answer

@tsegismont commented that the answer to "Vert.x httpClient/webClient process response chunk by chunk or as stream" is still up to date: HttpClient should be used when HTTP streaming must be connected to a RecordParser. That means the second solution from the question is preferred:

RecordParser parser = RecordParser.newDelimited("\n", h -> log.info("r={}", h.toString()));
client
    .request(HttpMethod.GET, sut.actualPort(), "localhost", "/stream?file=stream1.txt")
    .compose(HttpClientRequest::send)
    .onComplete(
        ar -> {
          if (ar.succeeded()) {
            HttpClientResponse response = ar.result();
            response.handler(parser);
            response.endHandler(e -> ctx.completeNow());
          } else {
            ctx.failNow(ar.cause());
          }
        });

Ideally, a PR to the Vert.x documentation should clarify this.
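
For readers wondering why a parser has to sit between the response and the record handler at all: HTTP body chunks arrive at arbitrary boundaries, so a single record can be split across two or more chunks. The plain-Java sketch below illustrates that reassembly step only. It is not Vert.x code and not how RecordParser is implemented; the class LineAssembler and its methods are hypothetical names invented for this illustration, chunks are modelled as String rather than Buffer, and flushing a trailing record in end() is a choice made for this sketch.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical helper (not part of Vert.x): reassembles newline-delimited
// records from chunks that may split a record at any character.
class LineAssembler {
    private final StringBuilder pending = new StringBuilder();
    private final Consumer<String> onRecord;

    LineAssembler(Consumer<String> onRecord) {
        this.onRecord = onRecord;
    }

    // Accept one chunk and emit every record that this chunk completes.
    void handle(String chunk) {
        pending.append(chunk);
        int newline;
        while ((newline = pending.indexOf("\n")) >= 0) {
            onRecord.accept(pending.substring(0, newline));
            pending.delete(0, newline + 1); // drop the record and its delimiter
        }
    }

    // Sketch-specific choice: at end of stream, emit whatever is left over
    // as a final record even though no delimiter followed it.
    void end() {
        if (pending.length() > 0) {
            onRecord.accept(pending.toString());
            pending.setLength(0);
        }
    }

    public static void main(String[] args) {
        List<String> records = new ArrayList<>();
        LineAssembler assembler = new LineAssembler(records::add);
        // Three chunks whose boundaries do not line up with the records.
        assembler.handle("al");
        assembler.handle("pha\nbe");
        assembler.handle("ta\ngamma");
        assembler.end();
        System.out.println(records); // prints [alpha, beta, gamma]
    }
}
```

In both variants from the question, the chunk-level callback (response.handler(parser) or the bridge's write method) plays the role of handle here, and the end of the response corresponds to end.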
