Qual è il modo migliore di implementare una tabella delle code di messaggistica in mysql

StackOverflow https://stackoverflow.com/questions/423111

  •  05-07-2019
  •  | 
  •  

Domanda

Probabilmente è la decima volta che sto implementando qualcosa del genere, e non sono mai stato soddisfatto al 100% delle soluzioni che ho ideato.

Il motivo che utilizza la tabella mysql invece di una "corretta" il sistema di messaggistica è interessante soprattutto perché la maggior parte delle applicazioni utilizza già alcuni database relazionali per altre cose (che tende ad essere mysql per la maggior parte delle cose che ho fatto), mentre pochissime applicazioni usano un sistema di messaggistica. Inoltre, i database relazionali hanno proprietà ACID molto forti, mentre i sistemi di messaggistica spesso no.

La prima idea è di usare:

create table jobs(
  id auto_increment not null primary key,
  message text not null,
  process_id varbinary(255) null default null,
  key jobs_key(process_id) 
);

E poi accodare assomiglia a questo:

insert into jobs(message) values('blah blah');

E dequeue si presenta così:

begin;
select * from jobs where process_id is null order by id asc limit 1;
update jobs set process_id = ? where id = ?; -- whatever i just got
commit;
-- return (id, message) to application, cleanup after done

Tavolo e acconciatura sono belli, ma il dequeue mi dà fastidio. Quanto è probabile il rollback? O per essere bloccato? Quali chiavi dovrei usare per renderlo O (1) -ish?

O c'è qualche soluzione migliore di quello che sto facendo?

È stato utile?

Soluzione

Ho creato alcuni sistemi di accodamento dei messaggi e non sono sicuro del tipo di messaggio a cui ti riferisci, ma nel caso del dequeuing (è una parola?) ho fatto la stessa cosa tu 'ho fatto. Il tuo metodo sembra semplice, pulito e solido. Non che il mio lavoro sia il migliore, ma si è dimostrato molto efficace per il monitoraggio di grandi dimensioni per molti siti. (registrazione degli errori, campagne di email marketing di massa, avvisi sui social network)

Il mio voto: nessuna preoccupazione!

Altri suggerimenti

Il tuo dequeue potrebbe essere più conciso. Invece di fare affidamento sul rollback della transazione, è possibile farlo in un'unica istruzione atomica senza una transazione esplicita:

UPDATE jobs SET process_id = ? WHERE process_id IS NULL ORDER BY ID ASC LIMIT 1;

Quindi puoi estrarre i lavori con (parentesi [] significa facoltativo, a seconda dei tuoi particolari):

SELECT * FROM jobs WHERE process_id = ? [ORDER BY ID LIMIT 1];

Brian Aker ha parlato di un motore di coda qualche tempo fa. Si è parlato anche di una sintassi SELECT table FROM DELETE .

Se non sei preoccupato per il throughput, puoi sempre utilizzare SELEZIONA GET_LOCK () come mutex. Ad esempio:

SELECT GET_LOCK('READQUEUE');
SELECT * FROM jobs;
DELETE FROM JOBS WHERE ID = ?;
SELECT RELEASE_LOCK('READQUEUE');

E se vuoi essere davvero fantasioso, avvolgilo in una procedura memorizzata.

Suggerirei di utilizzare Quartz.NET

Ha provider per SQL Server, Oracle, MySql, SQLite e Firebird.

Questa discussione contiene informazioni di progettazione che dovrebbero essere mappabili.

Per citare:

Ecco cosa ho usato con successo in passato:

Schema tabella MsgQueue

Identità MsgId - NON NULL
MsgTypeCode varchar (20) - NOT NULL
SourceCode varchar (20) - processo di inserimento del messaggio - NULLable
State char (1) - 'N'ew se in coda,' A '(ctive) se in elaborazione,' Completato, predefinito 'N' - NOT NULL
CreateTime datetime - default GETDATE () - NOT NULL
Msg varchar (255) - NULLable

I tuoi tipi di messaggio sono quelli che ti aspetteresti: messaggi conformi a un contratto tra l'inserimento dei processi e la lettura dei processi, strutturati con XML o l'altra tua scelta di rappresentazione (JSON sarebbe utile in alcuni casi, ad esempio).

Quindi è possibile inserire processi da 0 a n e processi da 0 a n in grado di leggere ed elaborare i messaggi. Ogni processo di lettura in genere gestisce un singolo tipo di messaggio. Più istanze di un tipo di processo possono essere in esecuzione per il bilanciamento del carico.

Il lettore estrae un messaggio e cambia lo stato in "A" mentre funziona su di esso. Al termine, cambia lo stato in "C" incompleto. Può eliminare il messaggio o meno a seconda che si desideri conservare la traccia di controllo. Messaggi di stato = 'N' sono estratti nell'ordine MsgType / Timestamp, quindi c'è un indice su MsgType + State + CreateTime.

Variazioni:
Indicare per "E" errore.
Colonna per codice processo Reader.
Timestamp per le transizioni di stato.

Ciò ha fornito un meccanismo piacevole, scalabile, visibile e semplice per fare una serie di cose come la stai descrivendo. Se hai una conoscenza di base dei database, è abbastanza sicuro ed estensibile. Non c'è mai stato un problema con i rollback dei blocchi ecc. A causa delle transazioni di transizione dello stato atomico

Ecco una soluzione che ho usato, lavorando senza il process_id del thread corrente o bloccando la tabella.

SELECT * from jobs ORDER BY ID ASC LIMIT 0,1;

Ottieni il risultato in un array $ row ed esegui:

DELETE from jobs WHERE ID=$row['ID'];

Quindi ottieni le righe interessate (mysql_affected_rows). Se sono presenti righe interessate, elaborare il lavoro nell'array $ row. Se ci sono 0 righe interessate, significa che qualche altro processo sta già elaborando il lavoro selezionato. Ripeti i passaggi precedenti fino a quando non ci sono righe.

Ho provato questo con una tabella 'jobs' con 100k righe e generando 20 processi simultanei che fanno quanto sopra. Non si sono verificate condizioni di gara. Puoi modificare le query precedenti per aggiornare una riga con un flag di elaborazione ed eliminare la riga dopo averla effettivamente elaborata:

while(time()-$startTime<$timeout)
{
SELECT * from jobs WHERE processing is NULL ORDER BY ID ASC LIMIT 0,1;
if (count($row)==0) break;
UPDATE jobs set processing=1 WHERE ID=$row['ID'];
if (mysql_affected_rows==0) continue;
//process your job here
DELETE from jobs WHERE ID=$row['ID'];
}

Inutile dire che è necessario utilizzare una coda di messaggi appropriata (ActiveMQ, RabbitMQ, ecc.) per questo tipo di lavoro. Abbiamo dovuto ricorrere a questa soluzione, poiché il nostro host rompe regolarmente le cose durante l'aggiornamento del software, quindi meno cose si rompono, meglio è.

È possibile disporre di una tabella intermedia per mantenere l'offset per la coda.

create table scan(
  scan_id int primary key,
  offset_id int
);

Potresti avere anche più scansioni in corso, quindi un offset per scansione. Inizializza offset_id = 0 all'inizio della scansione.

begin;
select * from jobs where order by id where id > (select offset_id from scan where scan_id = 0)  asc limit 1;
update scan set offset_id = ? where scan_id = ?; -- whatever i just got
commit;

Tutto quello che devi fare è solo per mantenere l'ultimo offset. Ciò consentirebbe anche di risparmiare spazio significativo (process_id per record). Spero che questo suoni logico.

Autorizzato sotto: CC-BY-SA insieme a attribuzione
Non affiliato a StackOverflow
scroll top