
Submitting Parameters from a Java Web Application to a Spark Cluster to Run a Job

Date: 2022-08-10 02:43:13


/nanphonfy/article/details/52423865

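We need to provide an API for clients to call. The requirement is roughly as follows:

The user's location (longitude and latitude) is submitted to the web server, which then submits a job (finding recommended malls within one kilometre of that user) to the Spark cluster. The computed result is stored in Redis, and the backend reads it from there and returns it to the caller.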
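The article does not show how StationIdForUser decides which malls lie within one kilometre. As a rough sketch only, assuming a great-circle (haversine) distance is acceptable, the check could look like the following. The class name GeoDistance and the second coordinate pair are made up for illustration and are not part of the original project.

```java
// Hypothetical helper for illustration; not taken from the original project.
final class GeoDistance {
    // Mean Earth radius in metres (a common approximation).
    static final double EARTH_RADIUS_M = 6_371_000.0;

    /** Great-circle distance in metres between two points given as (lat, lng) in degrees. */
    static double haversineMeters(double lat1, double lng1, double lat2, double lng2) {
        double dLat = Math.toRadians(lat2 - lat1);
        double dLng = Math.toRadians(lng2 - lng1);
        double a = Math.sin(dLat / 2) * Math.sin(dLat / 2)
                + Math.cos(Math.toRadians(lat1)) * Math.cos(Math.toRadians(lat2))
                * Math.sin(dLng / 2) * Math.sin(dLng / 2);
        // Clamp to 1.0 to guard against rounding slightly above the asin domain.
        return 2 * EARTH_RADIUS_M * Math.asin(Math.min(1.0, Math.sqrt(a)));
    }

    /** True when the second point is at most radiusMeters away from the first. */
    static boolean within(double lat1, double lng1, double lat2, double lng2, double radiusMeters) {
        return haversineMeters(lat1, lng1, lat2, lng2) <= radiusMeters;
    }

    public static void main(String[] args) {
        // First point: the sample coordinates used in the article.
        // Second point: 0.005 degrees further north, invented for this example.
        double d = haversineMeters(22.1956491323113, 113.555733036396,
                                   22.2006491323113, 113.555733036396);
        System.out.printf("distance = %.1f m, within 1 km = %b%n", d, d <= 1000.0);
    }
}
```

On a cluster the same predicate would typically be applied inside a filter over the mall records; how the real job does this is not stated in the article.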

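Most of the material online is based on spark-assembly-1.4.1-hadoop2.6.0.jar, whereas our environment is Spark 2.0 with Hadoop 2.6.0, so those tutorials may not apply. Testing confirmed that the Spark 1.4 jar is incompatible with our platform: job submission fails with an exception. It is therefore better to copy the jars from the platform environment into the project.

In addition, because the cluster and the web server that hosts the project are different machines, a copy of the Spark jars (187 MB) has to be placed on the web server.

The data analysts developed a jar (spark-Java-2.xx-SNAPSHOT.jar) whose StationIdForUser class has to be called: given a longitude and latitude, it submits the job to the cluster. It depends, however, on many of the Spark cluster's jars.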

    package com.sibat.spark.action;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.spark.SparkConf;
    import org.apache.spark.deploy.yarn.Client;
    import org.apache.spark.deploy.yarn.ClientArguments;

    // Based on tutorials found online
    public class Action {
        public static void main(String[] args) {
            // String tmp = Thread.currentThread().getContextClassLoader().getResource("").getPath();
            // tmp = tmp.substring(0, tmp.length() - 8);
            String[] arg0 = new String[] {
                "--class", "cn.sibat.spark.StationIdForUser",
                "--jar", ".../apache-tomcat-7.0.70/webapps/spark-task_lib/spark-java-2.4-SNAPSHOT.jar",
                "--arg", "22.1956491323113,113.555733036396" // coordinates
            };

            // StationIdForUser.main(arg0); // keep this call commented out, otherwise the submission fails

            // The settings below make it easier to test from a local environment
            Configuration conf = new Configuration();
            String os = System.getProperty("os.name");
            boolean crossPlatform = os.contains("Windows");
            conf.setBoolean("mapreduce.app-submission.cross-platform", crossPlatform); // enable cross-platform submission
            conf.set("fs.defaultFS", "hdfs://192.168.2.9:9000"); // NameNode
            conf.set("mapreduce.framework.name", "yarn"); // use the YARN framework
            conf.set("yarn.resourcemanager.address", "192.168.2.9:8032"); // ResourceManager
            conf.set("yarn.resourcemanager.scheduler.address", "192.168.2.9:8030"); // scheduler
            conf.set("mapreduce.jobhistory.address", "192.168.2.9:10020");

            System.setProperty("SPARK_YARN_MODE", "true");
            // The app name is arbitrary; several master settings are possible, choose the one you need
            SparkConf sparkConf = new SparkConf().setMaster("yarn").setAppName("JavaSparkPi");
            // SparkConf sparkConf = new SparkConf();

            ClientArguments cArgs = new ClientArguments(arg0);
            new Client(cArgs, conf, sparkConf).run();
        }
    }

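Deploy the jar to the web server and run: java -jar <jar name> <argument> <argument>

The job could not be submitted; I suspected a jar conflict.

To check this, I wrote a Java program. The idea is roughly: store the names of the local jars in a List, then walk the list of jar files on the server; any server jar with no local match is written to 2.txt (these are the extra jars and can be deleted).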

    package com.zh.zsr;

    import java.io.BufferedReader;
    import java.io.BufferedWriter;
    import java.io.File;
    import java.io.FileInputStream;
    import java.io.FileOutputStream;
    import java.io.IOException;
    import java.io.InputStreamReader;
    import java.io.OutputStreamWriter;
    import java.util.ArrayList;
    import java.util.LinkedList;
    import java.util.List;

    /**
     * Compares the server's file listing with the local Windows directory
     * to find out which files are extra.
     *
     * @author nanphonfy
     * @time August 31, 9:54:19 AM
     */
    public class FilePath {
        public static void main(String[] args) throws IOException {
            String path = "C:\\Users\\sibat\\Desktop\\spark-task_lib";
            FilePath fp = new FilePath();
            List<String> paths = fp.getFiles(path); // all local jars
            List<String> names = new ArrayList<>();
            for (String p : paths) {
                String[] temp = p.split("\\\\"); // use "/" on Linux
                names.add(temp[temp.length - 1]);
            }
            int len = names.size();

            String readFile = "C://Users//sibat//Desktop//1.txt";
            String writeFile = "C://Users//sibat//Desktop//2.txt";
            // Streams are used instead of FileReader/FileWriter so the encoding can be set;
            // try-with-resources closes both files when done.
            try (BufferedReader br = new BufferedReader(
                         new InputStreamReader(new FileInputStream(readFile), "GBK"));
                 BufferedWriter bw = new BufferedWriter(
                         new OutputStreamWriter(new FileOutputStream(writeFile), "GBK"))) {
                String line;

                /* Step 1: extract the full jar names from the server listing and write them to 2.txt
                long a = System.currentTimeMillis();
                while ((line = br.readLine()) != null) {
                    int len1 = "-rw-r--r-- 1 datum datum 69409 8月 30 17:41 ".length();
                    // line.substring(len1+1);
                    bw.write(line.substring(len1 + 4));
                    bw.newLine();
                    bw.flush();
                    // System.out.println(line);
                }
                long b = System.currentTimeMillis();
                System.out.println(b - a);
                */

                // Step 2: after copying the jar names from 2.txt back into 1.txt,
                // write every server jar that has no local match to 2.txt
                long a = System.currentTimeMillis();
                while ((line = br.readLine()) != null) {
                    int i;
                    for (i = 0; i < len; i++) {
                        if (line.equals(names.get(i)))
                            break;
                    }
                    if (i == len) { // no local jar with this name
                        bw.write(line);
                        bw.newLine();
                        bw.flush();
                    }
                }
                long b = System.currentTimeMillis();
                System.out.println(b - a);
            }
        }

        private List<String> absolutePaths = new LinkedList<>();

        /**
         * Recursively collects every file under the given path, descending into subdirectories.
         */
        public List<String> getFiles(String filePath) {
            File root = new File(filePath);
            File[] files = root.listFiles();
            if (files == null) { // not a directory, or it cannot be read
                return absolutePaths;
            }
            for (File file : files) {
                if (file.isDirectory()) {
                    getFiles(file.getAbsolutePath());
                } else {
                    // the usual case: plain files, no subdirectories
                    absolutePaths.add(file.getAbsolutePath());
                }
            }
            return absolutePaths;
        }
    }

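A listing in this form (one ls -l style line per file) can be cut to obtain each full jar name. Copy the listing into 1.txt and use the program above to strip the characters that precede the jar name.

In the end, 5 extra jars turned up; deleting them from the web server was enough.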
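The two steps of the comparison (cutting the jar name out of each listing line, then finding server jars with no local match) can also be sketched with a set lookup instead of a nested loop. This is only an illustration: the class name ExtraJarFinder and the sample file names are hypothetical, and it assumes the listing is ls -l style output in which file names contain no spaces.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical helper for illustration; not taken from the original project.
final class ExtraJarFinder {
    /** Returns the last space-separated field of an ls -l line (assumes no spaces in file names). */
    static String fileNameFromLsLine(String line) {
        String trimmed = line.trim();
        return trimmed.substring(trimmed.lastIndexOf(' ') + 1);
    }

    /** Names that appear in the server listing but not among the local names, in listing order. */
    static List<String> extraOnServer(Collection<String> localNames, List<String> serverLsLines) {
        Set<String> local = new HashSet<>(localNames);
        List<String> extra = new ArrayList<>();
        for (String line : serverLsLines) {
            String trimmed = line.trim();
            // Skip blank lines and the "total N" header that ls -l prints first.
            if (trimmed.isEmpty() || trimmed.startsWith("total ")) {
                continue;
            }
            String name = fileNameFromLsLine(trimmed);
            if (!local.contains(name)) {
                extra.add(name);
            }
        }
        return extra;
    }

    public static void main(String[] args) {
        // Made-up sample data.
        List<String> local = Arrays.asList("a.jar", "b.jar");
        List<String> server = Arrays.asList(
                "total 8",
                "-rw-r--r-- 1 datum datum 69409 Aug 30 17:41 a.jar",
                "-rw-r--r-- 1 datum datum 12345 Aug 30 17:41 c.jar");
        System.out.println(extraOnServer(local, server));
    }
}
```

Taking the last field avoids depending on the exact width of the permission, owner and date columns, which is what the fixed-offset substring in the program above relies on.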

spark-task.jar -> (dependency jars) spark-task_lib

After that, the submission itself succeeded, but the jobs on Spark all failed. The log was as follows:

    [datum@webserver webapps]$ java -jar spark-task.jar
    log4j:WARN No appenders could be found for logger (org.apache.hadoop.util.Shell).
    log4j:WARN Please initialize the log4j system properly.
    log4j:WARN See /log4j/1.2/faq.html#noconfig for more info.
    Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
    16/08/31 10:00:16 INFO Client: Requesting a new application from cluster with 4 NodeManagers
    16/08/31 10:00:17 INFO Client: Verifying our application has not requested more than the maximum memory capability of the cluster (8192 MB per container)
    16/08/31 10:00:17 INFO Client: Will allocate AM container, with 896 MB memory including 384 MB overhead
    16/08/31 10:00:17 INFO Client: Setting up container launch context for our AM
    16/08/31 10:00:17 INFO Client: Setting up the launch environment for our AM container
    16/08/31 10:00:17 INFO Client: Preparing resources for our AM container
    16/08/31 10:00:17 WARN Client: Neither spark.yarn.jars nor spark.yarn.archive is set, falling back to uploading libraries under SPARK_HOME.
    16/08/31 10:00:19 INFO Client: Uploading resource file:/tmp/spark-367eaac7-b80e-4517-87b3-d6540fc2cc13/__spark_libs__5719539575137016651.zip -> hdfs://192.168.2.9:9000/user/datum/.sparkStaging/application_1472554805174_0017/__spark_libs__5719539575137016651.zip
    16/08/31 10:00:22 INFO Client: Uploading resource file:/data/apps/apache-tomcat-7.0.70/webapps/spark-task_lib/spark-java-2.4-SNAPSHOT.jar -> hdfs://192.168.2.9:9000/user/datum/.sparkStaging/application_1472554805174_0017/spark-java-2.4-SNAPSHOT.jar
    16/08/31 10:00:22 INFO Client: Uploading resource file:/tmp/spark-367eaac7-b80e-4517-87b3-d6540fc2cc13/__spark_conf__3363112904998123006.zip -> hdfs://192.168.2.9:9000/user/datum/.sparkStaging/application_1472554805174_0017/__spark_conf__.zip
    16/08/31 10:00:22 INFO SecurityManager: Changing view acls to: datum
    16/08/31 10:00:22 INFO SecurityManager: Changing modify acls to: datum
    16/08/31 10:00:22 INFO SecurityManager: Changing view acls groups to:
    16/08/31 10:00:22 INFO SecurityManager: Changing modify acls groups to:
    16/08/31 10:00:22 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(datum); groups with view permissions: Set(); users with modify permissions: Set(datum); groups with modify permissions: Set()
    16/08/31 10:00:22 INFO Client: Submitting application application_1472554805174_0017 to ResourceManager
    16/08/31 10:00:22 INFO YarnClientImpl: Submitted application application_1472554805174_0017
    16/08/31 10:00:23 INFO Client: Application report for application_1472554805174_0017 (state: ACCEPTED)
    16/08/31 10:00:23 INFO Client:
         client token: N/A
         diagnostics: N/A
         ApplicationMaster host: N/A
         ApplicationMaster RPC port: -1
         queue: default
         start time: 1472608887913
         final status: UNDEFINED
         tracking URL: http://master01:8088/proxy/application_1472554805174_0017/
         user: datum
    16/08/31 10:00:24 INFO Client: Application report for application_1472554805174_0017 (state: ACCEPTED)
    16/08/31 10:00:25 INFO Client: Application report for application_1472554805174_0017 (state: ACCEPTED)
    16/08/31 10:00:26 INFO Client: Application report for application_1472554805174_0017 (state: ACCEPTED)
    16/08/31 10:00:27 INFO Client: Application report for application_1472554805174_0017 (state: ACCEPTED)
    16/08/31 10:00:28 INFO Client: Application report for application_1472554805174_0017 (state: ACCEPTED)
    16/08/31 10:00:29 INFO Client: Application report for application_1472554805174_0017 (state: ACCEPTED)
    16/08/31 10:00:30 INFO Client: Application report for application_1472554805174_0017 (state: ACCEPTED)
    16/08/31 10:00:31 INFO Client: Application report for application_1472554805174_0017 (state: ACCEPTED)
    16/08/31 10:00:32 INFO Client: Application report for application_1472554805174_0017 (state: ACCEPTED)
    16/08/31 10:00:33 INFO Client: Application report for application_1472554805174_0017 (state: ACCEPTED)
    16/08/31 10:00:34 INFO Client: Application report for application_1472554805174_0017 (state: ACCEPTED)
    16/08/31 10:00:35 INFO Client: Application report for application_1472554805174_0017 (state: ACCEPTED)
    16/08/31 10:00:36 INFO Client: Application report for application_1472554805174_0017 (state: ACCEPTED)
    16/08/31 10:00:37 INFO Client: Application report for application_1472554805174_0017 (state: ACCEPTED)
    16/08/31 10:00:38 INFO Client: Application report for application_1472554805174_0017 (state: ACCEPTED)
    16/08/31 10:00:39 INFO Client: Application report for application_1472554805174_0017 (state: ACCEPTED)
    16/08/31 10:00:40 INFO Client: Application report for application_1472554805174_0017 (state: ACCEPTED)
    16/08/31 10:00:41 INFO Client: Application report for application_1472554805174_0017 (state: ACCEPTED)
    16/08/31 10:00:42 INFO Client: Application report for application_1472554805174_0017 (state: ACCEPTED)
    16/08/31 10:00:43 INFO Client: Application report for application_1472554805174_0017 (state: ACCEPTED)
    16/08/31 10:00:44 INFO Client: Application report for application_1472554805174_0017 (state: ACCEPTED)
    16/08/31 10:00:45 INFO Client: Application report for application_1472554805174_0017 (state: ACCEPTED)
    16/08/31 10:00:46 INFO Client: Application report for application_1472554805174_0017 (state: ACCEPTED)
    16/08/31 10:00:47 INFO Client: Application report for application_1472554805174_0017 (state: ACCEPTED)
    16/08/31 10:00:48 INFO Client: Application report for application_1472554805174_0017 (state: ACCEPTED)
    16/08/31 10:00:49 INFO Client: Application report for application_1472554805174_0017 (state: ACCEPTED)
    16/08/31 10:00:50 INFO Client: Application report for application_1472554805174_0017 (state: ACCEPTED)
    ^C
    16/08/31 10:00:51 INFO ShutdownHookManager: Shutdown hook called
    16/08/31 10:00:51 INFO ShutdownHookManager: Deleting directory /tmp/spark-367eaac7-b80e-4517-87b3-d6540fc2cc13

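After a lot of searching on Google, I suspected that the program was trying to upload the web server's local jars to Spark (which is unnecessary, because the local jars were copied from the cluster in the first place).

I then tried the SparkSubmit approach and ran into errors. Only after reading the source did I find that the class had been changed in the new version, so the old tutorials no longer apply. The code is as follows: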

    package com.sibat.spark.action;

    import org.apache.spark.deploy.SparkSubmit;

    /**
     * @author nanphonfy
     */
    public class Task {
        // Unlike the old tutorials, which put "--jar" and "--arg" entries in the arg0 array,
        // the newer Spark class takes the jar path and the application argument without those prefixes.
        public static void main(String[] args) {
            // Default coordinates, used when the two values are not passed on the command line
            String arg = "22.1956491323113,113.555733036396";
            if (args.length == 2) {
                arg = args[0] + "," + args[1];
                System.out.println("Command run from the web project: " + arg);
            }
            String[] arg0 = new String[] {
                "--master", "spark://master01:7077",
                "--class", "cn.sibat.spark.StationIdForUser",
                ".../apache-tomcat-7.0.70/webapps/spark-task_lib/spark-java-2.7-SNAPSHOT.jar",
                arg
            };
            SparkSubmit.main(arg0);
        }
    }

The submission succeeded:

    [datum@webserver webapps]$ sh exe.sh
    Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
    16/09/01 14:47:02 INFO SparkContext: Running Spark version 2.0.0
    16/09/01 14:47:02 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
    16/09/01 14:47:02 INFO SecurityManager: Changing view acls to: datum
    16/09/01 14:47:02 INFO SecurityManager: Changing modify acls to: datum
    16/09/01 14:47:02 INFO SecurityManager: Changing view acls groups to:
    16/09/01 14:47:02 INFO SecurityManager: Changing modify acls groups to:
    16/09/01 14:47:02 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(datum); groups with view permissions: Set(); users with modify permissions: Set(datum); groups with modify permissions: Set()
    16/09/01 14:47:03 INFO Utils: Successfully started service 'sparkDriver' on port 55903.
    16/09/01 14:47:03 INFO SparkEnv: Registering MapOutputTracker
    16/09/01 14:47:03 INFO SparkEnv: Registering BlockManagerMaster
    16/09/01 14:47:03 INFO DiskBlockManager: Created local directory at /tmp/blockmgr-10614c25-2d1a-472d-8020-d37463290840
    16/09/01 14:47:03 INFO MemoryStore: MemoryStore started with capacity 1106.4 MB
    16/09/01 14:47:04 INFO SparkEnv: Registering OutputCommitCoordinator
    16/09/01 14:47:04 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
    16/09/01 14:47:04 INFO Utils: Successfully started service 'SparkUI' on port 4041.
    16/09/01 14:47:04 INFO SparkUI: Bound SparkUI to 0.0.0.0, and started at http://192.168.2.2:4041
    16/09/01 14:47:04 INFO SparkContext: Added JAR file:/data/apps/apache-tomcat-7.0.70/webapps/spark-task_lib/spark-java-2.6-SNAPSHOT.jar at spark://192.168.2.2:55903/jars/spark-java-2.6-SNAPSHOT.jar with timestamp 1472712424481
    16/09/01 14:47:04 INFO StandaloneAppClient$ClientEndpoint: Connecting to master spark://master01:7077...
    16/09/01 14:47:04 INFO TransportClientFactory: Successfully created connection to master01/192.168.2.9:7077 after 26 ms (0 ms spent in bootstraps)
    16/09/01 14:47:04 INFO StandaloneSchedulerBackend: Connected to Spark cluster with app ID app-0901144819-0004
    16/09/01 14:47:04 INFO StandaloneAppClient$ClientEndpoint: Executor added: app-0901144819-0004/0 on worker-0901101340-192.168.2.13-55623 (192.168.2.13:55623) with 16 cores
    16/09/01 14:47:04 INFO StandaloneSchedulerBackend: Granted executor ID app-0901144819-0004/0 on hostPort 192.168.2.13:55623 with 16 cores, 1024.0 MB RAM
    16/09/01 14:47:04 INFO StandaloneAppClient$ClientEndpoint: Executor added: app-0901144819-0004/1 on worker-0901101426-192.168.2.7-56600 (192.168.2.7:56600) with 8 cores
    16/09/01 14:47:04 INFO StandaloneSchedulerBackend: Granted executor ID app-0901144819-0004/1 on hostPort 192.168.2.7:56600 with 8 cores, 1024.0 MB RAM
    16/09/01 14:47:04 INFO StandaloneAppClient$ClientEndpoint: Executor added: app-0901144819-0004/2 on worker-0901101433-192.168.2.10-52999 (192.168.2.10:52999) with 8 cores
    16/09/01 14:47:04 INFO StandaloneSchedulerBackend: Granted executor ID app-0901144819-0004/2 on hostPort 192.168.2.10:52999 with 8 cores, 1024.0 MB RAM
    16/09/01 14:47:04 INFO StandaloneAppClient$ClientEndpoint: Executor added: app-0901144819-0004/3 on worker-0901101446-192.168.2.6-47335 (192.168.2.6:47335) with 8 cores
    16/09/01 14:47:04 INFO StandaloneSchedulerBackend: Granted executor ID app-0901144819-0004/3 on hostPort 192.168.2.6:47335 with 8 cores, 1024.0 MB RAM
    16/09/01 14:47:04 INFO Utils: Successfully started service 'org.tyBlockTransferService' on port 45142.
    16/09/01 14:47:04 INFO NettyBlockTransferService: Server created on 192.168.2.2:45142
    16/09/01 14:47:04 INFO BlockManagerMaster: Registering BlockManager BlockManagerId(driver, 192.168.2.2, 45142)
    16/09/01 14:47:04 INFO BlockManagerMasterEndpoint: Registering block manager 192.168.2.2:45142 with 1106.4 MB RAM, BlockManagerId(driver, 192.168.2.2, 45142)
    16/09/01 14:47:04 INFO BlockManagerMaster: Registered BlockManager BlockManagerId(driver, 192.168.2.2, 45142)
    16/09/01 14:47:04 INFO StandaloneAppClient$ClientEndpoint: Executor updated: app-0901144819-0004/0 is now RUNNING
    16/09/01 14:47:04 INFO StandaloneAppClient$ClientEndpoint: Executor updated: app-0901144819-0004/1 is now RUNNING
    16/09/01 14:47:04 INFO StandaloneAppClient$ClientEndpoint: Executor updated: app-0901144819-0004/3 is now RUNNING
    16/09/01 14:47:04 INFO StandaloneAppClient$ClientEndpoint: Executor updated: app-0901144819-0004/2 is now RUNNING
    16/09/01 14:47:06 INFO EventLoggingListener: Logging events to hdfs://master01:9000/historyserverforspark/app-0901144819-0004
    16/09/01 14:47:06 INFO StandaloneSchedulerBackend: SchedulerBackend is ready for scheduling beginning after reached minRegisteredResourcesRatio: 0.0
    16/09/01 14:47:06 INFO CoarseGrainedSchedulerBackend$DriverEndpoint: Registered executor NettyRpcEndpointRef(null) (192.168.2.13:55909) with ID 0
    16/09/01 14:47:06 INFO CoarseGrainedSchedulerBackend$DriverEndpoint: Registered executor NettyRpcEndpointRef(null) (192.168.2.6:58228) with ID 3
    16/09/01 14:47:06 INFO BlockManagerMasterEndpoint: Registering block manager 192.168.2.13:48271 with 366.3 MB RAM, BlockManagerId(0, 192.168.2.13, 48271)
    16/09/01 14:47:06 INFO BlockManagerMasterEndpoint: Registering block manager 192.168.2.6:48243 with 366.3 MB RAM, BlockManagerId(3, 192.168.2.6, 48243)
    16/09/01 14:47:06 INFO CoarseGrainedSchedulerBackend$DriverEndpoint: Registered executor NettyRpcEndpointRef(null) (192.168.2.10:40581) with ID 2
    16/09/01 14:47:07 INFO BlockManagerMasterEndpoint: Registering block manager 192.168.2.10:50607 with 366.3 MB RAM, BlockManagerId(2, 192.168.2.10, 50607)
    16/09/01 14:47:07 INFO MemoryStore: Block broadcast_0 stored as values in memory (estimated size 227.3 KB, free 1106.2 MB)
    16/09/01 14:47:07 INFO CoarseGrainedSchedulerBackend$DriverEndpoint: Registered executor NettyRpcEndpointRef(null) (192.168.2.7:60528) with ID 1
    16/09/01 14:47:07 INFO MemoryStore: Block broadcast_0_piece0 stored as bytes in memory (estimated size 20.2 KB, free 1106.2 MB)
    16/09/01 14:47:07 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on 192.168.2.2:45142 (size: 20.2 KB, free: 1106.4 MB)
    16/09/01 14:47:07 INFO BlockManagerMasterEndpoint: Registering block manager 192.168.2.7:50861 with 366.3 MB RAM, BlockManagerId(1, 192.168.2.7, 50861)
    16/09/01 14:47:07 INFO SparkContext: Created broadcast 0 from textFile at StationIdForUser.java:27
    16/09/01 14:47:07 INFO FileInputFormat: Total input paths to process : 1
    16/09/01 14:47:07 INFO SparkContext: Starting job: foreach at StationIdForUser.java:36
    16/09/01 14:47:07 INFO DAGScheduler: Got job 0 (foreach at StationIdForUser.java:36) with 2 output partitions
    16/09/01 14:47:07 INFO DAGScheduler: Final stage: ResultStage 0 (foreach at StationIdForUser.java:36)
    16/09/01 14:47:07 INFO DAGScheduler: Parents of final stage: List()
    16/09/01 14:47:07 INFO DAGScheduler: Missing parents: List()
    16/09/01 14:47:07 INFO DAGScheduler: Submitting ResultStage 0 (MapPartitionsRDD[2] at filter at StationIdForUser.java:36), which has no missing parents
    16/09/01 14:47:07 INFO MemoryStore: Block broadcast_1 stored as values in memory (estimated size 3.5 KB, free 1106.2 MB)
    16/09/01 14:47:07 INFO MemoryStore: Block broadcast_1_piece0 stored as bytes in memory (estimated size 2.1 KB, free 1106.2 MB)
    16/09/01 14:47:07 INFO BlockManagerInfo: Added broadcast_1_piece0 in memory on 192.168.2.2:45142 (size: 2.1 KB, free: 1106.4 MB)
    16/09/01 14:47:07 INFO SparkContext: Created broadcast 1 from broadcast at DAGScheduler.scala:1012
    16/09/01 14:47:07 INFO DAGScheduler: Submitting 2 missing tasks from ResultStage 0 (MapPartitionsRDD[2] at filter at StationIdForUser.java:36)
    16/09/01 14:47:07 INFO TaskSchedulerImpl: Adding task set 0.0 with 2 tasks
    16/09/01 14:47:07 INFO TaskSetManager: Starting task 0.0 in stage 0.0 (TID 0, 192.168.2.6, partition 0, ANY, 5509 bytes)
    16/09/01 14:47:07 INFO TaskSetManager: Starting task 1.0 in stage 0.0 (TID 1, 192.168.2.13, partition 1, ANY, 5509 bytes)
    16/09/01 14:47:07 INFO CoarseGrainedSchedulerBackend$DriverEndpoint: Launching task 0 on executor id: 3 hostname: 192.168.2.6.
    16/09/01 14:47:07 INFO CoarseGrainedSchedulerBackend$DriverEndpoint: Launching task 1 on executor id: 0 hostname: 192.168.2.13.
    16/09/01 14:47:08 INFO BlockManagerInfo: Added broadcast_1_piece0 in memory on 192.168.2.6:48243 (size: 2.1 KB, free: 366.3 MB)
    16/09/01 14:47:08 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on 192.168.2.6:48243 (size: 20.2 KB, free: 366.3 MB)
    16/09/01 14:47:08 INFO BlockManagerInfo: Added broadcast_1_piece0 in memory on 192.168.2.13:48271 (size: 2.1 KB, free: 366.3 MB)
    16/09/01 14:47:08 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on 192.168.2.13:48271 (size: 20.2 KB, free: 366.3 MB)
    16/09/01 14:47:10 INFO TaskSetManager: Finished task 1.0 in stage 0.0 (TID 1) in 2554 ms on 192.168.2.13 (1/2)
    16/09/01 14:47:10 INFO TaskSetManager: Finished task 0.0 in stage 0.0 (TID 0) in 2921 ms on 192.168.2.6 (2/2)
    16/09/01 14:47:10 INFO TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have all completed, from pool
    16/09/01 14:47:10 INFO DAGScheduler: ResultStage 0 (foreach at StationIdForUser.java:36) finished in 2.930 s
    16/09/01 14:47:10 INFO DAGScheduler: Job 0 finished: foreach at StationIdForUser.java:36, took 3.130529 s
    16/09/01 14:47:10 INFO SparkUI: Stopped Spark web UI at http://192.168.2.2:4041
    16/09/01 14:47:10 INFO StandaloneSchedulerBackend: Shutting down all executors
    16/09/01 14:47:10 INFO CoarseGrainedSchedulerBackend$DriverEndpoint: Asking each executor to shut down
    16/09/01 14:47:10 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
    16/09/01 14:47:10 INFO MemoryStore: MemoryStore cleared
    16/09/01 14:47:10 INFO BlockManager: BlockManager stopped
    16/09/01 14:47:10 INFO BlockManagerMaster: BlockManagerMaster stopped
    16/09/01 14:47:10 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
    16/09/01 14:47:10 INFO SparkContext: Successfully stopped SparkContext
    16/09/01 14:47:10 INFO ShutdownHookManager: Shutdown hook called
    16/09/01 14:47:10 INFO ShutdownHookManager: Deleting directory /tmp/spark-5b4d875e-2252-4767-b63b-b297b9d559f9
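The Task class earlier passes its arguments in spark-submit order: options first, then the application jar, then the application's own arguments, with no --jar or --arg prefix. A small sketch of assembling such an array is shown below. The helper name SubmitArgs and the jar path /path/to/app.jar are placeholders invented for this example, and the sketch only builds the array; it does not call Spark.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical helper for illustration; it only assembles the argument array.
final class SubmitArgs {
    /**
     * Builds arguments in spark-submit order: named options, then the
     * application jar, then the arguments meant for the main class.
     */
    static String[] build(String master, String mainClass, String appJar, String... appArgs) {
        List<String> out = new ArrayList<>();
        out.add("--master");
        out.add(master);
        out.add("--class");
        out.add(mainClass);
        out.add(appJar);                     // positional: the application jar
        Collections.addAll(out, appArgs);    // positional: arguments for the main class
        return out.toArray(new String[0]);
    }

    public static void main(String[] args) {
        String[] submitArgs = build(
                "spark://master01:7077",
                "cn.sibat.spark.StationIdForUser",
                "/path/to/app.jar",          // placeholder path, not from the article
                "22.1956491323113,113.555733036396");
        System.out.println(String.join(" ", submitArgs));
    }
}
```

Keeping the positional part in one place makes it harder to slip an option in after the jar path, where it would be handed to the application instead of to the submitter.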

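Summary of submitting coordinates to the Spark cluster:

① The online references are mostly based on Spark 1.4.1 with Hadoop 2.6.0, while our environment is Spark 2.0.0 with Hadoop 2.6.0; this version mismatch is why the tutorials diverged from what actually worked;

② Test the jar on a single machine first; locally you need to add the Spark jars that the separately developed jar depends on;

③ Then test against the cluster; the extra jars have to be removed (found by a program that walks the listings and matches names);

④ The job was submitted but failed to run (it turned out that some tutorial parameters no longer need their prefixes in the new version);

⑤ In the web server environment (which needs more than 200 Spark jars copied in), the Spark jars conflict heavily with the project's framework jars, so embedding the Spark call in the web application is not a good solution;

⑥ Have the Java program execute a Linux command that runs the jar which submits the Spark job; this resolves the jar conflicts.

Approach: separate the program that submits the Spark job from the uhuibao project, and have the Java program execute a Linux command in the external environment to run spark-task.jar (which depends on many Spark jars), passing the longitude and latitude as command arguments.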
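Because the longitude and latitude come straight from an HTTP request, it may be worth validating them and passing them as separate arguments rather than concatenating a command string. The sketch below is one way to do that under stated assumptions: the class name SparkTaskCommand, the decimal-number pattern and the example jar path are invented here and do not come from the original project.

```java
import java.util.Arrays;
import java.util.List;
import java.util.regex.Pattern;

// Hypothetical helper for illustration; not taken from the original project.
final class SparkTaskCommand {
    // Accepts plain decimal numbers such as 113.555733036396 or -22.5 (assumed input format).
    private static final Pattern DECIMAL = Pattern.compile("-?\\d{1,3}(\\.\\d+)?");

    /** Builds the argument list for "java -jar <jarPath> <lng> <lat>", rejecting non-numeric input. */
    static List<String> build(String jarPath, String lng, String lat) {
        if (lng == null || lat == null
                || !DECIMAL.matcher(lng).matches()
                || !DECIMAL.matcher(lat).matches()) {
            throw new IllegalArgumentException("lng and lat must be decimal numbers");
        }
        return Arrays.asList("java", "-jar", jarPath, lng, lat);
    }

    public static void main(String[] args) {
        // The jar path is a placeholder; use the real location of spark-task.jar.
        List<String> command = build("/path/to/spark-task.jar", "113.555733036396", "22.1956491323113");
        System.out.println(command);
    }
}
```

The resulting list can be handed to new ProcessBuilder(command), which passes each element to the child process as its own argument without going through a shell.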

Controller layer

    package cn.sibat.uhuibao.api;

    import java.io.IOException;
    import java.util.ArrayList;
    import java.util.List;

    import javax.ws.rs.GET;
    import javax.ws.rs.Path;
    import javax.ws.rs.Produces;
    import javax.ws.rs.QueryParam;
    import javax.ws.rs.core.MediaType;

    import cn.sibat.uhuibao.redis.service.RealTimeUserService;
    import cn.sibat.uhuibao.redis.service.impl.RealTimeUserServiceImpl;
    import cn.sibat.uhuibao.util.JsonUtil;
    import cn.sibat.uhuibao.util.Status;

    /**
     * Submits a job to the Spark cluster.
     *
     * @author nanphonfy
     */
    @Path("spark")
    public class SparkApi {
        @GET
        @Path("recommend")
        @Produces(MediaType.APPLICATION_JSON)
        public String searchRealTimeRecommendation(
                @QueryParam("lng") String lng,
                @QueryParam("lat") String lat) {
            // With no @DefaultValue declared, a missing query parameter arrives as null
            if (lng == null || lat == null || lng.isEmpty() || lat.isEmpty()) {
                return JsonUtil.getResponse(Status.PARA_ERROR).toString();
            }

            Runtime rt = Runtime.getRuntime();
            String command = "java -jar .../apache-tomcat-7.0.70/webapps/spark-task.jar "
                    + lng + " " + lat;
            System.out.println(command);
            try {
                rt.exec(command); // run the Linux command
            } catch (IOException e) {
                e.printStackTrace();
                return JsonUtil.getResponse(Status.SYS_ERROR).toString();
            }
            System.out.println("Command executed");

            try {
                // Sleep on the request thread so Spark has time to finish before Redis is read
                Thread.sleep(1000 * 10); // 10 seconds
                System.out.println("Waited 10 seconds!");
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }

            String key = lng + "," + lat;
            RealTimeUserService rs = RealTimeUserServiceImpl.getInstance();
            String list = rs.getRealTimeRecommendation(key);
            if (list == null) {
                return JsonUtil.getResponse(Status.NOT_FOUND).toString();
            } else {
                String[] result = list.split(",");
                List<String> values = new ArrayList<String>();
                for (int i = 0; i < result.length; i++) {
                    values.add(result[i].trim());
                }
                return JsonUtil.getDataResponse(Status.OK, values).toString();
            }
        }
    }

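The project's controller layer uses the web service framework Jersey, and the Spark jars also include Jersey, not necessarily in the same version as the project's. Beyond that, embedding Spark into an established project is bound to cause conflicts that would have to be removed by hand, and there is no need to go through that.

"Because the child process created by Runtime.exec has no terminal or console of its own, its standard I/O (stdin, stdout, stderr) is redirected to the parent process through Process.getOutputStream(), Process.getInputStream() and Process.getErrorStream()."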
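One practical consequence of the quoted behaviour is that the parent should read the child's output; a child that logs as much as a Spark driver can otherwise stall once the pipe buffer fills. The following is a minimal sketch of draining the output with ProcessBuilder. The class name ProcessRunner is hypothetical, and main runs java -version only as a stand-in for the real java -jar spark-task.jar command.

```java
import java.io.BufferedReader;
import java.io.File;
import java.io.IOException;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical helper for illustration; not taken from the original project.
final class ProcessRunner {
    /** Exit code plus everything the child wrote to stdout and stderr. */
    static final class Result {
        final int exitCode;
        final List<String> lines;

        Result(int exitCode, List<String> lines) {
            this.exitCode = exitCode;
            this.lines = lines;
        }
    }

    /** Runs the command, drains its merged output line by line, then waits for it to exit. */
    static Result run(List<String> command) throws IOException, InterruptedException {
        ProcessBuilder pb = new ProcessBuilder(command);
        pb.redirectErrorStream(true); // merge stderr into stdout so one reader drains both
        Process process = pb.start();
        List<String> lines = new ArrayList<>();
        try (BufferedReader reader = new BufferedReader(
                new InputStreamReader(process.getInputStream(), StandardCharsets.UTF_8))) {
            String line;
            while ((line = reader.readLine()) != null) {
                lines.add(line);
            }
        }
        return new Result(process.waitFor(), lines);
    }

    public static void main(String[] args) throws Exception {
        // Stand-in command: the JVM that is running this program, asked for its version.
        String javaBin = System.getProperty("java.home") + File.separator + "bin" + File.separator + "java";
        Result result = run(Arrays.asList(javaBin, "-version"));
        System.out.println("exit code: " + result.exitCode);
        result.lines.forEach(System.out::println);
    }
}
```

Waiting for the exit code this way also tells the caller when the job has finished, which a fixed sleep cannot; the trade-off is that the HTTP request stays open for as long as the job runs.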

References:

/fansy1990/article/details/48001013

http://wangbaoaiboy./blog/static/5211191011892938552/
