diff --git a/src/Elasticsearch.Net/Auditing/Audit.cs b/src/Elasticsearch.Net/Auditing/Audit.cs index 76c6927c465..955ecd65c88 100644 --- a/src/Elasticsearch.Net/Auditing/Audit.cs +++ b/src/Elasticsearch.Net/Auditing/Audit.cs @@ -11,16 +11,16 @@ public class Audit public string Path { get; internal set; } public Exception Exception { get; internal set; } - public Audit(AuditEvent type, DateTime occured) + public Audit(AuditEvent type, DateTime started) { this.Event = type; - this.Started = occured; + this.Started = started; } public override string ToString() { var took = Started - Ended; - return $"Node: {Node?.Uri}, Event: {Event.GetStringValue()} NodeAlive: {Node?.IsAlive}, Took: {took.ToString()}"; + return $"Node: {Node?.Uri}, Event: {Event.GetStringValue()} NodeAlive: {Node?.IsAlive}, Took: {took}"; } } } \ No newline at end of file diff --git a/src/Elasticsearch.Net/Configuration/ConnectionConfiguration.cs b/src/Elasticsearch.Net/Configuration/ConnectionConfiguration.cs index 379ff5e66f9..e4de25b7b8d 100644 --- a/src/Elasticsearch.Net/Configuration/ConnectionConfiguration.cs +++ b/src/Elasticsearch.Net/Configuration/ConnectionConfiguration.cs @@ -1,6 +1,7 @@ using System; using System.Collections.Specialized; using System.ComponentModel; +using System.Threading; namespace Elasticsearch.Net { @@ -10,15 +11,16 @@ namespace Elasticsearch.Net /// public class ConnectionConfiguration : ConnectionConfiguration { - public static TimeSpan DefaultTimeout = TimeSpan.FromMinutes(1); - public static TimeSpan DefaultPingTimeout = TimeSpan.FromSeconds(2); - public static TimeSpan DefaultPingTimeoutOnSSL = TimeSpan.FromSeconds(5); + public static readonly TimeSpan DefaultTimeout = TimeSpan.FromMinutes(1); + public static readonly TimeSpan DefaultPingTimeout = TimeSpan.FromSeconds(2); + public static readonly TimeSpan DefaultPingTimeoutOnSSL = TimeSpan.FromSeconds(5); /// /// ConnectionConfiguration allows you to control how ElasticsearchClient behaves and where/how it connects /// to elasticsearch /// /// The root of the elasticsearch node we want to connect to. Defaults to http://localhost:9200 + [System.Diagnostics.CodeAnalysis.SuppressMessage("Microsoft.Reliability", "CA2000:Dispose objects before losing scope")] public ConnectionConfiguration(Uri uri = null) : this(new SingleNodeConnectionPool(uri ?? new Uri("http://localhost:9200"))) { } @@ -47,6 +49,7 @@ public ConnectionConfiguration(IConnectionPool connectionPool, Func serializerFactory) : base(connectionPool, connection, serializerFactory) { } + } [Browsable(false)] @@ -54,6 +57,9 @@ public ConnectionConfiguration(IConnectionPool connectionPool, IConnection conne public abstract class ConnectionConfiguration : IConnectionConfigurationValues, IHideObjectMembers where T : ConnectionConfiguration { + private SemaphoreSlim _semaphore = new SemaphoreSlim(1, 1); + SemaphoreSlim IConnectionConfigurationValues.BootstrapLock => this._semaphore; + private TimeSpan _requestTimeout; TimeSpan IConnectionConfigurationValues.RequestTimeout => _requestTimeout; @@ -130,11 +136,10 @@ private static void DefaultApiCallHandler(IApiCallDetails status) { } BasicAuthenticationCredentials _basicAuthCredentials; BasicAuthenticationCredentials IConnectionConfigurationValues.BasicAuthenticationCredentials => _basicAuthCredentials; - protected IElasticsearchSerializer _serializer; + private readonly IElasticsearchSerializer _serializer; IElasticsearchSerializer IConnectionConfigurationValues.Serializer => _serializer; private readonly IConnectionPool _connectionPool; - private readonly Func _serializerFactory; IConnectionPool IConnectionConfigurationValues.ConnectionPool => _connectionPool; private readonly IConnection _connection; @@ -147,9 +152,9 @@ protected ConnectionConfiguration(IConnectionPool connectionPool, IConnection co { this._connectionPool = connectionPool; this._connection = connection ?? new HttpConnection(); - this._serializerFactory = serializerFactory ?? (c=>this.DefaultSerializer((T)this)); + serializerFactory = serializerFactory ?? (c=>this.DefaultSerializer((T)this)); // ReSharper disable once VirtualMemberCallInContructor - this._serializer = this._serializerFactory((T)this); + this._serializer = serializerFactory((T)this); this._requestTimeout = ConnectionConfiguration.DefaultTimeout; this._sniffOnConnectionFault = true; @@ -308,6 +313,15 @@ public T BasicAuthentication(string userName, string password) /// Note: HTTP pipelining must also be enabled in Elasticsearch for this to work properly. /// public T EnableHttpPipelining(bool enabled = true) => Assign(a => a._enableHttpPipelining = enabled); + + void IDisposable.Dispose() => this.DisposeManagedResources(); + + protected virtual void DisposeManagedResources() + { + this._connectionPool?.Dispose(); + this._connection?.Dispose(); + this._semaphore?.Dispose(); + } } } diff --git a/src/Elasticsearch.Net/Configuration/IConnectionConfigurationValues.cs b/src/Elasticsearch.Net/Configuration/IConnectionConfigurationValues.cs index 5e927974d0c..2f3fcb80842 100644 --- a/src/Elasticsearch.Net/Configuration/IConnectionConfigurationValues.cs +++ b/src/Elasticsearch.Net/Configuration/IConnectionConfigurationValues.cs @@ -1,10 +1,14 @@ using System; using System.Collections.Specialized; +using System.Threading; namespace Elasticsearch.Net { - public interface IConnectionConfigurationValues + public interface IConnectionConfigurationValues : IDisposable { + /// Provides a semaphoreslim to transport implementations that need to limit access to a resource + SemaphoreSlim BootstrapLock { get; } + /// The connection pool to use when talking with elasticsearch IConnectionPool ConnectionPool { get; } diff --git a/src/Elasticsearch.Net/Connection/HttpConnection.cs b/src/Elasticsearch.Net/Connection/HttpConnection.cs index e45df7c7395..81137070d42 100644 --- a/src/Elasticsearch.Net/Connection/HttpConnection.cs +++ b/src/Elasticsearch.Net/Connection/HttpConnection.cs @@ -194,5 +194,9 @@ private void HandleException(ResponseBuilder builder, WebExcep builder.Stream = response.GetResponseStream(); } } + + void IDisposable.Dispose() => this.DisposeManagedResources(); + + protected virtual void DisposeManagedResources() { } } } diff --git a/src/Elasticsearch.Net/Connection/IConnection.cs b/src/Elasticsearch.Net/Connection/IConnection.cs index 39bde480d2c..9c3d2422d82 100644 --- a/src/Elasticsearch.Net/Connection/IConnection.cs +++ b/src/Elasticsearch.Net/Connection/IConnection.cs @@ -1,8 +1,9 @@ -using System.Threading.Tasks; +using System; +using System.Threading.Tasks; namespace Elasticsearch.Net { - public interface IConnection + public interface IConnection : IDisposable { Task> RequestAsync(RequestData requestData) where TReturn : class; diff --git a/src/Elasticsearch.Net/ConnectionPool/IConnectionPool.cs b/src/Elasticsearch.Net/ConnectionPool/IConnectionPool.cs index e330b10781e..f9f84cb7999 100644 --- a/src/Elasticsearch.Net/ConnectionPool/IConnectionPool.cs +++ b/src/Elasticsearch.Net/ConnectionPool/IConnectionPool.cs @@ -3,7 +3,7 @@ namespace Elasticsearch.Net { - public interface IConnectionPool + public interface IConnectionPool : IDisposable { /// /// Returns a readonly constant view of all the nodes in the cluster, this might involve creating copies of the nodes e.g diff --git a/src/Elasticsearch.Net/ConnectionPool/SingleNodeConnectionPool.cs b/src/Elasticsearch.Net/ConnectionPool/SingleNodeConnectionPool.cs index 623af08da44..034f38e8286 100644 --- a/src/Elasticsearch.Net/ConnectionPool/SingleNodeConnectionPool.cs +++ b/src/Elasticsearch.Net/ConnectionPool/SingleNodeConnectionPool.cs @@ -29,5 +29,9 @@ public SingleNodeConnectionPool(Uri uri, IDateTimeProvider dateTimeProvider = nu } public IEnumerable CreateView(Action audit = null) => this.Nodes; + + void IDisposable.Dispose() => this.DisposeManagedResources(); + + protected virtual void DisposeManagedResources() { } } } \ No newline at end of file diff --git a/src/Elasticsearch.Net/ConnectionPool/SniffingConnectionPool.cs b/src/Elasticsearch.Net/ConnectionPool/SniffingConnectionPool.cs index f866489e7a6..b714c46c1b9 100644 --- a/src/Elasticsearch.Net/ConnectionPool/SniffingConnectionPool.cs +++ b/src/Elasticsearch.Net/ConnectionPool/SniffingConnectionPool.cs @@ -73,5 +73,10 @@ public override IEnumerable CreateView(Action audit = nu } } + protected override void DisposeManagedResources() + { + this._readerWriter?.Dispose(); + base.DisposeManagedResources(); + } } } \ No newline at end of file diff --git a/src/Elasticsearch.Net/ConnectionPool/StaticConnectionPool.cs b/src/Elasticsearch.Net/ConnectionPool/StaticConnectionPool.cs index f6a6408d564..e8651edf10c 100644 --- a/src/Elasticsearch.Net/ConnectionPool/StaticConnectionPool.cs +++ b/src/Elasticsearch.Net/ConnectionPool/StaticConnectionPool.cs @@ -97,5 +97,8 @@ public virtual IEnumerable CreateView(Action audit = nul } } + void IDisposable.Dispose() => this.DisposeManagedResources(); + + protected virtual void DisposeManagedResources() { } } } \ No newline at end of file diff --git a/src/Elasticsearch.Net/Elasticsearch.Net.csproj b/src/Elasticsearch.Net/Elasticsearch.Net.csproj index 83d3542cee9..edab42a1888 100644 --- a/src/Elasticsearch.Net/Elasticsearch.Net.csproj +++ b/src/Elasticsearch.Net/Elasticsearch.Net.csproj @@ -23,6 +23,7 @@ 4 false false + AllRules.ruleset pdbonly @@ -54,6 +55,7 @@ + diff --git a/src/Elasticsearch.Net/ElasticsearchClient.cs b/src/Elasticsearch.Net/ElasticsearchClient.cs index d55bd50a3c5..0a2124e1ee1 100644 --- a/src/Elasticsearch.Net/ElasticsearchClient.cs +++ b/src/Elasticsearch.Net/ElasticsearchClient.cs @@ -16,9 +16,11 @@ public partial class ElasticsearchClient : IElasticsearchClient protected ITransport Transport { get; set; } /// Instantiate a new low level elasticsearch client to http://localhost:9200 + [System.Diagnostics.CodeAnalysis.SuppressMessage("Microsoft.Reliability", "CA2000:Dispose objects before losing scope")] public ElasticsearchClient() : this(new Transport(new ConnectionConfiguration())) { } /// Instantiate a new low level elasticsearch client using the specified settings + [System.Diagnostics.CodeAnalysis.SuppressMessage("Microsoft.Reliability", "CA2000:Dispose objects before losing scope")] public ElasticsearchClient(IConnectionConfigurationValues settings) : this(new Transport(settings ?? new ConnectionConfiguration())) { } /// diff --git a/src/Elasticsearch.Net/GlobalSuppressions.cs b/src/Elasticsearch.Net/GlobalSuppressions.cs new file mode 100644 index 00000000000..66d2f90eef2 Binary files /dev/null and b/src/Elasticsearch.Net/GlobalSuppressions.cs differ diff --git a/src/Elasticsearch.Net/Transport/ITransport.cs b/src/Elasticsearch.Net/Transport/ITransport.cs index d7cc8297cba..04449422305 100644 --- a/src/Elasticsearch.Net/Transport/ITransport.cs +++ b/src/Elasticsearch.Net/Transport/ITransport.cs @@ -1,8 +1,10 @@ +using System; using System.Threading.Tasks; namespace Elasticsearch.Net { - public interface ITransport where TConnectionSettings : IConnectionConfigurationValues + public interface ITransport + where TConnectionSettings : IConnectionConfigurationValues { TConnectionSettings Settings { get; } diff --git a/src/Elasticsearch.Net/Transport/Pipeline/RequestPipeline.cs b/src/Elasticsearch.Net/Transport/Pipeline/RequestPipeline.cs index efbbce64797..11cadcfd1fe 100644 --- a/src/Elasticsearch.Net/Transport/Pipeline/RequestPipeline.cs +++ b/src/Elasticsearch.Net/Transport/Pipeline/RequestPipeline.cs @@ -1,472 +1,471 @@ -using System; -using System.Collections.Generic; -using System.IO; -using System.Linq; -using System.Net; -using System.Threading; -using System.Threading.Tasks; -using static Elasticsearch.Net.AuditEvent; - -namespace Elasticsearch.Net -{ - - public class RequestPipeline : IRequestPipeline - { - private readonly IConnectionConfigurationValues _settings; - private readonly IConnection _connection; - private readonly IConnectionPool _connectionPool; - private readonly IDateTimeProvider _dateTimeProvider; - private readonly IMemoryStreamFactory _memoryStreamFactory; - private readonly CancellationToken _cancellationToken; - - private IRequestParameters RequestParameters { get; } - private IRequestConfiguration RequestConfiguration { get; } - - - public DateTime StartedOn { get; } - - public List AuditTrail { get; } = new List(); - private int _retried = 0; - public int Retried => _retried; - - public RequestPipeline( - IConnectionConfigurationValues configurationValues, - IDateTimeProvider dateTimeProvider, - IMemoryStreamFactory memoryStreamFactory, - IRequestParameters requestParameters) - { - this._settings = configurationValues; - this._connectionPool = this._settings.ConnectionPool; - this._connection = this._settings.Connection; - this._dateTimeProvider = dateTimeProvider; - this._memoryStreamFactory = memoryStreamFactory; - - this.RequestParameters = requestParameters; - this.RequestConfiguration = requestParameters?.RequestConfiguration; - this._cancellationToken = this.RequestConfiguration?.CancellationToken ?? CancellationToken.None; - this.StartedOn = dateTimeProvider.Now(); - } - - public int MaxRetries => - this.RequestConfiguration?.ForceNode != null - ? 0 - : Math.Min(this.RequestConfiguration?.MaxRetries ?? this._settings.MaxRetries.GetValueOrDefault(int.MaxValue), this._connectionPool.MaxRetries); - - public bool FirstPoolUsageNeedsSniffing => - (!this.RequestConfiguration?.DisableSniff).GetValueOrDefault(true) +using System; +using System.Collections.Generic; +using System.IO; +using System.Linq; +using System.Net; +using System.Threading; +using System.Threading.Tasks; +using static Elasticsearch.Net.AuditEvent; + +namespace Elasticsearch.Net +{ + + public class RequestPipeline : IRequestPipeline + { + private readonly IConnectionConfigurationValues _settings; + private readonly IConnection _connection; + private readonly IConnectionPool _connectionPool; + private readonly IDateTimeProvider _dateTimeProvider; + private readonly IMemoryStreamFactory _memoryStreamFactory; + private readonly CancellationToken _cancellationToken; + + private IRequestParameters RequestParameters { get; } + private IRequestConfiguration RequestConfiguration { get; } + + public DateTime StartedOn { get; } + + public List AuditTrail { get; } = new List(); + private int _retried = 0; + public int Retried => _retried; + + public RequestPipeline( + IConnectionConfigurationValues configurationValues, + IDateTimeProvider dateTimeProvider, + IMemoryStreamFactory memoryStreamFactory, + IRequestParameters requestParameters) + { + this._settings = configurationValues; + this._connectionPool = this._settings.ConnectionPool; + this._connection = this._settings.Connection; + this._dateTimeProvider = dateTimeProvider; + this._memoryStreamFactory = memoryStreamFactory; + + this.RequestParameters = requestParameters; + this.RequestConfiguration = requestParameters?.RequestConfiguration; + this._cancellationToken = this.RequestConfiguration?.CancellationToken ?? CancellationToken.None; + this.StartedOn = dateTimeProvider.Now(); + } + + public int MaxRetries => + this.RequestConfiguration?.ForceNode != null + ? 0 + : Math.Min(this.RequestConfiguration?.MaxRetries ?? this._settings.MaxRetries.GetValueOrDefault(int.MaxValue), this._connectionPool.MaxRetries); + + public bool FirstPoolUsageNeedsSniffing => + (!this.RequestConfiguration?.DisableSniff).GetValueOrDefault(true) && this._connectionPool.SupportsReseeding && this._settings.SniffsOnStartup && !this._connectionPool.SniffedOnStartup; - public bool SniffsOnConnectionFailure => - (!this.RequestConfiguration?.DisableSniff).GetValueOrDefault(true) - && this._connectionPool.SupportsReseeding && this._settings.SniffsOnConnectionFault; - - public bool SniffsOnStaleCluster => - (!this.RequestConfiguration?.DisableSniff).GetValueOrDefault(true) - && this._connectionPool.SupportsReseeding && this._settings.SniffInformationLifeSpan.HasValue; - - public bool StaleClusterState - { - get - { - if (!SniffsOnStaleCluster) return false; - // ReSharper disable once PossibleInvalidOperationException - // already checked by SniffsOnStaleCluster - var sniffLifeSpan = this._settings.SniffInformationLifeSpan.Value; - - var now = this._dateTimeProvider.Now(); - var lastSniff = this._connectionPool.LastUpdate; - - return sniffLifeSpan < (now - lastSniff); - } - } - - private bool PingDisabled(Node node) => - (this.RequestConfiguration?.DisablePing).GetValueOrDefault(false) - || this._settings.DisablePings || !this._connectionPool.SupportsPinging || !node.IsResurrected; - - TimeSpan PingTimeout => - this.RequestConfiguration?.PingTimeout - ?? this._settings.PingTimeout - ?? (this._connectionPool.UsingSsl ? ConnectionConfiguration.DefaultPingTimeoutOnSSL : ConnectionConfiguration.DefaultPingTimeout); - - TimeSpan RequestTimeout => this.RequestConfiguration?.RequestTimeout ?? this._settings.RequestTimeout; - - public bool IsTakingTooLong - { - get - { - var timeout = this._settings.MaxRetryTimeout.GetValueOrDefault(this.RequestTimeout); - var now = this._dateTimeProvider.Now(); - - //we apply a soft margin so that if a request timesout at 59 seconds when the maximum is 60 we also abort. - var margin = (timeout.TotalMilliseconds / 100.0) * 98; - var marginTimeSpan = TimeSpan.FromMilliseconds(margin); - var timespanCall = (now - this.StartedOn); - var tookToLong = timespanCall >= marginTimeSpan; - return tookToLong; - } - } - - public bool Refresh { get; private set; } - - public bool DepleededRetries => this.Retried >= this.MaxRetries + 1 || this.IsTakingTooLong; - - private Auditable Audit(AuditEvent type) => new Auditable(type, this.AuditTrail, this._dateTimeProvider); - - public void MarkDead(Node node) - { - var deadUntil = this._dateTimeProvider.DeadTime(node.FailedAttempts, this._settings.DeadTimeout, this._settings.MaxDeadTimeout); - node.MarkDead(deadUntil); - this._retried++; - } - - public void MarkAlive(Node node) => node.MarkAlive(); - - public void FirstPoolUsage(SemaphoreSlim semaphore) - { - if (!this.FirstPoolUsageNeedsSniffing) return; - if (!semaphore.Wait(this._settings.RequestTimeout)) - throw new PipelineException(PipelineFailure.CouldNotStartSniffOnStartup, (Exception)null); - if (!this.FirstPoolUsageNeedsSniffing) return; - try - { - using (this.Audit(SniffOnStartup)) - { - this.Sniff(); - this._connectionPool.SniffedOnStartup = true; - } - } - finally - { - semaphore.Release(); - } - } - - public async Task FirstPoolUsageAsync(SemaphoreSlim semaphore) - { - if (!this.FirstPoolUsageNeedsSniffing) return; - var success = await semaphore.WaitAsync(this._settings.RequestTimeout, this._cancellationToken); - if (!success) - throw new PipelineException(PipelineFailure.CouldNotStartSniffOnStartup, (Exception)null); - - if (!this.FirstPoolUsageNeedsSniffing) return; - try - { - using (this.Audit(SniffOnStartup)) - { - await this.SniffAsync(); - this._connectionPool.SniffedOnStartup = true; - } - } - finally - { - semaphore.Release(); - } - } - - public void SniffOnStaleCluster() - { - if (!StaleClusterState) return; - using (this.Audit(AuditEvent.SniffOnStaleCluster)) - { - this.Sniff(); - this._connectionPool.SniffedOnStartup = true; - } - } - - public async Task SniffOnStaleClusterAsync() - { - if (!StaleClusterState) return; - using (this.Audit(AuditEvent.SniffOnStaleCluster)) - { - await this.SniffAsync(); - this._connectionPool.SniffedOnStartup = true; - } - } - - public IEnumerable NextNode() - { - if (this.RequestConfiguration?.ForceNode != null) - { - yield return new Node(this.RequestConfiguration.ForceNode); - yield break; - } - - //This for loop allows to break out of the view state machine if we need to - //force a refresh (after reseeding connectionpool). We have a hardcoded limit of only - //allowing 100 of these refreshes per call - var refreshed = false; - for (var i = 0; i < 100; i++) - { - if (this.DepleededRetries) yield break; - foreach (var node in this._connectionPool - .CreateView((e, n)=> { using (new Auditable(e, this.AuditTrail, this._dateTimeProvider) { Node = n }) {} }) - .TakeWhile(node => !this.DepleededRetries)) - { - yield return node; - if (!this.Refresh) continue; - this.Refresh = false; - refreshed = true; - break; - } - //unless a refresh was requested we will not iterate over more then a single view. - //keep in mind refreshes are also still bound to overall maxretry count/timeout. - if (!refreshed) break; - } - } - - private RequestData CreatePingRequestData(Node node, Auditable audit) - { - audit.Node = node; - - var requestOverrides = new RequestConfiguration - { - PingTimeout = this.PingTimeout, - RequestTimeout = this.RequestTimeout, - BasicAuthenticationCredentials = this._settings.BasicAuthenticationCredentials, - EnableHttpPipelining = this.RequestConfiguration?.EnableHttpPipelining ?? this._settings.HttpPipeliningEnabled, - ForceNode = this.RequestConfiguration?.ForceNode, - CancellationToken = this.RequestConfiguration?.CancellationToken ?? default(CancellationToken) - }; - - return new RequestData(HttpMethod.HEAD, "/", null, this._settings, requestOverrides, this._memoryStreamFactory) { Node = node }; - } - - public void Ping(Node node) - { - if (PingDisabled(node)) return; - - using (var audit = this.Audit(PingSuccess)) - { - try - { - var pingData = CreatePingRequestData(node, audit); - var response = this._connection.Request(pingData); + public bool SniffsOnConnectionFailure => + (!this.RequestConfiguration?.DisableSniff).GetValueOrDefault(true) + && this._connectionPool.SupportsReseeding && this._settings.SniffsOnConnectionFault; + + public bool SniffsOnStaleCluster => + (!this.RequestConfiguration?.DisableSniff).GetValueOrDefault(true) + && this._connectionPool.SupportsReseeding && this._settings.SniffInformationLifeSpan.HasValue; + + public bool StaleClusterState + { + get + { + if (!SniffsOnStaleCluster) return false; + // ReSharper disable once PossibleInvalidOperationException + // already checked by SniffsOnStaleCluster + var sniffLifeSpan = this._settings.SniffInformationLifeSpan.Value; + + var now = this._dateTimeProvider.Now(); + var lastSniff = this._connectionPool.LastUpdate; + + return sniffLifeSpan < (now - lastSniff); + } + } + + private bool PingDisabled(Node node) => + (this.RequestConfiguration?.DisablePing).GetValueOrDefault(false) + || this._settings.DisablePings || !this._connectionPool.SupportsPinging || !node.IsResurrected; + + TimeSpan PingTimeout => + this.RequestConfiguration?.PingTimeout + ?? this._settings.PingTimeout + ?? (this._connectionPool.UsingSsl ? ConnectionConfiguration.DefaultPingTimeoutOnSSL : ConnectionConfiguration.DefaultPingTimeout); + + TimeSpan RequestTimeout => this.RequestConfiguration?.RequestTimeout ?? this._settings.RequestTimeout; + + public bool IsTakingTooLong + { + get + { + var timeout = this._settings.MaxRetryTimeout.GetValueOrDefault(this.RequestTimeout); + var now = this._dateTimeProvider.Now(); + + //we apply a soft margin so that if a request timesout at 59 seconds when the maximum is 60 we also abort. + var margin = (timeout.TotalMilliseconds / 100.0) * 98; + var marginTimeSpan = TimeSpan.FromMilliseconds(margin); + var timespanCall = (now - this.StartedOn); + var tookToLong = timespanCall >= marginTimeSpan; + return tookToLong; + } + } + + public bool Refresh { get; private set; } + + public bool DepleededRetries => this.Retried >= this.MaxRetries + 1 || this.IsTakingTooLong; + + private Auditable Audit(AuditEvent type) => new Auditable(type, this.AuditTrail, this._dateTimeProvider); + + public void MarkDead(Node node) + { + var deadUntil = this._dateTimeProvider.DeadTime(node.FailedAttempts, this._settings.DeadTimeout, this._settings.MaxDeadTimeout); + node.MarkDead(deadUntil); + this._retried++; + } + + public void MarkAlive(Node node) => node.MarkAlive(); + + public void FirstPoolUsage(SemaphoreSlim semaphore) + { + if (!this.FirstPoolUsageNeedsSniffing) return; + if (!semaphore.Wait(this._settings.RequestTimeout)) + throw new PipelineException(PipelineFailure.CouldNotStartSniffOnStartup, (Exception)null); + if (!this.FirstPoolUsageNeedsSniffing) return; + try + { + using (this.Audit(SniffOnStartup)) + { + this.Sniff(); + this._connectionPool.SniffedOnStartup = true; + } + } + finally + { + semaphore.Release(); + } + } + + public async Task FirstPoolUsageAsync(SemaphoreSlim semaphore) + { + if (!this.FirstPoolUsageNeedsSniffing) return; + var success = await semaphore.WaitAsync(this._settings.RequestTimeout, this._cancellationToken); + if (!success) + throw new PipelineException(PipelineFailure.CouldNotStartSniffOnStartup, (Exception)null); + + if (!this.FirstPoolUsageNeedsSniffing) return; + try + { + using (this.Audit(SniffOnStartup)) + { + await this.SniffAsync(); + this._connectionPool.SniffedOnStartup = true; + } + } + finally + { + semaphore.Release(); + } + } + + public void SniffOnStaleCluster() + { + if (!StaleClusterState) return; + using (this.Audit(AuditEvent.SniffOnStaleCluster)) + { + this.Sniff(); + this._connectionPool.SniffedOnStartup = true; + } + } + + public async Task SniffOnStaleClusterAsync() + { + if (!StaleClusterState) return; + using (this.Audit(AuditEvent.SniffOnStaleCluster)) + { + await this.SniffAsync(); + this._connectionPool.SniffedOnStartup = true; + } + } + + public IEnumerable NextNode() + { + if (this.RequestConfiguration?.ForceNode != null) + { + yield return new Node(this.RequestConfiguration.ForceNode); + yield break; + } + + //This for loop allows to break out of the view state machine if we need to + //force a refresh (after reseeding connectionpool). We have a hardcoded limit of only + //allowing 100 of these refreshes per call + var refreshed = false; + for (var i = 0; i < 100; i++) + { + if (this.DepleededRetries) yield break; + foreach (var node in this._connectionPool + .CreateView((e, n)=> { using (new Auditable(e, this.AuditTrail, this._dateTimeProvider) { Node = n }) {} }) + .TakeWhile(node => !this.DepleededRetries)) + { + yield return node; + if (!this.Refresh) continue; + this.Refresh = false; + refreshed = true; + break; + } + //unless a refresh was requested we will not iterate over more then a single view. + //keep in mind refreshes are also still bound to overall maxretry count/timeout. + if (!refreshed) break; + } + } + + private RequestData CreatePingRequestData(Node node, Auditable audit) + { + audit.Node = node; + + var requestOverrides = new RequestConfiguration + { + PingTimeout = this.PingTimeout, + RequestTimeout = this.RequestTimeout, + BasicAuthenticationCredentials = this._settings.BasicAuthenticationCredentials, + EnableHttpPipelining = this.RequestConfiguration?.EnableHttpPipelining ?? this._settings.HttpPipeliningEnabled, + ForceNode = this.RequestConfiguration?.ForceNode, + CancellationToken = this.RequestConfiguration?.CancellationToken ?? default(CancellationToken) + }; + + return new RequestData(HttpMethod.HEAD, "/", null, this._settings, requestOverrides, this._memoryStreamFactory) { Node = node }; + } + + public void Ping(Node node) + { + if (PingDisabled(node)) return; + + using (var audit = this.Audit(PingSuccess)) + { + try + { + var pingData = CreatePingRequestData(node, audit); + var response = this._connection.Request(pingData); ThrowBadAuthPipelineExceptionWhenNeeded(response); //ping should not silently accept bad but valid http responses - if (!response.Success) throw new PipelineException(PipelineFailure.BadResponse); - } - catch (Exception e) + if (!response.Success) throw new PipelineException(PipelineFailure.BadResponse); + } + catch (Exception e) { audit.Event = PingFailure; audit.Exception = e; throw new PipelineException(PipelineFailure.PingFailure, e); - } - } - } - - public async Task PingAsync(Node node) - { - if (PingDisabled(node)) return; - - using (var audit = this.Audit(PingSuccess)) - { - try - { - var pingData = CreatePingRequestData(node, audit); - var response = await this._connection.RequestAsync(pingData); + } + } + } + + public async Task PingAsync(Node node) + { + if (PingDisabled(node)) return; + + using (var audit = this.Audit(PingSuccess)) + { + try + { + var pingData = CreatePingRequestData(node, audit); + var response = await this._connection.RequestAsync(pingData); ThrowBadAuthPipelineExceptionWhenNeeded(response); //ping should not silently accept bad but valid http responses - if (!response.Success) throw new PipelineException(PipelineFailure.BadResponse); - } - catch (Exception e) + if (!response.Success) throw new PipelineException(PipelineFailure.BadResponse); + } + catch (Exception e) { audit.Event = PingFailure; audit.Exception = e; throw new PipelineException(PipelineFailure.PingFailure, e); - } - } - } - - private void ThrowBadAuthPipelineExceptionWhenNeeded(ElasticsearchResponse response) - { - if (response.HttpStatusCode == 401) - throw new PipelineException(PipelineFailure.BadAuthentication, response.OriginalException); - } - - public string SniffPath => "_nodes/_all/settings?flat_settings&timeout=" + this.PingTimeout.ToTimeUnit(); - - public IEnumerable SniffNodes => this._connectionPool + } + } + } + + private void ThrowBadAuthPipelineExceptionWhenNeeded(ElasticsearchResponse response) + { + if (response.HttpStatusCode == 401) + throw new PipelineException(PipelineFailure.BadAuthentication, response.OriginalException); + } + + public string SniffPath => "_nodes/_all/settings?flat_settings&timeout=" + this.PingTimeout.ToTimeUnit(); + + public IEnumerable SniffNodes => this._connectionPool .CreateView((e, n)=> { using (new Auditable(e, this.AuditTrail, this._dateTimeProvider) { Node = n }) {} }) - .ToList() - .OrderBy(n => n.MasterEligable ? n.Uri.Port : int.MaxValue); - + .ToList() + .OrderBy(n => n.MasterEligable ? n.Uri.Port : int.MaxValue); + public void SniffOnConnectionFailure() { if (!this.SniffsOnConnectionFailure) return; using (this.Audit(SniffOnFail)) this.Sniff(); - } - - public async Task SniffOnConnectionFailureAsync() - { - if (!this.SniffsOnConnectionFailure) return; - using (this.Audit(SniffOnFail)) - await this.SniffAsync(); - } - - public void Sniff() - { - var path = this.SniffPath; - var exceptions = new List(); - foreach (var node in this.SniffNodes) - { - using (var audit = this.Audit(SniffSuccess)) - { - audit.Node = node; - try - { - var requestData = new RequestData(HttpMethod.GET, path, null, this._settings, this._memoryStreamFactory) { Node = node }; - var response = this._connection.Request(requestData); - ThrowBadAuthPipelineExceptionWhenNeeded(response); + } + + public async Task SniffOnConnectionFailureAsync() + { + if (!this.SniffsOnConnectionFailure) return; + using (this.Audit(SniffOnFail)) + await this.SniffAsync(); + } + + public void Sniff() + { + var path = this.SniffPath; + var exceptions = new List(); + foreach (var node in this.SniffNodes) + { + using (var audit = this.Audit(SniffSuccess)) + { + audit.Node = node; + try + { + var requestData = new RequestData(HttpMethod.GET, path, null, this._settings, this._memoryStreamFactory) { Node = node }; + var response = this._connection.Request(requestData); + ThrowBadAuthPipelineExceptionWhenNeeded(response); //sniff should not silently accept bad but valid http responses if (!response.Success) throw new PipelineException(PipelineFailure.BadResponse); - var nodes = response.Body.ToNodes(this._connectionPool.UsingSsl); - this._connectionPool.Reseed(nodes); - this.Refresh = true; - return; - } - catch (Exception e) - { + var nodes = response.Body.ToNodes(this._connectionPool.UsingSsl); + this._connectionPool.Reseed(nodes); + this.Refresh = true; + return; + } + catch (Exception e) + { audit.Event = SniffFailure; - audit.Exception = e; - exceptions.Add(e); - continue; - } - } - } - throw new PipelineException(PipelineFailure.SniffFailure, new AggregateException(exceptions)); - } - - public async Task SniffAsync() - { - var path = this.SniffPath; - var exceptions = new List(); - foreach (var node in this.SniffNodes) - { - using (var audit = this.Audit(SniffSuccess)) - { - audit.Node = node; - try - { - var requestData = new RequestData(HttpMethod.GET, path, null, this._settings, this._memoryStreamFactory) { Node = node }; - var response = await this._connection.RequestAsync(requestData); + audit.Exception = e; + exceptions.Add(e); + continue; + } + } + } + throw new PipelineException(PipelineFailure.SniffFailure, new AggregateException(exceptions)); + } + + public async Task SniffAsync() + { + var path = this.SniffPath; + var exceptions = new List(); + foreach (var node in this.SniffNodes) + { + using (var audit = this.Audit(SniffSuccess)) + { + audit.Node = node; + try + { + var requestData = new RequestData(HttpMethod.GET, path, null, this._settings, this._memoryStreamFactory) { Node = node }; + var response = await this._connection.RequestAsync(requestData); ThrowBadAuthPipelineExceptionWhenNeeded(response); //sniff should not silently accept bad but valid http responses - if (!response.Success) throw new PipelineException(PipelineFailure.BadResponse); - this._connectionPool.Reseed(response.Body.ToNodes(this._connectionPool.UsingSsl)); - this.Refresh = true; - return; - } - catch (Exception e) - { + if (!response.Success) throw new PipelineException(PipelineFailure.BadResponse); + this._connectionPool.Reseed(response.Body.ToNodes(this._connectionPool.UsingSsl)); + this.Refresh = true; + return; + } + catch (Exception e) + { audit.Event = SniffFailure; - audit.Exception = e; - exceptions.Add(e); - continue; - } - } - } - throw new PipelineException(PipelineFailure.SniffFailure, new AggregateException(exceptions)); - } - - public ElasticsearchResponse CallElasticsearch(RequestData requestData) where TReturn : class - { - using (var audit = this.Audit(HealthyResponse)) - { - audit.Node = requestData.Node; - audit.Path = requestData.Path; - - ElasticsearchResponse response = null; - try - { - response = this._connection.Request(requestData); - response.AuditTrail = this.AuditTrail; - ThrowBadAuthPipelineExceptionWhenNeeded(response); - if (!response.Success) audit.Event = AuditEvent.BadResponse; - return response; - } - catch (Exception e) - { - (response as ElasticsearchResponse)?.Body?.Dispose(); + audit.Exception = e; + exceptions.Add(e); + continue; + } + } + } + throw new PipelineException(PipelineFailure.SniffFailure, new AggregateException(exceptions)); + } + + public ElasticsearchResponse CallElasticsearch(RequestData requestData) where TReturn : class + { + using (var audit = this.Audit(HealthyResponse)) + { + audit.Node = requestData.Node; + audit.Path = requestData.Path; + + ElasticsearchResponse response = null; + try + { + response = this._connection.Request(requestData); + response.AuditTrail = this.AuditTrail; + ThrowBadAuthPipelineExceptionWhenNeeded(response); + if (!response.Success) audit.Event = AuditEvent.BadResponse; + return response; + } + catch (Exception e) + { + (response as ElasticsearchResponse)?.Body?.Dispose(); audit.Event = AuditEvent.BadResponse; - audit.Exception = e; + audit.Exception = e; e.RethrowKeepingStackTrace(); - return null; //dead code due to call to RethrowKeepingStackTrace() - } - } - } - - public async Task> CallElasticsearchAsync(RequestData requestData) where TReturn : class - { - using (var audit = this.Audit(HealthyResponse)) - { - audit.Node = requestData.Node; - audit.Path = requestData.Path; - - ElasticsearchResponse response = null; - try - { - response = await this._connection.RequestAsync(requestData); - response.AuditTrail = this.AuditTrail; + return null; //dead code due to call to RethrowKeepingStackTrace() + } + } + } + + public async Task> CallElasticsearchAsync(RequestData requestData) where TReturn : class + { + using (var audit = this.Audit(HealthyResponse)) + { + audit.Node = requestData.Node; + audit.Path = requestData.Path; + + ElasticsearchResponse response = null; + try + { + response = await this._connection.RequestAsync(requestData); + response.AuditTrail = this.AuditTrail; ThrowBadAuthPipelineExceptionWhenNeeded(response); - if (!response.Success) audit.Event = AuditEvent.BadResponse; - return response; - } - catch (Exception e) - { - (response as ElasticsearchResponse)?.Body?.Dispose(); + if (!response.Success) audit.Event = AuditEvent.BadResponse; + return response; + } + catch (Exception e) + { + (response as ElasticsearchResponse)?.Body?.Dispose(); audit.Event = AuditEvent.BadResponse; audit.Exception = e; e.RethrowKeepingStackTrace(); - return null; //dead code due to call to RethrowKeepingStackTrace() - } - } - } - - public void BadResponse(ref ElasticsearchResponse response, RequestData data, List pipelineExceptions) - where TReturn : class - { - var pipelineFailure = PipelineFailure.BadResponse; - if (pipelineExceptions.HasAny()) - pipelineFailure = pipelineExceptions.Last().FailureReason; - - var innerException = pipelineExceptions.HasAny() - ? new AggregateException(pipelineExceptions) - : response?.OriginalException; - - var exceptionMessage = innerException?.Message ?? "Could not complete the request to Elasticsearch."; - - if (this.IsTakingTooLong) - { - pipelineFailure = PipelineFailure.MaxTimeoutReached; - this.Audit(MaxTimeoutReached); - exceptionMessage = "Maximum timout reached while retrying request"; - } - else if (this.Retried >= this.MaxRetries && this.MaxRetries > 0) + return null; //dead code due to call to RethrowKeepingStackTrace() + } + } + } + + public void BadResponse(ref ElasticsearchResponse response, RequestData data, List pipelineExceptions) + where TReturn : class + { + var pipelineFailure = PipelineFailure.BadResponse; + if (pipelineExceptions.HasAny()) + pipelineFailure = pipelineExceptions.Last().FailureReason; + + var innerException = pipelineExceptions.HasAny() + ? new AggregateException(pipelineExceptions) + : response?.OriginalException; + + var exceptionMessage = innerException?.Message ?? "Could not complete the request to Elasticsearch."; + + if (this.IsTakingTooLong) + { + pipelineFailure = PipelineFailure.MaxTimeoutReached; + this.Audit(MaxTimeoutReached); + exceptionMessage = "Maximum timout reached while retrying request"; + } + else if (this.Retried >= this.MaxRetries && this.MaxRetries > 0) { - pipelineFailure = PipelineFailure.MaxRetriesReached; - this.Audit(MaxRetriesReached); - exceptionMessage = "Maximum number of retries reached."; - } - - var clientException = new ElasticsearchClientException(pipelineFailure, exceptionMessage, innerException) - { - Request = data, - Response = response, - AuditTrail = this.AuditTrail - }; - - if (_settings.ThrowExceptions) throw clientException; - - if (response == null) - response = new ResponseBuilder(data) { Exception = clientException }.ToResponse(); - - response.AuditTrail = this.AuditTrail; - } - - public void Dispose() - { - } - } + pipelineFailure = PipelineFailure.MaxRetriesReached; + this.Audit(MaxRetriesReached); + exceptionMessage = "Maximum number of retries reached."; + } + + var clientException = new ElasticsearchClientException(pipelineFailure, exceptionMessage, innerException) + { + Request = data, + Response = response, + AuditTrail = this.AuditTrail + }; + + if (_settings.ThrowExceptions) throw clientException; + + if (response == null) + response = new ResponseBuilder(data) { Exception = clientException }.ToResponse(); + + response.AuditTrail = this.AuditTrail; + } + + void IDisposable.Dispose() => this.Dispose(); + + protected virtual void Dispose() { } + } } \ No newline at end of file diff --git a/src/Elasticsearch.Net/Transport/Transport.cs b/src/Elasticsearch.Net/Transport/Transport.cs index 742f3c0e548..f7e4e7276df 100644 --- a/src/Elasticsearch.Net/Transport/Transport.cs +++ b/src/Elasticsearch.Net/Transport/Transport.cs @@ -1,90 +1,87 @@ -using System.Collections.Generic; +using System.Collections.Generic; using System.Threading.Tasks; -using System.Threading; +using System.Threading; using System; -namespace Elasticsearch.Net -{ - public class Transport : ITransport - where TConnectionSettings : IConnectionConfigurationValues - { - private readonly SemaphoreSlim _semaphore; - - //TODO should all of these be public? - public TConnectionSettings Settings { get; } - public IDateTimeProvider DateTimeProvider { get; } - public IMemoryStreamFactory MemoryStreamFactory { get; } - public IRequestPipelineFactory PipelineProvider { get; } - - /// - /// Transport coordinates the client requests over the connection pool nodes and is in charge of falling over on different nodes - /// - /// The connectionsettings to use for this transport - public Transport(TConnectionSettings configurationValues) - : this(configurationValues, null, null, null) - { } - - /// - /// Transport coordinates the client requests over the connection pool nodes and is in charge of falling over on different nodes - /// - /// The connectionsettings to use for this transport - /// In charge of create a new pipeline, safe to pass null to use the default - /// The date time proved to use, safe to pass null to use the default - /// The memory stream provider to use, safe to pass null to use the default - public Transport( - TConnectionSettings configurationValues, - IRequestPipelineFactory pipelineProvider, - IDateTimeProvider dateTimeProvider, - IMemoryStreamFactory memoryStreamFactory - ) - { - configurationValues.ThrowIfNull(nameof(configurationValues)); - configurationValues.ConnectionPool.ThrowIfNull(nameof(configurationValues.ConnectionPool)); - configurationValues.Connection.ThrowIfNull(nameof(configurationValues.Connection)); - configurationValues.Serializer.ThrowIfNull(nameof(configurationValues.Serializer)); - - this.Settings = configurationValues; - this.PipelineProvider = pipelineProvider ?? new RequestPipelineFactory(); - this.DateTimeProvider = dateTimeProvider ?? Net.DateTimeProvider.Default; - this.MemoryStreamFactory = memoryStreamFactory ?? new MemoryStreamFactory(); - this._semaphore = new SemaphoreSlim(1, 1); - } - - public ElasticsearchResponse Request(HttpMethod method, string path, PostData data = null, IRequestParameters requestParameters = null) - where TReturn : class - { - using (var pipeline = this.PipelineProvider.Create(this.Settings, this.DateTimeProvider, this.MemoryStreamFactory, requestParameters)) - { - pipeline.FirstPoolUsage(this._semaphore); - - var requestData = new RequestData(method, path, data, this.Settings, requestParameters, this.MemoryStreamFactory); - ElasticsearchResponse response = null; - +namespace Elasticsearch.Net +{ + public class Transport : ITransport + where TConnectionSettings : IConnectionConfigurationValues + { + //TODO should all of these be public? + public TConnectionSettings Settings { get; } + public IDateTimeProvider DateTimeProvider { get; } + public IMemoryStreamFactory MemoryStreamFactory { get; } + public IRequestPipelineFactory PipelineProvider { get; } + + /// + /// Transport coordinates the client requests over the connection pool nodes and is in charge of falling over on different nodes + /// + /// The connectionsettings to use for this transport + public Transport(TConnectionSettings configurationValues) + : this(configurationValues, null, null, null) + { } + + /// + /// Transport coordinates the client requests over the connection pool nodes and is in charge of falling over on different nodes + /// + /// The connectionsettings to use for this transport + /// In charge of create a new pipeline, safe to pass null to use the default + /// The date time proved to use, safe to pass null to use the default + /// The memory stream provider to use, safe to pass null to use the default + public Transport( + TConnectionSettings configurationValues, + IRequestPipelineFactory pipelineProvider, + IDateTimeProvider dateTimeProvider, + IMemoryStreamFactory memoryStreamFactory + ) + { + configurationValues.ThrowIfNull(nameof(configurationValues)); + configurationValues.ConnectionPool.ThrowIfNull(nameof(configurationValues.ConnectionPool)); + configurationValues.Connection.ThrowIfNull(nameof(configurationValues.Connection)); + configurationValues.Serializer.ThrowIfNull(nameof(configurationValues.Serializer)); + + this.Settings = configurationValues; + this.PipelineProvider = pipelineProvider ?? new RequestPipelineFactory(); + this.DateTimeProvider = dateTimeProvider ?? Net.DateTimeProvider.Default; + this.MemoryStreamFactory = memoryStreamFactory ?? new MemoryStreamFactory(); + } + + public ElasticsearchResponse Request(HttpMethod method, string path, PostData data = null, IRequestParameters requestParameters = null) + where TReturn : class + { + using (var pipeline = this.PipelineProvider.Create(this.Settings, this.DateTimeProvider, this.MemoryStreamFactory, requestParameters)) + { + pipeline.FirstPoolUsage(this.Settings.BootstrapLock); + + var requestData = new RequestData(method, path, data, this.Settings, requestParameters, this.MemoryStreamFactory); + ElasticsearchResponse response = null; + var seenExceptions = new List(); - foreach (var node in pipeline.NextNode()) + foreach (var node in pipeline.NextNode()) { - requestData.Node = node; - try - { - pipeline.SniffOnStaleCluster(); - Ping(pipeline, node, seenExceptions); - response = pipeline.CallElasticsearch(requestData); + requestData.Node = node; + try + { + pipeline.SniffOnStaleCluster(); + Ping(pipeline, node, seenExceptions); + response = pipeline.CallElasticsearch(requestData); if (!response.SuccessOrKnownError) - { - pipeline.MarkDead(node); + { + pipeline.MarkDead(node); pipeline.SniffOnConnectionFailure(); - } - } + } + } catch (PipelineException pipelineException) when (!pipelineException.Recoverable) - { - pipeline.MarkDead(node); + { + pipeline.MarkDead(node); seenExceptions.Add(pipelineException); break; - } + } catch (PipelineException pipelineException) - { + { pipeline.MarkDead(node); - seenExceptions.Add(pipelineException); + seenExceptions.Add(pipelineException); } catch (Exception killerException) { @@ -95,55 +92,55 @@ public ElasticsearchResponse Request(HttpMethod method, string AuditTrail = pipeline?.AuditTrail }; } - if (response != null && response.SuccessOrKnownError) - { - pipeline.MarkAlive(node); + if (response != null && response.SuccessOrKnownError) + { + pipeline.MarkAlive(node); break; - } - } + } + } if (response == null || !response.Success) pipeline.BadResponse(ref response, requestData, seenExceptions); - return response; - } - } - - public async Task> RequestAsync(HttpMethod method, string path, PostData data = null, IRequestParameters requestParameters = null) - where TReturn : class - { - using (var pipeline = this.PipelineProvider.Create(this.Settings, this.DateTimeProvider, this.MemoryStreamFactory, requestParameters)) - { - await pipeline.FirstPoolUsageAsync(this._semaphore); - - var requestData = new RequestData(method, path, data, this.Settings, requestParameters, this.MemoryStreamFactory); - ElasticsearchResponse response = null; - + return response; + } + } + + public async Task> RequestAsync(HttpMethod method, string path, PostData data = null, IRequestParameters requestParameters = null) + where TReturn : class + { + using (var pipeline = this.PipelineProvider.Create(this.Settings, this.DateTimeProvider, this.MemoryStreamFactory, requestParameters)) + { + await pipeline.FirstPoolUsageAsync(this.Settings.BootstrapLock); + + var requestData = new RequestData(method, path, data, this.Settings, requestParameters, this.MemoryStreamFactory); + ElasticsearchResponse response = null; + var seenExceptions = new List(); - foreach (var node in pipeline.NextNode()) - { - requestData.Node = node; - try - { - await pipeline.SniffOnStaleClusterAsync(); + foreach (var node in pipeline.NextNode()) + { + requestData.Node = node; + try + { + await pipeline.SniffOnStaleClusterAsync(); await PingAsync(pipeline, node, seenExceptions); - response = await pipeline.CallElasticsearchAsync(requestData); + response = await pipeline.CallElasticsearchAsync(requestData); if (!response.SuccessOrKnownError) { pipeline.MarkDead(node); await pipeline.SniffOnConnectionFailureAsync(); - } - } + } + } catch (PipelineException pipelineException) when (!pipelineException.Recoverable) - { - pipeline.MarkDead(node); - seenExceptions.Add(pipelineException); + { + pipeline.MarkDead(node); + seenExceptions.Add(pipelineException); break; - } + } catch (PipelineException pipelineException) - { - pipeline.MarkDead(node); - seenExceptions.Add(pipelineException); - } - catch (Exception killerException) + { + pipeline.MarkDead(node); + seenExceptions.Add(pipelineException); + } + catch (Exception killerException) { throw new UnexpectedElasticsearchClientException(killerException, seenExceptions) { @@ -151,18 +148,18 @@ public async Task> RequestAsync(HttpMeth Response = response, AuditTrail = pipeline.AuditTrail }; - } - if (response != null && response.SuccessOrKnownError) - { - pipeline.MarkAlive(node); + } + if (response != null && response.SuccessOrKnownError) + { + pipeline.MarkAlive(node); break; - } - } + } + } if (response == null || !response.Success) pipeline.BadResponse(ref response, requestData, seenExceptions); - return response; - } - } + return response; + } + } private static void Ping(IRequestPipeline pipeline, Node node, List seenExceptions) { @@ -189,6 +186,6 @@ private static async Task PingAsync(IRequestPipeline pipeline, Node node, List

() where T : class - { - return this.IndexName(typeof(T)); - } - - public string IndexName(Type type) - { - return this.IndexNameResolver.GetIndexForType(type); - } - - public string IndexName(IndexName index) - { - if (index == null) - return null; - return index.Resolve(this._connectionSettings); - } - - public string IndexNames(params IndexName[] indices) - { - if (indices == null) return null; - return string.Join(",", indices.Select(i => this.IndexNameResolver.GetIndexForType(i))); - } - - public string IndexNames(IEnumerable indices) - { - return !indices.HasAny() ? null : this.IndexNames(indices.ToArray()); - } - - public string Id(T obj) where T : class - { - if (obj == null) return null; - - return this.IdResolver.GetIdFor(obj); - } - - public string Id(Type objType, object obj) - { - if (obj == null) return null; - - return this.IdResolver.GetIdFor(objType, obj); - } - public string TypeName() where T : class - { - return this.TypeName(typeof(T)); - } - public string TypeName(Type t) - { - return t == null ? null : this.TypeNameResolver.GetTypeNameFor(t); - } - - public string TypeNames(params TypeName[] typeNames) - { - return typeNames == null - ? null - : string.Join(",", typeNames.Select(t => this.TypeNameResolver.GetTypeNameFor(t))); - } - - public string TypeNames(IEnumerable typeNames) - { - return !typeNames.HasAny() ? null : this.TypeNames(typeNames.ToArray()); - } - - public string TypeName(TypeName type) - { - return type == null ? null : this.TypeNameResolver.GetTypeNameFor(type); - } - } -} +using System; +using System.Collections.Generic; +using System.Globalization; +using System.Linq; + +namespace Nest +{ + public class ElasticInferrer + { + private readonly IConnectionSettingsValues _connectionSettings; + + private IdResolver IdResolver { get; set; } + private IndexNameResolver IndexNameResolver { get; set; } + private TypeNameResolver TypeNameResolver { get; set; } + private FieldResolver FieldResolver { get; set; } + + public string DefaultIndex + { + get + { + var index = (this._connectionSettings == null) ? string.Empty : this._connectionSettings.DefaultIndex; + return index.IsNullOrEmpty() ? "_all" : index; + } + } + + public ElasticInferrer(IConnectionSettingsValues connectionSettings) + { + this._connectionSettings = connectionSettings; + this.IdResolver = new IdResolver(this._connectionSettings); + this.IndexNameResolver = new IndexNameResolver(this._connectionSettings); + this.TypeNameResolver = new TypeNameResolver(this._connectionSettings); + this.FieldResolver = new FieldResolver(this._connectionSettings); + } + + public string Field(Field field) + { + if (field.IsConditionless()) + return null; + + var name = !field.Name.IsNullOrEmpty() + ? field.Name + : field.Expression != null + ? this.FieldResolver.Resolve(field.Expression) + : field.Property != null + ? this.FieldResolver.Resolve(field.Property) + : null; + + if (name == null) + throw new ArgumentException("Could not resolve a field name"); + + if (field != null && field.Boost.HasValue) + name += "^" + field.Boost.Value.ToString(CultureInfo.InvariantCulture); + + return name; + } + + public string PropertyName(PropertyName property) + { + if (property.IsConditionless()) + return null; + + var name = !property.Name.IsNullOrEmpty() + ? property.Name + : property.Expression != null + ? this.FieldResolver.Resolve(property.Expression) + : property.Property != null + ? this.FieldResolver.Resolve(property.Property) + : null; + + if (name == null) + throw new ArgumentException("Could not resolve a property name"); + + return name; + } + + public string IndexName() where T : class + { + return this.IndexName(typeof(T)); + } + + public string IndexName(Type type) + { + return this.IndexNameResolver.GetIndexForType(type); + } + + public string IndexName(IndexName index) + { + if (index == null) + return null; + return index.Resolve(this._connectionSettings); + } + + public string IndexNames(params IndexName[] indices) + { + if (indices == null) return null; + return string.Join(",", indices.Select(i => this.IndexNameResolver.GetIndexForType(i))); + } + + public string IndexNames(IEnumerable indices) + { + return !indices.HasAny() ? null : this.IndexNames(indices.ToArray()); + } + + public string Id(T obj) where T : class + { + if (obj == null) return null; + + return this.IdResolver.GetIdFor(obj); + } + + public string Id(Type objType, object obj) + { + if (obj == null) return null; + + return this.IdResolver.GetIdFor(objType, obj); + } + + public string TypeName() where T : class + { + return this.TypeName(typeof(T)); + } + public string TypeName(Type t) + { + return t == null ? null : this.TypeNameResolver.GetTypeNameFor(t); + } + + public string TypeNames(params TypeName[] typeNames) + { + return typeNames == null + ? null + : string.Join(",", typeNames.Select(t => this.TypeNameResolver.GetTypeNameFor(t))); + } + + public string TypeNames(IEnumerable typeNames) + { + return !typeNames.HasAny() ? null : this.TypeNames(typeNames.ToArray()); + } + + public string TypeName(TypeName type) + { + return type == null ? null : this.TypeNameResolver.GetTypeNameFor(type); + } + } +} diff --git a/src/Nest/CommonAbstractions/Infer/IndexName/IndexNameResolver.cs b/src/Nest/CommonAbstractions/Infer/IndexName/IndexNameResolver.cs index 7ac8f0da131..13a96e13707 100644 --- a/src/Nest/CommonAbstractions/Infer/IndexName/IndexNameResolver.cs +++ b/src/Nest/CommonAbstractions/Infer/IndexName/IndexNameResolver.cs @@ -12,11 +12,6 @@ public IndexNameResolver(IConnectionSettingsValues connectionSettings) this._connectionSettings = connectionSettings; } - public string GetIndexForType() - { - return this.GetIndexForType(typeof(T)); - } - public string GetIndexForType(Type type) { var defaultIndices = this._connectionSettings.DefaultIndices; diff --git a/src/Nest/ElasticClient.cs b/src/Nest/ElasticClient.cs index 2a3b7e2ce08..b6a9593a7d7 100644 --- a/src/Nest/ElasticClient.cs +++ b/src/Nest/ElasticClient.cs @@ -97,5 +97,6 @@ private TRequest ForceConfiguration(TRequest request, Action< request.RequestParameters.RequestConfiguration = configuration; return request; } + } } diff --git a/src/Nest/IElasticClient.cs b/src/Nest/IElasticClient.cs index 66e5a626b40..8cd69dc814d 100644 --- a/src/Nest/IElasticClient.cs +++ b/src/Nest/IElasticClient.cs @@ -1,13 +1,13 @@ -using Elasticsearch.Net; +using System; +using Elasticsearch.Net; namespace Nest { - public partial interface IElasticClient + public partial interface IElasticClient { IConnectionSettingsValues ConnectionSettings { get; } IElasticsearchSerializer Serializer { get; } IElasticsearchClient Raw { get; } ElasticInferrer Infer { get; } - } } diff --git a/src/Tests/ClientConcepts/ConnectionPooling/Sniffing/OnStartup.doc.cs b/src/Tests/ClientConcepts/ConnectionPooling/Sniffing/OnStartup.doc.cs index 321b8ebd0fa..78bb3234ed2 100644 --- a/src/Tests/ClientConcepts/ConnectionPooling/Sniffing/OnStartup.doc.cs +++ b/src/Tests/ClientConcepts/ConnectionPooling/Sniffing/OnStartup.doc.cs @@ -37,6 +37,35 @@ await audit.TraceCall(new ClientCall }); } + [U] [SuppressMessage("AsyncUsage", "AsyncFixer001:Unnecessary async/await usage", Justification = "Its a test")] + public async Task ASniffOnStartupHappensOnce() + { + var audit = new Auditor(() => Framework.Cluster + .Nodes(10) + .Sniff(s => s.Fails(Always)) + .Sniff(s => s.OnPort(9202).Succeeds(Always)) + .SniffingConnectionPool() + .AllDefaults() + ); + + await audit.TraceCalls( + new ClientCall + { + { SniffOnStartup}, + { SniffFailure, 9200}, + { SniffFailure, 9201}, + { SniffSuccess, 9202}, + { PingSuccess , 9200}, + { HealthyResponse, 9200} + }, + new ClientCall + { + { PingSuccess, 9201}, + { HealthyResponse, 9201} + } + ); + } + [U] [SuppressMessage("AsyncUsage", "AsyncFixer001:Unnecessary async/await usage", Justification = "Its a test")] public async Task SniffOnStartUpTakesNewClusterState() { diff --git a/src/Tests/ClientConcepts/LowLevel/Connecting.doc.cs b/src/Tests/ClientConcepts/LowLevel/Connecting.doc.cs index e5b2ffa1f33..17cd9de0319 100644 --- a/src/Tests/ClientConcepts/LowLevel/Connecting.doc.cs +++ b/src/Tests/ClientConcepts/LowLevel/Connecting.doc.cs @@ -2,6 +2,7 @@ using System.Collections.Generic; using System.Collections.Specialized; using System.Net; +using System.Threading.Tasks; using Elasticsearch.Net; using FluentAssertions; using Nest; @@ -197,18 +198,37 @@ public void ConfiguringSSL() */ } + /** + * ## Overriding default Json.NET behavior + * + * Please be advised that this is an expert behavior but if you need to get to the nitty gritty this can be really useful + * + * Create a subclass of the `JsonNetSerializer` + + */ public class MyJsonNetSerializer : JsonNetSerializer { - public int X { get; set; } = 0; public MyJsonNetSerializer(IConnectionSettingsValues settings) : base(settings) { } + - protected override void ModifyJsonSerializerSettings(JsonSerializerSettings settings) - { - ++X; - } - } + /** + * Override ModifyJsonSerializerSettings if you need access to `JsonSerializerSettings` + */ + public int X { get; set; } = 0; + protected override void ModifyJsonSerializerSettings(JsonSerializerSettings settings) => ++X; - [U] public void IsCalled() + /** + * You can inject contract resolved converters by implementing the ContractConverters property + * This can be much faster then registering them on JsonSerializerSettings.Converters + */ + protected override IList> ContractConverters => new List>(); + } + + /** + * You can then register a factory on ConnectionSettings to create an instance of your subclass instead. + * This is called once per instance of ConnectionSettings. + */ + [U] public void ModifyJsonSerializerSettingsIsCalled() { var connectionPool = new SingleNodeConnectionPool(new Uri("http://localhost:9200")); var settings = new ConnectionSettings(connectionPool, new InMemoryConnection(),s => new MyJsonNetSerializer(s)); diff --git a/src/Tests/ClientConcepts/LowLevel/Lifetimes.doc.cs b/src/Tests/ClientConcepts/LowLevel/Lifetimes.doc.cs new file mode 100644 index 00000000000..f6fcadfc92e --- /dev/null +++ b/src/Tests/ClientConcepts/LowLevel/Lifetimes.doc.cs @@ -0,0 +1,93 @@ +using System; +using System.Collections.Generic; +using System.Collections.Specialized; +using System.Net; +using System.Threading.Tasks; +using Elasticsearch.Net; +using FluentAssertions; +using Nest; +using Newtonsoft.Json; +using Tests.Framework; + +namespace Tests.ClientConcepts.LowLevel +{ + public class Lifetimes + { + /** + * ## Lifetimes + * + * If you are using an IOC container its always useful to know the best practices around the lifetime of your objects + + * In general we advise folks to register their ElasticClient instances as singleton. The client is thread safe + * so sharing this instance over threads is ok. + + * Zooming in however the actual moving part that benefits the most of being static for most of the duration of your + * application is ConnectionSettings. Caches are per ConnectionSettings. + + * In some applications it could make perfect sense to have multiple singleton IElasticClient's registered with different + * connectionsettings. e.g if you have 2 functionally isolated Elasticsearch clusters. + + */ + + + [U] public void InitialDisposeState() + { + var connection = new AConnection(); + var connectionPool = new AConnectionPool(new Uri("http://localhost:9200")); + var settings = new AConnectionSettings(connectionPool, connection); + settings.IsDisposed.Should().BeFalse(); + connectionPool.IsDisposed.Should().BeFalse(); + connection.IsDisposed.Should().BeFalse(); + } + + /** + * Disposing the ConnectionSettings will dispose the IConnectionPool and IConnection it has a hold of + */ + + [U] public void DisposingSettingsDisposesMovingParts() + { + var connection = new AConnection(); + var connectionPool = new AConnectionPool(new Uri("http://localhost:9200")); + var settings = new AConnectionSettings(connectionPool, connection); + using (settings) { } + settings.IsDisposed.Should().BeTrue(); + connectionPool.IsDisposed.Should().BeTrue(); + connection.IsDisposed.Should().BeTrue(); + } + + class AConnectionPool : SingleNodeConnectionPool + { + public AConnectionPool(Uri uri, IDateTimeProvider dateTimeProvider = null) : base(uri, dateTimeProvider) { } + + public bool IsDisposed { get; private set; } + protected override void DisposeManagedResources() + { + this.IsDisposed = true; + base.DisposeManagedResources(); + } + } + + class AConnectionSettings : ConnectionSettings + { + public AConnectionSettings(IConnectionPool pool, IConnection connection) + : base(pool, connection) { } + public bool IsDisposed { get; private set; } + protected override void DisposeManagedResources() + { + this.IsDisposed = true; + base.DisposeManagedResources(); + } + } + + class AConnection : InMemoryConnection + { + public bool IsDisposed { get; private set; } + protected override void DisposeManagedResources() + { + this.IsDisposed = true; + base.DisposeManagedResources(); + } + } + + } +} diff --git a/src/Tests/Document/Single/Get/GetUrlTests.cs b/src/Tests/Document/Single/Get/GetUrlTests.cs index cbe31af732f..b3a7e152ca3 100644 --- a/src/Tests/Document/Single/Get/GetUrlTests.cs +++ b/src/Tests/Document/Single/Get/GetUrlTests.cs @@ -3,12 +3,14 @@ using Tests.Framework; using Tests.Framework.MockData; using static Tests.Framework.UrlTester; +using static Nest.Infer; namespace Tests.Document.Single.Get { public class GetUrlTests { - [U] public async Task Urls() + [U] + public async Task Urls() { await GET("/project/project/1") .Fluent(c => c.Get(1)) @@ -16,6 +18,14 @@ await GET("/project/project/1") .FluentAsync(c => c.GetAsync(1)) .RequestAsync(c => c.GetAsync(new GetRequest(1))) ; + await GET("/testindex/typeindex/1") + .Fluent(c => c.Get(1, g => g.Index("testindex").Type("typeindex"))) + .Request(c => c.Get(new GetRequest(new DocumentPath(1).Index("testindex").Type("typeindex")))) + .Request(c => c.Get(new GetRequest("testindex", "typeindex", 1))) + .FluentAsync(c => c.GetAsync(1, g => g.Index("testindex").Type("typeindex"))) + .RequestAsync(c => c.GetAsync(new GetRequest(new DocumentPath(1).Index("testindex").Type("typeindex")))) + .RequestAsync(c => c.GetAsync(new GetRequest("testindex", "typeindex", 1))) + ; } } } diff --git a/src/Tests/Framework/VirtualClustering/FixedPipelineFactory.cs b/src/Tests/Framework/VirtualClustering/FixedPipelineFactory.cs index 2fe912385b2..6d4bbd073ad 100644 --- a/src/Tests/Framework/VirtualClustering/FixedPipelineFactory.cs +++ b/src/Tests/Framework/VirtualClustering/FixedPipelineFactory.cs @@ -5,12 +5,14 @@ namespace Tests.Framework { public class FixedPipelineFactory : IRequestPipelineFactory { - public IConnectionSettingsValues Settings { get; } - public Transport Transport { get; } - public IRequestPipeline Pipeline { get; } + private IConnectionSettingsValues Settings { get; } + private Transport Transport => + new Transport(this.Settings, this, this.DateTimeProvider, this.MemoryStreamFactory); + + private IDateTimeProvider DateTimeProvider { get; } + private MemoryStreamFactory MemoryStreamFactory { get; } - public IDateTimeProvider DateTimeProvider { get; } - public MemoryStreamFactory MemoryStreamFactory { get; } + public IRequestPipeline Pipeline { get; } public ElasticClient Client => new ElasticClient(this.Transport); @@ -21,7 +23,6 @@ public FixedPipelineFactory(IConnectionSettingsValues connectionSettings, IDateT this.Settings = connectionSettings; this.Pipeline = this.Create(this.Settings, this.DateTimeProvider, this.MemoryStreamFactory, new SearchRequestParameters()); - this.Transport = new Transport(this.Settings, this, this.DateTimeProvider, this.MemoryStreamFactory); } public IRequestPipeline Create(IConnectionConfigurationValues configurationValues, IDateTimeProvider dateTimeProvider, IMemoryStreamFactory memorystreamFactory, IRequestParameters requestParameters) => diff --git a/src/Tests/Framework/VirtualClustering/VirtualizedCluster.cs b/src/Tests/Framework/VirtualClustering/VirtualizedCluster.cs index b9d3c481b78..6d95f7f4352 100644 --- a/src/Tests/Framework/VirtualClustering/VirtualizedCluster.cs +++ b/src/Tests/Framework/VirtualClustering/VirtualizedCluster.cs @@ -1,42 +1,41 @@ -using System; -using System.Threading.Tasks; +using System; +using System.Threading.Tasks; using Elasticsearch.Net; using Nest; -using Tests.Framework.MockData; - -namespace Tests.Framework -{ - public class VirtualizedCluster - { - private readonly ElasticClient _client; - private readonly VirtualCluster _cluster; - private readonly IConnectionPool _connectionPool; - private readonly TestableDateTimeProvider _dateTimeProvider; - private readonly ConnectionSettings _settings; - public FixedPipelineFactory _fixedRequestPipeline; - - public IConnectionPool ConnectionPool => this._client.ConnectionSettings.ConnectionPool; - - public VirtualizedCluster(VirtualCluster cluster, IConnectionPool pool, TestableDateTimeProvider dateTimeProvider, ConnectionSettings settings) - { - this._dateTimeProvider = dateTimeProvider; - _settings = settings; - this._fixedRequestPipeline = new FixedPipelineFactory(settings, this._dateTimeProvider); - this._client = this._fixedRequestPipeline.Client; - - this._cluster = cluster; - this._connectionPool = pool; - } - - public ISearchResponse ClientCall(Func requestOverrides = null) - => this._client.Search(s => s.RequestConfiguration(requestOverrides)); - - public async Task> ClientCallAsync(Func requestOverrides = null) => - await this._client.SearchAsync(s => s.RequestConfiguration(requestOverrides)); - - public void ChangeTime(Func change) => _dateTimeProvider.ChangeTime(change); - - public void ClientThrows(bool throws) => _settings.ThrowExceptions(throws); - } - +using Tests.Framework.MockData; + +namespace Tests.Framework +{ + public class VirtualizedCluster + { + private ElasticClient Client => this._fixedRequestPipeline?.Client; + private readonly VirtualCluster _cluster; + private readonly IConnectionPool _connectionPool; + private readonly TestableDateTimeProvider _dateTimeProvider; + private readonly ConnectionSettings _settings; + public FixedPipelineFactory _fixedRequestPipeline; + + public IConnectionPool ConnectionPool => this.Client.ConnectionSettings.ConnectionPool; + + public VirtualizedCluster(VirtualCluster cluster, IConnectionPool pool, TestableDateTimeProvider dateTimeProvider, ConnectionSettings settings) + { + this._dateTimeProvider = dateTimeProvider; + this._settings = settings; + this._fixedRequestPipeline = new FixedPipelineFactory(settings, this._dateTimeProvider); + + this._cluster = cluster; + this._connectionPool = pool; + } + + public ISearchResponse ClientCall(Func requestOverrides = null) => + this.Client.Search(s => s.RequestConfiguration(requestOverrides)); + + public async Task> ClientCallAsync(Func requestOverrides = null) => + await this.Client.SearchAsync(s => s.RequestConfiguration(requestOverrides)); + + public void ChangeTime(Func change) => _dateTimeProvider.ChangeTime(change); + + public void ClientThrows(bool throws) => _settings.ThrowExceptions(throws); + } + } \ No newline at end of file diff --git a/src/Tests/QueryDsl/BoolDsl/BoolApiTests.cs b/src/Tests/QueryDsl/BoolDsl/BoolApiTests.cs index 15028a3bc40..7e505079c99 100644 --- a/src/Tests/QueryDsl/BoolDsl/BoolApiTests.cs +++ b/src/Tests/QueryDsl/BoolDsl/BoolApiTests.cs @@ -64,7 +64,6 @@ public class BoolsInPractice { private readonly BoolCluster _cluster; - private IConnectionSettingsValues _settings; public BoolsInPractice(BoolCluster cluster) { diff --git a/src/Tests/Tests.csproj b/src/Tests/Tests.csproj index 630ee3bf667..9a3ae546073 100644 --- a/src/Tests/Tests.csproj +++ b/src/Tests/Tests.csproj @@ -149,6 +149,7 @@ +