0
点赞
收藏
分享

微信扫一扫

使用 Python、Java 监控 Oozie 任务执行,并报警到钉钉

westfallon 2022-04-30 阅读 54
Python 监控 Oozie 组件的任务失败、任务运行超时、任务未执行,并报警到钉钉
import pymysql
import sys
import os
import requests
import hmac
import hashlib
import base64
import urllib.parse
import time
import datetime
import logging

# Log records carry the bare message only: the methods of OozieJob write one
# job id per line and later look those ids up again in this same file.
FORMAT = "%(message)s"
# Timestamp layout used in log lines and alert texts.
DATEFMT = "%Y-%m-%d %H:%M:%S"
# Layout of the log file name; the hour component yields one file per hour.
FILE_FORMAT = "%Y_%m_%d_%H"

# Relative path, so the file is created in the current working directory.
oozie_log = time.strftime(FILE_FORMAT)

logging.basicConfig(
    filename=oozie_log,
    level=logging.INFO,
    format=FORMAT,
    datefmt=DATEFMT,
)
logger = logging.getLogger(__name__)


class OozieJob(object):
    """Checks against the Oozie ``WF_JOBS`` table that raise DingTalk alerts.

    All methods are static. The check methods take an open DB-API connection
    (as returned by :meth:`conn_oozie_mysql`), run one or two queries and send
    a markdown message through ``send_ding_msg`` for every finding.

    ``fail_job`` and ``delay_job`` de-duplicate alerts by writing each alerted
    job id as a line into the ``oozie_log`` file and skipping ids that are
    already present there.
    NOTE(review): both checks share the same file, so a job that was already
    reported as delayed will not be reported again if it is killed while the
    same log file is still in use - confirm this is intended.
    """

    @staticmethod
    def _query(db, sql):
        """Run *sql* on *db* and return all rows, always closing the cursor."""
        cur = db.cursor()
        try:
            cur.execute(sql)
            return cur.fetchall()
        finally:
            cur.close()

    @staticmethod
    def conn_oozie_mysql():
        """Open a connection to the Oozie MySQL database.

        Returns:
            The ``pymysql`` connection, or ``None`` when connecting failed.
            On failure the error is logged and a DingTalk alert is sent.
        """
        mysql_config = {
            'host': '127.0.0.1',
            'user': 'oozie',
            'password': 'oozie',
            'database': 'oozie',
            'port': 3306,
        }
        try:
            return pymysql.connect(**mysql_config)
        except Exception as err:
            logger.info(f'db connect error ====> {err}')
            # Report the database outage; use the same human-readable
            # timestamp layout as the other alerts instead of a raw epoch.
            db_msg = {
                "msgtype": "markdown",
                "markdown": {
                    "title": "Oozie 数据库连接失败",
                    "text": f'title: Oozie 数据库连接失败 \n\n job_name: conn_oozie_mysql \n\n err_time: {time.strftime(DATEFMT)} \n\n  err_msg: {err}'
                }
            }
            send_ding_msg(db_msg)
            return None

    @staticmethod
    def fail_job(db):
        """Alert on workflows that ended with status KILLED in the last 3 minutes.

        Args:
            db: open DB-API connection to the Oozie database.
        """
        sql = "select id,app_name,user_name,end_time,start_time,status from WF_JOBS where status='KILLED'  and end_time between date_add(now(),interval-3 minute) and now()"
        datalist = OozieJob._query(db, sql)

        for (job_id, app_name, _user_name, end_time, start_time, status) in datalist:
            fail_msg = {
                "msgtype": "markdown",
                "markdown": {
                    "title": "Oozie任务失败",
                    "text": f"title: Oozie任务失败 \n\n job_name: {app_name},\n\n job_id: {job_id},\n\n start_time: {start_time},\n\n  end_time: {end_time},\n\n  job_stauts: {status}\n\n"
                }
            }
            # Only alert once per job id: ids already written to the log file
            # have been reported before.
            if not check_log(job_id, oozie_log):
                send_ding_msg(fail_msg)
                logger.info(job_id)

    @staticmethod
    def not_exec_job(db):
        """Alert when fewer distinct workflows started today than succeeded yesterday.

        Compares the number of distinct ``app_name`` values that succeeded
        yesterday with the number started today and alerts when today's count
        is lower, or zero.

        Args:
            db: open DB-API connection to the Oozie database.
        """
        sql_ys_job = "SELECT DISTINCT(app_name) as app_name FROM WF_JOBS WHERE LEFT(start_time, 10) = LEFT(DATE_ADD(NOW(), INTERVAL -1 DAY), 10) AND STATUS = 'SUCCEEDED'"
        sql_td_job = "SELECT DISTINCT(app_name) as app_name FROM WF_JOBS WHERE LEFT(start_time, 10) = LEFT(DATE_ADD(NOW(), INTERVAL 0 DAY), 10) "
        ys_job = OozieJob._query(db, sql_ys_job)
        td_job = OozieJob._query(db, sql_td_job)

        if len(td_job) < len(ys_job) or not td_job:
            not_msg = {
                "msgtype": "markdown",
                "markdown": {
                    "title": "Oozie任务执行异常",
                    "text": f"title: Oozie任务执行异常 \n\n yesterday_job_counts: {len(ys_job)},\n\n today_job_counts: {len(td_job)}, \n\n  job_diff: {len(td_job) - len(ys_job)},\n\n   msg_time: {time.strftime(DATEFMT)},\n\n  "
                }
            }
            send_ding_msg(not_msg)
            logger.info('oozie job exec error')

    @staticmethod
    def delay_job(db):
        """Alert on workflows that have been RUNNING for more than 60 minutes.

        The workflow named ``error_log_track_workflow`` is excluded by the query.

        Args:
            db: open DB-API connection to the Oozie database.
        """
        sql = "select id,app_name,user_name,start_time,TIMESTAMPDIFF(MINUTE, start_time, now()) as diff from WF_JOBS where status='RUNNING' and TIMESTAMPDIFF(MINUTE, start_time, now()) >60 and app_name <> 'error_log_track_workflow'"
        datalist = OozieJob._query(db, sql)

        for (job_id, app_name, _user_name, start_time, diff) in datalist:
            delay_msg = {
                "msgtype": "markdown",
                "markdown": {
                    "title": "Oozie任务延迟告警",
                    "text": f"title: Oozie任务延迟告警 \n\n job_name: {app_name},\n\n job_id: {job_id},\n\n  start_time: {start_time},\n\n  running_time: {diff},\n\n  job_stauts: RUNNING\n\n"
                }
            }
            # Same once-per-id de-duplication as in fail_job.
            if not check_log(job_id, oozie_log):
                send_ding_msg(delay_msg)
                logger.info(job_id)


def check_log(job_id, send_log):
    """Return whether *job_id* already appears as a whole line in *send_log*.

    Args:
        job_id: identifier to look for; compared as text against each line.
        send_log: path of the text file holding one entry per line.

    Returns:
        ``True`` if some line, without its line ending, equals ``job_id``;
        ``False`` otherwise, including when the file does not exist yet
        (nothing can have been recorded in a missing file).
    """
    wanted = str(job_id)
    try:
        with open(send_log, 'r', encoding='utf-8') as handle:
            # Strip both "\n" and "\r\n" so files written on any platform match.
            return any(line.rstrip('\r\n') == wanted for line in handle)
    except FileNotFoundError:
        return False


# 钉钉消息

def get_ding_sign(timestamp, secret):
    """Build the request signature for the DingTalk robot API.

    The text ``"<timestamp>\\n<secret>"`` is signed with HMAC-SHA256 using
    ``secret`` as the key; the digest is Base64-encoded and then URL-quoted.

    Args:
        timestamp: timestamp text placed in front of the secret.
        secret: the robot's signing secret.

    Returns:
        The URL-quoted, Base64-encoded signature as ``str``.
    """
    key = secret.encode('utf-8')
    payload = '{}\n{}'.format(timestamp, secret).encode('utf-8')
    digest = hmac.new(key, payload, digestmod=hashlib.sha256).digest()
    return urllib.parse.quote_plus(base64.b64encode(digest))


def send_ding_msg(msg, timeout=10):
    """Post *msg* to the DingTalk robot webhook.

    The request URL is signed with ``get_ding_sign`` using the current time in
    milliseconds. Failures are printed and never raised to the caller.

    Args:
        msg: JSON-serialisable message payload.
        timeout: seconds to wait for the HTTP request before giving up, so a
            stalled connection cannot hang the caller indefinitely.
    """
    # Placeholders: replace with the robot's own access token and secret.
    url = 'https://oapi.dingtalk.com/robot/send?access_token=token(自己的哦)'
    secret = 'secret(自己的哦)'
    timestamp = str(round(time.time() * 1000))
    sign = get_ding_sign(timestamp=timestamp, secret=secret)
    url = url + '&timestamp=' + timestamp + '&sign=' + sign
    try:
        req = requests.post(url, json=msg, timeout=timeout)
        print(msg)
        # Print the status before parsing so it is visible even when the
        # response body is not valid JSON.
        print(req.status_code)
        result = req.json()
        if result.get('errcode') != 0:
            print(f'notify dingtalk error: {result.get("errcode")}')
    except Exception as err:
        # Best effort: an alerting failure must not abort the caller.
        print(err)


if __name__ == '__main__':
    # Script entry point: run the failure and delay checks on every
    # invocation, and the "not executed" check only while the local hour is 7.

    start_time = time.strftime(DATEFMT)
    logger.info(f'==== oozie monitor start ====> {start_time}')
    oz_cli = OozieJob()
    db = oz_cli.conn_oozie_mysql()
    if db is None:
        # conn_oozie_mysql has already logged and alerted; nothing can be
        # queried without a connection.
        logger.info('==== oozie monitor aborted: no database connection ====')
        sys.exit(1)
    try:
        oz_cli.fail_job(db)
        time.sleep(0.1)
        oz_cli.delay_job(db)
        time.sleep(0.2)
        if datetime.datetime.now().hour == 7:
            oz_cli.not_exec_job(db)
    finally:
        # Always release the connection, even when a check raised.
        db.close()
    end_time = time.strftime(DATEFMT)
    logger.info(f'==== oozie monitor end ====> {end_time}')
Java 报警到钉钉
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.squareup.okhttp.*;
import org.apache.commons.net.util.Base64;

import javax.crypto.Mac;
import javax.crypto.spec.SecretKeySpec;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;
import java.security.InvalidKeyException;
import java.security.NoSuchAlgorithmException;


/**
 * Minimal example that posts a text message to a DingTalk robot webhook
 * using a signed request URL.
 */
public class Demo {


    public static void main(String[] args) {
        post2DingDing("测试2");
    }

    /**
     * Sends {@code message} as a text message that mentions everyone.
     *
     * @param message text to send; it is placed into the JSON payload as a
     *                value, so quotes, backslashes and percent signs are safe
     * @return the response body as a string, or {@code null} when the request
     *         failed with an I/O error or the response had no body
     */
    public static String post2DingDing(String message){
        // Placeholders: replace with the robot's own secret and access token.
        String secret = "secret自己的哦";
        String url = "https://oapi.dingtalk.com/robot/send?access_token=token自己的哦";
        long timestamp = System.currentTimeMillis();

        // Build the payload as objects instead of formatting and re-parsing a
        // JSON string, which broke on messages containing '"' or '\'.
        JSONObject at = new JSONObject();
        at.put("isAtAll", true);
        JSONObject text = new JSONObject();
        text.put("content", " " + message + " ");
        JSONObject json = new JSONObject();
        json.put("at", at);
        json.put("text", text);
        json.put("msgtype", "text");

        RequestBody requestBody = RequestBody.create(MediaType.parse("application/json; charset=utf-8"), json.toJSONString());
        Request request = new Request.Builder().url(getSign(timestamp, secret, url)).post(requestBody).build();
        try {
            Response response = new OkHttpClient().newCall(request).execute();
            if (response.body() != null){
                return response.body().string();
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
        return null;
    }

    /**
     * Appends the {@code timestamp} and {@code sign} query parameters to {@code url}.
     *
     * The signature is the HMAC-SHA256 of {@code timestamp + "\n" + secret},
     * keyed with {@code secret}, Base64-encoded and then URL-encoded as UTF-8.
     *
     * @param timestamp timestamp value to sign and to send
     * @param secret    the robot's signing secret
     * @param url       webhook URL that already carries a query string
     * @return the signed URL
     * @throws IllegalStateException if the signature cannot be computed
     */
    public static String getSign(long timestamp,String secret,String url){
        String string_to_sign = timestamp + "\n" + secret;
        try {
            Mac mac = Mac.getInstance("HmacSHA256");
            mac.init(new SecretKeySpec(secret.getBytes(StandardCharsets.UTF_8),"HmacSHA256"));
            byte[] sign_data = mac.doFinal(string_to_sign.getBytes(StandardCharsets.UTF_8));
            String encoded = new String(Base64.encodeBase64(sign_data), StandardCharsets.UTF_8);
            // Name the charset explicitly; the one-argument overload is
            // deprecated and depends on the platform default.
            return url + "&timestamp=" + timestamp + "&sign=" + URLEncoder.encode(encoded, "UTF-8");
        } catch (NoSuchAlgorithmException | InvalidKeyException | UnsupportedEncodingException e) {
            // Fail loudly instead of returning an empty, unusable URL.
            throw new IllegalStateException("cannot sign DingTalk request", e);
        }
    }
}
举报

相关推荐

0 条评论