1 概述
1.1 版本
服务器版本 |
---|
v7.2.0及之后版本 |
1.2 应用场景
【Python 脚本】主要用于数据处理是通过 Python 脚本实现的场景(比如:文件类型、爬虫获取的数据处理后存到数据库中,供下游节点使用),支持直接调用已有的 Python 脚本。
1.3 功能简介
通过编写Python代码进行复杂数据处理,解决 SQL 脚本较难实现的业务。
2 功能说明
Python 脚本配置,包括: Python 命令、Python 代码、脚本参数 。
1、Python 命令
支持“Python、Python2、Python3”三种类型,默认为“Python”。
2、Python 代码
可直接在编辑器中编写对应的Python代码。
如果本地有编辑好的 Python 文件,也可点击“导入”按钮选择对应的文件进行导入,导入后的代码允许进行再次编辑。
注:Python 代码需要使用所选 Python 命令对应的格式。
3、脚本参数
支持为脚本中的参数设置参数值,支持常量和表达式。
脚本参数值传入脚本的顺序为自上而下。
注:执行Python脚本的前提是要在对应环境下安装Pyhon安装包,具体说明参考:Python环境配置
3 示例
业务场景:拆分请假明细数据,使用【Python脚本】主要为了解决请假流程明细数据中的请假天数跨天的现象,需要把跨天的一条数据拆分成多条。
添加【Python脚本】节点,与转换节点“请假流程明细”连接,配置如下:
完整Python脚本如下:
from datetime import datetime, timedelta
import pymysql
from pymysql.cursors import DictCursor
conn = pymysql.connect(host="144.20.80.97", port=3306, user='root', password='123456', db='test',
charset='utf8mb4')
vstart_time = '2024-06-20'
end_time = '2024-06-25'
time_format = '%Y-%m-%d %H:%M'
date_format = '%Y-%m-%d'
def time_compare(datestr, hour, min):
parse_time = datetime.strptime(datestr, time_format)
year = parse_time.year
month = parse_time.month
day = parse_time.day
start_time = datetime(year, month, day, hour, min)
if parse_time < start_time:
return -1
elif parse_time > start_time:
return 1
else:
return 0
def day_add(date_str, day):
date = datetime.strptime(date_str, "%Y-%m-%d")
new_date = date + timedelta(days=day) # 加一天
new_date_str = new_date.strftime("%Y-%m-%d") # 将日期对象转换为字符串
return new_date_str
def execute_many(sql: str, data: [()]):
try:
cursor = conn.cursor()
cursor.executemany(sql, data)
cursor.close()
conn.commit()
except (pymysql.Error, Exception) as e:
conn.rollback()
# 异常数据单条写入
for dt in data:
try:
cursor = conn.cursor()
cursor.execute(sql, dt)
cursor.close()
conn.commit()
except (pymysql.Error, Exception) as e2:
conn.rollback()
def sel_all(sel_sql, is_dict):
if is_dict:
cursor = conn.cursor(DictCursor)
else:
cursor = conn.cursor()
cursor.execute(sel_sql)
result = cursor.fetchall()
return result
cur_gzlsj_sql = f"""
select WORKCODE sgh,SQRQ srq,ksrq skssj,jsrq sjzsj,DATE_FORMAT(ksrq,'%Y-%m-%d') as sksrq,DATE_FORMAT(jsrq,'%Y-%m-%d') as sjzrq,
case QJLX when 2 then '年假' when 6 then '事假' when 7 then '病假' when 8 then '产假' when 9 then '陪产假'
when 10 then '婚假' when 11 then '丧假' end as ilx
from dw_qjlc a where KSRQ>='{vstart_time}' and KSRQ<='{end_time}' order by SQRQ;
"""
delete_dw_qjlc_jckq_sql = "delete from dw_qjlc_jckq where rq=%s and workcode=%s;"
delete_dw_qjlc_jg_sql = "delete from dw_qjlc_jg where rq>=%s and rq<=%s and workcode=%s"
insert_dw_qjlc_jckq_sql = "insert into dw_qjlc_jckq(workcode,rq,lx,dk) values(%s,%s,%s,%s)"
insert_dw_qjlc_jg_sql = "insert into dw_qjlc_jg(workcode,rq,sw,xw) values(%s,%s,%s,%s)"
select_dw_qjlc_jg_res = """
select workcode,DATE_FORMAT(rq,'%Y-%m-%d') rq,(case sw when '' then null else sw end) as sw,(case xw when '' then null else xw end) as xw from (
select workcode,rq,max(sw) as sw,max(xw) as xw from (
select workcode,rq,CONCAT(dk,lx) as sw,'' as xw from dw_qjlc_jckq where dk='上午' and rq>='{}' and rq<='{}' and workcode='{}'
union ALL
select workcode,rq,'' as sw,CONCAT(dk,lx) as xw from dw_qjlc_jckq where dk='下午' and rq>='{}' and rq<='{}' and workcode='{}'
) a group by rq
) b;
"""
delete_dw_qjlc_jckq_ls = []
delete_dw_qjlc_jg_ls = []
insert_dw_qjlc_jckq_ls = []
insert_dw_qjlc_jg_ls = []
sel_dw_qjlc_jg_sqls = []
result = sel_all(cur_gzlsj_sql, True)
if result:
for r in result:
sksrq = r.get('sksrq')
sjzrq = r.get('sjzrq')
skssj = r.get('skssj')
sgh = r.get('sgh')
ilx = r.get('ilx')
sjzsj = r.get('sjzsj')
if sksrq != sjzrq:
dl = (skssj, sgh)
delete_dw_qjlc_jckq_ls.append(dl)
if time_compare(skssj, 8, 30) < 0:
il = (sgh, skssj, ilx, '上午')
insert_dw_qjlc_jckq_ls.append(il)
il = (sgh, skssj, ilx, '下午')
insert_dw_qjlc_jckq_ls.append(il)
elif time_compare(skssj, 17, 30) < 0:
il = (sgh, skssj, ilx, '下午')
insert_dw_qjlc_jckq_ls.append(il)
dl = (sjzsj, sgh)
delete_dw_qjlc_jckq_ls.append(dl)
if time_compare(sjzsj, 17, 30) > 0:
il = (sgh, sjzsj, ilx, '上午')
insert_dw_qjlc_jckq_ls.append(il)
il = (sgh, sjzsj, ilx, '下午')
insert_dw_qjlc_jckq_ls.append(il)
elif time_compare(sjzsj, 8, 30) > 0:
il = (sgh, sjzsj, ilx, '上午')
insert_dw_qjlc_jckq_ls.append(il)
temprq = day_add(sksrq, 1)
while temprq < sjzrq:
dl = (temprq, sgh)
delete_dw_qjlc_jckq_ls.append(dl)
il = (sgh, temprq, ilx, '上午')
insert_dw_qjlc_jckq_ls.append(il)
il = (sgh, temprq, ilx, '下午')
insert_dw_qjlc_jckq_ls.append(il)
temprq = day_add(temprq, 1)
else:
dl = (skssj, sgh)
delete_dw_qjlc_jckq_ls.append(dl)
if time_compare(skssj, 8, 30) <= 0:
il = (sgh, skssj, ilx, '上午')
insert_dw_qjlc_jckq_ls.append(il)
if time_compare(sjzsj, 17, 30) >= 0:
il = (sgh, sjzsj, ilx, '下午')
insert_dw_qjlc_jckq_ls.append(il)
dl = (sksrq, sjzrq, sgh)
delete_dw_qjlc_jg_ls.append(dl)
sel_dw_qjlc_jg_sqls.append(select_dw_qjlc_jg_res.format(sksrq, sjzrq, sgh, sksrq, sjzrq, sgh))
execute_many(delete_dw_qjlc_jckq_sql, delete_dw_qjlc_jckq_ls)
execute_many(insert_dw_qjlc_jckq_sql, insert_dw_qjlc_jckq_ls)
execute_many(delete_dw_qjlc_jg_sql, delete_dw_qjlc_jg_ls)
if sel_dw_qjlc_jg_sqls:
for s in sel_dw_qjlc_jg_sqls:
rs = sel_all(s, False)
if rs:
for r in rs:
insert_dw_qjlc_jg_ls.append(r)
execute_many(insert_dw_qjlc_jg_sql, insert_dw_qjlc_jg_ls)
作者:fancy 创建时间:2024-06-13 10:40
最后编辑:fancy 更新时间:2025-03-27 15:47
最后编辑:fancy 更新时间:2025-03-27 15:47
