Google App Engine: how to parallelize downloads using TaskQueue or Async Urlfetch?
30-09-2019
Question
My GAE application retrieves JSON data from a third-party site; given an ID representing the item to download, the item's data on that site is spread across multiple pages, so my code has to download chunks of data, page after page, until the last available page is retrieved.
My simplified code looks like this:
class FetchData(webapp.RequestHandler):
    def get(self):
        ...
        data_list = []
        page = 1
        while True:
            response = urlfetch.fetch(
                'http://www.foo.com/getdata?id=xxx&result=JSON&page=%s' % page)
            fetched_data = json.loads(response.content)
            data_list = data_list + fetched_data["data"]
            if page == int(fetched_data["total_pages"]):
                break
            page = page + 1
        ...
        doRender('dataview.htm', {'data_list': data_list})
The data_list result is an ordered list: the first item holds the data of page 1 and the last item holds the data of the final page. Once retrieved, data_list is rendered in a view.
This approach works 99% of the time, but sometimes, because of the 30-second limit imposed by Google App Engine, on items with many pages I get the dreaded DeadlineExceededError.
I would like to know whether, using TaskQueue, Deferred or Async Urlfetch, I could improve this algorithm by parallelizing the N urlfetch calls in some way.
Solution
Use the asynchronous urlfetch API: http://code.google.com/appengine/docs/python/urlfetch/asynchronousrequests.html
It is as simple as:
def handle_result(rpc):
    result = rpc.get_result()
    # ... Do something with result ...

# Use a helper function to define the scope of the callback.
def create_callback(rpc):
    return lambda: handle_result(rpc)

rpcs = []
for url in urls:
    rpc = urlfetch.create_rpc()
    rpc.callback = create_callback(rpc)
    urlfetch.make_fetch_call(rpc, url)
    rpcs.append(rpc)

# ...

# Finish all RPCs, and let callbacks process the results.
for rpc in rpcs:
    rpc.wait()
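The create_callback helper above is not just style: Python closures bind variables late, so a lambda defined directly inside the loop would see the final value of rpc for every callback. A minimal plain-Python illustration of the pitfall (no App Engine required):

```python
# Lambdas created in a loop close over the *variable*, not the value
# it held at that iteration.

def create_callback(value):
    # The helper introduces a new scope, freezing `value` per call.
    return lambda: value

# Buggy version: every lambda sees the loop variable's final value.
buggy = [lambda: i for i in range(3)]
# Correct version: each callback remembers its own value.
fixed = [create_callback(i) for i in range(3)]

print([f() for f in buggy])  # [2, 2, 2]
print([f() for f in fixed])  # [0, 1, 2]
```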
OTHER TIPS
I resolved it with this:
chunks_dict = {}

def handle_result(rpc, page):
    result = json.loads(rpc.get_result().content)
    chunks_dict[page] = result["data"]

def create_callback(rpc, page):
    return lambda: handle_result(rpc, page)

rpcs = []
page = 1
while True:
    rpc = urlfetch.create_rpc(deadline=10)
    rpc.callback = create_callback(rpc, page)
    urlfetch.make_fetch_call(
        rpc, 'http://www.foo.com/getdata?id=xxx&result=JSON&page=%s' % page)
    rpcs.append(rpc)
    if page > total_pages:  # total_pages is known from a first synchronous fetch
        break
    page = page + 1

for rpc in rpcs:
    rpc.wait()

data_list = []
for key in sorted(chunks_dict.keys()):
    data_list = data_list + chunks_dict[key]
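Outside App Engine (or on runtimes without urlfetch), the same fan-out-then-reassemble-in-order pattern can be sketched with the standard library's concurrent.futures. Here fetch_page is a hypothetical stand-in for the real HTTP call to the paged endpoint:

```python
from concurrent.futures import ThreadPoolExecutor

def fetch_page(page):
    # Hypothetical stand-in: the real version would request the paged
    # JSON endpoint and return the "data" list for this page.
    return ["item-%d-%d" % (page, i) for i in range(2)]

def fetch_all(total_pages, max_workers=8):
    # Issue all page fetches in parallel; Executor.map yields results
    # in input order, so the chunks come back already sorted by page.
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        chunks = pool.map(fetch_page, range(1, total_pages + 1))
    data_list = []
    for chunk in chunks:
        data_list = data_list + chunk
    return data_list
```

Because map preserves input order, no explicit dict-and-sort step is needed here.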