1. 多线程Configuration
启动类:
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.builder.SpringApplicationBuilder;
import org.springframework.boot.web.support.SpringBootServletInitializer;
import org.springframework.context.annotation.ComponentScan;
@SpringBootApplication
@EnableAutoConfiguration
@ComponentScan("com.asiainfo.*")
public class Application extends SpringBootServletInitializer {
public static void main(String[] args) {
SpringApplication.run(Application.class, args);
}
protected SpringApplicationBuilder configure(SpringApplicationBuilder application) {
return application.sources(Application.class);
}
}
@EnableAutoConfiguration:帮助SpringBoot应用将所有符合条件的@Configuration配置都加载到当前SpringBoot创建并使用的IoC容器。
多线程配置类:
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import java.util.Hashtable;
import java.util.List;
import java.util.concurrent.Executor;
import java.util.concurrent.FutureTask;
@Configuration
@EnableAsync
public class ExecutorConfig {
private static final Logger logger = LoggerFactory.getLogger(ExecutorConfig.class);
private static Hashtable<String, List<FutureTask>> rejectTaskMap;
@Value("${thread.CORE_POOL_SIZE}")
private int corePoolSize;
@Value("${thread.MAX_POOL_SIZE}")
private int maxPoolSize;
@Value("${thread.QUEUE_CAPACITY}")
private int queueCapacity;
@Bean
public Executor asyncServiceExecutor() {
logger.info("start asyncServiceExecutor");
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
//配置核心线程数
executor.setCorePoolSize(corePoolSize);
//配置最大线程数
executor.setMaxPoolSize(maxPoolSize);
//配置队列最大长度
executor.setQueueCapacity(queueCapacity);
// rejection-policy:当pool已经达到max size的时候,如何处理新任务
// CALLER_RUNS:不在新线程中执行任务,而是有调用者所在的线程来执行
//自定义拒绝策略
executor.setRejectedExecutionHandler(new MyRejectedPolicyHandler());
rejectTaskMap = new Hashtable<>();
//执行初始化
executor.initialize();
return executor;
}
public static Hashtable<String, List<FutureTask>> getRejectTaskMap() {
return rejectTaskMap;
}
}
@Configuration:
@Configuration用于定义配置类,可替换xml配置文件,被注解的类内部包含有一个或多个被@Bean注解的方法,这些方法将会被AnnotationConfigApplicationContext或AnnotationConfigWebApplicationContext类进行扫描,并用于构建bean定义,初始化Spring容器。
注意:@Configuration注解的配置类有如下要求:
@EnableAsync:
以异步执行,允许开启多线程。
executor.setRejectedExecutionHandler(new MyRejectedPolicyHandler());
设置拒绝策略,当任务源源不断的过来,而我们的系统又处理不过来的时候,我们要采取的策略是拒绝服务。RejectedExecutionHandler接口提供了拒绝任务处理的自定义方法的机会。在ThreadPoolExecutor中已经包含四种处理策略。
设置线程池:
#多线程配置
thread:
CORE_POOL_SIZE: 10
MAX_POOL_SIZE: 100
QUEUE_CAPACITY: 1000
@Value("${thread.CORE_POOL_SIZE}")
private int corePoolSize;
设置核心线程数量。
@Value("${thread.MAX_POOL_SIZE}")
private int maxPoolSize;
设置最大线程数量。
@Value("${thread.QUEUE_CAPACITY}")
private int queueCapacity;
设置缓冲队列大小。
2. 使用Runner启动项目
SpringBoot给我们提供了两个接口来帮助我们实现容器启动完成后立即执行。这两个接口分别为CommandLineRunner和ApplicationRunner。
定义一个类SimosApplicationRunner实现ApplicationRunner接口,然后Override这个ApplicationRunner接口的run方法即可。
Runner:
import com.asiainfo.processor_other.config.MyRejectedPolicy;
import com.asiainfo.processor_other.task.TestTask;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.core.annotation.Order;
import org.springframework.stereotype.Component;
@Component
@Order(1)
public class TestRunner implements ApplicationRunner {
@Autowired
TestRunner testRunner;
@Autowired
TestTask testTask;
@Override
public void run(ApplicationArguments args) throws Exception {
testRunner.test();
}
@MyRejectedPolicy("runTest")
private void test() {
for (int i = 0; i < 100; i++) {
testTask.runTest(i);
}
}
}
Task:
使用 @Async注解,每调用一次TestTask的runTest方法都会开启一个新的线程;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Component;
@Component
public class TestTask {
@Async("asyncServiceExecutor")
public void runTest(int i) {
System.out.printf("Test:" + i);
}
}
二、自定义拒绝策略
自定义拒绝策略思路:
若线程池配置不合理,或者任务添加的速度大于处理的速度,会执行线程池拒绝策略,四个系统默认的拒绝策略,或者阻塞主进程,或者抛出异常,或者丢弃任务,在某些情况都不适用的情况下需要自定义拒绝策略经行容灾。
改造的思路是自定义拒绝策略,将线程池拒绝的任务缓存到内存中,再在合适的时机重新放入线程池中处理,从而达到了线程池防阻塞、容灾的目的(方案使用的内存对象为线程安全对象,效率较低,只可作为特定情况下的容灾机制使用)。
改造点:
代码实现:
1、ExecutorConfig
参考上文代码,
2、MyRejectedPolicyHandler
自定义拒绝策略类
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.util.Hashtable;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.FutureTask;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
/**
* @Description: 线程池自定义拒绝策略
* @Date: 2018/12/26 11:25
**/
public class MyRejectedPolicyHandler implements RejectedExecutionHandler {
private static Logger logger = LoggerFactory.getLogger(MyRejectedPolicyHandler.class);
/**
* @Description: 将拒绝的线程放到全局变量中
* @Date: 2018/12/26 11:25
* @Param: [r, executor]
* @Return: void
**/
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
FutureTask task = (FutureTask)r;
// 获取调用的方法名称(反射获取私有属性)
try {
Object callable = getFiled(task,"callable");
Method userDeclaredMethod = (Method) getFiled(callable,"val$userDeclaredMethod");
String methodName = userDeclaredMethod.getName();
logger.info("Add task to Map, methodName: [ " + methodName + " ]");
//在内存中维护一个全局Map, 将策略拒绝的task放置到map中
Hashtable<String, List<FutureTask>> rejectTaskMap = ExecutorConfig.getRejectTaskMap();
if(!rejectTaskMap.containsKey(methodName)){
List<FutureTask> taskList = new CopyOnWriteArrayList<>();
rejectTaskMap.put(methodName, taskList);
}
List<FutureTask> taskList = rejectTaskMap.get(methodName);
taskList.add(task);
logger.info("MethodName : [ " + methodName + " ] taskList size : [" + taskList.size() + " ]" );
} catch (IllegalAccessException e) {
e.printStackTrace();
}
}
/**
* @Description: 反射,通过循环父类获取field值(含private)
* @Date: 2018/12/26 15:28
* @Param: [c, name]
* @Return: java.lang.Object
**/
private static Object getFiled(Object c, String name) throws IllegalAccessException {
while (c != null && !c.getClass().getName().toLowerCase().equals("java.lang.object")) {
try {
Field field = c.getClass().getDeclaredField(name);
field.setAccessible(true);
return field.get(c);
} catch (NoSuchFieldException e) {
c = c.getClass().getSuperclass();
}
}
return null;
}
}
3、MyRejectedPolicy
自定义拒绝策略注解类
import java.lang.annotation.*;
@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface MyRejectedPolicy {
String value() default "";
}
4、MyRejectedPolicyAspect
import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.annotation.Around;
import org.aspectj.lang.annotation.Aspect;
import org.aspectj.lang.reflect.MethodSignature;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.util.*;
import java.util.concurrent.Executor;
import java.util.concurrent.FutureTask;
/**
* @Description: 判断线程类任务是否有积留,处理
* @Date: 2018/12/26 16:41
**/
@Aspect
@Component
public class MyRejectedPolicyAspect {
private static Logger logger = LoggerFactory.getLogger(MyRejectedPolicyAspect.class);
@Resource
private Executor asyncServiceExecutor;
@Around("@annotation(MyRejectedPolicy)")
public Object doAroundMethod(ProceedingJoinPoint pjd) throws Throwable {
//取得 PermissionContext 注解属性(值)信息
MethodSignature methodSignature = (MethodSignature)pjd.getSignature();
MyRejectedPolicy myRejectPolicy = methodSignature.getMethod().getAnnotation(MyRejectedPolicy.class);
String methodName = myRejectPolicy.value();
// 判断内存维护的列表中是否有此方法产生的task
Hashtable<String, List<FutureTask>> rejectTaskMap = ExecutorConfig.getRejectTaskMap();
if(rejectTaskMap.containsKey(methodName)){
List<FutureTask> taskList = rejectTaskMap.get(methodName);
// 如果有此方法对应的缓存task,不再往线程池中添加新的task,执行缓存中未执行的task,
int taskListSize = taskList.size();
if(taskListSize > 0){
logger.info("[ " + methodName + " ] method blocked, list size: [" + taskListSize + "]");
Iterator<FutureTask> it = taskList.iterator();
while (it.hasNext()){
asyncServiceExecutor.execute(it.next());
it.remove();
}
// return 是为了打断后续执行, 不再往线程池中添加新的task
return null;
}
}
return pjd.proceed();
}
}
5、使用方式
参考Runner启动部分代码。
在分发线程的部分增加注解,注解值为Map中的key。
run方法中调用注解方法需要注入自身对象,否则切面无法正常捕获。
Task代码中的异步方法需要增加@Async注解,表明为一个异步方法。
原网址: 访问
创建于: 2023-10-20 10:02:55
目录: default
标签: 无
未标明原创文章均为采集,版权归作者所有,转载无需和我联系,请注明原出处,南摩阿彌陀佛,知识,不只知道,要得到
最新评论