Spark timestamp issue: java.sql.Timestamp is not a valid external type for schema of timestamp


I am using Spark + Hive, connecting to them from my application with a JDBC URL. One field in my Mongo document has a timestamp datatype. When I try to read it through the Spark JDBC catalog, I get the following document back along with an error:

{
  _id: '10006546',
  name: 'Ribeira Charming Duplex',
  last_scraped: ISODate('2019-02-16T05:00:00.000Z')
}

Stack trace:

if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) null else staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 38, space), StringType), true, false, true) AS space#778
if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) null else staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 39, summary), StringType), true, false, true) AS summary#779
if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) null else staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 40, transit), StringType), true, false, true) AS transit#780
if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) null else CheckOverflow(staticinvoke(class org.apache.spark.sql.types.Decimal$, DecimalType(6,2), fromDecimal, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 41, weekly_price), DecimalType(6,2)), true, false, true), DecimalType(6,2), true) AS weekly_price#781
    at org.apache.spark.sql.errors.QueryExecutionErrors$.expressionEncodingError(QueryExecutionErrors.scala:1052)


Caused by: java.lang.RuntimeException: java.sql.Timestamp is not a valid external type for schema of timestamp
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.ValidateExternalType_3$(Unknown Source)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.If_9$(Unknown Source)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.If_13$(Unknown Source)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.MapObjects_2$(Unknown Source)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.writeFields_1_18$(Unknown Source)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown Source)
    at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder$Serializer.apply(ExpressionEncoder.scala:207)
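For reference, this is roughly how the read is wired up (a sketch, not the exact application code; the app name, master, and connection URI are assumptions, and I am reading through the MongoDB Spark connector's "mongodb" format):

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class MongoReadRepro {
    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
                .appName("mongo-read-repro") // assumed name
                .master("local[*]")          // assumed; the real app runs against Spark + Hive
                // assumed connection details; the real application has its own URI
                .config("spark.mongodb.read.connection.uri",
                        "mongodb://localhost:27017/sample_airbnb.listingsAndReviews")
                .getOrCreate();

        // The connector infers last_scraped as TimestampType; the encoder
        // error above is raised when those rows are serialized.
        Dataset<Row> listings = spark.read().format("mongodb").load();
        listings.printSchema();
        listings.show();
    }
}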

I tried the following workaround, rewriting the inferred schema so that timestamp fields are read as strings:

import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import org.apache.spark.sql.types.TimestampType;

// Build the Mongo table with the rewritten schema instead of the inferred one
new MongoTable(convertTimestampToString(structType), MongoConfig.readConfig(properties));

// Copy the inferred schema, replacing every TimestampType field with StringType
private static StructType convertTimestampToString(StructType inferredSchema) {
    StructType customSchema = new StructType();
    for (StructField field : inferredSchema.fields()) {
        if (field.dataType() instanceof TimestampType) {
            // Read timestamp columns as plain strings to sidestep the encoder check
            customSchema = customSchema.add(field.name(), DataTypes.StringType);
        } else {
            customSchema = customSchema.add(field);
        }
    }
    return customSchema;
}
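Applied on the read path, the intent was roughly this (inferredSchema stands for whatever schema the connector infers; a sketch, not the exact code):

// Force timestamp columns to come back as strings
StructType customSchema = convertTimestampToString(inferredSchema);
Dataset<Row> listings = spark.read()
        .format("mongodb")
        .schema(customSchema)
        .load();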

But it does not yield the expected result. In a standalone application that fetches the same Mongo document using only the Spark JDBC catalog, I am able to retrieve the document without this error.
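One difference I suspect but have not confirmed: Spark only accepts java.sql.Timestamp as the external type for timestamp columns when the Java 8 datetime API is disabled. With spark.sql.datetime.java8API.enabled set to true, the encoder expects java.time.Instant instead, which produces exactly this message. A minimal way to compare the setting in both applications (assumed to be the relevant knob, not a verified fix):

import org.apache.spark.sql.SparkSession;

public class Java8ApiCheck {
    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
                .appName("java8api-check")
                .master("local[*]") // assumed
                .getOrCreate();

        // false (the default) -> external type is java.sql.Timestamp
        // true                -> external type is java.time.Instant
        System.out.println(spark.conf().get("spark.sql.datetime.java8API.enabled", "false"));
        spark.stop();
    }
}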
