Kafka
本节介绍如何在实时计算中配置Kafka数据源。
操作步骤
1.登陆数栈DTinsight,进入 实时计算 模块。
2.进入数据源,单击 新增数据源。

3.在 新增数据源 弹窗中,选择数据类型为 Kafka。
4.填写Kafka数据源的各配置项。

5.点击 测试连通性。
6.测试连通性通过后,点击 确定,即可完成Kafka数据源的配置。
配置 | 说明 | ||
---|---|---|---|
数据源名称 |
只能由中文、字母、数字和下划线组成,长度无限制。 |
||
数据源描述 |
对数据源进行简单描述,长度无限制。 |
||
集群地址 |
Kafka数据库中集群地址,格式要求如下: 示例:IP1:Port,IP2:Port |
||
broker地址 |
Kafka中具体brocker的地址,选填,格式如下: 示例:IP1:Port,IP2:Port |
||
Kerberos认证 |
当用户开启Kerberos认证后,需要手动将 xxx.keytab、krb5.config文件打包为zip文件进行上传。
|
Kafka 数据格式
在FlinkSQL任务开发中,平台支持 Json
, CSV
,Avro
三种格式的Kafka数据;
当使用 Avro
数据格式时,用户需要额外在 Schema
参数中 对 NAME
TYPE
FIELDS
进行声明。
Kafka自定义参数配置
在FLinkSQL任务开发中,使用Kafka作为源表、结果表时可进行自定义参数配置。

平台支持了部分官方Kafka参数,具体参数说明可参考 Kafka官方文档。
其中已支持的Kafka参数如下:
###
kafka.consumer.id
kafka.socket.timeout.ms
kafka.fetch.message.max.bytes
kafka.num.consumer.fetchers
kafka.auto.commit.enable
kafka.auto.commit.interval.ms
kafka.queued.max.message.chunks
kafka.rebalance.max.retries
kafka.fetch.min.bytes
kafka.fetch.wait.max.ms
kafka.rebalance.backoff.ms
kafka.refresh.leader.backoff.ms
kafka.consumer.timeout.ms
kafka.exclude.internal.topics
kafka.partition.assignment.strategy
kafka.client.id
kafka.zookeeper.session.timeout.ms
kafka.zookeeper.connection.timeout.ms
kafka.zookeeper.sync.time.ms
kafka.offsets.storage
kafka.offsets.channel.backoff.ms
kafka.offsets.channel.socket.timeout.ms
kafka.offsets.commit.max.retries
kafka.dual.commit.enabled
kafka.partition.assignment.strategy
kafka.socket.receive.buffer.bytes
kafka.fetch.min.bytes
###kerberos认证相关参数
kafka.security.protocal
kafka.sasl.mechanism
kafka.sasl.kerberos.service.name