플럭스 를 올바르게 읽고 단일 입력 스트림으로 변환 하는 방법
사용 중WebClient
그리고 관습BodyExtractor
스프링 부트 애플리케이션을 위한 수업
WebClient webLCient = WebClient.create();
webClient.get()
.uri(url, params)
.accept(MediaType.APPLICATION.XML)
.exchange()
.flatMap(response -> {
return response.body(new BodyExtractor());
})
BodyExtractor.java
@Override
public Mono<T> extract(ClientHttpResponse response, BodyExtractor.Context context) {
Flux<DataBuffer> body = response.getBody();
body.map(dataBuffer -> {
try {
JaxBContext jc = JaxBContext.newInstance(SomeClass.class);
Unmarshaller unmarshaller = jc.createUnmarshaller();
return (T) unmarshaller.unmarshal(dataBuffer.asInputStream())
} catch(Exception e){
return null;
}
}).next();
}
위의 코드는 작은 페이로드에서는 작동하지만 큰 페이로드에서는 작동하지 않습니다. 그것은 내가 단지 하나의 플럭스 값을 읽기 때문이라고 생각합니다.next
그리고 나는 어떻게 결합하고 모든 것을 읽는지 확신할 수 없습니다.dataBuffer
.
저는 원자로를 처음 사용해서 플럭스/모노를 사용하는 요령을 잘 모릅니다.
이것은 다른 대답들이 암시하는 것만큼 복잡하지 않습니다.
데이터를 메모리에 모두 버퍼링하지 않고 스트리밍할 수 있는 유일한 방법은 @jin-kwon이 제안한 대로 파이프를 사용하는 것입니다.그러나 Spring의 BodyExtractors 및 DataBufferUtils 유틸리티 클래스를 사용하면 매우 간단하게 수행할 수 있습니다.
예:
private InputStream readAsInputStream(String url) throws IOException {
PipedOutputStream osPipe = new PipedOutputStream();
PipedInputStream isPipe = new PipedInputStream(osPipe);
ClientResponse response = webClient.get().uri(url)
.accept(MediaType.APPLICATION.XML)
.exchange()
.block();
final int statusCode = response.rawStatusCode();
// check HTTP status code, can throw exception if needed
// ....
Flux<DataBuffer> body = response.body(BodyExtractors.toDataBuffers())
.doOnError(t -> {
log.error("Error reading body.", t);
// close pipe to force InputStream to error,
// otherwise the returned InputStream will hang forever if an error occurs
try(isPipe) {
//no-op
} catch (IOException ioe) {
log.error("Error closing streams", ioe);
}
})
.doFinally(s -> {
try(osPipe) {
//no-op
} catch (IOException ioe) {
log.error("Error closing streams", ioe);
}
});
DataBufferUtils.write(body, osPipe)
.subscribe(DataBufferUtils.releaseConsumer());
return isPipe;
}
응답 코드를 확인하거나 오류 상태 코드에 대한 예외를 적용하는 데 관심이 없는 경우 다음을 건너뛸 수 있습니다.block()
콜 앤드 인터프리터ClientResponse
사용에 의한 변수
flatMap(r -> r.body(BodyExtractors.toDataBuffers()))
대신.
Bk Santiago의 답변을 약간 수정한 버전은 다음을 사용합니다.reduce()
대신에collect()
매우 유사하지만 추가 수업이 필요하지 않습니다.
Java:
body.reduce(new InputStream() {
public int read() { return -1; }
}, (s: InputStream, d: DataBuffer) -> new SequenceInputStream(s, d.asInputStream())
).flatMap(inputStream -> /* do something with single InputStream */
또는 코틀린:
body.reduce(object : InputStream() {
override fun read() = -1
}) { s: InputStream, d -> SequenceInputStream(s, d.asInputStream()) }
.flatMap { inputStream -> /* do something with single InputStream */ }
사용에 비해 이 접근 방식의 이점collect()
단순히 여러분이 다른 수업을 들을 필요가 없다는 것입니다.
새 빈칸을 만들었습니다.InputStream()
하지만 만약 그 구문이 혼란스럽다면, 당신은 또한 그것을 대체할 수 있습니다.ByteArrayInputStream("".toByteArray())
대신 빈 항목을 만듭니다.ByteArrayInputStream
대신 초기 값으로 사용할 수 있습니다.
여기 다른 답들과 다른 변형이 있습니다.그리고 그것은 여전히 기억하기에 적합하지 않습니다.
static Mono<InputStream> asStream(WebClient.ResponseSpec response) {
return response.bodyToFlux(DataBuffer.class)
.map(b -> b.asInputStream(true))
.reduce(SequenceInputStream::new);
}
static void doSome(WebClient.ResponseSpec response) {
asStream(response)
.doOnNext(stream -> {
// do some with stream
// close the stream!!!
})
.block();
}
를 사용하여 작동할 수 있었습니다.Flux#collect
그리고.SequenceInputStream
@Override
public Mono<T> extract(ClientHttpResponse response, BodyExtractor.Context context) {
Flux<DataBuffer> body = response.getBody();
return body.collect(InputStreamCollector::new, (t, dataBuffer)-> t.collectInputStream(dataBuffer.asInputStream))
.map(inputStream -> {
try {
JaxBContext jc = JaxBContext.newInstance(SomeClass.class);
Unmarshaller unmarshaller = jc.createUnmarshaller();
return (T) unmarshaller.unmarshal(inputStream);
} catch(Exception e){
return null;
}
}).next();
}
InputStreamCollector.java
public class InputStreamCollector {
private InputStream is;
public void collectInputStream(InputStream is) {
if (this.is == null) this.is = is;
this.is = new SequenceInputStream(this.is, is);
}
public InputStream getInputStream() {
return this.is;
}
}
기본 원자로 망을 사용하여 훨씬 더 깨끗한 방법이 있습니다.HttpClient
사용하는 대신 직접WebClient
구성 계층은 다음과 같습니다.
WebClient -uses-> HttpClient -uses-> TcpClient
코드를 표시하는 것이 설명보다 쉽습니다.
HttpClient.create()
.get()
.responseContent() // ByteBufFlux
.aggregate() // ByteBufMono
.asInputStream() // Mono<InputStream>
.block() // We got an InputStream, yay!
하지만, 이미 지적했듯이, 사용하는 것입니다.InputStream
전체 응답을 집계하는 것은 말할 것도 없고 비차단 HTTP 클라이언트를 사용하는 목적도 무시하는 차단 작업입니다.Java NIO와 IO를 비교하려면 이 항목을 참조하십시오.
파이프를 사용할 수 있습니다.
static <R> Mono<R> pipeAndApply(
final Publisher<DataBuffer> source, final Executor executor,
final Function<? super ReadableByteChannel, ? extends R> function) {
return using(Pipe::open,
p -> {
executor.execute(() -> write(source, p.sink())
.doFinally(s -> {
try {
p.sink().close();
} catch (final IOException ioe) {
log.error("failed to close pipe.sink", ioe);
throw new RuntimeException(ioe);
}
})
.subscribe(releaseConsumer()));
return just(function.apply(p.source()));
},
p -> {
try {
p.source().close();
} catch (final IOException ioe) {
log.error("failed to close pipe.source", ioe);
throw new RuntimeException(ioe);
}
});
}
또는 사용CompletableFuture
,
static <R> Mono<R> pipeAndApply(
final Publisher<DataBuffer> source,
final Function<? super ReadableByteChannel, ? extends R> function) {
return using(Pipe::open,
p -> fromFuture(supplyAsync(() -> function.apply(p.source())))
.doFirst(() -> write(source, p.sink())
.doFinally(s -> {
try {
p.sink().close();
} catch (final IOException ioe) {
log.error("failed to close pipe.sink", ioe);
throw new RuntimeException(ioe);
}
})
.subscribe(releaseConsumer())),
p -> {
try {
p.source().close();
} catch (final IOException ioe) {
log.error("failed to close pipe.source", ioe);
throw new RuntimeException(ioe);
}
});
}
언급URL : https://stackoverflow.com/questions/46460599/how-to-correctly-read-fluxdatabuffer-and-convert-it-to-a-single-inputstream
'programing' 카테고리의 다른 글
일반 오류 해결 방법: 2006 MySQL 서버가 사라졌습니다. (0) | 2023.08.29 |
---|---|
제거/무시 방법: 터치 장치에서 css 스타일을 호버합니다. (0) | 2023.08.29 |
안드로이드에서 현재 전경 활동 컨텍스트를 얻는 방법은 무엇입니까? (0) | 2023.08.29 |
날짜 범위 사이의 데이터 찾기 (0) | 2023.08.29 |
SQL 예외: 오라클의 프로토콜 위반 (0) | 2023.08.29 |