I am trying to use the Data Portability API to access user data and process it on a nightly basis.
I have authorized the user and downloaded the file. This works great for about two weeks, then it stops working and I have to request access from the user again.
How do I get a refresh token that will not expire, so that I can run this export nightly?
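For reference, this is roughly what I expect the nightly, non-interactive run to look like: reuse the stored token and refresh it without prompting the user again (a minimal sketch; token.json and the scope are the same ones the full script below uses):

from google.auth.transport.requests import Request
from google.oauth2.credentials import Credentials

SCOPES = ['https://www.googleapis.com/auth/dataportability.myactivity.youtube']

# token.json was written by the interactive flow in the full script below.
creds = Credentials.from_authorized_user_file('token.json', SCOPES)
if creds.expired and creds.refresh_token:
    # Exchange the long-lived refresh token for a fresh access token;
    # no browser prompt should be needed here.
    creds.refresh(Request())

The full script: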
from __future__ import print_function

import io
import os
import time
import urllib.request
import zipfile
from collections.abc import Sequence
from typing import Generator

from google.auth.transport.requests import Request
from google.oauth2 import credentials
from google.oauth2.credentials import Credentials
from google_auth_oauthlib.flow import InstalledAppFlow
from googleapiclient import discovery
from googleapiclient.discovery import build
from googleapiclient.errors import HttpError
# A list of Data Portability resources that we want to request.
RESOURCES = ['myactivity.youtube']
API_SERVICE_NAME = 'dataportability'
API_VERSION = 'v1beta'
# There is a one to one mapping between Data Portability resources and
# dataportability OAuth scopes. The scope code is the resource name plus a
# prefix.
SCOPE_PREFIX = 'https://www.googleapis.com/auth/dataportability.'
# After the first authorization of a user, the access token and refresh token
# are stored here.
USER_TOKENS = 'token.json'
# When modifying scopes, remember to delete the file token.json.
# SCOPES = ['https://www.googleapis.com/auth/dataportability.myactivity.youtube']
# Installed application credentials from the Google Cloud console.
APPLICATION_CREDENTIALS = r'C:\Development\FreeLance\GoogleSamples\Credentials\alpha.json'
def get_credentials(
    resources: Sequence[str],
) -> tuple[credentials.Credentials, Sequence[str]]:
"""Gets OAuth 2.0 credentials using an installed app OAuth flow.
This generates a link for the user to consent to some or all of the requested
resources. In a production environment, the best practice is to save a refresh
token in Cloud Storage because the access token can expire before the
portability archive job completes.
Args:
resources: A list of data portability resource IDs. These are OAuth scope
codes from
https://developers.devsite.corp.google.com/data-portability/reference/rest/v1alpha/portabilityArchive/initiate#authorization-scopes
without the 'https://www.googleapis.com/auth/dataportability.' prefix.
Returns:
A tuple of credentials containing an access token and a list of resources
for which the user has granted consent.
"""
    creds = None
    scopes = [SCOPE_PREFIX + r for r in resources]
    if os.path.exists(USER_TOKENS):
        creds = Credentials.from_authorized_user_file(USER_TOKENS, scopes)
    # If there are no (valid) user credentials available, prompt the user to
    # log in.
    if not creds or not creds.valid:
        if creds and creds.expired and creds.refresh_token:
            creds.refresh(Request())
        else:
            flow = InstalledAppFlow.from_client_secrets_file(
                APPLICATION_CREDENTIALS, scopes)
            try:
                creds = flow.run_local_server(port=0)
            except Warning as warn:
                # Gracefully handle the user consenting to only a subset of
                # the requested scopes. oauthlib signals this with a Warning
                # carrying the token and the granted scopes.
                return credentials.Credentials(warn.token['access_token']), [
                    s.removeprefix(SCOPE_PREFIX) for s in warn.new_scope
                ]
        # Save the credentials for the next run.
        with open(USER_TOKENS, 'w') as token:
            token.write(creds.to_json())
    return creds, resources
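# Sketch of the docstring's suggestion above: in production, persist the
# refresh token somewhere durable such as Cloud Storage instead of a local
# token.json. Assumes the google-cloud-storage package; the bucket name is
# hypothetical and not part of the original flow.
def save_token_to_gcs(creds: credentials.Credentials) -> None:
    """Uploads the serialized credentials (incl. refresh token) to GCS."""
    from google.cloud import storage  # local import so the sample runs without it

    bucket = storage.Client().bucket('my-token-bucket')  # hypothetical bucket
    bucket.blob('token.json').upload_from_string(creds.to_json())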
def get_api_interface(creds: credentials.Credentials) -> discovery.Resource:
    """Gets an interface to the Data Portability API."""
    return build(API_SERVICE_NAME, API_VERSION, credentials=creds)
def initiate_portability_archive(
    dataportability: discovery.Resource, resources: Sequence[str]
) -> str:
    """Initiates a portability archive for the requested resources."""
    initiate = dataportability.portabilityArchive().initiate(
        # 'resources' is already a list; don't wrap it in another list.
        body={'resources': resources}
    )
    print('\n', initiate.method, initiate.body, initiate.uri, '\n')
    initiate_response = initiate.execute()
    print(initiate_response, '\n')
    return initiate_response['archiveJobId']
def exponential_backoff(
    delay: float, max_delay: float, multiplier: float
) -> Generator[None, None, None]:
    """Yields forever, sleeping with a geometrically growing delay up to max_delay."""
    while True:
        time.sleep(delay)
        yield
        delay = min(delay * multiplier, max_delay)
def poll_get_portability_archive_state(
    data_portability: discovery.Resource, job_id: str
) -> Sequence[str]:
    """Calls data_portability's getPortabilityArchiveState endpoint."""
    get_state = data_portability.archiveJobs().getPortabilityArchiveState(
        name='archiveJobs/{}/portabilityArchiveState'.format(job_id)
    )
    print(
        'Polling archive status while server indicates state is in progress...\n',
        get_state.method,
        get_state.uri,
    )
    # Poll with exponential backoff (3 s initial delay, 1.5x multiplier,
    # capped at one hour) until the job leaves the IN_PROGRESS state.
    for _ in exponential_backoff(3, 3600, 1.5):
        state = get_state.execute()
        print(state)
        if state['state'] != 'IN_PROGRESS':
            return state['urls']
def reset_authorization(data_portability: discovery.Resource) -> None:
    """Calls the Data Portability API's authorization reset endpoint."""
    reset = data_portability.authorization().reset()
    print('\n', reset.method, reset.uri, '\n')
    reset_response = reset.execute()
    print(reset_response, '\n')
def main() -> None:
    # When running locally, disable OAuthlib's HTTPS verification. When
    # running in production, *do not* leave this option enabled.
    os.environ['OAUTHLIB_INSECURE_TRANSPORT'] = '1'
    creds, resources = get_credentials(RESOURCES)
    print('\nObtained OAuth credentials for resources: ', ', '.join(resources))
    data_portability = get_api_interface(creds)
    try:
        job_id = initiate_portability_archive(data_portability, resources)
        print('Successfully initiated data archive job with ID', job_id, '\n')
        urls = poll_get_portability_archive_state(data_portability, job_id)
        for url in urls:
            print('\nData archive is ready. Beginning download.')
            ufile = urllib.request.urlopen(url)
            print('Download complete! Extracting archive...\n')
            zf = zipfile.ZipFile(io.BytesIO(ufile.read()), 'r')
            for f in zf.filelist:
                print(f)
            # Save extracted files in the current directory.
            zf.extractall()
    except HttpError as e:
        print(e)
    finally:
        # If data retrieval fails, call reset in case any resources are
        # exhausted.
        reset_authorization(data_portability)


if __name__ == '__main__':
    main()
Yes, my project is in production, so my refresh tokens last longer than a week. It seems to stop working after two weeks.
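A quick way I sanity-check the stored credentials before each nightly run (a minimal sketch reading the file that creds.to_json() writes; the key names follow google-auth's serialization):

import json

# Inspect the token file the script saves; google-auth's to_json() stores the
# refresh token and granted scopes under these keys.
with open('token.json') as f:
    data = json.load(f)

print('has refresh_token:', bool(data.get('refresh_token')))
print('scopes:', data.get('scopes'))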