Question

I'm dealing with a program that reads items from a .csv file and writes them to a remote database. I'm trying to multithread the program, and to that end I have created two worker threads, each with its own database connection. The .csv file is read through a buffered reader, and the contents of the reader are processed. However, the threads keep duplicating the data, writing two copies of every tuple to the database.

I've been trying to figure out how to put a mutex around a buffer in Java, and the closest thing I could come up with is a priority queue.

My question is: can you use a buffered reader to read a file into a priority queue line by line? I.e.:

public void readFile(Connection connection) {
    BufferedReader bufReader = null;
    // compile the pattern once, not on every line of the file
    Pattern pattern = Pattern.compile("\"([^\"]+)\",\"([^\"]+)\",\"([^\"]+)\",\"([^\"]+)\"");
    try {
        bufReader = new BufferedReader(new FileReader(RECS_FILE));
        bufReader.readLine(); // skip header line
        String line;
        while ((line = bufReader.readLine()) != null) {
            // extract fields from each line of the RECS_FILE
            Matcher matcher = pattern.matcher(line);
            if (!matcher.matches()) {
                System.err.println("Unexpected line in " + RECS_FILE + ": \"" + line + "\"");
                continue;
            }
            String stockSymbol = matcher.group(1);
            String recDateStr = matcher.group(2);
            String direction = matcher.group(3);
            String completeUrl = matcher.group(4);

            // create a recommendation object to populate the required fields
            //   and insert it into the database
            System.out.println("Inserting to DB!");
            Recommendation rec = new Recommendation(stockSymbol, recDateStr, direction, completeUrl);
            rec.insertToDb(connection);
        }
    } catch (IOException e) {
        System.err.println("Unable to read " + RECS_FILE);
        e.printStackTrace();
    } finally {
        if (bufReader != null) {
            try {
                bufReader.close();
            } catch (IOException e) {
                // nothing sensible to do if close() fails
            }
        }
    }
}

You'll see that a buffered reader is used to read in the .csv file. Is there a way to set up a priority queue outside the function so that the buffered reader puts tuples into the priority queue, and each program thread then pulls from it?


Solution

Buffered readers, and indeed readers and streams in general, are by their nature meant for single-threaded use. Priority queues are a completely separate structure which, depending on the actual implementation, may or may not be safe to use from multiple threads. So the short answer is: no, they are two completely unrelated concepts.
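To illustrate the "may or may not" part: java.util.PriorityQueue is not safe for concurrent use, whereas java.util.concurrent.PriorityBlockingQueue is; and since ordering doesn't actually matter here, a plain LinkedBlockingQueue would do. A minimal, self-contained sketch (JDK only; the poison-pill marker is just a placeholder I made up) of one reader feeding two workers, where take() hands each element to exactly one thread, so nothing is processed twice:

import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class QueueSketch {
    // poison pill: marks end-of-input, one per consumer
    private static final String POISON = "__END__";

    public static void main(String[] args) throws InterruptedException {
        // bounded queue: put() blocks when full, so the reader can't outrun the workers
        BlockingQueue<String> queue = new LinkedBlockingQueue<>(100);

        Runnable consumer = () -> {
            try {
                while (true) {
                    String line = queue.take(); // each element goes to exactly one thread
                    if (POISON.equals(line)) break;
                    System.out.println(Thread.currentThread().getName() + " got: " + line);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };

        Thread t1 = new Thread(consumer, "worker-1");
        Thread t2 = new Thread(consumer, "worker-2");
        t1.start();
        t2.start();

        // single producer: in the real program this would be the readLine() loop
        for (String line : List.of("\"AAPL\",...", "\"MSFT\",...", "\"GOOG\",...")) {
            queue.put(line);
        }
        queue.put(POISON);
        queue.put(POISON);

        t1.join();
        t2.join();
    }
}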

To address your original problem: you can't use streamed file access from multiple threads. In theory you could use RandomAccessFile, except that your lines aren't fixed width, so you can't seek() to the beginning of a line without reading everything in the file up to that point. And even if your data did consist of fixed-width records, reading one file from two different threads would probably still be impractical.

The only thing you can parallelise is the database insert, with the obvious caveat that you lose transactionality, because each thread has to use a separate transaction. (If you don't, you have to synchronise your database operations, which again means you haven't gained anything.)

So one solution is to read the lines from a single thread and pass each string to a processing method invoked via an ExecutorService. That would scale well, but again there is a caveat: the added overhead of database locking will probably nullify the advantage of using multiple threads.
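A minimal sketch of that pattern, assuming the Recommendation class and the CSV format from the question; the Db.newConnection() helper (one dedicated JDBC connection per worker thread, cleanup elided) is hypothetical:

import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.sql.Connection;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class ParallelInserter {
    private static final Pattern PATTERN =
            Pattern.compile("\"([^\"]+)\",\"([^\"]+)\",\"([^\"]+)\",\"([^\"]+)\"");

    // one connection per worker thread, created lazily
    // (Db.newConnection() is a hypothetical factory that wraps DriverManager)
    private final ThreadLocal<Connection> conn = ThreadLocal.withInitial(Db::newConnection);

    public void readFile(String recsFile) throws IOException, InterruptedException {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        try (BufferedReader reader = new BufferedReader(new FileReader(recsFile))) {
            reader.readLine(); // skip header
            String line;
            while ((line = reader.readLine()) != null) {
                final String current = line;
                // only this thread touches the reader, so each line is
                // submitted exactly once; the pool spreads out the inserts
                pool.submit(() -> insert(current));
            }
        } finally {
            pool.shutdown();
            pool.awaitTermination(10, TimeUnit.MINUTES);
        }
    }

    private void insert(String line) {
        Matcher m = PATTERN.matcher(line);
        if (!m.matches()) return;
        Recommendation rec = new Recommendation(m.group(1), m.group(2), m.group(3), m.group(4));
        rec.insertToDb(conn.get());
    }
}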

The ultimate lesson is probably not to overcomplicate things: try the simple way first, and only look for a more complex solution if the simple one doesn't work. The other lesson is perhaps that multithreading doesn't help I/O-bound programs.

OTHER TIPS

@Biziclop's answer is spot on (+1) but I thought I'd add something about bulk database inserts.

In case you didn't know, turning off auto-commit in most SQL databases is a big win during bulk inserts. By default, the database commits after each SQL statement, which updates the indexes and writes the changes through to the disk structures. With auto-commit off, the database only has to do that work once, when you call commit at the end. Typically you would do something like:

connection.setAutoCommit(false);
for (Recommendation rec : toBeInsertedList) {
    rec.insertToDb(connection);
}
connection.commit(); // flush everything in a single transaction
connection.setAutoCommit(true);

In addition, if your database doesn't support toggling auto-commit, wrapping the inserts in an explicit transaction often accomplishes the same thing.
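A sketch of that explicit-transaction variant, with the rollback handling the snippet above omits (this assumes insertToDb propagates SQLException, which the question's code doesn't show):

import java.sql.Connection;
import java.sql.SQLException;
import java.util.List;

// hypothetical helper wrapping the whole batch in one transaction
static void insertAll(Connection connection, List<Recommendation> toBeInsertedList)
        throws SQLException {
    connection.setAutoCommit(false);
    try {
        for (Recommendation rec : toBeInsertedList) {
            rec.insertToDb(connection);
        }
        connection.commit();   // make all inserts durable in one go
    } catch (SQLException e) {
        connection.rollback(); // discard the partial batch on failure
        throw e;
    } finally {
        connection.setAutoCommit(true);
    }
}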


Licensed under: CC-BY-SA with attribution