I created a demo problem while testing an auto-scaling Dask Distributed implementation on Kubernetes and AWS, and I'm not sure I'm tackling the problem correctly.
My scenario: given an MD5 hash of a string (representing a password), find the original string. I hit three main problems.
A) the parameter space is massive and trying to create a dask bag with 2.8211099e+12 members caused memory issues (hence the 'explode' function you'll see in the sample code below).
B) Clean exit on an early find. I think using take(1, npartitions=-1) will achieve this, but I wasn't sure. Originally I raised an exception, raise Exception("%s is your answer" % test_str), which worked but felt "dirty".
C) Given this is long running and sometimes workers or AWS boxes die, how would it be best to store progress?
Example code:
import distributed
import math
import dask.bag as db
import hashlib
import dask
import os
if os.environ.get('SCHED_URL', False):
    sched_url = os.environ['SCHED_URL']
    client = distributed.Client(sched_url)
    versions = client.get_versions(True)
    dask.set_options(get=client.get)
difficulty = 'easy'
settings = {
    'hard': (hashlib.md5('welcome1'.encode('utf-8')).hexdigest(), 'abcdefghijklmnopqrstuvwxyz1234567890', 8),
    'mid-hard': (hashlib.md5('032abgh'.encode('utf-8')).hexdigest(), 'abcdefghijklmnop1234567890', 7),
    'mid': (hashlib.md5('b08acd'.encode('utf-8')).hexdigest(), '0123456789abcdef', 6),
    'easy': (hashlib.md5('0812'.encode('utf-8')).hexdigest(), '0123456789', 4)
}
hashed_pw, keyspace, max_guess_length = settings[difficulty]
def is_pw(guess):
    return hashlib.md5(guess.encode('utf-8')).hexdigest() == hashed_pw
def guess(n):
    guess = ''
    size = len(keyspace)
    while n > 0:
        n -= 1
        guess += keyspace[n % size]
        n = math.floor(n / size)
    return guess
def make_exploder(num_partitions, max_val):
    """Creates a function that maps an int to a range, based on the maximum value aimed for
    and the number of partitions that are expected.
    Used in this code with map and flatten to take a short list,
    i.e. 1->1e6, to a large one, 1->1e20, in dask rather than on the host machine."""
    steps = math.ceil(max_val / num_partitions)
    def explode(partition):
        return range(partition * steps, partition * steps + steps)
    return explode
max_val = len(keyspace) ** max_guess_length  # How many possible password permutations
partitions = math.floor(max_val / 100)
partitions = partitions if partitions < 100000 else 100000  # Split into a maximum of 100000 partitions. Too many partitions caused issues, memory I think.
exploder = make_exploder(partitions, max_val)  # Sort of the opposite of a reduce. make_exploder(10, 100)(3) => [30, 31, ..., 39]. Expands the problem back into the full problem space.
print("max val: %s, partitions:%s" % (max_val, partitions))
search = db.from_sequence(range(partitions), npartitions=partitions).map(exploder).flatten().filter(lambda i: i <= max_val).map(guess).filter(is_pw)
search.take(1,npartitions=-1)
I find 'easy' works well locally, and 'mid-hard' works well on our AWS cluster of 6 to 8 m4.2xlarge instances. But so far I haven't got 'hard' to work.
On the memory issues (A): this depends strongly on how you arrange your elements into a bag. If each element is in its own partition then yes, this will certainly kill everything. 1e12 partitions is very expensive. I recommend keeping the number of partitions in the thousands or tens of thousands.
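As a rough sketch of what "thousands of partitions" can look like for this search, each partition can be described by a (start, stop) pair of indices instead of by its individual members, so nothing large is ever materialized on the client. The helper name chunk_bounds is hypothetical, and the 1-based indexing is an assumption chosen to match the guess(n) mapping in the question.

```python
def chunk_bounds(max_val, n_chunks):
    """Split the index range 1..max_val into at most n_chunks (start, stop) pairs.

    stop is exclusive, so each pair can be passed straight to range().
    """
    step = -(-max_val // n_chunks)  # integer ceiling division
    return [(start, min(start + step, max_val + 1))
            for start in range(1, max_val + 1, step)]


# Ten chunks covering indices 1..100.
bounds = chunk_bounds(100, 10)
print(len(bounds), bounds[0], bounds[-1])
```

Each worker then iterates over range(start, stop) itself, so the size of the task graph depends on the number of chunks rather than on the size of the parameter space.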
If you want a clean exit on an early find (B), then I recommend not using dask.bag, but instead using the concurrent.futures interface and in particular the as_completed iterator.
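A minimal sketch of the early-exit pattern follows. To keep it runnable without a cluster it uses the standard library's concurrent.futures with a thread pool; that substitution is an assumption made for illustration. distributed.Client offers an analogous submit method and distributed provides its own as_completed, so the same loop shape should carry over. The names index_to_guess, search_chunk, and find_password are hypothetical, and the target is the 'easy' setting from the question.

```python
import hashlib
from concurrent.futures import ThreadPoolExecutor, as_completed

KEYSPACE = "0123456789"
TARGET = hashlib.md5("0812".encode("utf-8")).hexdigest()


def index_to_guess(n):
    """Same mapping as guess() in the question: integer index -> candidate string."""
    chars = []
    size = len(KEYSPACE)
    while n > 0:
        n -= 1
        chars.append(KEYSPACE[n % size])
        n //= size
    return "".join(chars)


def search_chunk(start, stop):
    """Return the matching candidate with index in [start, stop), or None."""
    for n in range(start, stop):
        candidate = index_to_guess(n)
        if hashlib.md5(candidate.encode("utf-8")).hexdigest() == TARGET:
            return candidate
    return None


def find_password(max_val, chunk_size, max_workers=4):
    """Submit one task per chunk and stop as soon as any chunk reports a match."""
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        futures = [
            pool.submit(search_chunk, start, min(start + chunk_size, max_val + 1))
            for start in range(1, max_val + 1, chunk_size)
        ]
        for future in as_completed(futures):
            result = future.result()
            if result is not None:
                # Cancel whatever has not started yet; running chunks finish normally.
                for other in futures:
                    other.cancel()
                return result
    return None


print(find_password(10 ** 4, 500))
```

The point of as_completed is that results are handled in the order they finish, so the client can stop submitting or cancel outstanding work on the first hit without raising an exception inside a task.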
On workers or AWS boxes dying (C): Dask should be resilient to this as long as you can guarantee that the scheduler survives. If you use the concurrent futures interface rather than dask bag then you can also track intermediate results on the client process.
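One possible way to track progress on the client, sketched under assumptions: record each finished chunk in a small JSON file as its future completes, and skip recorded chunks on restart. The file format and the names load_done, save_done, and run_with_checkpoint are hypothetical, and the standard library's thread pool again stands in for a distributed client so the example runs anywhere.

```python
import json
import os
import tempfile
from concurrent.futures import ThreadPoolExecutor, as_completed


def load_done(path):
    """Read the set of finished (start, stop) chunks, or an empty set if no file exists."""
    if not os.path.exists(path):
        return set()
    with open(path) as f:
        return {tuple(pair) for pair in json.load(f)}


def save_done(path, done):
    """Write the finished chunks to a temp file, then swap it in with os.replace."""
    tmp = path + ".tmp"
    with open(tmp, "w") as f:
        json.dump(sorted(done), f)
    os.replace(tmp, path)


def run_with_checkpoint(work, chunks, path, max_workers=4):
    """Run work(start, stop) for every chunk not already recorded in the checkpoint."""
    done = load_done(path)
    pending = [chunk for chunk in chunks if chunk not in done]
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        futures = {pool.submit(work, *chunk): chunk for chunk in pending}
        for future in as_completed(futures):
            future.result()  # re-raises if the chunk failed, leaving it unrecorded
            done.add(futures[future])
            save_done(path, done)
    return done, len(pending)


# Demo: the second run finds every chunk in the checkpoint and does no work.
with tempfile.TemporaryDirectory() as tmpdir:
    checkpoint = os.path.join(tmpdir, "progress.json")
    chunks = [(1, 11), (11, 21), (21, 31)]
    _, first_run = run_with_checkpoint(lambda start, stop: None, chunks, checkpoint)
    _, second_run = run_with_checkpoint(lambda start, stop: None, chunks, checkpoint)
    print(first_run, second_run)
```

Rewriting the whole file after every chunk is fine for a few thousand chunks; for many more, an append-only log or a small database would likely be a better fit.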