JMSは、永続的なブロッキングキューの必要性に対する答えですか?
-
10-10-2019 - |
質問
私は、リモートサーバーにイベントを非同期に送信するLOG4J Appenderで構成されるライブラリを作成しています。ログステートメントが作成されると、Appenderはイベントを非同期的にローカルキューに記録します。これは、消費者のプールがリモートに回収して送信します。
完全にメモリのソリューションは、同時実行の問題を処理するブロッキングキューを作成することです。ただし、リモートサーバーが利用できない場合は、キューがバウンドされていないキューを成長させたり、境界のあるキューの場合にメッセージを破棄し始めたりしないように、キューを永続化したいと思います。
埋め込まれたH2データベースを使用してイベントをローカルに保存し、ポーリングメカニズムを使用してイベントを取得してリモートに送信することを考えていました。データベーステーブルを投票するよりも、ブロッキングキューを使用したいと思います。
JMSは答えですか?
編集:
JMSが答えであり、それがそのように進んでいるように見える場合、誰もがメッセージのみを受け入れるように構成できる軽量で組み込み可能なJMSソリューションに関する推奨事項を持っていますか?言い換えれば、私は聞くべきTCPソケットを開くことを望みませんし、おそらく許可されません。
編集:
ActiveMQが埋め込まれていますが、機能しているようです。皆さんありがとう。
解決
JMSを使用して、リモートマシンにメッセージを非同期的に送信できます(もちろんそれらを受信できると仮定します)、Log4Jにはこれに使用できるJMS Appenderがあります。
他のヒント
ボブ・リー・オープンは、しばらく前に非常にシンプルなディスクバックされたキューを調達しました、 https://github.com/square/retrofit/blob/master/modules/android/src/retrofit/io/queuefile.java - 役立つかもしれませんし、地元の耐久性を受け入れることができるなら、JMSよりも導入がずっと簡単です。
このクラスはスタンドアロンです - コピーして貼り付けることができます。
この目的には間違いなくJMSを使用できます。私が理解している限り、あなたはlog4j jms appenderを使用しています。このコンポーネントは、事前に構成されたJMS宛先(通常はキュー)にメッセージを送信します。このキューを構成するように構成することができます。この場合、キューに挿入されたすべてのメッセージは、一部の永続ストア(通常はデータベース)に自動的に保存されます。残念ながら、この構成はベンダー固有です(JMSベンダーに依存します)が、通常は非常に簡単です。 JMSプロバイダーのドキュメントを参照してください。
これが機能するかどうかを確認します
このコードはあなたのために機能するはずです - そのメモリの永続的なブロッキングキュー - いくつかのファイルチューニングが必要ですが、動作するはずです
package test;
import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.File;
import java.io.FileReader;
import java.io.FileWriter;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
public class BlockingQueue {
//private static Long maxInMenorySize = 1L;
private static Long minFlushSize = 3L;
private static String baseDirectory = "/test/code/cache/";
private static String fileNameFormat = "Table-";
private static String currentWriteFile = "";
private static List<Object> currentQueue = new LinkedList<Object>();
private static List<Object> lastQueue = new LinkedList<Object>();
static{
try {
load();
} catch (IOException e) {
System.out.println("Unable To Load");
e.printStackTrace();
}
}
private static void load() throws IOException{
File baseLocation = new File(baseDirectory);
List<String> fileList = new ArrayList<String>();
for(File entry : baseLocation.listFiles()){
if(!entry.isDirectory() && entry.getName().contains(fileNameFormat)){
fileList.add(entry.getAbsolutePath());
}
}
Collections.sort(fileList);
if(fileList.size()==0){
//currentQueue = lastQueue = new ArrayList<Object>();
currentWriteFile = baseDirectory + "Table-1";
BufferedWriter writer = new BufferedWriter(new FileWriter(currentWriteFile));
while (!lastQueue.isEmpty()){
writer.write(lastQueue.get(0).toString()+ "\n");
lastQueue.remove(0);
}
writer.close();
}else{
if(fileList.size()>0){
BufferedReader reader = new BufferedReader(new FileReader(fileList.get(0)));
String line=null;
while ((line=reader.readLine())!=null){
currentQueue.add(line);
}
reader.close();
File toDelete = new File(fileList.get(0));
toDelete.delete();
}
if(fileList.size()>0){
BufferedReader reader = new BufferedReader(new FileReader(fileList.get(fileList.size()-1)));
currentWriteFile = fileList.get(fileList.size()-1);
String line=null;
while ((line=reader.readLine())!=null){
lastQueue.add(line);
}
reader.close();
//lastFileNameIndex=Long.parseLong(fileList.get(fileList.size()).substring(6, 9));
}
}
}
private void loadFirst() throws IOException{
File baseLocation = new File(baseDirectory);
List<String> fileList = new ArrayList<String>();
for(File entry : baseLocation.listFiles()){
if(!entry.isDirectory() && entry.getName().contains(fileNameFormat)){
fileList.add(entry.getAbsolutePath());
}
}
Collections.sort(fileList);
if(fileList.size()>0){
BufferedReader reader = new BufferedReader(new FileReader(fileList.get(0)));
String line=null;
while ((line=reader.readLine())!=null){
currentQueue.add(line);
}
reader.close();
File toDelete = new File(fileList.get(0));
toDelete.delete();
}
}
public Object pop(){
if(currentQueue.size()>0)
return currentQueue.remove(0);
if(currentQueue.size()==0){
try {
loadFirst();
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
if(currentQueue.size()>0)
return currentQueue.remove(0);
else
return null;
}
public synchronized Object waitTillPop() throws InterruptedException{
if(currentQueue.size()==0){
try {
loadFirst();
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
if(currentQueue.size()==0)
wait();
}
return currentQueue.remove(0);
}
public synchronized void push(Object data) throws IOException{
lastQueue.add(data);
this.notifyAll();
if(lastQueue.size()>=minFlushSize){
BufferedWriter writer = new BufferedWriter(new FileWriter(currentWriteFile));
while (!lastQueue.isEmpty()){
writer.write(lastQueue.get(0).toString() + "\n");
lastQueue.remove(0);
}
writer.close();
currentWriteFile = currentWriteFile.substring(0,currentWriteFile.indexOf("-")+1) +
(Integer.parseInt(currentWriteFile.substring(currentWriteFile.indexOf("-")+1,currentWriteFile.length())) + 1);
}
}
public static void main(String[] args) {
try {
BlockingQueue bq = new BlockingQueue();
for(int i =0 ; i<=8 ; i++){
bq.push(""+i);
}
System.out.println(bq.pop());
System.out.println(bq.pop());
System.out.println(bq.pop());
System.out.println(bq.waitTillPop());
System.out.println(bq.waitTillPop());
System.out.println(bq.waitTillPop());
System.out.println(bq.waitTillPop());
} catch (Exception e) {
e.printStackTrace();
}
}