
Spark learning experience summary

2022-09-23 10:18:32

I built the Hadoop and Spark cluster with HDP.

1. I run the Spark program on the server; the launch script is as follows:

/usr/hdp/2.4.0.0-169/spark/bin/spark-submit --class com.lwb.streamingtest.steaming_sql.Spark_Stream_SQL_Test2 --master yarn  --files /usr/hdp/2.4.0.0-169/spark/conf/hive-site.xml,/usr/hdp/2.4.0.0-169/hbase/conf/hbase-site.xml  /alwb_test/SparkTest-0.0.1-SNAPSHOT-allinone.jar > aaa.log 2>&1 &

If hive-site.xml is not loaded via --files, an exception is thrown.

2. In the launch script I set the master to yarn, which means the job should run on YARN, but the master setting inside the program takes precedence over the launch script.

Earlier I had set it in the program: SparkConf conf = new SparkConf().setAppName("lwb_sql1").setMaster("local[2]");
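The fix is simply to drop setMaster from the code (the full program below does exactly this), so the --master flag passed to spark-submit decides where the job runs; a minimal sketch:

SparkConf conf = new SparkConf().setAppName("lwb_sql1");// no setMaster here, so spark-submit --master yarn takes effect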

Before I removed it, though, the job was running locally and I could not stop the task. If the job runs on YARN, the shutdown command is

yarn application -kill <applicationId>, but a local run is not on YARN, so this command does nothing; I experimented with it for half a day.

In the end I tried the Linux command jps. It is actually a Java tool that lists the JVM processes that have been started, and I simply tried it on a whim. It showed two processes with the same name, roughly SparkSubmit, with the process number in front. Typing the Linux command kill <process number> killed the job.
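For reference, the sequence was roughly as follows (the PID is whatever jps prints next to SparkSubmit):

jps         # lists JVM processes; the driver shows up as SparkSubmit with its PID in front
kill <PID>  # kill the SparkSubmit process found above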

 

3. Today I spent a whole day on Spark SQL's DataFrame join function and ran into the following problem:

I have now generated two DataFrames: one with data from Oracle and one with data from Hive.
Now I want to inner join or left join the two tables.
I implemented it in Java; my earlier attempt looked like this:
dfHive.join(dfOracle, "vin").show();// this connects fine (I later realized it is an inner join; a left join needs the three-argument form)
For a left join I had to use the three-argument join. I searched for a whole day and found only Scala examples, no Java ones; I could not find an example on the official website either (maybe I was not looking in the right place). The code I finally worked out is below. Who would have thought that calling equalTo on one Column with another Column returns yet another Column... that returned Column is just the join-condition expression that Spark evaluates.

df.join(jdbcDF,df.col("vin").equalTo(jdbcDF.col("VIN")),"left").show();

The complete Java code is as follows:

import java.util.HashMap;
import java.util.Map;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.VoidFunction;
import org.apache.spark.sql.Column;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.hive.HiveContext;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka.KafkaUtils;

import scala.Tuple2;

import com.alibaba.fastjson.JSONObject;

public class Spark_Stream_SQL_Test2 {
	static final String ZK_QUORUM = "devhadoop3.reachauto.com:2181,devhadoop2.reachauto.com:2181,devhadoop1.reachauto.com:2181";
	static final String GROUP = "spark_json_test_group";
	static final String TOPICSS = "spark_sql_lwb_1";
	static final String NUM_THREAD = "5";

	@SuppressWarnings("serial")
	public static void main(String[] args) throws Exception {
		System.setProperty("HADOOP_USER_NAME", "spark");
		SparkConf conf = new SparkConf().setAppName("lwb_sql1");//.setMaster("local[2]");
		conf.set("spark.testing.memory", "2147480000");// any value larger than 512m is fine
		JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(10));
		/**
		 * Query Oracle
		 */
		SQLContext sqlContext = new SQLContext(jssc.sparkContext());
		System.out.println("================" + sqlContext.getSQLDialect());

		Map<String, String> options = new HashMap<String, String>();
		options.put("user", "root");
		options.put("password", "123456");
		options.put("url", "jdbc:oracle:thin:@10.10.171.167:1521:ORCL");
		options.put("dbtable", "test1");
		// How many rows of data to fetch at a time
//		options.put("fetchSize", "10");
		options.put("driver", "oracle.jdbc.driver.OracleDriver");
		final DataFrame jdbcDF = sqlContext.read().format("jdbc").options(options).load();
		jdbcDF.show();
		
		/**
		 * Query Hive
		 */
		final HiveContext hiveContext = new HiveContext(jssc.sparkContext());
		int numThreads = Integer.parseInt(NUM_THREAD);
		Map<String, Integer> topicMap = new HashMap<String, Integer>();
		String[] topics = TOPICSS.split(",");
		for (String topic : topics) {
			topicMap.put(topic, numThreads);
		}
		JavaPairReceiverInputDStream<String, String> messages = KafkaUtils.createStream(jssc, ZK_QUORUM, GROUP, topicMap);// pull the messages from Kafka
		JavaDStream<String> lines = messages.map(new Function<Tuple2<String, String>, String>() {// keep only the Kafka message value
					public String call(Tuple2<String, String> tuple2) {
						return tuple2._2();
					}
				});
		lines.foreachRDD(new VoidFunction<JavaRDD<String>>() {
			@Override
			public void call(JavaRDD<String> rdd) throws Exception {
				for (String jsonStr : rdd.collect()) {
					System.out.println("Received data: " + jsonStr);
					JSONObject obj = JSONObject.parseObject(jsonStr);
					String sql = obj.getString("sql");
					System.out.println(sql);
					DataFrame df = hiveContext.sql(sql);
					df.join(jdbcDF,df.col("vin").equalTo(jdbcDF.col("VIN")),"left").show();
//					df.join(jdbcDF,"vin").show();
				}
			}
		});
		jssc.start();// start the streaming job
		jssc.awaitTermination();// block until the streaming job is stopped
	}
}
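For reference, the job is driven by JSON messages on the spark_sql_lwb_1 topic: each message carries the Hive SQL to run in its sql field, and the query result is then left-joined with the Oracle table on vin. A minimal test message could look like the following (the table name here is just a hypothetical example):

{"sql":"select * from vehicle_info where vin is not null"}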

 
