The following code can read from and write back to S3 on the fly, following the discussion here:
from smart_open import open
import os

bucket_dir = "s3://my-bucket/annotations/"

with open(os.path.join(bucket_dir, "in.tsv.gz"), "rb") as fin:
    with open(
        os.path.join(bucket_dir, "out.tsv.gz"), "wb"
    ) as fout:
        for line in fin:
            l = [i.strip() for i in line.decode().split("\t")]
            string = "\t".join(l) + "\n"
            fout.write(string.encode())
The issue is that after a few thousand lines have been processed (a few minutes), I get a "connection reset by peer" error:
raise ProtocolError("Connection broken: %r" % e, e)
urllib3.exceptions.ProtocolError: ("Connection broken: ConnectionResetError(104, 'Connection reset by peer')", ConnectionResetError(104, 'Connection reset by peer'))
What can I do? I tried calling fout.flush() after every fout.write(string.encode()), but it doesn't work well. Is there a better approach to processing a .tsv file with about 200 million lines?
I implemented a producer-consumer approach on top of smart_open (sketched below). This mitigates the "Connection broken" error, but doesn't resolve it completely in some cases.
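Roughly, the setup looks like this minimal sketch: one thread reads from S3 and pushes cleaned rows onto a bounded queue, while the main thread pops rows and writes them back through smart_open. The bucket path and queue size are placeholders, not my exact values.

# Minimal sketch of the producer-consumer setup, assuming one reader thread
# (producer) and the main thread as the writer (consumer), with a bounded
# queue so the reader cannot run far ahead of the S3 upload.
import os
import queue
import threading

from smart_open import open

bucket_dir = "s3://my-bucket/annotations/"  # placeholder bucket/prefix
SENTINEL = None  # marks end of input


def produce(q):
    # Read lines from S3 and push cleaned rows onto the queue.
    with open(os.path.join(bucket_dir, "in.tsv.gz"), "rb") as fin:
        for line in fin:
            fields = [i.strip() for i in line.decode().split("\t")]
            q.put("\t".join(fields) + "\n")
    q.put(SENTINEL)


def consume(q):
    # Pop rows off the queue and write them back to S3.
    with open(os.path.join(bucket_dir, "out.tsv.gz"), "wb") as fout:
        while True:
            item = q.get()
            if item is SENTINEL:
                break
            fout.write(item.encode())


q = queue.Queue(maxsize=10_000)  # bounded so memory stays flat
reader = threading.Thread(target=produce, args=(q,), daemon=True)
reader.start()
consume(q)
reader.join()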