Reading Flux Into a Single InputStream Using Spring Reactive WebClient

Last modified: July 27, 2022

1. Overview


In this tutorial, we’ll deep dive into Java reactive programming to solve an interesting problem of how to read Flux<DataBuffer> into a single InputStream.


2. Request Setup


As a first step to solving the problem of reading Flux<DataBuffer> into a single InputStream, we’ll use the Spring reactive WebClient to make a GET request. For this, we can use one of the publicly hosted API endpoints intended for such testing scenarios; the later code refers to its URL as REQUEST_ENDPOINT.



Next, let’s define the getWebClient() method for getting a new instance of the WebClient class:


static WebClient getWebClient() {
    WebClient.Builder webClientBuilder = WebClient.builder();
    return webClientBuilder.build();
}

At this point, we’re ready to make a GET request to the /public/v2/users endpoint. However, we must get the response body as a Flux<DataBuffer> object. So, let’s move on to the next section about BodyExtractors to do precisely this.


3. BodyExtractors and DataBufferUtils


We can use the toDataBuffers() method of the BodyExtractors class available in spring-webflux to extract the response body into Flux<DataBuffer>.


Let’s go ahead and create body as an instance of Flux<DataBuffer> type:


Flux<DataBuffer> body = client
    .get()
    .uri(url) // url: the request endpoint passed in by the caller
    .exchangeToFlux(clientResponse -> {
        return clientResponse.body(BodyExtractors.toDataBuffers());
    });

Next, since we need to collect this stream of DataBuffer objects into a single InputStream, a good strategy is to use PipedInputStream and PipedOutputStream.


Further, we intend to write to the PipedOutputStream and eventually read from the PipedInputStream. So, let’s see how we can create these two connected streams:


PipedOutputStream outputStream = new PipedOutputStream();
PipedInputStream inputStream = new PipedInputStream(1024*10);
inputStream.connect(outputStream);

We must note that the default buffer size of a PipedInputStream is 1024 bytes. However, we expect that the collected result from the Flux<DataBuffer> could exceed the default value. Therefore, we explicitly specify a larger size, which in this case is 1024*10 bytes.
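To make the role of the pipe size concrete, here is a minimal JDK-only sketch. The class and method names (PipeCapacitySketch, transfer) are made up for illustration and are not part of the article's code. It suggests that the size bounds how many unread bytes the pipe can hold at once, rather than the total amount that can pass through it, provided a reader drains the pipe concurrently:

```java
import java.io.IOException;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;
import java.io.UncheckedIOException;

final class PipeCapacitySketch {

    // Pushes payloadSize bytes through a pipe whose buffer holds only pipeSize bytes
    // and returns how many bytes the reader received.
    static int transfer(int pipeSize, int payloadSize) {
        try (PipedInputStream in = new PipedInputStream(pipeSize)) {
            PipedOutputStream out = new PipedOutputStream(in); // connected at construction

            Thread writer = new Thread(() -> {
                // write() blocks whenever the buffer is full and resumes once the reader drains it.
                try (out) {
                    out.write(new byte[payloadSize]);
                } catch (IOException e) {
                    throw new UncheckedIOException(e);
                }
            });
            writer.start();

            // The calling thread acts as the reader and consumes until end-of-stream.
            int total = 0;
            byte[] chunk = new byte[512];
            for (int n; (n = in.read(chunk)) != -1; ) {
                total += n;
            }
            return total;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(transfer(1024, 50_000));
    }
}
```

A larger buffer therefore mostly lets the writer get further ahead of the reader before it has to block.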


Finally, we use the write() utility method available in the DataBufferUtils class for writing body as a publisher to outputStream:


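// write() does not release the buffers it emits, so we release each one after it has been written
DataBufferUtils.write(body, outputStream).subscribe(DataBufferUtils.releaseConsumer());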

We must note that we connected inputStream to outputStream at the time of declaration. So, we’re good to read from inputStream. Let’s move on to the next section to see this in action.
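Putting the steps of this section together, the following is a rough sketch of a bridge from a Flux<DataBuffer> to an InputStream. It assumes spring-webflux and Reactor are on the classpath. The names FluxToInputStreamSketch, toInputStream, and roundTrip are hypothetical, and closing the pipe in doFinally() and moving the writes to the boundedElastic scheduler are choices of this sketch rather than something the text above prescribes:

```java
import java.io.IOException;
import java.io.InputStream;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;

import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.DataBufferUtils;
import org.springframework.core.io.buffer.DefaultDataBufferFactory;
import reactor.core.publisher.Flux;
import reactor.core.scheduler.Schedulers;

final class FluxToInputStreamSketch {

    // Hypothetical helper (not from the article): bridges a Flux<DataBuffer> to an InputStream.
    static InputStream toInputStream(Flux<DataBuffer> body) throws IOException {
        PipedOutputStream outputStream = new PipedOutputStream();
        PipedInputStream inputStream = new PipedInputStream(1024 * 10);
        inputStream.connect(outputStream);

        // Writing to a pipe can block, so the writes run on a scheduler meant for blocking work.
        DataBufferUtils.write(body.publishOn(Schedulers.boundedElastic()), outputStream)
            // Close the write end on completion, error, or cancellation so readers reach end-of-stream.
            .doFinally(signal -> {
                try {
                    outputStream.close();
                } catch (IOException e) {
                    throw new UncheckedIOException(e);
                }
            })
            // write() leaves releasing the buffers to the caller.
            .subscribe(DataBufferUtils.releaseConsumer());

        return inputStream;
    }

    // Network-free check: feeds one in-memory buffer through the bridge and reads it back.
    static String roundTrip(String text) {
        DataBuffer buffer = new DefaultDataBufferFactory().wrap(text.getBytes(StandardCharsets.UTF_8));
        try (InputStream in = toInputStream(Flux.just(buffer))) {
            return new String(in.readAllBytes(), StandardCharsets.UTF_8);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

With a WebClient, the body argument would be the Flux<DataBuffer> obtained through BodyExtractors.toDataBuffers().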


4. Reading From the PipedInputStream


First, let’s define a helper method, readContent(), to read an InputStream as a String object:


String readContent(InputStream stream) throws IOException {
    StringBuffer contentStringBuffer = new StringBuffer();
    byte[] tmp = new byte[stream.available()];
    int byteCount = stream.read(tmp, 0, tmp.length);
    contentStringBuffer.append(new String(tmp));
    return String.valueOf(contentStringBuffer);
}

Next, because it’s typical practice to read a PipedInputStream in a different thread, let’s create the readContentFromPipedInputStream() method, which internally spawns a new thread to read the contents of the PipedInputStream into a String object by calling the readContent() method:


String readContentFromPipedInputStream(PipedInputStream stream) throws IOException {
    StringBuffer contentStringBuffer = new StringBuffer();
    try {
        Thread pipeReader = new Thread(() -> {
            try {
                contentStringBuffer.append(readContent(stream));
            } catch (IOException e) {
                throw new RuntimeException(e);
            }
        });
        pipeReader.start();
        pipeReader.join();
    } catch (InterruptedException e) {
        throw new RuntimeException(e);
    } finally {
        stream.close();
    }

    return String.valueOf(contentStringBuffer);
}

At this stage, our code is ready to use for a simulation. Let’s see it in action:


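WebClient webClient = getWebClient();
InputStream inputStream = getResponseAsInputStream(webClient, REQUEST_ENDPOINT);
Thread.sleep(3000); // arbitrary delay so the asynchronous write can finish
String content = readContentFromPipedInputStream((PipedInputStream) inputStream);
logger.info("response content: \n{}", content.replace("}","}\n")); // logger: assumed to be an SLF4J-style Logger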

As we’re dealing with an asynchronous system, we delay the read by an arbitrary 3 seconds so that we’re able to see the complete response. Additionally, at the time of logging, we insert a newline character after each closing brace to break the long output into multiple lines.
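As a possible alternative to a fixed delay, the reader can simply consume the stream until end-of-stream. The JDK-only sketch below assumes that the writing side closes its PipedOutputStream once it has finished; the class ReadUntilEndSketch and its methods are made-up names, and slowProducer() merely stands in for the asynchronous response:

```java
import java.io.IOException;
import java.io.InputStream;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;

final class ReadUntilEndSketch {

    // Blocks until the writer closes the pipe, so no arbitrary sleep is required.
    static String readFully(InputStream stream) {
        try (stream) {
            return new String(stream.readAllBytes(), StandardCharsets.UTF_8);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Stand-in for an asynchronous response: writes chunks with pauses, then closes the pipe.
    static InputStream slowProducer(String... chunks) {
        try {
            PipedOutputStream out = new PipedOutputStream();
            PipedInputStream in = new PipedInputStream(out, 1024 * 10);
            new Thread(() -> {
                try (out) {
                    for (String chunk : chunks) {
                        Thread.sleep(50);
                        out.write(chunk.getBytes(StandardCharsets.UTF_8));
                    }
                } catch (IOException | InterruptedException e) {
                    throw new IllegalStateException(e);
                }
            }).start();
            return in;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(readFully(slowProducer("[{\"id\":1}", ",{\"id\":2}]")));
    }
}
```

Unlike a single read sized by available(), which only sees the bytes buffered at that instant, this approach does not depend on timing, but it will wait indefinitely if the pipe is never closed.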


Finally, let’s verify the output generated by the code execution:


20:45:04.120 [main] INFO com.baeldung.databuffer.DataBufferToInputStream - response content: 
[{"id":2642,"name":"Bhupen Trivedi","email":"","gender":"male","status":"active"}
,{"id":2637,"name":"Preity Patel","email":"","gender":"female","status":"inactive"}
,{"id":2633,"name":"Brijesh Shah","email":"","gender":"male","status":"inactive"}
,{"id":2623,"name":"Mohini Mishra","email":"","gender":"female","status":"inactive"}

That’s it! It looks like we’ve got it all right.


5. Conclusion


In this article, we used the concept of piped streams and the utility methods available in the BodyExtractors and DataBufferUtils classes to read Flux<DataBuffer> into a single InputStream.


As always, the complete source code for the tutorial is available over on GitHub.