// FlinkJarDemo — Flink streaming word-count demo (Kafka JSON source).

package com.dtstack.flink.demo;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.util.serialization.JSONKeyValueDeserializationSchema;
import org.apache.flink.util.Collector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.File;
import java.util.Properties;

/**
 * Flink streaming word-count demo.
 *
 * <p>Reads JSON records from the Kafka topic {@code input}, extracts the
 * {@code word} field from each message value, counts occurrences per word,
 * and prints the running totals to stdout.
 */
public class FlinkDemo {

    private static final Logger LOG = LoggerFactory.getLogger(FlinkDemo.class);

    public static void main(String[] args) throws Exception {

        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 1. Kafka connection properties.
        Properties prop = new Properties();
        prop.setProperty("bootstrap.servers", "localhost:9092");
        prop.setProperty("group.id", "wordCount");

        // includeMetadata=true wraps each record as {"key":..., "value":..., "metadata":...}.
        JSONKeyValueDeserializationSchema jsonDeserialization = new JSONKeyValueDeserializationSchema(true);
        FlinkKafkaConsumer<ObjectNode> kafkaConsumer =
                new FlinkKafkaConsumer<>("input", jsonDeserialization, prop);
        DataStreamSource<ObjectNode> dataSource = env.addSource(kafkaConsumer);

        // Resource files are uploaded as ship-files and placed in the
        // container's current working directory.
        String resourceFilePath = "hdfs.keytab"; // change to the name of the attached resource

        // Typing the function as <ObjectNode, ...> removes the unchecked
        // Object -> ObjectNode cast the raw version needed.
        DataStream<Tuple2<String, Integer>> wordDataStream = dataSource.flatMap(
                new FlatMapFunction<ObjectNode, Tuple2<String, Integer>>() {
                    @Override
                    public void flatMap(ObjectNode value, Collector<Tuple2<String, Integer>> out)
                            throws Exception {

                        // The shipped resource file can be accessed from inside operator functions.
                        File file = new File(resourceFilePath);
                        LOG.info("resource file exists: {}", file.exists());
                        LOG.info("resource file path: {}", file.getAbsolutePath());

                        // NOTE(review): assumes every message value is JSON with a top-level
                        // "word" field — a missing field would NPE here; confirm the producer
                        // guarantees this shape.
                        String word = value.get("value").get("word").asText();
                        out.collect(Tuple2.of(word, 1));
                    }
                });

        // keyBy(int) on tuples is deprecated; a KeySelector lambda keys by the
        // same field (f0 = the word) with identical semantics.
        DataStream<Tuple2<String, Integer>> sumStream = wordDataStream
                .keyBy(t -> t.f0)
                .sum(1);
        sumStream.print();

        env.execute("flink-demo");
    }
}