Add remote chunking sample #603

5 changes: 4 additions & 1 deletion build.gradle
@@ -553,7 +553,7 @@ project('spring-batch-samples') {

dependencies {

compile project(":spring-batch-core")
compile project(":spring-batch-integration")
compile "org.aspectj:aspectjrt:$aspectjVersion"
compile "org.aspectj:aspectjweaver:$aspectjVersion"
compile "org.quartz-scheduler:quartz:$quartzVersion"
@@ -583,6 +583,8 @@ project('spring-batch-samples') {
compile "org.springframework:spring-tx:$springVersion"
compile "org.springframework.data:spring-data-jpa:$springDataJpaVersion"
compile "javax.mail:javax.mail-api:$javaMailVersion"
compile "org.apache.activemq:activemq-client:$activemqVersion"
compile "org.apache.activemq:activemq-broker:$activemqVersion"

testCompile "org.xmlunit:xmlunit-core:$xmlunitVersion"
testCompile "org.xmlunit:xmlunit-matchers:$xmlunitVersion"
@@ -596,6 +598,7 @@ project('spring-batch-samples') {
testCompile "org.codehaus.groovy:groovy-ant:$groovyVersion"
testCompile "org.springframework:spring-test:$springVersion"
testCompile "org.mockito:mockito-core:$mockitoVersion"
testCompile "org.apache.activemq:activemq-kahadb-store:$activemqVersion"

testRuntime "com.sun.mail:javax.mail:$javaMailVersion"

Binary file modified spring-batch-docs/asciidoc/images/remote-chunking-sbi.png
140 changes: 69 additions & 71 deletions spring-batch-docs/asciidoc/spring-batch-integration.adoc
@@ -706,10 +706,10 @@ configuration similar to the following:
.Java Configuration
[source, java, role="javaContent"]
----
public Job chunkJob(ItemReader itemReader) {
public Job chunkJob() {
return jobBuilderFactory.get("personJob")
.start(stepBuilderFactory.get("step1")
.chunk(200)
.<Person, Person>chunk(200)
.reader(itemReader())
.writer(itemWriter())
.build())
@@ -722,7 +722,7 @@ to use for reading data on the master. The `ItemWriter` reference
points to a special `ItemWriter`
(called `ChunkMessageChannelItemWriter`),
as described above. The processor (if any) is left off the
master configuration, as it is configured on the slave. The
master configuration, as it is configured on the worker. The
following configuration provides a basic master setup. You
should check any additional component properties, such as
throttle limits and so on, when implementing your use case.
@@ -734,7 +734,7 @@ throttle limits and so on, when implementing your use case.
<property name="brokerURL" value="tcp://localhost:61616"/>
</bean>

<int-jms:outbound-channel-adapter id="requests" destination-name="requests"/>
<int-jms:outbound-channel-adapter id="jmsRequests" destination-name="requests"/>

<bean id="messagingTemplate"
class="org.springframework.integration.core.MessagingTemplate">
@@ -749,12 +749,6 @@ throttle limits and so on, when implementing your use case.
<property name="replyChannel" ref="replies"/>
</bean>

<bean id="chunkHandler"
class="org.springframework.batch.integration.chunk.RemoteChunkHandlerFactoryBean">
<property name="chunkWriter" ref="itemWriter"/>
<property name="step" ref="step1"/>
</bean>

<int:channel id="replies">
<int:queue/>
</int:channel>
@@ -774,58 +768,55 @@ public org.apache.activemq.ActiveMQConnectionFactory connectionFactory() {
return factory;
}

/*
* Configure outbound flow (requests going to workers)
*/

@Bean
public DirectChannel requests() {
return new DirectChannel();
}

@Bean
public IntegrationFlow jmsOutboundFlow() {
return IntegrationFlows.from("requests")
.handle(Jms.outboundGateway(connectionFactory())
.requestDestination("requests"))
public IntegrationFlow outboundFlow(ActiveMQConnectionFactory connectionFactory) {
return IntegrationFlows
.from(requests())
.handle(Jms.outboundAdapter(connectionFactory).destination("requests"))
.get();
}

@Bean
public MessagingTemplate messagingTemplate() {
MessagingTemplate template = new MessagingTemplate();
template.setDefaultChannel(requests());
template.setReceiveTimeout(2000);
return template;
}

@Bean
@StepScope
public ChunkMessageChannelItemWriter itemWriter() {
ChunkMessageChannelItemWriter chunkMessageChannelItemWriter = new ChunkMessageChannelItemWriter();
chunkMessageChannelItemWriter.setMessagingOperations(messagingTemplate());
chunkMessageChannelItemWriter.setReplyChannel(replies());
return chunkMessageChannelItemWriter;
}

@Bean
public RemoteChunkHandlerFactoryBean chunkHandler() {
RemoteChunkHandlerFactoryBean remoteChunkHandlerFactoryBean = new RemoteChunkHandlerFactoryBean();
remoteChunkHandlerFactoryBean.setChunkWriter(itemWriter());
remoteChunkHandlerFactoryBean.setStep(step1());
return remoteChunkHandlerFactoryBean;
}
/*
* Configure inbound flow (replies coming from workers)
*/

@Bean
public QueueChannel replies() {
return new QueueChannel();
}

@Bean
public IntegrationFlow jmsReplies() {
public IntegrationFlow inboundFlow(ActiveMQConnectionFactory connectionFactory) {
return IntegrationFlows
.from(Jms.messageDrivenChannelAdapter(connectionFactory())
.configureListenerContainer(c -> c.subscriptionDurable(false))
.destination("replies"))
.from(Jms.messageDrivenChannelAdapter(connectionFactory).destination("replies"))
.channel(replies())
.get();
}

/*
* Configure the ChunkMessageChannelItemWriter
*/

@Bean
public ItemWriter<Integer> itemWriter() {
MessagingTemplate messagingTemplate = new MessagingTemplate();
messagingTemplate.setDefaultChannel(requests());
messagingTemplate.setReceiveTimeout(2000);
ChunkMessageChannelItemWriter<Integer> chunkMessageChannelItemWriter
= new ChunkMessageChannelItemWriter<>();
chunkMessageChannelItemWriter.setMessagingOperations(messagingTemplate);
chunkMessageChannelItemWriter.setReplyChannel(replies());
return chunkMessageChannelItemWriter;
}
----

The preceding configuration provides us with a number of beans. We
@@ -836,7 +827,7 @@ referenced by our job step, uses the
`ChunkMessageChannelItemWriter` for writing chunks over the
configured middleware.
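
Not shown in the snippets above is the regular `ItemReader` that feeds the master
step. As a purely illustrative sketch (the `itemReader` bean name and the numeric
items are assumptions borrowed from the sample application, not part of the
reference configuration), it could be as simple as a `ListItemReader`:

[source, java, role="javaContent"]
----
@Bean
public ItemReader<Integer> itemReader() {
	// hypothetical reader: supplies the items that the master groups into chunks
	// before the ChunkMessageChannelItemWriter sends them to the workers
	return new ListItemReader<>(Arrays.asList(1, 2, 3, 4, 5, 6));
}
----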

Now we can move on to the slave configuration, as shown in the following example:
Now we can move on to the worker configuration, as shown in the following example:


.XML Configuration
@@ -849,7 +840,7 @@ Now we can move on to the slave configuration, as shown in the following example
<int:channel id="requests"/>
<int:channel id="replies"/>

<int-jms:message-driven-channel-adapter id="jmsIn"
<int-jms:message-driven-channel-adapter id="incomingRequests"
destination-name="requests"
channel="requests"/>

@@ -889,65 +880,72 @@ public org.apache.activemq.ActiveMQConnectionFactory connectionFactory() {
return factory;
}

/*
* Configure inbound flow (requests coming from the master)
*/

@Bean
public DirectChannel requests() {
return new DirectChannel();
}
@Bean
public DirectChannel replies() {
return new DirectChannel();
}

@Bean
public IntegrationFlow jmsIn() {
public IntegrationFlow inboundFlow(ActiveMQConnectionFactory connectionFactory) {
return IntegrationFlows
.from(Jms.messageDrivenChannelAdapter(connectionFactory())
.configureListenerContainer(c -> c.subscriptionDurable(false))
.destination("requests"))
.from(Jms.messageDrivenChannelAdapter(connectionFactory).destination("requests"))
.channel(requests())
.get();
}

/*
* Configure outbound flow (replies going to the master)
*/

@Bean
public IntegrationFlow outgoingReplies() {
return IntegrationFlows.from("replies")
.handle(Jms.outboundGateway(connectionFactory())
.requestDestination("replies"))
.get();
public DirectChannel replies() {
return new DirectChannel();
}

@Bean
@ServiceActivator(inputChannel = "requests")
public AggregatorFactoryBean serviceActivator() throws Exception{
AggregatorFactoryBean aggregatorFactoryBean = new AggregatorFactoryBean();
aggregatorFactoryBean.setProcessorBean(chunkProcessorChunkHandler());
aggregatorFactoryBean.setOutputChannel(replies());
...
return aggregatorFactoryBean;
public IntegrationFlow outboundFlow(ActiveMQConnectionFactory connectionFactory) {
return IntegrationFlows
.from(replies())
.handle(Jms.outboundAdapter(connectionFactory).destination("replies"))
.get();
}

/*
* Configure the ChunkProcessorChunkHandler
*/

@Bean
public ChunkProcessorChunkHandler chunkProcessorChunkHandler() {
ChunkProcessorChunkHandler chunkProcessorChunkHandler = new ChunkProcessorChunkHandler();
chunkProcessorChunkHandler.setChunkProcessor(new SimpleChunkProcessor(personItemProcessor(), personItemWriter()));
@ServiceActivator(inputChannel = "requests", outputChannel = "replies")
public ChunkProcessorChunkHandler<Integer> chunkProcessorChunkHandler() {
ChunkProcessor<Integer> chunkProcessor
= new SimpleChunkProcessor<>(itemProcessor(), itemWriter());
ChunkProcessorChunkHandler<Integer> chunkProcessorChunkHandler
= new ChunkProcessorChunkHandler<>();
chunkProcessorChunkHandler.setChunkProcessor(chunkProcessor);
return chunkProcessorChunkHandler;
}
----

Most of these configuration items should look familiar from the
master configuration. Slaves do not need access to
master configuration. Workers do not need access to
the Spring Batch `JobRepository` nor
to the actual job configuration file. The main bean of interest
is the `chunkProcessorChunkHandler`. The
`chunkProcessor` property of `ChunkProcessorChunkHandler` takes a
configured `SimpleChunkProcessor`, which is where you would provide a reference to your
`ItemWriter` (and, optionally, your
`ItemProcessor`) that will run on the slave
`ItemProcessor`) that will run on the worker
when it receives chunks from the master.
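
For illustration only, the `itemProcessor()` and `itemWriter()` referenced by the
`SimpleChunkProcessor` above could be trivial beans that simply log the numeric
items used in the sample (hypothetical implementations, not part of the reference
configuration):

[source, java, role="javaContent"]
----
@Bean
public ItemProcessor<Integer, Integer> itemProcessor() {
	// pass each item through unchanged, logging it so the worker activity is visible
	return item -> {
		System.out.println("processing item " + item);
		return item;
	};
}

@Bean
public ItemWriter<Integer> itemWriter() {
	// print the received chunk instead of writing it to a real data store
	return items -> items.forEach(item -> System.out.println("writing item " + item));
}
----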

For more information, see the section of the "Scalability" chapter on
link:$$http://docs.spring.io/spring-batch/reference/html/scalability.html#remoteChunking$$[Remote Chunking].

You can find a complete example of a remote chunking job
link:$$https://github.com/spring-projects/spring-batch/tree/master/spring-batch-samples#remote-chunking-sample$$[here].

[[remote-partitioning]]

@@ -960,8 +958,8 @@
Remote Partitioning, on the other hand, is useful when it
is not the processing of items but rather the associated I/O that
causes the bottleneck. Using Remote Partitioning, work can
be farmed out to slaves that execute complete Spring Batch
steps. Thus, each slave has its own `ItemReader`, `ItemProcessor`, and
be farmed out to workers that execute complete Spring Batch
steps. Thus, each worker has its own `ItemReader`, `ItemProcessor`, and
`ItemWriter`. For this purpose, Spring Batch
Integration provides the `MessageChannelPartitionHandler`.

18 changes: 18 additions & 0 deletions spring-batch-samples/README.md
@@ -39,6 +39,7 @@ Job/Feature | skip | retry | restart | aut
[multilineOrder](#multilineOrder) | | | | | | | X | | | |
[parallel](#parallel) | | | | | | | | | | X |
[partition](#partition) | | | | | | | | | | X |
[remoteChunking](#remoteChunking) | | | | | | | | | | X |
[quartz](#quartz) | | | | | X | | | | | |
[restart](#restart) | | | X | | | | | | | |
[retry](#retry) | | X | | | | | | | | |
@@ -644,6 +645,23 @@ the work. Notice that the readers and writers in the `Step`
that is being partitioned are step-scoped, so that their state does
not get shared across threads of execution.

### [Remote Chunking Sample](id:remoteChunking)

This sample shows how to configure a remote chunking job. The master step will
read numbers from 1 to 6 and send two chunks ({1, 2, 3} and {4, 5, 6}) to workers
for processing and writing.

This example shows how to:

* configure a `ChunkMessageChannelItemWriter` on the master side to send chunks to workers
* configure a `ChunkProcessorChunkHandler` on the worker side to process chunks and
send replies back to the master

The sample uses an embedded JMS broker as the communication middleware between the
master and the workers. The embedded broker is used only for simplicity's sake; the
communication between the master and the workers still goes through JMS queues and
Spring Integration channels, and messages are sent over the wire through a TCP port.
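
A minimal sketch of the master step, assuming hypothetical bean names (the actual
configuration lives in the sample sources), could look like this:

```java
@Bean
public TaskletStep masterStep() {
    // reading the numbers 1 to 6 with a chunk size of 3 produces the two chunks
    // {1, 2, 3} and {4, 5, 6} that are sent to the workers over JMS
    return stepBuilderFactory.get("masterStep")
            .<Integer, Integer>chunk(3)
            .reader(new ListItemReader<>(Arrays.asList(1, 2, 3, 4, 5, 6)))
            .writer(chunkMessageChannelItemWriter())
            .build();
}
```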

### [Quartz Sample](id:quartz)

The goal is to demonstrate how to schedule job execution using