-
Notifications
You must be signed in to change notification settings - Fork 2.4k
BATCH-2686: add builders for master/worker beans in remote chunking #599
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -771,7 +771,6 @@ public org.apache.activemq.ActiveMQConnectionFactory connectionFactory() { | |
/* | ||
* Configure outbound flow (requests going to workers) | ||
*/ | ||
|
||
@Bean | ||
public DirectChannel requests() { | ||
return new DirectChannel(); | ||
|
@@ -788,7 +787,6 @@ public IntegrationFlow outboundFlow(ActiveMQConnectionFactory connectionFactory) | |
/* | ||
* Configure inbound flow (replies coming from workers) | ||
*/ | ||
|
||
@Bean | ||
public QueueChannel replies() { | ||
return new QueueChannel(); | ||
|
@@ -805,7 +803,6 @@ public IntegrationFlow inboundFlow(ActiveMQConnectionFactory connectionFactory) | |
/* | ||
* Configure the ChunkMessageChannelItemWriter | ||
*/ | ||
|
||
@Bean | ||
public ItemWriter<Integer> itemWriter() { | ||
MessagingTemplate messagingTemplate = new MessagingTemplate(); | ||
|
@@ -883,7 +880,6 @@ public org.apache.activemq.ActiveMQConnectionFactory connectionFactory() { | |
/* | ||
* Configure inbound flow (requests coming from the master) | ||
*/ | ||
|
||
@Bean | ||
public DirectChannel requests() { | ||
return new DirectChannel(); | ||
|
@@ -900,7 +896,6 @@ public IntegrationFlow inboundFlow(ActiveMQConnectionFactory connectionFactory) | |
/* | ||
* Configure outbound flow (replies going to the master) | ||
*/ | ||
|
||
@Bean | ||
public DirectChannel replies() { | ||
return new DirectChannel(); | ||
|
@@ -917,7 +912,6 @@ public IntegrationFlow outboundFlow(ActiveMQConnectionFactory connectionFactory) | |
/* | ||
* Configure the ChunkProcessorChunkHandler | ||
*/ | ||
|
||
@Bean | ||
@ServiceActivator(inputChannel = "requests", outputChannel = "replies") | ||
public ChunkProcessorChunkHandler<Integer> chunkProcessorChunkHandler() { | ||
|
@@ -944,6 +938,88 @@ when it receives chunks from the master. | |
For more information, see the section of the "Scalability" chapter on | ||
link:$$http://docs.spring.io/spring-batch/reference/html/scalability.html#remoteChunking$$[Remote Chunking]. | ||
|
||
Starting from version 4.1, Spring Batch Integration introduces the `@EnableBatchIntegration` | ||
annotation that can be used to simplify remote chunking setup. This annotation provides | ||
two beans that can be autowired in the application context: | ||
|
||
* `RemoteChunkingMasterStepBuilderFactory`: used to configure the master step | ||
* `RemoteChunkingWorkerBuilder`: used to configure the remote worker integration flow | ||
|
||
These APIs will take care of configuring a number of components as described in the following diagram: | ||
|
||
.Remote Chunking Configuration | ||
image::{batch-asciidoc}images/remote-chunking-config.png[Remote Chunking Configuration, scaledwidth="80%"] | ||
|
||
On the master side, the `RemoteChunkingMasterStepBuilderFactory` allows you to | ||
configure a master step by declaring: | ||
|
||
* the item reader to read items and send them to workers | ||
* the output channel ("Outgoing requests") to send requests to workers | ||
* the input channel ("Incoming replies") to receive replies from workers | ||
|
||
There is no need anymore to explicitly configure the `ChunkMessageChannelItemWriter` | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This should be worded so that in future versions it still sounds accurate. Because of this, it shouldn't have references to before and after the feature was added. |
||
and the `MessagingTemplate` (Those can still be explicitly configured if required). | ||
|
||
On the worker side, the `RemoteChunkingWorkerBuilder` allows you to configure a worker to: | ||
|
||
* listen to requests sent by the master on the input channel ("Incoming requests") | ||
* call the `handleChunk` method of `ChunkProcessorChunkHandler` for each request | ||
with the configured `ItemProcessor` and `ItemWriter` | ||
* send replies on the output channel ("Outgoing replies") to the master | ||
|
||
There is no need anymore to explicitly configure the `SimpleChunkProcessor` | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. See above about the before/after |
||
and the `ChunkProcessorChunkHandler` (Those can still be explicitly configured if required). | ||
|
||
The following example shows how to use these APIs: | ||
|
||
[source, java] | ||
---- | ||
@EnableBatchIntegration | ||
public class RemoteChunkingJobConfiguration { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Should we add There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. indeed. |
||
|
||
@Configuration | ||
public static class MasterConfiguration { | ||
|
||
@Autowired | ||
private RemoteChunkingMasterStepBuilderFactory masterStepBuilderFactory; | ||
|
||
@Bean | ||
public TaskletStep masterStep() { | ||
return this.masterStepBuilderFactory.get("masterStep") | ||
.chunk(100) | ||
.reader(itemReader()) | ||
.outputChannel(requests()) // requests sent to workers | ||
.inputChannel(replies()) // replies received from workers | ||
.build(); | ||
} | ||
|
||
// Middleware beans setup omitted | ||
|
||
} | ||
|
||
@Configuration | ||
public static class WorkerConfiguration { | ||
|
||
@Autowired | ||
private RemoteChunkingWorkerBuilder workerBuilder; | ||
|
||
@Bean | ||
public IntegrationFlow workerFlow() { | ||
return this.workerBuilder | ||
.itemProcessor(itemProcessor()) | ||
.itemWriter(itemWriter()) | ||
.inputChannel(requests()) // requests received from the master | ||
.outputChannel(replies()) // replies sent to the master | ||
.build(); | ||
} | ||
|
||
// Middleware beans setup omitted | ||
|
||
} | ||
|
||
} | ||
---- | ||
|
||
You can find a complete example of a remote chunking job | ||
link:$$https://github.com/spring-projects/spring-batch/tree/master/spring-batch-samples#remote-chunking-sample$$[here]. | ||
|
||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
"simplify a remote chunking step"