本文介绍了“什么是限流”的相关知识。很多人在实际案例的操作中会遇到这样的困难。让边肖带领你学习如何处理这些情况。希望大家认真阅读,学点东西!
00-1010,我们在做限流的时候,有一些常用的限流算法,包括:计数器限流、令牌桶限流、漏桶限流;
1.令牌桶电流限制。
令牌桶算法的原理是系统以一定的速率将令牌放入桶中,满了就丢弃。当请求到来时,令牌将首先从桶中取出。如果可以获得令牌,则可以继续完成请求,否则,等待或拒绝服务。令牌桶一定程度上允许突发流量,只要有令牌就可以处理,支持一次取多个令牌;
2.漏桶限流。
漏桶的原理是请求以固定的恒定速率流出,而传入的请求以任意速率流出。当请求数量超过存储桶的容量时,新请求会等待或拒绝服务。可见漏桶可以强行限制数据的传输速度。
3.逆流极限。
Counter是一种简单粗暴的算法,主要用于限制总并发数,比如数据库连接池、线程池、spike并发数。只要某一时间内的请求总数超过设定的阈值,计数器的限流;
限流算法
了解限流算法后,我们需要知道在哪里以及如何限制电流。对于系统,我们通常可以限制接入层的电流。大多数情况下,我们可以直接使用nginx、OpenResty等中间件直接处理。你也可以在业务层面进行限流,这需要根据我们不同的业务需求,使用相关的限流算法来处理。
如何限流
对于业务层,我们可以是单节点、多节点用户绑定,也可以是多节点非绑定。此时,我们需要区分过程限流和分布式限流。
业务层限流
对于过程限流来说相对简单,而番石榴是我们经常使用的利器。让我们看看如何限制接口的总并发量,特定时间窗口内的请求数量,并使用令牌桶和漏桶进行更平滑的限流。
限制接口的总并发量。
您只需要配置总并发量,然后使用计算器记录每个请求,然后将其与总并发量进行比较:
privatestaticintmax=10privatedstationatomicintegerlimit=new tomicinteger();If(限制器。incrementandget()(最大值){system。呃。println('超过最大限制');返回;
}限制时间窗口请求的数量。
在指定时间内限制一个接口的请求量,用番石榴缓存计数器,然后设置到期时间;例如,将每分钟最大请求数设置为100:
LoadingCacheLong,AtomicLongcounter=cachebuilder . new builder()。expireAfterWrite(1,时间单位。分钟)。build(newCacheLoaderLong,atomic clong(){ @ overridedpublicatatomiclongload(Longkey)throwsException { returnnewAtomicLong(0);
}
});privatestaticintmax=100longcurMinutes=system . currentimellis()/1000 * 60;if(counter.get(curMinutes))。incrementAndGet()(最大值){ 0
System.err.println('时间窗口请求数超过上限');返回;
}到期时间为一分钟,每分钟自动清零;这样可能会有超限。例如,前59秒没有消息,60秒有200条消息。此时,100条消息首先被接受,就在过期计数器被清除为0之后,然后100条消息被接受。这种情况可以参考TCP的滑动窗口思想来解决。
dingleft-2">
平滑限流请求
计数器的方式还是比较粗暴的,令牌桶和漏桶限流这两种算法相对来说还是比较平滑的,可以直接使用guava提供的RateLimiter类:
RateLimiter limiter = RateLimiter.create(2);System.out.println(limiter.acquire(4));System.out.println(limiter.acquire());System.out.println(limiter.acquire());System.out.println(limiter.acquire(2));System.out.println(limiter.acquire());System.out.println(limiter.acquire());
create(2)表示桶容量为2并且每秒新增2个令牌,也就是500毫秒新增一个令牌,acquire()表示从里面获取一个令牌,返回值为等待的时间,输出结果如下:
0.01.9986330.496440.5002240.9993350.500186
可以看到此算法是允许一定突发情况的,第一次获取4个令牌等待时间为0,后面再获取需要等待2秒才可以,后面每次获取需要500毫秒。
分布式限流
现在大部分系统都采用了多节点部署,所以一个业务可能在多个进程内被处理,所以这时候分布式限流必不可少,比如常见的秒杀系统,可能同时有N台业务逻辑节点;
常规的做法是使用Redis+lua和OpenResty+lua来实现,将限流服务做成原子化,同时也要保证高性能;Redis和OpenResty都已高性能著称,同时也提供了原子化方案,具体如下所示;
-
Redis+lua
Redis在服务端对消息的处理是单线程的,同时支持lua脚本的执行,可以将限流的相关逻辑用lua脚本实现,来保证原子性,大体实现如下:
-- 限流 keylocal key = KEYS[1]-- 限流大小local limit = tonumber(ARGV[1])-- 过期时间local expire = tonumber(ARGV[2])local current = tonumber(redis.call('get',key) or "0")if current + 1 > limit thenreturn 0;elseredis.call("INCRBY", key, 1) redis.call("EXPIRE", key, expire)return current + 1end
以上使用计数器算法来实现限流,在调用lua的地方可以传入限流key,限流大小以及key的有效期;返回结果如果为0表示超出限流大小,否则返回当前累计的值。
-
OpenResty+lua
OpenResty核心就是nginx,但是在这个基础之上加了很多第三方模块,ngx_lua模块将lua嵌入到了nginx中,使得nginx可以作为一个web服务器来使用;还有其他常用的开发模块如:lua-resty-lock,lua-resty-limit-traffic,lua-resty-memcached,lua-resty-mysql,lua-resty-redis等等;
本小节我们先使用lua-resty-lock模块来实现一个简单计数器限流,相关lua代码如下:
local locks = require "resty.lock";local function acquire()local lock = locks:new("locks");local elapsed, err = lock:lock("limit_key");local limit_counter = ngx.shared.limit_counter;--获取客户端iplocal key = ngx.var.remote_addr;--限流大小local limit = 5; local current = limit_counter:get(key); --打印key和当前值ngx.say("key="..key..",value="..tostring(current)); if current ~= nil and current + 1 > limit then lock:unlock(); return 0;endif current == nil then limit_counter:set(key,1,5); --设置过期时间为5秒else limit_counter:incr(key,1);endlock:unlock();return 1;end
以上是一个对ip进行限流的实例,因为需要保证原子性,所以使用了resty.lock模块,同时也类似redis设置了过期时间重置,另外一点需要注意对锁的释放;还需要设置两个共享字典
http {... #lua_shared_dict <name> <size> 定义一块名为name的共享内存空间,内存大小为size; 通过该命令定义的共享内存对象对于Nginx中所有worker进程都是可见的lua_shared_dict locks 10m;lua_shared_dict limit_counter 10m;}
接入层限流
接入层通常就是流量入口处,Nginx被很多系统用作流量入口,当然OpenResty也不例外,而且OpenResty提供了更强大的功能,比如这里将要介绍的lua-resty-limit-traffic模块,是一个功能强大的限流模块;在使用lua-resty-limit-traffic之前我们先大致看一下如何使用OpenResty;
OpenResty安装使用
-
下载安装配置
直接去官方下载即可:http://openresty.org/en/download.html,启动,重载,停止命令如下:
nginx.exenginx.exe -s reloadnginx.exe -s stop
打开ip+端口,可以看到:Welcome to OpenResty! 即表示启动成功;
-
lua脚本实例
首先需要在nginx.conf的http目录下做如下配置:
http { ... lua_package_path "/lualib/?.lua;;"; #lua 模块 lua_package_cpath "/lualib/?.so;;"; #c模块 include lua.conf; #导入自定义lua配置文件}
这里自定义了一个lua.conf,有关lua的请求都在这里面配置,放在和nginx.conf一个路径下即可;已一个test.lua为例,lua.conf配置如下:
#lua.conf server { charset utf-8; #设置编码listen 8081; server_name _; location /test { default_type 'text/html'; content_by_lua_file lua/api/test.lua; } }
这里把所有的lua文件都放在lua/api目录下,比如一个最简单的hello world:
ngx.say("hello world");
lua-resty-limit-traffic模块
lua-resty-limit-traffic提供了限制最大并发连接数,时间窗口请求数,以及平滑限制请求数三种方式,分别对应:resty.limit.conn,resty.limit.count,resty.limit.req;相关文档可以直接在pod/lua-resty-limit-traffic中找到,里面有完整的实例;
以下会用到三个共享字典,事先在http下配置:
http {lua_shared_dict my_limit_conn_store 100m;lua_shared_dict my_limit_count_store 100m;lua_shared_dict my_limit_req_store 100m; }
-
限制最大并发连接数
提供的resty.limit.conn限制最大连接数,具体脚本如下:
local limit_conn = require "resty.limit.conn"--B<syntax:> C<obj, err = class.new(shdict_name, conn, burst, default_conn_delay)>local lim, err = limit_conn.new("my_limit_conn_store", 1, 0, 0.5)if not lim thenngx.log(ngx.ERR,"failed to instantiate a resty.limit.conn object: ", err)return ngx.exit(500)endlocal key = ngx.var.binary_remote_addrlocal delay, err = lim:incoming(key, true)if not delay thenif err == "rejected" thenreturn ngx.exit(502)endngx.log(ngx.ERR, "failed to limit req: ", err)return ngx.exit(500)endif lim:is_committed() thenlocal ctx = ngx.ctx ctx.limit_conn = lim ctx.limit_conn_key = key ctx.limit_conn_delay = delayendlocal conn = errif delay >= 0.001 thenngx.sleep(delay)end
new()参数分别是:字典名称,允许的最大并发请求数,允许的突发连接数,连接延迟;
incoming()中commit是一个布尔值,当为true时表示记录当前请求的数量,否则就直接运行;
返回值:如果请求不超过方法中指定的conn值,则此方法返回0作为延迟以及当前时间的并发请求(或连接)数;
-
限制时间窗口请求数
提供的resty.limit.count可以限制一定请求数在一个时间窗口内,具体脚本如下:
local limit_count = require "resty.limit.count"--B<syntax:> C<obj, err = class.new(shdict_name, count, time_window)>--速率限制在20/10slocal lim, err = limit_count.new("my_limit_count_store", 20, 10)if not lim thenngx.log(ngx.ERR, "failed to instantiate a resty.limit.count object: ", err)return ngx.exit(500)endlocal local key = ngx.var.binary_remote_addr--B<syntax:> C<delay, err = obj:incoming(key, commit)>local delay, err = lim:incoming(key, true)if not delay thenif err == "rejected" thenreturn ngx.exit(503)endngx.log(ngx.ERR, "failed to limit count: ", err)return ngx.exit(500)end
new()中指定的三个参数分别是:字典名称,指定的请求阈值,请求个数复位前的窗口时间,以秒为单位;
incoming()中commit是一个布尔值,当为true时表示记录当前请求的数量,否则就直接运行;
返回值:如果请求数在限制范围内,则返回当前请求被处理的延迟和将被处理的请求的剩余数;
-
平滑限制请求数
提供的resty.limit.req可以已更加平滑的方式限制请求,具体脚本如下:
local limit_req = require "resty.limit.req"--B<syntax:> C<obj, err = class.new(shdict_name, rate, burst)>--限制在200个请求/秒以下,给与100个请求/秒的突发请求;也就说每秒请求最大可以200-300之间,超出300报错local lim, err = limit_req.new("my_limit_req_store", 200, 100)if not lim thenngx.log(ngx.ERR,"failed to instantiate a resty.limit.req object: ", err)return ngx.exit(500)endlocal key = ngx.var.binary_remote_addrlocal delay, err = lim:incoming(key, true)if not delay thenif err == "rejected" thenreturn ngx.exit(503)endngx.log(ngx.ERR, "failed to limit req: ", err)return ngx.exit(500)endif delay >= 0.001 thenlocal excess = err ngx.sleep(delay)end
new()三个参数分别是:字典名称,请求速率(每秒数)阈值,每秒允许延迟的过多请求数;
incoming()中commit是一个布尔值,当为true时表示记录当前请求的数量,否则就直接运行,可以理解为一个开关;
返回值:如果请求数在限制范围内,则此方法返回0作为当前时间的延迟和每秒过多请求的(零)个数;
“何为限流”的内容就介绍到这里了,感谢大家的阅读。如果想了解更多行业相关的知识可以关注网站,小编将为大家输出更多高质量的实用文章!
内容来源网络,如有侵权,联系删除,本文地址:https://www.230890.com/zhan/42858.html