xxl-job任务触发流程_沧鹫小hai的博客-CSDN博客_xxl任务

xxl-job老版本是依赖quartz的定时任务触发,在v2.1.0版本开始 移除quartz依赖:一方面是为了精简系统降低冗余依赖,另一方面是为了提供系统的可控度与稳定性。(本文 相应代码版本 2.2.0-SNAPSHOT)

以下是本文的目录大纲:

一.任务触发执行总体流程

  二.任务定时触发流程

  三.关于这么设计的感悟

请尊重作者劳动成果,转载请标明原文链接:

xxl-job任务定时触发流程 - 王小森# - 博客园 

一 任务触发执行总体流程

先来看下任务触发和执行的 完整的任务触发执行总体流程图 如下:

 上图所示左上角的 第一步:任务触发方式 主要有以下几种类型:1 根据设置的时间自动触发JobScheduleHelper,2 页面点击操作按钮执行触发,3 父子任务触发,4失败重试触发。

 本文重点讲解 第一步:任务触发 的第一种 1 根据设置的时间自动触发,即上图 红色框内标示的部分,具体见JobScheduleHelper这个类。

二 任务定时触发流程

 详细的JobScheduleHelperCron定时触发 这个阶段流程图如下:

具体见JobScheduleHelper这个类结合上面流程图来分析,在工程spring启动的时候 触发了JobScheduleHelper类的start()方法,完整代码如下

public void start(){         // schedule thread        scheduleThread = new Thread(new Runnable() {            @Override            public void run() {                 try {                    TimeUnit.MILLISECONDS.sleep(5000 - System.currentTimeMillis()%1000 );                } catch (InterruptedException e) {                    if (!scheduleThreadToStop) {                        logger.error(e.getMessage(), e);                    }                }                logger.info(">>>>>>>>> init xxl-job admin scheduler success.");                 // pre-read count: treadpool-size * trigger-qps (each trigger cost 50ms, qps = 1000/50 = 20)                int preReadCount = (XxlJobAdminConfig.getAdminConfig().getTriggerPoolFastMax() + XxlJobAdminConfig.getAdminConfig().getTriggerPoolSlowMax()) * 20;                 while (!scheduleThreadToStop) {                     // Scan Job                    long start = System.currentTimeMillis();                     Connection conn = null;                    Boolean connAutoCommit = null;                    PreparedStatement preparedStatement = null;                     boolean preReadSuc = true;                    try {                         conn = XxlJobAdminConfig.getAdminConfig().getDataSource().getConnection();                        connAutoCommit = conn.getAutoCommit();                        conn.setAutoCommit(false);                         preparedStatement = conn.prepareStatement(  "select * from xxl_job_lock where lock_name = 'schedule_lock' for update" );                        preparedStatement.execute();                         // tx start                         // 1、pre read                        long nowTime = System.currentTimeMillis();                        List<XxlJobInfo> scheduleList = XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().scheduleJobQuery(nowTime + PRE_READ_MS, preReadCount);                        if (scheduleList!=null && scheduleList.size()>0) {                            // 2、push time-ring                            for (XxlJobInfo jobInfo: scheduleList) {                                 // time-ring jump                                if (nowTime > jobInfo.getTriggerNextTime() + PRE_READ_MS) {                                    // 2.1、trigger-expire > 5s:pass && make next-trigger-time                                    logger.warn(">>>>>>>>>>> xxl-job, schedule misfire, jobId = " + jobInfo.getId());                                     // fresh next                                    refreshNextValidTime(jobInfo, new Date());                                 } else if (nowTime > jobInfo.getTriggerNextTime()) {                                    // 2.2、trigger-expire < 5s:direct-trigger && make next-trigger-time                                     // 1、trigger                                    JobTriggerPoolHelper.trigger(jobInfo.getId(), TriggerTypeEnum.CRON, -1, null, null);                                    logger.debug(">>>>>>>>>>> xxl-job, schedule push trigger : jobId = " + jobInfo.getId() );                                     // 2、fresh next                                    refreshNextValidTime(jobInfo, new Date());                                     // next-trigger-time in 5s, pre-read again                                    if (jobInfo.getTriggerStatus()==1 && nowTime + PRE_READ_MS > jobInfo.getTriggerNextTime()) {                                         // 1、make ring second                                        int ringSecond = (int)((jobInfo.getTriggerNextTime()/1000)%60);                                         // 2、push time ring                                        pushTimeRing(ringSecond, jobInfo.getId());                                         // 3、fresh next                                        refreshNextValidTime(jobInfo, new Date(jobInfo.getTriggerNextTime()));                                     }                                 } else {                                    // 2.3、trigger-pre-read:time-ring trigger && make next-trigger-time                                     // 1、make ring second                                    int ringSecond = (int)((jobInfo.getTriggerNextTime()/1000)%60);                                     // 2、push time ring                                    pushTimeRing(ringSecond, jobInfo.getId());                                     // 3、fresh next                                    refreshNextValidTime(jobInfo, new Date(jobInfo.getTriggerNextTime()));                                 }                             }                             // 3、update trigger info                            for (XxlJobInfo jobInfo: scheduleList) {                                XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().scheduleUpdate(jobInfo);                            }                         } else {                            preReadSuc = false;                        }                         // tx stop                      } catch (Exception e) {                        if (!scheduleThreadToStop) {                            logger.error(">>>>>>>>>>> xxl-job, JobScheduleHelper#scheduleThread error:{}", e);                        }                    } finally {                         // commit                        if (conn != null) {                            try {                                conn.commit();                            } catch (SQLException e) {                                if (!scheduleThreadToStop) {                                    logger.error(e.getMessage(), e);                                }                            }                            try {                                conn.setAutoCommit(connAutoCommit);                            } catch (SQLException e) {                                if (!scheduleThreadToStop) {                                    logger.error(e.getMessage(), e);                                }                            }                            try {                                conn.close();                            } catch (SQLException e) {                                if (!scheduleThreadToStop) {                                    logger.error(e.getMessage(), e);                                }                            }                        }                         // close PreparedStatement                        if (null != preparedStatement) {                            try {                                preparedStatement.close();                            } catch (SQLException e) {                                if (!scheduleThreadToStop) {                                    logger.error(e.getMessage(), e);                                }                            }                        }                    }                    long cost = System.currentTimeMillis()-start;                      // Wait seconds, align second                    if (cost < 1000) {  // scan-overtime, not wait                        try {                            // pre-read period: success > scan each second; fail > skip this period;                            TimeUnit.MILLISECONDS.sleep((preReadSuc?1000:PRE_READ_MS) - System.currentTimeMillis()%1000);                        } catch (InterruptedException e) {                            if (!scheduleThreadToStop) {                                logger.error(e.getMessage(), e);                            }                        }                    }                 }                 logger.info(">>>>>>>>>>> xxl-job, JobScheduleHelper#scheduleThread stop");            }        });        scheduleThread.setDaemon(true);        scheduleThread.setName("xxl-job, admin JobScheduleHelper#scheduleThread");        scheduleThread.start();          // ring thread        ringThread = new Thread(new Runnable() {            @Override            public void run() {                 // align second                try {                    TimeUnit.MILLISECONDS.sleep(1000 - System.currentTimeMillis()%1000 );                } catch (InterruptedException e) {                    if (!ringThreadToStop) {                        logger.error(e.getMessage(), e);                    }                }                 while (!ringThreadToStop) {                     try {                        // second data                        List<Integer> ringItemData = new ArrayList<>();                        int nowSecond = Calendar.getInstance().get(Calendar.SECOND);   // 避免处理耗时太长,跨过刻度,向前校验一个刻度;                        for (int i = 0; i < 2; i++) {                            List<Integer> tmpData = ringData.remove( (nowSecond+60-i)%60 );                            if (tmpData != null) {                                ringItemData.addAll(tmpData);                            }                        }                         // ring trigger                        logger.debug(">>>>>>>>>>> xxl-job, time-ring beat : " + nowSecond + " = " + Arrays.asList(ringItemData) );                        if (ringItemData.size() > 0) {                            // do trigger                            for (int jobId: ringItemData) {                                // do trigger                                JobTriggerPoolHelper.trigger(jobId, TriggerTypeEnum.CRON, -1, null, null);                            }                            // clear                            ringItemData.clear();                        }                    } catch (Exception e) {                        if (!ringThreadToStop) {                            logger.error(">>>>>>>>>>> xxl-job, JobScheduleHelper#ringThread error:{}", e);                        }                    }                     // next second, align second                    try {                        TimeUnit.MILLISECONDS.sleep(1000 - System.currentTimeMillis()%1000);                    } catch (InterruptedException e) {                        if (!ringThreadToStop) {                            logger.error(e.getMessage(), e);                        }                    }                }                logger.info(">>>>>>>>>>> xxl-job, JobScheduleHelper#ringThread stop");            }        });        ringThread.setDaemon(true);        ringThread.setName("xxl-job, admin JobScheduleHelper#ringThread");        ringThread.start();    }

任务定时触发,流程如下:

1 分布式锁

为了保证分布式一致性先上悲观锁:使用select  xx  for update来实现

conn = XxlJobAdminConfig.getAdminConfig().getDataSource().getConnection();connAutoCommit = conn.getAutoCommit();conn.setAutoCommit(false);preparedStatement = conn.prepareStatement(  "select * from xxl_job_lock where lock_name = 'schedule_lock' for update" ); preparedStatement.execute();

2 轮询db,找出trigger_next_time(下次触发时间)在距now 5秒内的任务

List<XxlJobInfo> scheduleList = XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().scheduleJobQuery(nowTime + PRE_READ_MS, preReadCount);详细sql如下:trigger_status代表触发状态处于启动的任务 trigger_next_time代表 任务下次 执行触发的时间<select id="scheduleJobQuery" parameterType="java.util.HashMap" resultMap="XxlJobInfo">   SELECT *   FROM xxl_job_info AS t   WHERE t.trigger_status = 1      and t.trigger_next_time <![CDATA[ <= ]]> #{maxNextTime}   ORDER BY id ASC   LIMIT #{pagesize}</select>

3 触发算法

拿到了距now 5秒内的任务列表数据:scheduleList,分三种情况处理:for循环遍历scheduleList集合

(1)对到达now时间后的任务:(超出now 5秒外):直接跳过不执行; 重置trigger_next_time;

(2)对到达now时间后的任务:(超出now 5秒内):线程执行触发逻辑; 若任务下一次触发时间是在5秒内, 则放到时间轮内(Map<Integer, List<Integer>> 秒数(1-60) => 任务id列表);

        再 重置trigger_next_time

(3)对未到达now时间的任务:直接放到时间轮内;重置trigger_next_time 。

分别对应下面 这个数轴 的 三个阶段

具体参见下面代码:

下面对应代码(1)对到达now时间后的任务(超出now 5秒外):直接跳过不执行; 重置trigger_next_timeif (nowTime > jobInfo.getTriggerNextTime() + 5000) {// 2.1、trigger-expire > 5s:pass && make next-trigger-timelogger.warn(">>>>>>>>>>> xxl-job, schedule misfire, jobId = " + jobInfo.getId());// fresh nextrefreshNextValidTime(jobInfo, new Date());}
下面对应代码(2)对到达now时间后的任务(超出now 5秒内):线程执行触发逻辑; 若任务下一次触发时间是在5秒内, 则放到时间轮内(Map<Integer, List<Integer>> 秒数(1-60) => 任务id列表);重置trigger_next_time else if (nowTime > jobInfo.getTriggerNextTime()) {        // 1、trigger       JobTriggerPoolHelper.trigger(jobInfo.getId(), TriggerTypeEnum.CRON, -1, null, null);        // 2、fresh next        refreshNextValidTime(jobInfo, new Date());        // next-trigger-time in 5s, pre-read again        if (jobInfo.getTriggerStatus()==1 && nowTime + PRE_READ_MS > jobInfo.getTriggerNextTime()) {        // 1、make ring second        int ringSecond = (int)((jobInfo.getTriggerNextTime()/1000)%60);        // 2、push time ring        pushTimeRing(ringSecond, jobInfo.getId());         // 3、fresh next        refreshNextValidTime(jobInfo, new Date(jobInfo.getTriggerNextTime()));        }}
下面对应代码(3)对未到达now时间的任务:直接放到时间轮内;重置trigger_next_time else {    // 1、make ring second    int ringSecond = (int)((jobInfo.getTriggerNextTime()/1000)%60);    // 2、push time ring    pushTimeRing(ringSecond, jobInfo.getId());    // 3、fresh next    refreshNextValidTime(jobInfo, new Date(jobInfo.getTriggerNextTime()));}

上面的refreshNextValidTime方法是 更新任务的 trigger_next_time 下次触发时间,xxl_job_info表是记录定时任务的db表,里面有个trigger_next_time(Long)字段,表示下一次触发的时间点任务时间被修改 

每一次任务触发后,可以根据cronb表达式计算下一次触发时间戳:Date nextValidTime = new CronExpression(jobInfo.getJobCron()).getNextValidTimeAfter(new Date())),更新trigger_next_time字段。

4 时间轮触发

接下来讲时间轮,时间轮数据结构: Map<Integer, List<Integer>>  key是秒数(1-60)  value是任务id列表,具体结构如下图 :

时间轮的执行代码如下:

public void run() {                 // align second                try {                    TimeUnit.MILLISECONDS.sleep(1000 - System.currentTimeMillis()%1000 );                } catch (InterruptedException e) {                    if (!ringThreadToStop) {                        logger.error(e.getMessage(), e);                    }                }                 while (!ringThreadToStop) {                     try {                        // second data                        List<Integer> ringItemData = new ArrayList<>();                        int nowSecond = Calendar.getInstance().get(Calendar.SECOND);   // 避免处理耗时太长,跨过刻度,向前校验一个刻度;                        for (int i = 0; i < 2; i++) {                            List<Integer> tmpData = ringData.remove( (nowSecond+60-i)%60 );                            if (tmpData != null) {                                ringItemData.addAll(tmpData);                            }                        }                         // ring trigger                        logger.debug(">>>>>>>>>>> xxl-job, time-ring beat : " + nowSecond + " = " + Arrays.asList(ringItemData) );                        if (ringItemData.size() > 0) {                            // do trigger                            for (int jobId: ringItemData) {                                // do trigger                                JobTriggerPoolHelper.trigger(jobId, TriggerTypeEnum.CRON, -1, null, null);                            }                            // clear                            ringItemData.clear();                        }                    } catch (Exception e) {                        if (!ringThreadToStop) {                            logger.error(">>>>>>>>>>> xxl-job, JobScheduleHelper#ringThread error:{}", e);                        }                    }                     // next second, align second                    try {                        TimeUnit.MILLISECONDS.sleep(1000 - System.currentTimeMillis()%1000);                    } catch (InterruptedException e) {                        if (!ringThreadToStop) {                            logger.error(e.getMessage(), e);                        }                    }                }                logger.info(">>>>>>>>>>> xxl-job, JobScheduleHelper#ringThread stop");            }

时间轮数据结构: Map<Integer, List<Integer>>  key是hash计算触发时间获得的秒数(1-60),value是任务id列表

入轮:扫描任务触发时 (1)本次任务处理完成,但下一次触发时间是在5秒内(2)本次任务未达到触发时间                     

出轮:获取当前时间秒数,从时间轮内移出当前秒数前2个秒数的任务id列表, 依次进行触发任务;(避免处理耗时太长,跨过刻度,多向前校验一秒)

增加时间轮的目的是:任务过多可能会延迟,为了保障触发时间尽可能和 任务设置的触发时间尽量一致,把即将要触发的任务提前放到时间轮里,每秒来触发时间轮相应节点的任务

三 关于这么设计的感悟:看似简单的一个任务触发为什么要搞这么复杂呢?

我的答案是:  因为 出于“性能” 和“时效性”这两点 综合来考虑,即“中庸之道”。

就拿每次 “从DB查出 近期 即将要到触发时间任务” 这个场景 来看:

1  如果希望“性能”更好,那肯定每次多查出些数据,但这样就不可避免的造成 因为任务过多,同一批查出来的位置靠后的某些任务 触发就可能会延迟,比如实际触发比设定触发的时间晚几秒。

2 如果希望“时效性”更好,那肯定每次少查出些数据,比如每次只查出来一条或者几条,实际触发时间和设定的触发时间 基本一样,但这样造成了频繁查询数据库,性能下降。

故 通过“时间轮”来达到既“性能”比较好并且每次查出相对尽量多 的数据(目前是取5s内触发的任务),又通过时间轮来保障“时效性”:实际触发时间和设定的触发时间 尽量一样。这就是设计这么复杂的原因。


原网址: 访问
创建于: 2022-05-19 19:22:26
目录: default
标签: 无

请先后发表评论
  • 最新评论
  • 总共0条评论