Question

I'm trying to build a class that uses multiprocessing + requests to make several requests in parallel. I'm running into an issue where it just hangs and gives me a cryptic error message, and I'm not sure why.

Below is my code. It basically just uses a Pool with a callback to put results into a list. I have the requirement that I need a "hard timeout" for each URL, i.e. if a URL takes more than a few seconds to download its content I just want to skip it. So I use a Pool timeout and diff the URLs attempted against the URLs whose content was returned; the ones that were attempted but not returned are assumed to have failed. Here is my code:

import time
import json
import requests
import sys
from urlparse import parse_qs
from urlparse import urlparse
from urlparse import urlunparse
from urllib import urlencode
from multiprocessing import Process, Pool, Queue, current_process
from multiprocessing.pool import ThreadPool
from multiprocessing import TimeoutError
import traceback
from sets import Set
from massweb.pnk_net.pnk_request import pnk_request_raw
from massweb.targets.fuzzy_target import FuzzyTarget
from massweb.payloads.payload import Payload

class MassRequest(object):

    def __init__(self, num_threads = 10, time_per_url = 10, request_timeout = 10, proxy_list = [{}]):

        self.num_threads = num_threads
        self.time_per_url = time_per_url
        self.request_timeout = request_timeout
        self.proxy_list = proxy_list

        self.results = []
        self.urls_finished = []
        self.urls_attempted = []

        self.targets_results = []
        self.targets_finished = []
        self.targets_attempted = []

    def add_to_finished(self, x):

        self.urls_finished.append(x[0])
        self.results.append(x)

    def add_to_finished_targets(self, x):

        self.targets_finished.append(x[0])
        self.targets_results.append(x)

    def get_urls(self, urls):

        timeout = float(self.time_per_url * len(urls))
        pool = Pool(processes = self.num_threads)
        proc_results = []

        for url in urls:
            self.urls_attempted.append(url)
            proc_result = pool.apply_async(func = pnk_request_raw, args = (url, self.request_timeout, self.proxy_list), callback = self.add_to_finished)
            proc_results.append(proc_result)

        for pr in proc_results:

            try:
                pr.get(timeout = timeout)

            except:
                pool.terminate()
                pool.join()

        pool.terminate()
        pool.join()
        list_diff = Set(self.urls_attempted).difference(Set(self.urls_finished))

        for url in list_diff:
            sys.stderr.write("URL %s got timeout" % url)
            self.results.append((url, "__PNK_GET_THREAD_TIMEOUT"))

if __name__ == "__main__":

    f = open("out_urls_to_fuzz_1mil")
    urls_to_request = []
    for line in f:
        url = line.strip()
        urls_to_request.append(url)

    mr = MassRequest()
    mr.get_urls(urls_to_request)

Here is the function being called by the threads:

def pnk_request_raw(url_or_target, req_timeout = 5, proxy_list = [{}]):

    if proxy_list[0]:
        proxy = get_random_proxy(proxy_list)
    else:
        proxy = {}

    try:
        if isinstance(url_or_target, str):

            sys.stderr.write("Requesting: %s with proxy %s\n" % (str(url_or_target), str(proxy)))
            r = requests.get(url_or_target, proxies = proxy, timeout = req_timeout)
            return (url_or_target, r.text)

        if isinstance(url_or_target, FuzzyTarget):

            sys.stderr.write("Requesting: %s with proxy %s\n" % (str(url_or_target), str(proxy)))
            r = requests.get(url_or_target.url, proxies = proxy, timeout = req_timeout)
            return (url_or_target, r.text)

    except:
        #use this to mark failure on exception
        traceback.print_exc()
        #edit: this is the line that was breaking it all
        sys.stderr.out("A request failed to URL %s\n" % url_or_target)
        return (url_or_target, "__PNK_REQ_FAILED")

This seems to work well for smaller sets of URLs, but with the larger set here is the output:

Requesting: http://www.sportspix.co.za/ with proxy {}
Requesting: http://www.sportspool.co.za/ with proxy {}
Requesting: http://www.sportspredict.co.za/ with proxy {}
Requesting: http://www.sportspro.co.za/ with proxy {}
Requesting: http://www.sportsrun.co.za/ with proxy {}
Requesting: http://www.sportsstuff.co.za/ with proxy {}
Requesting: http://sportsstuff.co.za/2011-rugby-world-cup with proxy {}
Requesting: http://www.sportstar.co.za/4-stroke-racing with proxy {}
Requesting: http://www.sportstats.co.za/ with proxy {}
Requesting: http://www.sportsteam.co.za/ with proxy {}
Requesting: http://www.sportstec.co.za/ with proxy {}
Requesting: http://www.sportstours.co.za/ with proxy {}
Requesting: http://www.sportstrader.co.za/ with proxy {}
Requesting: http://www.sportstravel.co.za/ with proxy {}
Requesting: http://www.sportsturf.co.za/ with proxy {}
Requesting: http://reimo.sportsvans.co.za/ with proxy {}
Requesting: http://www.sportsvans.co.za/4x4andmoreWindhoek.html with proxy {}
Handled exception:Traceback (most recent call last):
  File "mass_request.py", line 87, in get_fuzzy_targets
    pr.get(timeout = timeout)
  File "/usr/lib/python2.7/multiprocessing/pool.py", line 528, in get
    raise self._value
AttributeError: 'file' object has no attribute 'out'

On that last exception, the program hangs and I have to completely kill it. AFAIK I'm never trying to access a file object with the attribute "out". My question is... how to fix!? Am I doing something obviously wrong here? Why isn't there a clearer exception?


Solution

I think that sys.stderr.out("A request failed to URL %s\n" % url_or_target) should be sys.stderr.write("A request failed to URL %s\n" % url_or_target).

sys.stderr is a file object and has no attribute called out, which is exactly what the AttributeError is telling you. The bad call sits inside the except handler in the worker, so instead of returning the (url_or_target, "__PNK_REQ_FAILED") tuple, the worker raises a fresh AttributeError. multiprocessing ships that exception back to the parent, and pr.get() re-raises it there (that is the raise self._value line from pool.py in your traceback). Because the handler blows up before returning, the success callback never fires for that URL either.
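For clarity, here is roughly what the except block looks like with the fix applied; it is the same logic as your original handler, with only the write call changed:

    except:
        # mark failure on exception
        traceback.print_exc()
        # sys.stderr has a write() method, not an out attribute
        sys.stderr.write("A request failed to URL %s\n" % url_or_target)
        return (url_or_target, "__PNK_REQ_FAILED")

With that in place the worker returns the failure tuple instead of raising, so pr.get() no longer re-raises an AttributeError in the parent.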
