提交 0d54c5a3 编写于 作者: Z zhihaop

feat: sql objects merging and sending are asynchronous

上级 e2a029ca
......@@ -36,7 +36,7 @@ typedef struct SDispatcherTimeoutManager SDispatcherTimeoutManager;
typedef struct SAsyncBatchWriteDispatcher {
// the client object.
STscObj* pClient;
// the timeout manager.
SDispatcherTimeoutManager* timeoutManager;
......@@ -80,7 +80,7 @@ typedef struct SDispatcherManager {
// the global dispatcher, if thread local enabled, global will be set to NULL.
SAsyncBatchWriteDispatcher* pGlobal;
// the client object.
STscObj* pClient;
......@@ -109,6 +109,14 @@ typedef struct SDispatcherTimeoutManager {
volatile bool shutdown;
} SDispatcherTimeoutManager;
/**
* A batch that polls from SAsyncBatchWriteDispatcher::buffer.
*/
typedef struct SBatchRequest {
size_t nRequests;
SSqlObj* pRequests[];
} SBatchRequest;
/**
* Create the dispatcher timeout manager.
*/
......@@ -135,20 +143,25 @@ void shutdownSDispatcherTimeoutManager(SDispatcherTimeoutManager* manager);
/**
* Merge SSqlObjs into single SSqlObj.
*
* @param polls the array of SSqlObj*.
* @param nPolls the number of SSqlObj* in the array.
* @param batch the merged SSqlObj*.
* @return the merged SSqlObj.
* @param pRequest the batch request.
* @param batch the batch SSqlObj*.
* @return the status code.
*/
int32_t dispatcherBatchBuilder(SSqlObj** polls, size_t nPolls, SSqlObj** batch);
int32_t dispatcherBatchBuilder(SBatchRequest* pRequest, SSqlObj** batch);
/**
* Merge the sql statements and execute the merged sql statement asynchronously.
*
* @param pRequest the batch request. the request will be promised to free after calling this function.
*/
void dispatcherAsyncExecute(SBatchRequest* pRequest);
/**
* Merge the sql statements and execute the merged sql statement.
*
* @param polls the array of SSqlObj*.
* @param nPolls the number of SSqlObj* in the array.
* @param pRequest the batch request. you must call free(pRequest) after calling this function.
*/
void dispatcherExecute(SSqlObj** polls, size_t nPolls);
void dispatcherExecute(SBatchRequest* pRequest);
/**
* Create the async batch write dispatcher.
......@@ -190,7 +203,7 @@ bool dispatcherTryDispatch(SAsyncBatchWriteDispatcher* dispatcher, SSqlObj* pSql
/**
* Create the manager of SAsyncBatchWriteDispatcher.
*
*
* @param pClient the client object.
* @param batchSize the batchSize of SAsyncBatchWriteDispatcher.
* @param timeoutMs the timeoutMs of SAsyncBatchWriteDispatcher.
......
......@@ -17,9 +17,9 @@
#include "tscBatchMerge.h"
#include "tscBatchWrite.h"
#include "tscLog.h"
#include "tscSubquery.h"
#include "tsclient.h"
#include "tscLog.h"
/**
* Represents the callback function and its context.
......@@ -57,9 +57,7 @@ inline static void tscReturnsError(SSqlObj* pSql, int code) {
}
pSql->res.code = code;
if (pSql->fp) {
pSql->fp(pSql->param, pSql, code);
}
tscAsyncResultOnError(pSql);
}
/**
......@@ -70,8 +68,8 @@ inline static void tscReturnsError(SSqlObj* pSql, int code) {
* @param code the error code.
*/
static void batchResultCallback(void* param, TAOS_RES* tres, int32_t code) {
SBatchCallbackContext* context = param;
SSqlObj* res = tres;
SBatchCallbackContext* context = param;
SSqlObj* res = tres;
// handle corner case [context == null].
if (context == NULL) {
......@@ -104,47 +102,47 @@ static void batchResultCallback(void* param, TAOS_RES* tres, int32_t code) {
free(context);
}
int32_t dispatcherBatchBuilder(SSqlObj** polls, size_t nPolls, SSqlObj** batch) {
if (!polls || !nPolls) {
return TSDB_CODE_SUCCESS;
}
int32_t dispatcherBatchBuilder(SBatchRequest* pRequest, SSqlObj** batch) {
assert(pRequest);
assert(pRequest->pRequests);
assert(pRequest->nRequests);
// create the callback context.
SBatchCallbackContext* context = calloc(1, sizeof(SBatchCallbackContext) + nPolls * sizeof(SCallbackHandler));
SBatchCallbackContext* context =
calloc(1, sizeof(SBatchCallbackContext) + pRequest->nRequests * sizeof(SCallbackHandler));
if (context == NULL) {
return TSDB_CODE_TSC_OUT_OF_MEMORY;
}
tscDebug("create batch call back context: %p", context);
// initialize the callback context.
context->nHandlers = nPolls;
for (size_t i = 0; i < nPolls; ++i) {
SSqlObj* pSql = polls[i];
context->nHandlers = pRequest->nRequests;
for (size_t i = 0; i < pRequest->nRequests; ++i) {
SSqlObj* pSql = pRequest->pRequests[i];
context->handler[i].fp = pSql->fp;
context->handler[i].param = pSql->param;
}
// merge the statements into single one.
tscDebug("start to merge %zu sql objs", nPolls);
SSqlObj *pFirst = polls[0];
int32_t code = tscMergeSSqlObjs(polls, nPolls, pFirst);
tscDebug("start to merge %zu sql objs", pRequest->nRequests);
SSqlObj* pFirst = pRequest->pRequests[0];
int32_t code = tscMergeSSqlObjs(pRequest->pRequests, pRequest->nRequests, pFirst);
if (code != TSDB_CODE_SUCCESS) {
const char* msg = tstrerror(code);
tscDebug("failed to merge sql objects: %s", msg);
free(context);
taosReleaseRef(tscObjRef, pFirst->self);
return code;
}
pFirst->fp = batchResultCallback;
pFirst->param = context;
pFirst->fetchFp = pFirst->fp;
taosAcquireRef(tscObjRef, pFirst->self);
*batch = pFirst;
for (int i = 1; i < nPolls; ++i) {
SSqlObj *pSql = polls[i];
for (int i = 0; i < pRequest->nRequests; ++i) {
SSqlObj* pSql = pRequest->pRequests[i];
taosReleaseRef(tscObjRef, pSql->self);
}
return code;
......@@ -158,40 +156,37 @@ int32_t dispatcherBatchBuilder(SSqlObj** polls, size_t nPolls, SSqlObj** batch)
* @param nPolls the number of polled SSqlObj*.
* @return all the SSqlObj* in the buffer.
*/
inline static SSqlObj** dispatcherPollAll(SAsyncBatchWriteDispatcher* dispatcher, size_t* nPolls) {
inline static SBatchRequest* dispatcherPollAll(SAsyncBatchWriteDispatcher* dispatcher) {
if (!dispatcher->bufferSize) {
*nPolls = 0;
return NULL;
}
SSqlObj** clone = malloc(sizeof(SSqlObj*) * dispatcher->bufferSize);
if (clone == NULL) {
SBatchRequest* pRequest = malloc(sizeof(SBatchRequest) + sizeof(SSqlObj*) * dispatcher->bufferSize);
if (pRequest == NULL) {
tscError("failed to poll all items: out of memory");
*nPolls = 0;
return NULL;
}
memcpy(clone, dispatcher->buffer, sizeof(SSqlObj*) * dispatcher->bufferSize);
*nPolls = dispatcher->bufferSize;
memcpy(pRequest->pRequests, dispatcher->buffer, sizeof(SSqlObj*) * dispatcher->bufferSize);
pRequest->nRequests = dispatcher->bufferSize;
dispatcher->currentSize = 0;
dispatcher->bufferSize = 0;
return clone;
return pRequest;
}
/**
* Poll all the SSqlObj* in the dispatcher's buffer.
*
* @param dispatcher the dispatcher.
* @param nPolls the number of polled SSqlObj*.
* @return all the SSqlObj* in the buffer.
*/
inline static SSqlObj** dispatcherLockPollAll(SAsyncBatchWriteDispatcher* dispatcher, size_t* nPolls) {
SSqlObj** polls = NULL;
inline static SBatchRequest* dispatcherLockPollAll(SAsyncBatchWriteDispatcher* dispatcher) {
SBatchRequest* pRequest = NULL;
pthread_mutex_lock(&dispatcher->bufferMutex);
polls = dispatcherPollAll(dispatcher, nPolls);
pRequest = dispatcherPollAll(dispatcher);
pthread_cond_broadcast(&dispatcher->notFull);
pthread_mutex_unlock(&dispatcher->bufferMutex);
return polls;
return pRequest;
}
/**
......@@ -203,14 +198,14 @@ inline static SSqlObj** dispatcherLockPollAll(SAsyncBatchWriteDispatcher* dispat
*/
inline static bool dispatcherTryOffer(SAsyncBatchWriteDispatcher* dispatcher, SSqlObj* pSql) {
pthread_mutex_lock(&dispatcher->bufferMutex);
// if dispatcher is shutdown, must fail back to normal insertion.
// usually not happen, unless taos_query_a(...) after taos_close(...).
if (atomic_load_8(&dispatcher->shutdown)) {
pthread_mutex_unlock(&dispatcher->bufferMutex);
return false;
}
// the buffer is full.
while (dispatcher->currentSize >= dispatcher->batchSize) {
if (pthread_cond_wait(&dispatcher->notFull, &dispatcher->bufferMutex)) {
......@@ -222,48 +217,49 @@ inline static bool dispatcherTryOffer(SAsyncBatchWriteDispatcher* dispatcher, SS
dispatcher->buffer[dispatcher->bufferSize++] = pSql;
dispatcher->currentSize += statementGetInsertionRows(pSql);
tscDebug("sql obj %p has been write to insert buffer", pSql);
if (dispatcher->currentSize < dispatcher->batchSize) {
pthread_mutex_unlock(&dispatcher->bufferMutex);
return true;
}
// the dispatcher reaches batch size.
size_t nPolls = 0;
SSqlObj** polls = dispatcherPollAll(dispatcher, &nPolls);
SBatchRequest* pRequest = dispatcherPollAll(dispatcher);
pthread_cond_broadcast(&dispatcher->notFull);
pthread_mutex_unlock(&dispatcher->bufferMutex);
if (polls) {
dispatcherExecute(polls, nPolls);
free(polls);
if (pRequest) {
dispatcherAsyncExecute(pRequest);
}
return true;
}
void dispatcherExecute(SSqlObj** polls, size_t nPolls) {
void dispatcherExecute(SBatchRequest* pRequest) {
int32_t code = TSDB_CODE_SUCCESS;
// no item in the buffer (items has been taken by other threads).
if (!polls || !nPolls) {
if (!pRequest) {
return;
}
assert(pRequest->pRequests);
assert(pRequest->nRequests);
// merge the statements into single one.
SSqlObj* merged = NULL;
code = dispatcherBatchBuilder(polls, nPolls, &merged);
SSqlObj* pSql = NULL;
code = dispatcherBatchBuilder(pRequest, &pSql);
if (code != TSDB_CODE_SUCCESS) {
goto _error;
}
tscDebug("merging %zu sql objs into %p", nPolls, merged);
tscHandleMultivnodeInsert(merged);
tscDebug("merging %zu sql objs into %p", pRequest->nRequests, pSql);
tscHandleMultivnodeInsert(pSql);
return;
_error:
tscError("send async batch sql obj failed, reason: %s", tstrerror(code));
// handling the failures.
for (size_t i = 0; i < nPolls; ++i) {
SSqlObj* item = polls[i];
for (size_t i = 0; i < pRequest->nRequests; ++i) {
SSqlObj* item = pRequest->pRequests[i];
tscReturnsError(item, code);
}
}
......@@ -275,7 +271,7 @@ _error:
* @param millis the duration in milliseconds.
* @return the timespec after `millis` ms.
*/
static inline void afterMillis(struct timespec *t, int32_t millis) {
static inline void afterMillis(struct timespec* t, int32_t millis) {
t->tv_nsec += millis * 1000000L;
t->tv_sec += t->tv_nsec / 1000000000L;
t->tv_nsec %= 1000000000L;
......@@ -283,7 +279,7 @@ static inline void afterMillis(struct timespec *t, int32_t millis) {
/**
* Sleep until `timeout` timespec. When dispatcherShutdown(...) called, the function will return immediately.
*
*
* @param dispatcher the dispatcher thread to sleep.
* @param timeout the timeout in CLOCK_REALTIME.
*/
......@@ -313,15 +309,12 @@ static void* timeoutManagerCallback(void* arg) {
struct timespec timeout;
clock_gettime(CLOCK_REALTIME, &timeout);
afterMillis(&timeout, manager->timeoutMs);
size_t nPolls = 0;
SSqlObj** polls = dispatcherLockPollAll(manager->dispatcher, &nPolls);
if (polls) {
dispatcherExecute(polls, nPolls);
free(polls);
SBatchRequest* pRequest = dispatcherLockPollAll(manager->dispatcher);
if (pRequest) {
dispatcherAsyncExecute(pRequest);
}
// Similar to scheduleAtFixedRate in Java, if the execution time exceed
// `timeoutMs` milliseconds, then there will be no sleep.
timeoutManagerSleepUntil(manager, &timeout);
......@@ -336,13 +329,13 @@ SAsyncBatchWriteDispatcher* createSAsyncBatchWriteDispatcher(STscObj* pClient, i
}
assert(pClient != NULL);
dispatcher->pClient = pClient;
dispatcher->currentSize = 0;
dispatcher->bufferSize = 0;
dispatcher->batchSize = batchSize;
atomic_store_8(&dispatcher->shutdown, false);
// init the mutex and the cond.
pthread_mutex_init(&dispatcher->bufferMutex, NULL);
pthread_cond_init(&dispatcher->notFull, NULL);
......@@ -361,7 +354,7 @@ SAsyncBatchWriteDispatcher* createSAsyncBatchWriteDispatcher(STscObj* pClient, i
/**
* Shutdown the dispatcher and join the timeout thread.
*
*
* @param dispatcher the dispatcher.
*/
inline static void dispatcherShutdown(SAsyncBatchWriteDispatcher* dispatcher) {
......@@ -380,21 +373,20 @@ void destroySAsyncBatchWriteDispatcher(SAsyncBatchWriteDispatcher* dispatcher) {
// poll and send all the statements in the buffer.
while (true) {
size_t nPolls = 0;
SSqlObj** polls = dispatcherLockPollAll(dispatcher, &nPolls);
if (!polls) {
break ;
SBatchRequest* pRequest = dispatcherLockPollAll(dispatcher);
if (!pRequest) {
break;
}
dispatcherExecute(polls, nPolls);
free(polls);
dispatcherExecute(pRequest);
free(pRequest);
}
// destroy the timeout manager.
destroySDispatcherTimeoutManager(dispatcher->timeoutManager);
// destroy the mutex.
pthread_mutex_destroy(&dispatcher->bufferMutex);
pthread_cond_destroy(&dispatcher->notFull);
free(dispatcher);
}
......@@ -422,12 +414,12 @@ bool dispatcherCanDispatch(SAsyncBatchWriteDispatcher* dispatcher, SSqlObj* pSql
if (pInsertParam->payloadType != PAYLOAD_TYPE_KV) {
return false;
}
// no schema attached.
if (pInsertParam->schemaAttached) {
return false;
}
// too many insertion rows, fail back to normal insertion.
if (statementGetInsertionRows(pSql) >= dispatcher->batchSize) {
return false;
......@@ -463,14 +455,15 @@ static void destroyDispatcher(void* arg) {
destroySAsyncBatchWriteDispatcher(dispatcher);
}
SDispatcherManager* createDispatcherManager(STscObj* pClient, int32_t batchSize, int32_t timeoutMs, bool isThreadLocal) {
SDispatcherManager* createDispatcherManager(STscObj* pClient, int32_t batchSize, int32_t timeoutMs,
bool isThreadLocal) {
SDispatcherManager* dispatcher = calloc(1, sizeof(SDispatcherManager));
if (!dispatcher) {
return NULL;
}
assert(pClient != NULL);
dispatcher->pClient = pClient;
dispatcher->batchSize = batchSize;
dispatcher->timeoutMs = timeoutMs;
......@@ -515,7 +508,7 @@ void destroyDispatcherManager(SDispatcherManager* manager) {
if (manager->isThreadLocal) {
pthread_key_delete(manager->key);
}
if (manager->pGlobal) {
destroySAsyncBatchWriteDispatcher(manager->pGlobal);
}
......@@ -528,14 +521,14 @@ SDispatcherTimeoutManager* createSDispatcherTimeoutManager(SAsyncBatchWriteDispa
if (!manager) {
return NULL;
}
manager->timeoutMs = timeoutMs;
manager->dispatcher = dispatcher;
atomic_store_8(&manager->shutdown, false);
pthread_mutex_init(&manager->sleepMutex, NULL);
pthread_cond_init(&manager->timeout, NULL);
// init background thread.
if (pthread_create(&manager->background, NULL, timeoutManagerCallback, manager)) {
pthread_mutex_destroy(&manager->sleepMutex);
......@@ -553,7 +546,7 @@ void destroySDispatcherTimeoutManager(SDispatcherTimeoutManager* manager) {
shutdownSDispatcherTimeoutManager(manager);
manager->dispatcher->timeoutManager = NULL;
pthread_mutex_destroy(&manager->sleepMutex);
pthread_cond_destroy(&manager->timeout);
free(manager);
......@@ -576,3 +569,35 @@ bool isShutdownSDispatcherTimeoutManager(SDispatcherTimeoutManager* manager) {
}
return atomic_load_8(&manager->shutdown);
}
/**
* The proxy function to call `dispatcherExecute`.
*
* @param pMsg the schedule message.
*/
static void dispatcherExecuteProxy(struct SSchedMsg* pMsg) {
SBatchRequest* pRequest = pMsg->ahandle;
if (!pRequest) {
return;
}
pMsg->ahandle = NULL;
dispatcherExecute(pRequest);
free(pRequest);
}
void dispatcherAsyncExecute(SBatchRequest* pRequest) {
if (!pRequest) {
return;
}
assert(pRequest->pRequests);
assert(pRequest->nRequests);
SSchedMsg schedMsg = {0};
schedMsg.fp = dispatcherExecuteProxy;
schedMsg.ahandle = (void*) pRequest;
schedMsg.thandle = (void*) 1;
schedMsg.msg = 0;
taosScheduleTask(tscQhandle, &schedMsg);
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册