
How to Write Spark Programs in IDEA (Three Ways: Local, Cluster, and Java)

In this post, Alice brings you a tutorial on how to write Spark programs in IDEA.

Contents

Preface
Materials
WordCount Illustrated
pom.xml
Running Locally
Running on a Cluster
Java 8 Version [Optional]

Preface

This walkthrough uses a very classic example, WordCount, the one nobody learning MapReduce can avoid, to show how to write Spark program code for several different scenarios. As you type the code, keep one question in mind: is Spark really simpler than MapReduce?

Materials

wordcount.txt

hello me you her
hello you her
hello her
hello

WordCount Illustrated
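To trace what each stage of WordCount produces, here is a minimal sketch using plain Scala collections (no Spark required; the object name WordCountFlow is an illustrative assumption, not part of the original project), applied to the wordcount.txt above:

package com.czxy.scala

// Illustrative only: the same three steps as the Spark job, but on ordinary Lists,
// so every intermediate value can be printed and inspected.
object WordCountFlow {
  def main(args: Array[String]): Unit = {
    val lines = List("hello me you her", "hello you her", "hello her", "hello")

    // Step 1: split every line on spaces and flatten into a single list of words
    val words = lines.flatMap(_.split(" "))
    println(words)        // List(hello, me, you, her, hello, you, her, hello, her, hello)

    // Step 2: pair every word with the number 1
    val wordAndOne = words.map((_, 1))
    println(wordAndOne)   // List((hello,1), (me,1), (you,1), (her,1), ...)

    // Step 3: group by the word and sum the 1s (this is what reduceByKey does per key)
    val wordAndCount = wordAndOne.groupBy(_._1).map { case (w, ones) => (w, ones.map(_._2).sum) }
    println(wordAndCount) // counts: hello -> 4, me -> 1, you -> 2, her -> 3 (map order may vary)
  }
}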

pom.xml

Create a Maven project, complete the directory structure, and configure pom.xml:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.czxy</groupId>
    <artifactId>spark_demo</artifactId>
    <version>1.0-SNAPSHOT</version>

    <!-- Repository locations: aliyun, cloudera, and jboss, in that order -->
    <repositories>
        <repository>
            <id>aliyun</id>
            <url>http://maven.aliyun.com/nexus/content/groups/public/</url>
        </repository>
        <repository>
            <id>cloudera</id>
            <url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
        </repository>
        <repository>
            <id>jboss</id>
            <url>http://repository.jboss.com/nexus/content/groups/public</url>
        </repository>
    </repositories>

    <properties>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
        <encoding>UTF-8</encoding>
        <scala.version>2.11.8</scala.version>
        <scala.compat.version>2.11</scala.compat.version>
        <hadoop.version>2.7.4</hadoop.version>
        <spark.version>2.2.0</spark.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>${scala.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.11</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.11</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-hive_2.11</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-hive-thriftserver_2.11</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_2.11</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <!--
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming-kafka-0-8_2.11</artifactId>
            <version>${spark.version}</version>
        </dependency>
        -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql-kafka-0-10_2.11</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <!--
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>2.6.0-mr1-cdh5.14.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-client</artifactId>
            <version>1.2.0-cdh5.14.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-server</artifactId>
            <version>1.2.0-cdh5.14.0</version>
        </dependency>
        -->
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>2.7.4</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-client</artifactId>
            <version>1.3.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-server</artifactId>
            <version>1.3.1</version>
        </dependency>
        <dependency>
            <groupId>com.typesafe</groupId>
            <artifactId>config</artifactId>
            <version>1.3.3</version>
        </dependency>
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>5.1.38</version>
        </dependency>
    </dependencies>

    <build>
        <sourceDirectory>src/main/java</sourceDirectory>
        <testSourceDirectory>src/test/scala</testSourceDirectory>
        <plugins>
            <!-- Plugin for compiling Java -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.5.1</version>
            </plugin>
            <!-- Plugin for compiling Scala -->
            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <version>3.2.2</version>
                <executions>
                    <execution>
                        <goals>
                            <goal>compile</goal>
                            <goal>testCompile</goal>
                        </goals>
                        <configuration>
                            <args>
                                <arg>-dependencyfile</arg>
                                <arg>${project.build.directory}/.scala_dependencies</arg>
                            </args>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-surefire-plugin</artifactId>
                <version>2.18.1</version>
                <configuration>
                    <useFile>false</useFile>
                    <disableXmlReport>true</disableXmlReport>
                    <includes>
                        <include>**/*Test.*</include>
                        <include>**/*Suite.*</include>
                    </includes>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>2.3</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <filters>
                                <filter>
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                            <transformers>
                                <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                    <mainClass></mainClass>
                                </transformer>
                            </transformers>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>

The difference between maven-assembly-plugin and maven-shade-plugin

For a comparison of the two packaging plugins, see this post: https://blog.csdn.net/lisheng19870305/article/details/88300951

Running Locally

package com.czxy.scala

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

/*
 * @Author: Alice菌
 * @Date: /2/19 08:39
 * @Description: Smile at the passing years; the future is promising. Ride your dreams and live up to your youth!
 */
/**
 * Run locally
 */
object Spark_wordcount {
  def main(args: Array[String]): Unit = {
    // 1. Create the SparkContext
    val config = new SparkConf().setAppName("wc").setMaster("local[*]")
    val sc = new SparkContext(config)
    sc.setLogLevel("WARN")
    // 2. Read the file
    // A Resilient Distributed Dataset (RDD) can be thought of as a distributed collection.
    // Spark wraps it so heavily that using it feels just like working with a local
    // collection, which makes everyone happy.
    val fileRDD: RDD[String] = sc.textFile("G:\\干货\\Spark\\wordcount.txt")
    // 3. Process the data
    // 3.1 Split each line on spaces and flatten the results into one new collection
    //     flatMap applies the function to every element and then flattens the result
    val wordRDD: RDD[String] = fileRDD.flatMap(_.split(" "))
    // 3.2 Map each word to 1
    val wordAndOneRDD: RDD[(String, Int)] = wordRDD.map((_, 1))
    // 3.3 Aggregate by key to count each word
    //     wordAndOneRDD.reduceByKey((a, b) => a + b)
    //     first _  : the result accumulated so far
    //     second _ : the incoming value
    val wordAndCount: RDD[(String, Int)] = wordAndOneRDD.reduceByKey(_ + _)
    // 4. Collect the results
    val result: Array[(String, Int)] = wordAndCount.collect()
    // Print them to the console
    result.foreach(println)
  }
}

The result of the run:
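With the wordcount.txt above, the console output should look roughly like this (the order of the pairs can vary between runs):

(her,3)
(hello,4)
(me,1)
(you,2)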

Running on a Cluster

package com.czxy.scala

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

/*
 * @Author: Alice菌
 * @Date: /2/19 09:12
 * @Description: Smile at the passing years; the future is promising. Ride your dreams and live up to your youth!
 */
/**
 * Run on a cluster
 */
object Spark_wordcount_cluster {
  def main(args: Array[String]): Unit = {
    // 1. Create the SparkContext (no master is hard-coded here; spark-submit supplies it)
    val config = new SparkConf().setAppName("wc")
    val sc = new SparkContext(config)
    sc.setLogLevel("WARN")
    // 2. Read the file
    // A Resilient Distributed Dataset (RDD) can be thought of as a distributed collection.
    // Spark wraps it so heavily that using it feels just like working with a local collection.
    val fileRDD: RDD[String] = sc.textFile(args(0)) // input path
    // 3. Process the data
    // 3.1 Split each line on spaces and flatten the results into one new collection
    //     flatMap applies the function to every element and then flattens the result
    val wordRDD: RDD[String] = fileRDD.flatMap(_.split(" "))
    // 3.2 Map each word to 1
    val wordAndOneRDD = wordRDD.map((_, 1))
    // 3.3 Aggregate by key to count each word
    //     wordAndOneRDD.reduceByKey((a, b) => a + b)
    //     first _  : the result accumulated so far
    //     second _ : the incoming value
    val wordAndCount: RDD[(String, Int)] = wordAndOneRDD.reduceByKey(_ + _)
    // 4. Save the result
    wordAndCount.saveAsTextFile(args(1)) // output path
  }
}
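As an aside that is not in the original post: if you prefer a single entry point that runs both in IDEA and through spark-submit, a minimal sketch could set a master only when spark-submit has not supplied one (the object name Spark_wordcount_anywhere and the fallback path are illustrative assumptions):

package com.czxy.scala

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

// Hypothetical variant: falls back to local[*] and a local file when no master or
// arguments are supplied, so the same object works in IDEA and with spark-submit.
object Spark_wordcount_anywhere {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("wc")
    // spark-submit sets spark.master; only hard-code one when it is absent (IDEA run)
    if (!conf.contains("spark.master")) conf.setMaster("local[*]")
    val sc = new SparkContext(conf)
    sc.setLogLevel("WARN")

    // Use the submitted input path if given, otherwise the local sample file
    val input = if (args.length > 0) args(0) else "G:\\干货\\Spark\\wordcount.txt"
    val fileRDD: RDD[String] = sc.textFile(input)
    val wordAndCount: RDD[(String, Int)] =
      fileRDD.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _)

    // Save to the output path if one was supplied, otherwise print to the console
    if (args.length > 1) wordAndCount.saveAsTextFile(args(1))
    else wordAndCount.collect().foreach(println)
  }
}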

Package the project and upload the jar to the cluster.

Submit it to the Spark HA cluster with:

/export/servers/spark/bin/spark-submit \
--class com.czxy.scala.Spark_wordcount_cluster \
--master spark://node01:7077,node02:7077 \
--executor-memory 1g \
--total-executor-cores 2 \
/root/wc.jar \
hdfs://node01:8020/wordcount/input/words.txt \
hdfs://node01:8020/wordcount/output4

Or submit it to the YARN cluster with:

/export/servers/spark/bin/spark-submit \
--class com.czxy.scala.Spark_wordcount_cluster \
--master yarn \
--deploy-mode cluster \
--driver-memory 1g \
--executor-memory 1g \
--executor-cores 2 \
--queue default \
/root/wc.jar \
hdfs://node01:8020/wordcount/input/words.txt \
hdfs://node01:8020/wordcount/output5

Here we submit to the YARN cluster.

After the job finishes, view the results in Hue.

Java 8 Version [Optional]

Spark is implemented in Scala, and Scala, being a JVM language, integrates well with Java. Writing the earlier example in Java is just as straightforward, only a little more verbose.

package com.czxy.scala;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import scala.Tuple2;

import java.util.Arrays;

/**
 * @Author: Alice菌
 * @Date: /2/21 09:48
 * @Description: Smile at the passing years; the future is promising. Ride your dreams and live up to your youth!
 */
public class Spark_wordcount_java8 {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setAppName("wc").setMaster("local[*]");
        JavaSparkContext jsc = new JavaSparkContext(conf);
        JavaRDD<String> fileRDD = jsc.textFile("G:\\干货\\Spark\\wordcount.txt");
        JavaRDD<String> wordRDD = fileRDD.flatMap(s -> Arrays.asList(s.split(" ")).iterator());
        JavaPairRDD<String, Integer> wordAndOne = wordRDD.mapToPair(w -> new Tuple2<>(w, 1));
        JavaPairRDD<String, Integer> wordAndCount = wordAndOne.reduceByKey((a, b) -> a + b);
        // wordAndCount.collect().forEach(t -> System.out.println(t));
        wordAndCount.collect().forEach(System.out::println); // the core idea of functional programming: parameterize behavior!
    }
}

Running it produces the same result.

That's all for this share. If you found it helpful, or you're interested in big data technology, remember to like and follow Alice (^U^)ノ~YO
