In my NDB Datastore I have more than 2 million records. I want to export these records grouped by created_at date into CSV files on Google Cloud Storage. I calculated that every file would then be about 1GB.

2014-03-18.csv, ~17000 records, ~1GB
2014-03-17.csv, ~17000 records, ~1GB
2014-03-16.csv, ~17000 records, ~1GB
...

My first approach (pseudo-code):

import cloudstorage as gcs
from datetime import timedelta

# note: the GCS path must include the bucket, e.g. '/bucket-name/2014-03-18.csv'
gcs_file = gcs.open(str(date) + '.csv', 'w')
query = Item.query().filter(Item.created_at >= date).filter(Item.created_at < date + timedelta(days=1))
records, cursor, more = query.fetch_page(50, start_cursor=cursor)
for record in records:
    gcs_file.write(record)

But this (obviously?) leads to memory issues:

Error: Exceeded soft private memory limit with 622.16 MB after servicing 2 requests total

Should I use a MapReduce Pipeline instead or is there any way to make approach 1 work? If using MapReduce: Could I filter for created_at without iterating over all records in NDB?

Solution

I finally figured it out. Since all the data is in the NDB datastore, I wasn't really able to test everything locally, so I found logging.info("Memory Usage: %s", runtime.memory_usage().current()) extremely helpful (import it with from google.appengine.api import runtime).
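
For reference, the two pieces quoted above put together; logging it after each fetched page, for example, is one way to watch the growth:

import logging
from google.appengine.api import runtime

# log the instance's current memory footprint, e.g. once per fetch_page() call
logging.info("Memory Usage: %s", runtime.memory_usage().current())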

The problem is the "In-Context Cache": query results are written back to the in-context cache, so every entity fetched during the request stays in memory until the request ends. The cache can be disabled for an entity kind, as sketched below.
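
A minimal sketch of disabling the in-context cache for one kind, assuming the Item model and created_at property from the question (the memcache flag is optional):

from google.appengine.ext import ndb

class Item(ndb.Model):
    # skip the in-context cache so fetched entities can be garbage collected
    _use_cache = False
    # optionally skip memcache for this kind as well
    _use_memcache = False

    created_at = ndb.DateTimeProperty()

The same policy can also be applied per call by passing use_cache=False as a query option, e.g. query.fetch_page(50, start_cursor=cursor, use_cache=False).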

My calculation was slightly wrong, though: a generated CSV file is about 300 MB. It is generated and saved to Google Cloud Storage within 5 minutes.

Memory consumption without gc.collect()

Peak memory consumption was about 480MB.

In comparison, with gc.collect() added in the while True: loop, as suggested by @brian in the comment above, peak memory consumption was about 260 MB. But it took quite a while, about 20 minutes.

Other tips

Considering the number of records, it does indeed seem likely that you would get a memory error. By default the garbage collector is only called when the request ends, which explains why memory usage keeps increasing like this.

In this kind of situation, what I usually do is call the garbage collector manually with gc.collect() after each page is fetched.

It would look something like this:

import cloudstorage as gcs
import gc
from datetime import timedelta

cursor = None
more = True
gcs_file = gcs.open(str(date) + '.csv', 'w')
query = Item.query().filter(Item.created_at >= date).filter(Item.created_at < date + timedelta(days=1))

while more:
    records, cursor, more = query.fetch_page(50, start_cursor=cursor)
    gc.collect()  # free memory left over from the previous page
    for record in records:
        gcs_file.write(record)

gcs_file.close()

This approach has worked for me in many cases.

The in-context cache might be part of your issue, but fetch_page is a leaky method in general. If you're doing repeated queries, wrap your work in @ndb.toplevel so that queues are cleared between queries and garbage collection can be more effective, as sketched below.
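
A minimal sketch of that suggestion; the export_day function name and its body are assumptions, only the @ndb.toplevel decorator itself comes from the answer:

from google.appengine.ext import ndb

@ndb.toplevel
def export_day(date):
    # ndb.toplevel runs the body in its own NDB context and waits for all
    # outstanding async NDB work to finish before returning, so internal
    # queues and futures from repeated queries don't pile up for the
    # whole request.
    query = Item.query().filter(Item.created_at >= date)
    records, cursor, more = query.fetch_page(50)
    # ... write the records to the CSV file in Cloud Storage ...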

Licensed under: CC-BY-SA with attribution