Published: 2019-07-26 16:45:44
Introduction to JDBC
Spark SQL can create a DataFrame by reading data from a relational database over JDBC; after running a series of computations on the DataFrame, the results can be written back to the relational database.
Loading Data from MySQL (via the Spark Shell)
1. Start the Spark shell. The MySQL connector jar must be specified: --jars ships the driver jar to the executors, while --driver-class-path makes it visible to the driver.
/home/hadoop/apps/spark/bin/spark-shell \
--master spark://hdp08:7077 \
--jars /home/hadoop/mysql-connector-java-5.1.45.jar \
--driver-class-path /home/hadoop/mysql-connector-java-5.1.45.jar \
--executor-memory 1g \
--total-executor-cores 2
2. Load the data from MySQL
scala> case class Emp(empno: Int, ename: String, job: String, mgr: Int, hiredate: java.sql.Date, sal: Float, comm: Float, deptno: Int)
scala> val sqlContext = new org.apache.spark.sql.SQLContext(sc)
scala> val jdbcDF = sqlContext.read.format("jdbc").options(Map("url" -> "jdbc:mysql://hdp08:3306/sqoopdb", "driver" -> "com.mysql.jdbc.Driver", "dbtable" -> "emp", "user" -> "root", "password" -> "root")).load()
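The same load can also be written with the read.jdbc shortcut, which takes the connection settings in a java.util.Properties object. A minimal sketch, equivalent to the options-map form above:

scala> val prop = new java.util.Properties()
scala> prop.put("user", "root")
scala> prop.put("password", "root")
scala> prop.put("driver", "com.mysql.jdbc.Driver")
scala> val jdbcDF = sqlContext.read.jdbc("jdbc:mysql://hdp08:3306/sqoopdb", "emp", prop)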
3. Run the query
jdbcDF.show()
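Beyond show(), the returned DataFrame supports the usual transformations, and it can be registered as a temporary table for SQL queries. A short sketch, assuming the emp table has the ename, sal, and deptno columns declared in the Emp case class above:

scala> jdbcDF.select("ename", "sal").filter(jdbcDF("sal") > 1500).show()
scala> jdbcDF.registerTempTable("emp")
scala> sqlContext.sql("SELECT deptno, AVG(sal) FROM emp GROUP BY deptno").show()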

Writing Data to MySQL (via a Packaged Jar)
This section walks through developing a Spark-to-MySQL job in IDEA, set up as a Maven project.
Dependencies in the project's pom.xml
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.10</artifactId>
    <version>1.6.0</version>
    <scope>provided</scope>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-hive_2.10</artifactId>
    <version>1.6.0</version>
    <scope>provided</scope>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql_2.10</artifactId>
    <version>1.6.0</version>
    <scope>provided</scope>
</dependency>
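Note that the MySQL driver is not declared here; it is supplied at submit time through --jars (see the spark-submit command below). If you prefer to bundle it with the application instead, adding the connector as a dependency is one option:

<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>5.1.45</version>
</dependency>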
Writing the Spark SQL program
package net.togogo.sql

import java.util.Properties

import org.apache.spark.sql.{Row, SQLContext}
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
import org.apache.spark.{SparkConf, SparkContext}

object JdbcRDD {
  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("MySQL-Demo")
    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)
    // Create an RDD by parallelizing a local collection
    val personRDD = sc.parallelize(Array("1 tom 5", "2 jerry 3", "3 kitty 6")).map(_.split(" "))
    // Specify the schema of each field directly with a StructType
    val schema = StructType(
      List(
        StructField("id", IntegerType, true),
        StructField("name", StringType, true),
        StructField("age", IntegerType, true)
      )
    )
    // Map the RDD to an RDD of Rows
    val rowRDD = personRDD.map(p => Row(p(0).toInt, p(1).trim, p(2).toInt))
    // Apply the schema to the row RDD
    val personDataFrame = sqlContext.createDataFrame(rowRDD, schema)
    // Store the database connection properties in a Properties object
    val prop = new Properties()
    prop.put("user", "root")
    prop.put("password", "root")
    // Append the data to the database table
    personDataFrame.write.mode("append").jdbc("jdbc:mysql://hdp08:3306/sqoopdb", "sqoopdb.person", prop)
    // Stop the SparkContext
    sc.stop()
  }
}
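The mode("append") call inserts the rows into the target table, creating it on the first write if it does not yet exist. Other save modes are available; for example, "overwrite" replaces the existing table rather than adding to it:

// Drops and recreates the table instead of appending to it
personDataFrame.write.mode("overwrite").jdbc("jdbc:mysql://hdp08:3306/sqoopdb", "sqoopdb.person", prop)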
Packaging and Running
1. Package the program with Maven
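For example, from the project root:

mvn clean package

Because the Spark dependencies are marked provided, they are left out of the jar; the cluster supplies them at runtime.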
2. Submit the jar to the Spark cluster
/home/hadoop/apps/spark/bin/spark-submit \
--class net.togogo.sql.JdbcRDD \
--master spark://hdp08:7077 \
--jars /home/hadoop/mysql-connector-java-5.1.45.jar \
--driver-class-path /home/hadoop/mysql-connector-java-5.1.45.jar \
/home/hadoop/schema.jar
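If the job completes successfully, the rows should be visible in the person table. A quick check from the mysql client (assuming the table was created by the append write above):

mysql> SELECT * FROM sqoopdb.person;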