Combining data from JSON and CSV files using Spark Core in Python


I'm trying to code a Python script that takes a JSON file and a number of CSV files from a Google Drive folder, and analyzes and manipulates their data using ONLY Spark Core.

The purpose of this code is to create tuples with data from both the JSON and CSV files. The two files share one piece of information in common: the first item of each record (json and csv), which is a string. In the example below, this item is "1111". This item is a "code" (e.g. 1111, 123, 12345...), present in each line of the csv files and stored as "item1" in the json file. There may be many lines in the csv files under the same "code", so the script aggregates that information as you'll see ahead.

Data in the json file comes in the format:

[{"item1":"1111","item2":"aaaaaa","item3":"bbbbbbb"},{"item1":"123",...},...

and csv:

1111;SDS111;99.999;2.222
123;SDS222;333.111;4.111
...

The printed tuples would be a combination of data from the json and csv files joined on this "code". The goal is to write something that, for each element in the json file, identifies the "code" and, if that "code" equals the first item of a csv line, prints a tuple with data from both files, something like:

('aaaaaa', 'bbbbbbb', '1111', 'SDS111', (99.999, 2.222))
('ccccccccc', 'ddddd', '123', 'SDS222', (333.111, 4.111))
...

For the time being, the code I have manages to read the json file and create tuples from it, and to process the data stored in the csv files:

import json
import pyspark

sc = pyspark.SparkContext('local[*]')
try:
  with open("/content/drive/../file.json") as f:
    data = json.load(f)
    rdd = sc.parallelize(data)
    json_data = rdd.map(lambda item: (item["item1"], item["item2"], item["item3"]))

  lines = sc.textFile('/content/drive/../file*.csv', 5) \
          .map(lambda line: line.strip()) \
          .filter(lambda line: len(line.split(';')) == 4)

  # build (code, (value1, value2)) pairs and keep the maximum of each value per code
  csv_data = lines.map(lambda line: line.split(';')) \
          .map(lambda values: (values[0], float(values[2]), float(values[3]))) \
          .map(lambda kv: (kv[0], (kv[1], kv[2]))) \
          .reduceByKey(lambda a, b: (max(a[0], b[0]), max(a[1], b[1]))) \
          .sortByKey()

  #combination part???

  sc.stop()
except Exception as e: 
  print(e)
  sc.stop()
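
I suspect the combination part needs to key both RDDs on the shared "code" and join them. This is only an untested sketch of what I have in mind (it would go where the "#combination part???" comment is, before sc.stop()); also, the csv label (e.g. 'SDS111') is currently dropped in csv_data, so it would have to be carried through as well to match the desired output exactly:

# untested sketch: key the json tuples by the shared "code" and join with the csv pairs
json_by_code = json_data.map(lambda t: (t[0], (t[1], t[2])))  # (code, (item2, item3))
joined = json_by_code.join(csv_data)                          # (code, ((item2, item3), (value1, value2)))
result = joined.map(lambda kv: (kv[1][0][0], kv[1][0][1], kv[0], kv[1][1]))

for row in result.collect():
    print(row)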

I'm really struggling with this part of the script and would very much appreciate some help! Feel free to ask me for more info if needed.

1 Answer

CRAFTY DBA:

Why use low-level commands? Hardly anyone uses RDD calls unless they have to. I suggest using the PySpark DataFrame API instead. The solution is wicked simple.

This code was written in Azure Databricks. First, write a sample json file.

#
#  1 - Create sample json file
#

# sample json data
data = """
[
{"item1":"111","item2":"aaaaaa","item3":"bbbbbbb"},
{"item1":"123","item2":"cccccc","item3":"ddddddd"}
]
"""

# write file
path = "/tmp/stackoverflow.json"
dbutils.fs.put(path, data, True)

Create sample csv file.

#
#  2 - Create sample csv file
#

# sample csv data
data = """
111;SDS111;99.999;2.222
123;SDS222;333.111;4.111
"""


# write file
path = "/tmp/stackoverflow.csv"
dbutils.fs.put(path, data, True)

Read one or more csv files and create a temporary hive view called "tmp_csv_data".

#
#  3 - Read csv data
#

# file location
path = "/tmp/*.csv"

# make dataframe
df1 = spark.read.format("csv") \
  .option("header", "false") \
  .option("sep", ";") \
  .schema("item1 string, label1 string, value1 float, value2 float") \
  .load(path)

# make temp hive view
df1.createOrReplaceTempView("tmp_csv_data")

Read one or more json files and create a temporary hive view called "tmp_json_data".

#
#  4 - Read json data
#

# file location
path = "/tmp/*.json"

# make dataframe
df2 = spark.read.format("json") \
  .schema("item1 string, item2 string, item3 string") \
  .load(path)

# make temp hive view
df2.createOrReplaceTempView("tmp_json_data")

Let's take a quick look at the csv view.

[screenshot: the tmp_csv_data view]
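
In a Databricks notebook, an equivalent quick check is a small query against the view, for example:

# peek at the csv view
spark.sql("SELECT * FROM tmp_csv_data").show()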

Let's take a quick look at the json view. Please note that the malformed JSON file creates two blank records.

[screenshot: the tmp_json_data view, including the two blank records]
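
Again, a quick query shows the same thing; the blank records appear as NULL rows and can be filtered out, for example:

# peek at the json view, skipping the blank (NULL) records
spark.sql("SELECT * FROM tmp_json_data WHERE item1 IS NOT NULL").show()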

The following image shows the two datasets in a combined dataframe. Just use the dataframe's write method to create the final file.

[screenshot: the combined dataframe]
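
The combination itself is just an ANSI SQL join on the shared code column. Here is a sketch of how the combined dataframe above can be produced and written out (the output path is only an example):

#
#  5 - Combine the two views and write the result
#

# join json and csv data on the shared code
df_combined = spark.sql("""
    SELECT j.item2,
           j.item3,
           j.item1,
           c.label1,
           c.value1,
           c.value2
    FROM   tmp_json_data AS j
    JOIN   tmp_csv_data  AS c
      ON   j.item1 = c.item1
    WHERE  j.item1 IS NOT NULL
""")

# show the combined result
df_combined.show()

# write the final file (example location)
df_combined.write.mode("overwrite").json("/tmp/combined_output")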

This code is straightforward and leverages the ANSI SQL skills that most people already have.