java.io.FileNotFoundException: Not found cos://mybucket.myservicename/checkpoint/offsets


I'm trying to use Spark Structured Streaming 2.3 to read data from Kafka (IBM Message Hub) and save it to IBM Cloud Object Storage (COS) on an IBM Analytics Engine 1.1 cluster.

After creating the cluster, ssh into it:

$ ssh [email protected]

Create a jaas.conf file, which Spark needs in order to talk to Message Hub:

$ cat << EOF > jaas.conf
KafkaClient {
    org.apache.kafka.common.security.plain.PlainLoginModule required
    serviceName="kafka"
    username="<<MY_MESSAGEHUB_USERNAME>>"
    password="<<MY_MESSAGEHUB_PASSWORD>>";
};
EOF

This creates a jaas.conf file in the /home/wce/clsadmin directory on the cluster.

Create a utility script to start the Spark shell (for now we use just one executor):

$ cat << EOF > start_spark.sh
spark-shell --master local[1] \
       --files jaas.conf \
       --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.3.0 \
       --conf "spark.driver.extraJavaOptions=-Djava.security.auth.login.config=jaas.conf" \
       --conf "spark.executor.extraJavaOptions=-Djava.security.auth.login.config=jaas.conf" \
       --num-executors 1 --executor-cores 1 
EOF
$ chmod +x start_spark.sh

Start the Spark session using the utility script:

$ ./start_spark.sh

Now, inside the Spark shell, read the Kafka (Message Hub) stream. Ensure you change kafka.bootstrap.servers to match the brokers listed in your service credentials:

val df = spark.readStream.
                format("kafka").
                option("kafka.bootstrap.servers", "kafka03-prod01.messagehub.services.eu-de.bluemix.net:9093,kafka04-prod01.messagehub.services.eu-de.bluemix.net:9093,kafka01-prod01.messagehub.services.eu-de.bluemix.net:9093,kafka02-prod01.messagehub.services.eu-de.bluemix.net:9093,kafka05-prod01.messagehub.services.eu-de.bluemix.net:9093").
                option("subscribe", "transactions_load").
                option("kafka.security.protocol", "SASL_SSL").
                option("kafka.sasl.mechanism", "PLAIN").
                option("kafka.ssl.protocol", "TLSv1.2").
                option("kafka.ssl.enabled.protocols", "TLSv1.2").
                load()
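
The Kafka key and value columns arrive as binary, so if you want human-readable output you can cast them to strings first (an optional, illustrative step; the remaining steps keep using df as-is):

// Optional: view the Kafka key and value as strings instead of raw bytes.
val readable = df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")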

We can test that the connection works:

df.writeStream.format("console").start()

After a while you should see some data printed to the console, e.g.

-------------------------------------------                                     
Batch: 1
-------------------------------------------
+--------------------+--------------------+-----------------+---------+------+--------------------+-------------+
|                 key|               value|            topic|partition|offset|           timestamp|timestampType|
+--------------------+--------------------+-----------------+---------+------+--------------------+-------------+
|[35 34 30 33 36 3...|[7B 22 49 6E 76 6...|transactions_load|        7| 84874|2018-08-22 15:42:...|            0|
|[35 34 30 33 36 3...|[7B 22 49 6E 76 6...|transactions_load|        7| 84875|2018-08-22 15:42:...|            0|
|[35 34 30 38 33 3...|[7B 22 49 6E 76 6...|transactions_load|        7| 84876|2018-08-22 15:42:...|            0|
...
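
If you want to stop the console output before moving on, one option (a small aside, not part of the original steps) is to capture the StreamingQuery handle that start() returns:

// Keep a handle on the console query so it can be stopped later.
val consoleQuery = df.writeStream.format("console").start()

// ... once the console output confirms the connection is working ...
consoleQuery.stop()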

Set up the Spark session so that it can access the COS instance:

val accessKey = "MY_COS_ACCESS_KEY"
val secretKey = "MY_COS_SECRET_KEY"
val bucketName = "streamingdata"

// arbitrary name for referring to these COS settings from this code
val serviceName = "myservicename"

sc.hadoopConfiguration.set(s"fs.cos.${serviceName}.access.key", accessKey)
sc.hadoopConfiguration.set(s"fs.cos.${serviceName}.secret.key", secretKey)
sc.hadoopConfiguration.set(s"fs.cos.${serviceName}.endpoint", "s3.eu-geo.objectstorage.service.networklayer.com")

We can test that COS is set up correctly by writing a dummy file:

import spark.implicits._

val data = sc.parallelize(Array(1,2,3,4,5))
data.toDF.write.format("csv").save(s"cos://${bucketName}.${serviceName}/data.txt")

spark.read.csv(s"cos://${bucketName}.${serviceName}/data.txt").collect()

If reading and writing to COS were successful, the above test should output something like the following:

res7: Array[org.apache.spark.sql.Row] = Array([1], [2], [3], [4], [5])

Now try to write the streaming DataFrame to COS:

df.
  writeStream.
  format("parquet").
  option("checkpointLocation", s"cos://${bucketName}.${serviceName}/checkpoint").
  option("path",               s"cos://${bucketName}.${serviceName}/data").
  start()
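
Note that start() returns immediately and the query runs in the background, so any failure surfaces asynchronously. As an aside (a sketch using the standard StreamingQuery API, not part of the original post), capturing the returned handle makes the error easier to inspect:

val cosQuery = df.
  writeStream.
  format("parquet").
  option("checkpointLocation", s"cos://${bucketName}.${serviceName}/checkpoint").
  option("path",               s"cos://${bucketName}.${serviceName}/data").
  start()

// cosQuery.exception is populated once the query fails ...
cosQuery.exception.foreach(e => println(e.getMessage))
// ... or block and let the error be rethrown as a StreamingQueryException:
// cosQuery.awaitTermination()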

For me, this fails with:

scala> 18/08/22 15:43:06 WARN COSAPIClient: file status checkpoint/offsets returned 404
18/08/22 15:43:06 ERROR MicroBatchExecution: Query [id = 78c8c4af-f21d-457d-b5a7-56559e180634, runId = 50e8759e-0293-4fab-9b73-dd4811423b37] terminated with error
java.io.FileNotFoundException: Not found cos://streamingdata.myservicename/checkpoint/offsets
    at com.ibm.stocator.fs.cos.COSAPIClient.getFileStatus(COSAPIClient.java:628)
    at com.ibm.stocator.fs.ObjectStoreFileSystem.getFileStatus(ObjectStoreFileSystem.java:486)
    at com.ibm.stocator.fs.ObjectStoreFileSystem.listStatus(ObjectStoreFileSystem.java:360)
    at com.ibm.stocator.fs.ObjectStoreFileSystem.listStatus(ObjectStoreFileSystem.java:336)
    at org.apache.spark.sql.execution.streaming.HDFSMetadataLog$FileSystemManager.list(HDFSMetadataLog.scala:412)
    at org.apache.spark.sql.execution.streaming.HDFSMetadataLog.getLatest(HDFSMetadataLog.scala:231)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.org$apache$spark$sql$execution$streaming$MicroBatchExecution$$populateStartOffsets(MicroBatchExecution.scala:180)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply$mcV$sp(MicroBatchExecution.scala:124)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:121)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:121)
    at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:271)
    at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1.apply$mcZ$sp(MicroBatchExecution.scala:121)
    at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:56)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:117)
    at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:279)
    at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:189)

Is this an issue with Stocator, or with Spark Structured Streaming?

Accepted answer (Chris Snow):

Changing to the S3AFileSystem appears to have resolved the issue:

sc.hadoopConfiguration.set("fs.s3.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem");
sc.hadoopConfiguration.set("fs.s3a.access.key", accessKey)
sc.hadoopConfiguration.set("fs.s3a.secret.key", secretKey)
sc.hadoopConfiguration.set("fs.s3a.endpoint", "s3.eu-geo.objectstorage.service.networklayer.com")

val s3Url = s"s3a://${bucketName}/"

...

df.
  writeStream.
  format("parquet").
  option("checkpointLocation", s"${s3Url}/checkpoint").
  option("path",               s"${s3Url}/data").
  start()
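
Once a batch or two has been written, the output can be read back (as with the earlier COS smoke test) to confirm the streaming write is working. A quick check, reusing the s3Url defined above:

// Read back a few rows of the parquet output written by the streaming query.
spark.read.parquet(s"${s3Url}/data").show(5)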

It looks as though the issue is with the Stocator driver.


UPDATE 23 Aug 2018: This issue was fixed in Stocator v1.0.24, but the Stocator installed on IBM Analytics Engine hasn't yet been updated to that version.
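
If you want to confirm which Stocator build a cluster is actually running, one quick check (a sketch; the exact path and jar name will vary by install) is to ask the JVM where it loaded the Stocator filesystem class from:

// Prints the jar that provides the Stocator filesystem classes; the file name
// usually includes the version number.
println(classOf[com.ibm.stocator.fs.ObjectStoreFileSystem].
  getProtectionDomain.getCodeSource.getLocation)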