ASP.NET Core中如何限制响应发送速率(不是调用频率)
前言
ASP.NET Core中有很多RateLimit组件,.NET 7甚至推出了官方版本。不过这些组件的主要目标是限制客户端访问服务的频率,在HTTP服务器崩溃前主动拒绝部分请求。如果请求没有被拒绝服务会尽可能调用资源尽快处理。
现在有一个问题,有什么办法限制响应的发送速率吗?这在一些需要长时间传输流式数据的情况时很有用,避免少量请求耗尽网络带宽,尽可能同时服务更多请求。
Tip
本文节选自我的新书《C#与.NET6 开发从入门到实践》12.11 流量控制。实现方式偏向知识讲解和教学,不保证组件稳定性,不建议直接在产品中使用。有关新书的更多介绍欢迎查看《C#与.NET6 开发从入门到实践》预售,作者亲自来打广告了!
正文
用过百度网盘的人应该都深有体会,如果没有会员,下载速度会非常慢。实现这种效果的方法有两种:控制TCP协议的滑动窗口大小;控制响应流的写入大小和频率。偏向系统底层的流量控制软件因为无法干涉软件中的流,所以一般会直接控制内核TCP协议的滑动窗口大小;而下载软件等客户端应用通常直接控制流的写入和读取,此时TCP协议的拥塞控制算法会自动调整滑动窗口大小。这种流量控制对提供大型多媒体资源的应用(例如在线视频网站)非常重要,能防止一个请求的响应占用太多带宽影响其他请求的响应发送。
ASP.NET Core并没有原生提供相关功能,Nuget上也没有找到相关的程序包(截止截稿)。但其实利用ASP.NET Core提供的接口,是可以实现这个功能的。笔者以ASP.NET Core的响应压缩中间件为蓝本,实现了一个简单的响应限流中间件。
编写节流组件
支持限速的基础流
using System;
namespace AccessControlElementary;
/// <summary>
/// 支持流量控制的流
/// </summary>
public class ThrottlingStream : Stream
{
/// <summary>
/// 用于指定每秒可传输的无限字节数的常数。
/// </summary>
public const long Infinite = 0;
#region Private members
/// <summary>
/// 基础流
/// </summary>
private readonly Stream _baseStream;
/// <summary>
/// 每秒可通过基础流传输的最大字节数。
/// </summary>
private long _maximumBytesPerSecond;
/// <summary>
/// 自上次限制以来已传输的字节数。
/// </summary>
private long _byteCount;
/// <summary>
/// 最后一次限制的开始时间(毫秒)。
/// </summary>
private long _start;
#endregion
#region Properties
/// <summary>
/// 获取当前毫秒数。
/// </summary>
/// <value>当前毫秒数。</value>
protected long CurrentMilliseconds => Environment.TickCount;
/// <summary>
/// 获取或设置每秒可通过基础流传输的最大字节数。
/// </summary>
/// <value>每秒最大字节数。</value>
public long MaximumBytesPerSecond
{
get => _maximumBytesPerSecond;
set
{
if (MaximumBytesPerSecond != value)
{
_maximumBytesPerSecond = value;
Reset();
}
}
}
/// <summary>
/// 获取一个值,该值指示当前流是否支持读取。
/// </summary>
/// <returns>如果流支持读取,则为true;否则为false。</returns>
public override bool CanRead => _baseStream.CanRead;
/// <summary>
/// 获取估算的流当前的比特率(单位:bps)。
/// </summary>
public long CurrentBitsPerSecond { get; protected set; }
/// <summary>
/// 获取一个值,该值指示当前流是否支持定位。
/// </summary>
/// <value></value>
/// <returns>如果流支持定位,则为true;否则为false。</returns>
public override bool CanSeek => _baseStream.CanSeek;
/// <summary>
/// 获取一个值,该值指示当前流是否支持写入。
/// </summary>
/// <value></value>
/// <returns>如果流支持写入,则为true;否则为false。</returns>
public override bool CanWrite => _baseStream.CanWrite;
/// <summary>
/// 获取流的长度(以字节为单位)。
/// </summary>
/// <value></value>
/// <returns>一个long值,表示流的长度(字节)。</returns>
/// <exception cref="T:System.NotSupportedException">基础流不支持定位。</exception>
/// <exception cref="T:System.ObjectDisposedException">方法在流关闭后被调用。</exception>
public override long Length => _baseStream.Length;
/// <summary>
/// 获取或设置当前流中的位置。
/// </summary>
/// <value></value>
/// <returns>流中的当前位置。</returns>
/// <exception cref="T:System.IO.IOException">发生I/O错误。</exception>
/// <exception cref="T:System.NotSupportedException">基础流不支持定位。</exception>
/// <exception cref="T:System.ObjectDisposedException">方法在流关闭后被调用。</exception>
public override long Position
{
get => _baseStream.Position;
set => _baseStream.Position = value;
}
#endregion
#region Ctor
/// <summary>
/// 使用每秒可传输无限字节数的常数初始化 <see cref="T:ThrottlingStream"/> 类的新实例。
/// </summary>
/// <param name="baseStream">基础流。</param>
public ThrottlingStream(Stream baseStream)
: this(baseStream, Infinite) { }
/// <summary>
/// 初始化 <see cref="T:ThrottlingStream"/> 类的新实例。
/// </summary>
/// <param name="baseStream">基础流。</param>
/// <param name="maximumBytesPerSecond">每秒可通过基础流传输的最大字节数。</param>
/// <exception cref="ArgumentNullException">当 <see cref="baseStream"/> 是null引用时抛出。</exception>
/// <exception cref="ArgumentOutOfRangeException">当 <see cref="maximumBytesPerSecond"/> 是负数时抛出.</exception>
public ThrottlingStream(Stream baseStream, long maximumBytesPerSecond)
{
if (maximumBytesPerSecond < 0)
{
throw new ArgumentOutOfRangeException(nameof(maximumBytesPerSecond),
maximumBytesPerSecond, "The maximum number of bytes per second can't be negatie.");
}
_baseStream = baseStream ?? throw new ArgumentNullException(nameof(baseStream));
_maximumBytesPerSecond = maximumBytesPerSecond;
_start = CurrentMilliseconds;
_byteCount = 0;
}
#endregion
#region Public methods
/// <summary>
/// 清除此流的所有缓冲区,并将所有缓冲数据写入基础设备。
/// </summary>
/// <exception cref="T:System.IO.IOException">发生I/O错误。</exception>
public override void Flush() => _baseStream.Flush();
/// <summary>
/// 清除此流的所有缓冲区,并将所有缓冲数据写入基础设备。
/// </summary>
/// <exception cref="T:System.IO.IOException">发生I/O错误。</exception>
public override Task FlushAsync(CancellationToken cancellationToken) => _baseStream.FlushAsync(cancellationToken);
/// <summary>
/// 从当前流中读取字节序列,并将流中的位置前进读取的字节数。
/// </summary>
/// <param name="buffer">字节数组。当此方法返回时,缓冲区包含指定的字节数组,其值介于offset和(offset+count-1)之间,由从当前源读取的字节替换。</param>
/// <param name="offset">缓冲区中从零开始的字节偏移量,开始存储从当前流中读取的数据。</param>
/// <param name="count">从当前流中读取的最大字节数。</param>
/// <returns>
/// 读入缓冲区的字节总数。如果许多字节当前不可用,则该值可以小于请求的字节数;如果已到达流的结尾,则该值可以小于零(0)。
/// </returns>
/// <exception cref="T:System.ArgumentException">偏移量和计数之和大于缓冲区长度。</exception>
/// <exception cref="T:System.ObjectDisposedException">方法在流关闭后被调用。</exception>
/// <exception cref="T:System.NotSupportedException">基础流不支持读取。 </exception>
/// <exception cref="T:System.ArgumentNullException">缓冲区为null。</exception>
/// <exception cref="T:System.IO.IOException">发生I/O错误。</exception>
/// <exception cref="T:System.ArgumentOutOfRangeException">偏移量或读取的最大字节数为负。</exception>
public override int Read(byte[] buffer, int offset, int count)
{
Throttle(count);
return _baseStream.Read(buffer, offset, count);
}
/// <summary>
/// 从当前流中读取字节序列,并将流中的位置前进读取的字节数。
/// </summary>
/// <param name="buffer">字节数组。当此方法返回时,缓冲区包含指定的字节数组,其值介于offset和(offset+count-1)之间,由从当前源读取的字节替换。</param>
/// <param name="offset">缓冲区中从零开始的字节偏移量,开始存储从当前流中读取的数据。</param>
/// <param name="count">从当前流中读取的最大字节数。</param>
/// <param name="cancellationToken">取消令牌。</param>
/// <returns>
/// 读入缓冲区的字节总数。如果许多字节当前不可用,则该值可以小于请求的字节数;如果已到达流的结尾,则该值可以小于零(0)。
/// </returns>
/// <exception cref="T:System.ArgumentException">偏移量和计数之和大于缓冲区长度。</exception>
/// <exception cref="T:System.ObjectDisposedException">方法在流关闭后被调用。</exception>
/// <exception cref="T:System.NotSupportedException">基础流不支持读取。 </exception>
/// <exception cref="T:System.ArgumentNullException">缓冲区为null。</exception>
/// <exception cref="T:System.IO.IOException">发生I/O错误。</exception>
/// <exception cref="T:System.ArgumentOutOfRangeException">偏移量或读取的最大字节数为负。</exception>
public override async Task<int> ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
{
return await ReadAsync(buffer.AsMemory(offset, count), cancellationToken);
}
/// <summary>
/// 从当前流中读取字节序列,并将流中的位置前进读取的字节数。
/// </summary>
/// <param name="buffer">内存缓冲区。当此方法返回时,缓冲区包含读取的数据。</param>
/// <param name="cancellationToken">取消令牌。</param>
/// <returns>
/// 读入缓冲区的字节总数。如果许多字节当前不可用,则该值可以小于请求的字节数;如果已到达流的结尾,则该值可以小于零(0)。
/// </returns>
/// <exception cref="T:System.ArgumentException">偏移量和计数之和大于缓冲区长度。</exception>
/// <exception cref="T:System.ObjectDisposedException">方法在流关闭后被调用。</exception>
/// <exception cref="T:System.NotSupportedException">基础流不支持读取。 </exception>
/// <exception cref="T:System.ArgumentNullException">缓冲区为null。</exception>
/// <exception cref="T:System.IO.IOException">发生I/O错误。</exception>
/// <exception cref="T:System.ArgumentOutOfRangeException">偏移量或读取的最大字节数为负。</exception>
public override async ValueTask<int> ReadAsync(Memory<byte> buffer, CancellationToken cancellationToken = default)
{
await ThrottleAsync(buffer.Length, cancellationToken);
return await _baseStream.ReadAsync(buffer, cancellationToken);
}
/// <summary>
/// 设置当前流中的位置。
/// </summary>
/// <param name="offset">相对于参考点的字节偏移量。</param>
/// <param name="origin">类型为<see cref="T:System.IO.SeekOrigin"/>的值,指示用于获取新位置的参考点。</param>
/// <returns>
/// 当前流中的新位置。
/// </returns>
/// <exception cref="T:System.IO.IOException">发生I/O错误。</exception>
/// <exception cref="T:System.NotSupportedException">基础流不支持定位,例如流是从管道或控制台输出构造的。</exception>
/// <exception cref="T:System.ObjectDisposedException">方法在流关闭后被调用。</exception>
public override long Seek(long offset, SeekOrigin origin)
{
return _baseStream.Seek(offset, origin);
}
/// <summary>
/// 设置当前流的长度。
/// </summary>
/// <param name="value">当前流的所需长度(字节)。</param>
/// <exception cref="T:System.NotSupportedException">基础流不支持写入和定位,例如流是从管道或控制台输出构造的。</exception>
/// <exception cref="T:System.IO.IOException">发生I/O错误。</exception>
/// <exception cref="T:System.ObjectDisposedException">方法在流关闭后被调用。</exception>
public override void SetLength(long value)
{
_baseStream.SetLength(value);
}
/// <summary>
/// 将字节序列写入当前流,并按写入的字节数前进此流中的当前位置。
/// </summary>
/// <param name="buffer">字节数组。此方法将要写入当前流的字节从缓冲区复制到当前流。</param>
/// <param name="offset">缓冲区中从零开始向当前流复制字节的字节偏移量。</param>
/// <param name="count">要写入当前流的字节数。</param>
/// <exception cref="T:System.IO.IOException">发生I/O错误。</exception>
/// <exception cref="T:System.NotSupportedException">基础流不支持写入。</exception>
/// <exception cref="T:System.ObjectDisposedException">方法在流关闭后被调用。</exception>
/// <exception cref="T:System.ArgumentNullException">缓冲区为null。</exception>
/// <exception cref="T:System.ArgumentException">偏移量和写入字节数之和大于缓冲区长度。</exception>
/// <exception cref="T:System.ArgumentOutOfRangeException">偏移量或写入字节数为负。</exception>
public override void Write(byte[] buffer, int offset, int count)
{
Throttle(count);
_baseStream.Write(buffer, offset, count);
}
/// <summary>
/// 将字节序列写入当前流,并按写入的字节数前进此流中的当前位置。
/// </summary>
/// <param name="buffer">字节数组。此方法将要写入当前流的字节从缓冲区复制到当前流。</param>
/// <param name="offset">缓冲区中从零开始向当前流复制字节的字节偏移量。</param>
/// <param name="count">要写入当前流的字节数。</param>
/// <param name="cancellationToken">取消令牌。</param>
/// <exception cref="T:System.IO.IOException">发生I/O错误。</exception>
/// <exception cref="T:System.NotSupportedException">基础流不支持写入。</exception>
/// <exception cref="T:System.ObjectDisposedException">方法在流关闭后被调用。</exception>
/// <exception cref="T:System.ArgumentNullException">缓冲区为null。</exception>
/// <exception cref="T:System.ArgumentException">偏移量和写入字节数之和大于缓冲区长度。</exception>
/// <exception cref="T:System.ArgumentOutOfRangeException">偏移量或写入字节数为负。</exception>
public override async Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
{
await WriteAsync(buffer.AsMemory(offset, count), cancellationToken);
}
/// <summary>
/// 将内存缓冲区写入当前流,并按写入的字节数前进此流中的当前位置。
/// </summary>
/// <param name="buffer">内存缓冲区。此方法将要写入当前流的字节从缓冲区复制到当前流。</param>
/// <param name="cancellationToken">取消令牌。</param>
/// <exception cref="T:System.IO.IOException">发生I/O错误。</exception>
/// <exception cref="T:System.NotSupportedException">基础流不支持写入。</exception>
/// <exception cref="T:System.ObjectDisposedException">方法在流关闭后被调用。</exception>
/// <exception cref="T:System.ArgumentNullException">缓冲区为null。</exception>
/// <exception cref="T:System.ArgumentException">偏移量和写入字节数之和大于缓冲区长度。</exception>
/// <exception cref="T:System.ArgumentOutOfRangeException">偏移量或写入字节数为负。</exception>
public override async ValueTask WriteAsync(ReadOnlyMemory<byte> buffer, CancellationToken cancellationToken = default)
{
await ThrottleAsync(buffer.Length, cancellationToken);
await _baseStream.WriteAsync(buffer, cancellationToken);
}
/// <summary>
/// 返回一个表示当前<see cref="T:System.Object" />的<see cref="T:System.String" />。
/// </summary>
/// <returns>
/// 表示当前<see cref="T:System.Object" />的<see cref="T:System.String" />。
/// </returns>
public override string ToString()
{
return _baseStream.ToString()!;
}
#endregion
#region Protected methods
/// <summary>
/// 如果比特率大于最大比特率,尝试限流
/// </summary>
/// <param name="bufferSizeInBytes">缓冲区大小(字节)。</param>
protected void Throttle(int bufferSizeInBytes)
{
var toSleep = CaculateThrottlingMilliseconds(bufferSizeInBytes);
if (toSleep > 1)
{
try
{
Thread.Sleep(toSleep);
}
catch (ThreadAbortException)
{
// 忽略ThreadAbortException。
}
// 睡眠已经完成,重置限流
Reset();
}
}
/// <summary>
/// 如果比特率大于最大比特率,尝试限流。
/// </summary>
/// <param name="bufferSizeInBytes">缓冲区大小(字节)。</param>
/// <param name="cancellationToken">取消令牌。</param>
protected async Task ThrottleAsync(int bufferSizeInBytes, CancellationToken cancellationToken)
{
var toSleep = CaculateThrottlingMilliseconds(bufferSizeInBytes);
if (toSleep > 1)
{
try
{
await Task.Delay(toSleep, cancellationToken);
}
catch (TaskCanceledException)
{
// 忽略TaskCanceledException。
}
// 延迟已经完成,重置限流。
Reset();
}
}
/// <summary>
/// 计算在操作流之前应当延迟的时间(单位:毫秒)。
/// 更新流当前的比特率。
/// </summary>
/// <param name="bufferSizeInBytes">缓冲区大小(字节)。</param>
/// <returns>应当延迟的时间(毫秒)。</returns>
protected int CaculateThrottlingMilliseconds(int bufferSizeInBytes)
{
int toSleep = 0;
// 确保缓冲区不为null
if (bufferSizeInBytes <= 0)
{
CurrentBitsPerSecond = 0;
}
else
{
_byteCount += bufferSizeInBytes;
long elapsedMilliseconds = CurrentMilliseconds - _start;
if (elapsedMilliseconds > 0)
{
// 计算当前瞬时比特率
var bp = _byteCount * 1000L;
var bps = bp / elapsedMilliseconds;
var avgBps = bps;
//如果bps大于最大bps,返回应当延迟的时间。
if (_maximumBytesPerSecond > 0 && bps > _maximumBytesPerSecond)
{
// 计算延迟时间
long wakeElapsed = bp / _maximumBytesPerSecond;
var result = (int)(wakeElapsed - elapsedMilliseconds);
// 计算平均比特率
var div = result / 1000.0;
avgBps = (long)(bps / (div == 0 ? 1 : div));
if (result > 1)
{
toSleep = result; ;
}
}
// 更新当前(平均)比特率
CurrentBitsPerSecond = (long)(avgBps / 8);
}
}
return toSleep;
}
/// <summary>
/// 将字节数重置为0,并将开始时间重置为当前时间。
/// </summary>
protected void Reset()
{
long difference = CurrentMilliseconds - _start;
// 只有在已知历史记录可用时间超过1秒时才重置计数器。
if (difference > 1000)
{
_byteCount = 0;
_start = CurrentMilliseconds;
}
}
#endregion
}
CaculateThrottleMilliseconds 、Throttle和ThrottleAsync是这个流的核心。CaculateThrottleMilliseconds方法负责计算在写入或读取流之前应该延迟多久和更新流当前的传输速率,Throttle和ThrottleAsync方法负责同步和异步延迟。
限流响应正文
using Microsoft.AspNetCore.Http.Features;
using Microsoft.Extensions.Options;
using System.IO.Pipelines;
using System;
namespace AccessControlElementary;
// 自定义的HTTP功能接口,提供获取限流速率设置和当前速率的获取能力
public interface IHttpResponseThrottlingFeature
{
public long? MaximumBytesPerSecond { get; }
public long? CurrentBitsPerSecond { get; }
}
// 限流响应正文的实现类,实现了自定义的功能接口
public class ThrottlingResponseBody : Stream, IHttpResponseBodyFeature, IHttpResponseThrottlingFeature
{
private readonly IHttpResponseBodyFeature _innerBodyFeature;
private readonly IOptionsSnapshot<ResponseThrottlingOptions> _options;
private readonly HttpContext _httpContext;
private readonly Stream _innerStream;
private ThrottlingStream? _throttlingStream;
private PipeWriter? _pipeAdapter;
private bool _throttlingChecked;
private bool _complete;
private int _throttlingRefreshCycleCount;
public ThrottlingResponseBody(IHttpResponseBodyFeature innerBodyFeature, HttpContext httpContext, IOptionsSnapshot<ResponseThrottlingOptions> options)
{
_options = options ?? throw new ArgumentNullException(nameof(options));
_httpContext = httpContext ?? throw new ArgumentNullException(nameof(httpContext));
_innerBodyFeature = innerBodyFeature ?? throw new ArgumentNullException(nameof(innerBodyFeature));
_innerStream = innerBodyFeature.Stream;
_throttlingRefreshCycleCount = 0;
}
public override bool CanRead => false;
public override bool CanSeek => false;
public override bool CanWrite => _innerStream.CanWrite;
public override long Length => _innerStream.Length;
public override long Position
{
get => throw new NotSupportedException();
set => throw new NotSupportedException();
}
public Stream Stream => this;
public PipeWriter Writer
{
get
{
if (_pipeAdapter == null)
{
_pipeAdapter = PipeWriter.Create(Stream, new StreamPipeWriterOptions(leaveOpen: true));
if (_complete)
{
_pipeAdapter.Complete();
}
}
return _pipeAdapter;
}
}
public long? MaximumBytesPerSecond => _throttlingStream?.MaximumBytesPerSecond;
public long? CurrentBitsPerSecond => _throttlingStream?.CurrentBitsPerSecond;
public override int Read(byte[] buffer, int offset, int count) => throw new NotSupportedException();
public override long Seek(long offset, SeekOrigin origin) => throw new NotSupportedException();
public override void SetLength(long value) => throw new NotSupportedException();
public override void Write(byte[] buffer, int offset, int count)
{
OnWriteAsync().ConfigureAwait(false).GetAwaiter().GetResult();
if (_throttlingStream != null)
{
_throttlingStream.Write(buffer, offset, count);
_throttlingStream.Flush();
}
else
{
_innerStream.Write(buffer, offset, count);
}
}
public override async Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
{
await WriteAsync(buffer.AsMemory(offset, count), cancellationToken);
}
public override async ValueTask WriteAsync(ReadOnlyMemory<byte> buffer, CancellationToken cancellationToken = default)
{
await OnWriteAsync();
if (_throttlingStream != null)
{
await _throttlingStream.WriteAsync(buffer, cancellationToken);
await _throttlingStream.FlushAsync(cancellationToken);
}
else
{
await _innerStream.WriteAsync(buffer, cancellationToken);
}
}
public override IAsyncResult BeginWrite(byte[] buffer, int offset, int count, AsyncCallback? callback, object? state)
{
var tcs = new TaskCompletionSource(state: state, TaskCreationOptions.RunContinuationsAsynchronously);
InternalWriteAsync(buffer, offset, count, callback, tcs);
return tcs.Task;
}
private async void InternalWriteAsync(byte[] buffer, int offset, int count, AsyncCallback? callback, TaskCompletionSource tcs)
{
try
{
await WriteAsync(buffer.AsMemory(offset, count));
tcs.TrySetResult();
}
catch (Exception ex)
{
tcs.TrySetException(ex);
}
if (callback != null)
{
// Offload callbacks to avoid stack dives on sync completions.
var ignored = Task.Run(() =>
{
try
{
callback(tcs.Task);
}
catch (Exception)
{
// Suppress exceptions on background threads.
}
});
}
}
public override void EndWrite(IAsyncResult asyncResult)
{
if (asyncResult == null)
{
throw new ArgumentNullException(nameof(asyncResult));
}
var task = (Task)asyncResult;
task.GetAwaiter().GetResult();
}
public async Task CompleteAsync()
{
if (_complete)
{
return;
}
await FinishThrottlingAsync(); // Sets _complete
await _innerBodyFeature.CompleteAsync();
}
public void DisableBuffering()
{
_innerBodyFeature?.DisableBuffering();
}
public override void Flush()
{
if (!_throttlingChecked)
{
OnWriteAsync().ConfigureAwait(false).GetAwaiter().GetResult();
// Flush the original stream to send the headers. Flushing the compression stream won't
// flush the original stream if no data has been written yet.
_innerStream.Flush();
return;
}
if (_throttlingStream != null)
{
_throttlingStream.Flush();
}
else
{
_innerStream.Flush();
}
}
public override async Task FlushAsync(CancellationToken cancellationToken)
{
if (!_throttlingChecked)
{
await OnWriteAsync();
// Flush the original stream to send the headers. Flushing the compression stream won't
// flush the original stream if no data has been written yet.
await _innerStream.FlushAsync(cancellationToken);
return;
}
if (_throttlingStream != null)
{
await _throttlingStream.FlushAsync(cancellationToken);
return;
}
await _innerStream.FlushAsync(cancellationToken);
}
public async Task SendFileAsync(string path, long offset, long? count, CancellationToken cancellationToken)
{
await OnWriteAsync();
if (_throttlingStream != null)
{
await SendFileFallback.SendFileAsync(Stream, path, offset, count, cancellationToken);
return;
}
await _innerBodyFeature.SendFileAsync(path, offset, count, cancellationToken);
}
public async Task StartAsync(CancellationToken cancellationToken = default)
{
await OnWriteAsync();
await _innerBodyFeature.StartAsync(cancellationToken);
}
internal async Task FinishThrottlingAsync()
{
if (_complete)
{
return;
}
_complete = true;
if (_pipeAdapter != null)
{
await _pipeAdapter.CompleteAsync();
}
if (_throttlingStream != null)
{
await _throttlingStream.DisposeAsync();
}
}
private async Task OnWriteAsync()
{
if (!_throttlingChecked)
{
_throttlingChecked = true;
var maxValue = await _options.Value.ThrottlingProvider.Invoke(_httpContext);
_throttlingStream = new ThrottlingStream(_innerStream, maxValue < 0 ? 0 : maxValue);
}
if (_throttlingStream != null && _options?.Value?.ThrottlingRefreshCycle > 0)
{
if (_throttlingRefreshCycleCount >= _options.Value.ThrottlingRefreshCycle)
{
_throttlingRefreshCycleCount = 0;
var maxValue = await _options.Value.ThrottlingProvider.Invoke(_httpContext);
_throttlingStream.MaximumBytesPerSecond = maxValue < 0 ? 0 : maxValue;
}
else
{
_throttlingRefreshCycleCount++;
}
}
}
}
自定义的响应正文类必须实现IHttpResponseBodyFeature接口才能作为应用的底层响应流使用,设计和实现参考ASP.NET Core的ResponseCompressionBody。
响应限流中间件
using Microsoft.AspNetCore.Http.Features;
using Microsoft.Extensions.Options;
using Timer = System.Timers.Timer;
namespace AccessControlElementary;
public class ResponseThrottlingMiddleware
{
private readonly RequestDelegate _next;
public ResponseThrottlingMiddleware(RequestDelegate next)
{
_next = next;
}
public async Task Invoke(HttpContext context, IOptionsSnapshot<ResponseThrottlingOptions> options, ILogger<ResponseThrottlingMiddleware> logger)
{
ThrottlingResponseBody throttlingBody = null;
IHttpResponseBodyFeature originalBodyFeature = null;
var shouldThrottling = await options?.Value?.ShouldThrottling?.Invoke(context);
if (shouldThrottling == true)
{
//获取原始输出Body
originalBodyFeature = context.Features.Get<IHttpResponseBodyFeature>();
//初始化限流Body
throttlingBody = new ThrottlingResponseBody(originalBodyFeature, context, options);
//设置成限流Body
context.Features.Set<IHttpResponseBodyFeature>(throttlingBody);
context.Features.Set<IHttpResponseThrottlingFeature>(throttlingBody);
// 用定时器定期向外汇报信息,这可能导致性能下降,仅用于演示目的
var timer = new Timer(1000);
timer.AutoReset = true;
long? currentBitsPerSecond = null;
var traceIdentifier = context.TraceIdentifier;
timer.Elapsed += (sender, arg) =>
{
if (throttlingBody.CurrentBitsPerSecond != currentBitsPerSecond)
{
currentBitsPerSecond = throttlingBody.CurrentBitsPerSecond;
var bps = (double)(throttlingBody.CurrentBitsPerSecond ?? 0);
var (unitBps, unit) = bps switch
{
< 1000 => (bps, "bps"),
< 1000_000 => (bps / 1000, "kbps"),
_ => (bps / 1000_000, "mbps"),
};
logger.LogDebug("请求:{RequestTraceIdentifier} 当前响应发送速率:{CurrentBitsPerSecond} {Unit}。", traceIdentifier, unitBps, unit);
}
};
// 开始发送响应后启动定时器
context.Response.OnStarting(async () =>
{
logger.LogInformation("请求:{RequestTraceIdentifier} 开始发送响应。", traceIdentifier);
timer.Start();
});
// 响应发送完成后销毁定时器
context.Response.OnCompleted(async () =>
{
logger.LogInformation("请求:{RequestTraceIdentifier} 响应发送完成。", traceIdentifier);
timer.Stop();
timer?.Dispose();
});
// 请求取消后销毁定时器
context.RequestAborted.Register(() =>
{
logger.LogInformation("请求:{RequestTraceIdentifier} 已中止。", traceIdentifier);
timer.Stop();
timer?.Dispose();
});
}
try
{
await _next(context);
if (shouldThrottling == true)
{
// 刷新响应流,确保所有数据都发送到网卡
await throttlingBody.FinishThrottlingAsync();
}
}
finally
{
if (shouldThrottling == true)
{
//限流发生错误,恢复原始Body
context.Features.Set(originalBodyFeature);
}
}
}
}
中间件负责把基础响应流替换为限流响应流,并为每个请求重新读取选项,使每个请求都能够独立控制限流的速率,然后在响应发送启动后记录响应的发送速率。
响应限流选项
namespace AccessControlElementary;
public class ResponseThrottlingOptions
{
/// <summary>
/// 获取或设置流量限制的值的刷新周期,刷新时会重新调用<see cref="ThrottlingProvider"/>设置限制值。
/// 值越大刷新间隔越久,0或负数表示永不刷新。
/// </summary>
public int ThrottlingRefreshCycle { get; set; }
/// <summary>
/// 获取或设置指示是否应该启用流量控制的委托
/// </summary>
public Func<HttpContext, Task<bool>> ShouldThrottling { get; set; }
/// <summary>
/// 获取或设置指示流量限制大小的委托(单位:Byte/s)
/// </summary>
public Func<HttpContext, Task<int>> ThrottlingProvider { get; set; }
}
响应限流服务注册和中间件配置扩展
namespace AccessControlElementary;
// 配置中间件用的辅助类和扩展方法
public static class ResponseThrottlingMiddlewareExtensions
{
public static IApplicationBuilder UseResponseThrottling(this IApplicationBuilder app)
{
return app.UseMiddleware<ResponseThrottlingMiddleware>();
}
}
// 注册中间件需要的服务的辅助类和扩展方法
public static class ResponseThrottlingServicesExtensions
{
public static IServiceCollection AddResponseThrottling(this IServiceCollection services, Action<ResponseThrottlingOptions> configureOptions = null)
{
services.Configure(configureOptions);
return services;
}
}
使用节流组件
服务注册和请求管道配置
Startup启动配置
namespace AccessControlElementary;
public class Startup
{
public void ConfigureServices(IServiceCollection services)
{
// 注册限流服务和选项
services.AddResponseThrottling(options =>
{
options.ThrottlingRefreshCycle = 100;
options.ShouldThrottling = static async _ => true;
options.ThrottlingProvider = static async _ => 100 * 1024; // 100KB/s
});
services.AddRazorPages();
}
public void Configure(IApplicationBuilder app)
{
// 配置响应限流中间件
app.UseResponseThrottling();
app.UseStaticFiles();
app.UseRouting();
app.UseAuthentication();
app.UseAuthorization();
app.UseEndpoints(endpoints =>
{
endpoints.MapRazorPages();
});
}
}
示例展示了如何配置和启用响应限流。ThrottlingRefreshCycle设置为每100次响应流写入周期刷新一次流量限制的值,使限流值能在响应发送中动态调整;ShouldThrottling设置为无条件启用限流;ThrottlingProvider设置为限速100 KB/s。
请求只有在UseResponseThrottling之前配置的短路中间件处被处理时不会受影响,请求没有被短路的话,只要经过限流中间件,基础响应流就被替换了。如果同时使用了响应压缩,会变成限流响应包裹压缩响应(或者相反),压缩响应(或者限流响应)又包裹基础响应的嵌套结构。
结语
本书在介绍.NET 6基础知识时会尽可能使用具有现实意义的示例避免学习和实践脱节,本文就是其中之一,如果本文对您有价值,欢迎继续了解和购买本书。《C#与.NET6 开发从入门到实践》预售,作者亲自来打广告了!