Ribbon 负载均衡策略 —— 图解、源码级解析
作者:mmseoamin日期:2023-12-11

🍊 Java学习:社区快速通道


🍊 深入浅出RocketMQ设计思想:深入浅出RocketMQ设计思想


🍊 绝对不一样的职场干货:大厂最佳实践经验指南


📆 最近更新:2023年6月4日


🍊 点赞 👍 收藏 ⭐留言 📝 都是我最大的动力!


文章目录

  • 负载均衡策略
    • RandomRule
    • RoundRobinRule
    • RetryRule
    • WeightedResponseTimeRule
    • BestAvailableRule
    • AvailabilityFilteringRule
    • ZoneAvoidanceRule
    • Ribbon 负载均衡策略源码
      • RandomRule源码
      • RoundRobinRule源码
      • BestAvailableRule源码
      • RetryRule源码

        通过本文你可以学习到:

        1. 常见的7种负载均衡策略思想
        2. 自旋锁的使用方式
        3. 防御性编程

        负载均衡策略

        RandomRule

        该策略会从当前可用的服务节点中,随机挑选一个节点访问,使用了yield+自旋的方式做重试,还采用了严格的防御性编程。

        RoundRobinRule

        该策略会从一个节点一步一步地向后选取节点,如下图所示:

        Ribbon 负载均衡策略 —— 图解、源码级解析,在这里插入图片描述,第1张

        在多线程环境下,两个请求同时访问这个Rule也不会读取到相同节点:这靠的是RandomRobinRule底层的自旋锁+CAS的同步操作。

        CAS+自旋锁这套组合技是高并发下最廉价的线程安全手段,因为这套操作不需要锁定系统资源。但缺点是,自旋锁如果迟迟不能释放,将会带来CPU资源的浪费,因为自旋本身并不会执行任何业务逻辑,而是单纯的使CPU空转。所以通常情况下会对自旋锁的旋转次数做一个限制,比如JDK中synchronize底层的锁升级策略,就对自旋次数做了动态调整。

        while (true) {
            // cas操作
            if (cas(expected, update)) {
                // 业务逻辑代码
                // break或退出return
            }
        }
        

        Eureka为了防止服务下线被重复调用,就使用AtomicBoolean的CAS方法做同步控制;

        奈飞提供的SpringCloud组件有特别多用到CAS的地方,感兴趣的小伙伴们可以发现一下

        RetryRule

        RetryRule是一个类似装饰器模式的规则,装饰器相当于一层套一层的套娃,每一层都会加上一层独特的功能。

        经典的装饰器模式示意图:

        Ribbon 负载均衡策略 —— 图解、源码级解析,在这里插入图片描述,第2张

        借助上面的思路,RetryRule就是给其他负载均衡策略加上重试功能。在RetryRule里还藏着一个subRule,这才是真正被执行的负载均衡策略,RetryRule正是要为它添加重试功能(如果初始化时没指定subRule,将默认使用RoundRibinRule)。

        WeightedResponseTimeRule

        这个规则继承自RoundRibbonRule,他会根据服务节点的响应时间计算权重,响应时间越长权重就越低,响应越快则权重越高,权重的高低决定了机器被选中概率的高低。也就是说,响应时间越小的机器,被选中的概率越大。

        服务器刚启动的时候,对各个服务节点采样不足,因此会采用轮询策略,当积累到一定的样本时候,才会切换到WeightedResponseTimeRule模式。

        BestAvailableRule

        过滤掉故障服务以后,它会基于过去30分钟的统计结果选取当前并发量最小的服务节点作为目标地址。如果统计结果尚未生成,则采用轮询的方式选定节点。

        AvailabilityFilteringRule

        这个规则底层依赖RandomRobinRule来选取节点,但必须要满足它的最低要求的节点才会被选中。如果节点满足了要求,无论其响应时间或者当前并发量是什么,都会被选中。

        每次AvailabilityFilteringRule都会请求RobinRule挑选一个节点,然后对这个节点做以下两步检查:

        1. 是否处于熔断状态
        2. 节点当前的请求连接数超过阈值,超过了则表示节点目前太忙

        如果被选中的server挂了,那么AFR会自动重试(最多10次),让RobinRule重新选择一个服务节点

        ZoneAvoidanceRule

        这个过滤器包含了组合过滤条件,分别是Zone级别和可用性级别。

        Ribbon 负载均衡策略 —— 图解、源码级解析,在这里插入图片描述,第3张

        • Zone Filter: Zone可以理解为机房所属的大区域,这里会对这个Zone下面所有的服务节点进行健康情况过滤。

        • 可用性过滤: 这里和AvailabilityFilteringRule的验证过程很像,会过滤掉当前并发量较大,或者处于熔断状态的服务节点。

          Ribbon 负载均衡策略源码

          RandomRule源码

          先从RandomRule看起,核心的方法是:

          Ribbon 负载均衡策略 —— 图解、源码级解析,请添加图片描述,第4张

          public Server choose(ILoadBalancer lb, Object key) {
              if (lb == null) {
                  return null;
              }
              Server server = null;
              while (server == null) {
                  if (Thread.interrupted()) {
                      return null;
                  }
                  List upList = lb.getReachableServers();
                  List allList = lb.getAllServers();
                  int serverCount = allList.size();
                  if (serverCount == 0) {
                      /*
                       * No servers. End regardless of pass, because subsequent passes
                       * only get more restrictive.
                       */
                      return null;
                  }
                  int index = chooseRandomInt(serverCount);
                  server = upList.get(index);
                  if (server == null) {
                      Thread.yield();
                      continue;
                  }
                  if (server.isAlive()) {
                      return (server);
                  }
                  server = null;
                  Thread.yield();
              }
              return server;
          }
          

          在RandomRule里方法的入参key没有用到,所以可以先暂时忽略

          while循环逻辑是如果server为空,则找到一个可用的server

          if (Thread.interrupted()) {
              return null;
          }
          

          如果线程暂停了,则直接返回空(防御性编程)

          List upList = lb.getReachableServers();
          List allList = lb.getAllServers();
          

          allList存储的是所有的服务,upList存储的是可运行状态的服务

          int serverCount = allList.size();
          if (serverCount == 0) {
              return null;
          }
          

          服务中心上没有server注册,则返回空

          int index = chooseRandomInt(serverCount);
          server = upList.get(index);
          

          随机选择一个server

          其中,chooseRandomInt的逻辑如下:

          protected int chooseRandomInt(int serverCount) {
              return ThreadLocalRandom.current().nextInt(serverCount);
          }
          

          返回0到serverCount中间的任意一个值

          java中的随机是可以预测到结果的,真随机数一般会掺杂一些不可预测的数据,比如当前cpu的温度


          回到RandomRule的choose方法:

          如果发现随机选择的server为空表示此时serverList正在被修正,此时让出线程资源,进行下一次循环,对应最开始的防御性编程

          if (server == null) {
              Thread.yield();
              continue;
          }
          
          if (server.isAlive()) {
              return (server);
          }
          

          如果server可用直接return

          server = null;
          Thread.yield();
          

          如果不可用则server置为空,下一次循环会选一个新的,最后让出资源。

          所以该方法每次进入下一次循环时都会让出线程。


          RoundRobinRule源码

          接下来看RoundRobinRule

          public Server choose(ILoadBalancer lb, Object key) {
              if (lb == null) {
                  log.warn("no load balancer");
                  return null;
              }
              Server server = null;
              int count = 0;
              while (server == null && count++ < 10) {
                  List reachableServers = lb.getReachableServers();
                  List allServers = lb.getAllServers();
                  int upCount = reachableServers.size();
                  int serverCount = allServers.size();
                  if ((upCount == 0) || (serverCount == 0)) {
                      log.warn("No up servers available from load balancer: " + lb);
                      return null;
                  }
                  int nextServerIndex = incrementAndGetModulo(serverCount);
                  server = allServers.get(nextServerIndex);
                  if (server == null) {
                      Thread.yield();
                      continue;
                  }
                  if (server.isAlive() && (server.isReadyToServe())) {
                      return (server);
                  }
                  server = null;
              }
              if (count >= 10) {
                  log.warn("No available alive servers after 10 tries from load balancer: "
                          + lb);
              }
              return server;
          }
          

          while循环里面有一个计数器,如果重试10次依然没有结果返回就不重试了。

          List reachableServers = lb.getReachableServers();
          List allServers = lb.getAllServers();
          int upCount = reachableServers.size();
          int serverCount = allServers.size();
          

          reachableServers就是up状态的server

          if ((upCount == 0) || (serverCount == 0)) {
              log.warn("No up servers available from load balancer: " + lb);
              return null;
          }
          

          没有可用服务器则返回空

          int nextServerIndex = incrementAndGetModulo(serverCount);
          server = allServers.get(nextServerIndex);
          

          选择哪个下标的server,进入incrementAndGetModulo方法

          private int incrementAndGetModulo(int modulo) {
              for (;;) {
                  int current = nextServerCyclicCounter.get();
                  int next = (current + 1) % modulo;
                  if (nextServerCyclicCounter.compareAndSet(current, next))
                      return next;
              }
          }
          

          使用了自旋锁,nextServerCyclicCounter是一个线程安全的数字。

          if (server == null) {
              Thread.yield();
              continue;
          }
          

          如果获取到的server为空则让出资源,继续下一次循环

          if (server.isAlive() && (server.isReadyToServe())) {
              return (server);
          }
          

          server是正常的则返回

          server = null;
          

          最后没有让出线程资源,因为重试10次后就退出循环了


          BestAvailableRule源码

          接下来看BestAvailableRule

          @Override
          public Server choose(Object key) {
              if (loadBalancerStats == null) {
                  return super.choose(key);
              }
              List serverList = getLoadBalancer().getAllServers();
              int minimalConcurrentConnections = Integer.MAX_VALUE;
              long currentTime = System.currentTimeMillis();
              Server chosen = null;
              for (Server server: serverList) {
                  ServerStats serverStats = loadBalancerStats.getSingleServerStat(server);
                  if (!serverStats.isCircuitBreakerTripped(currentTime)) {
                      int concurrentConnections = serverStats.getActiveRequestsCount(currentTime);
                      if (concurrentConnections < minimalConcurrentConnections) {
                          minimalConcurrentConnections = concurrentConnections;
                          chosen = server;
                      }
                  }
              }
              if (chosen == null) {
                  return super.choose(key);
              } else {
                  return chosen;
              }
          }
          

          if (loadBalancerStats == null) {
              return super.choose(key);
          }
          

          如果loadBalancerStats为空则调用父类的choose方法,父类方法直接委托给RoundRobinRule来完成choose。

          for循环里先从loadBalancerStats中获取到当前服务的状态

          ServerStats serverStats = loadBalancerStats.getSingleServerStat(server);
          
          public ServerStats getSingleServerStat(Server server) {
              return getServerStats(server);
          }
          
          protected ServerStats getServerStats(Server server) {
              try {
                  return serverStatsCache.get(server);
              } catch (ExecutionException e) {
                  ServerStats stats = createServerStats(server);
                  serverStatsCache.asMap().putIfAbsent(server, stats);
                  return serverStatsCache.asMap().get(server);
              }
          }
          

          这里是从缓存中获取server的stats,如果获取失败则默认创建一个stats并添加到缓存中,然后从cache中再获取一次。

          随后判断是否处于熔断状态

          if (!serverStats.isCircuitBreakerTripped(currentTime)) {...}
          
          public boolean isCircuitBreakerTripped(long currentTime) {
              long circuitBreakerTimeout = getCircuitBreakerTimeout();
              if (circuitBreakerTimeout <= 0) {
                  return false;
              }
              return circuitBreakerTimeout > currentTime;
          }
          

          首先获得熔断的TimeOut(表示截止到未来某个时间熔断终止),如果大于当前时间说明处于熔断状态。

          熔断的TimeOut由下面方法计算得到:

          private long getCircuitBreakerTimeout() {
              long blackOutPeriod = getCircuitBreakerBlackoutPeriod();
              if (blackOutPeriod <= 0) {
                  return 0;
              }
              return lastConnectionFailedTimestamp + blackOutPeriod;
          }
          

          返回上一次连接失败的时间戳 + blackOutPeriod

          其中又调用了

          private long getCircuitBreakerBlackoutPeriod() {
              int failureCount = successiveConnectionFailureCount.get();
              int threshold = connectionFailureThreshold.get();
              if (failureCount < threshold) {
                  return 0;
              }
              int diff = (failureCount - threshold) > 16 ? 16 : (failureCount - threshold);
              int blackOutSeconds = (1 << diff) * circuitTrippedTimeoutFactor.get();
              if (blackOutSeconds > maxCircuitTrippedTimeout.get()) {
                  blackOutSeconds = maxCircuitTrippedTimeout.get();
              }
              return blackOutSeconds * 1000L;
          }
          

          failureCount是失败的个数,从一个计数器里获得,阈值从一个缓存的属性中获得,之后计算两个的差值,再根据缓存中的一些属性计算最终的秒数,最后乘以1000返回。


          回到BestAvailableRule的choose方法,只有不处于熔断状态才能继续走后面的流程

          if (concurrentConnections < minimalConcurrentConnections) {
              minimalConcurrentConnections = concurrentConnections;
              chosen = server;
          }
          

          选出连接数最小的服务器

          if (chosen == null) {
              return super.choose(key);
          } else {
              return chosen;
          }
          

          最后返回

          核心是找到一个最轻松的服务器。


          RetryRule源码

          查看RetryRule源码:

          public Server choose(ILoadBalancer lb, Object key) {
             long requestTime = System.currentTimeMillis();
             long deadline = requestTime + maxRetryMillis;
             Server answer = null;
             answer = subRule.choose(key);
             if (((answer == null) || (!answer.isAlive()))
                   && (System.currentTimeMillis() < deadline)) {
                InterruptTask task = new InterruptTask(deadline
                      - System.currentTimeMillis());
                while (!Thread.interrupted()) {
                   answer = subRule.choose(key);
                   if (((answer == null) || (!answer.isAlive()))
                         && (System.currentTimeMillis() < deadline)) {
                      /* pause and retry hoping it's transient */
                      Thread.yield();
                   } else {
                      break;
                   }
                }
                task.cancel();
             }
             if ((answer == null) || (!answer.isAlive())) {
                return null;
             } else {
                return answer;
             }
          }
          
          long requestTime = System.currentTimeMillis();
          long deadline = requestTime + maxRetryMillis;
          

          先记录当前时间和deadline,在截止时间之前可以一直重试。

          answer = subRule.choose(key);
          

          方法里面是由subRule来实现具体的负载均衡逻辑,这里默认类型是RoundRobinRule

          如果选到的是空或者选到的不是up的,且时间在ddl之前则进入重试逻辑:

          while (!Thread.interrupted()) {
             answer = subRule.choose(key);
             if (((answer == null) || (!answer.isAlive()))
                   && (System.currentTimeMillis() < deadline)) {
                /* pause and retry hoping it's transient */
                Thread.yield();
             } else {
                break;
             }
          }
          

          如果线程中断了就中断重试。之后重新选择服务器,如果又没选到则把资源让出去,下一次while循环再选,在while循环之前会起一个任务

          InterruptTask task = new InterruptTask(deadline - System.currentTimeMillis());
          

          到了截止时间之后,程序会中断重试的流程

          task.cancel();
          

          最后返回

          if ((answer == null) || (!answer.isAlive())) {
             return null;
          } else {
             return answer;
          }