Read FASTQ file into an AWS Glue Job Script


I need to read a FASTQ file in an AWS Glue job script, but I am getting this error:

Traceback (most recent call last):
  File "/opt/amazon/bin/runscript.py", line 59, in <module>
    runpy.run_path(script, run_name='main')
  File "/usr/lib64/python3.7/runpy.py", line 261, in run_path
    code, fname = _get_code_from_file(run_name, path_name)
  File "/usr/lib64/python3.7/runpy.py", line 236, in _get_code_from_file
    code = compile(f.read(), fname, 'exec')
  File "/tmp/test20200930", line 24
    datasource0 = spark.createDataset(sc.textFile("s3://sample-genes-data/fastq/S_Sonnei_short_reads_1.fastq").sliding(4, 4).map {
    ^
SyntaxError: invalid syntax

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/opt/amazon/bin/runscript.py", line 92, in <module>
    while "runpy.py" in new_stack.tb_frame.f_code.co_filename:
AttributeError: 'NoneType' object has no attribute 'tb_frame'

This is my code:

import org.apache.spark.mllib.rdd.RDDFunctions._

datasource0 = spark.createDataset(sc.textFile("s3://sample-genes-data/fastq/S_Sonnei_short_reads_1.fastq").sliding(4, 4).map {
  case Array(id, seq, _, qual) => (id, seq, qual)
 }).toDF("identifier", "sequence", "quality")
datasource1 = DynamicFrame.fromDF(datasource0, glueContext, "nullv")

I followed this link: Read FASTQ file into a Spark dataframe


There is 1 answer

Prabhakar Reddy On

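I was able to run the code by wrapping it inside a GlueApp object. The traceback in the question comes from Python's runpy, which suggests the script was submitted as a Python job; the code itself is Scala, so it most likely needs to run as a Scala Glue job. You can use the code below after replacing the S3 path with your own.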

import com.amazonaws.services.glue.DynamicFrame
import com.amazonaws.services.glue.GlueContext
import com.amazonaws.services.glue.util.Job
import org.apache.spark.SparkContext
import org.apache.spark.sql.SparkSession
// Adds sliding() to RDDs
import org.apache.spark.mllib.rdd.RDDFunctions._

object GlueApp {
  def main(sysArgs: Array[String]): Unit = {
    // Note: despite its name, `spark` is the SparkContext, not the SparkSession
    val spark: SparkContext = new SparkContext()
    val glueContext: GlueContext = new GlueContext(spark)
    val sparkSession: SparkSession = glueContext.getSparkSession
    // Needed for createDataset and toDF
    import sparkSession.implicits._

    // A FASTQ record spans four lines: identifier, sequence, "+" separator, quality.
    // sliding(4, 4) groups the lines into non-overlapping windows of four.
    val datasource0 = sparkSession
      .createDataset(spark.textFile("s3://<s3path>").sliding(4, 4).map {
        case Array(id, seq, _, qual) => (id, seq, qual)
      })
      .toDF("identifier", "sequence", "quality")

    val datasource1 = DynamicFrame(datasource0, glueContext)
    datasource1.show()
    datasource1.printSchema()
    Job.commit()
  }
}

Input:

@seq1
AGTCAGTCGAC
+
?@@FFBFFDDH
@seq2
CCAGCGTCTCG
+
?88ADA?BDF8

Output:

{"identifier": "@seq1", "sequence": "AGTCAGTCGAC", "quality": "?@@FFBFFDDH"}
{"identifier": "@seq2", "sequence": "CCAGCGTCTCG", "quality": "?88ADA?BDF8"}
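
To see what the four-line grouping and the pattern match do without needing Spark or Glue, here is a minimal plain-Scala sketch. It uses the standard library's grouped(4) on an in-memory list as a stand-in for sliding(4, 4) on an RDD. The names FastqRecord, FastqSketch and parseFastq are hypothetical and exist only for this illustration. In this sketch an incomplete trailing block is silently skipped; that is a choice made here and may not match how the Spark job behaves on a truncated file.

```scala
// Hypothetical record type for illustration; not part of Glue or Spark.
final case class FastqRecord(identifier: String, sequence: String, quality: String)

object FastqSketch {
  // Split the lines into consecutive, non-overlapping blocks of four and keep
  // (identifier, sequence, quality). The third line of each block ("+") is dropped.
  // Blocks with fewer than four lines do not match the pattern and are skipped.
  def parseFastq(lines: Seq[String]): List[FastqRecord] =
    lines.grouped(4).collect {
      case Seq(id, seq, _, qual) => FastqRecord(id, seq, qual)
    }.toList

  def main(args: Array[String]): Unit = {
    // Same sample input as in the answer above
    val input = Seq(
      "@seq1", "AGTCAGTCGAC", "+", "?@@FFBFFDDH",
      "@seq2", "CCAGCGTCTCG", "+", "?88ADA?BDF8"
    )
    parseFastq(input).foreach(println)
  }
}
```

Running FastqSketch.main prints one FastqRecord per four input lines, with the same three fields as the DynamicFrame output above.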