How can I break this multithreaded python script into “chunks”?
-
01-10-2019 - |
Question
I'm processing 100k domain names into a CSV based on results taken from Siteadvisor using urllib (not the best method, I know). However, my current script creates too many threads and Python runs into errors. Is there a way I can "chunk" this script to do X number of domains at a time (say, 10-20) to prevent these errors? Thanks in advance.
import threading
import urllib
class Resolver(threading.Thread):
    """Worker thread that fetches one domain's Siteadvisor report page and
    records a one-word rating ("safe", "caution", "warning", "unknown", or
    "" when no marker matched) into a shared result dictionary.
    """

    def __init__(self, address, result_dict):
        threading.Thread.__init__(self)
        self.address = address          # domain name to look up
        self.result_dict = result_dict  # shared dict: address -> rating

    def run(self):
        try:
            # Read at most 12 kB: the markers searched for below appear near
            # the top of the report page.
            content = urllib.urlopen(
                "http://www.siteadvisor.com/sites/" + self.address).read(12000)
        except IOError:
            # Best-effort: a failed lookup leaves the address unrated.  The
            # original bare `except: pass` also hid programming errors
            # (NameError, KeyboardInterrupt, ...); only the network call can
            # legitimately fail here, so catch only I/O errors around it.
            return
        if content.find("didn't find any significant problems.") != -1:
            result = "safe"
        elif content.find('yellow') != -1:
            result = "caution"
        elif content.find('web reputation analysis found potential security') != -1:
            result = "warning"
        elif content.find("don't have the results yet.") != -1:
            result = "unknown"
        else:
            result = ""
        self.result_dict[self.address] = result
def main():
    """Read domain names from 'domainslist' (one per line), rate each one
    concurrently via a Resolver thread, then write 'address,rating' lines
    to final.csv.
    """
    # Context manager guarantees the input file is closed; the original
    # leaked the handle.
    with open("domainslist", "r") as infile:
        addresses = [line.strip() for line in infile if line.strip()]

    threads = []
    results = {}  # shared dict: address -> rating, filled in by the workers
    for address in addresses:
        resolver_thread = Resolver(address, results)
        threads.append(resolver_thread)
        resolver_thread.start()

    # Wait for every lookup to finish before writing the report.
    for thread in threads:
        thread.join()

    with open('final.csv', 'w') as outfile:
        # .items() works on both Python 2 and 3; the original used the
        # Python-2-only .iteritems().
        outfile.write("\n".join("%s,%s" % (address, rating)
                                for address, rating in results.items()))


if __name__ == '__main__':
    main()
Edit: new version, based on andyortlieb's suggestions.
import threading
import urllib
import time
class Resolver(threading.Thread):
    """Worker thread that rates one domain via Siteadvisor, appends its own
    'address,rating' line to final.csv, and removes itself from the shared
    thread list so main() can keep at most 20 workers alive at a time.
    """

    def __init__(self, address, result_dict, threads):
        threading.Thread.__init__(self)
        self.address = address          # domain name to look up
        self.result_dict = result_dict  # shared dict: address -> rating
        self.threads = threads          # shared list of live worker threads

    def run(self):
        try:
            content = urllib.urlopen(
                "http://www.siteadvisor.com/sites/" + self.address).read(12000)
            if content.find("didn't find any significant problems.") != -1:
                result = "safe"
            elif content.find('yellow') != -1:
                result = "caution"
            elif content.find('web reputation analysis found potential security') != -1:
                result = "warning"
            elif content.find("don't have the results yet.") != -1:
                result = "unknown"
            else:
                result = ""
            self.result_dict[self.address] = result
            # Ensure the CSV handle is closed even if the write fails.
            outfile = open('final.csv', 'a')
            try:
                outfile.write(self.address + "," + result + "\n")
            finally:
                outfile.close()
            print(self.address + result)
        except IOError:
            # Best-effort: a failed lookup leaves the address unrated.
            pass
        finally:
            # BUG FIX: the original called `threads.remove(self)` -- an
            # undefined name inside run(), so the NameError was swallowed by
            # the bare except, finished threads were never removed, and
            # main()'s `len(threads) < 20` gate stalled forever.  Run the
            # removal in `finally` so the slot is freed even on failure.
            self.threads.remove(self)
def main():
    """Rate every domain in 'domainslist', keeping at most 20 Resolver
    threads alive at a time; each worker appends its own line to final.csv
    (incremental output so progress can be tracked while running).
    """
    # Context manager guarantees the input file is closed; the original
    # leaked the handle.
    with open("domainslist", "r") as infile:
        addresses = [line.strip() for line in infile if line.strip()]

    threads = []   # live workers; each Resolver removes itself when done
    results = {}   # shared dict: address -> rating
    for address in addresses:
        # Throttle: block until one of the 20 worker slots frees up
        # (workers remove themselves from `threads` as they finish).
        while len(threads) >= 20:
            time.sleep(.25)
        resolver_thread = Resolver(address, results, threads)
        threads.append(resolver_thread)
        resolver_thread.start()

    # Join a snapshot: workers mutate `threads` concurrently, and
    # iterating a list while it is modified skips elements.
    for thread in list(threads):
        thread.join()


if __name__ == '__main__':
    main()
Solution
This might be kind of rigid, but you could pass the shared thread list into Resolver, so that when Resolver.run is completed, it can call self.threads.remove(self) (note: it must go through self.threads — a bare threads.remove(self) inside run would be a NameError).
Then you can nest some conditions so that threads are only created if there is room for them, and if there isn't room, they wait until there is.
# Throttle loop: start a worker only while fewer than 20 are alive.  Each
# Resolver removes itself from `threads` when it finishes, which is what
# eventually lets `len(threads) < 20` become true again.
for address in [address.strip() for address in intext if address.strip()]:
    loop=True
    while loop:
        if len(threads)<20:
            # Room available: launch a worker for this address and move on.
            resolver_thread = Resolver(address, results, threads)
            threads.append(resolver_thread)
            resolver_thread.start()
            loop=False
        else:
            # All 20 slots busy: poll again shortly.
            time.sleep(.25)
OTHER TIPS
Your existing code will work beautifully - just modify your __init__
method inside Resolver
to take in an additional list of addresses instead of one at a time, so instead of having one thread for each address, you have one thread for every 10 (for example). That way you won't overload the threading.
You'll obviously have to slightly modify run
as well so it loops through the array of addresses instead of the one self.address
.
I can work up a quick example if you'd like, but from the quality of your code I feel as though you'll be able to handle it quite easily.
Hope this helps!
EDIT Example below as requested. Note that you'll have to modify main to send your Resolver
instance lists of addresses instead of a single address - I couldn't handle this for you without knowing more about the format of your file and how the addresses are stored. Note - you could do the run
method with a helper function, but I thought this might be more understandable as an example.
class Resolver(threading.Thread):
    """Worker thread that rates a whole batch of domains -- one thread per
    list of addresses instead of one per domain -- into a shared result
    dictionary, so 100k domains don't each spawn their own thread.
    """

    def __init__(self, addresses, result_dict):
        threading.Thread.__init__(self)
        self.addresses = addresses      # list of domain names for this worker
        self.result_dict = result_dict  # shared dict: address -> rating

    def run(self):
        for address in self.addresses:
            try:
                content = urllib.urlopen(
                    "http://www.siteadvisor.com/sites/" + address).read(12000)
            except IOError:
                # Skip unreachable domains but keep going with the rest of
                # the batch.  The original bare `except: pass` around the
                # whole body also hid programming errors; only the network
                # call can legitimately fail here.
                continue
            if content.find("didn't find any significant problems.") != -1:
                result = "safe"
            elif content.find('yellow') != -1:
                result = "caution"
            elif content.find('web reputation analysis found potential security') != -1:
                result = "warning"
            elif content.find("don't have the results yet.") != -1:
                result = "unknown"
            else:
                result = ""
            self.result_dict[address] = result