Skip to content

Commit 1ed5fa3

Browse files
committed
Add a remote chunking sample
This commit adds a self-contained remote chunking sample that uses an embedded JMS broker (for simplicity). Even though the broker is embedded, communication between the master and workers is still done through JMS queues and Spring Integration channels, and messages are sent over the wire through a TCP port. This commit also fixes a few typos in the documentation section about remote chunking. Resolves BATCH-2721
1 parent 61bec28 commit 1ed5fa3

File tree

8 files changed

+442
-72
lines changed

8 files changed

+442
-72
lines changed

build.gradle

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -553,7 +553,7 @@ project('spring-batch-samples') {
553553

554554
dependencies {
555555

556-
compile project(":spring-batch-core")
556+
compile project(":spring-batch-integration")
557557
compile "org.aspectj:aspectjrt:$aspectjVersion"
558558
compile "org.aspectj:aspectjweaver:$aspectjVersion"
559559
compile "org.quartz-scheduler:quartz:$quartzVersion"
@@ -583,6 +583,8 @@ project('spring-batch-samples') {
583583
compile "org.springframework:spring-tx:$springVersion"
584584
compile "org.springframework.data:spring-data-jpa:$springDataJpaVersion"
585585
compile "javax.mail:javax.mail-api:$javaMailVersion"
586+
compile "org.apache.activemq:activemq-client:$activemqVersion"
587+
compile "org.apache.activemq:activemq-broker:$activemqVersion"
586588

587589
testCompile "org.xmlunit:xmlunit-core:$xmlunitVersion"
588590
testCompile "org.xmlunit:xmlunit-matchers:$xmlunitVersion"
@@ -596,6 +598,7 @@ project('spring-batch-samples') {
596598
testCompile "org.codehaus.groovy:groovy-ant:$groovyVersion"
597599
testCompile "org.springframework:spring-test:$springVersion"
598600
testCompile "org.mockito:mockito-core:$mockitoVersion"
601+
testCompile "org.apache.activemq:activemq-kahadb-store:$activemqVersion"
599602

600603
testRuntime "com.sun.mail:javax.mail:$javaMailVersion"
601604

Loading

spring-batch-docs/asciidoc/spring-batch-integration.adoc

Lines changed: 69 additions & 71 deletions
Original file line numberDiff line numberDiff line change
@@ -706,10 +706,10 @@ configuration similar to the following:
706706
.Java Configuration
707707
[source, java, role="javaContent"]
708708
----
709-
public Job chunkJob(ItemReader itemReader) {
709+
public Job chunkJob() {
710710
return jobBuilderFactory.get("personJob")
711711
.start(stepBuilderFactory.get("step1")
712-
.chunk(200)
712+
.<Person, Person>chunk(200)
713713
.reader(itemReader())
714714
.writer(itemWriter())
715715
.build())
@@ -722,7 +722,7 @@ to use for reading data on the master. The `ItemWriter` reference
722722
points to a special `ItemWriter`
723723
(called `ChunkMessageChannelItemWriter`),
724724
as described above. The processor (if any) is left off the
725-
master configuration, as it is configured on the slave. The
725+
master configuration, as it is configured on the worker. The
726726
following configuration provides a basic master setup. You
727727
should check any additional component properties, such as
728728
throttle limits and so on, when implementing your use case.
@@ -734,7 +734,7 @@ throttle limits and so on, when implementing your use case.
734734
<property name="brokerURL" value="tcp://localhost:61616"/>
735735
</bean>
736736
737-
<int-jms:outbound-channel-adapter id="requests" destination-name="requests"/>
737+
<int-jms:outbound-channel-adapter id="jmsRequests" destination-name="requests"/>
738738
739739
<bean id="messagingTemplate"
740740
class="org.springframework.integration.core.MessagingTemplate">
@@ -749,12 +749,6 @@ throttle limits and so on, when implementing your use case.
749749
<property name="replyChannel" ref="replies"/>
750750
</bean>
751751
752-
<bean id="chunkHandler"
753-
class="org.springframework.batch.integration.chunk.RemoteChunkHandlerFactoryBean">
754-
<property name="chunkWriter" ref="itemWriter"/>
755-
<property name="step" ref="step1"/>
756-
</bean>
757-
758752
<int:channel id="replies">
759753
<int:queue/>
760754
</int:channel>
@@ -774,58 +768,55 @@ public org.apache.activemq.ActiveMQConnectionFactory connectionFactory() {
774768
return factory;
775769
}
776770
771+
/*
772+
* Configure outbound flow (requests going to workers)
773+
*/
774+
777775
@Bean
778776
public DirectChannel requests() {
779777
return new DirectChannel();
780778
}
781779
782780
@Bean
783-
public IntegrationFlow jmsOutboundFlow() {
784-
return IntegrationFlows.from("requests")
785-
.handle(Jms.outboundGateway(connectionFactory())
786-
.requestDestination("requests"))
781+
public IntegrationFlow outboundFlow(ActiveMQConnectionFactory connectionFactory) {
782+
return IntegrationFlows
783+
.from(requests())
784+
.handle(Jms.outboundAdapter(connectionFactory).destination("requests"))
787785
.get();
788786
}
789787
790-
@Bean
791-
public MessagingTemplate messagingTemplate() {
792-
MessagingTemplate template = new MessagingTemplate();
793-
template.setDefaultChannel(requests());
794-
template.setReceiveTimeout(2000);
795-
return template;
796-
}
797-
798-
@Bean
799-
@StepScope
800-
public ChunkMessageChannelItemWriter itemWriter() {
801-
ChunkMessageChannelItemWriter chunkMessageChannelItemWriter = new ChunkMessageChannelItemWriter();
802-
chunkMessageChannelItemWriter.setMessagingOperations(messagingTemplate());
803-
chunkMessageChannelItemWriter.setReplyChannel(replies());
804-
return chunkMessageChannelItemWriter;
805-
}
806-
807-
@Bean
808-
public RemoteChunkHandlerFactoryBean chunkHandler() {
809-
RemoteChunkHandlerFactoryBean remoteChunkHandlerFactoryBean = new RemoteChunkHandlerFactoryBean();
810-
remoteChunkHandlerFactoryBean.setChunkWriter(itemWriter());
811-
remoteChunkHandlerFactoryBean.setStep(step1());
812-
return remoteChunkHandlerFactoryBean;
813-
}
788+
/*
789+
* Configure inbound flow (replies coming from workers)
790+
*/
814791
815792
@Bean
816793
public QueueChannel replies() {
817794
return new QueueChannel();
818795
}
819796
820797
@Bean
821-
public IntegrationFlow jmsReplies() {
798+
public IntegrationFlow inboundFlow(ActiveMQConnectionFactory connectionFactory) {
822799
return IntegrationFlows
823-
.from(Jms.messageDrivenChannelAdapter(connectionFactory())
824-
.configureListenerContainer(c -> c.subscriptionDurable(false))
825-
.destination("replies"))
800+
.from(Jms.messageDrivenChannelAdapter(connectionFactory).destination("replies"))
826801
.channel(replies())
827802
.get();
828803
}
804+
805+
/*
806+
* Configure the ChunkMessageChannelItemWriter
807+
*/
808+
809+
@Bean
810+
public ItemWriter<Integer> itemWriter() {
811+
MessagingTemplate messagingTemplate = new MessagingTemplate();
812+
messagingTemplate.setDefaultChannel(requests());
813+
messagingTemplate.setReceiveTimeout(2000);
814+
ChunkMessageChannelItemWriter<Integer> chunkMessageChannelItemWriter
815+
= new ChunkMessageChannelItemWriter<>();
816+
chunkMessageChannelItemWriter.setMessagingOperations(messagingTemplate);
817+
chunkMessageChannelItemWriter.setReplyChannel(replies());
818+
return chunkMessageChannelItemWriter;
819+
}
829820
----
830821

831822
The preceding configuration provides us with a number of beans. We
@@ -836,7 +827,7 @@ referenced by our job step, uses the
836827
`ChunkMessageChannelItemWriter` for writing chunks over the
837828
configured middleware.
838829

839-
Now we can move on to the slave configuration, as shown in the following example:
830+
Now we can move on to the worker configuration, as shown in the following example:
840831

841832

842833
.XML Configuration
@@ -849,7 +840,7 @@ Now we can move on to the slave configuration, as shown in the following example
849840
<int:channel id="requests"/>
850841
<int:channel id="replies"/>
851842
852-
<int-jms:message-driven-channel-adapter id="jmsIn"
843+
<int-jms:message-driven-channel-adapter id="incomingRequests"
853844
destination-name="requests"
854845
channel="requests"/>
855846
@@ -889,65 +880,72 @@ public org.apache.activemq.ActiveMQConnectionFactory connectionFactory() {
889880
return factory;
890881
}
891882
883+
/*
884+
* Configure inbound flow (requests coming from the master)
885+
*/
886+
892887
@Bean
893888
public DirectChannel requests() {
894889
return new DirectChannel();
895890
}
896-
@Bean
897-
public DirectChannel replies() {
898-
return new DirectChannel();
899-
}
900891
901892
@Bean
902-
public IntegrationFlow jmsIn() {
893+
public IntegrationFlow inboundFlow(ActiveMQConnectionFactory connectionFactory) {
903894
return IntegrationFlows
904-
.from(Jms.messageDrivenChannelAdapter(connectionFactory())
905-
.configureListenerContainer(c -> c.subscriptionDurable(false))
906-
.destination("requests"))
895+
.from(Jms.messageDrivenChannelAdapter(connectionFactory).destination("requests"))
907896
.channel(requests())
908897
.get();
909898
}
910899
900+
/*
901+
* Configure outbound flow (replies going to the master)
902+
*/
903+
911904
@Bean
912-
public IntegrationFlow outgoingReplies() {
913-
return IntegrationFlows.from("replies")
914-
.handle(Jms.outboundGateway(connectionFactory())
915-
.requestDestination("replies"))
916-
.get();
905+
public DirectChannel replies() {
906+
return new DirectChannel();
917907
}
918908
919909
@Bean
920-
@ServiceActivator(inputChannel = "requests")
921-
public AggregatorFactoryBean serviceActivator() throws Exception{
922-
AggregatorFactoryBean aggregatorFactoryBean = new AggregatorFactoryBean();
923-
aggregatorFactoryBean.setProcessorBean(chunkProcessorChunkHandler());
924-
aggregatorFactoryBean.setOutputChannel(replies());
925-
...
926-
return aggregatorFactoryBean;
910+
public IntegrationFlow outboundFlow(ActiveMQConnectionFactory connectionFactory) {
911+
return IntegrationFlows
912+
.from(replies())
913+
.handle(Jms.outboundAdapter(connectionFactory).destination("replies"))
914+
.get();
927915
}
928916
917+
/*
918+
* Configure the ChunkProcessorChunkHandler
919+
*/
920+
929921
@Bean
930-
public ChunkProcessorChunkHandler chunkProcessorChunkHandler() {
931-
ChunkProcessorChunkHandler chunkProcessorChunkHandler = new ChunkProcessorChunkHandler();
932-
chunkProcessorChunkHandler.setChunkProcessor(new SimpleChunkProcessor(personItemProcessor(), personItemWriter()));
922+
@ServiceActivator(inputChannel = "requests", outputChannel = "replies")
923+
public ChunkProcessorChunkHandler<Integer> chunkProcessorChunkHandler() {
924+
ChunkProcessor<Integer> chunkProcessor
925+
= new SimpleChunkProcessor<>(itemProcessor(), itemWriter());
926+
ChunkProcessorChunkHandler<Integer> chunkProcessorChunkHandler
927+
= new ChunkProcessorChunkHandler<>();
928+
chunkProcessorChunkHandler.setChunkProcessor(chunkProcessor);
933929
return chunkProcessorChunkHandler;
934930
}
935931
----
936932

937933
Most of these configuration items should look familiar from the
938-
master configuration. Slaves do not need access to
934+
master configuration. Workers do not need access to
939935
the Spring Batch `JobRepository` or
940936
to the actual job configuration file. The main bean of interest
941937
is the `chunkProcessorChunkHandler`. The
942938
`chunkProcessor` property of `ChunkProcessorChunkHandler` takes a
943939
configured `SimpleChunkProcessor`, which is where you would provide a reference to your
944940
`ItemWriter` (and, optionally, your
945-
`ItemProcessor`) that will run on the slave
941+
`ItemProcessor`) that will run on the worker
946942
when it receives chunks from the master.
947943

948944
For more information, see the section of the "Scalability" chapter on
949945
link:$$http://docs.spring.io/spring-batch/reference/html/scalability.html#remoteChunking$$[Remote Chunking].
950946

947+
You can find a complete example of a remote chunking job
948+
link:$$https://github.com/spring-projects/spring-batch/tree/master/spring-batch-samples#remote-chunking-sample$$[here].
951949

952950
[[remote-partitioning]]
953951

@@ -960,8 +958,8 @@ image::{batch-asciidoc}images/remote-partitioning.png[Remote Partitioning, scale
960958
Remote Partitioning, on the other hand, is useful when it
961959
is not the processing of items but rather the associated I/O that
962960
causes the bottleneck. Using Remote Partitioning, work can
963-
be farmed out to slaves that execute complete Spring Batch
964-
steps. Thus, each slave has its own `ItemReader`, `ItemProcessor`, and
961+
be farmed out to workers that execute complete Spring Batch
962+
steps. Thus, each worker has its own `ItemReader`, `ItemProcessor`, and
965963
`ItemWriter`. For this purpose, Spring Batch
966964
Integration provides the `MessageChannelPartitionHandler`.
967965

spring-batch-samples/README.md

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@ Job/Feature | skip | retry | restart | aut
3939
[multilineOrder](#multilineOrder) | | | | | | | X | | | |
4040
[parallel](#parallel) | | | | | | | | | | X |
4141
[partition](#partition) | | | | | | | | | | X |
42+
[remoteChunking](#remoteChunking) | | | | | | | | | | X |
4243
[quartz](#quartz) | | | | | X | | | | | |
4344
[restart](#restart) | | | X | | | | | | | |
4445
[retry](#retry) | | X | | | | | | | | |
@@ -644,6 +645,23 @@ the work. Notice that the readers and writers in the `Step`
644645
that is being partitioned are step-scoped, so that their state does
645646
not get shared across threads of execution.
646647

648+
### [Remote Chunking Sample](id:remoteChunking)
649+
650+
This sample shows how to configure a remote chunking job. The master step will
651+
read numbers from 1 to 6 and send two chunks ({1, 2, 3} and {4, 5, 6}) to workers
652+
for processing and writing.
653+
654+
This example shows how to:
655+
656+
* configure a `ChunkMessageChannelItemWriter` on the master side to send chunks to workers
657+
* configure a `ChunkProcessorChunkHandler` on the worker side to process chunks and
658+
send replies back to the master
659+
660+
The sample uses an embedded JMS broker as a communication middleware between the
661+
master and workers. The usage of an embedded broker is only for simplicity's sake;
662+
the communication between the master and workers is still done through JMS queues
663+
and Spring Integration channels and messages are sent over the wire through a TCP port.
664+
647665
### [Quartz Sample](id:quartz)
648666

649667
The goal is to demonstrate how to schedule job execution using

0 commit comments

Comments
 (0)