MapReduce Project Examples 1
2022-06-28 11:31:00 【一个正在努力的菜鸡】
Word Count
1. Input
hello world i am teacher
hello world i am teacher
hello world i am teacher
hello world i am teacher
hello world
hello world
2. Output
am 4
hello 6
i 4
teacher 4
world 6
3. Code
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
import java.util.StringTokenizer;
public class WordCount {
/**
 * Mapper: turns each input <byte offset, line of text> pair into <word, count> pairs.
 * Before map there is a split step that cuts the input into <byte offset, line of text> records.
 */
public static class TokenizerMapper
extends Mapper<Object, Text, Text, IntWritable> {
//<the first two type parameters are the input types <offset, line of text>; the last two are the output types <word, count>>
private final static IntWritable one = new IntWritable(1);
private Text word = new Text();
/**
 * The first two parameters carry the input types.
 *
 * @param key     the byte offset
 * @param value   one line of text; Text is serializable and comparable (WritableComparable)
 * @param context the Hadoop runtime container, from which runtime environment values can be read
 * @throws IOException
 * @throws InterruptedException
 */
@Override
public void map(Object key, Text value, Context context
) throws IOException, InterruptedException {
System.out.println("切分split后数据--偏移量:" + key + "\t值:" + value);
StringTokenizer itr = new StringTokenizer(value.toString());//根据自然分隔符分割
while (itr.hasMoreTokens()) {
word.set(itr.nextToken());//写入文本对象
context.write(word, one);//保存出去(单词,数字)
}
}
}
/**
 * Both the combiner (merging on a single node) and the reducer (merging data from multiple
 * nodes) aggregate the values that share the same key.
 * <The first two type parameters are the aggregation input, i.e. the map output; the last two
 * are the aggregated <word, count> output.>
 */
public static class IntSumReducer
extends Reducer<Text, IntWritable, Text, IntWritable> {
private IntWritable result = new IntWritable();
/**
 * @param key     the word
 * @param values  the collection of counts recorded for that word.
 *                The class-level type parameter is IntWritable, so why does the method take
 *                Iterable<IntWritable>? Because before the reduce step the framework groups
 *                values with the same key into a list, e.g. word [1,1,1], which is then
 *                reduced to word 3.
 * @param context
 * @throws IOException
 * @throws InterruptedException
 */
@Override
public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
int sum = 0;
for (IntWritable val : values) {
sum += val.get();//sum all counts for this one key
}
result.set(sum);
context.write(key, result);
}
}
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();//job configuration (map count, reduce count, ... the API counterparts of the settings in the config files)
Job job = Job.getInstance(conf, "word count");//job name
job.setJarByClass(WordCount.class);//the jar to ship to the cluster
job.setMapperClass(TokenizerMapper.class);//mapper
job.setCombinerClass(IntSumReducer.class);//combiner: merges within a single node
job.setReducerClass(IntSumReducer.class);//reducer: merges data from different nodes
job.setOutputKeyClass(Text.class);//output key type, must match the classes above
job.setOutputValueClass(IntWritable.class);//output value type, must match the classes above
FileInputFormat.addInputPath(job, new Path("E:\\HadoopMRData\\input"));//input directory
FileOutputFormat.setOutputPath(job, new Path("E:\\HadoopMRData\\output"));//output directory: created at run time; it must not already exist, otherwise the job fails
/* FileInputFormat.addInputPath(job, new Path(args[0]));//pass the paths on the command line
FileOutputFormat.setOutputPath(job, new Path(args[1]));*/
System.exit(job.waitForCompletion(true) ? 0 : 1);//submit and wait; exit code 0 means success
}
}
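To run any of these examples on a real cluster rather than from the IDE with hard-coded Windows paths, you would switch to the commented-out args-based path lines, package the class into a jar, and submit it through the hadoop launcher. A minimal sketch, assuming a jar named wordcount.jar and HDFS directories of your own choosing (both the jar name and the paths are placeholders, not from the original post):
hadoop jar wordcount.jar WordCount /data/wordcount/input /data/wordcount/output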
Removing Duplicates
1. Input
11
11
11
12
13
2. Output
11
12
13
3. Code
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
public class RemoveRepeat {
/**
 * Mapper: emits each whole input line as the key with an empty Text as the value.
 * Before map there is a split step that cuts the input into <byte offset, line of text> records.
 */
public static class MyMapper extends Mapper<LongWritable, Text, Text, Text> {
@Override
public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
System.out.println("Post-split data -- offset: " + key + "\tvalue: " + value);
context.write(new Text(value), new Text(""));//the whole line becomes the key, so duplicate lines collapse during the shuffle
}
}
/**
 * Map output:
 *   11 ""
 *   11 ""
 *   11 ""
 *   12 ""
 * The framework merges values that share a key, so the reducer receives:
 *   11 ["", "", ""]
 *   12 [""]
 */
public static class MyReducer extends Reducer<Text, Text, Text, Text> {
@Override
public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
context.write(key, new Text(""));
}
}
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();//job configuration (map count, reduce count, ... the API counterparts of the settings in the config files)
Job job = Job.getInstance(conf, "remove repeat");//job name
job.setJarByClass(RemoveRepeat.class);//the jar to ship to the cluster
job.setMapperClass(MyMapper.class);//mapper
job.setCombinerClass(MyReducer.class);//combiner: merges within a single node (safe here, deduplication is idempotent)
job.setReducerClass(MyReducer.class);//reducer: merges data from different nodes
job.setOutputKeyClass(Text.class);//output key type, must match the classes above
job.setOutputValueClass(Text.class);//output value type, must match the classes above
FileInputFormat.addInputPath(job, new Path("E:\\HadoopMRData\\input"));//input directory
FileOutputFormat.setOutputPath(job, new Path("E:\\HadoopMRData\\output"));//output directory: created at run time; it must not already exist, otherwise the job fails
/* FileInputFormat.addInputPath(job, new Path(args[0]));//pass the paths on the command line
FileOutputFormat.setOutputPath(job, new Path(args[1]));*/
System.exit(job.waitForCompletion(true) ? 0 : 1);//submit and wait; exit code 0 means success
}
}
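Writing new Text("") as the value works, but a common alternative (not used in the original code) is NullWritable, which serializes to zero bytes and makes the intent clearer. A minimal sketch of the mapper under that assumption; the reducer and the job's setOutputValueClass would switch to NullWritable as well, and the class name NullValueMapper is purely illustrative:

import org.apache.hadoop.io.NullWritable;

public static class NullValueMapper extends Mapper<LongWritable, Text, Text, NullWritable> {
    @Override
    public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // the whole line is the key; the NullWritable value adds no bytes to the shuffle
        context.write(new Text(value), NullWritable.get());
    }
}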
Deduplication with Two Kinds of Sequence Numbers
1. Input
6
16
8
12
5
7
6
6
2. Output (columns: consecutive sequence number, gapped sequence number that counts duplicates, value)
1 1 5
2 2 6
3 5 7
4 6 8
5 7 12
6 8 16
3. Code
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
public class Order {
public static class MyMapper extends Mapper<LongWritable, Text, IntWritable, Text> {
@Override
public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
System.out.println("Post-split data -- offset: " + key + "\tvalue: " + value);
int va = Integer.parseInt(value.toString());
context.write(new IntWritable(va), new Text("1"));//the number itself is the key, so the shuffle sorts and groups it
}
}
/**
 * Map output (the framework sorts by key automatically):
 *   5  "1"
 *   6  "1"
 *   6  "1"
 *   6  "1"
 *   7  "1"
 *   8  "1"
 *   12 "1"
 *   16 "1"
 * Values that share a key are merged, so the reducer receives:
 *   5  ["1"]
 *   6  ["1","1","1"]
 *   7  ["1"]
 *   8  ["1"]
 *   12 ["1"]
 *   16 ["1"]
 */
public static class MyReducer extends Reducer<IntWritable, Text, IntWritable, Text> {
private int num1 = 1;//consecutive sequence number: one step per distinct value
private int num2 = 1;//gapped sequence number: one step per occurrence, duplicates included
@Override
protected void reduce(IntWritable key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
context.write(new IntWritable(num1), new Text(num2 + "\t" + key.get()));//emit: consecutive number, gapped number, value
for (Text val : values) {
num2++;//advance once per occurrence, so duplicates leave gaps in num2
}
num1++;//advance once per distinct value
}
}
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();//job configuration (map count, reduce count, ... the API counterparts of the settings in the config files)
Job job = Job.getInstance(conf, "order");//job name
job.setJarByClass(Order.class);//the jar to ship to the cluster
job.setMapperClass(MyMapper.class);//mapper
/**
 * The reducer must not double as a combiner for this job. If it did, the data the combiner
 * hands on (which then becomes the reducer input) would already be the final answer:
 *   1  1  5
 *   2  2  6
 *   3  5  7
 *   4  6  8
 *   5  7  12
 *   6  8  16
 * The reduce phase would then run MyReducer once more over those records, but the keys have
 * already changed, so nothing is aggregated again and the output degenerates into:
 *   1  1  1
 *   2  2  2
 *   3  3  3
 *   4  4  4
 *   5  5  5
 *   6  6  6
 */
//job.setCombinerClass(MyReducer.class);//combiner deliberately disabled, see the comment above
job.setReducerClass(MyReducer.class);//reducer: merges data from different nodes
job.setOutputKeyClass(IntWritable.class);//output key type, must match the classes above
job.setOutputValueClass(Text.class);//output value type, must match the classes above
FileInputFormat.addInputPath(job, new Path("E:\\HadoopMRData\\input"));//input directory
FileOutputFormat.setOutputPath(job, new Path("E:\\HadoopMRData\\output"));//output directory: created at run time; it must not already exist, otherwise the job fails
/* FileInputFormat.addInputPath(job, new Path(args[0]));//pass the paths on the command line
FileOutputFormat.setOutputPath(job, new Path(args[1]));*/
System.exit(job.waitForCompletion(true) ? 0 : 1);//submit and wait; exit code 0 means success
}
}
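One caveat: num1 and num2 live in the reducer instance, so the numbering forms one continuous, globally sorted sequence only when a single reduce task handles every key. If this job ran on a cluster with the default parallelism, it would be safer to pin it to one reducer; a small addition to main (not part of the original code):

job.setNumReduceTasks(1); // a single reducer keeps num1/num2 as one global sequence over the sorted keys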
Per-Student Average Score
1. Input
张三 98
李四 94
王五 89
张三 86
李四 92
王五 86
张三 82
李四 90
2. Output
张三 88.66666666666667
李四 92.0
王五 87.5
3. Code
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
import java.util.StringTokenizer;
public class Average {
public static class MyMapper extends Mapper<LongWritable, Text, Text, Text> {
@Override
public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
System.out.println("Post-split data -- offset: " + key + "\tvalue: " + value);
StringTokenizer tokenizer = new StringTokenizer(value.toString());//split on whitespace, the default delimiters
while (tokenizer.hasMoreTokens()) {
context.write(new Text(tokenizer.nextToken()), new Text(tokenizer.nextToken()));//tokens arrive in (name, score) pairs
}
}
}
/**
 * Map output (the framework sorts by key automatically):
 *   张三 98
 *   李四 94
 *   王五 89
 *   张三 86
 *   李四 92
 *   王五 86
 *   张三 82
 *   李四 90
 * Values that share a key are merged, so the reducer receives:
 *   张三 [98,86,82]
 *   李四 [94,92,90]
 *   王五 [89,86]
 */
public static class MyReducer extends Reducer<Text, Text, Text, Text> {
@Override
protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
int count = 0;//number of scores for this student (local, so it resets for every key)
double sum = 0;//sum of this student's scores (local, so one student's total cannot leak into the next)
for (Text val : values) {
sum += Double.parseDouble(val.toString());
count++;
}
context.write(key, new Text(String.valueOf(sum / count)));
}
}
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();//job configuration (map count, reduce count, ... the API counterparts of the settings in the config files)
Job job = Job.getInstance(conf, "average");//job name
job.setJarByClass(Average.class);//the jar to ship to the cluster
job.setMapperClass(MyMapper.class);//mapper
//job.setCombinerClass(MyReducer.class);//no combiner here: an average of per-node averages is not the overall average, so reusing the reducer as a combiner would distort the result
job.setReducerClass(MyReducer.class);//reducer: merges data from different nodes
job.setOutputKeyClass(Text.class);//output key type, must match the classes above
job.setOutputValueClass(Text.class);//output value type, must match the classes above
FileInputFormat.addInputPath(job, new Path("E:\\HadoopMRData\\input"));//input directory
FileOutputFormat.setOutputPath(job, new Path("E:\\HadoopMRData\\output"));//output directory: created at run time; it must not already exist, otherwise the job fails
/* FileInputFormat.addInputPath(job, new Path(args[0]));//pass the paths on the command line
FileOutputFormat.setOutputPath(job, new Path(args[1]));*/
System.exit(job.waitForCompletion(true) ? 0 : 1);//submit and wait; exit code 0 means success
}
}
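If a combiner is wanted for the average job after all, the usual approach is to carry partial sums and counts through the shuffle and divide only in the reducer. A minimal sketch under the assumption that the mapper is changed to emit "score,1" as its value; the class names SumCountCombiner and AvgReducer are illustrative, not from the original post:

// Combiner: adds up partial "sum,count" pairs without dividing.
public static class SumCountCombiner extends Reducer<Text, Text, Text, Text> {
    @Override
    protected void reduce(Text key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        double sum = 0;
        int count = 0;
        for (Text val : values) {
            String[] parts = val.toString().split(","); // each value looks like "sum,count"
            sum += Double.parseDouble(parts[0]);
            count += Integer.parseInt(parts[1]);
        }
        context.write(key, new Text(sum + "," + count)); // still a partial result
    }
}

// Reducer: adds the remaining partial pairs, then divides once at the end.
public static class AvgReducer extends Reducer<Text, Text, Text, Text> {
    @Override
    protected void reduce(Text key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        double sum = 0;
        int count = 0;
        for (Text val : values) {
            String[] parts = val.toString().split(",");
            sum += Double.parseDouble(parts[0]);
            count += Integer.parseInt(parts[1]);
        }
        context.write(key, new Text(String.valueOf(sum / count)));
    }
}

The job would then register SumCountCombiner with setCombinerClass and AvgReducer with setReducerClass; because only sums and counts travel between the phases, the result is the same whether the combiner runs zero, one, or many times.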