Distributed rate limiting: everything you want to know is here

Preface

In a high-concurrency system, controlling traffic is critical. When a huge volume of requests hits the server directly, an interface can become unavailable within a short time; left unchecked, the entire application may go down.

For example, I recently had a requirement like this: as a client, I produce data to Kafka, and Kafka consumers continuously consume that data and forward every record to a web server. Although the web tier is load balanced (there are four web servers), the volume of business data is huge, with tens of thousands of records generated every second. If the producer pushes data at full speed, it can easily drag down the web servers.

To handle this, we need to rate limit the producer so that only a fixed amount of data is written to Kafka per second, which largely ensures that the web tier keeps running normally.
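
For illustration, a throttled producer loop might look like the following minimal sketch. The broker address, topic name, and target rate are placeholders, and the limiter used here is Guava's RateLimiter, which is covered in detail below.

    import java.util.Properties;

    import com.google.common.util.concurrent.RateLimiter;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;

    public class ThrottledProducer {

        public static void main(String[] args) {
            Properties props = new Properties();
            // Placeholder connection settings
            props.put("bootstrap.servers", "localhost:9092");
            props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

            // Allow at most 5000 messages per second into Kafka (placeholder rate)
            RateLimiter limiter = RateLimiter.create(5000);

            try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
                for (long i = 0; i < 100_000; i++) {
                    // Blocks until a permit is available, smoothing the send rate
                    limiter.acquire();
                    producer.send(new ProducerRecord<>("business-data", String.valueOf(i)));
                }
            }
        }
    }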

In fact, no matter the scenario, the essence is the same: shed excess traffic to keep the application highly available.

Common Algorithms

There are two common rate limiting algorithms:

  • Leaky Bucket Algorithm
  • Token Bucket Algorithm

The leaky bucket algorithm is relatively simple: incoming traffic is poured into a bucket, and the bucket drains at a fixed rate. If traffic arrives faster than it drains, the bucket overflows (the outflow rate never increases), and the overflowing traffic is simply discarded.

This approach is simple and crude.
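
To make the idea concrete, here is a minimal single-node leaky bucket sketch (not the implementation used later in this article); the class and field names are purely illustrative.

    /**
     * Minimal leaky bucket sketch: requests "pour water" into the bucket,
     * the bucket leaks at a fixed rate, and anything that would overflow
     * is rejected.
     */
    public class LeakyBucket {

        private final long capacity;          // maximum amount of water the bucket holds
        private final double leakRatePerMs;   // how much water leaks out per millisecond
        private double water = 0;             // current water level
        private long lastLeakTs = System.currentTimeMillis();

        public LeakyBucket(long capacity, double leakRatePerSecond) {
            this.capacity = capacity;
            this.leakRatePerMs = leakRatePerSecond / 1000.0;
        }

        public synchronized boolean tryAcquire() {
            long now = System.currentTimeMillis();
            // Leak water for the elapsed time at the constant outflow rate
            water = Math.max(0, water - (now - lastLeakTs) * leakRatePerMs);
            lastLeakTs = now;
            if (water + 1 > capacity) {
                return false; // bucket is full -> request overflows and is discarded
            }
            water += 1;
            return true;
        }
    }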

Although the leaky bucket algorithm is simple, it cannot cope with real-world scenarios such as a sudden surge in traffic, because the outflow rate is fixed.

At this time, the token bucket algorithm is needed:

The token bucket algorithm adds tokens to a fixed-capacity bucket at a constant rate, and each incoming request takes one or more tokens away. If the bucket is empty, the request is discarded or blocked.

In contrast, the token bucket can handle a certain amount of burst traffic.
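
Again purely as a minimal illustrative sketch (the real implementations below use Guava and Redis), a token bucket could look like this:

    /**
     * Minimal token bucket sketch: tokens are refilled at a constant rate up to
     * a fixed capacity, and each request takes one token or is rejected.
     */
    public class TokenBucket {

        private final long capacity;         // bucket size, i.e. the maximum burst
        private final double refillPerMs;    // tokens added per millisecond
        private double tokens;
        private long lastRefillTs = System.currentTimeMillis();

        public TokenBucket(long capacity, double refillPerSecond) {
            this.capacity = capacity;
            this.refillPerMs = refillPerSecond / 1000.0;
            this.tokens = capacity; // start full so an initial burst can pass
        }

        public synchronized boolean tryAcquire() {
            long now = System.currentTimeMillis();
            // Refill based on elapsed time, never exceeding the capacity
            tokens = Math.min(capacity, tokens + (now - lastRefillTs) * refillPerMs);
            lastRefillTs = now;
            if (tokens < 1) {
                return false; // no token available -> reject (or block/queue)
            }
            tokens -= 1;
            return true;
        }
    }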

RateLimiter Implementation

For a token bucket implementation, you can directly use RateLimiter from the Guava library.

    @Override
    public BaseResponse<UserResVO> getUserByFeignBatch(@RequestBody UserReqVO userReqVO) {
        // Call the remote service
        OrderNoReqVO vo = new OrderNoReqVO();
        vo.setReqNo(userReqVO.getReqNo());

        // Two tokens are added to the bucket per second
        RateLimiter limiter = RateLimiter.create(2.0);

        // Batch call
        for (int i = 0; i < 10; i++) {
            // acquire() blocks until a token is available and returns the time spent waiting
            double acquire = limiter.acquire();
            logger.debug("Got token successfully, waited=" + acquire);
            BaseResponse<OrderNoResVO> orderNo = orderServiceClient.getOrderNo(vo);
            logger.debug("Remote return:" + JSON.toJSONString(orderNo));
        }

        UserRes userRes = new UserRes();
        userRes.setUserId(123);
        userRes.setUserName("张三");
        userRes.setReqNo(userReqVO.getReqNo());
        userRes.setCode(StatusEnum.SUCCESS.getCode());
        userRes.setMessage("Success");
        return userRes;
    }

See here for details.

As the code shows, two tokens are put into the bucket every second and each request consumes one token, so only two requests can be made per second. The output confirms this: the value returned by acquire() is the time spent waiting for the token, which works out to roughly one token every 500 ms.

There is one important thing to note when using RateLimiter:

It allows "consume first, pay later": a request can take several tokens at once, all the remaining tokens, or even more than are currently in the bucket, and still return immediately. Subsequent requests then pay the price for it: they must wait until the bucket has been replenished before they can obtain tokens.
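
A small sketch makes this "consume first, pay later" behavior visible: the oversized first acquisition returns almost immediately, and the following request absorbs the wait.

    import com.google.common.util.concurrent.RateLimiter;

    public class PayLaterDemo {

        public static void main(String[] args) {
            // 2 permits per second
            RateLimiter limiter = RateLimiter.create(2.0);

            // Takes far more permits than are currently available,
            // yet returns almost immediately ("consume first").
            double first = limiter.acquire(10);

            // The next caller pays the debt ("pay later"):
            // roughly 10 permits / 2 permits per second = ~5 seconds of waiting.
            double second = limiter.acquire();

            System.out.printf("first waited %.2fs, second waited %.2fs%n", first, second);
        }
    }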

Distributed Rate Limiting

For a single application, RateLimiter is sufficient. In a distributed environment, however, rate limiting can be implemented with the help of Redis.

Let's demonstrate.

Rate limiting is applied to the interface exposed by the Order application. First, the rate limiter bean is configured:

    @Configuration
    public class RedisLimitConfig {

        @Value("${redis.limit}")
        private int limit;

        @Autowired
        private JedisConnectionFactory jedisConnectionFactory;

        @Bean
        public RedisLimit build() {
            RedisClusterConnection clusterConnection = jedisConnectionFactory.getClusterConnection();
            JedisCluster jedisCluster = (JedisCluster) clusterConnection.getNativeConnection();
            RedisLimit redisLimit = new RedisLimit.Builder<>(jedisCluster)
                    .limit(limit)
                    .build();
            return redisLimit;
        }
    }

Then use the component in the Controller:

    @Autowired
    private RedisLimit redisLimit;

    @Override
    @CheckReqNo
    public BaseResponse<OrderNoResVO> getOrderNo(@RequestBody OrderNoReqVO orderNoReq) {
        BaseResponse<OrderNoResVO> res = new BaseResponse<>();

        // Rate limiting
        boolean limit = redisLimit.limit();
        if (!limit) {
            res.setCode(StatusEnum.REQUEST_LIMIT.getCode());
            res.setMessage(StatusEnum.REQUEST_LIMIT.getMessage());
            return res;
        }

        res.setReqNo(orderNoReq.getReqNo());
        if (null == orderNoReq.getAppId()) {
            throw new SBCException(StatusEnum.FAIL);
        }
        OrderNoResVO orderNoRes = new OrderNoResVO();
        orderNoRes.setOrderId(DateUtil.getLongTime());
        res.setCode(StatusEnum.SUCCESS.getCode());
        res.setMessage(StatusEnum.SUCCESS.getMessage());
        res.setDataBody(orderNoRes);
        return res;
    }

For ease of use, annotations are also provided:

    @Override
    @ControllerLimit
    public BaseResponse<OrderNoResVO> getOrderNoLimit(@RequestBody OrderNoReqVO orderNoReq) {
        BaseResponse<OrderNoResVO> res = new BaseResponse<>();
        // Business logic
        return res;
    }

This annotation intercepts the HTTP request and returns immediately once the request count reaches the threshold.

It can also be used on ordinary methods:

    @CommonLimit
    public void doSomething() {}

An exception will be thrown when the call threshold is reached.
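
The annotation types themselves are not shown in this article (see the linked repository for the real definitions), but a minimal version would be little more than two marker annotations, assuming the com.crossoverjie.distributed.annotation package referenced by the aspect later on:

    // ControllerLimit.java -- marks a @RequestMapping handler for rate limiting
    package com.crossoverjie.distributed.annotation;

    import java.lang.annotation.ElementType;
    import java.lang.annotation.Retention;
    import java.lang.annotation.RetentionPolicy;
    import java.lang.annotation.Target;

    @Target(ElementType.METHOD)
    @Retention(RetentionPolicy.RUNTIME)
    public @interface ControllerLimit {
    }

    // CommonLimit.java -- marks an ordinary method for rate limiting
    package com.crossoverjie.distributed.annotation;

    import java.lang.annotation.ElementType;
    import java.lang.annotation.Retention;
    import java.lang.annotation.RetentionPolicy;
    import java.lang.annotation.Target;

    @Target(ElementType.METHOD)
    @Retention(RetentionPolicy.RUNTIME)
    public @interface CommonLimit {
    }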

To simulate concurrency, the User application spawns 10 threads that call the Order interface (whose limit is set to 5 requests); a professional load testing tool such as JMeter could also be used.

    @Override
    public BaseResponse<UserResVO> getUserByFeign(@RequestBody UserReqVO userReq) {
        // Call the remote service
        OrderNoReqVO vo = new OrderNoReqVO();
        vo.setAppId(1L);
        vo.setReqNo(userReq.getReqNo());

        for (int i = 0; i < 10; i++) {
            executorService.execute(new Worker(vo, orderServiceClient));
        }

        UserRes userRes = new UserRes();
        userRes.setUserId(123);
        userRes.setUserName("张三");
        userRes.setReqNo(userReq.getReqNo());
        userRes.setCode(StatusEnum.SUCCESS.getCode());
        userRes.setMessage("Success");
        return userRes;
    }

    private static class Worker implements Runnable {

        private OrderNoReqVO vo;
        private OrderServiceClient orderServiceClient;

        public Worker(OrderNoReqVO vo, OrderServiceClient orderServiceClient) {
            this.vo = vo;
            this.orderServiceClient = orderServiceClient;
        }

        @Override
        public void run() {
            BaseResponse<OrderNoResVO> orderNo = orderServiceClient.getOrderNoCommonLimit(vo);
            logger.info("Remote return:" + JSON.toJSONString(orderNo));
        }
    }

To verify the distributed behavior, two Order application instances are started. Because both instances count requests against the same Redis key, the limit is enforced globally: once the combined request count for the current second reaches the threshold, the remaining requests are rejected.

Implementation Principle

The implementation principle is actually very simple. Since we want distributed, global rate limiting, we naturally need a third-party component to record the request count.

Redis is very suitable for such scenarios.

  • For each request, the current time (truncated to the second) is used as the Redis key, with the expiry set to 2 seconds; Redis increments the value stored at that key.
  • Once the count reaches the threshold, an error is returned.
  • The Redis writes are performed by a Lua script, and Redis's single-threaded execution model guarantees that each script runs atomically.

The Lua script is as follows:

    -- Lua indices start from 1
    -- rate limiting key
    local key = KEYS[1]
    -- limit size
    local limit = tonumber(ARGV[1])
    -- current count
    local currentLimit = tonumber(redis.call('get', key) or "0")
    if currentLimit + 1 > limit then
        -- the limit has been reached, reject
        return 0
    else
        -- below the threshold: increment and refresh the expiry
        redis.call("INCRBY", key, 1)
        redis.call("EXPIRE", key, 2)
        return currentLimit + 1
    end

On the Java side, the script is loaded and executed via Jedis's eval, as shown in the limit() method of the RedisLimit class below.

Therefore, you only need to call this method wherever rate limiting is needed and check the return value.

Of course, this is just a simple counter built on Redis. If you want a token bucket like the one described above, you can implement it yourself in Lua along the same lines.

Builder

When designing this component, the goal was to give users an API that is clear, readable, and hard to misuse.

Take the first step, for example: how to construct the rate limiter object.

The most common way is, of course, a constructor. If there are multiple fields, you can use telescoping constructors:

    public A() {}
    public A(int a) {}
    public A(int a, int b) {}

The disadvantages are obvious: with too many parameters it becomes hard to read, and if the parameter types happen to match, a caller that swaps the order gets no compiler warning, leading to unpredictable results.
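
As a hypothetical example (the class and field names here are made up for illustration), both of the following calls compile, but only one of them is what the caller intended:

    public class LimitConfig {

        private final int limit;
        private final int timeoutSeconds;

        public LimitConfig(int limit, int timeoutSeconds) {
            this.limit = limit;
            this.timeoutSeconds = timeoutSeconds;
        }

        public static void main(String[] args) {
            // Intended: limit = 200, timeout = 2 seconds
            LimitConfig ok = new LimitConfig(200, 2);
            // Arguments swapped by mistake -- compiles without any warning
            LimitConfig wrong = new LimitConfig(2, 200);
        }
    }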

The second option is the JavaBeans pattern, where the object is constructed first and then configured through setters:

    A a = new A();
    a.setA(1);
    a.setB(2);

This approach is clear and easy to read, but the object can be left in an inconsistent, partially configured state, and it is not thread-safe during construction.

So here a third approach is used to create the object: the builder pattern.

    public class RedisLimit {

        private JedisCommands jedis;
        private int limit = 200;

        private static final int FAIL_CODE = 0;

        /**
         * lua script
         */
        private String script;

        private RedisLimit(Builder builder) {
            this.limit = builder.limit;
            this.jedis = builder.jedis;
            buildScript();
        }

        /**
         * limit traffic
         * @return true if the request is allowed
         */
        public boolean limit() {
            // The current second is used as the key, so the count resets every second
            String key = String.valueOf(System.currentTimeMillis() / 1000);
            Object result = null;
            if (jedis instanceof Jedis) {
                result = ((Jedis) this.jedis).eval(script, Collections.singletonList(key), Collections.singletonList(String.valueOf(limit)));
            } else if (jedis instanceof JedisCluster) {
                result = ((JedisCluster) this.jedis).eval(script, Collections.singletonList(key), Collections.singletonList(String.valueOf(limit)));
            } else {
                //throw new RuntimeException("instance is error");
                return false;
            }

            if (FAIL_CODE != (Long) result) {
                return true;
            } else {
                return false;
            }
        }

        /**
         * read lua script
         */
        private void buildScript() {
            script = ScriptUtil.getScript("limit.lua");
        }

        /**
         * the builder
         * @param <T>
         */
        public static class Builder<T extends JedisCommands> {

            private T jedis = null;
            private int limit = 200;

            public Builder(T jedis) {
                this.jedis = jedis;
            }

            public Builder limit(int limit) {
                this.limit = limit;
                return this;
            }

            public RedisLimit build() {
                return new RedisLimit(this);
            }
        }
    }

So when the client uses it:

    RedisLimit redisLimit = new RedisLimit.Builder<>(jedisCluster)
            .limit(limit)
            .build();

This is much simpler and more direct, and it avoids spreading object creation across multiple separate steps.

This pattern is especially useful when there are several constructor parameters, most of which are optional.

Accordingly, the distributed lock in the same toolkit has also been updated to use a builder:

https://github.com/crossoverJie/distributed-redis-tool#features

API

As shown above, usage boils down to calling the limit() method:

    // Rate limiting
    boolean limit = redisLimit.limit();
    if (!limit) {
        // Specific rate limiting logic
    }

To reduce intrusiveness and simplify client code, two annotations are provided.

@ControllerLimit

This annotation can be applied to endpoints annotated with @RequestMapping; once the limit is hit, a rate-limited response is returned.

The implementation is as follows:

    @Component
    public class WebIntercept extends WebMvcConfigurerAdapter {

        private static Logger logger = LoggerFactory.getLogger(WebIntercept.class);

        @Autowired
        private RedisLimit redisLimit;

        @Override
        public void addInterceptors(InterceptorRegistry registry) {
            registry.addInterceptor(new CustomInterceptor())
                    .addPathPatterns("/**");
        }

        private class CustomInterceptor extends HandlerInterceptorAdapter {
            @Override
            public boolean preHandle(HttpServletRequest request, HttpServletResponse response,
                                     Object handler) throws Exception {
                if (redisLimit == null) {
                    throw new NullPointerException("redisLimit is null");
                }

                if (handler instanceof HandlerMethod) {
                    HandlerMethod method = (HandlerMethod) handler;
                    ControllerLimit annotation = method.getMethodAnnotation(ControllerLimit.class);
                    if (annotation == null) {
                        // skip handlers that are not annotated
                        return true;
                    }

                    boolean limit = redisLimit.limit();
                    if (!limit) {
                        logger.warn("request has been limited");
                        response.sendError(500, "request limit");
                        return false;
                    }
                }
                return true;
            }
        }
    }

It is simply a Spring MVC interceptor: during interception it checks whether the handler method carries the annotation and, if so, invokes the rate limiting logic.

The prerequisite is that the application scans this class so that Spring manages it:

    @ComponentScan(value = "com.crossoverjie.distributed.intercept")

@CommonLimit

Of course, rate limiting can also be applied to ordinary methods. This is implemented with Spring AOP (Spring MVC's interceptor is essentially a form of AOP):

    @Aspect
    @Component
    @EnableAspectJAutoProxy(proxyTargetClass = true)
    public class CommonAspect {

        private static Logger logger = LoggerFactory.getLogger(CommonAspect.class);

        @Autowired
        private RedisLimit redisLimit;

        @Pointcut("@annotation(com.crossoverjie.distributed.annotation.CommonLimit)")
        private void check() {}

        @Before("check()")
        public void before(JoinPoint joinPoint) throws Exception {
            if (redisLimit == null) {
                throw new NullPointerException("redisLimit is null");
            }

            boolean limit = redisLimit.limit();
            if (!limit) {
                logger.warn("request has been limited");
                throw new RuntimeException("request has been limited");
            }
        }
    }

It is very simple: the rate limiting check is likewise invoked before the intercepted method runs.

Of course, you also have to scan the package when using it:

    @ComponentScan(value = "com.crossoverjie.distributed.intercept")

Summary

Rate limiting is a powerful tool for protecting applications in high-concurrency, high-traffic systems, and there are many mature solutions available. I hope this article offers some useful ideas to readers who are just starting to explore the topic.
