Question

I am using Python-RQ to create jobs. When you create a job, you get back a job.id:

f311ae30-b623-4b38-9dcb-0edd0133a6e6

Then I use that id to check whether the result is finished, which works great.

That result is then cached for 500 seconds.
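
Roughly, the enqueue-and-check flow looks like this (a sketch rather than my exact code; the inline search function stands in for the real worker function):

from redis import Redis
from rq import Queue
from rq.job import Job

def search(params):
    # Stand-in for the real worker function, which lives in its own module
    return {'results': params}

redis_conn = Redis()
q = Queue(connection=redis_conn)

# Enqueue the work and keep the result around for 500 seconds
job = q.enqueue_call(func=search, args=({'blah': u'123456'},), result_ttl=500)
print(job.id)   # e.g. f311ae30-b623-4b38-9dcb-0edd0133a6e6

# Later, with only that id, check whether the result is ready
fetched = Job.fetch(job.id, connection=redis_conn)
if fetched.is_finished:
    print(fetched.result)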

Now, here is where I am confused.

When another request comes in within that 500-second window with the same set of inputs:

{'blah': u'123456', 'title': u' Some Title', 'variable': 123}

How do I get back the cached result of that job instead of creating another job?

My issue is that the job.id is a randomly generated identifier, so I can't reconstruct it from the inputs in order to look the result up in Redis.

I have searched everywhere but haven't found this documented anywhere: what is the best way to reuse a cached result without creating a new job?


Solution

I came up with a solution that might be helpful to others.

Basically, create a digest of the inputs (to the RQ worker) so that we have something to look up when an identical request comes in; this digest becomes the name of a Redis hash. Its field will be 'job_key' and its value will be the job.id that we need.
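
For example (a minimal, self-contained sketch of just that mapping; the inputs and the job.id are illustrative), hashing the keys and values in sorted order keeps the digest identical for identical inputs:

import hashlib
import redis

r = redis.StrictRedis()

search = {'blah': u'123456', 'title': u' Some Title', 'variable': 123}

# The digest of the inputs becomes the name of the lookup hash
digest = hashlib.sha1()
for param in sorted(search):            # sorted, so identical dicts hash identically
    digest.update(str(param))           # include the key name...
    digest.update(str(search[param]))   # ...and its value
url_hash = 'url:{0}'.format(digest.hexdigest())

# Map that digest to the job.id under the field 'job_key'
r.hset(url_hash, 'job_key', 'f311ae30-b623-4b38-9dcb-0edd0133a6e6')

# An identical request recomputes the same digest and finds the job.id
print(r.hget(url_hash, 'job_key'))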

When another request comes in that is identical to one already processed, we now have a way to find and return the results without doing the same job again.

Also, just as a fail-safe, I add some extra seconds to the job's TTL, so that when the cached job.result is requested by another function, it is still there and has not expired in between calls.

*Request: if someone has insight into a better way to handle the hash -> job.id (key, value) mapping with respect to memory consumption, similar to this and this, please let me know. (Those two links describe how you can use an order of magnitude less memory by storing key/values inside Redis hashes, with roughly 100 key/values per hash, rather than as plain string keys.)
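
For anyone weighing that trade-off, the idea in those posts is to pack many key/value pairs into a small number of Redis hashes so each hash stays under the ziplist-encoding thresholds. A rough sketch of bucketing the digest -> job.id lookups that way (the bucket count, key names, and placeholder values here are illustrative, not taken from those links):

import redis

r = redis.StrictRedis()
NUM_BUCKETS = 1000  # keep each hash small enough for the compact encoding (illustrative)

url_hash = 'url:' + '0' * 40                       # a SHA-1 digest as built above (placeholder)
job_id = 'f311ae30-b623-4b38-9dcb-0edd0133a6e6'    # placeholder job.id

def bucket_for(url_hash):
    # Spread digests across a fixed set of hashes; the last hex chars pick the bucket
    return 'url_buckets:{0}'.format(int(url_hash[-4:], 16) % NUM_BUCKETS)

# Store: the digest becomes a field inside its bucket hash
r.hset(bucket_for(url_hash), url_hash, job_id)

# Lookup: recompute the digest, then read the field from the same bucket
print(r.hget(bucket_for(url_hash), url_hash))

The trade-off is that individual fields inside a hash can't easily be expired on their own, which is why the code below keeps one small hash per digest and expires it as a whole.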

On to the good stuff:

import hashlib

# Assumed elsewhere in the app: r is a Redis connection, q is an rq.Queue bound
# to it, worker.search is the job function, search is the dict of request
# inputs, and this runs inside a Flask view (hence jsonify and return).

# Seconds before cached records expire
cache_expire = 500

# Create a digest of the parameters to use as a lookup key for job.id (cache).
# Iterate in sorted order and include the key names, so identical inputs
# always produce the same digest.
digest = hashlib.sha1()
for param in sorted(search):
    digest.update(str(param))
    digest.update(str(search[param]))
url_hash = 'url:{0}'.format(digest.hexdigest())

# Check whether we already have a cached result for these inputs
job_key = r.hget(url_hash, 'job_key')
if job_key:
    job_hash = 'rq:job:{0}'.format(job_key)
    ttl = r.ttl(job_hash)
    if ttl and ttl > 0:
        # Add 30 more seconds of buffer room so job.result doesn't get
        # deleted prematurely while another function is still reading it
        r.expire(job_hash, ttl + 30)
        return jsonify(search_id=job_key)
    else:
        # Job result has already expired; clear the stale lookup hash
        r.delete(url_hash)

# Create new job
job = q.enqueue_call(func=worker.search, args=(search,), result_ttl=cache_expire)
# Create job.id lookup using hash as key (for cache)
if r.hsetnx(url_hash, 'job_key', job.id):
    r.expire(url_hash, cache_expire)

return jsonify(search_id=job.id)
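
Not shown above is the retrieval side that takes the returned search_id and serves the result; a minimal sketch of it, assuming a Flask app object and the same Redis connection r:

from flask import Flask, jsonify
from rq.job import Job
from rq.exceptions import NoSuchJobError

app = Flask(__name__)

@app.route('/results/<search_id>')
def results(search_id):
    try:
        job = Job.fetch(search_id, connection=r)
    except NoSuchJobError:
        # The cached result has expired (or the id was never valid)
        return jsonify(error='unknown or expired search_id'), 404
    if job.is_finished:
        return jsonify(results=job.result)
    # Still queued or running
    return jsonify(status=job.get_status()), 202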