Skip to content

Commit 6a4321b

Browse files
authored
[dotnet test MTP]: Pipe cleanup and add tests (#50723)
1 parent 409937d commit 6a4321b

File tree

3 files changed

+506
-32
lines changed

3 files changed

+506
-32
lines changed

src/Cli/dotnet/Commands/Test/MTP/IPC/NamedPipeServer.cs

Lines changed: 61 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,15 @@ public NamedPipeServer(
3232
CancellationToken cancellationToken,
3333
bool skipUnknownMessages)
3434
{
35-
_namedPipeServerStream = new(pipeName, PipeDirection.InOut, maxNumberOfServerInstances, PipeTransmissionMode.Byte, PipeOptions.Asynchronous | PipeOptions.CurrentUserOnly);
35+
_namedPipeServerStream = new NamedPipeServerStream(
36+
pipeName,
37+
PipeDirection.InOut,
38+
maxNumberOfServerInstances,
39+
PipeTransmissionMode.Byte,
40+
PipeOptions.Asynchronous | PipeOptions.CurrentUserOnly,
41+
inBufferSize: 0,
42+
outBufferSize: 0);
43+
3644
_callback = callback;
3745
_cancellationToken = cancellationToken;
3846
_skipUnknownMessages = skipUnknownMessages;
@@ -67,51 +75,69 @@ public async Task WaitConnectionAsync(CancellationToken cancellationToken)
6775
/// </summary>
6876
private async Task InternalLoopAsync(CancellationToken cancellationToken)
6977
{
70-
int currentMessageSize = 0;
71-
int missingBytesToReadOfWholeMessage = 0;
78+
// This is an indicator when reading from the pipe whether we are at the start of a new message (i.e, we should read 4 bytes as message size)
79+
// Note that the implementation assumes no overlapping messages in the pipe.
80+
// The flow goes like:
81+
// 1. MTP sends a request (and acquires lock).
82+
// 2. SDK reads the request.
83+
// 3. SDK sends a response.
84+
// 4. MTP reads the response (and releases lock).
85+
// This means that no two requests can be in the pipe at the same time.
86+
bool isStartOfNewMessage = true;
87+
int remainingBytesToReadOfWholeMessage = 0;
7288
while (!cancellationToken.IsCancellationRequested)
7389
{
74-
int missingBytesToReadOfCurrentChunk = 0;
75-
int currentReadIndex = 0;
76-
int currentReadBytes = await _namedPipeServerStream.ReadAsync(_readBuffer.AsMemory(currentReadIndex, _readBuffer.Length), cancellationToken);
77-
if (currentReadBytes == 0)
90+
// If we are at the start of a new message, we need to read at least the message size.
91+
int currentReadBytes = isStartOfNewMessage
92+
? await _namedPipeServerStream.ReadAtLeastAsync(_readBuffer, minimumBytes: sizeof(int), throwOnEndOfStream: false, cancellationToken)
93+
: await _namedPipeServerStream.ReadAsync(_readBuffer, cancellationToken);
94+
95+
if (currentReadBytes == 0 || (isStartOfNewMessage && currentReadBytes < sizeof(int)))
7896
{
7997
// The client has disconnected
8098
return;
8199
}
82100

83-
// Reset the current chunk size
84-
missingBytesToReadOfCurrentChunk = currentReadBytes;
101+
// The local remainingBytesToProcess tracks the remaining bytes of what we have read from the pipe but not yet processed.
102+
// At the beginning here, it contains everything we have read from the pipe.
103+
// As we are processing the data in it, we continue to slice it.
104+
Memory<byte> remainingBytesToProcess = _readBuffer.AsMemory(0, currentReadBytes);
85105

86-
// If currentRequestSize is 0, we need to read the message size
87-
if (currentMessageSize == 0)
106+
// If the current read is the start of a new message, we need to read the message size first.
107+
if (isStartOfNewMessage)
88108
{
89109
// We need to read the message size, first 4 bytes
90-
if (currentReadBytes < sizeof(int))
91-
{
92-
throw new UnreachableException(CliCommandStrings.DotnetTestPipeIncompleteSize);
93-
}
110+
remainingBytesToReadOfWholeMessage = BitConverter.ToInt32(remainingBytesToProcess.Span);
111+
112+
// Now that we have read the size, we slice the remainingBytesToProcess.
113+
remainingBytesToProcess = remainingBytesToProcess.Slice(sizeof(int));
94114

95-
currentMessageSize = BitConverter.ToInt32(_readBuffer, 0);
96-
missingBytesToReadOfCurrentChunk = currentReadBytes - sizeof(int);
97-
missingBytesToReadOfWholeMessage = currentMessageSize;
98-
currentReadIndex = sizeof(int);
115+
// Now that we have read the size, we are no longer at the start of a new message.
116+
// If the current chunk ended up to be the full message, we will set this back to true later.
117+
isStartOfNewMessage = false;
99118
}
100119

101-
if (missingBytesToReadOfCurrentChunk > 0)
120+
// We read the rest of the message.
121+
// Note that this assumes that no messages are overlapping in the pipe.
122+
if (remainingBytesToProcess.Length > 0)
102123
{
103124
// We need to read the rest of the message
104-
await _messageBuffer.WriteAsync(_readBuffer.AsMemory(currentReadIndex, missingBytesToReadOfCurrentChunk), cancellationToken);
105-
missingBytesToReadOfWholeMessage -= missingBytesToReadOfCurrentChunk;
125+
await _messageBuffer.WriteAsync(remainingBytesToProcess, cancellationToken);
126+
remainingBytesToReadOfWholeMessage -= remainingBytesToProcess.Length;
127+
128+
// At this point, we have read everything in the remainingBytesToProcess.
129+
// Note that while remainingBytesToProcess isn't accessed after this point, we still maintain the
130+
// invariant that it tracks what we have read from the pipe but not yet processed.
131+
remainingBytesToProcess = Memory<byte>.Empty;
106132
}
107133

108-
if (missingBytesToReadOfWholeMessage < 0)
134+
if (remainingBytesToReadOfWholeMessage < 0)
109135
{
110136
throw new UnreachableException(CliCommandStrings.DotnetTestPipeOverlapping);
111137
}
112138

113139
// If we have read all the message, we can deserialize it
114-
if (missingBytesToReadOfWholeMessage == 0)
140+
if (remainingBytesToReadOfWholeMessage == 0)
115141
{
116142
// Deserialize the message
117143
_messageBuffer.Position = 0;
@@ -147,12 +173,19 @@ private async Task InternalLoopAsync(CancellationToken cancellationToken)
147173

148174
// Write the message size
149175
byte[] bytes = _sizeOfIntArray;
150-
BitConverter.TryWriteBytes(bytes, sizeOfTheWholeMessage);
176+
if (!BitConverter.TryWriteBytes(bytes, sizeOfTheWholeMessage))
177+
{
178+
throw new UnreachableException();
179+
}
180+
151181
await _messageBuffer.WriteAsync(bytes, cancellationToken);
152182

153183
// Write the serializer id
154184
bytes = _sizeOfIntArray;
155-
BitConverter.TryWriteBytes(bytes, responseNamedPipeSerializer.Id);
185+
if (!BitConverter.TryWriteBytes(bytes, responseNamedPipeSerializer.Id))
186+
{
187+
throw new UnreachableException();
188+
}
156189

157190
await _messageBuffer.WriteAsync(bytes.AsMemory(0, sizeof(int)), cancellationToken);
158191

@@ -164,10 +197,6 @@ private async Task InternalLoopAsync(CancellationToken cancellationToken)
164197
{
165198
await _namedPipeServerStream.WriteAsync(_messageBuffer.GetBuffer().AsMemory(0, (int)_messageBuffer.Position), cancellationToken);
166199
await _namedPipeServerStream.FlushAsync(cancellationToken);
167-
if (RuntimeInformation.IsOSPlatform(OSPlatform.Windows))
168-
{
169-
_namedPipeServerStream.WaitForPipeDrain();
170-
}
171200
}
172201
finally
173202
{
@@ -177,8 +206,8 @@ private async Task InternalLoopAsync(CancellationToken cancellationToken)
177206
}
178207

179208
// Reset the control variables
180-
currentMessageSize = 0;
181-
missingBytesToReadOfWholeMessage = 0;
209+
isStartOfNewMessage = true;
210+
remainingBytesToReadOfWholeMessage = 0;
182211
}
183212
}
184213
}

0 commit comments

Comments
 (0)