From 0d54c5a323d028e3aa9b1cb8a27313ad0f0903bd Mon Sep 17 00:00:00 2001 From: zhihaop Date: Mon, 26 Sep 2022 23:12:45 +0800 Subject: [PATCH] feat: sql objects merging and sending are asynchronous --- src/client/inc/tscBatchWrite.h | 35 ++++-- src/client/src/tscBatchWrite.c | 199 +++++++++++++++++++-------------- 2 files changed, 136 insertions(+), 98 deletions(-) diff --git a/src/client/inc/tscBatchWrite.h b/src/client/inc/tscBatchWrite.h index c2a96b6ae6..d1907cef3c 100644 --- a/src/client/inc/tscBatchWrite.h +++ b/src/client/inc/tscBatchWrite.h @@ -36,7 +36,7 @@ typedef struct SDispatcherTimeoutManager SDispatcherTimeoutManager; typedef struct SAsyncBatchWriteDispatcher { // the client object. STscObj* pClient; - + // the timeout manager. SDispatcherTimeoutManager* timeoutManager; @@ -80,7 +80,7 @@ typedef struct SDispatcherManager { // the global dispatcher, if thread local enabled, global will be set to NULL. SAsyncBatchWriteDispatcher* pGlobal; - + // the client object. STscObj* pClient; @@ -109,6 +109,14 @@ typedef struct SDispatcherTimeoutManager { volatile bool shutdown; } SDispatcherTimeoutManager; +/** + * A batch that polls from SAsyncBatchWriteDispatcher::buffer. + */ +typedef struct SBatchRequest { + size_t nRequests; + SSqlObj* pRequests[]; +} SBatchRequest; + /** * Create the dispatcher timeout manager. */ @@ -135,20 +143,25 @@ void shutdownSDispatcherTimeoutManager(SDispatcherTimeoutManager* manager); /** * Merge SSqlObjs into single SSqlObj. * - * @param polls the array of SSqlObj*. - * @param nPolls the number of SSqlObj* in the array. - * @param batch the merged SSqlObj*. - * @return the merged SSqlObj. + * @param pRequest the batch request. + * @param batch the batch SSqlObj*. + * @return the status code. */ -int32_t dispatcherBatchBuilder(SSqlObj** polls, size_t nPolls, SSqlObj** batch); +int32_t dispatcherBatchBuilder(SBatchRequest* pRequest, SSqlObj** batch); + +/** + * Merge the sql statements and execute the merged sql statement asynchronously. + * + * @param pRequest the batch request. the request will be promised to free after calling this function. + */ +void dispatcherAsyncExecute(SBatchRequest* pRequest); /** * Merge the sql statements and execute the merged sql statement. * - * @param polls the array of SSqlObj*. - * @param nPolls the number of SSqlObj* in the array. + * @param pRequest the batch request. you must call free(pRequest) after calling this function. */ -void dispatcherExecute(SSqlObj** polls, size_t nPolls); +void dispatcherExecute(SBatchRequest* pRequest); /** * Create the async batch write dispatcher. @@ -190,7 +203,7 @@ bool dispatcherTryDispatch(SAsyncBatchWriteDispatcher* dispatcher, SSqlObj* pSql /** * Create the manager of SAsyncBatchWriteDispatcher. - * + * * @param pClient the client object. * @param batchSize the batchSize of SAsyncBatchWriteDispatcher. * @param timeoutMs the timeoutMs of SAsyncBatchWriteDispatcher. diff --git a/src/client/src/tscBatchWrite.c b/src/client/src/tscBatchWrite.c index 258c913b84..7c1c7b1415 100644 --- a/src/client/src/tscBatchWrite.c +++ b/src/client/src/tscBatchWrite.c @@ -17,9 +17,9 @@ #include "tscBatchMerge.h" #include "tscBatchWrite.h" +#include "tscLog.h" #include "tscSubquery.h" #include "tsclient.h" -#include "tscLog.h" /** * Represents the callback function and its context. @@ -57,9 +57,7 @@ inline static void tscReturnsError(SSqlObj* pSql, int code) { } pSql->res.code = code; - if (pSql->fp) { - pSql->fp(pSql->param, pSql, code); - } + tscAsyncResultOnError(pSql); } /** @@ -70,8 +68,8 @@ inline static void tscReturnsError(SSqlObj* pSql, int code) { * @param code the error code. */ static void batchResultCallback(void* param, TAOS_RES* tres, int32_t code) { - SBatchCallbackContext* context = param; - SSqlObj* res = tres; + SBatchCallbackContext* context = param; + SSqlObj* res = tres; // handle corner case [context == null]. if (context == NULL) { @@ -104,47 +102,47 @@ static void batchResultCallback(void* param, TAOS_RES* tres, int32_t code) { free(context); } -int32_t dispatcherBatchBuilder(SSqlObj** polls, size_t nPolls, SSqlObj** batch) { - if (!polls || !nPolls) { - return TSDB_CODE_SUCCESS; - } - +int32_t dispatcherBatchBuilder(SBatchRequest* pRequest, SSqlObj** batch) { + assert(pRequest); + assert(pRequest->pRequests); + assert(pRequest->nRequests); + // create the callback context. - SBatchCallbackContext* context = calloc(1, sizeof(SBatchCallbackContext) + nPolls * sizeof(SCallbackHandler)); + SBatchCallbackContext* context = + calloc(1, sizeof(SBatchCallbackContext) + pRequest->nRequests * sizeof(SCallbackHandler)); if (context == NULL) { return TSDB_CODE_TSC_OUT_OF_MEMORY; } tscDebug("create batch call back context: %p", context); - // initialize the callback context. - context->nHandlers = nPolls; - for (size_t i = 0; i < nPolls; ++i) { - SSqlObj* pSql = polls[i]; + context->nHandlers = pRequest->nRequests; + for (size_t i = 0; i < pRequest->nRequests; ++i) { + SSqlObj* pSql = pRequest->pRequests[i]; context->handler[i].fp = pSql->fp; context->handler[i].param = pSql->param; } - + // merge the statements into single one. - tscDebug("start to merge %zu sql objs", nPolls); - SSqlObj *pFirst = polls[0]; - int32_t code = tscMergeSSqlObjs(polls, nPolls, pFirst); + tscDebug("start to merge %zu sql objs", pRequest->nRequests); + SSqlObj* pFirst = pRequest->pRequests[0]; + int32_t code = tscMergeSSqlObjs(pRequest->pRequests, pRequest->nRequests, pFirst); if (code != TSDB_CODE_SUCCESS) { const char* msg = tstrerror(code); tscDebug("failed to merge sql objects: %s", msg); free(context); - taosReleaseRef(tscObjRef, pFirst->self); return code; } - + pFirst->fp = batchResultCallback; pFirst->param = context; pFirst->fetchFp = pFirst->fp; + taosAcquireRef(tscObjRef, pFirst->self); *batch = pFirst; - - for (int i = 1; i < nPolls; ++i) { - SSqlObj *pSql = polls[i]; + + for (int i = 0; i < pRequest->nRequests; ++i) { + SSqlObj* pSql = pRequest->pRequests[i]; taosReleaseRef(tscObjRef, pSql->self); } return code; @@ -158,40 +156,37 @@ int32_t dispatcherBatchBuilder(SSqlObj** polls, size_t nPolls, SSqlObj** batch) * @param nPolls the number of polled SSqlObj*. * @return all the SSqlObj* in the buffer. */ -inline static SSqlObj** dispatcherPollAll(SAsyncBatchWriteDispatcher* dispatcher, size_t* nPolls) { +inline static SBatchRequest* dispatcherPollAll(SAsyncBatchWriteDispatcher* dispatcher) { if (!dispatcher->bufferSize) { - *nPolls = 0; return NULL; } - - SSqlObj** clone = malloc(sizeof(SSqlObj*) * dispatcher->bufferSize); - if (clone == NULL) { + + SBatchRequest* pRequest = malloc(sizeof(SBatchRequest) + sizeof(SSqlObj*) * dispatcher->bufferSize); + if (pRequest == NULL) { tscError("failed to poll all items: out of memory"); - *nPolls = 0; return NULL; } - memcpy(clone, dispatcher->buffer, sizeof(SSqlObj*) * dispatcher->bufferSize); - *nPolls = dispatcher->bufferSize; + memcpy(pRequest->pRequests, dispatcher->buffer, sizeof(SSqlObj*) * dispatcher->bufferSize); + pRequest->nRequests = dispatcher->bufferSize; dispatcher->currentSize = 0; dispatcher->bufferSize = 0; - return clone; + return pRequest; } /** * Poll all the SSqlObj* in the dispatcher's buffer. * * @param dispatcher the dispatcher. - * @param nPolls the number of polled SSqlObj*. * @return all the SSqlObj* in the buffer. */ -inline static SSqlObj** dispatcherLockPollAll(SAsyncBatchWriteDispatcher* dispatcher, size_t* nPolls) { - SSqlObj** polls = NULL; +inline static SBatchRequest* dispatcherLockPollAll(SAsyncBatchWriteDispatcher* dispatcher) { + SBatchRequest* pRequest = NULL; pthread_mutex_lock(&dispatcher->bufferMutex); - polls = dispatcherPollAll(dispatcher, nPolls); + pRequest = dispatcherPollAll(dispatcher); pthread_cond_broadcast(&dispatcher->notFull); pthread_mutex_unlock(&dispatcher->bufferMutex); - return polls; + return pRequest; } /** @@ -203,14 +198,14 @@ inline static SSqlObj** dispatcherLockPollAll(SAsyncBatchWriteDispatcher* dispat */ inline static bool dispatcherTryOffer(SAsyncBatchWriteDispatcher* dispatcher, SSqlObj* pSql) { pthread_mutex_lock(&dispatcher->bufferMutex); - + // if dispatcher is shutdown, must fail back to normal insertion. // usually not happen, unless taos_query_a(...) after taos_close(...). if (atomic_load_8(&dispatcher->shutdown)) { pthread_mutex_unlock(&dispatcher->bufferMutex); return false; } - + // the buffer is full. while (dispatcher->currentSize >= dispatcher->batchSize) { if (pthread_cond_wait(&dispatcher->notFull, &dispatcher->bufferMutex)) { @@ -222,48 +217,49 @@ inline static bool dispatcherTryOffer(SAsyncBatchWriteDispatcher* dispatcher, SS dispatcher->buffer[dispatcher->bufferSize++] = pSql; dispatcher->currentSize += statementGetInsertionRows(pSql); tscDebug("sql obj %p has been write to insert buffer", pSql); - + if (dispatcher->currentSize < dispatcher->batchSize) { pthread_mutex_unlock(&dispatcher->bufferMutex); return true; } - + // the dispatcher reaches batch size. - size_t nPolls = 0; - SSqlObj** polls = dispatcherPollAll(dispatcher, &nPolls); + SBatchRequest* pRequest = dispatcherPollAll(dispatcher); pthread_cond_broadcast(&dispatcher->notFull); pthread_mutex_unlock(&dispatcher->bufferMutex); - - if (polls) { - dispatcherExecute(polls, nPolls); - free(polls); + + if (pRequest) { + dispatcherAsyncExecute(pRequest); } return true; } -void dispatcherExecute(SSqlObj** polls, size_t nPolls) { +void dispatcherExecute(SBatchRequest* pRequest) { int32_t code = TSDB_CODE_SUCCESS; // no item in the buffer (items has been taken by other threads). - if (!polls || !nPolls) { + if (!pRequest) { return; } + assert(pRequest->pRequests); + assert(pRequest->nRequests); + // merge the statements into single one. - SSqlObj* merged = NULL; - code = dispatcherBatchBuilder(polls, nPolls, &merged); + SSqlObj* pSql = NULL; + code = dispatcherBatchBuilder(pRequest, &pSql); if (code != TSDB_CODE_SUCCESS) { goto _error; } - tscDebug("merging %zu sql objs into %p", nPolls, merged); - tscHandleMultivnodeInsert(merged); + tscDebug("merging %zu sql objs into %p", pRequest->nRequests, pSql); + tscHandleMultivnodeInsert(pSql); return; _error: tscError("send async batch sql obj failed, reason: %s", tstrerror(code)); // handling the failures. - for (size_t i = 0; i < nPolls; ++i) { - SSqlObj* item = polls[i]; + for (size_t i = 0; i < pRequest->nRequests; ++i) { + SSqlObj* item = pRequest->pRequests[i]; tscReturnsError(item, code); } } @@ -275,7 +271,7 @@ _error: * @param millis the duration in milliseconds. * @return the timespec after `millis` ms. */ -static inline void afterMillis(struct timespec *t, int32_t millis) { +static inline void afterMillis(struct timespec* t, int32_t millis) { t->tv_nsec += millis * 1000000L; t->tv_sec += t->tv_nsec / 1000000000L; t->tv_nsec %= 1000000000L; @@ -283,7 +279,7 @@ static inline void afterMillis(struct timespec *t, int32_t millis) { /** * Sleep until `timeout` timespec. When dispatcherShutdown(...) called, the function will return immediately. - * + * * @param dispatcher the dispatcher thread to sleep. * @param timeout the timeout in CLOCK_REALTIME. */ @@ -313,15 +309,12 @@ static void* timeoutManagerCallback(void* arg) { struct timespec timeout; clock_gettime(CLOCK_REALTIME, &timeout); afterMillis(&timeout, manager->timeoutMs); - - size_t nPolls = 0; - SSqlObj** polls = dispatcherLockPollAll(manager->dispatcher, &nPolls); - - if (polls) { - dispatcherExecute(polls, nPolls); - free(polls); + + SBatchRequest* pRequest = dispatcherLockPollAll(manager->dispatcher); + if (pRequest) { + dispatcherAsyncExecute(pRequest); } - + // Similar to scheduleAtFixedRate in Java, if the execution time exceed // `timeoutMs` milliseconds, then there will be no sleep. timeoutManagerSleepUntil(manager, &timeout); @@ -336,13 +329,13 @@ SAsyncBatchWriteDispatcher* createSAsyncBatchWriteDispatcher(STscObj* pClient, i } assert(pClient != NULL); - + dispatcher->pClient = pClient; dispatcher->currentSize = 0; dispatcher->bufferSize = 0; dispatcher->batchSize = batchSize; atomic_store_8(&dispatcher->shutdown, false); - + // init the mutex and the cond. pthread_mutex_init(&dispatcher->bufferMutex, NULL); pthread_cond_init(&dispatcher->notFull, NULL); @@ -361,7 +354,7 @@ SAsyncBatchWriteDispatcher* createSAsyncBatchWriteDispatcher(STscObj* pClient, i /** * Shutdown the dispatcher and join the timeout thread. - * + * * @param dispatcher the dispatcher. */ inline static void dispatcherShutdown(SAsyncBatchWriteDispatcher* dispatcher) { @@ -380,21 +373,20 @@ void destroySAsyncBatchWriteDispatcher(SAsyncBatchWriteDispatcher* dispatcher) { // poll and send all the statements in the buffer. while (true) { - size_t nPolls = 0; - SSqlObj** polls = dispatcherLockPollAll(dispatcher, &nPolls); - if (!polls) { - break ; + SBatchRequest* pRequest = dispatcherLockPollAll(dispatcher); + if (!pRequest) { + break; } - dispatcherExecute(polls, nPolls); - free(polls); + dispatcherExecute(pRequest); + free(pRequest); } // destroy the timeout manager. destroySDispatcherTimeoutManager(dispatcher->timeoutManager); - + // destroy the mutex. pthread_mutex_destroy(&dispatcher->bufferMutex); pthread_cond_destroy(&dispatcher->notFull); - + free(dispatcher); } @@ -422,12 +414,12 @@ bool dispatcherCanDispatch(SAsyncBatchWriteDispatcher* dispatcher, SSqlObj* pSql if (pInsertParam->payloadType != PAYLOAD_TYPE_KV) { return false; } - + // no schema attached. if (pInsertParam->schemaAttached) { return false; } - + // too many insertion rows, fail back to normal insertion. if (statementGetInsertionRows(pSql) >= dispatcher->batchSize) { return false; @@ -463,14 +455,15 @@ static void destroyDispatcher(void* arg) { destroySAsyncBatchWriteDispatcher(dispatcher); } -SDispatcherManager* createDispatcherManager(STscObj* pClient, int32_t batchSize, int32_t timeoutMs, bool isThreadLocal) { +SDispatcherManager* createDispatcherManager(STscObj* pClient, int32_t batchSize, int32_t timeoutMs, + bool isThreadLocal) { SDispatcherManager* dispatcher = calloc(1, sizeof(SDispatcherManager)); if (!dispatcher) { return NULL; } assert(pClient != NULL); - + dispatcher->pClient = pClient; dispatcher->batchSize = batchSize; dispatcher->timeoutMs = timeoutMs; @@ -515,7 +508,7 @@ void destroyDispatcherManager(SDispatcherManager* manager) { if (manager->isThreadLocal) { pthread_key_delete(manager->key); } - + if (manager->pGlobal) { destroySAsyncBatchWriteDispatcher(manager->pGlobal); } @@ -528,14 +521,14 @@ SDispatcherTimeoutManager* createSDispatcherTimeoutManager(SAsyncBatchWriteDispa if (!manager) { return NULL; } - + manager->timeoutMs = timeoutMs; manager->dispatcher = dispatcher; atomic_store_8(&manager->shutdown, false); - + pthread_mutex_init(&manager->sleepMutex, NULL); pthread_cond_init(&manager->timeout, NULL); - + // init background thread. if (pthread_create(&manager->background, NULL, timeoutManagerCallback, manager)) { pthread_mutex_destroy(&manager->sleepMutex); @@ -553,7 +546,7 @@ void destroySDispatcherTimeoutManager(SDispatcherTimeoutManager* manager) { shutdownSDispatcherTimeoutManager(manager); manager->dispatcher->timeoutManager = NULL; - + pthread_mutex_destroy(&manager->sleepMutex); pthread_cond_destroy(&manager->timeout); free(manager); @@ -576,3 +569,35 @@ bool isShutdownSDispatcherTimeoutManager(SDispatcherTimeoutManager* manager) { } return atomic_load_8(&manager->shutdown); } + +/** + * The proxy function to call `dispatcherExecute`. + * + * @param pMsg the schedule message. + */ +static void dispatcherExecuteProxy(struct SSchedMsg* pMsg) { + SBatchRequest* pRequest = pMsg->ahandle; + if (!pRequest) { + return; + } + + pMsg->ahandle = NULL; + dispatcherExecute(pRequest); + free(pRequest); +} + +void dispatcherAsyncExecute(SBatchRequest* pRequest) { + if (!pRequest) { + return; + } + + assert(pRequest->pRequests); + assert(pRequest->nRequests); + + SSchedMsg schedMsg = {0}; + schedMsg.fp = dispatcherExecuteProxy; + schedMsg.ahandle = (void*) pRequest; + schedMsg.thandle = (void*) 1; + schedMsg.msg = 0; + taosScheduleTask(tscQhandle, &schedMsg); +} -- GitLab