我有一个实时应用程序,可以处理信息并将其记录到MySQL数据库(实际上是Mariadb,MySQL的叉子)。它每天插入约150万件 + 150,000个删除。

我在性能方面遇到了很大的问题,不知道如何使其发挥更好的功能。

应用程序的基本结构是我有一个生产者类,可以将结构推到线程安全deque。以下代码

#include "dbUserQueue.h"


dbUserQueue::~dbUserQueue() {
}

void dbUserQueue::createConnection()
{
    sql::Driver * driver = sql::mysql::get_driver_instance();
    std::auto_ptr< sql::Connection > newCon(driver->connect(dbURL, dbUser, dbPass));
    con = newCon;
    std::auto_ptr< sql::Statement > stmt(con->createStatement());
    stmt->execute("USE twitter");
}

inline void dbUserQueue::updateStatement(const std::string & value, 
                   std::auto_ptr< sql::PreparedStatement> & stmt, const int index)
{
    if(value != "\0") stmt->setString(index, value);
    else stmt->setNull(index,sql::DataType::VARCHAR);
}

inline void dbUserQueue::updateStatement(const boost::int64_t & value, 
                   std::auto_ptr< sql::PreparedStatement> & stmt, const int index)
{
    if(value != -1) stmt->setInt64(index,value);
    else stmt->setNull(index,sql::DataType::BIGINT);
}

inline void dbUserQueue::updateStatement(const bool value, 
                   std::auto_ptr< sql::PreparedStatement> & stmt, const int index)
{
    stmt->setBoolean(index, value);
}

inline void dbUserQueue::updateStatement(const int value, 
                   std::auto_ptr< sql::PreparedStatement> & stmt, const int index)
{
    if(value != -1) stmt->setInt(index,value);
    else stmt->setNull(index,sql::DataType::INTEGER);
}

inline void dbUserQueue::updateStatementDateTime(const std::string & value, 
                   std::auto_ptr< sql::PreparedStatement> & stmt, const int & index)
{
    if(value != "\0") stmt->setDateTime(index, value);
    else stmt->setNull(index,sql::DataType::DATE);
}

/*
 * This method creates a database connection 
 * and then creates a new thread to process the incoming queue
 */
void dbUserQueue::start() {
    createConnection();
    if(con->isClosed() == false)
    {
        insertStmt = std::auto_ptr< sql::PreparedStatement>(con->prepareStatement("\
insert ignore into users(contributors_enabled, created_at, \
description, favourites_count, followers_count, \
following, friends_count, geo_enabled, id, lang, listed_count, location, \
name, notifications, screen_name, show_all_inline_media, statuses_count, \
url, utc_offset, verified) values (?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)"));
    }
    thread = boost::thread(&dbUserQueue::processLoop, this);
}

/*
 * Stops the thread once it is finished processing the information
 */
void dbUserQueue::join(){
    thread.interrupt();
    thread.join();
}

/*
 * The worker function of the thread.  
 * Pops items from the queue and updates the database accordingly.
 */
void dbUserQueue::processLoop() {
    user input;
    int recordCount = 0;
    con->setAutoCommit(false);
    while (true) {
        try {
            if(recordCount >= 1000)
            {
                recordCount = 0;
                con->commit();
            }
            // Insert all the data into the prepared statement
            if (userQ.wait_and_pop(input)) {
                updateStatement(input.contributors_enabled, insertStmt, 1);
                updateStatementDateTime(input.created_at, insertStmt, 2);
                updateStatement(input.description, insertStmt, 3);
                updateStatement(input.favourites_count, insertStmt, 4);
                updateStatement(input.followers_count, insertStmt, 5);
                updateStatement(input.following, insertStmt, 6);
                updateStatement(input.friends_count, insertStmt, 7);
                updateStatement(input.geo_enabled, insertStmt, 8);
                updateStatement(input.id, insertStmt, 9);
                updateStatement(input.lang, insertStmt, 10);
                updateStatement(input.listed_count, insertStmt, 11);
                updateStatement(input.location, insertStmt, 12);
                updateStatement(input.name, insertStmt, 13);
                updateStatement(input.notifications, insertStmt, 14);
                updateStatement(input.screenName, insertStmt, 15);
                updateStatement(input.show_all_inline_media, insertStmt, 16);
                updateStatement(input.statuses_count, insertStmt, 17);
                updateStatement(input.url, insertStmt, 18);
                updateStatement(input.utc_offset, insertStmt, 19);
                updateStatement(input.verified, insertStmt, 20);

                insertStmt->executeUpdate();
                insertStmt->clearParameters();
                recordCount++;
                continue;
            }

        } catch (std::exception & e) {
        }
    }// end of while

    // Close the statements and the connection before exiting
    insertStmt->close();
    con->commit();
    if(con->isClosed() == false)
        con->close();
}

我的问题是如何提高性能?我尝试过的事情:
有多个消费者连接到一个MySQL/Mariadb
在大量记录之后提交

Single Producer, Single consumer, commit after 1000 records = ~275 Seconds
Dual Producer, Triple consumers, commit after 1000 records = ~100 Seconds
Dual Producer, Triple consumers, commit after 2000 records = ~100 Seconds
Dual Producer, Triple consumers, commit every 1 record = ~100 Seconds
Dual Producer, 6 Consumers, commit every 1 record = ~95 Seconds
Dual Producer, 6 Consumers, commit every 2000 records = ~100 Seconds
Triple Producer, 6 Consumesr, commit every 2000 records = ~100 Seconds

几个关于问题域的注释。要插入和或删除的消息全天随机出现,平均每秒约20个插入/删除,爆发要高得多,但没有理由将更新不能在短时间内排队,只要队列不大。

目前正在插入数据的表中有大约5200万个记录。这是MySQL表信息

CREATE TABLE `users` (
  `id` bigint(20) unsigned NOT NULL,
  `contributors_enabled` tinyint(4) DEFAULT '0',
  `created_at` datetime NOT NULL,
  `description` varchar(255) DEFAULT NULL,
  `favourites_count` int(11) NOT NULL,
  `followers_count` int(11) DEFAULT NULL,
  `following` varchar(255) DEFAULT NULL,
  `friends_count` int(11) NOT NULL,
  `geo_enabled` tinyint(4) DEFAULT '0',
  `lang` varchar(255) DEFAULT NULL,
  `listed_count` int(11) DEFAULT NULL,
  `location` varchar(255) DEFAULT NULL,
  `name` varchar(255) DEFAULT NULL,
  `notifications` varchar(45) DEFAULT NULL,
  `screen_name` varchar(45) NOT NULL,
  `show_all_inline_media` tinyint(4) DEFAULT NULL,
  `statuses_count` int(11) NOT NULL,
  `url` varchar(255) DEFAULT NULL,
  `utc_offset` int(11) DEFAULT NULL,
  `verified` tinyint(4) DEFAULT NULL,
  PRIMARY KEY (`id`),
  UNIQUE KEY `id_UNIQUE` (`id`)
) ENGINE=MARIA DEFAULT CHARSET=latin1 CHECKSUM=1 PAGE_CHECKSUM=1 TRANSACTIONAL=1 
有帮助吗?

解决方案

您可以更改代码进行批量插入,而不是一次插入一行。

许可以下: CC-BY-SA归因
不隶属于 StackOverflow
scroll top