Protocol
通过Provider这篇文章,我们了解到,Dubbo服务暴露通过ServiceConfig.export()方法进行暴露,真正暴露到外部,而暴露到远程服务时有两种方式:
- 暴露到服务注册中心
- 直连方式暴露
暴露到服务注册中心时由RegistryProtocol.export()与服务注册中心进行交互,而直连方式暴露时由业务协议.export()进行服务暴露。
其实暴露到服务注册中心的处理逻辑包含直连方式的处理逻辑,因此,为了较为清晰的拆解这个问题,笔者先介绍直连方式暴露服务的逻辑,再介绍暴露到服务注册中心的逻辑。
1. 直连方式暴露
通过直连方式暴露服务运行的是ServiceConfig.doExportUrlsFor1Protocol(...)中的这部分逻辑:
Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, url);
DelegateProviderMetaDataInvoke wrapperInvoker = newDelegateProviderMetaDataInvoke(invoker, this);
Exporter<?> exporter =protocol.export(wrapperInvoker;
exporters.add(exporter);
其中ProxyFactory生成Invoker的相关逻辑请参考2-1.ProxyFactory。这里主要介绍
Exporter<?> exporter =protocol.export(wrapperInvoker;
经过之前的分析,我们知道这里的protocol是一个自适应代理,具体调用的Protocol实现取决于wrapperInvoker.getUrl()获取到的URL的具体协议,按照笔者提供的例子,这里的URL应为:
dubbo://192.168.12.64:20880/com.books.dubbo.demo.api.GreetingService?anyhost=true&application=first-dubbo-provider&bind.ip=192.168.12.64&bind.port=20880&default.deprecated=false&default.dynamic=false&default.register=true&deprecated=false&dubbo=2.0.2&dynamic=false&generic=false&group=dubbo&interface=com.books.dubbo.demo.api.GreetingService&methods=sayHello,testGeneric&pid=4385®ister=true&release=2.7.1&revision=1.0.0&side=provider×tamp=1610713667487&version=1.0.0
即服务URL。可以看到,协议名为dubbo,参考SPI配置文件org.apache.dubbo.rpc.Protocol:
dubbo=org.apache.dubbo.rpc.protocol.dubbo.DubboProtocol
自适应代理将调用逻辑最后交给DubboProtocol处理,即这里调用的是DubboProtocol.export(Invoker<T> invoker)方法。
注意,DubboProtocol的功能是将Service暴露到外部,以供调用端访问,我们可以称之为是将接口导出到外部,因此DubboProtocol.export(Invoker<T> invoker)的结果是一个Exporter对象。查看Exporter接口代码:
public interface Exporter<T> {
Invoker<T> getInvoker();
void unexport();
}
可以看到, Exporter暴露了两个方法:
- getInvoker():之前已经提到,invoker是Dubbo暴露出去的服务的一个代理,真正实现业务操作都需要通过invoker进行代理(路由)。
- unexport():服务要隐藏时需要使用该方法。
我们都知道Dubbo底层默认使用的是Netty,那么Dubbo的Exporter到底如何与Netty进行整合呢?这里考察org.apache.dubbo.rpc.protocol.dubbo.DubboProtocol#export方法:
public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
URL url = invoker.getUrl();
// 获取服务唯一标识
String key = serviceKey(url);
// 封装Exporter
DubboExporter<T> exporter = new DubboExporter<T>(invoker, key, exporterMap);
exporterMap.put(key, exporter);
// 处理Stub
Boolean isStubSupportEvent = url.getParameter(Constants.STUB_EVENT_KEY, Constants.DEFAULT_STUB_EVENT);
Boolean isCallbackservice = url.getParameter(Constants.IS_CALLBACK_SERVICE, false);
if (isStubSupportEvent && !isCallbackservice) {
String stubServiceMethods = url.getParameter(Constants.STUB_EVENT_METHODS_KEY);
if (stubServiceMethods == null || stubServiceMethods.length() == 0) {
if (logger.isWarnEnabled()) {
logger.warn(new IllegalStateException("consumer [" + url.getParameter(Constants.INTERFACE_KEY) +
"], has set stubproxy support event ,but no stub methods founded."));
}
} else {
stubServiceMethodsMap.put(url.getServiceKey(), stubServiceMethods);
}
}
// 创建Netty - server
openServer(url);
// 整合序列化优化器
optimizeSerialization(url);
return exporter;
}
该方法主要处理了5件事:
- 获取服务唯一标识
- 将服务封装成Exporter,并放入exporterMap中
- 处理Stub
- 启动Netty服务器
- 整合序列化优化器
这里,我们主要关注1、2、4,这三件事。原因是:
- 我们需要了解Dubbo对服务划分的粒度以及定义
- 由代码可以看出,Exporter并没有与Netty服务进行直接交互,所以exporterMap就成为了关键。
- Netty服务器启动过程中必然有整合Dubbo业务的配置,所以需要详细讨论
至于剩下两部分:
- Stub是Dubbo的一个小插件,其功能我们在分析完Provider流程后会进行讨论
- 序列化部分是Dubbo的一个大模块,这个模块需要进行详细讨论,这里便不再分析。
1.1 Dubbo 服务划分
对于Dubbo来说,一个服务由四元组组成:<协议端口,服务名,版本号,服务分组>,这唯一确定了一个服务。DubboProtocol.export(xxx)方法中,通过URL获取这四元组中的内容:
URL url = invoker.getUrl();
String key = serviceKey(url);
这里获取到的URL就是描述服务本身的服务URL,serviceKey(String)方法负责从URL中提取上面所说的四元组:
protected static String serviceKey(URL url) {
// 获取URL中指定的协议端口号
int port = url.getParameter(Constants.BIND_PORT_KEY, url.getPort());
// 通过协议端口号+serviceName+服务版本号+服务组别名称 获取serviceKey
return serviceKey(port, url.getPath(), url.getParameter(Constants.VERSION_KEY),
url.getParameter(Constants.GROUP_KEY));
}
// 该方法负责将上述四元组格式化为
// 服务组别/服务名:版本号:端口号
// 的格式
protected static String serviceKey(int port, String serviceName,String serviceVersion, String serviceGroup) {
return ProtocolUtils.serviceKey(port, serviceName, serviceVersion, serviceGroup);
}
1.2 封装Exporter
对于Protocol这一层,输入是服务URL,输出是Exporter即导出器,DubboProtocol生成的是DubboExporter,类继承结构如下:

Exporter接口已经介绍过了,这里笔者继续介绍AbstractExporter,该类主要提供了Exporter的一些基础属性以及基础的销毁方法,保证了Log的统一,存储了Invoker,最后保证了销毁时必须对invoker进行销毁的逻辑。
如:
- Log相关,保证了Log的统一
protected final Logger logger = LoggerFactory.getLogger(getClass()); - 基础属性,存储了Invoker
private final Invoker<T> invoker; - 销毁相关,销毁时必须对invoker进行销毁的逻辑
private volatile boolean unexported = false; @Override public void unexport() { if (unexported) { return; } unexported = true; getInvoker().destroy(); }
对于DubboExporter来说,笔者已经提到,DubboExporter并没有直接交给Netty来进行暴露,而是存放到了一个名为exporterMap的Map中,那么服务暴露必然和exporterMap有关。然而当服务销毁时自然要维护exporterMap,因此DubboExporter提供了这种支持:
private final Map<String, Exporter<?>> exporterMap;
@Override
public void unexport() {
super.unexport();
exporterMap.remove(key);
}
1.3 开启Netty服务
DubboProtocol中,通过openServer(URL url)方法开启端口监听。
private void openServer(URL url) {
// find server.
String key = url.getAddress();
//client can export a service which's only forserver to invoke
boolean isServer = url.getParameter(ConstantsIS_SERVER_KEY, true);
if (isServer) {
ExchangeServer server = serverMap.get(key);
if (server == null) {
synchronized (this) {
server = serverMap.get(key);
if (server == null) {
serverMap.put(key, createServer(url));
}
}
} else {
// server supports reset, use together with override
server.reset(url);
}
}
防止重复创建Server,Netty这里设置了一个缓存ServerMap,缓存的key是本服务的host+port,value就是具建的Server对象。Dubbo将其称为ExchangeServer。新建hangeServer的方法是createServer(URL url)`。
private ExchangeServer createServer(URL url) {
url = URLBuilder.from(url)
// send readonly event when server closes, it's enabled by default
.addParameterIfAbsent(Constants.CHANNEL_READONLYEVENT_SENT_KEY, Boolean.TRUE.toString())
// 健康监测
.addParameterIfAbsent(Constants.HEARTBEAT_KEY, String.valueOf(Constants.DEFAULT_HEARTBEAT))
.addParameter(Constants.CODEC_KEY, DubboCodec.NAME)
.build();
// 代码点1 获取服务器类型,如果不指定默认是dubbo
String str = url.getParameter(Constants.SERVER_KEY, Constants.DEFAULT_REMOTING_SERVER);
// 代码点2 通过服务器类型查找Transporter
if (str != null && str.length() > 0 && !ExtensionLoader.getExtensionLoader(Transporter.class).hasExtension(str)) {
throw new RpcException("Unsupported server type: " + str + ", url: " + url);
}
ExchangeServer server;
try {
// 代码点3 通过URL开启服务器端口监听功能
server = Exchangers.bind(url, requestHandler);
} catch (RemotingException e) {
throw new RpcException("Fail to start server(url: " + url + ") " + e.getMessage(), e);
}
// 代码点4 校验客户端是否支持
str = url.getParameter(Constants.CLIENT_KEY);
if (str != null && str.length() > 0) {
Set<String> supportedTypes = ExtensionLoader.getExtensionLoader(Transporter.class).getSupportedExtensions();
if (!supportedTypes.contains(str)) {
throw new RpcException("Unsupported client type: " + str);
}
}
return server;
}
该方法主要处理了如下4个步骤:
- 通过URL中的
server属性获取服务器类型,默认是netty - 通过服务器类型查找
TransporterDubbo将具体进行端口暴露的服务器称为Transporter,这里默认获取到的是netty对应的org.apache.dubbo.remoting.transport.netty4.NettyTransporter。 - 通过URL开启服务器端口监听功能
- 校验客户端是否支持
这里我们主要考察第3步:
ExchangeServer server;
try {
server = Exchangers.bind(url, requestHandler);
} catch (RemotingException e) {
throw new RpcException("Fail to start server(url: " + url + ") " + e.getMessage(), e);
}
可以看到,Dubbo开启端口监听服务是交给Exchangers完成的,入参有两个,其中的url读者应该比较了解了,而requestHandler又是什么呢?
众所周知Netty是基于事件监听机制的服务端框架,这种机制广泛应用于各种中间件,例如redis、zookeeper。Dubbo默认使用的是netty,它将这种模式进行了封装,requestHandler用于处理业务逻辑,里面对应的几个方法分别处理不同的事件。关于Netty具体的细节,笔者之后会在其余的部分进行讲解。这里对requestHandler进行简单讲解。requestHandler的类型是ExchangeHandlerAdapter,该类是一个抽象类,考察其类继承关系:

其中各类的职责如下:
- ChannelHandler:规定网络事件处理所需要的方法,ChannelHandler共规定了5个方法:
- connected:与客户端建立连接
- disconnected:与客户端断开连接
- sent:需要进行数据发送
- received:读取到数据
- caught:抛出异常
- TelnetHandler:处理Telnet访问
- ExchangeHandler:Dubbo将TCP协议传输的数据同样封装了Request和Response,ExchangeHandler处理了对应的逻辑
其余的类,除了TelnetHandlerAdapter,提供的都是一些空方法,而TelnetHandlerAdapter提供了处理telnet访问Dubbo的处理逻辑。
ChannelHandler用于将事件监听机制和具体的业务方法联系在一起。具体如何进行联系的,笔者会在介绍服务调用时具体流程的文章中介绍。
接下来继续介绍Exchangers.bind(url,requestHandler)这个方法:
public static ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {
if (url == null) {
throw new IllegalArgumentException("url == null");
}
if (handler == null) {
throw new IllegalArgumentException("handler == null");
}
// 代码点1 指定Exchanger使用的codec参数
url = url.addParameterIfAbsent(Constants.CODEC_KEY, "exchange");
// 代码点3 将requestHandler绑定到Exchanger上
return getExchanger(url).bind(url, handler);
}
public static Exchanger getExchanger(URL url) {
String type = url.getParameter(Constants.EXCHANGER_KEY, Constants.DEFAULT_EXCHANGER);
//代码点2 根据Exchanger类型获取Exchanger
return getExchanger(type);
}
public static Exchanger getExchanger(Stringtype) {
return ExtensionLoader.getExtensionLoader(Exchanger.class).getExtension(type);
}
Exchangers.bind(url,requestHandler)方法进行了3步操作:
- 如果URL没有指定codec参数,那么为其添加参数
codec=exchange - 根据URL获取Exchanger,默认使用的是
HeaderExchanger - 将Protocol层提供的RequestHandler绑定到Exchanger上
这里有一个疑问,为何要给URL中添加codec=exchange参数?这个问题我们会在介绍Exchanger中进行解答。
下面我们先看getExchanger(url).bind(url, handler)这行代码。经过上面的分析我们已经了解到,getExchanger(url)默认使用的是HeaderExchanger,考察该类的bind(URL url, ExchangeHandler handler)方法:
@Override
public ExchangeServer bind(URL url,ExchangeHandler handler) throwsRemotingException {
return new HeaderExchangeServer(Transporters.bind(url, new DecodeHandler(new HeaderExchangeHandler(handler))));
}
HeaderExchanger使用装饰器模式,为handler添加了两层装饰器DecodeHandler和HeaderExchangeHandler,并将Server使用HeaderExchangeServer进行装饰。至于这三个类的具体作用,将会在介绍Exchanger时进行详细介绍。
2. 直连方式处理请求
在上面,笔者提到了DubboProtocol中的requestHandler属性,该属性是ExchangeHandlerAdapter类型,其功能就是将Dubbo调用转化为Invocation交给Invoker进行处理。
当Consumer调用Provider时,ExchangeHandler会将请求转到requestHandler.received(Channel channel, Object message)方法,即DubboProtocol.requestHandler.received(Channel channel, Object message)方法,源码如下:
@Override
public void received(Channel channel,Object message) throws RemotingException {
if (message instanceof Invocation) {
reply((ExchangeChannel) channel, message);
} else {
super.received(channel, message);
}
}
如果是Dubbo协议的RPC调用,message是Invocation类型,会调用reply(ExchangeChannel channel, Object message)方法:
@Override
public CompletableFuture<Object> reply(ExchangeChannel channel, Object message)throws RemotingException {
if (!(message instanceof Invocation)) {
throw new RemotingException(channel, "Unsupported request: "
+ (message == null ? null : (message.getClass().getName() + ": " + message))
+ ", channel: consumer: " + channel.getRemoteAddress() + " --> provider: " + channel.getLocalAddress());
}
// 获取Invocation对象
Invocation inv = (Invocation) message;
// 通过Invocation对象获取执行器
Invoker<?> invoker = getInvoker(channel, inv);
// need to consider backward-compatibility if it's a callback
if (Boolean.TRUE.toString().equals(inv.getAttachments().get(IS_CALLBACK_SERVICE_INVOKE))) {
String methodsStr = invoker.getUrl().getParameters().get("methods");
boolean hasMethod = false;
if (methodsStr == null || !methodsStr.contains(",")) {
hasMethod = inv.getMethodName().equals(methodsStr);
} else {
String[] methods = methodsStr.split(",");
for (String method : methods) {
if (inv.getMethodName().equals(method)) {
hasMethod = true;
break;
}
}
}
if (!hasMethod) {
logger.warn(new IllegalStateException("The methodName " + inv.getMethodName()
+ " not found in callback service interface ,invoke will be ignored."
+ " please update the api interface. url is:"
+ invoker.getUrl()) + " ,invocation is :" + inv);
return null;
}
}
// 构建RPCContext
RpcContext rpcContext = RpcContext.getContext();
rpcContext.setRemoteAddress(channel.getRemoteAddress());
// 使用执行器执行具体的业务逻辑
Result result = invoker.invoke(inv);
if (result instanceof AsyncRpcResult) {
return ((AsyncRpcResult) result).getResultFuture().thenApply(r -> (Object) r);
} else {
return CompletableFuture.completedFuture(result);
}
}
reply(ExchangeChannel channel, Object message)在接收到RPC调用后,会根据客户端传入的消息,获取对应的执行器Invoker,然后通过方法名和参数唯一确定一个方法,进行业务逻辑调用。经过上一节的描述我们已经了解到,对于每个Invoker都会被包装为Exporter,然后放入exporterMap中,其中exporterMap的key是通过serviceKey方法获取的。serviceKey通过4个参数获取key:服务组别/服务名:版本号:端口号。
getInvoker(Invocation)就通过相同的方式获取serviceKey,然后取出对应的Exporter,再由Exporter获取Invoker执行具体的业务逻辑。如下XML所示。

总结
本节主要介绍了Dubbo采用直连方式暴露服务时,Protocol的主要流程,以及在Dubbo处理RPC调用时Protocol层的主要操作流程。
Dubbo的Protocol层上层是Invoker,下层是Exchanger,笔者接下来会对Exchanger层进行介绍。