Il modo migliore per utilizzare una tabella DB come coda di lavoro (a.k.a coda batch o coda messaggi)

StackOverflow https://stackoverflow.com/questions/297280

Domanda

Ho una tabella di database con ~ 50K righe, ogni riga rappresenta un lavoro che deve essere fatto. Ho un programma che estrae un lavoro dal DB, fa il lavoro e rimette il risultato nel db. (questo sistema è attualmente in esecuzione)

Ora voglio consentire a più attività di elaborazione di eseguire lavori, ma assicurati che nessuna attività venga eseguita due volte (poiché le prestazioni non riguardano che ciò causerà altri problemi). Poiché l'accesso avviene tramite un germoglio, la mia attuale idea è di sostituire detto germoglio con qualcosa che assomigli a questo

update tbl set owner=connection_id() where avalable and owner is null limit 1;
select stuff from tbl where owner = connection_id();

A proposito; le attività del lavoratore potrebbero far cadere la connessione tra ottenere un lavoro e inviare i risultati. Inoltre, non mi aspetto che il DB si avvicini nemmeno al collo di bottiglia a meno che non incasini quella parte (~ 5 lavori al minuto)

Ci sono problemi con questo? C'è un modo migliore per farlo?

Nota: il " Database come anti-pattern IPC " è solo leggermente proposto qui perché 1) I non sto eseguendo IPC (non vi è alcun processo che genera le righe, tutte già esistono in questo momento) e 2) il problema principale descritto per quell'anti-pattern è che provoca un carico non necessario sul DB mentre i processi attendono i messaggi (in nel mio caso, se non ci sono messaggi, tutto può spegnersi mentre tutto è fatto)

È stato utile?

Soluzione

Ecco cosa ho usato con successo in passato:

Schema tabella MsgQueue

MsgId identity -- NOT NULL
MsgTypeCode varchar(20) -- NOT NULL  
SourceCode varchar(20)  -- process inserting the message -- NULLable  
State char(1) -- 'N'ew if queued, 'A'(ctive) if processing, 'C'ompleted, default 'N' -- NOT NULL 
CreateTime datetime -- default GETDATE() -- NOT NULL  
Msg varchar(255) -- NULLable  

I tuoi tipi di messaggi sono quelli che ti aspetteresti: messaggi conformi a un contratto tra l'inserimento dei processi e la lettura dei processi, strutturati con XML o l'altra tua scelta di rappresentazione (JSON sarebbe utile in alcuni casi, ad esempio).

Quindi è possibile inserire processi da 0 a n e processi da 0 a n in grado di leggere ed elaborare i messaggi. Ogni processo di lettura in genere gestisce un singolo tipo di messaggio. Più istanze di un tipo di processo possono essere in esecuzione per il bilanciamento del carico.

Il lettore estrae un messaggio e cambia lo stato in "A" mentre funziona su di esso. Al termine, cambia lo stato in "C" incompleto. Può eliminare il messaggio o meno a seconda che si desideri conservare la traccia di controllo. Messaggi di stato = 'N' sono estratti nell'ordine MsgType / Timestamp, quindi c'è un indice su MsgType + State + CreateTime.

Variazioni:
Indicare per "E" errore.
Colonna per codice processo Reader.
Timestamp per le transizioni di stato.

Ciò ha fornito un meccanismo piacevole, scalabile, visibile e semplice per fare una serie di cose come la stai descrivendo. Se hai una conoscenza di base dei database, è abbastanza sicuro ed estensibile.


Codice dai commenti:

CREATE PROCEDURE GetMessage @MsgType VARCHAR(8) ) 
AS 
DECLARE @MsgId INT 

BEGIN TRAN 

SELECT TOP 1 @MsgId = MsgId 
FROM MsgQueue 
WHERE MessageType = @pMessageType AND State = 'N' 
ORDER BY CreateTime


IF @MsgId IS NOT NULL 
BEGIN 

UPDATE MsgQueue 
SET State = 'A' 
WHERE MsgId = @MsgId 

SELECT MsgId, Msg 
FROM MsgQueue 
WHERE MsgId = @MsgId  
END 
ELSE 
BEGIN 
SELECT MsgId = NULL, Msg = NULL 
END 

COMMIT TRAN

Altri suggerimenti

Proprio come un possibile cambiamento tecnologico, potresti prendere in considerazione l'utilizzo di MSMQ o qualcosa di simile.

Ciascuno dei tuoi lavori / thread potrebbe interrogare la coda di messaggistica per vedere se era disponibile un nuovo lavoro. Poiché l'atto di leggere un messaggio lo rimuove dallo stack, si è certi che il messaggio verrà ricevuto da un solo processo / thread.

Naturalmente, questo presuppone che tu stia lavorando con una piattaforma Microsoft.

Il modo migliore per implementare una coda lavori in un sistema di database relazionale è utilizzare SALTA BLOCCATA .

SKIP LOCKED è un'opzione di acquisizione dei blocchi che si applica sia ai blocchi di lettura / condivisione (FOR SHARE) che di scrittura / esclusivo (FOR UPDATE) ed è ampiamente supportata al giorno d'oggi:

  • Oracle 10g e versioni successive
  • PostgreSQL 9.5 e versioni successive
  • SQL Server 2005 e versioni successive
  • MySQL 8.0 e versioni successive

Ora, considera che abbiamo la seguente tabella post che viene utilizzata come coda di lavoro:

CREATE TABLE post (
    id int8 NOT NULL,
    body varchar(255),
    status int4,
    title varchar(255),
    PRIMARY KEY (id)
)

La colonna status viene utilizzata come Enum, con i valori di PENDING (0), APPROVED (1) o SPAM (2).

Se abbiamo più utenti simultanei che provano a moderare i record post , abbiamo bisogno di un modo per coordinare i loro sforzi per evitare che due moderatori rivedano la stessa riga post .

Quindi, SKIP LOCKED è esattamente ciò di cui abbiamo bisogno. Se due utenti simultanei, Alice e Bob, eseguono le seguenti query SELECT che bloccano esclusivamente i record dei post aggiungendo anche l'opzione SKIP LOCKED:

    [Alice]:
    SELECT
        p.id AS id1_0_,

    p.body AS body2_0_,
    p.status AS status3_0_,
    p.title AS title4_0_
FROM
    post p
WHERE
    p.status = 0
ORDER BY
    p.id
LIMIT 2
FOR UPDATE OF p SKIP LOCKED

[Bob]:                                                                                                                                                                                                              
SELECT
    p.id AS id1_0_,
    p.body AS body2_0_,
    p.status AS status3_0_,
    p.title AS title4_0_
FROM
    post p
WHERE
    p.status = 0
ORDER BY
    p.id
LIMIT 2
FOR UPDATE OF p SKIP LOCKED

Possiamo vedere che Alice può selezionare le prime due voci mentre Bob seleziona i successivi 2 record. Senza SKIP LOCKED, la richiesta di acquisizione del blocco Bob verrebbe bloccata fino a quando Alice non rilascia il blocco sui primi 2 record

Per maggiori dettagli su SKIP LOCKED, consulta questo articolo .

Invece di avere il proprietario = null quando non è di proprietà, dovresti invece impostarlo su un record di nessuno falso. La ricerca di null non limita l'indice, potresti finire con una scansione della tabella. (questo è per Oracle, il server SQL potrebbe essere diverso)

Stai tentando di implementare de & Database; come IPC " antipattern. Cercalo per capire perché dovresti considerare di riprogettare correttamente il tuo software.

Autorizzato sotto: CC-BY-SA insieme a attribuzione
Non affiliato a StackOverflow
scroll top