Skip to content

Commit 720c078

Browse files
committed
running listener code in separate thread
1 parent f3fbf3c commit 720c078

File tree

1 file changed

+50
-28
lines changed
  • java-client/src/main/java/co/elastic/clients/elasticsearch/_helpers/bulk

1 file changed

+50
-28
lines changed

java-client/src/main/java/co/elastic/clients/elasticsearch/_helpers/bulk/BulkIngester.java

Lines changed: 50 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -82,7 +82,8 @@ private static class RequestExecution<Context> {
8282
public final List<Context> contexts;
8383
public final CompletionStage<BulkResponse> futureResponse;
8484

85-
RequestExecution(long id, BulkRequest request, List<Context> contexts, CompletionStage<BulkResponse> futureResponse) {
85+
RequestExecution(long id, BulkRequest request, List<Context> contexts,
86+
CompletionStage<BulkResponse> futureResponse) {
8687
this.id = id;
8788
this.request = request;
8889
this.contexts = contexts;
@@ -99,27 +100,30 @@ private BulkIngester(Builder<Context> builder) {
99100
this.maxOperations = builder.bulkOperations < 0 ? Integer.MAX_VALUE : builder.bulkOperations;
100101
this.listener = builder.listener;
101102
this.flushIntervalMillis = builder.flushIntervalMillis;
102-
103-
if (flushIntervalMillis != null) {
104-
long flushInterval = flushIntervalMillis;
105103

106-
// Create a scheduler if needed
107-
ScheduledExecutorService scheduler;
104+
// Create a scheduler if needed
105+
ScheduledExecutorService scheduler = null;
106+
if (flushIntervalMillis != null || listener != null) {
107+
108108
if (builder.scheduler == null) {
109-
scheduler = Executors.newSingleThreadScheduledExecutor((r) -> {
110-
Thread t = Executors.defaultThreadFactory().newThread(r);
111-
t.setName("bulk-ingester-flusher#" + ingesterId);
112-
t.setDaemon(true);
113-
return t;
114-
});
109+
scheduler = Executors.newScheduledThreadPool(maxRequests + 1, (r) -> {
110+
Thread t = Executors.defaultThreadFactory().newThread(r);
111+
t.setName("bulk-ingester-executor#" + ingesterId);
112+
t.setDaemon(true);
113+
return t;
114+
});
115115

116116
// Keep it, we'll have to close it.
117117
this.scheduler = scheduler;
118118
} else {
119119
// It's not ours, we will not close it.
120120
scheduler = builder.scheduler;
121121
}
122-
122+
123+
}
124+
125+
if (flushIntervalMillis != null) {
126+
long flushInterval = flushIntervalMillis;
123127
this.flushTask = scheduler.scheduleWithFixedDelay(
124128
this::failsafeFlush,
125129
flushInterval, flushInterval,
@@ -221,7 +225,7 @@ public long requestCount() {
221225
* @see Builder#maxConcurrentRequests
222226
*/
223227
public long requestContentionsCount() {
224-
return this.sendRequestCondition.contentions();
228+
return this.sendRequestCondition.contentions();
225229
}
226230

227231
//----- Predicates for the condition variables
@@ -265,7 +269,7 @@ private BulkRequest.Builder newRequest() {
265269
private void failsafeFlush() {
266270
try {
267271
flush();
268-
} catch(Throwable thr) {
272+
} catch (Throwable thr) {
269273
// Log the error and continue
270274
logger.error("Error in background flush", thr);
271275
}
@@ -280,7 +284,8 @@ public void flush() {
280284
() -> {
281285
// Build the request
282286
BulkRequest request = newRequest().operations(operations).build();
283-
List<Context> requestContexts = contexts == null ? Collections.nCopies(operations.size(), null) : contexts;
287+
List<Context> requestContexts = contexts == null ? Collections.nCopies(operations.size(),
288+
null) : contexts;
284289

285290
// Prepare for next round
286291
operations = new ArrayList<>();
@@ -291,7 +296,8 @@ public void flush() {
291296
long id = sendRequestCondition.invocations();
292297

293298
if (listener != null) {
294-
listener.beforeBulk(id, request, requestContexts);
299+
BulkRequest finalRequest = request;
300+
scheduler.submit(() -> listener.beforeBulk(id, finalRequest, requestContexts));
295301
}
296302

297303
CompletionStage<BulkResponse> result = client.bulk(request);
@@ -303,7 +309,7 @@ public void flush() {
303309
}
304310

305311
return new RequestExecution<>(id, request, requestContexts, result);
306-
});
312+
});
307313

308314
if (exec != null) {
309315
// A request was actually sent
@@ -317,12 +323,14 @@ public void flush() {
317323
if (resp != null) {
318324
// Success
319325
if (listener != null) {
320-
listener.afterBulk(exec.id, exec.request, exec.contexts, resp);
326+
scheduler.submit(() -> listener.afterBulk(exec.id, exec.request,
327+
exec.contexts, resp));
321328
}
322329
} else {
323330
// Failure
324331
if (listener != null) {
325-
listener.afterBulk(exec.id, exec.request, exec.contexts, thr);
332+
scheduler.submit(() -> listener.afterBulk(exec.id, exec.request,
333+
exec.contexts, thr));
326334
}
327335
}
328336
return null;
@@ -383,7 +391,8 @@ public void close() {
383391
// Flush buffered operations
384392
flush();
385393
// and wait for all requests to be completed
386-
closeCondition.whenReady(() -> {});
394+
closeCondition.whenReady(() -> {
395+
});
387396

388397
if (flushTask != null) {
389398
flushTask.cancel(false);
@@ -404,7 +413,7 @@ public static class Builder<Context> implements ObjectBuilder<BulkIngester<Conte
404413
private ElasticsearchAsyncClient client;
405414
private BulkRequest globalSettings;
406415
private int bulkOperations = 1000;
407-
private long bulkSize = 5*1024*1024;
416+
private long bulkSize = 5 * 1024 * 1024;
408417
private int maxConcurrentRequests = 1;
409418
private Long flushIntervalMillis;
410419
private BulkListener<Context> listener;
@@ -424,7 +433,8 @@ public Builder<Context> client(ElasticsearchClient client) {
424433
}
425434

426435
/**
427-
* Sets when to flush a new bulk request based on the number of operations currently added. Defaults to
436+
* Sets when to flush a new bulk request based on the number of operations currently added.
437+
* Defaults to
428438
* {@code 1000}. Can be set to {@code -1} to disable it.
429439
*
430440
* @throws IllegalArgumentException if less than -1.
@@ -438,7 +448,8 @@ public Builder<Context> maxOperations(int count) {
438448
}
439449

440450
/**
441-
* Sets when to flush a new bulk request based on the size in bytes of actions currently added. A request is sent
451+
* Sets when to flush a new bulk request based on the size in bytes of actions currently added. A
452+
* request is sent
442453
* once that size has been exceeded. Defaults to 5 megabytes. Can be set to {@code -1} to disable it.
443454
*
444455
* @throws IllegalArgumentException if less than -1.
@@ -452,7 +463,8 @@ public Builder<Context> maxSize(long bytes) {
452463
}
453464

454465
/**
455-
* Sets the number of concurrent requests allowed to be executed. A value of 1 means 1 request is allowed to be executed
466+
* Sets the number of concurrent requests allowed to be executed. A value of 1 means 1 request is
467+
* allowed to be executed
456468
* while accumulating new bulk requests. Defaults to {@code 1}.
457469
*
458470
* @throws IllegalArgumentException if less than 1.
@@ -468,7 +480,8 @@ public Builder<Context> maxConcurrentRequests(int max) {
468480
/**
469481
* Sets an interval flushing any bulk actions pending if the interval passes. Defaults to not set.
470482
* <p>
471-
* Flushing is still subject to the maximum number of requests set with {@link #maxConcurrentRequests}.
483+
* Flushing is still subject to the maximum number of requests set with
484+
* {@link #maxConcurrentRequests}.
472485
*
473486
* @throws IllegalArgumentException if not a positive duration.
474487
*/
@@ -483,13 +496,21 @@ public Builder<Context> flushInterval(long value, TimeUnit unit) {
483496
/**
484497
* Sets an interval flushing any bulk actions pending if the interval passes. Defaults to not set.
485498
* <p>
486-
* Flushing is still subject to the maximum number of requests set with {@link #maxConcurrentRequests}.
499+
* Flushing is still subject to the maximum number of requests set with
500+
* {@link #maxConcurrentRequests}.
501+
* Deprecated in favor of {@link #scheduler}
487502
*/
503+
@Deprecated
488504
public Builder<Context> flushInterval(long value, TimeUnit unit, ScheduledExecutorService scheduler) {
489505
this.scheduler = scheduler;
490506
return flushInterval(value, unit);
491507
}
492508

509+
public Builder<Context> scheduler(ScheduledExecutorService scheduler) {
510+
this.scheduler = scheduler;
511+
return this;
512+
}
513+
493514
public Builder<Context> listener(BulkListener<Context> listener) {
494515
this.listener = listener;
495516
return this;
@@ -518,7 +539,8 @@ public Builder<Context> globalSettings(Function<BulkRequest.Builder, BulkRequest
518539
@Override
519540
public BulkIngester<Context> build() {
520541
// Ensure some chunking criteria are defined
521-
boolean hasCriteria = this.bulkOperations >= 0 || this.bulkSize >= 0 || this.flushIntervalMillis != null;
542+
boolean hasCriteria =
543+
this.bulkOperations >= 0 || this.bulkSize >= 0 || this.flushIntervalMillis != null;
522544

523545
if (!hasCriteria) {
524546
throw new IllegalStateException("No bulk operation chunking criteria have been set.");

0 commit comments

Comments
 (0)