Ramandeep Singh Nanda
Published

Wed 24 January 2018

←Home

Writing Generic UDFs in Spark

Apache Spark offers the ability to write Generic UDFs. However, for an idiomatic implementation, there are a couple of things that one needs to keep in mind.

  1. You should return a subtype of Option because Spark treats None subtype automatically as null and is able to extract value from Some subtype.
  2. Your Generic UDFs should be able to handle Option or regular type as input. To accomplish this, use type matching in case of Option and recursively extract values. This scenario occurs, if your UDF is in turn wrapped by another UDF.

If these considerations are handled correctly, the implemented UDF has several important benefits:

  • It avoids the code duplication. And,
  • It handles nulls in a more idiomatic way.

Here is an example of a UDF that can be used to calculate the intervals between two time periods.

import java.time.{LocalDate, ZoneId}
import java.time.format.DateTimeFormatter
import java.time.temporal.ChronoUnit

import scala.util.Try

def convertToDate[T](date: T): Option[LocalDate] = {
  if (date == null) return None
  date match {
    case dt: LocalDate => Some(dt)
    case dt: String =>
      if (dt.isEmpty) return None
      val retValue = Try {
        LocalDate.parse(dt, DateTimeFormatter.ISO_DATE)
      }.getOrElse(LocalDate.parse(dt, DateTimeFormatter.ISO_LOCAL_DATE_TIME))
      Some(retValue)
    case dt: java.sql.Date => Some(dt.toLocalDate)
    case dt: java.util.Date => Some(dt.toInstant.atZone(ZoneId.systemDefault()).toLocalDate)
    case dt: Option[_] => if (dt.isDefined) convertToDate(dt.get) else None
  }
}


def interval_between[V1, V2](fromDate: V1, toDate: V2, intType: String): Option[Long] = {
  def calculateInterval(fromDate: LocalDate, toDate: LocalDate, intType: String = "months"): Option[Long] = {
    val returnVal = intType match {
      case "decades" => ChronoUnit.DECADES.between(fromDate, toDate)
      case "years" => ChronoUnit.YEARS.between(fromDate, toDate)
      case "months" => ChronoUnit.MONTHS.between(fromDate, toDate)
      case "days" => ChronoUnit.DAYS.between(fromDate, toDate)
      case "hours" => ChronoUnit.HOURS.between(fromDate, toDate)
      case "minutes" => ChronoUnit.MINUTES.between(fromDate, toDate)
      case "seconds" => ChronoUnit.SECONDS.between(fromDate, toDate)
      case _ => throw new IllegalArgumentException(s"$intType is not supported")
    }
    Some(returnVal)
  }

  val fromDt = convertToDate(fromDate)
  val toDt = convertToDate(toDate)
  if (fromDt.isEmpty || toDt.isEmpty) {
    return None
  }
  calculateInterval(fromDt.get, toDt.get, intType.toLowerCase)
}

The above UDF takes care of the concerns mentioned earlier in the post. To use it, you simply have to register it as a UDF with SparkSession.

 ss.udf.register("interval_between", interval_between _)
Go Top
comments powered by Disqus