I'm trying to write a Python script that takes a JSON file and a number of CSV files from Google Drive, and analyzes and manipulates their data using ONLY Spark Core.
The purpose of this code is to create tuples with data from both the JSON and the CSV files. Both files share one piece of information: the first item of each record (json and csv), which is a string. In the example below, that item is "1111". This item is a "code" (e.g. 1111, 123, 12345...), present in each line of the csv files and associated with an "item1" in the json file. There may be many lines in the csv files under the same "code", so the script aggregates this information as you'll see ahead.
Data in the json file comes in the format:
[{"item1":"1111","item2":"aaaaaa","item3":"bbbbbbb"},{"item1":"123",...},...
and csv:
1111;SDS111;99.999;2.222
123;SDS222;333.111;4.111
...
The printed tuples should combine data from the json and csv files, united by this "code". The goal is to write something that, for each element in the json file, identifies the "code" and, if it is equal to the first item of a csv line, prints a tuple with data from both files, something like:
('aaaaaa', 'bbbbbbb', '1111', 'SDS111', (99.999, 2.222))
('ccccccccc', 'ddddd', '123', 'SDS222', (333.111, 4.111))
...
For the time being, the code I have manages to read the json file and create tuples from it, and to process the data stored in the csv files:
import json
import pyspark

sc = pyspark.SparkContext('local[*]')
try:
    # Load the json file and turn each record into an (item1, item2, item3) tuple.
    with open("/content/drive/../file.json") as f:
        data = json.load(f)
    rdd = sc.parallelize(data)
    json_data = rdd.map(lambda item: (item["item1"], item["item2"], item["item3"]))

    # Read the csv files and keep only well-formed lines (four ';'-separated fields).
    lines = sc.textFile('/content/drive/../file*.csv', 5) \
        .map(lambda line: line.strip()) \
        .filter(lambda line: len(line.split(';')) == 4)

    # Build (code, (value1, value2)) pairs and keep the per-code maximum of each value.
    csv_data = lines.map(lambda line: line.split(';')) \
        .map(lambda values: (values[0], (float(values[2]), float(values[3])))) \
        .reduceByKey(lambda a, b: (max(a[0], b[0]), max(a[1], b[1]))) \
        .sortByKey()

    # combination part???
    sc.stop()
except Exception as e:
    print(e)
    sc.stop()
I'm really struggling with this part of the script, and would very much appreciate some help! Feel free to ask me for more info if needed.
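In case it helps to frame what I'm after, this is roughly the combination step I have in mind (just a sketch, not tested, and it would have to run before sc.stop()):

# Key the json tuples by the "code" so both RDDs share the same key.
json_by_code = json_data.map(lambda t: (t[0], (t[1], t[2])))   # (code, (item2, item3))

# Join on that key; each element becomes (code, ((item2, item3), (value1, value2))).
joined = json_by_code.join(csv_data)

# Reshape into the flat tuple and print it.
# (To also include the second csv column, e.g. 'SDS111', csv_data would need to keep it.)
result = joined.map(lambda kv: (kv[1][0][0], kv[1][0][1], kv[0], kv[1][1]))
for row in result.collect():
    print(row)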
Why use low-level commands? Hardly anyone uses RDD calls unless they have to. I suggest using the PySpark DataFrame and Spark SQL APIs instead. The solution is wicked simple.
This code runs in Azure Databricks. First, write a sample json file.
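Something like this works (the DBFS path is arbitrary for the demo, and the second record's values are taken from the sample output in the question):

# Sample json in the format from the question.
json_sample = '[{"item1":"1111","item2":"aaaaaa","item3":"bbbbbbb"},{"item1":"123","item2":"ccccccccc","item3":"ddddd"}]'
dbutils.fs.put("/tmp/demo_file.json", json_sample, True)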
Next, create a sample csv file.
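Same idea, using the rows from the question:

# Sample csv rows, semicolon-delimited.
csv_sample = "1111;SDS111;99.999;2.222\n123;SDS222;333.111;4.111\n"
dbutils.fs.put("/tmp/demo_file1.csv", csv_sample, True)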
Read one or more csv files and create a temporary Hive view called "tmp_csv_data".
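For example (the column names code/sds/val1/val2 are names I made up, since the csv files have no header row):

# Read every matching semicolon-delimited csv file and expose it as a temp view.
csv_df = (spark.read
          .schema("code STRING, sds STRING, val1 DOUBLE, val2 DOUBLE")
          .option("sep", ";")
          .csv("/tmp/demo_file*.csv"))
csv_df.createOrReplaceTempView("tmp_csv_data")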
Read one or more json files and create a temporary Hive view called "tmp_json_data".
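The multiLine option is needed because each file holds one json array rather than one object per line:

# Read the json file(s) and expose them as a temp view.
json_df = (spark.read
           .option("multiLine", "true")
           .json("/tmp/demo_file*.json"))
json_df.createOrReplaceTempView("tmp_json_data")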
Let's take a quick look at the csv view.
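For example:

spark.sql("SELECT * FROM tmp_csv_data").show()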
Let's take a quick look at the json view. Please note that the malformed JSON file creates two blank records.
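Those blank records can be filtered out when querying, e.g.:

spark.sql("SELECT * FROM tmp_json_data WHERE item1 IS NOT NULL").show()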
The final step joins the two datasets into a combined dataframe. Just use the dataframe's write method to create the final file.
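A plain ANSI SQL join on the shared code does the work; the column names follow the demo schema above, and the output path and format are placeholders:

# Join json and csv data on the shared code, skipping the blank json records.
combined = spark.sql("""
    SELECT j.item2, j.item3, j.item1 AS code, c.sds, c.val1, c.val2
    FROM tmp_json_data AS j
    JOIN tmp_csv_data AS c
      ON j.item1 = c.code
    WHERE j.item1 IS NOT NULL
""")
combined.show()

# Persist the combined dataframe.
combined.write.mode("overwrite").json("/tmp/demo_combined")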
This code is straightforward and leverages the ANSI SQL skills that most people already have.