- A+
所属分类:未分类
开篇
这篇文章主要的目的是想分析下dubbo优雅停机的过程,整个文章参考网上很多现成的文章,本着尊重原创的精神会在文章中备注参考信息。
针对阅读dubbo源码,我的感觉是当你一开始钻到细节当中就很容易一叶障目了,所以建议一开始着重梳理整个框架的逻辑而不要陷入细节当中。
优雅停机的原理
说明:
- dubbo的优雅停机是建立在JVM的addShutdownHook回调的机制上的,通过注册回调调用停机的逻辑ProtocolConfig.destroyAll()
- ProtocolConfig.destroyAll()执行逻辑是:1、关闭注册中心;2、关闭发布协议服务。
- 关闭注册中心:AbstractRegistryFactory.destroyAll()。
- 关闭发布的协议服务:protocol.destroy()。
public abstract class AbstractConfig implements Serializable { static { Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() { public void run() { if (logger.isInfoEnabled()) { logger.info("Run shutdown hook now."); } ProtocolConfig.destroyAll(); } }, "DubboShutdownHook")); } } public class ProtocolConfig extends AbstractConfig { public static void destroyAll() { if (!destroyed.compareAndSet(false, true)) { return; } // 关闭注册中心 AbstractRegistryFactory.destroyAll(); // 关闭所有已发布的协议如dubbo服务 ExtensionLoader<Protocol> loader = ExtensionLoader.getExtensionLoader(Protocol.class); for (String protocolName : loader.getLoadedExtensions()) { try { Protocol protocol = loader.getLoadedExtension(protocolName); if (protocol != null) { protocol.destroy(); } } catch (Throwable t) { logger.warn(t.getMessage(), t); } } } }
说明:
- 图片来自 Dubbo优雅停机
- B服务作为Provider需要进行优雅停机。
- B服务首先断开和注册中心的连接。
- B服务关闭提供服务的Server端的监听,保证不接受请求。
- B服务关闭引用的C和D服务,保证不再调用下游服务。
优雅停机过程-注册中心关闭
说明:
- 注册中心关闭通过LOCK来保证不重入,此例中以ZookeeperRegistry为例。
- ZookeeperRegistry的关闭顺序:1、关闭注册中心;2、断开和zookeeper的连接。
- 关闭注册中心按照调用链路走到FailbackRegistry,关闭注册中心并停掉重试操作。
- 关闭注册中心按照调用链路走到AbstractRegistry,按照先移除作为provider的URL,再移除作为consumer的订阅的consumer信息。
- 具体的信息看下面的源码,已经按照继承关系组织好了。
public abstract class AbstractRegistryFactory implements RegistryFactory { public static void destroyAll() { if (LOGGER.isInfoEnabled()) { LOGGER.info("Close all registries " + getRegistries()); } // Lock up the registry shutdown process LOCK.lock(); try { for (Registry registry : getRegistries()) { try { registry.destroy(); } catch (Throwable e) { LOGGER.error(e.getMessage(), e); } } REGISTRIES.clear(); } finally { // Release the lock LOCK.unlock(); } } } public class ZookeeperRegistry extends FailbackRegistry { public void destroy() { // 调用父类FailbackRegistry关闭注册中心 super.destroy(); try { // 关闭zkClient客户端保证临时provider节点下线 zkClient.close(); } catch (Exception e) { logger.warn("Failed to close zookeeper client " + getUrl() + ", cause: " + e.getMessage(), e); } } } public abstract class FailbackRegistry extends AbstractRegistry { public void destroy() { if (!canDestroy()){ return; } super.destroy(); try { // 首先要明白FailbackRegistry的核心就在于失败重试,所以这一层的关闭只要关闭retryFuture就可以 retryFuture.cancel(true); } catch (Throwable t) { logger.warn(t.getMessage(), t); } } } public abstract class AbstractRegistry implements Registry { public void destroy() { if (!destroyed.compareAndSet(false, true)) { return; } if (logger.isInfoEnabled()) { logger.info("Destroy registry:" + getUrl()); } // 作为provider,取消所有的服务注册 Set<URL> destroyRegistered = new HashSet<URL>(getRegistered()); if (!destroyRegistered.isEmpty()) { for (URL url : new HashSet<URL>(getRegistered())) { if (url.getParameter(Constants.DYNAMIC_KEY, true)) { try { // 从已注册的列表中移除该URL unregister(url); if (logger.isInfoEnabled()) { logger.info("Destroy unregister url " + url); } } catch (Throwable t) { logger.warn("Failed to unregister url " + url + " to registry " + getUrl() + " on destroy, cause: " + t.getMessage(), t); } } } } // 作为consumer,取消所有的订阅关系 Map<URL, Set<NotifyListener>> destroySubscribed = new HashMap<URL, Set<NotifyListener>>(getSubscribed()); if (!destroySubscribed.isEmpty()) { for (Map.Entry<URL, Set<NotifyListener>> entry : destroySubscribed.entrySet()) { URL url = entry.getKey(); for (NotifyListener listener : entry.getValue()) { try { unsubscribe(url, listener); if (logger.isInfoEnabled()) { logger.info("Destroy unsubscribe url " + url); } } catch (Throwable t) { logger.warn("Failed to unsubscribe url " + url + " to registry " + getUrl() + " on destroy, cause: " + t.getMessage(), t); } } } } } public void unregister(URL url) { if (url == null) { throw new IllegalArgumentException("unregister url == null"); } if (logger.isInfoEnabled()) { logger.info("Unregister: " + url); } registered.remove(url); } public void unsubscribe(URL url, NotifyListener listener) { if (url == null) { throw new IllegalArgumentException("unsubscribe url == null"); } if (listener == null) { throw new IllegalArgumentException("unsubscribe listener == null"); } if (logger.isInfoEnabled()) { logger.info("Unsubscribe: " + url); } Set<NotifyListener> listeners = subscribed.get(url); if (listeners != null) { listeners.remove(listener); } } }
优雅停机过程-协议关闭
说明:
- 协议关闭按照以下顺序进行:1、关闭provider端的监听;2、关闭作为consumer的reference的服务;3、调用父类针对exporter对象进行清理。
- 关闭provider端的监听:关闭provider端的监听(server.close)。
- 关闭consumer的服务:关闭dubbo服务引用的服务(client.close)。
- 调用父类清理exporter:清理exporter服务(super.destroy)。
public class DubboProtocol extends AbstractProtocol { public void destroy() { // 关停所有的Server,作为provider将不再接收新的请求 for (String key : new ArrayList<String>(serverMap.keySet())) { ExchangeServer server = serverMap.remove(key); if (server != null) { try { if (logger.isInfoEnabled()) { logger.info("Close dubbo server: " + server.getLocalAddress()); } server.close(getServerShutdownTimeout()); } catch (Throwable t) { logger.warn(t.getMessage(), t); } } } // 关停所有的Client,作为consumer将不再发送新的请求 for (String key : new ArrayList<String>(referenceClientMap.keySet())) { ExchangeClient client = referenceClientMap.remove(key); if (client != null) { try { if (logger.isInfoEnabled()) { logger.info("Close dubbo connect: " + client.getLocalAddress() + "-->" + client.getRemoteAddress()); } client.close(getServerShutdownTimeout()); } catch (Throwable t) { logger.warn(t.getMessage(), t); } } } // 幽灵客户端的处理逻辑,不清楚幽灵客户端是啥? for (String key : new ArrayList<String>(ghostClientMap.keySet())) { ExchangeClient client = ghostClientMap.remove(key); if (client != null) { try { if (logger.isInfoEnabled()) { logger.info("Close dubbo connect: " + client.getLocalAddress() + "-->" + client.getRemoteAddress()); } client.close(getServerShutdownTimeout()); } catch (Throwable t) { logger.warn(t.getMessage(), t); } } } stubServiceMethodsMap.clear(); // 调用父类继续进行清理,针对exporter对象进行清理 super.destroy(); } }
provider监听的close过程
说明:
- provider监听的close过程:关闭心跳检测操作,关闭底层netty服务的监听channel管道。
- 关闭心跳检测操作:doClose()。
- 关闭底层netty监听:server.close(timeout)。
public class HeaderExchangeServer implements ExchangeServer { public void close(final int timeout) { startClose(); if (timeout > 0) { final long max = (long) timeout; final long start = System.currentTimeMillis(); if (getUrl().getParameter(Constants.CHANNEL_SEND_READONLYEVENT_KEY, true)) { sendChannelReadOnlyEvent(); } // 如果还有进行中的任务并且没有到达等待时间的上限,则继续等待 while (HeaderExchangeServer.this.isRunning() && System.currentTimeMillis() - start < max) { try { // 休息10毫秒再检查 Thread.sleep(10); } catch (InterruptedException e) { logger.warn(e.getMessage(), e); } } } // 关闭心跳,停止应答 doClose(); // 关闭通信通道 server.close(timeout); } private void doClose() { // 修改标记位,该标记为设置为true后,provider不再对上游请求做应答 if (!closed.compareAndSet(false, true)) { return; } // 取消心跳的Futrue stopHeartbeatTimer(); try { // 关闭心跳的线程池 scheduled.shutdown(); } catch (Throwable t) { logger.warn(t.getMessage(), t); } } } public abstract class AbstractServer extends AbstractEndpoint implements Server { public void close() { if (logger.isInfoEnabled()) { logger.info("Close " + getClass().getSimpleName() + " bind " + getBindAddress() + ", export " + getLocalAddress()); } ExecutorUtil.shutdownNow(executor, 100); try { // 设置关闭的标记位 super.close(); } catch (Throwable e) { logger.warn(e.getMessage(), e); } try { // 执行真正的关闭动作 doClose(); } catch (Throwable e) { logger.warn(e.getMessage(), e); } } protected abstract void doClose() throws Throwable; } public class NettyServer extends AbstractServer implements Server { protected void doClose() throws Throwable { try { if (channel != null) { // unbind. channel.close(); } } catch (Throwable e) { logger.warn(e.getMessage(), e); } try { Collection<com.alibaba.dubbo.remoting.Channel> channels = getChannels(); if (channels != null && channels.size() > 0) { for (com.alibaba.dubbo.remoting.Channel channel : channels) { try { channel.close(); } catch (Throwable e) { logger.warn(e.getMessage(), e); } } } } catch (Throwable e) { logger.warn(e.getMessage(), e); } try { if (bootstrap != null) { // release external resource. bootstrap.releaseExternalResources(); } } catch (Throwable e) { logger.warn(e.getMessage(), e); } try { if (channels != null) { channels.clear(); } } catch (Throwable e) { logger.warn(e.getMessage(), e); } } }
client的清理过程
说明:
- client的关闭过程本质上是关闭引用服务的channel对象。
- client的关闭顺序按照:设置关闭标记位,关闭心跳检测,关闭通道。
public class HeaderExchangeClient implements ExchangeClient { public void close(int timeout) { startClose(); doClose(); channel.close(timeout); } public void startClose() { channel.startClose(); } private void doClose() { stopHeartbeatTimer(); } }
exporter清理过程
说明:
- exporter的清理主要包括invoker和exporter两个对象的清理。
- invoker和exporter两个对象的具体作用暂时还未理清楚,待定。
- exporter的清理最终还是走到了invoker的清理过程当中。
public abstract class AbstractProtocol implements Protocol { public void destroy() { for (Invoker<?> invoker : invokers) { if (invoker != null) { // 移除invokers invokers.remove(invoker); try { if (logger.isInfoEnabled()) { logger.info("Destroy reference: " + invoker.getUrl()); } // 销毁invokers invoker.destroy(); } catch (Throwable t) { logger.warn(t.getMessage(), t); } } } for (String key : new ArrayList<String>(exporterMap.keySet())) { // 移除exporter Exporter<?> exporter = exporterMap.remove(key); if (exporter != null) { try { if (logger.isInfoEnabled()) { logger.info("Unexport service: " + exporter.getInvoker().getUrl()); } // 销毁exporter exporter.unexport(); } catch (Throwable t) { logger.warn(t.getMessage(), t); } } } } } public class DubboInvoker<T> extends AbstractInvoker<T> { public void destroy() { if (super.isDestroyed()) { return; } else { destroyLock.lock(); try { if (super.isDestroyed()) { return; } super.destroy(); if (invokers != null) { invokers.remove(this); } for (ExchangeClient client : clients) { try { client.close(getShutdownTimeout()); } catch (Throwable t) { logger.warn(t.getMessage(), t); } } } finally { destroyLock.unlock(); } } } } public abstract class AbstractExporter<T> implements Exporter<T> { protected final Logger logger = LoggerFactory.getLogger(getClass()); private final Invoker<T> invoker; private volatile boolean unexported = false; public AbstractExporter(Invoker<T> invoker) { if (invoker == null) throw new IllegalStateException("service invoker == null"); if (invoker.getInterface() == null) throw new IllegalStateException("service type == null"); if (invoker.getUrl() == null) throw new IllegalStateException("service url == null"); this.invoker = invoker; } public Invoker<T> getInvoker() { return invoker; } public void unexport() { if (unexported) { return; } unexported = true; getInvoker().destroy(); } public String toString() { return getInvoker().toString(); } }
- 安卓客户端下载
- 微信扫一扫
- 微信公众号
- 微信公众号扫一扫