2401_85526922 2024-06-07 14:39 采纳率: 0%
浏览 56

云计算与分布式系统练习——hadoop

给定两个字符串S1和S2,S1和S2之间的相似度记为Jaccard(S1,S2) = |G.S1 ∩ G.S2| / |G.S1 ∪ G.S2|,其中,G.S1为字符串S1的N元模型表示(N-Gram),即使用一个长度为N的窗口从左到右把字符串划分成若干个等长(长度为N)的子串。

为了避免出现S1字符串长度小于N的情况,可以为S1增加N-1个#作为前缀和N-1个$作为后缀。例如字符串Gorbachev和字符串Gorbechyov的二元模型可以分别表示为{#G, Go, or, rb, ba, ac, ch, he, ev, v$}和{#G, Go, or, rb, be, ec, ch, hy, yo, ov, v$}。两者的交集为{#G, Go, or, rb, ch, v$}(共6个元素),并集共有10 + 11 - 6 = 15个元素。因此,Jaccard(“Gorbachev”,“Gorbechyov”) = 6/15 = 2/5。
云计算与分布式系统练习——hadoop
题目要求
使用Hadoop实现两个文本数据集之间的相似度连接操作。输入:字符串集合R,字符串集合S,相似度阈值θ,N。

输出:
T = {<r,s>|r∈R, s∈S, Jaccard (r,s) >= θ}

说明:相似度阈值设置为0.3,N设置为3,读懂已给出的代码,并补全未给出的代码部分。
#### 测试说明

输入的字符集格式如下:

2000001 Gabriela Nicolescu
2000002 Hanifa Boucheneb
2000003 Donald P. Cox
2000004 Youssif B. Al-Nashif
2000005 Salim Hariri
按照以下格式输出结果:

(2000006,2000014) 1.0
(2000008,2000054) 0.45

  • 写回答

2条回答 默认 最新

  • 专家-赤兔[在线] 优质创作者: 编程框架技术领域 2024-06-07 14:39
    关注

    引自免费微信小程序:皆我百晓生

    为了完成这个任务,你需要在Hadoop框架下编写MapReduce程序。这里是一个简单的步骤指南和示例代码,用于实现这个任务。我们需要定义五个类:NGramGeneratorMapper、NGramReducer、SimilarityJoinDriver,以及SimilarityCalculatorMapper和SimilarityCalculatorReducer。

    Step 1: 定义NGramGeneratorMapper

    这个mapper将读取输入的字符串并生成N-gram。

    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;
    
    import java.util.StringTokenizer;
    
    public class NGramGeneratorMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
        private final static IntWritable one = new IntWritable(1);
        private Text word = new Text();
    
        public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            String line = value.toString();
            StringTokenizer tokenizer = new StringTokenizer(line);
    
            while (tokenizer.hasMoreTokens()) {
                String token = tokenizer.nextToken();
                for (int i = 0; i < token.length() - 2; i++) {
                    String gram = token.substring(i, i + 3); // N=3 in this case
                    word.set(gram);
                    context.write(word, one);
                }
            }
        }
    }
    

    Step 2: 定义NGramReducer

    这个reducer会汇总同一个N-gram键对应的所有计数值,并输出该N-gram出现的总次数。

    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Reducer;
    
    import java.util.HashMap;
    import java.util.Map;
    
    /**
     * Sums the counts received for one n-gram and emits a single {@code (gram, total)} record.
     *
     * <p>The reducer keeps no state between {@code reduce()} calls: each call writes exactly
     * one record for the key it was given. This also keeps the class usable as a combiner,
     * because its output has the same shape as its input.
     */
    public class NGramReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
        /** Reused output value, so that no new IntWritable is allocated per key. */
        private final IntWritable total = new IntWritable();

        /**
         * Adds up all counts for {@code key} and writes the sum.
         *
         * @param key     the n-gram being aggregated
         * @param values  partial counts for that n-gram
         * @param context sink for the {@code (gram, total)} record
         * @throws java.io.IOException  if writing to the context fails
         * @throws InterruptedException if the task is interrupted while writing
         */
        @Override
        protected void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws java.io.IOException, InterruptedException {
            int sum = 0;
            for (IntWritable val : values) {
                sum += val.get();
            }

            // Write the key passed to this call directly; it must not be stored for later,
            // since the framework may reuse the same Text instance for subsequent keys.
            total.set(sum);
            context.write(key, total);
        }
    }
    

    Step 3: 定义SimilarityJoinDriver

    这个driver类负责启动整个MapReduce作业,并处理最终的相似度计算。

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    
    public class SimilarityJoinDriver {
        public static void main(String[] args) throws Exception {
            if (args.length != 4) {
                System.err.println("Usage: SimilarityJoinDriver <input_R> <input_S> <output> <theta>");
                System.exit(-1);
            }
    
            Configuration conf = new Configuration();
            Job job = Job.getInstance(conf, "Similarity Join");
            job.setJarByClass(SimilarityJoinDriver.class);
            job.setMapperClass(NGramGeneratorMapper.class);
            job.setCombinerClass(NGramReducer.class);
            job.setReducerClass(NGramReducer.class);
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(IntWritable.class);
    
            FileInputFormat.addInputPath(job, new Path(args[0]));
            FileInputFormat.addInputPath(job, new Path(args[1]));
            FileOutputFormat.setOutputPath(job, new Path(args[2]));
    
            job.waitForCompletion(true);
    
            // Read output and calculate similarity
            Configuration config2 = new Configuration();
            Job job2 = Job.getInstance(config2, "Similarity Calculation");
            job2.setJarByClass(SimilarityJoinDriver.class);
            job2.setMapperClass(SimilarityCalculatorMapper.class);
            job2.setReducerClass(SimilarityCalculatorReducer.class);
            job2.setOutputKeyClass(Text.class);
            job2.setOutputValueClass(DoubleWritable.class);
    
            FileInputFormat.addInputPath(job2, new Path(args[2]));
            FileOutputFormat.setOutputPath(job2, new Path(args[3]));
    
            job2.setNumReduceTasks(1); // Use a single reducer to get the result
            job2.waitForCompletion(true);
        }
    }
    

    Step 4: 定义SimilarityCalculatorMapper 和 SimilarityCalculatorReducer

    这两个类用于计算两个文本集的相似度。

    // SimilarityCalculatorMapper
    import org.apache.hadoop.io.DoubleWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;
    
    import java.io.IOException;
    
    public class SimilarityCalculatorMapper extends Mapper<Text, IntWritable, Text, DoubleWritable> {
        @Override
        protected void map(Text key, IntWritable value, Context context) throws IOException, InterruptedException {
            context.write(key, new DoubleWritable(value.get()));
        }
    }
    
    // SimilarityCalculatorReducer
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.DoubleWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Reducer;
    
    import java.io.BufferedReader;
    import java.io.IOException;
    import java.io.InputStreamReader;
    import java.net.URI;
    import java.nio.charset.StandardCharsets;
    import java.util.HashSet;
    import java.util.Set;
    
    /**
     * For each key, sums the incoming values, compares the key against the lines loaded from
     * the job's cache files, and writes a ratio for the key when it reaches the threshold.
     *
     * <p>The ratio is {@code intersection / (total + |grams| - intersection)}, where
     * {@code intersection} is 1 if the key's text equals one of the cached lines and 0
     * otherwise, {@code total} is the sum of the key's values, and {@code grams} is the set of
     * cached lines.
     *
     * <p>NOTE(review): this scores one key against the cached line set; it does not by itself
     * produce per-pair {@code (r, s)} similarities. Confirm it matches the intended output.
     */
    public class SimilarityCalculatorReducer extends Reducer<Text, DoubleWritable, Text, DoubleWritable> {
        /** Configuration key holding the similarity threshold. */
        private static final String THRESHOLD_KEY = "similarity.threshold";

        /** Threshold used when the configuration does not provide one. */
        private static final double DEFAULT_THETA = 0.3;

        private double theta = DEFAULT_THETA;

        /** Distinct lines read from all cache files; filled once in {@link #setup}. */
        private final Set<String> grams = new HashSet<>();

        /**
         * Reads the threshold from the job configuration and loads every line of every cache
         * file into memory.
         *
         * @param context task context giving access to the configuration and cache files
         * @throws IOException          if a cache file cannot be opened or read
         * @throws InterruptedException declared by the overridden method
         */
        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            Configuration conf = context.getConfiguration();
            // Fall back to the default rather than failing when the key was never set.
            theta = conf.getDouble(THRESHOLD_KEY, DEFAULT_THETA);

            grams.clear();
            URI[] cacheFiles = context.getCacheFiles();
            if (cacheFiles == null) {
                return; // no cache files were registered for this job
            }
            for (URI cacheFile : cacheFiles) {
                FileSystem fs = FileSystem.get(cacheFile, conf);
                // try-with-resources closes the stream even if reading fails part-way.
                try (BufferedReader reader = new BufferedReader(new InputStreamReader(
                        fs.open(new Path(cacheFile)), StandardCharsets.UTF_8))) {
                    String line;
                    while ((line = reader.readLine()) != null) {
                        grams.add(line);
                    }
                }
            }
        }

        /**
         * Computes the ratio for {@code key} and writes it when it is at least the threshold.
         *
         * @param key     the key being scored
         * @param values  numeric values associated with the key; they are summed
         * @param context sink for {@code (key, ratio)} records that pass the threshold
         * @throws IOException          if writing to the context fails
         * @throws InterruptedException if the task is interrupted while writing
         */
        @Override
        protected void reduce(Text key, Iterable<DoubleWritable> values, Context context)
                throws IOException, InterruptedException {
            double total = 0;
            for (DoubleWritable value : values) {
                total += value.get();
            }

            // The cached lines form a set, so the key can match at most one of them.
            double intersection = grams.contains(key.toString()) ? 1 : 0;

            double union = total + grams.size() - intersection;
            if (union == 0) {
                return; // ratio is undefined; nothing to emit
            }
            double jaccard = intersection / union;
            if (jaccard >= theta) {
                context.write(key, new DoubleWritable(jaccard));
            }
        }
    }
    

    确保你在运行这些代码之前已经设置了正确的环境变量,并且你的Hadoop集群是可用的。这只是一个基本的实现,可能需要根据你的具体需求进行调整。如果你遇到任何问题,可以随时向我询问。

    评论 编辑记录

报告相同问题?

问题事件

  • 创建了问题 6月7日