Problem

I have a realtime application that processes information and logs it to a MySQL database (actually MariaDB, a fork of MySQL). It performs roughly 1.5 million inserts per day, plus 150,000 deletes.

I am having serious performance problems and don't know how to make the application perform any better.

The basic structure of the application is that a producer class pushes a struct onto a thread-safe deque, and the consumer class below pops those structs and writes them to the database:

#include "dbUserQueue.h"


dbUserQueue::~dbUserQueue() {
}

void dbUserQueue::createConnection()
{
    sql::Driver * driver = sql::mysql::get_driver_instance();
    std::auto_ptr< sql::Connection > newCon(driver->connect(dbURL, dbUser, dbPass));
    con = newCon;
    std::auto_ptr< sql::Statement > stmt(con->createStatement());
    stmt->execute("USE twitter");
}

inline void dbUserQueue::updateStatement(const std::string & value, 
                   std::auto_ptr< sql::PreparedStatement> & stmt, const int index)
{
    if(value != "\0") stmt->setString(index, value);
    else stmt->setNull(index,sql::DataType::VARCHAR);
}

inline void dbUserQueue::updateStatement(const boost::int64_t & value, 
                   std::auto_ptr< sql::PreparedStatement> & stmt, const int index)
{
    if(value != -1) stmt->setInt64(index,value);
    else stmt->setNull(index,sql::DataType::BIGINT);
}

inline void dbUserQueue::updateStatement(const bool value, 
                   std::auto_ptr< sql::PreparedStatement> & stmt, const int index)
{
    stmt->setBoolean(index, value);
}

inline void dbUserQueue::updateStatement(const int value, 
                   std::auto_ptr< sql::PreparedStatement> & stmt, const int index)
{
    if(value != -1) stmt->setInt(index,value);
    else stmt->setNull(index,sql::DataType::INTEGER);
}

inline void dbUserQueue::updateStatementDateTime(const std::string & value, 
                   std::auto_ptr< sql::PreparedStatement> & stmt, const int & index)
{
    if(value != "\0") stmt->setDateTime(index, value);
    else stmt->setNull(index,sql::DataType::DATE);
}

/*
 * This method creates a database connection 
 * and then creates a new thread to process the incoming queue
 */
void dbUserQueue::start() {
    createConnection();
    if(con->isClosed() == false)
    {
        insertStmt = std::auto_ptr< sql::PreparedStatement>(con->prepareStatement("\
insert ignore into users(contributors_enabled, created_at, \
description, favourites_count, followers_count, \
following, friends_count, geo_enabled, id, lang, listed_count, location, \
name, notifications, screen_name, show_all_inline_media, statuses_count, \
url, utc_offset, verified) values (?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)"));
    }
    thread = boost::thread(&dbUserQueue::processLoop, this);
}

/*
 * Stops the thread once it is finished processing the information
 */
void dbUserQueue::join(){
    thread.interrupt();
    thread.join();
}

/*
 * The worker function of the thread.  
 * Pops items from the queue and updates the database accordingly.
 */
void dbUserQueue::processLoop() {
    user input;
    int recordCount = 0;
    con->setAutoCommit(false);
    while (true) {
        try {
            if(recordCount >= 1000)
            {
                recordCount = 0;
                con->commit();
            }
            // Insert all the data into the prepared statement
            if (userQ.wait_and_pop(input)) {
                updateStatement(input.contributors_enabled, insertStmt, 1);
                updateStatementDateTime(input.created_at, insertStmt, 2);
                updateStatement(input.description, insertStmt, 3);
                updateStatement(input.favourites_count, insertStmt, 4);
                updateStatement(input.followers_count, insertStmt, 5);
                updateStatement(input.following, insertStmt, 6);
                updateStatement(input.friends_count, insertStmt, 7);
                updateStatement(input.geo_enabled, insertStmt, 8);
                updateStatement(input.id, insertStmt, 9);
                updateStatement(input.lang, insertStmt, 10);
                updateStatement(input.listed_count, insertStmt, 11);
                updateStatement(input.location, insertStmt, 12);
                updateStatement(input.name, insertStmt, 13);
                updateStatement(input.notifications, insertStmt, 14);
                updateStatement(input.screenName, insertStmt, 15);
                updateStatement(input.show_all_inline_media, insertStmt, 16);
                updateStatement(input.statuses_count, insertStmt, 17);
                updateStatement(input.url, insertStmt, 18);
                updateStatement(input.utc_offset, insertStmt, 19);
                updateStatement(input.verified, insertStmt, 20);

                insertStmt->executeUpdate();
                insertStmt->clearParameters();
                recordCount++;
                continue;
            }

        } catch (std::exception & e) {
        }
    }// end of while

    // Close the statements and the connection before exiting
    insertStmt->close();
    con->commit();
    if(con->isClosed() == false)
        con->close();
}

My question is: how can I improve the performance? Things I have tried:
Having multiple consumers connecting to one MySQL/MariaDB instance
Committing after a large number of records

Single Producer, Single consumer, commit after 1000 records = ~275 Seconds
Dual Producer, Triple consumers, commit after 1000 records = ~100 Seconds
Dual Producer, Triple consumers, commit after 2000 records = ~100 Seconds
Dual Producer, Triple consumers, commit every 1 record = ~100 Seconds
Dual Producer, 6 Consumers, commit every 1 record = ~95 Seconds
Dual Producer, 6 Consumers, commit every 2000 records = ~100 Seconds
Triple Producer, 6 Consumers, commit every 2000 records = ~100 Seconds

A couple of notes on the problem domain: the messages to insert or delete arrive randomly throughout the day, averaging ~20 inserts/deletes per second with much higher bursts, but there is no reason the updates can't be queued for a short period, as long as the queue doesn't grow too large.

The table the data is currently inserted into holds approximately 52 million records. Here is the table definition:

CREATE TABLE `users` (
  `id` bigint(20) unsigned NOT NULL,
  `contributors_enabled` tinyint(4) DEFAULT '0',
  `created_at` datetime NOT NULL,
  `description` varchar(255) DEFAULT NULL,
  `favourites_count` int(11) NOT NULL,
  `followers_count` int(11) DEFAULT NULL,
  `following` varchar(255) DEFAULT NULL,
  `friends_count` int(11) NOT NULL,
  `geo_enabled` tinyint(4) DEFAULT '0',
  `lang` varchar(255) DEFAULT NULL,
  `listed_count` int(11) DEFAULT NULL,
  `location` varchar(255) DEFAULT NULL,
  `name` varchar(255) DEFAULT NULL,
  `notifications` varchar(45) DEFAULT NULL,
  `screen_name` varchar(45) NOT NULL,
  `show_all_inline_media` tinyint(4) DEFAULT NULL,
  `statuses_count` int(11) NOT NULL,
  `url` varchar(255) DEFAULT NULL,
  `utc_offset` int(11) DEFAULT NULL,
  `verified` tinyint(4) DEFAULT NULL,
  PRIMARY KEY (`id`),
  UNIQUE KEY `id_UNIQUE` (`id`)
) ENGINE=MARIA DEFAULT CHARSET=latin1 CHECKSUM=1 PAGE_CHECKSUM=1 TRANSACTIONAL=1 

Solution

You could change the code to do bulk inserts rather than inserting one row at a time. A single multi-row INSERT sends many rows in one statement, amortizing the network round trip and index-maintenance cost across the whole batch.
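A minimal sketch of the idea, assuming the same MySQL Connector/C++ API and `user` struct used in the question; the column list is shortened to three columns for brevity, and `bulkInsert` is a hypothetical helper, not part of the original code:

    #include <cstddef>
    #include <memory>
    #include <sstream>
    #include <vector>

    #include <cppconn/connection.h>
    #include <cppconn/prepared_statement.h>

    // Hypothetical helper: insert a whole batch of users with one
    // multi-row INSERT instead of one statement per row.
    void bulkInsert(sql::Connection & con, const std::vector<user> & batch)
    {
        if (batch.empty()) return;

        // Build "insert ignore into users(...) values (?,?,?),(?,?,?),..."
        // with one placeholder group per queued row.
        std::ostringstream sqlText;
        sqlText << "insert ignore into users(id, screen_name, created_at) values ";
        for (std::size_t i = 0; i < batch.size(); ++i)
            sqlText << (i == 0 ? "(?,?,?)" : ",(?,?,?)");

        std::auto_ptr< sql::PreparedStatement > stmt(
            con.prepareStatement(sqlText.str()));

        // Bind every row's values; parameter indices run across the
        // whole statement, not per row.
        int index = 1;
        for (std::size_t i = 0; i < batch.size(); ++i) {
            stmt->setInt64(index++, batch[i].id);
            stmt->setString(index++, batch[i].screenName);
            stmt->setDateTime(index++, batch[i].created_at);
        }

        // One round trip to the server for the entire batch.
        stmt->executeUpdate();
        con.commit();
    }

In the consumer loop you would pop, say, 1000 items off the queue into a std::vector<user> and make one call to this helper, instead of 1000 executeUpdate() calls. Note that the statement text (and therefore the prepare step) depends on the batch size, so a fixed batch size is cheapest, and very large batches can run into the server's max_allowed_packet limit.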
