1 概述
1.1 版本
服务器版本 |
---|
v7.2.0及之后版本 |
1.2 应用场景
在数据开发过程中,对于一些复杂的数据处理,【SparkSQL】节点无法处理的,则需要使用【Python】节点进行处理。
通过Python脚本可实现对各类数据源的抓取、加工处理(包括:数据清洗、增加计算列、数据过滤、左右关联、上下关联、分组汇总、排序等)、转存等。
1.3 功能简介
【Python】节点主要使用Python脚本对于一些复杂的数据进行处理,与 SparkSQL 的区别,就类似数据库中 SQL 和存储过程的关系,【Python】节点主要处理比如面向过程和逻辑性的业务。
2 功能说明
【Pyhon】节点支持的功能包括:SQL语句和数据预览。与【SparkSQL】类似,这里不再重复介绍,不同的是:代码中需要编写Pyhon脚本。
注:
1)【Pyhon】节点前面只能连接一个节点;
2)【Pyhon】节点默认使用“Python”类型的命令,代码要使用对应的脚本格式;
3)【Pyhon】节点中若需要引用参数,参数格式为:${参数名}。
3 示例
业务场景:模板执行日志 dn_m_templete_l 表中的详细信息 DETAIL_ (内容为JSON字符串)拆分成对应的列进行显示,并将详细信息列删除。
具体实现步骤如下:
1、获取 dn_m_templete_l 表数据
1)新建任务,添加【数据转换】节点,并双击进去该节点的设计界面;
2)添加【DB输入】节点,获取 dn_m_templete_l 表数据,配置如下图:
3)点击“数据预览”查看数据,可以看到 DETAIL_ 中的数据为JSON字符串,如下图:
2、使用 Python 进行数据处理
添加【Python】节点,编写 Python 脚本,将 DETAIL_ 中的内容作为对应的列显示,并将详细信息列删除,如下图:
代码支持最大化编辑,完整语句如下:
#import pandas as pd
# 必须使用pandas库
# 数据处理
df = #{dbInput_p2xj,模板执行日志}
name_cols = df['DETAIL_']
name_size = len(name_cols)
fileAlias_ls = []
fileName_ls = []
userName_ls = []
userAlias_ls=[]
logStatus_ls=[]
remark_ls=[]
reportType_ls=[]
source_ls=[]
volumeName_ls=[]
volumeAlias_ls=[]
for i in range(0,name_size):
r = name_cols[i]
json_ = json.loads(r)
fileAlias_ls.append(json_.get('fileAlias',''))
fileName_ls.append(json_.get('fileName',''))
userName_ls.append(json_.get('userName',''))
userAlias_ls.append(json_.get('userAlias',''))
logStatus_ls.append(json_.get('logStatus',''))
remark_ls.append(json_.get('remark',''))
reportType_ls.append(json_.get('reportType',''))
source_ls.append(json_.get('source',''))
volumeName_ls.append(json_.get('volumeName',''))
volumeAlias_ls.append(json_.get('volumeAlias',''))
#移除DETAIL_
df = df.drop('DETAIL_', axis=1)
#添加转换后的列
df['FILE_ALIAS']= fileAlias_ls
df['FILE_ALIAS'] = df['FILE_ALIAS'].astype(str)
df['FILE_NAME']= fileName_ls
df['FILE_NAME'] = df['FILE_NAME'].astype(str)
df['USER_NAME']= userName_ls
df['USER_NAME'] = df['USER_NAME'].astype(str)
df['USER_ALIAS']= userAlias_ls
df['USER_ALIAS'] = df['USER_ALIAS'].astype(str)
df['LOG_STATUS']= logStatus_ls
df['LOG_STATUS'] = df['LOG_STATUS'].astype(str)
df['REMARK']= remark_ls
df['REMARK'] = df['REMARK'].astype(str)
df['REPORT_TYPE']= reportType_ls
df['REPORT_TYPE'] = df['REPORT_TYPE'].astype(str)
df['SOURCE']= source_ls
df['SOURCE'] = df['SOURCE'].astype(str)
df['VOLUME_NAME']= volumeName_ls
df['VOLUME_NAME'] = df['VOLUME_NAME'].astype(str)
df['VOLUME_ALIAS']= volumeAlias_ls
df['VOLUME_ALIAS'] = df['VOLUME_ALIAS'].astype(str)
data= df
点击“数据预览”可查看最终处理的结果,如下图,“SERVER_TAG_”后的列均是增加的列。
3、输出数据
添加【DB输出】节点与【Python】节点连接,配置“数据去向”将处理后的数据输出。如下图所示:
最后编辑:柳杨 更新时间:2025-03-27 15:47
