提交 a3385091 编写于 作者: Z zhihaop

feat: improve the performance of async bulk write dispatcher

上级 e70b6c6d
...@@ -506,13 +506,13 @@ void waitForQueryRsp(void *param, TAOS_RES *tres, int code); ...@@ -506,13 +506,13 @@ void waitForQueryRsp(void *param, TAOS_RES *tres, int code);
/** /**
* The initialization of async insertion auto batch feature. * The initialization of async insertion auto batch feature.
* *
* @param batchLen When user submit an insert statement to `taos_query_ra`, the statement will be buffered * @param batchSize When user submit an insert statement to `taos_query_ra`, the statement will be buffered
* asynchronously in the execution queue instead of executing it. If the number of the buffered * asynchronously in the execution queue instead of executing it. If the number of the buffered
* statements reach batchLen, all the statements in the queue will be merged and sent to vnodes. * statements reach batchSize, all the statements in the queue will be merged and sent to vnodes.
* @param timeout The statements will be sent to vnodes no more than timeout milliseconds. But the actual time * @param timeoutMs The statements will be sent to vnodes no more than timeoutMs milliseconds. But the actual time
* vnodes received the statements depends on the network quality. * vnodes received the statements depends on the network quality.
*/ */
void tscInitAsyncDispatcher(int32_t batchLen, int64_t timeout); void tscInitAsyncDispatcher(int32_t batchSize, int32_t timeoutMs);
/** /**
* Destroy the async auto batch dispatcher. * Destroy the async auto batch dispatcher.
......
...@@ -13,19 +13,23 @@ ...@@ -13,19 +13,23 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>. * along with this program. If not, see <http://www.gnu.org/licenses/>.
*/ */
#include <stdatomic.h> #include <pthread.h>
#include <stdio.h>
#include "os.h" #include "os.h"
#include "osAtomic.h"
#include "tarray.h"
#include "tlist.h"
#include "tutil.h" #include "tutil.h"
#include "qTableMeta.h" #include "qTableMeta.h"
#include "tnote.h" #include "tnote.h"
#include "tqueue.h"
#include "trpc.h" #include "trpc.h"
#include "tscLog.h" #include "tscLog.h"
#include "tscSubquery.h" #include "tscSubquery.h"
#include "tscUtil.h" #include "tscUtil.h"
#include "tsched.h" #include "tsched.h"
#include "tsclient.h" #include "tsclient.h"
#include "tthread.h"
static void tscAsyncQueryRowsForNextVnode(void *param, TAOS_RES *tres, int numOfRows); static void tscAsyncQueryRowsForNextVnode(void *param, TAOS_RES *tres, int numOfRows);
...@@ -36,21 +40,36 @@ static void tscAsyncQueryRowsForNextVnode(void *param, TAOS_RES *tres, int numOf ...@@ -36,21 +40,36 @@ static void tscAsyncQueryRowsForNextVnode(void *param, TAOS_RES *tres, int numOf
*/ */
static void tscAsyncFetchRowsProxy(void *param, TAOS_RES *tres, int numOfRows); static void tscAsyncFetchRowsProxy(void *param, TAOS_RES *tres, int numOfRows);
// The async auto batch feature. typedef struct SAsyncBulkWriteDispatcher {
static atomic_bool asyncBatchEnable; // the mpmc queue to store the insertion statements. equivalent to SList<SSqlObj*>.
// The queue store the async insertion statements SList* buffer;
static taos_queue insertionQueue;
// The number of statements in the insertion queue. // the mutex to protect the buffer.
static atomic_int currentBatchLen; pthread_mutex_t mutex;
// The maximum auto batch len.
static int asyncBatchLen; // the background thread to manage batching timeout.
// The batch timeout in milliseconds. pthread_t* background;
static int64_t asyncBatchTimeout;
// The state of the insertion queue. While executing timeout task, the queue will set exclusive for writing // the maximum number of insertion rows in a batch.
// in order to make sure the statements will be sent to vnodes no more than `timeout` milliseconds. int32_t batchSize;
static atomic_int exclusiveState;
// The background thread to manage statement auto batch timeout. // the batching timeout in milliseconds.
static pthread_t background; int32_t timeoutMs;
// the number of item in the buffer.
volatile int32_t bufferSize;
// the number of insertion rows in the buffer.
volatile int32_t currentSize;
// while executing timeout task, the buffer will set exclusive for writing.
volatile bool exclusive;
// whether the dispatcher is shutdown.
volatile bool shutdown;
} SAsyncBulkWriteDispatcher;
static SAsyncBulkWriteDispatcher *tscDispatcher;
/** /**
* Return the error result to the callback function, and release the sql object. * Return the error result to the callback function, and release the sql object.
...@@ -177,157 +196,232 @@ static int32_t tscMergeStatements(SArray* statements, SSqlObj** result) { ...@@ -177,157 +196,232 @@ static int32_t tscMergeStatements(SArray* statements, SSqlObj** result) {
return code; return code;
} }
/** /**
* Fetch all the statements in the insertion queue, clean the insertion queue, and sent the statements to the vnodes. * @brief Get the number of insertion row in the sql statement.
*
* @param pSql the sql statement.
* @return int32_t the number of insertion row.
*/ */
static void tscPollThenSendAsyncQueue() { inline static int32_t tscGetInsertionRows(SSqlObj* pSql) {
// get the number of the items in the queue. return pSql->cmd.insertParam.numOfRows;
int sizeOfQueue = taosGetQueueItemsNumber(insertionQueue); }
if (sizeOfQueue == 0) {
return; inline static SArray* dispatcherPollAll(SAsyncBulkWriteDispatcher * dispatcher) {
if (!atomic_load_32(&dispatcher->bufferSize)) {
return NULL;
} }
int32_t code = TSDB_CODE_SUCCESS; pthread_mutex_lock(&dispatcher->mutex);
SArray* statements = taosArrayInit(0, sizeof(SSqlObj *));
// out of memory. SArray* statements = taosArrayInit(atomic_load_32(&dispatcher->bufferSize), sizeof(SSqlObj *));
if (statements == NULL) { if (statements == NULL) {
code = TSDB_CODE_TSC_OUT_OF_MEMORY; pthread_mutex_unlock(&dispatcher->mutex);
goto cleanup; tscError("failed to poll all items: out of memory");
return NULL;
} }
// get the sql statements from the queue. // get all the sql statements from the buffer.
for (int i = 0; i < sizeOfQueue; ++i) { while(atomic_load_32(&dispatcher->bufferSize)) {
// get a queue node from the queue. SListNode* node = tdListPopHead(dispatcher->buffer);
int type; if (!node) {
void* node;
if (!taosReadQitem(insertionQueue, &type, &node)) {
break; break;
} }
// get the SSqlObj* from the queue node. // get the SSqlObj* from the node.
SSqlObj* item = *((SSqlObj **) node); SSqlObj* item;
taosFreeQitem(node); memcpy(&item, node->data, sizeof(SSqlObj*));
atomic_fetch_sub(&currentBatchLen, item->cmd.insertParam.numOfRows); listNodeFree(node);
atomic_fetch_sub_32(&dispatcher->bufferSize, 1);
// out of memory. atomic_fetch_sub_32(&dispatcher->currentSize, tscGetInsertionRows(item));
if (!taosArrayPush(statements, &item)) {
code = TSDB_CODE_TSC_OUT_OF_MEMORY; taosArrayPush(statements, &item);
tscReturnsError(item, code); }
goto cleanup;
} pthread_mutex_unlock(&dispatcher->mutex);
return statements;
}
/**
* @brief Try to offer the SSqlObj* to the dispatcher.
*
* @param dispatcher the async bulk write dispatcher.
* @param pSql the sql object to offer.
* @return int32_t if offer success, return the current size of the buffer. otherwise returns -1.
*/
inline static int32_t dispatcherTryOffer(SAsyncBulkWriteDispatcher * dispatcher, SSqlObj* pSql) {
// the buffer is full.
if (atomic_load_32(&dispatcher->currentSize) >= dispatcher->batchSize) {
return -1;
} }
// no item in the queue (items has been taken by other threads).
if (taosArrayGetSize(statements) == 0) { // offer the node to the buffer.
goto cleanup; pthread_mutex_lock(&dispatcher->mutex);
if (tdListAppend(dispatcher->buffer, &pSql)) {
pthread_mutex_unlock(&dispatcher->mutex);
return -1;
}
tscDebug("sql obj %p has been write to insert buffer", pSql);
atomic_fetch_add_32(&dispatcher->bufferSize, 1);
int32_t numOfRows = tscGetInsertionRows(pSql);
int32_t currentSize = atomic_add_fetch_32(&dispatcher->currentSize, numOfRows);
pthread_mutex_unlock(&dispatcher->mutex);
return currentSize;
}
/**
* @brief Merge the sql statements and execute the merged sql statement.
*
* @param statements the array of sql statement. a.k.a SArray<SSqlObj*>.
*/
static void tscMergeExecute(SArray* statements) {
int32_t code = TSDB_CODE_SUCCESS;
// no item in the buffer (items has been taken by other threads).
if (!statements || !taosArrayGetSize(statements)) {
return;
} }
// merge the statements into single one. // merge the statements into single one.
SSqlObj* merged = NULL; SSqlObj* merged = NULL;
code = tscMergeStatements(statements, &merged); code = tscMergeStatements(statements, &merged);
if (code == TSDB_CODE_SUCCESS) {
tscDebug("merging %zu sql objs into %p", taosArrayGetSize(statements), merged);
tscHandleMultivnodeInsert(merged);
taosArrayDestroy(&statements);
return;
}
cleanup:
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
tscError("send async batch sql obj failed, reason: %s", tstrerror(code)); goto _error;
} }
tscDebug("merging %zu sql objs into %p", taosArrayGetSize(statements), merged);
tscHandleMultivnodeInsert(merged);
taosArrayDestroy(&statements);
return;
_error:
tscError("send async batch sql obj failed, reason: %s", tstrerror(code));
// handling the failures. // handling the failures.
if (statements) { for (int i = 0; i < taosArrayGetSize(statements); ++i) {
for (int i = 0; i < taosArrayGetSize(statements); ++i) { SSqlObj* item = *((SSqlObj **)taosArrayGet(statements, i));
SSqlObj* item = *((SSqlObj **)taosArrayGet(statements, i)); tscReturnsError(item, code);
tscReturnsError(item, code);
}
taosArrayDestroy(&statements);
} }
taosArrayDestroy(&statements);
} }
/** /**
* The background thread to manage statement batch timeout. * The thread to manage batching timeout.
*/ */
static void* tscAsyncBackGroundThread(void* args) { static void* dispatcherTimeoutCallback(void* arg) {
const int64_t timeoutUs = asyncBatchTimeout * 1000L; SAsyncBulkWriteDispatcher *dispatcher = arg;
setThreadName("tscBackground"); setThreadName("tscBackground");
while (atomic_load(&asyncBatchEnable)) { while (!atomic_load_8(&dispatcher->shutdown)) {
// set the exclusive state.
atomic_fetch_or(&exclusiveState, 0x1);
int64_t t0 = taosGetTimestampNs(); int64_t t0 = taosGetTimestampNs();
tscPollThenSendAsyncQueue();
int64_t t1 = taosGetTimestampNs();
// unset the exclusive state. atomic_store_8(&dispatcher->exclusive, true);
atomic_fetch_and(&exclusiveState, ~0x1); SArray* statements = dispatcherPollAll(dispatcher);
atomic_store_8(&dispatcher->exclusive, false);
tscMergeExecute(statements);
int64_t durationUs = (t1 - t0) / 1000L; int64_t t1 = taosGetTimestampNs();
// Similar to scheduleAtFixedRate in Java, if the execution time of `tscPollThenSendAsyncQueue` exceed int64_t durationMs = (t1 - t0) / 1000000;
// `asyncBatchTimeout` milliseconds, then there will be no sleep.
if (durationUs < timeoutUs) { // Similar to scheduleAtFixedRate in Java, if the execution time exceed
usleep(timeoutUs - durationUs); // `timeoutMs` milliseconds, then there will be no sleep.
if (durationMs < dispatcher->timeoutMs) {
taosMsleep((int32_t) (dispatcher->timeoutMs - durationMs));
} }
} }
return args; return NULL;
} }
/** /**
* The initialization of async insertion auto batch feature. * Create the async bulk write dispatcher.
* *
* @param batchLen When user submit an insert statement to `taos_query_ra`, the statement will be buffered * @param batchSize When user submit an insert statement to `taos_query_ra`, the statement will be buffered
* asynchronously in the execution queue instead of executing it. If the number of the buffered * asynchronously in the buffer instead of executing it. If the number of the buffered
* statements reach batchLen, all the statements in the queue will be merged and sent to vnodes. * statements reach batchLen, all the statements in the buffer will be merged and sent to vnodes.
* @param timeout The statements will be sent to vnodes no more than timeout milliseconds. But the actual time * @param timeout The statements will be sent to vnodes no more than timeout milliseconds. But the actual time
* vnodes received the statements depends on the network quality. * vnodes received the statements depends on the network quality.
*/ */
void tscInitAsyncDispatcher(int32_t batchLen, int64_t timeout) { SAsyncBulkWriteDispatcher * createAsyncBulkWriteDispatcher(int32_t batchSize, int32_t timeoutMs) {
atomic_init(&asyncBatchEnable, true); SAsyncBulkWriteDispatcher * dispatcher = calloc(1, sizeof(SAsyncBulkWriteDispatcher));
if (!dispatcher) {
asyncBatchLen = batchLen; return NULL;
asyncBatchTimeout = timeout;
// init the queue
insertionQueue = taosOpenQueue();
if (insertionQueue == NULL) {
atomic_store(&asyncBatchEnable, false);
return;
} }
// init the state. dispatcher->batchSize = batchSize;
atomic_init(&exclusiveState, 0); dispatcher->timeoutMs = timeoutMs;
atomic_init(&currentBatchLen, 0);
atomic_store_32(&dispatcher->bufferSize, 0);
atomic_store_32(&dispatcher->currentSize, 0);
atomic_store_8(&dispatcher->shutdown, false);
atomic_store_8(&dispatcher->exclusive, false);
// init the buffer.
dispatcher->buffer = tdListNew(sizeof(SSqlObj*));
if (!dispatcher->buffer) {
tfree(dispatcher);
return NULL;
}
// init the mutex.
pthread_mutex_init(&dispatcher->mutex, NULL);
// init background thread. // init background thread.
if (pthread_create(&background, NULL, tscAsyncBackGroundThread, NULL)) { dispatcher->background = taosCreateThread(dispatcherTimeoutCallback, dispatcher);
atomic_store(&asyncBatchEnable, false); if (!dispatcher->background) {
taosCloseQueue(insertionQueue); tdListFree(dispatcher->buffer);
return; tfree(dispatcher);
return NULL;
} }
return dispatcher;
} }
/** /**
* Destroy the async auto batch dispatcher. * Destroy the async auto batch dispatcher.
*/ */
void tscDestroyAsyncDispatcher() { void destroyAsyncDispatcher(SAsyncBulkWriteDispatcher * dispatcher) {
atomic_init(&asyncBatchEnable, false); if (dispatcher == NULL) {
return;
}
atomic_store_8(&dispatcher->shutdown, true);
// poll and send all the statements in the queue. // poll and send all the statements in the buffer.
while (taosGetQueueItemsNumber(insertionQueue) != 0) { while (atomic_load_32(&dispatcher->bufferSize)) {
tscPollThenSendAsyncQueue(); SArray* statements = dispatcherPollAll(dispatcher);
tscMergeExecute(statements);
} }
// clear the state.
atomic_store(&exclusiveState, 0);
// make sure the thread exit. // make sure the thread exit.
pthread_join(background, NULL); taosDestroyThread(dispatcher->background);
// destroy the buffer.
tdListFree(dispatcher->buffer);
// destroy the mutex.
pthread_mutex_destroy(&dispatcher->mutex);
// destroy the queue. free(dispatcher);
taosCloseQueue(insertionQueue); }
/**
* Init the taosc async bulk write dispatcher.
*
* @param batchSize the batchSize of async bulk write dispatcher.
* @param timeoutMs the timeout of batching in milliseconds.
*/
void tscInitAsyncDispatcher(int32_t batchSize, int32_t timeoutMs) {
tscDispatcher = createAsyncBulkWriteDispatcher(batchSize, timeoutMs);
}
/**
* Destroy the taosc async bulk write dispatcher.
*/
void tscDestroyAsyncDispatcher() {
destroyAsyncDispatcher(tscDispatcher);
tscDispatcher = NULL;
} }
/** /**
...@@ -339,7 +433,7 @@ void tscDestroyAsyncDispatcher() { ...@@ -339,7 +433,7 @@ void tscDestroyAsyncDispatcher() {
* @param pSql the sql object to check. * @param pSql the sql object to check.
* @return returns true if the sql object supports auto batch. * @return returns true if the sql object supports auto batch.
*/ */
bool tscSupportAutoBatch(SSqlObj* pSql) { bool tscSupportBulkInsertion(SSqlObj* pSql) {
if (pSql == NULL || !pSql->enableBatch) { if (pSql == NULL || !pSql->enableBatch) {
return false; return false;
} }
...@@ -368,51 +462,38 @@ bool tscSupportAutoBatch(SSqlObj* pSql) { ...@@ -368,51 +462,38 @@ bool tscSupportAutoBatch(SSqlObj* pSql) {
} }
/** /**
* Try to offer the insert statement to the queue. If the number of row reach `asyncBatchSize`, the function * Try to offer the SSqlObj* to the buffer. If the number of row reach `asyncBatchSize`, the function
* will merge the statements in the queue and send them to the vnodes. * will merge the SSqlObj* in the buffer and send them to the vnodes.
* *
* @param pSql the insert statement to offer. * @param pSql the insert statement to offer.
* @return if offer success, returns true. * @return if offer success, returns true.
*/ */
bool tscTryOfferInsertStatements(SSqlObj* pSql) { bool dispatcherTryBatching(SAsyncBulkWriteDispatcher * dispatcher, SSqlObj* pSql) {
if (!atomic_load(&asyncBatchEnable)) { if (atomic_load_8(&dispatcher->shutdown)) {
return false;
}
// the sql object doesn't support auto batch.
if (!tscSupportAutoBatch(pSql)) {
return false; return false;
} }
// the queue is full or reach batch size. // the sql object doesn't support bulk insertion.
if (atomic_load(&currentBatchLen) >= asyncBatchLen) { if (!tscSupportBulkInsertion(pSql)) {
return false; return false;
} }
// the queue is exclusive for writing. // the buffer is exclusive.
if (atomic_load(&exclusiveState) & 0x1) { if (atomic_load_8(&dispatcher->exclusive)) {
return false; return false;
} }
// allocate the queue node. // try to offer pSql to the buffer.
void* node = taosAllocateQitem(sizeof(SSqlObj *)); int32_t currentSize = dispatcherTryOffer(dispatcher, pSql);
if (node == NULL) { if (currentSize < 0) {
return false; return false;
} }
// offer the node to the queue. // the buffer is full or reach batch size.
memcpy(node, &pSql, sizeof(SSqlObj *)); if (currentSize >= dispatcher->batchSize) {
taosWriteQitem(insertionQueue, 0, node); SArray* statements = dispatcherPollAll(dispatcher);
tscMergeExecute(statements);
tscDebug("sql obj %p has been write to insert queue", pSql);
// reach the batch size.
int numsOfRows = pSql->cmd.insertParam.numOfRows;
int batchLen = atomic_fetch_add(&currentBatchLen, numsOfRows) + numsOfRows;
if (batchLen >= asyncBatchLen) {
tscPollThenSendAsyncQueue();
} }
return true; return true;
} }
...@@ -459,9 +540,9 @@ void doAsyncQuery(STscObj* pObj, SSqlObj* pSql, __async_cb_func_t fp, void* para ...@@ -459,9 +540,9 @@ void doAsyncQuery(STscObj* pObj, SSqlObj* pSql, __async_cb_func_t fp, void* para
return; return;
} }
if (tscTryOfferInsertStatements(pSql)) { if (tscDispatcher != NULL && dispatcherTryBatching(tscDispatcher, pSql)) {
taosReleaseRef(tscObjRef, pSql->self); taosReleaseRef(tscObjRef, pSql->self);
tscDebug("sql obj %p has been buffer in insert queue", pSql); tscDebug("sql obj %p has been buffer in insert buffer", pSql);
return; return;
} }
......
...@@ -217,7 +217,7 @@ void taos_init_imp(void) { ...@@ -217,7 +217,7 @@ void taos_init_imp(void) {
tscDebug("Local End Point is:%s", tsLocalEp); tscDebug("Local End Point is:%s", tsLocalEp);
if (tsAsyncBatchEnable) { if (tsAsyncBatchEnable) {
tscInitAsyncDispatcher(tsAsyncBatchLen, tsAsyncBatchTimeout); tscInitAsyncDispatcher(tsAsyncBatchSize, tsAsyncBatchTimeout);
} }
} }
......
...@@ -2210,7 +2210,7 @@ int32_t tscMergeKVPayLoadSqlObj(SArray* statements, SSqlObj **result) { ...@@ -2210,7 +2210,7 @@ int32_t tscMergeKVPayLoadSqlObj(SArray* statements, SSqlObj **result) {
SSqlObj *pSql = *((SSqlObj**) taosArrayGet(statements, i)); SSqlObj *pSql = *((SSqlObj**) taosArrayGet(statements, i));
SSqlCmd *pCmd = &pSql->cmd; SSqlCmd *pCmd = &pSql->cmd;
SInsertStatementParam* pInsertParam = &pCmd->insertParam; SInsertStatementParam* pInsertParam = &pCmd->insertParam;
assert(pInsertParam->payloadType == PAYLOAD_TYPE_KV); assert(pInsertParam->payloadType == PAYLOAD_TYPE_KV);
// merge all the data blocks by vgroup id. // merge all the data blocks by vgroup id.
......
...@@ -230,9 +230,8 @@ char* strtolower(char *dst, const char *src) { ...@@ -230,9 +230,8 @@ char* strtolower(char *dst, const char *src) {
} }
char* const ret = dst; char* const ret = dst;
while (*src) { while (*src) {
char ch = *(src++); const char ch = *(src++);
ch += (ch >= 'A' && ch <= 'Z') ? 'a' - 'A' : 0; *(dst++) = (ch >= 'A' && ch <= 'Z') ? ch - 'A' + 'a' : ch;
*(dst++) = ch;
if (ch == '\'' || ch == '"') { if (ch == '\'' || ch == '"') {
char prev = ch; char prev = ch;
...@@ -258,10 +257,9 @@ char* strntolower(char *dst, const char *src, int32_t n) { ...@@ -258,10 +257,9 @@ char* strntolower(char *dst, const char *src, int32_t n) {
assert(dst != NULL); assert(dst != NULL);
char* const end = dst + n; char* const end = dst + n;
while (dst != end) { while (dst != end) {
char ch = *(src++); const char ch = *(src++);
ch += (ch >= 'A' && ch <= 'Z') ? 'a' - 'A' : 0; *(dst++) = (ch >= 'A' && ch <= 'Z') ? ch - 'A' + 'a' : ch;
*(dst++) = ch;
if (ch == '\'' || ch == '"') { if (ch == '\'' || ch == '"') {
char prev = ch; char prev = ch;
while (dst != end) { while (dst != end) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册