Dubbo的服务发布逻辑是比较复杂的,我还是以Dubbo自带的示例讲解,这样更方便和容易理解。
Provider配置如下:
1 | <?xml version="1.0" encoding="UTF-8"?> |
ApplicationContext
ClassPathXmlApplicationContext父类AbstractApplicationContext的方法refresh()在实例化bean之后的最后一步finishRefresh()中,此方法作用是发布相应的事件。
1 | protected void finishRefresh() { |
可以看到发布了一个ContextRefreshedEvent事件。
1 | protected void publishEvent(Object event, ResolvableType eventType) { |
首先获取ApplicationEvent事件广播对象,然后广播事件。
ApplicationEvent事件广播对象默认是SimpleApplicationEventMulticaster,这个对象是在AbstractApplicationContext的方法initApplicationEventMulticaster()初始化的,如果需要自定义,可以实现接口ApplicationEventMulticaster,并将bean的名字命名为applicationEventMulticaster。
接下来看看SimpleApplicationEventMulticaster类的multicastEvent方法。
1 |
|
可以看到此方法会调用applicationListener的方法,对于Dubbo而言,就是ServiceBean.
怎么样获取到ServiceBean的呢?
ServiceBean实现了好几个接口,其中有两个接口ApplicationContextAware和ApplicationListener
,其中ApplicationContextAware使ServiceBean具有获取ApplicationContext的能力(了解bean的生命周期),而ApplicationListener使ServiceBean具有响应事件响应的能力。dubbo实现ApplicationContextAware的目的是通过反射把自己添加到ApplicationContext的ApplicationListener列表中,即使不实现ApplicationContextAware接口,spring也会将实现了ApplicationListener接口的bean添加到其listener列表中的,dubbo这样做估计是向后兼容。
接着看invokeListener(listener, event);方法
1 | protected void invokeListener(ApplicationListener<?> listener, ApplicationEvent event) { |
invokeListener方法内部调用了doInvokeListener方法,而doInvokeListener方法调用了listener(ServiceBean)的onApplicationEvent方法.
ServiceBean
1 | public void onApplicationEvent(ContextRefreshedEvent event) { |
onApplicationEvent方法调用了export方法,export方法首先判断是否已经发布了服务,发布了则直接返回,没有发布则会判断是否需要延迟发布,如果需要延迟,则将发布服务做为一个任务添加到ScheduledThreadPoolExecutor线程池中,如果不延迟,则调用doExport方法立即发布服务。
doExport方法中会获取application/registries/monitor/module/protocols,并做一些检查和属性填充,然后调用doExportUrls();发布服务。doExportUrls()首先调用loadRegistries方法得到要注册的url,然后发布相关Protocol的服务。
简单叙述一下获取url的过程,url通过map组装参数和对应的值,参数有ApplicationConfig和RegistryConfig对象的属性以及path、dubbo、timestamp、pid、protocol、registry。
本示例applicationConfig是:
<dubbo:application name=”demo-provider” qosPort=”22222” id=”demo-provider” />
registryURL
registryConfig是:
<dubbo:registry address=”multicast://224.5.6.7:1234” id=”org.apache.dubbo.config.RegistryConfig” />
最终map组装结果是:
最后得到registryURL是:
registry://224.5.6.7:1234/org.apache.dubbo.registry.RegistryService?application=demo-provider&dubbo=2.0.2&pid=4892&qos.port=22222®istry=multicast×tamp=1536112339884
然后调用doExportUrlsFor1Protocol方法发布服务,此方法开始部分是构造发布的服务URL,然后再发布url。
服务URL
URL包括以下几部分:服务端还是客户端标识,Dubbo版本,时间戳,Pid,服务的方法名,token、ApplicationConfig,MoudleConfig,ProviderConfig,ProtocolConfig,*MethodConfig对象的相关属性等。
例如本示例的url:
dubbo://192.168.124.1:20880/org.apache.dubbo.demo.DemoService?anyhost=true&application=demo-provider&bind.ip=192.168.124.1&bind.port=20880&dubbo=2.0.2&generic=false&interface=org.apache.dubbo.demo.DemoService&methods=sayHello&pid=8004&qos.port=22222&side=provider×tamp=1536114090787
我们来着重看一下在构造URL过程中port的获取过程。
1 | //protocolConfig是配置的<dubbo:protocol />生成的对象 |
findConfigedPorts顾名思义是查找配置的port,从哪查呢,先从系统环境变量查,如果没找到,再查找名字为name的protocol协义。
1 | private Integer findConfigedPorts(ProtocolConfig protocolConfig, String name, Map<String, String> map) { |
有人或许有疑问,ServiceConfig在实例化时,不是已经加载过Protocol了吗?为什么还要使用ExtensionLoader加载一次呢?
1
2 >final int defaultPort =ExtensionLoader.getExtensionLoader(Protocol.class).getExtension(name).getDefaultPort();
>
答: ServiceConfig实例化时,加载的Protocol是自适应的Protocol,是动态生成的,类名是Protocol$Adaptive(见Dubbo源码分析-SPI的应用中有分析)。而这里获取Port时加载的也是Protocol类,但指名了具体加载的是哪个Protocol(本示例是名称为dubbo的Protocol,即DubboProtocol,此类默认的端口是20880)。
发布URL
发布本地服务
调用ServiceConfig类的exportLocal(URL url)发布本地服务。
1 | private void exportLocal(URL url) { |
本示例的本地服务 url是:
injvm://127.0.0.1/org.apache.dubbo.demo.DemoService?anyhost=true&application=demo-provider&bind.ip=192.168.124.1&bind.port=20880&dubbo=2.0.2&generic=false&interface=org.apache.dubbo.demo.DemoService&methods=sayHello&pid=3008&qos.port=22222&side=provider×tamp=1536125473655
重点看这一句:
1 | Exporter<?> exporter = protocol.export( |
其中涉及到ProxyFactory和Protocol,下面分别来看一看。
ProxyFactory
proxyFactory也是通过SPI加载的自适应类对象,类名为ProxyFactory$Adaptive,我们来看一下其class文件反编译后的源码。
1 | package org.apache.dubbo.rpc; |
其中有三个方法,两个获取代理,一个获取Invoker。我们来看其中的getInvoker方法,默认获取名称为javassist的ProxyFactory。
由于本地服务URL中没有proxy参数,所以会调用JavassistProxyFactory的getInvoker(T proxy, Class
其实(ProxyFactory)ExtensionLoader.getExtensionLoader(ProxyFactory.class).getExtension(“javassist”);获取到的并不是JavassistProxyFactory对象,而是StubProxyFactoryWrapper对象,为什么呢?我们可以看下ExtensionLoader的getExtension(String name)方法
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61 > public T getExtension(String name) {
> //检查name是否合法
> if (name == null || name.length() == 0)
> throw new IllegalArgumentException("Extension name == null");
> //如果name等于true,则加载SPI的默认插件
> if ("true".equals(name)) {
> return getDefaultExtension();
> }
> //从当前插件类的缓存实例对象中获取
> Holder<Object> holder = cachedInstances.get(name);
> if (holder == null) {
> cachedInstances.putIfAbsent(name, new Holder<Object>());
> holder = cachedInstances.get(name);
> }
> Object instance = holder.get();
> if (instance == null) {
> synchronized (holder) {
> instance = holder.get();
> if (instance == null) {
> //创建插件实例
> instance = createExtension(name);
> holder.set(instance);
> }
> }
> }
> return (T) instance;
> }
>
> private T createExtension(String name) {
> //从文件目录中加载插件类
> Class<?> clazz = getExtensionClasses().get(name);
> if (clazz == null) {
> throw findException(name);
> }
>
> //从已加载的所有插件实例集合中获取
> try {
> T instance = (T) EXTENSION_INSTANCES.get(clazz);
> if (instance == null) {
> //实例化插件实例,并放入集合
> EXTENSION_INSTANCES.putIfAbsent(clazz, clazz.newInstance());
> instance = (T) EXTENSION_INSTANCES.get(clazz);
> }
>
> //注入属性
> injectExtension(instance);
>
> //插件的包裹类
> Set<Class<?>> wrapperClasses = cachedWrapperClasses;
> if (wrapperClasses != null && !wrapperClasses.isEmpty()) {
> for (Class<?> wrapperClass : wrapperClasses) {
> instance = injectExtension((T) wrapperClass.getConstructor(type).newInstance(instance));
> }
> }
> return instance;
> } catch (Throwable t) {
> throw new IllegalStateException("Extension instance(name: " + name + ", class: " +
> type + ") could not be instantiated: " + t.getMessage(), t);
> }
> }
>
重点的地方就在于插件的包裹类,StubProxyFactoryWrapper就是JavassistProxyFactory的包裹类,为什么这么说呢,因为StubProxyFactoryWrapper有一个带ProxyFactory参数的构造函数而且实现了ProxyFactory接口,具体可以看Extension的loadExtensionClasses方法源码(装饰者模式)。
Protocol
protocol对象也是一个自适应插件类,类名为Protocol$Adaptive,在上一篇文章中已有讲解。这个类会根据url的协义取得对应转义的插件类,没有的话,默认为dubbo协义,本地服务url协义为injvm,所以会加载InjvmProtocol,但是在加载InjvmProtocol并实例化后,发现InjvmProtocol还有对应的包裹类即(其实是所有Protocol的包裹类):ProtocolFilterWrapper和ProtocolListenerWrapper。ProtocolFilterWrapper类的作用是添加一些过滤器,ProtocolListenerWrapper的作用是添加ExporterListener。InjvmProtocol的export方法仅仅创建一个InjvmExporter实例,没有开启服务。
发布远程服务
如果注册url不为空,调用proxyFactory得到服务对象的代理类,然后使用protocol发布服务。由于注册url的协义是registry,在使用ExtensionLoader.getExtensionLoader(org.apache.dubbo.rpc.Protocol.class).getExtension(“registry”);会加载RegistryProtocol类并实例化,而且会添加其包裹类:ProtocolFilterWrapper和ProtocolListenerWrapper。而在这两个包裹类的export方法的首行,都会对registry协义进行单独处理。
RegistryProtocol
1 | if (Constants.REGISTRY_PROTOCOL.equals(invoker.getUrl().getProtocol())) { |
经过这两个包裹类后,最终会调用RegistryProtocol的export方法。
1 | public <T> Exporter<T> export(final Invoker<T> originInvoker) throws RpcException { |
最重要的是这一句:
1 | exporter = new ExporterChangeableWrapper<T>((Exporter<T>) protocol.export(invokerDelegete), originInvoker); |
其中protocol也是Protocol$Adaptive对象,而invokerDelegete的URL是服务的url.
本示例中为:
dubbo://192.168.124.1:20880/org.apache.dubbo.demo.DemoService?anyhost=true&application=demo-provider&bind.ip=192.168.124.1&bind.port=20880&dubbo=2.0.2&generic=false&interface=org.apache.dubbo.demo.DemoService&methods=sayHello&pid=8468&qos.port=22222&side=provider×tamp=1536138127517
DubboProtocol
Protocol$Adaptive在解析URL的时得到dubbo,所以会加载DubboProtocol并实例化(DubboProtocol实际在前面获取默认接口时已经实例化并缓存起来了,此处取的是缓存的实例),并调用了DubboProtocol的export方法(与上面一样,在得到DubboProtocol实例后,仍然会在外面包裹一下)。
1 | public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException { |
经过层层探索,曲折迂回,终于到openServer了,进去看看。
1 | private void openServer(URL url) { |
可以看到其中有一个重要方法createServer(url)。
1 | private ExchangeServer createServer(URL url) { |
Exchangers
进入Exchangers.bind方法一探究竟。
1 | public static ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException { |
getExchanger(url)默认得到的是HeaderExchanger,可通过exchanger参数配置。
到HeaderExchanger中看看bind方法
1 | public ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException { |
Transporter
看下Transporters的bind方法。
1 | public static Server bind(URL url, ChannelHandler... handlers) throws RemotingException { |
通过getTransporter方法获取一个自适应的Transporter,类名为Transporter$Adaptive,我们来看一下其源码:
1 | package org.apache.dubbo.remoting; |
可以看到Transporter$Adaptive通过判断URL中是否有transporter参数,如果没有,就默认为netty。
示例中服务的URL为
dubbo://192.168.124.1:20880/org.apache.dubbo.demo.DemoService?anyhost=true&application=demo-provider&bind.ip=192.168.124.1&bind.port=20880&dubbo=2.0.2&generic=false&interface=org.apache.dubbo.demo.DemoService&methods=sayHello&pid=8004&qos.port=22222&side=provider×tamp=1536114090787
其中没有transporter参数,所以就使用netty。然后dubbo就去查找netty对应的是哪个Transporter,结果找到是NettyTransporter。
1 | package org.apache.dubbo.remoting.transport.netty4; |
NettyTransporter很简单,只有两个方法,一个用于开启服务,一个用于连接服务。到这里已经明白了Dubbo是如何发布一个服务的。
我们再进一步看下NettyServer的构造函数
1 | public NettyServer(URL url, ChannelHandler handler) throws RemotingException { |
可以看出其调用父类的构造函数,并传入url和handler的包裹类。handler的包裹类有哪些呢,进去看一看。
1 | public static ChannelHandler wrap(ChannelHandler handler, URL url) { |
注意到有一个接口Dispatcher,其自适应插件类是AllDispatcher,AllDispatcher的dispatch方法返回AllChannelHandler实例(此实例会将所有请求做为任务放入线程池中处理),在此实例基础上又包裹了HeartbeatHandler和MultiMessageHandler。
NettyServer会将MultiMessageHandler层层往上传到其父类AbstractPeer。
我们来回忆一下正向流程:
从ServiceConfig发布registryURL开始(见doExportUrlsFor1Protocol方法)
1.ServiceConfig生成服务实例的代理工厂类JavassistProxyFactory(ProxyFactory SPI默认代理工厂类)并包裹到DelegateProviderMetaDataInvoker(此类记录代理工厂类和服务信息ServiceBean(<dubbo:service />标签对应的类))
2.由于registryURL的protocol协义是registry,所以会加载RegistryProtocol(Protocol类的外面都包裹了ProtocolFilterWrapper和ProtocolListenerWrapper,下面不再特殊说明),并传入上一步的invoker。
3.RegistryProtocol又找到DubboProtocol,也会带上Invoker(此时的Invoker包含上一次的Invoker并带有服务地址(dubbo://IP:端口/服务接口全称?参数=xxx))。
所以requestHandler又会调用正向传过来的Invoker,经过ProtocolFilterWrapper和ProtocolListenerWrapper,最终调用到服务实现类相应的方法。
最后以一张图总结:
标识为SPI的类,是可以动态加载的。图片看不清楚的话,请查看原图。
再简单说下接收到请求后的处理流程:NettyServer接收到请求后,交给NettyServerHandler处理,NettyServerHandler转交给NettyServer的父类AbstractPeer处理,AbstractPeer又交给MultiMessageHandler处理,这样就开始了handler链的处理,handler的终点是HeaderExchangerHandler,HeaderExchangerHandler调用DubboProtocol传过来的成员变量requestHandler调用相应的服务类方法,然后得到结果,调用NettyServerHandler传过来的NettyChannel发送结果到Client。
v1.5.2