Cleaning up Python interactions

- More forgiving timestamp parsing
- No longer crashes when using use_deltas=True with geometry types
parent 27fea0af
@@ -48,7 +48,7 @@ object SparkPrimitive
   }
   val DateString = "([0-9]{4})-([0-9]{2})-([0-9]{2})".r
-  val TimestampString = "([0-9]{4})-([0-9]{2})-([0-9]{2}) ([0-9]{2}):([0-9]{2}):([0-9.]+)".r
+  val TimestampString = "([0-9]{4})-([0-9]{2})-([0-9]{2})[ T]([0-9]{2}):([0-9]{2}):([0-9.]+)".r
   def decodeDate(date: String): Date =
     date match {
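For context, a minimal sketch (not part of the patch) of what the widened `[ T]` separator class buys: the pattern now accepts both the space-separated form and the ISO-8601 `T`-separated form that Python's `datetime.isoformat()` emits, which the old pattern rejected.

```scala
// Sketch only: the widened separator class [ T] in action.
object TimestampParseDemo extends App {
  val TimestampString =
    "([0-9]{4})-([0-9]{2})-([0-9]{2})[ T]([0-9]{2}):([0-9]{2}):([0-9.]+)".r

  // Both the space-separated and ISO-8601 "T"-separated forms now match:
  Seq("2021-06-01 13:45:07.5", "2021-06-01T13:45:07.5").foreach {
    case TimestampString(y, mo, d, h, mi, s) => println(s"parsed: $y-$mo-$d $h:$mi:$s")
    case other                               => println(s"rejected: $other")
  }
}
```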
@@ -3,7 +3,7 @@ package org.mimirdb.vizual
 import play.api.libs.json._
 import org.apache.spark.sql.{ DataFrame, Column }
 import org.apache.spark.sql.functions._
-import org.apache.spark.sql.types.{ StructField, StringType }
+import org.apache.spark.sql.types.{ StructField, StringType, DataType }
 import org.apache.spark.sql.catalyst.expressions.{ Expression, Cast, Literal }
 import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
 import com.typesafe.scalalogging.LazyLogging
@@ -12,6 +12,7 @@ import org.mimirdb.rowids.AnnotateWithRowIds
 import org.mimirdb.rowids.AnnotateWithSequenceNumber
 import org.mimirdb.caveats.implicits._
 import org.mimirdb.spark.SparkPrimitive
+import org.apache.spark.sql.sedona_sql.UDT.GeometryUDT
 
 object ExecOnSpark
   extends LazyLogging
@@ -53,8 +54,10 @@ object ExecOnSpark
     input.schema
          .zip(values.getOrElse { input.columns.toSeq.map { _ => JsNull } })
          .map { case (field, value) =>
-           lit(SparkPrimitive.decode(value, field.dataType))
-             .as(field.name)
+           new Column(Literal(
+             userFacingToInternalType(SparkPrimitive.decode(value, field.dataType), field.dataType),
+             field.dataType
+           )).as(field.name)
          }
     if(position < 0){
       input.union(
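A note on why `lit(...)` had to go in this insert path: `lit` funnels through `Literal.apply`, which only understands Spark's built-in external types, so handing it the user-facing value of a UDT column (a JTS Geometry) throws. A sketch of the failure mode, with the error text paraphrased rather than quoted:

```scala
// Sketch only: the failure mode the direct Literal construction avoids.
import org.apache.spark.sql.functions.lit
import org.locationtech.jts.io.WKTReader

val geom = new WKTReader().read("POINT (1 2)")

// lit(geom) throws at runtime, roughly:
//   "Unsupported literal type class org.locationtech.jts.geom.Point ..."
// because Literal.apply only knows Spark's built-in external types.
// Building `new Column(Literal(internalValue, dataType))` directly, as the
// patched code does, sidesteps that conversion entirely.
```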
@@ -247,11 +250,15 @@ object ExecOnSpark
         // since we need to figure out how to cast it. Start with the column's native data
         // type.
         // println(s"$value -> ${targetColumn.dataType}")
-        val update = lit(SparkPrimitive.decode(
-                       value,
-                       targetColumn.dataType,
-                       castStrings = true
-                     )).expr.eval()
+        val decoded =
+          SparkPrimitive.decode(
+            value,
+            targetColumn.dataType,
+            castStrings = true
+          )
+        // SparkPrimitive gives us user-facing spark objects. Convert to
+        // internal types if necessary
+        val update = userFacingToInternalType(decoded, targetColumn.dataType)
         // If the updated value can't be interpreted in the column's native data type,
         // make everything a string. (eventually, maybe we try some inference to figure
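For built-in types the old `lit(...).expr.eval()` trick and the new helper agree; a minimal sketch of what "internal format" means for a DateType value (the printed number is days since the Unix epoch):

```scala
// Sketch only: user-facing vs. internal representations for a built-in type.
import org.apache.spark.sql.functions.lit
import java.sql.Date

// User-facing form: java.sql.Date. Internal form (DateType): Int days
// since the Unix epoch. Evaluating the literal performs the conversion.
val internal = lit(Date.valueOf("2021-06-01")).expr.eval()
println(internal)  // 18779

// This is exactly the `case _` fallback in userFacingToInternalType below;
// only UDT values such as geometries need the special case.
```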
@@ -305,4 +312,19 @@ object ExecOnSpark
       }
     }
   }
+
+  /**
+   * Spark uses different internal and user-facing formats. Here, we want
+   * to use user-facing formats, but when executing it's necessary to convert
+   * literals to the internal format.
+   */
+  def userFacingToInternalType(value: Any, dataType: DataType): Any =
+    dataType match {
+      // change this to UserDefinedType once we upgrade to spark 3.2
+      case GeometryUDT => GeometryUDT.serialize(value.asInstanceOf[org.locationtech.jts.geom.Geometry])
+      // most native types can be converted by evaluating the literal expression
+      case _ => lit(value).expr.eval()
+    }
 }
\ No newline at end of file
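Finally, a hypothetical usage sketch of the new helper; the WKTReader setup and the `ExecOnSpark.` qualifier are illustrative assumptions, not from this commit:

```scala
// Sketch only: exercising the new helper end to end.
import org.apache.spark.sql.Column
import org.apache.spark.sql.catalyst.expressions.Literal
import org.apache.spark.sql.sedona_sql.UDT.GeometryUDT
import org.locationtech.jts.io.WKTReader

val geom = new WKTReader().read("POINT (1 2)")

// Convert the user-facing JTS object to GeometryUDT's internal form,
// then embed it in a Literal, mirroring the insert/update paths above.
val internal = ExecOnSpark.userFacingToInternalType(geom, GeometryUDT)
val asColumn = new Column(Literal(internal, GeometryUDT))
```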