十年网站开发经验 + 多家企业客户 + 靠谱的建站团队
量身定制 + 运营维护+专业推广+无忧售后,网站问题一站解决
本文转载自微信公众号「运维开发故事」,作者老郑。转载本文请联系运维开发故事公众号。

创新互联建站服务项目包括岳普湖网站建设、岳普湖网站制作、岳普湖网页制作以及岳普湖网络营销策划等。多年来,我们专注于互联网行业,利用自身积累的技术优势、行业经验、深度合作伙伴关系等,向广大中小型企业、政府机构等提供互联网行业的解决方案,岳普湖网站推广取得了明显的社会效益与经济效益。目前,我们服务的客户以成都为中心已经辐射到岳普湖省份的部分城市,未来相信会继续扩大服务区域并继续获得客户的支持与信任!
Nacos 提供了开放 API 可通过 /nacos/v1/ns/instance/list 获取服务列表。如果我们采用 spring-cloud 方式去获取服务,最终会通过 Nacos Client + loadbalancer 的方式进行客户端负载均衡。
Ribbon 简介
Spring Cloud Ribbon 是 Netflix Ribbon 实现的一套客户端负载均衡工具 简单的说,Ribbon 是 Netflix 发布的开源项目,主要功能是提供客户端的复杂算法和服务调用。 Ribbon 客户端组件提供一系列完善的配置项如超时、重试等。简单的说,就是配置文件中列出 load Balancer (简称 LB)后面所有的机器,Ribbon 会自动的帮助你基于某种规则(如简单轮询,随机链接等)去链接这些机器。我们很容易使用 Ribbon 自定义的负载均衡算法。
Ribbon 使用
首先需要定义 RestTemplate 使用 Ribbon 策略;
- @Configuration
 - public class RestTemplateConfig {
 - @LoadBalanced
 - @Bean
 - public RestTemplate restTemplate() {
 - return new RestTemplate();
 - }
 - }
 
本地使用 RestTemplate 调用远程接口;
- @Autowired
 - private RestTemplate restTemplate;
 - @RequestMapping(value = "/echo/{id}", method = RequestMethod.GET)
 - public String echo(@PathVariable Long id) {
 - return restTemplate.getForObject("http://member-service/member/get/" + id, String.class);
 - }
 
Ribbon 源码分析
RestTemplate 继承 InterceptingHttpAccessor 通过 interceptors 字段接受 HttpRequestInterceptor 请求拦截器。对于 Ribbion 初始化类是 RibbonAutoConfiguration 中的, 它在 spring-cloud-netflix-ribbon 中定义。但是它在初始化之前,又需要加载 RibbonAutoConfiguration 配置,它是在 spring-cloud-common 中。具体的代码如下:
- @Configuration(proxyBeanMethods = false)
 - // 工程中一定存在 RestTemplate 类
 - @ConditionalOnClass(RestTemplate.class)
 - // 容器中一定存在 LoadBalancerClient 类 Bean 实例
 - @ConditionalOnBean(LoadBalancerClient.class)
 - @EnableConfigurationProperties(LoadBalancerRetryProperties.class)
 - public class LoadBalancerAutoConfiguration {
 - // 获取 Spring 容器中所有的 RestTemplate 实例
 - @LoadBalanced
 - @Autowired(required = false)
 - private List
 restTemplates = Collections.emptyList(); - // 获取 Spring 容器中 LoadBalancerRequestTransformer 实例
 - @Autowired(required = false)
 - private List
 transformers = Collections.emptyList(); - // 在 Bean 初始化完成后会调用 afterSingletonsInstantiated 方法
 - // 这里是一个 lambda 表达式方式的实现, 主要是为 restTemplate 实例设置 RestTemplateCustomizer
 - @Bean
 - public SmartInitializingSingleton loadBalancedRestTemplateInitializerDeprecated(
 - final ObjectProvider
 > restTemplateCustomizers) {
- return () -> restTemplateCustomizers.ifAvailable(customizers -> {
 - for (RestTemplate restTemplate : LoadBalancerAutoConfiguration.this.restTemplates) {
 - for (RestTemplateCustomizer customizer : customizers) {
 - customizer.customize(restTemplate);
 - }
 - }
 - });
 - }
 - // LoadBalancerRequestFactory 工厂类
 - // 主要是用来提供 LoadBalancerClient 实例和 LoadBalancerRequestTransformer
 - @Bean
 - @ConditionalOnMissingBean
 - public LoadBalancerRequestFactory loadBalancerRequestFactory(
 - LoadBalancerClient loadBalancerClient) {
 - return new LoadBalancerRequestFactory(loadBalancerClient, this.transformers);
 - }
 - // LoadBalancerInterceptor 拦截器
 - @Configuration(proxyBeanMethods = false)
 - @ConditionalOnMissingClass("org.springframework.retry.support.RetryTemplate")
 - static class LoadBalancerInterceptorConfig {
 - // 创建默认的拦截器 LoadBalancerInterceptor 的实例
 - @Bean
 - public LoadBalancerInterceptor ribbonInterceptor(
 - LoadBalancerClient loadBalancerClient,
 - LoadBalancerRequestFactory requestFactory) {
 - return new LoadBalancerInterceptor(loadBalancerClient, requestFactory);
 - }
 - // 如果没有 RestTemplateCustomizer 实例才会创建
 - // 这里就就会为咱们所有的 restTemplate 实例添加 loadBalancerInterceptor 拦截器
 - @Bean
 - @ConditionalOnMissingBean
 - public RestTemplateCustomizer restTemplateCustomizer(
 - final LoadBalancerInterceptor loadBalancerInterceptor) {
 - return restTemplate -> {
 - List
 list = new ArrayList<>( - restTemplate.getInterceptors());
 - list.add(loadBalancerInterceptor);
 - restTemplate.setInterceptors(list);
 - };
 - }
 - }
 - // ...
 - }
 
针对下面的代码我们可以总结一下:
如果需要使用负载均衡,工程下面必须要有 RestTemplate 类, 然后Spring 容器中要有 LoadBalancerClient 的实例。
LoadBalancerClient 在 spring-cloud-netflix-ribbon 中只有一个实现类: RibbonLoadBalancerClient
利用 Spring 的 SmartInitializingSingleton 拓展点,在 restTemplateCustomizer() 中为所有的 RestTemplate 添加 LoadBalancerInterceptor 拦截器
其实 LoadBalancer 的本质就是通过拦截器。利用 RestTemplate 的拓展点来实现请求服务的负载均衡。
LoadBalancerInterceptor
LoadBalancerInterceptor 拦截器会将请求交给 LoadBalancerClient 去处理,首先会选择一个 ILoadBalancer 的实现来处理获取和选择服务,然后通过 serviceName 和负载均衡算法去选择 Server 对象。最后执行请求。
- public class LoadBalancerInterceptor implements ClientHttpRequestInterceptor {
 - // 负载均衡
 - private LoadBalancerClient loadBalancer;
 - // 构建请求
 - private LoadBalancerRequestFactory requestFactory;
 - // ...
 - @Override
 - public ClientHttpResponse intercept(final HttpRequest request, final byte[] body,
 - final ClientHttpRequestExecution execution) throws IOException {
 - final URI originalUri = request.getURI();
 - String serviceName = originalUri.getHost();
 - return this.loadBalancer.execute(serviceName,
 - this.requestFactory.createRequest(request, body, execution));
 - }
 - }
 
RibbonLoadBalancerClient
我们通过跟踪 this.loadBalancer.execute 代码发现。最终所有的请求都交由 RibbonLoadBalancerClient 去处理。它实现了。LoadBalancerClient 接口, 代码如下:
- public interface ServiceInstanceChooser {
 - // 通过 serviceId 选择具体的服务实例
 - ServiceInstance choose(String serviceId);
 - }
 - public interface LoadBalancerClient extends ServiceInstanceChooser {
 T execute(String serviceId, LoadBalancerRequest request) throws IOException; T execute(String serviceId, ServiceInstance serviceInstance, - LoadBalancerRequest
 request) throws IOException; - // 将服务实例信息替换还具体的 IP 信息
 - URI reconstructURI(ServiceInstance instance, URI original);
 - }
 
我们先来分析 RibbonLoadBalancerClient 的 choose 方法
- @Override
 - public ServiceInstance choose(String serviceId) {
 - return choose(serviceId, null);
 - }
 - // 通过服务名选择具体的服务实例
 - public ServiceInstance choose(String serviceId, Object hint) {
 - Server server = getServer(getLoadBalancer(serviceId), hint);
 - if (server == null) {
 - return null;
 - }
 - return new RibbonServer(serviceId, server, isSecure(server, serviceId),
 - serverIntrospector(serviceId).getMetadata(server));
 - }
 - // 通过服务名选择一个负载均衡器, 默认是 `ZoneAwareLoadBalancer`
 - protected ILoadBalancer getLoadBalancer(String serviceId) {
 - return this.clientFactory.getLoadBalancer(serviceId);
 - }
 - // 获取服务
 - protected Server getServer(ILoadBalancer loadBalancer) {
 - return getServer(loadBalancer, null);
 - }
 - protected Server getServer(ILoadBalancer loadBalancer, Object hint) {
 - if (loadBalancer == null) {
 - return null;
 - }
 - // Use 'default' on a null hint, or just pass it on?
 - return loadBalancer.chooseServer(hint != null ? hint : "default");
 - }
 
LoadBalancerInterceptor 执行的时候是直接委托执行的 loadBalancer.execute() 这个方法:
- // LoadBalancerRequest 是通过 LoadBalancerRequestFactory.createRequest(request, body, execution) 创建
 - // 它实现 LoadBalancerRequest 接口是用的一个匿名内部类,泛型类型是ClientHttpResponse
 - // 因为最终执行的显然还是执行器:ClientHttpRequestExecution.execute()
 - @Override
 - public
 T execute(String serviceId, LoadBalancerRequest request) throws IOException { - return execute(serviceId, request, null);
 - }
 - public
 T execute(String serviceId, LoadBalancerRequest request, Object hint) throws IOException { - // 拿到负载均衡器,然后拿到一个serverInstance实例
 - ILoadBalancer loadBalancer = getLoadBalancer(serviceId);
 - Server server = getServer(loadBalancer, hint);
 - if (server == null) { // 若没找到就直接抛出异常。这里使用的是IllegalStateException这个异常
 - throw new IllegalStateException("No instances available for " + serviceId);
 - }
 - // 把Server适配为RibbonServer isSecure:客户端是否安全
 - // serverIntrospector内省 参考配置文件:ServerIntrospectorProperties
 - RibbonServer ribbonServer = new RibbonServer(serviceId, server,
 - isSecure(server, serviceId), serverIntrospector(serviceId).getMetadata(server));
 - //调用本类的重载接口方法
 - return execute(serviceId, ribbonServer, request);
 - }
 - // 它的参数是 ServiceInstance --> 已经确定了唯一的Server实例
 - @Override
 - public
 T execute(String serviceId, ServiceInstance serviceInstance, LoadBalancerRequest request) throws IOException { - // 拿到 Server,RibbonServer 是 execute 时的唯一实现
 - Server server = null;
 - if (serviceInstance instanceof RibbonServer) {
 - server = ((RibbonServer) serviceInstance).getServer();
 - }
 - if (server == null) {
 - throw new IllegalStateException("No instances available for " + serviceId);
 - }
 - // 执行的上下文是和serviceId绑定的
 - RibbonLoadBalancerContext context = this.clientFactory.getLoadBalancerContext(serviceId);
 - ...
 - // 真正的向server发送请求,得到返回值
 - // 因为有拦截器,所以这里肯定说执行的是InterceptingRequestExecution.execute()方法
 - // so会调用ServiceRequestWrapper.getURI(),从而就会调用reconstructURI()方法
 - T returnVal = request.apply(serviceInstance);
 - return returnVal;
 - ... // 异常处理
 - }
 
returnVal 是一个 ClientHttpResponse,最后交给 handleResponse()方法来处理异常情况(若存在的话),若无异常就交给提取器提值:responseExtractor.extractData(response),这样整个请求就算全部完成了。
ZoneAwareLoadBalancer
负载均衡器 ZoneAwareLoadBalancer 的类图结构如下图所示。它 DynamicServerListLoadBalancer 它的父类, 核心方法 重置和初始化:restOfInit(clientConfig) 更新服务列表:updateListOfServers(); 这个方需要调用到 ServerList.getUpdatedListOfServers() 这里就会调用到具体的注册中心实现,以 Nacos 为例他的实现就是 NacosServerList#getUpdatedListOfServers();
Ribbon 总结
针对 @LoadBalanced 下的 RestTemplate 的使用,我总结如下:
最后,需要特别指出的是:标注有@LoadBalanced 的 RestTemplate 只能填写 serviceId 而不能再写 IP地址/域名去发送请求了, 若你的项目中两种 case 都有需要,需要定义多个 RestTemplate 分别应对不同的使用场景
客户端查询
如果我们使用默认的 Nacos 客户端,那么走的就是 NacosServerList#getUpdatedListOfServers();接口来查询服务列表。
- public class NacosServerList extends AbstractServerList
 { - private NacosDiscoveryProperties discoveryProperties;
 - @Override
 - public List
 getUpdatedListOfServers() { - return getServers();
 - }
 - private List
 getServers() { - try {
 - String group = discoveryProperties.getGroup();
 - // discoveryProperties.namingServiceInstance()
 - // 最终通过反射获取 com.alibaba.nacos.client.naming.NacosNamingService 实例
 - List
 instances = discoveryProperties.namingServiceInstance() - .selectInstances(serviceId, group, true);
 - return instancesToServerList(instances);
 - }
 - catch (Exception e) {
 - throw new IllegalStateException(
 - "Can not get service instances from nacos, serviceId=" + serviceId,
 - e);
 - }
 - }
 - }
 
然后调用 selectInstances 方法
- @Override
 - public List
 selectInstances(String serviceName, String groupName, List clusters, boolean healthy, - boolean subscribe) throws NacosException {
 - ServiceInfo serviceInfo;
 - // subscribe 默认传的是 true
 - if (subscribe) {
 - serviceInfo = hostReactor.getServiceInfo(NamingUtils.getGroupedName(serviceName, groupName),
 - StringUtils.join(clusters, ","));
 - } else {
 - serviceInfo = hostReactor
 - .getServiceInfoDirectlyFromServer(NamingUtils.getGroupedName(serviceName, groupName),
 - StringUtils.join(clusters, ","));
 - }
 - return selectInstances(serviceInfo, healthy);
 - }
 
其实核心的逻辑在 hostReactor.getServiceInfo 在查询服务信息里面会把当前的 serviceName、 clusters 转换为 key, 然后通过 getServiceInfo0 方法查询服务信息这里主要是查询的是本地的数据。
如果 null == serviceObj 会在 updateServiceNow 里面去调用 /instance/list接口查询服务信息
- public ServiceInfo getServiceInfo(final String serviceName, final String clusters) {
 - NAMING_LOGGER.debug("failover-mode: " + failoverReactor.isFailoverSwitch());
 - String key = ServiceInfo.getKey(serviceName, clusters);
 - if (failoverReactor.isFailoverSwitch()) {
 - return failoverReactor.getService(key);
 - }
 - ServiceInfo serviceObj = getServiceInfo0(serviceName, clusters);
 - if (null == serviceObj) {
 - serviceObj = new ServiceInfo(serviceName, clusters);
 - serviceInfoMap.put(serviceObj.getKey(), serviceObj);
 - updatingMap.put(serviceName, new Object());
 - updateServiceNow(serviceName, clusters);
 - updatingMap.remove(serviceName);
 - } else if (updatingMap.containsKey(serviceName)) {
 - // UPDATE_HOLD_INTERVAL 为常量默认金辉进去
 - if (UPDATE_HOLD_INTERVAL > 0) {
 - // hold a moment waiting for update finish
 - synchronized (serviceObj) {
 - try {
 - // 最大等待时间 5s, 在更新 serviceObj 之后, 就会执行 notifyAll()
 - // 方法入口 updateService(String serviceName, String clusters)
 - // 最大延迟 2s DEFAULT_DELAY = 1
 - serviceObj.wait(UPDATE_HOLD_INTERVAL);
 - } catch (InterruptedException e) {
 - NAMING_LOGGER
 - .error("[getServiceInfo] serviceName:" + serviceName + ", clusters:" + clusters, e);
 - }
 - }
 - }
 - }
 - // 通过 Schedule 更新 服务信息
 - scheduleUpdateIfAbsent(serviceName, clusters);
 - // 获取最新的值
 - return serviceInfoMap.get(serviceObj.getKey());
 - }
 
代码看到这里我们不难理解,为什么第一次 Ribbon 调用的时候都会比较慢,因为它回去初始化服务列表,然后通过 Nacos Client 去 Nacos 查询服务实例信息。
服务端处理
服务端通过 /instance/list 接口来处理服务实例信息查询请求。首先服务实例信息都是被存储在 ConcurrentHashMap 中
- /**
 - * Map(namespace, Map(group::serviceName, Service)).
 - */
 - private final Map
 > serviceMap = new ConcurrentHashMap<>(); 
在我们查询的过程中主要是通过 ServiceManager 来进行管理, 核心的入口方法在 InstanceController#doSrvIpxt 中
- public ObjectNode doSrvIpxt(String namespaceId, String serviceName, String agent, String clusters, String clientIP,
 - int udpPort, String env, boolean isCheck, String app, String tid, boolean healthyOnly) throws Exception {
 - ClientInfo clientInfo = new ClientInfo(agent);
 - ObjectNode result = JacksonUtils.createEmptyJsonNode();
 - Service service = serviceManager.getService(namespaceId, serviceName);
 - long cacheMillis = switchDomain.getDefaultCacheMillis();
 - // now try to enable the push
 - try {
 - if (udpPort > 0 && pushService.canEnablePush(agent)) {
 - pushService
 - .addClient(namespaceId, serviceName, clusters, agent, new InetSocketAddress(clientIP, udpPort),
 - pushDataSource, tid, app);
 - cacheMillis = switchDomain.getPushCacheMillis(serviceName);
 - }
 - } catch (Exception e) {
 - Loggers.SRV_LOG
 - .error("[NACOS-API] failed to added push client {}, {}:{}", clientInfo, clientIP, udpPort, e);
 - cacheMillis = switchDomain.getDefaultCacheMillis();
 - }
 - if (service == null) {
 - if (Loggers.SRV_LOG.isDebugEnabled()) {
 - Loggers.SRV_LOG.debug("no instance to serve for service: {}", serviceName);
 - }
 - result.put("name", serviceName);
 - result.put("clusters", clusters);
 - result.put("cacheMillis", cacheMillis);
 - result.replace("hosts", JacksonUtils.createEmptyArrayNode());
 - return result;
 - }
 - checkIfDisabled(service);
 - List
 srvedIPs; - // 查询所有的服务
 - // 内部会更新服务列表
 - // allInstances.addAll(persistentInstances);
 - // allInstances.addAll(ephemeralInstances);
 - srvedIPs = service.srvIPs(Arrays.asList(StringUtils.split(clusters, ",")));
 - // filter ips using selector:
 - if (service.getSelector() != null && StringUtils.isNotBlank(clientIP)) {
 - srvedIPs = service.getSelector().select(clientIP, srvedIPs);
 - }
 - if (CollectionUtils.isEmpty(srvedIPs)) {
 - if (Loggers.SRV_LOG.isDebugEnabled()) {
 - Loggers.SRV_LOG.debug("no instance to serve for service: {}", serviceName);
 - }
 - if (clientInfo.type == ClientInfo.ClientType.JAVA
 - && clientInfo.version.compareTo(VersionUtil.parseVersion("1.0.0")) >= 0) {
 - result.put("dom", serviceName);
 - } else {
 - result.put("dom", NamingUtils.getServiceName(serviceName));
 - }
 - result.put("name", serviceName);
 - result.put("cacheMillis", cacheMillis);
 - result.put("lastRefTime", System.currentTimeMillis());
 - result.put("checksum", service.getChecksum());
 - result.put("useSpecifiedURL", false);
 - result.put("clusters", clusters);
 - result.put("env", env);
 - result.set("hosts", JacksonUtils.createEmptyArrayNode());
 - result.set("metadata", JacksonUtils.transferToJsonNode(service.getMetadata()));
 - return result;
 - }
 - Map
 > ipMap = new HashMap<>(2); - ipMap.put(Boolean.TRUE, new ArrayList<>());
 - ipMap.put(Boolean.FALSE, new ArrayList<>());
 - for (Instance ip : srvedIPs) {
 - ipMap.get(ip.isHealthy()).add(ip);
 - }
 - if (isCheck) {
 - result.put("reachProtectThreshold", false);
 - }
 - double threshold = service.getProtectThreshold();
 - if ((float) ipMap.get(Boolean.TRUE).size() / srvedIPs.size() <= threshold) {
 - Loggers.SRV_LOG.warn("protect threshold reached, return all ips, service: {}", serviceName);
 - if (isCheck) {
 - result.put("reachProtectThreshold", true);
 - }
 - ipMap.get(Boolean.TRUE).addAll(ipMap.get(Boolean.FALSE));
 - ipMap.get(Boolean.FALSE).clear();
 - }
 - if (isCheck) {
 - result.put("protectThreshold", service.getProtectThreshold());
 - result.put("reachLocalSiteCallThreshold", false);
 - return JacksonUtils.createEmptyJsonNode();
 - }
 - ArrayNode hosts = JacksonUtils.createEmptyArrayNode();
 - for (Map.Entry
 > entry : ipMap.entrySet()) { - List
 ips = entry.getValue(); - if (healthyOnly && !entry.getKey()) {
 - continue;
 - }
 - for (Instance instance : ips) {
 - // remove disabled instance:
 - if (!instance.isEnabled()) {
 - continue;
 - }
 - ObjectNode ipObj = JacksonUtils.createEmptyJsonNode();
 - ipObj.put("ip", instance.getIp());
 - ipObj.put("port", instance.getPort());
 - // deprecated since nacos 1.0.0:
 - ipObj.put("valid", entry.getKey());
 - ipObj.put("healthy", entry.getKey());
 - ipObj.put("marked", instance.isMarked());
 - ipObj.put("instanceId", instance.getInstanceId());
 - ipObj.set("metadata", JacksonUtils.transferToJsonNode(instance.getMetadata()));
 - ipObj.put("enabled", instance.isEnabled());
 - ipObj.put("weight", instance.getWeight());
 - ipObj.put("clusterName", instance.getClusterName());
 - if (clientInfo.type == ClientInfo.ClientType.JAVA
 - && clientInfo.version.compareTo(VersionUtil.parseVersion("1.0.0")) >= 0) {
 - ipObj.put("serviceName", instance.getServiceName());
 - } else {
 - ipObj.put("serviceName", NamingUtils.getServiceName(instance.getServiceName()));
 - }
 - ipObj.put("ephemeral", instance.isEphemeral());
 - hosts.add(ipObj);
 - }
 - }
 - result.replace("hosts", hosts);
 - if (clientInfo.type == ClientInfo.ClientType.JAVA
 - && clientInfo.version.compareTo(VersionUtil.parseVersion("1.0.0")) >= 0) {
 - result.put("dom", serviceName);
 - } else {
 - &n
 
本文标题:通过 Ribbon 查询 Nacos 服务实例
网站网址:http://zsjierui.cn/article/dpjpeph.html