TL;DR: I have solved decrypting large-ish files (~50 GiB) in-memory using gpg.on_data = processor, but I'm still stumped on very large files (500+ GiB).
We have large mydata.gz.gpg files in blob storage and are trying to decrypt them in-memory, to avoid leaving sensitive data in any other storage location. So we stream them down from blob storage, decrypt them with python-gnupg, and stream them back up.
We are running on a Databricks cluster (only using the driver node for this), so we can spin up a fairly large VM for these jobs, but we would like to minimize the need to do that, particularly for files in the 1 TB+ range.
I've solved the issue of memory doubling during decryption by using the gpg.on_data callable, which is extremely handy. A 64 GB RAM driver is not at all unreasonable, so most of our issues are solved this way. It goes like this:
import gnupg
import psutil

class Streamer_5000():
    def __init__(self, blob_service_client, storage_account, container, blob, total_bytes):
        self.blob_service_client = blob_service_client
        self.storage_account = storage_account
        self.container = container
        self.blob = blob
        self.bytes_uploaded = 0
        self.total_bytes = total_bytes
        self.byte_block = b''

    def __call__(self, chunk):
        current_bytes = len(chunk)
        final = (current_bytes == 0)  # gnupg signals end-of-stream with an empty chunk
        # Perhaps do custom processing here
        new_name = '.'.join(self.blob.split(".")[:-1])  # strip the trailing .gpg
        # print(f"{current_bytes} bytes / chunk")  # gnupg hands the data over in 1024-byte (1 KiB) chunks
        self.bytes_uploaded += current_bytes
        pf = (float(self.bytes_uploaded) / float(self.total_bytes)) * 100.0
        rss_mib = psutil.Process().memory_info().rss / (1024**2)    # total RSS memory used on the driver
        mem_total_mib = psutil.virtual_memory().total / (1024**2)   # total memory available on the driver
        self.byte_block += chunk  # append the 1 KiB chunk to byte_block until it reaches 4 MiB, then upload
        if len(self.byte_block) >= (1024**2 * 4) or final:
            print(f"! Beaming up [ {current_bytes:.2f}/{self.bytes_uploaded:.2f} ] bytes of your data stream "
                  f"to /{self.storage_account}/{self.container}/Decrypted/{new_name}, Scotty !\n"
                  f"{pf:.2f}% done |\n"
                  f"{rss_mib:.2f}/{mem_total_mib:.2f} MiBs [{rss_mib / mem_total_mib * 100:.2f}%] driver RSS Memory Used")
            container_client = self.blob_service_client.get_container_client(container=self.container)
            container_client.upload_blob(name=f"Decrypted/{new_name}", data=self.byte_block,
                                         blob_type="AppendBlob", overwrite=False)
            self.byte_block = b''  # empty the byte block for the next 4 MiB chunk
        return False  # tell python-gnupg not to buffer the chunk internally
Within myDecryptorClass():

    ...
    def __decrypt_my_blobs(self):
        processor = Streamer_5000(self.blob_service_client, self.storageAccount, container, blob, blob_size)
        gpg = gnupg.GPG(verbose=False)
        gpg.on_data = processor
        input_stream = self.blob_client.download_blob(max_concurrency=12)  # would appreciate advice on this concurrency bit as well
        decrypted_data = gpg.decrypt(input_stream.readall(), always_trust=True, passphrase=self.passphrase_secret)
        print('Decryption Successful?', decrypted_data.ok)
        print('Decryption Status:', decrypted_data.status)
This works great! However, it seems that gnupg must read the entirety of the encrypted .gpg file before it can decrypt it, even when .on_data is used to stream the output back up as it is decrypted. With gpg.on_data = processor the overall memory load is greatly reduced, and it seems to max out at roughly the size of the file being decrypted. If that solves your problem, happy to help. However, files larger than any reasonably sized RAM, or the unreasonable processing time and cost of keeping a huge node up and running, prohibit this from being the final solution.
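As far as I can tell, the whole-file memory footprint on the input side comes from handing gpg.decrypt() the result of readall() as one bytes object, rather than from gpg itself. One idea I've sketched but not validated at the 500 GiB scale: wrap the download in a minimal file-like adapter and hand it to python-gnupg's decrypt_file(), which as far as I can tell accepts a file-like object, while keeping gpg.on_data = processor for the output side. ChunkStreamReader is just my own placeholder name, and it reuses the .chunks() iterator discussed in the next section:

import gnupg

class ChunkStreamReader:
    """Minimal read()-only adapter over an iterator of byte chunks,
    e.g. blob_client.download_blob().chunks()."""
    def __init__(self, chunk_iter):
        self._chunks = iter(chunk_iter)
        self._buffer = b''

    def read(self, size=-1):
        # Pull chunks until the requested size can be satisfied (or the stream ends).
        while size < 0 or len(self._buffer) < size:
            try:
                self._buffer += next(self._chunks)
            except StopIteration:
                break
        if size < 0:
            size = len(self._buffer)
        data, self._buffer = self._buffer[:size], self._buffer[size:]
        return data

# Inside __decrypt_my_blobs(), roughly:
gpg = gnupg.GPG(verbose=False)
gpg.on_data = processor  # same Streamer_5000 instance as above
downloader = self.blob_client.download_blob(max_concurrency=12)
decrypted_data = gpg.decrypt_file(ChunkStreamReader(downloader.chunks()),
                                  always_trust=True, passphrase=self.passphrase_secret)

I haven't convinced myself yet that this actually keeps the input side bounded for 500 GiB+ files, so treat it as a sketch rather than a working answer.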
download_blob().chunks() method
I've tried chunking massive files with the method below:
gpg = gnupg.GPG(verbose=False)
gpg.on_data = processor
for chunk in input_stream.chunks():  # instead of .readall()
    # Process the data here (anything can be done here - 'chunk' is a byte array)
    print("\nChunk length:", len(chunk))  # 4 MiB is the default
    decrypted_data = gpg.decrypt(chunk, always_trust=True, passphrase=self.passphrase_secret)
    print('Decryption Successful?', decrypted_data.ok)
    print('Decryption Status:', decrypted_data.status)
    print("Actual decrypted data:\n", decrypted_data)
With this solution, .chunks() reads in 4 MiB of data at a time (by default) and passes it to the .decrypt() method, and .on_data then seems to receive the decrypted output in 1 KiB chunks (I might be misunderstanding this). Again, when the byte array hits 4 MiB I upload it, to avoid spending a lot of time on uploads. This method works for files smaller than the chunk size (i.e. a single pass of the for loop, e.g. a 3.17 MiB file), but anything over that breaks: gpg apparently has to read the entirety of the encrypted file before it will decrypt anything, even though it emits the plaintext in 1024-byte chunks, so when it is handed an isolated 4 MiB chunk it fails to decrypt it at all.
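As I understand it, the failure isn't about the 4 MiB size itself but about each chunk not being a complete OpenPGP message on its own. Here is a rough sketch of what I think should work if I drop down to the gpg binary and pipe all the chunks through a single long-lived --decrypt process, so gpg sees one continuous stream (untested at this scale; the function and helper names are my own, and passing the passphrase on the command line is for illustration only):

import subprocess
import threading

def stream_decrypt(encrypted_chunks, passphrase, upload_block, block_size=4 * 1024**2):
    """Pipe encrypted chunks through one `gpg --decrypt` process and hand each
    decrypted block of ~block_size bytes to upload_block() as it arrives."""
    proc = subprocess.Popen(
        ["gpg", "--batch", "--yes", "--pinentry-mode", "loopback",
         "--passphrase", passphrase,   # illustration only; a passphrase file/fd is safer
         "--decrypt"],
        stdin=subprocess.PIPE,
        stdout=subprocess.PIPE,
    )

    def feed():
        # Writer thread: push the downloaded chunks into gpg's stdin, then close
        # stdin so gpg knows the OpenPGP message is complete.
        for chunk in encrypted_chunks:
            proc.stdin.write(chunk)
        proc.stdin.close()

    threading.Thread(target=feed, daemon=True).start()

    while True:
        block = proc.stdout.read(block_size)
        if not block:
            break
        upload_block(block)  # e.g. append to the Decrypted/... AppendBlob as above

    return proc.wait() == 0

# e.g. stream_decrypt(input_stream.chunks(), self.passphrase_secret, my_uploader)

That sidesteps python-gnupg entirely, though, and I'd prefer to stay with it if possible.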
So my question: it seems I can decrypt the file in chunks, but something in the full byte array is providing the cipher with information it needs to decrypt each chunk. Can I calculate a byte offset, increment over the array, and still successfully decrypt a massive file while doing so in small chunks?
I read this post and believe it's possible, but I'm a bit out of my league on decryption to understand what needs to be done: https://crypto.stackexchange.com/questions/52085/is-it-possible-to-decrypt-only-a-portion-of-a-gpg-encrypted-file
Possible solutions I've thought of:
Implement the codecs module.
Change readall(), which is blocking, to some other form of streaming read such as readinto(stream) that isn't blocking in the same way, but so far none of these have worked (sketched below).
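For the second idea, this is roughly what I had in mind (the Azure StorageStreamDownloader does expose a readinto(stream) method): let readinto() write into one end of an OS pipe while python-gnupg reads the encrypted bytes out of the other end, so neither side ever holds the whole file. Again an untested sketch; decrypt_via_pipe and the threading arrangement are just my own illustration:

import os
import threading

def decrypt_via_pipe(downloader, gpg, passphrase):
    """Feed StorageStreamDownloader.readinto() into one end of an OS pipe and
    let python-gnupg's decrypt_file() read the encrypted bytes from the other."""
    read_fd, write_fd = os.pipe()

    def feed():
        # The Azure SDK writes the blob into the pipe; closing the write end
        # tells gpg that the encrypted stream is complete.
        with os.fdopen(write_fd, "wb") as sink:
            downloader.readinto(sink)

    threading.Thread(target=feed, daemon=True).start()

    with os.fdopen(read_fd, "rb") as source:
        # gpg.on_data (e.g. the Streamer_5000 instance) still handles the
        # decrypted output as it arrives.
        return gpg.decrypt_file(source, always_trust=True, passphrase=passphrase)

So far I haven't gotten a variant of this working, which is why I'm wondering about the byte-offset approach instead.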