Лучший способ использовать таблицу БД в качестве очереди заданий (также известной как пакетная очередь или очередь сообщений)
-
08-07-2019 - |
Вопрос
У меня есть таблица базы данных с примерно 50 тысячами строк, каждая строка представляет собой задание, которое необходимо выполнить.У меня есть программа, которая извлекает задание из БД, выполняет его и возвращает результат в БД.(эта система работает прямо сейчас)
Теперь я хочу разрешить более чем одной задаче обработки выполнять задания, но быть уверенным, что ни одна задача не выполняется дважды (из соображений производительности, а не из-за того, что это вызовет другие проблемы).Поскольку доступ осуществляется через spproce, моя текущая задача - заменить указанную spproce чем-то, что выглядит примерно так
update tbl set owner=connection_id() where avalable and owner is null limit 1;
select stuff from tbl where owner = connection_id();
КСТАТИ;Задачи работника могут привести к снижению связи между получением работы и отправкой результатов.Кроме того, я не ожидаю, что БД даже приблизится к тому, чтобы стать узким местом, если я не испорчу эту часть (~ 5 заданий в минуту).
Есть ли с этим проблемы?Есть лучший способ сделать это?
Примечание:тот «База данных как антипаттерн IPC» здесь уместно лишь слегка, потому что 1) я не использую IPC (нет процесса, генерирующего строки, все они уже существуют прямо сейчас) и 2) основная проблема, описанная для этого антишаблона, заключается в том, что он приводит к ненужной нагрузке на БД как процессы ждут сообщений (в моем случае, если сообщений нет, все может выключиться по мере выполнения)
Решение
Вот что я успешно использовал в прошлом:
Схема таблицы MsgQueue
MsgId identity -- NOT NULL
MsgTypeCode varchar(20) -- NOT NULL
SourceCode varchar(20) -- process inserting the message -- NULLable
State char(1) -- 'N'ew if queued, 'A'(ctive) if processing, 'C'ompleted, default 'N' -- NOT NULL
CreateTime datetime -- default GETDATE() -- NOT NULL
Msg varchar(255) -- NULLable
Типы ваших сообщений соответствуют вашим ожиданиям — сообщения, соответствующие контракту между вставкой процесса(ов) и чтением процесса(ов), структурированные с помощью XML или другого выбранного вами представления (в некоторых случаях JSON будет удобен, например, для пример).
Тогда процессы от 0 до n могут вставлять, а процессы от 0 до n могут читать и обрабатывать сообщения. Каждый процесс чтения обычно обрабатывает один тип сообщения.Для балансировки нагрузки можно запускать несколько экземпляров одного типа процесса.
Читатель извлекает одно сообщение и меняет состояние на «Активное», пока работает над ним.Когда это будет сделано, оно изменит состояние на «Завершено».Он может удалить сообщение или нет, в зависимости от того, хотите ли вы сохранить контрольный журнал.Сообщения State = 'N' извлекаются в порядке MsgType/Timestamp, поэтому существует индекс MsgType + State + CreateTime.
Вариации:
Состояние ошибки «Ошибка».
Столбец для кода процесса Reader.
Временные метки для переходов между состояниями.
Это обеспечило хороший, масштабируемый, наглядный и простой механизм для выполнения ряда вещей, которые вы описываете.Если у вас есть базовое представление о базах данных, это довольно надежный и расширяемый инструмент.
Код из комментариев:
CREATE PROCEDURE GetMessage @MsgType VARCHAR(8) )
AS
DECLARE @MsgId INT
BEGIN TRAN
SELECT TOP 1 @MsgId = MsgId
FROM MsgQueue
WHERE MessageType = @pMessageType AND State = 'N'
ORDER BY CreateTime
IF @MsgId IS NOT NULL
BEGIN
UPDATE MsgQueue
SET State = 'A'
WHERE MsgId = @MsgId
SELECT MsgId, Msg
FROM MsgQueue
WHERE MsgId = @MsgId
END
ELSE
BEGIN
SELECT MsgId = NULL, Msg = NULL
END
COMMIT TRAN
Другие советы
В качестве возможного изменения технологии вы можете рассмотреть возможность использования MSMQ или чего-то подобного.
Каждое из ваших заданий/потоков может запрашивать очередь сообщений, чтобы узнать, доступно ли новое задание.Поскольку при чтении сообщения оно удаляется из стека, вы можете быть уверены, что только одно задание/поток получит сообщение.
Конечно, это предполагает, что вы работаете с платформой Microsoft.
Лучший способ реализовать очередь заданий в системе реляционной базы данных — использовать ПРОПУСТИТЬ ЗАБЛОКИРОВАНО.
SKIP LOCKED — это опция получения блокировки, которая применяется как к блокировкам чтения/совместного использования (FOR SHARE), так и к блокировкам записи/монопольного доступа (FOR UPDATE) и широко поддерживается в настоящее время:
- Oracle 10g и более поздние версии
- PostgreSQL 9.5 и более поздние версии
- SQL Server 2005 и более поздние версии
- MySQL 8.0 и более поздние версии
Теперь представьте, что у нас есть следующее post
таблица, которая используется в качестве очереди заданий:
CREATE TABLE post (
id int8 NOT NULL,
body varchar(255),
status int4,
title varchar(255),
PRIMARY KEY (id)
)
А status
Столбец используется как перечисление со значениями ОЖИДАНИЕ (0), УТВЕРЖДЕНО (1) или СПАМ (2).
Если у нас есть несколько одновременных пользователей, пытающихся модерировать post
записи, нам нужен способ координировать их усилия, чтобы избежать того, чтобы два модератора просматривали одно и то же. post
ряд.
Итак, SKIP LOCKED – это именно то, что нам нужно.Если два одновременных пользователя, Алиса и Боб, выполняют следующие запросы SELECT, которые блокируют исключительно записи сообщений, а также добавляют опцию SKIP LOCKED:
[Alice]:
SELECT
p.id AS id1_0_,
p.body AS body2_0_,
p.status AS status3_0_,
p.title AS title4_0_
FROM
post p
WHERE
p.status = 0
ORDER BY
p.id
LIMIT 2
FOR UPDATE OF p SKIP LOCKED
[Bob]:
SELECT
p.id AS id1_0_,
p.body AS body2_0_,
p.status AS status3_0_,
p.title AS title4_0_
FROM
post p
WHERE
p.status = 0
ORDER BY
p.id
LIMIT 2
FOR UPDATE OF p SKIP LOCKED
Мы видим, что Алиса может выбрать первые две записи, а Боб выбирает следующие две записи.Без SKIP LOCKED запрос на получение блокировки Боба будет заблокирован до тех пор, пока Алиса не снимет блокировку с первых двух записей.
Более подробную информацию о SKIP LOCKED см. Эта статья.
Вместо того, чтобы иметь владельца = null, когда он не принадлежит, вам следует вместо этого установить для него поддельную никем запись.Поиск значения null не ограничивает индекс, вы можете получить сканирование таблицы.(это для Oracle, SQL-сервер может быть другим)
Вы пытаетесь реализовать антишаблон «База данных как IPC».Посмотрите его, чтобы понять, почему вам следует подумать о правильном перепроектировании вашего программного обеспечения.