I'm maintaining a data pipeline for a YouTube channel, and I'm trying to insert live-streaming chat data into a BigQuery table. I successfully inserted into this same table before, but now an error occurs even though I'm running the same code (which is really weird). I'm using Cloud Functions as my environment, and the code is as follows:
from google.cloud import bigquery
from functions_framework import http
def insert_to_bigquery(table_id, data):
    """Stream a batch of row dicts into a BigQuery table via the insertAll API.

    Args:
        table_id: Name of the destination table inside the
            'test_youtube_data' dataset.
        data: List of JSON-serializable row dicts; keys should match the
            table schema (unknown keys are ignored, invalid rows skipped).
    """
    client = bigquery.Client()
    dataset_id = 'test_youtube_data'
    # Resolve the table and fetch its schema so insert_rows_json can
    # validate rows. NOTE(review): client.dataset() is deprecated in newer
    # google-cloud-bigquery releases; f"{project}.{dataset}.{table}" strings
    # are the modern form — confirm library version before switching.
    table_ref = client.dataset(dataset_id).table(table_id)
    table = client.get_table(table_ref)
    # insert into table
    errors = client.insert_rows_json(
        table=table,
        json_rows=data,
        ignore_unknown_values=True,
        skip_invalid_rows=True)
    print(f'Insert response: {errors}')
    if errors:
        print(f'Encountered errors while inserting rows: {errors}')
    else:
        print(f'Successfully inserted {len(data)} rows.')
        # Bug fix: the original printed data[10] unconditionally, which
        # raises IndexError whenever fewer than 11 rows were inserted.
        if len(data) > 10:
            print(data[10])
def get_streams_from_channel(channel_id, limit=10): # get data
    """Fetch up to `limit` recent stream IDs for the given YouTube channel.

    Implementation elided in this snippet. Returns an iterable of stream
    identifiers consumed by store_new_channel_streams — presumably YouTube
    video IDs usable in a watch URL; TODO confirm against the full source.
    """
    ...
def _first_emote(message):
    """Return the first emote dict from `message`, or None if it has none."""
    emotes = message.get('emotes')
    return emotes[0] if emotes else None


def insert_chat_messages_to_bigquery(stream_id, chat_messages):
    """Flatten chat messages into schema-shaped row dicts and insert them.

    Args:
        stream_id: Identifier of the stream the messages belong to.
        chat_messages: Iterable of chat-message dicts (chat-downloader
            shaped — see the example row in the question).
    """
    rows_to_insert = []
    for message in chat_messages:
        # Hoisted: the original repeated the "has at least one emote"
        # check three times inline.
        emote = _first_emote(message)
        author = message['author']
        data = {
            "stream_id": stream_id,
            "time_in_seconds": message['time_in_seconds'],
            "action_type": message['action_type'],
            "message": message['message'],
            # Only the first emote is persisted; any additional emotes on
            # the message are dropped (same as the original behavior).
            "emotes_id": emote['id'] if emote else None,
            "emotes_name": emote['name'] if emote else None,
            "emotes_is_custom_emoji": emote['is_custom_emoji'] if emote else None,
            "message_id": message['message_id'],
            "timestamp": message['timestamp'],
            "time_text": message['time_text'],
            "author_name": author['name'],
            # Assumes every author has at least one image — TODO confirm;
            # an empty 'images' list would raise IndexError here.
            "author_images_url": author['images'][0]['url'],
            "author_images_id": author['images'][0]['id'],
            "author_id": author['id'],
            "message_type": message['message_type']
        }
        rows_to_insert.append(data)
    insert_to_bigquery("temp_stream_data", rows_to_insert) # insert
@http
def store_new_channel_streams(request):
    """Cloud Function HTTP entry point: ingest chat logs for recent streams.

    Fetches up to `limit` recent stream IDs for the hard-coded channel,
    downloads each stream's chat messages, and inserts them into BigQuery.

    Returns:
        A (body, status) tuple as expected by functions-framework.
    """
    channel_id = 'UCjv4bfP_67WLuPheS-Z8Ekg'
    limit = 3
    new_streams = get_streams_from_channel(channel_id, limit)
    stored = 0
    for stream_id in new_streams:
        try:
            print(stream_id)
            chat_messages = get_chat_messages(stream_id)
            insert_chat_messages_to_bigquery(stream_id, chat_messages)
            stored += 1
        except VideoNotFound:
            # A deleted/private video must not abort the whole batch.
            print(f"Video {stream_id} not found, skipping.")
    # Bug fix: the original reported len(new_streams) even when some
    # streams were skipped, over-counting what was actually stored.
    return f"Stored {stored} new channel streams to BigQuery.", 200
and I got an SSLError as follows:
SSLError: HTTPSConnectionPool(host='bigquery.googleapis.com', port=443): Max retries exceeded with url: /bigquery/v2/projects/triple-voyage-377203/datasets/test_youtube_data/tables/stream_data/insertAll?prettyPrint=false (Caused by SSLError(SSLEOFError(8, 'EOF occurred in violation of protocol (_ssl.c:2396)')))
Log screenshot: image
I also ran the same code in Google Colab, and I got the same error.
I'm pretty sure my data matches the table schema in BigQuery; an example row is as follows:
{'stream_id': 'https://www.youtube.com/watch?v=rBi1_Ggs39U', 'time_in_seconds': -1837, 'action_type': 'add_chat_item', 'message': ':_MIZUKILazy::_MIZUKILazy::_MIZUKILazy:', 'emotes_id': 'UCjv4bfP_67WLuPheS-Z8Ekg/fd89YfbtJ8_j8wTi2Ki4Ag', 'emotes_name': ':_MIZUKILazy:', 'emotes_is_custom_emoji': True, 'message_id': 'ChwKGkNQdjB1c2FucElBREZiM0R3Z1FkQ0NzQzdn', 'timestamp': 1690097427605763, 'time_text': '-30:37', 'author_name': '倉鼠', 'author_images_url': 'https://yt4.ggpht.com/ytc/AOPolaSpe3eNbZY01DHEOdApwgJOwcIuhGj2VIouEvBaHA', 'author_images_id': 'source', 'author_id': 'UCbpvt7VyEpZwJNfwpf09aFw', 'message_type': 'text_message'}, {'stream_id':...
But I still can't see any data in bigquery table.
I really don't know what's wrong here — thanks for reading my question!