Elastic-Job - 分布式定时任务框架

时间:2015-09-14 22:47:42   收藏:0   阅读:25558

Elastic-Job是ddframe中dd-job的作业模块中分离出来的分布式弹性作业框架。去掉了和dd-job中的监控和ddframe接入规范部分。

ddframe其他模块也有可独立开源的部分,之前当当曾开源过dd-soa的基石模块DubboX。

项目开源地址:https://github.com/dangdangdotcom/elastic-job

Elastic-Job主要功能

代码开发

提供3种作业类型,分别是OneOff, Perpetual和SequencePerpetual。需要继承相应的抽象类。

方法参数shardingContext包含作业配置,分片和运行时信息。可通过getShardingTotalCount(),getShardingItems()等方法分别获取分片总数,运行在本作业服务器的分片序列号集合等。

public class MyElasticJob extends AbstractOneOffElasticJob {

    @Override
    protected void process(JobExecutionMultipleShardingContext context) {
        // do something by sharding items
    }
}
public class MyElasticJob extends AbstractSequencePerpetualElasticJob<Foo> {

    @Override
    protected List<Foo> fetchData(JobExecutionSingleShardingContext context) {
        List<Foo> result = // get data from database by sharding items
        return result;
    }
    
    @Override
    protected boolean processData(JobExecutionSingleShardingContext context, Foo data) {
        // process data
        return true;
    }
}
public class MyElasticJob extends AbstractPerpetualElasticJob<Foo> {

    @Override
    protected List<Foo> fetchData(JobExecutionMultipleShardingContext context) {
        List<Foo> result = // get data from database by sharding items
        return result;
    }
    
    @Override
    protected boolean processData(JobExecutionMultipleShardingContext context, Foo data) {
        // process data
        return true;
    }
}

作业配置

与Spring容器配合使用作业,可以将作业Bean配置为Spring Bean, 可在作业中通过依赖注入使用Spring容器管理的数据源等对象。可用placeholder占位符从属性文件中取值。

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xmlns:reg="http://www.dangdang.com/schema/ddframe/reg" 
    xmlns:job="http://www.dangdang.com/schema/ddframe/job" 
    xsi:schemaLocation="http://www.springframework.org/schema/beans
                        http://www.springframework.org/schema/beans/spring-beans.xsd
                        http://www.dangdang.com/schema/ddframe/reg
                        http://www.dangdang.com/schema/ddframe/reg/reg.xsd
                        http://www.dangdang.com/schema/ddframe/job
                        http://www.dangdang.com/schema/ddframe/job/job.xsd
                        ">
    <!--配置作业注册中心 -->
    <reg:zookeeper id="regCenter" serverLists=" yourhost:2181" namespace="dd-job" baseSleepTimeMilliseconds="1000" maxSleepTimeMilliseconds="3000" maxRetries="3" />
    <!-- 配置作业A-->
    <job:bean id="oneOffElasticJob" class="xxx.MyOneOffElasticJob" regCenter="regCenter" cron="0/10 * * * * ?"   shardingTotalCount="3" shardingItemParameters="0=A,1=B,2=C" />
    <!-- 配置作业B-->
    <job:bean id="perpetualElasticJob" class="xxx.MyPerpetualElasticJob" regCenter="regCenter" cron="0/10 * * * * ?" shardingTotalCount="3" shardingItemParameters="0=A,1=B,2=C" processCountIntervalSeconds="10" concurrentDataProcessThreadCount="10" />
</beans>


<job:bean />命名空间属性详细说明

<reg:zookeeper />命名空间属性详细说明


    <!-- 配置作业注册中心 -->
    <bean id="regCenter" class="com.dangdang.ddframe.reg.zookeeper.ZookeeperRegistryCenter" init-method="init">
        <constructor-arg>
            <bean class="com.dangdang.ddframe.reg.zookeeper.ZookeeperConfiguration">
                <property name="serverLists" value="${xxx}" />
                <property name="namespace" value="${xxx}" />
                <property name="baseSleepTimeMilliseconds" value="${xxx}" />
                <property name="maxSleepTimeMilliseconds" value="${xxx}" />
                <property name="maxRetries" value="${xxx}" />
            </bean>
        </constructor-arg>
    </bean>    <!-- 配置作业-->
    <bean id="xxxJob" class="com.dangdang.ddframe.job.spring.schedule.SpringJobController" init-method="init">
        <constructor-arg ref="regCenter" />
        <constructor-arg>
            <bean class="com.dangdang.ddframe.job.api.JobConfiguration">
                <constructor-arg name="jobName" value="xxxJob" />
                <constructor-arg name="jobClass" value="xxxDemoJob" />
                <constructor-arg name="shardingTotalCount" value="10" />
                <constructor-arg name="cron" value="0/10 * * * * ?" />
                <property name="shardingItemParameters" value="${xxx}" />
            </bean>
        </constructor-arg>
    </bean>

如果不使用Spring框架,可以用如下方式启动作业。

import com.dangdang.ddframe.job.api.JobConfiguration;
import com.dangdang.ddframe.job.schedule.JobInitializer;
import com.dangdang.ddframe.job.schedule.JobScheduler;
import com.dangdang.ddframe.reg.base.CoordinatorRegistryCenter;
import com.dangdang.ddframe.reg.zookeeper.ZookeeperConfiguration;
import com.dangdang.ddframe.reg.zookeeper.ZookeeperRegistryCenter;

public class JobDemo {

    // 声明Zookeeper注册中心配置对象
    private final ZookeeperConfiguration zkConfig = new ZookeeperConfiguration("yourhost:2181", "zkRegTestCenter", 1000, 3000, 3);
    
    // 定义Zookeeper注册中心
    private final CoordinatorRegistryCenter regCenter = new ZookeeperRegistryCenter(zkConfig);
    
    // 声明作业配置对象
    private final JobConfiguration jobConfig = new JobConfiguration("demoJob", DemoJob.class, 3, "0/1 * * * * ?");
    
    public static void main(final String[] args) {
            new JobDemo().init();
    }
    
    public void init() {
        // 连接注册中心
        regCenter.init();
        // 声明作业启动器
        JobInitializer jobInitializer = new JobInitializer(regCenter, jobConfig);
        // 启动作业,主要是注册作业的信息
        jobInitializer.init();
        // 获取作业调度器实例
        JobScheduler jobScheduler = JobScheduler.getInstance();
        // 添加作业到调度器
        jobScheduler.addJob("demoJob", jobInitializer);
        // 启动所有作业
        jobScheduler.scheduleAllJobs();
    }
}


使用限制

实现原理

技术分享

技术分享


运维平台

原文:http://my.oschina.net/u/719192/blog/506062

评论(0
© 2014 bubuko.com 版权所有 - 联系我们:wmxa8@hotmail.com
打开技术之扣,分享程序人生!