提交 5fcff75c 编写于 作者: Z zhihaop

feat: improve the performance of async bulk write dispatcher

上级 08f7a4fa
......@@ -31,11 +31,14 @@ typedef struct SAsyncBulkWriteDispatcher {
// the buffer to store the insertion statements. equivalent to SArray<SSqlObj*>.
SArray* buffer;
// the mutex to protect the buffer.
// the mutex to protect the dispatcher.
pthread_mutex_t mutex;
// the cond to signal to background thread.
pthread_cond_t cond;
pthread_cond_t timeout;
// the cond to signal to background thread.
pthread_cond_t notFull;
// the background thread to manage batching timeout.
pthread_t background;
......@@ -73,24 +76,7 @@ typedef struct SSqlObj SSqlObj;
int32_t dispatcherStatementMerge(SArray* statements, SSqlObj** result);
/**
* Poll all the SSqlObj* in the dispatcher's buffer.
*
* @param dispatcher the dispatcher.
* @return the items in the dispatcher, SArray<SSqlObj*>.
*/
SArray* dispatcherPollAll(SAsyncBulkWriteDispatcher* dispatcher);
/**
* @brief Try to offer the SSqlObj* to the dispatcher.
*
* @param dispatcher the async bulk write dispatcher.
* @param pSql the sql object to offer.
* @return if offer success, return the current size of the buffer. otherwise returns -1.
*/
int32_t dispatcherTryOffer(SAsyncBulkWriteDispatcher* dispatcher, SSqlObj* pSql);
/**
* @brief Merge the sql statements and execute the merged sql statement.
* Merge the sql statements and execute the merged sql statement.
*
* @param statements the array of sql statement. a.k.a SArray<SSqlObj*>.
*/
......@@ -118,10 +104,11 @@ void destroyAsyncDispatcher(SAsyncBulkWriteDispatcher* dispatcher);
* 2. must be an `insert into ... value ...` statement.
* 3. the payload type must be kv payload.
*
* @param dispatcher the async dispatcher.
* @param pSql the sql object to check.
* @return returns true if the sql object supports auto batch.
*/
bool tscSupportBulkInsertion(SSqlObj* pSql);
bool tscSupportBulkInsertion(SAsyncBulkWriteDispatcher* dispatcher, SSqlObj* pSql);
/**
* Try to offer the SSqlObj* to the buffer. If the number of row reach `asyncBatchSize`, the function
......@@ -174,6 +161,7 @@ void destroyDispatcherHolder(SDispatcherHolder* holder);
/**
* Get an instance of SAsyncBulkWriteDispatcher.
*
* @param holder the holder of SAsyncBulkWriteDispatcher.
* @return the SAsyncBulkWriteDispatcher instance.
*/
......
......@@ -143,12 +143,20 @@ int32_t dispatcherStatementMerge(SArray* statements, SSqlObj** result) {
return code;
}
SArray* dispatcherPollAll(SAsyncBulkWriteDispatcher* dispatcher) {
/**
* Poll all the SSqlObj* in the dispatcher's buffer.
*
* @param dispatcher the dispatcher.
* @return the items in the dispatcher, SArray<SSqlObj*>.
*/
inline static SArray* dispatcherPollAll(SAsyncBulkWriteDispatcher* dispatcher) {
pthread_mutex_lock(&dispatcher->mutex);
if (!atomic_load_32(&dispatcher->bufferSize)) {
pthread_cond_broadcast(&dispatcher->notFull);
pthread_mutex_unlock(&dispatcher->mutex);
return NULL;
}
pthread_mutex_lock(&dispatcher->mutex);
SArray* statements = taosArrayDup(dispatcher->buffer);
if (statements == NULL) {
pthread_mutex_unlock(&dispatcher->mutex);
......@@ -159,27 +167,38 @@ SArray* dispatcherPollAll(SAsyncBulkWriteDispatcher* dispatcher) {
atomic_store_32(&dispatcher->bufferSize, 0);
atomic_store_32(&dispatcher->currentSize, 0);
taosArrayClear(dispatcher->buffer);
pthread_cond_broadcast(&dispatcher->notFull);
pthread_mutex_unlock(&dispatcher->mutex);
return statements;
}
int32_t dispatcherTryOffer(SAsyncBulkWriteDispatcher* dispatcher, SSqlObj* pSql) {
// pre-check: the buffer is full.
if (atomic_load_32(&dispatcher->currentSize) >= dispatcher->batchSize) {
/**
* @brief Try to offer the SSqlObj* to the dispatcher.
*
* @param dispatcher the async bulk write dispatcher.
* @param pSql the sql object to offer.
* @return if offer success, return the current size of the buffer. otherwise returns -1.
*/
inline static int32_t dispatcherTryOffer(SAsyncBulkWriteDispatcher* dispatcher, SSqlObj* pSql) {
pthread_mutex_lock(&dispatcher->mutex);
// if dispatcher is shutdown, must fail back to normal insertion.
// usually not happen, unless taos_query_a(...) after taos_close(...).
if (atomic_load_8(&dispatcher->shutdown)) {
pthread_mutex_unlock(&dispatcher->mutex);
return -1;
}
pthread_mutex_lock(&dispatcher->mutex);
// double-check: the buffer is full.
if (atomic_load_32(&dispatcher->currentSize) >= dispatcher->batchSize) {
// the buffer is full.
while (atomic_load_32(&dispatcher->currentSize) >= dispatcher->batchSize) {
if (pthread_cond_wait(&dispatcher->notFull, &dispatcher->mutex)) {
pthread_mutex_unlock(&dispatcher->mutex);
return -1;
}
}
taosArrayPush(dispatcher->buffer, pSql);
tscDebug("sql obj %p has been write to insert buffer", pSql);
atomic_fetch_add_32(&dispatcher->bufferSize, 1);
......@@ -205,9 +224,7 @@ void dispatcherExecute(SArray* statements) {
tscDebug("merging %zu sql objs into %p", taosArrayGetSize(statements), merged);
tscHandleMultivnodeInsert(merged);
taosArrayDestroy(&statements);
return;
_error:
tscError("send async batch sql obj failed, reason: %s", tstrerror(code));
......@@ -216,7 +233,6 @@ _error:
SSqlObj* item = *((SSqlObj**)taosArrayGet(statements, i));
tscReturnsError(item, code);
}
taosArrayDestroy(&statements);
}
/**
......@@ -245,43 +261,58 @@ static inline int64_t durationMillis(struct timespec s, struct timespec t) {
return d;
}
/**
* Sleep until `timeout` timespec. When dispatcherShutdown(...) called, the function will return immediately.
*
* @param dispatcher the dispatcher thread to sleep.
* @param timeout the timeout in CLOCK_REALTIME.
*/
inline static void dispatcherSleepUntil(SAsyncBulkWriteDispatcher* dispatcher, struct timespec timeout) {
struct timespec current;
clock_gettime(CLOCK_REALTIME, &current);
// if current > timeout, no sleep required.
if (durationMillis(current, timeout) <= 0) {
return;
}
if (pthread_mutex_timedlock(&dispatcher->mutex, &timeout)) {
return;
}
while (true) {
// notified by dispatcherShutdown(...).
if (atomic_load_8(&dispatcher->shutdown)) {
break;
}
if (pthread_cond_timedwait(&dispatcher->timeout, &dispatcher->mutex, &timeout)) {
break;
}
}
pthread_mutex_unlock(&dispatcher->mutex);
}
/**
* The thread to manage batching timeout.
*/
static void* dispatcherTimeoutCallback(void* arg) {
SAsyncBulkWriteDispatcher* dispatcher = arg;
setThreadName("tscBackground");
setThreadName("tscAsyncBackground");
while (!atomic_load_8(&dispatcher->shutdown)) {
struct timespec t1, t2;
clock_gettime(CLOCK_REALTIME, &t1);
struct timespec current;
clock_gettime(CLOCK_REALTIME, &current);
struct timespec timeout = afterMillis(current, dispatcher->timeoutMs);
atomic_store_8(&dispatcher->exclusive, true);
SArray* statements = dispatcherPollAll(dispatcher);
atomic_store_8(&dispatcher->exclusive, false);
dispatcherExecute(statements);
clock_gettime(CLOCK_REALTIME, &t2);
taosArrayDestroy(&statements);
// Similar to scheduleAtFixedRate in Java, if the execution time exceed
// `timeoutMs` milliseconds, then there will be no sleep.
struct timespec t3 = afterMillis(t1, dispatcher->timeoutMs);
if (durationMillis(t2, t3) > 0) {
if (pthread_mutex_timedlock(&dispatcher->mutex, &t3)) {
continue;
}
while (true) {
if (atomic_load_8(&dispatcher->shutdown)) {
break;
}
if (pthread_cond_timedwait(&dispatcher->cond, &dispatcher->mutex, &t3)) {
break;
}
}
pthread_mutex_unlock(&dispatcher->mutex);
}
dispatcherSleepUntil(dispatcher, timeout);
}
return NULL;
}
......@@ -309,10 +340,14 @@ SAsyncBulkWriteDispatcher* createAsyncBulkWriteDispatcher(int32_t batchSize, int
// init the mutex and the cond.
pthread_mutex_init(&dispatcher->mutex, NULL);
pthread_cond_init(&dispatcher->cond, NULL);
pthread_cond_init(&dispatcher->timeout, NULL);
pthread_cond_init(&dispatcher->notFull, NULL);
// init background thread.
if (pthread_create(&dispatcher->background, NULL, dispatcherTimeoutCallback, dispatcher)) {
pthread_mutex_destroy(&dispatcher->mutex);
pthread_cond_destroy(&dispatcher->timeout);
pthread_cond_destroy(&dispatcher->notFull);
taosArrayDestroy(&dispatcher->buffer);
tfree(dispatcher);
return NULL;
......@@ -321,24 +356,34 @@ SAsyncBulkWriteDispatcher* createAsyncBulkWriteDispatcher(int32_t batchSize, int
return dispatcher;
}
void destroyAsyncDispatcher(SAsyncBulkWriteDispatcher* dispatcher) {
if (dispatcher == NULL) {
return;
}
/**
* Shutdown the dispatcher and join the timeout thread.
*
* @param dispatcher the dispatcher.
*/
inline static void dispatcherShutdown(SAsyncBulkWriteDispatcher* dispatcher) {
// mark shutdown, signal shutdown to timeout thread.
pthread_mutex_lock(&dispatcher->mutex);
atomic_store_8(&dispatcher->shutdown, true);
pthread_cond_signal(&dispatcher->cond);
pthread_cond_broadcast(&dispatcher->timeout);
pthread_mutex_unlock(&dispatcher->mutex);
// make sure the timeout thread exit.
pthread_join(dispatcher->background, NULL);
}
void destroyAsyncDispatcher(SAsyncBulkWriteDispatcher* dispatcher) {
if (dispatcher == NULL) {
return;
}
dispatcherShutdown(dispatcher);
// poll and send all the statements in the buffer.
while (atomic_load_32(&dispatcher->bufferSize)) {
SArray* statements = dispatcherPollAll(dispatcher);
dispatcherExecute(statements);
taosArrayDestroy(&statements);
}
// destroy the buffer.
......@@ -346,10 +391,13 @@ void destroyAsyncDispatcher(SAsyncBulkWriteDispatcher* dispatcher) {
// destroy the mutex.
pthread_mutex_destroy(&dispatcher->mutex);
pthread_cond_destroy(&dispatcher->timeout);
pthread_cond_destroy(&dispatcher->notFull);
free(dispatcher);
}
bool tscSupportBulkInsertion(SSqlObj* pSql) {
bool tscSupportBulkInsertion(SAsyncBulkWriteDispatcher* dispatcher, SSqlObj* pSql) {
if (pSql == NULL || !pSql->enableBatch) {
return false;
}
......@@ -374,6 +422,11 @@ bool tscSupportBulkInsertion(SSqlObj* pSql) {
return false;
}
// too many insertion rows, fail back to normal insertion.
if (statementGetInsertionRows(pSql) >= dispatcher->batchSize) {
return false;
}
return true;
}
......@@ -383,7 +436,7 @@ bool dispatcherTryBatching(SAsyncBulkWriteDispatcher* dispatcher, SSqlObj* pSql)
}
// the sql object doesn't support bulk insertion.
if (!tscSupportBulkInsertion(pSql)) {
if (!tscSupportBulkInsertion(dispatcher, pSql)) {
return false;
}
......@@ -398,10 +451,10 @@ bool dispatcherTryBatching(SAsyncBulkWriteDispatcher* dispatcher, SSqlObj* pSql)
return false;
}
// the buffer is full or reach batch size.
if (currentSize >= dispatcher->batchSize) {
if (atomic_load_8(&dispatcher->shutdown) || currentSize >= dispatcher->batchSize) {
SArray* statements = dispatcherPollAll(dispatcher);
dispatcherExecute(statements);
taosArrayDestroy(&statements);
}
return true;
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册