diff --git a/.editorconfig b/.editorconfig index a1affe78294..bbafabe4e7c 100644 --- a/.editorconfig +++ b/.editorconfig @@ -184,6 +184,8 @@ csharp_space_between_method_call_parameter_list_parentheses = false csharp_preserve_single_line_statements = false csharp_preserve_single_line_blocks = true +csharp_style_namespace_declarations = file_scoped + # Resharper resharper_csharp_braces_for_lock=required_for_multiline resharper_csharp_braces_for_using=required_for_multiline diff --git a/src/Elastic.Clients.Elasticsearch/Helpers/BulkAllObservable.cs b/src/Elastic.Clients.Elasticsearch/Helpers/BulkAllObservable.cs index 5a0528a3b83..95d48a8631b 100644 --- a/src/Elastic.Clients.Elasticsearch/Helpers/BulkAllObservable.cs +++ b/src/Elastic.Clients.Elasticsearch/Helpers/BulkAllObservable.cs @@ -19,258 +19,277 @@ namespace Elastic.Clients.Elasticsearch; public sealed class BulkAllObservable : IDisposable, IObservable { - private bool _disposedValue; - - private readonly int _backOffRetries; - private readonly TimeSpan _backOffTime; - private readonly int _bulkSize; - private readonly ElasticsearchClient _client; - - private readonly CancellationToken _compositeCancelToken; - private readonly CancellationTokenSource _compositeCancelTokenSource; - private readonly Action _droppedDocumentCallBack; - private readonly int _maxDegreeOfParallelism; - private readonly IBulkAllRequest _partitionedBulkRequest; - private readonly Func _retryPredicate; - - private readonly Action _incrementFailed = () => { }; - private readonly Action _incrementRetries = () => { }; - - private readonly Action _bulkResponseCallback; - - public BulkAllObservable(ElasticsearchClient client, IBulkAllRequest partitionedBulkRequest, CancellationToken cancellationToken = default) - { - _client = client; - _partitionedBulkRequest = partitionedBulkRequest; - _backOffRetries = _partitionedBulkRequest.BackOffRetries.GetValueOrDefault(CoordinatedRequestDefaults.BulkAllBackOffRetriesDefault); - _backOffTime = _partitionedBulkRequest?.BackOffTime?.ToTimeSpan() ?? CoordinatedRequestDefaults.BulkAllBackOffTimeDefault; - _bulkSize = _partitionedBulkRequest.Size ?? CoordinatedRequestDefaults.BulkAllSizeDefault; - _retryPredicate = _partitionedBulkRequest.RetryDocumentPredicate ?? RetryBulkActionPredicate; - _droppedDocumentCallBack = _partitionedBulkRequest.DroppedDocumentCallback ?? DroppedDocumentCallbackDefault; - _bulkResponseCallback = _partitionedBulkRequest.BulkResponseCallback; - _maxDegreeOfParallelism = _partitionedBulkRequest.MaxDegreeOfParallelism ?? CoordinatedRequestDefaults.BulkAllMaxDegreeOfParallelismDefault; - _compositeCancelTokenSource = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken); - _compositeCancelToken = _compositeCancelTokenSource.Token; - } - - private void BulkAll(IObserver observer) - { - var documents = _partitionedBulkRequest.Documents; - var partitioned = new PartitionHelper(documents, _bulkSize); + private bool _disposedValue; + + private readonly int _backOffRetries; + private readonly TimeSpan _backOffTime; + private readonly int _bulkSize; + private readonly ElasticsearchClient _client; + + private readonly CancellationToken _compositeCancelToken; + private readonly CancellationTokenSource _compositeCancelTokenSource; + private readonly Action _droppedDocumentCallBack; + private readonly int _maxDegreeOfParallelism; + private readonly IBulkAllRequest _partitionedBulkRequest; + private readonly Func _retryPredicate; + + private readonly Action _incrementFailed = () => { }; + private readonly Action _incrementRetries = () => { }; + + private readonly Action _bulkResponseCallback; + + public BulkAllObservable(ElasticsearchClient client, IBulkAllRequest partitionedBulkRequest, CancellationToken cancellationToken = default) + { + _client = client; + _partitionedBulkRequest = partitionedBulkRequest; + _backOffRetries = _partitionedBulkRequest.BackOffRetries.GetValueOrDefault(CoordinatedRequestDefaults.BulkAllBackOffRetriesDefault); + _backOffTime = _partitionedBulkRequest?.BackOffTime?.ToTimeSpan() ?? CoordinatedRequestDefaults.BulkAllBackOffTimeDefault; + _bulkSize = _partitionedBulkRequest.Size ?? CoordinatedRequestDefaults.BulkAllSizeDefault; + _retryPredicate = _partitionedBulkRequest.RetryDocumentPredicate ?? RetryBulkActionPredicate; + _droppedDocumentCallBack = _partitionedBulkRequest.DroppedDocumentCallback ?? DroppedDocumentCallbackDefault; + _bulkResponseCallback = _partitionedBulkRequest.BulkResponseCallback; + _maxDegreeOfParallelism = _partitionedBulkRequest.MaxDegreeOfParallelism ?? CoordinatedRequestDefaults.BulkAllMaxDegreeOfParallelismDefault; + _compositeCancelTokenSource = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken); + _compositeCancelToken = _compositeCancelTokenSource.Token; + } + + private void BulkAll(IObserver observer) + { + var documents = _partitionedBulkRequest.Documents; + var partitioned = new PartitionHelper(documents, _bulkSize); #pragma warning disable 4014 #pragma warning disable VSTHRD110 // Observe result of async calls - partitioned.ForEachAsync( + partitioned.ForEachAsync( #pragma warning restore 4014 - (buffer, page) => BulkAsync(buffer, page, 0), - (buffer, response) => observer.OnNext(response), - ex => OnCompleted(ex, observer), - _maxDegreeOfParallelism - ); + (buffer, page) => BulkAsync(buffer, page, 0), + (buffer, response) => observer.OnNext(response), + ex => OnCompleted(ex, observer), + _maxDegreeOfParallelism + ); #pragma warning restore VSTHRD110 // Observe result of async calls - } - - private void OnCompleted(Exception exception, IObserver observer) - { - if (exception != null) - observer.OnError(exception); - else - { - try - { - RefreshOnCompleted(); - observer.OnCompleted(); - } - catch (Exception e) - { - observer.OnError(e); - } - } - } - - private void RefreshOnCompleted() - { - if (!_partitionedBulkRequest.RefreshOnCompleted) - return; - - var indices = _partitionedBulkRequest.RefreshIndices ?? _partitionedBulkRequest.Index; - if (indices == null) - return; - - var rc = _partitionedBulkRequest switch - { - IHelperCallable helperCallable when helperCallable.ParentMetaData is not null => helperCallable.ParentMetaData, - _ => RequestMetaDataFactory.BulkHelperRequestMetaData(), - }; - - var request = new IndexManagement.RefreshRequest(indices); - - if (rc is not null) - request.RequestConfiguration = new RequestConfiguration { RequestMetaData = rc }; - - var refresh = _client.Indices.Refresh(request); - - if (!refresh.IsValid) - throw Throw($"Refreshing after all documents have indexed failed", refresh.ApiCall); - } - - private async Task BulkAsync(IList buffer, long page, int backOffRetries) - { - _compositeCancelToken.ThrowIfCancellationRequested(); - - var request = _partitionedBulkRequest; - - var response = await _client.BulkAsync(s => - { - s.Index(request.Index); - s.Timeout(request.Timeout); - - if (request.BufferToBulk is not null) - { - request.BufferToBulk(s, buffer); - } - else - { - s.IndexMany(buffer); - } - - if (!string.IsNullOrEmpty(request.Pipeline)) - s.Pipeline(request.Pipeline); - if (request.Routing != null) - s.Routing(request.Routing); - if (request.WaitForActiveShards.HasValue) - s.WaitForActiveShards(request.WaitForActiveShards.ToString()); - - switch (_partitionedBulkRequest) - { - case IHelperCallable helperCallable when helperCallable.ParentMetaData is not null: - s.RequestConfiguration(rc => rc.RequestMetaData(helperCallable.ParentMetaData)); - break; - default: - s.RequestConfiguration(rc => rc.RequestMetaData(RequestMetaDataFactory.BulkHelperRequestMetaData())); - break; - } - - }, _compositeCancelToken).ConfigureAwait(false); - - _compositeCancelToken.ThrowIfCancellationRequested(); - _bulkResponseCallback?.Invoke(response); - - if (!response.ApiCall.Success) - return await HandleBulkRequestAsync(buffer, page, backOffRetries, response).ConfigureAwait(false); - - var retryableDocuments = new List(); - var droppedDocuments = new List>(); - - foreach (var documentWithResponse in response.Items.Zip(buffer, Tuple.Create)) - { - if (documentWithResponse.Item1.IsValid) - continue; - - if (_retryPredicate(documentWithResponse.Item1, documentWithResponse.Item2)) - retryableDocuments.Add(documentWithResponse.Item2); - else - droppedDocuments.Add(documentWithResponse); - } - - HandleDroppedDocuments(droppedDocuments, response); - - if (retryableDocuments.Count > 0 && backOffRetries < _backOffRetries) - return await RetryDocumentsAsync(page, ++backOffRetries, retryableDocuments).ConfigureAwait(false); - - if (retryableDocuments.Count > 0) - throw ThrowOnBadBulk(response, $"Bulk indexing failed and after retrying {backOffRetries} times."); - - request.BackPressure?.Release(); - - return new BulkAllResponse { Retries = backOffRetries, Page = page, Items = response.Items }; - } - - private void HandleDroppedDocuments(List> droppedDocuments, BulkResponse response) - { - if (droppedDocuments.Count <= 0) - return; - - foreach (var dropped in droppedDocuments) - _droppedDocumentCallBack(dropped.Item1, dropped.Item2); - - if (!_partitionedBulkRequest.ContinueAfterDroppedDocuments) - throw ThrowOnBadBulk(response, $"{nameof(BulkAll)} halted after receiving failures that can not be retried from _bulk"); - } - - private async Task HandleBulkRequestAsync(IList buffer, long page, int backOffRetries, BulkResponse response) - { - var clientException = response.ApiCall.OriginalException as TransportException; - var failureReason = clientException?.FailureReason; - var reason = failureReason?.GetStringValue() ?? nameof(PipelineFailure.BadRequest); - switch (failureReason) - { - case PipelineFailure.MaxRetriesReached: - if (response.ApiCall.AuditTrail.Last().Event == AuditEvent.FailedOverAllNodes) - throw ThrowOnBadBulk(response, $"{nameof(BulkAll)} halted after attempted bulk failed over all the active nodes"); - - ThrowOnExhaustedRetries(); - return await RetryDocumentsAsync(page, ++backOffRetries, buffer).ConfigureAwait(false); - case PipelineFailure.CouldNotStartSniffOnStartup: - case PipelineFailure.BadAuthentication: - case PipelineFailure.NoNodesAttempted: - case PipelineFailure.SniffFailure: - case PipelineFailure.Unexpected: - throw ThrowOnBadBulk(response, $"{nameof(BulkAll)} halted after {nameof(PipelineFailure)}.{reason} from _bulk"); - case PipelineFailure.BadResponse: - case PipelineFailure.PingFailure: - case PipelineFailure.MaxTimeoutReached: - case PipelineFailure.BadRequest: - default: - ThrowOnExhaustedRetries(); - return await RetryDocumentsAsync(page, ++backOffRetries, buffer).ConfigureAwait(false); - } - - void ThrowOnExhaustedRetries() - { - if (backOffRetries < _backOffRetries) - return; - - throw ThrowOnBadBulk(response, - $"{nameof(BulkAll)} halted after {nameof(PipelineFailure)}.{reason} from _bulk and exhausting retries ({backOffRetries})"); - } - } - - private async Task RetryDocumentsAsync(long page, int backOffRetries, IList retryDocuments) - { - _incrementRetries(); - await Task.Delay(_backOffTime, _compositeCancelToken).ConfigureAwait(false); - return await BulkAsync(retryDocuments, page, backOffRetries).ConfigureAwait(false); - } - - private Exception ThrowOnBadBulk(IElasticsearchResponse response, string message) - { - _incrementFailed(); - _partitionedBulkRequest.BackPressure?.Release(); - return Throw(message, response.ApiCall); - } - - private static bool RetryBulkActionPredicate(ResponseItem bulkResponseItem, T d) => bulkResponseItem.Status == 429; - - private static void DroppedDocumentCallbackDefault(ResponseItem bulkResponseItem, T d) { } - - public void Dispose() - { - if (!_disposedValue) - { - _compositeCancelTokenSource?.Cancel(); - _compositeCancelTokenSource?.Dispose(); - - _disposedValue = true; - } - } - - public IDisposable Subscribe(IObserver observer) - { - observer.ThrowIfNull(nameof(observer)); - BulkAll(observer); - return this; - } - - private static TransportException Throw(string message, IApiCallDetails details) => - new(PipelineFailure.BadResponse, message, details); + } + + private void OnCompleted(Exception exception, IObserver observer) + { + if (exception != null) + observer.OnError(exception); + else + { + try + { + RefreshOnCompleted(); + observer.OnCompleted(); + } + catch (Exception e) + { + observer.OnError(e); + } + } + } + + private void RefreshOnCompleted() + { + if (!_partitionedBulkRequest.RefreshOnCompleted) + return; + + var indices = _partitionedBulkRequest.RefreshIndices ?? _partitionedBulkRequest.Index; + if (indices == null) + return; + + var rc = _partitionedBulkRequest switch + { + IHelperCallable helperCallable when helperCallable.ParentMetaData is not null => helperCallable.ParentMetaData, + _ => RequestMetaDataFactory.BulkHelperRequestMetaData(), + }; + + var request = new IndexManagement.RefreshRequest(indices); + + if (rc is not null) + request.RequestConfiguration = new RequestConfiguration { RequestMetaData = rc }; + + var refresh = _client.Indices.Refresh(request); + + if (!refresh.IsValid) + throw Throw($"Refreshing after all documents have indexed failed", refresh.ApiCall); + } + + private async Task BulkAsync(IList buffer, long page, int backOffRetries) + { + _compositeCancelToken.ThrowIfCancellationRequested(); + + var request = _partitionedBulkRequest; + + var response = await _client.BulkAsync(s => + { + s.Index(request.Index); + s.Timeout(request.Timeout); + + if (request.BufferToBulk is not null) + { + request.BufferToBulk(s, buffer); + } + else + { + s.IndexMany(buffer); + } + + if (!string.IsNullOrEmpty(request.Pipeline)) + s.Pipeline(request.Pipeline); + if (request.Routing != null) + s.Routing(request.Routing); + if (request.WaitForActiveShards.HasValue) + s.WaitForActiveShards(request.WaitForActiveShards.ToString()); + + switch (_partitionedBulkRequest) + { + case IHelperCallable helperCallable when helperCallable.ParentMetaData is not null: + s.RequestConfiguration(rc => rc.RequestMetaData(helperCallable.ParentMetaData)); + break; + default: + s.RequestConfiguration(rc => rc.RequestMetaData(RequestMetaDataFactory.BulkHelperRequestMetaData())); + break; + } + + }, _compositeCancelToken).ConfigureAwait(false); + + _compositeCancelToken.ThrowIfCancellationRequested(); + _bulkResponseCallback?.Invoke(response); + + if (!response.ApiCall.Success) + return await HandleBulkRequestAsync(buffer, page, backOffRetries, response).ConfigureAwait(false); + + var retryableDocuments = new List(); + var droppedDocuments = new List>(); + + var retryableDocsRemainingAfterRetriesExceeded = false; + + foreach (var documentWithResponse in response.Items.Zip(buffer, Tuple.Create)) + { + if (documentWithResponse.Item1.IsValid) + continue; + + if (_retryPredicate(documentWithResponse.Item1, documentWithResponse.Item2)) + { + if (backOffRetries < _backOffRetries) + { + retryableDocuments.Add(documentWithResponse.Item2); + } + else + { + // We still have retriable documents but have exceeded all retries, so we mark these as + // dropped so they get handled correctly. + retryableDocsRemainingAfterRetriesExceeded = true; + droppedDocuments.Add(documentWithResponse); + } + } + else + { + droppedDocuments.Add(documentWithResponse); + } + } + + HandleDroppedDocuments(droppedDocuments, response); + + if (retryableDocsRemainingAfterRetriesExceeded) + { + throw ThrowOnBadBulk(response, $"Bulk indexing failed and after retrying {backOffRetries} times."); + } + else if (retryableDocuments.Count > 0) + { + return await RetryDocumentsAsync(page, ++backOffRetries, retryableDocuments).ConfigureAwait(false); + } + + request.BackPressure?.Release(); + + return new BulkAllResponse { Retries = backOffRetries, Page = page, Items = response.Items }; + } + + private void HandleDroppedDocuments(List> droppedDocuments, BulkResponse response) + { + if (droppedDocuments.Count <= 0) + return; + + foreach (var dropped in droppedDocuments) + _droppedDocumentCallBack(dropped.Item1, dropped.Item2); + + if (!_partitionedBulkRequest.ContinueAfterDroppedDocuments) + throw ThrowOnBadBulk(response, $"{nameof(BulkAll)} halted after receiving failures that can not be retried from _bulk"); + } + + private async Task HandleBulkRequestAsync(IList buffer, long page, int backOffRetries, BulkResponse response) + { + var clientException = response.ApiCall.OriginalException as TransportException; + var failureReason = clientException?.FailureReason; + var reason = failureReason?.GetStringValue() ?? nameof(PipelineFailure.BadRequest); + switch (failureReason) + { + case PipelineFailure.MaxRetriesReached: + if (response.ApiCall.AuditTrail.Last().Event == AuditEvent.FailedOverAllNodes) + throw ThrowOnBadBulk(response, $"{nameof(BulkAll)} halted after attempted bulk failed over all the active nodes"); + + ThrowOnExhaustedRetries(); + return await RetryDocumentsAsync(page, ++backOffRetries, buffer).ConfigureAwait(false); + case PipelineFailure.CouldNotStartSniffOnStartup: + case PipelineFailure.BadAuthentication: + case PipelineFailure.NoNodesAttempted: + case PipelineFailure.SniffFailure: + case PipelineFailure.Unexpected: + throw ThrowOnBadBulk(response, $"{nameof(BulkAll)} halted after {nameof(PipelineFailure)}.{reason} from _bulk"); + case PipelineFailure.BadResponse: + case PipelineFailure.PingFailure: + case PipelineFailure.MaxTimeoutReached: + case PipelineFailure.BadRequest: + default: + ThrowOnExhaustedRetries(); + return await RetryDocumentsAsync(page, ++backOffRetries, buffer).ConfigureAwait(false); + } + + void ThrowOnExhaustedRetries() + { + if (backOffRetries < _backOffRetries) + return; + + throw ThrowOnBadBulk(response, + $"{nameof(BulkAll)} halted after {nameof(PipelineFailure)}.{reason} from _bulk and exhausting retries ({backOffRetries})"); + } + } + + private async Task RetryDocumentsAsync(long page, int backOffRetries, IList retryDocuments) + { + _incrementRetries(); + await Task.Delay(_backOffTime, _compositeCancelToken).ConfigureAwait(false); + return await BulkAsync(retryDocuments, page, backOffRetries).ConfigureAwait(false); + } + + private Exception ThrowOnBadBulk(IElasticsearchResponse response, string message) + { + _incrementFailed(); + _partitionedBulkRequest.BackPressure?.Release(); + return Throw(message, response.ApiCall); + } + + private static bool RetryBulkActionPredicate(ResponseItem bulkResponseItem, T d) => bulkResponseItem.Status == 429; + + private static void DroppedDocumentCallbackDefault(ResponseItem bulkResponseItem, T d) { } + + public void Dispose() + { + if (!_disposedValue) + { + _compositeCancelTokenSource?.Cancel(); + _compositeCancelTokenSource?.Dispose(); + + _disposedValue = true; + } + } + + public IDisposable Subscribe(IObserver observer) + { + observer.ThrowIfNull(nameof(observer)); + BulkAll(observer); + return this; + } + + private static TransportException Throw(string message, IApiCallDetails details) => + new(PipelineFailure.BadResponse, message, details); } diff --git a/tests/Tests/Document/Multiple/BulkAll/BulkAllForEachAsyncApiTests.cs b/tests/Tests/Document/Multiple/BulkAll/BulkAllForEachAsyncApiTests.cs index 1a94cae0481..25519e647ab 100644 --- a/tests/Tests/Document/Multiple/BulkAll/BulkAllForEachAsyncApiTests.cs +++ b/tests/Tests/Document/Multiple/BulkAll/BulkAllForEachAsyncApiTests.cs @@ -43,7 +43,7 @@ await observableBulk seenPages.Should().Be(pages); var count = await Client.CountAsync(new CountRequest(index)); - //var count = Client.Count(f => f.Index(index)); + //var count = Client.Count(f => f.Index(index)); // TODO - Reintroduce this shortcut count.Count.Should().Be(numberOfDocuments); } } diff --git a/tests/Tests/Document/Multiple/BulkAll/BulkAllRetryTests.cs b/tests/Tests/Document/Multiple/BulkAll/BulkAllRetryTests.cs new file mode 100644 index 00000000000..7c573950a1c --- /dev/null +++ b/tests/Tests/Document/Multiple/BulkAll/BulkAllRetryTests.cs @@ -0,0 +1,49 @@ +// Licensed to Elasticsearch B.V under one or more agreements. +// Elasticsearch B.V licenses this file to you under the Apache 2.0 License. +// See the LICENSE file in the project root for more information. + +using System; +using System.Collections.Generic; +using System.Text; + +namespace Tests.Document.Multiple.BulkAll; + +public class BulkAllRetryTests +{ + [U] + public void RetriedButFailedDocuments_InvokeDroppedDocumentsCallback() + { + var response = @"{""took"":30,""errors"":true,""items"":[{""index"":{""_index"":""thing"",""_id"":""1"",""status"":429}}]}"; + + var responseBytes = Encoding.UTF8.GetBytes(response); + var connection = new InMemoryConnection(responseBytes, 200); + var connectionPool = new SingleNodePool(new Uri("http://localhost:9200")); + var settings = new ElasticsearchClientSettings(connectionPool, connection).DefaultIndex("thing"); + var client = new ElasticsearchClient(settings); + + var docs = new Thing[] { new Thing { Id = 100 } }; + var dropped = new List(); + + var bulkAll = client.BulkAll(docs, b => b + .BackOffRetries(2) + .BackOffTime("1ms") + .ContinueAfterDroppedDocuments() + .DroppedDocumentCallback((r, d) => { dropped.Add(d); })); + + try + { + bulkAll.Wait(TimeSpan.FromMinutes(1), r => { }); + } + catch + { + } + + dropped.Should().HaveCount(1); + dropped[0].Id.Should().Be(100); + } + + private class Thing + { + public int Id { get; set; } + } +}