Data Portability API stops working after two weeks

20 views Asked by At

I am trying to use the Data Portability API to access user data, and process it on a nightly basis.

I have authorized the user and downloaded the file. This works great for about two weeks then it stops working and I have to request access of the user again.

How do I get a refresh token that will not expire and will allow me to get this export nightly?

from __future__ import print_function

import io
import os
import os.path
import time
import urllib.request
import zipfile
from collections.abc import Sequence
from typing import Generator

from google.auth.exceptions import RefreshError
from google.auth.transport.requests import Request
from google.oauth2 import credentials
from google.oauth2.credentials import Credentials
from google_auth_oauthlib.flow import InstalledAppFlow
from googleapiclient import discovery
from googleapiclient.discovery import build
from googleapiclient.errors import HttpError

# A list of Data Portability resources that we want to request.
RESOURCES = ['myactivity.youtube']

API_SERVICE_NAME = 'dataportability'
API_VERSION = 'v1beta'

# There is a one-to-one mapping between Data Portability resources and
# dataportability OAuth scopes. The scope code is the resource name plus a
# prefix.
SCOPE_PREFIX = 'https://www.googleapis.com/auth/dataportability.'

# After first authorizing a user, the access token and refresh token will be
# stored here.
USER_TOKENS = 'token.json'

# When modifying scopes remember to delete the file token.json.
# SCOPES = ['https://www.googleapis.com/auth/dataportability.myactivity.youtube']

# Installed application credentials from the Google Cloud console.
# BUG FIX: the original non-raw string contained invalid escape sequences
# (\D, \F, \G, \C — a SyntaxWarning on modern Python) and a stray trailing
# semicolon; a raw string yields the same path value safely.
APPLICATION_CREDENTIALS = r'C:\Development\FreeLance\GoogleSamples\Credentials\alpha.json'

def get_credentials(
        resources: Sequence[str],
) -> tuple[credentials.Credentials, Sequence[str]]:
    """Gets OAuth 2.0 credentials using an installed-app OAuth flow.

    Loads cached credentials from USER_TOKENS when present, refreshes an
    expired access token using the stored refresh token, and only falls back
    to the interactive consent flow when no usable credentials exist.

    NOTE(review): if the OAuth consent screen is still in "Testing"
    publishing status, Google expires refresh tokens after 7 days, forcing
    re-consent — publish the app to production for long-lived refresh
    tokens. Data Portability scopes may additionally grant time-limited
    (e.g. 30-day) access per user consent; confirm against current API docs.

    Args:
      resources: A list of Data Portability resource IDs (OAuth scope codes
        without the 'https://www.googleapis.com/auth/dataportability.'
        prefix).

    Returns:
      A tuple of (credentials, resources): credentials carrying an access
      token, and the resources the user was asked to grant.
    """
    creds = None
    scopes = [SCOPE_PREFIX + r for r in resources]

    if os.path.exists(USER_TOKENS):
        creds = Credentials.from_authorized_user_file(USER_TOKENS, scopes)

    # If there are no (valid) user credentials available, refresh or prompt.
    if not creds or not creds.valid:
        if creds and creds.expired and creds.refresh_token:
            try:
                creds.refresh(Request())
            except RefreshError:
                # BUG FIX: the refresh token itself was expired or revoked
                # (e.g. the 7-day testing-mode limit). Previously this
                # crashed; fall through to a fresh consent flow instead.
                creds = None
        if not creds or not creds.valid:
            flow = InstalledAppFlow.from_client_secrets_file(
                APPLICATION_CREDENTIALS, scopes)
            creds = flow.run_local_server(port=0)
        # Persist both the access and refresh tokens for the next run.
        with open(USER_TOKENS, 'w') as token:
            token.write(creds.to_json())

    # BUG FIX: the original wrapped this return in `try/except Warning`,
    # but a bare return cannot raise, and the handler referenced
    # attributes (warn.token, warn.new_scope) that Warning does not have —
    # dead, broken code removed. Partial consent can be detected by
    # inspecting creds.scopes if needed.
    return creds, resources

def get_api_interface(creds: credentials.Credentials,) -> discovery.Resource:
    """Builds and returns a client for the Data Portability API."""
    service = build(API_SERVICE_NAME, API_VERSION, credentials=creds)
    return service

def initiate_portability_archive(
        dataportability: discovery.Resource, resources: Sequence[str]
) -> str:
    """Initiates a portability archive job for the requested resources.

    Args:
      dataportability: Data Portability API client.
      resources: Data Portability resource IDs to export.

    Returns:
      The archive job ID reported by the server.
    """
    # BUG FIX: `resources` is already a sequence of resource IDs; the
    # original sent `[resources]`, i.e. a nested list
    # ([['myactivity.youtube']]) instead of a flat list of strings.
    initiate = dataportability.portabilityArchive().initiate(
        body={'resources': list(resources)}
    )
    print('\n', initiate.method, initiate.body, initiate.uri, '\n')
    initiate_response = initiate.execute()
    print(initiate_response, '\n')
    return initiate_response['archiveJobId']


def exponential_backoff(
        delay: float, max_delay: float, multiplier: float
) -> Generator[None, None, None]:
    """Yields forever, sleeping before each yield.

    The sleep starts at `delay` seconds and grows by `multiplier` after
    each yield, capped at `max_delay`.
    """
    current = delay
    while True:
        time.sleep(current)
        yield
        current = min(max_delay, current * multiplier)


def poll_get_portability_archive_state(data_portability: discovery.Resource, job_id: str
) -> Sequence[str]:
    """Polls getPortabilityArchiveState until the job leaves IN_PROGRESS.

    Re-executes the same request with exponential backoff (3s initial,
    capped at one hour) and returns the archive's download URLs once the
    server reports a terminal state.
    """
    request = data_portability.archiveJobs().getPortabilityArchiveState(
        name='archiveJobs/{}/portabilityArchiveState'.format(job_id)
    )
    print(
        'Polling archive status while server indicates state is in progress...\n',
        request.method,
        request.uri,
    )
    backoff = exponential_backoff(3, 3600, 1.5)
    while True:
        next(backoff)  # sleep before each poll
        response = request.execute()
        print(response)
        if response['state'] != 'IN_PROGRESS':
            return response['urls']


def reset_authorization(data_portability: discovery.Resource) -> None:
    """Calls data portability's reset endpoint."""
    request = data_portability.authorization().reset()
    print('\n', request.method, request.uri, '\n')
    response = request.execute()
    print(response, '\n')


def main() -> None:
    """Runs the full export: authorize, initiate, poll, download, extract."""
    # When running locally, disable OAuthlib's HTTPS verification. When
    # running in production *do not* leave this option enabled.
    os.environ['OAUTHLIB_INSECURE_TRANSPORT'] = '1'
    creds, resources = get_credentials(RESOURCES)

    print('\nObtained OAuth credentials for resources: ', ', '.join(resources))
    data_portability = get_api_interface(creds)

    try:
        job_id = initiate_portability_archive(data_portability, resources)
        print('Successfully initiated data archive job with ID', job_id, '\n')
        urls = poll_get_portability_archive_state(data_portability, job_id)
        for url in urls:
            print('\nData archive is ready. Beginning download.')
            # Download the signed URL, then extract the ZIP from memory;
            # `with` closes the response and archive even on failure.
            with urllib.request.urlopen(url) as ufile:
                payload = ufile.read()
            print('Download complete! Extracting archive...\n')
            with zipfile.ZipFile(io.BytesIO(payload), 'r') as zf:
                for f in zf.filelist:
                    print(f)
                # Save extracted files in the current directory.
                zf.extractall()
    except HttpError as e:
        # BUG FIX: the original caught `googleapiclient.errors.HttpError`,
        # but only `HttpError` (and `discovery`) are imported, so the
        # except clause itself raised NameError.
        print(e)
    finally:
        # Reset authorization so one-time resources exhausted by a
        # successful (or failed) export can be requested again next run.
        reset_authorization(data_portability)


if __name__ == '__main__':
    main()

Yes, my project is in production, so my refresh tokens last longer than a week. It seems to stop after two weeks.

0

There are 0 answers

Related Questions in DATA-PORTABILITY-API