提交 9d6f980b 编写于 作者: Z zhihaop

feat: improve the multithread insert performance of dispatcher

上级 8c901e1c
...@@ -144,22 +144,19 @@ int32_t dispatcherStatementMerge(SArray* statements, SSqlObj** result) { ...@@ -144,22 +144,19 @@ int32_t dispatcherStatementMerge(SArray* statements, SSqlObj** result) {
} }
/** /**
* Poll all the SSqlObj* in the dispatcher's buffer. * Poll all the SSqlObj* in the dispatcher's buffer (No Lock). After call this function,
* you need to notify dispatcher->notFull by yourself.
* *
* @param dispatcher the dispatcher. * @param dispatcher the dispatcher.
* @return the items in the dispatcher, SArray<SSqlObj*>. * @return the items in the dispatcher, SArray<SSqlObj*>.
*/ */
inline static SArray* dispatcherPollAll(SAsyncBulkWriteDispatcher* dispatcher) { inline static SArray* dispatcherPollAll(SAsyncBulkWriteDispatcher* dispatcher) {
pthread_mutex_lock(&dispatcher->mutex);
if (!atomic_load_32(&dispatcher->bufferSize)) { if (!atomic_load_32(&dispatcher->bufferSize)) {
pthread_cond_broadcast(&dispatcher->notFull);
pthread_mutex_unlock(&dispatcher->mutex);
return NULL; return NULL;
} }
SArray* statements = taosArrayDup(dispatcher->buffer); SArray* statements = taosArrayDup(dispatcher->buffer);
if (statements == NULL) { if (statements == NULL) {
pthread_mutex_unlock(&dispatcher->mutex);
tscError("failed to poll all items: out of memory"); tscError("failed to poll all items: out of memory");
return NULL; return NULL;
} }
...@@ -167,9 +164,20 @@ inline static SArray* dispatcherPollAll(SAsyncBulkWriteDispatcher* dispatcher) { ...@@ -167,9 +164,20 @@ inline static SArray* dispatcherPollAll(SAsyncBulkWriteDispatcher* dispatcher) {
atomic_store_32(&dispatcher->bufferSize, 0); atomic_store_32(&dispatcher->bufferSize, 0);
atomic_store_32(&dispatcher->currentSize, 0); atomic_store_32(&dispatcher->currentSize, 0);
taosArrayClear(dispatcher->buffer); taosArrayClear(dispatcher->buffer);
return statements;
}
/**
* Poll all the SSqlObj* in the dispatcher's buffer.
*
* @param dispatcher the dispatcher.
* @return the items in the dispatcher, SArray<SSqlObj*>.
*/
inline static SArray* dispatcherLockPollAll(SAsyncBulkWriteDispatcher* dispatcher) {
pthread_mutex_lock(&dispatcher->mutex);
SArray* statements = dispatcherPollAll(dispatcher);
pthread_cond_broadcast(&dispatcher->notFull); pthread_cond_broadcast(&dispatcher->notFull);
pthread_mutex_unlock(&dispatcher->mutex); pthread_mutex_unlock(&dispatcher->mutex);
return statements; return statements;
} }
...@@ -178,9 +186,9 @@ inline static SArray* dispatcherPollAll(SAsyncBulkWriteDispatcher* dispatcher) { ...@@ -178,9 +186,9 @@ inline static SArray* dispatcherPollAll(SAsyncBulkWriteDispatcher* dispatcher) {
* *
* @param dispatcher the async bulk write dispatcher. * @param dispatcher the async bulk write dispatcher.
* @param pSql the sql object to offer. * @param pSql the sql object to offer.
* @return if offer success, return the current size of the buffer. otherwise returns -1. * @return return whether offer success.
*/ */
inline static int32_t dispatcherTryOffer(SAsyncBulkWriteDispatcher* dispatcher, SSqlObj* pSql) { inline static bool dispatcherTryOffer(SAsyncBulkWriteDispatcher* dispatcher, SSqlObj* pSql) {
pthread_mutex_lock(&dispatcher->mutex); pthread_mutex_lock(&dispatcher->mutex);
// if dispatcher is shutdown, must fail back to normal insertion. // if dispatcher is shutdown, must fail back to normal insertion.
...@@ -204,6 +212,14 @@ inline static int32_t dispatcherTryOffer(SAsyncBulkWriteDispatcher* dispatcher, ...@@ -204,6 +212,14 @@ inline static int32_t dispatcherTryOffer(SAsyncBulkWriteDispatcher* dispatcher,
atomic_fetch_add_32(&dispatcher->bufferSize, 1); atomic_fetch_add_32(&dispatcher->bufferSize, 1);
int32_t numOfRows = statementGetInsertionRows(pSql); int32_t numOfRows = statementGetInsertionRows(pSql);
int32_t currentSize = atomic_add_fetch_32(&dispatcher->currentSize, numOfRows); int32_t currentSize = atomic_add_fetch_32(&dispatcher->currentSize, numOfRows);
// the dispatcher has been shutdown or reach batch size.
if (atomic_load_8(&dispatcher->shutdown) || currentSize >= dispatcher->batchSize) {
SArray* statements = dispatcherPollAll(dispatcher);
dispatcherExecute(statements);
taosArrayDestroy(&statements);
pthread_cond_broadcast(&dispatcher->notFull);
}
pthread_mutex_unlock(&dispatcher->mutex); pthread_mutex_unlock(&dispatcher->mutex);
return currentSize; return currentSize;
} }
...@@ -305,7 +321,7 @@ static void* dispatcherTimeoutCallback(void* arg) { ...@@ -305,7 +321,7 @@ static void* dispatcherTimeoutCallback(void* arg) {
struct timespec timeout = afterMillis(current, dispatcher->timeoutMs); struct timespec timeout = afterMillis(current, dispatcher->timeoutMs);
atomic_store_8(&dispatcher->exclusive, true); atomic_store_8(&dispatcher->exclusive, true);
SArray* statements = dispatcherPollAll(dispatcher); SArray* statements = dispatcherLockPollAll(dispatcher);
atomic_store_8(&dispatcher->exclusive, false); atomic_store_8(&dispatcher->exclusive, false);
dispatcherExecute(statements); dispatcherExecute(statements);
taosArrayDestroy(&statements); taosArrayDestroy(&statements);
...@@ -381,7 +397,7 @@ void destroyAsyncDispatcher(SAsyncBulkWriteDispatcher* dispatcher) { ...@@ -381,7 +397,7 @@ void destroyAsyncDispatcher(SAsyncBulkWriteDispatcher* dispatcher) {
// poll and send all the statements in the buffer. // poll and send all the statements in the buffer.
while (atomic_load_32(&dispatcher->bufferSize)) { while (atomic_load_32(&dispatcher->bufferSize)) {
SArray* statements = dispatcherPollAll(dispatcher); SArray* statements = dispatcherLockPollAll(dispatcher);
dispatcherExecute(statements); dispatcherExecute(statements);
taosArrayDestroy(&statements); taosArrayDestroy(&statements);
} }
...@@ -446,17 +462,7 @@ bool dispatcherTryBatching(SAsyncBulkWriteDispatcher* dispatcher, SSqlObj* pSql) ...@@ -446,17 +462,7 @@ bool dispatcherTryBatching(SAsyncBulkWriteDispatcher* dispatcher, SSqlObj* pSql)
} }
// try to offer pSql to the buffer. // try to offer pSql to the buffer.
int32_t currentSize = dispatcherTryOffer(dispatcher, pSql); return dispatcherTryOffer(dispatcher, pSql);
if (currentSize < 0) {
return false;
}
if (atomic_load_8(&dispatcher->shutdown) || currentSize >= dispatcher->batchSize) {
SArray* statements = dispatcherPollAll(dispatcher);
dispatcherExecute(statements);
taosArrayDestroy(&statements);
}
return true;
} }
/** /**
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册