Вопрос

Этот вопрос связан с другими, которые я задавал здесь, в основном касающимися сортировки огромных наборов данных в памяти.

По сути, это то, чего я хочу / имею:

Запущен скрученный сервер XMLRPC.Этот сервер хранит в памяти несколько (32) экземпляров класса Foo.Каждый класс Foo содержит строку списка (которая будет содержать несколько миллионов записей).Существует служба, которая извлекает данные из базы данных и передает их на сервер XMLRPC.Данные в основном представляют собой словарь с ключами, соответствующими каждому экземпляру Foo, а значения представляют собой список словарей, например:

data = {'foo1':[{'k1':'v1', 'k2':'v2'}, {'k1':'v1', 'k2':'v2'}], 'foo2':...}

Затем каждому экземпляру Foo передается значение, соответствующее его ключу, и словари Foo.bar обновляются и сортируются.

class XMLRPCController(xmlrpc.XMLRPC):

    def __init__(self):
        ...
        self.foos = {'foo1':Foo(), 'foo2':Foo(), 'foo3':Foo()}
        ...

    def update(self, data):
        for k, v in data:
            threads.deferToThread(self.foos[k].processData, v)

    def getData(self, fookey):
        # return first 10 records of specified Foo.bar
        return self.foos[fookey].bar[0:10]

class Foo():

    def __init__(self):
        bar = []

    def processData(self, new_bar_data):
        for record in new_bar_data:
            # do processing, and add record, then sort
            # BUNCH OF PROCESSING CODE
            self.bar.sort(reverse=True)

Проблема в том, что когда функция update вызывается в XMLRPCController с большим количеством записей (скажем, 100K +), она перестает отвечать на мои вызовы getData до тех пор, пока все 32 экземпляра Foo не завершат метод process_data.Я думал, что deferToThread сработает, но, думаю, я неправильно понимаю, в чем проблема.

Любые предложения...Я открыт для использования чего-то другого, например Cherrypy, если он поддерживает это требуемое поведение.


Редактировать

@Троя:Вот как устроен реактор

reactor.listenTCP(port_no, server.Site(XMLRPCController)
reactor.run()

Что касается GIL, было бы разумным вариантом изменить значение sys.setcheckinterval() на что-то меньшее, чтобы блокировка данных была снята, и их можно было прочитать?

Это было полезно?

Решение

Самый простой способ сделать приложение отзывчивым - разбить обработку, требующую больших затрат процессора, на более мелкие фрагменты, позволив twisted reactor работать между ними.Например, вызвав reactor.callLater(0, process_next_chunk), чтобы перейти к следующему фрагменту.Эффективно реализуйте совместную многозадачность самостоятельно.

Другим способом было бы использовать отдельные процессы для выполнения этой работы, тогда вы выиграете от использования нескольких ядер.Взгляните на Ампулу: https://launchpad.net/ampoule Он предоставляет API, аналогичный deferToThread.

Другие советы

Я не знаю, как долго работает ваш метод ProcessData и как вы настраиваете свой twisted reactor. По умолчанию, витой реактор имеет пул потоков от 0 до 10 потоков.Возможно, вы пытаетесь отложить до 32 длительных вычислений до 10 потоков.Это неоптимально.

Вам также нужно спросить, какую роль играет GIL в обновлении всех этих коллекций.

Редактировать:Прежде чем вносить какие-либо серьезные изменения в свою программу (например, вызывать sys.setcheckinterval()) вероятно, вам следует запустить его с помощью профилировщика или модуля трассировки python.Они должны рассказать вам, какие методы вы используете постоянно.Без нужной информации вы не сможете внести нужные изменения.

Лицензировано под: CC-BY-SA с атрибуция
Не связан с StackOverflow
scroll top