SEDA With Spring Integration and Apache Camel

Last modified: September 28, 2022

1. Introduction


SEDA, or Staged Event-Driven Architecture, is an architectural style proposed by Matt Welsh in his Ph.D. thesis. Its primary benefits are scalability, support for highly-concurrent traffic, and maintainability.

In this tutorial, we’ll use SEDA to count the occurrences of each word in a sentence using two separate implementations: Spring Integration and Apache Camel.



2. SEDA


SEDA addresses several non-functional requirements specific to online services:


  1. High concurrency: The architecture must support as many concurrent requests as possible.
  2. Dynamic content: Software systems must often support complex business use cases, requiring many steps to process user requests and generate responses.
  3. Robustness to load: User traffic for online services can be unpredictable, and the architecture needs to deal with changes in traffic volume gracefully.

To address these requirements, SEDA decomposes complex services into event-driven stages. These stages are indirectly connected with queues and can thus be completely decoupled from each other. Furthermore, each stage has a scaling mechanism to cope with its incoming load:


Figure: SEDA Overview

The above diagram from Matt Welsh’s paper depicts the overall structure of a web server implemented with SEDA. Each rectangle represents a single processing stage for an incoming HTTP request. The stages can independently consume tasks from their incoming queues, do some processing or I/O work, and then pass a message to the next queue.

2.1. Components


To better understand the components of SEDA, let’s look at how this diagram from Matt Welsh’s thesis shows the inner workings of a single stage:


Figure: SEDA Stage

As we can see, each SEDA stage has the following components:


  • Event: Events are data structures containing whatever data the stage needs to perform its processing. For example, for an HTTP web server, events might contain user data – such as the body, header, and request parameters – and infrastructure data like the user’s IP, the request timestamp, etc.
  • Event Queue: This holds the stage’s incoming events.
  • Event Handler: The event handler is the procedural logic of the stage. This could be a simple routing stage, forwarding data from its event queue to other relevant event queues, or a more complex stage that processes the data somehow. The event handler can read events individually or in batches – the latter’s helpful when there’s a performance benefit to batch processing, such as updating multiple database records with one query.
  • Outgoing Events: Based on the business use case and the overall structure of the flow, each stage can send new events to zero or more event queues. Creating and sending outgoing messages is done in the event handler method.
  • Thread Pool: Threading is a well-known concurrency mechanism. In SEDA, threading is localized and customized for each stage. In other words, each stage maintains a thread pool. Thus, unlike the one-thread-per-request model, each user request is processed by several threads under SEDA. This model allows us to tune each stage independently according to its complexity.
  • Controllers: A SEDA controller is any mechanism that manages the consumption of resources such as thread pool size, event queue size, scheduling, etc. Controllers are responsible for the elastic behavior of SEDA. A simple controller might manage the number of active threads in each thread pool. A more sophisticated controller could implement complex performance-tuning algorithms that monitor the whole application at runtime and tune various parameters. Moreover, controllers decouple the performance-tuning logic from the business logic. That separation of concerns makes it easier to maintain our code.

By putting all these components together, SEDA provides a robust solution for dealing with high and fluctuating traffic loads.
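To make the component list concrete, here is a minimal plain-Java sketch of a single stage, with no framework involved. The names Stage and StageDemo are hypothetical and are not taken from Matt Welsh's thesis. The sketch also leaves out the controller: the pool size is fixed at construction time, whereas a real controller would adjust it at runtime.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;

// Hypothetical sketch of one SEDA stage: event queue + event handler + private thread pool.
class Stage<I, O> {
    private final BlockingQueue<I> eventQueue = new LinkedBlockingQueue<>();
    private final BlockingQueue<O> outgoing;
    private final Function<I, O> eventHandler;
    private final ExecutorService threadPool;

    Stage(int threads, Function<I, O> eventHandler, BlockingQueue<O> outgoing) {
        this.eventHandler = eventHandler;
        this.outgoing = outgoing;
        this.threadPool = Executors.newFixedThreadPool(threads);
        // Every pool thread runs the same consume loop against the stage's own queue.
        for (int i = 0; i < threads; i++) {
            threadPool.execute(this::consume);
        }
    }

    BlockingQueue<I> queue() {
        return eventQueue;
    }

    private void consume() {
        try {
            while (true) {
                I event = eventQueue.take();             // wait for an incoming event
                outgoing.put(eventHandler.apply(event)); // emit one outgoing event
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();          // stop() interrupts the loop
        }
    }

    void stop() {
        threadPool.shutdownNow();
    }
}

class StageDemo {
    // Wires two stages together through their queues: trim, then upper-case.
    static String run(String input) {
        BlockingQueue<String> results = new LinkedBlockingQueue<>();
        Stage<String, String> upperCase = new Stage<>(2, String::toUpperCase, results);
        Stage<String, String> trim = new Stage<>(2, String::trim, upperCase.queue());
        try {
            trim.queue().put(input);
            return results.poll(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            trim.stop();
            upperCase.stop();
        }
    }

    public static void main(String[] args) {
        System.out.println(run("  seda  ")); // prints "SEDA"
    }
}
```

The two stages only know about each other through a queue, which is what lets each one be sized and tuned on its own.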


3. Sample Problem


In the following sections, we’ll create two implementations that solve the same problem using SEDA.


Our example problem will be straightforward: count, case-insensitively, how many times each word appears within a given string.


Let’s define a word as a sequence of characters without spaces, and we’ll ignore other complications such as punctuation. Our output will be a map that contains the words as keys and the counts as values. For example, given the input "My name is Hesam", the output will be:


{
  "my": 1,
  "name": 1,
  "is": 1,
  "hesam": 1
}

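Before bringing in any stages, it can help to pin down the expected behavior with a single-threaded reference. The class name WordCountReference is hypothetical. It follows the definition above literally: it splits on single spaces and ignores punctuation.

```java
import java.util.Arrays;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;

// Hypothetical single-threaded reference for the word-count problem.
class WordCountReference {
    static Map<String, Long> countWords(String sentence) {
        return Arrays.stream(sentence.split(" "))   // a word is a run of non-space characters
          .map(String::toLowerCase)                 // counting is case-insensitive
          .collect(Collectors.groupingBy(Function.identity(), Collectors.counting()));
    }

    public static void main(String[] args) {
        // Map iteration order is unspecified, so the printed key order may vary.
        System.out.println(countWords("My name is Hesam"));
    }
}
```

A reference like this is handy in tests, since both SEDA implementations should return the same map for the same input.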
3.1. Adapting the Problem to SEDA


Let’s look at our problem in terms of SEDA stages. Since scalability is a core goal of SEDA, it’s usually better to design small stages focused on specific operations, especially if we have I/O-intensive tasks. Moreover, having small stages helps us better tune the scale of each stage.


To solve our word count problem, we can implement a solution with the following stages:


Figure: Example Word-Count Flow
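As a rough, framework-free illustration of this flow, the sketch below assumes four stages (receive text, split words, convert to lowercase, count words) and gives each its own thread pool. The class name StagedWordCount and the pool sizes are assumptions made for the example. Each executor's internal work queue plays the role of the stage's event queue, which is a simplification of a full SEDA stage.

```java
import java.util.Arrays;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Function;
import java.util.stream.Collectors;

// Hypothetical sketch: one thread pool per stage, chained with CompletableFuture.
class StagedWordCount {
    static Map<String, Long> countWords(String text) {
        ExecutorService receivePool = Executors.newFixedThreadPool(1);
        ExecutorService splitPool = Executors.newFixedThreadPool(2);
        ExecutorService lowerCasePool = Executors.newFixedThreadPool(2);
        ExecutorService countPool = Executors.newFixedThreadPool(1);
        try {
            return CompletableFuture.supplyAsync(() -> text, receivePool)   // receive text
              .thenApplyAsync(sentence -> sentence.split(" "), splitPool)   // split words
              .thenApplyAsync(words -> Arrays.stream(words)                 // convert to lowercase
                .map(String::toLowerCase)
                .collect(Collectors.toList()), lowerCasePool)
              .thenApplyAsync(words -> words.stream()                       // count words
                .collect(Collectors.groupingBy(Function.identity(), Collectors.counting())), countPool)
              .join();
        } finally {
            for (ExecutorService pool : Arrays.asList(receivePool, splitPool, lowerCasePool, countPool)) {
                pool.shutdown();
            }
        }
    }

    public static void main(String[] args) {
        System.out.println(countWords("My name is Hesam"));
    }
}
```

Because every stage has its own pool, a slow stage can be given more threads without touching the others, which is the tuning property the stage design is after.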

Now that we have our stage design, let’s implement it in the next sections using two different enterprise integration technologies. In this table, we can preview how SEDA will show up in our implementations:


SEDA Component | Spring Integration | Apache Camel
Event | org.springframework.messaging.Message | org.apache.camel.Exchange
Event Queue | Message channels | Endpoints defined by URI strings
Event Handler | Instances of functional interfaces | Camel processors, Camel utility classes, and Functions
Thread Pool | Spring abstraction of TaskExecutor | Out-of-the-box support in SEDA endpoints

4. Solution Using Spring Integration


For our first implementation, we’ll use Spring Integration. Spring Integration builds on the Spring model to support popular enterprise integration patterns.

Spring Integration has three main components:

  1. A message is a data structure containing a header and a body.
  2. A channel carries messages from one endpoint to another endpoint. There are two kinds of channels in Spring Integration:
    • point-to-point: Only one endpoint can consume the messages in this channel.
    • publish-subscribe: Multiple endpoints can consume the messages in this channel.
  3. An endpoint routes a message to an application component that performs some business logic. There are a variety of endpoints in Spring Integration, such as transformers, routers, service activators, and filters.
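The difference between the two channel kinds can be sketched without Spring on the classpath. The classes below are hypothetical stand-ins, not Spring Integration types, and the round-robin choice in the point-to-point case is an assumption of this sketch.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical stand-in: each message goes to exactly one subscriber.
class PointToPointSketch<T> {
    private final List<Consumer<T>> subscribers = new ArrayList<>();
    private int next = 0;

    void subscribe(Consumer<T> subscriber) {
        subscribers.add(subscriber);
    }

    void send(T message) {
        if (subscribers.isEmpty()) {
            throw new IllegalStateException("no subscribers");
        }
        // Round-robin: pick one subscriber per message.
        subscribers.get(next++ % subscribers.size()).accept(message);
    }
}

// Hypothetical stand-in: each message goes to every subscriber.
class PublishSubscribeSketch<T> {
    private final List<Consumer<T>> subscribers = new ArrayList<>();

    void subscribe(Consumer<T> subscriber) {
        subscribers.add(subscriber);
    }

    void send(T message) {
        subscribers.forEach(subscriber -> subscriber.accept(message));
    }
}

class ChannelDemo {
    static List<String> pointToPoint() {
        List<String> log = new ArrayList<>();
        PointToPointSketch<String> channel = new PointToPointSketch<>();
        channel.subscribe(m -> log.add("A:" + m));
        channel.subscribe(m -> log.add("B:" + m));
        channel.send("one");
        channel.send("two");
        return log; // [A:one, B:two]
    }

    static List<String> publishSubscribe() {
        List<String> log = new ArrayList<>();
        PublishSubscribeSketch<String> channel = new PublishSubscribeSketch<>();
        channel.subscribe(m -> log.add("A:" + m));
        channel.subscribe(m -> log.add("B:" + m));
        channel.send("one");
        return log; // [A:one, B:one]
    }

    public static void main(String[] args) {
        System.out.println(pointToPoint());
        System.out.println(publishSubscribe());
    }
}
```

For a staged pipeline like ours, point-to-point semantics are the natural fit, since each event should be processed once per stage.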

Let’s look at an overview of our Spring Integration solution:


Figure: Word Count EIP Diagram

4.1. Dependencies

Let’s get started by adding dependencies for Spring Integration, Spring Boot Test, and Spring Integration Test to our build file.


4.2. The Message Gateway


A messaging gateway is a proxy that hides the complexity of sending a message to integration flows. Let’s set one up for our Spring Integration flow:

@MessagingGateway
public interface IncomingGateway {
    @Gateway(requestChannel = "receiveTextChannel", replyChannel = "returnResponseChannel")
    public Map<String, Long> countWords(String input);
}

Later, we’ll be able to use this gateway method to test our entire flow:


incomingGateway.countWords("My name is Hesam");

Spring wraps the “My name is Hesam” input within an instance of org.springframework.messaging.Message and passes it to receiveTextChannel, and later gives us the final result from returnResponseChannel.
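Conceptually, a gateway is a synchronous facade over an asynchronous flow. The sketch below imitates that idea in plain Java. GatewaySketch and Request are hypothetical names, and the per-request reply queue is only an analogy for how a reply finds its way back to the caller, not a description of Spring's internals.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;

// Hypothetical sketch of a request/reply gateway in front of a queue-based flow.
class GatewaySketch<I, O> {
    // Envelope: the payload plus a private queue on which the reply is delivered.
    private static final class Request<I, O> {
        final I payload;
        final BlockingQueue<O> replyQueue = new ArrayBlockingQueue<>(1);

        Request(I payload) {
            this.payload = payload;
        }
    }

    private final BlockingQueue<Request<I, O>> requestQueue = new LinkedBlockingQueue<>();

    GatewaySketch(Function<I, O> flow) {
        Thread worker = new Thread(() -> {
            try {
                while (true) {
                    Request<I, O> request = requestQueue.take();
                    request.replyQueue.put(flow.apply(request.payload));
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        worker.setDaemon(true); // let the JVM exit even though the loop never ends
        worker.start();
    }

    // Looks like a plain method call to the caller; the work happens on another thread.
    O sendAndReceive(I payload) {
        Request<I, O> request = new Request<>(payload);
        try {
            requestQueue.put(request);
            O reply = request.replyQueue.poll(5, TimeUnit.SECONDS);
            if (reply == null) {
                throw new IllegalStateException("no reply within 5 seconds");
            }
            return reply;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        GatewaySketch<String, Integer> gateway = new GatewaySketch<>(String::length);
        System.out.println(gateway.sendAndReceive("My name is Hesam")); // prints 16
    }
}
```

The caller never sees queues or threads, which is the same convenience the IncomingGateway interface gives us.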


4.3. Message Channels

In this section, we’ll look at how to set up our gateway’s initial message channel, receiveTextChannel.


Under SEDA, channels need to be scalable via an associated thread pool, so let’s begin by creating a thread pool:


@Bean
TaskExecutor receiveTextChannelThreadPool() {
    ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
    // pool sizing (core size, max size, thread name prefix) would be configured here
    return executor;
}

Next, we’ll use our thread pool to create our channel:


@Bean(name = "receiveTextChannel")
MessageChannel getReceiveTextChannel() {
    return MessageChannels.executor("receive-text", receiveTextChannelThreadPool)
      .get();
}

MessageChannels is a Spring Integration class that helps us create channels of various types. Here, we use the executor() method to create an ExecutorChannel, which is a channel managed by a thread pool.


Our other channels and thread pools are set up the same way as above.


4.4. Receive Text Stage


With our channels set up, we can start implementing our stages. Let’s create our initial stage:


@Bean
IntegrationFlow receiveText() {
    return IntegrationFlows.from(receiveTextChannel).channel(splitWordsChannel).get();
}

IntegrationFlows is a fluent Spring Integration API for creating IntegrationFlow objects, representing the stages of our flow. The from() method configures our stage’s incoming channel, and channel() configures the outgoing channel.


In this example, our stage passes our gateway’s input message to splitWordsChannel. This stage might be more complex and I/O intensive in a production application, reading messages from a persistent queue or over a network.


4.5. Split Words Stage


Our next stage has a single responsibility: splitting our input String into a String array of the individual words in the sentence:


@Bean
IntegrationFlow splitWords() {
    return IntegrationFlows.from(splitWordsChannel).transform(splitWordsFunction).channel(toLowerCaseChannel).get();
}

In addition to the from() and channel() invocations we’ve used before, here we also use transform(), which applies the supplied Function to our input message. Our splitWordsFunction implementation is very simple:

final Function<String, String[]> splitWordsFunction = sentence -> sentence.split(" ");
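One thing to keep in mind with split(" ") is that consecutive spaces yield empty strings, which would then be counted as words. The sketch below compares it with a whitespace-tolerant variant. Both helper names are hypothetical, and the second one is an alternative for illustration, not the approach this article uses.

```java
import java.util.Arrays;

// Hypothetical helpers comparing two ways of splitting a sentence into words.
class SplitWordsCheck {
    // Same expression as splitWordsFunction: splits on every single space.
    static String[] splitOnSingleSpace(String sentence) {
        return sentence.split(" ");
    }

    // Alternative: trims the ends and treats any run of whitespace as one separator.
    static String[] splitOnWhitespace(String sentence) {
        return sentence.trim().split("\\s+");
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(splitOnSingleSpace("My  name")));  // [My, , name]
        System.out.println(Arrays.toString(splitOnWhitespace(" My  name "))); // [My, name]
    }
}
```

For the single-spaced inputs used in this tutorial, both variants give the same result.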

4.6. Convert to Lowercase Stage


This stage converts every word in our String array into lowercase:


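@Bean
IntegrationFlow toLowerCase() {
    return IntegrationFlows.from(toLowerCaseChannel).split().transform(toLowerCase)
      .aggregate(aggregatorSpec -> aggregatorSpec.releaseStrategy(listSizeReached).outputProcessor(buildMessageWithListPayload))
      .channel(countWordsChannel).get();
}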

The first new IntegrationFlows method we use here is split(). The split() method uses the splitter pattern to send each element of our input message to the toLowerCase function as an individual message.


The next new method we see is aggregate(), which implements the aggregator pattern. The aggregator pattern has two essential arguments:


  1. the release strategy, which determines when to combine messages into a single one
  2. the processor, which determines how to combine messages into a single one

Our release strategy function uses listSizeReached, which tells the aggregator to start aggregation when all elements of the input array have been collected:


final ReleaseStrategy listSizeReached = r -> r.size() == r.getSequenceSize();

The buildMessageWithListPayload processor then packages our lowercased results into a List:

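final MessageGroupProcessor buildMessageWithListPayload = messageGroup ->
  MessageBuilder.withPayload(messageGroup.getMessages().stream().map(Message::getPayload).collect(Collectors.toList())).build();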
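To see how the release strategy and the processor cooperate, here is a small single-threaded sketch of the split, transform, and aggregate steps in plain Java. All names (SplitAggregateSketch, Part, Group) are hypothetical. Unlike the real flow, the parts arrive strictly in order because nothing runs concurrently.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Hypothetical sketch of the splitter and aggregator patterns.
class SplitAggregateSketch {
    // One split-off element that remembers how many siblings belong to its group.
    static final class Part {
        final String payload;
        final int sequenceSize;

        Part(String payload, int sequenceSize) {
            this.payload = payload;
            this.sequenceSize = sequenceSize;
        }
    }

    // Collects parts and releases a combined list once the group is complete.
    static final class Group {
        private final List<Part> parts = new ArrayList<>();

        // Returns the aggregated payloads when the group is complete, otherwise null.
        List<String> add(Part part) {
            parts.add(part);
            // Release strategy: the same size check that listSizeReached performs.
            if (parts.size() != part.sequenceSize) {
                return null;
            }
            // Processor: package all payloads into a single list.
            List<String> payloads = new ArrayList<>();
            for (Part collected : parts) {
                payloads.add(collected.payload);
            }
            return payloads;
        }
    }

    // Returns null for an empty input array, since nothing is ever released.
    static List<String> splitTransformAggregate(String[] words, Function<String, String> transform) {
        Group group = new Group();
        List<String> released = null;
        for (String word : words) {                                    // splitter
            Part part = new Part(transform.apply(word), words.length); // per-element transform
            released = group.add(part);                                // aggregator
        }
        return released;
    }

    public static void main(String[] args) {
        String[] words = {"My", "name", "is", "Hesam"};
        System.out.println(splitTransformAggregate(words, String::toLowerCase)); // [my, name, is, hesam]
    }
}
```

The group only releases its result once the last expected part has arrived, which is why the downstream stage sees one complete list instead of four separate words.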

4.7. Count Words Stage


Our final stage packages our word counts into a Map, wherein the keys are the words from the original input, and the values are the number of occurrences of each word:


@Bean
IntegrationFlow countWords() {
    return IntegrationFlows.from(countWordsChannel).transform(convertArrayListToCountMap).channel(returnResponseChannel).get();
}

Here, we use our convertArrayListToCountMap function for packaging our counts as a Map:


final Function<List<String>, Map<String, Long>> convertArrayListToCountMap = list -> list.stream()
  .collect(Collectors.groupingBy(Function.identity(), Collectors.counting()));

4.8. Testing Our Flow


We can pass an initial message to our gateway method to test our flow:


@SpringBootTest // assumes a Spring Boot configuration class is discoverable from this package
public class SpringIntegrationSedaIntegrationTest {
    @Autowired
    IncomingGateway incomingGateway;

    @Test
    void givenTextWithCapitalAndSmallCaseAndWithoutDuplicateWords_whenCallingCountWordOnGateway_thenWordCountReturnedAsMap() {
        Map<String, Long> actual = incomingGateway.countWords("My name is Hesam");
        Map<String, Long> expected = new HashMap<>();
        expected.put("my", 1L);
        expected.put("name", 1L);
        expected.put("is", 1L);
        expected.put("hesam", 1L);

        assertEquals(expected, actual);
    }
}

5. Solution With Apache Camel

Apache Camel is a popular and powerful open-source integration framework. It’s based on four primary concepts:

  1. Camel context: The Camel runtime that ties the different parts together.
  2. Routes: A route determines how a message should be processed and where it should go next.
  3. Processors: These are ready-to-use implementations of various enterprise integration patterns.
  4. Components: Components are extension points for integrating external systems via JMS, HTTP, file IO, etc.

Apache Camel has a component dedicated to SEDA functionality, making it straightforward to build SEDA applications.

5.1. Dependencies


Let’s add the required Maven dependencies for Apache Camel and Apache Camel Test to our pom.xml.


5.2. Defining SEDA Endpoints


First, we need to define the endpoints. An endpoint is a component defined with a URI string. SEDA endpoint URIs must start with "seda:[endpointName]":


static final String receiveTextUri = "seda:receiveText?concurrentConsumers=5";
static final String splitWordsUri = "seda:splitWords?concurrentConsumers=5";
static final String toLowerCaseUri = "seda:toLowerCase?concurrentConsumers=5";
static final String countWordsUri = "seda:countWords?concurrentConsumers=5";
static final String returnResponse = "mock:result";

As we can see, each endpoint is configured with five concurrent consumers, so at most five threads consume messages from each endpoint at the same time.


For the sake of testing, the returnResponse is a mock endpoint.


5.3. Extending RouteBuilder


Next, let’s define a class that extends Apache Camel’s RouteBuilder and overrides its configure() method. This class wires all SEDA endpoints:

public class WordCountRoute extends RouteBuilder {
    @Override
    public void configure() throws Exception { /* the stage definitions go here */ }
}

In the following sections, we’ll define our stages by adding lines to this configure() method using convenience methods we’ve inherited from RouteBuilder.


5.4. Receive Text Stage


This stage receives messages from a SEDA endpoint and routes them to the next stage without any processing:


from(receiveTextUri).to(splitWordsUri);



Here, we used our inherited from() method to specify the incoming endpoint and to() to set the outgoing endpoint.


5.5. Split Words Stage


Let’s implement the stage for splitting the input text into individual words:


from(splitWordsUri).transform(ExpressionBuilder.bodyExpression(s -> s.toString().split(" ")))
  .to(toLowerCaseUri);

The transform() method applies our Function to our input message, splitting it into an array.


5.6. Convert to Lowercase Stage


Our next task is to convert each word in our input to lowercase. Because we need to apply our transformation function to each String in our message rather than to the array itself, we’ll use the split() method both to split the input message for processing and to later aggregate the results back into an ArrayList:


from(toLowerCaseUri)
  .split(body(), new ArrayListAggregationStrategy())
  .transform(ExpressionBuilder.bodyExpression(body -> body.toString().toLowerCase()))
  .end().to(countWordsUri);

The end() method marks the end of the split process. Once each item in the list has been transformed, Apache Camel applies the aggregation strategy ArrayListAggregationStrategy we’ve specified.

ArrayListAggregationStrategy extends Apache Camel’s AbstractListAggregationStrategy to define which part of the message should be aggregated. In this case, the message body is the newly-lowercased word:

class ArrayListAggregationStrategy extends AbstractListAggregationStrategy<String> {
    @Override
    public String getValue(Exchange exchange) {
        return exchange.getIn().getBody(String.class);
    }
}

5.7. Count Words Stage


The last stage uses a transformer to convert the list into a map of words to word counts:


from(countWordsUri)
  .transform(ExpressionBuilder.bodyExpression(List.class, body -> body.stream()
    .collect(Collectors.groupingBy(Function.identity(), Collectors.counting()))))
  .to(returnResponse);

5.8. Testing Our Route


Let’s test our route:


public class ApacheCamelSedaIntegrationTest extends CamelTestSupport {
    @Test
    public void givenTextWithCapitalAndSmallCaseAndWithoutDuplicateWords_whenSendingTextToInputUri_thenWordCountReturnedAsMap()
      throws InterruptedException {
        Map<String, Long> expected = new HashMap<>();
        expected.put("my", 1L);
        expected.put("name", 1L);
        expected.put("is", 1L);
        expected.put("hesam", 1L);
        // register the expectation on the mock endpoint before sending the input
        getMockEndpoint(WordCountRoute.returnResponse).expectedBodiesReceived(expected);
        template.sendBody(WordCountRoute.receiveTextUri, "My name is Hesam");

        assertMockEndpointsSatisfied();
    }

    @Override
    protected RoutesBuilder createRouteBuilder() throws Exception {
        RoutesBuilder wordCountRoute = new WordCountRoute();
        return wordCountRoute;
    }
}

The CamelTestSupport superclass provides many fields and methods to help us test our flow. We’re using getMockEndpoint() and expectedBodiesReceived() to set our expected result on the mock endpoint, and template.sendBody() to submit test data to the first SEDA endpoint, receiveTextUri. Finally, we use assertMockEndpointsSatisfied() to test whether our expectation matches the actual results.


6. Conclusion


In this article, we learned about SEDA and its components and use cases. Afterward, we explored how to use SEDA to solve the same problem using first Spring Integration and then Apache Camel.

As always, the source code for the examples is available over on GitHub.