这世界上没有优秀的理念,只有脚踏实地的结果 。
继上篇【ThreadPoolExecutor】自定义线程池详解(一篇透彻)之后,我们 这次主要讲一下线程池怎么动态调整参数。
本文主要讲Dinamic-Tp开源框架的基础知识。
DynamicTp项目地址:
官网:https://dynamictp.cn/
gitee地址:https://gitee.com/dromara/dynamic-tp
github地址:https://github.com/dromara/dynamic-tp
使用线程池 ThreadPoolExecutor 过程中你是否有以下痛点呢?
代码中创建了一个 ThreadPoolExecutor,但是不知道那几个核心参数设置多少比较合适
凭经验设置参数值,上线后发现需要调整,改代码重新发布服务,非常麻烦
线程池相对开发人员来说是个黑盒,运行情况不能及时感知到,直到出现问题
那么使用dynammic-tp框架就可以解决以上问题。
代码零侵入:我们改变了线程池以往的使用姿势,所有配置均放在配置中心,服务启动时会从配置中心拉取配置生成线程池对象放到 Spring 容器中,使用时直接从 Spring 容器中获取,对业务代码零侵入
通知告警:提供多种报警维度(配置变更通知、活性报警、容量阈值报警、拒绝触发报警、任务执行或等待超时报警),已支持企业微信、钉钉、飞书、邮件报警,同时提供 SPI 接口可自定义扩展实现
运行监控:定时采集线程池指标数据,支持通过 MicroMeter、JsonLog 日志输出、Endpoint 三种方式,可通过 SPI 接口自定义扩展实现
任务增强:提供任务包装功能,实现 TaskWrapper 接口即可,如 MdcTaskWrapper、TtlTaskWrapper、SwTraceTaskWrapper,可以支持线程池上下文信息传递
多配置中心支持:基于主流配置中心实现线程池参数动态调整,实时生效,已支持 Nacos、Apollo、Zookeeper、Consul、Etcd、Polaris,同时也提供 SPI 接口可自定义扩展实现
中间件线程池管理:集成管理常用第三方组件的线程池,已集成 Tomcat、Jetty、Undertow、Dubbo、RocketMq、Hystrix、Grpc、Motan、Okhttp3、Brpc、Tars、SofaRpc、RabbitMq 等组件的线程池管理(调参、监控报警)
轻量简单:基于 SpringBoot 实现,引入 starter,接入只需简单 4 步就可完成,顺利 3 分钟搞定
多模式:参考 Tomcat 线程池提供了 IO 密集型场景使用的 EagerDtpExecutor 线程池
兼容性:JUC 普通线程池和 Spring 中的 ThreadPoolTaskExecutor 也可以被框架监控,@Bean 定义时加 @DynamicTp 注解即可
可靠性:框架提供的线程池实现 Spring 生命周期方法,可以在 Spring 容器关闭前尽可能多的处理队列中的任务
高可扩展:框架核心功能都提供 SPI 接口供用户自定义个性化实现(配置中心、配置文件解析、通知告警、监控数据采集、任务包装等等)
线上大规模应用:参考美团线程池实践open in new window,美团内部已经有该理论成熟的应用经验
框架功能大体可以分为以下几个模块
配置变更监听模块
线程池管理模块
监控模块
通知告警模块
三方组件线程池管理模块
以多 Module 的方式组织代码,提升代码的可维护性,方便扩展
引入相应配置中心的依赖,具体见下述 maven 依赖
配置中心配置线程池实例,配置文件见下述
启动类加 @EnableDynamicTp 注解
使用 @Resource 或 @Autowired 进行依赖注入,或通过 DtpRegistry.getDtpExecutor("name") 获取
通过以上 4 步就可以使用了,是不是感觉超级简单呀
建议直接配置在配置中心,但是如果想后期再添加到配置中心,可以先用 @Bean 编码式声明(方便 Spring 依赖注入)
@Configuration
public class DtpConfig { /*** 通过 {@link @DynamicTp} 注解定义普通 juc 线程池,会享受到该框架监控功能,注解名称优先级高于方法名** @return 线程池实例*/@DynamicTp("commonExecutor")@Beanpublic ThreadPoolExecutor commonExecutor() {return (ThreadPoolExecutor) Executors.newFixedThreadPool(1);}/*** 通过 {@link ThreadPoolCreator} 快速创建一些简单配置的动态线程池* tips: 建议直接在配置中心配置就行,不用 @Bean 声明** @return 线程池实例*/@Beanpublic DtpExecutor dtpExecutor1() {return ThreadPoolCreator.createDynamicFast("dtpExecutor1");}/*** 通过 {@link ThreadPoolBuilder} 设置详细参数创建动态线程池(推荐方式),* ioIntensive,参考 tomcat 线程池设计,实现了处理 io 密集型任务的线程池,具体参数可以看代码注释** tips: 建议直接在配置中心配置就行,不用@Bean声明* @return 线程池实例*/@Beanpublic DtpExecutor ioIntensiveExecutor() {return ThreadPoolBuilder.newBuilder().threadPoolName("ioIntensiveExecutor").corePoolSize(20).maximumPoolSize(50).queueCapacity(2048).ioIntensive(true).buildDynamic();}/*** tips: 建议直接在配置中心配置就行,不用 @Bean 声明* @return 线程池实例*/@Beanpublic ThreadPoolExecutor dtpExecutor2() {return ThreadPoolBuilder.newBuilder().threadPoolName("dtpExecutor2").corePoolSize(10).maximumPoolSize(15).keepAliveTime(50).timeUnit(TimeUnit.MILLISECONDS).workQueue(QueueTypeEnum.SYNCHRONOUS_QUEUE.getName(), null, false).waitForTasksToCompleteOnShutdown(true).awaitTerminationSeconds(5).buildDynamic();}
}
从 DtpRegistry 中根据线程池名称获取,或者通过依赖注入方式(推荐,更优雅)
依赖注入方式使用,优先推荐依赖注入方式,不能使用依赖注入的场景可以使用方式2
@Resource
private ThreadPoolExecutor dtpExecutor1;public void exec() {dtpExecutor1.execute(() -> System.out.println("test"));
}
从 DtpRegistry 注册器获取(接口场景可用)
public static void main(String[] args) {DtpExecutor dtpExecutor = DtpRegistry.getDtpExecutor("dtpExecutor1");dtpExecutor.execute(() -> System.out.println("test"));
}
更详细使用实例请参考 源码中example
工程
提供了一些任务包装器,可以实现特定的功能
MdcTaskWrapper 支持 MDC 上下文传递,名称:mdc
TtlTaskWrapper 支持 ThreadLocal 上下文传递,名称:ttl
SwTraceTaskWrapper 支持 skywalking TID 传递,名称:swTrace
NamedRunnable 支持给任务添加名称
可以继承 TaskWrapper 接口自定义任务包装器
MdcTaskWrapper、TtlTaskWrapper、NamedRunnable 在 core 包中,不需要引入其他依赖
SwTraceTaskWrapper 是 extension 模块提供的扩展,需要引入依赖
cn.dynamictp dynamic-tp-extension-skywalking 1.1.0
线程池配置文件加如下配置项
spring:dynamic:tp:executors: # 动态线程池配置,省略其他项,具体看上述配置文件- threadPoolName: dtpExecutor1taskWrapperNames: ["ttl", "mdc", "swTrace"] # 任务包装器名称,继承 TaskWrapper 接口