Glue Data Quality Check

74 views Asked by At

Give the sample code to create data quality glue job above business layer of datalake in scala language

Code in scala language that is .tf file for creating job and Data Quality job should take ruleset from dynamodb table for which it is passed as parameter. Ex: Create a Data Quality job for table that replaces the data quality check run in Tableau. I got sample code from aws

 import com.amazonaws.services.glue.GlueContext
    import com.amazonaws.services.glue.MappingSpec
    import com.amazonaws.services.glue.errors.CallSite
import com.amazonaws.services.glue.util.GlueArgParser
import com.amazonaws.services.glue.util.Job
import com.amazonaws.services.glue.util.JsonOptions
import org.apache.spark.SparkContext
import scala.collection.JavaConverters._
import com.amazonaws.services.glue.dq.EvaluateDataQuality

object GlueApp {
  def main(sysArgs: Array[String]) {
    val spark: SparkContext = new SparkContext()
    val glueContext: GlueContext = new GlueContext(spark)
    // @params: [JOB_NAME]
    val args = GlueArgParser.getResolvedOptions(sysArgs, Seq("JOB_NAME").toArray)
    Job.init(args("JOB_NAME"), glueContext, args.asJava)
    
    // Create DynamicFrame with data
    val Legislators_Area = glueContext.getCatalogSource(database="legislators", tableName="areas_json", transformationContext="S3bucket_node1").getDynamicFrame()

    // Define data quality ruleset
    val DQ_Ruleset = """
      Rules = [ColumnExists "id"]
    """

    // Evaluate data quality
    val DQ_Results = EvaluateDataQuality.apply(frame=Legislators_Area, ruleset=DQ_Ruleset, publishingOptions=JsonOptions("""{"dataQualityEvaluationContext": "Legislators_Area", "enableDataQualityMetrics": "true", "enableDataQualityResultsPublishing": "true"}"""))
    assert(DQ_Results.filter(_.getField("Outcome").contains("Failed")).count == 0, "Failing DQ rules for Legislators_Area caused the job to fail.")

    // Script generated for node Select Fields
    val SelectFields_Results = Legislators_Area.selectFields(paths=Seq("id", "name"), transformationContext="Legislators_Area")

    Job.commit()
  }
} 

here I should be able to take ruleset from dynamodb table and evaluate data quality ,This job should run above business layer of datalake.That is business dat ais passed as input for dataquality job.

0

There are 0 answers