dubbo – 优雅停机

  • dubbo – 优雅停机已关闭评论
  • 135 views
  • A+
所属分类:编程开发

开篇

这篇文章主要的目的是想分析下dubbo优雅停机的过程,整个文章参考网上很多现成的文章,本着尊重原创的精神会在文章中备注参考信息。

针对阅读dubbo源码,我的感觉是当你一开始钻到细节当中就很容易一叶障目了,所以建议一开始着重梳理整个框架的逻辑而不要陷入细节当中。

优雅停机的原理

说明:

  • dubbo的优雅停机是建立在JVM的addShutdownHook回调的机制上的,通过注册回调调用停机的逻辑ProtocolConfig.destroyAll()
  • ProtocolConfig.destroyAll()执行逻辑是:1、关闭注册中心;2、关闭发布协议服务。
  • 关闭注册中心:AbstractRegistryFactory.destroyAll()。
  • 关闭发布的协议服务:protocol.destroy()。
public abstract class AbstractConfig implements Serializable {
    static {
        Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
            public void run() {
                if (logger.isInfoEnabled()) {
                    logger.info("Run shutdown hook now.");
                }
                ProtocolConfig.destroyAll();
            }
        }, "DubboShutdownHook"));
    }
}


public class ProtocolConfig extends AbstractConfig {

    public static void destroyAll() {
        if (!destroyed.compareAndSet(false, true)) {
            return;
        }

        // 关闭注册中心
        AbstractRegistryFactory.destroyAll();

        // 关闭所有已发布的协议如dubbo服务
        ExtensionLoader<Protocol> loader = ExtensionLoader.getExtensionLoader(Protocol.class);
        for (String protocolName : loader.getLoadedExtensions()) {
            try {
                Protocol protocol = loader.getLoadedExtension(protocolName);
                if (protocol != null) {
                    protocol.destroy();
                }
            } catch (Throwable t) {
                logger.warn(t.getMessage(), t);
            }
        }
    }
}

dubbo - 优雅停机
说明:

  • 图片来自 Dubbo优雅停机
  • B服务作为Provider需要进行优雅停机。
  • B服务首先断开和注册中心的连接。
  • B服务关闭提供服务的Server端的监听,保证不接受请求。
  • B服务关闭引用的C和D服务,保证不再调用下游服务。

优雅停机过程-注册中心关闭

说明:

  • 注册中心关闭通过LOCK来保证不重入,此例中以ZookeeperRegistry为例。
  • ZookeeperRegistry的关闭顺序:1、关闭注册中心;2、断开和zookeeper的连接。
  • 关闭注册中心按照调用链路走到FailbackRegistry,关闭注册中心并停掉重试操作。
  • 关闭注册中心按照调用链路走到AbstractRegistry,按照先移除作为provider的URL,再移除作为consumer的订阅的consumer信息。
  • 具体的信息看下面的源码,已经按照继承关系组织好了。
public abstract class AbstractRegistryFactory implements RegistryFactory {
    public static void destroyAll() {
        if (LOGGER.isInfoEnabled()) {
            LOGGER.info("Close all registries " + getRegistries());
        }
        // Lock up the registry shutdown process
        LOCK.lock();
        try {
            for (Registry registry : getRegistries()) {
                try {
                    registry.destroy();
                } catch (Throwable e) {
                    LOGGER.error(e.getMessage(), e);
                }
            }
            REGISTRIES.clear();
        } finally {
            // Release the lock
            LOCK.unlock();
        }
    }
}


public class ZookeeperRegistry extends FailbackRegistry {
    public void destroy() {
        // 调用父类FailbackRegistry关闭注册中心
        super.destroy();
        try {
            // 关闭zkClient客户端保证临时provider节点下线
            zkClient.close();
        } catch (Exception e) {
            logger.warn("Failed to close zookeeper client " + getUrl() + ", cause: " + e.getMessage(), e);
        }
    }
}


public abstract class FailbackRegistry extends AbstractRegistry {
    public void destroy() {
        if (!canDestroy()){
            return;
        }
        super.destroy();
        try {
            // 首先要明白FailbackRegistry的核心就在于失败重试,所以这一层的关闭只要关闭retryFuture就可以
            retryFuture.cancel(true);
        } catch (Throwable t) {
            logger.warn(t.getMessage(), t);
        }
    }
}


public abstract class AbstractRegistry implements Registry {
    public void destroy() {
        if (!destroyed.compareAndSet(false, true)) {
            return;
        }

        if (logger.isInfoEnabled()) {
            logger.info("Destroy registry:" + getUrl());
        }
        // 作为provider,取消所有的服务注册
        Set<URL> destroyRegistered = new HashSet<URL>(getRegistered());
        if (!destroyRegistered.isEmpty()) {
            for (URL url : new HashSet<URL>(getRegistered())) {
                if (url.getParameter(Constants.DYNAMIC_KEY, true)) {
                    try {
                        // 从已注册的列表中移除该URL
                        unregister(url);
                        if (logger.isInfoEnabled()) {
                            logger.info("Destroy unregister url " + url);
                        }
                    } catch (Throwable t) {
                        logger.warn("Failed to unregister url " + url + " to registry " + getUrl() + " on destroy, cause: " + t.getMessage(), t);
                    }
                }
            }
        }
        // 作为consumer,取消所有的订阅关系
        Map<URL, Set<NotifyListener>> destroySubscribed = new HashMap<URL, Set<NotifyListener>>(getSubscribed());
        if (!destroySubscribed.isEmpty()) {
            for (Map.Entry<URL, Set<NotifyListener>> entry : destroySubscribed.entrySet()) {
                URL url = entry.getKey();
                for (NotifyListener listener : entry.getValue()) {
                    try {
                        unsubscribe(url, listener);
                        if (logger.isInfoEnabled()) {
                            logger.info("Destroy unsubscribe url " + url);
                        }
                    } catch (Throwable t) {
                        logger.warn("Failed to unsubscribe url " + url + " to registry " + getUrl() + " on destroy, cause: " + t.getMessage(), t);
                    }
                }
            }
        }
    }


    public void unregister(URL url) {
        if (url == null) {
            throw new IllegalArgumentException("unregister url == null");
        }
        if (logger.isInfoEnabled()) {
            logger.info("Unregister: " + url);
        }
        registered.remove(url);
    }


    public void unsubscribe(URL url, NotifyListener listener) {
        if (url == null) {
            throw new IllegalArgumentException("unsubscribe url == null");
        }
        if (listener == null) {
            throw new IllegalArgumentException("unsubscribe listener == null");
        }
        if (logger.isInfoEnabled()) {
            logger.info("Unsubscribe: " + url);
        }
        Set<NotifyListener> listeners = subscribed.get(url);
        if (listeners != null) {
            listeners.remove(listener);
        }
    }
}

优雅停机过程-协议关闭

说明:

  • 协议关闭按照以下顺序进行:1、关闭provider端的监听;2、关闭作为consumer的reference的服务;3、调用父类针对exporter对象进行清理。
  • 关闭provider端的监听:关闭provider端的监听(server.close)。
  • 关闭consumer的服务:关闭dubbo服务引用的服务(client.close)。
  • 调用父类清理exporter:清理exporter服务(super.destroy)。
public class DubboProtocol extends AbstractProtocol {

    public void destroy() {

        // 关停所有的Server,作为provider将不再接收新的请求
        for (String key : new ArrayList<String>(serverMap.keySet())) {
            ExchangeServer server = serverMap.remove(key);
            if (server != null) {
                try {
                    if (logger.isInfoEnabled()) {
                        logger.info("Close dubbo server: " + server.getLocalAddress());
                    }
                    server.close(getServerShutdownTimeout());
                } catch (Throwable t) {
                    logger.warn(t.getMessage(), t);
                }
            }
        }

        // 关停所有的Client,作为consumer将不再发送新的请求
        for (String key : new ArrayList<String>(referenceClientMap.keySet())) {
            ExchangeClient client = referenceClientMap.remove(key);
            if (client != null) {
                try {
                    if (logger.isInfoEnabled()) {
                        logger.info("Close dubbo connect: " + client.getLocalAddress() + "-->" + client.getRemoteAddress());
                    }
                    client.close(getServerShutdownTimeout());
                } catch (Throwable t) {
                    logger.warn(t.getMessage(), t);
                }
            }
        }

        // 幽灵客户端的处理逻辑,不清楚幽灵客户端是啥?
        for (String key : new ArrayList<String>(ghostClientMap.keySet())) {
            ExchangeClient client = ghostClientMap.remove(key);
            if (client != null) {
                try {
                    if (logger.isInfoEnabled()) {
                        logger.info("Close dubbo connect: " + client.getLocalAddress() + "-->" + client.getRemoteAddress());
                    }
                    client.close(getServerShutdownTimeout());
                } catch (Throwable t) {
                    logger.warn(t.getMessage(), t);
                }
            }
        }
        stubServiceMethodsMap.clear();

        // 调用父类继续进行清理,针对exporter对象进行清理
        super.destroy();
    }
}

provider监听的close过程

说明:

  • provider监听的close过程:关闭心跳检测操作,关闭底层netty服务的监听channel管道。
  • 关闭心跳检测操作:doClose()。
  • 关闭底层netty监听:server.close(timeout)。
public class HeaderExchangeServer implements ExchangeServer {
    public void close(final int timeout) {
        startClose();
        if (timeout > 0) {
            final long max = (long) timeout;
            final long start = System.currentTimeMillis();
            if (getUrl().getParameter(Constants.CHANNEL_SEND_READONLYEVENT_KEY, true)) {
                sendChannelReadOnlyEvent();
            }

            // 如果还有进行中的任务并且没有到达等待时间的上限,则继续等待
            while (HeaderExchangeServer.this.isRunning()
                    && System.currentTimeMillis() - start < max) {
                try {
                    // 休息10毫秒再检查
                    Thread.sleep(10);
                } catch (InterruptedException e) {
                    logger.warn(e.getMessage(), e);
                }
            }
        }
        // 关闭心跳,停止应答
        doClose();

        // 关闭通信通道
        server.close(timeout);
    }

    private void doClose() {
        // 修改标记位,该标记为设置为true后,provider不再对上游请求做应答
        if (!closed.compareAndSet(false, true)) {
            return;
        }
         // 取消心跳的Futrue
        stopHeartbeatTimer();
        try {
             // 关闭心跳的线程池
            scheduled.shutdown();
        } catch (Throwable t) {
            logger.warn(t.getMessage(), t);
        }
    }
}


public abstract class AbstractServer extends AbstractEndpoint implements Server {
    public void close() {
        if (logger.isInfoEnabled()) {
            logger.info("Close " + getClass().getSimpleName() + " bind " + getBindAddress() + ", export " + getLocalAddress());
        }
        ExecutorUtil.shutdownNow(executor, 100);
        try {
            // 设置关闭的标记位
            super.close();
        } catch (Throwable e) {
            logger.warn(e.getMessage(), e);
        }
        try {
            // 执行真正的关闭动作
            doClose();
        } catch (Throwable e) {
            logger.warn(e.getMessage(), e);
        }
    }

    protected abstract void doClose() throws Throwable;
}



public class NettyServer extends AbstractServer implements Server {
    protected void doClose() throws Throwable {
        try {
            if (channel != null) {
                // unbind.
                channel.close();
            }
        } catch (Throwable e) {
            logger.warn(e.getMessage(), e);
        }
        try {
            Collection<com.alibaba.dubbo.remoting.Channel> channels = getChannels();
            if (channels != null && channels.size() > 0) {
                for (com.alibaba.dubbo.remoting.Channel channel : channels) {
                    try {
                        channel.close();
                    } catch (Throwable e) {
                        logger.warn(e.getMessage(), e);
                    }
                }
            }
        } catch (Throwable e) {
            logger.warn(e.getMessage(), e);
        }
        try {
            if (bootstrap != null) {
                // release external resource.
                bootstrap.releaseExternalResources();
            }
        } catch (Throwable e) {
            logger.warn(e.getMessage(), e);
        }
        try {
            if (channels != null) {
                channels.clear();
            }
        } catch (Throwable e) {
            logger.warn(e.getMessage(), e);
        }
    }
}

client的清理过程

说明:

  • client的关闭过程本质上是关闭引用服务的channel对象。
  • client的关闭顺序按照:设置关闭标记位,关闭心跳检测,关闭通道。
public class HeaderExchangeClient implements ExchangeClient {

    public void close(int timeout) {
        startClose();
        doClose();
        channel.close(timeout);
    }

    public void startClose() {
        channel.startClose();
    }

    private void doClose() {
        stopHeartbeatTimer();
    }
}

exporter清理过程

说明:

  • exporter的清理主要包括invoker和exporter两个对象的清理。
  • invoker和exporter两个对象的具体作用暂时还未理清楚,待定。
  • exporter的清理最终还是走到了invoker的清理过程当中。
public abstract class AbstractProtocol implements Protocol {

    public void destroy() {
        for (Invoker<?> invoker : invokers) {
            if (invoker != null) {
                // 移除invokers
                invokers.remove(invoker);
                try {
                    if (logger.isInfoEnabled()) {
                        logger.info("Destroy reference: " + invoker.getUrl());
                    }
                    // 销毁invokers
                    invoker.destroy();
                } catch (Throwable t) {
                    logger.warn(t.getMessage(), t);
                }
            }
        }
        for (String key : new ArrayList<String>(exporterMap.keySet())) {
            // 移除exporter
            Exporter<?> exporter = exporterMap.remove(key);
            if (exporter != null) {
                try {
                    if (logger.isInfoEnabled()) {
                        logger.info("Unexport service: " + exporter.getInvoker().getUrl());
                    }
                    // 销毁exporter
                    exporter.unexport();
                } catch (Throwable t) {
                    logger.warn(t.getMessage(), t);
                }
            }
        }
    }
}



public class DubboInvoker<T> extends AbstractInvoker<T> {

    public void destroy() {
        if (super.isDestroyed()) {
            return;
        } else {
            destroyLock.lock();
            try {
                if (super.isDestroyed()) {
                    return;
                }
                super.destroy();
                if (invokers != null) {
                    invokers.remove(this);
                }
                for (ExchangeClient client : clients) {
                    try {
                        client.close(getShutdownTimeout());
                    } catch (Throwable t) {
                        logger.warn(t.getMessage(), t);
                    }
                }

            } finally {
                destroyLock.unlock();
            }
        }
    }
}


public abstract class AbstractExporter<T> implements Exporter<T> {

    protected final Logger logger = LoggerFactory.getLogger(getClass());

    private final Invoker<T> invoker;

    private volatile boolean unexported = false;

    public AbstractExporter(Invoker<T> invoker) {
        if (invoker == null)
            throw new IllegalStateException("service invoker == null");
        if (invoker.getInterface() == null)
            throw new IllegalStateException("service type == null");
        if (invoker.getUrl() == null)
            throw new IllegalStateException("service url == null");
        this.invoker = invoker;
    }

    public Invoker<T> getInvoker() {
        return invoker;
    }

    public void unexport() {
        if (unexported) {
            return;
        }
        unexported = true;
        getInvoker().destroy();
    }

    public String toString() {
        return getInvoker().toString();
    }
}
  • 安卓客户端下载
  • 微信扫一扫
  • weinxin
  • 微信公众号
  • 微信公众号扫一扫
  • weinxin
avatar