diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/inference/bulk/BulkInferenceExecutionState.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/inference/bulk/BulkInferenceExecutionState.java index 307dae6c425c2..55f1f49f68c21 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/inference/bulk/BulkInferenceExecutionState.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/inference/bulk/BulkInferenceExecutionState.java @@ -75,7 +75,7 @@ public void markSeqNoAsPersisted(long seqNo) { * @param response The inference response. */ public synchronized void onInferenceResponse(long seqNo, InferenceAction.Response response) { - if (failureCollector.hasFailure() == false) { + if (response != null && failureCollector.hasFailure() == false) { bufferedResponses.put(seqNo, response); } checkpoint.markSeqNoAsProcessed(seqNo); diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/inference/bulk/BulkInferenceExecutor.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/inference/bulk/BulkInferenceExecutor.java index d05a9a57d5265..257799962dda7 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/inference/bulk/BulkInferenceExecutor.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/inference/bulk/BulkInferenceExecutor.java @@ -69,16 +69,19 @@ public void execute(BulkInferenceRequestIterator requests, ActionListener bulkExecutionState.onInferenceResponse(seqNo, r), - e -> bulkExecutionState.onInferenceException(seqNo, e) - ), - responseHandler::persistPendingResponses - ) + ActionListener inferenceResponseListener = ActionListener.runAfter( + ActionListener.wrap( + r -> bulkExecutionState.onInferenceResponse(seqNo, r), + e -> bulkExecutionState.onInferenceException(seqNo, e) + ), + responseHandler::persistPendingResponses ); + + if (request == null) { + inferenceResponseListener.onResponse(null); + } else { + throttledInferenceRunner.doInference(request, inferenceResponseListener); + } } } @@ -112,7 +115,6 @@ public synchronized void persistPendingResponses() { if (bulkExecutionState.hasFailure() == false) { try { InferenceAction.Response response = bulkExecutionState.fetchBufferedResponse(persistedSeqNo); - assert response != null; responses.add(response); } catch (Exception e) { bulkExecutionState.addFailure(e); diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/inference/completion/CompletionOperatorOutputBuilder.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/inference/completion/CompletionOperatorOutputBuilder.java index d44a13786437a..cfb587c6451d8 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/inference/completion/CompletionOperatorOutputBuilder.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/inference/completion/CompletionOperatorOutputBuilder.java @@ -51,6 +51,11 @@ public void close() { */ @Override public void addInferenceResponse(InferenceAction.Response inferenceResponse) { + if (inferenceResponse == null) { + outputBlockBuilder.appendNull(); + return; + } + ChatCompletionResults completionResults = inferenceResults(inferenceResponse); if (completionResults == null) { diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/inference/completion/CompletionOperatorRequestIterator.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/inference/completion/CompletionOperatorRequestIterator.java index d7755a310098a..6893130425edf 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/inference/completion/CompletionOperatorRequestIterator.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/inference/completion/CompletionOperatorRequestIterator.java @@ -58,6 +58,10 @@ public InferenceAction.Request next() { * Wraps a single prompt string into an {@link InferenceAction.Request}. */ private InferenceAction.Request inferenceRequest(String prompt) { + if (prompt == null) { + return null; + } + return InferenceAction.Request.builder(inferenceId, TaskType.COMPLETION).setInput(List.of(prompt)).build(); } diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/inference/InferenceOperatorTestCase.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/inference/InferenceOperatorTestCase.java index 900e17f724156..c49e301968aa0 100644 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/inference/InferenceOperatorTestCase.java +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/inference/InferenceOperatorTestCase.java @@ -88,12 +88,17 @@ protected int remaining() { @Override protected Page createPage(int positionOffset, int length) { length = Integer.min(length, remaining()); - try (var builder = blockFactory.newBytesRefVectorBuilder(length)) { + try (var builder = blockFactory.newBytesRefBlockBuilder(length)) { for (int i = 0; i < length; i++) { - builder.appendBytesRef(new BytesRef(randomAlphaOfLength(10))); + if (randomInt() % 100 == 0) { + builder.appendNull(); + } else { + builder.appendBytesRef(new BytesRef(randomAlphaOfLength(10))); + } + } currentPosition += length; - return new Page(builder.build().asBlock()); + return new Page(builder.build()); } } };