spark union 特别注意
发表于:2024-11-21 作者:热门IT资讯网编辑
编辑最后更新 2024年11月21日,今天遇到一个很诡异的问题。表Auseridhousecoderesctimeu1code111301表Buseridhousecoderesctimeu2code201302表Cuseridnamet
今天遇到一个很诡异的问题。
表A
userid | housecode | res | ctime |
---|---|---|---|
u1 | code1 | 1 | 1301 |
表B
userid | housecode | res | ctime |
---|---|---|---|
u2 | code2 | 0 | 1302 |
表C
userid | name | type | time |
---|---|---|---|
u1 | 大海 | 0 | 1303 |
然后对表A进行处理操作
表A.createOrReplaceTempView("t1");
JavaRDD
t1= s.createDataFrame(rdd, HistoryModelExt.class);
然后查看t1, t1.show()
u1 | code1 | 1 | 1301 |
---|---|---|---|
.. | .. | .. | .. |
数据还在,然后 B union A 然后 join C(通过userid), 理论上应该是有结果的,感觉就像1+1=2 这么肯定,但是还真没有数据,非常诧异。
刚开始以为是自己程序哪里有问题,苦苦寻找,发现一切正常, 最后回到 union这个方法上。
为了看清楚前因后果, 我把B union A的数据打印了出来,发现了一个奇怪的事情
userid | housecode | res | ctime |
---|---|---|---|
u2 | code2 | 0 | 1302 |
1301 | code1 | 1 | u1 |
当时一下子就明白为什么join 没有数据了, A的schema已经与B不一致了。
原来 union函数并不是按照列名合并,而是按照位置合并。
但是在JavaRDD
查看源代码
/** * Applies a schema to an RDD of Java Beans. * * WARNING: Since there is no guaranteed ordering for fields in a Java Bean, * SELECT * queries will return the columns in an undefined order. * * @since 2.0.0 */ def createDataFrame(rdd: RDD[_], beanClass: Class[_]): DataFrame = { val attributeSeq: Seq[AttributeReference] = getSchema(beanClass) val className = beanClass.getNameval rowRdd = rdd.mapPartitions { iter => // BeanInfo is not serializable so we must rediscover it remotely for each partition. SQLContext.beansToRows(iter, Utils.classForName(className), attributeSeq) } Dataset.ofRows(self, LogicalRDD(attributeSeq, rowRdd.setName(rdd.name))(self)) }
看注释,fields的顺序是不保证的, 原来如此。
这样你在union前乖乖的执行
t1.select("userId","houseCode","res","ctime");
这样顺序就又恢复了,大数据排查问题特别麻烦,感觉是一个很大的坑,希望能帮到后来人。