Commit 67db8da1 authored by zhihaop

feat: move SAsyncBulkWriteDispatcher from tscAsync.c to tscBulkWrite.c

Parent a3385091
......@@ -89,7 +89,7 @@ ELSEIF (TD_DARWIN)
SET_TARGET_PROPERTIES(taos_static PROPERTIES OUTPUT_NAME "taos_static")
# generate dynamic library (*.dylib)
ADD_LIBRARY(taos SHARED ${SRC})
ADD_LIBRARY(taos SHARED ${SRC} inc/tscBulkWrite.h)
TARGET_LINK_LIBRARIES(taos query trpc tutil pthread m ${LINK_LUA} cJson)
SET_TARGET_PROPERTIES(taos PROPERTIES CLEAN_DIRECT_OUTPUT 1)
......
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef TDENGINE_TSCBULKWRITE_H
#define TDENGINE_TSCBULKWRITE_H
#ifdef __cplusplus
extern "C" {
#endif
#include "tlist.h"
#include "tarray.h"
#include "tsclient.h"
#include "tthread.h"
typedef struct SAsyncBulkWriteDispatcher {
// the mpmc queue to store the insertion statements. equivalent to SList<SSqlObj*>.
SList* buffer;
// the mutex to protect the buffer.
pthread_mutex_t mutex;
// the background thread to manage batching timeout.
pthread_t* background;
// the maximum number of insertion rows in a batch.
int32_t batchSize;
// the batching timeout in milliseconds.
int32_t timeoutMs;
// the number of items in the buffer.
volatile int32_t bufferSize;
// the number of insertion rows in the buffer.
volatile int32_t currentSize;
// while the timeout task is executing, the buffer is set exclusive for writing.
volatile bool exclusive;
// whether the dispatcher is shutdown.
volatile bool shutdown;
} SAsyncBulkWriteDispatcher;
/**
* Return the error result to the callback function, and release the sql object.
*
* @param pSql the sql object.
* @param code the error code of the error result.
*/
void tscReturnsError(SSqlObj* pSql, int code);
/**
* Proxy callback that delivers the merged insert result to each buffered statement's callback in sequence.
*
* @param param the context of `batchResultCallback`.
* @param tres the result object.
* @param code the error code.
*/
void batchResultCallback(void* param, TAOS_RES* tres, int32_t code);
/**
* Merge the statements into a single SSqlObj.
*
* @param statements the sql statements to merge, represented as SArray<SSqlObj*>.
* @param result the merged SSqlObj.
* @return the error code, TSDB_CODE_SUCCESS if the merge succeeds.
*/
int32_t dispatcherStatementMerge(SArray* statements, SSqlObj** result);
/**
* Get the number of insertion rows in the sql statement.
*
* @param pSql the sql statement.
* @return int32_t the number of insertion rows.
*/
inline static int32_t statementGetInsertionRows(SSqlObj* pSql) { return pSql->cmd.insertParam.numOfRows; }
/**
* Poll all the SSqlObj* in the dispatcher's buffer.
*
* @param dispatcher the dispatcher.
* @return the items in the dispatcher, SArray<SSqlObj*>.
*/
SArray* dispatcherPollAll(SAsyncBulkWriteDispatcher* dispatcher);
/**
* @brief Try to offer the SSqlObj* to the dispatcher.
*
* @param dispatcher the async bulk write dispatcher.
* @param pSql the sql object to offer.
* @return int32_t if the offer succeeds, returns the current number of buffered insertion rows; otherwise returns -1.
*/
int32_t dispatcherTryOffer(SAsyncBulkWriteDispatcher* dispatcher, SSqlObj* pSql);
/**
* @brief Merge the sql statements and execute the merged sql statement.
*
* @param statements the array of sql statements, a.k.a. SArray<SSqlObj*>.
*/
void dispatcherExecute(SArray* statements);
/**
* The entry point of the background thread that manages the batching timeout.
*/
void* dispatcherTimeoutCallback(void* arg);
/**
* Create the async bulk write dispatcher.
*
* @param batchSize When a user submits an insert statement via `taos_query_ra`, the statement is buffered
* asynchronously instead of being executed immediately. Once the number of buffered insertion rows
* reaches batchSize, all the statements in the buffer are merged and sent to the vnodes.
* @param timeoutMs The statements are sent to the vnodes within at most timeoutMs milliseconds; the actual
* time at which the vnodes receive the statements also depends on the network quality.
*/
SAsyncBulkWriteDispatcher* createAsyncBulkWriteDispatcher(int32_t batchSize, int32_t timeoutMs);
/**
* Destroy the async auto batch dispatcher.
*/
void destroyAsyncDispatcher(SAsyncBulkWriteDispatcher* dispatcher);
/**
* Check if the current sql object supports bulk insertion.
* 1. auto batch feature on the sql object must be enabled.
* 2. must be an `insert into ... value ...` statement.
* 3. the payload type must be kv payload.
*
* @param pSql the sql object to check.
* @return true if the sql object supports bulk insertion.
*/
bool tscSupportBulkInsertion(SSqlObj* pSql);
/**
* Try to offer the SSqlObj* to the buffer. If the number of buffered insertion rows reaches `batchSize`,
* the function will merge the SSqlObj* in the buffer and send them to the vnodes.
*
* @param dispatcher the async bulk write dispatcher.
* @param pSql the insert statement to offer.
* @return true if the offer succeeds.
*/
bool dispatcherTryBatching(SAsyncBulkWriteDispatcher* dispatcher, SSqlObj* pSql);
#ifdef __cplusplus
}
#endif
#endif // TDENGINE_TSCBULKWRITE_H
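A minimal usage sketch of the API declared above may help orient the reader; the `pSql` object and the literal sizes are illustrative assumptions, not part of this commit:

// Illustrative sketch only: flush after 128 buffered insertion rows or every 50 ms.
SAsyncBulkWriteDispatcher* dispatcher = createAsyncBulkWriteDispatcher(128, 50);
if (dispatcher != NULL) {
  // pSql: an async `insert into ... values ...` statement prepared elsewhere (hypothetical).
  if (!dispatcherTryBatching(dispatcher, pSql)) {
    // not batchable (file insert, non-KV payload, or dispatcher shut down):
    // the caller would execute pSql directly instead.
  }
  destroyAsyncDispatcher(dispatcher);  // flushes and sends any remaining buffered statements.
}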
......@@ -504,13 +504,10 @@ int32_t taos_unused_session(TAOS* taos);
void waitForQueryRsp(void *param, TAOS_RES *tres, int code);
/**
* The initialization of async insertion auto batch feature.
* Init the taosc async bulk write dispatcher.
*
* @param batchSize When user submit an insert statement to `taos_query_ra`, the statement will be buffered
* asynchronously in the execution queue instead of executing it. If the number of the buffered
* statements reach batchSize, all the statements in the queue will be merged and sent to vnodes.
* @param timeoutMs The statements will be sent to vnodes no more than timeoutMs milliseconds. But the actual time
* vnodes received the statements depends on the network quality.
* @param batchSize the batch size of the async bulk write dispatcher.
* @param timeoutMs the batching timeout in milliseconds.
*/
void tscInitAsyncDispatcher(int32_t batchSize, int32_t timeoutMs);
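A plausible call site for this initializer, assuming it is invoked once during client startup and wired to the globals introduced later in this commit (tsAsyncBatchEnable, tsAsyncBatchSize, tsAsyncBatchTimeout); this is a sketch, not code from the diff:

// Sketch: initialize the global dispatcher from the client configuration.
if (tsAsyncBatchEnable) {
  tscInitAsyncDispatcher(tsAsyncBatchSize, (int32_t)tsAsyncBatchTimeout);
}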
......
......@@ -13,12 +13,10 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include <pthread.h>
#include <stdio.h>
#include "os.h"
#include "osAtomic.h"
#include "tarray.h"
#include "tlist.h"
#include "tutil.h"
#include "qTableMeta.h"
......@@ -27,9 +25,8 @@
#include "tscLog.h"
#include "tscSubquery.h"
#include "tscUtil.h"
#include "tsched.h"
#include "tsclient.h"
#include "tthread.h"
#include "tscBulkWrite.h"
static void tscAsyncQueryRowsForNextVnode(void *param, TAOS_RES *tres, int numOfRows);
......@@ -40,371 +37,7 @@ static void tscAsyncQueryRowsForNextVnode(void *param, TAOS_RES *tres, int numOf
*/
static void tscAsyncFetchRowsProxy(void *param, TAOS_RES *tres, int numOfRows);
typedef struct SAsyncBulkWriteDispatcher {
// the mpmc queue to store the insertion statements. equivalent to SList<SSqlObj*>.
SList* buffer;
// the mutex to protect the buffer.
pthread_mutex_t mutex;
// the background thread to manage batching timeout.
pthread_t* background;
// the maximum number of insertion rows in a batch.
int32_t batchSize;
// the batching timeout in milliseconds.
int32_t timeoutMs;
// the number of items in the buffer.
volatile int32_t bufferSize;
// the number of insertion rows in the buffer.
volatile int32_t currentSize;
// while the timeout task is executing, the buffer is set exclusive for writing.
volatile bool exclusive;
// whether the dispatcher is shutdown.
volatile bool shutdown;
} SAsyncBulkWriteDispatcher;
static SAsyncBulkWriteDispatcher *tscDispatcher;
/**
* Return the error result to the callback function, and release the sql object.
*
* @param pSql the sql object.
* @param code the error code of the error result.
*/
static void tscReturnsError(SSqlObj* pSql, int code) {
if (pSql == NULL) {
return;
}
pSql->res.code = code;
tscAsyncResultOnError(pSql);
}
/**
* Represents the callback function and its context.
*/
typedef struct {
__async_cb_func_t fp;
void *param;
} Runnable;
/**
* The context of `tscMergedStatementsCallBack`.
*/
typedef struct {
size_t count;
Runnable runnable[];
} BatchCallBackContext;
/**
* Proxy callback that delivers the merged insert result to each buffered statement's callback in sequence.
*
* @param param the context of `tscMergedStatementsCallBack`.
* @param tres the result object.
* @param code the error code.
*/
static void tscMergedStatementsCallBack(void *param, TAOS_RES *tres, int32_t code) {
BatchCallBackContext* context = param;
SSqlObj* res = tres;
// handle corner case [context == null].
if (context == NULL) {
tscError("context in `tscMergedStatementsCallBack` is null, which should not happen");
if (tres) {
taosReleaseRef(tscObjRef, res->self);
}
return;
}
// handle corner case [res == null].
if (res == NULL) {
tscError("tres in `tscMergedStatementsCallBack` is null, which should not happen");
free(context);
return;
}
// handle results.
tscDebug("async batch result callback, number of item: %zu", context->count);
for (int i = 0; i < context->count ; ++i) {
// the result object is shared by many sql objects.
// therefore, we need to increase the ref count.
taosAcquireRef(tscObjRef, res->self);
Runnable* runnable = &context->runnable[i];
runnable->fp(runnable->param, res, res == NULL ? code : taos_errno(res));
}
taosReleaseRef(tscObjRef, res->self);
free(param);
}
/**
* Merge the statements into a single SSqlObj.
*
* @param statements the sql statements to merge, represented as SArray<SSqlObj*>.
* @param result the merged SSqlObj.
* @return the error code, TSDB_CODE_SUCCESS if the merge succeeds.
*/
static int32_t tscMergeStatements(SArray* statements, SSqlObj** result) {
if (statements == NULL) {
return TSDB_CODE_SUCCESS;
}
size_t count = taosArrayGetSize(statements);
if (count == 0) {
return TSDB_CODE_SUCCESS;
}
// create the callback context.
BatchCallBackContext* context = calloc(1, sizeof(BatchCallBackContext) + count * sizeof(Runnable));
if (context == NULL) {
return TSDB_CODE_TSC_OUT_OF_MEMORY;
}
tscDebug("create batch call back context: %p", context);
// initialize the callback context.
context->count = count;
for (size_t i = 0; i < count; ++i) {
SSqlObj* statement = *((SSqlObj ** )taosArrayGet(statements, i));
Runnable * callback = &context->runnable[i];
callback->fp = statement->fp;
callback->param = statement->param;
}
// merge the statements into single one.
tscDebug("start to merge %zu sql objs", count);
int32_t code = tscMergeKVPayLoadSqlObj(statements, result);
if (code != TSDB_CODE_SUCCESS) {
const char* msg = tstrerror(code);
tscDebug("failed to merge sql objects: %s", msg);
free(context);
} else {
// set the merged sql object callback.
(*result)->fp = tscMergedStatementsCallBack;
(*result)->fetchFp = (*result)->fp;
(*result)->param = context;
}
return code;
}
/**
* @brief Get the number of insertion rows in the sql statement.
*
* @param pSql the sql statement.
* @return int32_t the number of insertion rows.
*/
inline static int32_t tscGetInsertionRows(SSqlObj* pSql) {
return pSql->cmd.insertParam.numOfRows;
}
inline static SArray* dispatcherPollAll(SAsyncBulkWriteDispatcher * dispatcher) {
if (!atomic_load_32(&dispatcher->bufferSize)) {
return NULL;
}
pthread_mutex_lock(&dispatcher->mutex);
SArray* statements = taosArrayInit(atomic_load_32(&dispatcher->bufferSize), sizeof(SSqlObj *));
if (statements == NULL) {
pthread_mutex_unlock(&dispatcher->mutex);
tscError("failed to poll all items: out of memory");
return NULL;
}
// get all the sql statements from the buffer.
while(atomic_load_32(&dispatcher->bufferSize)) {
SListNode* node = tdListPopHead(dispatcher->buffer);
if (!node) {
break;
}
// get the SSqlObj* from the node.
SSqlObj* item;
memcpy(&item, node->data, sizeof(SSqlObj*));
listNodeFree(node);
atomic_fetch_sub_32(&dispatcher->bufferSize, 1);
atomic_fetch_sub_32(&dispatcher->currentSize, tscGetInsertionRows(item));
taosArrayPush(statements, &item);
}
pthread_mutex_unlock(&dispatcher->mutex);
return statements;
}
/**
* @brief Try to offer the SSqlObj* to the dispatcher.
*
* @param dispatcher the async bulk write dispatcher.
* @param pSql the sql object to offer.
* @return int32_t if the offer succeeds, returns the current number of buffered insertion rows; otherwise returns -1.
*/
inline static int32_t dispatcherTryOffer(SAsyncBulkWriteDispatcher * dispatcher, SSqlObj* pSql) {
// the buffer is full.
if (atomic_load_32(&dispatcher->currentSize) >= dispatcher->batchSize) {
return -1;
}
// offer the node to the buffer.
pthread_mutex_lock(&dispatcher->mutex);
if (tdListAppend(dispatcher->buffer, &pSql)) {
pthread_mutex_unlock(&dispatcher->mutex);
return -1;
}
tscDebug("sql obj %p has been write to insert buffer", pSql);
atomic_fetch_add_32(&dispatcher->bufferSize, 1);
int32_t numOfRows = tscGetInsertionRows(pSql);
int32_t currentSize = atomic_add_fetch_32(&dispatcher->currentSize, numOfRows);
pthread_mutex_unlock(&dispatcher->mutex);
return currentSize;
}
/**
* @brief Merge the sql statements and execute the merged sql statement.
*
* @param statements the array of sql statement. a.k.a SArray<SSqlObj*>.
*/
static void tscMergeExecute(SArray* statements) {
int32_t code = TSDB_CODE_SUCCESS;
// no items in the buffer (the items have been taken by other threads).
if (!statements || !taosArrayGetSize(statements)) {
return;
}
// merge the statements into single one.
SSqlObj* merged = NULL;
code = tscMergeStatements(statements, &merged);
if (code != TSDB_CODE_SUCCESS) {
goto _error;
}
tscDebug("merging %zu sql objs into %p", taosArrayGetSize(statements), merged);
tscHandleMultivnodeInsert(merged);
taosArrayDestroy(&statements);
return;
_error:
tscError("send async batch sql obj failed, reason: %s", tstrerror(code));
// handling the failures.
for (int i = 0; i < taosArrayGetSize(statements); ++i) {
SSqlObj* item = *((SSqlObj **)taosArrayGet(statements, i));
tscReturnsError(item, code);
}
taosArrayDestroy(&statements);
}
/**
* The thread to manage batching timeout.
*/
static void* dispatcherTimeoutCallback(void* arg) {
SAsyncBulkWriteDispatcher *dispatcher = arg;
setThreadName("tscBackground");
while (!atomic_load_8(&dispatcher->shutdown)) {
int64_t t0 = taosGetTimestampNs();
atomic_store_8(&dispatcher->exclusive, true);
SArray* statements = dispatcherPollAll(dispatcher);
atomic_store_8(&dispatcher->exclusive, false);
tscMergeExecute(statements);
int64_t t1 = taosGetTimestampNs();
int64_t durationMs = (t1 - t0) / 1000000;
// Similar to scheduleAtFixedRate in Java: if the execution time exceeds
// `timeoutMs` milliseconds, there will be no sleep.
if (durationMs < dispatcher->timeoutMs) {
taosMsleep((int32_t) (dispatcher->timeoutMs - durationMs));
}
}
return NULL;
}
/**
* Create the async bulk write dispatcher.
*
* @param batchSize When a user submits an insert statement via `taos_query_ra`, the statement is buffered
* asynchronously instead of being executed immediately. Once the number of buffered insertion rows
* reaches batchSize, all the statements in the buffer are merged and sent to the vnodes.
* @param timeoutMs The statements are sent to the vnodes within at most timeoutMs milliseconds; the actual
* time at which the vnodes receive the statements also depends on the network quality.
*/
SAsyncBulkWriteDispatcher * createAsyncBulkWriteDispatcher(int32_t batchSize, int32_t timeoutMs) {
SAsyncBulkWriteDispatcher * dispatcher = calloc(1, sizeof(SAsyncBulkWriteDispatcher));
if (!dispatcher) {
return NULL;
}
dispatcher->batchSize = batchSize;
dispatcher->timeoutMs = timeoutMs;
atomic_store_32(&dispatcher->bufferSize, 0);
atomic_store_32(&dispatcher->currentSize, 0);
atomic_store_8(&dispatcher->shutdown, false);
atomic_store_8(&dispatcher->exclusive, false);
// init the buffer.
dispatcher->buffer = tdListNew(sizeof(SSqlObj*));
if (!dispatcher->buffer) {
tfree(dispatcher);
return NULL;
}
// init the mutex.
pthread_mutex_init(&dispatcher->mutex, NULL);
// init background thread.
dispatcher->background = taosCreateThread(dispatcherTimeoutCallback, dispatcher);
if (!dispatcher->background) {
tdListFree(dispatcher->buffer);
tfree(dispatcher);
return NULL;
}
return dispatcher;
}
/**
* Destroy the async auto batch dispatcher.
*/
void destroyAsyncDispatcher(SAsyncBulkWriteDispatcher * dispatcher) {
if (dispatcher == NULL) {
return;
}
atomic_store_8(&dispatcher->shutdown, true);
// poll and send all the statements in the buffer.
while (atomic_load_32(&dispatcher->bufferSize)) {
SArray* statements = dispatcherPollAll(dispatcher);
tscMergeExecute(statements);
}
// make sure the thread exit.
taosDestroyThread(dispatcher->background);
// destroy the buffer.
tdListFree(dispatcher->buffer);
// destroy the mutex.
pthread_mutex_destroy(&dispatcher->mutex);
free(dispatcher);
}
static SAsyncBulkWriteDispatcher* tscDispatcher;
/**
* Init the taosc async bulk write dispatcher.
......@@ -424,79 +57,6 @@ void tscDestroyAsyncDispatcher() {
tscDispatcher = NULL;
}
/**
* Check if the current sql object supports auto batch.
* 1. auto batch feature on the sql object must be enabled.
* 2. must be an `insert into ... value ...` statement.
* 3. the payload type must be kv payload.
*
* @param pSql the sql object to check.
* @return returns true if the sql object supports auto batch.
*/
bool tscSupportBulkInsertion(SSqlObj* pSql) {
if (pSql == NULL || !pSql->enableBatch) {
return false;
}
SSqlCmd* pCmd = &pSql->cmd;
SQueryInfo *pQueryInfo = tscGetQueryInfo(pCmd);
// only support insert statement.
if (!TSDB_QUERY_HAS_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_INSERT)) {
return false;
}
SInsertStatementParam* pInsertParam = &pCmd->insertParam;
// file insert is not supported.
if (TSDB_QUERY_HAS_TYPE(pInsertParam->insertType, TSDB_QUERY_TYPE_FILE_INSERT)) {
return false;
}
// only support kv payload.
if (pInsertParam->payloadType != PAYLOAD_TYPE_KV) {
return false;
}
return true;
}
/**
* Try to offer the SSqlObj* to the buffer. If the number of buffered insertion rows reaches `asyncBatchSize`,
* the function will merge the SSqlObj* in the buffer and send them to the vnodes.
*
* @param pSql the insert statement to offer.
* @return true if the offer succeeds.
*/
bool dispatcherTryBatching(SAsyncBulkWriteDispatcher * dispatcher, SSqlObj* pSql) {
if (atomic_load_8(&dispatcher->shutdown)) {
return false;
}
// the sql object doesn't support bulk insertion.
if (!tscSupportBulkInsertion(pSql)) {
return false;
}
// the buffer is exclusive.
if (atomic_load_8(&dispatcher->exclusive)) {
return false;
}
// try to offer pSql to the buffer.
int32_t currentSize = dispatcherTryOffer(dispatcher, pSql);
if (currentSize < 0) {
return false;
}
// the buffer is full or has reached the batch size.
if (currentSize >= dispatcher->batchSize) {
SArray* statements = dispatcherPollAll(dispatcher);
tscMergeExecute(statements);
}
return true;
}
void doAsyncQuery(STscObj* pObj, SSqlObj* pSql, __async_cb_func_t fp, void* param, const char* sqlstr, size_t sqlLen) {
SSqlCmd* pCmd = &pSql->cmd;
......@@ -535,7 +95,8 @@ void doAsyncQuery(STscObj* pObj, SSqlObj* pSql, __async_cb_func_t fp, void* para
}
if (code != TSDB_CODE_SUCCESS) {
tscReturnsError(pSql, code);
pSql->res.code = code;
tscAsyncResultOnError(pSql);
taosReleaseRef(tscObjRef, pSql->self);
return;
}
......
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "osAtomic.h"
#include "tscBulkWrite.h"
#include "tscSubquery.h"
#include "tscLog.h"
/**
* Represents the callback function and its context.
*/
typedef struct {
__async_cb_func_t fp;
void* param;
} Runnable;
/**
* The context of `batchResultCallback`.
*/
typedef struct {
size_t count;
Runnable runnable[];
} BatchCallBackContext;
void tscReturnsError(SSqlObj *pSql, int code) {
if (pSql == NULL) {
return;
}
pSql->res.code = code;
tscAsyncResultOnError(pSql);
}
void batchResultCallback(void* param, TAOS_RES* tres, int32_t code) {
BatchCallBackContext* context = param;
SSqlObj* res = tres;
// handle corner case [context == null].
if (context == NULL) {
tscError("context in `batchResultCallback` is null, which should not happen");
if (tres) {
taosReleaseRef(tscObjRef, res->self);
}
return;
}
// handle corner case [res == null].
if (res == NULL) {
tscError("tres in `batchResultCallback` is null, which should not happen");
free(context);
return;
}
// handle results.
tscDebug("async batch result callback, number of item: %zu", context->count);
for (int i = 0; i < context->count; ++i) {
// the result object is shared by many sql objects.
// therefore, we need to increase the ref count.
taosAcquireRef(tscObjRef, res->self);
Runnable* runnable = &context->runnable[i];
runnable->fp(runnable->param, res, res == NULL ? code : taos_errno(res));
}
taosReleaseRef(tscObjRef, res->self);
free(param);
}
int32_t dispatcherStatementMerge(SArray* statements, SSqlObj** result) {
if (statements == NULL) {
return TSDB_CODE_SUCCESS;
}
size_t count = taosArrayGetSize(statements);
if (count == 0) {
return TSDB_CODE_SUCCESS;
}
// create the callback context.
BatchCallBackContext* context = calloc(1, sizeof(BatchCallBackContext) + count * sizeof(Runnable));
if (context == NULL) {
return TSDB_CODE_TSC_OUT_OF_MEMORY;
}
tscDebug("create batch call back context: %p", context);
// initialize the callback context.
context->count = count;
for (size_t i = 0; i < count; ++i) {
SSqlObj* statement = *((SSqlObj**)taosArrayGet(statements, i));
Runnable* callback = &context->runnable[i];
callback->fp = statement->fp;
callback->param = statement->param;
}
// merge the statements into single one.
tscDebug("start to merge %zu sql objs", count);
int32_t code = tscMergeKVPayLoadSqlObj(statements, result);
if (code != TSDB_CODE_SUCCESS) {
const char* msg = tstrerror(code);
tscDebug("failed to merge sql objects: %s", msg);
free(context);
} else {
// set the merged sql object callback.
(*result)->fp = batchResultCallback;
(*result)->fetchFp = (*result)->fp;
(*result)->param = context;
}
return code;
}
SArray* dispatcherPollAll(SAsyncBulkWriteDispatcher* dispatcher) {
if (!atomic_load_32(&dispatcher->bufferSize)) {
return NULL;
}
pthread_mutex_lock(&dispatcher->mutex);
SArray* statements = taosArrayInit(atomic_load_32(&dispatcher->bufferSize), sizeof(SSqlObj*));
if (statements == NULL) {
pthread_mutex_unlock(&dispatcher->mutex);
tscError("failed to poll all items: out of memory");
return NULL;
}
// get all the sql statements from the buffer.
while (atomic_load_32(&dispatcher->bufferSize)) {
SListNode* node = tdListPopHead(dispatcher->buffer);
if (!node) {
break;
}
// get the SSqlObj* from the node.
SSqlObj* item;
memcpy(&item, node->data, sizeof(SSqlObj*));
listNodeFree(node);
atomic_fetch_sub_32(&dispatcher->bufferSize, 1);
atomic_fetch_sub_32(&dispatcher->currentSize, statementGetInsertionRows(item));
taosArrayPush(statements, &item);
}
pthread_mutex_unlock(&dispatcher->mutex);
return statements;
}
int32_t dispatcherTryOffer(SAsyncBulkWriteDispatcher* dispatcher, SSqlObj* pSql) {
// the buffer is full.
if (atomic_load_32(&dispatcher->currentSize) >= dispatcher->batchSize) {
return -1;
}
// offer the node to the buffer.
pthread_mutex_lock(&dispatcher->mutex);
if (tdListAppend(dispatcher->buffer, &pSql)) {
pthread_mutex_unlock(&dispatcher->mutex);
return -1;
}
tscDebug("sql obj %p has been write to insert buffer", pSql);
atomic_fetch_add_32(&dispatcher->bufferSize, 1);
int32_t numOfRows = statementGetInsertionRows(pSql);
int32_t currentSize = atomic_add_fetch_32(&dispatcher->currentSize, numOfRows);
pthread_mutex_unlock(&dispatcher->mutex);
return currentSize;
}
void dispatcherExecute(SArray* statements) {
int32_t code = TSDB_CODE_SUCCESS;
// no items in the buffer (the items have been taken by other threads).
if (!statements || !taosArrayGetSize(statements)) {
return;
}
// merge the statements into single one.
SSqlObj* merged = NULL;
code = dispatcherStatementMerge(statements, &merged);
if (code != TSDB_CODE_SUCCESS) {
goto _error;
}
tscDebug("merging %zu sql objs into %p", taosArrayGetSize(statements), merged);
tscHandleMultivnodeInsert(merged);
taosArrayDestroy(&statements);
return;
_error:
tscError("send async batch sql obj failed, reason: %s", tstrerror(code));
// handling the failures.
for (int i = 0; i < taosArrayGetSize(statements); ++i) {
SSqlObj* item = *((SSqlObj**)taosArrayGet(statements, i));
tscReturnsError(item, code);
}
taosArrayDestroy(&statements);
}
void* dispatcherTimeoutCallback(void* arg) {
SAsyncBulkWriteDispatcher* dispatcher = arg;
setThreadName("tscBackground");
while (!atomic_load_8(&dispatcher->shutdown)) {
int64_t t0 = taosGetTimestampNs();
atomic_store_8(&dispatcher->exclusive, true);
SArray* statements = dispatcherPollAll(dispatcher);
atomic_store_8(&dispatcher->exclusive, false);
dispatcherExecute(statements);
int64_t t1 = taosGetTimestampNs();
int64_t durationMs = (t1 - t0) / 1000000;
// Similar to scheduleAtFixedRate in Java: if the execution time exceeds
// `timeoutMs` milliseconds, there will be no sleep.
if (durationMs < dispatcher->timeoutMs) {
taosMsleep((int32_t)(dispatcher->timeoutMs - durationMs));
}
}
return NULL;
}
SAsyncBulkWriteDispatcher* createAsyncBulkWriteDispatcher(int32_t batchSize, int32_t timeoutMs) {
SAsyncBulkWriteDispatcher* dispatcher = calloc(1, sizeof(SAsyncBulkWriteDispatcher));
if (!dispatcher) {
return NULL;
}
dispatcher->batchSize = batchSize;
dispatcher->timeoutMs = timeoutMs;
atomic_store_32(&dispatcher->bufferSize, 0);
atomic_store_32(&dispatcher->currentSize, 0);
atomic_store_8(&dispatcher->shutdown, false);
atomic_store_8(&dispatcher->exclusive, false);
// init the buffer.
dispatcher->buffer = tdListNew(sizeof(SSqlObj*));
if (!dispatcher->buffer) {
tfree(dispatcher);
return NULL;
}
// init the mutex.
pthread_mutex_init(&dispatcher->mutex, NULL);
// init background thread.
dispatcher->background = taosCreateThread(dispatcherTimeoutCallback, dispatcher);
if (!dispatcher->background) {
tdListFree(dispatcher->buffer);
tfree(dispatcher);
return NULL;
}
return dispatcher;
}
void destroyAsyncDispatcher(SAsyncBulkWriteDispatcher* dispatcher) {
if (dispatcher == NULL) {
return;
}
atomic_store_8(&dispatcher->shutdown, true);
// poll and send all the statements in the buffer.
while (atomic_load_32(&dispatcher->bufferSize)) {
SArray* statements = dispatcherPollAll(dispatcher);
dispatcherExecute(statements);
}
// make sure the thread exit.
taosDestroyThread(dispatcher->background);
// destroy the buffer.
tdListFree(dispatcher->buffer);
// destroy the mutex.
pthread_mutex_destroy(&dispatcher->mutex);
free(dispatcher);
}
bool tscSupportBulkInsertion(SSqlObj* pSql) {
if (pSql == NULL || !pSql->enableBatch) {
return false;
}
SSqlCmd* pCmd = &pSql->cmd;
SQueryInfo* pQueryInfo = tscGetQueryInfo(pCmd);
// only support insert statement.
if (!TSDB_QUERY_HAS_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_INSERT)) {
return false;
}
SInsertStatementParam* pInsertParam = &pCmd->insertParam;
// file insert is not supported.
if (TSDB_QUERY_HAS_TYPE(pInsertParam->insertType, TSDB_QUERY_TYPE_FILE_INSERT)) {
return false;
}
// only support kv payload.
if (pInsertParam->payloadType != PAYLOAD_TYPE_KV) {
return false;
}
return true;
}
bool dispatcherTryBatching(SAsyncBulkWriteDispatcher* dispatcher, SSqlObj* pSql) {
if (atomic_load_8(&dispatcher->shutdown)) {
return false;
}
// the sql object doesn't support bulk insertion.
if (!tscSupportBulkInsertion(pSql)) {
return false;
}
// the buffer is exclusive.
if (atomic_load_8(&dispatcher->exclusive)) {
return false;
}
// try to offer pSql to the buffer.
int32_t currentSize = dispatcherTryOffer(dispatcher, pSql);
if (currentSize < 0) {
return false;
}
// the buffer is full or has reached the batch size.
if (currentSize >= dispatcher->batchSize) {
SArray* statements = dispatcherPollAll(dispatcher);
dispatcherExecute(statements);
}
return true;
}
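To show how dispatcherTryBatching is intended to be consumed, here is a minimal, hypothetical sketch of a caller in the async insert path (the surrounding function and the fall-through execution are assumptions, not part of this diff); tscDispatcher is the static dispatcher kept in tscAsync.c:

// Hypothetical caller sketch: try to buffer the statement before executing it directly.
if (tsAsyncBatchEnable && tscDispatcher != NULL && dispatcherTryBatching(tscDispatcher, pSql)) {
  // pSql is now buffered; batchResultCallback will invoke its original callback
  // once the merged statement completes (or fails).
  return;
}
// Otherwise fall through to the normal, non-batched execution of pSql.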
......@@ -91,7 +91,7 @@ extern float tsStreamComputDelayRatio; // the delayed computing ration of the
extern int32_t tsProjectExecInterval;
extern int64_t tsMaxRetentWindow;
extern bool tsAsyncBatchEnable;
extern int32_t tsAsyncBatchLen;
extern int32_t tsAsyncBatchSize;
extern int64_t tsAsyncBatchTimeout;
// db parameters in client
......
......@@ -128,7 +128,7 @@ int64_t tsMaxRetentWindow = 24 * 3600L; // maximum time window tolerance
// The statements will be sent to vnodes no more than `tsAsyncBatchTimeout` milliseconds. But the actual time vnodes
// received the statements depends on the network quality.
bool tsAsyncBatchEnable = true;
int32_t tsAsyncBatchLen = 128;
int32_t tsAsyncBatchSize = 128;
int64_t tsAsyncBatchTimeout = 50;
// the maximum allowed query buffer size during query processing for each data node.
......