提交 58392c19 编写于 作者: Z zhihaop

feat: refactor tscBatchWrite and tscBatchMerge for better code readability

上级 138bf9b2
......@@ -317,7 +317,7 @@ keepColumnName 1
# enable thread local batch dispatcher
# asyncBatchThreadLocal 1
# taosc async insertion batch size, maximum 65535
# taosc async insertion batch size, maximum 4096
# asyncBatchSize 256
# taosc async batching timeout in milliseconds, maximum 2048
......
......@@ -13,8 +13,8 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef TDENGINE_TSCDATABLOCKMERGE_H
#define TDENGINE_TSCDATABLOCKMERGE_H
#ifndef TDENGINE_TSCBATCHMERGE_H
#define TDENGINE_TSCBATCHMERGE_H
#include "hash.h"
#include "taosmsg.h"
......@@ -260,14 +260,15 @@ SName** buildSTableNameListBuilder(STableNameListBuilder* builder, size_t* numOf
* Merge the KV-PayLoad SQL objects into single one.
* The statements here must be an insertion statement and no schema attached.
*
* @param statements the array of statements. a.k.a SArray<SSqlObj*>.
* @param polls the array of SSqlObj*.
* @param nPolls the number of SSqlObj* in the array.
* @param result the returned result. result is not null!
* @return the status code. usually TSDB_CODE_SUCCESS.
* @return the status code.
*/
int32_t tscMergeKVPayLoadSqlObj(SArray* statements, SSqlObj *result);
int32_t tscMergeSSqlObjs(SSqlObj** polls, size_t nPolls, SSqlObj *result);
#ifdef __cplusplus
}
#endif
#endif // TDENGINE_TSCDATABLOCKMERGE_H
#endif // TDENGINE_TSCBATCHMERGE_H
......@@ -13,41 +13,56 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef TDENGINE_TSCBULKWRITE_H
#define TDENGINE_TSCBULKWRITE_H
#ifndef TDENGINE_TSCBATCHWRITE_H
#define TDENGINE_TSCBATCHWRITE_H
#ifdef __cplusplus
extern "C" {
#endif
#include "tarray.h"
#include "tthread.h"
// forward declaration.
typedef struct SSqlObj SSqlObj;
typedef struct SDispatcherTimeoutManager SDispatcherTimeoutManager;
/**
* SAsyncBulkWriteDispatcher is an async batching dispatcher(for writing), it can buffer insertion statements, batch
* and merge them into single statement.
* SAsyncBatchWriteDispatcher is an async batching write dispatcher (ABWD). ABWD accepts the recent SQL requests and put
* them in a queue waiting to be scheduled. When the number of requests in the queue reaches batch_size, it merges the
* requests in the queue and sends them to the server, thus reducing the network overhead caused by multiple
* communications to the server and directly improving the throughput of small object asynchronous writes.
*/
typedef struct SAsyncBulkWriteDispatcher {
// the buffer to store the insertion statements. equivalent to SArray<SSqlObj*>.
SArray* buffer;
typedef struct SAsyncBatchWriteDispatcher {
// the timeout manager.
SDispatcherTimeoutManager* timeoutManager;
// the mutex to protect the dispatcher.
pthread_mutex_t bufferMutex;
// the mutex to sleep the background thread.
pthread_mutex_t sleepMutex;
// the cond to signal to background thread.
pthread_cond_t timeout;
// the cond to signal when buffer not full.
pthread_cond_t notFull;
// the background thread to manage batching timeout.
pthread_t background;
// the maximum number of insertion rows in a batch.
int32_t batchSize;
// the number of insertion rows in the buffer.
int32_t currentSize;
// the number of items in the buffer.
int32_t bufferSize;
// whether the dispatcher is shutdown.
volatile bool shutdown;
SSqlObj* buffer[];
} SAsyncBatchWriteDispatcher;
/**
* The manager of SAsyncBatchWriteDispatcher. Call dispatcherAcquire(...) to get the SAsyncBatchWriteDispatcher
* instance. SDispatcherManager will manage the life cycle of SAsyncBatchWriteDispatcher.
*/
typedef struct SDispatcherManager {
pthread_key_t key;
// the maximum number of insertion rows in a batch.
int32_t batchSize;
......@@ -55,36 +70,82 @@ typedef struct SAsyncBulkWriteDispatcher {
// the batching timeout in milliseconds.
int32_t timeoutMs;
// the number of insertion rows in the buffer.
int32_t currentSize;
// specifies whether the dispatcher is thread local, if the dispatcher is not
// thread local, we will use the global dispatcher below.
bool isThreadLocal;
// whether the dispatcher is shutdown.
// the global dispatcher, if thread local enabled, global will be set to NULL.
SAsyncBatchWriteDispatcher* global;
} SDispatcherManager;
/**
* Control the timeout of the dispatcher queue.
*/
typedef struct SDispatcherTimeoutManager {
// the dispatcher that timeout manager belongs to.
SAsyncBatchWriteDispatcher* dispatcher;
// the background thread.
pthread_t background;
// the mutex to sleep the background thread.
pthread_mutex_t sleepMutex;
// the cond to signal to background thread.
pthread_cond_t timeout;
// the batching timeout in milliseconds.
int32_t timeoutMs;
// whether the timeout manager is shutdown.
volatile bool shutdown;
} SDispatcherTimeoutManager;
/**
* Create the dispatcher timeout manager.
*/
SDispatcherTimeoutManager* createSDispatcherTimeoutManager(SAsyncBatchWriteDispatcher* dispatcher, int32_t timeoutMs);
} SAsyncBulkWriteDispatcher;
/**
* Destroy the dispatcher timeout manager.
*/
void destroySDispatcherTimeoutManager(SDispatcherTimeoutManager* manager);
// forward declaration.
typedef struct SSqlObj SSqlObj;
/**
* Check if the timeout manager is shutdown.
* @param manager the timeout manager.
* @return whether the timeout manager is shutdown.
*/
bool isShutdownSDispatcherTimeoutManager(SDispatcherTimeoutManager* manager);
/**
* Shutdown the SDispatcherTimeoutManager.
* @param manager the SDispatcherTimeoutManager.
*/
void shutdownSDispatcherTimeoutManager(SDispatcherTimeoutManager* manager);
/**
* Merge the statements into single SSqlObj.
*
* @param fp the callback of SSqlObj.
* @param param the parameters of the callback.
* @param statements the sql statements represents in SArray<SSqlObj*>.
* @param polls the array of SSqlObj*.
* @param nPolls the number of SSqlObj* in the array.
* @return the merged SSqlObj.
*/
int32_t dispatcherStatementMerge(SArray* statements, SSqlObj** result);
int32_t dispatcherBatchMerge(SSqlObj** polls, size_t nPolls, SSqlObj** result);
/**
* Merge the sql statements and execute the merged sql statement.
*
* @param statements the array of sql statement. a.k.a SArray<SSqlObj*>.
* @param polls the array of SSqlObj*.
* @param nPolls the number of SSqlObj* in the array.
*/
void dispatcherExecute(SArray* statements);
void dispatcherExecute(SSqlObj** polls, size_t nPolls);
/**
* Create the async bulk write dispatcher.
* Create the async batch write dispatcher.
*
* @param batchSize When user submit an insert statement to `taos_query_ra`, the statement will be buffered
* asynchronously in the buffer instead of executing it. If the number of the buffered
......@@ -92,25 +153,25 @@ void dispatcherExecute(SArray* statements);
* @param timeout The statements will be sent to vnodes no more than timeout milliseconds. But the actual time
* vnodes received the statements depends on the network quality.
*/
SAsyncBulkWriteDispatcher* createAsyncBulkWriteDispatcher(int32_t batchSize, int32_t timeoutMs);
SAsyncBatchWriteDispatcher* createSAsyncBatchWriteDispatcher(int32_t batchSize, int32_t timeoutMs);
/**
* Destroy the async auto batch dispatcher.
*/
void destroyAsyncDispatcher(SAsyncBulkWriteDispatcher* dispatcher);
void destroySAsyncBatchWriteDispatcher(SAsyncBatchWriteDispatcher* dispatcher);
/**
* Check if the current sql object supports bulk insertion.
* Check if the current sql object can be dispatch by ABWD.
* 1. auto batch feature on the sql object must be enabled.
* 2. must be an `insert into ... value ...` statement.
* 3. the payload type must be kv payload.
* 4. no schema attached.
*
* @param dispatcher the async dispatcher.
* @param dispatcher the dispatcher.
* @param pSql the sql object to check.
* @return returns true if the sql object supports auto batch.
* @return returns true if the sql object can be dispatch by ABWD.
*/
bool tscSupportBulkInsertion(SAsyncBulkWriteDispatcher* dispatcher, SSqlObj* pSql);
bool dispatcherCanDispatch(SAsyncBatchWriteDispatcher* dispatcher, SSqlObj* pSql);
/**
* Try to offer the SSqlObj* to the dispatcher. If the number of row reach `batchSize`, the function
......@@ -119,58 +180,36 @@ bool tscSupportBulkInsertion(SAsyncBulkWriteDispatcher* dispatcher, SSqlObj* pSq
* @param pSql the insert statement to offer.
* @return if offer success, returns true.
*/
bool dispatcherTryDispatch(SAsyncBulkWriteDispatcher* dispatcher, SSqlObj* pSql);
/**
* A holder of SAsyncBulkWriteDispatcher. Call dispatcherAcquire(...) to get the SAsyncBulkWriteDispatcher
* instance. This holder will manage the life cycle of SAsyncBulkWriteDispatcher.
*/
typedef struct SDispatcherHolder {
pthread_key_t key;
// the maximum number of insertion rows in a batch.
int32_t batchSize;
// the batching timeout in milliseconds.
int32_t timeoutMs;
// specifies whether the dispatcher is thread local, if the dispatcher is not
// thread local, we will use the global dispatcher below.
bool isThreadLocal;
// the global dispatcher, if thread local enabled, global will be set to NULL.
SAsyncBulkWriteDispatcher * global;
} SDispatcherHolder;
bool dispatcherTryDispatch(SAsyncBatchWriteDispatcher* dispatcher, SSqlObj* pSql);
/**
* Create a holder of SAsyncBulkWriteDispatcher.
* Create the manager of SAsyncBatchWriteDispatcher.
*
* @param batchSize the batchSize of SAsyncBulkWriteDispatcher.
* @param timeoutMs the timeoutMs of SAsyncBulkWriteDispatcher.
* @param batchSize the batchSize of SAsyncBatchWriteDispatcher.
* @param timeoutMs the timeoutMs of SAsyncBatchWriteDispatcher.
* @param isThreadLocal specifies whether the dispatcher is thread local.
* @return the SAsyncBulkWriteDispatcher holder.
* @return the SAsyncBatchWriteDispatcher manager.
*/
SDispatcherHolder* createDispatcherHolder(int32_t batchSize, int32_t timeoutMs, bool isThreadLocal);
SDispatcherManager* createDispatcherManager(int32_t batchSize, int32_t timeoutMs, bool isThreadLocal);
/**
* Destroy the holder of SAsyncBulkWriteDispatcher.
* (will destroy all the instances of SAsyncBulkWriteDispatcher in the thread local variable)
* Destroy the SDispatcherManager.
* (will destroy all the instances of SAsyncBatchWriteDispatcher in the thread local variable)
*
* @param holder the holder of SAsyncBulkWriteDispatcher.
* @param manager the SDispatcherManager.
*/
void destroyDispatcherHolder(SDispatcherHolder* holder);
void destroyDispatcherManager(SDispatcherManager* manager);
/**
* Get an instance of SAsyncBulkWriteDispatcher.
* Get an instance of SAsyncBatchWriteDispatcher.
*
* @param holder the holder of SAsyncBulkWriteDispatcher.
* @return the SAsyncBulkWriteDispatcher instance.
* @param manager the SDispatcherManager.
* @return the SAsyncBatchWriteDispatcher instance.
*/
SAsyncBulkWriteDispatcher* dispatcherAcquire(SDispatcherHolder* holder);
SAsyncBatchWriteDispatcher* dispatcherAcquire(SDispatcherManager* manager);
#ifdef __cplusplus
}
#endif
#endif // TDENGINE_TSCBULKWRITE_H
#endif // TDENGINE_TSCBATCHWRITE_H
......@@ -511,18 +511,18 @@ int32_t taos_unused_session(TAOS* taos);
void waitForQueryRsp(void *param, TAOS_RES *tres, int code);
/**
* Init the taosc async bulk write dispatcher.
* Init the manager of async batch write dispatcher.
*
* @param batchSize the batchSize of async bulk write dispatcher.
* @param batchSize the batchSize of async batch write dispatcher.
* @param timeoutMs the timeout of batching in milliseconds.
* @param isThreadLocal specifies whether the dispatcher is thread local.
*/
void tscInitAsyncDispatcher(int32_t batchSize, int32_t timeoutMs, bool isThreadLocal);
void tscInitDispatcherManager(int32_t batchSize, int32_t timeoutMs, bool isThreadLocal);
/**
* Destroy the async auto batch dispatcher.
* Destroy the manager of async batch write dispatcher.
*/
void tscDestroyAsyncDispatcher();
void tscDestroyDispatcherManager();
void doAsyncQuery(STscObj *pObj, SSqlObj *pSql, __async_cb_func_t fp, void *param, const char *sqlstr, size_t sqlLen);
......@@ -553,8 +553,8 @@ extern SHashObj *tscTableMetaMap;
extern SCacheObj *tscVgroupListBuf;
// forward declaration.
typedef struct SDispatcherHolder SDispatcherHolder;
extern SDispatcherHolder *tscDispatcher;
typedef struct SDispatcherManager SDispatcherManager;
extern SDispatcherManager *tscDispatcherManager;
extern int tscObjRef;
extern void *tscTmr;
extern void *tscQhandle;
......
......@@ -22,11 +22,11 @@
#include "qTableMeta.h"
#include "tnote.h"
#include "trpc.h"
#include "tscBatchWrite.h"
#include "tscLog.h"
#include "tscSubquery.h"
#include "tscUtil.h"
#include "tsclient.h"
#include "tscBulkWrite.h"
static void tscAsyncQueryRowsForNextVnode(void *param, TAOS_RES *tres, int numOfRows);
......@@ -395,8 +395,8 @@ void doAsyncQuery(STscObj* pObj, SSqlObj* pSql, __async_cb_func_t fp, void* para
return;
}
if (tscDispatcher != NULL) {
SAsyncBulkWriteDispatcher* dispatcher = dispatcherAcquire(tscDispatcher);
if (tscDispatcherManager != NULL) {
SAsyncBatchWriteDispatcher * dispatcher = dispatcherAcquire(tscDispatcherManager);
if (dispatcherTryDispatch(dispatcher, pSql)) {
taosReleaseRef(tscObjRef, pSql->self);
tscDebug("sql obj %p has been buffer in insert buffer", pSql);
......
......@@ -13,7 +13,7 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "tscDataBlockMerge.h"
#include "tscBatchMerge.h"
/**
* A util function to compare two SName.
......@@ -456,9 +456,9 @@ bool insertSTableNameListBuilder(STableNameListBuilder* builder, SName* name) {
return taosArrayPush(builder->tableNames, &name);
}
int32_t tscMergeKVPayLoadSqlObj(SArray* statements, SSqlObj* result) {
int32_t tscMergeSSqlObjs(SSqlObj** polls, size_t nPolls, SSqlObj* result) {
// statement array is empty.
if (!statements || !taosArrayGetSize(statements)) {
if (!polls || !nPolls) {
return TSDB_CODE_TSC_INVALID_OPERATION;
}
......@@ -474,8 +474,8 @@ int32_t tscMergeKVPayLoadSqlObj(SArray* statements, SSqlObj* result) {
}
// append the existing data blocks to builder.
for (size_t i = 0; i < taosArrayGetSize(statements); ++i) {
SSqlObj *pSql = taosArrayGetP(statements, i);
for (size_t i = 0; i < nPolls; ++i) {
SSqlObj *pSql = polls[i];
SInsertStatementParam* pInsertParam = &pSql->cmd.insertParam;
if (!pInsertParam->pDataBlocks) {
continue;
......
......@@ -15,11 +15,11 @@
#include "osAtomic.h"
#include "tscDataBlockMerge.h"
#include "tscBulkWrite.h"
#include "tscLog.h"
#include "tscBatchMerge.h"
#include "tscBatchWrite.h"
#include "tscSubquery.h"
#include "tsclient.h"
#include "tscLog.h"
/**
* Represents the callback function and its context.
......@@ -27,15 +27,15 @@
typedef struct {
__async_cb_func_t fp;
void* param;
} Runnable;
} SCallbackHandler;
/**
* The context of `batchResultCallback`.
*/
typedef struct {
size_t count;
Runnable runnable[];
} BatchCallBackContext;
size_t nHandlers;
SCallbackHandler handler[];
} SBatchCallbackContext;
/**
* Get the number of insertion row in the sql statement.
......@@ -70,7 +70,7 @@ inline static void tscReturnsError(SSqlObj* pSql, int code) {
* @param code the error code.
*/
static void batchResultCallback(void* param, TAOS_RES* tres, int32_t code) {
BatchCallBackContext* context = param;
SBatchCallbackContext* context = param;
SSqlObj* res = tres;
// handle corner case [context == null].
......@@ -90,32 +90,27 @@ static void batchResultCallback(void* param, TAOS_RES* tres, int32_t code) {
}
// handle results.
tscDebug("async batch result callback, number of item: %zu", context->count);
for (int i = 0; i < context->count; ++i) {
tscDebug("async batch result callback, number of item: %zu", context->nHandlers);
for (int i = 0; i < context->nHandlers; ++i) {
// the result object is shared by many sql objects.
// therefore, we need to increase the ref count.
taosAcquireRef(tscObjRef, res->self);
Runnable* runnable = &context->runnable[i];
runnable->fp(runnable->param, res, res == NULL ? code : taos_errno(res));
SCallbackHandler* handler = &context->handler[i];
handler->fp(handler->param, res, code);
}
taosReleaseRef(tscObjRef, res->self);
free(context);
}
int32_t dispatcherStatementMerge(SArray* statements, SSqlObj** result) {
if (statements == NULL) {
return TSDB_CODE_SUCCESS;
}
size_t count = taosArrayGetSize(statements);
if (count == 0) {
int32_t dispatcherBatchMerge(SSqlObj** polls, size_t nPolls, SSqlObj** result) {
if (!polls || !nPolls) {
return TSDB_CODE_SUCCESS;
}
// create the callback context.
BatchCallBackContext* context = calloc(1, sizeof(BatchCallBackContext) + count * sizeof(Runnable));
SBatchCallbackContext* context = calloc(1, sizeof(SBatchCallbackContext) + nPolls * sizeof(SCallbackHandler));
if (context == NULL) {
return TSDB_CODE_TSC_OUT_OF_MEMORY;
}
......@@ -123,17 +118,17 @@ int32_t dispatcherStatementMerge(SArray* statements, SSqlObj** result) {
tscDebug("create batch call back context: %p", context);
// initialize the callback context.
context->count = count;
for (size_t i = 0; i < count; ++i) {
SSqlObj* statement = taosArrayGetP(statements, i);
context->runnable[i].fp = statement->fp;
context->runnable[i].param = statement->param;
context->nHandlers = nPolls;
for (size_t i = 0; i < nPolls; ++i) {
SSqlObj* pSql = polls[i];
context->handler[i].fp = pSql->fp;
context->handler[i].param = pSql->param;
}
// merge the statements into single one.
tscDebug("start to merge %zu sql objs", count);
SSqlObj *pFirst = taosArrayGetP(statements, 0);
int32_t code = tscMergeKVPayLoadSqlObj(statements, pFirst);
tscDebug("start to merge %zu sql objs", nPolls);
SSqlObj *pFirst = polls[0];
int32_t code = tscMergeSSqlObjs(polls, nPolls, pFirst);
if (code != TSDB_CODE_SUCCESS) {
const char* msg = tstrerror(code);
tscDebug("failed to merge sql objects: %s", msg);
......@@ -147,8 +142,8 @@ int32_t dispatcherStatementMerge(SArray* statements, SSqlObj** result) {
pFirst->fetchFp = pFirst->fp;
*result = pFirst;
for (int i = 1; i < count; ++i) {
SSqlObj *pSql = taosArrayGetP(statements, i);
for (int i = 1; i < nPolls; ++i) {
SSqlObj *pSql = polls[i];
taosReleaseRef(tscObjRef, pSql->self);
}
return code;
......@@ -159,36 +154,43 @@ int32_t dispatcherStatementMerge(SArray* statements, SSqlObj** result) {
* you need to notify dispatcher->notFull by yourself.
*
* @param dispatcher the dispatcher.
* @return the items in the dispatcher, SArray<SSqlObj*>.
* @param nPolls the number of polled SSqlObj*.
* @return all the SSqlObj* in the buffer.
*/
inline static SArray* dispatcherPollAll(SAsyncBulkWriteDispatcher* dispatcher) {
if (!taosArrayGetSize(dispatcher->buffer)) {
inline static SSqlObj** dispatcherPollAll(SAsyncBatchWriteDispatcher* dispatcher, size_t* nPolls) {
if (!dispatcher->bufferSize) {
*nPolls = 0;
return NULL;
}
SArray* statements = taosArrayDup(dispatcher->buffer);
if (statements == NULL) {
SSqlObj** clone = malloc(sizeof(SSqlObj*) * dispatcher->bufferSize);
if (clone == NULL) {
tscError("failed to poll all items: out of memory");
*nPolls = 0;
return NULL;
}
memcpy(clone, dispatcher->buffer, sizeof(SSqlObj*) * dispatcher->bufferSize);
*nPolls = dispatcher->bufferSize;
dispatcher->currentSize = 0;
taosArrayClear(dispatcher->buffer);
return statements;
dispatcher->bufferSize = 0;
return clone;
}
/**
* Poll all the SSqlObj* in the dispatcher's buffer.
*
* @param dispatcher the dispatcher.
* @return the items in the dispatcher, SArray<SSqlObj*>.
* @param nPolls the number of polled SSqlObj*.
* @return all the SSqlObj* in the buffer.
*/
inline static SArray* dispatcherLockPollAll(SAsyncBulkWriteDispatcher* dispatcher) {
inline static SSqlObj** dispatcherLockPollAll(SAsyncBatchWriteDispatcher* dispatcher, size_t* nPolls) {
SSqlObj** polls = NULL;
pthread_mutex_lock(&dispatcher->bufferMutex);
SArray* statements = dispatcherPollAll(dispatcher);
polls = dispatcherPollAll(dispatcher, nPolls);
pthread_cond_broadcast(&dispatcher->notFull);
pthread_mutex_unlock(&dispatcher->bufferMutex);
return statements;
return polls;
}
/**
......@@ -198,7 +200,7 @@ inline static SArray* dispatcherLockPollAll(SAsyncBulkWriteDispatcher* dispatche
* @param pSql the sql object to offer.
* @return return whether offer success.
*/
inline static bool dispatcherTryOffer(SAsyncBulkWriteDispatcher* dispatcher, SSqlObj* pSql) {
inline static bool dispatcherTryOffer(SAsyncBatchWriteDispatcher* dispatcher, SSqlObj* pSql) {
pthread_mutex_lock(&dispatcher->bufferMutex);
// if dispatcher is shutdown, must fail back to normal insertion.
......@@ -216,44 +218,51 @@ inline static bool dispatcherTryOffer(SAsyncBulkWriteDispatcher* dispatcher, SSq
}
}
taosArrayPush(dispatcher->buffer, pSql);
dispatcher->buffer[dispatcher->bufferSize++] = pSql;
dispatcher->currentSize += statementGetInsertionRows(pSql);
tscDebug("sql obj %p has been write to insert buffer", pSql);
// the dispatcher has been shutdown or reach batch size.
if (atomic_load_8(&dispatcher->shutdown) || dispatcher->currentSize >= dispatcher->batchSize) {
SArray* statements = dispatcherPollAll(dispatcher);
dispatcherExecute(statements);
taosArrayDestroy(&statements);
pthread_cond_broadcast(&dispatcher->notFull);
if (dispatcher->currentSize < dispatcher->batchSize) {
pthread_mutex_unlock(&dispatcher->bufferMutex);
return true;
}
// the dispatcher reaches batch size.
size_t nPolls = 0;
SSqlObj** polls = dispatcherPollAll(dispatcher, &nPolls);
pthread_cond_broadcast(&dispatcher->notFull);
pthread_mutex_unlock(&dispatcher->bufferMutex);
if (polls) {
dispatcherExecute(polls, nPolls);
free(polls);
}
return true;
}
void dispatcherExecute(SArray* statements) {
void dispatcherExecute(SSqlObj** polls, size_t nPolls) {
int32_t code = TSDB_CODE_SUCCESS;
// no item in the buffer (items has been taken by other threads).
if (!statements || !taosArrayGetSize(statements)) {
if (!polls || !nPolls) {
return;
}
// merge the statements into single one.
SSqlObj* merged = NULL;
code = dispatcherStatementMerge(statements, &merged);
code = dispatcherBatchMerge(polls, nPolls, &merged);
if (code != TSDB_CODE_SUCCESS) {
goto _error;
}
tscDebug("merging %zu sql objs into %p", taosArrayGetSize(statements), merged);
tscDebug("merging %zu sql objs into %p", nPolls, merged);
tscHandleMultivnodeInsert(merged);
return;
_error:
tscError("send async batch sql obj failed, reason: %s", tstrerror(code));
// handling the failures.
for (size_t i = 0; i < taosArrayGetSize(statements); ++i) {
SSqlObj* item = taosArrayGetP(statements, i);
for (size_t i = 0; i < nPolls; ++i) {
SSqlObj* item = polls[i];
tscReturnsError(item, code);
}
}
......@@ -277,77 +286,69 @@ static inline void afterMillis(struct timespec *t, int32_t millis) {
* @param dispatcher the dispatcher thread to sleep.
* @param timeout the timeout in CLOCK_REALTIME.
*/
inline static void dispatcherSleepUntil(SAsyncBulkWriteDispatcher* dispatcher, struct timespec* timeout) {
pthread_mutex_lock(&dispatcher->sleepMutex);
inline static void timeoutManagerSleepUntil(SDispatcherTimeoutManager* manager, struct timespec* timeout) {
pthread_mutex_lock(&manager->sleepMutex);
while (true) {
// notified by dispatcherShutdown(...).
if (atomic_load_8(&dispatcher->shutdown)) {
if (isShutdownSDispatcherTimeoutManager(manager)) {
break;
}
if (pthread_cond_timedwait(&dispatcher->timeout, &dispatcher->sleepMutex, timeout)) {
if (pthread_cond_timedwait(&manager->timeout, &manager->sleepMutex, timeout)) {
fflush(stdout);
break;
}
}
pthread_mutex_unlock(&dispatcher->sleepMutex);
pthread_mutex_unlock(&manager->sleepMutex);
}
/**
* The thread to manage batching timeout.
*/
static void* dispatcherTimeoutCallback(void* arg) {
SAsyncBulkWriteDispatcher* dispatcher = arg;
static void* timeoutManagerCallback(void* arg) {
SDispatcherTimeoutManager* manager = arg;
setThreadName("tscAsyncBackground");
while (!atomic_load_8(&dispatcher->shutdown)) {
while (!isShutdownSDispatcherTimeoutManager(manager)) {
struct timespec timeout;
clock_gettime(CLOCK_REALTIME, &timeout);
afterMillis(&timeout, dispatcher->timeoutMs);
afterMillis(&timeout, manager->timeoutMs);
SArray* statements = dispatcherLockPollAll(dispatcher);
dispatcherExecute(statements);
taosArrayDestroy(&statements);
size_t nPolls = 0;
SSqlObj** polls = dispatcherLockPollAll(manager->dispatcher, &nPolls);
if (polls) {
dispatcherExecute(polls, nPolls);
free(polls);
}
// Similar to scheduleAtFixedRate in Java, if the execution time exceed
// `timeoutMs` milliseconds, then there will be no sleep.
dispatcherSleepUntil(dispatcher, &timeout);
timeoutManagerSleepUntil(manager, &timeout);
}
return NULL;
}
SAsyncBulkWriteDispatcher* createAsyncBulkWriteDispatcher(int32_t batchSize, int32_t timeoutMs) {
SAsyncBulkWriteDispatcher* dispatcher = calloc(1, sizeof(SAsyncBulkWriteDispatcher));
SAsyncBatchWriteDispatcher* createSAsyncBatchWriteDispatcher(int32_t batchSize, int32_t timeoutMs) {
SAsyncBatchWriteDispatcher* dispatcher = calloc(1, sizeof(SAsyncBatchWriteDispatcher) + batchSize * sizeof(SSqlObj*));
if (!dispatcher) {
return NULL;
}
dispatcher->currentSize = 0;
dispatcher->bufferSize = 0;
dispatcher->batchSize = batchSize;
dispatcher->timeoutMs = timeoutMs;
atomic_store_8(&dispatcher->shutdown, false);
// init the buffer.
dispatcher->buffer = taosArrayInit(batchSize, sizeof(SSqlObj*));
if (!dispatcher->buffer) {
tfree(dispatcher);
return NULL;
}
// init the mutex and the cond.
pthread_mutex_init(&dispatcher->bufferMutex, NULL);
pthread_mutex_init(&dispatcher->sleepMutex, NULL);
pthread_cond_init(&dispatcher->timeout, NULL);
pthread_cond_init(&dispatcher->notFull, NULL);
// init background thread.
if (pthread_create(&dispatcher->background, NULL, dispatcherTimeoutCallback, dispatcher)) {
// init timeout manager.
dispatcher->timeoutManager = createSDispatcherTimeoutManager(dispatcher, timeoutMs);
if (!dispatcher->timeoutManager) {
pthread_mutex_destroy(&dispatcher->bufferMutex);
pthread_mutex_destroy(&dispatcher->sleepMutex);
pthread_cond_destroy(&dispatcher->timeout);
pthread_cond_destroy(&dispatcher->notFull);
taosArrayDestroy(&dispatcher->buffer);
tfree(dispatcher);
free(dispatcher);
return NULL;
}
......@@ -359,18 +360,14 @@ SAsyncBulkWriteDispatcher* createAsyncBulkWriteDispatcher(int32_t batchSize, int
*
* @param dispatcher the dispatcher.
*/
inline static void dispatcherShutdown(SAsyncBulkWriteDispatcher* dispatcher) {
// mark shutdown, signal shutdown to timeout thread.
pthread_mutex_lock(&dispatcher->sleepMutex);
inline static void dispatcherShutdown(SAsyncBatchWriteDispatcher* dispatcher) {
atomic_store_8(&dispatcher->shutdown, true);
pthread_cond_broadcast(&dispatcher->timeout);
pthread_mutex_unlock(&dispatcher->sleepMutex);
// make sure the timeout thread exit.
pthread_join(dispatcher->background, NULL);
if (dispatcher->timeoutManager) {
shutdownSDispatcherTimeoutManager(dispatcher->timeoutManager);
}
}
void destroyAsyncDispatcher(SAsyncBulkWriteDispatcher* dispatcher) {
void destroySAsyncBatchWriteDispatcher(SAsyncBatchWriteDispatcher* dispatcher) {
if (dispatcher == NULL) {
return;
}
......@@ -379,28 +376,25 @@ void destroyAsyncDispatcher(SAsyncBulkWriteDispatcher* dispatcher) {
// poll and send all the statements in the buffer.
while (true) {
SArray* statements = dispatcherLockPollAll(dispatcher);
if (!statements) {
size_t nPolls = 0;
SSqlObj** polls = dispatcherLockPollAll(dispatcher, &nPolls);
if (!polls) {
break ;
}
dispatcherExecute(statements);
taosArrayDestroy(&statements);
dispatcherExecute(polls, nPolls);
free(polls);
}
// destroy the buffer.
taosArrayDestroy(&dispatcher->buffer);
// destroy the timeout manager.
destroySDispatcherTimeoutManager(dispatcher->timeoutManager);
// destroy the mutex.
pthread_mutex_destroy(&dispatcher->bufferMutex);
pthread_mutex_destroy(&dispatcher->sleepMutex);
pthread_cond_destroy(&dispatcher->timeout);
pthread_cond_destroy(&dispatcher->notFull);
free(dispatcher);
}
bool tscSupportBulkInsertion(SAsyncBulkWriteDispatcher* dispatcher, SSqlObj* pSql) {
bool dispatcherCanDispatch(SAsyncBatchWriteDispatcher* dispatcher, SSqlObj* pSql) {
if (pSql == NULL || !pSql->enableBatch) {
return false;
}
......@@ -438,13 +432,13 @@ bool tscSupportBulkInsertion(SAsyncBulkWriteDispatcher* dispatcher, SSqlObj* pSq
return true;
}
bool dispatcherTryDispatch(SAsyncBulkWriteDispatcher* dispatcher, SSqlObj* pSql) {
bool dispatcherTryDispatch(SAsyncBatchWriteDispatcher* dispatcher, SSqlObj* pSql) {
if (atomic_load_8(&dispatcher->shutdown)) {
return false;
}
// the sql object doesn't support bulk insertion.
if (!tscSupportBulkInsertion(dispatcher, pSql)) {
if (!dispatcherCanDispatch(dispatcher, pSql)) {
return false;
}
......@@ -453,20 +447,20 @@ bool dispatcherTryDispatch(SAsyncBulkWriteDispatcher* dispatcher, SSqlObj* pSql)
}
/**
* Destroy the SAsyncBulkWriteDispatcher create by SDispatcherHolder.
* @param arg the thread local SAsyncBulkWriteDispatcher.
* Destroy the SAsyncBatchWriteDispatcher create by SDispatcherManager.
* @param arg the thread local SAsyncBatchWriteDispatcher.
*/
static void destroyDispatcher(void* arg) {
SAsyncBulkWriteDispatcher* dispatcher = arg;
SAsyncBatchWriteDispatcher* dispatcher = arg;
if (!dispatcher) {
return;
}
destroyAsyncDispatcher(dispatcher);
destroySAsyncBatchWriteDispatcher(dispatcher);
}
SDispatcherHolder* createDispatcherHolder(int32_t batchSize, int32_t timeoutMs, bool isThreadLocal) {
SDispatcherHolder* dispatcher = calloc(1, sizeof(SDispatcherHolder));
SDispatcherManager* createDispatcherManager(int32_t batchSize, int32_t timeoutMs, bool isThreadLocal) {
SDispatcherManager* dispatcher = calloc(1, sizeof(SDispatcherManager));
if (!dispatcher) {
return NULL;
}
......@@ -481,7 +475,7 @@ SDispatcherHolder* createDispatcherHolder(int32_t batchSize, int32_t timeoutMs,
return NULL;
}
} else {
dispatcher->global = createAsyncBulkWriteDispatcher(batchSize, timeoutMs);
dispatcher->global = createSAsyncBatchWriteDispatcher(batchSize, timeoutMs);
if (!dispatcher->global) {
free(dispatcher);
return NULL;
......@@ -490,32 +484,85 @@ SDispatcherHolder* createDispatcherHolder(int32_t batchSize, int32_t timeoutMs,
return dispatcher;
}
SAsyncBulkWriteDispatcher* dispatcherAcquire(SDispatcherHolder* holder) {
if (!holder->isThreadLocal) {
return holder->global;
SAsyncBatchWriteDispatcher* dispatcherAcquire(SDispatcherManager* manager) {
if (!manager->isThreadLocal) {
return manager->global;
}
SAsyncBulkWriteDispatcher* value = pthread_getspecific(holder->key);
SAsyncBatchWriteDispatcher* value = pthread_getspecific(manager->key);
if (value) {
return value;
}
value = createAsyncBulkWriteDispatcher(holder->batchSize, holder->timeoutMs);
value = createSAsyncBatchWriteDispatcher(manager->batchSize, manager->timeoutMs);
if (value) {
pthread_setspecific(holder->key, value);
pthread_setspecific(manager->key, value);
return value;
}
return NULL;
}
void destroyDispatcherHolder(SDispatcherHolder* holder) {
if (holder) {
if (holder->isThreadLocal) {
pthread_key_delete(holder->key);
void destroyDispatcherManager(SDispatcherManager* manager) {
if (manager) {
if (manager->isThreadLocal) {
pthread_key_delete(manager->key);
} else {
destroyAsyncDispatcher(holder->global);
destroySAsyncBatchWriteDispatcher(manager->global);
}
free(manager);
}
}
SDispatcherTimeoutManager* createSDispatcherTimeoutManager(SAsyncBatchWriteDispatcher* dispatcher, int32_t timeoutMs) {
SDispatcherTimeoutManager* manager = calloc(1, sizeof(SDispatcherTimeoutManager));
if (!manager) {
return NULL;
}
manager->timeoutMs = timeoutMs;
manager->dispatcher = dispatcher;
atomic_store_8(&manager->shutdown, false);
pthread_mutex_init(&manager->sleepMutex, NULL);
pthread_cond_init(&manager->timeout, NULL);
// init background thread.
if (pthread_create(&manager->background, NULL, timeoutManagerCallback, manager)) {
pthread_mutex_destroy(&manager->sleepMutex);
pthread_cond_destroy(&manager->timeout);
free(manager);
return NULL;
}
free(holder);
return manager;
}
void destroySDispatcherTimeoutManager(SDispatcherTimeoutManager* manager) {
if (!manager) {
return;
}
shutdownSDispatcherTimeoutManager(manager);
manager->dispatcher->timeoutManager = NULL;
pthread_mutex_destroy(&manager->sleepMutex);
pthread_cond_destroy(&manager->timeout);
free(manager);
}
void shutdownSDispatcherTimeoutManager(SDispatcherTimeoutManager* manager) {
// mark shutdown, signal shutdown to timeout thread.
pthread_mutex_lock(&manager->sleepMutex);
atomic_store_8(&manager->shutdown, true);
pthread_cond_broadcast(&manager->timeout);
pthread_mutex_unlock(&manager->sleepMutex);
// make sure the timeout thread exit.
pthread_join(manager->background, NULL);
}
bool isShutdownSDispatcherTimeoutManager(SDispatcherTimeoutManager* manager) {
if (!manager) {
return true;
}
return atomic_load_8(&manager->shutdown);
}
......@@ -14,19 +14,19 @@
*/
#include "os.h"
#include "qScript.h"
#include "taosmsg.h"
#include "tconfig.h"
#include "tglobal.h"
#include "tnote.h"
#include "tref.h"
#include "trpc.h"
#include "tnote.h"
#include "ttimer.h"
#include "tsched.h"
#include "tscBatchWrite.h"
#include "tscLog.h"
#include "tsched.h"
#include "tsclient.h"
#include "tglobal.h"
#include "tconfig.h"
#include "ttimer.h"
#include "ttimezone.h"
#include "qScript.h"
#include "tscBulkWrite.h"
// global, not configurable
#define TSC_VAR_NOT_RELEASE 1
......@@ -50,7 +50,7 @@ void *tscRpcCache; // cache to keep rpc obj
int32_t tscNumOfThreads = 1; // num of rpc threads
char tscLogFileName[] = "taoslog";
int tscLogFileNum = 10;
SDispatcherHolder * tscDispatcher = NULL;
SDispatcherManager *tscDispatcherManager = NULL;
static pthread_mutex_t rpcObjMutex; // mutex to protect open the rpc obj concurrently
static pthread_once_t tscinit = PTHREAD_ONCE_INIT;
......@@ -59,16 +59,16 @@ static pthread_mutex_t setConfMutex = PTHREAD_MUTEX_INITIALIZER;
// pthread_once can not return result code, so result code is set to a global variable.
static volatile int tscInitRes = 0;
void tscInitAsyncDispatcher(int32_t batchSize, int32_t timeoutMs, bool isThreadLocal) {
void tscInitDispatcherManager(int32_t batchSize, int32_t timeoutMs, bool isThreadLocal) {
if (tsAsyncBatchEnable) {
tscDispatcher = createDispatcherHolder(batchSize, timeoutMs, isThreadLocal);
tscDispatcherManager = createDispatcherManager(batchSize, timeoutMs, isThreadLocal);
}
}
void tscDestroyAsyncDispatcher() {
if (tscDispatcher) {
destroyDispatcherHolder(tscDispatcher);
tscDispatcher = NULL;
void tscDestroyDispatcherManager() {
if (tscDispatcherManager) {
destroyDispatcherManager(tscDispatcherManager);
tscDispatcherManager = NULL;
}
}
......@@ -231,7 +231,7 @@ void taos_init_imp(void) {
tscDebug("starting to initialize client ...");
tscDebug("Local End Point is:%s", tsLocalEp);
tscInitAsyncDispatcher(tsAsyncBatchSize, tsAsyncBatchTimeout, tsAsyncBatchThreadLocal);
tscInitDispatcherManager(tsAsyncBatchSize, tsAsyncBatchTimeout, tsAsyncBatchThreadLocal);
}
taosSetCoreDump();
......@@ -297,7 +297,7 @@ void taos_cleanup(void) {
scriptEnvPoolCleanup();
#endif
if (tsAsyncBatchEnable) {
tscDestroyAsyncDispatcher();
tscDestroyDispatcherManager();
}
}
......
......@@ -1885,7 +1885,7 @@ static void doInitGlobalConfig(void) {
cfg.valType = TAOS_CFG_VTYPE_INT32;
cfg.cfgType = TSDB_CFG_CTYPE_B_CONFIG;
cfg.minValue = 1;
cfg.maxValue = 65535;
cfg.maxValue = 4096;
cfg.ptrLength = 0;
cfg.unitType = TAOS_CFG_UTYPE_NONE;
taosInitConfigOption(cfg);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册