I'm running Cassandra and Spark Docker containers, defined as follows in my compose.yaml:
spark:
  image: docker.io/bitnami/spark:3.4
  container_name: spark_master
  environment:
    - SPARK_MODE=master
    - SPARK_RPC_AUTHENTICATION_ENABLED=no
    - SPARK_RPC_ENCRYPTION_ENABLED=no
    - SPARK_LOCAL_STORAGE_ENCRYPTION_ENABLED=no
    - SPARK_SSL_ENABLED=no
    - SPARK_USER=spark
  ports:
    - '8081:8081'
spark-worker:
  image: docker.io/bitnami/spark:3.4
  container_name: spark_worker
  environment:
    - SPARK_MODE=worker
    - SPARK_MASTER_URL=spark://spark:7077
    - SPARK_WORKER_MEMORY=1G
    - SPARK_WORKER_CORES=1
    - SPARK_RPC_AUTHENTICATION_ENABLED=no
    - SPARK_RPC_ENCRYPTION_ENABLED=no
    - SPARK_LOCAL_STORAGE_ENCRYPTION_ENABLED=no
    - SPARK_SSL_ENABLED=no
    - SPARK_USER=spark
cassandra:
  container_name: cassandra-container
  image: cassandra:latest
  ports:
    - "7000:7000"
    - "7001:7001"
    - "7199:7199"
    - "9042:9042"
    - "9160:9160"
  volumes:
    - cassandra-data:/var/lib/cassandra
  restart: always
I have some data ingested in Cassandra and I'm trying to read it with PySpark. I ran a hello-world program inside the Spark container and it runs successfully. I copied the PySpark files to the /opt/bitnami/spark/spark-script/ directory inside the container so I can run them, and I also placed spark-cassandra-connector_2.12-latest_version.jar in /opt/bitnami/spark/jars/ inside the container.
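(For reference, a sketch of that copy step; the host-side file names are placeholders, only the container name and destination paths are the ones described above.)

docker cp cassandra_spark.py spark_worker:/opt/bitnami/spark/spark-script/
docker cp spark-cassandra-connector_2.12-latest_version.jar spark_worker:/opt/bitnami/spark/jars/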
This is the Python file where I'm trying to connect to Cassandra and read data into PySpark:
from pyspark import SparkContext
from pyspark.sql import SparkSession
from pyspark.sql import SQLContext

# Define your Cassandra connection settings
cassandra_host = "127.0.0.1"  # Replace with the actual IP address of your Cassandra container
cassandra_port = "9042"  # Default Cassandra port
cassandra_keyspace = "mykey_space"  # Replace with your Cassandra keyspace
cassandra_table = "test_table"  # Replace with your Cassandra table

# Create a Spark session
spark = SparkSession.builder \
    .appName("CassandraConnectionTest") \
    .config("spark.cassandra.connection.host", cassandra_host) \
    .config("spark.cassandra.connection.port", cassandra_port) \
    .getOrCreate()

# Test the Cassandra connection
try:
    df = spark.read \
        .format("org.apache.spark.sql.cassandra") \
        .options(table=cassandra_table, keyspace=cassandra_keyspace) \
        .load()
    df.show(5)
    print("Cassandra connection test successful.")
except Exception as e:
    print(f"Error: {str(e)}")
    print("Cassandra connection test failed.")

# Stop the Spark session
spark.stop()
I run the script with the following command:
docker exec -it spark_worker /bin/bash -c "spark-submit --jars /opt/bitnami/spark/jars/spark-cassandra-connector_2.12-latest_version.jar /opt/bitnami/spark/spark-script/cassandra_spark.py"
The error I get is as follows:
23/09/17 17:45:10 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
23/09/17 17:45:11 INFO SparkContext: Running Spark version 3.4.1
23/09/17 17:45:12 INFO ResourceUtils: ==============================================================
23/09/17 17:45:12 INFO ResourceUtils: No custom resources configured for spark.driver.
23/09/17 17:45:12 INFO ResourceUtils: ==============================================================
23/09/17 17:45:12 INFO SparkContext: Submitted application: CassandraConnectionTest
23/09/17 17:45:12 INFO ResourceProfile: Default ResourceProfile created, executor resources: Map(cores -> name: cores, amount: 1, script: , vendor: , memory -> name: memory, amount: 1024, script: , vendor: , offHeap -> name: offHeap, amount: 0, script: , vendor: ), task resources: Map(cpus -> name: cpus, amount: 1.0)
23/09/17 17:45:12 INFO ResourceProfile: Limiting resource is cpu
23/09/17 17:45:12 INFO ResourceProfileManager: Added ResourceProfile id: 0
23/09/17 17:45:12 INFO SecurityManager: Changing view acls to: spark
23/09/17 17:45:12 INFO SecurityManager: Changing modify acls to: spark
23/09/17 17:45:12 INFO SecurityManager: Changing view acls groups to:
23/09/17 17:45:12 INFO SecurityManager: Changing modify acls groups to:
23/09/17 17:45:12 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: spark; groups with view permissions: EMPTY; users with modify permissions: spark; groups with modify permissions: EMPTY
23/09/17 17:45:12 INFO Utils: Successfully started service 'sparkDriver' on port 37035.
23/09/17 17:45:12 INFO SparkEnv: Registering MapOutputTracker
23/09/17 17:45:12 INFO SparkEnv: Registering BlockManagerMaster
23/09/17 17:45:12 INFO BlockManagerMasterEndpoint: Using org.apache.spark.storage.DefaultTopologyMapper for getting topology information
23/09/17 17:45:12 INFO BlockManagerMasterEndpoint: BlockManagerMasterEndpoint up
23/09/17 17:45:12 INFO SparkEnv: Registering BlockManagerMasterHeartbeat
23/09/17 17:45:12 INFO DiskBlockManager: Created local directory at /tmp/blockmgr-34cb5ed0-030e-41f5-846f-9f43fbb76d7b
23/09/17 17:45:12 INFO MemoryStore: MemoryStore started with capacity 434.4 MiB
23/09/17 17:45:12 INFO SparkEnv: Registering OutputCommitCoordinator
23/09/17 17:45:13 INFO JettyUtils: Start Jetty 0.0.0.0:4040 for SparkUI
23/09/17 17:45:13 INFO Utils: Successfully started service 'SparkUI' on port 4040.
23/09/17 17:45:13 INFO SparkContext: Added JAR file:///opt/bitnami/spark/jars/spark-cassandra-connector_2.12-latest_version.jar at spark://b168f719ac50:37035/jars/spark-cassandra-connector_2.12-latest_version.jar with timestamp 1694972711924
23/09/17 17:45:13 INFO Executor: Starting executor ID driver on host b168f719ac50
23/09/17 17:45:13 INFO Executor: Starting executor with user classpath (userClassPathFirst = false): ''
23/09/17 17:45:13 INFO Executor: Fetching spark://b168f719ac50:37035/jars/spark-cassandra-connector_2.12-latest_version.jar with timestamp 1694972711924
23/09/17 17:45:13 INFO TransportClientFactory: Successfully created connection to b168f719ac50/172.28.0.2:37035 after 46 ms (0 ms spent in bootstraps)
23/09/17 17:45:13 INFO Utils: Fetching spark://b168f719ac50:37035/jars/spark-cassandra-connector_2.12-latest_version.jar to /tmp/spark-dbc3271d-513b-43b0-9990-e813f05d0634/userFiles-2e5335c1-140a-4c90-a340-fdc663636446/fetchFileTemp7263685075750385356.tmp
23/09/17 17:45:13 INFO Executor: Adding file:/tmp/spark-dbc3271d-513b-43b0-9990-e813f05d0634/userFiles-2e5335c1-140a-4c90-a340-fdc663636446/spark-cassandra-connector_2.12-latest_version.jar to class loader
23/09/17 17:45:13 INFO Utils: Successfully started service 'org.apache.spark.network.netty.NettyBlockTransferService' on port 45859.
23/09/17 17:45:13 INFO NettyBlockTransferService: Server created on b168f719ac50:45859
23/09/17 17:45:13 INFO BlockManager: Using org.apache.spark.storage.RandomBlockReplicationPolicy for block replication policy
23/09/17 17:45:13 INFO BlockManagerMaster: Registering BlockManager BlockManagerId(driver, b168f719ac50, 45859, None)
23/09/17 17:45:13 INFO BlockManagerMasterEndpoint: Registering block manager b168f719ac50:45859 with 434.4 MiB RAM, BlockManagerId(driver, b168f719ac50, 45859, None)
23/09/17 17:45:13 INFO BlockManagerMaster: Registered BlockManager BlockManagerId(driver, b168f719ac50, 45859, None)
23/09/17 17:45:13 INFO BlockManager: Initialized BlockManager: BlockManagerId(driver, b168f719ac50, 45859, None)
23/09/17 17:45:14 INFO SharedState: Setting hive.metastore.warehouse.dir ('null') to the value of spark.sql.warehouse.dir.
23/09/17 17:45:14 INFO SharedState: Warehouse path is 'file:/opt/bitnami/spark/spark-warehouse'.
Error: An error occurred while calling o34.load.
: org.apache.spark.SparkClassNotFoundException: [DATA_SOURCE_NOT_FOUND] Failed to find the data source: org.apache.spark.sql.cassandra. Please find packages at `https://spark.apache.org/third-party-projects.html`.
at org.apache.spark.sql.errors.QueryExecutionErrors$.dataSourceNotFoundError(QueryExecutionErrors.scala:738)
at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSource(DataSource.scala:647)
at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSourceV2(DataSource.scala:697)
at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:208)
at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:172)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base/java.lang.reflect.Method.invoke(Method.java:568)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
at py4j.Gateway.invoke(Gateway.java:282)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
at java.base/java.lang.Thread.run(Thread.java:833)
Caused by: java.lang.ClassNotFoundException: org.apache.spark.sql.cassandra.DefaultSource
at java.base/java.net.URLClassLoader.findClass(URLClassLoader.java:445)
at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:592)
at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:525)
at org.apache.spark.sql.execution.datasources.DataSource$.$anonfun$lookupDataSource$5(DataSource.scala:633)
at scala.util.Try$.apply(Try.scala:213)
at org.apache.spark.sql.execution.datasources.DataSource$.$anonfun$lookupDataSource$4(DataSource.scala:633)
at scala.util.Failure.orElse(Try.scala:224)
at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSource(DataSource.scala:633)
... 15 more
Cassandra connection test failed.
23/09/17 17:45:16 INFO SparkContext: SparkContext is stopping with exitCode 0.
23/09/17 17:45:16 INFO SparkUI: Stopped Spark web UI at http://b168f719ac50:4040
23/09/17 17:45:16 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
23/09/17 17:45:16 INFO MemoryStore: MemoryStore cleared
23/09/17 17:45:16 INFO BlockManager: BlockManager stopped
23/09/17 17:45:16 INFO BlockManagerMaster: BlockManagerMaster stopped
23/09/17 17:45:16 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
23/09/17 17:45:16 INFO SparkContext: Successfully stopped SparkContext
23/09/17 17:45:16 INFO ShutdownHookManager: Shutdown hook called
23/09/17 17:45:16 INFO ShutdownHookManager: Deleting directory /tmp/spark-dbc3271d-513b-43b0-9990-e813f05d0634/pyspark-474abab0-e74c-44c9-b6b8-fd378e8ea964
23/09/17 17:45:16 INFO ShutdownHookManager: Deleting directory /tmp/spark-d094d4f5-8b21-4b18-84b5-df94e2381346
23/09/17 17:45:16 INFO ShutdownHookManager: Deleting directory /tmp/spark-dbc3271d-513b-43b0-9990-e813f05d0634
I always make sure that the containers are running on the same network. What am I doing wrong here? Any help would be highly appreciated.
docker network inspect my_network gives the following result:
[
    {
        "Name": "project_default",
        "Id": "c28118c6ba0511ec9f54a97a6d7b17113a75995d4bfed6d18fc80cf3616acb03",
        "Created": "2023-09-18T12:19:08.729378699Z",
        "Scope": "local",
        "Driver": "bridge",
        "EnableIPv6": false,
        "IPAM": {
            "Driver": "default",
            "Options": null,
            "Config": [
                {
                    "Subnet": "172.20.0.0/16",
                    "Gateway": "172.20.0.1"
                }
            ]
        },
        "Internal": false,
        "Attachable": false,
        "Ingress": false,
        "ConfigFrom": {
            "Network": ""
        },
        "ConfigOnly": false,
        "Containers": {
            "89396ef426b166e4d293011ef2c8c8ff60007d2baeba2734ef2e91a0cf7f60ec": {
                "Name": "cassandra-container",
                "EndpointID": "5a3804e47d659bcdf19a44477e85fc7c250b52b0375d342984c7d0033ff1b6cc",
                "MacAddress": "02:42:ac:14:00:04",
                "IPv4Address": "172.20.0.4/16",
                "IPv6Address": ""
            },
            "95cdbc38d6717b0318e81e33dfe990dc5da461d22f60976d132559d568c9365f": {
                "Name": "spark_worker",
                "EndpointID": "cc6b69c4c2f2de02965efc78a53dc92d2f06e4e2a0d1e9dd29fb161308032a26",
                "MacAddress": "02:42:ac:14:00:03",
                "IPv4Address": "172.20.0.3/16",
                "IPv6Address": ""
            },
            "e4864bdc0ff72f0185aceaf6f4844ef75ec45f6673cc776529eb8ca2f67943c6": {
                "Name": "spark_master",
                "EndpointID": "83c4b290a2730cec39849189b326513b33b70042832dacf9c5b47b2d92f90351",
                "MacAddress": "02:42:ac:14:00:02",
                "IPv4Address": "172.20.0.2/16",
                "IPv6Address": ""
            }
        },
        "Options": {},
        "Labels": {
            "com.docker.compose.network": "default",
            "com.docker.compose.project": "project",
            "com.docker.compose.version": "2.18.1"
        }
    }
]
The problem is in this line:

cassandra_host = "127.0.0.1"  # Replace with the actual IP address of your Cassandra container

127.0.0.1 is the address of the local "machine", and in this case that is the driver or worker Docker instance itself, not Cassandra. The best is to use the name of the Cassandra Docker instance, in your case cassandra. See the docker-compose documentation for more details about networking.
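A minimal sketch of the corrected connection settings, assuming the Compose service name cassandra is used as the host (the container name cassandra-container should also resolve on the default Compose network):

from pyspark.sql import SparkSession

# Point the connector at the Cassandra container by service name instead of 127.0.0.1,
# which resolves to the Spark container itself.
cassandra_host = "cassandra"   # Compose service name from compose.yaml
cassandra_port = "9042"        # native protocol port, unchanged

spark = SparkSession.builder \
    .appName("CassandraConnectionTest") \
    .config("spark.cassandra.connection.host", cassandra_host) \
    .config("spark.cassandra.connection.port", cassandra_port) \
    .getOrCreate()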