Question

I have built a pipeline on App Engine that loads data from Cloud Storage into BigQuery. This works fine until there is an error. How can I catch load errors raised by BigQuery from my App Engine code?

The code in the pipeline looks like this:

# Run the job
credentials = AppAssertionCredentials(scope=SCOPE)
http = credentials.authorize(httplib2.Http())
bigquery_service = build("bigquery", "v2", http=http)
jobCollection = bigquery_service.jobs()
# insert() only builds the request; execute() actually submits the job
insertResponse = jobCollection.insert(
    projectId=PROJECT_ID,
    body=build_job_data(table_name, cloud_storage_files)).execute()
job_id = insertResponse['jobReference']['jobId']
# Get the status
while (not allDone and not runtime.is_shutting_down()):
    try:
        job = jobCollection.get(projectId=PROJECT_ID,
                                jobId=job_id).execute()
        # Do something with job.get('status')
    except:
        exc_type, exc_value, exc_traceback = sys.exc_info()
        logging.error(traceback.format_exception(exc_type, exc_value, exc_traceback))
        time.sleep(30)

This gives me status errors or major connectivity errors, but what I am looking for is functional errors from BigQuery, such as field format conversion errors, schema structure issues, or any other problem BigQuery may hit while trying to insert rows into tables.

If any "functional" error on BigQuery's side happens, this code will run successfully and complete normally, but no table will be written on BigQuery. Not easy to debug when this happens...


Solution

You can use the HTTP error code from the exception. BigQuery is a REST API, so the response codes it returns follow the standard HTTP status code definitions.

Here is some code that handles retryable errors (connection, rate limit, etc), but re-raises when it is an error type that it doesn't expect.

  except HttpError as err:
    # If the error is a rate limit or connection error, wait and
    # try again.
    # 403: Forbidden: Both access denied and rate limits.
    # 408: Timeout
    # 500: Internal Server Error
    # 503: Service Unavailable
    if err.resp.status in [403, 408, 500, 503]:
      print '%s: Retryable error %s, waiting' % (
          self.thread_id, err.resp.status,)
      time.sleep(5)
    else:
      raise
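
The fragment above is only the except clause. As a rough sketch of how it might sit inside a complete retry loop, the helper below wraps any request in a bounded number of attempts. The names call_with_retries and http_status_of, and the max_attempts and delay_seconds parameters, are hypothetical and not part of any Google library; the only assumption about the exception is that it exposes resp.status, as HttpError does in the fragment above.

```python
import time

# Same status codes the fragment above treats as retryable.
RETRYABLE_STATUSES = frozenset([403, 408, 500, 503])


def http_status_of(err):
    """Return err.resp.status if the exception carries one, else None."""
    return getattr(getattr(err, 'resp', None), 'status', None)


def call_with_retries(request_fn, max_attempts=5, delay_seconds=5,
                      sleep=time.sleep):
    """Call request_fn(), retrying only on retryable HTTP statuses.

    Hypothetical helper: request_fn is any zero-argument callable, for
    example lambda: jobCollection.get(...).execute().
    """
    for attempt in range(1, max_attempts + 1):
        try:
            return request_fn()
        except Exception as err:
            retryable = http_status_of(err) in RETRYABLE_STATUSES
            if not retryable or attempt == max_attempts:
                raise  # unexpected error type, or out of attempts
            sleep(delay_seconds)
```

Capping the number of attempts matters mostly because 403 covers both rate limits and plain access denial, and the latter will never succeed on retry.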

If you want even better error handling, check out the BigqueryError class in the bq command-line client. It used to be available on code.google.com, but since the switch to gcloud it no longer is. If you have gcloud installed, the bq.py and bigquery_client.py files should be in the installation.
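
For the "functional" errors the question asks about, note that a failed load does not normally surface as an HTTP error: the jobs.get call succeeds and the failure is reported inside the job resource, under status.errorResult (the final error, present only when the job failed) and status.errors (individual errors encountered along the way). A minimal sketch of reading those fields from the dict returned by jobCollection.get(...).execute() might look like this; summarize_job_status and describe_error are hypothetical helper names, not part of the client library.

```python
def describe_error(err):
    """Format one BigQuery error entry (reason, location, message)."""
    return '%s at %s: %s' % (err.get('reason', 'unknown'),
                             err.get('location', 'n/a'),
                             err.get('message', ''))


def summarize_job_status(job):
    """Summarize the 'status' section of a BigQuery job resource dict.

    'failed' is True only when status.errorResult is present; entries in
    status.errors alone do not necessarily mean the job failed.
    """
    status = job.get('status', {})
    fatal = status.get('errorResult')
    # status.errors often repeats errorResult, so skip that duplicate.
    details = [e for e in status.get('errors', []) if e != fatal]
    return {
        'done': status.get('state') == 'DONE',
        'failed': fatal is not None,
        'fatal': describe_error(fatal) if fatal else None,
        'details': [describe_error(e) for e in details],
    }
```

In the polling loop from the question, this could replace the "Do something" comment: stop polling once 'done' is true, and if 'failed' is set, log 'fatal' and 'details' with logging.error so that schema and conversion problems show up in the App Engine logs.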

OTHER TIPS

The key here is this part of the pasted code:

except:
    exc_type, exc_value, exc_traceback = sys.exc_info()
    logging.error(traceback.format_exception(exc_type, exc_value, exc_traceback))
    time.sleep(30)

This "except" is catching every exception, logging it, and letting the process continue without any consideration for re-trying.

The question is, what would you like to do instead? At least the intention is there with the "#Do something" comment.

As a suggestion, consider App Engine's task queues to check the status, instead of a loop with a 30 second wait. When tasks get an exception, they are automatically retried - and you can tune that behavior.

Licensed under: CC-BY-SA with attribution