Question

I've searched high and low, but can't find a definitive, up-to-date answer to my question about NIO.

Is there any way to convert from an InputStream to a Channel that I can use with a Selector? It seems like Channels.newChannel() is the only way to do the conversion but doesn't provide an instance of AbstractSelectableChannel, which is really what I need.

More specifically, I would like to read from the stdout and stderr streams of a subprocess without creating one thread per stream, and it seems this is the only way to do it in pure Java. Since these streams are using pipes to pass I/O back and forth I'm surprised .newChannel doesn't return a Pipe.SourceChannel, which is a subclass of AbstractSelectableChannel.

I'm using Java 7 (although if new functionality is available in 8 I would still be happy for an answer).

EDIT: I also tried casting the results of .newChannel() to a selectable channel to no avail - it is not a selectable channel.


Solution

There is no way to do what you're asking, but you don't need a thread per stream. Merge stderr into stdout with the API provided for the purpose, ProcessBuilder.redirectErrorStream(true), and read the combined output in the current thread.
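
A minimal sketch of that approach, assuming the merge API meant here is ProcessBuilder.redirectErrorStream(true). The class name MergedOutputReader and the helper runAndCollect are made up for illustration, and the demo launches the running JVM's own `java -version` (which writes to stderr) only because that command is available wherever this code runs.

```java
import java.io.BufferedReader;
import java.io.File;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class MergedOutputReader {

    /** Runs the command with stderr merged into stdout and returns every output line. */
    public static List<String> runAndCollect(List<String> command)
            throws IOException, InterruptedException {
        ProcessBuilder builder = new ProcessBuilder(command);
        // After this call, everything the child writes to stderr arrives on getInputStream().
        builder.redirectErrorStream(true);

        Process process = builder.start();
        List<String> lines = new ArrayList<>();
        // Read on the calling thread until the child closes its output (end of stream).
        try (BufferedReader reader = new BufferedReader(
                new InputStreamReader(process.getInputStream()))) {
            String line;
            while ((line = reader.readLine()) != null) {
                lines.add(line);
            }
        }
        process.waitFor();
        return lines;
    }

    public static void main(String[] args) throws Exception {
        // Assumption for the demo: use the launcher of the JVM that runs this program.
        String javaBin = System.getProperty("java.home")
                + File.separator + "bin" + File.separator + "java";
        for (String line : runAndCollect(Arrays.asList(javaBin, "-version"))) {
            System.out.println(line);
        }
    }
}
```

With the streams merged there is only one pipe to drain, so a single blocking read loop is enough. The trade-off is that stdout and stderr lines can no longer be told apart.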

Other tips

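I have the same problem, but in my case I can redirect the output of the subprocesses to files and analyze it later. If you need the output while the processes are running, you can use something like the code below: a single background thread periodically checks available() on the stdout of each registered process. It has some drawbacks (it polls on a timer instead of blocking) and it is incomplete, so some code still has to be added. A concurrent collection could be used instead of the lock.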

import java.io.IOException;
import java.util.Collection;
import java.util.HashSet;
import java.util.Iterator;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class InputPrcocessSelector {

    // Processes currently being polled; guarded by registeredProcessesLock.
    private final Collection<Process> registeredProcesses = new HashSet<>();
    private final Lock registeredProcessesLock = new ReentrantLock();

    // Processes whose stdout had data available at the last poll.
    private final LinkedBlockingQueue<Process> readyProcesses = new LinkedBlockingQueue<>();

    // Single background thread that runs the polling loop.
    private final ExecutorService executorService = Executors.newSingleThreadExecutor();

    private static final TimeUnit DEFAULT_TIME_UNIT = TimeUnit.MILLISECONDS;
    private static final int DEFAULT_TIMEOUT = 100;

    // Pause between polling rounds.
    private final int timeOut;
    private final TimeUnit timeUnit;

    public InputPrcocessSelector() {
        this(DEFAULT_TIMEOUT, DEFAULT_TIME_UNIT);
    }

    public InputPrcocessSelector(int timeOut, TimeUnit timeUnit) {
        this.timeOut = timeOut;
        this.timeUnit = timeUnit;
        this.executorService.submit(new SelectorTask());
    }

    public boolean register(Process process) {
        // Acquire the lock before the try block so unlock() runs only if lock() succeeded.
        registeredProcessesLock.lock();
        try {
            return registeredProcesses.add(process);
        } finally {
            registeredProcessesLock.unlock();
        }
    }

    public boolean unregister(Process process) {
        // Acquire the lock before the try block so unlock() runs only if lock() succeeded.
        registeredProcessesLock.lock();
        try {
            return registeredProcesses.remove(process);
        } finally {
            registeredProcessesLock.unlock();
        }
    }

    public Collection<Process> select() throws InterruptedException {
        HashSet<Process> selectedInputs = new HashSet<>();

        Process firstProcess = readyProcesses.take();
        selectedInputs.add(firstProcess);

        Process nextProcess = null;
        while ((nextProcess = readyProcesses.poll()) != null) {
            selectedInputs.add(nextProcess);
        }

        return selectedInputs;
    }

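    private class SelectorTask implements Runnable {
        @Override
        public void run() {
            while (true) {
                // Lock outside the try block so unlock() runs only if lock() succeeded.
                registeredProcessesLock.lock();
                try {
                    Iterator<Process> it = registeredProcesses.iterator();
                    while (it.hasNext()) {
                        Process p = it.next();
                        try {
                            // Only stdout is polled here; stderr would need the same check.
                            int available = p.getInputStream().available();
                            // Skip processes that are already queued to avoid duplicates.
                            if (available > 0 && !readyProcesses.contains(p)) {
                                readyProcesses.add(p);
                            }
                            // Process.isAlive() requires Java 8 or later.
                            if (!p.isAlive()) {
                                System.err.println("Not alive");
                                it.remove();
                            }
                        } catch (IOException e) {
                            throw new RuntimeException(e);
                        }
                    }
                } finally {
                    registeredProcessesLock.unlock();
                }

                try {
                    timeUnit.sleep(timeOut);
                } catch (InterruptedException e) {
                    // Restore the interrupt flag and stop polling.
                    Thread.currentThread().interrupt();
                    return;
                }
            }
        }
    }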

}
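
The file-based alternative mentioned at the start of this answer could look roughly like the sketch below, which assumes the Java 7 methods ProcessBuilder.redirectOutput(File) and redirectError(File). The class name FileRedirectExample, the helper runToFiles, the temp-file names, and the `java -version` demo command are placeholders chosen for illustration, not part of the original answer.

```java
import java.io.File;
import java.io.IOException;
import java.nio.charset.Charset;
import java.nio.file.Files;
import java.util.Arrays;
import java.util.List;

public class FileRedirectExample {

    /** Runs the command with stdout and stderr sent to separate files; returns the exit code. */
    public static int runToFiles(List<String> command, File stdoutFile, File stderrFile)
            throws IOException, InterruptedException {
        ProcessBuilder builder = new ProcessBuilder(command);
        // The child writes straight to the files, so the parent never has to drain a pipe.
        builder.redirectOutput(stdoutFile);
        builder.redirectError(stderrFile);
        return builder.start().waitFor();
    }

    public static void main(String[] args) throws Exception {
        File out = File.createTempFile("child-out", ".log");
        File err = File.createTempFile("child-err", ".log");
        // Assumption for the demo: use the launcher of the JVM that runs this program.
        String javaBin = System.getProperty("java.home")
                + File.separator + "bin" + File.separator + "java";

        int exitCode = runToFiles(Arrays.asList(javaBin, "-version"), out, err);
        System.out.println("exit code: " + exitCode);

        // Analyze the captured output after the process has finished.
        for (String line : Files.readAllLines(err.toPath(), Charset.defaultCharset())) {
            System.out.println("stderr: " + line);
        }
    }
}
```

This needs no extra threads and no polling, but the output only becomes useful once it has been written to disk, so it does not fit cases where the parent must react to output as it appears.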
License: CC-BY-SA with attribution
Not affiliated with StackOverflow