作者:微信小助手
发布时间:2021-04-17T17:59
说起注册中心,我们首先要知道注册中心是用来做什么的,注册中心一般都是用在微服务架构中,而微服务架构风格是一种将一个单一应用程序开发为一组小型服务的方法,每个服务运行在自己的进程中,服务间通信通常采用HTTP的方式,这些服务共用一个最小型的集中式的管理。这个最小型的集中式管理的组件就是服务注册中心。
本文的目的在于详解 nacos 注册中心的服务注册流程,所以首先需要对 nacos 有个基本的了解。nacos 提供了一组简单易用的特性集,帮助您快速实现动态服务发现、服务配置、服务元数据及流量管理。Nacos 帮助您更敏捷和容易地构建、交付和管理微服务平台。nacos 是构建以“服务”为中心的现代应用架构 (例如微服务范式、云原生范式) 的服务基础设施。
nacos 服务注册中心,它是服务,其实例及元数据的数据库。服务实例在启动时注册到服务注册表,并在关闭时注销。服务和路由器的客户端查询服务注册表以查找服务的可用实例。服务注册中心可能会调用服务实例的健康检查 API 来验证它是否能够处理请求。
了解了 nacos 的基本架构和服务注册中心的功能之后,接下来就要来详解服务注册的流程原理了,首先建立一个 nacos 客户端工程,springboot 版本选择2.1.0,springcloud 版本选择 Greenwich
<spring-boot-version>2.1.0.RELEASE</spring-boot-version>
<spring-cloud.version>Greenwich.RELEASE</spring-cloud.version>
复制代码
随后引入 springcloud,springboot 和 springcloud-alibaba 的依赖
<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-dependencies</artifactId>
<version>${spring-cloud.version}</version>
<type>pom</type>
<scope>import</scope>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-dependencies</artifactId>
<version>${spring-boot-version}</version>
<type>pom</type>
<scope>import</scope>
</dependency>
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-alibaba-dependencies</artifactId>
<version>${spring-boot-version}</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>
创建服务提供者模块,在服务提供者模块的 pom 文件中引入 spring-cloud-starter-alibaba-nacos-discovery 依赖,在启动类中加上 @EnableDiscoveryClient 注解,使得注册中心可以发现该服务,运行启动类,可以看到 nacos 控制台中注册了该服务
[]( imgchr.com/i/rloNjg )
点击详情,可以看到服务详情,其中临时实例属性为true,代表服务是临时实例,nacos 注册中心可以设置注册的实例是临时实例还是持久化实例,默认服务都是临时实例。
[]( imgchr.com/i/rlTdxO )
搭建好demo并且在 nacos 控制台上看到效果之后,接下来就要来分析服务注册的原理了,要先知道原理,最好的办法就是分析源码,首先要知道 nacos 客户端是什么时候向服务端去注册的,其次需要知道服务端是怎么执行服务注册的,对于客户端来说,由于引入了 spring-cloud-starter-alibaba-nacos-discovery 依赖,自然源码要到这里去找。PS:源码分析部分内容较多,望大家理解。
根据 springboot 自动配置原理,很容易可以想到, 要到 spring.factories 文件下去找相关的 AutoConfiguration 配置类,果不其然,我们找到了 NacosDiscoveryAutoConfiguration 这个配置类
从图中我们看到这个配置类中有三个方法上有 @Bean 注解,熟悉 Spring 的小伙伴们都知道,@Bean 注解表示产生一个 Bean 对象,然后这个 Bean 对象交给 Spring 管理,同时我们又看到最后一个 nacosAutoServiceRegistration 这个方法中的参数里有上面两个方法产生的对象,说明上面两个方法的实现不是很重要,直接来看 nacosAutoServiceRegistration 的实现。
public NacosAutoServiceRegistration(ServiceRegistry<Registration> serviceRegistry,
AutoServiceRegistrationProperties autoServiceRegistrationProperties,
NacosRegistration registration) {
//父类先初始化,先看看父类的实现
super(serviceRegistry, autoServiceRegistrationProperties);
this.registration = registration;
}
public abstract class AbstractAutoServiceRegistration<R extends Registration>
implements AutoServiceRegistration, ApplicationContextAware, ApplicationListener<WebServerInitializedEvent>
我们看到父类实现了 ApplicationListener 接口,而实现该接口必须重新其 onApplicationEvent() 方法,我们看到方法中又调用了 bind() 方法
public void bind(WebServerInitializedEvent event) {
ApplicationContext context = event.getApplicationContext();
if (context instanceof ConfigurableWebServerApplicationContext) {
if ("management".equals(
((ConfigurableWebServerApplicationContext) context).getServerNamespace())) {
return;
}
}
//CAS 原子操作
this.port.compareAndSet(0, event.getWebServer().getPort());
this.start();
}
public void start() {
if (!isEnabled()) {
if (logger.isDebugEnabled()) {
logger.debug("Discovery Lifecycle disabled. Not starting");
}
return;
}
// only initialize if nonSecurePort is greater than 0 and it isn't already running
// because of containerPortInitializer below
if (!this.running.get()) {
this.context.publishEvent(new InstancePreRegisteredEvent(this, getRegistration()));
// 开始注册
register();
if (shouldRegisterManagement()) {
registerManagement();
}
this.context.publishEvent(
new InstanceRegisteredEvent<>(this, getConfiguration()));
this.running.compareAndSet(false, true);
}
}
public void register(Registration registration) {
if (StringUtils.isEmpty(registration.getServiceId())) {
log.warn("No service to register for nacos client...");
return;
}
String serviceId = registration.getServiceId();
Instance instance = getNacosInstanceFromRegistration(registration);
try {
// 把 serviceId 和 instance传入,开始注册流程
namingService.registerInstance(serviceId, instance);
log.info("nacos registry, {} {}:{} register finished", serviceId,
instance.getIp(), instance.getPort());
}
catch (Exception e) {
log.error("nacos registry, {} register failed...{},", serviceId,
registration.toString(), e);
}
}
public void registerInstance(String serviceName, String groupName, Instance instance) throws NacosException {
//如果实例是临时的,组装beatInfo,发送心跳请求,因为默认是临时实例,所以肯定走到这段代码
//为什么要发送心跳请求,因为nacos在此时实现的是cap理论中的ap模式,即不保证强一致性,但会保证可用性,这就需要做到动态感知服务的上下线,所以要通过心跳来判断服务是否还正常在线
if (instance.isEphemeral()) {
BeatInfo beatInfo = new BeatInfo();
beatInfo.setServiceName(NamingUtils.getGroupedName(serviceName, groupName));
beatInfo.setIp(instance.getIp());
beatInfo.setPort(instance.getPort());
beatInfo.setCluster(instance.getClusterName());
beatInfo.setWeight(instance.getWeight());
beatInfo.setMetadata(instance.getMetadata());
beatInfo.setScheduled(false);
long instanceInterval = instance.getInstanceHeartBeatInterval();
beatInfo.setPeriod(instanceInterval == 0 ? DEFAULT_HEART_BEAT_INTERVAL : instanceInterval);
//添加心跳信息进行处理
beatReactor.addBeatInfo(NamingUtils.getGroupedName(serviceName, groupName), beatInfo);
}
//服务代理类注册实例
serverProxy.registerService(NamingUtils.getGroupedName(serviceName, groupName), groupName, instance);
}
心跳请求部分
public void addBeatInfo(String serviceName, BeatInfo beatInfo) {
NAMING_LOGGER.info("[BEAT] adding beat: {} to beat map.", beatInfo);
dom2Beat.put(buildKey(serviceName, beatInfo.getIp(), beatInfo.getPort()), beatInfo);
//定时任务发送心跳请求
executorService.schedule(new BeatTask(beatInfo), 0, TimeUnit.MILLISECONDS);
MetricsMonitor.getDom2BeatSizeMonitor().set(dom2Beat.size());
}
public void run() {
if (beatInfo.isStopped()) {
return;
}
//服务代理类发送心跳,心跳时长为5秒钟,也就是每5秒发送一次心跳请求
long result = serverProxy.sendBeat(beatInfo);
long nextTime = result > 0 ? result : beatInfo.getPeriod();
executorService.schedule(new BeatTask(beatInfo), nextTime, TimeUnit.MILLISECONDS);
}
public long sendBeat(BeatInfo beatInfo) {
try {
if (NAMING_LOGGER.isDebugEnabled()) {
NAMING_LOGGER.debug("[BEAT] {} sending beat to server: {}", namespaceId, beatInfo.toString());
}
Map<String, String> params = new HashMap<String, String>(4);
params.put("beat", JSON.toJSONString(beatInfo));
params.put(CommonParams.NAMESPACE_ID, namespaceId);
params.put(CommonParams.SERVICE_NAME, beatInfo.getServiceName());
//通过http的put请求,向服务端发送心跳请求,请求路径为/v1/ns/instance/beat
String result = reqAPI(UtilAndComs.NACOS_URL_BASE + "/instance/beat", params, HttpMethod.PUT);
JSONObject jsonObject = JSON.parseObject(result);
if (jsonObject != null) {
return jsonObject.getLong("clientBeatInterval");
}
} catch (Exception e) {
NAMING_LOGGER.error("[CLIENT-BEAT] failed to send beat: " + JSON.toJSONString(beatInfo), e);
}
return 0L;
}
看到这里我们了解到,客户端是通过 http 请求的方式,和服务端进行通信,由于还涉及注册实例的源码和服务端的源码,心跳部分暂时告一段落,接下来回到服务代理类注册实例
public void registerService(String serviceName, String groupName, Instance instance) throws NacosException {
NAMING_LOGGER.info("[REGISTER-SERVICE] {} registering service {} with instance: {}",
namespaceId, serviceName, instance);
//组装各种参数
final Map<String, String> params = new HashMap<String, String>(9);
params.put(CommonParams.NAMESPACE_ID, namespaceId);
params.put(CommonParams.SERVICE_NAME, serviceName);
params.put(CommonParams.GROUP_NAME, groupName);
params.put(CommonParams.CLUSTER_NAME, instance.getClusterName());
params.put("ip", instance.getIp());
params.put("port", String.valueOf(instance.getPort()));
params.put("weight", String.valueOf(instance.getWeight()));
params.put("enable", String.valueOf(instance.isEnabled()));
params.put("healthy", String.valueOf(instance.isHealthy()));
params.put("ephemeral", String.valueOf(instance.isEphemeral()));
params.put("metadata", JSON.toJSONString(instance.getMetadata()));
//发送注册实例请求,路径为/v1/ns/instance
reqAPI(UtilAndComs.NACOS_URL_INSTANCE, params, HttpMethod.POST);
}
到这里,nacos 客户端部分的源码就分析完了,接下来开始分析 nacos 服务端的源码
首先去到 nacos 的官方 github,clone 一份源码到本地,选择1.1.4版本,通过 maven 编译后打开,源码结构如下
之前我们在分析客户端源码的时候看到客户端向服务端发送请求的部分都在 NacosNamingService 类中,相对应的在服务端应该也是到naming 的模块下寻找代码的入口,根据 springboot 开发接口的习惯,接口都是通过 controller 来实现调用的,所以直接找到controllers目录,其中在 InstanceController 中,我们看到了 register() 的方法,也看到了 beat() 方法,接上面的 客户端心跳发送部分,先分析 beat() 方法。
public JSONObject beat(HttpServletRequest request) throws Exception {
JSONObject result = new JSONObject();
result.put("clientBeatInterval", switchDomain.getClientBeatInterval());
String serviceName = WebUtils.required(request, CommonParams.SERVICE_NAME);
String namespaceId = WebUtils.optional(request, CommonParams.NAMESPACE_ID,
Constants.DEFAULT_NAMESPACE_ID);
String beat = WebUtils.required(request, "beat");
RsInfo clientBeat = JSON.parseObject(beat, RsInfo.class);
if (!switchDomain.isDefaultInstanceEphemeral() && !clientBeat.isEphemeral()) {
return result;
}
if (StringUtils.isBlank(clientBeat.getCluster())) {
clientBeat.setCluster(UtilsAndCommons.DEFAULT_CLUSTER_NAME);
}
String clusterName = clientBeat.getCluster();
if (Loggers.SRV_LOG.isDebugEnabled()) {
Loggers.SRV_LOG.debug("[CLIENT-BEAT] full arguments: beat: {}, serviceName: {}", clientBeat, serviceName);
}
Instance instance = serviceManager.getInstance(namespaceId, serviceName, clientBeat.getCluster(),
clientBeat.getIp(),
clientBeat.getPort());
if (instance == null) {
//如果实例为空,组装实例
instance = new Instance();
instance.setPort(clientBeat.getPort());
instance.setIp(clientBeat.getIp());
instance.setWeight(clientBeat.getWeight());
instance.setMetadata(clientBeat.getMetadata());
instance.setClusterName(clusterName);
instance.setServiceName(serviceName);
instance.setInstanceId(