我正在创建一个Python脚本,其中解析了大型(但简单)的CSV。

需要花费一些时间来处理。我希望能够中断CSV的解析,以便我可以在以后继续进行。

目前,我有一个较大的班级: (未完成)

编辑:

我有一些更改的代码。但是该系统将分析超过300万行。

def parseData(self)
    reader = csv.reader(open(self.file))
    for id, title, disc in reader:
        print "%-5s %-50s %s" % (id, title, disc)
        l = LegacyData()
        l.old_id = int(id)
        l.name = title
        l.disc_number = disc
        l.parsed = False
        l.save()

这是旧代码。

def parseData(self):
        #first line start
        fields = self.data.next()
        for row in self.data:
            items = zip(fields, row)
            item = {}
            for (name, value) in items:
                item[name] = value.strip()
            self.save(item)

多谢你们。

有帮助吗?

解决方案

如果在Linux下,点击CTRL-Z并停止运行过程。键入“ FG”以将其带回并开始停止它的位置。

其他提示

您可以使用 signal 参加活动。这是解析器的模型 CTRL-C 在窗户上停止解析:

import signal, tme, sys

def onInterupt(signum, frame):
    raise Interupted()

try:
    #windows
    signal.signal(signal.CTRL_C_EVENT, onInterupt)
except:
    pass

class Interupted(Exception): pass
class InteruptableParser(object):

    def __init__(self, previous_parsed_lines=0):
        self.parsed_lines = previous_parsed_lines

    def _parse(self, line):
        # do stuff
        time.sleep(1) #mock up
        self.parsed_lines += 1
        print 'parsed %d' % self.parsed_lines

   def parse(self, filelike):
        for line in filelike:
            try:
                self._parse(line)
            except Interupted:
                print 'caught interupt'
                self.save()
                print 'exiting ...'
                sys.exit(0)

    def save(self):
        # do what you need to save state
        # like write the parse_lines to a file maybe
        pass

parser = InteruptableParser()
parser.parse([1,2,3])

尽管我目前在Linux上,但无法测试它。

我做的方式:

将实际的处理代码推出,在该类中,我会实现腌制协议(http://docs.python.org/library/pickle.html)(基本上,写入正确 __getstate____setstate__ 功能)

此类将接受文件名,保留打开文件,将CSV读取器实例作为实例成员。这 __getstate__ 方法将保存当前的文件位置,SetState将重新打开文件,将其转发到适当的位置并创建新的读取器。

我会在 __iter__ 方法,在处理每条线后,这将归因于外部功能。

此外部功能将运行中断的“主循环”监视输入(插座,键盘,文件系统上的特定文件状态等) - 一切都很安静,它只会要求处理器的下一次迭代。如果发生中断,它将将处理器状态腌制到磁盘上的特定文件。

开始时,程序只需要检查A是否有保存的执行,如果是的话,请使用Pickle检索executor对象,然后恢复主循环。

这里有一些(未经测试的)代码 - IEA足够简单:

from cPickle import load, dump
import csv
import os, sys

SAVEFILE = "running.pkl"
STOPNOWFILE = "stop.now"

class Processor(object):
    def __init__(self, filename):
        self.file = open(filename, "rt")
        self.reader = csv.reader(self.file)
    def __iter__(self):
        for line in self.reader():
            # do stuff
            yield None
    def __getstate__(self):
        return (self.file.name, self.file.tell())
    def __setstate__(self, state):
        self.file = open(state[0],"rt")
        self.file.seek(state[1])
        self.reader = csv.reader(self.File)

def check_for_interrupts():
    # Use your imagination here!  
    # One simple thing would e to check for the existence of an specific file
    # on disk.
    # But you go all the way up to instantiate a tcp server and listen to 
    # interruptions on the network
    if os.path.exists(STOPNOWFILE): 
        return True
    return False

def main():
    if os.path.exists(SAVEFILE):
        with open(SAVEFILE) as savefile:
            processor = load(savefile)
        os.unlink(savefile)
    else:
        #Assumes the name of the .csv file to be passed on the command line
        processor = Processor(sys.argv[1])
    for line in processor:
        if check_for_interrupts():
            with open(SAVEFILE, "wb") as savefile:
                dump(processor)
            break

if __name__ == "__main__":
    main()

我的完整代码

我遵循@jsbueno的建议,上面有一个标志 - 但是我将其保留在类中,而是将其保留在类中:

我创建一个类 - 当我打电话时,请要求任何输入,然后开始另一个过程来完成我的工作。当它的循环时 - 如果我要按键,则设置标志,仅在循环被要求进行下一个解析时才检查。因此,我不会杀死当前的行动。添加一个 process 从我调用的数据中为每个对象的数据库中的标记,这意味着我可以随时启动此操作并恢复我关闭的位置。

class MultithreadParsing(object):

    process = None
    process_flag = True

    def f(self):
        print "\nMultithreadParsing has started\n"
        while self.process_flag:
            ''' get my object from database '''
            legacy = LegacyData.objects.filter(parsed=False)[0:1]

            if legacy:
                print "Processing: %s %s" % (legacy[0].name, legacy[0].disc_number)
                for l in legacy:
                    ''' ... Do what I want it to do ...'''
                sleep(1)
            else:
                self.process_flag = False
                print "Nothing to parse"



    def __init__(self):
        self.process = Process(target=self.f)
        self.process.start()
        print self.process
        a = raw_input("Press any key to stop \n")
        print "\nKILL FLAG HAS BEEN SENT\n"

        if a:
            print "\nKILL\n"
            self.process_flag = False

感谢您的所有帮助(尤其是您的@jsbueno) - 如果不是您的话,我将不会有这个班级的想法。

许可以下: CC-BY-SA归因
不隶属于 StackOverflow
scroll top