Advanced MapReduce
2022-07-23 04:18:00 【S_ng】
1. HDFS Data Formats in Detail
1.1 File Formats
Row-oriented: .txt (splittable), .seq (splittable)
Column-oriented: .rc (splittable), .orc (splittable)
1.2 Compression Formats
Splittable: .lzo (not built into Hadoop; requires installing the LZO codec separately), .bz2 (supported natively)
Not splittable: .gz (supported natively), .snappy (not native)
1.3 Setting the Output Compression to gzip
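To see which codecs actually have native-library support on a given node, Hadoop ships a checknative tool; the exact output depends on how the cluster was built and which native libraries are installed:
hadoop checknative -a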
-Dmapred.output.compress=true \
-Dmapred.output.compression.codec=org.apache.hadoop.io.compress.GzipCodec \
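The same effect can be achieved programmatically in the Driver instead of via -D flags. A minimal sketch using the standard Job and FileOutputFormat APIs (the class and job name below are illustrative, not from the original article):
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class GzipOutputConfigSketch {
    public static Job buildJob(Configuration conf) throws Exception {
        Job job = Job.getInstance(conf, "gzip-output-example"); // illustrative job name
        // Compress the job's final output files...
        FileOutputFormat.setCompressOutput(job, true);
        // ...using the gzip codec, matching the -D flags above
        FileOutputFormat.setOutputCompressorClass(job, GzipCodec.class);
        // Remaining job setup (mapper, reducer, input/output paths) proceeds as usual
        return job;
    }
}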
2. Custom Partitioner
2.1 Setting a Custom Number of Reduce Tasks
yarn jar TlHadoopCore-jar-with-dependencies.jar \
com.examples.WordCountV2 \
-Dmapred.output.compress=true \
-Dmapred.output.compression.codec=org.apache.hadoop.io.compress.GzipCodec \
-Dmapred.reduce.tasks=2 \
/tmp/tianliangedu/input /tmp/output
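The reducer count can also be fixed in the Driver rather than on the command line; a one-line sketch using the standard Job API (where job is the Job instance created in the Driver):
job.setNumReduceTasks(2); // equivalent to -Dmapred.reduce.tasks=2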
2.2 Implementing a Custom Partitioner
public static class MyHashPartitioner<K, V> extends Partitioner<K, V> {
    /** Partition by the first character of the key rather than by hashCode. */
    @Override
    public int getPartition(K key, V value, int numReduceTasks) {
        // Keys whose first character is before 'q' go to partition 0, the rest to partition 1
        return (key.toString().charAt(0) < 'q' ? 0 : 1) % numReduceTasks;
    }
}
Specify the partitioner class in the Driver:
job.setPartitionerClass(MyHashPartitioner.class);
Script invocation:
yarn jar TlHadoopCore-jar-with-dependencies.jar \
com.examples.SelfDefinePartitioner \
-Dmapred.output.compress=true \
-Dmapred.output.compression.codec=org.apache.hadoop.io.compress.GzipCodec \
-Dmapred.reduce.tasks=2 \
/tmp/input /tmp/output
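As a quick sanity check of the partitioning logic, the hypothetical snippet below (not part of the original job) calls getPartition directly; it assumes MyHashPartitioner is visible on the classpath:
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;

public class PartitionerCheck {
    public static void main(String[] args) {
        MyHashPartitioner<Text, IntWritable> partitioner = new MyHashPartitioner<>();
        // 'a' < 'q', so "apple" is routed to reducer 0 (output file part-r-00000)
        System.out.println(partitioner.getPartition(new Text("apple"), new IntWritable(1), 2));
        // 'z' >= 'q', so "zebra" is routed to reducer 1 (output file part-r-00001)
        System.out.println(partitioner.getPartition(new Text("zebra"), new IntWritable(1), 2));
    }
}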
3. Reading an External Configuration File in MR: Passing Values via Configuration
Step breakdown (the full listing and an example invocation follow):
- Implement a simple sort over the file data in the input_filter directory, i.e., the usual Map read and Reduce aggregation processing.
- Pass the local file whitelist.txt to the Driver class and read its content into txtContent.
- Pass txtContent to the map and reduce tasks via Configuration's set method.
- In the map task, retrieve the passed value txtContent via the Configuration object's get method.
- Parse txtContent into a Set and use it to filter the output of the map method.
- Since filtering is already done on the map side, the reduce side needs no changes.
package com.tianliangedu.core.readconfig;

import java.io.BufferedReader;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.log4j.Logger;

// Driver class that launches the MR job
public class ConfigSetTransferDriver {
    public static Logger logger = Logger.getLogger(ConfigSetTransferDriver.class);

    // Mapper class implementing the map function
    public static class LineProcessMapper extends Mapper<Object, Text, Text, IntWritable> {
        // Reused output key/value objects to avoid repeated allocation
        private Text outputKey = new Text();
        private IntWritable outputValue = new IntWritable();
        // Whitelist set used to filter map output
        private Set<String> whiteNameSet = new HashSet<String>();

        // setup() runs exactly once per map task; it initializes parameters needed before map() is called
        @Override
        protected void setup(Mapper<Object, Text, Text, IntWritable>.Context context)
                throws IOException, InterruptedException {
            Configuration conf = context.getConfiguration();
            String whitelistString = conf.get("whitelist");
            String[] whiteNameArray = whitelistString.split("\\s");
            whiteNameSet.addAll(Arrays.asList(whiteNameArray));
        }

        // Core map method: processes one <key, value> pair at a time
        public void map(Object key, Text value, Context context)
                throws IOException, InterruptedException {
            // Emit map output through the context object
            String tempLine = value.toString();
            if (tempLine != null && tempLine.trim().length() > 0) {
                String[] columnArray = tempLine.split("\\s");
                if (whiteNameSet.contains(columnArray[0])) {
                    outputKey.set(columnArray[0]);
                    outputValue.set(Integer.parseInt(columnArray[1]));
                    context.write(outputKey, outputValue);
                }
            }
        }
    }

    // Reducer class implementing the reduce function
    public static class SortReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
        // Core reduce method: processes one <key, List(v1, v2)> group at a time
        public void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            // Enhanced for loop: visit every value in the iterator
            for (IntWritable val : values) {
                // Write out each result
                context.write(key, val);
            }
        }
    }

    // Read a local file with the given path and encoding into a String
    public static String readFile(String filePath, String fileEncoding) {
        if (fileEncoding == null) {
            fileEncoding = System.getProperty("file.encoding");
        }
        File file = new File(filePath);
        BufferedReader br = null;
        String line = null;
        StringBuilder stringBuilder = new StringBuilder();
        int lineCounter = 0;
        try {
            br = new BufferedReader(new InputStreamReader(new FileInputStream(file), fileEncoding));
            while ((line = br.readLine()) != null) {
                if (lineCounter > 0) {
                    stringBuilder.append("\n");
                }
                stringBuilder.append(line);
                lineCounter++;
            }
            return stringBuilder.toString();
        } catch (Exception e) {
            logger.info(e.getLocalizedMessage());
        } finally {
            if (br != null) {
                try {
                    br.close();
                } catch (IOException e) {
                    logger.info(e.getLocalizedMessage());
                    logger.info("Error while closing the input stream!");
                }
            }
        }
        return null;
    }

    // Read the config file and pass its content along via Configuration
    public static void readConfigAndTransfer(Configuration conf, String filePath) {
        // Read the local config file
        String source = readFile(filePath, "utf-8");
        // Pass the file content to the compute nodes via conf.set
        conf.set("whitelist", source);
        // Log the value that was read; this line can be removed if logging is not needed
        logger.info("whitelist=" + source);
    }

    // Driver main method that launches the MR job
    public static void main(String[] args) throws Exception {
        // Load the cluster configuration
        Configuration conf = new Configuration();
        // Parse generic options
        GenericOptionsParser optionParser = new GenericOptionsParser(conf, args);
        String[] remainingArgs = optionParser.getRemainingArgs();
        if (remainingArgs.length < 3) {
            System.err.println("Usage: yarn jar jar_path main_class_path -D<options> <in> <out> <whitelist_local_path>");
            System.exit(2);
        }
        // Read the config file and pass its content to the job
        readConfigAndTransfer(conf, remainingArgs[2]);
        // Create the job instance
        Job job = Job.getInstance(conf, "天亮conf直接传参");
        // Set the jar by this Driver class
        job.setJarByClass(ConfigSetTransferDriver.class);
        // Set the mapper class
        job.setMapperClass(LineProcessMapper.class);
        // Set the reducer class
        job.setReducerClass(SortReducer.class);
        // Set the job output key/value types; if map and reduce output types differ,
        // the map output key/value classes must be set separately
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        // Set the input path
        FileInputFormat.addInputPath(job, new Path(remainingArgs[0]));
        // Set the output path, which must not already exist
        FileOutputFormat.setOutputPath(job, new Path(remainingArgs[1]));
        // Submit the job and wait for completion before the client exits
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
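An example invocation following the same submission pattern as the earlier sections (the jar name, input/output paths, and local whitelist.txt path are illustrative; the third positional argument is the local whitelist file read by the Driver):
yarn jar TlHadoopCore-jar-with-dependencies.jar \
 com.tianliangedu.core.readconfig.ConfigSetTransferDriver \
 /tmp/input_filter /tmp/output_filter ./whitelist.txt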