Frage

Ich habe eine Echtzeitanwendung, die Informationen verarbeitet und sie in einer MySQL -Datenbank protokolliert (eigentlich Mariadb, eine Gabel von MySQL). Es macht irgendwo etwa 1,5 Millionen Einsätze pro Tag + 150.000 Deletten.

Ich habe große Probleme mit der Leistung und weiß nicht, wie es besser funktioniert.

Die Grundstruktur der Anwendung ist, dass ich eine Produzentenklasse habe, die eine Struktur zu einem Threadsafe -Deque drückt. Der folgende Code

#include "dbUserQueue.h"


dbUserQueue::~dbUserQueue() {
}

void dbUserQueue::createConnection()
{
    sql::Driver * driver = sql::mysql::get_driver_instance();
    std::auto_ptr< sql::Connection > newCon(driver->connect(dbURL, dbUser, dbPass));
    con = newCon;
    std::auto_ptr< sql::Statement > stmt(con->createStatement());
    stmt->execute("USE twitter");
}

inline void dbUserQueue::updateStatement(const std::string & value, 
                   std::auto_ptr< sql::PreparedStatement> & stmt, const int index)
{
    if(value != "\0") stmt->setString(index, value);
    else stmt->setNull(index,sql::DataType::VARCHAR);
}

inline void dbUserQueue::updateStatement(const boost::int64_t & value, 
                   std::auto_ptr< sql::PreparedStatement> & stmt, const int index)
{
    if(value != -1) stmt->setInt64(index,value);
    else stmt->setNull(index,sql::DataType::BIGINT);
}

inline void dbUserQueue::updateStatement(const bool value, 
                   std::auto_ptr< sql::PreparedStatement> & stmt, const int index)
{
    stmt->setBoolean(index, value);
}

inline void dbUserQueue::updateStatement(const int value, 
                   std::auto_ptr< sql::PreparedStatement> & stmt, const int index)
{
    if(value != -1) stmt->setInt(index,value);
    else stmt->setNull(index,sql::DataType::INTEGER);
}

inline void dbUserQueue::updateStatementDateTime(const std::string & value, 
                   std::auto_ptr< sql::PreparedStatement> & stmt, const int & index)
{
    if(value != "\0") stmt->setDateTime(index, value);
    else stmt->setNull(index,sql::DataType::DATE);
}

/*
 * This method creates a database connection 
 * and then creates a new thread to process the incoming queue
 */
void dbUserQueue::start() {
    createConnection();
    if(con->isClosed() == false)
    {
        insertStmt = std::auto_ptr< sql::PreparedStatement>(con->prepareStatement("\
insert ignore into users(contributors_enabled, created_at, \
description, favourites_count, followers_count, \
following, friends_count, geo_enabled, id, lang, listed_count, location, \
name, notifications, screen_name, show_all_inline_media, statuses_count, \
url, utc_offset, verified) values (?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)"));
    }
    thread = boost::thread(&dbUserQueue::processLoop, this);
}

/*
 * Stops the thread once it is finished processing the information
 */
void dbUserQueue::join(){
    thread.interrupt();
    thread.join();
}

/*
 * The worker function of the thread.  
 * Pops items from the queue and updates the database accordingly.
 */
void dbUserQueue::processLoop() {
    user input;
    int recordCount = 0;
    con->setAutoCommit(false);
    while (true) {
        try {
            if(recordCount >= 1000)
            {
                recordCount = 0;
                con->commit();
            }
            // Insert all the data into the prepared statement
            if (userQ.wait_and_pop(input)) {
                updateStatement(input.contributors_enabled, insertStmt, 1);
                updateStatementDateTime(input.created_at, insertStmt, 2);
                updateStatement(input.description, insertStmt, 3);
                updateStatement(input.favourites_count, insertStmt, 4);
                updateStatement(input.followers_count, insertStmt, 5);
                updateStatement(input.following, insertStmt, 6);
                updateStatement(input.friends_count, insertStmt, 7);
                updateStatement(input.geo_enabled, insertStmt, 8);
                updateStatement(input.id, insertStmt, 9);
                updateStatement(input.lang, insertStmt, 10);
                updateStatement(input.listed_count, insertStmt, 11);
                updateStatement(input.location, insertStmt, 12);
                updateStatement(input.name, insertStmt, 13);
                updateStatement(input.notifications, insertStmt, 14);
                updateStatement(input.screenName, insertStmt, 15);
                updateStatement(input.show_all_inline_media, insertStmt, 16);
                updateStatement(input.statuses_count, insertStmt, 17);
                updateStatement(input.url, insertStmt, 18);
                updateStatement(input.utc_offset, insertStmt, 19);
                updateStatement(input.verified, insertStmt, 20);

                insertStmt->executeUpdate();
                insertStmt->clearParameters();
                recordCount++;
                continue;
            }

        } catch (std::exception & e) {
        }
    }// end of while

    // Close the statements and the connection before exiting
    insertStmt->close();
    con->commit();
    if(con->isClosed() == false)
        con->close();
}

Meine Fragen sind, wie Sie die Leistung verbessern können? Dinge, die ich versucht habe:
Wenn mehrere Verbraucher mit einem MySQL/Mariadb verbunden sind
Nach einer großen Anzahl von Aufzeichnungen begehen

Single Producer, Single consumer, commit after 1000 records = ~275 Seconds
Dual Producer, Triple consumers, commit after 1000 records = ~100 Seconds
Dual Producer, Triple consumers, commit after 2000 records = ~100 Seconds
Dual Producer, Triple consumers, commit every 1 record = ~100 Seconds
Dual Producer, 6 Consumers, commit every 1 record = ~95 Seconds
Dual Producer, 6 Consumers, commit every 2000 records = ~100 Seconds
Triple Producer, 6 Consumesr, commit every 2000 records = ~100 Seconds

Ein paar Hinweise zur Problemdomäne. Die Nachrichten zum Einfügen und oder löschen werden im Laufe des Tages mit einem Durchschnitt von ~ 20 Einfügen/Löschungen pro Sekunde zufällig gekommen. Es gibt viel höher, aber es gibt keinen Grund, warum die Updates für kurze Zeit nicht in die Warteschlange gestellt werden können, solange die Warteschlange in der Warteschlange stellt Wächst nicht zu groß.

Die Tabelle, in die die Daten derzeit eingefügt werden, enthält ungefähr 52 Millionen Datensätze. Hier ist die MySQL -Tabelleninformationen

CREATE TABLE `users` (
  `id` bigint(20) unsigned NOT NULL,
  `contributors_enabled` tinyint(4) DEFAULT '0',
  `created_at` datetime NOT NULL,
  `description` varchar(255) DEFAULT NULL,
  `favourites_count` int(11) NOT NULL,
  `followers_count` int(11) DEFAULT NULL,
  `following` varchar(255) DEFAULT NULL,
  `friends_count` int(11) NOT NULL,
  `geo_enabled` tinyint(4) DEFAULT '0',
  `lang` varchar(255) DEFAULT NULL,
  `listed_count` int(11) DEFAULT NULL,
  `location` varchar(255) DEFAULT NULL,
  `name` varchar(255) DEFAULT NULL,
  `notifications` varchar(45) DEFAULT NULL,
  `screen_name` varchar(45) NOT NULL,
  `show_all_inline_media` tinyint(4) DEFAULT NULL,
  `statuses_count` int(11) NOT NULL,
  `url` varchar(255) DEFAULT NULL,
  `utc_offset` int(11) DEFAULT NULL,
  `verified` tinyint(4) DEFAULT NULL,
  PRIMARY KEY (`id`),
  UNIQUE KEY `id_UNIQUE` (`id`)
) ENGINE=MARIA DEFAULT CHARSET=latin1 CHECKSUM=1 PAGE_CHECKSUM=1 TRANSACTIONAL=1 
War es hilfreich?

Lösung

Sie können den Code so ändern, dass er lassende Einfügungen durchführt, anstatt jeweils eine Zeile einzufügen.

Lizenziert unter: CC-BY-SA mit Zuschreibung
Nicht verbunden mit StackOverflow
scroll top