热门IT资讯网

第17课:RDD案例(join、cogroup等实战)

发表于:2024-11-24 作者:热门IT资讯网编辑
编辑最后更新 2024年11月24日,本节课通过代码实战演示RDD中最重要的两个算子,join和cogroupjoin算子代码实战://通过代码演示join算子val conf = new SparkConf().setAppName("

本节课通过代码实战演示RDD中最重要的两个算子,join和cogroup


join算子代码实战:

//通过代码演示join算子
val conf = new SparkConf().setAppName("RDDDemo").setMaster("local")
val sc = new SparkContext(conf)
val arr1 = Array(Tuple2(1, "Spark"), Tuple2(2, "Hadoop"), Tuple2(3, "Tachyon"))
val arr2 = Array(Tuple2(1, 100), Tuple2(2, 70), Tuple2(3, 90))
val rdd1 = sc.parallelize(arr1)
val rdd2 = sc.parallelize(arr2)


val rdd3 = rdd1.join(rdd2)
rdd3.collect().foreach(println)


运行结果:

(1,(Spark,100))

(3,(Tachyon,90))

(2,(Hadoop,70))


cogroup算子代码实战:

首先通过java的方式编写:

SparkConf conf = new SparkConf().setMaster("local").setAppName("Cogroup");

JavaSparkContext sc = new JavaSparkContext(conf);


List> nameList = Arrays.asList(new Tuple2(1, "Spark"),

new Tuple2(2, "Tachyon"), new Tuple2(3, "Hadoop"));

List> ScoreList = Arrays.asList(new Tuple2(1, 100),

new Tuple2(2, 95), new Tuple2(3, 80),

new Tuple2(1, 80), new Tuple2(2, 110),

new Tuple2(2, 90));


JavaPairRDD names = sc.parallelizePairs(nameList);

JavaPairRDD scores = sc.parallelizePairs(ScoreList);


JavaPairRDD, Iterable>> nameAndScores = names.cogroup(scores);

nameAndScores.foreach(new VoidFunction, Iterable>>>() {

public void call(Tuple2, Iterable>> t) throws Exception {

System.out.println("ID:" + t._1);

System.out.println("Name:" + t._2._1);

System.out.println("Score:" + t._2._2);

}

});

sc.close();


运行结果:

ID:1

Name:[Spark]

Score:[100, 80]

ID:3

Name:[Hadoop]

Score:[80]

ID:2

Name:[Tachyon]

Score:[95, 110, 90]


通过Scala的方式:

val conf = new SparkConf().setAppName("RDDDemo").setMaster("local")
val sc = new SparkContext(conf)
val arr1 = Array(Tuple2(1, "Spark"), Tuple2(2, "Hadoop"), Tuple2(3, "Tachyon"))
val arr2 = Array(Tuple2(1, 100), Tuple2(2, 70), Tuple2(3, 90), Tuple2(1, 95), Tuple2(2, 65), Tuple2(1, 110))
val rdd1 = sc.parallelize(arr1)
val rdd2 = sc.parallelize(arr2)

val rdd3 = rdd1.cogroup(rdd2)
rdd3.collect().foreach(println)
sc.stop()


运行结果:

(1,(CompactBuffer(Spark),CompactBuffer(100, 95, 110)))

(3,(CompactBuffer(Tachyon),CompactBuffer(90)))

(2,(CompactBuffer(Hadoop),CompactBuffer(70, 65)))


备注:

资料来源于:DT_大数据梦工厂(Spark发行版本定制)

更多私密内容,请关注微信公众号:DT_Spark

如果您对大数据Spark感兴趣,可以免费听由王家林老师每天晚上20:00开设的Spark永久免费公开课,地址YY房间号:68917580

0