1 概述

1.1 版本

服务器版本
v7.2.0及之后版本

1.2 应用场景

在数据开发过程中,对于一些复杂的数据处理,【SparkSQL】节点无法处理的,则需要使用【Python】节点进行处理。
通过Python脚本可实现对各类数据源的抓取、加工处理(包括:数据清洗、增加计算列、数据过滤、左右关联、上下关联、分组汇总、排序等)、转存等。

1.3 功能简介

【Python】节点主要使用Python脚本对于一些复杂的数据进行处理,与 SparkSQL 的区别,就类似数据库中 SQL 和存储过程的关系,【Python】节点主要处理比如面向过程和逻辑性的业务。

2 功能说明

【Pyhon】节点支持的功能包括:SQL语句和数据预览。与【SparkSQL】类似,这里不再重复介绍,不同的是:代码中需要编写Pyhon脚本。

注:
1)【Pyhon】节点前面只能连接一个节点;
2)【Pyhon】节点默认使用“Python”类型的命令,代码要使用对应的脚本格式;
3)【Pyhon】节点中若需要引用参数,参数格式为:${参数名}。

3 示例

业务场景:模板执行日志 dn_m_templete_l 表中的详细信息 DETAIL_ (内容为JSON字符串)拆分成对应的列进行显示,并将详细信息列删除。
具体实现步骤如下:
1、获取 dn_m_templete_l 表数据
1)新建任务,添加【数据转换】节点,并双击进去该节点的设计界面;
2)添加【DB输入】节点,获取 dn_m_templete_l 表数据,配置如下图:

3)点击“数据预览”查看数据,可以看到 DETAIL_ 中的数据为JSON字符串,如下图:

2、使用 Python 进行数据处理
添加【Python】节点,编写 Python 脚本,将 DETAIL_ 中的内容作为对应的列显示,并将详细信息列删除,如下图:

代码支持最大化编辑,完整语句如下:

#import pandas as pd
# 必须使用pandas库

# 数据处理
df = #{dbInput_p2xj,模板执行日志}

name_cols = df['DETAIL_']
name_size = len(name_cols)
fileAlias_ls = []
fileName_ls = []
userName_ls = []
userAlias_ls=[]
logStatus_ls=[]
remark_ls=[]
reportType_ls=[]
source_ls=[]
volumeName_ls=[]
volumeAlias_ls=[]
for i in range(0,name_size):
    r = name_cols[i]
    json_ = json.loads(r)
    fileAlias_ls.append(json_.get('fileAlias',''))
    fileName_ls.append(json_.get('fileName',''))
    userName_ls.append(json_.get('userName',''))
    userAlias_ls.append(json_.get('userAlias',''))
    logStatus_ls.append(json_.get('logStatus',''))
    remark_ls.append(json_.get('remark',''))
    reportType_ls.append(json_.get('reportType',''))
    source_ls.append(json_.get('source',''))
    volumeName_ls.append(json_.get('volumeName',''))
    volumeAlias_ls.append(json_.get('volumeAlias',''))

#移除DETAIL_
df = df.drop('DETAIL_', axis=1)

#添加转换后的列
df['FILE_ALIAS']= fileAlias_ls
df['FILE_ALIAS'] = df['FILE_ALIAS'].astype(str)

df['FILE_NAME']= fileName_ls
df['FILE_NAME'] = df['FILE_NAME'].astype(str)

df['USER_NAME']= userName_ls
df['USER_NAME'] = df['USER_NAME'].astype(str)

df['USER_ALIAS']= userAlias_ls
df['USER_ALIAS'] = df['USER_ALIAS'].astype(str)

df['LOG_STATUS']= logStatus_ls
df['LOG_STATUS'] = df['LOG_STATUS'].astype(str)

df['REMARK']= remark_ls
df['REMARK'] = df['REMARK'].astype(str)

df['REPORT_TYPE']= reportType_ls
df['REPORT_TYPE'] = df['REPORT_TYPE'].astype(str)

df['SOURCE']= source_ls
df['SOURCE'] = df['SOURCE'].astype(str)

df['VOLUME_NAME']= volumeName_ls
df['VOLUME_NAME'] = df['VOLUME_NAME'].astype(str)

df['VOLUME_ALIAS']= volumeAlias_ls
df['VOLUME_ALIAS'] = df['VOLUME_ALIAS'].astype(str)

data= df

点击“数据预览”可查看最终处理的结果,如下图,“SERVER_TAG_”后的列均是增加的列。

3、输出数据
添加【DB输出】节点与【Python】节点连接,配置“数据去向”将处理后的数据输出。如下图所示:

作者:fancy  创建时间:2024-06-13 10:55
最后编辑:柳杨  更新时间:2025-03-27 15:47