Guide to Akka Streams

Last modified: June 5, 2017

1. Overview


In this article, we will look at the akka-streams library, which is built on top of the Akka actor framework and adheres to the reactive streams manifesto. The Akka Streams API allows us to easily compose data transformation flows from independent steps.

Moreover, all processing is done in a reactive, non-blocking, and asynchronous way.


2. Maven Dependencies


To get started, we need to add the akka-stream and akka-stream-testkit libraries to our pom.xml.



3. Akka Streams API

To work with Akka Streams, we need to be aware of the core API concepts:

  • Source – the entry point to processing in the akka-stream library; we can create an instance of this class from multiple sources: for example, we can use the single() method if we want to create a Source from a single String, or we can create a Source from an Iterable of elements
  • Flow – the main processing building block; every Flow instance has exactly one input and one output
  • Materializer – the component that actually runs a stream; running (materializing) a stream can also produce a materialized value, such as a CompletionStage that reports when processing has finished. Most commonly, we will use the NotUsed type as the materialized value type to denote that a stage produces nothing useful when it is run
  • Sink operation – a Flow is not executed while we are building it; nothing runs until we attach a Sink and run the stream. The Sink is the terminal operation that triggers all computations in the entire Flow

4. Creating Flows in Akka Streams


Let’s start by building a simple example that shows how to create and combine multiple Flows to process a stream of integers and calculate the average of each consecutive pair of integers from the stream.


We’ll parse a semicolon-delimited String of integers as input to create our akka-stream Source for the example.

4.1. Using a Flow to Parse Input


First, let’s create a DataImporter class that will take an instance of the ActorSystem that we will use later to create our Flow:


public class DataImporter {
    private ActorSystem actorSystem;

    // standard constructors, getters...
}

Next, let’s create a parseLine method that will generate a List of Integer from our delimited input String. Keep in mind that we are using the Java Stream API here only for parsing:

private List<Integer> parseLine(String line) {
    String[] fields = line.split(";");
    return Arrays.stream(fields).map(Integer::parseInt).collect(Collectors.toList());
}

Our initial Flow will apply parseLine to our input to create a Flow with input type String and output type Integer:


private Flow<String, Integer, NotUsed> parseContent() {
    return Flow.of(String.class)
      .mapConcat(this::parseLine);
}

When we call the parseLine() method, the compiler knows that the argument to that lambda function will be a String – same as the input type to our Flow.


Note that we are using the mapConcat() method – equivalent to the Java 8 flatMap() method – because we want to flatten the List of Integer returned by parseLine() into a Flow of Integer so that subsequent steps in our processing do not need to deal with the List.
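
To see the flattening step in isolation, here is a minimal sketch that uses only the JDK. It is an analogy built on Stream.flatMap(), not Akka code; the class name FlattenSketch and the sample input are assumptions made for illustration.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical class name, used only for this illustration.
public class FlattenSketch {

    // Same parsing idea as parseLine(): split on ';' and convert each field.
    static List<Integer> parseLine(String line) {
        return Arrays.stream(line.split(";"))
          .map(Integer::parseInt)
          .collect(Collectors.toList());
    }

    // flatMap() turns each parsed List<Integer> into individual elements,
    // so the result is one flat list rather than a list of lists.
    static List<Integer> flatten(List<String> lines) {
        return lines.stream()
          .flatMap(line -> parseLine(line).stream())
          .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(flatten(Arrays.asList("1;9", "11;0"))); // prints [1, 9, 11, 0]
    }
}
```

Later steps receive plain Integer elements and never see the intermediate lists, which is the property the article relies on when it chooses mapConcat().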

4.2. Using a Flow to Perform Calculations


At this point, we have our Flow of parsed integers. Now, we need to implement logic that will group all input elements into pairs and calculate an average of those pairs.


Now, we’ll create a Flow of Integers and group them using the grouped() method.

Next, we want to calculate an average.


Since we are not interested in the order in which those averages are processed, we can calculate them in parallel with the mapAsyncUnordered() method, passing the level of parallelism (the maximum number of computations in flight at once) as the first argument.

The action that we pass as the lambda needs to return a CompletableFuture because that action will be calculated asynchronously in a separate thread:


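private Flow<Integer, Double, NotUsed> computeAverage() {
    return Flow.of(Integer.class)
      .grouped(2)
      .mapAsyncUnordered(8, integers ->
        CompletableFuture.supplyAsync(() -> integers.stream()
          .mapToDouble(v -> v)
          .average()
          .orElse(-1.0))); // fallback is never reached: grouped() does not emit empty groups
}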

We allow up to eight averages to be calculated in parallel. Note that we are using the Java 8 Stream API to calculate each average.
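
The grouping and averaging steps can also be tried out without Akka. The following is a minimal sketch using only the JDK, not the article's Flow: the class PairAverageSketch and its helper grouped() are hypothetical names, and the Double.NaN fallback for an empty group is an arbitrary choice. Unlike mapAsyncUnordered(), this sketch collects the results in input order.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;

// Hypothetical class name, used only for this illustration.
public class PairAverageSketch {

    // Split the input into consecutive groups of at most `size` elements;
    // the last group may be smaller if the input runs out.
    static List<List<Integer>> grouped(List<Integer> input, int size) {
        List<List<Integer>> groups = new ArrayList<>();
        for (int i = 0; i < input.size(); i += size) {
            groups.add(input.subList(i, Math.min(i + size, input.size())));
        }
        return groups;
    }

    // Average one group asynchronously on the common ForkJoinPool.
    static CompletableFuture<Double> averageAsync(List<Integer> group) {
        return CompletableFuture.supplyAsync(() ->
          group.stream().mapToDouble(v -> v).average().orElse(Double.NaN));
    }

    // Start every computation first, then wait for each result in input order.
    static List<Double> averages(List<Integer> input) {
        List<CompletableFuture<Double>> futures = grouped(input, 2).stream()
          .map(PairAverageSketch::averageAsync)
          .collect(Collectors.toList());
        return futures.stream()
          .map(CompletableFuture::join)
          .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(averages(Arrays.asList(1, 9, 11, 0))); // prints [5.0, 5.5]
    }
}
```

Starting all futures before joining any of them is what lets the averages overlap in time; joining inside the first map() would compute them one after another.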

4.3. Composing Multiple Flows into a Single Flow


The Flow API is a fluent abstraction that allows us to compose multiple Flow instances to achieve our final processing goal. We can have granular flows where one, for example, is parsing JSON, another is doing some transformation, and another is gathering some statistics.

Such granularity will help us create more testable code because we can test each processing step independently.


We created two flows above that can work independently of each other. Now, we want to compose them together.


First, we want to parse our input String, and next, we want to calculate an average on a stream of elements.


We can compose our flows using the via() method:


Flow<String, Double, NotUsed> calculateAverage() {
    return Flow.of(String.class)
      .via(parseContent()).via(computeAverage());
}

We created a Flow with input type String and chained two other flows after it. The parseContent() Flow takes a String as input and emits Integers as output. The computeAverage() Flow takes those Integers and calculates averages, emitting Double as the output type.
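
The way the types line up when stages are chained can be illustrated with plain java.util.function.Function composition. This is only an analogy for via(), written against the JDK rather than Akka; ComposeSketch, PARSE, AVERAGE_PAIRS and PIPELINE are hypothetical names.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;
import java.util.stream.Collectors;

// Hypothetical class name, used only for this illustration.
public class ComposeSketch {

    // Step 1: String -> List<Integer>
    static final Function<String, List<Integer>> PARSE = line ->
      Arrays.stream(line.split(";"))
        .map(Integer::parseInt)
        .collect(Collectors.toList());

    // Step 2: List<Integer> -> List<Double>, one average per consecutive pair
    static final Function<List<Integer>, List<Double>> AVERAGE_PAIRS = numbers -> {
        List<Double> result = new ArrayList<>();
        for (int i = 0; i < numbers.size(); i += 2) {
            result.add(numbers.subList(i, Math.min(i + 2, numbers.size())).stream()
              .mapToDouble(v -> v).average().orElse(Double.NaN));
        }
        return result;
    };

    // The output type of step 1 must match the input type of step 2,
    // otherwise the composition does not compile.
    static final Function<String, List<Double>> PIPELINE = PARSE.andThen(AVERAGE_PAIRS);

    public static void main(String[] args) {
        System.out.println(PIPELINE.apply("1;9;11;0")); // prints [5.0, 5.5]
    }
}
```

Because each step is a separate value, each one can be tested on its own before it is composed, which is the testability benefit described earlier in this section.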

5. Adding Sink to the Flow


As we mentioned, up to this point the Flow has not been executed because it is lazy. To start execution, we need to define a Sink. A Sink can, for example, save data into a database or send results to an external web service.
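
Java's own Stream API is lazy in a comparable way, which makes the idea easy to observe without Akka. The sketch below is an analogy only: LazySketch and countCalls() are hypothetical names, and a Java Stream is not an Akka Flow.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Hypothetical class name, used only for this illustration.
public class LazySketch {

    // Returns {calls made before the terminal operation, calls made after it}.
    static int[] countCalls() {
        AtomicInteger calls = new AtomicInteger();

        // Building the pipeline does not run the mapping function yet.
        Stream<Integer> doubled = Stream.of(1, 2, 3)
          .map(v -> { calls.incrementAndGet(); return v * 2; });
        int before = calls.get();

        // The terminal operation pulls every element through map().
        doubled.collect(Collectors.toList());
        int after = calls.get();

        return new int[] { before, after };
    }

    public static void main(String[] args) {
        int[] counts = countCalls();
        System.out.println(counts[0] + " " + counts[1]); // prints 0 3
    }
}
```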


Suppose we have an AverageRepository class with the following save() method that writes results to our database:


CompletionStage<Double> save(Double average) {
    return CompletableFuture.supplyAsync(() -> {
        // write to database
        return average;
    });
}

Now, we want to create a Sink operation that uses this method to save the results of our Flow processing. To create our Sink, we first need to create a Flow that takes the result of our processing as its input type. Next, we want to save all our results to the database.


Again, we do not care about ordering of the elements, so we can perform the save() operations in parallel using the mapAsyncUnordered() method.

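To create a Sink from the Flow, we need to call toMat() with Sink.ignore() as the first argument and Keep.right() as the second, because we want to return the status of the processing. Keep.right() keeps the materialized value of Sink.ignore(), a CompletionStage<Done> that completes when the stream finishes: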


private Sink<Double, CompletionStage<Done>> storeAverages() {
    return Flow.of(Double.class)
      .mapAsyncUnordered(4, averageRepository::save)
      .toMat(Sink.ignore(), Keep.right());
}

6. Defining a Source for Flow


The last thing that we need to do is to create a Source from the input String. We can apply a calculateAverage() Flow to this source using the via() method.

Then, to add the Sink to the processing, we need to call the runWith() method and pass the storeAverages() Sink that we just created:


CompletionStage<Done> calculateAverageForContent(String content) {
    return Source.single(content)
      .via(calculateAverage())
      .runWith(storeAverages(), ActorMaterializer.create(actorSystem))
      .whenComplete((d, e) -> {
          if (d != null) {
              System.out.println("Import finished ");
          } else {
              e.printStackTrace();
          }
      });
}

Note that we add a whenComplete() callback, which runs once the processing has finished and lets us perform some action depending on its outcome.
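
The two outcomes that a whenComplete() callback can observe are easy to try with a plain CompletableFuture. This is a minimal JDK-only sketch, not part of the article's DataImporter; WhenCompleteSketch and describe() are hypothetical names.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical class name, used only for this illustration.
public class WhenCompleteSketch {

    // Waits for the future and reports which branch of the callback ran.
    static String describe(CompletableFuture<String> future) {
        AtomicReference<String> outcome = new AtomicReference<>();
        future
          // The callback receives either a value or an error, never both.
          .whenComplete((value, error) ->
            outcome.set(error == null ? "finished: " + value : "failed: " + error.getMessage()))
          // whenComplete() passes a failure on, so swallow it before join().
          .exceptionally(error -> null)
          .join();
        return outcome.get();
    }

    public static void main(String[] args) {
        CompletableFuture<String> failed = new CompletableFuture<>();
        failed.completeExceptionally(new IllegalStateException("boom"));

        System.out.println(describe(CompletableFuture.completedFuture("ok"))); // prints finished: ok
        System.out.println(describe(failed)); // prints failed: boom
    }
}
```

Checking the error for null, rather than the value, also works for stages whose successful result is itself null, such as a CompletableFuture<Void>.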


7. Testing Akka Streams


We can test our processing using the akka-stream-testkit.


The best way to test the actual logic of the processing is to test all Flow logic and use TestSink to trigger the computation and assert on the results.

In our test, we are creating the Flow that we want to test, and next, we are creating a Source from the test input content:


@Test
public void givenStreamOfIntegers_whenCalculateAverageOfPairs_thenShouldReturnProperResults() {
    // given
    Flow<String, Double, NotUsed> tested = new DataImporter(actorSystem).calculateAverage();
    String input = "1;9;11;0";

    // when
    Source<Double, NotUsed> flow = Source.single(input).via(tested);

    // then
    flow
      .runWith(TestSink.probe(actorSystem), ActorMaterializer.create(actorSystem))
      .request(4)
      .expectNextUnordered(5d, 5.5);
}

We request up to four elements from the probe and expect two results. Because the averages are calculated asynchronously and in parallel, they can arrive in either order.


8. Conclusion


In this article, we looked at the akka-stream library.


We defined a process that combines multiple Flows to calculate the average of each pair of elements. Then, we defined a Source that is the entry point of the stream processing and a Sink that triggers the actual processing.


Finally, we wrote a test for our processing using the akka-stream-testkit.


The implementation of all these examples and code snippets can be found in the GitHub project – this is a Maven project, so it should be easy to import and run as it is.