Каков наилучший способ реализации таблицы очереди сообщений в mysql

StackOverflow https://stackoverflow.com/questions/423111

  •  05-07-2019
  •  | 
  •  

Вопрос

Наверное, это уже десятый раз, когда я внедряю что-то подобное, и я никогда не был на 100% доволен решениями, которые я придумал.

Причина, по которой использование таблицы mysql вместо "правильной" системы обмена сообщениями привлекательно, заключается прежде всего в том, что большинство приложений уже используют некоторую реляционную базу данных для других целей (которая, как правило, является mysql для большинства вещей, которые я делал), в то время как очень немногие приложения используют систему обмена сообщениями.Кроме того, реляционные базы данных обладают очень сильными свойствами ACID, в то время как системы обмена сообщениями часто этого не делают.

Первая идея состоит в том, чтобы использовать:

create table jobs(
  id auto_increment not null primary key,
  message text not null,
  process_id varbinary(255) null default null,
  key jobs_key(process_id) 
);

И тогда очередь выглядит примерно так:

insert into jobs(message) values('blah blah');

И удаление из очереди выглядит примерно так:

begin;
select * from jobs where process_id is null order by id asc limit 1;
update jobs set process_id = ? where id = ?; -- whatever i just got
commit;
-- return (id, message) to application, cleanup after done

Таблица и очередь выглядят красиво, но удаление из очереди меня немного беспокоит.Насколько велика вероятность отката?Или быть заблокированным?Какие ключи я должен использовать, чтобы сделать его O (1)-ish?

Или есть какое-нибудь лучшее решение по сравнению с тем, что я делаю?

Это было полезно?

Решение

Я создал несколько систем очередей сообщений, и я не уверен, к какому типу сообщений вы обращаетесь, но в случае удаления из очереди (это слово?) я сделал то же самое, что и вы сделали. Ваш метод выглядит простым, чистым и солидным. Не то чтобы моя работа была лучшей, но она оказалась очень эффективной для большого мониторинга многих сайтов. (регистрация ошибок, массовые почтовые маркетинговые кампании, уведомления в социальных сетях)

Мой голос: не беспокойся!

Другие советы

Ваша очередь может быть более краткой. Вместо того чтобы полагаться на откат транзакции, вы можете сделать это в одном атомарном операторе без явной транзакции:

UPDATE jobs SET process_id = ? WHERE process_id IS NULL ORDER BY ID ASC LIMIT 1;

Затем вы можете получить задания с помощью (скобки [] означают необязательные, в зависимости от ваших данных):

SELECT * FROM jobs WHERE process_id = ? [ORDER BY ID LIMIT 1];

Брайан Акер говорил о обработчике очереди некоторое время назад. Говорили и о синтаксисе SELECT таблицы FRET DELETE .

Если вы не беспокоитесь о пропускной способности, вы всегда можете использовать SELECT GET_LOCK () в качестве мьютекса. Например:

SELECT GET_LOCK('READQUEUE');
SELECT * FROM jobs;
DELETE FROM JOBS WHERE ID = ?;
SELECT RELEASE_LOCK('READQUEUE');

И если вы хотите стать действительно модным, оберните его в хранимую процедуру.

Я бы предложил использовать Quartz.NET

У него есть провайдеры для SQL Server, Oracle, MySql, SQLite и Firebird.

Этот поток содержит информацию о дизайне, которая должна быть отображена.

Процитировать:

Вот что я успешно использовал в прошлом:

Схема таблицы MsgQueue

Идентификатор MsgId - НЕ РАВЕН НУЛЮ
MsgTypeCode varchar(20) - НЕ РАВЕН НУЛЮ
Исходный код varchar(20) -- процесс вставки сообщения -- обнуляемый
Символ состояния (1) - 'N'ew, если поставлен в очередь, 'A'(ctive), если обработка 'C' завершена, по умолчанию 'N' - НЕ NULL
createTime datetime -- по умолчанию GETDATE() -- НЕ NULL
Msg varchar(255) - обнуляемый

Типы ваших сообщений соответствуют вашим ожиданиям - сообщения, которые соответствуют контракту между процессом (процессами) вставки и процессом (процессами) чтения, структурированные с помощью XML или другого выбранного вами способа представления (например, JSON был бы удобен в некоторых случаях).

Затем процессы от 0 до n могут выполнять вставку, а процессы от 0 до n могут считывать и обрабатывать сообщения, Каждый процесс чтения обычно обрабатывает один тип сообщения.Для балансировки нагрузки может быть запущено несколько экземпляров процесса одного типа.

Считыватель извлекает одно сообщение и меняет состояние на "A", пока работает с ним.Когда это будет сделано, это изменит состояние на "C"omplete.Он может удалить сообщение или нет, в зависимости от того, хотите ли вы сохранить журнал аудита.Сообщения State = 'N' извлекаются в порядке MsgType / Timestamp, поэтому для MsgType + State + createTime есть индекс.

Вариации:
Состояние для "E"rror.
Столбец для кода процесса считывания.
Временные метки для переходов состояний.

Это обеспечило приятный, масштабируемый, видимый и простой механизм для выполнения ряда действий, подобных тому, что вы описываете.Если у вас есть базовое представление о базах данных, оно довольно надежное и расширяемое.Никогда не было проблем с откатами блокировок и т.д.из-за транзакций перехода атомарного состояния.

Вот решение, которое я использовал, работая без process_id текущего потока или блокируя таблицу.

SELECT * from jobs ORDER BY ID ASC LIMIT 0,1;

Получите результат в массиве $ row и выполните:

DELETE from jobs WHERE ID=$row['ID'];

Затем получите затронутые строки (mysql_affered_rows). Если есть затронутые строки, обработайте задание в массиве $ row. Если имеется 0 затронутых строк, это означает, что какой-то другой процесс уже обрабатывает выбранное задание. Повторяйте вышеупомянутые шаги, пока не будет строк.

Я протестировал это с таблицей 'jobs', имеющей 100 тыс. строк и порождающей 20 параллельных процессов, выполняющих вышеуказанное. Никаких условий гонки не произошло. Вы можете изменить вышеупомянутые запросы, чтобы обновить строку с флагом обработки и удалить строку после того, как вы фактически обработали ее:

while(time()-$startTime<$timeout)
{
SELECT * from jobs WHERE processing is NULL ORDER BY ID ASC LIMIT 0,1;
if (count($row)==0) break;
UPDATE jobs set processing=1 WHERE ID=$row['ID'];
if (mysql_affected_rows==0) continue;
//process your job here
DELETE from jobs WHERE ID=$row['ID'];
}

Излишне говорить, что для такой работы вы должны использовать правильную очередь сообщений (ActiveMQ, RabbitMQ и т. д.). Нам пришлось прибегнуть к этому решению, так как наш хост регулярно ломает вещи при обновлении программного обеспечения, поэтому чем меньше вещей ломать, тем лучше.

Вы можете иметь промежуточную таблицу для поддержания смещения для очереди.

create table scan(
  scan_id int primary key,
  offset_id int
);

У вас также может быть несколько сканирований, следовательно, одно смещение на сканирование. Инициализируйте offset_id = 0 в начале сканирования.

begin;
select * from jobs where order by id where id > (select offset_id from scan where scan_id = 0)  asc limit 1;
update scan set offset_id = ? where scan_id = ?; -- whatever i just got
commit;

Все, что вам нужно сделать, это просто сохранить последнее смещение. Это также сэкономит вам много места (process_id для каждой записи). Надеюсь, это звучит логично.

Лицензировано под: CC-BY-SA с атрибуция
Не связан с StackOverflow
scroll top