Ramandeep Singh Nanda
Published

Thu 12 April 2018


Removing Projection Column Ambiguity in Spark

Column ambiguity is quite common when you join two tables. It poses an unnecessary hassle when you want to select all the columns from both tables while discarding the duplicates. The problem is especially tedious with wide tables, where typing out every column name by hand is impractical.
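To make the problem concrete, here is a minimal sketch of how the ambiguity arises, assuming two hypothetical tables a and b that share an id column:

```scala
import org.apache.spark.sql.SparkSession

// Hypothetical illustration: a join whose result has two columns named "id".
val ss = SparkSession.builder.master("local[1]").appName("ambiguity").getOrCreate()
import ss.implicits._

Seq((1, "x")).toDF("id", "left_val").createOrReplaceTempView("a")
Seq((1, "y")).toDF("id", "right_val").createOrReplaceTempView("b")

val joined = ss.sql("SELECT * FROM a JOIN b ON a.id = b.id")
// joined.columns is (id, left_val, id, right_val): "id" appears twice,
// so a subsequent joined.select("id") raises an ambiguity error.
```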

There are a couple of programmatic solutions to the problem; both essentially do the same thing, but achieve the result differently.

  • Execute the SQL traditionally using ss.sql(query), then manually transform the Dataset by converting it to an RDD and dropping the duplicate column names, or
  • Implement the query execution in a similar way as Spark does, drop the duplicate column names, and only then create the Dataset. This avoids unnecessarily creating a Dataset before the duplicates are dropped.
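The first approach can be sketched as follows. This is a minimal illustration, not the article's solution: the function name is hypothetical, and it assumes that keeping the first occurrence of each duplicated column name is acceptable. The a and b tables are the same hypothetical join scenario:

```scala
import scala.collection.mutable

import org.apache.spark.sql.{DataFrame, Row, SparkSession}
import org.apache.spark.sql.types.StructType

// Sketch of approach one: run the query with ss.sql, then rebuild the
// DataFrame via the RDD with the duplicate column names filtered out.
def dropDuplicateColumnsViaRdd(query: String, ss: SparkSession): DataFrame = {
  val df = ss.sql(query)
  // Keep the index of the first field with each name, preserving order.
  val seen = mutable.HashSet[String]()
  val keptIndices = df.schema.fields.zipWithIndex
    .filter { case (f, _) => seen.add(f.name) }
    .map(_._2)
  val schema = StructType(keptIndices.map(df.schema.fields(_)))
  val rows = df.rdd.map(r => Row(keptIndices.map(r.get): _*))
  ss.createDataFrame(rows, schema)
}

// Hypothetical tables a and b sharing an "id" column.
val ss = SparkSession.builder.master("local[1]").appName("dedup-rdd").getOrCreate()
import ss.implicits._
Seq((1, "x")).toDF("id", "left_val").createOrReplaceTempView("a")
Seq((1, "y")).toDF("id", "right_val").createOrReplaceTempView("b")
val deduped = dropDuplicateColumnsViaRdd("SELECT * FROM a JOIN b ON a.id = b.id", ss)
```

The drawback, as noted below, is that the query is fully materialized through ss.sql before the duplicates are removed.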

Note: Either of these approaches should be avoided if possible, since immediate evaluation of the query limits Spark's optimizations. In this case, however, the duplicate column names would anyway render the Dataset useless for further queries or for saving it.

Solution:

import scala.collection.mutable
import scala.collection.mutable.ListBuffer

import org.apache.spark.sql.{Dataset, Row, SparkSession}
import org.apache.spark.sql.types.StructType

def sqlDropDuplicateColumns(query: String, ss: SparkSession): Dataset[Row] = {
  val logicalPlan = ss.sessionState.sqlParser.parsePlan(sqlText = query)
  val qe = ss.sessionState.executePlan(logicalPlan)
  //Assert the plan is valid
  qe.assertAnalyzed()
  val ep = qe.executedPlan
  //Drop duplicate column names, keeping the first occurrence of each
  //name and preserving the original column order
  val seen = mutable.HashSet[String]()
  val schema = StructType(ep.schema.fields.filter(f => seen.add(f.name)))
  val rows = ep.executeCollectPublic().map { r =>
    val lb = new ListBuffer[Any]
    //For a duplicated name, getAs resolves to the first matching column
    schema.foreach(sf => lb += r.getAs(sf.name))
    Row(lb: _*)
  }
  ss.createDataFrame(ss.sparkContext.parallelize(rows), schema)
}
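A brief usage sketch, repeating the function so the snippet is self-contained, against the same hypothetical a and b tables:

```scala
import scala.collection.mutable
import scala.collection.mutable.ListBuffer

import org.apache.spark.sql.{Dataset, Row, SparkSession}
import org.apache.spark.sql.types.StructType

// The solution from the article, repeated here so this snippet runs on its own.
def sqlDropDuplicateColumns(query: String, ss: SparkSession): Dataset[Row] = {
  val logicalPlan = ss.sessionState.sqlParser.parsePlan(sqlText = query)
  val qe = ss.sessionState.executePlan(logicalPlan)
  qe.assertAnalyzed()
  val ep = qe.executedPlan
  // Keep the first occurrence of each column name, preserving order.
  val seen = mutable.HashSet[String]()
  val schema = StructType(ep.schema.fields.filter(f => seen.add(f.name)))
  val rows = ep.executeCollectPublic().map { r =>
    val lb = new ListBuffer[Any]
    schema.foreach(sf => lb += r.getAs(sf.name))
    Row(lb: _*)
  }
  ss.createDataFrame(ss.sparkContext.parallelize(rows), schema)
}

// Hypothetical tables a and b sharing an "id" column.
val ss = SparkSession.builder.master("local[1]").appName("dedup-usage").getOrCreate()
import ss.implicits._
Seq((1, "x")).toDF("id", "left_val").createOrReplaceTempView("a")
Seq((1, "y")).toDF("id", "right_val").createOrReplaceTempView("b")

val deduped = sqlDropDuplicateColumns("SELECT * FROM a JOIN b ON a.id = b.id", ss)
// deduped.columns now contains each name once: id, left_val, right_val
```

Note that sessionState, executePlan, and executeCollectPublic are internal Spark APIs, so this relies on implementation details that may change between Spark versions.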