How to read Druid data using a JDBC driver with Spark?


How can I read data from Druid using Spark and the Avatica JDBC driver? This is the Avatica JDBC documentation.

Reading data from Druid using Python and the jaydebeapi module, I succeeded with the code below.

$ python
import jaydebeapi

# Connect to the Druid broker's Avatica endpoint with the Avatica JDBC driver
conn = jaydebeapi.connect("org.apache.calcite.avatica.remote.Driver",
                          "jdbc:avatica:remote:url=http://0.0.0.0:8082/druid/v2/sql/avatica/",
                          {"user": "druid", "password": "druid"},
                          "/root/avatica-1.17.0.jar")
cur = conn.cursor()
cur.execute("SELECT * FROM INFORMATION_SCHEMA.TABLES")
cur.fetchall()

The output is:

[('druid', 'druid', 'wikipedia', 'TABLE'),
('druid', 'INFORMATION_SCHEMA', 'COLUMNS', 'SYSTEM_TABLE'),
('druid', 'INFORMATION_SCHEMA', 'SCHEMATA', 'SYSTEM_TABLE'),
('druid', 'INFORMATION_SCHEMA', 'TABLES', 'SYSTEM_TABLE'),
('druid', 'sys', 'segments', 'SYSTEM_TABLE'),
('druid', 'sys', 'server_segments', 'SYSTEM_TABLE'),
('druid', 'sys', 'servers', 'SYSTEM_TABLE'),
('druid', 'sys', 'supervisors', 'SYSTEM_TABLE'),
('druid', 'sys', 'tasks', 'SYSTEM_TABLE')]  -> default tables

But I want to read it using Spark and JDBC.

I tried the code below with Spark, but there is a problem.

$ pyspark --jars /root/avatica-1.17.0.jar

df = spark.read.format('jdbc') \
    .option('url', 'jdbc:avatica:remote:url=http://0.0.0.0:8082/druid/v2/sql/avatica/') \
    .option("dbtable", 'INFORMATION_SCHEMA.TABLES') \
    .option('user', 'druid') \
    .option('password', 'druid') \
    .option('driver', 'org.apache.calcite.avatica.remote.Driver') \
    .load()

The output is:

Traceback (most recent call last):
  File "<stdin>", line 8, in <module>
  File "/root/spark-2.4.4-bin-hadoop2.7/python/pyspark/sql/readwriter.py", line 172, in load
    return self._df(self._jreader.load())
  File "/root/spark-2.4.4-bin-hadoop2.7/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 1257, in __call__
  File "/root/spark-2.4.4-bin-hadoop2.7/python/pyspark/sql/utils.py", line 63, in deco
    return f(*a, **kw)
  File "/root/spark-2.4.4-bin-hadoop2.7/python/lib/py4j-0.10.7-src.zip/py4j/protocol.py", line 328, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o2999.load.
: java.sql.SQLException: While closing connection
...
Caused by: java.lang.RuntimeException: com.fasterxml.jackson.databind.exc.UnrecognizedPropertyException: Unrecognized field "rpcMetadata" (class org.apache.calcite.avatica.remote.Service$CloseConnectionResponse), not marked as ignorable (0 known properties: ])
 at [Source: {"response":"closeConnection","rpcMetadata":{"response":"rpcMetadata","serverAddress":"172.18.0.7:8082"}}
; line: 1, column: 46]
...
Caused by: com.fasterxml.jackson.databind.exc.UnrecognizedPropertyException: Unrecognized field "rpcMetadata" (class org.apache.calcite.avatica.remote.Service$CloseConnectionResponse), not marked as ignorable (0 known properties: ])
 at [Source: {"response":"closeConnection","rpcMetadata":{"response":"rpcMetadata","serverAddress":"172.18.0.7:8082"}}
; line: 1, column: 46] 
...

There are 2 answers

Prashant Choudhary:

I tried it with spark-shell:

./bin/spark-shell --driver-class-path avatica-1.17.0.jar --jars avatica-1.17.0.jar

val jdbcDF = spark.read.format("jdbc")
    .option("url", "jdbc:avatica:remote:url=http://0.0.0.0:8082/druid/v2/sql/avatica/")
    .option("dbtable", "INFORMATION_SCHEMA.TABLES")
    .option("user", "druid")
    .option("password", "druid")
    .load()

SEUNGFWANI:

I found another way to solve this problem: I used spark-druid-connector to connect Druid with Spark.

But I had to change some of its code to make it work in my environment (a usage sketch follows the environment list below).

This is my environment:

  • spark: 2.4.4
  • scala: 2.11.12
  • python: python 3.6.8
  • druid:
    • zookeeper: 3.5
    • druid: 0.17.0
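
For reference, this is a minimal sketch of how such a connector is typically used from pyspark. The format name "druid", the option keys, and the jar path are assumptions for illustration, not the connector's confirmed API; check the spark-druid-connector README for the exact names.

from pyspark.sql import SparkSession

# Hypothetical usage sketch: the format name and option keys below are
# assumptions, not the confirmed spark-druid-connector API.
spark = (SparkSession.builder
         .appName("druid-read-example")
         .config("spark.jars", "/root/spark-druid-connector.jar")  # assumed jar location
         .getOrCreate())

df = (spark.read
      .format("druid")                     # assumed data source short name
      .option("broker", "0.0.0.0:8082")    # assumed option: Druid broker host:port
      .option("datasource", "wikipedia")   # assumed option: Druid datasource name
      .load())

df.printSchema()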

However, it still has a problem:

  • If you use spark-druid-connector at least once, every subsequent SQL query such as spark.sql("select * from tmep_view") is routed through this connector's planner.
  • But if you use the DataFrame API, e.g. df.distinct().count(), there is no problem (see the sketch below). I haven't solved this yet.
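
Below is a short sketch of the two access patterns described above, assuming a DataFrame df that was already loaded through the connector as in the earlier sketch (the view name tmep_view is the one used in this answer).

# Assuming `df` was loaded through the connector as sketched above.
df.createOrReplaceTempView("tmep_view")

# Problematic path: SQL on the temp view is routed through the connector's planner.
spark.sql("select * from tmep_view").show()

# Working path: plain DataFrame API calls cause no problems.
print(df.distinct().count())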