Skip to content
Merged
Show file tree
Hide file tree
Changes from 14 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion src/libraries/Common/tests/System/Net/ActivityRecorder.cs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ internal class ActivityRecorder : IDisposable
public Activity LastFinishedActivity { get; private set; }
public IEnumerable<Activity> FinishedActivities => _finishedActivities;

public Action<Activity> OnStarted { get; set; }

public ActivityRecorder(string activitySourceName, string activityName)
{
_activitySourceName = activitySourceName;
Expand All @@ -50,8 +52,8 @@ public ActivityRecorder(string activitySourceName, string activityName)
}

Interlocked.Increment(ref _started);

LastStartedActivity = activity;
OnStarted?.Invoke(activity);
}
},
ActivityStopped = (activity) =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,21 +87,17 @@ internal sealed partial class HttpConnectionPool
ThrowGetVersionException(request, 3, reasonException);
}

long queueStartingTimestamp = HttpTelemetry.Log.IsEnabled() || (GlobalHttpSettings.MetricsHandler.IsGloballyEnabled && Settings._metrics!.RequestsQueueDuration.Enabled)
? Stopwatch.GetTimestamp()
: 0;
Activity? waitForConnectionActivity = ConnectionSetupDistributedTracing.StartWaitForConnectionActivity(authority);

if (!TryGetPooledHttp3Connection(request, out Http3Connection? connection, out http3ConnectionWaiter))
WaitForHttp3ConnectionActivity waitForConnectionActivity = new WaitForHttp3ConnectionActivity(Settings._metrics, authority);
if (!TryGetPooledHttp3Connection(request, out Http3Connection? connection, out http3ConnectionWaiter, out bool streamAvailable))
{
waitForConnectionActivity.Start();
try
{
connection = await http3ConnectionWaiter.WaitWithCancellationAsync(cancellationToken).ConfigureAwait(false);
}
catch (Exception ex)
{
ConnectionSetupDistributedTracing.ReportError(waitForConnectionActivity, ex);
waitForConnectionActivity?.Stop();
waitForConnectionActivity.Stop(request, this, ex);
throw;
}
}
Expand All @@ -113,7 +109,7 @@ internal sealed partial class HttpConnectionPool
return null;
}

HttpResponseMessage response = await connection.SendAsync(request, queueStartingTimestamp, waitForConnectionActivity, cancellationToken).ConfigureAwait(false);
HttpResponseMessage response = await connection.SendAsync(request, waitForConnectionActivity, streamAvailable, cancellationToken).ConfigureAwait(false);

// If an Alt-Svc authority returns 421, it means it can't actually handle the request.
// An authority is supposed to be able to handle ALL requests to the origin, so this is a server bug.
Expand All @@ -137,7 +133,7 @@ internal sealed partial class HttpConnectionPool
[SupportedOSPlatform("windows")]
[SupportedOSPlatform("linux")]
[SupportedOSPlatform("macos")]
private bool TryGetPooledHttp3Connection(HttpRequestMessage request, [NotNullWhen(true)] out Http3Connection? connection, [NotNullWhen(false)] out HttpConnectionWaiter<Http3Connection?>? waiter)
private bool TryGetPooledHttp3Connection(HttpRequestMessage request, [NotNullWhen(true)] out Http3Connection? connection, [NotNullWhen(false)] out HttpConnectionWaiter<Http3Connection?>? waiter, out bool streamAvailable)
{
Debug.Assert(IsHttp3Supported());

Expand All @@ -163,6 +159,7 @@ private bool TryGetPooledHttp3Connection(HttpRequestMessage request, [NotNullWhe
// There were no available connections. This request has been added to the request queue.
if (NetEventSource.Log.IsEnabled()) Trace($"No available HTTP/3 connections; request queued.");
connection = null;
streamAvailable = false;
return false;
}
}
Expand All @@ -175,9 +172,11 @@ private bool TryGetPooledHttp3Connection(HttpRequestMessage request, [NotNullWhe
continue;
}

streamAvailable = connection.TryReserveStream();

// Disable and remove the connection from the pool only if we can open another.
// If we have only single connection, use the underlying QuicConnection mechanism to wait for available streams.
if (!connection.TryReserveStream() && EnableMultipleHttp3Connections)
if (!streamAvailable && EnableMultipleHttp3Connections)
{
if (NetEventSource.Log.IsEnabled()) connection.Trace("Found HTTP/3 connection in pool without available streams.");

Expand Down Expand Up @@ -385,8 +384,7 @@ private void ReturnHttp3Connection(Http3Connection connection, bool isNewConnect
return;
}

bool reserved;
while ((reserved = connection.TryReserveStream()) || !EnableMultipleHttp3Connections)
while (connection.TryReserveStream() || !EnableMultipleHttp3Connections)
{
// Loop in case we get a request that has already been canceled or handled by a different connection.
while (true)
Expand Down Expand Up @@ -449,10 +447,9 @@ private void ReturnHttp3Connection(Http3Connection connection, bool isNewConnect
}
else
{
if (reserved)
{
connection.ReleaseStream();
}
// TryReserveStream() always decrements the available stream counter when EnableMultipleHttp3Connections is false.
connection.ReleaseStream();

if (added)
{
if (NetEventSource.Log.IsEnabled()) connection.Trace("Put HTTP3 connection in pool.");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ public ValueTask<T> WaitForConnectionAsync(HttpRequestMessage request, HttpConne

private async ValueTask<T> WaitForConnectionWithTelemetryAsync(HttpRequestMessage request, HttpConnectionPool pool, bool async, CancellationToken requestCancellationToken)
{
// The HTTP/3 connection waiting span should include the time spent waiting for an available QUIC stream, therefore H3 telemetry is implemented elsewhere.
Debug.Assert(typeof(T) == typeof(HttpConnection) || typeof(T) == typeof(Http2Connection));

long startingTimestamp = Stopwatch.GetTimestamp();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
using System.Globalization;
using System.IO;
using System.Net.Http.Headers;
using System.Net.Http.Metrics;
using System.Net.Quic;
using System.Runtime.CompilerServices;
using System.Runtime.Versioning;
Expand Down Expand Up @@ -176,29 +177,39 @@ private void CheckForShutdown()
}
}

/// <summary>
/// When EnableMultipleHttp3Connections is false: always reserve a stream, return a bool indicating if the stream is immediately available.
/// When EnableMultipleHttp3Connections is true: reserve a stream only if it's available meaning that the return value also indicates whether it has been reserved.
/// </summary>
public bool TryReserveStream()
{
bool singleConnection = !_pool.Settings.EnableMultipleHttp3Connections;

lock (SyncObj)
{
Debug.Assert(_availableRequestStreamsCount >= 0);
// For the single connection case, we allow the counter to go below zero.
Debug.Assert(singleConnection || _availableRequestStreamsCount >= 0);

if (NetEventSource.Log.IsEnabled()) Trace($"_availableRequestStreamsCount = {_availableRequestStreamsCount}");

if (_availableRequestStreamsCount == 0)
bool streamAvailable = _availableRequestStreamsCount > 0;

// Do not let the counter to go below zero when EnableMultipleHttp3Connections is true.
// This equivalent to an immediate ReleaseStream() for the case no stream is immediately available.
if (singleConnection || _availableRequestStreamsCount > 0)
{
return false;
--_availableRequestStreamsCount;
}

--_availableRequestStreamsCount;
return true;
return streamAvailable;
}
}

public void ReleaseStream()
{
lock (SyncObj)
{
Debug.Assert(_availableRequestStreamsCount >= 0);
Debug.Assert(!_pool.Settings.EnableMultipleHttp3Connections || _availableRequestStreamsCount >= 0);

if (NetEventSource.Log.IsEnabled()) Trace($"_availableRequestStreamsCount = {_availableRequestStreamsCount}");
++_availableRequestStreamsCount;
Expand All @@ -214,10 +225,12 @@ public void StreamCapacityCallback(QuicConnection connection, QuicStreamCapacity

lock (SyncObj)
{
Debug.Assert(_availableRequestStreamsCount >= 0);
Debug.Assert(_availableStreamsWaiter is null || _availableRequestStreamsCount >= 0);

if (NetEventSource.Log.IsEnabled()) Trace($"_availableRequestStreamsCount = {_availableRequestStreamsCount} + bidirectionalStreamsCountIncrement = {args.BidirectionalIncrement}");

// Since _availableStreamsWaiter is only used in the multi-connection case, when _availableRequestStreamsCount cannot go below zero,
// we don't need to check the value of _availableRequestStreamsCount here.
_availableRequestStreamsCount += args.BidirectionalIncrement;
_availableStreamsWaiter?.SetResult(!ShuttingDown);
_availableStreamsWaiter = null;
Expand All @@ -226,6 +239,9 @@ public void StreamCapacityCallback(QuicConnection connection, QuicStreamCapacity

public Task<bool> WaitForAvailableStreamsAsync()
{
// In the single connection case, _availableStreamsWaiter notifications do not guarantee that _availableRequestStreamsCount >= 0.
Debug.Assert(_pool.Settings.EnableMultipleHttp3Connections, "Calling WaitForAvailableStreamsAsync() is invalid when EnableMultipleHttp3Connections is false.");

lock (SyncObj)
{
Debug.Assert(_availableRequestStreamsCount >= 0);
Expand All @@ -245,19 +261,26 @@ public Task<bool> WaitForAvailableStreamsAsync()
}
}

public async Task<HttpResponseMessage> SendAsync(HttpRequestMessage request, long queueStartingTimestamp, Activity? waitForConnectionActivity, CancellationToken cancellationToken)
public async Task<HttpResponseMessage> SendAsync(HttpRequestMessage request, WaitForHttp3ConnectionActivity waitForConnectionActivity, bool streamAvailable, CancellationToken cancellationToken)
{
// Allocate an active request
QuicStream? quicStream = null;
Http3RequestStream? requestStream = null;

try
{
Exception? exception = null;
try
{
QuicConnection? conn = _connection;
if (conn != null)
{
// We found a connection in the pool, but it did not have available streams, OpenOutboundStreamAsync() is expected to wait.
if (!waitForConnectionActivity.Started && !streamAvailable)
{
waitForConnectionActivity.Start();
}

quicStream = await conn.OpenOutboundStreamAsync(QuicStreamType.Bidirectional, cancellationToken).ConfigureAwait(false);

requestStream = new Http3RequestStream(request, this, quicStream);
Expand All @@ -275,28 +298,15 @@ public async Task<HttpResponseMessage> SendAsync(HttpRequestMessage request, lon
// Since quicStream will stay `null`, the code below will throw appropriate exception to retry the request.
catch (ObjectDisposedException e)
{
ConnectionSetupDistributedTracing.ReportError(waitForConnectionActivity, e);
exception = e;
}
catch (QuicException e) when (e.QuicError != QuicError.OperationAborted)
{
ConnectionSetupDistributedTracing.ReportError(waitForConnectionActivity, e);
exception = e;
}
finally
{
waitForConnectionActivity?.Stop();
if (queueStartingTimestamp != 0)
{
TimeSpan duration = Stopwatch.GetElapsedTime(queueStartingTimestamp);
if (GlobalHttpSettings.MetricsHandler.IsGloballyEnabled)
{
_pool.Settings._metrics!.RequestLeftQueue(request, Pool, duration, versionMajor: 3);
}

if (HttpTelemetry.Log.IsEnabled())
{
HttpTelemetry.Log.RequestLeftQueue(versionMajor: 3, duration);
}
}
waitForConnectionActivity.Stop(request, Pool, exception);
}

if (quicStream == null)
Expand All @@ -317,7 +327,7 @@ public async Task<HttpResponseMessage> SendAsync(HttpRequestMessage request, lon
throw new HttpRequestException(HttpRequestError.Unknown, SR.net_http_request_aborted, null, RequestRetryType.RetryOnConnectionFailure);
}

Debug.Assert(waitForConnectionActivity?.IsStopped != false);
waitForConnectionActivity.AssertActivityNotRunning();
if (ConnectionSetupActivity is not null) ConnectionSetupDistributedTracing.AddConnectionLinkToRequestActivity(ConnectionSetupActivity);
if (NetEventSource.Log.IsEnabled()) Trace($"Sending request: {request}");

Expand Down Expand Up @@ -928,4 +938,62 @@ async ValueTask SkipUnknownPayloadAsync(long payloadLength)
}
}
}

/// <summary>
/// Tracks telemetry signals associated with the time period an HTTP/3 request spends waiting for a usable HTTP/3 connection:
/// the wait_for_connection Activity, the RequestLeftQueue EventSource event and the http.client.request.time_in_queue metric.
/// </summary>
internal struct WaitForHttp3ConnectionActivity
{
private readonly SocketsHttpHandlerMetrics? _metrics;
private readonly HttpAuthority _authority;
private Activity? _activity;
private long _startTimestamp;

public WaitForHttp3ConnectionActivity(SocketsHttpHandlerMetrics? metrics, HttpAuthority authority)
{
_metrics = metrics;
_authority = authority;
}

public bool Started { get; private set; }

public void Start()
{
Debug.Assert(!Started);
_startTimestamp = HttpTelemetry.Log.IsEnabled() || (GlobalHttpSettings.MetricsHandler.IsGloballyEnabled && _metrics!.RequestsQueueDuration.Enabled) ? Stopwatch.GetTimestamp() : 0;
_activity = ConnectionSetupDistributedTracing.StartWaitForConnectionActivity(_authority);
Started = true;
}

public void Stop(HttpRequestMessage request, HttpConnectionPool pool, Exception? exception)
{
if (exception is not null)
{
ConnectionSetupDistributedTracing.ReportError(_activity, exception);
}

_activity?.Stop();

if (_startTimestamp != 0)
{
TimeSpan duration = Stopwatch.GetElapsedTime(_startTimestamp);

if (GlobalHttpSettings.MetricsHandler.IsGloballyEnabled)
{
_metrics!.RequestLeftQueue(request, pool, duration, versionMajor: 3);
}
if (HttpTelemetry.Log.IsEnabled())
{
HttpTelemetry.Log.RequestLeftQueue(3, duration);
}
}
}

[Conditional("DEBUG")]
public void AssertActivityNotRunning()
{
Debug.Assert(_activity?.IsStopped != false);
}
}
}
Loading
Loading