MapReduce Project Examples 1
2022-06-28 11:31:00 【一个正在努力的菜鸡】
Word Count
1. Input
hello world i am teacher
hello world i am teacher
hello world i am teacher
hello world i am teacher
hello world
hello world
2. Output
am 4
hello 6
i 4
teacher 4
world 6
3. Code
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
import java.util.StringTokenizer;
public class WordCount {
/**
 * Mapper: turns each input record <byte offset, line of text> into <word, count> pairs.
 * Before map runs, an input split step turns the file into <byte offset, line of text> records.
 */
public static class TokenizerMapper
extends Mapper<Object, Text, Text, IntWritable> {
// The first two type parameters are the input types <offset, line of text>; the last two are the output types <word, count>.
private final static IntWritable one = new IntWritable(1);
private Text word = new Text();
/**
 * The first two parameters are the input types.
 *
 * @param key     byte offset of the line
 * @param value   one line of text; Text is serializable and comparable (it implements WritableComparable)
 * @param context the Hadoop execution context, from which run-time environment settings can be read
 * @throws IOException
 * @throws InterruptedException
 */
@Override
public void map(Object key, Text value, Context context
) throws IOException, InterruptedException {
System.out.println("切分split后数据--偏移量:" + key + "\t值:" + value);
StringTokenizer itr = new StringTokenizer(value.toString());// tokenize on whitespace (the default delimiters)
while (itr.hasMoreTokens()) {
word.set(itr.nextToken());// copy the token into the reusable Text object
context.write(word, one);// emit (word, 1)
}
}
}
/**
 * Both the combiner (per-node merge) and the reducer (cross-node merge) aggregate values that share a key.
 * The first two type parameters match the mapper's output types; the last two are the reduced <word, count>.
 */
public static class IntSumReducer
extends Reducer<Text, IntWritable, Text, IntWritable> {
private IntWritable result = new IntWritable();
/**
 * @param key     the word
 * @param values  the occurrence counts collected for this word.
 *                The class-level type parameter is IntWritable, so why does the method take Iterable<IntWritable>?
 *                Because before reduce runs, values with the same key are grouped into a list,
 *                e.g. word [1, 1, 1], which is then reduced to word 3.
 *                (A plain-Java simulation of this grouping follows the listing.)
 * @param context
 * @throws IOException
 * @throws InterruptedException
 */
@Override
public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
int sum = 0;
for (IntWritable val : values) {
sum += val.get();// add up the counts for this key
}
result.set(sum);
context.write(key, result);
}
}
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();// job configuration (map count, reduce count, ... the API counterparts of the settings in the configuration files)
Job job = Job.getInstance(conf, "word count");// job name
job.setJarByClass(WordCount.class);// the jar to ship to the cluster
job.setMapperClass(TokenizerMapper.class);// mapper
job.setCombinerClass(IntSumReducer.class);// combiner: pre-aggregates map output on each node (safe here because summing is associative)
job.setReducerClass(IntSumReducer.class);// reducer: merges data across nodes
job.setOutputKeyClass(Text.class);// output key type, matching the classes above
job.setOutputValueClass(IntWritable.class);// output value type, matching the classes above
FileInputFormat.addInputPath(job, new Path("E:\\HadoopMRData\\input"));// input directory
FileOutputFormat.setOutputPath(job, new Path("E:\\HadoopMRData\\output"));// output directory; it must not exist yet -- Hadoop creates it and fails with FileAlreadyExistsException if it is already there
/* FileInputFormat.addInputPath(job, new Path(args[0]));// or pass the paths on the command line
FileOutputFormat.setOutputPath(job, new Path(args[1])); */
System.exit(job.waitForCompletion(true) ? 0 : 1);// run the job; exit code 0 means success
}
}
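To make the shuffle step concrete, here is a minimal plain-Java simulation (an illustration added here, not part of the original job; all names are invented for the sketch). It mirrors the three stages: emit (word, 1) for every token, group the 1s under their word, then sum each group, exactly the word [1, 1, 1] -> word 3 step described in the reducer's comment.

import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class ShuffleSimulation {
    public static void main(String[] args) {
        String[] lines = {"hello world i am teacher", "hello world"};

        // "map": emit a (word, 1) pair for every token
        List<Map.Entry<String, Integer>> mapOutput = new ArrayList<>();
        for (String line : lines) {
            for (String word : line.split("\\s+")) {
                mapOutput.add(new AbstractMap.SimpleEntry<>(word, 1));
            }
        }

        // "shuffle": group the 1s under their key, e.g. hello -> [1, 1]
        Map<String, List<Integer>> grouped = new TreeMap<>();
        for (Map.Entry<String, Integer> kv : mapOutput) {
            grouped.computeIfAbsent(kv.getKey(), k -> new ArrayList<>()).add(kv.getValue());
        }

        // "reduce": sum each list, e.g. hello [1, 1] -> hello 2
        for (Map.Entry<String, List<Integer>> e : grouped.entrySet()) {
            int sum = 0;
            for (int v : e.getValue()) sum += v;
            System.out.println(e.getKey() + "\t" + sum);
        }
    }
}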
Removing Duplicates
1. Input
11
11
11
12
13
2. Output
11
12
13
3. Code
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
public class RemoveRepeat {
/**
 * Mapper: emits each input line as the key with an empty value, so duplicate lines collapse onto one key.
 * Before map runs, an input split step turns the file into <byte offset, line of text> records.
 */
public static class MyMapper extends Mapper<LongWritable, Text, Text, Text> {
@Override
public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
System.out.println("切分split后数据--偏移量:" + key + "\t值:" + value);
context.write(new Text(value), new Text(""));
}
}
/**
 * Map output:
 *   11 ""
 *   11 ""
 *   11 ""
 *   12 ""
 * The framework merges values with the same key, so the reducer receives:
 *   11 ["", "", ""]
 *   12 [""]
 */
public static class MyReducer extends Reducer<Text, Text, Text, Text> {
@Override
public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
context.write(key, new Text(""));
}
}
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();// job configuration (map count, reduce count, ... the API counterparts of the settings in the configuration files)
Job job = Job.getInstance(conf, "remove repeat");// job name
job.setJarByClass(RemoveRepeat.class);// the jar to ship to the cluster
job.setMapperClass(MyMapper.class);// mapper
job.setCombinerClass(MyReducer.class);// combiner: safe here because emitting each distinct key once is idempotent
job.setReducerClass(MyReducer.class);// reducer: merges data across nodes
job.setOutputKeyClass(Text.class);// output key type, matching the classes above
job.setOutputValueClass(Text.class);// output value type, matching the classes above
FileInputFormat.addInputPath(job, new Path("E:\\HadoopMRData\\input"));// input directory
FileOutputFormat.setOutputPath(job, new Path("E:\\HadoopMRData\\output"));// output directory; it must not exist yet -- Hadoop creates it and fails with FileAlreadyExistsException if it is already there
/* FileInputFormat.addInputPath(job, new Path(args[0]));// or pass the paths on the command line
FileOutputFormat.setOutputPath(job, new Path(args[1])); */
System.exit(job.waitForCompletion(true) ? 0 : 1);// run the job; exit code 0 means success
}
}
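One refinement worth noting: Hadoop ships a NullWritable type for exactly this kind of value slot that carries no information, and it avoids serializing an empty Text for every record. Below is a minimal sketch of the same dedup logic using it (my rewrite, not from the original post), assuming the driver is wired with setOutputValueClass(NullWritable.class) instead of Text.class.

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;

public class RemoveRepeatNull {
    public static class MyMapper extends Mapper<LongWritable, Text, Text, NullWritable> {
        @Override
        public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            // the line itself is the key; NullWritable carries no payload at all
            context.write(value, NullWritable.get());
        }
    }

    public static class MyReducer extends Reducer<Text, NullWritable, Text, NullWritable> {
        @Override
        public void reduce(Text key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {
            // one write per distinct key is the deduplicated output
            context.write(key, NullWritable.get());
        }
    }
}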
Deduplicating and Numbering the Output in Two Ways
1. Input
6
16
8
12
5
7
6
6
2. Output (column 1: consecutive rank; column 2: rank that advances once per occurrence, duplicates included; column 3: the value)
1 1 5
2 2 6
3 5 7
4 6 8
5 7 12
6 8 16
3. Code
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
public class Order {
public static class MyMapper extends Mapper<LongWritable, Text, IntWritable, Text> {
@Override
public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
System.out.println("切分split后数据--偏移量:" + key + "\t值:" + value);
int va = Integer.parseInt(value.toString());
context.write(new IntWritable(va), new Text("1"));
}
}
/**
 * Map output, which the framework sorts by key:
 *   5 "1"
 *   6 "1"
 *   6 "1"
 *   6 "1"
 *   7 "1"
 *   8 "1"
 *   12 "1"
 *   16 "1"
 * The framework merges values with the same key, so the reducer receives:
 *   5 ["1"]
 *   6 ["1", "1", "1"]
 *   7 ["1"]
 *   8 ["1"]
 *   12 ["1"]
 *   16 ["1"]
 */
public static class MyReducer extends Reducer<IntWritable, Text, IntWritable, Text> {
private int num1 = 1;// consecutive rank: advances once per distinct key
private int num2 = 1;// gapped rank: advances once per occurrence, duplicates included
@Override
protected void reduce(IntWritable key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
context.write(new IntWritable(num1), new Text(num2 + "\t" + key.get()));// num2 still points at this key's first occurrence
for (Text val : values) {
num2++;// advance once per occurrence, so duplicates leave a gap
}
num1++;// advance once per distinct key
}
}
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();// job configuration (map count, reduce count, ... the API counterparts of the settings in the configuration files)
Job job = Job.getInstance(conf, "order");// job name
job.setJarByClass(Order.class);// the jar to ship to the cluster
job.setMapperClass(MyMapper.class);// mapper
/**
 * The combiner must NOT be enabled here. If it were, the combiner stage would already produce
 *   1 1 5
 *   2 2 6
 *   3 5 7
 *   4 6 8
 *   5 7 12
 *   6 8 16
 * and MyReducer would then run a second time over that output. By then the keys have changed,
 * so the numbering logic above would degenerate into
 *   1 1 1
 *   2 2 2
 *   3 3 3
 *   4 4 4
 *   5 5 5
 *   6 6 6
 */
//job.setCombinerClass(MyReducer.class);// combiner: intentionally disabled, see the comment above
job.setReducerClass(MyReducer.class);// reducer: merges data across nodes
job.setOutputKeyClass(IntWritable.class);// output key type, matching the classes above
job.setOutputValueClass(Text.class);// output value type, matching the classes above
FileInputFormat.addInputPath(job, new Path("E:\\HadoopMRData\\input"));// input directory
FileOutputFormat.setOutputPath(job, new Path("E:\\HadoopMRData\\output"));// output directory; it must not exist yet -- Hadoop creates it and fails with FileAlreadyExistsException if it is already there
/* FileInputFormat.addInputPath(job, new Path(args[0]));// or pass the paths on the command line
FileOutputFormat.setOutputPath(job, new Path(args[1])); */
System.exit(job.waitForCompletion(true) ? 0 : 1);// run the job; exit code 0 means success
}
}
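The two counters are easier to see outside Hadoop. This plain-Java sketch (illustrative only; the class and variable names are mine) sorts and groups the input the way the shuffle would, then prints the same three columns as the expected output above.

import java.util.Map;
import java.util.TreeMap;

public class RankDemo {
    public static void main(String[] args) {
        int[] input = {6, 16, 8, 12, 5, 7, 6, 6};

        // group duplicates under their value, sorted ascending (mirrors sort + merge in the shuffle)
        TreeMap<Integer, Integer> counts = new TreeMap<>();
        for (int v : input) counts.merge(v, 1, Integer::sum);

        int num1 = 1;// consecutive rank: +1 per distinct value
        int num2 = 1;// gapped rank: +1 per occurrence, duplicates included
        for (Map.Entry<Integer, Integer> e : counts.entrySet()) {
            System.out.println(num1 + "\t" + num2 + "\t" + e.getKey());
            num2 += e.getValue();
            num1++;
        }
    }
}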
Per-Student Average
1. Input
张三 98
李四 94
王五 89
张三 86
李四 92
王五 86
张三 82
李四 90
2. Output
张三 88.66666666666667
李四 92.0
王五 87.5
3. Code
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
import java.util.StringTokenizer;
public class Average {
public static class MyMapper extends Mapper<LongWritable, Text, Text, Text> {
@Override
public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
System.out.println("切分split后数据--偏移量:" + key + "\t值:" + value);
StringTokenizer tokenizer = new StringTokenizer(value.toString());//根据自然分割符分割
while (tokenizer.hasMoreTokens()) {
context.write(new Text(tokenizer.nextToken()), new Text(tokenizer.nextToken()));
}
}
}
/**
 * Map output, which the framework sorts by key:
 *   张三 98
 *   李四 94
 *   王五 89
 *   张三 86
 *   李四 92
 *   王五 86
 *   张三 82
 *   李四 90
 * The framework merges values with the same key, so the reducer receives:
 *   张三 [98, 86, 82]
 *   李四 [94, 92, 90]
 *   王五 [89, 86]
 */
public static class MyReducer extends Reducer<Text, Text, Text, Text> {
@Override
protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
int count = 0;// number of scores for this student; a local so it resets for every key
double sum = 0;// total of this student's scores, also reset per key
for (Text val : values) {
sum += Double.parseDouble(val.toString());
count++;
}
context.write(key, new Text(String.valueOf(sum / count)));
}
}
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();// job configuration (map count, reduce count, ... the API counterparts of the settings in the configuration files)
Job job = Job.getInstance(conf, "average");// job name
job.setJarByClass(Average.class);// the jar to ship to the cluster
job.setMapperClass(MyMapper.class);// mapper
//job.setCombinerClass(MyReducer.class);// do NOT enable: an average of per-node averages is not the overall average (a combiner-safe variant follows this listing)
job.setReducerClass(MyReducer.class);// reducer: merges data across nodes
job.setOutputKeyClass(Text.class);// output key type, matching the classes above
job.setOutputValueClass(Text.class);// output value type, matching the classes above
FileInputFormat.addInputPath(job, new Path("E:\\HadoopMRData\\input"));// input directory
FileOutputFormat.setOutputPath(job, new Path("E:\\HadoopMRData\\output"));// output directory; it must not exist yet -- Hadoop creates it and fails with FileAlreadyExistsException if it is already there
/* FileInputFormat.addInputPath(job, new Path(args[0]));// or pass the paths on the command line
FileOutputFormat.setOutputPath(job, new Path(args[1])); */
System.exit(job.waitForCompletion(true) ? 0 : 1);// run the job; exit code 0 means success
}
}
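If a combiner is wanted for the average job, the standard workaround is to make the intermediate value a partial (sum, count) pair, which combines associatively. Below is a hedged sketch of that approach; the class names, the "sum,count" string encoding, and the parse helper are my assumptions, not from the original post. The existing MyMapper stays unchanged; wire the job with job.setCombinerClass(AverageWithCombiner.SumCountCombiner.class) and job.setReducerClass(AverageWithCombiner.AvgReducer.class).

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;

public class AverageWithCombiner {

    // Parses either a raw score ("98") or a partial pair ("266.0,3") into {sum, count}.
    // Both must be accepted: Hadoop may run the combiner zero, one, or several times,
    // including over the combiner's own earlier output.
    private static double[] parse(String s) {
        int comma = s.indexOf(',');
        if (comma >= 0) {
            return new double[]{Double.parseDouble(s.substring(0, comma)),
                                Double.parseDouble(s.substring(comma + 1))};
        }
        return new double[]{Double.parseDouble(s), 1};
    }

    // Combiner: folds its inputs into a single partial "sum,count" pair per key.
    public static class SumCountCombiner extends Reducer<Text, Text, Text, Text> {
        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
            double sum = 0, count = 0;
            for (Text val : values) {
                double[] p = parse(val.toString());
                sum += p[0];
                count += p[1];
            }
            context.write(key, new Text(sum + "," + (long) count));
        }
    }

    // Reducer: merges the partial pairs and emits the true average.
    public static class AvgReducer extends Reducer<Text, Text, Text, Text> {
        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
            double sum = 0, count = 0;
            for (Text val : values) {
                double[] p = parse(val.toString());
                sum += p[0];
                count += p[1];
            }
            context.write(key, new Text(String.valueOf(sum / count)));
        }
    }
}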