在Python中创建可中断的过程
-
15-10-2019 - |
题
我正在创建一个Python脚本,其中解析了大型(但简单)的CSV。
需要花费一些时间来处理。我希望能够中断CSV的解析,以便我可以在以后继续进行。
目前,我有一个较大的班级: (未完成)
编辑:
我有一些更改的代码。但是该系统将分析超过300万行。
def parseData(self)
reader = csv.reader(open(self.file))
for id, title, disc in reader:
print "%-5s %-50s %s" % (id, title, disc)
l = LegacyData()
l.old_id = int(id)
l.name = title
l.disc_number = disc
l.parsed = False
l.save()
这是旧代码。
def parseData(self):
#first line start
fields = self.data.next()
for row in self.data:
items = zip(fields, row)
item = {}
for (name, value) in items:
item[name] = value.strip()
self.save(item)
多谢你们。
解决方案
如果在Linux下,点击CTRL-Z并停止运行过程。键入“ FG”以将其带回并开始停止它的位置。
其他提示
您可以使用 signal
参加活动。这是解析器的模型 CTRL-C
在窗户上停止解析:
import signal, tme, sys
def onInterupt(signum, frame):
raise Interupted()
try:
#windows
signal.signal(signal.CTRL_C_EVENT, onInterupt)
except:
pass
class Interupted(Exception): pass
class InteruptableParser(object):
def __init__(self, previous_parsed_lines=0):
self.parsed_lines = previous_parsed_lines
def _parse(self, line):
# do stuff
time.sleep(1) #mock up
self.parsed_lines += 1
print 'parsed %d' % self.parsed_lines
def parse(self, filelike):
for line in filelike:
try:
self._parse(line)
except Interupted:
print 'caught interupt'
self.save()
print 'exiting ...'
sys.exit(0)
def save(self):
# do what you need to save state
# like write the parse_lines to a file maybe
pass
parser = InteruptableParser()
parser.parse([1,2,3])
尽管我目前在Linux上,但无法测试它。
我做的方式:
将实际的处理代码推出,在该类中,我会实现腌制协议(http://docs.python.org/library/pickle.html)(基本上,写入正确 __getstate__
和 __setstate__
功能)
此类将接受文件名,保留打开文件,将CSV读取器实例作为实例成员。这 __getstate__
方法将保存当前的文件位置,SetState将重新打开文件,将其转发到适当的位置并创建新的读取器。
我会在 __iter__
方法,在处理每条线后,这将归因于外部功能。
此外部功能将运行中断的“主循环”监视输入(插座,键盘,文件系统上的特定文件状态等) - 一切都很安静,它只会要求处理器的下一次迭代。如果发生中断,它将将处理器状态腌制到磁盘上的特定文件。
开始时,程序只需要检查A是否有保存的执行,如果是的话,请使用Pickle检索executor对象,然后恢复主循环。
这里有一些(未经测试的)代码 - IEA足够简单:
from cPickle import load, dump
import csv
import os, sys
SAVEFILE = "running.pkl"
STOPNOWFILE = "stop.now"
class Processor(object):
def __init__(self, filename):
self.file = open(filename, "rt")
self.reader = csv.reader(self.file)
def __iter__(self):
for line in self.reader():
# do stuff
yield None
def __getstate__(self):
return (self.file.name, self.file.tell())
def __setstate__(self, state):
self.file = open(state[0],"rt")
self.file.seek(state[1])
self.reader = csv.reader(self.File)
def check_for_interrupts():
# Use your imagination here!
# One simple thing would e to check for the existence of an specific file
# on disk.
# But you go all the way up to instantiate a tcp server and listen to
# interruptions on the network
if os.path.exists(STOPNOWFILE):
return True
return False
def main():
if os.path.exists(SAVEFILE):
with open(SAVEFILE) as savefile:
processor = load(savefile)
os.unlink(savefile)
else:
#Assumes the name of the .csv file to be passed on the command line
processor = Processor(sys.argv[1])
for line in processor:
if check_for_interrupts():
with open(SAVEFILE, "wb") as savefile:
dump(processor)
break
if __name__ == "__main__":
main()
我的完整代码
我遵循@jsbueno的建议,上面有一个标志 - 但是我将其保留在类中,而是将其保留在类中:
我创建一个类 - 当我打电话时,请要求任何输入,然后开始另一个过程来完成我的工作。当它的循环时 - 如果我要按键,则设置标志,仅在循环被要求进行下一个解析时才检查。因此,我不会杀死当前的行动。添加一个 process
从我调用的数据中为每个对象的数据库中的标记,这意味着我可以随时启动此操作并恢复我关闭的位置。
class MultithreadParsing(object):
process = None
process_flag = True
def f(self):
print "\nMultithreadParsing has started\n"
while self.process_flag:
''' get my object from database '''
legacy = LegacyData.objects.filter(parsed=False)[0:1]
if legacy:
print "Processing: %s %s" % (legacy[0].name, legacy[0].disc_number)
for l in legacy:
''' ... Do what I want it to do ...'''
sleep(1)
else:
self.process_flag = False
print "Nothing to parse"
def __init__(self):
self.process = Process(target=self.f)
self.process.start()
print self.process
a = raw_input("Press any key to stop \n")
print "\nKILL FLAG HAS BEEN SENT\n"
if a:
print "\nKILL\n"
self.process_flag = False
感谢您的所有帮助(尤其是您的@jsbueno) - 如果不是您的话,我将不会有这个班级的想法。