Question

I am working on a JavaFX project where the user browses for a few files, builds a queue from them, and clicks Start Upload to upload them to a specified path. When the user clicks the Pause button, the upload must pause, and when they click Resume, it must continue from the point where it was paused. So far, I have created a queue of objects (files) using Executors.newSingleThreadExecutor() and submitted the tasks. In my case the task class is called Item. When the user clicks Pause, I call the shutdownNow() method of the ExecutorService, and when the user clicks Resume, I create a whole new ExecutorService instance. But when I submit the jobs via executor.execute, it skips the current task and resumes with the next one, and so on. I need it to start from the task it was executing last. Here is the code.

public class UploadManagerController implements Initializable {

private static boolean checkRunning = true;

private static String currentClient;
private static int currentView = 0;
private static int checkLastSelectedQueue = 0;
private static Timeline fiveSecondsWonder;
private List<File> uploadDir;
private static long currentSelectedRow = -1;
private static ThreadFactory threadFactory;
private static ExecutorService executor;
private static Thread t;
private static LinkedHashMap<String, ObservableList> clientUploadInfo;
private static HashMap<String, Long> clientUploadInfoCount;
private static HashMap<String, String> clientUploadInfoTotalSize;
private static HashMap<String, Double> clientUploadInfoTotalSizeLong;


  public static class Item extends Task<Void> {

    public SimpleStringProperty fileName = new SimpleStringProperty();
    public SimpleStringProperty fileSize = new SimpleStringProperty();
    public SimpleStringProperty ETA = new SimpleStringProperty();
    public SimpleStringProperty speed = new SimpleStringProperty();
    public SimpleStringProperty status = new SimpleStringProperty();
    public SimpleDoubleProperty progress = new SimpleDoubleProperty();
    public SimpleObjectProperty clientName = new SimpleObjectProperty();
    public SimpleStringProperty path = new SimpleStringProperty();
    public SimpleLongProperty id = new SimpleLongProperty();
    public File file;
    public long time = 0;

    // Getter and setter of above variables

    @Override
    protected Void call() throws Exception {
        File filee = this.getFile();
        FileInputStream is = new FileInputStream(filee); 
        String mRecordingFile = "UserSpecifiedPath\\" + this.getFileName();

        File fl = new File(mRecordingFile);
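        // Rename an existing target only when this item is not being retried after an
        // error or a stop (the original "!= true || != true" test was always true).
        if (fl.exists() && !this.getStatus().equals("error") && !this.getStatus().equalsIgnoreCase("stopped")) {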
            fl.renameTo(new File("UserSpecifiedPath\\" + Util.getTime() + "_" + fl.getName()));

        }

        boolean checkDelete = false;
        for (String key : clientUploadInfo.keySet()) {
            if (clientUploadInfo.get(key).contains(this)) {
                checkDelete = true;
                break;
            }
        }
        if ( checkDelete) {
            java.nio.channels.FileChannel fc = is.getChannel();
            java.nio.ByteBuffer bb = java.nio.ByteBuffer.allocate(10000);

            // Open the destination once, in append mode, instead of once per chunk.
            try (FileOutputStream fOut = new FileOutputStream(mRecordingFile, true)) {
                while (fc.read(bb) > 0) {
                    bb.flip();
                    // Write only the bytes actually read; writing bb.array() whole
                    // would append stale data when the last chunk is short.
                    fOut.write(bb.array(), 0, bb.limit());
                    bb.clear();
                }
            }
        }


        return null;
    }
}

 @Override
public void initialize(URL url, ResourceBundle rb) {
    clientUploadInfo = new LinkedHashMap<>();
          threadFactory = new ThreadFactory() {
        @Override
        public Thread newThread(Runnable r) {

            t = new Thread(r);
            try {
                t.join();
            } catch (InterruptedException ex) {
                System.out.println("inte");
            }
            t.setDaemon(true);
            return t;
        }
    };
    executor = Executors.newSingleThreadExecutor(threadFactory);
}

  @FXML
private void startupload(ActionEvent event) {
    ObservableList<Item> temp = FXCollections.observableArrayList();
    temp.addAll(clientUploadInfo.get(currentClient));
    currentView = 1;
    checkRunning = true;
    if (executor.isShutdown()) {
        executor = null;
        t=null;
        threadFactory = null;
        threadFactory = new ThreadFactory() {

            @Override
            public Thread newThread(Runnable r) {
                 t = new Thread(r);
                try {
                    t.join();
                } catch (InterruptedException ex) {
                    Logger.getLogger(UploadManagerController.class.getName()).log(Level.SEVERE, null, ex);
                }

                t.setDaemon(true);
                return t;
            }
        };
        executor = Executors.newSingleThreadExecutor(threadFactory);


         for (Item itm : temp) {
            if (itm.getStatus().equalsIgnoreCase("In Active") || itm.getStatus().equalsIgnoreCase("queued") || itm.getStatus().equalsIgnoreCase("error") || itm.getStatus().equalsIgnoreCase("stopped")) {

                executor.submit(itm);
            }
        }

    } else {
        for (Item itm : temp) {
            if (itm.getStatus().equalsIgnoreCase("In Active") || itm.getStatus().equalsIgnoreCase("queued") || itm.getStatus().equalsIgnoreCase("error") || itm.getStatus().equalsIgnoreCase("stopped")) {

                executor.execute(itm);
            }
        }
    }
}

@FXML
private void stopUpload(ActionEvent event) {
    checkRunning = false;
    executor.shutdownNow();

}
}

Sorry for my English, but let me try to explain. In the code above there are two methods, startupload and stopUpload, which are called when the corresponding JavaFX buttons are clicked. When I first click the Start Upload button, it starts uploading the files held in the clientUploadInfo.get(KEY) map, which I populate when the Add Files button is clicked (I have left that code out to keep this short). The Item class carries all the details. Clicking Stop Upload calls stopUpload, and clicking Start Upload again calls startupload to resume the upload. In my case it does resume, but from the next item: the item that was in progress when I clicked Stop is left as it was. Any help would be appreciated. Thanks in advance.


Solution 2

Try overriding the Task<?> class. To pause and restart on the same thread, you need to get hold of the thread instance through your executor and ThreadFactory. Reinitializing the executor will always skip the current thread, because it is still in memory and the executor still has a link to it, even if you set the reference to null. So I would suggest one of two things: either pause only once the current task has finished, and show a pop-up telling the user that the current upload cannot be cancelled, or override your Task and ThreadFactory, get the ID of the current thread, and reinitialize only that thread.
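
A likely explanation for the skipped item, offered as a hedged sketch rather than a diagnosis of the exact code in the question: JavaFX's Task extends java.util.concurrent.FutureTask, and a FutureTask is one-shot. Once its body has started and then ended, whether normally or through the interrupt sent by shutdownNow(), handing the same object to a new executor does nothing. The example below uses a plain FutureTask so that it runs without JavaFX; the class name OneShotDemo and the method name are made up for illustration.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.FutureTask;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class OneShotDemo {

    /**
     * Starts a task, interrupts it with shutdownNow(), resubmits the same
     * object to a fresh executor, and returns how often its body started.
     */
    static int startsAfterInterruptAndResubmit() throws InterruptedException {
        AtomicInteger starts = new AtomicInteger();
        CountDownLatch started = new CountDownLatch(1);

        FutureTask<Void> task = new FutureTask<>(() -> {
            starts.incrementAndGet();
            started.countDown();
            Thread.sleep(60_000); // stands in for a long upload
            return null;
        });

        // First run: wait until the body is really running, then "pause" it.
        ExecutorService first = Executors.newSingleThreadExecutor();
        first.execute(task);
        started.await();
        first.shutdownNow(); // interrupts the sleeping worker thread
        first.awaitTermination(5, TimeUnit.SECONDS);

        // "Resume": a new executor, but the same, already finished task object.
        ExecutorService second = Executors.newSingleThreadExecutor();
        second.execute(task); // returns immediately; the body is not run again
        second.shutdown();
        second.awaitTermination(5, TimeUnit.SECONDS);

        return starts.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("body started " + startsAfterInterruptAndResubmit() + " time(s)");
    }
}
```

If this is what is happening in the question, resuming the interrupted file would need a fresh Item for it (plus a record of how many bytes were already written) instead of resubmitting the old object.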

OTHER TIPS

Your requirement is that tasks can be stopped. I don't think it is a good idea to implement this by suspending (however you do that) the executor, because suspending an executor would be highly unsafe in general (see the javadoc of Thread.suspend() for details), which is probably why executors don't support that.

A far better design would be to have the tasks wait while they shouldn't proceed, for instance with something like:

private boolean suspended;

synchronized void suspend() {
    suspended = true;
}

synchronized void resume() {
    suspended = false;
    notifyAll();
}

synchronized void waitWhileSuspended() throws InterruptedException {
    while (suspended) {
        wait();
    }
}

where the tasks would regularly invoke waitWhileSuspended() at times where they can safely be interrupted.
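
As a minimal sketch of how a copy loop might use this, assuming the three methods above are gathered in a small helper class: the names PausableCopy, PauseControl and copy are hypothetical, and in-memory streams stand in for the real files so the example is self-contained.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;

public class PausableCopy {

    /** The monitor from the answer, collected into one (hypothetical) class. */
    static class PauseControl {
        private boolean suspended;

        synchronized void suspend() {
            suspended = true;
        }

        synchronized void resume() {
            suspended = false;
            notifyAll();
        }

        synchronized void waitWhileSuspended() throws InterruptedException {
            while (suspended) {
                wait();
            }
        }
    }

    /** Copies in to out chunk by chunk, blocking between chunks while paused. */
    static long copy(InputStream in, OutputStream out, PauseControl control)
            throws IOException, InterruptedException {
        byte[] buffer = new byte[8192];
        long copied = 0;
        int n;
        while ((n = in.read(buffer)) != -1) {
            // Safe point: no chunk is half written when the task blocks here.
            control.waitWhileSuspended();
            out.write(buffer, 0, n);
            copied += n;
        }
        return copied;
    }

    public static void main(String[] args) throws Exception {
        PauseControl control = new PauseControl();
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        long copied = copy(new ByteArrayInputStream(new byte[20_000]), out, control);
        System.out.println("copied " + copied + " bytes");
    }
}
```

With this shape, the Pause button would call control.suspend() and the Resume button control.resume(), and the executor is never shut down, so the task keeps its position in the file.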

Licensed under: CC-BY-SA with attribution
Not affiliated with StackOverflow