diff --git a/src/client/inc/tsclient.h b/src/client/inc/tsclient.h
index a8fc25588207b2dea9cd88df68a017d5195f30fc..2c6c67495cc188c4ef4134d93073eed0d5ae7c81 100644
--- a/src/client/inc/tsclient.h
+++ b/src/client/inc/tsclient.h
@@ -506,13 +506,13 @@ void waitForQueryRsp(void *param, TAOS_RES *tres, int code);
/**
* The initialization of async insertion auto batch feature.
*
- * @param batchLen When user submit an insert statement to `taos_query_ra`, the statement will be buffered
+ * @param batchSize When the user submits an insert statement to `taos_query_ra`, the statement will be buffered
* asynchronously in the execution queue instead of executing it. If the number of the buffered
- * statements reach batchLen, all the statements in the queue will be merged and sent to vnodes.
- * @param timeout The statements will be sent to vnodes no more than timeout milliseconds. But the actual time
+ * statements' insertion rows reaches batchSize, all the statements in the queue will be merged and sent to vnodes.
+ * @param timeoutMs The statements will be sent to vnodes after no more than timeoutMs milliseconds. But the actual time
* vnodes received the statements depends on the network quality.
*/
-void tscInitAsyncDispatcher(int32_t batchLen, int64_t timeout);
+void tscInitAsyncDispatcher(int32_t batchSize, int32_t timeoutMs);
/**
* Destroy the async auto batch dispatcher.
diff --git a/src/client/src/tscAsync.c b/src/client/src/tscAsync.c
index a1745fb266c16184c759b3ad601489ff99a562e5..c0e8b92fddbcfc21e1b185062a334998c4245463 100644
--- a/src/client/src/tscAsync.c
+++ b/src/client/src/tscAsync.c
@@ -13,19 +13,23 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
-#include
+#include
+#include
#include "os.h"
+#include "osAtomic.h"
+#include "tarray.h"
+#include "tlist.h"
#include "tutil.h"
#include "qTableMeta.h"
#include "tnote.h"
-#include "tqueue.h"
#include "trpc.h"
#include "tscLog.h"
#include "tscSubquery.h"
#include "tscUtil.h"
#include "tsched.h"
#include "tsclient.h"
+#include "tthread.h"
static void tscAsyncQueryRowsForNextVnode(void *param, TAOS_RES *tres, int numOfRows);
@@ -36,21 +40,36 @@ static void tscAsyncQueryRowsForNextVnode(void *param, TAOS_RES *tres, int numOf
*/
static void tscAsyncFetchRowsProxy(void *param, TAOS_RES *tres, int numOfRows);
-// The async auto batch feature.
-static atomic_bool asyncBatchEnable;
-// The queue store the async insertion statements
-static taos_queue insertionQueue;
-// The number of statements in the insertion queue.
-static atomic_int currentBatchLen;
-// The maximum auto batch len.
-static int asyncBatchLen;
-// The batch timeout in milliseconds.
-static int64_t asyncBatchTimeout;
-// The state of the insertion queue. While executing timeout task, the queue will set exclusive for writing
-// in order to make sure the statements will be sent to vnodes no more than `timeout` milliseconds.
-static atomic_int exclusiveState;
-// The background thread to manage statement auto batch timeout.
-static pthread_t background;
+typedef struct SAsyncBulkWriteDispatcher {
+ // the mpmc queue to store the insertion statements. equivalent to SList<SSqlObj*>.
+ SList* buffer;
+
+ // the mutex to protect the buffer.
+ pthread_mutex_t mutex;
+
+ // the background thread to manage batching timeout.
+ pthread_t* background;
+
+ // the maximum number of insertion rows in a batch.
+ int32_t batchSize;
+
+ // the batching timeout in milliseconds.
+ int32_t timeoutMs;
+
+ // the number of items in the buffer.
+ volatile int32_t bufferSize;
+
+ // the number of insertion rows in the buffer.
+ volatile int32_t currentSize;
+
+ // while executing timeout task, the buffer will be set exclusive for writing.
+ volatile bool exclusive;
+
+ // whether the dispatcher is shutdown.
+ volatile bool shutdown;
+} SAsyncBulkWriteDispatcher;
+
+static SAsyncBulkWriteDispatcher *tscDispatcher;
/**
* Return the error result to the callback function, and release the sql object.
@@ -177,157 +196,232 @@ static int32_t tscMergeStatements(SArray* statements, SSqlObj** result) {
return code;
}
-
/**
- * Fetch all the statements in the insertion queue, clean the insertion queue, and sent the statements to the vnodes.
+ * @brief Get the number of insertion rows in the sql statement.
+ *
+ * @param pSql the sql statement.
+ * @return int32_t the number of insertion rows.
*/
-static void tscPollThenSendAsyncQueue() {
- // get the number of the items in the queue.
- int sizeOfQueue = taosGetQueueItemsNumber(insertionQueue);
- if (sizeOfQueue == 0) {
- return;
+inline static int32_t tscGetInsertionRows(SSqlObj* pSql) {
+ return pSql->cmd.insertParam.numOfRows;
+}
+
+inline static SArray* dispatcherPollAll(SAsyncBulkWriteDispatcher * dispatcher) {
+ if (!atomic_load_32(&dispatcher->bufferSize)) {
+ return NULL;
}
- int32_t code = TSDB_CODE_SUCCESS;
- SArray* statements = taosArrayInit(0, sizeof(SSqlObj *));
+ pthread_mutex_lock(&dispatcher->mutex);
- // out of memory.
+ SArray* statements = taosArrayInit(atomic_load_32(&dispatcher->bufferSize), sizeof(SSqlObj *));
if (statements == NULL) {
- code = TSDB_CODE_TSC_OUT_OF_MEMORY;
- goto cleanup;
+ pthread_mutex_unlock(&dispatcher->mutex);
+ tscError("failed to poll all items: out of memory");
+ return NULL;
}
- // get the sql statements from the queue.
- for (int i = 0; i < sizeOfQueue; ++i) {
- // get a queue node from the queue.
- int type;
- void* node;
- if (!taosReadQitem(insertionQueue, &type, &node)) {
+ // get all the sql statements from the buffer.
+ while(atomic_load_32(&dispatcher->bufferSize)) {
+ SListNode* node = tdListPopHead(dispatcher->buffer);
+ if (!node) {
break;
}
-
- // get the SSqlObj* from the queue node.
- SSqlObj* item = *((SSqlObj **) node);
- taosFreeQitem(node);
- atomic_fetch_sub(&currentBatchLen, item->cmd.insertParam.numOfRows);
-
- // out of memory.
- if (!taosArrayPush(statements, &item)) {
- code = TSDB_CODE_TSC_OUT_OF_MEMORY;
- tscReturnsError(item, code);
- goto cleanup;
- }
+
+ // get the SSqlObj* from the node.
+ SSqlObj* item;
+ memcpy(&item, node->data, sizeof(SSqlObj*));
+ listNodeFree(node);
+ atomic_fetch_sub_32(&dispatcher->bufferSize, 1);
+ atomic_fetch_sub_32(&dispatcher->currentSize, tscGetInsertionRows(item));
+
+ taosArrayPush(statements, &item);
+ }
+
+ pthread_mutex_unlock(&dispatcher->mutex);
+ return statements;
+}
+
+/**
+ * @brief Try to offer the SSqlObj* to the dispatcher.
+ *
+ * @param dispatcher the async bulk write dispatcher.
+ * @param pSql the sql object to offer.
+ * @return int32_t if the offer succeeds, returns the number of insertion rows now in the buffer. otherwise returns -1.
+ */
+inline static int32_t dispatcherTryOffer(SAsyncBulkWriteDispatcher * dispatcher, SSqlObj* pSql) {
+ // the buffer is full.
+ if (atomic_load_32(&dispatcher->currentSize) >= dispatcher->batchSize) {
+ return -1;
}
- // no item in the queue (items has been taken by other threads).
- if (taosArrayGetSize(statements) == 0) {
- goto cleanup;
+
+ // offer the node to the buffer.
+ pthread_mutex_lock(&dispatcher->mutex);
+ if (tdListAppend(dispatcher->buffer, &pSql)) {
+ pthread_mutex_unlock(&dispatcher->mutex);
+ return -1;
+ }
+
+ tscDebug("sql obj %p has been write to insert buffer", pSql);
+
+ atomic_fetch_add_32(&dispatcher->bufferSize, 1);
+ int32_t numOfRows = tscGetInsertionRows(pSql);
+ int32_t currentSize = atomic_add_fetch_32(&dispatcher->currentSize, numOfRows);
+ pthread_mutex_unlock(&dispatcher->mutex);
+ return currentSize;
+}
+
+/**
+ * @brief Merge the sql statements and execute the merged sql statement.
+ *
+ * @param statements the array of sql statements, a.k.a. SArray<SSqlObj*>.
+ */
+static void tscMergeExecute(SArray* statements) {
+ int32_t code = TSDB_CODE_SUCCESS;
+ // no item in the buffer (items have been taken by other threads).
+ if (!statements || !taosArrayGetSize(statements)) {
+ return;
}
// merge the statements into single one.
SSqlObj* merged = NULL;
code = tscMergeStatements(statements, &merged);
- if (code == TSDB_CODE_SUCCESS) {
- tscDebug("merging %zu sql objs into %p", taosArrayGetSize(statements), merged);
- tscHandleMultivnodeInsert(merged);
- taosArrayDestroy(&statements);
- return;
- }
-
-cleanup:
if (code != TSDB_CODE_SUCCESS) {
- tscError("send async batch sql obj failed, reason: %s", tstrerror(code));
+ goto _error;
}
+
+ tscDebug("merging %zu sql objs into %p", taosArrayGetSize(statements), merged);
+ tscHandleMultivnodeInsert(merged);
+ taosArrayDestroy(&statements);
+ return;
+
+_error:
+ tscError("send async batch sql obj failed, reason: %s", tstrerror(code));
+
// handling the failures.
- if (statements) {
- for (int i = 0; i < taosArrayGetSize(statements); ++i) {
- SSqlObj* item = *((SSqlObj **)taosArrayGet(statements, i));
- tscReturnsError(item, code);
- }
- taosArrayDestroy(&statements);
+ for (int i = 0; i < taosArrayGetSize(statements); ++i) {
+ SSqlObj* item = *((SSqlObj **)taosArrayGet(statements, i));
+ tscReturnsError(item, code);
}
+ taosArrayDestroy(&statements);
}
/**
- * The background thread to manage statement batch timeout.
+ * The thread to manage batching timeout.
*/
-static void* tscAsyncBackGroundThread(void* args) {
- const int64_t timeoutUs = asyncBatchTimeout * 1000L;
+static void* dispatcherTimeoutCallback(void* arg) {
+ SAsyncBulkWriteDispatcher *dispatcher = arg;
setThreadName("tscBackground");
- while (atomic_load(&asyncBatchEnable)) {
- // set the exclusive state.
- atomic_fetch_or(&exclusiveState, 0x1);
-
+ while (!atomic_load_8(&dispatcher->shutdown)) {
int64_t t0 = taosGetTimestampNs();
- tscPollThenSendAsyncQueue();
- int64_t t1 = taosGetTimestampNs();
- // unset the exclusive state.
- atomic_fetch_and(&exclusiveState, ~0x1);
+ atomic_store_8(&dispatcher->exclusive, true);
+ SArray* statements = dispatcherPollAll(dispatcher);
+ atomic_store_8(&dispatcher->exclusive, false);
+
+ tscMergeExecute(statements);
- int64_t durationUs = (t1 - t0) / 1000L;
- // Similar to scheduleAtFixedRate in Java, if the execution time of `tscPollThenSendAsyncQueue` exceed
- // `asyncBatchTimeout` milliseconds, then there will be no sleep.
- if (durationUs < timeoutUs) {
- usleep(timeoutUs - durationUs);
+ int64_t t1 = taosGetTimestampNs();
+ int64_t durationMs = (t1 - t0) / 1000000;
+
+ // Similar to scheduleAtFixedRate in Java, if the execution time exceeds
+ // `timeoutMs` milliseconds, then there will be no sleep.
+ if (durationMs < dispatcher->timeoutMs) {
+ taosMsleep((int32_t) (dispatcher->timeoutMs - durationMs));
}
}
- return args;
+ return NULL;
}
/**
- * The initialization of async insertion auto batch feature.
+ * Create the async bulk write dispatcher.
*
- * @param batchLen When user submit an insert statement to `taos_query_ra`, the statement will be buffered
- * asynchronously in the execution queue instead of executing it. If the number of the buffered
- * statements reach batchLen, all the statements in the queue will be merged and sent to vnodes.
+ * @param batchSize When the user submits an insert statement to `taos_query_ra`, the statement will be buffered
+ * asynchronously in the buffer instead of executing it. If the number of buffered
+ * insertion rows reaches batchSize, all the statements in the buffer will be merged and sent to vnodes.
* @param timeout The statements will be sent to vnodes no more than timeout milliseconds. But the actual time
* vnodes received the statements depends on the network quality.
*/
-void tscInitAsyncDispatcher(int32_t batchLen, int64_t timeout) {
- atomic_init(&asyncBatchEnable, true);
-
- asyncBatchLen = batchLen;
- asyncBatchTimeout = timeout;
-
- // init the queue
- insertionQueue = taosOpenQueue();
- if (insertionQueue == NULL) {
- atomic_store(&asyncBatchEnable, false);
- return;
+SAsyncBulkWriteDispatcher * createAsyncBulkWriteDispatcher(int32_t batchSize, int32_t timeoutMs) {
+ SAsyncBulkWriteDispatcher * dispatcher = calloc(1, sizeof(SAsyncBulkWriteDispatcher));
+ if (!dispatcher) {
+ return NULL;
}
- // init the state.
- atomic_init(&exclusiveState, 0);
- atomic_init(&currentBatchLen, 0);
+ dispatcher->batchSize = batchSize;
+ dispatcher->timeoutMs = timeoutMs;
+
+ atomic_store_32(&dispatcher->bufferSize, 0);
+ atomic_store_32(&dispatcher->currentSize, 0);
+ atomic_store_8(&dispatcher->shutdown, false);
+ atomic_store_8(&dispatcher->exclusive, false);
+
+ // init the buffer.
+ dispatcher->buffer = tdListNew(sizeof(SSqlObj*));
+ if (!dispatcher->buffer) {
+ tfree(dispatcher);
+ return NULL;
+ }
+
+ // init the mutex.
+ pthread_mutex_init(&dispatcher->mutex, NULL);
// init background thread.
- if (pthread_create(&background, NULL, tscAsyncBackGroundThread, NULL)) {
- atomic_store(&asyncBatchEnable, false);
- taosCloseQueue(insertionQueue);
- return;
+ dispatcher->background = taosCreateThread(dispatcherTimeoutCallback, dispatcher);
+ if (!dispatcher->background) {
+ tdListFree(dispatcher->buffer);
+ tfree(dispatcher);
+ return NULL;
}
+
+ return dispatcher;
}
/**
* Destroy the async auto batch dispatcher.
*/
-void tscDestroyAsyncDispatcher() {
- atomic_init(&asyncBatchEnable, false);
+void destroyAsyncDispatcher(SAsyncBulkWriteDispatcher * dispatcher) {
+ if (dispatcher == NULL) {
+ return;
+ }
+
+ atomic_store_8(&dispatcher->shutdown, true);
- // poll and send all the statements in the queue.
- while (taosGetQueueItemsNumber(insertionQueue) != 0) {
- tscPollThenSendAsyncQueue();
+ // poll and send all the statements in the buffer.
+ while (atomic_load_32(&dispatcher->bufferSize)) {
+ SArray* statements = dispatcherPollAll(dispatcher);
+ tscMergeExecute(statements);
}
- // clear the state.
- atomic_store(&exclusiveState, 0);
// make sure the thread exit.
- pthread_join(background, NULL);
+ taosDestroyThread(dispatcher->background);
+
+ // destroy the buffer.
+ tdListFree(dispatcher->buffer);
+
+ // destroy the mutex.
+ pthread_mutex_destroy(&dispatcher->mutex);
- // destroy the queue.
- taosCloseQueue(insertionQueue);
+ free(dispatcher);
+}
+
+/**
+ * Init the taosc async bulk write dispatcher.
+ *
+ * @param batchSize the batchSize of async bulk write dispatcher.
+ * @param timeoutMs the timeout of batching in milliseconds.
+ */
+void tscInitAsyncDispatcher(int32_t batchSize, int32_t timeoutMs) {
+ tscDispatcher = createAsyncBulkWriteDispatcher(batchSize, timeoutMs);
+}
+
+/**
+ * Destroy the taosc async bulk write dispatcher.
+ */
+void tscDestroyAsyncDispatcher() {
+ destroyAsyncDispatcher(tscDispatcher);
+ tscDispatcher = NULL;
}
/**
@@ -339,7 +433,7 @@ void tscDestroyAsyncDispatcher() {
* @param pSql the sql object to check.
* @return returns true if the sql object supports auto batch.
*/
-bool tscSupportAutoBatch(SSqlObj* pSql) {
+bool tscSupportBulkInsertion(SSqlObj* pSql) {
if (pSql == NULL || !pSql->enableBatch) {
return false;
}
@@ -368,51 +462,38 @@ bool tscSupportAutoBatch(SSqlObj* pSql) {
}
/**
- * Try to offer the insert statement to the queue. If the number of row reach `asyncBatchSize`, the function
- * will merge the statements in the queue and send them to the vnodes.
+ * Try to offer the SSqlObj* to the buffer. If the number of buffered rows reaches the dispatcher's `batchSize`,
+ * the function will merge the SSqlObj* in the buffer and send them to the vnodes.
*
* @param pSql the insert statement to offer.
* @return if offer success, returns true.
*/
-bool tscTryOfferInsertStatements(SSqlObj* pSql) {
- if (!atomic_load(&asyncBatchEnable)) {
- return false;
- }
-
- // the sql object doesn't support auto batch.
- if (!tscSupportAutoBatch(pSql)) {
+bool dispatcherTryBatching(SAsyncBulkWriteDispatcher * dispatcher, SSqlObj* pSql) {
+ if (atomic_load_8(&dispatcher->shutdown)) {
return false;
}
- // the queue is full or reach batch size.
- if (atomic_load(&currentBatchLen) >= asyncBatchLen) {
+ // the sql object doesn't support bulk insertion.
+ if (!tscSupportBulkInsertion(pSql)) {
return false;
}
- // the queue is exclusive for writing.
- if (atomic_load(&exclusiveState) & 0x1) {
+ // the buffer is exclusive.
+ if (atomic_load_8(&dispatcher->exclusive)) {
return false;
}
- // allocate the queue node.
- void* node = taosAllocateQitem(sizeof(SSqlObj *));
- if (node == NULL) {
+ // try to offer pSql to the buffer.
+ int32_t currentSize = dispatcherTryOffer(dispatcher, pSql);
+ if (currentSize < 0) {
return false;
}
-
- // offer the node to the queue.
- memcpy(node, &pSql, sizeof(SSqlObj *));
- taosWriteQitem(insertionQueue, 0, node);
-
- tscDebug("sql obj %p has been write to insert queue", pSql);
-
- // reach the batch size.
- int numsOfRows = pSql->cmd.insertParam.numOfRows;
- int batchLen = atomic_fetch_add(&currentBatchLen, numsOfRows) + numsOfRows;
- if (batchLen >= asyncBatchLen) {
- tscPollThenSendAsyncQueue();
+
+ // the buffer is full or reach batch size.
+ if (currentSize >= dispatcher->batchSize) {
+ SArray* statements = dispatcherPollAll(dispatcher);
+ tscMergeExecute(statements);
}
-
return true;
}
@@ -459,9 +540,9 @@ void doAsyncQuery(STscObj* pObj, SSqlObj* pSql, __async_cb_func_t fp, void* para
return;
}
- if (tscTryOfferInsertStatements(pSql)) {
+ if (tscDispatcher != NULL && dispatcherTryBatching(tscDispatcher, pSql)) {
taosReleaseRef(tscObjRef, pSql->self);
- tscDebug("sql obj %p has been buffer in insert queue", pSql);
+ tscDebug("sql obj %p has been buffer in insert buffer", pSql);
return;
}
diff --git a/src/client/src/tscSystem.c b/src/client/src/tscSystem.c
index 5d6cd445a81e0c92aa7bd6bd95d81864dc880c69..620525263f9a53cb5269812ddf2dbd71f5bd028d 100644
--- a/src/client/src/tscSystem.c
+++ b/src/client/src/tscSystem.c
@@ -217,7 +217,7 @@ void taos_init_imp(void) {
tscDebug("Local End Point is:%s", tsLocalEp);
if (tsAsyncBatchEnable) {
- tscInitAsyncDispatcher(tsAsyncBatchLen, tsAsyncBatchTimeout);
+ tscInitAsyncDispatcher(tsAsyncBatchSize, tsAsyncBatchTimeout);
}
}
diff --git a/src/client/src/tscUtil.c b/src/client/src/tscUtil.c
index fdea3839269057d8c48f394f78c3c855cd8065d7..a841945baced4d285acf1a460e8efc64f59cc6f1 100644
--- a/src/client/src/tscUtil.c
+++ b/src/client/src/tscUtil.c
@@ -2210,7 +2210,7 @@ int32_t tscMergeKVPayLoadSqlObj(SArray* statements, SSqlObj **result) {
SSqlObj *pSql = *((SSqlObj**) taosArrayGet(statements, i));
SSqlCmd *pCmd = &pSql->cmd;
SInsertStatementParam* pInsertParam = &pCmd->insertParam;
-
+
assert(pInsertParam->payloadType == PAYLOAD_TYPE_KV);
// merge all the data blocks by vgroup id.
diff --git a/src/util/src/tutil.c b/src/util/src/tutil.c
index 6fca8f6a0f3833669159723bb2435f2b16e77e87..649f8fa5e9697eac76f4a97eb44d5b18e06924ec 100644
--- a/src/util/src/tutil.c
+++ b/src/util/src/tutil.c
@@ -230,9 +230,8 @@ char* strtolower(char *dst, const char *src) {
}
char* const ret = dst;
while (*src) {
- char ch = *(src++);
- ch += (ch >= 'A' && ch <= 'Z') ? 'a' - 'A' : 0;
- *(dst++) = ch;
+ const char ch = *(src++);
+ *(dst++) = (ch >= 'A' && ch <= 'Z') ? ch - 'A' + 'a' : ch;
if (ch == '\'' || ch == '"') {
char prev = ch;
@@ -258,10 +257,9 @@ char* strntolower(char *dst, const char *src, int32_t n) {
assert(dst != NULL);
char* const end = dst + n;
while (dst != end) {
- char ch = *(src++);
- ch += (ch >= 'A' && ch <= 'Z') ? 'a' - 'A' : 0;
- *(dst++) = ch;
-
+ const char ch = *(src++);
+ *(dst++) = (ch >= 'A' && ch <= 'Z') ? ch - 'A' + 'a' : ch;
+
if (ch == '\'' || ch == '"') {
char prev = ch;
while (dst != end) {