كيف يمكنني التعامل مع تدفقات متعددة في جافا؟

StackOverflow https://stackoverflow.com/questions/126138

  •  02-07-2019
  •  | 
  •  

سؤال

أحاول تشغيل عملية والقيام بالأشياء باستخدام تدفقات الإدخال والإخراج والخطأ.الطريقة الواضحة للقيام بذلك هي استخدام شيء مثل select(), ولكن الشيء الوحيد الذي يمكنني العثور عليه في Java الذي يفعل ذلك هو Selector.select(), ، والذي يستغرق أ Channel.لا يبدو أنه من الممكن الحصول على Channel من InputStream أو OutputStream (FileStream لديه getChannel() الطريقة ولكن هذا لا يساعد هنا)

لذا، بدلاً من ذلك كتبت بعض التعليمات البرمجية لاستقصاء جميع التدفقات:

while( !out_eof || !err_eof )
{
    while( out_str.available() )
    {
        if( (bytes = out_str.read(buf)) != -1 )
        {
            // Do something with output stream
        }
        else
            out_eof = true;
    }
    while( err_str.available() )
    {
        if( (bytes = err_str.read(buf)) != -1 )
        {
            // Do something with error stream
        }
        else
            err_eof = true;
    }
    sleep(100);
}

الذي يعمل، إلا أنه لا ينتهي أبدا.عندما يصل أحد التدفقات إلى نهاية الملف، available() إرجاع الصفر لذلك read() لم يتم استدعاؤه ولن نحصل أبدًا على الإرجاع -1 الذي يشير إلى EOF.

قد يكون أحد الحلول هو طريقة غير معيقة للكشف عن EOF.لا أستطيع رؤية واحدة في المستندات في أي مكان.وبدلاً من ذلك، هل هناك طريقة أفضل للقيام بما أريد القيام به؟

أرى هذا السؤال هنا:نص الرابطوعلى الرغم من أنها لا تفعل ما أريده تمامًا، إلا أنه يمكنني على الأرجح استخدام هذه الفكرة، المتمثلة في إنشاء سلاسل رسائل منفصلة لكل تيار، للمشكلة المحددة التي أواجهها الآن.لكن من المؤكد أن هذه ليست الطريقة الوحيدة للقيام بذلك؟بالتأكيد يجب أن تكون هناك طريقة للقراءة من تدفقات متعددة دون استخدام مؤشر ترابط لكل منها؟

هل كانت مفيدة؟

المحلول

كما قلت الحل المبينة في هذه الإجابة هي الطريقة التقليدية لقراءة كل من stdout وstderr من العملية.إن استخدام مؤشر ترابط لكل دفق هو الحل الأمثل، على الرغم من أنه مزعج بعض الشيء.

نصائح أخرى

سيكون عليك بالفعل أن تسلك طريق إنتاج سلسلة رسائل لكل تيار تريد مراقبته.إذا كانت حالة الاستخدام الخاصة بك تسمح بدمج كل من stdout وstderr للعملية المعنية، فستحتاج إلى مؤشر ترابط واحد فقط، وإلا فستكون هناك حاجة إلى خيطين.

لقد استغرق الأمر مني بعض الوقت لإنجاز الأمر بشكل صحيح في أحد مشاريعنا حيث يتعين علي إطلاق عملية خارجية وأخذ مخرجاتها والقيام بشيء بها بينما أبحث في نفس الوقت عن الأخطاء وإنهاء العملية وأيضًا القدرة على إنهائها عندما يقوم مستخدم تطبيق Java بإلغاء العملية.

لقد قمت بإنشاء فئة بسيطة إلى حد ما لتغليف جزء المشاهدة الذي تبدو طريقة تشغيله () كما يلي:

public void run() {
    BufferedReader tStreamReader = null;
    try {
        while (externalCommand == null && !shouldHalt) {
            logger.warning("ExtProcMonitor("
                           + (watchStdErr ? "err" : "out")
                           + ") Sleeping until external command is found");
            Thread.sleep(500);
        }
        if (externalCommand == null) {
            return;
        }
        tStreamReader =
                new BufferedReader(new InputStreamReader(watchStdErr ? externalCommand.getErrorStream()
                        : externalCommand.getInputStream()));
        String tLine;
        while ((tLine = tStreamReader.readLine()) != null) {
            logger.severe(tLine);
            if (filter != null) {
                if (filter.matches(tLine)) {
                    informFilterListeners(tLine);
                    return;
                }
            }
        }
    } catch (IOException e) {
        logger.logExceptionMessage(e, "IOException stderr");
    } catch (InterruptedException e) {
        logger.logExceptionMessage(e, "InterruptedException waiting for external process");
    } finally {
        if (tStreamReader != null) {
            try {
                tStreamReader.close();
            } catch (IOException e) {
                // ignore
            }
        }
    }
}

من ناحية الاتصال يبدو الأمر كما يلي:

    Thread tExtMonitorThread = new Thread(new Runnable() {

        public void run() {
            try {
                while (externalCommand == null) {
                    getLogger().warning("Monitor: Sleeping until external command is found");
                    Thread.sleep(500);
                    if (isStopRequested()) {
                        getLogger()
                                .warning("Terminating external process on user request");
                        if (externalCommand != null) {
                            externalCommand.destroy();
                        }
                        return;
                    }
                }
                int tReturnCode = externalCommand.waitFor();
                getLogger().warning("External command exited with code " + tReturnCode);
            } catch (InterruptedException e) {
                getLogger().logExceptionMessage(e, "Interrupted while waiting for external command to exit");
            }
        }
    }, "ExtCommandWaiter");

    ExternalProcessOutputHandlerThread tExtErrThread =
            new ExternalProcessOutputHandlerThread("ExtCommandStdErr", getLogger(), true);
    ExternalProcessOutputHandlerThread tExtOutThread =
            new ExternalProcessOutputHandlerThread("ExtCommandStdOut", getLogger(), true);
    tExtMonitorThread.start();
    tExtOutThread.start();
    tExtErrThread.start();
    tExtErrThread.setFilter(new FilterFunctor() {

        public boolean matches(Object o) {
            String tLine = (String)o;
            return tLine.indexOf("Error") > -1;
        }
    });

    FilterListener tListener = new FilterListener() {
        private boolean abortFlag = false;

        public boolean shouldAbort() {
            return abortFlag;
        }

        public void matched(String aLine) {
            abortFlag = abortFlag || (aLine.indexOf("Error") > -1);
        }

    };

    tExtErrThread.addFilterListener(tListener);
    externalCommand = new ProcessBuilder(aCommand).start();
    tExtErrThread.setProcess(externalCommand);
    try {
        tExtMonitorThread.join();
        tExtErrThread.join();
        tExtOutThread.join();
    } catch (InterruptedException e) {
        // when this happens try to bring the external process down 
        getLogger().severe("Aborted because auf InterruptedException.");
        getLogger().severe("Killing external command...");
        externalCommand.destroy();
        getLogger().severe("External command killed.");
        externalCommand = null;
        return -42;
    }
    int tRetVal = tListener.shouldAbort() ? -44 : externalCommand.exitValue();

    externalCommand = null;
    try {
        getLogger().warning("command exit code: " + tRetVal);
    } catch (IllegalThreadStateException ex) {
        getLogger().warning("command exit code: unknown");
    }
    return tRetVal;

لسوء الحظ، لا أحتاج إلى مثال قابل للتشغيل قائم بذاته، ولكن ربما يكون هذا مفيدًا.إذا اضطررت إلى القيام بذلك مرة أخرى، فسألقي نظرة أخرى على استخدام طريقة Thread.interrupt()‎ بدلاً من علامة التوقف ذاتية الصنع (أفكر في إعلان أنها متقلبة!)، لكنني أترك ذلك لوقت آخر.:)

مرخصة بموجب: CC-BY-SA مع الإسناد
لا تنتمي إلى StackOverflow
scroll top