tbschedule是通过zookeeper访问地址任务吗

/ tbschedule
项目语言:JAVA
1、搭建zookeeper集群。
1.2、根据说明启动Zookeeper,并测试是否正常启动
2、搭建tbschedule控制台
2.1、从 下载schedule的所有代码和文档
2.2、修改src/test/resources/schedule.xml中的配置信息指向已经启动的zookeeper服务器。为了避免不同应用任务类型间冲突,rootPath尽量全局唯一
&bean id="scheduleManagerFactory" class="com.taobao.pamirs.schedule.TBScheduleManagerFactory"
init-method="init"&
&property name="zkConfig"&
&entry key="zkConnectString" value="your_ip:2181" /&
&entry key="rootPath" value="/your_app/app1" /&
&entry key="zkSessionTimeout" value="3000" /&
&entry key="userName" value="userName" /&
&entry key="password" value="password" /&
&/property&
2.3、配置Web服务器: 将console\ScheduleConsole.war拷贝到你自己的Web服务器中运行即可.因为没有做仔细的兼容性测试,建议使用IE8
2.4 、启动浏览器 通过Console来检查配置数据是否正确:
第一次运行的时候,会要求你输入zookeeper的相关配置信息
3 如何将自己的应用接入到tbschedule中
3.1 添加zookeeper依赖
&dependency&
&groupId&org.apache.zookeeper&/groupId&
&artifactId&zookeeper&/artifactId&
&version&3.4.6&/version&
&/dependency&
3.2 添加tbschedule依赖
&dependency&
&groupId&com.taobao.pamirs.schedule&/groupId&
&artifactId&tbschedule&/artifactId&
&version&3.2.18&/version&
&/dependency&
3.3 将应用的bean列表加载到tbscheduleManagerFactory
3.3.1 加载zk连接配置
3.3.2 新建ScheduleInitUtil 类,并实现 InitializingBean, ApplicationContextAware
3.3.3 将bean列表加载至tbscheduleManagerFactory
public void afterPropertiesSet() throws Exception {
Properties p = getProperties(configInfo);
tbscheduleManagerFactory = new TBScheduleManagerFactory();
tbscheduleManagerFactory.setApplicationContext(applicationcontext);
tbscheduleManagerFactory.init(p);
tbscheduleManagerFactory.setZkConfig(convert(p));
logger.warn("TBBPM 成功启动schedule调度引擎 ...");
3.4 实现自己应用的任务 示例:
@Component("demoTaskBean")
public class DemoTaskBean
extends IScheduleTaskDealSingle&SubDetailDO&{
/*实现自己的业务查询*/
public List&SubDetailDO& selectTasks(String taskParameter, String ownSign,
int taskItemNum, List&TaskItemDefine& taskItemList,
int eachFetchDataNum) throws Exception {
Calendar calendar = Calendar.getInstance();
calendar.setTime(new Date());
int day = calendar.get(Calendar.DAY_OF_MONTH);
List&SubDetailDO& details =
details = subDetailDAO.selectForSchedule(
getScopeByQueueCondition(taskItemNum, taskItemList),
confirmTypes, DETAIL_STATUS_ONE, eachFetchDataNum);
} catch (Exception e) {
log.error(e.getMessage(), e);
/*处理自己的业务*/
public boolean execute(SubDetailDO subDetail, String ownSign)
throws Exception {
yourProcess.process(subDetail);
} catch (Exception e) {
log.error(e.getMessage(), e);
3.5 向zookeeper添加配置调度任务数据,或者通过控制台添加任务和调度策略
---添加调度策略---
3.6、启动调度服务器。如果看到类似日志信息,则表示成功:
[ 16:50:33,098] [DemoTask$PRE-2-exe0] (DemoTaskBean.java:58) INFO
com.taobao.pamirs.schedule.test.DemoTaskBean - 处理任务[PRE]:
[ 16:50:33,098] [DemoTask-0-exe0] (DemoTaskBean.java:58) INFO
com.taobao.pamirs.schedule.test.DemoTaskBean - 处理任务[BASE]:
[ 16:50:33,098] [DemoTask$PRE-3-exe1] (DemoTaskBean.java:58) INFO
com.taobao.pamirs.schedule.test.DemoTaskBean - 处理任务[PRE]:
[ 16:50:33,114] [DemoTask$TEST-4-exe0] (DemoTaskBean.java:58) INFO
com.taobao.pamirs.schedule.test.DemoTaskBean - 处理任务[TEST]:
[ 16:50:33,114] [DemoTask$TEST-5-exe1] (DemoTaskBean.java:58) INFO
com.taobao.pamirs.schedule.test.DemoTaskBean - 处理任务[TEST]:
[ 16:50:33,114] [DemoTask-1-exe1] (DemoTaskBean.java:58) INFO
com.taobao.pamirs.schedule.test.DemoTaskBean - 处理任务[BASE]:
9、在Console中检查服务器运行情况:
11、至此,大功告成。根据自己的需要可以通过Console来维护调度任务和调度策略的配置。
(C)&&2013&&Alibaba&&Inc.&&All&&rights&&resvered.
Powered by4721人阅读
Console Demo地址:
Console下载: trunk/console/ScheduleConsole.war
技术支持: 忌少
&groupId&com.taobao.pamirs.schedule&/groupId&
&artifactId&tbschedule&/artifactId&
&version&3.2.6&/version&
tbschedule
此文档内部包括:
1、设计目标说明
2、主要概念解释
3、涉及的数据对象说明
4、涉及的业务接口说明
5、Sleep模式和NotSleep模式的区别
6、使用过程中的注意事项
1、调度器的设计目标
1、tbschedule的目的是让一种批量任务或者不断变化的任务,能够被动态的分配到多个主机的JVM中,不同的线程组中并行执行。所有的任务能够被不重复,不遗漏的快速处理。
2、调度的Manager可以动态的随意增加和停止
3、可以通过JMX控制调度服务的创建和停止
4、可以指定调度的时间区间:
PERMIT_RUN_START_TIME :允许执行时段的开始时间crontab的时间格式.以startrun:开始,则表示开机立即启动调度
PERMIT_RUN_END_TIME :允许执行时段的结束时间crontab的时间格式,如果不设置,表示取不到数据就停止
PERMIT_RUN_START_TIME ='0 * * * * ?' 表示在每分钟的0秒开始
PERMIT_RUN_START_TIME ='20 * * * * ?' 表示在每分钟的20秒终止
就是每分钟的0-20秒执行,其它时间休眠
格式信息请参照:
2、主要概念
TaskType任务类型:
是任务调度分配处理的单位,例如:
1、将一张表中的所有状态为STS=’N’的所有数据提取出来发送给其它系统,同时将修改状态STS=’Y’,就是一种任务。TaskType=’DataDeal’
2、将一个目录以所有子目录下的所有文件读取出来入库,同时把文件移到对应的备份目录中,也是一种任务。TaskType=’FileDeal’。
3、可以为一个任务类型自定义一个字符串参数由应用自己解析。例如:&AREA=杭州,YEAR&30&
ScheduleServer任务处理器
1、是由一组线程【1..n个线程】构成的任务处理单元,每一任务处理器有一个唯一的全局标识,
一般以IP$UUID[例如192.168.1.100$0C78F0C0FA084E54BFA73DC]的形式出现。 一个任务类型的数据可以由1..n个任务处理器同时进行。
2、这些任务处理器可以在同一个JVM中,也可以分布在不同主机的JVM中。任务处理器内部有一个心跳线程,用于确定Server的状态和任务的动态分配,
有一组工作线程,负责处理查询任务和具体的任务处理工作。
3、目前版本所有的心跳信息都是存放在Zookeeper服务器中的,所有的Server都是对等的,当一个Server死亡后,其它Server会接管起拥有的任务队列,
期间会有几个心跳周期的时延。后续可以用类似ConfigerServer类的存储。
4、现有的工作线程模式分为Sleep模式和NotSleep模式。缺省是缺省是NOTSLEEP模式。在通常模式下,在通常情况下用Sleep模式。
在一些特殊情况需要用NotSleep模式。两者之间的差异在后续进行描述。
TaskItem任务项
是对任务进行的分片划分。例如:
1、将一个数据表中所有数据的ID按10取模,就将数据划分成了0、1、2、3、4、5、6、7、8、9供10个任务项。
2、将一个目录下的所有文件按文件名称的首字母(不区分大小写),
就划分成了A、B、C、D、E、F、G、H、I、J、K、L、M、N、O、P、Q、R、S、T、U、V、W、X、Y、Z供26个队列。
3、将一个数据表的数据ID哈希后按1000取模作为最后的HASHCODE,我们就可以将数据按[0,100)、[100,200) 、[200,300)、[300,400) 、
[400,500)、[500,600)、[600,700)、[700,800)、[800,900)、 [900,1000)划分为十个任务项,当然你也可以划分为100个任务项,最多是1000个任务项。
任务项是进行任务分配的最小单位。一个任务项只能由一个ScheduleServer来进行处理。但一个Server可以处理任意数量的任务项。
例如任务被划分为了10个队列,可以只启动一个Server,所有的任务项都有这一个Server来处理;也可以启动两个Server,每个Sever处理5个任务项;
但最多只能启动10个Server,每一个ScheduleServer只处理一个任务项。如果在多,则第11个及之后的Server将不起作用,处于休眠状态。
4、可以为一个任务项自定义一个字符串参数由应用自己解析。例如:&TYPE=A,KIND=1&
TaskDealBean任务处理类
是业务系统进行数据处理的实现类。要求实现Schedule的接口IScheduleTaskDealMulti或者IScheduleTaskDealSingle。
接口主要包括两个方法。一个是根据调度器分配到的队列查询数据的接口,一个是进行数据处理的接口。
1、可以指定任务处理的时间间隔,例如每天的1:00-3:00执行,或者每个月的第一天执行、每一个小时的第一分钟执行等等。
间格式与crontab相同。如果不指定就表示一致运行。PERMIT_RUN_START_TIME,PERMIT_RUN_END_TIME
2、可以指定如果没有数据了,休眠的时间间隔。SLEEP_TIME_NODATA 单位秒
3、可以指定每处理完一批数据后休眠的时间间隔.SLEEP_TIME_INTERVAL 单位
OwnSign环境区域
是对运行环境的划分,进行调度任务和数据隔离。例如:开发环境、测试环境、预发环境、生产环境。
不同的开发人员需要进行数据隔离也可以用OwnSign来实现,避免不同人员的数据冲突。缺省配置的环境区域OwnSign='BASE'。
例如:TaskType='DataDeal',配置的队列是0、1、2、3、4、5、6、7、8、9。缺省的OwnSign='BASE'。
此时如果再启动一个测试环境,则Schedule会动态生成一个TaskType='DataDeal-Test'的任务类型,环境会作为一个变量传递给业务接口,
由业务接口的实现类,在读取数据和处理数据的时候进行确定。业务系统一种典型的做法就是在数据表中增加一个OWN_SIGN字段。
在创建数据的时候根据运行环境填入对应的环境名称,在Schedule中就可以环境的区分了。
是指某一个任务在调度集群上的分布策略,可以制定:
1、可以指定任务的机器IP列表。127.0.0.1和localhost表示所有机器上都可以执行
2、可以指定每个机器上能启动的线程组数量,0表示没有限制
3、可以指定所有机器上运行的线程组总数。
3、业务接口说明
包含三个业务接口,:
1、IScheduleTaskDeal 调度器对外的基础接口,是一个基类,并不能被直接使用
2、IScheduleTaskDealSingle 单任务处理的接口,继承 IScheduleTaskDeal
3、IScheduleTaskDealMulti 可批处理的任务接口,继承 IScheduleTaskDeal
IScheduleTaskDeal 调度器对外的基础接口
public interface IScheduleTaskDeal&T& {
* 根据条件,查询当前调度服务器可处理的任务
* @param taskParameter 任务的自定义参数
* @param ownSign 当前环境名称
* @param taskQueueNum 当前任务类型的任务队列数量
* @param taskQueueList 当前调度服务器,分配到的可处理队列
* @param eachFetchDataNum 每次获取数据的数量
* @throws Exception
public List&T& selectTasks(String taskParameter,String ownSign,int taskQueueNum,List&TaskItemDefine& taskItemList,int eachFetchDataNum) throws E
* 获取任务的比较器,只有在NotSleep模式下需要用到
public Comparator&T& getComparator();
IScheduleTaskDealSingle 单任务处理的接口
public interface IScheduleTaskDealSingle&T& extends IScheduleTaskDeal&T& {
* 执行单个任务
* @param task Object
* @param ownSign 当前环境名称
* @throws Exception
public boolean execute(T task,String ownSign) throws E
IScheduleTaskDealMulti 可批处理的任务接口
public interface IScheduleTaskDealMulti&T&
extends IScheduleTaskDeal&T& {
执行给定的任务数组。因为泛型不支持new 数组,只能传递OBJECT[]
* @param tasks 任务数组
* @param ownSign 当前环境名称
* @throws Exception
public boolean execute(Object[] tasks,String ownSign) throws E
4、Sleep模式和NotSleep模式的区别
1、ScheduleServer启动的工作线程组线程是共享一个任务池的。
2、在Sleep的工作模式:当某一个线程任务处理完毕,从任务池中取不到任务的时候,检查其它线程是否处于活动状态。如果是,则自己休眠;
如果其它线程都已经因为没有任务进入休眠,当前线程是最后一个活动线程的时候,就调用业务接口,获取需要处理的任务,放入任务池中,
同时唤醒其它休眠线程开始工作。
3、在NotSleep的工作模式:当一个线程任务处理完毕,从任务池中取不到任务的时候,立即调用业务接口获取需要处理的任务,放入任务池中。
4、Sleep模式在实现逻辑上相对简单清晰,但存在一个大任务处理时间长,导致其它线程不工作的情况。
5、在NotSleep模式下,减少了线程休眠的时间,避免大任务阻塞的情况,但为了避免数据被重复处理,增加了CPU在数据比较上的开销。
同时要求业务接口实现对象的比较接口。
6、如果对任务处理不允许停顿的情况下建议用NotSleep模式,其它情况建议用sleep模式。
5、使用过程中的注意事项(待续)
参考知识库
* 以上用户言论只代表其个人观点,不代表CSDN网站的观点或立场
访问:278489次
积分:4139
积分:4139
排名:第5826名
原创:145篇
转载:66篇
评论:29条
(2)(2)(1)(1)(1)(1)(1)(3)(4)(6)(3)(15)(4)(4)(1)(2)(6)(14)(6)(2)(14)(1)(3)(3)(1)(2)(4)(4)(3)(7)(6)(9)(6)(16)(16)(17)(8)(13)关于TbSchedule任务调度管理框架的整合部署_架构设计_ThinkSAAS
关于TbSchedule任务调度管理框架的整合部署
关于TbSchedule任务调度管理框架的整合部署
一、前言
任务调度管理作为基础架构通常会出现于我们的业务系统中,目的是让各种任务能够按计划有序执行。比如定时给用户发送邮件、将数据表中的数据同步到另一个数据表都是一个任务,这些相对耗时的操作通过任务调度系统来异步并行执行,既能提高任务的执行效率又能保障任务执行的可靠性。
实现的方式也是多种多样,比如使用Timer进行简单调度或者使用Quartz类似的框架,本文基于淘宝开源框架TbSchedule实现,其设计目的是让批量任务或者不断变化的任务能够被动态的分配到多个主机的JVM中,在不同的线程组中并行执行,所有的任务能够被不重复,不遗漏的快速处理,目前被应用于阿里巴巴众多业务系统。
请参照,相关内容不再重复介绍,本文记录了详细的部署整合操作步骤。
二、Zookeeper部署
1、TbSchedule依赖于Hadoop Zookeeper组件,实现任务的分布式配置及各服务间的交互通信,Zookeeper以TreeNode类型进行存储,支持Cluster形式部署且保证最终数据一致性,关于ZK的资料网上比较丰富,相关概念不再重复介绍,本文以zookeeper-3.4.6为例,请从官网下载。
2、创建ZookeeperLab文件夹目录,模拟部署3台Zookeeper服务器集群,目录结构如下。
3、解压从官网下载的zookeeper-3.4.6.tar文件,并分别复制到三台ZkServer的zookeeper-3.4.6文件夹。
4、分别在三台ZkServer的data目录下创建myid文件(注意没有后缀),用于标识每台Server的ID,在Server1datamyid文件中保存单个数字1,Server2的myid文件保存2,Server3的myid保存3。
5、创建ZkServer的配置文件,在zookeeper-3.4.6conf文件夹目录下创建zoo.cfg,可以从示例的zoo_sample.cfg 复制重命名。因为在同一台机器模拟Cluster部署,端口号不能重复,配置文件中已经有详细的解释,修改后的配置如下,其中Server1端口号2181,Server2端口号2182,Server3端口号2183。
# The number of milliseconds of each tick
tickTime=2000
# The number of ticks that the initial 
# synchronization phase can take
initLimit=10
# The number of ticks that can pass between 
# sending a request and getting an acknowledgement
syncLimit=5
# the directory where the snapshot is stored.
# do not use /tmp for storage, /tmp here is just 
# example sakes.
dataDir=E:/ZookeeperLab/server1/data
dataLogDir=E:/ZookeeperLab/server1/logs
# the port at which the clients will connect
clientPort=2181
# the maximum number of client connections.
# increase this if you need to handle more clients
#maxClientCnxns=60
# Be sure to read the maintenance section of the 
# administrator guide before turning on autopurge.
# http://zookeeper.apache.org/doc/current/zookeeperAdmin.html#sc_maintenance
# The number of snapshots to retain in dataDir
#autopurge.snapRetainCount=3
# Purge task interval in hours
# Set to "0" to disable auto purge feature
#autopurge.purgeInterval=1
server.1=127.0.0.1:
server.2=127.0.0.1:
server.3=127.0.0.1:
View Code
6、通过zookeeper-3.4.6bin文件夹zkServer.bat文件启动ZKServer,由于Cluster部署需要选举Leader和Followers,所以在3个ZKServer全部启动之前会提示一个WARN,属正常现象。
7、Zookeeper启动成功后可以通过zookeeper-3.4.6bin文件夹的 zkCli.bat验证连接是否正常,比如创建节点“create /testnode helloworld”,查看节点“get /testnode”,连接到组群中其它ZkServer,节点数据应该是一致的。更多指令请使用help命令查看。
8、对于Linux环境下部署基本一致,zoo.cfg配置文件中data和datalog文件夹路径改为linux格式路径,使用“./zkServer.sh start-foreground”命令启动ZkServer,注意start启动参数不能输出异常信息。
9、至此Zookeeper的配置完毕。
三、SVN从TaoCode获取并Import TbSchedule源码(请先配置Maven环境)
四、TbSchedule控制台的部署
1、TbSchedule Console是基于web页面对调度任务配置、部署、监控的终端。
2、将源码目录下的consoleScheduleConsole.war包及depend-lib库部署到Web应用服务器,本文以Tomcat 7.0.54为例,相关步骤不再描述。
3、首次打开页面会跳转到“基础信息配置”Config.jsp页面,其中前2项为Zookeeper服务的连接配置,请正确填写前面部署的Zookeeper服务器地址和端口号,多个ZKServer可以使用逗号分隔。后3项是用于标识ZkServer中调度任务配置的根节点和节点权限,无严格要求。
4、点击保存后,会提示 “错误信息:Zookeeper connecting ......localhost:2181”,如果ZKServer配置正确可以不用理会,直接点击“管理主页”,若不能正常跳转到Index.jsp页面请重新检查Zookeeper的配置,建议关闭防火墙。
5、TbSchedule Console Web站点对应的两个地址
[监控页面]
[管理页面]
如果以上地址能正常访问则TbSchedule Console的部署配置完成。
五、Task场景设计
1、假设场景:任务需要将订单表tbOrder中制单日期在141208內(共8天)的数据同步到备份tbOrder_copy 表,其中每2天分为一个任务组并行同步(每次提取500条),关于任务组的划分和TbSchedule中TaskItem的相关概念请先参考wiki,后续也会有部分解释。
2、数据环境使用MySql数据库,创建tbOrder和tbOrder_Copy数据表,结构相同,同时在tbOrder事先生成好测试数据,建议每天的数据量在1000条以上。
六、数据同步任务实现
1、创建TaskCenter Demo Project,添加对tbschedule project的依赖,并集成Spring Framework。
2、创建OrderInfo实体类,属性对应tbOrder表,用于映射从数据表取的数据。
3、创建DataSyncABean任务类,实现IScheduleTaskDealSingle&T&泛型接口的selectTasks和execute方法,其中selectTasks方法用于取数,execute方法用于执行selectTasks返回的Result,关于代码中任务片段的划分和TbSchedule中TaskItem的相关概念后续再解释,代码参考如下。
1 package T
3 import java.sql.ResultS
4 import java.util.ArrayL
5 import
6 import java.util.L
8 import com.taobao.pamirs.schedule.IScheduleTaskDealS
9 import com.taobao.pamirs.schedule.TaskItemD
11 import DBHelper.*;
13 public class DataSyncABean implements IScheduleTaskDealSingle&OrderInfo& {
public List&OrderInfo& selectTasks(String taskParameter, String ownSign,
int taskItemNum, List&TaskItemDefine& queryCondition,
int eachFetchDataNum) throws Exception {
List&OrderInfo& result = new ArrayList&OrderInfo&();
if (queryCondition.size() == 0) {
return
StringBuffer condition = new StringBuffer();
for (int i = 0; i & queryCondition.size(); i++) {
if (i & 0) {
condition.append(",");
condition.append(queryCondition.get(i).getTaskItemId());
/* 场景A:将tbOrder表中的数据分8个任务项,每次取200条数据, 同步到tbOrder_copy表中。 */
String sql = "select * from tbOrder " + "where "
+ " BillNumber not in (select BillNumber from tbOrder_copy) "
+ " and RIGHT(BuildDate,1) in (" + condition + ") " + "limit "
+ eachFetchDataN
System.out.println("开始执行SQL:" + sql);
ResultSet rs = MySQLHelper.executeQuery(sql);
while (rs.next()) {
OrderInfo order = new OrderInfo();
order.BillNumber = rs.getString("BillNumber");
order.BuildDate = rs.getString("BuildDate");
order.Customer = rs.getString("Customer");
order.GoodsName = rs.getString("GoodsName");
order.Amount = rs.getFloat("Amount");
order.SaleMoney = rs.getFloat("SaleMoney");
result.add(order);
if (rs.isLast()) {
break;
MySQLHelper.free(rs, rs.getStatement(), rs.getStatement()
.getConnection());
return
public Comparator&OrderInfo& getComparator() {
return null;
public boolean execute(OrderInfo task, String ownSign) throws Exception {
String sql = "insert into tbOrder_copy values(&" + task.BillNumber
+ "&,&" + task.BuildDate + "&,&" + task.Customer + "&,&"
+ task.GoodsName + "&," + task.Amount + "," + task.SaleMoney
+ ")";
MySQLHelper.executeNonQuery(sql);
System.out.println("execute:" + sql);
return true;
View Code
4、在Spring容器中注册数据同步任务Bean。
1 &?xml version="1.0" encoding="UTF-8"?&
2 &beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:p="http://www.springframework.org/schema/p"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.1.xsd"&
&bean id="dataSyncABean" class="Task.DataSyncABean" /&
9 &/beans&
View Code
5、Main函数初始化Spring容器和TbSchedule 任务管理Factory,连接ZKServer,代码如下,也可以参照TbSchedule源码中通过Spring进行初始化TBScheduleManagerFactory 。
1 import java.util.P
3 import org.springframework.context.ApplicationC
4 import org.springframework.context.support.FileSystemXmlApplicationC
7 import com.taobao.pamirs.schedule.strategy.TBScheduleManagerF
9 public class TaskCenter {
public static void main(String[] args) throws Exception {
// 初始化Spring
ApplicationContext ctx = new FileSystemXmlApplicationContext(
"src\applicationContext.xml");
// 初始化调度工厂
TBScheduleManagerFactory scheduleManagerFactory = new TBScheduleManagerFactory();
Properties p = new Properties();
p.put("zkConnectString", "localhost:2181");
p.put("rootPath", "/tbSchedule/Test");
p.put("zkSessionTimeout", "60000");
p.put("userName", "zookeeper");
p.put("password", "zookeeper");
p.put("isCheckParentPath", "true");
scheduleManagerFactory.setApplicationContext(ctx);
scheduleManagerFactory.init(p);
View Code
6、如果配置正确应该可以成功启动该TaskDeal服务。
七、在TbSchedule Console创建调度任务(请事先仔细阅读wiki中的概念解释)
1、创建调度策略,其中“最大线程组数量”设置为4,表示在机器上的通过4个线程组并行执行数据同步任务。
2、创建调度任务
任务名称:对应调度策略中的任务名称,标识任务和策略的关联关系;
任务处理的SpringBean:对应Demo TaskDeal服务Spring容器中的任务对象ID;
1 &bean id="dataSyncABean" class="Task.DataSyncABean" /&
View Code
每次获取数据量:对应于bean任务类selectTasks方法参数 eachFetchDataNum;
执行开始时间:“0 * * * * ?” 表示每分钟的0秒开始,表达式同Quartz设置的Crontab格式,有工具可以生成,详细解释参照;
2.字段 允许值 允许的特殊字符
3.秒 0-59 , - * /
4.分 0-59 , - * /
5.小时 0-23 , - * /
6.日期 1-31 , - * ? / L W C
7.月份 1-12 或者 JAN-DEC , - * /
8.星期 1-7 或者 SUN-SAT , - * ? / L C #
9.年(可选) 留空,
10.表达式意义
11."0 0 12 * * ?" 每天中午12点触发
12."0 15 10 ? * *" 每天上午10:15触发
13."0 15 10 * * ?" 每天上午10:15触发
14."0 15 10 * * ? *" 每天上午10:15触发
15."0 15 10 * * ? 2005" 2005年的每天上午10:15触发
16."0 * 14 * * ?" 在每天下午2点到下午2:59期间的每1分钟触发
17."0 0/5 14 * * ?" 在每天下午2点到下午2:55期间的每5分钟触发
18."0 0/5 14,18 * * ?" 在每天下午2点到2:55期间和下午6点到6:55期间的每5分钟触发
19."0 0-5 14 * * ?" 在每天下午2点到下午2:05期间的每1分钟触发
20."0 10,44 14 ? 3 WED" 每年三月的星期三的下午2:10和2:44触发
21."0 15 10 ? * MON-FRI" 周一至周五的上午10:15触发
22."0 15 10 15 * ?" 每月15日上午10:15触发
23."0 15 10 L * ?" 每月最后一日的上午10:15触发
24."0 15 10 ? * 6L" 每月的最后一个星期五上午10:15触发
25."0 15 10 ? * 6L " 2002年至2005年的每月的最后一个星期五上午10:15触发
26."0 15 10 ? * 6#3" 每月的第三个星期五上午10:15触发
27.每天早上6点
28.0 6 * * *
29.每两个小时
30.0 */2 * * *
31.晚上11点到早上8点之间每两个小时,早上八点
32.0 23-7/2,8 * * *
33.每个月的4号和每个礼拜的礼拜一到礼拜三的早上11点
34.0 11 4 * 1-3
35.1月1日早上4点
36.0 4 1 1 *
View Code
任务项:前面假设的任务场景中需要把1-8号数据按2天为1个任务组并行数据同步,所以可以把任务划分为1,2,3,4,5,6,7,8一共8个任务碎片,8个任务碎片被分配到4个线程组,那么每个线程组对应2个任务碎片,运行时任务项参数又被传递到bean任务类selectTasks方法的List&TaskItemDefine& queryCondition参数,例如第1个线程组调用selectTasks方法是queryCondition参数条件为1,2 ,第2个线程组执行参数条件为3,4,bean任务类中根据参数生成对应的BuildDate条件取数,并将结果提交到execute方法执行,从而实现并行计算。
任务项的划分:可以有非常多的分配策略和技巧,例如将一个数据表中所有数据的ID按10取模,就将数据划分成了0、1、2、3、4、5、6、7、8、9供10个任务项;将一个目录下的所有文件按文件名称的首字母(不区分大小写),就划分成了A、B、C、D、E、F、G、H、I、J、K、L、M、N、O、P、Q、R、S、T、U、V、W、X、Y、Z供26个任务项 。
3、新增任务配置保存后,Task会按指定的时间段自动开始执行,可以从TbScheduleConsole监视到对应的线程组和任务项。
4、同时可以从任务TaskDeal服务控制台监视到Debug输出的取数SQL和 Insert语句,以及TbSchedule的Heartbeat、TaskItem Debug信息。
1 17:01:00.000 [DataSyncATask-4-HeartBeat] DEBUG c.t.p.s.t.TBScheduleManager - 恢复调度:DataSyncATask$192.168.56.1$44AAB33F650B6FF7E532$
2 17:01:00.000 [DataSyncATask-2-HeartBeat] DEBUG c.t.p.s.t.TBScheduleManager - 恢复调度:DataSyncATask$192.168.56.1$FCCFD9CE84DB$
3 17:01:00.000 [DataSyncATask-3-HeartBeat] DEBUG c.t.p.s.t.TBScheduleManager - 恢复调度:DataSyncATask$192.168.56.1$D4D821EBFCFA47C78F45A7$
4 17:01:00.002 [DataSyncATask-0-HeartBeat] DEBUG c.t.p.s.t.TBScheduleManager - 恢复调度:DataSyncATask$192.168.56.1$D3CEDB8C9220FFCD9B584$
5 开始执行SQL:select * from tbOrder where
BillNumber not in (select BillNumber from tbOrder_copy)
and RIGHT(BuildDate,1) in (7,8) limit 500
6 开始执行SQL:select * from tbOrder where
BillNumber not in (select BillNumber from tbOrder_copy)
and RIGHT(BuildDate,1) in (5,6) limit 500
7 开始执行SQL:select * from tbOrder where
BillNumber not in (select BillNumber from tbOrder_copy)
and RIGHT(BuildDate,1) in (3,4) limit 500
8 开始执行SQL:select * from tbOrder where
BillNumber not in (select BillNumber from tbOrder_copy)
and RIGHT(BuildDate,1) in (1,2) limit 500
9 17:01:00.289 [DataSyncATask-4-exe3] DEBUG c.t.p.s.t.TBScheduleManager - 暂停调度 :DataSyncATask$192.168.56.1$44AAB33F650B6FF7E532$:FetchDataCount=147,FetchDataNum=9600,DealDataSucess=9600,DealDataFail=0,DealSpendTime=687968,otherCompareCount=0
10 17:01:00.291 [DataSyncATask-0-exe1] DEBUG c.t.p.s.t.TBScheduleManager - 暂停调度 :DataSyncATask$192.168.56.1$D3CEDB8C9220FFCD9B584$:FetchDataCount=130,FetchDataNum=180,DealDataSucess=180,DealDataFail=0,DealSpendTime=23783,otherCompareCount=0
11 17:01:00.295 [DataSyncATask-3-exe0] DEBUG c.t.p.s.t.TBScheduleManager - 暂停调度 :DataSyncATask$192.168.56.1$D4D821EBFCFA47C78F45A7$:FetchDataCount=161,FetchDataNum=18000,DealDataSucess=18000,DealDataFail=0,DealSpendTime=1207563,otherCompareCount=0
12 17:01:00.297 [DataSyncATask-2-exe0] DEBUG c.t.p.s.t.TBScheduleManager - 暂停调度 :DataSyncATask$192.168.56.1$FCCFD9CE84DB$:FetchDataCount=131,FetchDataNum=700,DealDataSucess=700,DealDataFail=0,DealSpendTime=80063,otherCompareCount=0
13 17:01:02.674 [DataSyncATask-2-HeartBeat] DEBUG c.t.p.s.t.TBScheduleManagerStatic - DataSyncATask$192.168.56.1$FCCFD9CE84DB$:不是负责任务分配的Leader,直接返回
14 17:01:02.865 [DataSyncATask-3-HeartBeat] DEBUG c.t.p.s.t.TBScheduleManagerStatic - DataSyncATask$192.168.56.1$D4D821EBFCFA47C78F45A7$:不是负责任务分配的Leader,直接返回
15 17:01:02.866 [DataSyncATask-4-HeartBeat] DEBUG c.t.p.s.t.TBScheduleManagerStatic - DataSyncATask$192.168.56.1$44AAB33F650B6FF7E532$:不是负责任务分配的Leader,直接返回
16 17:01:04.433 [DataSyncATask-0-HeartBeat] DEBUG c.t.p.s.zk.ScheduleDataManager4ZK - DataSyncATask$192.168.56.1$D3CEDB8C9220FFCD9B584$:开始重新分配任务......
17 17:01:04.437 [DataSyncATask-0-HeartBeat] DEBUG c.t.p.s.zk.ScheduleDataManager4ZK - 
18 TASK_TYPE=DataSyncATask:TASK_ITEM=1:CUR_SERVER=DataSyncATask$192.168.56.1$D3CEDB8C9220FFCD9B584$:REQ_SERVER=null:DEAL_PARAMETER=
19 TASK_TYPE=DataSyncATask:TASK_ITEM=2:CUR_SERVER=DataSyncATask$192.168.56.1$D3CEDB8C9220FFCD9B584$:REQ_SERVER=null:DEAL_PARAMETER=
20 TASK_TYPE=DataSyncATask:TASK_ITEM=3:CUR_SERVER=DataSyncATask$192.168.56.1$FCCFD9CE84DB$:REQ_SERVER=null:DEAL_PARAMETER=
21 TASK_TYPE=DataSyncATask:TASK_ITEM=4:CUR_SERVER=DataSyncATask$192.168.56.1$FCCFD9CE84DB$:REQ_SERVER=null:DEAL_PARAMETER=
22 TASK_TYPE=DataSyncATask:TASK_ITEM=5:CUR_SERVER=DataSyncATask$192.168.56.1$D4D821EBFCFA47C78F45A7$:REQ_SERVER=null:DEAL_PARAMETER=
23 TASK_TYPE=DataSyncATask:TASK_ITEM=6:CUR_SERVER=DataSyncATask$192.168.56.1$D4D821EBFCFA47C78F45A7$:REQ_SERVER=null:DEAL_PARAMETER=
24 TASK_TYPE=DataSyncATask:TASK_ITEM=7:CUR_SERVER=DataSyncATask$192.168.56.1$44AAB33F650B6FF7E532$:REQ_SERVER=null:DEAL_PARAMETER=
25 TASK_TYPE=DataSyncATask:TASK_ITEM=8:CUR_SERVER=DataSyncATask$192.168.56.1$44AAB33F650B6FF7E532$:REQ_SERVER=null:DEAL_PARAMETER=
26 17:01:07.658 [DataSyncATask-2-HeartBeat] DEBUG c.t.p.s.t.TBScheduleManagerStatic - DataSyncATask$192.168.56.1$FCCFD9CE84DB$:不是负责任务分配的Leader,直接返回
27 17:01:07.882 [DataSyncATask-3-HeartBeat] DEBUG c.t.p.s.t.TBScheduleManagerStatic - DataSyncATask$192.168.56.1$D4D821EBFCFA47C78F45A7$:不是负责任务分配的Leader,直接返回
28 17:01:07.882 [DataSyncATask-4-HeartBeat] DEBUG c.t.p.s.t.TBScheduleManagerStatic - DataSyncATask$192.168.56.1$44AAB33F650B6FF7E532$:不是负责任务分配的Leader,直接返回
29 17:01:09.441 [DataSyncATask-0-HeartBeat] DEBUG c.t.p.s.zk.ScheduleDataManager4ZK - DataSyncATask$192.168.56.1$D3CEDB8C9220FFCD9B584$:开始重新分配任务......
30 17:01:09.446 [DataSyncATask-0-HeartBeat] DEBUG c.t.p.s.zk.ScheduleDataManager4ZK - 
31 TASK_TYPE=DataSyncATask:TASK_ITEM=1:CUR_SERVER=DataSyncATask$192.168.56.1$D3CEDB8C9220FFCD9B584$:REQ_SERVER=null:DEAL_PARAMETER=
32 TASK_TYPE=DataSyncATask:TASK_ITEM=2:CUR_SERVER=DataSyncATask$192.168.56.1$D3CEDB8C9220FFCD9B584$:REQ_SERVER=null:DEAL_PARAMETER=
33 TASK_TYPE=DataSyncATask:TASK_ITEM=3:CUR_SERVER=DataSyncATask$192.168.56.1$FCCFD9CE84DB$:REQ_SERVER=null:DEAL_PARAMETER=
34 TASK_TYPE=DataSyncATask:TASK_ITEM=4:CUR_SERVER=DataSyncATask$192.168.56.1$FCCFD9CE84DB$:REQ_SERVER=null:DEAL_PARAMETER=
35 TASK_TYPE=DataSyncATask:TASK_ITEM=5:CUR_SERVER=DataSyncATask$192.168.56.1$D4D821EBFCFA47C78F45A7$:REQ_SERVER=null:DEAL_PARAMETER=
36 TASK_TYPE=DataSyncATask:TASK_ITEM=6:CUR_SERVER=DataSyncATask$192.168.56.1$D4D821EBFCFA47C78F45A7$:REQ_SERVER=null:DEAL_PARAMETER=
37 TASK_TYPE=DataSyncATask:TASK_ITEM=7:CUR_SERVER=DataSyncATask$192.168.56.1$44AAB33F650B6FF7E532$:REQ_SERVER=null:DEAL_PARAMETER=
38 TASK_TYPE=DataSyncATask:TASK_ITEM=8:CUR_SERVER=DataSyncATask$192.168.56.1$44AAB33F650B6FF7E532$:REQ_SERVER=null:DEAL_PARAMETER=
View Code
5、至此同步任务配置完成,并且实现了模拟场景的要求。
八、以上也只是对TbSchedule的初步认识,更多高级应用仍然在探索中,欢迎交流。
九、向开源工作者和组织致敬,@xuannan
@kongxuan,感谢对开源事业作出的任何贡献。
PHP开发框架
开发工具/编程工具
服务器环境
ThinkSAAS商业授权:
ThinkSAAS为用户提供有偿个性定制开发服务
ThinkSAAS将为商业授权用户提供二次开发指导和技术支持
让ThinkSAAS更好,把建议拿来。
开发客服微信

我要回帖

更多关于 zookeeper 外网访问 的文章

 

随机推荐