JMS è la risposta alla necessità di una coda di blocco persistente?
-
10-10-2019 - |
Domanda
Sto creando una libreria che è costituito da un appender Log4J che invia in modo asincrono gli eventi a un server remoto. Quando un'istruzione di registro è fatto, l'appender sarà modo asincrono registrare l'evento in una coda locale che un pool di consumatori sarà quindi recuperare e inviare al telecomando.
La soluzione completamente in-memory sarebbe quella di creare un BlockingQueue che gestire il problema della concorrenza. Tuttavia, mi piacerebbe che la coda per essere mantenuta in modo che se il server remoto non è disponibile io non crescono la coda illimitata o iniziare a messaggi di scarto nel caso di una coda limitata.
Stavo pensando di utilizzare un database H2 incorporato per memorizzare gli eventi a livello locale e quindi utilizzare un meccanismo di polling per recuperare gli eventi e inviare al telecomando. Avrei preferito utilizzare un BlockingQueue rispetto per interrogare una tabella di database.
Is JMS la risposta?
EDIT:
Se JMS è la risposta, e sembra essere in corso in quel modo, qualcuno ha suggerimenti su un peso leggero, soluzione di JMS integrabile che può essere configurato per accettare solo i messaggi in-process? In altre parole, io non voglio, e forse non sarà consentito di, aprire un socket TCP su cui ascoltare.
EDIT:
devo ActiveMQ incorporato ora e sembra funzionare. Grazie a tutti.
Soluzione
Si potrebbe utilizzare JMS ai messaggi in modo asincrono inviare a un computer remoto (ammesso che li può ricevere, naturalmente), Log4j ha un Appender JMS è possibile utilizzare per questo.
Altri suggerimenti
Bob Lee open source molto semplice disco sostenuto coda un po 'indietro, https://github.com/square/retrofit/blob/master/modules/android/src/retrofit/io/QueueFile.java - può essere utile, ed è certamente molto più facile per introdurre di JMS se si può accettare durata locale.
Questa classe è standalone -. Esso può essere copiato e incollato
Si può sicuramente utilizzare JMS per questo scopo. Per quanto ho capito si utilizza l'appender Log4J JMS. Questo componente invia messaggi a preconfigurati destinazione JMS (in genere di coda). È possibile configurare questa coda per essere mantenuta. In questo caso tutti i messaggi inseriti nella coda saranno memorizzati automaticamente in qualche negozio persistente (tipicamente database.). Purtroppo questa configurazione è specifico fornitore (dipende dal fornitore JMS), ma di solito è molto semplice. Si prega di fare riferimento alla documentazione di voi JMS provider.
vedere se questo funziona
Questo codice dovrebbe funzionare per voi - la sua una memoria coda di blocco persistente - ha bisogno di qualche messa a punto del file, ma dovrebbe funzionare
package test;
import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.File;
import java.io.FileReader;
import java.io.FileWriter;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
public class BlockingQueue {
//private static Long maxInMenorySize = 1L;
private static Long minFlushSize = 3L;
private static String baseDirectory = "/test/code/cache/";
private static String fileNameFormat = "Table-";
private static String currentWriteFile = "";
private static List<Object> currentQueue = new LinkedList<Object>();
private static List<Object> lastQueue = new LinkedList<Object>();
static{
try {
load();
} catch (IOException e) {
System.out.println("Unable To Load");
e.printStackTrace();
}
}
private static void load() throws IOException{
File baseLocation = new File(baseDirectory);
List<String> fileList = new ArrayList<String>();
for(File entry : baseLocation.listFiles()){
if(!entry.isDirectory() && entry.getName().contains(fileNameFormat)){
fileList.add(entry.getAbsolutePath());
}
}
Collections.sort(fileList);
if(fileList.size()==0){
//currentQueue = lastQueue = new ArrayList<Object>();
currentWriteFile = baseDirectory + "Table-1";
BufferedWriter writer = new BufferedWriter(new FileWriter(currentWriteFile));
while (!lastQueue.isEmpty()){
writer.write(lastQueue.get(0).toString()+ "\n");
lastQueue.remove(0);
}
writer.close();
}else{
if(fileList.size()>0){
BufferedReader reader = new BufferedReader(new FileReader(fileList.get(0)));
String line=null;
while ((line=reader.readLine())!=null){
currentQueue.add(line);
}
reader.close();
File toDelete = new File(fileList.get(0));
toDelete.delete();
}
if(fileList.size()>0){
BufferedReader reader = new BufferedReader(new FileReader(fileList.get(fileList.size()-1)));
currentWriteFile = fileList.get(fileList.size()-1);
String line=null;
while ((line=reader.readLine())!=null){
lastQueue.add(line);
}
reader.close();
//lastFileNameIndex=Long.parseLong(fileList.get(fileList.size()).substring(6, 9));
}
}
}
private void loadFirst() throws IOException{
File baseLocation = new File(baseDirectory);
List<String> fileList = new ArrayList<String>();
for(File entry : baseLocation.listFiles()){
if(!entry.isDirectory() && entry.getName().contains(fileNameFormat)){
fileList.add(entry.getAbsolutePath());
}
}
Collections.sort(fileList);
if(fileList.size()>0){
BufferedReader reader = new BufferedReader(new FileReader(fileList.get(0)));
String line=null;
while ((line=reader.readLine())!=null){
currentQueue.add(line);
}
reader.close();
File toDelete = new File(fileList.get(0));
toDelete.delete();
}
}
public Object pop(){
if(currentQueue.size()>0)
return currentQueue.remove(0);
if(currentQueue.size()==0){
try {
loadFirst();
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
if(currentQueue.size()>0)
return currentQueue.remove(0);
else
return null;
}
public synchronized Object waitTillPop() throws InterruptedException{
if(currentQueue.size()==0){
try {
loadFirst();
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
if(currentQueue.size()==0)
wait();
}
return currentQueue.remove(0);
}
public synchronized void push(Object data) throws IOException{
lastQueue.add(data);
this.notifyAll();
if(lastQueue.size()>=minFlushSize){
BufferedWriter writer = new BufferedWriter(new FileWriter(currentWriteFile));
while (!lastQueue.isEmpty()){
writer.write(lastQueue.get(0).toString() + "\n");
lastQueue.remove(0);
}
writer.close();
currentWriteFile = currentWriteFile.substring(0,currentWriteFile.indexOf("-")+1) +
(Integer.parseInt(currentWriteFile.substring(currentWriteFile.indexOf("-")+1,currentWriteFile.length())) + 1);
}
}
public static void main(String[] args) {
try {
BlockingQueue bq = new BlockingQueue();
for(int i =0 ; i<=8 ; i++){
bq.push(""+i);
}
System.out.println(bq.pop());
System.out.println(bq.pop());
System.out.println(bq.pop());
System.out.println(bq.waitTillPop());
System.out.println(bq.waitTillPop());
System.out.println(bq.waitTillPop());
System.out.println(bq.waitTillPop());
} catch (Exception e) {
e.printStackTrace();
}
}