Extract year, month, and day from a Unix timestamp column in a Rust DataFusion DataFrame?


I have created a DataFusion DataFrame:

+------------+------+----------+----------------+-----------------+
| asin       | vote | verified | unixReviewTime | reviewText      |
+------------+------+----------+----------------+-----------------+
| 0486427706 | 3    | true     | 1381017600     | good            |
| 0486427707 |      | false    | 1376006400     | excellent       |
| 0486427707 | 1    | true     | 1459814400     | Did not like it |
| 0486427708 | 4    | false    | 1376006400     |                 |
+------------+------+----------+----------------+-----------------+

I tried to find out how to do the following from the API documentation, but could not figure it out:

  • Convert the unixReviewTime column into a native timestamp column
  • Extract the year, month and day from the newly created column into separate columns
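Underneath any DataFrame API, this conversion is plain calendar arithmetic on UTC seconds. As a dependency-free sketch (not DataFusion code), the hypothetical helpers below floor-divide the seconds into whole days and map the day count to a civil date using Howard Hinnant's civil_from_days algorithm. Time zones are ignored and every day is assumed to be 86,400 seconds long, which is how Unix time is defined.

```rust
/// Hypothetical helper: splits a Unix timestamp (whole seconds since
/// 1970-01-01T00:00:00Z) into a UTC (year, month, day) triple.
fn ymd_from_unix_seconds(secs: i64) -> (i64, u32, u32) {
    // Floor division, so pre-1970 (negative) timestamps land on the right day.
    civil_from_days(secs.div_euclid(86_400))
}

/// Howard Hinnant's civil_from_days: days since 1970-01-01 ->
/// proleptic Gregorian (year, month, day).
fn civil_from_days(days: i64) -> (i64, u32, u32) {
    let z = days + 719_468; // shift the epoch to 0000-03-01
    let era = z.div_euclid(146_097); // 400-year eras
    let doe = z.rem_euclid(146_097); // day of era, [0, 146096]
    let yoe = (doe - doe / 1_460 + doe / 36_524 - doe / 146_096) / 365; // year of era, [0, 399]
    let doy = doe - (365 * yoe + yoe / 4 - yoe / 100); // day of March-based year, [0, 365]
    let mp = (5 * doy + 2) / 153; // month index with March = 0, [0, 11]
    let day = (doy - (153 * mp + 2) / 5 + 1) as u32; // [1, 31]
    let month = (if mp < 10 { mp + 3 } else { mp - 9 }) as u32; // [1, 12]
    // January and February belong to the following civil year.
    let year = yoe + era * 400 + if month <= 2 { 1 } else { 0 };
    (year, month, day)
}

fn main() {
    // unixReviewTime values from the question.
    let column: [i64; 4] = [1381017600, 1376006400, 1459814400, 1376006400];
    for ts in column {
        let (y, m, d) = ymd_from_unix_seconds(ts);
        println!("{ts} -> year={y} month={m} day={d}");
    }
}
```

For the question's data this gives 2013-10-06, 2013-08-09 and 2016-04-05, which is what a correctly scaled timestamp column should show.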

Here is what the JSON data file looks like (one object per line):

{"asin": "0486427706", "vote": 3, "verified": true, "unixReviewTime": 1381017600, "reviewText": "good", "overall": 5.0}
{"asin": "0486427707", "vote": null, "verified": false, "unixReviewTime": 1376006400, "reviewText": "excellent", "overall": 5.0}
{"asin": "0486427707", "vote": 1, "verified": true, "unixReviewTime": 1459814400, "reviewText": "Did not like it", "overall": 2.0}
{"asin": "0486427708", "vote": 4, "verified": false, "unixReviewTime": 1376006400, "reviewText": null, "overall": 4.0}

This is very easy to do in PySpark:

from pyspark.sql import functions as fn
from pyspark.sql.functions import col

main_df = (
    main_df
    .withColumn(
        'reviewed_at',
        fn.from_unixtime(col('unixReviewTime'))
    )
)

main_df = main_df.withColumn("reviewed_year", fn.year(col("reviewed_at")))
main_df = main_df.withColumn("reviewed_month", fn.month(col("reviewed_at")))


Accepted answer (Andy Grove):
use datafusion::arrow::datatypes::{DataType, TimeUnit};
use datafusion::error::Result;
// In recent DataFusion releases the prelude also exports cast() and date_part();
// older releases needed a hand-written helper that built Expr::Cast directly.
use datafusion::prelude::*;

#[tokio::main]
async fn main() -> Result<()> {
    let ctx = SessionContext::new();
    let df = ctx
        .read_json("/tmp/data.json", NdJsonReadOptions::default())
        .await?
        // unixReviewTime holds whole seconds since the Unix epoch, so cast with
        // TimeUnit::Second. TimeUnit::Millisecond would read each value as
        // milliseconds and put every row in January 1970.
        .with_column(
            "unixReviewTimestamp",
            cast(
                col("unixReviewTime"),
                DataType::Timestamp(TimeUnit::Second, None),
            ),
        )?
        // Pull the calendar fields out of the new timestamp column.
        .with_column(
            "reviewed_year",
            date_part(lit("year"), col("unixReviewTimestamp")),
        )?
        .with_column(
            "reviewed_month",
            date_part(lit("month"), col("unixReviewTimestamp")),
        )?
        .with_column(
            "reviewed_day",
            date_part(lit("day"), col("unixReviewTimestamp")),
        )?;
    df.show().await?;
    Ok(())
}

Produces (the exact display of timestamps and floats varies between DataFusion versions):

+------------+------+----------+----------------+-----------------+---------+---------------------+---------------+----------------+--------------+
| asin       | vote | verified | unixReviewTime | reviewText      | overall | unixReviewTimestamp | reviewed_year | reviewed_month | reviewed_day |
+------------+------+----------+----------------+-----------------+---------+---------------------+---------------+----------------+--------------+
| 0486427706 | 3    | true     | 1381017600     | good            | 5.0     | 2013-10-06T00:00:00 | 2013          | 10             | 6            |
| 0486427707 |      | false    | 1376006400     | excellent       | 5.0     | 2013-08-09T00:00:00 | 2013          | 8              | 9            |
| 0486427707 | 1    | true     | 1459814400     | Did not like it | 2.0     | 2016-04-05T00:00:00 | 2016          | 4              | 5            |
| 0486427708 | 4    | false    | 1376006400     |                 | 4.0     | 2013-08-09T00:00:00 | 2013          | 8              | 9            |
+------------+------+----------+----------------+-----------------+---------+---------------------+---------------+----------------+--------------+
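The TimeUnit in the cast is what decides the dates. unixReviewTime counts seconds since the epoch; if the same integer is cast to Timestamp(TimeUnit::Millisecond, None), it is read as milliseconds and every review lands in mid-January 1970. This std-only sketch, built around a hypothetical helper split_epoch (not a DataFusion or Arrow API), splits one raw value under both interpretations.

```rust
/// Hypothetical helper: splits a raw epoch count into
/// (whole days since 1970-01-01, hours, minutes, seconds, milliseconds),
/// given how many raw units make up one second (1 = seconds, 1_000 = millis).
fn split_epoch(raw: i64, units_per_second: i64) -> (i64, i64, i64, i64, i64) {
    let units_per_day = units_per_second * 86_400;
    let days = raw.div_euclid(units_per_day);
    let rem = raw.rem_euclid(units_per_day); // units elapsed within the current day
    let secs_of_day = rem / units_per_second;
    let millis = (rem % units_per_second) * 1_000 / units_per_second;
    (days, secs_of_day / 3_600, secs_of_day % 3_600 / 60, secs_of_day % 60, millis)
}

fn main() {
    let raw = 1_381_017_600_i64; // first unixReviewTime value from the question
    for (label, units_per_second) in [("seconds", 1), ("milliseconds", 1_000)] {
        let (days, h, m, s, ms) = split_epoch(raw, units_per_second);
        println!("as {label}: {days} days after 1970-01-01, {h:02}:{m:02}:{s:02}.{ms:03}");
    }
}
```

Read as seconds, the value is 15,984 whole days after the epoch (2013-10-06 at midnight). Read as milliseconds, it is only 15 days plus 23:36:57.600, which is the 1970-01-16 23:36:57.600 timestamp a Millisecond cast displays for that row.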