Kafka

本节介绍如何在实时计算中配置Kafka数据源。

操作步骤

1.登陆数栈DTinsight,进入 实时计算 模块。
2.进入数据源,单击 新增数据源

3 4 2021 17 07 04 PM

3.在 新增数据源 弹窗中,选择数据类型为 Kafka。
4.填写Kafka数据源的各配置项。

3 4 2021 17 06 37 PM

5.点击 测试连通性
6.测试连通性通过后,点击 确定,即可完成Kafka数据源的配置。

配置 说明

数据源名称

只能由中文、字母、数字和下划线组成,长度无限制。

数据源描述

对数据源进行简单描述,长度无限制。

集群地址

Kafka数据库中集群地址,格式要求如下: 示例:IP1:Port,IP2:Port

broker地址

Kafka中具体brocker的地址,选填,格式如下: 示例:IP1:Port,IP2:Port

Kerberos认证

当用户开启Kerberos认证后,需要手动将 xxx.keytab、krb5.config文件打包为zip文件进行上传。

上传文件前,请在控制台开启SFTP服务。

Kafka 数据格式

在FlinkSQL任务开发中,平台支持 JsonCSVAvro 三种格式的Kafka数据; 当使用 Avro 数据格式时,用户需要额外在 Schema 参数中 对 NAME TYPE FIELDS 进行声明。

Kafka自定义参数配置

在FLinkSQL任务开发中,使用Kafka作为源表、结果表时可进行自定义参数配置。

2 4 2021 14 32 03 PM

平台支持了部分官方Kafka参数,具体参数说明可参考 Kafka官方文档

其中已支持的Kafka参数如下:

###
kafka.consumer.id
kafka.socket.timeout.ms
kafka.fetch.message.max.bytes
kafka.num.consumer.fetchers
kafka.auto.commit.enable
kafka.auto.commit.interval.ms
kafka.queued.max.message.chunks
kafka.rebalance.max.retries
kafka.fetch.min.bytes
kafka.fetch.wait.max.ms
kafka.rebalance.backoff.ms
kafka.refresh.leader.backoff.ms
kafka.consumer.timeout.ms
kafka.exclude.internal.topics
kafka.partition.assignment.strategy
kafka.client.id
kafka.zookeeper.session.timeout.ms
kafka.zookeeper.connection.timeout.ms
kafka.zookeeper.sync.time.ms
kafka.offsets.storage
kafka.offsets.channel.backoff.ms
kafka.offsets.channel.socket.timeout.ms
kafka.offsets.commit.max.retries
kafka.dual.commit.enabled
kafka.partition.assignment.strategy
kafka.socket.receive.buffer.bytes
kafka.fetch.min.bytes

###kerberos认证相关参数
kafka.security.protocal
kafka.sasl.mechanism
kafka.sasl.kerberos.service.name

Kafka序列化器

平台默认使用 String 类型进行序列化,当用户存在额外的序列化方式时,需要手动设定 Key.deserializervalue.deserializer 参数界定。

key.deserializer : org.apache.kafka.common.serialization.StringDeserializer
value.deserializer : org.apache.kafka.common.serialization.StringDeserializer