Question

I have created the normal publishers and subscribers implemented using java , which works as reading the contents by size as 1MB of total size 5MB and published on every 1MB to the subscriber.Data is getting published successfully .Now 'm facing the issue on appending the content to the existing file .Finally i could find only the last 1MB of data in the file.So please let me to know how to solve this issue ? and also i have attached the source code for publisher and subscriber.

Publisher:

public class MessageDataPublisher {
    static StringBuffer fileContent;
    static RandomAccessFile randomAccessFile ;

    public static void main(String[] args) throws IOException {
        MessageDataPublisher msgObj=new MessageDataPublisher();

        String fileToWrite="test.txt";
        msgObj.towriteDDS(fileToWrite);
    }


    public void towriteDDS(String fileName) throws IOException{

        DDSEntityManager mgr=new DDSEntityManager();
        String partitionName="PARTICIPANT";



        // create Domain Participant
        mgr.createParticipant(partitionName);

        // create Type
        BinaryFileTypeSupport binary=new BinaryFileTypeSupport();
        mgr.registerType(binary);


        // create Topic
        mgr.createTopic("Serials");

        // create Publisher
        mgr.createPublisher();

        // create DataWriter
        mgr.createWriter();

        // Publish Events

        DataWriter dwriter = mgr.getWriter();
        BinaryFileDataWriter binaryWriter=BinaryFileDataWriterHelper.narrow(dwriter);


        int bufferSize=1024*1024;


        File readfile=new File(fileName);
        FileInputStream is = new FileInputStream(readfile);
        byte[] totalbytes = new byte[is.available()];
        is.read(totalbytes);
        byte[] readbyte = new byte[bufferSize];
        BinaryFile binaryInstance;

        int k=0;
        for(int i=0;i<totalbytes.length;i++){
            readbyte[k]=totalbytes[i];
            k++;
            if(k>(bufferSize-1)){
                binaryInstance=new BinaryFile();
                binaryInstance.name="sendpublisher.txt";
                binaryInstance.contents=readbyte;
                int status = binaryWriter.write(binaryInstance, HANDLE_NIL.value);
                ErrorHandler.checkStatus(status, "MsgDataWriter.write");

                ErrorHandler.checkStatus(status, "MsgDataWriter.write");

                k=0;
                }

        }
        if(k < (bufferSize-1)){
            byte[] remaingbyte = new byte[k];                   
            for(int j=0;j<(k-1);j++){
                remaingbyte[j]=readbyte[j];
            }
            binaryInstance=new BinaryFile();
            binaryInstance.name="sendpublisher.txt";
            binaryInstance.contents=remaingbyte;
            int status = binaryWriter.write(binaryInstance, HANDLE_NIL.value);
            ErrorHandler.checkStatus(status, "MsgDataWriter.write");

        }       
        is.close();


        try {
            Thread.sleep(4000);

        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        // clean up
        mgr.getPublisher().delete_datawriter(binaryWriter);
        mgr.deletePublisher();
        mgr.deleteTopic();
        mgr.deleteParticipant();

    }




}


Subscriber:


public class MessageDataSubscriber {
    static RandomAccessFile randomAccessFile ;
    public static void main(String[] args) throws IOException {
        DDSEntityManager mgr = new DDSEntityManager();
        String partitionName = "PARTICIPANT";

        // create Domain Participant
        mgr.createParticipant(partitionName);

        // create Type
        BinaryFileTypeSupport msgTS = new BinaryFileTypeSupport();
        mgr.registerType(msgTS);

        // create Topic
        mgr.createTopic("Serials");

        // create Subscriber
        mgr.createSubscriber();

        // create DataReader
        mgr.createReader();

        // Read Events
        DataReader dreader = mgr.getReader();
        BinaryFileDataReader binaryReader=BinaryFileDataReaderHelper.narrow(dreader);
        BinaryFileSeqHolder binaryseq=new BinaryFileSeqHolder();
        SampleInfoSeqHolder infoSeq = new SampleInfoSeqHolder();
        boolean terminate = false;
        int count = 0;

        while (!terminate && count < 1500) {
             // To run undefinitely
            binaryReader.take(binaryseq, infoSeq, 10,
                    ANY_SAMPLE_STATE.value, ANY_VIEW_STATE.value,ANY_INSTANCE_STATE.value);
                for (int i = 0; i < binaryseq.value.length; i++) {
                    toWrtieXML(binaryseq.value[i].contents);
                    terminate = true;
            }

            try
            {
                Thread.sleep(200);
            }
            catch(InterruptedException ie)
            {
            }
            ++count;

        }
            binaryReader.return_loan(binaryseq,infoSeq);

        // clean up

        mgr.getSubscriber().delete_datareader(binaryReader);
        mgr.deleteSubscriber();
        mgr.deleteTopic();
        mgr.deleteParticipant();

    }

    private static void toWrtieXML(byte[] bytes) throws IOException {
        // TODO Auto-generated method stub
        File Writefile=new File("samplesubscriber.txt");
        if(!Writefile.exists()){
            randomAccessFile = new RandomAccessFile(Writefile, "rw");
            randomAccessFile.write(bytes, 0, bytes.length);
            randomAccessFile.close();
            }
            else{
                randomAccessFile = new RandomAccessFile(Writefile, "rw");
                long i=Writefile.length();
                randomAccessFile.seek(i);
                randomAccessFile.write(bytes, 0, bytes.length);
                randomAccessFile.close();
            }


    }
}

Thanks in advance

Was it helpful?

Solution

It is hard to give a conclusive answer to your question, because your issue could be the result of several different causes. Also, once the cause of the problem has been identified, you will probably have multiple options to mitigate it.

The first place to look is at the reader side. The code does a take() in a loop with a 200 millisecond pause between each take. Depending on your QoS settings on the DataReader, you might be facing a situation where your samples get overwritten in the DataReader while your application is sleeping for 200 milliseconds. If you are doing this over a gigabit ethernet, then a typical DDS product would be able to do those 5 chunks of 1 megabyte within that sleep period, meaning that your default, one-place buffer will get overwritten 4 times during your sleep.

This scenario would be likely if you used the default history QoS settings for your BinaryFileDataReader, which means history.kind = KEEP_LAST and history.depth = 1. Increasing the latter to a larger value, for example to 20, would result in a queue capable of holding 20 chunks of your file while you are sleeping. That should be sufficient for now.

If this does not resolve your issue, other possible causes can be explored.

Licensed under: CC-BY-SA with attribution
Not affiliated with StackOverflow
scroll top