1 概述

1.1 版本

服务器版本
v8.0.0及之后版本

1.2 功能简介

Kafka实时任务功能支持将从指定Kafka服务实时获取的数据,自动同步到目标数据库中,并可配置成功处理后的二次数据处理流程。

2 操作步骤

2.1 前置配置:Kafka消息

在创建Kafka实时任务前,您需要先在平台门户配置Kafka服务信息,供任务创建时引用。

2.1.1 入口

登录系统管理员账号,进入【系统维护》Kafka消息】菜单。

2.2.2 Kafka消息配置

点击“新建”按钮将侧推弹窗新建界面,如下图:

填写以下必要信息:

  • 显示名称:为这个Kafka服务起一个便于识别的名称,如“生产环境Kafka集群”。(必填,当前列表内唯一)
  • 服务地址:填入Kafka服务的连接地址,目前仅支持单服务,格式为 IP:PORT。(必填)
  • 认证方式:根据您的Kafka服务安全设置进行选择:
    1)无认证:无需填写额外信息。
    2)用户名密码:需填写对应的用户名和密码。
  • 字符集:指定数据编码,默认值为 UTF-8,通常无需修改。
    配置完成后,此Kafka消息即可在数据开发平台中供所有用户使用。

2.2 创建Kafka实时任务

完成前置配置后,即可在数据开发平台创建实时同步任务。

2.2.1 新建任务

进入【数据开发平台》实时开发】,点击+“实时同步任务”。

2.2.2 配置数据来源

在“任务配置”的第一步,将数据来源类型由默认的“Mysql”切换为 “Kafka”。

选择“Kafka”类型后,数据来源配置区会变化,需填写:

  • Kafka消息:从下拉列表中选择您在门户预先配置好的的Kafka服务。
  • 消息主题:输入需要订阅的一个或多个Kafka主题(Topic)。多个Topic请用英文逗号分隔。

    注:输入的Topic名称一定要在对应Kafka服务中是存在的,若不存在,则实时任务执行后将会被异常中止。

系统行为说明:
任务同步类型固定为“仅增量同步”,即持续监听新消息。
任务启动时会从每个分区最早的偏移量(offset)开始消费。

2.2.3 配置数据去向

此步骤用于定义Kafka消息同步到哪个数据库的哪张表。

选择目标:与常规任务类似,选择目标数据源和目标表。
字段映射:这是Kafka任务的核心配置。系统会将Kafka消息解析为固定的几个字段,您需要将这些字段映射到目标表的列上,目标表建议选择“自动建表”,输入表名后,点击“手动建表”点击确定即可。

注:目标表物理主键不能为空,以此保证写入数据的唯一性。

左侧(源):显示您在上移步骤【数据来源】的“消息主题”中填写的Topic列表。您可以选择特定的Topic进行映射配置。
右侧(映射):Kafka消息包含以下固定字段,您需要将它们映射到目标表字段:

Kafka消息字段说明:

源字段名称 字段类型 说明
lcz_id VARCHAR 系统生成的唯一标识。
lcz_key VARCHAR Kafka消息的Key。
lcz_value VARCHAR 消息体内容,例如JSON格式的原始数据。通常这是您最需要处理的业务数据。
lcz_topic VARCHAR 该条消息所属的Topic名称。
lcz_partition INTEGER 该条消息所在的分区编号。
lcz_offset BIGINT 该条消息在分区内的偏移量。
lcz_timestamp BIGINT 该条消息的时间戳。

操作提示:通常,您会将 lcz_value(原始消息体)映射到目标表的一个TEXT或VARCHAR类型字段,用于存储原始数据;同时可将 lcz_topic、lcz_timestamp等映射到独立字段,便于后续筛选和统计分析。

2.2.4 (可选)配置“成功后事件”

此功能允许您在每次Kafka消息成功同步到目标表后,自动触发一个离线计算任务,对刚落地的数据进行二次加工(如清洗、聚合、统计分析)。
在“任务控制”步骤,找到并配置 “成功后事件”。

  • 添加事件:点击“+”,添加一个离线任务事件。
  • 选择任务:从下拉树中选择一个已发布的离线任务。
  • 配置参数:
  • 系统会列出所选离线任务的所有参数。
  • 您可以为参数指定固定值,或者更常见地,引用实时任务同步过来的字段。例如,您可以将离线任务的某个处理参数设置为 ${lcz_id}或 ${lcz_value},实现数据的传递。
  • 执行顺序:可添加多个事件,并通过拖拽调整执行顺序。
  • 流程解释:实时任务每成功同步一批数据,就会按照您配置的顺序,依次触发对应的离线任务。这非常适合“实时入库 + 实时计算”的场景。

2.3 任务管理与监控

任务发布后,您可以在以下位置进行监控和管理:

2.3.1 实时任务详情页

在实时任务列表点击任务名称,可进入详情页。
基础信息:会显示数据来源为“Kafka: [您选择的Kafka消息名称]”。
实时/历史统计:可以按“消息主题”筛选和查看数据同步速率、延迟等统计信息。

运行日志:可查看详细运行日志,定位问题。

2.3.2 运维中心

实时任务管理:在【运维中心 > 实时任务 > 任务管理】列表中,Kafka任务的“同步类型”显示为“仅增量同步”,“数据来源”会标注为“(Kafka)”。您可以通过“同步类型”筛选条件快速找到所有Kafka任务。

周期任务实例:如果Kafka任务配置了“成功后事件”,被触发执行的离线任务会出现在【运维中心 > 周期任务 > 任务实例】中,其“触发方式”会显示为“事件”,便于您区分和追踪由实时任务触发的离线任务。

作者:fancy  创建时间:2026-03-18 14:19
最后编辑:fancy  更新时间:2026-03-19 15:07