xfhmbjs 2023-07-28 09:55 采纳率: 0%
浏览 20
已结题

flink,一个小时内,电话号码出现10次的,都要进行输出

在flink 中,要对输入的数据流进行处理。输入的数据流中每行数据有2个字段,电话号码 和 号码出现的时间。
现在要处理数据的逻辑是,一个小时内,电话号码出现10次的,都要进行输出。
这段代码该怎么编写,使用java语言

  • 写回答

18条回答 默认 最新

  • Wlq00001 2023-07-28 10:12
    关注
    
    
    您可以使用Apache Flink的DataStream API来实现对输入数据流的处理。以下是一个示例代码,展示了如何在一个小时内统计电话号码出现次数,并输出出现次数大于等于10次的电话号码:
    
    import org.apache.flink.api.common.functions.FlatMapFunction;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.util.Collector;
    
    import java.text.SimpleDateFormat;
    import java.util.Date;
    
    public class PhoneNumberCount {
    
        public static void main(String[] args) throws Exception {
            // 创建执行环境
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    
            // 创建输入数据流
            DataStream<String> inputDataStream = env.fromElements(
                    "1234567890,2023-07-28 01:00:00",
                    "1234567890,2023-07-28 01:05:00",
                    "2345678901,2023-07-28 01:10:00",
                    "1234567890,2023-07-28 01:15:00",
                    "2345678901,2023-07-28 01:20:00",
                    "1234567890,2023-07-28 01:25:00",
                    "2345678901,2023-07-28 01:30:00",
                    "3456789012,2023-07-28 01:35:00",
                    "1234567890,2023-07-28 01:40:00",
                    "2345678901,2023-07-28 01:45:00",
                    "3456789012,2023-07-28 01:50:00",
                    "1234567890,2023-07-28 01:55:00"
            );
    
            // 转换数据流,提取电话号码并设置时间戳
            DataStream<Tuple2<String, Long>> phoneNumbers = inputDataStream.flatMap(new PhoneNumberExtractor());
    
            // 按照电话号码进行分组,设置滚动窗口(1小时),统计每个电话号码在窗口内出现的次数
            DataStream<Tuple2<String, Integer>> phoneNumberCounts = phoneNumbers
                    .keyBy(0)
                    .countWindow(3600 * 1000)  // 1小时窗口
                    .sum(1);
    
            // 过滤出现次数大于等于10次的电话号码,并输出
            DataStream<Tuple2<String, Integer>> filteredPhoneNumberCounts = phoneNumberCounts
                    .filter(count -> count.f1 >= 10);
    
            filteredPhoneNumberCounts.print();
    
            // 执行任务
            env.execute("PhoneNumberCount");
        }
    
        // 自定义FlatMapFunction,用于从输入数据流中提取电话号码和时间戳
        public static class PhoneNumberExtractor implements FlatMapFunction<String, Tuple2<String, Long>> {
            @Override
            public void flatMap(String input, Collector<Tuple2<String, Long>> collector) throws Exception {
                String[] fields = input.split(",");
                String phoneNumber = fields[0];
                String timeString = fields[1];
    
                SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
                Date time = dateFormat.parse(timeString);
                long timestamp = time.getTime();
    
                collector.collect(new Tuple2<>(phoneNumber, timestamp));
            }
        }
    }
    在上述示例代码中,我们首先创建了一个输入数据流inputDataStream,其中包含了电话号码和号码出现的时间。然后,我们通过自定义的PhoneNumberExtractor将数据流转换为Tuple2<String, Long>类型的数据流,其中包含了电话号码和对应的时间戳。
    
    接下来,我们按照电话号码进行分组,并设置了一个滚动窗口(1小时),统计每个电话号码在窗口内出现的次数。然后,我们过滤出现次数大于等于10次的电话号码,并将结果输出。
    
    最后,我们通过调用env.execute("PhoneNumberCount")来执行任务。
    
    请注意,以上示例代码中的时间处理部分仅作演示用途,实际使用时您可能需要根据具体的时间格式和数据源进行相应的调整。
    
    评论

报告相同问题?

问题事件

  • 系统已结题 8月5日
  • 修改了问题 7月28日
  • 创建了问题 7月28日

悬赏问题

  • ¥15 BV260Y用MQTT向阿里云发布主题消息一直错误
  • ¥20 求个正点原子stm32f407开发版的贪吃蛇游戏
  • ¥15 划分vlan后,链路不通了?
  • ¥20 求各位懂行的人,注册表能不能看到usb使用得具体信息,干了什么,传输了什么数据
  • ¥15 Vue3 大型图片数据拖动排序
  • ¥15 Centos / PETGEM
  • ¥15 划分vlan后不通了
  • ¥20 用雷电模拟器安装百达屋apk一直闪退
  • ¥15 算能科技20240506咨询(拒绝大模型回答)
  • ¥15 自适应 AR 模型 参数估计Matlab程序