作者 王勇

为异步任务,添加线程池控制

  1 +package com.sunyo.wlpt.message.bus.service.config;
  2 +
  3 +import org.springframework.context.annotation.Bean;
  4 +import org.springframework.context.annotation.Configuration;
  5 +import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
  6 +
  7 +import java.util.concurrent.Executor;
  8 +import java.util.concurrent.ThreadPoolExecutor;
  9 +
  10 +/**
  11 + * @author 子诚
  12 + * Description:为异步任务,创建线程池
  13 + * 时间:2020/8/20 14:15
  14 + */
  15 +@Configuration
  16 +public class TaskConfiguration {
  17 + @Bean("taskExecutor")
  18 + public Executor taskExecutor() {
  19 + ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
  20 + // 核心线程数
  21 + executor.setCorePoolSize(20);
  22 + // 最大线程数
  23 + executor.setMaxPoolSize(100);
  24 + // 缓冲队列
  25 + executor.setQueueCapacity(200);
  26 + // 允许线程的空闲时间
  27 + executor.setKeepAliveSeconds(60);
  28 + // 线程池名的前缀
  29 + executor.setThreadNamePrefix("taskExecutor-");
  30 +
  31 + // 优雅的关闭线程池,
  32 + executor.setWaitForTasksToCompleteOnShutdown(true);
  33 + executor.setAwaitTerminationSeconds(60);
  34 +
  35 + // 线程池对拒绝任务的处理策略,CallerRunsPolicy策略,当线程池没有处理能力的时候,该策略会直接在execute方法的调用线程中运行被拒绝的任务;
  36 + // 如果执行程序已关闭,则会丢弃该任务
  37 + executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
  38 + return executor;
  39 + }
  40 +}
@@ -53,7 +53,7 @@ public class AsyncTaskService { @@ -53,7 +53,7 @@ public class AsyncTaskService {
53 * 53 *
54 * @param sentData {@link XmlData} 54 * @param sentData {@link XmlData}
55 */ 55 */
56 - @Async 56 + @Async("taskExecutor")
57 public void saveMessage(XmlData sentData) 57 public void saveMessage(XmlData sentData)
58 { 58 {
59 // 无论消息是否发送成功,将消息存储于数据库 59 // 无论消息是否发送成功,将消息存储于数据库
@@ -64,7 +64,7 @@ public class AsyncTaskService { @@ -64,7 +64,7 @@ public class AsyncTaskService {
64 * 当删除服务器的时候,级联删除与服务器有关的 64 * 当删除服务器的时候,级联删除与服务器有关的
65 * 虚拟主机,交换机,路由键,队列,绑定关系 65 * 虚拟主机,交换机,路由键,队列,绑定关系
66 */ 66 */
67 - @Async 67 + @Async("taskExecutor")
68 @Transactional(rollbackFor = Exception.class, propagation = Propagation.REQUIRED) 68 @Transactional(rollbackFor = Exception.class, propagation = Propagation.REQUIRED)
69 public void serverCascadeDelete(BusServer busServer) throws IOException, TimeoutException 69 public void serverCascadeDelete(BusServer busServer) throws IOException, TimeoutException
70 { 70 {
@@ -105,7 +105,7 @@ public class AsyncTaskService { @@ -105,7 +105,7 @@ public class AsyncTaskService {
105 * 当删除虚拟主机的时候,级联删除与虚拟主机有关的 105 * 当删除虚拟主机的时候,级联删除与虚拟主机有关的
106 * 交换机,路由键,队列,绑定关系 106 * 交换机,路由键,队列,绑定关系
107 */ 107 */
108 - @Async 108 + @Async("taskExecutor")
109 @Transactional(rollbackFor = Exception.class, propagation = Propagation.REQUIRED) 109 @Transactional(rollbackFor = Exception.class, propagation = Propagation.REQUIRED)
110 public void virtualHostCascadeDelete(VirtualHost virtualHost) throws IOException, TimeoutException 110 public void virtualHostCascadeDelete(VirtualHost virtualHost) throws IOException, TimeoutException
111 { 111 {
@@ -143,7 +143,7 @@ public class AsyncTaskService { @@ -143,7 +143,7 @@ public class AsyncTaskService {
143 * <p> 143 * <p>
144 * 同时,删除包含该路由键的绑定关系 144 * 同时,删除包含该路由键的绑定关系
145 */ 145 */
146 - @Async 146 + @Async("taskExecutor")
147 @Transactional(rollbackFor = Exception.class, propagation = Propagation.REQUIRED) 147 @Transactional(rollbackFor = Exception.class, propagation = Propagation.REQUIRED)
148 public void exchangeCascadeDelete(BusExchange busExchange) 148 public void exchangeCascadeDelete(BusExchange busExchange)
149 { 149 {
@@ -162,7 +162,7 @@ public class AsyncTaskService { @@ -162,7 +162,7 @@ public class AsyncTaskService {
162 /** 162 /**
163 * 当删除路由键的时候,删除包含该路由键的绑定关系 163 * 当删除路由键的时候,删除包含该路由键的绑定关系
164 */ 164 */
165 - @Async 165 + @Async("taskExecutor")
166 public void routingKeyCascadeDelete(RoutingKey routingKey) throws IOException, TimeoutException 166 public void routingKeyCascadeDelete(RoutingKey routingKey) throws IOException, TimeoutException
167 { 167 {
168 String routingKeyId = routingKey.getId(); 168 String routingKeyId = routingKey.getId();
@@ -179,7 +179,7 @@ public class AsyncTaskService { @@ -179,7 +179,7 @@ public class AsyncTaskService {
179 /** 179 /**
180 * 当删除队列的时候,删除包含队列的绑定关系 180 * 当删除队列的时候,删除包含队列的绑定关系
181 */ 181 */
182 - @Async 182 + @Async("taskExecutor")
183 public void queueCascadeDelete(BusQueue busQueue) 183 public void queueCascadeDelete(BusQueue busQueue)
184 { 184 {
185 // 删除相关绑定关系 185 // 删除相关绑定关系