A obtenção de Objetos Duplicados em Produtor/Consumidor ConcurrentDictionary C#
-
20-12-2019 - |
Pergunta
Eu estou preso em um problema e estou querendo saber se eu tenho algo codificado incorretamente.O aplicativo controla a cada poucos segundos, e agarra a cada registro de uma tabela, cujo único propósito é o de indicar quais registros para agir.
Por favor, note deixei de fora o código de tratamento de erro para o espaço e legibilidade
//Producing Thread, this is triggered every 5 seconds... UGH, I hate timers
foreach (var Record in GetRecordsFromDataBase()) // returns a dictionary
{
if (!ConcurrentDictionary.Contains(Record.Key))
ConcurrentDictionary.TryAdd(Record.Key, Record.Value);
}
Este código funciona muito bem, com o irritante fato de que ele pode/vai selecionar o mesmo gravar várias vezes até que disse registro(s) é/são processados.Por processada, cada registro selecionado está sendo escrito em seu próprio recém-criado, exclusivamente nome de arquivo.Em seguida, um procedimento armazenado é chamado de chave do registro para remover do banco de dados em que ponto dessa chave é removida do ConcurrentDictionary.
// Consuming Thread, located within another loop to allow
// the below code to continue to cycle until instructed
// to terminate
while (!ConcurrentDictionary.IsEmpty)
{
var Record = ConcurrentDictionary.Take(1).First();
WriteToNewFile(Record.Value);
RemoveFromDatabase(Record.Key);
ConcurrentDictionary.TryRemove(Record.Key);
}
Para um teste de taxa de transferência adicionei 20k+ registros na tabela e, em seguida, virou-se a aplicação solta.Fiquei muito surpreso quando notei 22k+ arquivos que continuou a aumentar o bem em 100k+ território.
O que estou fazendo de errado???Eu completamente errado o que o simultâneas dicionário é usado para?Fez eu me esquecer de um ponto-e-vírgula, em algum lugar?
Solução
Primeiro, eliminar a chamada para o Contém.TryAdd já procura duplicatas, e retorna false se o item já está presente.
foreach (var Record in GetRecordsFromDataBase()) // returns a dictionary
{
ConcurrentDictionary.TryAdd(Record.Key, Record.Value);
}
O próximo problema que eu vejo é que eu não acho que ConcurrentDictionary.Tomar(1).Primeira (o) é uma boa maneira de obter um item de dicionário, pois não é atômica.Eu acho que você quer usar um BlockingCollection() em vez disso.Ele é projetado especificamente para a implementação de um produtor-consumidor padrão.
Por fim, eu acho que seus problemas não têm a ver com o Dicionário, mas com o banco de dados.O dicionário em si é thread-safe, mas o seu dictioanry não é atômica com o banco de dados.Assim, suponha que Um registro no banco de dados.GetRecordsFromDataBase() puxa ele e o adiciona ao dicionário.Em seguida, ele inicia o processamento de Um registro (suponho que isso é em outro thread).Então, que o primeiro loop chama novamente GetRecordsFromDataBase() e recebe o registro de Um novo.Simultaneamente, Um registro é processado e removido do banco de dados.Mas é tarde demais!GetRecordsFromDataBase() já peguei-o!Para que o ciclo inicial adiciona ao dicionário outra vez, depois de ter sido removido.
Eu acho que você pode precisar de tomar registros que estão a ser processados, e movê-los para outra tabela completamente.Dessa forma, eles não vão ter pego uma segunda vez.Fazer isso em C# nível, em vez de a nível de banco de dados, vai ser um problema.Ou isso, ou você não quer ser a adição de registros para a fila, enquanto o processamento de registos.
Outras dicas
O que estou fazendo de errado???
Foreach (adicionar) loop está a tentar adicionar qualquer registro não no banco de dados ao dicionário.
O tempo (remover) loop é a remoção de itens de banco de dados e, em seguida, o dicionário, também escrevê-las para o arquivo.
Esta lógica parece correto.Mas não é uma corrida:
GetRecordsFromDataBase(); // returns records 1 through 10.
mudar de contexto para remover loop.
WriteToNewFile(Record.Value); // write record 5
RemoveFromDatabase(Record.Key); // remove record 5 from db
ConcurrentDictionary.TryRemove(Record.Key); // remove record 5 from dictionary
voltar para adicionar loop
ConcurrentDictionary.TryAdd(Record.Key, Record.Value); // add record 5 even though it is not in the DB becuase it was part of the records returned by ConcurrentDictionary.TryAdd(Record.Key, Record.Value);;
Depois que o item for removido do loop foreach adiciona-lo novamente.É por isso que sua contagem de arquivo está se multiplicando.
foreach (var Record in GetRecordsFromDataBase()) // returns a dictionary
{
if (!ConcurrentDictionary.Contains(Record.Key)) // this if is not required. try add will do.
ConcurrentDictionary.TryAdd(Record.Key, Record.Value);
}
Tente algo como isso:adicionar loop:
foreach (var Record in GetRecordsFromDataBase()) // returns a dictionary
{
if (ConcurrentDictionary.TryAdd(Record.Key, false)) // only adds the record if it has not been processed.
{
ConcurrentQueue.Enque(record) // enqueue the record
}
}
Remover loop
var record;// you will need to specify the type
if (ConcurrentQueue.TryDequeue(record))
{
if (ConcurrentDictionary.TryUpdate(record.key,true,false)) // update the value from true to false
{
WriteToNewFile(Record.Value); // write record 5
RemoveFromDatabase(Record.Key); // remove record 5 from db
}
}
Isso vai deixar itens no dicionário para cada registro processado.Você pode removê-los a partir do dicionário, eventualmente, mas multithreading envolvendo um db pode ser complicado.