首页 XXL-JOB定时任务生成策略解析
文章
取消

XXL-JOB定时任务生成策略解析

一. 数据模型

XXL-JOB定时任务的生成依靠两个数据实现,一个是保存在数据库中的jobinfo,一个是保存在内存中的时间轮数据。

1.1 jobinfo

xxl-job的定时任务信息模型是包含运行时的执行信息的,并不是一个静态的定时任务的信息登记, 而是包括了这个定时任务运行时信息的追踪。

jobinfo包含两部分信息:

  1. 定时任务的静态元数据信息,例如任务的名称,ID,触发方式,过期策略等
  2. 定制任务的运行时数据信息,例如此任务的上次执行时间,下一次应该的执行时间
jobinfo属性说明
triggerStatus运行状态,0停止,1运行
triggerLastTime上一次执行时间,long类型毫秒数
triggerNextTime下一次执行时间,long类型毫秒数

1.2 ringdata

map类型,保存一个时间轮。

  • key为0-59,代表秒数
  • value为List,代表这一秒钟要触发的定时任务列表

二. 线程模型

xxljob是一个生产者消费者模型,

  • 生产者scheduleThread负责生成定时任务,然后往时间轮上更新对应秒数的定时任务列表。
  • 消费者ringThread负责消费时间轮上面的定时任务

2.1 scheduleThread

设计的初衷,生成5秒内的定时任务,放进ringThread中

循环进行单次扫描的逻辑:

  1. 扫描成功,当前时间的5秒内有定时任务,则休眠1秒继续扫描下一个5秒的定时任务
  2. 扫描失败,当前时间的5秒内没有定时任务,则休眠5秒后继续扫描下一个5秒的定时任务

单次扫描的逻辑:

  1. 查询执行时间小于当前时间的5秒内的所有定时任务(有的博文说是查找当前时间5秒内的所有定时任务并不严谨,因为即使上一次执行时间不在这5秒内的定时任务也会被扫描到的)
  2. 遍历这些定时任务,并根据这些定时任务与当前时间的关系做出不同的操作
    1. 当前时间比预定执行时间超过了5秒(一个检查周期),属于没及时触发的过期定时任务,没有能及时触发,根据misfire策略执行
    2. 当前时间比预定时间大,但是在5秒内,即过了预定时间,但是还没有过这个5s的检查周期,立即触发
    3. 当前时间比预定时间小,即还没有到定时任务的触发时间,将当前任务放进时间轮钟,让ringThread进行触发。

2.2 ringThread

设计初衷,每一秒钟执行一次,将当前秒数在时间轮上的所有的定时任务拿出来执行。

执行逻辑:

  1. 保证执行时间与睡眠时间加起来够一秒
  2. 取出当前秒数与之前2秒内的定时任务执行
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
public class JobScheduleHelper {
    public void start() {

        // schedule thread
        scheduleThread = new Thread(new Runnable() {
            @Override
            public void run() {

                try {
                    TimeUnit.MILLISECONDS.sleep(5000 - System.currentTimeMillis() % 1000);
                } catch (InterruptedException e) {
                    if (!scheduleThreadToStop) {
                        logger.error(e.getMessage(), e);
                    }
                }
                logger.info(">>>>>>>>> init xxl-job admin scheduler success.");

                // pre-read count: treadpool-size * trigger-qps (each trigger cost 50ms, qps = 1000/50 = 20)
                int preReadCount =
                    (XxlJobAdminConfig.getAdminConfig().getTriggerPoolFastMax() + XxlJobAdminConfig.getAdminConfig()
                        .getTriggerPoolSlowMax()) * 20;

                while (!scheduleThreadToStop) {

                    // Scan Job
                    long start = System.currentTimeMillis();

                    Connection conn = null;
                    Boolean connAutoCommit = null;
                    PreparedStatement preparedStatement = null;

                    boolean preReadSuc = true;
                    try {

                        conn = XxlJobAdminConfig.getAdminConfig().getDataSource().getConnection();
                        connAutoCommit = conn.getAutoCommit();
                        conn.setAutoCommit(false);

                        preparedStatement = conn.prepareStatement(
                            "select * from xxl_job_lock where lock_name = 'schedule_lock' for update");
                        preparedStatement.execute();

                        // tx start

                        // 1、pre read
                        long nowTime = System.currentTimeMillis();
                        //这里虽然以当前时间作为时间基线,但是每个定时任务的jobinfo都是有状态的,会记录下一次定时任务应该执行的时间,查询逻辑是下一次执行时间比当前时间后5秒的时间都要小的定时任务。
                        List<XxlJobInfo> scheduleList = XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao()
                            .scheduleJobQuery(nowTime + PRE_READ_MS, preReadCount);
                        if (scheduleList != null && scheduleList.size() > 0) {
                            // 2、push time-ring
                            for (XxlJobInfo jobInfo : scheduleList) {

                                // time-ring jump
                                if (nowTime > jobInfo.getTriggerNextTime() + PRE_READ_MS) {//超过了定时任务的执行周期了
                                    // 2.1、trigger-expire > 5s:pass && make next-trigger-time
                                    logger.warn(">>>>>>>>>>> xxl-job, schedule misfire, jobId = " + jobInfo.getId());

                                    // 1、misfire match
                                    MisfireStrategyEnum misfireStrategyEnum = MisfireStrategyEnum.match(
                                        jobInfo.getMisfireStrategy(), MisfireStrategyEnum.DO_NOTHING);
                                    if (MisfireStrategyEnum.FIRE_ONCE_NOW == misfireStrategyEnum) {
                                        // FIRE_ONCE_NOW 》 trigger
                                        JobTriggerPoolHelper.trigger(jobInfo.getId(), TriggerTypeEnum.MISFIRE, -1, null,
                                            null, null);
                                        logger.debug(
                                            ">>>>>>>>>>> xxl-job, schedule push trigger : jobId = " + jobInfo.getId());
                                    }

                                    // 2、fresh next
                                    refreshNextValidTime(jobInfo,
                                        new Date());//WARN  这里假如是misfire后的定时任务,计算下一次的执行时间是以当前时间作为起始点计算,假如mifire的时间周期超过这个定时任务的两个执行周期,
                                    // 那么这个定时任务的执行就会漏触发至少一次。例如 这个定时任务1秒一次,下一次执行时间是1秒后,但是线程醒来后已经是4秒后了,misfire策略会处理4秒前的那次触发,
                                    // 然后再生成第5秒的定时任务,中间的那3次定时任务的触发就相当于丢失了


                                } else if (nowTime > jobInfo.getTriggerNextTime()) {//过了定时任务的执行时间,但是任在与一个周期误差内
                                    // 2.2、trigger-expire < 5s:direct-trigger && make next-trigger-time

                                    // 1、trigger
                                    JobTriggerPoolHelper.trigger(jobInfo.getId(), TriggerTypeEnum.CRON, -1, null, null,
                                        null);
                                    logger.debug(
                                        ">>>>>>>>>>> xxl-job, schedule push trigger : jobId = " + jobInfo.getId());

                                    // 2、fresh next
                                    refreshNextValidTime(jobInfo, new Date());

                                    // next-trigger-time in 5s, pre-read again
                                    if (jobInfo.getTriggerStatus() == 1
                                        && nowTime + PRE_READ_MS > jobInfo.getTriggerNextTime()) {

                                        // 1、make ring second
                                        int ringSecond = (int) ((jobInfo.getTriggerNextTime() / 1000) % 60);

                                        // 2、push time ring
                                        pushTimeRing(ringSecond, jobInfo.getId());

                                        // 3、fresh next
                                        refreshNextValidTime(jobInfo, new Date(jobInfo.getTriggerNextTime()));

                                    }

                                } else {//还没到任务的执行时间
                                    // 2.3、trigger-pre-read:time-ring trigger && make next-trigger-time

                                    // 1、make ring second
                                    int ringSecond = (int) ((jobInfo.getTriggerNextTime() / 1000) % 60);

                                    // 2、push time ring
                                    pushTimeRing(ringSecond, jobInfo.getId());

                                    // 3、fresh next
                                    refreshNextValidTime(jobInfo, new Date(jobInfo.getTriggerNextTime()));

                                }

                            }

                            // 3、update trigger info
                            for (XxlJobInfo jobInfo : scheduleList) {
                                XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().scheduleUpdate(jobInfo);
                            }

                        } else {
                            preReadSuc = false;
                        }

                        // tx stop

                    } catch (Exception e) {
                        if (!scheduleThreadToStop) {
                            logger.error(">>>>>>>>>>> xxl-job, JobScheduleHelper#scheduleThread error:{}", e);
                        }
                    } finally {

                        // commit
                        if (conn != null) {
                            try {
                                conn.commit();
                            } catch (SQLException e) {
                                if (!scheduleThreadToStop) {
                                    logger.error(e.getMessage(), e);
                                }
                            }
                            try {
                                conn.setAutoCommit(connAutoCommit);
                            } catch (SQLException e) {
                                if (!scheduleThreadToStop) {
                                    logger.error(e.getMessage(), e);
                                }
                            }
                            try {
                                conn.close();
                            } catch (SQLException e) {
                                if (!scheduleThreadToStop) {
                                    logger.error(e.getMessage(), e);
                                }
                            }
                        }

                        // close PreparedStatement
                        if (null != preparedStatement) {
                            try {
                                preparedStatement.close();
                            } catch (SQLException e) {
                                if (!scheduleThreadToStop) {
                                    logger.error(e.getMessage(), e);
                                }
                            }
                        }
                    }
                    long cost = System.currentTimeMillis() - start;

                    // Wait seconds, align second
                    // 为了保证每一秒扫描一次,如果单次扫描小于1秒则通过sleep凑够秒级休眠
                    // 如果本次扫描时间超过了一秒,则马上进行下一轮扫描
                    if (cost < 1000) {  // scan-overtime, not wait
                        try {
                            // pre-read period: success > scan each second; fail > skip this period;
                            // 1. 如果上次扫描成功,则下一个一秒再进行扫描
                            // 2. 如果上次扫描失败,则证明最近五秒钟没有定时任务需要执行,于是休眠一个完整的5秒周期
                            TimeUnit.MILLISECONDS.sleep(
                                (preReadSuc ? 1000 : PRE_READ_MS) - System.currentTimeMillis() % 1000);
                        } catch (InterruptedException e) {
                            if (!scheduleThreadToStop) {
                                logger.error(e.getMessage(), e);
                            }
                        }
                    }

                }

                logger.info(">>>>>>>>>>> xxl-job, JobScheduleHelper#scheduleThread stop");
            }
        });
        scheduleThread.setDaemon(true);
        scheduleThread.setName("xxl-job, admin JobScheduleHelper#scheduleThread");
        scheduleThread.start();

        // ring thread
        ringThread = new Thread(new Runnable() {
            @Override
            public void run() {

                while (!ringThreadToStop) {

                    // align second 一秒钟循环一次
                    try {
                        // System.currentTimeMillis() % 1000 求出当前时间在一秒钟已经走过了多少毫秒,1000 - 已经走过的毫秒,等于下一个一秒的扫描
                        // 弊端,依赖操作系统的调度,如果过了1秒,操作系统没有及时调度这个线程的话,很可能会出现调度间隔超过1秒钟的情况
                        TimeUnit.MILLISECONDS.sleep(1000 - System.currentTimeMillis() % 1000);
                    } catch (InterruptedException e) {
                        if (!ringThreadToStop) {
                            logger.error(e.getMessage(), e);
                        }
                    }

                    try {
                        // second data
                        List<Integer> ringItemData = new ArrayList<>();
                        //这条ring线程的本意是每秒钟轮询一次,然后取出当前这一秒钟的定时任务出来执行。但是在执行的时候有可能很耗时,或者别的情况,导致当前的时间往前走了秒级别的时间
                        //导致定时任务没有执行, 于是这里往前回溯两秒,但实际上这里回溯两秒钟也不一定有用的,万一执行时间超过了2秒,或者极端情况下发生了系统停顿,sleep醒来的时间就已经超过了
                        //两秒钟的时间,那还是会发生漏执行的情况。但会在下一轮的执行中能执行到,不会丢任务,但是极端情况下时间会不准,应该在执行的时候打印此任务的执行时间。
                        int nowSecond = Calendar.getInstance().get(Calendar.SECOND);   // 避免处理耗时太长,跨过刻度,向前校验一个刻度;
                        for (int i = 0; i < 2; i++) {
                            List<Integer> tmpData = ringData.remove(
                                (nowSecond + 60 - i) % 60);//这里为什么要加60再与60求余,这和nowSecond - i的值不是等价的吗
                            if (tmpData != null) {
                                ringItemData.addAll(tmpData);
                            }
                        }

                        // ring trigger
                        logger.debug(
                            ">>>>>>>>>>> xxl-job, time-ring beat : " + nowSecond + " = " + Arrays.asList(ringItemData));
                        if (ringItemData.size() > 0) {
                            // do trigger
                            for (int jobId : ringItemData) {
                                // do trigger
                                JobTriggerPoolHelper.trigger(jobId, TriggerTypeEnum.CRON, -1, null, null, null);
                            }
                            // clear
                            ringItemData.clear();
                        }
                    } catch (Exception e) {
                        if (!ringThreadToStop) {
                            logger.error(">>>>>>>>>>> xxl-job, JobScheduleHelper#ringThread error:{}", e);
                        }
                    }
                }
                logger.info(">>>>>>>>>>> xxl-job, JobScheduleHelper#ringThread stop");
            }
        });
        ringThread.setDaemon(true);
        ringThread.setName("xxl-job, admin JobScheduleHelper#ringThread");
        ringThread.start();
    }
}
本文由作者按照 CC BY 4.0 进行授权

数据库连接池配置

Netty HashWheelTimer