Skip to content

Commit 429f567

Browse files
committed
BATCH-2686: update samples/docs
1 parent 3e5bb95 commit 429f567

File tree

5 files changed

+423
-1
lines changed

5 files changed

+423
-1
lines changed

build.gradle

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -553,7 +553,7 @@ project('spring-batch-samples') {
553553

554554
dependencies {
555555

556-
compile project(":spring-batch-core")
556+
compile project(":spring-batch-integration")
557557
compile "org.aspectj:aspectjrt:$aspectjVersion"
558558
compile "org.aspectj:aspectjweaver:$aspectjVersion"
559559
compile "org.quartz-scheduler:quartz:$quartzVersion"
@@ -609,6 +609,8 @@ project('spring-batch-samples') {
609609
optional "org.springframework:spring-web:$springVersion"
610610
optional "org.springframework.data:spring-data-commons:$springDataCommonsVersion"
611611
optional "org.springframework.amqp:spring-amqp:$springAmqpVersion"
612+
optional "org.springframework.integration:spring-integration-amqp:$springIntegrationVersion"
613+
612614
optional ("org.springframework.amqp:spring-rabbit:$springAmqpVersion") {
613615
exclude group: "org.springframework", module: "spring-messaging"
614616
exclude group: "org.springframework", module: "spring-web"

spring-batch-docs/asciidoc/spring-batch-integration.adoc

Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -948,6 +948,77 @@ when it receives chunks from the master.
948948
For more information, see the section of the "Scalability" chapter on
949949
link:$$http://docs.spring.io/spring-batch/reference/html/scalability.html#remoteChunking$$[Remote Chunking].
950950

951+
Starting from version 4.1, Spring Batch Integration provides the `@EnableBatchIntegration`
952+
annotation that can be used to simplify remote chunking setup. This annotation provides
953+
two beans that can be autowired in the application context:
954+
955+
* `RemoteChunkingMasterStepBuilderFactory`: used to configure the master step
956+
* `RemoteChunkingWorkerFlowBuilder`: used to configure the remote worker flow
957+
958+
The following example shows how to use these APIs:
959+
960+
[source, java]
961+
----
962+
@EnableBatchIntegration
963+
public class RemoteChunkingConfiguration {
964+
965+
@Configuration
966+
public static class MasterConfiguration {
967+
968+
@Autowired
969+
private RemoteChunkingMasterStepBuilderFactory masterStepBuilderFactory;
970+
971+
@Bean
972+
public TaskletStep masterStep() {
973+
return masterStepBuilderFactory.get("masterStep")
974+
.chunk(100)
975+
.reader(itemReader())
976+
.outputChannel(requests()) // requests sent to workers
977+
.inputChannel(replies()) // replies received from workers
978+
.build();
979+
}
980+
981+
// Middleware beans setup omitted
982+
983+
}
984+
985+
@Configuration
986+
public static class WorkerConfiguration {
987+
988+
@Autowired
989+
private RemoteChunkingWorkerFlowBuilder workerFlowBuilder;
990+
991+
@Bean
992+
public IntegrationFlow workerFlow(DirectChannel requests, DirectChannel replies){
993+
return workerFlowBuilder
994+
.itemProcessor(itemProcessor())
995+
.itemWriter(itemWriter())
996+
.inputChannel(requests) // requests received from the master
997+
.outputChannel(replies) // replies sent to the master
998+
.build();
999+
}
1000+
1001+
// Middleware beans setup omitted
1002+
}
1003+
1004+
}
1005+
----
1006+
1007+
On the master side, the `RemoteChunkingMasterStepBuilderFactory` makes it possible
1008+
to configure a master step by declaring the item reader, the output channel
1009+
(to send requests to workers) and the input channel (to receive replies from workers).
1010+
There is no need anymore to explicitly configure the `ChunkMessageChannelItemWriter`
1011+
and `RemoteChunkHandlerFactoryBean`.
1012+
1013+
On the worker side, the `RemoteChunkingWorkerFlowBuilder` allows you to configure
1014+
the worker flow to listens to requests sent by the master on the input channel,
1015+
call the `ChunkProcessorChunkHandler` for each request with the configured `ItemProcessor`
1016+
and `ItemWriter` and finally send replies on the output channel to the master.
1017+
There is no need anymore to explicitly configure the `ChunkProcessorChunkHandler`
1018+
and `AggregatorFactoryBean`.
1019+
1020+
You can find a complete example of a remote chunking job
1021+
link:$$https://github.com/spring-projects/spring-batch/tree/master/spring-batch-samples#remote-chunking-sample$$[here].
9511022

9521023
[[remote-partitioning]]
9531024

spring-batch-samples/README.md

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@ Job/Feature | skip | retry | restart | aut
3939
[multilineOrder](#multilineOrder) | | | | | | | X | | | |
4040
[parallel](#parallel) | | | | | | | | | | X |
4141
[partition](#partition) | | | | | | | | | | X |
42+
[remoteChunking](#remoteChunking) | | | | | | | | | | X |
4243
[quartz](#quartz) | | | | | X | | | | | |
4344
[restart](#restart) | | | X | | | | | | | |
4445
[retry](#retry) | | X | | | | | | | | |
@@ -644,6 +645,14 @@ the work. Notice that the readers and writers in the `Step`
644645
that is being partitioned are step-scoped, so that their state does
645646
not get shared across threads of execution.
646647

648+
### [Remote Chunking Sample](id:remoteChunking)
649+
650+
This sample shows how to configure a remote chunking job. The sample uses
651+
the `RemoteChunkingMasterStepBuilderFactory` to create a master step and
652+
the `RemoteChunkingWorkerFlowBuilder` to configure an integration flow on
653+
the worker side. This example also shows how to use rabbitmq as a communication
654+
middleware between the master and workers.
655+
647656
### [Quartz Sample](id:quartz)
648657

649658
The goal is to demonstrate how to schedule job execution using
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,168 @@
1+
/*
2+
* Copyright 2018 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.batch.sample.remotechunking;
18+
19+
import org.springframework.amqp.core.AmqpTemplate;
20+
import org.springframework.amqp.core.Binding;
21+
import org.springframework.amqp.core.BindingBuilder;
22+
import org.springframework.amqp.core.Queue;
23+
import org.springframework.amqp.core.TopicExchange;
24+
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
25+
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
26+
import org.springframework.amqp.rabbit.core.RabbitTemplate;
27+
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
28+
import org.springframework.batch.integration.chunk.RemoteChunkingWorkerFlowBuilder;
29+
import org.springframework.batch.integration.config.annotation.EnableBatchIntegration;
30+
import org.springframework.batch.item.ItemProcessor;
31+
import org.springframework.batch.item.ItemWriter;
32+
import org.springframework.beans.factory.annotation.Autowired;
33+
import org.springframework.beans.factory.annotation.Value;
34+
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
35+
import org.springframework.context.annotation.Bean;
36+
import org.springframework.context.annotation.Configuration;
37+
import org.springframework.context.annotation.PropertySource;
38+
import org.springframework.integration.amqp.dsl.Amqp;
39+
import org.springframework.integration.channel.DirectChannel;
40+
import org.springframework.integration.config.EnableIntegration;
41+
import org.springframework.integration.dsl.IntegrationFlow;
42+
import org.springframework.integration.dsl.IntegrationFlows;
43+
44+
/**
45+
* This class is used to start a worker for
46+
* {@code org.springframework.batch.sample.RemoteChunkingJobFunctionalTests#testLaunchJob()}.
47+
*/
48+
49+
@Configuration
50+
@EnableBatchProcessing
51+
@EnableBatchIntegration
52+
@EnableIntegration
53+
@PropertySource("classpath:default.amqp.properties")
54+
public class WorkerConfiguration {
55+
56+
@Value("${rabbitmq.host}")
57+
private String host;
58+
59+
@Value("${rabbitmq.port}")
60+
private int port;
61+
62+
public static void main(String[] args) {
63+
new AnnotationConfigApplicationContext(WorkerConfiguration.class);
64+
}
65+
66+
@Autowired
67+
private RemoteChunkingWorkerFlowBuilder<Integer, Integer> workerFlowBuilder;
68+
69+
@Bean
70+
public ConnectionFactory rabbitConnectionFactory() {
71+
CachingConnectionFactory cachingConnectionFactory = new CachingConnectionFactory();
72+
cachingConnectionFactory.setHost(host);
73+
cachingConnectionFactory.setPort(port);
74+
return cachingConnectionFactory;
75+
}
76+
77+
@Bean
78+
public AmqpTemplate rabbitTemplate() {
79+
RabbitTemplate rabbitTemplate = new RabbitTemplate();
80+
rabbitTemplate.setConnectionFactory(rabbitConnectionFactory());
81+
return rabbitTemplate;
82+
}
83+
84+
@Bean
85+
public Queue requestQueue() {
86+
return new Queue("requests", false);
87+
}
88+
89+
@Bean
90+
public Queue repliesQueue() {
91+
return new Queue("replies", false);
92+
}
93+
94+
@Bean
95+
public TopicExchange exchange() {
96+
return new TopicExchange("remote-chunking-exchange");
97+
}
98+
99+
@Bean
100+
Binding repliesBinding(TopicExchange exchange) {
101+
return BindingBuilder
102+
.bind(repliesQueue())
103+
.to(exchange)
104+
.with("replies");
105+
}
106+
107+
@Bean
108+
Binding requestBinding(TopicExchange exchange) {
109+
return BindingBuilder
110+
.bind(requestQueue())
111+
.to(exchange)
112+
.with("requests");
113+
}
114+
115+
@Bean
116+
public DirectChannel requests() {
117+
return new DirectChannel();
118+
}
119+
120+
@Bean
121+
public DirectChannel replies() {
122+
return new DirectChannel();
123+
}
124+
125+
@Bean
126+
public IntegrationFlow incomingRequests(ConnectionFactory rabbitConnectionFactory) {
127+
return IntegrationFlows
128+
.from(Amqp.inboundAdapter(rabbitConnectionFactory, "requests"))
129+
.channel(requests())
130+
.get();
131+
}
132+
133+
@Bean
134+
public IntegrationFlow outgoingReplies(AmqpTemplate rabbitTemplate) {
135+
return IntegrationFlows
136+
.from("replies")
137+
.handle(Amqp.outboundAdapter(rabbitTemplate).routingKey("replies"))
138+
.get();
139+
}
140+
141+
@Bean
142+
public IntegrationFlow integrationFlow(DirectChannel requests, DirectChannel replies) {
143+
return workerFlowBuilder
144+
.itemProcessor(itemProcessor())
145+
.itemWriter(itemWriter())
146+
.inputChannel(requests)
147+
.outputChannel(replies)
148+
.build();
149+
}
150+
151+
@Bean
152+
public ItemWriter<Integer> itemWriter() {
153+
return items -> {
154+
for (Integer item : items) {
155+
System.out.println("writing item " + item);
156+
}
157+
};
158+
}
159+
160+
@Bean
161+
public ItemProcessor<Integer, Integer> itemProcessor() {
162+
return item -> {
163+
System.out.println("processing item " + item);
164+
return item;
165+
};
166+
}
167+
168+
}

0 commit comments

Comments
 (0)