XXL-JOB源码一日游

由于采用分布式调度 XXL-job,故到此地源码一日游。

整体介绍

  • 基于开源 Quartz 调度内核的调度工具,自带任务配置界面、任务监控、分布式执行器等功能。
  • 主要分为两部分:
    • xxl-job-admin: 是调度中心,执行调度任务。
    • xxl-job-core:公共依赖,定义了执行器的接口,实现了任务的执行和RPC。
    • xxl-job-executor-sample:是执行器,主要执行定时任务。

xxl-job-admin

主要功能

  • 调度中心
    • xxl_job_qrtz_trigger_group:定时任务配置详情
    • xxl_job_qrtz_trigger_info:job所属的分组,一般一个应用配置一个group,然后每个goup有多个address(集群模式)
    • xxl_job_qrtz_triggers:定时任务真正执行
    • 负责任务的管理(新增、删除、更新)
    • 触发(手动触发、自动触发、暂停、恢复)
  • 日志监控
    • 使用 Rolling 日志
  • 回调服务:
    • xxl_job_qrtz_trigger_log:定时任务执行日志
    • 任务执行结束之后,执行器通过接口回调,调度中心根据结果记录到日志表。
  • 注册中心:
    • xxl_job_qrtz_trigger_registry:注册执行器的配置信息。
    • 当有新的相同的集群的地址注册后,将会有线程更新到 xxl_job_qrtz_trigger_group 的 address_list 中。
    • 提供一个接口,在应用启动的时候来注册一下,告知调度中心自己的IP和端口。
    • 同时还可以通过手动调度器的地址和端口。

代码实现

xxl-job-admin 配置

<bean id="quartzScheduler" lazy-init="false" class="org.springframework.scheduling.quartz.SchedulerFactoryBean">
    <property name="dataSource" ref="dataSource" />
    <property name="autoStartup" value="true" />            <!--自动启动 -->
    <property name="startupDelay" value="20" />                <!--延时启动,应用启动成功后在启动 -->
    <property name="overwriteExistingJobs" value="true" />    <!--覆盖DB中JOB:true、以数据库中已经存在的为准:false -->
    <property name="applicationContextSchedulerContextKey"  value="applicationContextKey" />
    <property name="configLocation" value="classpath:quartz.properties"/>
</bean>
  • 内核依赖 Quartz,任务信息注册到 MySQL中,在指定的时间点(cron表达式),ScheduleFactoryBean 会根据任务信息指定的 job class来拉起任务,xxl-job-admin 在把任务分配到 executor 中运行。
  • xxl-job-admin 和 job-executor 之间,使用了自定义的 RPC 协议,该RPC使用hession做序列化,在http层传输。
  • 分片广播:xxl-job-admin 将分片分别发送到 job-executor,executor在线程上下文中获取分片索引。

项目启动流程分析

  1. 首先从 web.xml 启动,进行 Spring 容器配置扫描

    <context-param>
        <param-name>contextConfigLocation</param-name>
        <param-value>classpath*:spring/applicationcontext-*.xml</param-value>
    </context-param>

    XML配置文件中其他配置很一般关键是 applicationcontext-xxl-job-admin.xml 文件中 xxlJobDynamicScheduler

  2. xxlJobDynamicScheduler 注入 schedulerAccessToken

    <bean id="xxlJobDynamicScheduler" class="com.xxl.job.admin.core.schedule.XxlJobDynamicScheduler" init-method="init" destroy-method="destroy" >
        <property name="scheduler" ref="quartzScheduler"/>
        <property name="accessToken" value="${xxl.job.accessToken}" />
    </bean>
  3. 进入实体类

    public final class XxlJobDynamicScheduler implements ApplicationContextAware {
        // dao
        public static XxlJobLogDao xxlJobLogDao;
        public static XxlJobInfoDao xxlJobInfoDao;
        public static XxlJobRegistryDao xxlJobRegistryDao;
        public static XxlJobGroupDao xxlJobGroupDao;
        public static AdminBiz adminBiz;
    
        // ---------------------- applicationContext ----------------------
        @Override
        public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
            XxlJobDynamicScheduler.xxlJobLogDao = applicationContext.getBean(XxlJobLogDao.class);
            XxlJobDynamicScheduler.xxlJobInfoDao = applicationContext.getBean(XxlJobInfoDao.class);
            XxlJobDynamicScheduler.xxlJobRegistryDao = applicationContext.getBean(XxlJobRegistryDao.class);
            XxlJobDynamicScheduler.xxlJobGroupDao = applicationContext.getBean(XxlJobGroupDao.class);
            XxlJobDynamicScheduler.adminBiz = applicationContext.getBean(AdminBiz.class);
        }
    }

    此处只写关键位置,因为实现 ApplicationContextAware 接口,可以通过这个上下文环境对象得到 Spring 容器中的Bean,此时内部的 static 的内部 Dao 制定了获取的Bean。

  4. 初始化调用 init() 方法和 销毁调用 destory() 方法

    public void init() throws Exception {
        // admin registry monitor run
        JobRegistryMonitorHelper.getInstance().start();
    
        // admin monitor run
        JobFailMonitorHelper.getInstance().start();
    
        // admin-server(spring-mvc)
        NetComServerFactory.putService(AdminBiz.class, XxlJobDynamicScheduler.adminBiz);
        NetComServerFactory.setAccessToken(accessToken);
    
        // init i18n
        initI18n();
    
        // valid
        Assert.notNull(scheduler, "quartz scheduler is null");
        logger.info(">>>>>>>>> init xxl-job admin success.");
    }
    
    public void destroy(){
        // admin registry stop
        JobRegistryMonitorHelper.getInstance().toStop();
    
        // admin monitor stop
        JobFailMonitorHelper.getInstance().toStop();
    }

    根据xml配置,初始化时调用 init() 方法。

  5. 注册监控运行
    // admin registry monitor run
    JobRegistryMonitorHelper.getInstance().start();
    
    // admin monitor run
    JobFailMonitorHelper.getInstance().start();

    分析 JobRegistryMonitorHelper 代码处理

    public class JobRegistryMonitorHelper {
        private static Logger logger = LoggerFactory.getLogger(JobRegistryMonitorHelper.class);
    
        private static JobRegistryMonitorHelper instance = new JobRegistryMonitorHelper();
        public static JobRegistryMonitorHelper getInstance(){
            return instance;
        }
    
        private Thread registryThread;
        private volatile boolean toStop = false;
        public void start(){
            registryThread = new Thread(new Runnable() {
                @Override
                public void run() {
                    while (!toStop) {
                        try {
                            // auto registry group 发现启用类型为0的应用
                            List<XxlJobGroup> groupList = XxlJobDynamicScheduler.xxlJobGroupDao.findByAddressType(0);
                            if (CollectionUtils.isNotEmpty(groupList)) {
    
                                // remove dead address (admin/executor) 去除掉九十秒没有心跳的节点
                                XxlJobDynamicScheduler.xxlJobRegistryDao.removeDead(RegistryConfig.DEAD_TIMEOUT);
    
                                // fresh online address (admin/executor)
                                HashMap<String, List<String>> appAddressMap = new HashMap<String, List<String>>();
                                // 检查九十秒有心跳的节点
                                List<XxlJobRegistry> list = XxlJobDynamicScheduler.xxlJobRegistryDao.findAll(RegistryConfig.DEAD_TIMEOUT);
                                if (list != null) {
                                    for (XxlJobRegistry item: list) {
                                        if (RegistryConfig.RegistType.EXECUTOR.name().equals(item.getRegistryGroup())) {
                                            String appName = item.getRegistryKey();
                                            List<String> registryList = appAddressMap.get(appName);
                                            if (registryList == null) {
                                                registryList = new ArrayList<String>();
                                            }
    
                                            if (!registryList.contains(item.getRegistryValue())) {
                                                registryList.add(item.getRegistryValue());
                                            }
                                            // 把相同的group,地址放置在相同列表
                                            appAddressMap.put(appName, registryList);
                                        }
                                    }
                                }
    
                                // fresh group address,然后把相同group的应用机器地址,跟新到jobgroupdao中
                                for (XxlJobGroup group: groupList) {
                                    List<String> registryList = appAddressMap.get(group.getAppName());
                                    String addressListStr = null;
                                    if (CollectionUtils.isNotEmpty(registryList)) {
                                        Collections.sort(registryList);
                                        addressListStr = StringUtils.join(registryList, ",");
                                    }
                                    group.setAddressList(addressListStr);
                                    XxlJobDynamicScheduler.xxlJobGroupDao.update(group);
                                }
                            }
                        } catch (Exception e) {
                            logger.error("job registry instance error:{}", e);
                        }
                        try {
                            TimeUnit.SECONDS.sleep(RegistryConfig.BEAT_TIMEOUT);
                        } catch (InterruptedException e) {
                            logger.error("job registry instance error:{}", e);
                        }
                    }
                }
            });
            registryThread.setDaemon(true);
            registryThread.start();
        }
    
        public void toStop(){
            toStop = true;
            // interrupt and wait
            registryThread.interrupt();
            try {
                registryThread.join();
            } catch (InterruptedException e) {
                logger.error(e.getMessage(), e);
            }
        }
    
    }

    进行了90秒刷新注册机器地址,更新。

  6. 任务失败监控,具体实现
    public class JobFailMonitorHelper {
        private static Logger logger = LoggerFactory.getLogger(JobFailMonitorHelper.class);
    
        private static JobFailMonitorHelper instance = new JobFailMonitorHelper();
        public static JobFailMonitorHelper getInstance(){
            return instance;
        }
    
        // ---------------------- monitor ----------------------
    
        private LinkedBlockingQueue<Integer> queue = new LinkedBlockingQueue<Integer>(0xfff8);
    
        private Thread monitorThread;
        private volatile boolean toStop = false;
        public void start(){
            monitorThread = new Thread(new Runnable() {
    
                @Override
                public void run() {
                    // monitor
                    while (!toStop) {
                        try {
                            List<Integer> jobLogIdList = new ArrayList<Integer>();
                            int drainToNum = JobFailMonitorHelper.instance.queue.drainTo(jobLogIdList);
    
                            if (CollectionUtils.isNotEmpty(jobLogIdList)) {
                                for (Integer jobLogId : jobLogIdList) {
                                    if (jobLogId==null || jobLogId==0) {
                                        continue;
                                    }
                                    XxlJobLog log = XxlJobDynamicScheduler.xxlJobLogDao.load(jobLogId);
                                    if (log == null) {
                                        continue;
                                    }
                                    if (IJobHandler.SUCCESS.getCode() == log.getTriggerCode() && log.getHandleCode() == 0) {
                                        JobFailMonitorHelper.monitor(jobLogId);
                                        logger.info(">>>>>>>>>>> job monitor, job running, JobLogId:{}", jobLogId);
                                    } else if (IJobHandler.SUCCESS.getCode() == log.getHandleCode()) {
                                        // job success, pass
                                        logger.info(">>>>>>>>>>> job monitor, job success, JobLogId:{}", jobLogId);
                                    } else if (IJobHandler.FAIL.getCode() == log.getTriggerCode()
                                               || IJobHandler.FAIL.getCode() == log.getHandleCode()
                                               || IJobHandler.FAIL_RETRY.getCode() == log.getHandleCode() ) {
                                        // job fail,
                                        failAlarm(log);
                                        logger.info(">>>>>>>>>>> job monitor, job fail, JobLogId:{}", jobLogId);
                                    } else {
                                        JobFailMonitorHelper.monitor(jobLogId);
                                        logger.info(">>>>>>>>>>> job monitor, job status unknown, JobLogId:{}", jobLogId);
                                    }
                                }
                            }
    
                            TimeUnit.SECONDS.sleep(10);
                        } catch (Exception e) {
                            logger.error("job monitor error:{}", e);
                        }
                    }
    
                    // monitor all clear
                    List<Integer> jobLogIdList = new ArrayList<Integer>();
                    int drainToNum = getInstance().queue.drainTo(jobLogIdList);
                    if (jobLogIdList!=null && jobLogIdList.size()>0) {
                        for (Integer jobLogId: jobLogIdList) {
                            XxlJobLog log = XxlJobDynamicScheduler.xxlJobLogDao.load(jobLogId);
                            if (ReturnT.FAIL_CODE == log.getTriggerCode()|| ReturnT.FAIL_CODE==log.getHandleCode()) {
                                // job fail,
                                failAlarm(log);
                                logger.info(">>>>>>>>>>> job monitor last, job fail, JobLogId:{}", jobLogId);
                            }
                        }
                    }
    
                }
            });
            monitorThread.setDaemon(true);
            monitorThread.start();
        }
    
        public void toStop(){
            toStop = true;
            // interrupt and wait
            monitorThread.interrupt();
            try {
                monitorThread.join();
            } catch (InterruptedException e) {
                logger.error(e.getMessage(), e);
            }
        }
    
        // producer
        public static void monitor(int jobLogId){
            getInstance().queue.offer(jobLogId);
        }
   // ---------------------- alarm ----------------------

   // email alarm template
   private static final String mailBodyTemplate = "<h5>" + I18nUtil.getString("jobconf_monitor_detail") + ":</span>" +
       "<table border=\"1\" cellpadding=\"3\" style=\"border-collapse:collapse; width:80%;\" >\n" +
       "   <thead style=\"font-weight: bold;color: #ffffff;background-color: #ff8c00;\" >" +
       "      <tr>\n" +
       "         <td>"+ I18nUtil.getString("jobinfo_field_jobgroup") +"</td>\n" +
       "         <td>"+ I18nUtil.getString("jobinfo_field_id") +"</td>\n" +
       "         <td>"+ I18nUtil.getString("jobinfo_field_jobdesc") +"</td>\n" +
       "         <td>"+ I18nUtil.getString("jobconf_monitor_alarm_title") +"</td>\n" +
       "      </tr>\n" +
       "   <thead/>\n" +
       "   <tbody>\n" +
       "      <tr>\n" +
       "         <td>{0}</td>\n" +
       "         <td>{1}</td>\n" +
       "         <td>{2}</td>\n" +
       "         <td>"+ I18nUtil.getString("jobconf_monitor_alarm_type") +"</td>\n" +
       "      </tr>\n" +
       "   <tbody>\n" +
       "</table>";

   /**
    * fail alarm
       *
    * @param jobLog
       */
   private void failAlarm(XxlJobLog jobLog){

       // send monitor email
       XxlJobInfo info = XxlJobDynamicScheduler.xxlJobInfoDao.loadById(jobLog.getJobId());
       if (info!=null && info.getAlarmEmail()!=null && info.getAlarmEmail().trim().length()>0) {

           Set<String> emailSet = new HashSet<String>(Arrays.asList(info.getAlarmEmail().split(",")));
           for (String email: emailSet) {
               XxlJobGroup group = XxlJobDynamicScheduler.xxlJobGroupDao.load(Integer.valueOf(info.getJobGroup()));

               String title = I18nUtil.getString("jobconf_monitor");
               String content = MessageFormat.format(mailBodyTemplate, group!=null?group.getTitle():"null", info.getId(), info.getJobDesc());

               MailUtil.sendMail(email, title, content);
           }
       }

       // TODO, custom alarm strategy, such as sms

   }

}


   ​

7. ##### LinkedBlockingQueue 关键实现

   LinkedBlockingQueue 是一个基于链表的阻塞队列,其内部维持一个基于链表的数据队列,实际上我们对 LinkedBlockingQueue 的 API 操作都是间接操作该数据队列,这里我们先看看 LinkedBlockingQueue 的内部成员变量

   ```java
   public class LinkedBlockingQueue<E> extends AbstractQueue<E>
           implements BlockingQueue<E>, java.io.Serializable {

       /**
        * 节点类,用于存储数据
        */
       static class Node<E> {
           E item;

           /**
            * One of:
            * - the real successor Node
            * - this Node, meaning the successor is head.next
            * - null, meaning there is no successor (this is the last node)
            */
           Node<E> next;

           Node(E x) { item = x; }
       }

       /** 阻塞队列的大小,默认为Integer.MAX_VALUE */
       private final int capacity;

       /** 当前阻塞队列中的元素个数 */
       private final AtomicInteger count = new AtomicInteger();

       /**
        * 阻塞队列的头结点
        */
       transient Node<E> head;

       /**
        * 阻塞队列的尾节点
        */
       private transient Node<E> last;

       /** 获取并移除元素时使用的锁,如take, poll, etc */
       private final ReentrantLock takeLock = new ReentrantLock();

       /** notEmpty条件对象,当队列没有数据时用于挂起执行删除的线程 */
       private final Condition notEmpty = takeLock.newCondition();

       /** 添加元素时使用的锁如 put, offer, etc */
       private final ReentrantLock putLock = new ReentrantLock();

       /** notFull条件对象,当队列数据已满时用于挂起执行添加的线程 */
       private final Condition notFull = putLock.newCondition();

   }

从上述可看成,每个添加到 LinkedBlockingQueue 队列中的数据都将被封装成 Node 节点,添加的链表队列中,其中 head 和 last 分别指向队列的头结点和尾结点。与 ArrayBlockingQueue 不同的是,LinkedBlockingQueue 内部分别使用了 takeLock 和 putLock 对并发进行控制,也就是说,添加和删除操作并不是互斥操作,可以同时进行,这样也就可以大大提高吞吐量。这里再次强调如果没有给 LinkedBlockingQueue 指定容量大小,其默认值将是 Integer.MAX_VALUE,如果存在添加速度大于删除速度时候,有可能会内存溢出,这点在使用前希望慎重考虑。至于 LinkedBlockingQueue 的实现原理图与 ArrayBlockingQueue 是类似的,除了对添加和移除方法使用单独的锁控制外,两者都使用了不同的 Condition 条件对象作为等待队列,用于挂起 take 线程和 put 线程。

ok~, 下面我们看看其其内部添加过程和删除过程是如何实现的。

添加方法的实现原理

对于添加方法,主要指的是 add,offer 以及 put,这里先看看 add 方法和 offer 方法的实现

   public boolean add(E e) {
       if (offer(e))
           return true;
       else
           throw new IllegalStateException("Queue full");
   }

从源码可以看出,add 方法间接调用的是 offer 方法,如果 add 方法添加失败将抛出 IllegalStateException 异常,添加成功则返回 true,那么下面我们直接看看 offer 的相关方法实现

   public boolean offer(E e) {
       //添加元素为null直接抛出异常
       if (e == null) throw new NullPointerException();
       //获取队列的个数
       final AtomicInteger count = this.count;
       //判断队列是否已满
       if (count.get() == capacity)
           return false;
       int c = -1;
       //构建节点
       Node<E> node = new Node<E>(e);
       final ReentrantLock putLock = this.putLock;
       putLock.lock();
       try {
           //再次判断队列是否已满,考虑并发情况
           if (count.get() < capacity) {
               enqueue(node);//添加元素
               c = count.getAndIncrement();//拿到当前未添加新元素时的队列长度
               //如果容量还没满
               if (c + 1 < capacity)
                   notFull.signal();//唤醒下一个添加线程,执行添加操作
           }
       } finally {
           putLock.unlock();
       }
       // 由于存在添加锁和消费锁,而消费锁和添加锁都会持续唤醒等到线程,因此count肯定会变化。
       //这里的if条件表示如果队列中还有1条数据
       if (c == 0) 
           signalNotEmpty();//如果还存在数据那么就唤醒消费锁
       return c >= 0; // 添加成功返回true,否则返回false
   }

   //入队操作
   private void enqueue(Node<E> node) {
       //队列尾节点指向新的node节点
       last = last.next = node;
   }

   //signalNotEmpty方法
   private void signalNotEmpty() {
       final ReentrantLock takeLock = this.takeLock;
       takeLock.lock();
       //唤醒获取并删除元素的线程
       notEmpty.signal();
   } finally {
       takeLock.unlock();
   }
   }

这里的 Offer() 方法做了两件事,第一件事是判断队列是否满,满了就直接释放锁,没满就将节点封装成 Node 入队,然后再次判断队列添加完成后是否已满,不满就继续唤醒等到在条件对象 notFull 上的添加线程。第二件事是,判断是否需要唤醒等到在 notEmpty 条件对象上的消费线程。这里我们可能会有点疑惑,为什么添加完成后是继续唤醒在条件对象 notFull 上的添加线程而不是像 ArrayBlockingQueue 那样直接唤醒 notEmpty 条件对象上的消费线程?而又为什么要当if (c == 0)时才去唤醒消费线程呢?

  • 唤醒添加线程的原因,在添加新元素完成后,会判断队列是否已满,不满就继续唤醒在条件对象 notFull 上的添加线程,这点与前面分析的 ArrayBlockingQueue 很不相同,在 ArrayBlockingQueue 内部完成添加操作后,会直接唤醒消费线程对元素进行获取,这是因为 ArrayBlockingQueue 只用了一个 ReenterLock 同时对添加线程和消费线程进行控制,这样如果在添加完成后再次唤醒添加线程的话,消费线程可能永远无法执行,而对于 LinkedBlockingQueue 来说就不一样了,其内部对添加线程和消费线程分别使用了各自的 ReenterLock 锁对并发进行控制,也就是说添加线程和消费线程是不会互斥的,所以添加锁只要管好自己的添加线程即可,添加线程自己直接唤醒自己的其他添加线程,如果没有等待的添加线程,直接结束了。如果有就直到队列元素已满才结束挂起,当然 offer 方法并不会挂起,而是直接结束,只有 put 方法才会当队列满时才执行挂起操作。注意消费线程的执行过程也是如此。这也是为什么 LinkedBlockingQueue 的吞吐量要相对大些的原因。

  • 为什么要判断if (c == 0)时才去唤醒消费线程呢,这是因为消费线程一旦被唤醒是一直在消费的(前提是有数据),所以 c 值是一直在变化的,c 值是添加完元素前队列的大小,此时 c 只可能是 0 或c>0,如果是c=0,那么说明之前消费线程已停止,条件对象上可能存在等待的消费线程,添加完数据后应该是c+1,那么有数据就直接唤醒等待消费线程,如果没有就结束啦,等待下一次的消费操作。如果c>0那么消费线程就不会被唤醒,只能等待下一个消费操作(poll、take、remove)的调用,那为什么不是条件c>0才去唤醒呢?我们要明白的是消费线程一旦被唤醒会和添加线程一样,一直不断唤醒其他消费线程,如果添加前c>0,那么很可能上一次调用的消费线程后,数据并没有被消费完,条件队列上也就不存在等待的消费线程了,所以c>0唤醒消费线程得意义不是很大,当然如果添加线程一直添加元素,那么一直c>0,消费线程执行的换就要等待下一次调用消费操作了(poll、take、remove)。

    移除方法的实现原理

    关于移除的方法主要是指 remove 和 poll 以及 take 方法,下面一一分析

    public boolean remove(Object o) {
      if (o == null) return false;
      fullyLock();//同时对putLock和takeLock加锁
      try {
          //循环查找要删除的元素
          for (Node<E> trail = head, p = trail.next;
               p != null;
               trail = p, p = p.next) {
              if (o.equals(p.item)) {//找到要删除的节点
                  unlink(p, trail);//直接删除
                  return true;
              }
          }
          return false;
      } finally {
          fullyUnlock();//解锁
      }
    }
    
    //两个同时加锁
    void fullyLock() {
      putLock.lock();
      takeLock.lock();
    }
    
    void fullyUnlock() {
      takeLock.unlock();
      putLock.unlock();
    }

    remove 方法删除指定的对象,这里我们可能会诧异,为什么同时对 putLock 和 takeLock 加锁?这是因为 remove 方法删除的数据的位置不确定,为了避免造成并非安全问题,所以需要对 2 个锁同时加锁。

    public E poll() {
      //获取当前队列的大小
      final AtomicInteger count = this.count;
      if (count.get() == 0)//如果没有元素直接返回null
          return null;
      E x = null;
      int c = -1;
      final ReentrantLock takeLock = this.takeLock;
      takeLock.lock();
      try {
          //判断队列是否有数据
          if (count.get() > 0) {
              //如果有,直接删除并获取该元素值
              x = dequeue();
              //当前队列大小减一
              c = count.getAndDecrement();
              //如果队列未空,继续唤醒等待在条件对象notEmpty上的消费线程
              if (c > 1)
                  notEmpty.signal();
          }
      } finally {
          takeLock.unlock();
      }
      //判断c是否等于capacity,这是因为如果满说明NotFull条件对象上
      //可能存在等待的添加线程
      if (c == capacity)
          signalNotFull();
      return x;
    }
    
    private E dequeue() {
      Node<E> h = head;//获取头结点
      Node<E> first = h.next; 获取头结的下一个节点(要删除的节点)
          h.next = h; // help GC//自己next指向自己,即被删除
      head = first;//更新头结点
      E x = first.item;//获取删除节点的值
      first.item = null;//清空数据,因为first变成头结点是不能带数据的,这样也就删除队列的带数据的第一个节点
      return x;
    }

    poll 方法也比较简单,如果队列没有数据就返回 null,如果队列有数据,那么就取出来,如果队列还有数据那么唤醒等待在条件对象 notEmpty 上的消费线程。然后判断 if (c == capacity) 为 true 就唤醒添加线程,这点与前面分析 if(c==0) 是一样的道理。因为只有可能队列满了,notFull 条件对象上才可能存在等待的添加线程。

    public E take() throws InterruptedException {
      E x;
      int c = -1;
      //获取当前队列大小
      final AtomicInteger count = this.count;
      final ReentrantLock takeLock = this.takeLock;
      takeLock.lockInterruptibly();//可中断
      try {
          //如果队列没有数据,挂机当前线程到条件对象的等待队列中
          while (count.get() == 0) {
              notEmpty.await();
          }
          //如果存在数据直接删除并返回该数据
          x = dequeue();
          c = count.getAndDecrement();//队列大小减1
          if (c > 1)
              notEmpty.signal();//还有数据就唤醒后续的消费线程
      } finally {
          takeLock.unlock();
      }
      //满足条件,唤醒条件对象上等待队列中的添加线程
      if (c == capacity)
          signalNotFull();
      return x;
    }

    take 方法是一个可阻塞可中断的移除方法,主要做了两件事,一是,如果队列没有数据就挂起当前线程到 notEmpty 条件对象的等待队列中一直等待,如果有数据就删除节点并返回数据项,同时唤醒后续消费线程,二是尝试唤醒条件对象 notFull 上等待队列中的添加线程。 到此关于 remove、poll、take 的实现也分析完了,其中只有 take 方法具备阻塞功能。remove 方法则是成功返回 true 失败返回 false,poll 方法成功返回被移除的值,失败或没数据返回 null。下面再看看两个检查方法,即 peek 和 element

    //构造方法,head 节点不存放数据
    public LinkedBlockingQueue(int capacity) {
      if (capacity <= 0) throw new IllegalArgumentException();
      this.capacity = capacity;
      last = head = new Node<E>(null);
    }
    
    public E element() {
      E x = peek();//直接调用peek
      if (x != null)
          return x;
      else
          throw new NoSuchElementException();//没数据抛异常
    }
    
    public E peek() {
      if (count.get() == 0)
          return null;
      final ReentrantLock takeLock = this.takeLock;
      takeLock.lock();
      try {
          //获取头结节点的下一个节点
          Node<E> first = head.next;
          if (first == null)
              return null;//为null就返回null
          else
              return first.item;//返回值
      } finally {
          takeLock.unlock();
      }
    }

    从代码来看,head 头结节点在初始化时是本身不带数据的,仅仅作为头部 head 方便我们执行链表的相关操作。peek 返回直接获取头结点的下一个节点返回其值,如果没有值就返回 null,有值就返回节点对应的值。element 方法内部调用的是 peek,有数据就返回,没数据就抛异常。下面我们最后来看两个根据时间阻塞的方法,比较有意思,利用的 Conditin 来实现的。

    //在指定时间内阻塞添加的方法,超时就结束
    public boolean offer(E e, long timeout, TimeUnit unit)
      throws InterruptedException {
    
      if (e == null) throw new NullPointerException();
      //将时间转换成纳秒
      long nanos = unit.toNanos(timeout);
      int c = -1;
      //获取锁
      final ReentrantLock putLock = this.putLock;
      //获取当前队列大小
      final AtomicInteger count = this.count;
      //锁中断(如果需要)
      putLock.lockInterruptibly();
      try {
          //判断队列是否满
          while (count.get() == capacity) {
              if (nanos <= 0)
                  return false;
              //如果队列满根据阻塞的等待
              nanos = notFull.awaitNanos(nanos);
          }
          //队列没满直接入队
          enqueue(new Node<E>(e));
          c = count.getAndIncrement();
          //唤醒条件对象上等待的线程
          if (c + 1 < capacity)
              notFull.signal();
      } finally { 
          putLock.unlock();
      }
      //唤醒消费线程
      if (c == 0)
          signalNotEmpty();
      return true;
    }

    对于这个 offer 方法,我们重点来看看阻塞的这段代码

    //判断队列是否满
    while (count.get() == capacity) {
      if (nanos <= 0)
          return false;
      //如果队列满根据阻塞的等待
      nanos = notFull.awaitNanos(nanos);
    }
    
    //CoditionObject(Codition的实现类)中的awaitNanos方法
    public final long awaitNanos(long nanosTimeout)
      throws InterruptedException {
      if (Thread.interrupted())
          throw new InterruptedException();
      //这里是将当前添加线程封装成NODE节点加入Condition的等待队列中
      //注意这里的NODE是AQS的内部类Node
      Node node = addConditionWaiter();
      //加入等待,那么就释放当前线程持有的锁
      int savedState = fullyRelease(node);
      //计算过期时间
      final long deadline = System.nanoTime() + nanosTimeout;
      int interruptMode = 0;
    
      while (!isOnSyncQueue(node)) {
          if (nanosTimeout <= 0L) {
              transferAfterCancelledWait(node);
              break;
          }
          //主要看这里!!由于是while 循环,这里会不断判断等待时间
          //nanosTimeout 是否超时
          //static final long spinForTimeoutThreshold = 1000L;
          if (nanosTimeout >= spinForTimeoutThreshold)
              LockSupport.parkNanos(this, nanosTimeout);//挂起线程
          if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
              break;
          //重新计算剩余等待时间,while循环中继续判断下列公式
          //nanosTimeout >= spinForTimeoutThreshold
          nanosTimeout = deadline - System.nanoTime();
      }
      if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
          interruptMode = REINTERRUPT;
      if (node.nextWaiter != null)
          unlinkCancelledWaiters();
      if (interruptMode != 0)
          reportInterruptAfterWait(interruptMode);
      return deadline - System.nanoTime();
    }

    awaitNanos 方法中,根据传递进来的时间计算超时阻塞 nanosTimeout,然后通过 while 循环中判断nanosTimeout >= spinForTimeoutThreshold 该公式是否成立,当其为 true 时则说明超时时间 nanosTimeout 还未到期,再次计算nanosTimeout = deadline - System.nanoTime();即 nanosTimeout ,持续判断,直到 nanosTimeout 小于 spinForTimeoutThreshold 结束超时阻塞操作,方法也就结束。这里的 spinForTimeoutThreshold 其实更像一个经验值,因为非常短的超时等待无法做到十分精确,因此采用了 spinForTimeoutThreshold 这样一个临界值。offer(E e, long timeout, TimeUnit unit)方法内部正是利用这样的 Codition 的超时等待 awaitNanos 方法实现添加方法的超时阻塞操作。同样对于poll(long timeout, TimeUnit unit)方法也是一样的道理。

  1. 源码分析

    public void start(){
        monitorThread = new Thread(new Runnable() {
    
            @Override
            public void run() {
                // monitor
                while (!toStop) {
                    try {
                        List<Integer> jobLogIdList = new ArrayList<Integer>();
                        int drainToNum = JobFailMonitorHelper.instance.queue.drainTo(jobLogIdList);
    
                        if (CollectionUtils.isNotEmpty(jobLogIdList)) {
                            for (Integer jobLogId : jobLogIdList) {
                                if (jobLogId==null || jobLogId==0) {
                                    continue;
                                }
                                XxlJobLog log = XxlJobDynamicScheduler.xxlJobLogDao.load(jobLogId);
                                if (log == null) {
                                    continue;
                                }
                                if (IJobHandler.SUCCESS.getCode() == log.getTriggerCode() && log.getHandleCode() == 0) {
                                    JobFailMonitorHelper.monitor(jobLogId);
                                    logger.info(">>>>>>>>>>> job monitor, job running, JobLogId:{}", jobLogId);
                                } else if (IJobHandler.SUCCESS.getCode() == log.getHandleCode()) {
                                    // job success, pass
                                    logger.info(">>>>>>>>>>> job monitor, job success, JobLogId:{}", jobLogId);
                                } else if (IJobHandler.FAIL.getCode() == log.getTriggerCode()
                                           || IJobHandler.FAIL.getCode() == log.getHandleCode()
                                           || IJobHandler.FAIL_RETRY.getCode() == log.getHandleCode() ) {
                                    // job fail,
                                    failAlarm(log);
                                    logger.info(">>>>>>>>>>> job monitor, job fail, JobLogId:{}", jobLogId);
                                } else {
                                    JobFailMonitorHelper.monitor(jobLogId);
                                    logger.info(">>>>>>>>>>> job monitor, job status unknown, JobLogId:{}", jobLogId);
                                }
                            }
                        }
    
                        TimeUnit.SECONDS.sleep(10);
                    } catch (Exception e) {
                        logger.error("job monitor error:{}", e);
                    }
                }
    
                // monitor all clear
                List<Integer> jobLogIdList = new ArrayList<Integer>();
                int drainToNum = getInstance().queue.drainTo(jobLogIdList);
                if (jobLogIdList!=null && jobLogIdList.size()>0) {
                    for (Integer jobLogId: jobLogIdList) {
                        XxlJobLog log = XxlJobDynamicScheduler.xxlJobLogDao.load(jobLogId);
                        if (ReturnT.FAIL_CODE == log.getTriggerCode()|| ReturnT.FAIL_CODE==log.getHandleCode()) {
                            // job fail,
                            failAlarm(log);
                            logger.info(">>>>>>>>>>> job monitor last, job fail, JobLogId:{}", jobLogId);
                        }
                    }
                }
    
            }
        });
        monitorThread.setDaemon(true);
        monitorThread.start();
    }
  2. 其余就是基本的CRUD基本操作了。

xxl-job-executor

主要功能

  • 执行器服务:暴露一个RPC服务,负责接收调度中心的请求,然后把封装成任务丢到队列中。
  • JobHandler:定义任务处理接口。
  • 任务线程:负责从队列中提取线程,执行,并把执行的结果回调做记录。
  • 注册线程:执行器启动的时候,通过调度中心的注册接口,将自己的name和address注册到调度中心。

具体实现

  • 首先配置扫描 Handler 实体类具体业务实现。
  • 主要配置初始化实体类 xxlJobExecutor

    • XML 配置远程调控中心的配置,以及进行RPC的配置。

参考链接:


文章作者: HoldDie
版权声明: 本博客所有文章除特別声明外,均采用 CC BY 4.0 许可协议。转载请注明来源 HoldDie !
评论
 上一篇
MySQL-主键生成策略乱炖 MySQL-主键生成策略乱炖
常在河边走,避免不了 MySQL 主键生成策略,故记录一下集中实现方式。 背景 在复杂的分布式系统中,需要对大量的数据和消息进行唯一标识 对于不同的业务系统之间,随着数据增多,对数据库分表后需要哟一个唯一ID标识数据 设计要点 全局唯一性
2018-03-09
下一篇 
Git分支最佳实践 Git分支最佳实践
Git 分支基本操作,完美解决版本变更流程。 译自:A successful Git branching model » nvie.com 本文将展示我一年前在自己的项目中成功运用的开发模型。我一直打算把这些东西写出来,但总是没有抽出时间,
2018-03-07
  目录