1 概述
1.1 版本
| 服务器版本 |
|---|
| v8.0.0及之后版本 |
1.2 功能简介
Kafka实时任务功能支持将从指定Kafka服务实时获取的数据,自动同步到目标数据库中,并可配置成功处理后的二次数据处理流程。
2 操作步骤
2.1 前置配置:Kafka消息
在创建Kafka实时任务前,您需要先在平台门户配置Kafka服务信息,供任务创建时引用。
2.1.1 入口
登录系统管理员账号,进入【系统维护》Kafka消息】菜单。
2.2.2 Kafka消息配置
点击“新建”按钮将侧推弹窗新建界面,如下图:
填写以下必要信息:
- 显示名称:为这个Kafka服务起一个便于识别的名称,如“生产环境Kafka集群”。(必填,当前列表内唯一)
- 服务地址:填入Kafka服务的连接地址,目前仅支持单服务,格式为 IP:PORT。(必填)
- 认证方式:根据您的Kafka服务安全设置进行选择:
1)无认证:无需填写额外信息。
2)用户名密码:需填写对应的用户名和密码。 - 字符集:指定数据编码,默认值为 UTF-8,通常无需修改。
配置完成后,此Kafka消息即可在数据开发平台中供所有用户使用。
2.2 创建Kafka实时任务
完成前置配置后,即可在数据开发平台创建实时同步任务。
2.2.1 新建任务
进入【数据开发平台》实时开发】,点击+“实时同步任务”。
2.2.2 配置数据来源
在“任务配置”的第一步,将数据来源类型由默认的“Mysql”切换为 “Kafka”。
选择“Kafka”类型后,数据来源配置区会变化,需填写:
- Kafka消息:从下拉列表中选择您在门户预先配置好的的Kafka服务。
- 消息主题:输入需要订阅的一个或多个Kafka主题(Topic)。多个Topic请用英文逗号分隔。
注:输入的Topic名称一定要在对应Kafka服务中是存在的,若不存在,则实时任务执行后将会被异常中止。
系统行为说明:
任务同步类型固定为“仅增量同步”,即持续监听新消息。
任务启动时会从每个分区最早的偏移量(offset)开始消费。
2.2.3 配置数据去向
此步骤用于定义Kafka消息同步到哪个数据库的哪张表。
选择目标:与常规任务类似,选择目标数据源和目标表。
字段映射:这是Kafka任务的核心配置。系统会将Kafka消息解析为固定的几个字段,您需要将这些字段映射到目标表的列上,目标表建议选择“自动建表”,输入表名后,点击“手动建表”点击确定即可。
注:目标表物理主键不能为空,以此保证写入数据的唯一性。
左侧(源):显示您在上移步骤【数据来源】的“消息主题”中填写的Topic列表。您可以选择特定的Topic进行映射配置。
右侧(映射):Kafka消息包含以下固定字段,您需要将它们映射到目标表字段:
Kafka消息字段说明:
| 源字段名称 | 字段类型 | 说明 |
|---|---|---|
| lcz_id | VARCHAR | 系统生成的唯一标识。 |
| lcz_key | VARCHAR | Kafka消息的Key。 |
| lcz_value | VARCHAR | 消息体内容,例如JSON格式的原始数据。通常这是您最需要处理的业务数据。 |
| lcz_topic | VARCHAR | 该条消息所属的Topic名称。 |
| lcz_partition | INTEGER | 该条消息所在的分区编号。 |
| lcz_offset | BIGINT | 该条消息在分区内的偏移量。 |
| lcz_timestamp | BIGINT | 该条消息的时间戳。 |
操作提示:通常,您会将 lcz_value(原始消息体)映射到目标表的一个TEXT或VARCHAR类型字段,用于存储原始数据;同时可将 lcz_topic、lcz_timestamp等映射到独立字段,便于后续筛选和统计分析。
2.2.4 (可选)配置“成功后事件”
此功能允许您在每次Kafka消息成功同步到目标表后,自动触发一个离线计算任务,对刚落地的数据进行二次加工(如清洗、聚合、统计分析)。
在“任务控制”步骤,找到并配置 “成功后事件”。
- 添加事件:点击“+”,添加一个离线任务事件。
- 选择任务:从下拉树中选择一个已发布的离线任务。
- 配置参数:
- 系统会列出所选离线任务的所有参数。
- 您可以为参数指定固定值,或者更常见地,引用实时任务同步过来的字段。例如,您可以将离线任务的某个处理参数设置为 ${lcz_id}或 ${lcz_value},实现数据的传递。
- 执行顺序:可添加多个事件,并通过拖拽调整执行顺序。
- 流程解释:实时任务每成功同步一批数据,就会按照您配置的顺序,依次触发对应的离线任务。这非常适合“实时入库 + 实时计算”的场景。
2.3 任务管理与监控
任务发布后,您可以在以下位置进行监控和管理:
2.3.1 实时任务详情页
在实时任务列表点击任务名称,可进入详情页。
基础信息:会显示数据来源为“Kafka: [您选择的Kafka消息名称]”。
实时/历史统计:可以按“消息主题”筛选和查看数据同步速率、延迟等统计信息。
运行日志:可查看详细运行日志,定位问题。
2.3.2 运维中心
实时任务管理:在【运维中心 > 实时任务 > 任务管理】列表中,Kafka任务的“同步类型”显示为“仅增量同步”,“数据来源”会标注为“(Kafka)”。您可以通过“同步类型”筛选条件快速找到所有Kafka任务。
周期任务实例:如果Kafka任务配置了“成功后事件”,被触发执行的离线任务会出现在【运维中心 > 周期任务 > 任务实例】中,其“触发方式”会显示为“事件”,便于您区分和追踪由实时任务触发的离线任务。
最后编辑:fancy 更新时间:2026-03-19 15:07
