Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
93 changes: 61 additions & 32 deletions src/Cli/dotnet/Commands/Test/MTP/IPC/NamedPipeServer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,15 @@ public NamedPipeServer(
CancellationToken cancellationToken,
bool skipUnknownMessages)
{
_namedPipeServerStream = new(pipeName, PipeDirection.InOut, maxNumberOfServerInstances, PipeTransmissionMode.Byte, PipeOptions.Asynchronous | PipeOptions.CurrentUserOnly);
_namedPipeServerStream = new NamedPipeServerStream(
pipeName,
PipeDirection.InOut,
maxNumberOfServerInstances,
PipeTransmissionMode.Byte,
PipeOptions.Asynchronous | PipeOptions.CurrentUserOnly,
inBufferSize: 0,
outBufferSize: 0);

_callback = callback;
_cancellationToken = cancellationToken;
_skipUnknownMessages = skipUnknownMessages;
Expand Down Expand Up @@ -67,51 +75,69 @@ public async Task WaitConnectionAsync(CancellationToken cancellationToken)
/// </summary>
private async Task InternalLoopAsync(CancellationToken cancellationToken)
{
int currentMessageSize = 0;
int missingBytesToReadOfWholeMessage = 0;
// This is an indicator when reading from the pipe whether we are at the start of a new message (i.e, we should read 4 bytes as message size)
// Note that the implementation assumes no overlapping messages in the pipe.
// The flow goes like:
// 1. MTP sends a request (and acquires lock).
// 2. SDK reads the request.
// 3. SDK sends a response.
// 4. MTP reads the response (and releases lock).
// This means that no two requests can be in the pipe at the same time.
bool isStartOfNewMessage = true;
int remainingBytesToReadOfWholeMessage = 0;
while (!cancellationToken.IsCancellationRequested)
{
int missingBytesToReadOfCurrentChunk = 0;
int currentReadIndex = 0;
int currentReadBytes = await _namedPipeServerStream.ReadAsync(_readBuffer.AsMemory(currentReadIndex, _readBuffer.Length), cancellationToken);
if (currentReadBytes == 0)
// If we are at the start of a new message, we need to read at least the message size.
int currentReadBytes = isStartOfNewMessage
? await _namedPipeServerStream.ReadAtLeastAsync(_readBuffer, minimumBytes: sizeof(int), throwOnEndOfStream: false, cancellationToken)
: await _namedPipeServerStream.ReadAsync(_readBuffer, cancellationToken);

if (currentReadBytes == 0 || (isStartOfNewMessage && currentReadBytes < sizeof(int)))
{
// The client has disconnected
return;
}

// Reset the current chunk size
missingBytesToReadOfCurrentChunk = currentReadBytes;
// The local remainingBytesToProcess tracks the remaining bytes of what we have read from the pipe but not yet processed.
// At the beginning here, it contains everything we have read from the pipe.
// As we are processing the data in it, we continue to slice it.
Memory<byte> remainingBytesToProcess = _readBuffer.AsMemory(0, currentReadBytes);

// If currentRequestSize is 0, we need to read the message size
if (currentMessageSize == 0)
// If the current read is the start of a new message, we need to read the message size first.
if (isStartOfNewMessage)
{
// We need to read the message size, first 4 bytes
if (currentReadBytes < sizeof(int))
{
throw new UnreachableException(CliCommandStrings.DotnetTestPipeIncompleteSize);
}
remainingBytesToReadOfWholeMessage = BitConverter.ToInt32(remainingBytesToProcess.Span);

// Now that we have read the size, we slice the remainingBytesToProcess.
remainingBytesToProcess = remainingBytesToProcess.Slice(sizeof(int));

currentMessageSize = BitConverter.ToInt32(_readBuffer, 0);
missingBytesToReadOfCurrentChunk = currentReadBytes - sizeof(int);
missingBytesToReadOfWholeMessage = currentMessageSize;
currentReadIndex = sizeof(int);
// Now that we have read the size, we are no longer at the start of a new message.
// If the current chunk ended up to be the full message, we will set this back to true later.
isStartOfNewMessage = false;
}

if (missingBytesToReadOfCurrentChunk > 0)
// We read the rest of the message.
// Note that this assumes that no messages are overlapping in the pipe.
if (remainingBytesToProcess.Length > 0)
{
// We need to read the rest of the message
await _messageBuffer.WriteAsync(_readBuffer.AsMemory(currentReadIndex, missingBytesToReadOfCurrentChunk), cancellationToken);
missingBytesToReadOfWholeMessage -= missingBytesToReadOfCurrentChunk;
await _messageBuffer.WriteAsync(remainingBytesToProcess, cancellationToken);
remainingBytesToReadOfWholeMessage -= remainingBytesToProcess.Length;

// At this point, we have read everything in the remainingBytesToProcess.
// Note that while remainingBytesToProcess isn't accessed after this point, we still maintain the
// invariant that it tracks what we have read from the pipe but not yet processed.
remainingBytesToProcess = Memory<byte>.Empty;
}

if (missingBytesToReadOfWholeMessage < 0)
if (remainingBytesToReadOfWholeMessage < 0)
{
throw new UnreachableException(CliCommandStrings.DotnetTestPipeOverlapping);
}

// If we have read all the message, we can deserialize it
if (missingBytesToReadOfWholeMessage == 0)
if (remainingBytesToReadOfWholeMessage == 0)
{
// Deserialize the message
_messageBuffer.Position = 0;
Expand Down Expand Up @@ -147,12 +173,19 @@ private async Task InternalLoopAsync(CancellationToken cancellationToken)

// Write the message size
byte[] bytes = _sizeOfIntArray;
BitConverter.TryWriteBytes(bytes, sizeOfTheWholeMessage);
if (!BitConverter.TryWriteBytes(bytes, sizeOfTheWholeMessage))
{
throw new UnreachableException();
}

await _messageBuffer.WriteAsync(bytes, cancellationToken);

// Write the serializer id
bytes = _sizeOfIntArray;
BitConverter.TryWriteBytes(bytes, responseNamedPipeSerializer.Id);
if (!BitConverter.TryWriteBytes(bytes, responseNamedPipeSerializer.Id))
{
throw new UnreachableException();
}

await _messageBuffer.WriteAsync(bytes.AsMemory(0, sizeof(int)), cancellationToken);

Expand All @@ -164,10 +197,6 @@ private async Task InternalLoopAsync(CancellationToken cancellationToken)
{
await _namedPipeServerStream.WriteAsync(_messageBuffer.GetBuffer().AsMemory(0, (int)_messageBuffer.Position), cancellationToken);
await _namedPipeServerStream.FlushAsync(cancellationToken);
if (RuntimeInformation.IsOSPlatform(OSPlatform.Windows))
{
_namedPipeServerStream.WaitForPipeDrain();
}
}
finally
{
Expand All @@ -177,8 +206,8 @@ private async Task InternalLoopAsync(CancellationToken cancellationToken)
}

// Reset the control variables
currentMessageSize = 0;
missingBytesToReadOfWholeMessage = 0;
isStartOfNewMessage = true;
remainingBytesToReadOfWholeMessage = 0;
}
}
}
Expand Down
Loading
Loading