Asp.NetCore7preview4重磅新特性--限流中间件

博客 动态
0 230
优雅殿下
优雅殿下 2022-05-12 23:59:10
悬赏:0 积分 收藏

Asp.Net Core 7 preview 4 重磅新特性--限流中间件

前言

限流是应对流量暴增或某些用户恶意攻击等场景的重要手段之一,然而微软官方从未支持这一重要特性,AspNetCoreRateLimit这一第三方库限流库一般作为首选使用,然而其配置参数过于繁多,对使用者造成较大的学习成本。令人高兴的是,在刚刚发布的.NET 7 Preview 4中开始支持限流中间件。

UseRateLimiter尝鲜

  1. 安装.NET 7.0 SDK(v7.0.100-preview.4)
  2. 通过nuget包安装Microsoft.AspNetCore.RateLimiting
  3. 创建.Net7网站应用,注册中间件

全局限流并发1个

app.UseRateLimiter(new RateLimiterOptions{    Limiter = PartitionedRateLimiter.Create<HttpContext, string>(resource =>    {        return RateLimitPartition.CreateConcurrencyLimiter("MyLimiter",            _ => new ConcurrencyLimiterOptions(1, QueueProcessingOrder.NewestFirst, 1));    })});

根据不同资源不同限制并发数/api前缀的资源租约数2,等待队列长度为2,其他默认租约数1,队列长度1。

app.UseRateLimiter(new RateLimiterOptions(){    // 触发限流的响应码    DefaultRejectionStatusCode = 500,    OnRejected = async (ctx, rateLimitLease) =>    {        // 触发限流回调处理    },    Limiter = PartitionedRateLimiter.Create<HttpContext, string>(resource =>    {        if (resource.Request.Path.StartsWithSegments("/api"))        {            return RateLimitPartition.CreateConcurrencyLimiter("WebApiLimiter",                _ => new ConcurrencyLimiterOptions(2, QueueProcessingOrder.NewestFirst, 2));        }        else        {            return RateLimitPartition.CreateConcurrencyLimiter("DefaultLimiter",                _ => new ConcurrencyLimiterOptions(1, QueueProcessingOrder.NewestFirst, 1));        }    })});

本地测试

  1. 新建一个webapi项目,并注册限流中间件如下
using Microsoft.AspNetCore.RateLimiting;using System.Threading.RateLimiting;var builder = WebApplication.CreateBuilder(args);// Add services to the container.builder.Services.AddControllers();// Learn more about configuring Swagger/OpenAPI at https://aka.ms/aspnetcore/swashbucklebuilder.Services.AddEndpointsApiExplorer();builder.Services.AddSwaggerGen();var app = builder.Build();// Configure the HTTP request pipeline.if (app.Environment.IsDevelopment()){    app.UseSwagger();    app.UseSwaggerUI();}app.UseRateLimiter(new RateLimiterOptions{    DefaultRejectionStatusCode = 500,    OnRejected = async (ctx, lease) =>    {        await Task.FromResult(ctx.Response.WriteAsync("ConcurrencyLimiter"));    },    Limiter = PartitionedRateLimiter.Create<HttpContext, string>(resource =>    {        return RateLimitPartition.CreateConcurrencyLimiter("MyLimiter",            _ => new ConcurrencyLimiterOptions(1, QueueProcessingOrder.NewestFirst, 1));    })});app.UseHttpsRedirection();app.UseAuthorization();app.MapControllers();app.Run();
  1. 启动项目,使用jmeter测试100并发,请求接口/WeatherForecast

    所有请求处理成功,失败0!

这个结果是不是有点失望,其实RateLimitPartition.CreateConcurrencyLimiter创建的限流器是
ConcurrencyLimiter,后续可以实现个各种策略的限流器进行替换之。
看了ConcurrencyLimiter的实现,其实就是令牌桶的限流思想,上面配置的new ConcurrencyLimiterOptions(1, QueueProcessingOrder.NewestFirst, 1)),第一个1代表令牌的个数,第二个1代表可以当桶里的令牌为空时,进入等待队列,而不是直接失败,当前面的请求结束后,会归还令牌,此时等待的请求就可以拿到令牌了,QueueProcessingOrder.NewestFirst代表最新的请求优先获取令牌,也就是获取令牌时非公平的,还有另一个枚举值QueueProcessingOrder.OldestFirst老的优先,获取令牌是公平的。只要我们获取到令牌的人干活速度快,虽然我们令牌只有1,并发就很高。
3. 测试触发失败场景
只需要让我们拿到令牌的人持有时间长点,就能轻易的触发。

调整jmater并发数为10

相应内容也是我们设置的内容。

ConcurrencyLimiter源码

令牌桶限流思想

获取令牌

        protected override RateLimitLease AcquireCore(int permitCount)        {            // These amounts of resources can never be acquired            if (permitCount > _options.PermitLimit)            {                throw new ArgumentOutOfRangeException(nameof(permitCount), permitCount, SR.Format(SR.PermitLimitExceeded, permitCount, _options.PermitLimit));            }            ThrowIfDisposed();            // Return SuccessfulLease or FailedLease to indicate limiter state            if (permitCount == 0)            {                return _permitCount > 0 ? SuccessfulLease : FailedLease;            }            // Perf: Check SemaphoreSlim implementation instead of locking            if (_permitCount >= permitCount)            {                lock (Lock)                {                    if (TryLeaseUnsynchronized(permitCount, out RateLimitLease? lease))                    {                        return lease;                    }                }            }            return FailedLease;        }

尝试获取令牌核心逻辑

        private bool TryLeaseUnsynchronized(int permitCount, [NotNullWhen(true)] out RateLimitLease? lease)        {            ThrowIfDisposed();            // if permitCount is 0 we want to queue it if there are no available permits            if (_permitCount >= permitCount && _permitCount != 0)            {                if (permitCount == 0)                {                    // Edge case where the check before the lock showed 0 available permits but when we got the lock some permits were now available                    lease = SuccessfulLease;                    return true;                }                // a. if there are no items queued we can lease                // b. if there are items queued but the processing order is newest first, then we can lease the incoming request since it is the newest                if (_queueCount == 0 || (_queueCount > 0 && _options.QueueProcessingOrder == QueueProcessingOrder.NewestFirst))                {                    _idleSince = null;                    _permitCount -= permitCount;                    Debug.Assert(_permitCount >= 0);                    lease = new ConcurrencyLease(true, this, permitCount);                    return true;                }            }            lease = null;            return false;        }

令牌获取失败后进入等待队列

 protected override ValueTask<RateLimitLease> WaitAsyncCore(int permitCount, CancellationToken cancellationToken = default)        {            // These amounts of resources can never be acquired            if (permitCount > _options.PermitLimit)            {                throw new ArgumentOutOfRangeException(nameof(permitCount), permitCount, SR.Format(SR.PermitLimitExceeded, permitCount, _options.PermitLimit));            }            // Return SuccessfulLease if requestedCount is 0 and resources are available            if (permitCount == 0 && _permitCount > 0 && !_disposed)            {                return new ValueTask<RateLimitLease>(SuccessfulLease);            }            // Perf: Check SemaphoreSlim implementation instead of locking            lock (Lock)            {                if (TryLeaseUnsynchronized(permitCount, out RateLimitLease? lease))                {                    return new ValueTask<RateLimitLease>(lease);                }                // Avoid integer overflow by using subtraction instead of addition                Debug.Assert(_options.QueueLimit >= _queueCount);                if (_options.QueueLimit - _queueCount < permitCount)                {                    if (_options.QueueProcessingOrder == QueueProcessingOrder.NewestFirst && permitCount <= _options.QueueLimit)                    {                        // remove oldest items from queue until there is space for the newest request                        do                        {                            RequestRegistration oldestRequest = _queue.DequeueHead();                            _queueCount -= oldestRequest.Count;                            Debug.Assert(_queueCount >= 0);                            if (!oldestRequest.Tcs.TrySetResult(FailedLease))                            {                                // Updating queue count is handled by the cancellation code                                _queueCount += oldestRequest.Count;                            }                        }                        while (_options.QueueLimit - _queueCount < permitCount);                    }                    else                    {                        // Don't queue if queue limit reached and QueueProcessingOrder is OldestFirst                        return new ValueTask<RateLimitLease>(QueueLimitLease);                    }                }                CancelQueueState tcs = new CancelQueueState(permitCount, this, cancellationToken);                CancellationTokenRegistration ctr = default;                if (cancellationToken.CanBeCanceled)                {                    ctr = cancellationToken.Register(static obj =>                    {                        ((CancelQueueState)obj!).TrySetCanceled();                    }, tcs);                }                RequestRegistration request = new RequestRegistration(permitCount, tcs, ctr);                _queue.EnqueueTail(request);                _queueCount += permitCount;                Debug.Assert(_queueCount <= _options.QueueLimit);                return new ValueTask<RateLimitLease>(request.Tcs.Task);            }        }

归还令牌

 private void Release(int releaseCount)        {            lock (Lock)            {                if (_disposed)                {                    return;                }                _permitCount += releaseCount;                Debug.Assert(_permitCount <= _options.PermitLimit);                while (_queue.Count > 0)                {                    RequestRegistration nextPendingRequest =                        _options.QueueProcessingOrder == QueueProcessingOrder.OldestFirst                        ? _queue.PeekHead()                        : _queue.PeekTail();                    if (_permitCount >= nextPendingRequest.Count)                    {                        nextPendingRequest =                            _options.QueueProcessingOrder == QueueProcessingOrder.OldestFirst                            ? _queue.DequeueHead()                            : _queue.DequeueTail();                        _permitCount -= nextPendingRequest.Count;                        _queueCount -= nextPendingRequest.Count;                        Debug.Assert(_permitCount >= 0);                        ConcurrencyLease lease = nextPendingRequest.Count == 0 ? SuccessfulLease : new ConcurrencyLease(true, this, nextPendingRequest.Count);                        // Check if request was canceled                        if (!nextPendingRequest.Tcs.TrySetResult(lease))                        {                            // Queued item was canceled so add count back                            _permitCount += nextPendingRequest.Count;                            // Updating queue count is handled by the cancellation code                            _queueCount += nextPendingRequest.Count;                        }                        nextPendingRequest.CancellationTokenRegistration.Dispose();                        Debug.Assert(_queueCount >= 0);                    }                    else                    {                        break;                    }                }                if (_permitCount == _options.PermitLimit)                {                    Debug.Assert(_idleSince is null);                    Debug.Assert(_queueCount == 0);                    _idleSince = Stopwatch.GetTimestamp();                }            }        }

总结

虽然这次官方对限流进行了支持,但貌似还不能支持对ip或client级别的限制支持,对于更高级的限流策略仍需要借助第三方库或自己实现,期待后续越来越完善。

posted @ 2022-05-12 23:03 gui.h 阅读(0) 评论(0) 编辑 收藏 举报
回帖
    优雅殿下

    优雅殿下 (王者 段位)

    2018 积分 (2)粉丝 (47)源码

    小小码农,大大世界

     

    温馨提示

    亦奇源码

    最新会员