Spark Learning: Spark implementation of distcp
2022-07-24 09:35:00 【I love evening primrose a】

One. pom.xml
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>Spark</groupId>
  <artifactId>Spark</artifactId>
  <version>0.0.1-SNAPSHOT</version>

  <dependencies>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-core_2.12</artifactId>
      <version>3.2.0</version>
    </dependency>
  </dependencies>

  <build>
    <plugins>
      <!-- Compile the Scala sources -->
      <plugin>
        <groupId>net.alchim31.maven</groupId>
        <artifactId>scala-maven-plugin</artifactId>
        <version>3.2.2</version>
        <executions>
          <execution>
            <id>compile-scala</id>
            <phase>compile</phase>
            <goals>
              <goal>add-source</goal>
              <goal>compile</goal>
            </goals>
          </execution>
          <execution>
            <id>test-compile-scala</id>
            <phase>test-compile</phase>
            <goals>
              <goal>add-source</goal>
              <goal>testCompile</goal>
            </goals>
          </execution>
        </executions>
        <configuration>
          <scalaVersion>2.12.15</scalaVersion>
        </configuration>
      </plugin>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-compiler-plugin</artifactId>
        <version>3.2</version>
        <configuration>
          <source>1.8</source>
          <target>1.8</target>
        </configuration>
      </plugin>
      <!-- Bundle dependencies into a single runnable jar during the package phase -->
      <plugin>
        <artifactId>maven-assembly-plugin</artifactId>
        <configuration>
          <descriptorRefs>
            <descriptorRef>jar-with-dependencies</descriptorRef>
          </descriptorRefs>
          <archive>
            <manifest>
              <mainClass></mainClass>
            </manifest>
          </archive>
        </configuration>
        <executions>
          <execution>
            <id>make-assembly</id>
            <phase>package</phase>
            <goals>
              <goal>single</goal>
            </goals>
          </execution>
        </executions>
      </plugin>
      <plugin>
        <groupId>org.codehaus.mojo</groupId>
        <artifactId>exec-maven-plugin</artifactId>
        <version>1.6.0</version>
        <executions>
          <execution>
            <goals>
              <goal>exec</goal>
            </goals>
          </execution>
        </executions>
        <configuration>
          <executable>java</executable>
          <includeProjectDependencies>true</includeProjectDependencies>
          <includePluginDependencies>false</includePluginDependencies>
          <classpathScope>compile</classpathScope>
          <mainClass>cn.spark.study.App</mainClass>
        </configuration>
      </plugin>
    </plugins>
  </build>
</project>
Two. The driver class and its main method
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import scala.collection.mutable.ListBuffer
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.FileSystem
import org.apache.hadoop.fs.Path

object Distcp {
  def main(args: Array[String]): Unit = {
    type OptionMap = Map[Symbol, Any]
    if (args.length == 0) {
      println("Usage: Distcp [-i] -m <maxConcurrency> <sourceFolder> <targetFolder>")
      sys.exit(1)
    }
    val arglist = args.toList

    // Recursively consume the argument list, accumulating parsed options.
    def nextOption(map: OptionMap, list: List[String]): OptionMap = {
      def isSwitch(s: String) = s.nonEmpty && s(0) == '-'
      list match {
        case Nil => map
        case "-i" :: tail =>
          nextOption(map ++ Map(Symbol("ignoreFailure") -> 1), tail)
        case "-m" :: value :: tail =>
          nextOption(map ++ Map(Symbol("maxconcurrency") -> value.toInt), tail)
        // Reject unknown switches before the generic positional cases;
        // placed after them, this case would be unreachable.
        case option :: _ if isSwitch(option) =>
          println("Unknown option " + option)
          sys.exit(1)
        case string :: Nil => map ++ Map(Symbol("outfile") -> string)
        case string :: tail => nextOption(map ++ Map(Symbol("infile") -> string), tail)
      }
    }

    val options = nextOption(Map(), arglist)
    println(options)
    val sourceFolder = String.valueOf(options(Symbol("infile")))
    val targetFolder = String.valueOf(options(Symbol("outfile")))
    val concurrency = options(Symbol("maxconcurrency")).toString.toInt
    // -i is optional: without it, the first failed copy aborts the job.
    val ignoreFailure = options.getOrElse(Symbol("ignoreFailure"), 0).toString.toInt

    val sparkConf = new SparkConf().setAppName("bingbing").setMaster("local[1]")
    val sc = new SparkContext(sparkConf)
    val fileNames = new ListBuffer[String]()
    val conf = new Configuration()
    conf.set("fs.defaultFS", "hdfs://localhost:9000")
    traverseDir(conf, sourceFolder, fileNames)
    fileNames.foreach { fileName =>
      try {
        // Copy each file by reading it as an RDD and writing it under the
        // target folder, preserving the path relative to the source folder.
        sc.textFile(fileName, concurrency).saveAsTextFile(fileName.replace(sourceFolder, targetFolder))
      } catch {
        case t: Throwable =>
          t.printStackTrace()
          if (ignoreFailure == 0) {
            throw new Exception("failed to copy " + fileName)
          }
      }
    }
  }

  // Recursively collect the paths of all regular files under `path`.
  def traverseDir(hdconf: Configuration, path: String, filePaths: ListBuffer[String]): Unit = {
    val files = FileSystem.get(hdconf).listStatus(new Path(path))
    files.foreach { fStatus =>
      if (fStatus.isDirectory) {
        traverseDir(hdconf, fStatus.getPath.toString, filePaths)
      } else {
        filePaths += fStatus.getPath.toString
      }
    }
  }
}
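The recursive option-parsing pattern used in main can be exercised on its own, with no Spark or Hadoop on the classpath. Below is a minimal sketch of that pattern; the object name ArgParser is illustrative, and the unknown-option guard is placed before the generic positional cases so that it is reachable:

```scala
// Minimal sketch of the recursive List-pattern-match argument parser.
object ArgParser {
  type OptionMap = Map[Symbol, Any]

  def isSwitch(s: String): Boolean = s.nonEmpty && s(0) == '-'

  def nextOption(map: OptionMap, list: List[String]): OptionMap = list match {
    case Nil => map
    // Flag without a value: -i sets ignoreFailure
    case "-i" :: tail =>
      nextOption(map + (Symbol("ignoreFailure") -> 1), tail)
    // Switch with a value: -m <n> sets maxconcurrency
    case "-m" :: value :: tail =>
      nextOption(map + (Symbol("maxconcurrency") -> value.toInt), tail)
    // Any other switch is an error; this must precede the positional cases
    case option :: _ if isSwitch(option) =>
      sys.error("Unknown option " + option)
    // Last positional argument is the output path
    case string :: Nil => map + (Symbol("outfile") -> string)
    // Earlier positional argument is the input path
    case string :: tail =>
      nextOption(map + (Symbol("infile") -> string), tail)
  }
}
```

Parsing the same argument shape as the spark-submit invocation later in this post, `List("-i", "-m", "3", "/src", "/dst")`, yields a map with ignoreFailure, maxconcurrency, infile, and outfile populated.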
Three. Build the jar (the assembly plugin is bound to the package phase, so a normal Maven package build produces the jar-with-dependencies)
Four. Run
1. List the source directory
hadoop fs -ls /home/student5/niezb

2. Create an empty target directory bing
hadoop fs -mkdir -p /home/student5/niezb/bing
3. Use Distcp to copy all files and directories under /home/student5/niezb to /home/student5/niezb/bing
spark-submit --class Distcp --master local[*] /home/student5/niezb/Spark-0.0.1-SNAPSHOT.jar -i -m 3 "/home/student5/niezb" "/home/student5/niezb/bing"
4. View the results
Hehe!
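The recursion in traverseDir can be sanity-checked without an HDFS cluster by running the same logic against the local filesystem, with java.io.File standing in for Hadoop's FileSystem. This is a sketch under that substitution; the object name LocalTraverse is illustrative:

```scala
import java.io.File
import scala.collection.mutable.ListBuffer

// Local-filesystem analogue of Distcp.traverseDir: recursively collect
// the paths of all regular files under `path` into `filePaths`.
object LocalTraverse {
  def traverseDir(path: String, filePaths: ListBuffer[String]): Unit = {
    // listFiles() returns null for non-directories and on I/O errors
    val entries = Option(new File(path).listFiles()).getOrElse(Array.empty[File])
    entries.foreach { f =>
      if (f.isDirectory) traverseDir(f.getPath, filePaths)
      else filePaths += f.getPath
    }
  }
}
```

Running it over a temporary directory tree with one file at the top level and one in a subdirectory should collect exactly those two paths, mirroring what traverseDir does on HDFS.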