Dynamic Transpose是Spark中的一个关键转换,因为它需要大量的迭代。本文将为您提供有关如何使用内存中运算符处理此复杂方案的清晰概念。
首先,让我们看看我们拥有的源数据:
idoc_number,订单ID,idoc_qualifier_org,idoc_org7738,2364,6,07738,2364,7,07738,2364,8,mystr17738,2364,12,mystr27739,2365,12,mystr37739,2365,7,mystr4我们还有idoc_qualifier_org 源数据记录中列的查找表 。由于查找表的大小会更小,我们可以预期它会在缓存中和驱动程序内存中。
预选赛,降序6,司7,分销渠道8,销售组织12,订单类型Dynamic Transpose操作的预期输出是:
idoc_number,order_id,Division,Distribution Channel,Sales org,Order Type7738,2364,0,0,mystr1,mystr27739,2365,空,mystr3,空,mystr4以下代码实际上将根据数据中的当前列转置数据。此代码是使用Spark中的Transpose Data的另一种方法。此代码严格使用Spark的复杂数据类型,并且还负责迭代的性能。
对象 DynamicTranspose {def dataValidator(map_val:Seq [ Map [ String,String ]],rule:String):String = {尝试 {val rule_array = 规则。拆分(“#!”)。toListval src_map = map_val。toList。压扁。toMapvar output_str = “”rule_array。foreach(f =>output_str = output_str + “!” + src_map。getOrElse(f,“#”))
return output_str。掉落(1)} catch {案例 t:Throwable => t。printStackTrace()。toString()返回 “0”。toString()}
}
def main(args:Array [ String ]):Unit = {
val spark = SparkSession。builder()。主人(“本地[*]”)。config(“spark.sql.warehouse.dir”,“<src dir>”)。getOrCreate()val data_df = spark。读。选项(“标题”,“真”)。csv(“<data path src>”)val lkp_df = spark。读。选项(“标题”,“真”)。csv(“查找路径源>”)进口 火花。暗示。_进口 组织。阿帕奇。火花。sql。功能。广播
val lkp_df_brdcast = broadcast(lkp_df)val result_df = data_df。加入(广播(lkp_df_brdcast),$ “idoc_qualifier_org” === $ “限定符”,“内部”)
val df1 = result_df。groupBy(col(“idoc_number”),col(“orderid”))。agg(collect_list(map($ “desc”,$ “idoc_org”))as “map”)进口 组织。阿帕奇。火花。sql。功能。UDF进口 组织。阿帕奇。火花。sql。功能。{点燃,最大,ROW_NUMBER}进口 火花。暗示。_进口 组织。阿帕奇。火花。sql。行val map_val = lkp_df。rdd。地图(行 => 行。的getString(1))。收集()。mkString(“#!”)火花。sparkContext。广播(map_val)VAL recdValidator = UDF(dataValidator _)var latest_df = df1。withColumn(“explode_out”,split(recdValidator(df1(“map”),lit(map_val)),“!”))。掉落(“地图”)val columns = map_val。拆分(“#!”)。toListlatest_df = 列。zipWithIndex。foldLeft(latest_df){(memodDF,专栏)=> {memodDF。withColumn(柱。_1,山口(“explode_out” )(柱。_2))}}。drop(“explode_out”)latest_df。show()}
}希望这可以帮助!










