Question

So I'm trying to run code that creates a table in a remote MySQL server by referencing tables located in a different MySQL server. The server I am creating the table in has limited space, and the referenced tables are very large, so they have to be kept on a different remote server.

I'm trying to find a way to set up persistent connections to both databases at the same time (using JDBC libraries) so I don't have to keep buffering small numbers of rows... I'd like to be able to just reference the data directly.

E.g. Database A contains the data I am referencing, and Database B is where I am creating the new tables. Say the table I am referencing in Database A has 1,000,000 rows. Instead of opening a connection to Database A, buffering 10,000 rows, closing the connection, opening a connection to Database B, writing to that database, deleting my buffer, and repeating...

I'd like to just have a persistent connection to Database A, so every write to Database B can reference the data in Database A.

Is this possible? I've tried several ways (mostly by creating new connection objects that only refresh if the connection breaks), and I can't seem to get this idea working.

Has anyone done something similar using JDBC? If so, it would be much appreciated if you could point me in the right direction, or tell me how you managed to get it working.


Solution

You could create the data in Database A, and copy it to Database B via replication.

Alternatively, it sounds like you're implementing a queue of some kind. I once built a data-copying program in Java, which used a built-in implementation of the Queue interface. I had a thread that read the data from Database A and filled the queue, and a thread that read from the queue and wrote to Database B. I can try and dig up the classes I used if that's any use?
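The queue idea can be shown without any database code. The sketch below is a minimal, hypothetical illustration (the class `QueueCopySketch` and the `END` marker are made up for this example): one thread puts rows on a bounded `ArrayBlockingQueue`, another takes them off, and a sentinel object tells the consumer to stop. In a real copier the producer would read from Database A's `ResultSet` and the consumer would write to Database B.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

final class QueueCopySketch {

    /* Sentinel ("poison pill") telling the consumer there is nothing more to read */
    private static final Object END = new Object();

    /**
     * Passes rows from a producer thread to a consumer thread through a bounded queue.
     * Because the queue is bounded, the producer blocks when the consumer falls behind,
     * so at most `capacity` rows are waiting in memory at any time.
     */
    static List<Object> copy(List<?> sourceRows, int capacity) throws InterruptedException {
        BlockingQueue<Object> queue = new ArrayBlockingQueue<>(capacity);
        // Only the consumer writes this list; join() below makes its contents visible here
        List<Object> written = new ArrayList<>();

        Thread producer = new Thread(() -> {
            try {
                for (Object row : sourceRows) {
                    queue.put(row); // stand-in for reading a row from Database A
                }
                queue.put(END);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "producer");

        Thread consumer = new Thread(() -> {
            try {
                for (Object row = queue.take(); row != END; row = queue.take()) {
                    written.add(row); // stand-in for writing a row to Database B
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "consumer");

        consumer.start();
        producer.start();
        producer.join();
        consumer.join();
        return written;
    }
}
```

With a single producer and a single consumer, rows arrive in the order they were read, which matters if the destination table relies on insertion order.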

EDIT:

Here's the code, somewhat tweaked for publishing. I haven't included the config classes, but it should give you the idea of how to use the queue class:

package test;

import java.io.File;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

/**
 * This class implements a JDBC bridge between databases, allowing data to be
 * copied from one place to another.
 * <p>This implementation is threaded, as it uses a {@link BlockingQueue} to pass
 * data between a producer and a consumer.
 */
public class DBBridge {

    public static void main( String[] args ) {

        Adaptor fromAdaptor = null;
        Adaptor toAdaptor = null;

        BridgeConfig config = null;

        try {
            /* BridgeConfig is essentially a wrapper around the Simple XML serialisation library.
             * http://simple.sourceforge.net/
             */
            config = BridgeConfig.loadConfig( new File( "db-bridge.xml" ) );
        }
        catch ( Exception e ) {
            System.err.println( "Failed to read or parse db-bridge.xml: " + e.getLocalizedMessage() );
            System.exit( 1 );
        }

        BlockingQueue<Object> b = new ArrayBlockingQueue<Object>( config.getQueueSize() );

        try {
            HashMap<String, DatabaseConfig> dbs = config.getDbs();

            System.err.println( "Configured DBs" );

            final String sourceName = config.getSource();
            final String destinationName = config.getDestination();

            if ( !dbs.containsKey( sourceName ) ) {
                System.err.println( sourceName + " is not a configured database connection" );
                System.exit( 1 );
            }

            if ( !dbs.containsKey( destinationName ) ) {
                System.err.println( destinationName + " is not a configured database connection" );
                System.exit( 1 );
            }

            DatabaseConfig sourceConfig = dbs.get( sourceName );
            DatabaseConfig destinationConfig = dbs.get( destinationName );

            try {
                /*
                 * Both adaptors must be created before attempting a connection,
                 * as otherwise I've seen DriverManager pick the wrong driver!
                 */
                fromAdaptor = AdaptorFactory.buildAdaptor( sourceConfig, sourceConfig );
                toAdaptor = AdaptorFactory.buildAdaptor( destinationConfig, destinationConfig );

                System.err.println( "Connecting to " + sourceName );
                fromAdaptor.connect();

                System.err.println( "Connecting to " + destinationName );
                toAdaptor.connect();

                /* We'll send our updates to the destination explicitly */
                toAdaptor.getConn().setAutoCommit( false );
            }
            catch ( SQLException e ) {
                System.err.println();
                System.err.println( "Failed to create and configure adaptors" );
                e.printStackTrace();
                System.exit( 1 );
            }
            catch ( ClassNotFoundException e ) {
                System.err.println( "Failed to load JDBC driver due to error: " + e.getLocalizedMessage() );
                System.exit( 1 );
            }

            DataProducer producer = null;
            DataConsumer consumer = null;

            try {
                producer = new DataProducer( config, fromAdaptor, b );
                consumer = new DataConsumer( config, toAdaptor, b );
            }
            catch ( SQLException e ) {
                System.err.println();
                System.err.println( "Failed to create and configure data producer or consumer" );
                e.printStackTrace();
                System.exit( 1 );
            }

            consumer.start();
            producer.start();
        }
        catch ( Exception e ) {
            e.printStackTrace();
        }
    }

    public static class DataProducer extends DataLogger {

        private BridgeConfig config;
        private Adaptor adaptor;
        private BlockingQueue<Object> queue;


        public DataProducer(BridgeConfig c, Adaptor a, BlockingQueue<Object> bq) {
            super( "Producer" );
            this.config = c;
            this.adaptor = a;
            this.queue = bq;
        }


        @Override
        public void run() {
            /* The tables to copy are listed in BridgeConfig */
            for ( Table table : this.config.getManifest() ) {

                PreparedStatement stmt = null;
                ResultSet rs = null;

                try {
                    String sql = table.buildSourceSelect();
                    this.log( "executing: " + sql );
                    stmt = this.adaptor.getConn().prepareStatement( sql );

                    stmt.execute();

                    rs = stmt.getResultSet();

                    ResultSetMetaData meta = rs.getMetaData();

                    /* Notify consumer that a new table is to be processed */
                    this.queue.put( table );
                    this.queue.put( meta );

                    final int columnCount = meta.getColumnCount();

                    while ( rs.next() ) {
                        ArrayList<Object> a = new ArrayList<Object>( columnCount );

                        for ( int i = 0; i < columnCount; i++ ) {
                            a.add( rs.getObject( i + 1 ) );
                        }

                        this.queue.put( a );
                    }
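                }
                catch ( InterruptedException ex ) {
                    ex.printStackTrace();
                }
                catch ( SQLException e ) {
                    // TODO Auto-generated catch block
                    e.printStackTrace();
                }
                finally {
                    /* Release the result set and statement before moving to the next table */
                    try {
                        if ( rs != null ) rs.close();
                        if ( stmt != null ) stmt.close();
                    }
                    catch ( SQLException e ) {
                        e.printStackTrace();
                    }
                }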

                try {
                    /* refresh the connection */
                    /* Can't remember why I added this line - maybe the other
                     * end kept closing the connection. */
                    this.adaptor.reconnect();
                }
                catch ( SQLException e ) {
                    // TODO Auto-generated catch block
                    e.printStackTrace();
                }
            }

            try {
                /* Use an object of a specific type to "poison" the queue
                 * and instruct the consumer to terminate. */
                this.log( "putting finished object into queue" );
                this.queue.put( new QueueFinished() );

                this.adaptor.close();
            }
            catch ( InterruptedException e ) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
            catch ( SQLException e ) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
        }

    }

    /* Superclass for producer and consumer */
    public static abstract class DataLogger extends Thread {

        private String prefix;


        public DataLogger(String p) {
            this.prefix = p;
        }


        protected void log( String s ) {
            System.err.printf( "%d %s: %s%n", System.currentTimeMillis(), this.prefix, s );
        }


        protected void log() {
            System.err.println();
        }
    }

    public static class DataConsumer extends DataLogger {

        private BridgeConfig config;
        private Adaptor adaptor;
        private BlockingQueue<Object> queue;
        private int currentRowNumber = 0;
        private int currentBatchSize = 0;
        private long tableStartTs = -1;


        public DataConsumer(BridgeConfig c, Adaptor a, BlockingQueue<Object> bq) throws SQLException {

            super( "Consumer" );

            this.config = c;
            this.adaptor = a;
            this.queue = bq;

            /* We'll send our updates to the destination explicitly */
            this.adaptor.getConn().setAutoCommit( false );
        }


        public void printThroughput() {
            double duration = ( System.currentTimeMillis() - this.tableStartTs ) / 1000.0;
            long rowsPerSec = Math.round( this.currentRowNumber / duration );
            this.log( String.format( "%d rows processed, %d rows/s", this.currentRowNumber, rowsPerSec ) );
        }


        @Override
        public void run() {

            this.log( "running" );

            Table currentTable = null;
            ResultSetMetaData meta = null;

            int columnCount = -1;

            PreparedStatement stmt = null;

            while ( true ) {
                try {
                    Object o = this.queue.take();

                    if ( o instanceof Table ) {
                        currentTable = (Table) o;

                        this.log( "processing " + currentTable );

                        if ( this.currentBatchSize > 0 ) {
                            /* Commit outstanding rows from previous table */

                            this.adaptor.getConn().commit();

                            this.printThroughput();
                            this.currentBatchSize = 0;
                        }

                        /* refresh the connection */
                        this.adaptor.reconnect();
                        this.adaptor.getConn().setAutoCommit( false );

                        /*
                         * Arguably, there's no need to flush the commit buffer
                         * after every table, but I like it because it feels
                         * tidy.
                         */
                        this.currentBatchSize = 0;
                        this.currentRowNumber = 0;

                        if ( currentTable.isTruncate() ) {
                            this.log( "truncating " + currentTable );
                            stmt = this.adaptor.getConn().prepareStatement( "TRUNCATE TABLE " + currentTable );
                            stmt.execute();
                        }

                        this.tableStartTs = System.currentTimeMillis();
                    }
                    else if ( o instanceof ResultSetMetaData ) {

                        this.log( "received metadata for " + currentTable );

                        meta = (ResultSetMetaData) o;
                        columnCount = meta.getColumnCount();

                        String sql = currentTable.buildDestinationInsert( columnCount );
                        stmt = this.adaptor.getConn().prepareStatement( sql );
                    }
                    else if ( o instanceof ArrayList ) {

                        ArrayList<?> a = (ArrayList<?>) o;

                        /* One counter for ArrayList access, one for JDBC access */
                        for ( int i = 0, j = 1; i < columnCount; i++, j++ ) {

                            try {
                                stmt.setObject( j, a.get( i ), meta.getColumnType( j ) );
                            }
                            catch ( SQLException e ) {
                                /* Sometimes data in a shonky remote system
                                 * is rejected by a more sane destination
                                 * system. Translate this data into
                                 * something that will fit. */
                                if ( e.getMessage().contains( "Only dates between" ) ) {

                                    if ( meta.isNullable( j ) == ResultSetMetaData.columnNullable ) {
                                        this.log( "Casting bad data to null: " + a.get( i ) );
                                        stmt.setObject( j, null, meta.getColumnType( j ) );
                                    }
                                    else {
                                        this.log( "Casting bad data to 0000-01-01: " + a.get( i ) );
                                        stmt.setObject( j, new java.sql.Date( -64376208000L ), meta.getColumnType( j ) );
                                    }
                                }
                                else {
                                    throw e;
                                }
                            }
                        }

                        stmt.execute();

                        this.currentBatchSize++;
                        this.currentRowNumber++;

                        if ( this.currentBatchSize == this.config.getBatchSize() ) {
                            /*
                             * We've reached our non-committed limit. Send the
                             * requests to the destination server.
                             */

                            this.adaptor.getConn().commit();

                            this.printThroughput();
                            this.currentBatchSize = 0;
                        }
                    }
                    else if ( o instanceof QueueFinished ) {
                        if ( this.currentBatchSize > 0 ) {
                            /* Commit outstanding rows from previous table */

                            this.adaptor.getConn().commit();

                            this.printThroughput();
                        }

                        this.log();
                        this.log( "completed" );

                        /* Exit while loop */
                        break;
                    }
                    else {
                        throw new RuntimeException( "Unexpected object in queue: " + o.getClass() );
                    }
                }
                catch ( InterruptedException e ) {
                    // TODO Auto-generated catch block
                    e.printStackTrace();
                }
                catch ( SQLException e ) {
                    // TODO Auto-generated catch block
                    e.printStackTrace();
                }
            }

            try {
                this.adaptor.close();
            }
            catch ( SQLException e ) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
        }
    }

    public static final class QueueFinished {
        /*
         * This only exists as a completely type-safe value in "instanceof"
         * expressions
         */
    }
}
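One thing to watch when adapting the producer above to MySQL: by default MySQL Connector/J reads the entire result set into client memory before the first `rs.next()` returns, which can hurt with a 1,000,000-row table. Connector/J streams rows one at a time instead when the statement is forward-only, read-only, and has a fetch size of `Integer.MIN_VALUE`. The helper below is a minimal sketch of that; `StreamingSelect` and `open` are hypothetical names, and the behaviour is specific to Connector/J. While a streaming result set is open, Connector/J does not allow other statements on the same connection, which fits the design above of a dedicated source connection.

```java
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;

final class StreamingSelect {

    /**
     * Prepares a SELECT that MySQL Connector/J will stream row by row instead of
     * buffering the whole result set in memory. The caller executes and closes it.
     */
    static PreparedStatement open(Connection source, String sql) throws SQLException {
        PreparedStatement stmt = source.prepareStatement(
                sql, ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
        // Connector/J-specific "stream results" signal; other drivers may reject a negative fetch size
        stmt.setFetchSize(Integer.MIN_VALUE);
        return stmt;
    }
}
```

In `DataProducer.run()`, something like `stmt = StreamingSelect.open( this.adaptor.getConn(), sql );` followed by `rs = stmt.executeQuery();` could take the place of the `prepareStatement`/`execute`/`getResultSet` calls.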

OTHER TIPS

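In a program I wrote a while back for work, I had two simultaneous connections. Without giving the code away, you'll want something like this:

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;

// Fields, so both connections stay open after initialize() returns
private Connection con, con2;
private Statement stmt, stmt2;
private ResultSet rs, rs2;

public void initialize(String dbClass, String dbUrl, String user, String password,
                       String dbUrl2, String user2, String password2) {
    try {
        Class.forName(dbClass); // both servers are MySQL, so one driver class is enough
        con = DriverManager.getConnection(dbUrl,user,password);     // Database A (source)
        con2 = DriverManager.getConnection(dbUrl2,user2,password2); // Database B (destination)
        stmt = con.createStatement();
        stmt2 = con2.createStatement();
    } catch(ClassNotFoundException e) {
        e.printStackTrace();
    }
    catch(SQLException e) {
        e.printStackTrace();
    }
}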

Then, once both connections are open, you can query each one independently:

rs = stmt.executeQuery("query");
rs2 = stmt2.executeQuery("second query");

I don't know how to address your specific problem, but be aware that this approach may be a little taxing on your system (unless you have a high-end personal or company machine) and could take a while. It should at least be enough to get you started. I'd post more if I could, but sadly the real code is too complicated to mock up a version of. Good luck!

I think you'll be best served by having two separate connections, a read connection and a write connection, and pass the data through your Java application using a small buffer of sorts.
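A minimal sketch of that read-connection/write-connection approach, assuming both `Connection` objects are already open and the INSERT has one `?` placeholder per selected column. `TwoConnectionCopy.copy` is a hypothetical helper, not a library API. Rows are streamed from Database A and written to Database B with JDBC batches, so the only buffer held in memory is the current batch. The `Integer.MIN_VALUE` fetch size is a MySQL Connector/J convention for streaming (other drivers may reject a negative fetch size); with Connector/J, adding `rewriteBatchedStatements=true` to the destination URL lets each batch be sent as a multi-row INSERT.

```java
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;

final class TwoConnectionCopy {

    /**
     * Streams every row of selectSql on `source` into insertSql on `dest`,
     * committing every batchSize rows. Returns the number of rows copied.
     */
    static long copy(Connection source, Connection dest, String selectSql,
                     String insertSql, int batchSize) throws SQLException {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive");
        }
        dest.setAutoCommit(false); // commit explicitly, once per batch
        try (PreparedStatement read = source.prepareStatement(
                     selectSql, ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
             PreparedStatement write = dest.prepareStatement(insertSql)) {
            read.setFetchSize(Integer.MIN_VALUE); // Connector/J: stream rather than buffer all rows
            try (ResultSet rs = read.executeQuery()) {
                int columns = rs.getMetaData().getColumnCount();
                long copied = 0;
                int pending = 0;
                while (rs.next()) {
                    for (int i = 1; i <= columns; i++) {
                        write.setObject(i, rs.getObject(i)); // JDBC columns are 1-based
                    }
                    write.addBatch();
                    copied++;
                    if (++pending == batchSize) {
                        write.executeBatch();
                        dest.commit();
                        pending = 0;
                    }
                }
                if (pending > 0) { // flush the final, partial batch
                    write.executeBatch();
                    dest.commit();
                }
                return copied;
            }
        }
    }
}
```

For example, `TwoConnectionCopy.copy(conA, conB, "SELECT id, name FROM big_table", "INSERT INTO new_table (id, name) VALUES (?, ?)", 10000)` (table and column names made up) would commit to Database B every 10,000 rows while keeping both connections open for the whole copy.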

Another, more complicated but possibly more elegant, solution is to use a FEDERATED table. It makes a table on a remote server appear to be local: queries are passed to the remote server and the results are sent back. You have to be careful about indexes, or it'll be really slow, but it may work for what you want to do.

http://dev.mysql.com/doc/refman/5.5/en/federated-description.html
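As a rough illustration, a FEDERATED table is declared on Database B with the same columns as the remote table on Database A, plus a `CONNECTION` string of the form `mysql://user[:password]@host[:port]/db_name/tbl_name`. The sketch below builds that DDL and runs it over an ordinary JDBC connection to Database B. `FederatedSetup`, the column list, and the connection details in the example are placeholders, and the FEDERATED engine has to be enabled on Database B's server (it is not enabled by default). Once the table exists, a statement such as `INSERT INTO new_table SELECT ... FROM remote_orders` runs on Database B's server without the rows passing through the Java program.

```java
import java.sql.Connection;
import java.sql.SQLException;
import java.sql.Statement;

final class FederatedSetup {

    /**
     * Builds CREATE TABLE ... ENGINE=FEDERATED DDL. columnDefs must match the remote
     * table's column definitions; the remote table itself is not modified.
     */
    static String federatedDdl(String localTable, String columnDefs, String user, String password,
                               String host, int port, String remoteDb, String remoteTable) {
        // Plain concatenation keeps the port digits locale-independent
        String url = "mysql://" + user + ":" + password + "@" + host + ":" + port
                + "/" + remoteDb + "/" + remoteTable;
        return "CREATE TABLE " + localTable + " (" + columnDefs + ")"
                + " ENGINE=FEDERATED CONNECTION='" + url + "'";
    }

    /** Runs the DDL over an existing JDBC connection to Database B. */
    static void create(Connection destination, String ddl) throws SQLException {
        try (Statement stmt = destination.createStatement()) {
            stmt.execute(ddl);
        }
    }
}
```

Note that the password ends up stored in the table definition in plain text, so a dedicated read-only account on Database A is a sensible choice.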

I have done this before, and I recommend doing what I did: get the data you need from DB A and write it out into one or more files as SQL 'set' statements. When I did this, I had to split the output into about 10 files because of restrictions on the size of the file being loaded into DB B.
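The file-splitting step can be sketched as a writer that starts a new file whenever the next statement would push the current file past a size limit. `ChunkedSqlWriter`, the `part-N.sql` naming, and the byte limit are assumptions made for this example; the statements themselves (for example `INSERT INTO ... SET col = value, ...` built from rows read from DB A) must be generated and properly escaped by the caller.

```java
import java.io.BufferedWriter;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;

final class ChunkedSqlWriter {

    /**
     * Writes one statement per line into dir/part-1.sql, part-2.sql, ..., starting a new
     * file when adding the next statement would exceed maxBytes. A single statement larger
     * than maxBytes still gets a file of its own. Returns the files written, in order.
     */
    static List<Path> write(Iterable<String> statements, Path dir, long maxBytes) throws IOException {
        List<Path> files = new ArrayList<>();
        BufferedWriter out = null;
        long used = 0; // bytes written to the current file
        try {
            for (String stmt : statements) {
                String line = stmt.endsWith(";") ? stmt + "\n" : stmt + ";\n";
                long size = line.getBytes(StandardCharsets.UTF_8).length;
                if (out == null || used + size > maxBytes) {
                    if (out != null) {
                        out.close();
                    }
                    Path file = dir.resolve("part-" + (files.size() + 1) + ".sql");
                    out = Files.newBufferedWriter(file, StandardCharsets.UTF_8);
                    files.add(file);
                    used = 0;
                }
                out.write(line);
                used += size;
            }
        } finally {
            if (out != null) {
                out.close();
            }
        }
        return files;
    }
}
```

Each resulting file can then be loaded into DB B on its own, so a failure only means re-running one part.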

Licensed under: CC-BY-SA with attribution
Not affiliated with StackOverflow