Как мне обрабатывать несколько потоков в Java?

StackOverflow https://stackoverflow.com/questions/126138

  •  02-07-2019
  •  | 
  •  

Вопрос

Я пытаюсь запустить процесс и что-то делать с его потоками ввода, вывода и ошибок.Очевидный способ сделать это - использовать что-то вроде select(), но единственное, что я могу найти в Java , которое делает это, это Selector.select(), для чего требуется Channel.По-видимому, получить Channel из InputStream или OutputStream (FileStream имеет getChannel() метод, но здесь это не помогает)

Итак, вместо этого я написал некоторый код для опроса всех потоков:

while( !out_eof || !err_eof )
{
    while( out_str.available() )
    {
        if( (bytes = out_str.read(buf)) != -1 )
        {
            // Do something with output stream
        }
        else
            out_eof = true;
    }
    while( err_str.available() )
    {
        if( (bytes = err_str.read(buf)) != -1 )
        {
            // Do something with error stream
        }
        else
            err_eof = true;
    }
    sleep(100);
}

который работает, за исключением того, что он никогда не завершается.Когда один из потоков достигает конца файла, available() возвращает ноль , так что read() не вызывается, и мы никогда не получаем значение -1, которое указывало бы на EOF.

Одним из решений был бы неблокирующий способ обнаружения EOF.Я нигде не вижу ни одного из них в документах.В качестве альтернативы, есть ли лучший способ сделать то, что я хочу сделать?

Я вижу этот вопрос здесь:текст ссылки и хотя это не совсем то, что я хочу, я, вероятно, могу использовать эту идею создания отдельных потоков для каждого потока для конкретной проблемы, которая у меня сейчас есть.Но, конечно, это не единственный способ сделать это?Наверняка должен быть способ чтения из нескольких потоков без использования потока для каждого?

Это было полезно?

Решение

Как вы сказали, решение изложено в этом Ответе это традиционный способ считывания как stdout, так и stderr из процесса.Поток на поток - это правильный путь, даже если это немного раздражает.

Другие советы

Вам действительно придется пройти путь создания потока для каждого потока, который вы хотите отслеживать.Если ваш вариант использования позволяет комбинировать как stdout, так и stderr рассматриваемого процесса, вам нужен только один поток, в противном случае необходимы два.

Мне потребовалось довольно много времени, чтобы разобраться с этим в одном из наших проектов, где я должен запустить внешний процесс, получить его выходные данные и что-то с ними сделать, одновременно ища ошибки и завершение процесса, а также имея возможность завершить его, когда пользователь java-приложения отменяет операцию.

Я создал довольно простой класс для инкапсуляции наблюдающей части, метод run () которой выглядит примерно так:

public void run() {
    BufferedReader tStreamReader = null;
    try {
        while (externalCommand == null && !shouldHalt) {
            logger.warning("ExtProcMonitor("
                           + (watchStdErr ? "err" : "out")
                           + ") Sleeping until external command is found");
            Thread.sleep(500);
        }
        if (externalCommand == null) {
            return;
        }
        tStreamReader =
                new BufferedReader(new InputStreamReader(watchStdErr ? externalCommand.getErrorStream()
                        : externalCommand.getInputStream()));
        String tLine;
        while ((tLine = tStreamReader.readLine()) != null) {
            logger.severe(tLine);
            if (filter != null) {
                if (filter.matches(tLine)) {
                    informFilterListeners(tLine);
                    return;
                }
            }
        }
    } catch (IOException e) {
        logger.logExceptionMessage(e, "IOException stderr");
    } catch (InterruptedException e) {
        logger.logExceptionMessage(e, "InterruptedException waiting for external process");
    } finally {
        if (tStreamReader != null) {
            try {
                tStreamReader.close();
            } catch (IOException e) {
                // ignore
            }
        }
    }
}

На вызывающей стороне это выглядит примерно так:

    Thread tExtMonitorThread = new Thread(new Runnable() {

        public void run() {
            try {
                while (externalCommand == null) {
                    getLogger().warning("Monitor: Sleeping until external command is found");
                    Thread.sleep(500);
                    if (isStopRequested()) {
                        getLogger()
                                .warning("Terminating external process on user request");
                        if (externalCommand != null) {
                            externalCommand.destroy();
                        }
                        return;
                    }
                }
                int tReturnCode = externalCommand.waitFor();
                getLogger().warning("External command exited with code " + tReturnCode);
            } catch (InterruptedException e) {
                getLogger().logExceptionMessage(e, "Interrupted while waiting for external command to exit");
            }
        }
    }, "ExtCommandWaiter");

    ExternalProcessOutputHandlerThread tExtErrThread =
            new ExternalProcessOutputHandlerThread("ExtCommandStdErr", getLogger(), true);
    ExternalProcessOutputHandlerThread tExtOutThread =
            new ExternalProcessOutputHandlerThread("ExtCommandStdOut", getLogger(), true);
    tExtMonitorThread.start();
    tExtOutThread.start();
    tExtErrThread.start();
    tExtErrThread.setFilter(new FilterFunctor() {

        public boolean matches(Object o) {
            String tLine = (String)o;
            return tLine.indexOf("Error") > -1;
        }
    });

    FilterListener tListener = new FilterListener() {
        private boolean abortFlag = false;

        public boolean shouldAbort() {
            return abortFlag;
        }

        public void matched(String aLine) {
            abortFlag = abortFlag || (aLine.indexOf("Error") > -1);
        }

    };

    tExtErrThread.addFilterListener(tListener);
    externalCommand = new ProcessBuilder(aCommand).start();
    tExtErrThread.setProcess(externalCommand);
    try {
        tExtMonitorThread.join();
        tExtErrThread.join();
        tExtOutThread.join();
    } catch (InterruptedException e) {
        // when this happens try to bring the external process down 
        getLogger().severe("Aborted because auf InterruptedException.");
        getLogger().severe("Killing external command...");
        externalCommand.destroy();
        getLogger().severe("External command killed.");
        externalCommand = null;
        return -42;
    }
    int tRetVal = tListener.shouldAbort() ? -44 : externalCommand.exitValue();

    externalCommand = null;
    try {
        getLogger().warning("command exit code: " + tRetVal);
    } catch (IllegalThreadStateException ex) {
        getLogger().warning("command exit code: unknown");
    }
    return tRetVal;

К сожалению, мне не нужно использовать автономный исполняемый пример, но, возможно, это поможет.Если бы мне пришлось сделать это снова, я бы еще раз взглянул на использование метода Thread.interrupt() вместо самодельного флага остановки (не забудьте объявить его изменчивым!), Но я оставляю это на другой раз.:)

Лицензировано под: CC-BY-SA с атрибуция
Не связан с StackOverflow
scroll top