programing

플럭스를 올바르게 읽고 단일 입력 스트림으로 변환하는 방법

linuxpc 2023. 8. 29. 20:12
반응형

플럭스를 올바르게 읽고 단일 입력 스트림으로 변환하는 방법

사용 중WebClient그리고 관습BodyExtractor스프링 부트 애플리케이션을 위한 수업

WebClient webLCient = WebClient.create();
webClient.get()
   .uri(url, params)
   .accept(MediaType.APPLICATION.XML)
   .exchange()
   .flatMap(response -> {
     return response.body(new BodyExtractor());
   })

BodyExtractor.java

@Override
public Mono<T> extract(ClientHttpResponse response, BodyExtractor.Context context) {
  Flux<DataBuffer> body = response.getBody();
  body.map(dataBuffer -> {
    try {
      JaxBContext jc = JaxBContext.newInstance(SomeClass.class);
      Unmarshaller unmarshaller = jc.createUnmarshaller();

      return (T) unmarshaller.unmarshal(dataBuffer.asInputStream())
    } catch(Exception e){
       return null;
    }
  }).next();
}

위의 코드는 작은 페이로드에서는 작동하지만 큰 페이로드에서는 작동하지 않습니다. 그것은 내가 단지 하나의 플럭스 값을 읽기 때문이라고 생각합니다.next그리고 나는 어떻게 결합하고 모든 것을 읽는지 확신할 수 없습니다.dataBuffer.

저는 원자로를 처음 사용해서 플럭스/모노를 사용하는 요령을 잘 모릅니다.

이것은 다른 대답들이 암시하는 것만큼 복잡하지 않습니다.

데이터를 메모리에 모두 버퍼링하지 않고 스트리밍할 수 있는 유일한 방법은 @jin-kwon이 제안한 대로 파이프를 사용하는 것입니다.그러나 Spring의 BodyExtractorsDataBufferUtils 유틸리티 클래스를 사용하면 매우 간단하게 수행할 수 있습니다.

예:

private InputStream readAsInputStream(String url) throws IOException {
    PipedOutputStream osPipe = new PipedOutputStream();
    PipedInputStream isPipe = new PipedInputStream(osPipe);

    ClientResponse response = webClient.get().uri(url)
        .accept(MediaType.APPLICATION.XML)
        .exchange()
        .block();
    final int statusCode = response.rawStatusCode();
    // check HTTP status code, can throw exception if needed
    // ....

    Flux<DataBuffer> body = response.body(BodyExtractors.toDataBuffers())
        .doOnError(t -> {
            log.error("Error reading body.", t);
            // close pipe to force InputStream to error,
            // otherwise the returned InputStream will hang forever if an error occurs
            try(isPipe) {
              //no-op
            } catch (IOException ioe) {
                log.error("Error closing streams", ioe);
            }
        })
        .doFinally(s -> {
            try(osPipe) {
              //no-op
            } catch (IOException ioe) {
                log.error("Error closing streams", ioe);
            }
        });

    DataBufferUtils.write(body, osPipe)
        .subscribe(DataBufferUtils.releaseConsumer());

    return isPipe;
}

응답 코드를 확인하거나 오류 상태 코드에 대한 예외를 적용하는 데 관심이 없는 경우 다음을 건너뛸 수 있습니다.block()콜 앤드 인터프리터ClientResponse사용에 의한 변수

flatMap(r -> r.body(BodyExtractors.toDataBuffers()))

대신.

Bk Santiago의 답변을 약간 수정한 버전은 다음을 사용합니다.reduce()대신에collect()매우 유사하지만 추가 수업이 필요하지 않습니다.

Java:

body.reduce(new InputStream() {
    public int read() { return -1; }
  }, (s: InputStream, d: DataBuffer) -> new SequenceInputStream(s, d.asInputStream())
).flatMap(inputStream -> /* do something with single InputStream */

또는 코틀린:

body.reduce(object : InputStream() {
  override fun read() = -1
}) { s: InputStream, d -> SequenceInputStream(s, d.asInputStream()) }
  .flatMap { inputStream -> /* do something with single InputStream */ }

사용에 비해 이 접근 방식의 이점collect()단순히 여러분이 다른 수업을 들을 필요가 없다는 것입니다.

새 빈칸을 만들었습니다.InputStream()하지만 만약 그 구문이 혼란스럽다면, 당신은 또한 그것을 대체할 수 있습니다.ByteArrayInputStream("".toByteArray())대신 빈 항목을 만듭니다.ByteArrayInputStream대신 초기 값으로 사용할 수 있습니다.

여기 다른 답들과 다른 변형이 있습니다.그리고 그것은 여전히 기억하기에 적합하지 않습니다.

static Mono<InputStream> asStream(WebClient.ResponseSpec response) {
    return response.bodyToFlux(DataBuffer.class)
        .map(b -> b.asInputStream(true))
        .reduce(SequenceInputStream::new);
}

static void doSome(WebClient.ResponseSpec response) {
    asStream(response)
        .doOnNext(stream -> {
            // do some with stream
            // close the stream!!!
        })
        .block();
}

를 사용하여 작동할 수 있었습니다.Flux#collect그리고.SequenceInputStream

@Override
public Mono<T> extract(ClientHttpResponse response, BodyExtractor.Context context) {
  Flux<DataBuffer> body = response.getBody();
  return body.collect(InputStreamCollector::new, (t, dataBuffer)-> t.collectInputStream(dataBuffer.asInputStream))
    .map(inputStream -> {
      try {
        JaxBContext jc = JaxBContext.newInstance(SomeClass.class);
        Unmarshaller unmarshaller = jc.createUnmarshaller();

        return (T) unmarshaller.unmarshal(inputStream);
      } catch(Exception e){
        return null;
      }
  }).next();
}

InputStreamCollector.java

public class InputStreamCollector {
  private InputStream is;

  public void collectInputStream(InputStream is) {
    if (this.is == null) this.is = is;
    this.is = new SequenceInputStream(this.is, is);
  }

  public InputStream getInputStream() {
    return this.is;
  }
}

기본 원자로 망을 사용하여 훨씬 더 깨끗한 방법이 있습니다.HttpClient사용하는 대신 직접WebClient구성 계층은 다음과 같습니다.

WebClient -uses-> HttpClient -uses-> TcpClient

코드를 표시하는 것이 설명보다 쉽습니다.

HttpClient.create()
    .get()
    .responseContent() // ByteBufFlux
    .aggregate() // ByteBufMono
    .asInputStream() // Mono<InputStream>
    .block() // We got an InputStream, yay!

하지만, 이미 지적했듯이, 사용하는 것입니다.InputStream전체 응답을 집계하는 것은 말할 것도 없고 비차단 HTTP 클라이언트를 사용하는 목적도 무시하는 차단 작업입니다.Java NIO와 IO를 비교하려면 이 항목을 참조하십시오.

파이프를 사용할 수 있습니다.

static <R> Mono<R> pipeAndApply(
        final Publisher<DataBuffer> source, final Executor executor,
        final Function<? super ReadableByteChannel, ? extends R> function) {
    return using(Pipe::open,
                 p -> {
                     executor.execute(() -> write(source, p.sink())
                             .doFinally(s -> {
                                 try {
                                     p.sink().close();
                                 } catch (final IOException ioe) {
                                     log.error("failed to close pipe.sink", ioe);
                                     throw new RuntimeException(ioe);
                                 }
                             })
                             .subscribe(releaseConsumer()));
                     return just(function.apply(p.source()));
                 },
                 p -> {
                     try {
                         p.source().close();
                     } catch (final IOException ioe) {
                         log.error("failed to close pipe.source", ioe);
                         throw new RuntimeException(ioe);
                     }
                 });
}

또는 사용CompletableFuture,

static <R> Mono<R> pipeAndApply(
        final Publisher<DataBuffer> source,
        final Function<? super ReadableByteChannel, ? extends R> function) {
    return using(Pipe::open,
                 p -> fromFuture(supplyAsync(() -> function.apply(p.source())))
                         .doFirst(() -> write(source, p.sink())
                                 .doFinally(s -> {
                                     try {
                                         p.sink().close();
                                     } catch (final IOException ioe) {
                                         log.error("failed to close pipe.sink", ioe);
                                         throw new RuntimeException(ioe);
                                     }
                                 })
                                 .subscribe(releaseConsumer())),
                 p -> {
                     try {
                         p.source().close();
                     } catch (final IOException ioe) {
                         log.error("failed to close pipe.source", ioe);
                         throw new RuntimeException(ioe);
                     }
                 });
}

언급URL : https://stackoverflow.com/questions/46460599/how-to-correctly-read-fluxdatabuffer-and-convert-it-to-a-single-inputstream

반응형