1. 完成的场景
在很多⼤数据场景下,要求数据形成数据流的形式进⾏计算和存储。上篇博客介绍了Flink消费Kafka数据实现Wordcount计算,这篇博客需要完成的是将实时计算的结果写到redis。当kafka从其他端获取数据⽴刻到Flink计算,Flink计算完后结果写到Redis,整个过程就像流⽔⼀样形成了数据流的处理
2. 代码
添加第三⽅依赖
注意这⾥的版本最好统⼀选1.4.0,flink-redis的版本最好选1.1.5,⽤低版本或其他版本会遇到包冲突或者不同包的同⼀类不同等逻辑或者第版本有些类没有等java通⽤的⼀些问题逻辑代码
package com.scn;
import org.apache.flink.api.common.functions.FlatMapFunction;import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09;import org.apache.flink.streaming.connectors.redis.RedisSink;
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig;import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommand;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommandDescription;import org.apache.flink.streaming.connectors.redis.common.mapper.RedisMapper;import org.apache.flink.streaming.util.serialization.SimpleStringSchema;import org.apache.flink.util.Collector;import java.util.Properties;
public class FilnkCostKafka {
public static void main(String[] args) throws Exception {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.enableCheckpointing(1000);
Properties properties = new Properties();
properties.setProperty(\"bootstrap.servers\ properties.setProperty(\"zookeeper.connect\ properties.setProperty(\"group.id\
FlinkKafkaConsumer09 DataStream FlinkJedisPoolConfig conf = new FlinkJedisPoolConfig.Builder().setHost(\"127.0.0.1\").build(); //实例化RedisSink,并通过flink的addSink的⽅式将flink计算的结果插⼊到redis counts.addSink(new RedisSink public static final class LineSplitter implements FlatMapFunction public void flatMap(String value, Collector out.collect(new Tuple2 } } } //指定Redis key并将flink数据类型映射到Redis数据类型 public static final class RedisExampleMapper implements RedisMapper return new RedisCommandDescription(RedisCommand.HSET, \"flink\"); } public String getKeyFromData(Tuple2 public String getValueFromData(Tuple2 编写⼀个测试类 package com.scn; import redis.clients.jedis.Jedis; public class RedisTest { public static void main(String args[]){ Jedis jedis=new Jedis(\"127.0.0.1\"); System.out.println(\"Server is running: \" + jedis.ping()); System.out.println(\"result:\"+jedis.hgetAll(\"flink\")); }} 3. 测试 启动Redis服务 redis-server 执⾏FilnkCostKafka main⽅法没有跑出异常信息证明启动没有问题在kafka producer端输出⼀些数据 执⾏测试类RedisTest的main⽅法会输出: Server is running: PONG result:{flink=2, newyork=1, will=1, kafka=2, wolrd=2, go=1, i=1, meijiasheng=1, is=1, hello=6, myname=1, redis=2} 可以看到数据已经流到Redis 因篇幅问题不能全部显示,请点此查看更多更全内容