亚洲免费在线-亚洲免费在线播放-亚洲免费在线观看-亚洲免费在线观看视频-亚洲免费在线看-亚洲免费在线视频

Python 獲取 datax 執行結果保存到數據庫的方法

系統 2118 0

執行 datax 作業,創建執行文件,在 crontab 中每天1點(下面有關系)執行:

其中 job_start 及 job_finish 這兩行記錄是自己添加的,為了方便識別出哪張表。

            
#!/bin/bash
source /etc/profile
user1="root"
pass1="pwd"
user2="root"
pass2="pwd"
job_path="/opt/datax/job/"
 
jobfile=(
job_table_a.json
job_table_b.json
)
 
for filename in ${jobfile[@]}
do
	echo "job_start: "`date "+%Y-%m-%d %H:%M:%S"`" ${filename}"
	python /opt/datax/bin/datax.py -p "-Duser1=${user1} -Dpass1=${pass1} -Duser2=${user2} -Dpass2=${pass2}" ${job_path}${filename}
	echo "job_finish: "`date "+%Y-%m-%d %H:%M:%S"`" ${filename}"
done
 
# 0 1 * * * /opt/datax/job/dc_to_ods_incr.sh >> /opt/datax/job/log/dc_to_ods_incr_$(date +\%Y\%m\%d_\%H\%M\%S).log 2>&1
# egrep '任務|速度|總數|job_start|job_finish' /opt/datax/job/log/

          

datax 執行日志:

            
job_start: 2018-08-08 01:13:28 job_table_a.json
任務啟動時刻          : 2018-08-08 01:13:28
任務結束時刻          : 2018-08-08 01:14:49
任務總計耗時          :         81s
任務平均流量          :     192.82KB/s
記錄寫入速度          :      1998rec/s
讀出記錄總數          :       159916
讀寫失敗總數          :          0
job_finish: 2018-08-08 01:14:49 job_table_a.json
job_start: 2018-08-08 01:14:49 job_table_b.json
任務啟動時刻          : 2018-08-08 01:14:50
任務結束時刻          : 2018-08-08 01:15:01
任務總計耗時          :         11s
任務平均流量          :        0B/s
記錄寫入速度          :       0rec/s
讀出記錄總數          :          0
讀寫失敗總數          :          0
job_finish: 2018-08-08 01:15:01 job_table_b.json
          

接下來讀取這些信息保存到數據庫,在數據庫中創建表:

            
CREATE TABLE `datax_job_result` (
 `log_file` varchar(200) DEFAULT NULL,
 `job_file` varchar(200) DEFAULT NULL,
 `start_time` datetime DEFAULT NULL,
 `end_time` datetime DEFAULT NULL,
 `seconds` int(11) DEFAULT NULL,
 `traffic` varchar(50) DEFAULT NULL,
 `write_speed` varchar(50) DEFAULT NULL,
 `read_record` int(11) DEFAULT NULL,
 `failed_record` int(11) DEFAULT NULL,
 `job_start` varchar(200) DEFAULT NULL,
 `job_finish` varchar(200) DEFAULT NULL,
 `insert_time` datetime DEFAULT CURRENT_TIMESTAMP
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

          

定時執行以下文件,因為 datax 作業 1 點執行,為了獲取一天內最新生產的日志,腳本中取 82800內生產的日志文件,及23 小時內生產的那個最新日志。所以一天內任何時間執行都可以。此文件也是定時每天執行(判斷 datax 作業完成后執行)

            
#!/usr/bin/python
# -*- coding: UTF-8 -*-
# 0 5 * * * source /etc/profile && /usr/bin/python2.7 /opt/datax/job/save_log_to_db.py > /dev/null 2>&1
 
import re
import os
import sqlalchemy
import pandas as pd
import datetime as dt
 
def save_to_db(df):
	engine = sqlalchemy.create_engine("mysql+pymysql://root:pwd@localhost:3306/test", encoding="utf-8") 
	df.to_sql("datax_job_result", engine, index=False, if_exists='append') 
 
def get_the_latest_file(path):
	t0 = dt.datetime.utcfromtimestamp(0)
	d2 = (dt.datetime.now() - t0).total_seconds()
	d1 = d2 - 82800
	for (dirpath, dirnames, filenames) in os.walk(path):
		for filename in sorted(filenames, reverse = True):
			if filename.endswith(".log"):
				f = os.path.join(dirpath,filename)
				ctime = os.stat(f)[-1]
				if ctime>=d1 and ctime <=d2:
					return f
			
def get_job_result_from_logfile(path):
	result = pd.DataFrame(columns=['log_file','job_file','start_time','end_time','seconds','traffic','write_speed','read_record','failed_record','job_start','job_finish'])
	log_file = get_the_latest_file(path)
	index = 0
	content = open(log_file, "r")
	for line in content:
		result.loc[index, 'log_file'] = log_file
		if re.compile(r'job_start').match(line):
			result.loc[index, 'job_file'] = line.split(' ')[4].strip()
			result.loc[index, 'job_start'] = line,
		elif re.compile(r'任務啟動時刻').match(line):
			result.loc[index, 'start_time'] = line.split('刻')[1].strip().split(' ')[1].strip() + ' ' + line.split('刻')[1].strip().split(' ')[2].strip()
		elif re.compile(r'任務結束時刻').match(line):
			result.loc[index, 'end_time'] = line.split('刻')[1].strip().split(' ')[1].strip() + ' ' + line.split('刻')[1].strip().split(' ')[2].strip()
		elif re.compile(r'任務總計耗時').match(line):
			result.loc[index, 'seconds'] = line.split(':')[1].strip().replace('s','')
		elif re.compile(r'任務平均流量').match(line):
			result.loc[index, 'traffic'] = line.split(':')[1].strip()
		elif re.compile(r'記錄寫入速度').match(line):
			result.loc[index, 'write_speed'] = line.split(':')[1].strip()
		elif re.compile(r'讀出記錄總數').match(line):
			result.loc[index, 'read_record'] = line.split(':')[1].strip()
		elif re.compile(r'讀寫失敗總數').match(line):
			result.loc[index, 'failed_record'] = line.split(':')[1].strip()
		elif re.compile(r'job_finish').match(line):
			result.loc[index, 'job_finish'] = line,
			index = index + 1
		else:
			pass
	save_to_db(result)
 
get_job_result_from_logfile("/opt/datax/job/log")

          

以上這篇Python 獲取 datax 執行結果保存到數據庫的方法就是小編分享給大家的全部內容了,希望能給大家一個參考,也希望大家多多支持腳本之家。


更多文章、技術交流、商務合作、聯系博主

微信掃碼或搜索:z360901061

微信掃一掃加我為好友

QQ號聯系: 360901061

您的支持是博主寫作最大的動力,如果您喜歡我的文章,感覺我的文章對您有幫助,請用微信掃描下面二維碼支持博主2元、5元、10元、20元等您想捐的金額吧,狠狠點擊下面給點支持吧,站長非常感激您!手機微信長按不能支付解決辦法:請將微信支付二維碼保存到相冊,切換到微信,然后點擊微信右上角掃一掃功能,選擇支付二維碼完成支付。

【本文對您有幫助就好】

您的支持是博主寫作最大的動力,如果您喜歡我的文章,感覺我的文章對您有幫助,請用微信掃描上面二維碼支持博主2元、5元、10元、自定義金額等您想捐的金額吧,站長會非常 感謝您的哦?。?!

發表我的評論
最新評論 總共0條評論
主站蜘蛛池模板: 被公侵犯肉体中文字幕一区二区 | 特级黄色 | 日本强不卡在线观看 | 欧美亚洲天堂 | 99久久国产综合精品2020 | 9久热久re爱免费精品视频 | 国产aaaaaa | 国产一级久久久久久毛片 | 四虎影院在线观看免费 | 欧美国产日韩911在线观看 | 国产香蕉91tv永久在线 | 成人精品亚洲 | 99re这里只有精品99 | 99精品亚洲 | 亚洲人成一区二区不卡 | 亚洲图片综合区另类图片 | 国产成人91高清精品免费 | 91国语精品自产拍在线观看一 | 又粗又硬又大久久久 | 国产一区二区三区欧美精品 | 久久久久亚洲国产 | 日韩不卡免费视频 | 久久综合干 | 亚洲欧美日韩精品久久亚洲区 | 91九色精品国产免费 | 大学生一级毛片免费看真人 | 欧美激情社区 | 国产成人免费手机在线观看视频 | 久久国产精品高清一区二区三区 | 91香蕉福利一区二区三区 | 日本香蕉视频在线观看 | 欧美精品亚洲精品 | 国产成人精品高清不卡在线 | 狠狠干艹 | 天天综合天天 | 欧美日韩在线成人免费 | 国产精品亚洲一区二区在线观看 | 亚洲一区二区三区欧美 | 日本1区2区3区| 99re热久久资源最新获取 | 亚洲视频播放 |