Springcloud 负载均衡是怎么工作的
发布日期:2023-05-11 22:10浏览次数:
今天我们来看一下Spring Cloud的LoadBalancer负载均衡是怎么工作的
通过这个理解我们可以自定义负载均衡算法策略
先介绍一下核心的接口
ReactiveLoadBalancer表示负载均衡算法
LoadBalancerClient表示负载均衡客户端
它的默认实现是BlockingLoadBalancerClient
我们来看它的实现细节
这里的execute是去执行request.apply,不在本章节的范畴内,我们忽略它,继续看choose
loadBalancerClientFactory.getInstance(serviceId);从spring context中获取ReactiveLoadBalancer
loadBalancer.choose(request)再使用ReactiveLoadBalancer选择服务实例
因此如果想要自定义负载均衡,一般只需要自定义ReactiveLoadBalancer
它的继承结构
默认有2个负载均衡算法,随机和轮询
我们来看轮询RoundRobinLoadBalancer
它在LoadBalancerClientConfiguration中自动配置
看它的choose(Request request)实现
这里的ServiceInstanceListSupplier是服务实例列表的响应式提供接口,负载均衡器通过该接口来获取实例列表
它有多个实现
默认的链路是CachingServiceInstanceListSupplier -> DiscoveryClientServiceInstanceListSupplier -> DiscoveryClient.getInstances(String serviceId)
而CachingServiceInstanceListSupplier中的CacheManager默认实际是不会使用到缓存的,最终将使用DiscoveryClient来获取实例列表
DiscoveryClient的工作过程我们在服务注册与发现中已经看过
https://blog.csdn.net/a5192041/article/details/128318412
然后再使用轮询算法选择出最终的实例
下面我们看一下如何自定义负载均衡,例如根据流量特征进行负载均衡,这在灰度发布、租户隔离、开发环境流量隔离等场景很有用
package io.github.icodegarden.commons.springboot.loadbalancer;
import java.util.List;
import java.util.Random;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.ObjectProvider;
import org.springframework.cloud.client.ServiceInstance;
import org.springframework.cloud.client.loadbalancer.DefaultRequestContext;
import org.springframework.cloud.client.loadbalancer.DefaultResponse;
import org.springframework.cloud.client.loadbalancer.EmptyResponse;
import org.springframework.cloud.client.loadbalancer.Request;
import org.springframework.cloud.client.loadbalancer.RequestData;
import org.springframework.cloud.client.loadbalancer.Response;
import org.springframework.cloud.loadbalancer.core.NoopServiceInstanceListSupplier;
import org.springframework.cloud.loadbalancer.core.ReactorServiceInstanceLoadBalancer;
import org.springframework.cloud.loadbalancer.core.SelectedInstanceCallback;
import org.springframework.cloud.loadbalancer.core.ServiceInstanceListSupplier;
import org.springframework.lang.Nullable;
import org.springframework.util.StringUtils;
import io.github.icodegarden.commons.lang.util.JsonUtils;
import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.Setter;
import lombok.ToString;
import reactor.core.publisher.Mono;
/**
* 负载均衡算法:
* 流量有X-FlowTag-Required的,必须完全匹配有此tag的实例
* 流量有X-FlowTag-First的,优先选择有此tag的服务,其次选择没有任何tag的实例(优先而不是必须找到匹配的原因是:部分服务可能不发版或不需要灰度)
* 流量没有任何tag的,只选择没有任何tag的实例
*
*
*
* 默认的instanceMetadataTagName是flow.tags, json array, ["a","b",...]
* 默认的IdentityFlowTagExtractor是从request.header中获取X-FlowTag-Required、X-FlowTag-First的值
* 默认的L2 LoadBalancer是轮询
*
* @author Fangfang.Xu
*/
public class FlowTagLoadBalancer implements ReactorServiceInstanceLoadBalancer {
private static final Logger log = LoggerFactory.getLogger(FlowTagLoadBalancer.class);
public static final String HTTPHEADER_FLOWTAG_REQUIRED = "X-FlowTag-Required";
public static final String HTTPHEADER_FLOWTAG_FIRST = "X-FlowTag-First";
/**
* json array, ["a","b",...]
*/
private String instanceMetadataTagName = "flow.tags";
private IdentityFlowTagExtractor identityFlowTagExtractor = new DefaultIdentityFlowTagExtractor();
private L2LoadBalancer l2LoadBalancer = new RoundRobinLoadBalancer();
private final String serviceId;
private ObjectProvider serviceInstanceListSupplierProvider;
public FlowTagLoadBalancer(ObjectProvider serviceInstanceListSupplierProvider,
String serviceId) {
this.serviceId = serviceId;
this.serviceInstanceListSupplierProvider = serviceInstanceListSupplierProvider;
}
@Override
public Mono> choose(Request request) {
ServiceInstanceListSupplier supplier = serviceInstanceListSupplierProvider
.getIfAvailable(NoopServiceInstanceListSupplier::new);
return supplier.get().next().map(serviceInstances -> getInstanceResponse(request, supplier, serviceInstances));
}
private Response getInstanceResponse(Request request, ServiceInstanceListSupplier supplier,
List instances) {
if (instances.isEmpty()) {
if (log.isWarnEnabled()) {
log.warn("No servers available for service: " + serviceId);
}
return new EmptyResponse();
}
IdentityFlowTag identityFlowTag = identityFlowTagExtractor.extract(request);
List instancesToChoose;
if (StringUtils.hasText(identityFlowTag.getFlowTagRequired())) {
instancesToChoose = instances.stream().filter(instance -> {
String tagValue = instance.getMetadata().get(instanceMetadataTagName);
if (!StringUtils.hasText(tagValue)) {
return false;
}
List tags = JsonUtils.deserializeArray(tagValue, String.class);
return tags.contains(identityFlowTag.getFlowTagRequired());
}).collect(Collectors.toList());
} else if (StringUtils.hasText(identityFlowTag.getFlowTagFirst())) {
instancesToChoose = instances.stream().filter(instance -> {
String tagValue = instance.getMetadata().get(instanceMetadataTagName);
if (!StringUtils.hasText(tagValue)) {
return false;
}
List tags = JsonUtils.deserializeArray(tagValue, String.class);
return tags.contains(identityFlowTag.getFlowTagFirst());
}).collect(Collectors.toList());
if (instancesToChoose.isEmpty()) {
instancesToChoose = filteredInstancesNonFlowTags(instances);
}
} else {
instancesToChoose = filteredInstancesNonFlowTags(instances);
}
return l2LoadBalancer.processInstanceResponse(supplier, instancesToChoose);
}
/**
* 没有tag的实例
*/
private List filteredInstancesNonFlowTags(List instances) {
return instances.stream().filter(instance -> {
String tagValue = instance.getMetadata().get(instanceMetadataTagName);
return !StringUtils.hasText(tagValue);
}).collect(Collectors.toList());
}
public void setInstanceMetadataTagName(String instanceMetadataTagName) {
this.instanceMetadataTagName = instanceMetadataTagName;
}
public void setIdentityFlowTagExtractor(IdentityFlowTagExtractor identityFlowTagExtractor) {
this.identityFlowTagExtractor = identityFlowTagExtractor;
}
public void setL2LoadBalancer(L2LoadBalancer l2LoadBalancer) {
this.l2LoadBalancer = l2LoadBalancer;
}
public static interface IdentityFlowTagExtractor {
/**
* @return 不为null
*/
IdentityFlowTag extract(Request request);
}
@NoArgsConstructor
@AllArgsConstructor
@Getter
@Setter
@ToString
public static class IdentityFlowTag {
@Nullable
private String flowTagRequired;
@Nullable
private String flowTagFirst;
}
private class DefaultIdentityFlowTagExtractor implements IdentityFlowTagExtractor {
@Override
public IdentityFlowTag extract(Request request) {
Object ctx = request.getContext();
if (!(ctx instanceof DefaultRequestContext)) {
if (log.isWarnEnabled()) {
log.warn("request.context is not a DefaultRequestContext on get flow tag, context is:{}",
ctx.getClass());
}
return null;
}
DefaultRequestContext context = (DefaultRequestContext) ctx;
Object cr = context.getClientRequest();
if (!(cr instanceof RequestData)) {
if (log.isWarnEnabled()) {
log.warn("context.clientRequest is not a RequestData on get flow tag, clientRequest is:{}",
cr.getClass());
}
return null;
}
RequestData clientRequest = (RequestData) cr;
String flowTagRequired = clientRequest.getHeaders().getFirst(HTTPHEADER_FLOWTAG_REQUIRED);
String flowTagFirst = clientRequest.getHeaders().getFirst(HTTPHEADER_FLOWTAG_FIRST);
return new IdentityFlowTag(flowTagRequired, flowTagFirst);
}
}
public static interface L2LoadBalancer {
Response processInstanceResponse(ServiceInstanceListSupplier supplier,
List serviceInstances);
}
public class RoundRobinLoadBalancer implements L2LoadBalancer {
private final AtomicInteger position = new AtomicInteger(new Random().nextInt(1000));
public Response processInstanceResponse(ServiceInstanceListSupplier supplier,
List serviceInstances) {
Response serviceInstanceResponse = getInstanceResponse(serviceInstances);
if (supplier instanceof SelectedInstanceCallback && serviceInstanceResponse.hasServer()) {
((SelectedInstanceCallback) supplier).selectedServiceInstance(serviceInstanceResponse.getServer());
}
return serviceInstanceResponse;
}
private Response getInstanceResponse(List instances) {
if (instances.isEmpty()) {
if (log.isWarnEnabled()) {
log.warn("No servers available for service: " + serviceId);
}
return new EmptyResponse();
}
// Ignore the sign bit, this allows pos to loop sequentially from 0 to
// Integer.MAX_VALUE
int pos = this.position.incrementAndGet() & Integer.MAX_VALUE;
ServiceInstance instance = instances.get(pos % instances.size());
return new DefaultResponse(instance);
}
}
public class RandomLoadBalancer implements L2LoadBalancer {
public Response processInstanceResponse(ServiceInstanceListSupplier supplier,
List serviceInstances) {
Response serviceInstanceResponse = getInstanceResponse(serviceInstances);
if (supplier instanceof SelectedInstanceCallback && serviceInstanceResponse.hasServer()) {
((SelectedInstanceCallback) supplier).selectedServiceInstance(serviceInstanceResponse.getServer());
}
return serviceInstanceResponse;
}
private Response getInstanceResponse(List instances) {
if (instances.isEmpty()) {
if (log.isWarnEnabled()) {
log.warn("No servers available for service: " + serviceId);
}
return new EmptyResponse();
}
int index = ThreadLocalRandom.current().nextInt(instances.size());
ServiceInstance balanceflow瑜伽视频 instance = instances.get(index);
return new DefaultResponse(instance);
}
}
}
到此完成