package com.dtstack.flink.demo;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.util.serialization.JSONKeyValueDeserializationSchema;
import org.apache.flink.util.Collector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
import java.util.Properties;
/**
 * Flink streaming word-count demo.
 *
 * <p>Reads JSON records from the Kafka topic {@code input}, extracts the
 * {@code word} field from each record's value, and prints a running count
 * per word. Also demonstrates accessing a shipped resource file
 * ({@code hdfs.keytab}) from inside an operator function.
 */
public class FlinkDemo {

    private static final Logger LOG = LoggerFactory.getLogger(FlinkDemo.class);

    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 1. Kafka source configuration.
        Properties prop = new Properties();
        prop.setProperty("bootstrap.servers", "localhost:9092");
        prop.setProperty("group.id", "wordCount");

        // includeMetadata = true: each record arrives as an ObjectNode with
        // "key", "value" and "metadata" fields.
        JSONKeyValueDeserializationSchema jsonDeserialization = new JSONKeyValueDeserializationSchema(true);
        FlinkKafkaConsumer<ObjectNode> kafkaConsumer =
                new FlinkKafkaConsumer<>("input", jsonDeserialization, prop);
        DataStreamSource<ObjectNode> dataSource = env.addSource(kafkaConsumer);

        // Resource files are uploaded as ship files and placed in the
        // container's working directory, so operators can open them by
        // relative path.
        final String resourceFilePath = "hdfs.keytab"; // change to the name of your attached resource

        DataStream<Tuple2<String, Integer>> wordDataStream =
                dataSource.flatMap(new FlatMapFunction<ObjectNode, Tuple2<String, Integer>>() {
                    @Override
                    public void flatMap(ObjectNode value, Collector<Tuple2<String, Integer>> out) throws Exception {
                        // The shipped file is resolved relative to the task
                        // container's current working directory.
                        File file = new File(resourceFilePath);
                        LOG.info("resource file exists: {}", file.exists());
                        LOG.info("resource file path: {}", file.getAbsolutePath());

                        // "value" is the deserialized Kafka record body;
                        // "word" is the token being counted.
                        String word = value.get("value").get("word").asText();
                        out.collect(new Tuple2<>(word, 1));
                    }
                });

        // Key by the word (field 0) and keep a running sum of the count (field 1).
        DataStream<Tuple2<String, Integer>> sumStream = wordDataStream
                .keyBy(t -> t.f0)
                .sum(1);

        sumStream.print();
        env.execute("flink-demo");
    }
}