Question

I'm using WatchService to monitor a directory. A third party uploads large CSV files to that directory via SFTP. I have to wait until all the files have finished uploading before I start processing them.

My trouble right now is that SFTP creates the file as soon as the upload starts, so I get ENTRY_CREATE and then a continuous stream of ENTRY_MODIFY events until the file is done. Is there any way to tell when the file is actually complete?

This is the code I use, which I took from the Java documentation:

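import java.io.IOException;
import java.nio.file.*;
import java.nio.file.attribute.BasicFileAttributes;
import java.util.HashMap;
import java.util.Map;

import static java.nio.file.LinkOption.NOFOLLOW_LINKS;
import static java.nio.file.StandardWatchEventKinds.*;

public class WatchDir {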

private final WatchService watcher;
private final Map<WatchKey, Path> keys;
private final boolean recursive;
private boolean trace = false;

@SuppressWarnings("unchecked")
static <T> WatchEvent<T> cast(WatchEvent<?> event) {
    return (WatchEvent<T>) event;
}

/**
 * Register the given directory with the WatchService
 */
private void register(Path dir) throws IOException {
    WatchKey key = dir.register(watcher, ENTRY_CREATE, ENTRY_DELETE, ENTRY_MODIFY);
    if (trace) {
        Path prev = keys.get(key);
        if (prev == null) {
            System.out.format("register: %s\n", dir);
        } else {
            if (!dir.equals(prev)) {
                System.out.format("update: %s -> %s\n", prev, dir);
            }
        }
    }
    keys.put(key, dir);
}

/**
 * Register the given directory, and all its sub-directories, with the
 * WatchService.
 */
private void registerAll(final Path start) throws IOException {
    // register directory and sub-directories
    Files.walkFileTree(start, new SimpleFileVisitor<Path>() {
        @Override
        public FileVisitResult preVisitDirectory(Path dir, BasicFileAttributes attrs)
                throws IOException {
            register(dir);
            return FileVisitResult.CONTINUE;
        }
    });
}

/**
 * Creates a WatchService and registers the given directory
 */
WatchDir(Path dir, boolean recursive) throws IOException {
    this.watcher = FileSystems.getDefault().newWatchService();
    this.keys = new HashMap<WatchKey, Path>();
    this.recursive = recursive;

    if (recursive) {
        System.out.format("Scanning %s ...\n", dir);
        registerAll(dir);
        System.out.println("Done.");
    } else {
        register(dir);
    }

    // enable trace after initial registration
    this.trace = true;
}

/**
 * Process all events for keys queued to the watcher
 */
void processEvents() {
    for (; ; ) {

        // wait for key to be signalled
        WatchKey key;
        try {
            key = watcher.take();
        } catch (InterruptedException x) {
            return;
        }

        Path dir = keys.get(key);
        if (dir == null) {
            System.err.println("WatchKey not recognized!!");
            continue;
        }

        for (WatchEvent<?> event : key.pollEvents()) {
            WatchEvent.Kind<?> kind = event.kind();

            // TBD - provide example of how OVERFLOW event is handled
            if (kind == OVERFLOW) {
                continue;
            }

            // Context for directory entry event is the file name of entry
            WatchEvent<Path> ev = cast(event);
            Path name = ev.context();
            Path child = dir.resolve(name);

            // print out event
            System.out.format("%s: %s\n", event.kind().name(), child);

            // if directory is created, and watching recursively, then
            // register it and its sub-directories
            if (recursive && (kind == ENTRY_CREATE)) {
                try {
                    if (Files.isDirectory(child, NOFOLLOW_LINKS)) {
                        registerAll(child);
                    }
                } catch (IOException x) {
                    // ignore to keep sample readable
                }
            }
        }

        // reset key and remove from set if directory no longer accessible
        boolean valid = key.reset();
        if (!valid) {
            keys.remove(key);

            // all directories are inaccessible
            if (keys.isEmpty()) {
                break;
            }
        }
    }
}

static void usage() {
    System.err.println("usage: java WatchDir [-r] dir");
    System.exit(-1);
}

public static void main(String[] args) throws IOException {
    // parse arguments
    if (args.length == 0 || args.length > 2)
        usage();
    boolean recursive = false;
    int dirArg = 0;
    if (args[0].equals("-r")) {
        if (args.length < 2)
            usage();
        recursive = true;
        dirArg++;
    }

    // register directory and process its events
    Path dir = Paths.get(args[dirArg]);
    new WatchDir(dir, recursive).processEvents();
}

}


Solution

Under Linux, you can use the "inotify" tools, which are probably available in all major distributions. Here is the wiki page for it: wiki - Inotify

Note that the list of supported events includes:

IN_CLOSE_WRITE - sent when a file opened for writing is closed

IN_CLOSE_NOWRITE - sent when a file opened not for writing is closed

These are the events you are looking for. I could not find anything similar on Windows. As for using them, there are various ways; I used a Java library, jnotify.

Note that the library is cross-platform, so you don't want to use the main class, because Windows does not support file-close events. Instead, use the Linux API, which exposes the full Linux capabilities. The description page shows that it is what you are asking for: jnotify - linux

Note that in my case I had to download the library source because I needed to compile the shared object file "libjnotify.so" for 64-bit; the one provided worked only on 32-bit. They may provide a 64-bit build now, so check.

Check the examples for code showing how to add and remove watches. Just remember to use the "JNotify_linux" class instead of "JNotify"; then you can pass a mask with your operation, for example:

private final int MASK = JNotify_linux.IN_CLOSE_WRITE;
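To illustrate how such a mask is typically tested once events arrive, here is a minimal sketch in plain Java. It does not call jnotify at all: the class name CloseWriteTracker and its methods are hypothetical, and the constant values are copied from the Linux inotify header rather than taken from the library. The assumption is that your listener callback hands you a file name and an event mask, and that the uploader closes the file once when the transfer ends.

```java
import java.util.LinkedHashSet;
import java.util.Set;

/** Hypothetical helper (not part of jnotify): remembers files whose writer has closed them. */
class CloseWriteTracker {
    // Values as defined in the Linux <sys/inotify.h> header.
    static final int IN_MODIFY = 0x00000002;
    static final int IN_CLOSE_WRITE = 0x00000008;
    static final int IN_CLOSE_NOWRITE = 0x00000010;

    private final Set<String> completed = new LinkedHashSet<>();

    /** Call this from the listener callback with the file name and the event mask. */
    void onEvent(String name, int mask) {
        // The mask is a bit field, so test the bit instead of comparing for equality.
        if ((mask & IN_CLOSE_WRITE) != 0) {
            completed.add(name);
        }
    }

    /** Files that have received IN_CLOSE_WRITE so far, in arrival order. */
    Set<String> completedFiles() {
        return completed;
    }
}
```

With something like this, IN_MODIFY events during the upload are ignored and a file only shows up in completedFiles() after the writer closes it.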

I hope this works for you.
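For platforms without a close event (Windows, as noted above), one possible fallback is a quiet-period heuristic. This is a different technique from the inotify approach in this answer, and it is only a heuristic: a stalled upload can look finished. The sketch below is plain Java; the class QuietPeriodTracker, its method names, and the choice of quiet period are all assumptions made for illustration.

```java
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

/** Hypothetical helper: treats a file as done once it has seen no events for quietMillis. */
class QuietPeriodTracker {
    private final long quietMillis;
    private final Map<Path, Long> lastEvent = new HashMap<>();

    QuietPeriodTracker(long quietMillis) {
        this.quietMillis = quietMillis;
    }

    /** Record an ENTRY_CREATE or ENTRY_MODIFY for the file at time nowMillis. */
    void touch(Path file, long nowMillis) {
        lastEvent.put(file, nowMillis);
    }

    /** Remove and return every file that has been quiet for at least quietMillis. */
    List<Path> drainQuiet(long nowMillis) {
        List<Path> done = new ArrayList<>();
        Iterator<Map.Entry<Path, Long>> it = lastEvent.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<Path, Long> e = it.next();
            if (nowMillis - e.getValue() >= quietMillis) {
                done.add(e.getKey());
                it.remove();
            }
        }
        return done;
    }
}
```

In the WatchDir loop from the question, this would mean calling touch(child, System.currentTimeMillis()) for each ENTRY_CREATE and ENTRY_MODIFY, and replacing watcher.take() with watcher.poll(timeout, unit) so that drainQuiet can still run when no events arrive.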

Licensed under: CC-BY-SA with attribution
Not affiliated with StackOverflow