Apache-Spark Problem with extract data for .json files

42 views Asked by At

I have a script in Pyspark to read a folder with .json archives and put in the registers data collected using a TMDB's API, but an error is going on and I don't know how to resolve it.

My script is in Portuguese because I'm Brazilian.

The error:

Traceback (most recent call last):
File "/home/gwillye/Documentos/Lab Ex/dadosProdutora.py", line 41, in \<module\>
dados_json = dados_json.withColumn("produtora", col("produtora").cast(StringType()))
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/gwillye/spark/python/lib/pyspark.zip/pyspark/sql/dataframe.py", line 5170, in withColumn
File "/home/gwillye/spark/python/lib/py4j-0.10.9.7-src.zip/py4j/java_gateway.py", line 1322, in __call__
File "/home/gwillye/spark/python/lib/pyspark.zip/pyspark/errors/exceptions/captured.py", line 185, in deco
pyspark.errors.exceptions.captured.AnalysisException: \[UNRESOLVED_COLUMN.WITH_SUGGESTION\] A column or function parameter with name `produtora` cannot be resolved. Did you mean one of the following? \[`genero`, `id`, `profissao`, `notaMedia`, `numeroVotos`\].;
'Project \[\_corrupt_record#8, anoFalecimento#9, anoLancamento#10L, anoNascimento#11, anoTermino#12, genero#13, generoArtista#14, id#15, nomeArtista#16, notaMedia#17, numeroVotos#18L, personagem#19, profissao#20, tempoMinutos#21, tituloOriginal#22, tituloPincipal#23, titulosMaisConhecidos#24, cast('produtora as string) AS produtora#42\]
\+- Relation \[\_corrupt_record#8,anoFalecimento#9,anoLancamento#10L,anoNascimento#11,anoTermino#12,genero#13,generoArtista#14,id#15,nomeArtista#16,notaMedia#17,numeroVotos#18L,personagem#19,profissao#20,tempoMinutos#21,tituloOriginal#22,tituloPincipal#23,titulosMaisConhecidos#24\] json'''

My example about the .json files:

\[
{
"id": "tt0061797",
"tituloPincipal": "The Madcap Island",
"tituloOriginal": "Hyokkori hy\\u00f4tan-jima",
"anoLancamento": 1967.0,
"tempoMinutos": 61.0,
"genero": "Animation",
"notaMedia": 5.6,
"numeroVotos": 10,
"generoArtista": "actress",
"personagem": null,
"nomeArtista": "Chinatsu Nakayama",
"anoNascimento": 1948.0,
"anoFalecimento": null,
"profissao": "actress,soundtrack,music_department",
"titulosMaisConhecidos": "tt1734449,tt0204339,tt0202407,tt0081881",
"produtora": null
},
{
"id": "tt0061797",
"tituloPincipal": "The Madcap Island",
"tituloOriginal": "Hyokkori hy\\u00f4tan-jima",
"anoLancamento": 1967.0,
"tempoMinutos": 61.0,
"genero": "Animation",
"notaMedia": 5.6,
"numeroVotos": 10,
"generoArtista": "actor",
"personagem": null,
"nomeArtista": "Arihiro Fujimura",
"anoNascimento": 1934.0,
"anoFalecimento": 1982.0,
"profissao": "actor,soundtrack",
"titulosMaisConhecidos": "tt0060926,tt0102587,tt0298290,tt0062224",
"produtora": null
},

... \]

My script is:

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, udf
from pyspark.sql.types import StringType
import requests

def get_tmdb_data(movie_id):
    api_key = 'f905807b2900febaccfb008c16388168'
    url = f'https://api.themoviedb.org/3/movie/{movie_id}?api_key={api_key}&language=pt-BR'
    response = requests.get(url)
    if response.status_code == 200:
        return response.json()
    else:
        return None

def get_produtora_udf(movie_id):
    tmdb_data = get_tmdb_data(movie_id)
    if tmdb_data and 'production_companies' in tmdb_data:
        produtoras = tmdb_data['production_companies']
        if produtoras:
            for produtora in produtoras:
                if 'name' in produtora:
                    return produtora['name']
    return None

def processar_registro(row):
    movie_id = row['id']
    produtora_atual = row['produtora']
    if produtora_atual is None:
        nova_produtora = get_produtora_udf(movie_id)
        return row.asDict() | {'produtora': nova_produtora}
    else:
        return row

if __name__ == "__main__":
    spark = SparkSession.builder.appName("ExemploPyspark").getOrCreate()

    # Leitura dos arquivos JSON na pasta 'JSON/'
    dados_json = spark.read.json("JSON/")

    # Adiciona a coluna 'produtora' com valor null
    dados_json = dados_json.withColumn("produtora", col("produtora").cast(StringType()))

    # Aplica a função processar_registro para substituir 'null' nas colunas 'produtora'
    dados_processados = dados_json.rdd.map(processar_registro).toDF()

    # Exibe o DataFrame resultante
    dados_processados.show(truncate=False)

    # Encerra a sessão Spark
    spark.stop()
0

There are 0 answers