How to Split a Stream into Multiple Streams

Last modified: June 30, 2022

1. Overview


Java’s Streams API is a powerful and versatile tool for processing data. By definition, a streaming operation is a single iteration through a set of data.


However, sometimes we want to process parts of the stream differently and get more than one set of results.


In this tutorial, we’ll learn how to split a stream into multiple groups and process them independently.


2. Using Collectors


A Stream should be operated on only once and have exactly one terminal operation. It can have multiple intermediate operations, but the data can only be collected once before the stream closes.


This means that the Streams API specification explicitly forbids forking the stream and having different intermediate operations for each fork. This would lead to multiple terminal operations. However, we can split the stream inside the terminal operation. This creates a result divided into two or more groups.
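To make the restriction concrete, here is a minimal sketch (the class and method names are made up for illustration) that tries to attach a second pipeline to a stream that has already been consumed. On standard JDKs this is rejected with an IllegalStateException:

```java
import java.util.List;
import java.util.stream.Stream;

class StreamReuseDemo {

    // Returns true if attaching a second pipeline to an already-used stream is rejected.
    static boolean secondPipelineIsRejected() {
        Stream<String> sites = List.of("Baeldung", "Programming Daily", "The Code").stream();

        // First pipeline: the terminal operation count() consumes the stream.
        sites.filter("Baeldung"::equals).count();

        try {
            // Second pipeline on the same Stream instance: an attempted "fork".
            sites.filter(site -> !"Baeldung".equals(site)).count();
            return false;
        } catch (IllegalStateException e) {
            // The stream has already been operated upon or closed.
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println(secondPipelineIsRejected());
    }
}
```

This is why the splitting has to happen inside a single terminal operation rather than by branching the pipeline.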

2.1. Binary Split with partitioningBy


If we want to split a stream in two, we can use partitioningBy from the Collectors class. It takes a Predicate and returns a Map that groups elements that satisfied the predicate under the Boolean true key and the rest under false.

Let’s say we have a list of articles, each carrying the target site it should be posted on and whether it should be featured.


List<Article> articles = Lists.newArrayList(
  new Article("Baeldung", true),
  new Article("Baeldung", false),
  new Article("Programming Daily", false),
  new Article("The Code", false));
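The snippets in this tutorial never show the Article class itself. As a rough sketch only, it might look like the following, assuming public final fields named target and featured and value-based equality (which containsExactly-style assertions would need). The field name target is an assumption; featured appears as a field access later in the text.

```java
import java.util.Objects;

// Hypothetical sketch of the Article class assumed by the snippets in this tutorial.
class Article {
    public final String target;     // site the article should be posted on (assumed name)
    public final boolean featured;  // whether the article should be featured

    Article(String target, boolean featured) {
        this.target = target;
        this.featured = featured;
    }

    // Two articles are equal when both fields match.
    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (!(o instanceof Article)) return false;
        Article other = (Article) o;
        return featured == other.featured && Objects.equals(target, other.target);
    }

    @Override
    public int hashCode() {
        return Objects.hash(target, featured);
    }

    @Override
    public String toString() {
        return "Article(" + target + ", " + featured + ")";
    }
}
```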

We’ll divide the list into two groups, one containing only Baeldung articles and the second one containing the rest:


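// target is assumed to be the Article field holding the site name
Map<Boolean, List<Article>> groupedArticles = articles.stream()
  .collect(Collectors.partitioningBy(a -> a.target.equals("Baeldung")));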

Let’s see which articles are filed under the true and false keys in the map:


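assertThat(groupedArticles.get(true)).containsExactly(
  new Article("Baeldung", true),
  new Article("Baeldung", false));
assertThat(groupedArticles.get(false)).containsExactly(
  new Article("Programming Daily", false),
  new Article("The Code", false));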
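Each partition can also be processed independently within the same pass by handing partitioningBy a downstream collector. A minimal sketch, assuming a simplified Article stand-in with a target field (the class and method names here are illustrative), that counts each partition instead of listing it:

```java
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

class PartitionCountDemo {

    // Minimal stand-in for the tutorial's Article class (field names are assumed).
    static class Article {
        final String target;
        final boolean featured;

        Article(String target, boolean featured) {
            this.target = target;
            this.featured = featured;
        }
    }

    // Splits articles into Baeldung / non-Baeldung and counts each side.
    static Map<Boolean, Long> countByBaeldung(List<Article> articles) {
        return articles.stream()
          .collect(Collectors.partitioningBy(
            a -> a.target.equals("Baeldung"),  // true: Baeldung, false: everything else
            Collectors.counting()));           // downstream collector applied to each partition
    }

    public static void main(String[] args) {
        List<Article> articles = List.of(
          new Article("Baeldung", true),
          new Article("Baeldung", false),
          new Article("Programming Daily", false),
          new Article("The Code", false));

        System.out.println(countByBaeldung(articles));
    }
}
```

The returned map always contains both the true and the false key, even when one partition is empty.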

2.2. Splitting with groupingBy


If we want to have more categories, then we need to use the groupingBy method. It takes a function that classifies each element into a group. Then it returns a Map that links each group classifier to a collection of its elements.


Let’s say we want to group articles by target site. The returned Map will have keys containing names of the sites and values containing collections of the articles associated with the given site:


// assumes a target field on Article holding the site name
Map<String, List<Article>> groupedArticles = articles.stream()
  .collect(Collectors.groupingBy(a -> a.target));
assertThat(groupedArticles.get("Baeldung")).containsExactly(
  new Article("Baeldung", true),
  new Article("Baeldung", false));
assertThat(groupedArticles.get("Programming Daily")).containsExactly(new Article("Programming Daily", false));
assertThat(groupedArticles.get("The Code")).containsExactly(new Article("The Code", false));
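As with partitioningBy, groupingBy accepts a downstream collector, so each group can be reduced in the same pass rather than collected into a list. A small sketch under the same assumptions (a simplified Article stand-in with a target field, illustrative class and method names):

```java
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

class GroupCountDemo {

    // Minimal stand-in for the tutorial's Article class (field names are assumed).
    static class Article {
        final String target;
        final boolean featured;

        Article(String target, boolean featured) {
            this.target = target;
            this.featured = featured;
        }
    }

    // Groups by target site and reduces each group to its size.
    static Map<String, Long> countByTarget(List<Article> articles) {
        return articles.stream()
          .collect(Collectors.groupingBy(a -> a.target, Collectors.counting()));
    }

    public static void main(String[] args) {
        List<Article> articles = List.of(
          new Article("Baeldung", true),
          new Article("Baeldung", false),
          new Article("Programming Daily", false),
          new Article("The Code", false));

        System.out.println(countByTarget(articles));
    }
}
```

Unlike partitioningBy, the resulting map only contains keys for groups that actually occurred in the stream.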

3. Using teeing


Since Java 12, we have another option for the binary split. We can use the teeing collector. teeing combines two collectors into one composite. Every element is processed by both of them and then merged into a single return value using the provided merger function.
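To illustrate the mechanics in isolation, here is a small sketch unrelated to the articles example (the class and method names are made up): two collectors, a sum and a count, see every element, and the merger function combines their results into an average. It needs Java 12 or later.

```java
import java.util.List;
import java.util.stream.Collectors;

class TeeingAverageDemo {

    // Computes the average in one pass by teeing into a sum and a count.
    static double average(List<Integer> values) {
        return values.stream().collect(Collectors.teeing(
          Collectors.summingInt(Integer::intValue),  // first collector: running sum
          Collectors.counting(),                     // second collector: element count
          (sum, count) -> count == 0 ? 0.0 : (double) sum / count));  // merger
    }

    public static void main(String[] args) {
        System.out.println(average(List.of(1, 2, 3, 6)));
    }
}
```

Returning 0.0 for an empty list is an arbitrary choice made for this sketch.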

3.1. teeing with a Predicate


The teeing collector pairs nicely with another collector from the Collectors class called filtering. It takes a predicate and uses it to filter processed elements and then passes them to yet another collector.


Let’s divide articles into groups of Baeldung and non-Baeldung ones and count them. We’ll also use the List.of factory method as the merger function:


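// assumes a target field on Article; index 0 counts Baeldung articles, index 1 the rest
List<Long> countedArticles = articles.stream().collect(Collectors.teeing(
  Collectors.filtering(article -> article.target.equals("Baeldung"), Collectors.counting()),
  Collectors.filtering(article -> !article.target.equals("Baeldung"), Collectors.counting()),
  List::of));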

3.2. teeing with Overlapping Results


There is one important difference between this solution and the previous ones. The groups we created earlier had no overlap: each element from the source stream belonged to exactly one group. With teeing, we are no longer bound by this limitation because each collector potentially processes the whole stream. Let’s look at how we can take advantage of it.


We may want to process articles into two groups, one with Baeldung articles only and the second one with featured articles only. The resulting sets of articles may overlap, as an article can be both featured and targeted at Baeldung.


This time instead of counting, we’ll collect them into lists:


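// assumes a target field on Article; index 0 holds Baeldung articles, index 1 featured ones
List<List<Article>> groupedArticles = articles.stream().collect(Collectors.teeing(
  Collectors.filtering(article -> article.target.equals("Baeldung"), Collectors.toList()),
  Collectors.filtering(article -> article.featured, Collectors.toList()),
  List::of));

assertThat(groupedArticles.get(0)).containsExactly(
  new Article("Baeldung", true),
  new Article("Baeldung", false));
assertThat(groupedArticles.get(1)).containsExactly(new Article("Baeldung", true));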
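Relying on list positions to tell the two groups apart can be fragile. One possible alternative, sketched here with a hypothetical Split holder class and a simplified Article stand-in (all names are assumptions), is to pass a constructor reference as the merger so that each group gets a name:

```java
import java.util.List;
import java.util.stream.Collectors;

class TeeingSplitDemo {

    // Minimal stand-in for the tutorial's Article class (field names are assumed).
    static class Article {
        final String target;
        final boolean featured;

        Article(String target, boolean featured) {
            this.target = target;
            this.featured = featured;
        }
    }

    // Hypothetical holder that names the two groups instead of using list indexes.
    static class Split {
        final List<Article> baeldung;
        final List<Article> featured;

        Split(List<Article> baeldung, List<Article> featured) {
            this.baeldung = baeldung;
            this.featured = featured;
        }
    }

    // Every article is offered to both filtering collectors, so the groups may overlap.
    static Split split(List<Article> articles) {
        return articles.stream().collect(Collectors.teeing(
          Collectors.filtering(a -> a.target.equals("Baeldung"), Collectors.toList()),
          Collectors.filtering(a -> a.featured, Collectors.toList()),
          Split::new));
    }

    public static void main(String[] args) {
        Split result = split(List.of(
          new Article("Baeldung", true),
          new Article("Baeldung", false),
          new Article("Programming Daily", false),
          new Article("The Code", false)));

        System.out.println(result.baeldung.size() + " Baeldung, " + result.featured.size() + " featured");
    }
}
```

The featured Baeldung article ends up in both lists, which is exactly the overlap that partitioningBy and groupingBy cannot express.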

4. Using RxJava


While Java’s Streams API is a useful tool, sometimes it’s not enough. Other solutions, like reactive streams provided by RxJava, may be able to help us. Let’s look at a short example of how we can use an Observable and multiple Subscribers to achieve the same results as our Stream examples.

4.1. Creating an Observable


First, we need to create an Observable instance from our list of articles. We can use the Observable class’s from factory method (this is the RxJava 1.x name; RxJava 2 and later call it fromIterable):


Observable<Article> observableArticles = Observable.from(articles);

4.2. Filtering Observables


Next, we need to create Observables that will filter articles. To do that, we’ll use the filter method from the Observable class:


// assumes a target field on Article holding the site name
Observable<Article> baeldungObservable = observableArticles.filter(
  article -> article.target.equals("Baeldung"));
Observable<Article> featuredObservable = observableArticles.filter(
  article -> article.featured);

4.3. Creating Multiple Subscribers


Finally, we need to subscribe to the Observables and provide an Action that describes what we want to do with the articles. A real-world example would be saving them in a database or sending them to a client, but we’ll settle for adding them to a list:


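List<Article> baeldungArticles = new ArrayList<>();
List<Article> featuredArticles = new ArrayList<>();
// each subscriber receives the articles emitted by its own filtered Observable
baeldungObservable.subscribe(baeldungArticles::add);
featuredObservable.subscribe(featuredArticles::add);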

5. Conclusion


In this tutorial, we learned how to split streams into groups and process them separately. First, we looked at the older Streams API methods: groupingBy and partitioningBy. Next, we used a newer approach utilizing the teeing method introduced in Java 12. Finally, we looked at how we can use RxJava to achieve similar results with greater flexibility.

As always, the source code is available over on GitHub.