Spark Learning: Spark implementation of distcp
2022-07-24 09:35:00 【I love evening primrose a】

One. pom.xml
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>Spark</groupId>
    <artifactId>Spark</artifactId>
    <version>0.0.1-SNAPSHOT</version>

    <dependencies>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.12</artifactId>
            <version>3.2.0</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <version>3.2.2</version>
                <executions>
                    <execution>
                        <id>compile-scala</id>
                        <phase>compile</phase>
                        <goals>
                            <goal>add-source</goal>
                            <goal>compile</goal>
                        </goals>
                    </execution>
                    <execution>
                        <id>test-compile-scala</id>
                        <phase>test-compile</phase>
                        <goals>
                            <goal>add-source</goal>
                            <goal>testCompile</goal>
                        </goals>
                    </execution>
                </executions>
                <configuration>
                    <scalaVersion>2.12.15</scalaVersion>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.2</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>
            <plugin>
                <artifactId>maven-assembly-plugin</artifactId>
                <configuration>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                    <archive>
                        <manifest>
                            <mainClass></mainClass>
                        </manifest>
                    </archive>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <groupId>org.codehaus.mojo</groupId>
                <artifactId>exec-maven-plugin</artifactId>
                <version>1.6.0</version>
                <executions>
                    <execution>
                        <goals>
                            <goal>exec</goal>
                        </goals>
                    </execution>
                </executions>
                <configuration>
                    <executable>java</executable>
                    <includeProjectDependencies>true</includeProjectDependencies>
                    <includePluginDependencies>false</includePluginDependencies>
                    <classpathScope>compile</classpathScope>
                    <mainClass>cn.spark.study.App</mainClass>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>
Two. The driver class (main method)
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.FileSystem
import org.apache.hadoop.fs.Path

import scala.collection.mutable.ListBuffer
object Distcp {

  type OptionMap = Map[Symbol, Any]

  def main(args: Array[String]): Unit = {
    if (args.isEmpty) {
      println("Usage: Distcp [-i] [-m <maxConcurrency>] <sourceDir> <targetDir>")
      sys.exit(1)
    }
    val arglist = args.toList

    // Parse the command line recursively:
    //   -i             ignore copy failures (only print the stack trace)
    //   -m <n>         minimum number of partitions used when reading each file
    //   first bare arg -> source directory, last bare arg -> target directory
    def nextOption(map: OptionMap, list: List[String]): OptionMap = {
      def isSwitch(s: String) = s(0) == '-'
      list match {
        case Nil => map
        case "-i" :: tail =>
          nextOption(map ++ Map(Symbol("ignoreFailure") -> 1), tail)
        case "-m" :: value :: tail =>
          nextOption(map ++ Map(Symbol("maxconcurrency") -> value.toInt), tail)
        case option :: _ if isSwitch(option) =>
          println("Unknown option " + option)
          sys.exit(1)
        case string :: Nil  => nextOption(map ++ Map(Symbol("outfile") -> string), Nil)
        case string :: tail => nextOption(map ++ Map(Symbol("infile") -> string), tail)
      }
    }

    val options = nextOption(Map(), arglist)
    println(options)

    val sourceFolder  = String.valueOf(options(Symbol("infile")))
    val targetFolder  = String.valueOf(options(Symbol("outfile")))
    val concurrency   = options.getOrElse(Symbol("maxconcurrency"), 1).toString.toInt
    val ignoreFailure = options.getOrElse(Symbol("ignoreFailure"), 0).toString.toInt

    // Note: setting the master here overrides the --master flag of spark-submit;
    // drop setMaster if you want to control it at submit time.
    val sparkConf = new SparkConf().setAppName("bingbing").setMaster("local[1]")
    val sc = new SparkContext(sparkConf)

    // Collect every regular file under the source folder from HDFS.
    val fileNames = new ListBuffer[String]()
    val conf = new Configuration()
    conf.set("fs.defaultFS", "hdfs://localhost:9000") // adjust to your NameNode address
    traverseDir(conf, sourceFolder, fileNames)

    // Copy each file by reading it as text and writing it under the target folder,
    // preserving the relative path. saveAsTextFile writes a directory of part-* files.
    fileNames.foreach { fileName =>
      try {
        sc.textFile(fileName, concurrency)
          .saveAsTextFile(fileName.replace(sourceFolder, targetFolder))
      } catch {
        case t: Throwable =>
          t.printStackTrace()
          if (ignoreFailure == 0) {
            throw new Exception("failed to copy " + fileName)
          }
      }
    }

    sc.stop()
  }

  // Recursively walk an HDFS directory and collect the paths of all regular files.
  def traverseDir(hdconf: Configuration, path: String, filePaths: ListBuffer[String]): Unit = {
    val files = FileSystem.get(hdconf).listStatus(new Path(path))
    files.foreach { fStatus =>
      if (fStatus.isDirectory) {
        traverseDir(hdconf, fStatus.getPath.toString, filePaths)
      } else {
        filePaths += fStatus.getPath.toString
      }
    }
  }
}
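To illustrate how the argument parser behaves, here is the OptionMap produced for the argument list used in the run in section Four (a worked trace for illustration, not output copied from the original post):

// Arguments as passed by spark-submit in section Four:
//   -i -m 3 /home/student5/niezb /home/student5/niezb/bing
// nextOption(Map(), args.toList) returns:
//   Map(Symbol("ignoreFailure") -> 1,
//       Symbol("maxconcurrency") -> 3,
//       Symbol("infile")  -> "/home/student5/niezb",
//       Symbol("outfile") -> "/home/student5/niezb/bing")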
Three. Build the jar package
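The original post does not show the build command. With the pom above, a typical Maven build would look like this (a sketch, assuming Maven and JDK 8 are installed and the sources sit in the standard src/main/scala layout):

mvn clean package
# produces target/Spark-0.0.1-SNAPSHOT.jar and the assembly
# target/Spark-0.0.1-SNAPSHOT-jar-with-dependencies.jar;
# copy the jar to the path used by spark-submit in section Four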
Four. Run
1. List the source directory
hadoop fs -ls /home/student5/niezb

2. Create an empty target directory named bing
hadoop fs -mkdir -p /home/student5/niezb/bing
3. Use Distcp to copy all files and directories under /home/student5/niezb to /home/student5/niezb/bing
spark-submit --class Distcp --master local[*] /home/student5/niezb/Spark-0.0.1-SNAPSHOT.jar -i -m 3 "/home/student5/niezb" "/home/student5/niezb/bing"
4. Check the results
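The post does not show the verification command; one way to check (reusing the listing command from step 1) is to list the target directory. Because the copy goes through saveAsTextFile, each copied file appears as a directory of part-* files rather than a single file:

hadoop fs -ls /home/student5/niezb/bing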
Hee hee hee!