
Building a Spark Program in Java and Submitting It to YARN

Date: 2023-11-21 10:42:55

Building a Spark program in Java and submitting it to YARN for testing

Demo

pom file dependencies

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <artifactId>home</artifactId>
        <groupId>com.sm</groupId>
        <version>1.0-SNAPSHOT</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>
    <artifactId>wc</artifactId>

    <properties>
        <java.version>1.8</java.version>
        <spark.version>2.4.0</spark.version>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
    </properties>

    <dependencies>
        <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-core_2.11 -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.11</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.11</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_2.11</artifactId>
            <version>${spark.version}</version>
        </dependency>
    </dependencies>

    <build>
        <finalName>wc</finalName>
        <plugins>
            <plugin>
                <artifactId>maven-assembly-plugin</artifactId>
                <version>2.4</version>
                <configuration>
                    <appendAssemblyId>true</appendAssemblyId>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                    <archive>
                        <manifest>
                            <mainClass>com.sm.wc.MainClass</mainClass>
                        </manifest>
                    </archive>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>

    <repositories>
        <repository>
            <id>nexus-aliyun</id>
            <name>Nexus aliyun</name>
            <url>http://maven.aliyun.com/nexus/content/groups/public</url>
        </repository>
    </repositories>
</project>
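With this build section, mvn package should produce a fat jar at target/wc-jar-with-dependencies.jar (finalName is wc and appendAssemblyId is true). The mainClass in the manifest (com.sm.wc.MainClass) does not have to match the demo classes below, because spark-submit takes its entry point from the --class flag rather than the jar manifest. Note also that YARN cluster mode must be selected at submit time, so the master and deploy-mode flags are passed explicitly even though the demos set them in code as well. A sketch of the submit command, with the jar path assumed from this pom:

spark-submit --master yarn --deploy-mode cluster --class com.sm.wc.MainClass1 target/wc-jar-with-dependencies.jar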

Two ways to write WordCount

The JavaSparkContext approach

package com.sm.wc;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.api.java.function.VoidFunction;
import scala.Tuple2;

import java.util.Arrays;
import java.util.Iterator;

public class MainClass1 {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf();
        conf.set("spark.master", "yarn");
        conf.set("spark.submit.deployMode", "cluster");
        conf.set("spark.app.name", "wc");
        conf.set("spark.yarn.jars", "hdfs://bigdata-namenode-node1:8020/app/jars/lib/wc.jar");
        JavaSparkContext jsc = new JavaSparkContext(conf);

        JavaPairRDD<String, Integer> counts = jsc
                .textFile("hdfs://bigdata-namenode-node1:8020/hData/wc.txt")
                // split each line into words
                .flatMap(new FlatMapFunction<String, String>() {
                    private static final long serialVersionUID = 919178903075273415L;

                    @Override
                    public Iterator<String> call(String line) throws Exception {
                        return Arrays.asList(line.split(" ")).iterator();
                    }
                })
                // map each word to a (word, 1) pair
                .mapToPair(new PairFunction<String, String, Integer>() {
                    private static final long serialVersionUID = 6401760548427081598L;

                    @Override
                    public Tuple2<String, Integer> call(String value) throws Exception {
                        return Tuple2.apply(value, 1);
                    }
                })
                // sum the counts for each word
                .reduceByKey(new Function2<Integer, Integer, Integer>() {
                    private static final long serialVersionUID = 7489138021965588041L;

                    @Override
                    public Integer call(Integer v1, Integer v2) throws Exception {
                        return v1 + v2;
                    }
                });

        // note: in cluster mode this prints to the executors' stdout, not the driver's
        counts.foreach(new VoidFunction<Tuple2<String, Integer>>() {
            private static final long serialVersionUID = 6487711132207646171L;

            @Override
            public void call(Tuple2<String, Integer> wc) throws Exception {
                System.out.println(wc._1 + ":" + wc._2);
            }
        });
    }
}
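Because Spark's Java function interfaces are all single-method interfaces, the anonymous inner classes above can be replaced with Java 8 lambdas. A minimal sketch of the same pipeline follows; the class name MainClass1Lambda is hypothetical, and the configuration and HDFS paths are carried over from the demo above:

package com.sm.wc;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import scala.Tuple2;

import java.util.Arrays;

// hypothetical class name, not from the original post
public class MainClass1Lambda {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf();
        // same configuration as MainClass1 above
        conf.set("spark.master", "yarn");
        conf.set("spark.submit.deployMode", "cluster");
        conf.set("spark.app.name", "wc");
        conf.set("spark.yarn.jars", "hdfs://bigdata-namenode-node1:8020/app/jars/lib/wc.jar");
        JavaSparkContext jsc = new JavaSparkContext(conf);

        JavaPairRDD<String, Integer> counts = jsc
                .textFile("hdfs://bigdata-namenode-node1:8020/hData/wc.txt")
                .flatMap(line -> Arrays.asList(line.split(" ")).iterator()) // line -> words
                .mapToPair(word -> new Tuple2<>(word, 1))                   // word -> (word, 1)
                .reduceByKey(Integer::sum);                                 // sum counts per word

        counts.foreach(wc -> System.out.println(wc._1 + ":" + wc._2));
        jsc.close();
    }
}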

The SparkSession approach

package com.sm.wc;

import org.apache.spark.api.java.function.*;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

import java.util.Arrays;
import java.util.List;

public class MainClass2 {
    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
                .config("spark.master", "yarn")
                .config("spark.submit.deployMode", "cluster")
                .config("spark.app.name", "wc")
                .config("spark.yarn.jars", "hdfs://bigdata-namenode-node1:8020/app/jars/lib/wc.jar")
                .getOrCreate();

        Dataset<String> ds = spark.read().textFile("hdfs://bigdata-namenode-node1:8020/hData/wc.txt");
        // the Dataset has a single column, named "value" by default
        Dataset<String> words = ds.flatMap((FlatMapFunction<String, String>) line -> {
            List<String> list = Arrays.asList(line.split(" "));
            return list.iterator();
        }, Encoders.STRING());

        words.createOrReplaceTempView("words");
        Dataset<Row> result = spark.sql(
                "SELECT value, COUNT(*) counts FROM words GROUP BY value ORDER BY counts DESC");
        result.show(100);
    }
}
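The temp view and SQL string can also be expressed directly with the Dataset API: groupBy("value").count() produces a column named "count" by default, which orderBy can then sort. A minimal sketch under the same assumptions as MainClass2 (same configuration and input path); the class name MainClass2Api is hypothetical:

package com.sm.wc;

import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

import java.util.Arrays;

import static org.apache.spark.sql.functions.col;

// hypothetical class name, not from the original post
public class MainClass2Api {
    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
                .config("spark.master", "yarn")
                .config("spark.submit.deployMode", "cluster")
                .config("spark.app.name", "wc")
                .config("spark.yarn.jars", "hdfs://bigdata-namenode-node1:8020/app/jars/lib/wc.jar")
                .getOrCreate();

        // same read-and-split step as MainClass2
        Dataset<String> words = spark.read()
                .textFile("hdfs://bigdata-namenode-node1:8020/hData/wc.txt")
                .flatMap((FlatMapFunction<String, String>) line ->
                        Arrays.asList(line.split(" ")).iterator(), Encoders.STRING());

        // groupBy + count replaces the temp view and the SQL string
        Dataset<Row> result = words.groupBy("value")
                .count()
                .orderBy(col("count").desc());
        result.show(100);
    }
}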
