Является ли JMS ответом на необходимость постоянной очереди блокировки?
-
10-10-2019 - |
Вопрос
Я создаю библиотеку, которая состоит из приложения Log4J, который асинхронно отправляет события на удаленный сервер. Когда будет сделан оператор журнала, приложение асинхронно записывает событие в локальную очередь, которую затем получат пул потребителей и отправит в удаленное управление.
Полностью решением в памяти было бы создание блокировки, которая будет решать проблему параллелизма. Тем не менее, я бы хотел, чтобы очередь сохранялась, чтобы, если удаленный сервер недоступен, я не выращивал очередь без ограничений и не начал отказываться от сообщений в случае ограниченной очереди.
Я думал об использовании встроенной базы данных H2 для хранения событий локально, а затем использовать механизм опроса для извлечения событий и отправки в пульт. Я бы предпочел использовать блокировку, чем опросить таблицу базы данных.
JMS - ответ?
РЕДАКТИРОВАТЬ:
Если JMS является ответом, и, похоже, он идет таким образом, есть ли у кого-нибудь рекомендации по легким, встраиваемому решению JMS, которое можно настроить только для приема сообщений в процессе? Другими словами, я не хочу, и, возможно, не разрешается, открывать розетку TCP, на котором можно слушать.
РЕДАКТИРОВАТЬ:
У меня сейчас встроено ActiveMQ, и, похоже, он работает. Спасибо всем.
Решение
Вы можете использовать JMS для асинхронной отправки сообщений на удаленную машину (при условии, что он может получить их, конечно), у Log4J есть приложение JMS, который вы можете использовать для этого.
Другие советы
Боб Ли открылся очень просто https://github.com/square/retrofit/blob/master/modules/android/src/retrofit/io/queuefile.java - Может быть полезным, и, безусловно, намного проще ввести, чем JMS, если вы можете принять местную долговечность.
Этот класс является автономным - его можно скопировать и вставлено.
Вы определенно можете использовать JMS для этой цели. Насколько я понимаю, вы используете приложение Log4J JMS. Этот компонент отправляет сообщения в предварительно сконфигурированный пункт назначения JMS (обычно очередь). Вы можете настроить эту очередь, которая будет сохраняться. В этом случае все сообщения, вставленные в очередь, будут автоматически храниться в каком -то сохраняемом хранилище (обычно база данных.). К сожалению, эта конфигурация специфична для поставщика (зависит от поставщика JMS), но обычно очень проста. Пожалуйста, обратитесь к документации вашего поставщика JMS.
Посмотрите, работает ли это
Этот код должен работать для вас - это постоянная очередь блокировки в памяти - нужна некоторая настройка файлов, но должен работать
package test;
import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.File;
import java.io.FileReader;
import java.io.FileWriter;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
public class BlockingQueue {
//private static Long maxInMenorySize = 1L;
private static Long minFlushSize = 3L;
private static String baseDirectory = "/test/code/cache/";
private static String fileNameFormat = "Table-";
private static String currentWriteFile = "";
private static List<Object> currentQueue = new LinkedList<Object>();
private static List<Object> lastQueue = new LinkedList<Object>();
static{
try {
load();
} catch (IOException e) {
System.out.println("Unable To Load");
e.printStackTrace();
}
}
private static void load() throws IOException{
File baseLocation = new File(baseDirectory);
List<String> fileList = new ArrayList<String>();
for(File entry : baseLocation.listFiles()){
if(!entry.isDirectory() && entry.getName().contains(fileNameFormat)){
fileList.add(entry.getAbsolutePath());
}
}
Collections.sort(fileList);
if(fileList.size()==0){
//currentQueue = lastQueue = new ArrayList<Object>();
currentWriteFile = baseDirectory + "Table-1";
BufferedWriter writer = new BufferedWriter(new FileWriter(currentWriteFile));
while (!lastQueue.isEmpty()){
writer.write(lastQueue.get(0).toString()+ "\n");
lastQueue.remove(0);
}
writer.close();
}else{
if(fileList.size()>0){
BufferedReader reader = new BufferedReader(new FileReader(fileList.get(0)));
String line=null;
while ((line=reader.readLine())!=null){
currentQueue.add(line);
}
reader.close();
File toDelete = new File(fileList.get(0));
toDelete.delete();
}
if(fileList.size()>0){
BufferedReader reader = new BufferedReader(new FileReader(fileList.get(fileList.size()-1)));
currentWriteFile = fileList.get(fileList.size()-1);
String line=null;
while ((line=reader.readLine())!=null){
lastQueue.add(line);
}
reader.close();
//lastFileNameIndex=Long.parseLong(fileList.get(fileList.size()).substring(6, 9));
}
}
}
private void loadFirst() throws IOException{
File baseLocation = new File(baseDirectory);
List<String> fileList = new ArrayList<String>();
for(File entry : baseLocation.listFiles()){
if(!entry.isDirectory() && entry.getName().contains(fileNameFormat)){
fileList.add(entry.getAbsolutePath());
}
}
Collections.sort(fileList);
if(fileList.size()>0){
BufferedReader reader = new BufferedReader(new FileReader(fileList.get(0)));
String line=null;
while ((line=reader.readLine())!=null){
currentQueue.add(line);
}
reader.close();
File toDelete = new File(fileList.get(0));
toDelete.delete();
}
}
public Object pop(){
if(currentQueue.size()>0)
return currentQueue.remove(0);
if(currentQueue.size()==0){
try {
loadFirst();
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
if(currentQueue.size()>0)
return currentQueue.remove(0);
else
return null;
}
public synchronized Object waitTillPop() throws InterruptedException{
if(currentQueue.size()==0){
try {
loadFirst();
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
if(currentQueue.size()==0)
wait();
}
return currentQueue.remove(0);
}
public synchronized void push(Object data) throws IOException{
lastQueue.add(data);
this.notifyAll();
if(lastQueue.size()>=minFlushSize){
BufferedWriter writer = new BufferedWriter(new FileWriter(currentWriteFile));
while (!lastQueue.isEmpty()){
writer.write(lastQueue.get(0).toString() + "\n");
lastQueue.remove(0);
}
writer.close();
currentWriteFile = currentWriteFile.substring(0,currentWriteFile.indexOf("-")+1) +
(Integer.parseInt(currentWriteFile.substring(currentWriteFile.indexOf("-")+1,currentWriteFile.length())) + 1);
}
}
public static void main(String[] args) {
try {
BlockingQueue bq = new BlockingQueue();
for(int i =0 ; i<=8 ; i++){
bq.push(""+i);
}
System.out.println(bq.pop());
System.out.println(bq.pop());
System.out.println(bq.pop());
System.out.println(bq.waitTillPop());
System.out.println(bq.waitTillPop());
System.out.println(bq.waitTillPop());
System.out.println(bq.waitTillPop());
} catch (Exception e) {
e.printStackTrace();
}
}