MapReduce advanced
2022-07-23 10:43:00 【S_ ng】
1.HDFS Data format details
1.1 File format
Row-oriented: .txt (splittable), .seq (splittable)
Column-oriented: .rc (splittable), .orc (splittable)
1.2 Compression formats
Splittable: .lzo (not native, requires a separate install), .bz2 (native)
Non-splittable: .gz (native), .snappy (not native)
1.3 Set the output compression to gzip
-Dmapred.output.compress=true \
-Dmapred.output.compression.codec=org.apache.hadoop.io.compress.GzipCodec \
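The same effect can be achieved in the driver code instead of on the command line; a minimal sketch, assuming an existing Job instance named job (an assumption, not part of the original example):

import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

// Enable compression of the job output and select the gzip codec
FileOutputFormat.setCompressOutput(job, true);
FileOutputFormat.setOutputCompressorClass(job, GzipCodec.class);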
2. Custom Partitioner
2.1 Customize the number of reducers
yarn jar TlHadoopCore-jar-with-dependencies.jar \
com.examples.WordCountV2 \
-Dmapred.output.compress=true \
-Dmapred.output.compression.codec=org.apache.hadoop.io.compress.GzipCodec \
-Dmapred.reduce.tasks=2 \
/tmp/tianliangedu/input /tmp/output
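Equivalently, the reducer count can be set in the driver code; a minimal sketch, assuming a Job instance named job (an assumption, not part of the original script):

// Driver-side equivalent of -Dmapred.reduce.tasks=2
job.setNumReduceTasks(2);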
2.2 Custom Partitioner implementation
public static class MyHashPartitioner<K, V> extends Partitioner<K, V> {
    /** Partition by the first character of the key: keys before 'q' go to partition 0, the rest to partition 1. */
    public int getPartition(K key, V value, int numReduceTasks) {
        return (key.toString().charAt(0) < 'q' ? 0 : 1) % numReduceTasks;
        // return key.toString().charAt(0);
    }
}
Specify the partitioner class:
job.setPartitionerClass(MyHashPartitioner.class);
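With this partitioner, any key whose first character sorts before 'q' (for example "apple") goes to partition 0 and the rest (for example "spark") go to partition 1, so the job should run with two reduce tasks; a minimal driver sketch, assuming a Job instance named job:

job.setPartitionerClass(MyHashPartitioner.class);
job.setNumReduceTasks(2); // one reduce task per partition, matching -Dmapred.reduce.tasks=2 below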
Script invocation:
yarn jar TlHadoopCore-jar-with-dependencies.jar \
com.examples.SelfDefinePartitioner \
-Dmapred.output.compress=true \
-Dmapred.output.compression.codec=org.apache.hadoop.io.compress.GzipCodec \
-Dmapred.reduce.tasks=2 \
/tmp/input /tmp/output
3. Reading an external configuration file in an MR application - passing it via Configuration
Step breakdown
- Implement sorting of the file data under the input_filter directory, i.e., the read-in (Map) and reduction (Reduce) processing.
- Pass the local file whitelist.txt to the Driver class and read its contents into a string txtContent.
- Pass txtContent to the map and reduce tasks via the set method of Configuration.
- In the map task, retrieve the passed value txtContent via the get method of the Configuration object.
- Parse txtContent into a Set object and use it in the map method to filter the output.
- Since filtering is already done on the map side, no changes are needed on the reduce side.
package com.tianliangedu.core.readconfig;

import java.io.BufferedReader;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.log4j.Logger;

// Driver class that launches the MR job
public class ConfigSetTransferDriver {
    public static Logger logger = Logger.getLogger(ConfigSetTransferDriver.class);

    // Map class, implements the map function
    public static class LineProcessMapper extends Mapper<Object, Text, Text, IntWritable> {
        // Reusable output key/value objects to avoid repeated allocation
        private Text outputKey = new Text();
        private IntWritable outputValue = new IntWritable();
        // Set of whitelisted names used for filtering
        private Set<String> whiteNameSet = new HashSet<String>();

        // Each map task runs setup exactly once, to initialize parameters needed before map executes
        @Override
        protected void setup(Mapper<Object, Text, Text, IntWritable>.Context context)
                throws IOException, InterruptedException {
            Configuration conf = context.getConfiguration();
            String whitelistString = conf.get("whitelist");
            String[] whiteNameArray = whitelistString.split("\\s");
            whiteNameSet.addAll(Arrays.asList(whiteNameArray));
        }

        // Core map implementation, processes <key, value> pairs one by one
        public void map(Object key, Text value, Context context)
                throws IOException, InterruptedException {
            // Emit the map output one record at a time through the context object
            String tempLine = value.toString();
            if (tempLine != null && tempLine.trim().length() > 0) {
                String[] columnArray = tempLine.split("\\s");
                if (whiteNameSet.contains(columnArray[0])) {
                    outputKey.set(columnArray[0]);
                    outputValue.set(Integer.parseInt(columnArray[1]));
                    context.write(outputKey, outputValue);
                }
            }
        }
    }

    // Reduce class, implements the reduce function
    public static class SortReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
        // Core reduce implementation, processes <key, List(v1, v2)> groups one by one
        public void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            // Enhanced for loop, reads each element of the iterator in turn
            for (IntWritable val : values) {
                // Write out the results one by one
                context.write(key, val);
            }
        }
    }

    // Read a local file at the given path with the given encoding and return its content as a string
    public static String readFile(String filePath, String fileEncoding) {
        if (fileEncoding == null) {
            fileEncoding = System.getProperty("file.encoding");
        }
        File file = new File(filePath);
        BufferedReader br = null;
        String line = null;
        StringBuilder stringBuilder = new StringBuilder();
        int lineCounter = 0;
        try {
            br = new BufferedReader(new InputStreamReader(new FileInputStream(file), fileEncoding));
            while ((line = br.readLine()) != null) {
                if (lineCounter > 0) {
                    stringBuilder.append("\n");
                }
                stringBuilder.append(line);
                lineCounter++;
            }
            return stringBuilder.toString();
        } catch (Exception e) {
            logger.info(e.getLocalizedMessage());
        } finally {
            if (br != null) {
                try {
                    br.close();
                } catch (IOException e) {
                    logger.info(e.getLocalizedMessage());
                    logger.info("An error occurred while closing the IO stream!");
                }
            }
        }
        return null;
    }

    // Read the configuration file and pass its value on
    public static void readConfigAndTransfer(Configuration conf, String filePath) {
        // Read the local configuration file
        String source = readFile(filePath, "utf-8");
        // Pass the value from the configuration file to the compute nodes via conf.set
        conf.set("whitelist", source);
        // Log the value that was read; this line can be removed if logging is not needed
        logger.info("whitelist=" + source);
    }

    // Driver method that launches the MR job
    public static void main(String[] args) throws Exception {
        // Get the cluster configuration parameters
        Configuration conf = new Configuration();
        // Parameter parser
        GenericOptionsParser optionParser = new GenericOptionsParser(conf, args);
        String[] remainingArgs = optionParser.getRemainingArgs();
        if ((remainingArgs.length < 3)) {
            System.err.println("Usage: yarn jar jar_path main_class_path -D parameter list <in> <out> <whitelist_local_path>");
            System.exit(2);
        }
        // Read and pass the configuration parameter
        readConfigAndTransfer(conf, remainingArgs[2]);
        // Set the configuration into this job instance
        Job job = Job.getInstance(conf, "conf direct transfer");
        // Specify the main class for this run
        job.setJarByClass(ConfigSetTransferDriver.class);
        // Specify the map class
        job.setMapperClass(LineProcessMapper.class);
        // Specify the reducer class
        job.setReducerClass(SortReducer.class);
        // Specify the job output key and value types; if the map and reduce output types differ,
        // the map output key/value class types must be set separately
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        // Specify the input data path
        FileInputFormat.addInputPath(job, new Path(remainingArgs[0]));
        // Specify the output path, which must not already exist
        FileOutputFormat.setOutputPath(job, new Path(remainingArgs[1]));
        // Submit the job and wait until it completes before the client exits
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
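A possible invocation, following the pattern of the earlier scripts (a sketch: the jar name and HDFS paths are assumptions; the third positional argument is the local whitelist file read by the driver):

yarn jar TlHadoopCore-jar-with-dependencies.jar \
 com.tianliangedu.core.readconfig.ConfigSetTransferDriver \
 -Dmapred.output.compress=true \
 -Dmapred.output.compression.codec=org.apache.hadoop.io.compress.GzipCodec \
 /tmp/input /tmp/output whitelist.txt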