原创

如何编写一个MR程序


如何编写一个MR程序

0. 导入依赖(pom.xml)

<dependencies>
    <!-- Unit tests only: test scope keeps JUnit off the job's runtime classpath. -->
    <dependency>
        <groupId>junit</groupId>
        <artifactId>junit</artifactId>
        <version>4.13.2</version>
        <scope>test</scope>
    </dependency>
    <!-- SLF4J binding for Log4j 2. Version 2.12.0 pulls in a log4j-core affected by
         Log4Shell (CVE-2021-44228 and follow-up CVEs); 2.17.1 is a patched release. -->
    <dependency>
        <groupId>org.apache.logging.log4j</groupId>
        <artifactId>log4j-slf4j-impl</artifactId>
        <version>2.17.1</version>
    </dependency>
    <!-- Hadoop client APIs (Configuration, Job, Mapper, Reducer, Writable types). -->
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-client</artifactId>
        <version>3.1.3</version>
    </dependency>
</dependencies>

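1. 编写驱动类(Driver,作业入口)

注:以下各类均省略了 import 语句。Job、Mapper、Reducer、Text、IntWritable 等均来自 org.apache.hadoop.* 包;FileInputFormat / FileOutputFormat 须导入新版 API org.apache.hadoop.mapreduce.lib.input / org.apache.hadoop.mapreduce.lib.output 下的类,而不是旧版 org.apache.hadoop.mapred 包中的同名类。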

/**
 * Entry point (driver) of the word-count MapReduce job: wires the Mapper and Reducer together,
 * declares the key/value types, sets the input/output paths and submits the job.
 *
 * <p>Usage: {@code hadoop jar <jar> WordcountDriver <input path> <output path>}.
 * Per {@code FileOutputFormat}, the output directory must not already exist.
 */
public class WordcountDriver {

	public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
		// Fail fast with a usage message instead of an ArrayIndexOutOfBoundsException further down.
		if (args.length < 2) {
			System.err.println("Usage: WordcountDriver <input path> <output path>");
			System.exit(2);
		}

		// 1. Create the configuration and the Job object.
		Configuration configuration = new Configuration();
		Job job = Job.getInstance(configuration);

		// 2. Locate the jar to ship to the cluster via the class containing this driver.
		job.setJarByClass(WordcountDriver.class);

		// 3. Register the Mapper and Reducer implementations.
		job.setMapperClass(WordcountMapper.class);
		job.setReducerClass(WordcountReducer.class);

		// 4. Key/value types emitted by the Mapper: (word, 1).
		job.setMapOutputKeyClass(Text.class);
		job.setMapOutputValueClass(IntWritable.class);

		// 5. Key/value types of the final (Reducer) output: (word, total).
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(IntWritable.class);

		// 6. Input and output paths taken from the command line.
		FileInputFormat.setInputPaths(job, new Path(args[0]));
		FileOutputFormat.setOutputPath(job, new Path(args[1]));

		// 7. Submit the job, wait for it (printing progress), and map success to the exit code.
		boolean result = job.waitForCompletion(true);
		System.exit(result ? 0 : 1);
	}
}

2. 编写 Mapper 类

/**
 * Mapper of the word-count job: emits {@code (word, 1)} for every whitespace-separated token
 * of each input line.
 *
 * <p>With the default TextInputFormat the input key is the line's byte offset (unused here)
 * and the input value is the line's text.
 */
public class WordcountMapper extends Mapper<LongWritable, Text, Text, IntWritable>{
	/** Any run of whitespace (spaces, tabs, ...) separating words; compiled once and reused. */
	private static final java.util.regex.Pattern WHITESPACE = java.util.regex.Pattern.compile("\\s+");

	// Output objects are reused across records (common Hadoop practice) to avoid per-record allocation.
	private final Text k = new Text();
	private final IntWritable v = new IntWritable(1);

	@Override
	protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
		// 1. Get one line of input.
		String line = value.toString();
		// 2. Split on runs of whitespace. Splitting on a single " " would produce empty "words"
		//    for consecutive or leading spaces and empty lines, and would not split on tabs.
		for (String word : WHITESPACE.split(line)) {
			// A leading separator (or an empty line) still yields one empty token; skip it.
			if (word.isEmpty()) {
				continue;
			}
			// 3. Emit (word, 1), e.g. (黑桃A, 1).
			k.set(word);
			context.write(k, v);
		}
	}
}

3. 编写 Reducer 类

/**
 * Reducer of the word-count job: sums all counts received for a word and writes
 * {@code (word, total)}.
 */
public class WordcountReducer extends Reducer<Text, IntWritable, Text, IntWritable>{

	// Output value object reused across reduce() calls to avoid per-key allocation.
	private final IntWritable v = new IntWritable();

	@Override
	protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
		// 1. Sum the counts. The accumulator is local because no state is needed between keys.
		//    Math.addExact throws on int overflow instead of silently writing a wrapped, wrong total.
		int sum = 0;
		for (IntWritable count : values) {
			sum = Math.addExact(sum, count.get());
		}
		// 2. Emit (word, total).
		v.set(sum);
		context.write(key, v);
	}
}

4.打包运行

# 格式:hadoop jar <jar包名> <主类全类名> <输入路径> <输出路径>
# 注意:输出路径不能事先存在,否则作业会因 FileAlreadyExistsException 而提交失败
[root@hadoop test]# hadoop jar mapreduce.jar WordcountDriver wcinput/ wcoutput

5.查看结果

原始输入文件(节选)

黑桃A 红桃3
黑桃A
黑桃K
红桃8
黑桃A 红桃3
黑桃A
......

输出结果(wcoutput/part-r-00000)

红桃3	114048
红桃4	38016
红桃6	38016
红桃8	76032
黑桃A	139392
黑桃K	76032
hadoop
  • 作者:刘智豪(联系作者)
  • 发表时间:2022-08-11
  • 版权声明:自由转载-非商用-保持署名(创意共享3.0许可证)
  • 公众号转载:请在文末添加作者公众号二维码
  • 评论