Domanda

Supponiamo che tu stia eseguendo Django su Linux e che tu abbia una vista e desideri che quella vista restituisca i dati da un sottoprocesso chiamato cmd che opera su un file creato dalla vista, ad esempio likeso:

 def call_subprocess(request):
     response = HttpResponse()

     with tempfile.NamedTemporaryFile("W") as f:
         f.write(request.GET['data']) # i.e. some data

     # cmd operates on fname and returns output
     p = subprocess.Popen(["cmd", f.name], 
                   stdout=subprocess.PIPE, 
                   stderr=subprocess.PIPE)

     out, err = p.communicate()

     response.write(p.out) # would be text/plain...
     return response

Ora supponiamo che cmd abbia un tempo di avvio molto lento, ma un tempo di funzionamento molto veloce e non abbia nativamente una modalità demone. Vorrei migliorare i tempi di risposta di questo punto di vista.

Vorrei che l'intero sistema funzionasse molto più velocemente avviando un numero di istanze di cmd in un pool di lavoratori, facendoli attendere l'input, e avere call_process chiedere a uno di quei processi del pool di lavoratori di gestire i dati.

In realtà sono 2 parti:

Parte 1. Una funzione che chiama cmd e cmd attende l'input. Questo potrebbe essere fatto con i tubi, ad esempio

def _run_subcmd():
    p = subprocess.Popen(["cmd", fname], 
        stdout=subprocess.PIPE, stderr=subprocess.PIPE)

    out, err = p.communicate()
    # write 'out' to a tmp file
    o = open("out.txt", "W")
    o.write(out)
    o.close()
    p.close()
    exit()

def _run_cmd(data):
    f = tempfile.NamedTemporaryFile("W")
    pipe = os.mkfifo(f.name)

    if os.fork() == 0:
        _run_subcmd(fname)
    else:
        f.write(data)

    r = open("out.txt", "r")
    out = r.read()
    # read 'out' from a tmp file
    return out

def call_process(request):
    response = HttpResponse()

    out = _run_cmd(request.GET['data'])

    response.write(out) # would be text/plain...
    return response

Parte 2. Una serie di lavoratori in esecuzione in background in attesa di dati. ovvero vogliamo estendere quanto sopra in modo che il sottoprocesso sia già in esecuzione, ad es. quando viene inizializzata l'istanza di Django o viene chiamato questo call_process , viene creato un insieme di questi lavoratori

WORKER_COUNT = 6
WORKERS = []

class Worker(object):
    def __init__(index):
        self.tmp_file = tempfile.NamedTemporaryFile("W") # get a tmp file name
        os.mkfifo(self.tmp_file.name)
        self.p = subprocess.Popen(["cmd", self.tmp_file], 
            stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        self.index = index

    def run(out_filename, data):
        WORKERS[self.index] = Null # qua-mutex??
        self.tmp_file.write(data)
        if (os.fork() == 0): # does the child have access to self.p??
            out, err = self.p.communicate()
            o = open(out_filename, "w")
            o.write(out)
            exit()

        self.p.close()
        self.o.close()
        self.tmp_file.close()
        WORKERS[self.index] = Worker(index) # replace this one
        return out_file

    @classmethod
    def get_worker() # get the next worker
    # ... static, incrementing index 

Dovrebbe esserci un'inizializzazione dei lavoratori da qualche parte, in questo modo:

def init_workers(): # create WORKERS_COUNT workers
    for i in xrange(0, WORKERS_COUNT):
        tmp_file = tempfile.NamedTemporaryFile()
        WORKERS.push(Worker(i))

Ora, quello che ho sopra diventa qualcosa di simile:

def _run_cmd(data):
     Worker.get_worker() # this needs to be atomic & lock worker at Worker.index

     fifo = open(tempfile.NamedTemporaryFile("r")) # this stores output of cmd

     Worker.run(fifo.name, data)
     # please ignore the fact that everything will be
     # appended to out.txt ... these will be tmp files, too, but named elsewhere.

     out = fifo.read()
     # read 'out' from a tmp file
     return out


def call_process(request):
     response = HttpResponse()

     out = _run_cmd(request.GET['data'])

     response.write(out) # would be text/plain...
     return response

Ora, le domande:

  1. Funzionerà? (L'ho appena scritto dalla cima della mia testa in StackOverflow, quindi sono sicuro che ci sono problemi, ma concettualmente funzionerà)

  2. Quali sono i problemi da cercare?

  3. Ci sono alternative migliori a questo? per esempio. I thread potrebbero funzionare altrettanto bene (è Debian Lenny Linux)? Esistono librerie che gestiscono pool di processi paralleli come questo?

  4. Ci sono interazioni con Django di cui dovrei essere consapevole?

Grazie per aver letto! Spero che questo sia un problema interessante come me.

Brian

È stato utile?

Soluzione

Potrebbe sembrare che stia puntando questo prodotto in quanto questa è la seconda volta che ho risposto con una raccomandazione di questo.

Ma sembra che tu abbia bisogno di un servizio di Accodamento messaggi, in particolare una coda di messaggi distribuiti.

Ecco come funzionerà:

  1. La tua app Django richiede CMD
  2. CMD viene aggiunto a una coda
  3. CMD viene inviato a diverse opere
  4. Viene eseguito e i risultati vengono restituiti a monte

La maggior parte di questo codice esiste e non è necessario creare il proprio sistema.

Dai un'occhiata al sedano che è stato inizialmente costruito con Django.

http://www.celeryq.org/ http://robertpogorzelski.com/blog/2009/09 / 10 / RabbitMQ-sedano-e-django /

Altri suggerimenti

Issy ha già menzionato il sedano, ma poiché i commenti non funzionano bene con esempi di codice, risponderò invece come risposta.

Dovresti provare a usare il sedano in modo sincrono con l'archivio risultati AMQP. È possibile distribuire l'esecuzione effettiva a un altro processo o anche a un'altra macchina. Eseguire in modo sincrono nel sedano è facile, ad es .:

>>> from celery.task import Task
>>> from celery.registry import tasks

>>> class MyTask(Task):
...
...     def run(self, x, y):
...         return x * y 
>>> tasks.register(MyTask)

>>> async_result = MyTask.delay(2, 2)
>>> retval = async_result.get() # Now synchronous
>>> retval 4

Il negozio di risultati AMQP rende molto veloce la restituzione del risultato, ma è disponibile solo nell'attuale versione di sviluppo (per diventare codice-freeze 0.8.0)

Che ne dici di " daemonizing " la chiamata di sottoprocesso utilizzando python-daemon o il suo successore, grizzled .

Autorizzato sotto: CC-BY-SA insieme a attribuzione
Non affiliato a StackOverflow
scroll top