Каков наилучший способ реализации таблицы очереди сообщений в mysql
Вопрос
Наверное, это уже десятый раз, когда я внедряю что-то подобное, и я никогда не был на 100% доволен решениями, которые я придумал.
Причина, по которой использование таблицы mysql вместо "правильной" системы обмена сообщениями привлекательно, заключается прежде всего в том, что большинство приложений уже используют некоторую реляционную базу данных для других целей (которая, как правило, является mysql для большинства вещей, которые я делал), в то время как очень немногие приложения используют систему обмена сообщениями.Кроме того, реляционные базы данных обладают очень сильными свойствами ACID, в то время как системы обмена сообщениями часто этого не делают.
Первая идея состоит в том, чтобы использовать:
create table jobs( id auto_increment not null primary key, message text not null, process_id varbinary(255) null default null, key jobs_key(process_id) );
И тогда очередь выглядит примерно так:
insert into jobs(message) values('blah blah');
И удаление из очереди выглядит примерно так:
begin; select * from jobs where process_id is null order by id asc limit 1; update jobs set process_id = ? where id = ?; -- whatever i just got commit; -- return (id, message) to application, cleanup after done
Таблица и очередь выглядят красиво, но удаление из очереди меня немного беспокоит.Насколько велика вероятность отката?Или быть заблокированным?Какие ключи я должен использовать, чтобы сделать его O (1)-ish?
Или есть какое-нибудь лучшее решение по сравнению с тем, что я делаю?
Решение
Я создал несколько систем очередей сообщений, и я не уверен, к какому типу сообщений вы обращаетесь, но в случае удаления из очереди (это слово?) я сделал то же самое, что и вы сделали. Ваш метод выглядит простым, чистым и солидным. Не то чтобы моя работа была лучшей, но она оказалась очень эффективной для большого мониторинга многих сайтов. (регистрация ошибок, массовые почтовые маркетинговые кампании, уведомления в социальных сетях)
Мой голос: не беспокойся!
Другие советы
Ваша очередь может быть более краткой. Вместо того чтобы полагаться на откат транзакции, вы можете сделать это в одном атомарном операторе без явной транзакции:
UPDATE jobs SET process_id = ? WHERE process_id IS NULL ORDER BY ID ASC LIMIT 1;
Затем вы можете получить задания с помощью (скобки [] означают необязательные, в зависимости от ваших данных):
SELECT * FROM jobs WHERE process_id = ? [ORDER BY ID LIMIT 1];
Брайан Акер говорил о обработчике очереди некоторое время назад. Говорили и о синтаксисе SELECT таблицы FRET DELETE
.
Если вы не беспокоитесь о пропускной способности, вы всегда можете использовать SELECT GET_LOCK () в качестве мьютекса. Например:
SELECT GET_LOCK('READQUEUE');
SELECT * FROM jobs;
DELETE FROM JOBS WHERE ID = ?;
SELECT RELEASE_LOCK('READQUEUE');
И если вы хотите стать действительно модным, оберните его в хранимую процедуру.
Я бы предложил использовать Quartz.NET
У него есть провайдеры для SQL Server, Oracle, MySql, SQLite и Firebird.
Этот поток содержит информацию о дизайне, которая должна быть отображена.
Процитировать:
Вот что я успешно использовал в прошлом:
Схема таблицы MsgQueue
Идентификатор MsgId - НЕ РАВЕН НУЛЮ
MsgTypeCode varchar(20) - НЕ РАВЕН НУЛЮ
Исходный код varchar(20) -- процесс вставки сообщения -- обнуляемый
Символ состояния (1) - 'N'ew, если поставлен в очередь, 'A'(ctive), если обработка 'C' завершена, по умолчанию 'N' - НЕ NULL
createTime datetime -- по умолчанию GETDATE() -- НЕ NULL
Msg varchar(255) - обнуляемый
Типы ваших сообщений соответствуют вашим ожиданиям - сообщения, которые соответствуют контракту между процессом (процессами) вставки и процессом (процессами) чтения, структурированные с помощью XML или другого выбранного вами способа представления (например, JSON был бы удобен в некоторых случаях).
Затем процессы от 0 до n могут выполнять вставку, а процессы от 0 до n могут считывать и обрабатывать сообщения, Каждый процесс чтения обычно обрабатывает один тип сообщения.Для балансировки нагрузки может быть запущено несколько экземпляров процесса одного типа.
Считыватель извлекает одно сообщение и меняет состояние на "A", пока работает с ним.Когда это будет сделано, это изменит состояние на "C"omplete.Он может удалить сообщение или нет, в зависимости от того, хотите ли вы сохранить журнал аудита.Сообщения State = 'N' извлекаются в порядке MsgType / Timestamp, поэтому для MsgType + State + createTime есть индекс.
Вариации:
Состояние для "E"rror.
Столбец для кода процесса считывания.
Временные метки для переходов состояний.
Это обеспечило приятный, масштабируемый, видимый и простой механизм для выполнения ряда действий, подобных тому, что вы описываете.Если у вас есть базовое представление о базах данных, оно довольно надежное и расширяемое.Никогда не было проблем с откатами блокировок и т.д.из-за транзакций перехода атомарного состояния.
Вот решение, которое я использовал, работая без process_id текущего потока или блокируя таблицу. Р>
SELECT * from jobs ORDER BY ID ASC LIMIT 0,1;
Получите результат в массиве $ row и выполните:
DELETE from jobs WHERE ID=$row['ID'];
Затем получите затронутые строки (mysql_affered_rows). Если есть затронутые строки, обработайте задание в массиве $ row. Если имеется 0 затронутых строк, это означает, что какой-то другой процесс уже обрабатывает выбранное задание. Повторяйте вышеупомянутые шаги, пока не будет строк.
Я протестировал это с таблицей 'jobs', имеющей 100 тыс. строк и порождающей 20 параллельных процессов, выполняющих вышеуказанное. Никаких условий гонки не произошло. Вы можете изменить вышеупомянутые запросы, чтобы обновить строку с флагом обработки и удалить строку после того, как вы фактически обработали ее:
while(time()-$startTime<$timeout)
{
SELECT * from jobs WHERE processing is NULL ORDER BY ID ASC LIMIT 0,1;
if (count($row)==0) break;
UPDATE jobs set processing=1 WHERE ID=$row['ID'];
if (mysql_affected_rows==0) continue;
//process your job here
DELETE from jobs WHERE ID=$row['ID'];
}
Излишне говорить, что для такой работы вы должны использовать правильную очередь сообщений (ActiveMQ, RabbitMQ и т. д.). Нам пришлось прибегнуть к этому решению, так как наш хост регулярно ломает вещи при обновлении программного обеспечения, поэтому чем меньше вещей ломать, тем лучше. Р>
Вы можете иметь промежуточную таблицу для поддержания смещения для очереди. Р>
create table scan(
scan_id int primary key,
offset_id int
);
У вас также может быть несколько сканирований, следовательно, одно смещение на сканирование. Инициализируйте offset_id = 0 в начале сканирования.
begin;
select * from jobs where order by id where id > (select offset_id from scan where scan_id = 0) asc limit 1;
update scan set offset_id = ? where scan_id = ?; -- whatever i just got
commit;
Все, что вам нужно сделать, это просто сохранить последнее смещение. Это также сэкономит вам много места (process_id для каждой записи). Надеюсь, это звучит логично.