1 概述

1.1 版本

服务器版本
v7.2.0及之后版本

1.2 应用场景

【Python 脚本】主要用于数据处理是通过 Python 脚本实现的场景(比如:文件类型、爬虫获取的数据处理后存到数据库中,供下游节点使用),支持直接调用已有的 Python 脚本。

1.3 功能简介

通过编写Python代码进行复杂数据处理,解决 SQL 脚本较难实现的业务。

2 功能说明

Python 脚本配置,包括: Python 命令、Python 代码、脚本参数 。

1、Python 命令
支持“Python、Python2、Python3”三种类型,默认为“Python”。
2、Python 代码
可直接在编辑器中编写对应的Python代码。
如果本地有编辑好的 Python 文件,也可点击“导入”按钮选择对应的文件进行导入,导入后的代码允许进行再次编辑。

注:Python 代码需要使用所选 Python 命令对应的格式。

3、脚本参数

支持为脚本中的参数设置参数值,支持常量和表达式。
脚本参数值传入脚本的顺序为自上而下。

注:执行Python脚本的前提是要在对应环境下安装Pyhon安装包,具体说明参考:Python环境配置

3 示例

业务场景:拆分请假明细数据,使用【Python脚本】主要为了解决请假流程明细数据中的请假天数跨天的现象,需要把跨天的一条数据拆分成多条。
添加【Python脚本】节点,与转换节点“请假流程明细”连接,配置如下:

完整Python脚本如下:

from datetime import datetime, timedelta

import pymysql
from pymysql.cursors import DictCursor

conn = pymysql.connect(host="144.20.80.97", port=3306, user='root', password='123456', db='test',
                       charset='utf8mb4')

vstart_time = '2024-06-20'
end_time = '2024-06-25'
time_format = '%Y-%m-%d %H:%M'
date_format = '%Y-%m-%d'


def time_compare(datestr, hour, min):
    parse_time = datetime.strptime(datestr, time_format)
    year = parse_time.year
    month = parse_time.month
    day = parse_time.day
    start_time = datetime(year, month, day, hour, min)
    if parse_time < start_time:
        return -1
    elif parse_time > start_time:
        return 1
    else:
        return 0


def day_add(date_str, day):
    date = datetime.strptime(date_str, "%Y-%m-%d")
    new_date = date + timedelta(days=day)  # 加一天
    new_date_str = new_date.strftime("%Y-%m-%d")  # 将日期对象转换为字符串
    return new_date_str


def execute_many(sql: str, data: [()]):
    try:
        cursor = conn.cursor()
        cursor.executemany(sql, data)
        cursor.close()
        conn.commit()
    except (pymysql.Error, Exception) as e:
        conn.rollback()
        # 异常数据单条写入
        for dt in data:
            try:
                cursor = conn.cursor()
                cursor.execute(sql, dt)
                cursor.close()
                conn.commit()
            except (pymysql.Error, Exception) as e2:
                conn.rollback()


def sel_all(sel_sql, is_dict):
    if is_dict:
        cursor = conn.cursor(DictCursor)
    else:
        cursor = conn.cursor()
    cursor.execute(sel_sql)
    result = cursor.fetchall()
    return result


cur_gzlsj_sql = f"""
select WORKCODE sgh,SQRQ srq,ksrq skssj,jsrq sjzsj,DATE_FORMAT(ksrq,'%Y-%m-%d') as sksrq,DATE_FORMAT(jsrq,'%Y-%m-%d') as sjzrq,
case QJLX when 2 then '年假'  when 6 then '事假' when 7 then '病假' when 8 then '产假' when 9 then '陪产假'
when 10 then '婚假' when 11 then '丧假'  end as ilx
from dw_qjlc a where KSRQ>='{vstart_time}' and KSRQ<='{end_time}'  order by SQRQ;
"""
delete_dw_qjlc_jckq_sql = "delete from dw_qjlc_jckq where rq=%s and workcode=%s;"
delete_dw_qjlc_jg_sql = "delete from dw_qjlc_jg where rq>=%s and rq<=%s and workcode=%s"
insert_dw_qjlc_jckq_sql = "insert into dw_qjlc_jckq(workcode,rq,lx,dk) values(%s,%s,%s,%s)"
insert_dw_qjlc_jg_sql = "insert into dw_qjlc_jg(workcode,rq,sw,xw) values(%s,%s,%s,%s)"
select_dw_qjlc_jg_res = """
    select workcode,DATE_FORMAT(rq,'%Y-%m-%d') rq,(case sw when '' then null else sw end) as sw,(case xw when '' then null else xw end) as xw from (
         select workcode,rq,max(sw) as sw,max(xw) as xw from (
         select workcode,rq,CONCAT(dk,lx) as sw,'' as xw from dw_qjlc_jckq where dk='上午' and rq>='{}' and rq<='{}' and  workcode='{}'
         union ALL
         select workcode,rq,'' as sw,CONCAT(dk,lx) as xw from dw_qjlc_jckq where dk='下午' and rq>='{}' and rq<='{}' and  workcode='{}'
        ) a group by rq
        ) b;
"""
delete_dw_qjlc_jckq_ls = []
delete_dw_qjlc_jg_ls = []
insert_dw_qjlc_jckq_ls = []
insert_dw_qjlc_jg_ls = []
sel_dw_qjlc_jg_sqls = []
result = sel_all(cur_gzlsj_sql, True)
if result:
    for r in result:
        sksrq = r.get('sksrq')
        sjzrq = r.get('sjzrq')
        skssj = r.get('skssj')
        sgh = r.get('sgh')
        ilx = r.get('ilx')
        sjzsj = r.get('sjzsj')
        if sksrq != sjzrq:
            dl = (skssj, sgh)
            delete_dw_qjlc_jckq_ls.append(dl)
            if time_compare(skssj, 8, 30) < 0:
                il = (sgh, skssj, ilx, '上午')
                insert_dw_qjlc_jckq_ls.append(il)
                il = (sgh, skssj, ilx, '下午')
                insert_dw_qjlc_jckq_ls.append(il)
            elif time_compare(skssj, 17, 30) < 0:
                il = (sgh, skssj, ilx, '下午')
                insert_dw_qjlc_jckq_ls.append(il)

            dl = (sjzsj, sgh)
            delete_dw_qjlc_jckq_ls.append(dl)
            if time_compare(sjzsj, 17, 30) > 0:
                il = (sgh, sjzsj, ilx, '上午')
                insert_dw_qjlc_jckq_ls.append(il)
                il = (sgh, sjzsj, ilx, '下午')
                insert_dw_qjlc_jckq_ls.append(il)
            elif time_compare(sjzsj, 8, 30) > 0:
                il = (sgh, sjzsj, ilx, '上午')
                insert_dw_qjlc_jckq_ls.append(il)

            temprq = day_add(sksrq, 1)
            while temprq < sjzrq:
                dl = (temprq, sgh)
                delete_dw_qjlc_jckq_ls.append(dl)
                il = (sgh, temprq, ilx, '上午')
                insert_dw_qjlc_jckq_ls.append(il)
                il = (sgh, temprq, ilx, '下午')
                insert_dw_qjlc_jckq_ls.append(il)
                temprq = day_add(temprq, 1)
        else:
            dl = (skssj, sgh)
            delete_dw_qjlc_jckq_ls.append(dl)
            if time_compare(skssj, 8, 30) <= 0:
                il = (sgh, skssj, ilx, '上午')
                insert_dw_qjlc_jckq_ls.append(il)
            if time_compare(sjzsj, 17, 30) >= 0:
                il = (sgh, sjzsj, ilx, '下午')
                insert_dw_qjlc_jckq_ls.append(il)

        dl = (sksrq, sjzrq, sgh)
        delete_dw_qjlc_jg_ls.append(dl)
        sel_dw_qjlc_jg_sqls.append(select_dw_qjlc_jg_res.format(sksrq, sjzrq, sgh, sksrq, sjzrq, sgh))

execute_many(delete_dw_qjlc_jckq_sql, delete_dw_qjlc_jckq_ls)
execute_many(insert_dw_qjlc_jckq_sql, insert_dw_qjlc_jckq_ls)
execute_many(delete_dw_qjlc_jg_sql, delete_dw_qjlc_jg_ls)
if sel_dw_qjlc_jg_sqls:
    for s in sel_dw_qjlc_jg_sqls:
        rs = sel_all(s, False)
        if rs:
            for r in rs:
                insert_dw_qjlc_jg_ls.append(r)
execute_many(insert_dw_qjlc_jg_sql, insert_dw_qjlc_jg_ls)
作者:fancy  创建时间:2024-06-13 10:40
最后编辑:fancy  更新时间:2025-03-27 15:47