Question

When querying BigQuery through the Python API using:

service.jobs().getQueryResults

We're finding that the first attempt works fine: all expected results are included in the response. However, if the query is run a second time shortly after the first (roughly within 5 minutes), only a small subset of the results is returned (in powers of 2), nearly instantly and with no errors.

See our complete code at: https://github.com/sean-schaefer/pandas/blob/master/pandas/io/gbq.py
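
In outline, the call looks like this (simplified; service is assumed to be an authorized BigQuery v2 client built with apiclient.discovery.build, and the project and job ids are placeholders):

# `service` is an authorized bigquery v2 client; auth setup is omitted.
reply = service.jobs().getQueryResults(
    projectId='my-project-id',        # placeholder
    jobId='job_123456789').execute()  # placeholder
rows = reply.get('rows', [])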

Any thoughts on what could cause this?

Solution

It looks like the issue is that we return different default numbers of rows for query() and getQueryResults(). So depending on whether your query finished quickly (and so you didn't have to use getQueryResults()), you'd get either more or fewer rows.

I've filed a bug and we should have a fix soon.

The workaround (and a good idea overall) is to set maxResults for both the query and the getQueryResults calls. And if you're going to want a lot of rows, page through the results using the returned page token.
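
With the discovery-based Python client, that might look like the following sketch (not the exact fix; service is assumed to be an authorized BigQuery v2 client, the project id is a placeholder, and the query is just an example against a public sample table):

PAGE_SIZE = 10000  # explicit maxResults for both calls

# Run the query with an explicit maxResults.
query_reply = service.jobs().query(
    projectId='my-project-id',
    body={'query': 'SELECT word FROM [publicdata:samples.shakespeare]',
          'maxResults': PAGE_SIZE}).execute()
job_ref = query_reply['jobReference']

# This sketch assumes the query finished within the default timeout,
# i.e. query_reply['jobComplete'] is true.
rows = list(query_reply.get('rows', []))
page_token = query_reply.get('pageToken')

# Keep fetching pages until no page token is returned.
while page_token:
    page = service.jobs().getQueryResults(
        projectId=job_ref['projectId'],
        jobId=job_ref['jobId'],
        maxResults=PAGE_SIZE,
        pageToken=page_token).execute()
    rows.extend(page.get('rows', []))
    page_token = page.get('pageToken')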

Below is an example that pages through the results of a completed query job. It will be included in the next release of bq.py:

class _JobTableReader(_TableReader):
  """A TableReader that reads from a completed job."""

  def __init__(self, local_apiclient, project_id, job_id):
    self.job_id = job_id
    self.project_id = project_id
    self._apiclient = local_apiclient

  def ReadSchemaAndRows(self, max_rows=None):
    """Read at most max_rows rows from a table and the schema.

    Args:
      max_rows: maximum number of rows to return.

    Raises:
      BigqueryInterfaceError: when bigquery returns something unexpected.

    Returns:
      A tuple where the first item is the list of fields and the
      second item a list of rows.
    """
    page_token = None
    rows = []
    schema = {}
    max_rows = max_rows if max_rows is not None else sys.maxint
    while len(rows) < max_rows:
      (more_rows, page_token, total_rows, current_schema) = self._ReadOnePage(
          max_rows=max_rows - len(rows),
          page_token=page_token)
      if not schema and current_schema:
        schema = current_schema.get('fields', {})

      max_rows = min(max_rows, total_rows)
      for row in more_rows:
        rows.append([entry.get('v', '') for entry in row.get('f', [])])
      if not page_token and len(rows) != max_rows:
        raise BigqueryInterfaceError(
            'PageToken missing for %r' % (self,))
      if not more_rows and len(rows) != max_rows:
        raise BigqueryInterfaceError(
            'Not enough rows returned by server for %r' % (self,))
    return (schema, rows)

  def _ReadOnePage(self, max_rows, page_token=None):
    data = self._apiclient.jobs().getQueryResults(
        maxResults=max_rows,
        pageToken=page_token,
        # Sets the timeout to 0 because we assume the table is already ready.
        timeoutMs=0,
        projectId=self.project_id,
        jobId=self.job_id).execute()
    if not data['jobComplete']:
      raise BigqueryError('Job %s is not done' % (self,))
    page_token = data.get('pageToken', None)
    total_rows = int(data['totalRows'])
    schema = data.get('schema', None)
    rows = data.get('rows', [])
    return (rows, page_token, total_rows, schema)
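
For completeness, a hypothetical usage of the reader above might look like this (it assumes the rest of bq.py's definitions are available, the job has already completed, service is an authorized BigQuery v2 client, and the ids are placeholders):

reader = _JobTableReader(service, 'my-project-id', 'job_123456789')
schema, rows = reader.ReadSchemaAndRows(max_rows=100000)
print [field['name'] for field in schema]  # column names from the schema
print len(rows)                            # number of rows actually fetched
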
Licensed under: CC-BY-SA with attribution