Spark SQL笔记整理(三):加载保存功能与Spark SQL函数
[TOC]
加载保存功能
数据加载(json文件、jdbc)与保存(json、jdbc)
测试代码如下:
package cn.xpleaf.bigdata.spark.scala.sql.p1import java.util.Propertiesimport org.apache.log4j.{Level, Logger}import org.apache.spark.{SparkConf, SparkContext}import org.apache.spark.sql.{SQLContext, SaveMode}/** * SparkSQL关于加载数据和数据落地的各种实战操作 */object _03SparkSQLLoadAndSaveOps { def main(args: Array[String]): Unit = { Logger.getLogger("org.apache.spark").setLevel(Level.OFF) val conf = new SparkConf().setMaster("local[2]").setAppName(_01SparkSQLOps.getClass.getSimpleName) val sc = new SparkContext(conf) val sqlContext = new SQLContext(sc)// readOps(sqlContext) writeOps(sqlContext) sc.stop() } /** * 在write结果到目录中的时候需要留意相关异常 * org.apache.spark.sql.AnalysisException: path file:/D:/data/spark/sql/people-1.json already exists * 如果还想使用该目录的话,就需要设置具体的保存模式SaveMode * ErrorIfExist * 默认的,目录存在,抛异常 * Append * 追加 * Ingore * 忽略,相当于不执行 * Overwrite * 覆盖 */ def writeOps(sqlContext:SQLContext): Unit = { val df = sqlContext.read.json("D:/data/spark/sql/people.json") df.registerTempTable("people") val retDF = sqlContext.sql("select * from people where age > 20")// retDF.show() // 将结果落地 //retDF.coalesce(1).write.mode(SaveMode.Overwrite).json("D:/data/spark/sql/people-1.json") // 落地到数据库 val url = "jdbc:mysql://localhost:3306/test" val table = "people1" // 会重新创建一张新表 val properties = new Properties() properties.put("user", "root") properties.put("password", "root") retDF.coalesce(1).write.jdbc(url, table, properties) } /* // sparkSQL读数据 // java.lang.RuntimeException: file:/D:/data/spark/sql/people.json is not a Parquet file sparkSQL使用read.load加载的默认文件格式为parquet(parquet.apache.org) 加载其它文件格式怎么办? 需要指定加载文件的格式.format("json") */ def readOps(sqlContext:SQLContext): Unit = { // val df = sqlContext.read.load("D:/data/spark/sql/users.parquet") // val df = sqlContext.read.format("json").load("D:/data/spark/sql/people.json") // val df = sqlContext.read.json("D:/data/spark/sql/people.json") val url = "jdbc:mysql://localhost:3306/test" val table = "people" val properties = new Properties() properties.put("user", "root") properties.put("password", "root") val df = sqlContext.read.jdbc(url, table, properties) df.show() }}
当执行读操作时,输出结果如下:
+---+----+---+------+| id|name|age|height|+---+----+---+------+| 1| 小甜甜| 18| 168.0|| 2| 小丹丹| 19| 167.0|| 3| 大神| 25| 181.0|| 4| 团长| 38| 158.0|| 5| 记者| 22| 169.0|+---+----+---+------+
当执行写操作时:
1.如果保存到json文件注意有各种写模式,另外其保存的是一个目录,与HDFS兼容的目录格式2.如果保存到jdbc则会在数据库中创建一个DataFrame所包含列的表,注意该表不能存在
Spark SQL和Hive的集成
需要先启动Hive,然后再进行下面的操作。
代码编写
测试代码如下:
package cn.xpleaf.bigdata.spark.scala.sql.p2import cn.xpleaf.bigdata.spark.scala.sql.p1._01SparkSQLOpsimport org.apache.log4j.{Level, Logger}import org.apache.spark.{SparkConf, SparkContext}import org.apache.spark.sql.hive.HiveContext/** * 通过创建HiveContext来操作Hive中表的数据 * 数据源: * teacher_info.txt * name(String) height(double) * zhangsan,175 * lisi,180 * wangwu,175 * zhaoliu,195 * zhouqi,165 * weiba,185 * * create table teacher_info( * name string, * height double * ) row format delimited * fields terminated by ','; * * teacher_basic.txt * name(String) age(int) married(boolean) children(int) * zhangsan,23,false,0 * lisi,24,false,0 * wangwu,25,false,0 * zhaoliu,26,true,1 * zhouqi,27,true,2 * weiba,28,true,3 * * create table teacher_basic( * name string, * age int, * married boolean, * children int * ) row format delimited * fields terminated by ','; * * * 需求: *1.通过sparkSQL在hive中创建对应表,将数据加载到对应表 *2.执行sparkSQL作业,计算teacher_info和teacher_basic的关联信息,将结果存放在一张表teacher中 * * 在集群中执行hive操作的时候,需要以下配置: * 1、将hive-site.xml拷贝到spark/conf目录下,将mysql connector拷贝到spark/lib目录下 2、在$SPARK_HOME/conf/spark-env.sh中添加一条记录 export SPARK_CLASSPATH=$SPARK_CLASSPATH:$SPARK_HOME/lib/mysql-connector-java-5.1.39.jar */object _01HiveContextOps { def main(args: Array[String]): Unit = { Logger.getLogger("org.apache.spark").setLevel(Level.OFF) val conf = new SparkConf()// .setMaster("local[2]") .setAppName(_01SparkSQLOps.getClass.getSimpleName) val sc = new SparkContext(conf) val hiveContext = new HiveContext(sc) //创建teacher_info表 hiveContext.sql("CREATE TABLE teacher_info(" + "name string, " + "height double) " + "ROW FORMAT DELIMITED " + "FIELDS TERMINATED BY ','") hiveContext.sql("CREATE TABLE teacher_basic(" + "name string, " + "age int, " + " married boolean, " + "children int) " + "ROW FORMAT DELIMITED " + "FIELDS TERMINATED BY ','") // 向表中加载数据 hiveContext.sql("LOAD DATA LOCAL INPATH '/home/uplooking/data/hive/sql/teacher_info.txt' INTO TABLE teacher_info") hiveContext.sql("LOAD DATA LOCAL INPATH '/home/uplooking/data/hive/sql/teacher_basic.txt' INTO TABLE teacher_basic") //第二步操作 计算两张表的关联数据 val joinDF = hiveContext.sql("SELECT " + "b.name, " + "b.age, " + "if(b.married, '已婚', '未婚') as married, " + "b.children, " + "i.height " + "FROM teacher_info i " + "INNER JOIN teacher_basic b ON i.name = b.name") joinDF.collect().foreach(println) joinDF.write.saveAsTable("teacher") sc.stop() }}
打包、上传与配置
打包后上传到集群环境中,然后针对Spark做如下配置:
在集群中执行hive操作的时候,需要以下配置: 1、将hive-site.xml拷贝到spark/conf目录下,将mysql connector拷贝到spark/lib目录下 2、在$SPARK_HOME/conf/spark-env.sh中添加一条记录 export SPARK_CLASSPATH=$SPARK_CLASSPATH:$SPARK_HOME/lib/mysql-connector-java-5.1.39.jar
提交spark作业
使用的spark提交作业的脚本如下:
[uplooking@uplooking01 spark]$ cat spark-submit-standalone.sh #export HADOOP_CONF_DIR=/home/uplooking/app/hadoop/etc/hadoop/home/uplooking/app/spark/bin/spark-submit \--class $2 \--master spark://uplooking02:7077 \--executor-memory 1G \--num-executors 1 \$1 \
执行如下命令:
./spark-submit-standalone.sh spark-hive.jar cn.xpleaf.bigdata.spark.scala.sql.p2._01HiveContextOps
验证
可以在作业执行的输出结果有看到我们期望的输出,也可以直接在Hive中操作来进行验证:
hive> show tables;OKhpeoplepeoplet1teacherteacher_basicteacher_infoTime taken: 0.03 seconds, Fetched: 6 row(s)hive> select * from teacher;OKzhangsan 23 未婚 0 175.0lisi 24 未婚 0 180.0wangwu 25 未婚 0 175.0zhaoliu 26 已婚 1 195.0zhouqi 27 已婚 2 165.0weiba 28 已婚 3 185.0Time taken: 0.369 seconds, Fetched: 6 row(s)
Spark和ES的集成
需要确保ElasticSearch环境已经搭建好。
测试代码如下:
package cn.xpleaf.bigdata.spark.scala.sql.p2import org.apache.log4j.{Level, Logger}import org.apache.spark.sql.SQLContextimport org.apache.spark.{SparkConf, SparkContext}import org.elasticsearch.spark.sql._import org.elasticsearch.spark._/** * Spark和ES的集成操作 * 引入Spark和es的maven依赖 * elasticsearch-hadoop * 2.3.0 * 将account.json加载到es的索引库spark/account * 可以参考官方文档:https://www.elastic.co/guide/en/elasticsearch/hadoop/2.3/spark.html */object _02SparkElasticSearchOps { def main(args: Array[String]): Unit = { Logger.getLogger("org.apache.spark").setLevel(Level.OFF) val conf = new SparkConf() .setAppName(_02SparkElasticSearchOps.getClass().getSimpleName) .setMaster("local[2]") /** * Spark和es的集成配置 */ conf.set("es.index.auto.create", "true") conf.set("es.nodes", "uplooking01") conf.set("es.port", "9200") val sc = new SparkContext(conf) val sqlContext = new SQLContext(sc)// write2ES(sqlContext) readFromES(sc) sc.stop() } /** * 从es中读数据 * (使用sparkContext进行操作) */ def readFromES(sc:SparkContext): Unit = { val resources = "spark/account" // 索引库/类型 val jsonRDD = sc.esJsonRDD(resources) jsonRDD.foreach(println) } /** * 向es中写入数据 * (使用sqlContext进行操作) */ def write2ES(sqlContext:SQLContext): Unit = { val jsonDF = sqlContext.read.json("D:/data/spark/sql/account.json") val resources = "spark/account" // 索引库/类型 jsonDF.saveToEs(resources) }}
Spark SQL函数
概述(Spark 1.5.X ~ 1.6.X的内置函数)
使用Spark SQL中的内置函数对数据进行分析,Spark SQL API不同的是,DataFrame中的内置函数操作的结果是返回一个Column对象,而DataFrame天生就是"A distributed collection of data organized into named columns.",这就为数据的复杂分析建立了坚实的基础并提供了极大的方便性,例如说,我们在操作DataFrame的方法中可以随时调用内置函数进行业务需要的处理,这之于我们构建附件的业务逻辑而言是可以极大的减少不必须的时间消耗(基于上就是实际模型的映射),让我们聚焦在数据分析上,这对于提高工程师的生产力而言是非常有价值的Spark 1.5.x开始提供了大量的内置函数,还有max、mean、min、sum、avg、explode、size、sort_array、day、to_date、abs、acos、asin、atan
总体上而言内置函数包含了五大基本类型:
1、聚合函数,例如countDistinct、sumDistinct等;2、集合函数,例如sort_array、explode等3、日期、时间函数,例如hour、quarter、next_day4、数学函数,例如asin、atan、sqrt、tan、round等;5、开窗函数,例如rowNumber等6、字符串函数,concat、format_number、rexexp_extract7、其它函数,isNaN、sha、randn、callUDF以下为Hive中的知识内容,但是显然Spark SQL也有同样的概念UDF 用户自定义函数:User Definded Function 一路输入,一路输出 a--->A strlen("adbad")=5UDAF 用户自定义聚合函数:User Definded Aggregation Function 多路输入,一路输出 sum(a, b, c, d)---->汇总的结果表函数 UDTF:用户自定义表函数:User Definded Table Function 多路输入,多路输出 "hello you" "hello me" ---->转换操作,----->split("")---->Array[] ["hello, "you"]---> "hello" "you" ---->行列转换
一个基本的案例如下:
package cn.xpleaf.bigdata.spark.scala.sql.p2import org.apache.log4j.{Level, Logger}import org.apache.spark.{SparkConf, SparkContext}import org.apache.spark.sql.SQLContext/** * SparkSQL 内置函数操作 */object _03SparkSQLFunctionOps { def main(args: Array[String]): Unit = { Logger.getLogger("org.apache.spark").setLevel(Level.OFF) val conf = new SparkConf() .setAppName(_03SparkSQLFunctionOps.getClass().getSimpleName) .setMaster("local[2]") val sc = new SparkContext(conf) val sqlContext = new SQLContext(sc) val pdf = sqlContext.read.json("D:/data/spark/sql/people.json") pdf.show() pdf.registerTempTable("people") // 统计人数 sqlContext.sql("select count(1) from people").show() // 统计最小年龄 sqlContext.sql("select age, " + "max(age) as max_age, " + "min(age) as min_age, " + "avg(age) as avg_age, " + "count(age) as count " + "from people group by age order by age desc").show() sc.stop() }}
输出结果如下:
+---+------+-------+|age|height| name|+---+------+-------+| 10| 168.8|Michael|| 30| 168.8| Andy|| 19| 169.8| Justin|| 32| 188.8| Jack|| 10| 158.8| John|| 19| 179.8| Domu|| 13| 179.8| 袁帅|| 30| 175.8| 殷杰|| 19| 179.9| 孙瑞|+---+------+-------+18/05/09 17:53:23 INFO FileInputFormat: Total input paths to process : 1+---+|_c0|+---+| 9|+---+18/05/09 17:53:24 INFO FileInputFormat: Total input paths to process : 1+---+-------+-------+-------+-----+|age|max_age|min_age|avg_age|count|+---+-------+-------+-------+-----+| 32| 32| 32| 32.0| 1|| 30| 30| 30| 30.0| 2|| 19| 19| 19| 19.0| 3|| 13| 13| 13| 13.0| 1|| 10| 10| 10| 10.0| 2|+---+-------+-------+-------+-----+
Spark SQL开窗函数
1、Spark 1.5.x版本以后,在Spark SQL和DataFrame中引入了开窗函数,比如最经典的就是我们的row_number(),可以让我们实现分组取topn的逻辑。
2、做一个案例进行topn的取值(利用Spark的开窗函数),不知道同学们是否还有印象,我们之前在最早的时候,做过topn的计算,当时是非常麻烦的。但是现在用了Spark SQL之后,非常方便。
Spark SQL之UDF操作
测试代码如下:
package cn.xpleaf.bigdata.spark.scala.sql.p2import org.apache.log4j.{Level, Logger}import org.apache.spark.sql.types.{DataTypes, StructField, StructType}import org.apache.spark.sql.{Row, SQLContext}import org.apache.spark.{SparkConf, SparkContext}/** * SparkSQL 内置函数操作 */object _04SparkSQLFunctionOps { def main(args: Array[String]): Unit = { Logger.getLogger("org.apache.spark").setLevel(Level.OFF) val conf = new SparkConf() .setAppName(_04SparkSQLFunctionOps.getClass().getSimpleName) .setMaster("local[2]") val sc = new SparkContext(conf) val sqlContext = new SQLContext(sc) /** * hive中的用户自定义函数UDF操作(即在SparkSQL中类比hive来进行操作,因为hive和SparkSQL都是交互式计算) * 1.创建一个普通的函数 * 2.注册(在SqlContext中注册) * 3.直接使用即可 * * 案例:创建一个获取字符串长度的udf */ // 1.创建一个普通的函数 def strLen(str:String):Int = str.length // 2.注册(在SqlContext中注册) sqlContext.udf.register[Int, String]("myStrLen", strLen) val list = List("Hello you", "Hello he", "Hello me") // 将RDD转换为DataFrameval rowRDD = sqlContext.sparkContext.parallelize(list).flatMap(_.split(" ")).map(word => { Row(word) }) val scheme = StructType(List( StructField("word", DataTypes.StringType, false) )) val df = sqlContext.createDataFrame(rowRDD, scheme) df.registerTempTable("test") // 3.直接使用即可 sqlContext.sql("select word, myStrLen(word) from test").show() sc.stop() }}
输出结果如下:
+-----+---+| word|_c1|+-----+---+|Hello| 5|| you| 3||Hello| 5|| he| 2||Hello| 5|| me| 2|+-----+---+
Spark SQL之wordcount操作
测试代码如下:
package cn.xpleaf.bigdata.spark.scala.sql.p2import org.apache.log4j.{Level, Logger}import org.apache.spark.sql.types.{DataTypes, StructField, StructType}import org.apache.spark.{SparkConf, SparkContext}import org.apache.spark.sql.{Row, SQLContext}/** * 这两部分都比较重要: * 1.使用SparkSQL完成单词统计操作 * 2.开窗函数使用 */object _05SparkSQLFunctionOps2 { def main(args: Array[String]): Unit = { Logger.getLogger("org.apache.spark").setLevel(Level.OFF) val conf = new SparkConf() .setAppName(_05SparkSQLFunctionOps2.getClass().getSimpleName) .setMaster("local[2]") val sc = new SparkContext(conf) val sqlContext = new SQLContext(sc) val list = List("Hello you", "Hello he", "Hello me") // 将RDD转换为DataFrameval rowRDD = sqlContext.sparkContext.parallelize(list).map(line => { Row(line) }) val scheme = StructType(List( StructField("line", DataTypes.StringType, false) )) val df = sqlContext.createDataFrame(rowRDD, scheme) df.registerTempTable("test") df.show() // 执行wordcount val sql = "select t.word, count(1) as count " + "from " + "(select " + "explode(split(line, ' ')) as word " + "from test) as t " + "group by t.word order by count desc" sqlContext.sql(sql).show() sc.stop() }}
输出结果如下:
+---------+| line|+---------+|Hello you|| Hello he|| Hello me|+---------++-----+-----+| word|count|+-----+-----+|Hello| 3|| me| 1|| he| 1|| you| 1|+-----+-----+