Skip to content

Serverless #7966

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 7 commits into from
Oct 31, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
The table of contents is too big for display.
Diff view
Diff view
  •  
  •  
  •  
23 changes: 23 additions & 0 deletions Elasticsearch.sln
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,10 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Tests.ClusterLauncher", "te
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Elastic.Clients.Elasticsearch.JsonNetSerializer", "src\Elastic.Clients.Elasticsearch.JsonNetSerializer\Elastic.Clients.Elasticsearch.JsonNetSerializer.csproj", "{8C9275D9-29CE-4A20-8FD5-6B26C6CAAADB}"
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Elastic.Clients.Elasticsearch.Serverless", "src\Elastic.Clients.Elasticsearch.Serverless\Elastic.Clients.Elasticsearch.Serverless.csproj", "{49D7F5A7-AA32-492B-B957-0E3325861F55}"
EndProject
Project("{D954291E-2A0B-460D-934E-DC6B0785DB48}") = "Elastic.Clients.Elasticsearch.Shared", "src\Elastic.Clients.Elasticsearch.Shared\Elastic.Clients.Elasticsearch.Shared.shproj", "{A90DD7B8-8AFB-4BE9-AA16-B159A880E79D}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
Expand Down Expand Up @@ -209,6 +213,18 @@ Global
{8C9275D9-29CE-4A20-8FD5-6B26C6CAAADB}.Release|x64.Build.0 = Release|Any CPU
{8C9275D9-29CE-4A20-8FD5-6B26C6CAAADB}.Release|x86.ActiveCfg = Release|Any CPU
{8C9275D9-29CE-4A20-8FD5-6B26C6CAAADB}.Release|x86.Build.0 = Release|Any CPU
{49D7F5A7-AA32-492B-B957-0E3325861F55}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{49D7F5A7-AA32-492B-B957-0E3325861F55}.Debug|Any CPU.Build.0 = Debug|Any CPU
{49D7F5A7-AA32-492B-B957-0E3325861F55}.Debug|x64.ActiveCfg = Debug|Any CPU
{49D7F5A7-AA32-492B-B957-0E3325861F55}.Debug|x64.Build.0 = Debug|Any CPU
{49D7F5A7-AA32-492B-B957-0E3325861F55}.Debug|x86.ActiveCfg = Debug|Any CPU
{49D7F5A7-AA32-492B-B957-0E3325861F55}.Debug|x86.Build.0 = Debug|Any CPU
{49D7F5A7-AA32-492B-B957-0E3325861F55}.Release|Any CPU.ActiveCfg = Release|Any CPU
{49D7F5A7-AA32-492B-B957-0E3325861F55}.Release|Any CPU.Build.0 = Release|Any CPU
{49D7F5A7-AA32-492B-B957-0E3325861F55}.Release|x64.ActiveCfg = Release|Any CPU
{49D7F5A7-AA32-492B-B957-0E3325861F55}.Release|x64.Build.0 = Release|Any CPU
{49D7F5A7-AA32-492B-B957-0E3325861F55}.Release|x86.ActiveCfg = Release|Any CPU
{49D7F5A7-AA32-492B-B957-0E3325861F55}.Release|x86.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
Expand All @@ -226,8 +242,15 @@ Global
{68D1BFDC-F447-4D2C-AF81-537807636610} = {1FE49D14-216A-41EE-A177-E42BFF53E0DC}
{F6162603-D134-4121-8106-2BA4DAD7350B} = {362B2776-4B29-46AB-B237-56776B5372B6}
{8C9275D9-29CE-4A20-8FD5-6B26C6CAAADB} = {D455EC79-E1E0-4509-B297-0DA3AED8DFF7}
{49D7F5A7-AA32-492B-B957-0E3325861F55} = {D455EC79-E1E0-4509-B297-0DA3AED8DFF7}
{A90DD7B8-8AFB-4BE9-AA16-B159A880E79D} = {D455EC79-E1E0-4509-B297-0DA3AED8DFF7}
EndGlobalSection
GlobalSection(ExtensibilityGlobals) = postSolution
SolutionGuid = {CE74F821-B001-4C69-A58D-CF81F8B0B632}
EndGlobalSection
GlobalSection(SharedMSBuildProjectFiles) = preSolution
src\Elastic.Clients.Elasticsearch.Shared\Elastic.Clients.Elasticsearch.Shared.projitems*{49d7f5a7-aa32-492b-b957-0e3325861f55}*SharedItemsImports = 5
src\Elastic.Clients.Elasticsearch.Shared\Elastic.Clients.Elasticsearch.Shared.projitems*{a90dd7b8-8afb-4be9-aa16-b159a880e79d}*SharedItemsImports = 13
src\Elastic.Clients.Elasticsearch.Shared\Elastic.Clients.Elasticsearch.Shared.projitems*{f8a7e60c-0c48-4d76-af7f-7881df5a263d}*SharedItemsImports = 5
EndGlobalSection
EndGlobal
1 change: 1 addition & 0 deletions release.bat
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
dotnet pack -c Release -o build/output /p:CurrentVersion=1.0.0-preview.1+20231031;CurrentAssemblyVersion=1.0.0;CurrentAssemblyFileVersion=1.0.0
Original file line number Diff line number Diff line change
@@ -0,0 +1,334 @@
// Licensed to Elasticsearch B.V under one or more agreements.
// Elasticsearch B.V licenses this file to you under the Apache 2.0 License.
// See the LICENSE file in the project root for more information.

using System;
using System.Collections.Generic;
using System.Linq;
using System.Runtime.CompilerServices;
using System.Text.Json;
using System.Threading;
using System.Threading.Tasks;
using Elastic.Clients.Elasticsearch.Serverless.Requests;
using Elastic.Transport;
using Elastic.Transport.Diagnostics;
using Elastic.Transport.Products.Elasticsearch;

namespace Elastic.Clients.Elasticsearch.Serverless;

/// <summary>
/// A strongly-typed client for communicating with Elasticsearch server endpoints.
/// </summary>
public partial class ElasticsearchClient
{
private const string OpenTelemetrySpanAttributePrefix = "db.elasticsearch.";
// This should be updated if any of the code uses semantic conventions defined in newer schema versions.
private const string OpenTelemetrySchemaVersion = "https://opentelemetry.io/schemas/1.21.0";

private readonly HttpTransport<IElasticsearchClientSettings> _transport;
internal static ConditionalWeakTable<JsonSerializerOptions, IElasticsearchClientSettings> SettingsTable { get; } = new();

/// <summary>
/// Creates a client configured to connect to http://localhost:9200.
/// </summary>
public ElasticsearchClient() : this(new ElasticsearchClientSettings(new Uri("http://localhost:9200"))) { }

/// <summary>
/// Creates a client configured to connect to a node reachable at the provided <paramref name="uri" />.
/// </summary>
/// <param name="uri">The <see cref="Uri" /> to connect to.</param>
public ElasticsearchClient(Uri uri) : this(new ElasticsearchClientSettings(uri)) { }

/// <summary>
/// Creates a client configured to communicate with Elastic Cloud using the provided <paramref name="cloudId" />.
/// <para>See the <see cref="CloudNodePool" /> documentation for more information on how to obtain your Cloud Id.</para>
/// <para>
/// If you want more control, use the <see cref="ElasticsearchClient(IElasticsearchClientSettings)" /> constructor and
/// pass an instance of <see cref="ElasticsearchClientSettings" /> that takes a <paramref name="cloudId" /> in its constructor as well.
/// </para>
/// </summary>
/// <param name="cloudId">The Cloud ID of an Elastic Cloud deployment.</param>
/// <param name="credentials">The credentials to use for the connection.</param>
public ElasticsearchClient(string cloudId, AuthorizationHeader credentials) : this(
new ElasticsearchClientSettings(cloudId, credentials))
{
}

/// <summary>
/// Creates a client using the provided configuration to initialise the client.
/// </summary>
/// <param name="elasticsearchClientSettings">The <see cref="IElasticsearchClientSettings"/> used to configure the client.</param>
public ElasticsearchClient(IElasticsearchClientSettings elasticsearchClientSettings)
: this(new DefaultHttpTransport<IElasticsearchClientSettings>(elasticsearchClientSettings))
{
}

internal ElasticsearchClient(HttpTransport<IElasticsearchClientSettings> transport)
{
transport.ThrowIfNull(nameof(transport));
transport.Settings.ThrowIfNull(nameof(transport.Settings));
transport.Settings.RequestResponseSerializer.ThrowIfNull(
nameof(transport.Settings.RequestResponseSerializer));
transport.Settings.Inferrer.ThrowIfNull(nameof(transport.Settings.Inferrer));

_transport = transport;

SetupNamespaces();
}

public IElasticsearchClientSettings ElasticsearchClientSettings => _transport.Settings;
public Inferrer Infer => _transport.Settings.Inferrer;
public Serializer RequestResponseSerializer => _transport.Settings.RequestResponseSerializer;
public Serializer SourceSerializer => _transport.Settings.SourceSerializer;
public HttpTransport Transport => _transport;

private ProductCheckStatus _productCheckStatus;

private enum ProductCheckStatus
{
NotChecked,
Succeeded,
Failed
}

private partial void SetupNamespaces();

internal TResponse DoRequest<TRequest, TResponse, TRequestParameters>(TRequest request)
where TRequest : Request<TRequestParameters>
where TResponse : ElasticsearchResponse, new()
where TRequestParameters : RequestParameters, new() =>
DoRequest<TRequest, TResponse, TRequestParameters>(request, null);

internal TResponse DoRequest<TRequest, TResponse, TRequestParameters>(
TRequest request,
Action<IRequestConfiguration>? forceConfiguration)
where TRequest : Request<TRequestParameters>
where TResponse : ElasticsearchResponse, new()
where TRequestParameters : RequestParameters, new()
=> DoRequestCoreAsync<TRequest, TResponse, TRequestParameters>(false, request, forceConfiguration).EnsureCompleted();

internal Task<TResponse> DoRequestAsync<TRequest, TResponse, TRequestParameters>(
TRequest request,
CancellationToken cancellationToken = default)
where TRequest : Request<TRequestParameters>
where TResponse : ElasticsearchResponse, new()
where TRequestParameters : RequestParameters, new()
=> DoRequestAsync<TRequest, TResponse, TRequestParameters>(request, null, cancellationToken);

internal Task<TResponse> DoRequestAsync<TRequest, TResponse, TRequestParameters>(
TRequest request,
Action<IRequestConfiguration>? forceConfiguration,
CancellationToken cancellationToken = default)
where TRequest : Request<TRequestParameters>
where TResponse : ElasticsearchResponse, new()
where TRequestParameters : RequestParameters, new()
=> DoRequestCoreAsync<TRequest, TResponse, TRequestParameters>(true, request, forceConfiguration, cancellationToken).AsTask();

private ValueTask<TResponse> DoRequestCoreAsync<TRequest, TResponse, TRequestParameters>(
bool isAsync,
TRequest request,
Action<IRequestConfiguration>? forceConfiguration,
CancellationToken cancellationToken = default)
where TRequest : Request<TRequestParameters>
where TResponse : ElasticsearchResponse, new()
where TRequestParameters : RequestParameters, new()
{
if (_productCheckStatus == ProductCheckStatus.Failed)
throw new UnsupportedProductException(UnsupportedProductException.InvalidProductError);

var (requestModified, hadRequestConfig, originalHeaders) = AttachProductCheckHeaderIfRequired<TRequest, TRequestParameters>(request);
var (resolvedUrl, urlTemplate, resolvedRouteValues, postData) = PrepareRequest<TRequest, TRequestParameters>(request, forceConfiguration);
var openTelemetryData = PrepareOpenTelemetryData<TRequest, TRequestParameters>(request, resolvedRouteValues);

if (_productCheckStatus == ProductCheckStatus.Succeeded && !requestModified)
{
if (isAsync)
return new ValueTask<TResponse>(_transport.RequestAsync<TResponse>(request.HttpMethod, resolvedUrl, postData, request.RequestParameters, in openTelemetryData, cancellationToken));
else
return new ValueTask<TResponse>(_transport.Request<TResponse>(request.HttpMethod, resolvedUrl, postData, request.RequestParameters, in openTelemetryData));
}

return SendRequest(isAsync);

async ValueTask<TResponse> SendRequest(bool isAsync)
{
TResponse response;

if (isAsync)
response = await _transport.RequestAsync<TResponse>(request.HttpMethod, resolvedUrl, postData, request.RequestParameters, in openTelemetryData, cancellationToken).ConfigureAwait(false);
else
response = _transport.Request<TResponse>(request.HttpMethod, resolvedUrl, postData, request.RequestParameters, in openTelemetryData);

PostRequestProductCheck<TRequest, TResponse>(request, response);

if (_productCheckStatus == ProductCheckStatus.Failed)
throw new UnsupportedProductException(UnsupportedProductException.InvalidProductError);

if (request.RequestParameters.RequestConfiguration is not null)
{
if (!hadRequestConfig)
{
request.RequestParameters.RequestConfiguration = null;
}
else if (originalHeaders.HasValue && originalHeaders.Value.Count > 0)
{
request.RequestParameters.RequestConfiguration.ResponseHeadersToParse = originalHeaders.Value;
}
}

return response;
}
}

private static OpenTelemetryData PrepareOpenTelemetryData<TRequest, TRequestParameters>(TRequest request, Dictionary<string, string> resolvedRouteValues)
where TRequest : Request<TRequestParameters>
where TRequestParameters : RequestParameters, new()
{
// If there are no subscribed listeners, we avoid some work and allocations
if (!Elastic.Transport.Diagnostics.OpenTelemetry.ElasticTransportActivitySourceHasListeners)
return default;

// We fall back to a general operation name in cases where the derived request fails to override the property
var operationName = !string.IsNullOrEmpty(request.OperationName) ? request.OperationName : request.HttpMethod.GetStringValue();

// TODO: Optimisation: We should consider caching these, either for cases where resolvedRouteValues is null, or
// caching per combination of route values.
// We should benchmark this first to assess the impact for common workloads.
// The former is likely going to save some short-lived allocations, but only for requests to endpoints without required path parts.
// The latter may bloat the cache as some combinations of path parts may rarely re-occur.
var attributes = new Dictionary<string, object>
{
[OpenTelemetry.SemanticConventions.DbOperation] = !string.IsNullOrEmpty(request.OperationName) ? request.OperationName : "unknown",
[$"{OpenTelemetrySpanAttributePrefix}schema_url"] = OpenTelemetrySchemaVersion
};

if (resolvedRouteValues is not null)
{
foreach (var value in resolvedRouteValues)
{
if (!string.IsNullOrEmpty(value.Key) && !string.IsNullOrEmpty(value.Value))
attributes.Add($"{OpenTelemetrySpanAttributePrefix}path_parts.{value.Key}", value.Value);
}
}

var openTelemetryData = new OpenTelemetryData { SpanName = operationName, SpanAttributes = attributes };
return openTelemetryData;
}

private (bool requestModified, bool hadRequestConfig, HeadersList? originalHeaders) AttachProductCheckHeaderIfRequired<TRequest, TRequestParameters>(TRequest request)
where TRequest : Request<TRequestParameters>
where TRequestParameters : RequestParameters, new()
{
var requestModified = false;
var hadRequestConfig = false;
HeadersList? originalHeaders = null;

// If we have not yet checked the product name, add the product header to the list of headers to parse.
if (_productCheckStatus == ProductCheckStatus.NotChecked)
{
requestModified = true;

if (request.RequestParameters.RequestConfiguration is null)
{
request.RequestParameters.RequestConfiguration = new RequestConfiguration();
}
else
{
originalHeaders = request.RequestParameters.RequestConfiguration.ResponseHeadersToParse;
hadRequestConfig = true;
}

if (request.RequestParameters.RequestConfiguration.ResponseHeadersToParse.Count == 0)
{
request.RequestParameters.RequestConfiguration.ResponseHeadersToParse = new HeadersList("x-elastic-product");
}
else
{
request.RequestParameters.RequestConfiguration.ResponseHeadersToParse = new HeadersList(request.RequestParameters.RequestConfiguration.ResponseHeadersToParse, "x-elastic-product");
}
}

return (requestModified, hadRequestConfig, originalHeaders);
}

private (string resolvedUrl, string urlTemplate, Dictionary<string, string>? resolvedRouteValues, PostData data) PrepareRequest<TRequest, TRequestParameters>(TRequest request,
Action<IRequestConfiguration>? forceConfiguration)
where TRequest : Request<TRequestParameters>
where TRequestParameters : RequestParameters, new()
{
request.ThrowIfNull(nameof(request), "A request is required.");

if (forceConfiguration is not null)
ForceConfiguration(request, forceConfiguration);

if (request.ContentType is not null)
ForceContentType<TRequest, TRequestParameters>(request, request.ContentType);

if (request.Accept is not null)
ForceAccept<TRequest, TRequestParameters>(request, request.Accept);

var (resolvedUrl, urlTemplate, routeValues) = request.GetUrl(ElasticsearchClientSettings);

var postData =
request.HttpMethod == HttpMethod.GET ||
request.HttpMethod == HttpMethod.HEAD || !request.SupportsBody
? null
: PostData.Serializable(request);

return (resolvedUrl, urlTemplate, routeValues, postData);
}

private void PostRequestProductCheck<TRequest, TResponse>(TRequest request, TResponse response)
where TRequest : Request
where TResponse : ElasticsearchResponse, new()
{
if (response.ApiCallDetails.HttpStatusCode.HasValue && response.ApiCallDetails.HttpStatusCode.Value >= 200 && response.ApiCallDetails.HttpStatusCode.Value <= 299 && _productCheckStatus == ProductCheckStatus.NotChecked)
{
if (!response.ApiCallDetails.TryGetHeader("x-elastic-product", out var values) || !values.Single().Equals("Elasticsearch", StringComparison.Ordinal))
{
_productCheckStatus = ProductCheckStatus.Failed;
}

_productCheckStatus = ProductCheckStatus.Succeeded;
}
}

private static void ForceConfiguration<TRequestParameters>(Request<TRequestParameters> request, Action<IRequestConfiguration> forceConfiguration)
where TRequestParameters : RequestParameters, new()
{
var configuration = request.RequestParameters.RequestConfiguration ?? new RequestConfiguration();
forceConfiguration(configuration);
request.RequestParameters.RequestConfiguration = configuration;
}

private static void ForceContentType<TRequest, TRequestParameters>(TRequest request, string contentType)
where TRequest : Request<TRequestParameters>
where TRequestParameters : RequestParameters, new()
{
var configuration = request.RequestParameters.RequestConfiguration ?? new RequestConfiguration();
configuration.Accept = contentType;
configuration.ContentType = contentType;
request.RequestParameters.RequestConfiguration = configuration;
}

private static void ForceAccept<TRequest, TRequestParameters>(TRequest request, string acceptType)
where TRequest : Request<TRequestParameters>
where TRequestParameters : RequestParameters, new()
{
var configuration = request.RequestParameters.RequestConfiguration ?? new RequestConfiguration();
configuration.Accept = acceptType;
request.RequestParameters.RequestConfiguration = configuration;
}

internal static void ForceJson(IRequestConfiguration requestConfiguration)
{
requestConfiguration.Accept = RequestData.DefaultMimeType;
requestConfiguration.ContentType = RequestData.DefaultMimeType;
}

internal static void ForceTextPlain(IRequestConfiguration requestConfiguration)
{
requestConfiguration.Accept = RequestData.MimeTypeTextPlain;
requestConfiguration.ContentType = RequestData.MimeTypeTextPlain;
}
}
Loading