How to correctly write Python concurrent.futures code with retries? (Tableau API)

315 views Asked by At

I am new to concurrent.futures. I am trying to refresh 3 workbooks in Tableau:

    workbook_dict = {
        "workbook1": "workbook_id1",
        "workbook2": "workbook_id2",
        "workbook3": "workbook_id3",
    }

    @retry(tries=3, delay=5, backoff=0.2)
    def refresh_workbooks(self, workbook_dict):
        """Refresh the given workbooks one after another, waiting on each job.

        ``workbook_dict`` maps a workbook name to its workbook id; only the
        ids are used here. The ``@retry`` decorator wraps the whole method,
        so a failure on any workbook re-runs the loop from the first one.
        """
        for workbook_id in workbook_dict.values():
            target = self.server.workbooks.get_by_id(workbook_id)

            # refresh() starts the refresh and returns a job object.
            job = self.server.workbooks.refresh(target)

            # wait_for_job() checks the job status and raises an exception
            # if the job fails or the timeout is reached.
            # https://tableau.github.io/server-client-python/docs/api-ref#jobswait_for_job
            self.server.jobs.wait_for_job(job, timeout=1000)

This base code works, and each workbook takes about 15 minutes, so it takes about 45 minutes in total to complete. And if it fails on the second workbook, it starts over from scratch.

I want to use concurrent.futures to speed this up and check wait_for_job. If any workbook fails, I want to refresh only that workbook and retry a few times before throwing an error. First question: the code below is my attempt to use concurrent.futures, but the printed data object is None. I think the code failed to execute wait_for_job. Why is that?


import concurrent.futures
import urllib.request
import tableauserverclient as TSC

def refresh_workbook(self, workbook_id):
    """Start a refresh for a single workbook without waiting for it.

    Looks the workbook up by ``workbook_id`` and returns whatever
    ``self.server.workbooks.refresh`` hands back for it.
    """
    workbooks_api = self.server.workbooks
    target = workbooks_api.get_by_id(workbook_id)
    return workbooks_api.refresh(target)

def refresh_workbooks(self, workbook_dict):
    """Start a refresh for every workbook and collect the resulting job ids.

    ``workbook_dict`` maps workbook name -> workbook id. Returns a dict
    mapping each workbook name to the ``id`` attribute of the object
    returned by ``workbooks.refresh``. Nothing is waited on here, and any
    exception raised by the server calls propagates to the caller.
    """
    job_dict = {}
    for name, wb_id in workbook_dict.items():
        target = self.server.workbooks.get_by_id(wb_id)
        started = self.server.workbooks.refresh(target)
        job_dict[name] = started.id
        # Progress output: shows the jobs started so far.
        print(job_dict)
    return job_dict


def wait_workbook(self, job_id, timeout=None):
    """Block until the given job finishes, via ``self.server.jobs.wait_for_job``.

    The value produced by ``wait_for_job`` is discarded, so this function
    always returns ``None``; exceptions from ``wait_for_job`` propagate.
    """
    jobs_api = self.server.jobs
    jobs_api.wait_for_job(job_id, timeout=timeout)


# Start all refreshes first; `jobs` maps workbook name -> job id.
test = TableauServerConnection()
workbook_dict = {
    "workbook1": "workbook_id1",
    "workbook2": "workbook_id2",
    "workbook3": "workbook_id3",
}
jobs = test.refresh_workbooks(workbook_dict)


# Wait on every started job in parallel, one worker thread per job.
with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:

    # Remember which job id each future is waiting on.
    future_to_job = {
        executor.submit(test.wait_workbook, job_id, 1800): job_id
        for job_id in jobs.values()
    }
    for future in concurrent.futures.as_completed(future_to_job):
        job = future_to_job[future]
        try:
            data = future.result()
            # Why did I get None in data?
            print('data', data)
        except Exception as exc:
            # Can I spin up a new future here, and what is the correct syntax?

            print('%s generated an exception: %s' % (job, exc))
        else:
            print('job', job)

Second question: if I add a retry in the exception handler, can I submit a new future object there?

1

There are 1 answers

0
jean-loup Monnier On

You might want to try something like this. I'm using a while loop and getting the latest result. Then, for each finished task, I can add a new task — in your case, only when a task fails.

This also allows you to avoid submitting all the tasks at the beginning. Depending on the number of tasks you want to run, it might be better to add X tasks at the beginning (where X > max_workers) and then add one more each time a task finishes.

# Wait on all jobs concurrently and resubmit a wait for any job whose wait
# raised, up to a fixed number of retries per job.
with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:

    # Pending futures, each mapped to the job id it is waiting on.
    future_to_job = {
        executor.submit(test.wait_workbook, job_id, 1800): job_id
        for job_id in jobs.values()
    }
    # Cap the retries so a permanently failing job cannot loop forever.
    retries_left = {job_id: 3 for job_id in jobs.values()}

    while future_to_job:
        # Wake up when at least one future finishes, or after 0.25 s with
        # an empty `done` set, in which case the loop simply polls again.
        done, _ = concurrent.futures.wait(
            future_to_job, timeout=0.25,
            return_when=concurrent.futures.FIRST_COMPLETED)

        for future in done:
            # Always remove the finished future, on success as well as on
            # failure; otherwise `while future_to_job` never terminates.
            job = future_to_job.pop(future)
            try:
                # `data` is whatever wait_workbook returns; it is None unless
                # wait_workbook returns the result of wait_for_job.
                data = future.result()
            except Exception as exc:
                print('%s generated an exception: %s' % (job, exc))
                if retries_left[job] > 0:
                    retries_left[job] -= 1
                    # NOTE(review): this waits again on the SAME job id; it
                    # does not start a new refresh. To re-run a failed
                    # refresh, submit a callable that triggers the refresh
                    # again before waiting -- confirm the desired behaviour.
                    retry_future = executor.submit(test.wait_workbook, job, 1800)
                    future_to_job[retry_future] = job
            else:
                print('data', data)
                print('job', job)

For the None part: your wait_workbook function does not return anything, so future.result() is None. It should return the result of wait_for_job:

def wait_workbook(self, job_id, timeout=None):
    """Block until the given job finishes and return the wait's result.

    Returns whatever ``self.server.jobs.wait_for_job`` returns, so a future
    running this function carries that value in ``future.result()``.
    """
    finished = self.server.jobs.wait_for_job(job_id, timeout=timeout)
    return finished

hope it helps :)