In Qubole notebook I am trying to get certain string from API response. It seems to be working just fine for sample data but fails when I use the full set. Spark version: 2.3.1; Scala version: 2.11; scalaj-http version: 2.4.2
import scalaj.http._
import org.apache.spark.sql.Column
import org.apache.spark.sql.functions._
// Sample data to test the code
val locationIds = Seq(
("178277"),("178278"),("178279")).toDF("search_destination_id")
def getResponse(request_url: String): String = Http(request_url).asString.body
def getGeoType(col: Column): Column = {
udf { gaiaId: String =>
val request_url = "https://...some_url..." + gaiaId + "...some_parameters"
val response = getResponse(request_url)
val pattern = "(?<=type\":\").*?(?=\")".r
pattern.findFirstIn(response).getOrElse("no match")
}.apply(col)
}
val result = locationIds.withColumn("geo_type", getGeoType($"search_destination_id"))
result.show
Example of expected results which I get for sample data:
+---------------------+-------------------+
|search_destination_id| geo_type|
+---------------------+-------------------+
| 178277|multi_city_vicinity|
| 178278|multi_city_vicinity|
| 178279|multi_city_vicinity|
+---------------------+-------------------+
Error message I get when I try to process all search_destination_id I have to process:
at java.lang.Class.getSimpleBinaryName(Class.java:1450)
at java.lang.Class.getSimpleName(Class.java:1309)
at org.apache.spark.sql.catalyst.expressions.ScalaUDF.udfErrorMessage$lzycompute(ScalaUDF.scala:1048)
at org.apache.spark.sql.catalyst.expressions.ScalaUDF.udfErrorMessage(ScalaUDF.scala:1047)
at org.apache.spark.sql.catalyst.expressions.ScalaUDF.doGenCode(ScalaUDF.scala:1000)
at org.apache.spark.sql.catalyst.expressions.Expression$$anonfun$genCode$2.apply(Expression.scala:107)
at org.apache.spark.sql.catalyst.expressions.Expression$$anonfun$genCode$2.apply(Expression.scala:104)
at scala.Option.getOrElse(Option.scala:121)
at org.apache.spark.sql.catalyst.expressions.Expression.genCode(Expression.scala:104)
at org.apache.spark.sql.catalyst.expressions.Alias.genCode(namedExpressions.scala:142)
at org.apache.spark.sql.execution.ProjectExec$$anonfun$6.apply(basicPhysicalOperators.scala:60)
at org.apache.spark.sql.execution.ProjectExec$$anonfun$6.apply(basicPhysicalOperators.scala:60)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.immutable.List.foreach(List.scala:381)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
at scala.collection.immutable.List.map(List.scala:285)
at org.apache.spark.sql.execution.ProjectExec.doConsume(basicPhysicalOperators.scala:60)
at org.apache.spark.sql.execution.CodegenSupport$class.consume(WholeStageCodegenExec.scala:181)
at org.apache.spark.sql.execution.FilterExec.consume(basicPhysicalOperators.scala:85)
at org.apache.spark.sql.execution.FilterExec.doConsume(basicPhysicalOperators.scala:206)
at org.apache.spark.sql.execution.CodegenSupport$class.consume(WholeStageCodegenExec.scala:181)
at org.apache.spark.sql.execution.InputAdapter.consume(WholeStageCodegenExec.scala:354)
at org.apache.spark.sql.execution.InputAdapter.doProduce(WholeStageCodegenExec.scala:383)
at org.apache.spark.sql.execution.CodegenSupport$$anonfun$produce$1.apply(WholeStageCodegenExec.scala:88)
at org.apache.spark.sql.execution.CodegenSupport$$anonfun$produce$1.apply(WholeStageCodegenExec.scala:83)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
at org.apache.spark.sql.execution.CodegenSupport$class.produce(WholeStageCodegenExec.scala:83)
at org.apache.spark.sql.execution.InputAdapter.produce(WholeStageCodegenExec.scala:354)
at org.apache.spark.sql.execution.FilterExec.doProduce(basicPhysicalOperators.scala:125)
at org.apache.spark.sql.execution.CodegenSupport$$anonfun$produce$1.apply(WholeStageCodegenExec.scala:88)
at org.apache.spark.sql.execution.CodegenSupport$$anonfun$produce$1.apply(WholeStageCodegenExec.scala:83)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
at org.apache.spark.sql.execution.CodegenSupport$class.produce(WholeStageCodegenExec.scala:83)
at org.apache.spark.sql.execution.FilterExec.produce(basicPhysicalOperators.scala:85)
at org.apache.spark.sql.execution.ProjectExec.doProduce(basicPhysicalOperators.scala:45)
at org.apache.spark.sql.execution.CodegenSupport$$anonfun$produce$1.apply(WholeStageCodegenExec.scala:88)
at org.apache.spark.sql.execution.CodegenSupport$$anonfun$produce$1.apply(WholeStageCodegenExec.scala:83)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
at org.apache.spark.sql.execution.CodegenSupport$class.produce(WholeStageCodegenExec.scala:83)
at org.apache.spark.sql.execution.ProjectExec.produce(basicPhysicalOperators.scala:35)
at org.apache.spark.sql.execution.BaseLimitExec$class.doProduce(limit.scala:70)
at org.apache.spark.sql.execution.LocalLimitExec.doProduce(limit.scala:97)
at org.apache.spark.sql.execution.CodegenSupport$$anonfun$produce$1.apply(WholeStageCodegenExec.scala:88)
at org.apache.spark.sql.execution.CodegenSupport$$anonfun$produce$1.apply(WholeStageCodegenExec.scala:83)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
at org.apache.spark.sql.execution.CodegenSupport$class.produce(WholeStageCodegenExec.scala:83)
at org.apache.spark.sql.execution.LocalLimitExec.produce(limit.scala:97)
at org.apache.spark.sql.execution.WholeStageCodegenExec.doCodeGen(WholeStageCodegenExec.scala:524)
at org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:576)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
at org.apache.spark.sql.execution.SparkPlan.getByteArrayRdd(SparkPlan.scala:247)
at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:337)
at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:38)
at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$collectFromPlan(Dataset.scala:3280)
at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2489)
at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2489)
at org.apache.spark.sql.Dataset$$anonfun$52.apply(Dataset.scala:3261)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:78)
at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3260)
at org.apache.spark.sql.Dataset.head(Dataset.scala:2489)
at org.apache.spark.sql.Dataset.take(Dataset.scala:2705)
at org.apache.spark.sql.Dataset.showString(Dataset.scala:256)
at org.apache.spark.sql.Dataset.show(Dataset.scala:725)
at org.apache.spark.sql.Dataset.show(Dataset.scala:684)
at org.apache.spark.sql.Dataset.show(Dataset.scala:693)
... 96 elided
Caused by: java.lang.StringIndexOutOfBoundsException: String index out of range: -21
at java.lang.String.substring(String.java:1931)
at java.lang.Class.getSimpleBinaryName(Class.java:1448)
... 174 more
Please advise what is causing this error and how I can avoid it.