1. Azkaban组件
1.1. 介绍
1.1.1. Azkaban简介
Azkaban是一种类似于Oozie的工作流控制引擎,可以用来解决多个Hadoop(或Spark等)离线计算任务之间的依赖关系问题。也可以用其代替crontab来对周期性任务进行调度,并且更为直观,可靠,同时提供了美观的可视化管理界面。
1.1.2. Azkaban功能及特点
● 兼容任何版本的hadoop
● 易于使用的Web用户界面
● 简单的工作流的上传
● 方便设置任务之间的关系
● 调度工作流
● 模块化和可插拔的插件机制
● 认证/授权(权限的工作)
● 能够杀死并重新启动工作流
● 有关失败和成功的电子邮件提醒
1.2. 架构
Azkaban由三部分构成:
1.2.1. Relational Database(Mysql)
Azkaban将大多数状态信息都存于MySQL中,Azkaban Web Server 和 Azkaban Executor Server也需要访问DB。
1.2.2. Azkaban Web Server
使用Jetty作为Web服务器,用作控制器以及提供Web界面。是azkaban的主要管理者,包括 project 的管理,认证,调度,对工作流执行过程的监控等。
1.2.3. Azkaban Executor Server
调度工作流和任务,记录工作流活任务的日志,之所以将AzkabanWebServer和AzkabanExecutorServer分开,主要是因为在某个任务流失败后,可以更方便的将重新执行。
1.3. 运行模式
1.3.1. solo server mode
web server 和 executor server运行在一个进程里。
最简单的模式,数据库内置的H2数据库,管理服务器和执行服务器都在一个进程中运行,任务量不大项目可以采用此模式。
1.3.2. two server mode
web server 和 executor server运行在不同的进程。
数据库为mysql,管理服务器和执行服务器在不同进程,这种模式下,管理服务器和执行服务器互不影响。
1.3.3. multiple executor mode
web server 和 executor server运行在不同的进程,executor server有多个。
该模式下,执行服务器和管理服务器在不同主机上,且执行服务器可以有多个。
1.4. 组成元素
Azkaban界面中的主要元素有三个,分别是project、job与flow。
project可以理解为某个项目,其项目中包含了许多需要执行的任务,即为job,各个job之间形成依赖关系,便组成了工作流flow。
在Azkaban系统的web界面中有创建project的交互,可以通过界面创建一个project,但是Azkaban没有创建job与flow的界面,这一点很讨厌。于是需要编写以.job为扩展名的文件然后上传,才能在系统中形成job任务。
2. 安装部署
新版本,需要下载源码后,自己进行编译。
旧版本可直接下载对应的tar.gz文件。
2.1.下载源码
下载地址:http://azkaban.github.io/downloads.html
2.2.编译
编译命令如下,官网(https://azkaban.readthedocs.io/en/latest/getStarted.html#building-from-source)。
过程中需要安装g++等。
# Build Azkaban
./gradlew build
# Clean the build
./gradlew clean
# Build and install distributions
./gradlew installDist
# Run tests
./gradlew test
# Build without running tests
./gradlew build -x test
2.3.生成tar.gz包。
编译后的生成结果。
需要用到的文件:
● azkaban-db-0.1.0-SNAPSHOT.tar.gz
● azkaban-exec-server-0.1.0-SNAPSHOT.tar.gz
● azkaban-web-server-0.1.0-SNAPSHOT.tar.gz
2.4. 部署及配置
采用第二种运行模式,管理服务器、执行服务器分进程,但在同一台主机上。
2.4.1. azkaban-exec-server配置
vi /data/install/azkaban/azkaban-exec-server/conf/azkaban.properties
...
default.timezone.id=Asia/Shanghai
...
# Azkaban mysql settings by default. Users should configure their own username and password.
database.type=mysql
mysql.port=3306
mysql.host=10.16.0.92
mysql.database=azkaban
mysql.user=hadoop
mysql.password=iWxep5XH#C
mysql.numconnections=100
2.4.2. azkaban-web-server配置
web服务配置
vi /data/install/azkaban/azkaban-web-server/conf/azkaban.properties
...
default.timezone.id=Asia/Shanghai
...
# Azkaban mysql settings by default. Users should configure their own username and password.
database.type=mysql
mysql.port=3306
mysql.host=10.16.0.92
mysql.database=azkaban
mysql.user=hadoop
mysql.password=iWxep5XH#C
mysql.numconnections=100
3. 例子
3.1.Flow 1.0
Flow 1.0 will be deprecated in the future.
通过xxx.job文件的方式,来配置job。多个job时,分开多个文件,配置上不友好。
1、创建文件test_properties.job
# test_properties.job
type=command
command=sh /data/install/st_sh/azkaban_test/test_properties.sh
2、打包成test_properties.zip文件。
3、通过页面,创建project,然后upload。
3.2.Flow 2.0
1、 创建文件flow20.project,用于版本声明。
azkaban-flow-version: 2.0
2、 创建文件basic.flow,用于任务流配置。
nodes:
- name: jobC
type: noop
# jobC depends on jobA and jobB
dependsOn:
- jobA
- jobB
- name: jobA
type: command
config:
command: echo "This is an echoed text."
- name: jobB
type: command
config:
command: pwd
3、 通过页面,创建project,然后upload。
3.3.Flow 2.0 配置语言
配置时,使用YAML语言。语法要求:
● 大小写敏感
● 使用缩进表示层级关系
● 缩进不允许使用tab,只允许空格
● 缩进的空格数不重要,只要相同层级的元素左对齐即可
● '#'表示注释
3.4.复杂些的例子
1、配置代码:
nodes:
- name: jobD1
type: command
dependsOn:
- jobC1
- jobC2
config:
command: sh /data/install/st_sh/azkaban_test/test_dependencies_d1.sh
- name: jobC1
type: command
dependsOn:
- jobB1
- jobB2
config:
command: sh /data/install/st_sh/azkaban_test/test_dependencies_c1.sh
- name: jobC2
type: command
dependsOn:
- jobB3
- jobB4
config:
command: sh /data/install/st_sh/azkaban_test/test_dependencies_c2.sh
- name: jobB1
type: command
dependsOn:
- jobA1
config:
command: sh /data/install/st_sh/azkaban_test/test_dependencies_b1.sh
- name: jobB2
type: command
dependsOn:
- jobA1
config:
command: sh /data/install/st_sh/azkaban_test/test_dependencies_b2.sh
- name: jobB3
type: command
dependsOn:
- jobA2
config:
command: sh /data/install/st_sh/azkaban_test/test_dependencies_b3.sh
- name: jobB4
type: command
dependsOn:
- jobA2
config:
command: sh /data/install/st_sh/azkaban_test/test_dependencies_b4.sh
- name: jobA1
type: command
config:
command: sh /data/install/st_sh/azkaban_test/test_dependencies_a1.sh
- name: jobA2
type: command
config:
command: sh /data/install/st_sh/azkaban_test/test_dependencies_a2.sh
4、 图
3.5.应用流程
1、 编码文件、打包成zip包(不要对文件夹打包)
2、 通过页面,创建project,然后upload。
3、 运行调度。
4、 查看运行结果。
5、 配置调度、运行参数、并行处理、失败处理等。
4. 任务创建及运行
4.1.创建Flow
使用Flow 2.0的模式,创建文件:flow20.project,xxxx.flow,xxxx.properties(可选)。
将文件打包成zip。注意:是打包所有文件,而不是打包整个文件夹。
4.2.创建Project
登录Azkaban,创建Project。
Name:后期不可修改。需统一命名规范。
Description: 可修改。
4.3.Project Permissions
项目授权
4.4.Project upload
将打包好的zip文件,通过web页面上传。
重复上传时,会覆盖原有的旧版本。
Azkaban目前只支持上传zip文件,上传后Azkaban将验证zip文件中的内容,检查Flow的依赖关系,遇到任何无效的Flow例如循环依赖的Flow,都会宣告上传失败。
新上传的Flow将会覆盖此项目中之前已经存在的Flow,成功上传之后,会在屏幕上列出所有的Flow。
4.5.查看Flow
● Graph:依赖关系图,右击可查看每个job的详细信息
● Executions:历史调度运行情况
● Flow Triggers:触发配置
● Summary:
4.6.运行Flow*
点击“Execute Flow”,打开工作流运行选项页面。
4.6.1.Flow View*
Right click on the jobs to disable and enable jobs in the flow.
可以禁用整个工作流中的某些job,禁用后该job将显示为透明的,并在整个flow执行过程中被忽略。
有以下几个选项:
● Parents:代表禁用job及其依赖的上一个job。
● Ancestors:代表禁用job及其依赖的全部job,即所有父job。
● Childern:代表禁用job及依赖其的下一个job。
● Descendents:代表禁用job及依赖其的全部job, 即所有子job。
● All:代表禁用与此job有依赖关系的全部job。
4.6.2.Notification*
Change the address where success and failure emails will be sent.
标签页中可以配置Flow运行成功或失败之后的通知行为。
Notify on Failure
● First Failure:只有Flow中有任一job运行失败就发送邮件通知。
● Flow Finished:即使某个job运行失败,也要运行完Flow中的全部job后再进行通知。
4.6.3.Failure Options*
Select flow behavior when a failure is detected.
当job运行失败后,你可以进行如下操作:
● Finish Current Running:尝试继续执行其他的正在运行的job,未运行的job将不会尝试开始运行,期间将Flow的状态设置为FAILED FINISHING,Flow运行失败后将状态改为FAILED。
● Cancel All:停止所有Flow中的其他job,宣告此Flow运行失败。
● Finish All Possible:尝试继续执行其他的job(包括已经在运行的和还未运行的),期间将Flow的状态设置为FAILED FINISHING,Flow运行失败后将状态改为FAILED。
4.6.4.Concurrent Options*
Change the behavior of the flow if it is already running.
如果希望运行多个相同的Flow,可以选择以下配置:
● Skip Execution:如果Flow已经在运行,不会再调用它。
● Run Concurrently:无论Flow是否在运行都会运行,但是选择不同的工作目录。
● Pipeline:并发运行多个Flow。
○ Level 1:前一个Flow的JobA结束后开始运行当前Flow的JobA
○ Level 2:前一个Flow的JobA及其下游的Job全部完成后,开始运行当前Flow
4.6.5.Flow Parameters*
Add temporary flow parameters that are used to override global settings for each job.
参见:UI页面工作流参数
4.7.任务调度
5.配置及参数处理
5.1.Azkaban参数类型
参数的作用范围分类:
● 局部参数:对当前job有效局部有效。
● 全局参数:对整个工作流全局有效。在整个工作流的作业文件配置中,都可以通过 ${参数名} 的方式引用使用。
5.2.job的common参数*
除了type,command,decpendencies三个参数外,还有如下一些保留参数可以为每个job配置。
我们可能会用到的参数有:retries, retry.backoff, notify.emails。
---
config:
retries: 3
retry.backoff: 2000
notify.emails: xxx@midea.com
nodes:
- name: jobB
type: command
config:
command: pwd
5.3.job参数的运行时(runtime)参数
这些属性在job运行期间自动被添加。
5.4.UI页面工作流参数*
可用于重跑历史任务。
5.4.1.UI页面输入参数定义及参数值
通过页面运行工作流时,输入运行参数,界面如下:
5.4.2.在job配置中使用
# test_properties.job
type=command
command=sh /data/install/st_sh/azkaban_test/test_properties.sh "${belong_date}" "${ent_id}" "${ent_name}"
5.4.3.shell脚本的调用
vi test_properties.sh
#!/bin/bash
belong_date=$1
ent_id=$2
ent_name=$3
echo "${belong_date}" "${ent_id}" "${ent_name}"
5.5.工作流ZIP压缩包中的属性文件*
可用于指定默认的运行参数。
默认参数值,可通过属性配置文件(.properties),通过里面的key=value进行配置。
后缀为.properties的文件将会作为参数文件加载,并且为flow中每个job所共享,属性文件通过目录分层结构继承。
需要一起打包到zip文件中。
5.5.1.代码
vi test_properties.properties
belong_date=2022-11-28
ent_id=10000
ent_name=kongtiaoent
vi test_properties.job
# test_properties.job
type=command
command=sh /data/install/st_sh/azkaban_test/test_properties.sh "${belong_date}" "${ent_id}" "${ent_name}"
注:参数没配置时的异常信息:Caused by: azkaban.utils.UndefinedPropertyException: Could not find variable substitution for variable(s) [command->belong_date]
5.5.2.作用域
比如,在zip包中有以下结构
system.properties
baz.job
myflow/myflow.properties
myflow/myflow2.properties
myflow/foo.job
myflow/bar.job
system.properties是全局的属性,将会被baz.job和myflow目录下的foo.job和bar.job使用,但是baz.job不会继承myflow.properties和myflow2.properties的属性,因为是它的下层.
5.6.job参数传递
JOB_OUTPUT_PROP_FILE和JOB_PROP_FILE都是一个环境变量,指向文件路径。上游节点把需要输出的值以json的格式写入JOB_OUTPUT_PROP_FILE文件,下游节点就可以在JOB_PROP_FILE中看到key-value形式的输出,用${key}的方式使用变量。
baseflow.flow
nodes:
- name: jobB
type: command
dependsOn:
- jobA
config:
command: sh commandB.sh "${firstName}"
- name: jobA
type: command
config:
command: sh commandA.sh
commandA.sh
#!/bin/bash
echo '{ "firstName":"John" , "lastName":"Doe" }' >> ${JOB_OUTPUT_PROP_FILE}
commandB.sh
#!/bin/bash
#cat ${JOB_PROP_FILE} >> /root/azkaban.txt
echo $1 >> /root/azkaban.txt
5.7.shell中使用参数的注意事项
在UI页面重新输入运行时参数时,可以覆盖系统默认生成的参数值。运行时参数,和UI输入的参数,都可以认为是全局参数,在整个工作流的作业配置中,都可以通过 ${参数名} 的方式引用使用。
● 在shell 中直接引用 公共参数,运行时系统参数,UI输入参数,是无效的。
● 在shell中只能直接使用环境变量;
● 公共参数,运行时系统参数,UI输入参数只能通过shell的脚本参数的方式传递进来。
● job文件中定义的环境变量参数,可以在shell脚本中直接引用,但只对当前job有效。
6.用户管理及权限授权
全局权限配置、具体project权限配置。
5.4.3.角色与用户组
Azkaban默认的角色类型:
● 为用户组授权具体的角色。
● 可以给用户授权具体的角色。也可以给用户授权具体的用户组。
● 用户可以同时拥有用户组和角色的权限组合。
5.4.4.用户权限配置
是全局配置,配置后对所有project都生效。
vi /data/install/azkaban/azkaban-web-server/conf/azkaban-users.xml
<azkaban-users>
<!-- 定义用户 -->
<user groups="azkaban" password="azkaban" roles="admin" username="azkaban"/>
<user password="metrics" roles="metrics" username="metrics"/>
<user password="123456" roles="read" username="read"/>
<user password="123456" roles="write" username="write"/>
<user password="123456" roles="read,execute" username="execute"/>
<user password="123456" roles="schedule" username="schedule"/>
<user password="123456" roles="createprojects" username="createprojects"/>
<user groups="admin" password="123456" username="jianzr"/>
<user groups="develop" password="123456" username="develop"/>
<user groups="analyse" password="123456" username="analyse"/>
<!-- 定义组 -->
<!-- 管理员 -->
<group name="admin" roles="admin"/>
<!-- 数据开发 -->
<group name="develop" roles="read,execute"/>
<!-- 数据分析师 -->
<group name="analyse" roles="read,execute"/>
<!-- 定义角色 -->
<!-- 授权所有的在Azkaban中的访问权限 -->
<role name="admin" permissions="ADMIN"/>
<role name="metrics" permissions="METRICS"/>
<!-- 授权给用户所有项目和日志的读权限 -->
<role name="read" permissions="READ"/>
<!-- 用于用户上传文件,改变job属性配置 或 删除任何项目-->
<role name="write" permissions="WRITE"/>
<!-- 允许用户在任何工作流中触发execution -->
<role name="execute" permissions="EXECUTE"/>
<!-- 用户可以为所有的flow添加或删除任务 -->
<role name="schedule" permissions="SCHEDULE"/>
<!-- 如果一个项目锁住了,允许用户创建一个新项目 -->
<role name="createprojects" permissions="CREATEPROJECTS"/>
</azkaban-users>
5.4.5.页面权限配置
通过web页面,针对指定的project,配置对应的用户权限。
参考后面的章节。