Skip to content

Rework ProcessScheduler #228

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 20 commits into from
Apr 29, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Directory.Build.targets
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
<PackageReference Update="coverlet.collector" Version="1.2.0" />
<PackageReference Update="coverlet.msbuild" Version="2.8.0" />
<PackageReference Update="System.Reactive" Version="4.3.2" />
<PackageReference Update="Microsoft.Reactive.Testing" Version="4.3.2" />
<PackageReference Update="MediatR" Version="8.0.1" />
<PackageReference Update="MediatR.Extensions.Microsoft.DependencyInjection" Version="8.0.0" />
</ItemGroup>
Expand Down
9 changes: 4 additions & 5 deletions src/JsonRpc/Connection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ namespace OmniSharp.Extensions.JsonRpc
public class Connection : IDisposable
{
private readonly IInputHandler _inputHandler;
private readonly IRequestRouter<IHandlerDescriptor> _requestRouter;

public Connection(
Stream input,
Expand All @@ -17,10 +16,9 @@ public Connection(
IRequestRouter<IHandlerDescriptor> requestRouter,
IResponseRouter responseRouter,
ILoggerFactory loggerFactory,
ISerializer serializer)
ISerializer serializer,
int? concurrency)
{
_requestRouter = requestRouter;

_inputHandler = new InputHandler(
input,
outputHandler,
Expand All @@ -29,7 +27,8 @@ public Connection(
requestRouter,
responseRouter,
loggerFactory,
serializer
serializer,
concurrency
);
}

Expand Down
12 changes: 11 additions & 1 deletion src/JsonRpc/IScheduler.cs
Original file line number Diff line number Diff line change
@@ -1,11 +1,21 @@
using System;
using System.Reactive;
using System.Reactive.Linq;
using System.Threading.Tasks;

namespace OmniSharp.Extensions.JsonRpc
{
public interface IScheduler : IDisposable
{
void Start();
void Add(RequestProcessType type, string name, Func<Task> request);
void Add(RequestProcessType type, string name, IObservable<Unit> request);
}

public static class SchedulerExtensions
{
public static void Add(this IScheduler scheduler, RequestProcessType type, string name, Func<Task> request)
{
scheduler.Add(type, name, Observable.FromAsync(request));
}
}
}
7 changes: 4 additions & 3 deletions src/JsonRpc/InputHandler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,8 @@ public InputHandler(
IRequestRouter<IHandlerDescriptor> requestRouter,
IResponseRouter responseRouter,
ILoggerFactory loggerFactory,
ISerializer serializer
ISerializer serializer,
int? concurrency
)
{
if (!input.CanRead) throw new ArgumentException($"must provide a readable stream for {nameof(input)}", nameof(input));
Expand All @@ -49,15 +50,15 @@ ISerializer serializer
_responseRouter = responseRouter;
_serializer = serializer;
_logger = loggerFactory.CreateLogger<InputHandler>();
_scheduler = new ProcessScheduler(loggerFactory);
_scheduler = new ProcessScheduler(loggerFactory, concurrency);
_inputThread = new Thread(ProcessInputStream) { IsBackground = true, Name = "ProcessInputStream" };
}

public void Start()
{
_scheduler.Start();
_outputHandler.Start();
_inputThread.Start();
_scheduler.Start();
}

// don't be async: We already allocated a seperate thread for this.
Expand Down
169 changes: 76 additions & 93 deletions src/JsonRpc/ProcessScheduler.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,14 @@
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.ComponentModel;
using System.Diagnostics;
using System.Linq;
using System.Reactive;
using System.Reactive.Concurrency;
using System.Reactive.Disposables;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.Logging;
Expand All @@ -9,122 +17,97 @@ namespace OmniSharp.Extensions.JsonRpc
{
public class ProcessScheduler : IScheduler
{
private readonly int? _concurrency;
private readonly ILogger<ProcessScheduler> _logger;
private readonly BlockingCollection<(RequestProcessType type, string name, Func<Task> request)> _queue;
private readonly CancellationTokenSource _cancel;
private readonly Thread _thread;
private readonly IObserver<(RequestProcessType type, string name, IObservable<Unit> request)> _enqueue;
private readonly IObservable<(RequestProcessType type, string name, IObservable<Unit> request)> _queue;
private bool _disposed = false;
private readonly CompositeDisposable _disposable = new CompositeDisposable();
private readonly System.Reactive.Concurrency.IScheduler _scheduler;

public ProcessScheduler(ILoggerFactory loggerFactory)
public ProcessScheduler(ILoggerFactory loggerFactory, int? concurrency) : this(loggerFactory, concurrency,
new EventLoopScheduler(
_ => new Thread(_) {IsBackground = true, Name = "ProcessRequestQueue"}))
{
_logger = loggerFactory.CreateLogger<ProcessScheduler>();
_queue = new BlockingCollection<(RequestProcessType type, string name, Func<Task> request)>();
_cancel = new CancellationTokenSource();
_thread = new Thread(ProcessRequestQueue) { IsBackground = true, Name = "ProcessRequestQueue" };
}

public void Start()
internal ProcessScheduler(ILoggerFactory loggerFactory, int? concurrency,
System.Reactive.Concurrency.IScheduler scheduler)
{
_thread.Start();
}
_concurrency = concurrency;
_logger = loggerFactory.CreateLogger<ProcessScheduler>();

public void Add(RequestProcessType type, string name, Func<Task> request)
{
_queue.Add((type, name, request));
var subject = new Subject<(RequestProcessType type, string name, IObservable<Unit> request)>();
_disposable.Add(subject);
_enqueue = subject;
_scheduler = scheduler;
_queue = subject;
}

private Task Start(Func<Task> request)
public void Start()
{
var t = request();
if (t.Status == TaskStatus.Created) // || t.Status = TaskStatus.WaitingForActivation ?
t.Start();
return t;
}
var obs = Observable.Create<Unit>(observer => {
var cd = new CompositeDisposable();

private List<Task> RemoveCompleteTasks(List<Task> list)
{
if (list.Count == 0) return list;
var observableQueue =
new BehaviorSubject<(RequestProcessType type, ReplaySubject<IObservable<Unit>> observer)>((
RequestProcessType.Serial, new ReplaySubject<IObservable<Unit>>(int.MaxValue)));

cd.Add(_queue.Subscribe(item => {
if (observableQueue.Value.type != item.type)
{
observableQueue.Value.observer.OnCompleted();
observableQueue.OnNext((item.type, new ReplaySubject<IObservable<Unit>>(int.MaxValue)));
}

observableQueue.Value.observer.OnNext(HandleRequest(item.name, item.request));
}));

var result = new List<Task>();
foreach (var t in list)
cd.Add(observableQueue
.Select(item => {
var (type, replay) = item;

if (type == RequestProcessType.Serial)
return replay.Concat();

return _concurrency.HasValue
? replay.Merge(_concurrency.Value)
: replay.Merge();
})
.Concat()
.Subscribe(observer)
);

return cd;
});

_disposable.Add(obs
// .ObserveOn(_scheduler)
.Subscribe(_ => { })
);

IObservable<Unit> HandleRequest(string name, IObservable<Unit> request)
{
if (t.IsFaulted)
{
// TODO: Handle Fault
}
else if (!t.IsCompleted)
{
result.Add(t);
}
return request
.Catch<Unit, OperationCanceledException>(ex => Observable.Empty<Unit>())
.Catch<Unit, Exception>(ex => {
_logger.LogCritical(Events.UnhandledException, ex, "Unhandled exception executing {Name}",
name);
return Observable.Empty<Unit>();
});
}
return result;
}

public long _TestOnly_NonCompleteTaskCount = 0;
private void ProcessRequestQueue()
public void Add(RequestProcessType type, string name, IObservable<Unit> request)
{
// see https://github.com/OmniSharp/csharp-language-server-protocol/issues/4
// no need to be async, because this thing already allocated a thread on it's own.
var token = _cancel.Token;
var waitables = new List<Task>();
try
{
while (!token.IsCancellationRequested)
{
if (_queue.TryTake(out var item, Timeout.Infinite, token))
{
var (type, name, request) = item;
try
{
if (type == RequestProcessType.Serial)
{
Task.WaitAll(waitables.ToArray(), token);
Start(request).Wait(token);
}
else if (type == RequestProcessType.Parallel)
{
waitables.Add(Start(request));
}
else
throw new NotImplementedException("Only Serial and Parallel execution types can be handled currently");
waitables = RemoveCompleteTasks(waitables);
Interlocked.Exchange(ref _TestOnly_NonCompleteTaskCount, waitables.Count);
}
catch (OperationCanceledException ex) when (ex.CancellationToken == token)
{
throw;
}
catch (Exception e)
{
// TODO: Should we rethrow or swallow?
// If an exception happens... the whole system could be in a bad state, hence this throwing currently.
_logger.LogCritical(Events.UnhandledException, e, "Unhandled exception executing {Name}", name);
throw;
}
}
}
}
catch (OperationCanceledException ex) when (ex.CancellationToken == token)
{
// OperationCanceledException - The CancellationToken has been canceled.
Task.WaitAll(waitables.ToArray(), TimeSpan.FromMilliseconds(1000));
var keeponrunning = RemoveCompleteTasks(waitables);
Interlocked.Exchange(ref _TestOnly_NonCompleteTaskCount, keeponrunning.Count);
keeponrunning.ForEach((t) =>
{
// TODO: There is no way to abort a Task. As we don't construct the tasks, we can do nothing here
// Option is: change the task factory "Func<Task> request" to a "Func<CancellationToken, Task> request"
});
}
_enqueue.OnNext((type, name, request));
}

private bool _disposed = false;
public void Dispose()
{
if (_disposed) return;
_disposed = true;
_cancel.Cancel();
_thread.Join();
_cancel.Dispose();
_disposable.Dispose();
}
}
}
2 changes: 1 addition & 1 deletion src/JsonRpc/RequestRouterBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ public virtual async Task<ErrorResponse> RouteRequest(TDescriptor descriptor, Re

return new JsonRpc.Client.Response(request.Id, responseValue, request);
}
catch (TaskCanceledException)
catch (OperationCanceledException)
{
_logger.LogDebug("Request {Id} was cancelled", id);
return new RequestCancelled();
Expand Down
2 changes: 1 addition & 1 deletion src/Protocol/Document/Server/ICompletionHandler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ namespace OmniSharp.Extensions.LanguageServer.Protocol.Server
[Parallel, Method(DocumentNames.Completion)]
public interface ICompletionHandler : IJsonRpcRequestHandler<CompletionParams, CompletionList>, IRegistration<CompletionRegistrationOptions>, ICapability<CompletionCapability> { }

[Serial, Method(DocumentNames.CompletionResolve)]
[Parallel, Method(DocumentNames.CompletionResolve)]
public interface ICompletionResolveHandler : ICanBeResolvedHandler<CompletionItem> { }

public abstract class CompletionHandler : ICompletionHandler, ICompletionResolveHandler
Expand Down
9 changes: 6 additions & 3 deletions src/Server/LanguageServer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ public class LanguageServer : ILanguageServer, IInitializeHandler, IInitializedH
private readonly SupportedCapabilities _supportedCapabilities;
private Task _initializingTask;
private readonly ILanguageServerConfiguration _configuration;
private readonly int? _concurrency;

public static Task<ILanguageServer> From(Action<LanguageServerOptions> optionsAction)
{
Expand Down Expand Up @@ -118,7 +119,8 @@ public static ILanguageServer PreInit(LanguageServerOptions options)
options.AddDefaultLoggingProvider,
options.ProgressManager,
options.ServerInfo,
options.ConfigurationBuilderAction
options.ConfigurationBuilderAction,
options.Concurrency
);
}

Expand All @@ -143,7 +145,8 @@ internal LanguageServer(
bool addDefaultLoggingProvider,
ProgressManager progressManager,
ServerInfo serverInfo,
Action<IConfigurationBuilder> configurationBuilderAction)
Action<IConfigurationBuilder> configurationBuilderAction,
int? concurrency)
{
var outputHandler = new OutputHandler(output, serializer);

Expand Down Expand Up @@ -234,7 +237,7 @@ internal LanguageServer(

var requestRouter = _serviceProvider.GetRequiredService<IRequestRouter<ILspHandlerDescriptor>>();
_responseRouter = _serviceProvider.GetRequiredService<IResponseRouter>();
_connection = ActivatorUtilities.CreateInstance<Connection>(_serviceProvider, input);
_connection = ActivatorUtilities.CreateInstance<Connection>(_serviceProvider, input, concurrency);

_exitHandler = new ServerExitHandler(_shutdownHandler);

Expand Down
1 change: 1 addition & 0 deletions src/Server/LanguageServerOptions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ public LanguageServerOptions()
internal Action<ILoggingBuilder> LoggingBuilderAction { get; set; } = new Action<ILoggingBuilder>(_ => { });
internal Action<IConfigurationBuilder> ConfigurationBuilderAction { get; set; } = new Action<IConfigurationBuilder>(_ => { });
internal bool AddDefaultLoggingProvider { get; set; }
public int? Concurrency { get; set; }

internal readonly List<InitializeDelegate> InitializeDelegates = new List<InitializeDelegate>();
internal readonly List<InitializedDelegate> InitializedDelegates = new List<InitializedDelegate>();
Expand Down
12 changes: 12 additions & 0 deletions src/Server/LanguageServerOptionsExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,18 @@ public static LanguageServerOptions WithServerInfo(this LanguageServerOptions op
return options;
}

/// <summary>
/// Set maximum number of allowed parallel actions
/// </summary>
/// <param name="options"></param>
/// <param name="concurrency"></param>
/// <returns></returns>
public static LanguageServerOptions WithConcurrency(this LanguageServerOptions options, int? concurrency)
{
options.Concurrency = concurrency;
return options;
}

public static LanguageServerOptions OnInitialize(this LanguageServerOptions options, InitializeDelegate @delegate)
{
options.InitializeDelegates.Add(@delegate);
Expand Down
1 change: 1 addition & 0 deletions test/Directory.Build.targets
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
<PackageReference Include="XunitXml.TestLogger" />
<PackageReference Include="coverlet.collector" />
<PackageReference Include="coverlet.msbuild" />
<PackageReference Include="Microsoft.Reactive.Testing" />
</ItemGroup>
<Import Project="$([MSBuild]::GetPathOfFileAbove('Directory.Build.targets', '$(MSBuildThisFileDirectory)../'))" />
</Project>
3 changes: 2 additions & 1 deletion test/JsonRpc.Tests/DapInputHandlerTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,8 @@ private static InputHandler NewHandler(
requestRouter,
responseRouter,
Substitute.For<ILoggerFactory>(),
new DapSerializer());
new DapSerializer(),
null);
handler.Start();
cts.Wait();
Task.Delay(10).Wait();
Expand Down
Loading