Advanced Spark SQL Examples
(1) Classic example: word count with a UDTF
Data format:
Each line is a string of words separated by spaces.
Code:
import org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkConf
import org.apache.spark.sql.{DataFrame, SQLContext, SparkSession}

object SparkSqlTest {
  def main(args: Array[String]): Unit = {
    // Suppress noisy logging
    Logger.getLogger("org.apache.hadoop").setLevel(Level.WARN)
    Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
    Logger.getLogger("org.project-spark").setLevel(Level.WARN)
    // Build the entry point
    val conf: SparkConf = new SparkConf()
    conf.setAppName("SparkSqlTest")
      .setMaster("local[2]")
    val spark: SparkSession = SparkSession.builder().config(conf)
      .enableHiveSupport()
      .getOrCreate()
    // Create the SQLContext object
    val sqlContext: SQLContext = spark.sqlContext
    val wordDF: DataFrame = sqlContext.read.text("C:\\z_data\\test_data\\ip.txt").toDF("line")
    wordDF.createTempView("lines")
    // explode + split is the UDTF part: it turns one line into one row per word
    val sql =
      """
        |select t1.word, count(1) counts
        |from (
        |  select explode(split(line, '\\s+')) word
        |  from lines) t1
        |group by t1.word
        |order by counts
      """.stripMargin
    spark.sql(sql).show()
  }
}
Result:
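For comparison, the same word count can also be expressed directly with the DataFrame API instead of a SQL string. The following is a minimal sketch under the same assumptions (local mode, the same ip.txt path); the object name WordCountDFSketch is made up for illustration.

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, explode, split}

object WordCountDFSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("WordCountDFSketch")
      .master("local[2]")
      .getOrCreate()

    // Read each line, split it on whitespace, and explode into one word per row
    val words = spark.read.text("C:\\z_data\\test_data\\ip.txt")
      .select(explode(split(col("value"), "\\s+")).as("word"))

    // Group by word and count, ordered by the count (ascending, as in the SQL above)
    words.groupBy("word")
      .count()
      .orderBy("count")
      .show()

    spark.stop()
  }
}

Both versions produce the same word/count table; the SQL version simply routes through the registered temporary view.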
(2) Top N with window functions
Data format:
A JSON file of student score records (one object per line with course, name, and score fields); the task is to take the top three scores in each course.
Code:
import org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkConf
import org.apache.spark.sql.{DataFrame, SQLContext, SparkSession}

object SparkSqlTest {
  def main(args: Array[String]): Unit = {
    // Suppress noisy logging
    Logger.getLogger("org.apache.hadoop").setLevel(Level.WARN)
    Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
    Logger.getLogger("org.project-spark").setLevel(Level.WARN)
    // Build the entry point
    val conf: SparkConf = new SparkConf()
    conf.setAppName("SparkSqlTest")
      .setMaster("local[2]")
    val spark: SparkSession = SparkSession.builder().config(conf)
      .enableHiveSupport()
      .getOrCreate()
    // Create the SQLContext object
    val sqlContext: SQLContext = spark.sqlContext
    val topnDF: DataFrame = sqlContext.read.json("C:\\z_data\\test_data\\score.json")
    topnDF.createTempView("student")
    // row_number() over a window partitioned by course ranks the scores within each course
    val sql =
      """select
        |  t1.course course,
        |  t1.name name,
        |  t1.score score
        |from (
        |  select
        |    course,
        |    name,
        |    score,
        |    row_number() over(partition by course order by score desc) top
        |  from student) t1
        |where t1.top <= 3
      """.stripMargin
    spark.sql(sql).show()
  }
}
Result:
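The same Top N query can also be written with the DataFrame window-function API. A minimal sketch under the same assumptions (the object name TopNDFSketch is made up for illustration):

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, desc, row_number}

object TopNDFSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("TopNDFSketch")
      .master("local[2]")
      .getOrCreate()

    val scoreDF = spark.read.json("C:\\z_data\\test_data\\score.json")

    // Rank rows within each course by score, highest first
    val byCourse = Window.partitionBy("course").orderBy(desc("score"))

    scoreDF.withColumn("top", row_number().over(byCourse))
      .filter(col("top") <= 3)
      .select("course", "name", "score")
      .show()

    spark.stop()
  }
}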
(3) Handling data skew in Spark SQL
Approach (two-stage aggregation):
- Find the key that causes the data skew
- Split the skewed key by attaching a random prefix
- Do a local aggregation on the prefixed key
- Strip the prefix
- Do the global aggregation
Taking the word count above as an example, first find the word with the unusually large volume of data.
Code:
import java.util.Random

import org.apache.log4j.{Level, Logger}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Dataset, SQLContext, SparkSession}

object SparkSqlTest {
  def main(args: Array[String]): Unit = {
    // Suppress noisy logging
    Logger.getLogger("org.apache.hadoop").setLevel(Level.WARN)
    Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
    Logger.getLogger("org.project-spark").setLevel(Level.WARN)
    // Build the entry point
    val conf: SparkConf = new SparkConf()
    conf.setAppName("SparkSqlTest")
      .setMaster("local[2]")
    val spark: SparkSession = SparkSession.builder().config(conf)
      .enableHiveSupport()
      .getOrCreate()
    // Create the SQLContext object
    val sqlContext: SQLContext = spark.sqlContext
    // Register the UDFs
    sqlContext.udf.register[String, String, Integer]("add_prefix", add_prefix)
    sqlContext.udf.register[String, String]("remove_prefix", remove_prefix)
    // Create the SparkContext object
    val sc: SparkContext = spark.sparkContext
    val lineRDD: RDD[String] = sc.textFile("C:\\z_data\\test_data\\ip.txt")
    // Find the skewed word by sampling
    val wordsRDD: RDD[String] = lineRDD.flatMap(line => {
      line.split("\\s+")
    })
    val sampleRDD: RDD[String] = wordsRDD.sample(false, 0.2)
    val sortRDD: RDD[(String, Int)] = sampleRDD.map(word => (word, 1)).reduceByKey(_ + _).sortBy(kv => kv._2, false)
    val hot_word = sortRDD.take(1)(0)._1
    val bs: Broadcast[String] = sc.broadcast(hot_word)
    import spark.implicits._
    // Split the data into the skewed key and the rest
    val lineDF: DataFrame = sqlContext.read.text("C:\\z_data\\test_data\\ip.txt")
    val wordDF: Dataset[String] = lineDF.flatMap(row => {
      row.getAs[String](0).split("\\s+")
    })
    // Words with data skew
    val hotDS: Dataset[String] = wordDF.filter(row => {
      val hot_word = bs.value
      row.equals(hot_word)
    })
    val hotDF: DataFrame = hotDS.toDF("word")
    hotDF.createTempView("hot_table")
    // Words without data skew
    val norDS: Dataset[String] = wordDF.filter(row => {
      val hot_word = bs.value
      !row.equals(hot_word)
    })
    val norDF: DataFrame = norDS.toDF("word")
    norDF.createTempView("nor_table")
    // Skewed words: add a random prefix, aggregate locally, strip the prefix,
    // then aggregate globally; finally union with the plain counts of the rest
    val sql =
      """
        |(select
        |  t3.word,
        |  sum(t3.counts) counts
        |from (select
        |    remove_prefix(t2.newword) word,
        |    t2.counts
        |  from (select
        |      t1.newword newword,
        |      count(1) counts
        |    from
        |      (select
        |        add_prefix(word, 3) newword
        |      from hot_table) t1
        |    group by t1.newword) t2) t3
        |group by t3.word)
        |union
        |(select
        |  word,
        |  count(1) counts
        |from nor_table
        |group by word)
      """.stripMargin
    spark.sql(sql).show()
  }

  // Custom UDF: prepend a random numeric prefix in [0, range)
  def add_prefix(word: String, range: Integer): String = {
    val random = new Random()
    random.nextInt(range) + "_" + word
  }

  // Custom UDF: strip the random prefix again
  def remove_prefix(word: String): String = {
    word.substring(word.indexOf("_") + 1)
  }
}
Result:
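The salting step can also be expressed without custom UDFs by using built-in functions such as rand(), concat_ws(), and regexp_replace(). The following is a minimal sketch of that alternative, assuming a DataFrame with a single word column like hotDF above; the object and method names are made up for illustration.

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{col, concat_ws, floor, rand, regexp_replace, sum}

object SaltedCountSketch {
  // Two-stage aggregation for a skewed word DataFrame, using built-in functions
  // instead of the add_prefix / remove_prefix UDFs above.
  def countSkewedWords(hotDF: DataFrame, numSalts: Int = 3): DataFrame = {
    // Stage 1: attach a random numeric prefix in [0, numSalts) and aggregate locally
    val localCounts = hotDF
      .withColumn("salted_word",
        concat_ws("_", floor(rand() * numSalts).cast("string"), col("word")))
      .groupBy("salted_word")
      .count()

    // Stage 2: strip the numeric prefix and aggregate globally
    localCounts
      .withColumn("word", regexp_replace(col("salted_word"), "^[0-9]+_", ""))
      .groupBy("word")
      .agg(sum("count").as("counts"))
  }
}

The result would then be unioned with the plain aggregation over nor_table, exactly as the SQL above does.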