您好,欢迎访问某某硅胶制品有限公司!

行业动态
您的位置: 主页 > 新闻中心 > 行业动态

Springcloud 负载均衡是怎么工作的

发布日期:2023-05-11 22:10浏览次数:
  今天我们来看一下Springcloud的LoadBalance负载均衡是怎么工作的   通过这个理解我们可以自定义负载均衡算法策略   先介绍一下核心的接口   ReactiveLoadBalancer表示负载均衡算法   LoadBalancerClient表示负载均衡客户端   它的默认实现是BlockingLoadBalancerClient   我们来看它的实现细节   这里的execute是去执行request.apply不是本章节的范畴我们忽略它,继续看choose   loadBalancerClientFactory.getInstance(serviceId);从spring context中获取ReactiveLoadBalancer   loadBalancer.choose(request)再使用ReactiveLoadBalancer选择服务实例   因此如果想要自定义负载均衡,一般只需要自定义ReactiveLoadBalancer   它的继承结构   默认有2个负载均衡算法,随机和轮询   我们来看轮询RoundRobinLoadBalancer   它在LoadBalancerClientConfiguration中自动配置   看它的choose(Request request)实现   这里的ServiceInstanceListSupplier表示 服务列表的响应式 提供接口,使用该接口来获取实例列表的   它有多个实现   默认的链路是CachingServiceInstanceListSupplier –> DiscoveryClientServiceInstanceListSupplier -> DiscoveryClient.getInstances(String serviceId)   而CachingServiceInstanceListSupplier中的CacheManager默认实际是不会使用到缓存的,最终将使用DiscoveryClient来获取实例列表   DiscoveryClient的工作过程我们在服务注册与发现中已经看过   https://blog.csdn.net/a5192041/article/details/128318412   然后再使用轮询算法选择出最终的实例   下面我们看一下如何自定义负载均衡,例如根据流量特征进行负载均衡,这在灰度发布、租户隔离、开发环境流量隔离等场景很有用   package io.github.icodegarden.commons.springboot.loadbalancer;   import java.util.List;   import java.util.Random;   import java.util.concurrent.ThreadLocalRandom;   import java.util.concurrent.atomic.AtomicInteger;   import java.util.stream.Collectors;   import org.slf4j.Logger;   import org.slf4j.LoggerFactory;   import org.springframework.beans.factory.ObjectProvider;   import org.springframework.cloud.client.ServiceInstance;   import org.springframework.cloud.client.loadbalancer.DefaultRequestContext;   import org.springframework.cloud.client.loadbalancer.DefaultResponse;   import org.springframework.cloud.client.loadbalancer.EmptyResponse;   import org.springframework.cloud.client.loadbalancer.Request;   import org.springframework.cloud.client.loadbalancer.RequestData;   import org.springframework.cloud.client.loadbalancer.Response;   import org.springframework.cloud.loadbalancer.core.NoopServiceInstanceListSupplier;   import org.springframework.cloud.loadbalancer.core.ReactorServiceInstanceLoadBalancer;   import org.springframework.cloud.loadbalancer.core.SelectedInstanceCallback;   import org.springframework.cloud.loadbalancer.core.ServiceInstanceListSupplier;   import org.springframework.lang.Nullable;   import org.springframework.util.StringUtils;   import io.github.icodegarden.commons.lang.util.JsonUtils;   import lombok.AllArgsConstructor;   import lombok.Getter;   import lombok.NoArgsConstructor;   import lombok.Setter;   import lombok.ToString;   import reactor.core.publisher.Mono;   /**   * 负载均衡算法:
  * 流量有X-FlowTag-Required的,必须完全匹配有此tag的实例
  * 流量有X-FlowTag-First的,优先选择有此tag的服务,其次选择没有任何tag的实例(优先而不是必须找到匹配的原因是:部分服务可能不发版或不需要灰度)
  * 流量没有任何tag的,只选择没有任何tag的实例
  *   *
  *   * 默认的instanceMetadataTagName是flow.tags, json array, ["a","b",...]
  * 默认的IdentityFlowTagExtractor是从request.header中获取X-FlowTag-Required、X-FlowTag-First的值
  * 默认的L2 LoadBalancer是轮询
  *   * @author Fangfang.Xu   */   public class FlowTagLoadBalancer implements ReactorServiceInstanceLoadBalancer {   private static final Logger log = LoggerFactory.getLogger(FlowTagLoadBalancer.class);   public static final String HTTPHEADER_FLOWTAG_REQUIRED = "X-FlowTag-Required";   public static final String HTTPHEADER_FLOWTAG_FIRST = "X-FlowTag-First";   /**   * json array, ["a","b",...]   */   private String instanceMetadataTagName = "flow.tags";   private IdentityFlowTagExtractor identityFlowTagExtractor = new DefaultIdentityFlowTagExtractor();   private L2LoadBalancer l2LoadBalancer = new RoundRobinLoadBalancer();   private final String serviceId;   private ObjectProvider serviceInstanceListSupplierProvider;   public FlowTagLoadBalancer(ObjectProvider serviceInstanceListSupplierProvider,   String serviceId) {   this.serviceId = serviceId;   this.serviceInstanceListSupplierProvider = serviceInstanceListSupplierProvider;   }   @Override   public Mono> choose(Request request) {   ServiceInstanceListSupplier supplier = serviceInstanceListSupplierProvider   .getIfAvailable(NoopServiceInstanceListSupplier::new);   return supplier.get().next().map(serviceInstances -> getInstanceResponse(request, supplier, serviceInstances));   }   private Response getInstanceResponse(Request request, ServiceInstanceListSupplier supplier,   List instances) {   if (instances.isEmpty()) {   if (log.isWarnEnabled()) {   log.warn("No servers available for service: " + serviceId);   }   return new EmptyResponse();   }   IdentityFlowTag identityFlowTag = identityFlowTagExtractor.extract(request);   List instancesToChoose;   if (StringUtils.hasText(identityFlowTag.getFlowTagRequired())) {   instancesToChoose = instances.stream().filter(instance -> {   String tagValue = instance.getMetadata().get(instanceMetadataTagName);   if (!StringUtils.hasText(tagValue)) {   return false;   }   List tags = JsonUtils.deserializeArray(tagValue, String.class);   return tags.contains(identityFlowTag.getFlowTagRequired());   }).collect(Collectors.toList());   } else if (StringUtils.hasText(identityFlowTag.getFlowTagFirst())) {   instancesToChoose = instances.stream().filter(instance -> {   String tagValue = instance.getMetadata().get(instanceMetadataTagName);   if (!StringUtils.hasText(tagValue)) {   return false;   }   List tags = JsonUtils.deserializeArray(tagValue, String.class);   return tags.contains(identityFlowTag.getFlowTagFirst());   }).collect(Collectors.toList());   if (instancesToChoose.isEmpty()) {   instancesToChoose = filteredInstancesNonFlowTags(instances);   }   } else {   instancesToChoose = filteredInstancesNonFlowTags(instances);   }   return l2LoadBalancer.processInstanceResponse(supplier, instancesToChoose);   }   /**   * 没有tag的实例   */   private List filteredInstancesNonFlowTags(List instances) {   return instances.stream().filter(instance -> {   String tagValue = instance.getMetadata().get(instanceMetadataTagName);   return !StringUtils.hasText(tagValue);   }).collect(Collectors.toList());   }   public void setInstanceMetadataTagName(String instanceMetadataTagName) {   this.instanceMetadataTagName = instanceMetadataTagName;   }   public void setIdentityFlowTagExtractor(IdentityFlowTagExtractor identityFlowTagExtractor) {   this.identityFlowTagExtractor = identityFlowTagExtractor;   }   public void setL2LoadBalancer(L2LoadBalancer l2LoadBalancer) {   this.l2LoadBalancer = l2LoadBalancer;   }   public static interface IdentityFlowTagExtractor {   /**   * @return 不为null   */   IdentityFlowTag extract(Request request);   }   @NoArgsConstructor   @AllArgsConstructor   @Getter   @Setter   @ToString   public static class IdentityFlowTag {   @Nullable   private String flowTagRequired;   @Nullable   private String flowTagFirst;   }   private class DefaultIdentityFlowTagExtractor implements IdentityFlowTagExtractor {   @Override   public IdentityFlowTag extract(Request request) {   Object ctx = request.getContext();   if (!(ctx instanceof DefaultRequestContext)) {   if (log.isWarnEnabled()) {   log.warn("request.context is not a DefaultRequestContext on get flow tag, context is:{}",   ctx.getClass());   }   return null;   }   DefaultRequestContext context = (DefaultRequestContext) ctx;   Object cr = context.getClientRequest();   if (!(cr instanceof RequestData)) {   if (log.isWarnEnabled()) {   log.warn("context.clientRequest is not a RequestData on get flow tag, clientRequest is:{}",   cr.getClass());   }   return null;   }   RequestData clientRequest = (RequestData) cr;   String flowTagRequired = clientRequest.getHeaders().getFirst(HTTPHEADER_FLOWTAG_REQUIRED);   String flowTagFirst = clientRequest.getHeaders().getFirst(HTTPHEADER_FLOWTAG_FIRST);   return new IdentityFlowTag(flowTagRequired, flowTagFirst);   }   }   public static interface L2LoadBalancer {   Response processInstanceResponse(ServiceInstanceListSupplier supplier,   List serviceInstances);   }   public class RoundRobinLoadBalancer implements L2LoadBalancer {   private final AtomicInteger position = new AtomicInteger(new Random().nextInt(1000));   public Response processInstanceResponse(ServiceInstanceListSupplier supplier,   List serviceInstances) {   Response serviceInstanceResponse = getInstanceResponse(serviceInstances);   if (supplier instanceof SelectedInstanceCallback && serviceInstanceResponse.hasServer()) {   ((SelectedInstanceCallback) supplier).selectedServiceInstance(serviceInstanceResponse.getServer());   }   return serviceInstanceResponse;   }   private Response getInstanceResponse(List instances) {   if (instances.isEmpty()) {   if (log.isWarnEnabled()) {   log.warn("No servers available for service: " + serviceId);   }   return new EmptyResponse();   }   // Ignore the sign bit, this allows pos to loop sequentially from 0 to   // Integer.MAX_VALUE   int pos = this.position.incrementAndGet() & Integer.MAX_VALUE;   ServiceInstance instance = instances.get(pos % instances.size());   return new DefaultResponse(instance);   }   }   public class RandomLoadBalancer implements L2LoadBalancer {   public Response processInstanceResponse(ServiceInstanceListSupplier supplier,   List serviceInstances) {   Response serviceInstanceResponse = getInstanceResponse(serviceInstances);   if (supplier instanceof SelectedInstanceCallback && serviceInstanceResponse.hasServer()) {   ((SelectedInstanceCallback) supplier).selectedServiceInstance(serviceInstanceResponse.getServer());   }   return serviceInstanceResponse;   }   private Response getInstanceResponse(List instances) {   if (instances.isEmpty()) {   if (log.isWarnEnabled()) {   log.warn("No servers available for service: " + serviceId);   }   return new EmptyResponse();   }   int index = ThreadLocalRandom.current().nextInt(instances.size());   ServiceInstance balanceflow瑜伽视频 instance = instances.get(index);   return new DefaultResponse(instance);   }   }   }   到此完成
标签: In

Copyright © 2020-2023 南宫娱乐最新官网有限公司 版权所有 备案号:陕ICP备19009305号-1

020-88888888