Question

Supposons que vous exécutiez Django sous Linux, que vous disposiez d'une vue et que vous souhaitiez que cette vue renvoie les données d'un sous-processus appelé cmd . sur un fichier créé par la vue, par exemple, likeso:

 def call_subprocess(request):
     response = HttpResponse()

     with tempfile.NamedTemporaryFile("W") as f:
         f.write(request.GET['data']) # i.e. some data

     # cmd operates on fname and returns output
     p = subprocess.Popen(["cmd", f.name], 
                   stdout=subprocess.PIPE, 
                   stderr=subprocess.PIPE)

     out, err = p.communicate()

     response.write(p.out) # would be text/plain...
     return response

Supposons maintenant que cmd ait un temps de démarrage très lent, mais un temps de fonctionnement très rapide et qu'il n'ait pas de mode démon en mode natif. J'aimerais améliorer le temps de réponse de cette vue.

Je souhaite que l'ensemble du système fonctionne beaucoup plus rapidement en démarrant un certain nombre d'instances de cmd dans un pool de travail, faites-les attendre en entrée, et demander à call_process de demander à l'un de ces processus de pool de travail de gérer les données.

C’est vraiment 2 parties:

Partie 1. Une fonction qui appelle cmd et cmd attend les entrées. Cela pourrait être fait avec des tuyaux, c'est-à-dire.

def _run_subcmd():
    p = subprocess.Popen(["cmd", fname], 
        stdout=subprocess.PIPE, stderr=subprocess.PIPE)

    out, err = p.communicate()
    # write 'out' to a tmp file
    o = open("out.txt", "W")
    o.write(out)
    o.close()
    p.close()
    exit()

def _run_cmd(data):
    f = tempfile.NamedTemporaryFile("W")
    pipe = os.mkfifo(f.name)

    if os.fork() == 0:
        _run_subcmd(fname)
    else:
        f.write(data)

    r = open("out.txt", "r")
    out = r.read()
    # read 'out' from a tmp file
    return out

def call_process(request):
    response = HttpResponse()

    out = _run_cmd(request.GET['data'])

    response.write(out) # would be text/plain...
    return response

Partie 2. Ensemble de travailleurs en attente en attente d'exécution sur les données. Nous souhaitons étendre ce qui précède de sorte que le sous-processus soit déjà en cours d'exécution, par exemple. lorsque l'instance de Django est initialisée ou que le processus_appel est appelé en premier, un ensemble de ces travailleurs est créé

WORKER_COUNT = 6
WORKERS = []

class Worker(object):
    def __init__(index):
        self.tmp_file = tempfile.NamedTemporaryFile("W") # get a tmp file name
        os.mkfifo(self.tmp_file.name)
        self.p = subprocess.Popen(["cmd", self.tmp_file], 
            stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        self.index = index

    def run(out_filename, data):
        WORKERS[self.index] = Null # qua-mutex??
        self.tmp_file.write(data)
        if (os.fork() == 0): # does the child have access to self.p??
            out, err = self.p.communicate()
            o = open(out_filename, "w")
            o.write(out)
            exit()

        self.p.close()
        self.o.close()
        self.tmp_file.close()
        WORKERS[self.index] = Worker(index) # replace this one
        return out_file

    @classmethod
    def get_worker() # get the next worker
    # ... static, incrementing index 

Il devrait y avoir une initialisation des travailleurs quelque part, comme ceci:

def init_workers(): # create WORKERS_COUNT workers
    for i in xrange(0, WORKERS_COUNT):
        tmp_file = tempfile.NamedTemporaryFile()
        WORKERS.push(Worker(i))

Maintenant, ce que j'ai ci-dessus devient quelque chose de pareil:

def _run_cmd(data):
     Worker.get_worker() # this needs to be atomic & lock worker at Worker.index

     fifo = open(tempfile.NamedTemporaryFile("r")) # this stores output of cmd

     Worker.run(fifo.name, data)
     # please ignore the fact that everything will be
     # appended to out.txt ... these will be tmp files, too, but named elsewhere.

     out = fifo.read()
     # read 'out' from a tmp file
     return out


def call_process(request):
     response = HttpResponse()

     out = _run_cmd(request.GET['data'])

     response.write(out) # would be text/plain...
     return response

Maintenant, les questions:

  1. Cela fonctionnera-t-il? (Je viens juste de taper cela dans la tête de StackOverflow, alors je suis sûr qu'il y a des problèmes, mais sur le plan conceptuel, cela fonctionnera-t-il?)

  2. Quels sont les problèmes à rechercher?

  3. Existe-t-il de meilleures alternatives à cela? par exemple. Les threads pourraient-ils aussi bien fonctionner (c'est Debian Lenny Linux)? Existe-t-il des bibliothèques qui gèrent des pools de travailleurs de processus parallèles comme celui-ci?

  4. Existe-t-il des interactions avec Django dont je devrais être conscient?

Merci d'avoir lu! J'espère que vous trouvez ce problème aussi intéressant que moi.

Brian

Était-ce utile?

La solution

Il peut sembler que je porte ce produit car c’est la deuxième fois que je réponds avec une recommandation de ce type.

Mais il semble que vous ayez besoin d’un service Message Queing, en particulier d’une file de messages distribuée.

Voici comment cela fonctionnera:

  1. Votre application Django demande CMD
  2. CMD est ajouté à une file d'attente
  3. CMD est poussé vers plusieurs œuvres
  4. Il est exécuté et les résultats sont renvoyés en amont

La plupart de ce code existe et vous n'avez pas besoin de créer votre propre système.

Jetez un oeil sur le céleri qui a été initialement construit avec Django.

http://www.celeryq.org/ http://robertpogorzelski.com/blog/2009/09 / 10 / rabbitmq-céleri-et-django /

Autres conseils

Issy a déjà mentionné le céleri, mais comme les commentaires ne fonctionnent pas bien avec des exemples de code, je vous répondrai à la place.

Vous devriez essayer d’utiliser Celery de manière synchrone avec le magasin de résultats AMQP. Vous pouvez répartir l'exécution réelle vers un autre processus ou même un autre ordinateur. L'exécution synchrone dans le céleri est facile, par exemple:

>>> from celery.task import Task
>>> from celery.registry import tasks

>>> class MyTask(Task):
...
...     def run(self, x, y):
...         return x * y 
>>> tasks.register(MyTask)

>>> async_result = MyTask.delay(2, 2)
>>> retval = async_result.get() # Now synchronous
>>> retval 4

Le magasin de résultats AMQP permet de renvoyer le résultat très rapidement. mais il n’est disponible que dans la version de développement actuelle (en gel de code pour devenir 0.8.0)

Que diriez-vous de "démoniser &"; l'appel du sous-processus à l'aide de python-daemon ou de son successeur, grizzled .

Licencié sous: CC-BY-SA avec attribution
Non affilié à StackOverflow
scroll top