十年网站开发经验 + 多家企业客户 + 靠谱的建站团队
量身定制 + 运营维护+专业推广+无忧售后,网站问题一站解决
大家可能都遇到过DUBBO线程池打满这个问题,刚开始遇到这个问题可能会比较慌,常见方案可能就是重启服务,但也不知道重启是否可以解决。我认为重启不仅不能解决问题,甚至有可能加剧问题,这是为什么呢?本文我们就一起分析DUBBO线程池打满这个问题。

成都创新互联专业为企业提供封丘网站建设、封丘做网站、封丘网站设计、封丘网站制作等企业网站建设、网页设计与制作、封丘企业网站模板建站服务,十多年封丘做网站经验,不只是建网站,更提供有价值的思路和整体网络服务。
DUBBO底层网络通信采用Netty框架,我们编写一个Netty服务端进行观察:
public class NettyServer {
    public static void main(String[] args) throws Exception {
        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        EventLoopGroup workerGroup = new NioEventLoopGroup(8);
        try {
            ServerBootstrap bootstrap = new ServerBootstrap();
            bootstrap.group(bossGroup, workerGroup)
            .channel(NioServerSocketChannel.class)
            .option(ChannelOption.SO_BACKLOG, 128)
            .childOption(ChannelOption.SO_KEEPALIVE, true)
            .childHandler(new ChannelInitializer() {
                @Override
                protected void initChannel(SocketChannel ch) throws Exception {
                    ch.pipeline().addLast(new NettyServerHandler());
                }
            });
            ChannelFuture channelFuture = bootstrap.bind(7777).sync();
            System.out.println("服务端准备就绪");
            channelFuture.channel().closeFuture().sync();
        } catch (Exception ex) {
            System.out.println(ex.getMessage());
        } finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }
}
 BossGroup线程组只有一个线程处理客户端连接请求,连接完成后将完成三次握手的SocketChannel连接分发给WorkerGroup处理读写请求,这两个线程组被称为「IO线程」。
我们再引出「业务线程」这个概念。服务生产者接收到请求后,如果处理逻辑可以快速处理完成,那么可以直接放在IO线程处理,从而减少线程池调度与上下文切换。但是如果处理逻辑非常耗时,或者会发起新IO请求例如查询数据库,那么必须派发到业务线程池处理。
DUBBO提供了多种线程模型,选择线程模型需要在配置文件指定dispatcher属性:
不同线程模型在选择是使用IO线程还是业务线程,DUBBO官网文档说明:
all
所有消息都派发到业务线程池,包括请求,响应,连接事件,断开事件,心跳
direct
所有消息都不派发到业务线程池,全部在IO线程直接执行
message
只有请求响应消息派发到业务线程池,其它连接断开事件,心跳等消息直接在IO线程执行
execution
只有请求消息派发到业务线程池,响应和其它连接断开事件,心跳等消息直接在IO线程执行
connection
在IO线程上将连接断开事件放入队列,有序逐个执行,其它消息派发到业务线程池
all所有消息都派发到业务线程池,包括请求,响应,连接事件,断开事件,心跳direct所有消息都不派发到业务线程池,全部在IO线程直接执行message只有请求响应消息派发到业务线程池,其它连接断开事件,心跳等消息直接在IO线程执行execution只有请求消息派发到业务线程池,响应和其它连接断开事件,心跳等消息直接在IO线程执行connection在IO线程上将连接断开事件放入队列,有序逐个执行,其它消息派发到业务线程池
生产者和消费者在初始化时确定线程模型:
// 生产者
public class NettyServer extends AbstractServer implements Server {
public NettyServer(URL url, ChannelHandler handler) throws RemotingException {
super(url, ChannelHandlers.wrap(handler, ExecutorUtil.setThreadName(url, SERVER_THREAD_POOL_NAME)));
}
}
// 消费者
public class NettyClient extends AbstractClient {
public NettyClient(final URL url, final ChannelHandler handler) throws RemotingException {
super(url, wrapChannelHandler(url, handler));
}
}
生产者和消费者默认线程模型都会使用AllDispatcher,ChannelHandlers.wrap方法可以获取Dispatch自适应扩展点。如果我们在配置文件中指定dispatcher,扩展点加载器会从URL获取属性值加载对应线程模型。本文以生产者为例进行分析:
public class NettyServer extends AbstractServer implements Server {
    public NettyServer(URL url, ChannelHandler handler) throws RemotingException {
        // ChannelHandlers.wrap确定线程策略
        super(url, ChannelHandlers.wrap(handler, ExecutorUtil.setThreadName(url, SERVER_THREAD_POOL_NAME)));
    }
}
public class ChannelHandlers {
    protected ChannelHandler wrapInternal(ChannelHandler handler, URL url) {
        return new MultiMessageHandler(new HeartbeatHandler(ExtensionLoader.getExtensionLoader(Dispatcher.class).getAdaptiveExtension().dispatch(handler, url)));
    }
}
@SPI(AllDispatcher.NAME)
public interface Dispatcher {
    @Adaptive({Constants.DISPATCHER_KEY, "channel.handler"})
    ChannelHandler dispatch(ChannelHandler handler, URL url);
}
我们分析其中两个线程模型源码,其它线程模型请阅读DUBBO源码。AllDispatcher模型所有消息都派发到业务线程池,包括请求,响应,连接事件,断开事件,心跳:
public class AllDispatcher implements Dispatcher {
    // 线程模型名称
    public static final String NAME = "all";
    // 具体实现策略
    @Override
    public ChannelHandler dispatch(ChannelHandler handler, URL url) {
        return new AllChannelHandler(handler, url);
    }
}
public class AllChannelHandler extends WrappedChannelHandler {
    @Override
    public void connected(Channel channel) throws RemotingException {
        // 连接完成事件交给业务线程池
        ExecutorService cexecutor = getExecutorService();
        try {
            cexecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.CONNECTED));
        } catch (Throwable t) {
            throw new ExecutionException("connect event", channel, getClass() + " error when process connected event", t);
        }
    }
    @Override
    public void disconnected(Channel channel) throws RemotingException {
        // 断开连接事件交给业务线程池
        ExecutorService cexecutor = getExecutorService();
        try {
            cexecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.DISCONNECTED));
        } catch (Throwable t) {
            throw new ExecutionException("disconnect event", channel, getClass() + " error when process disconnected event", t);
        }
    }
    @Override
    public void received(Channel channel, Object message) throws RemotingException {
        // 请求响应事件交给业务线程池
        ExecutorService cexecutor = getExecutorService();
        try {
            cexecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message));
        } catch (Throwable t) {
            if(message instanceof Request && t instanceof RejectedExecutionException) {
                Request request = (Request)message;
                if(request.isTwoWay()) {
                    String msg = "Server side(" + url.getIp() + "," + url.getPort() + ") threadpool is exhausted ,detail msg:" + t.getMessage();
                    Response response = new Response(request.getId(), request.getVersion());
                    response.setStatus(Response.SERVER_THREADPOOL_EXHAUSTED_ERROR);
                    response.setErrorMessage(msg);
                    channel.send(response);
                    return;
                }
            }
            throw new ExecutionException(message, channel, getClass() + " error when process received event", t);
        }
    }
    @Override
    public void caught(Channel channel, Throwable exception) throws RemotingException {
        // 异常事件交给业务线程池
        ExecutorService cexecutor = getExecutorService();
        try {
            cexecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.CAUGHT, exception));
        } catch (Throwable t) {
            throw new ExecutionException("caught event", channel, getClass() + " error when process caught event", t);
        }
    }
}
DirectDispatcher策略所有消息都不派发到业务线程池,全部在IO线程直接执行:
public class DirectDispatcher implements Dispatcher {
    // 线程模型名称
    public static final String NAME = "direct";
    // 具体实现策略
    @Override
    public ChannelHandler dispatch(ChannelHandler handler, URL url) {
        // 直接返回handler表示所有事件都交给IO线程处理
        return handler;
    }
}
上个章节分析了线程模型,我们知道不同的线程模型会选择使用还是IO线程还是业务线程。如果使用业务线程池,那么使用什么线程池策略是本章节需要回答的问题。DUBBO官网线程派发模型图展示了线程模型和线程池策略的关系:
DUBBO提供了多种线程池策略,选择线程池策略需要在配置文件指定threadpool属性:
不同线程池策略会创建不同特性的线程池:
fixed
包含固定个数线程
cached
线程空闲一分钟会被回收,当新请求到来时会创建新线程
limited
线程个数随着任务增加而增加,但不会超过最大阈值。空闲线程不会被回收
eager
当所有核心线程数都处于忙碌状态时,优先创建新线程执行任务,而不是立即放入队列
fixed包含固定个数线程cached线程空闲一分钟会被回收,当新请求到来时会创建新线程limited线程个数随着任务增加而增加,但不会超过最大阈值。空闲线程不会被回收eager当所有核心线程数都处于忙碌状态时,优先创建新线程执行任务,而不是立即放入队列
本文我们以AllDispatcher为例分析线程池策略在什么时候确定:
public class AllDispatcher implements Dispatcher {
    public static final String NAME = "all";
    @Override
    public ChannelHandler dispatch(ChannelHandler handler, URL url) {
        return new AllChannelHandler(handler, url);
    }
}
public class AllChannelHandler extends WrappedChannelHandler {
    public AllChannelHandler(ChannelHandler handler, URL url) {
        super(handler, url);
    }
}
在WrappedChannelHandler构造函数中如果配置指定了threadpool属性,扩展点加载器会从URL获取属性值加载对应线程池策略,默认策略为fixed:
public class WrappedChannelHandler implements ChannelHandlerDelegate {
    public WrappedChannelHandler(ChannelHandler handler, URL url) {
        this.handler = handler;
        this.url = url;
        // 获取线程池自适应扩展点
        executor = (ExecutorService) ExtensionLoader.getExtensionLoader(ThreadPool.class).getAdaptiveExtension().getExecutor(url);
        String componentKey = Constants.EXECUTOR_SERVICE_COMPONENT_KEY;
        if (Constants.CONSUMER_SIDE.equalsIgnoreCase(url.getParameter(Constants.SIDE_KEY))) {
            componentKey = Constants.CONSUMER_SIDE;
        }
        DataStore dataStore = ExtensionLoader.getExtensionLoader(DataStore.class).getDefaultExtension();
        dataStore.put(componentKey, Integer.toString(url.getPort()), executor);
    }
}
@SPI("fixed")
public interface ThreadPool {
    @Adaptive({Constants.THREADPOOL_KEY})
    Executor getExecutor(URL url);
}
public class FixedThreadPool implements ThreadPool {
    @Override
    public Executor getExecutor(URL url) {
        // 线程名称
        String name = url.getParameter(Constants.THREAD_NAME_KEY, Constants.DEFAULT_THREAD_NAME);
        // 线程个数默认200
        int threads = url.getParameter(Constants.THREADS_KEY, Constants.DEFAULT_THREADS);
        // 队列容量默认0
        int queues = url.getParameter(Constants.QUEUES_KEY, Constants.DEFAULT_QUEUES);
        // 队列容量等于0使用阻塞队列SynchronousQueue
        // 队列容量小于0使用无界阻塞队列LinkedBlockingQueue
        // 队列容量大于0使用有界阻塞队列LinkedBlockingQueue
        return new ThreadPoolExecutor(threads, threads, 0, TimeUnit.MILLISECONDS,
                                      queues == 0 ? new SynchronousQueue()
                                      : (queues < 0 ? new LinkedBlockingQueue()
                                         : new LinkedBlockingQueue(queues)),
                                      new NamedInternalThreadFactory(name, true), new AbortPolicyWithReport(name, url));
    }
}
   
public class CachedThreadPool implements ThreadPool {
    @Override
    public Executor getExecutor(URL url) {
        // 获取线程名称
        String name = url.getParameter(Constants.THREAD_NAME_KEY, Constants.DEFAULT_THREAD_NAME);
        // 核心线程数默认0
        int cores = url.getParameter(Constants.CORE_THREADS_KEY, Constants.DEFAULT_CORE_THREADS);
        // 最大线程数默认Int最大值
        int threads = url.getParameter(Constants.THREADS_KEY, Integer.MAX_VALUE);
        // 队列容量默认0
        int queues = url.getParameter(Constants.QUEUES_KEY, Constants.DEFAULT_QUEUES);
        // 线程空闲多少时间被回收默认1分钟
        int alive = url.getParameter(Constants.ALIVE_KEY, Constants.DEFAULT_ALIVE);
        // 队列容量等于0使用阻塞队列SynchronousQueue
        // 队列容量小于0使用无界阻塞队列LinkedBlockingQueue
        // 队列容量大于0使用有界阻塞队列LinkedBlockingQueue
        return new ThreadPoolExecutor(cores, threads, alive, TimeUnit.MILLISECONDS,
                                      queues == 0 ? new SynchronousQueue()
                                      : (queues < 0 ? new LinkedBlockingQueue()
                                         : new LinkedBlockingQueue(queues)),
                                      new NamedInternalThreadFactory(name, true), new AbortPolicyWithReport(name, url));
    }
}
   
public class LimitedThreadPool implements ThreadPool {
    @Override
    public Executor getExecutor(URL url) {
        // 获取线程名称
        String name = url.getParameter(Constants.THREAD_NAME_KEY, Constants.DEFAULT_THREAD_NAME);
        // 核心线程数默认0
        int cores = url.getParameter(Constants.CORE_THREADS_KEY, Constants.DEFAULT_CORE_THREADS);
        // 最大线程数默认200
        int threads = url.getParameter(Constants.THREADS_KEY, Constants.DEFAULT_THREADS);
        // 队列容量默认0
        int queues = url.getParameter(Constants.QUEUES_KEY, Constants.DEFAULT_QUEUES);
        // 队列容量等于0使用阻塞队列SynchronousQueue
        // 队列容量小于0使用无界阻塞队列LinkedBlockingQueue
        // 队列容量大于0使用有界阻塞队列LinkedBlockingQueue
        // keepalive时间设置Long.MAX_VALUE表示不回收空闲线程
        return new ThreadPoolExecutor(cores, threads, Long.MAX_VALUE, TimeUnit.MILLISECONDS,
                                      queues == 0 ? new SynchronousQueue()
                                      : (queues < 0 ? new LinkedBlockingQueue()
                                         : new LinkedBlockingQueue(queues)),
                                      new NamedInternalThreadFactory(name, true), new AbortPolicyWithReport(name, url));
    }
}
   
我们知道ThreadPoolExecutor是普通线程执行器。当线程池核心线程达到阈值时新任务放入队列,当队列已满开启新线程处理,当前线程数达到最大线程数时执行拒绝策略。
但是EagerThreadPool自定义线程执行策略,当线程池核心线程达到阈值时,新任务不会放入队列而是开启新线程进行处理(要求当前线程数没有超过最大线程数)。当前线程数达到最大线程数时任务放入队列。
public class EagerThreadPool implements ThreadPool {
    @Override
    public Executor getExecutor(URL url) {
        // 线程名
        String name = url.getParameter(Constants.THREAD_NAME_KEY, Constants.DEFAULT_THREAD_NAME);
        // 核心线程数默认0
        int cores = url.getParameter(Constants.CORE_THREADS_KEY, Constants.DEFAULT_CORE_THREADS);
        // 最大线程数默认Int最大值
        int threads = url.getParameter(Constants.THREADS_KEY, Integer.MAX_VALUE);
        // 队列容量默认0
        int queues = url.getParameter(Constants.QUEUES_KEY, Constants.DEFAULT_QUEUES);
        // 线程空闲多少时间被回收默认1分钟
        int alive = url.getParameter(Constants.ALIVE_KEY, Constants.DEFAULT_ALIVE);
        // 初始化自定义线程池和队列重写相关方法
        TaskQueue taskQueue = new TaskQueue(queues <= 0 ? 1 : queues);
        EagerThreadPoolExecutor executor = new EagerThreadPoolExecutor(cores,
                threads,
                alive,
                TimeUnit.MILLISECONDS,
                taskQueue,
                new NamedInternalThreadFactory(name, true),
                new AbortPolicyWithReport(name, url));
        taskQueue.setExecutor(executor);
        return executor;
    }
}
  
现在我们知道DUBBO会选择线程池策略进行业务处理,那么应该如何估算可            
            文章题目:一个公式看懂:为什么Dubbo线程池会打满            
            浏览路径:http://zsjierui.cn/article/dhojcss.html