利用flink统计消息回复情况
发表于:2024-11-24 作者:热门IT资讯网编辑
编辑最后更新于 2024年11月24日。其中用到了滑动窗口函数：窗口大小30秒、滑动间隔15秒，且超出窗口10秒以上的数据会被丢弃（实际业务中这三个值应分别为10分钟、1分钟和5分钟）。代码先记录如下……
其中用到了滑动窗口函数：窗口大小30秒、滑动间隔15秒，且超出窗口10秒以上的数据会被丢弃（实际业务中这三个值应分别为10分钟、1分钟和5分钟）。代码先记录如下：
// NOTE(review): this code was extracted from a blog post and does not compile as shown:
//   * It is collapsed onto two physical lines. Everything after the first `//` on the next
//     line is therefore commented out. Re-split it onto separate lines before use.
//   * Generic type arguments appear to have been stripped. The raw types include
//     DataStreamSource, DataStream, MapFunction, AllWindowFunction, Iterable, Collector,
//     SinkFunction and AscendingTimestampExtractor; they presumably carried <String>,
//     <Message>, <Row> etc. TODO restore.
//   * Text after `if(t.getTimestampSend()` was lost, likely an HTML `<` being stripped.
//     The missing pieces are:
//       - the filter condition;
//       - the rest of the window function (including any out.collect);
//       - the creation of `tableEnv`;
//       - the registration of "myTable" with a `rowtime` attribute;
//       - the definition of `tb3`.
//     TODO recover these from the original post.
//   * `logger` and `Message` are not declared in this snippet. They are presumably defined
//     in the enclosing class or elsewhere in the project.
//
// Flink job "msg v5": computes message-reply statistics for chat sessions.
//  1. Reads JSON strings from Kafka topic "im-message-topic3".
//     Settings: FlinkKafkaConsumer011, SimpleStringSchema, source parallelism 1,
//     auto.offset.reset=earliest.
//  2. Parses each record into a Message with fields sessionId, fromUid, toUid, chatType,
//     type, msgId, msg and timestampSend.
//  3. Uses event time. TruckTimestamp assigns timestamps and watermarks from timestampSend.
//     A sliding all-window (size 30 s, slide 15 s) then logs each element's window start
//     and event time, and filters it on timestampSend (the condition is lost, see above).
//     The accompanying text says data more than 10 s outside the window is dropped. In
//     production the three values would be 10 min / 1 min / 5 min.
//     NOTE(review): the window size is twice the slide, so every element falls into two
//     windows and apply() sees it twice. Unless the lost filter prevents it, duplicate rows
//     may reach the pattern-matching step. Verify.
//  4. Runs MATCH_RECOGNIZE over "myTable", partitioned by sessionId and ordered by rowtime.
//     The pattern is one or more type='yonghu' rows (e1, presumably customer messages)
//     followed by reluctant type='guanjia' rows (e2, presumably staff replies; cf. empUid).
//     Each match emits one row:
//       - answerTime = e2.timestampSend
//       - customer_event_time = LAST(e1.timestampSend)
//       - empUid = e2.fromUid
//       - askTime = e1.timestampSend
//       - total_talk = 1
//     Matching resumes AFTER MATCH SKIP TO LAST e2.
//     NOTE(review): an unqualified e1.timestampSend in MEASURES presumably resolves to the
//     last e1 row, which would make askTime equal customer_event_time. If the first
//     question's time was intended, use FIRST(e1.timestampSend). Verify.
//  5. Prints every match through Sink2. The only `addSink(new Sink())` call sits after a
//     `//`, i.e. it was already commented out in the original, so Sink appears unused.
//
// Nested classes:
//   TruckTimestamp - event time = Message.timestampSend. As an AscendingTimestampExtractor
//                    it assumes timestamps arrive in ascending order.
//   Sink / Sink2   - print each Row to stdout, prefixed with the current java.util.Date
//                    and the labels "orinal time:" / "new time:" respectively.
public static void main(String[] arg) throws Exception { final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.getConfig().enableSysoutLogging();// enable sysout logging env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); // use EVENT time (the original comment said "process time", which does not match this call) Properties props = new Properties(); props.put("bootstrap.servers", "kafkaip:9092"); props.put("group.id", "metric-group4"); props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); //key deserializer props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.put("auto.offset.reset", "earliest"); //value deserializer (this comment refers to the value.deserializer line above) DataStreamSource dataStreamSource = env.addSource(new FlinkKafkaConsumer011<>( "im-message-topic3", //kafka topic new SimpleStringSchema(), // String (de)serialization schema props)).setParallelism(1); DataStream bean3DataStream = dataStreamSource.map(new MapFunction() { @Override public Message map(String value) throws Exception { logger.info("receive msg:"+value); JSONObject jsonObject =JSONObject.parseObject(value); Message s= new Message( jsonObject.getString("sessionId"), jsonObject.getString("fromUid"), jsonObject.getString("toUid"), jsonObject.getString("chatType"), jsonObject.getString("type"), jsonObject.getString("msgId"), jsonObject.getString("msg"), jsonObject.getLong("timestampSend") ); return s; } }); // assign timestamps/watermarks, then filter the data DataStream bean3DataStreamWithAssignTime = bean3DataStream.assignTimestampsAndWatermarks(new TruckTimestamp()).timeWindowAll(Time.seconds(30),Time.seconds(15)).apply(new AllWindowFunction() { @Override public void apply(TimeWindow window, Iterable values, Collector out) throws Exception { for (Message t: values) { logger.info("window start time:"+new Date(window.getStart()).toString()); logger.info("real time:"+new Date(t.getTimestampSend()).toString()); if(t.getTimestampSend() appendStream =tableEnv.toAppendStream(tb3, Row.class);// appendStream.addSink(new Sink()); // match regex-like row patterns (MATCH_RECOGNIZE) over the filtered data Table tb2 = 
tableEnv.sqlQuery( "SELECT " + " * " + "FROM myTable" + " " + "MATCH_RECOGNIZE ( " + "PARTITION BY sessionId " + "ORDER BY rowtime " + "MEASURES " + "e2.timestampSend as answerTime, "+ "LAST(e1.timestampSend) as customer_event_time, " + "e2.fromUid as empUid, " + "e1.timestampSend as askTime," + "1 as total_talk " + "ONE ROW PER MATCH " + "AFTER MATCH SKIP TO LAST e2 " + "PATTERN (e1+ e2+?) " + "DEFINE " + "e1 as e1.type = 'yonghu', " + "e2 as e2.type = 'guanjia' " + ")"+ "" ); DataStream appendStream2 =tableEnv.toAppendStream(tb2, Row.class); appendStream2.addSink(new Sink2()); env.execute("msg v5"); } public static class TruckTimestamp extends AscendingTimestampExtractor { private static final long serialVersionUID = 1L; @Override public long extractAscendingTimestamp(Message element) { return element.getTimestampSend(); } } public static class Sink implements SinkFunction { /** * */ private static final long serialVersionUID = 1L; @Override public void invoke(Row value) throws Exception { System.out.println(new Date().toString()+"orinal time:"+value.toString()); } } public static class Sink2 implements SinkFunction { /** * */ private static final long serialVersionUID = 1L; @Override public void invoke(Row value) throws Exception { System.out.println(new Date().toString()+"new time:"+value.toString()); } }