提交 010dea24 编写于 作者: Z zhihaop

feat: using thread local buffer to improve performance at scale

上级 f6de716d
......@@ -16,23 +16,29 @@
#ifndef TDENGINE_TSCBULKWRITE_H
#define TDENGINE_TSCBULKWRITE_H
#include <pthread.h>
#include <stdlib.h>
#ifdef __cplusplus
extern "C" {
#endif
#include "tlist.h"
#include "tarray.h"
#include "tlist.h"
#include "tthread.h"
/**
* SAsyncBulkWriteDispatcher is an async batching dispatcher(for writing), it can buffer insertion statements, batch
* and merge them into single statement.
*/
typedef struct SAsyncBulkWriteDispatcher {
// the mpmc queue to store the insertion statements. equivalent to SList<SSqlObj*>.
// the buffer to store the insertion statements. equivalent to SList<SSqlObj*>.
SList* buffer;
// the mutex to protect the buffer.
pthread_mutex_t mutex;
// the background thread to manage batching timeout.
pthread_t* background;
pthread_t background;
// the maximum number of insertion rows in a batch.
int32_t batchSize;
......@@ -55,6 +61,7 @@ typedef struct SAsyncBulkWriteDispatcher {
// forward declaration.
typedef struct SSqlObj SSqlObj;
/**
* Merge the statements into single SSqlObj.
*
......@@ -125,6 +132,44 @@ bool tscSupportBulkInsertion(SSqlObj* pSql);
*/
bool dispatcherTryBatching(SAsyncBulkWriteDispatcher* dispatcher, SSqlObj* pSql);
/**
* A thread local version of SAsyncBulkWriteDispatcher.
*/
typedef struct SThreadLocalDispatcher {
pthread_key_t key;
// the maximum number of insertion rows in a batch.
int32_t batchSize;
// the batching timeout in milliseconds.
int32_t timeoutMs;
} SThreadLocalDispatcher;
/**
* Create a thread local SAsyncBulkWriteDispatcher variable.
*
* @param batchSize the batchSize of SAsyncBulkWriteDispatcher.
* @param timeoutMs the timeoutMs of SAsyncBulkWriteDispatcher.
* @return the thread local SAsyncBulkWriteDispatcher.
*/
SThreadLocalDispatcher* createThreadLocalDispatcher(int32_t batchSize, int32_t timeoutMs);
/**
* Destroy the thread local SAsyncBulkWriteDispatcher variable.
* (will destroy all the instances of SAsyncBulkWriteDispatcher in the thread local variable)
*
* @param dispatcher the thread local SAsyncBulkWriteDispatcher variable.
*/
void destroyThreadLocalDispatcher(SThreadLocalDispatcher* dispatcher);
/**
* Get the thread local instance of SAsyncBulkWriteDispatcher.
* @param dispatcher the thread local SAsyncBulkWriteDispatcher variable.
* @return the thread local SAsyncBulkWriteDispatcher.
*/
SAsyncBulkWriteDispatcher* dispatcherThreadLocal(SThreadLocalDispatcher* dispatcher);
#ifdef __cplusplus
}
#endif
......
......@@ -546,8 +546,8 @@ extern SHashObj *tscTableMetaMap;
extern SCacheObj *tscVgroupListBuf;
// forward declaration.
typedef struct SAsyncBulkWriteDispatcher SAsyncBulkWriteDispatcher;
extern SAsyncBulkWriteDispatcher* tscDispatcher;
typedef struct SThreadLocalDispatcher SThreadLocalDispatcher;
extern SThreadLocalDispatcher *tscDispatcher;
extern int tscObjRef;
extern void *tscTmr;
extern void *tscQhandle;
......
......@@ -397,11 +397,14 @@ void doAsyncQuery(STscObj* pObj, SSqlObj* pSql, __async_cb_func_t fp, void* para
return;
}
if (tscDispatcher != NULL && dispatcherTryBatching(tscDispatcher, pSql)) {
if (tscDispatcher != NULL) {
SAsyncBulkWriteDispatcher* dispatcher = dispatcherThreadLocal(tscDispatcher);
if (dispatcherTryBatching(dispatcher, pSql)) {
taosReleaseRef(tscObjRef, pSql->self);
tscDebug("sql obj %p has been buffer in insert buffer", pSql);
return;
}
}
SQueryInfo *pQueryInfo = tscGetQueryInfo(pCmd);
executeQuery(pSql, pQueryInfo);
......
......@@ -151,8 +151,7 @@ SArray* dispatcherPollAll(SAsyncBulkWriteDispatcher* dispatcher) {
}
pthread_mutex_lock(&dispatcher->mutex);
SArray* statements = taosArrayInit(atomic_load_32(&dispatcher->bufferSize), sizeof(SSqlObj*));
SArray* statements = taosArrayInit(0, sizeof(SSqlObj*));
if (statements == NULL) {
pthread_mutex_unlock(&dispatcher->mutex);
tscError("failed to poll all items: out of memory");
......@@ -160,7 +159,7 @@ SArray* dispatcherPollAll(SAsyncBulkWriteDispatcher* dispatcher) {
}
// get all the sql statements from the buffer.
while (atomic_load_32(&dispatcher->bufferSize)) {
while (true) {
SListNode* node = tdListPopHead(dispatcher->buffer);
if (!node) {
break;
......@@ -172,7 +171,6 @@ SArray* dispatcherPollAll(SAsyncBulkWriteDispatcher* dispatcher) {
listNodeFree(node);
atomic_fetch_sub_32(&dispatcher->bufferSize, 1);
atomic_fetch_sub_32(&dispatcher->currentSize, statementGetInsertionRows(item));
taosArrayPush(statements, &item);
}
......@@ -285,8 +283,7 @@ SAsyncBulkWriteDispatcher* createAsyncBulkWriteDispatcher(int32_t batchSize, int
pthread_mutex_init(&dispatcher->mutex, NULL);
// init background thread.
dispatcher->background = taosCreateThread(dispatcherTimeoutCallback, dispatcher);
if (!dispatcher->background) {
if (pthread_create(&dispatcher->background, NULL, dispatcherTimeoutCallback, dispatcher)) {
tdListFree(dispatcher->buffer);
tfree(dispatcher);
return NULL;
......@@ -300,17 +297,18 @@ void destroyAsyncDispatcher(SAsyncBulkWriteDispatcher* dispatcher) {
return;
}
// mark shutdown.
atomic_store_8(&dispatcher->shutdown, true);
// make sure the timeout thread exit.
pthread_join(dispatcher->background, NULL);
// poll and send all the statements in the buffer.
while (atomic_load_32(&dispatcher->bufferSize)) {
SArray* statements = dispatcherPollAll(dispatcher);
dispatcherExecute(statements);
}
// make sure the thread exit.
taosDestroyThread(dispatcher->background);
// destroy the buffer.
tdListFree(dispatcher->buffer);
......@@ -375,3 +373,54 @@ bool dispatcherTryBatching(SAsyncBulkWriteDispatcher* dispatcher, SSqlObj* pSql)
}
return true;
}
/**
* Destroy the SAsyncBulkWriteDispatcher create by SThreadLocalDispatcher.
* @param arg
*/
static void destroyDispatcher(void* arg) {
SAsyncBulkWriteDispatcher* dispatcher = arg;
if (!dispatcher) {
return;
}
destroyAsyncDispatcher(dispatcher);
}
SThreadLocalDispatcher* createThreadLocalDispatcher(int32_t batchSize, int32_t timeoutMs) {
SThreadLocalDispatcher* dispatcher = calloc(1, sizeof(SThreadLocalDispatcher));
if (!dispatcher) {
return NULL;
}
dispatcher->batchSize = batchSize;
dispatcher->timeoutMs = timeoutMs;
if (pthread_key_create(&dispatcher->key, destroyDispatcher)) {
free(dispatcher);
return NULL;
}
return dispatcher;
}
SAsyncBulkWriteDispatcher* dispatcherThreadLocal(SThreadLocalDispatcher* dispatcher) {
SAsyncBulkWriteDispatcher* value = pthread_getspecific(dispatcher->key);
if (value) {
return value;
}
value = createAsyncBulkWriteDispatcher(dispatcher->batchSize, dispatcher->timeoutMs);
if (value) {
pthread_setspecific(dispatcher->key, value);
return value;
}
return NULL;
}
void destroyThreadLocalDispatcher(SThreadLocalDispatcher* dispatcher) {
if (dispatcher) {
pthread_key_delete(dispatcher->key);
free(dispatcher);
}
}
......@@ -50,7 +50,7 @@ void *tscRpcCache; // cache to keep rpc obj
int32_t tscNumOfThreads = 1; // num of rpc threads
char tscLogFileName[] = "taoslog";
int tscLogFileNum = 10;
SAsyncBulkWriteDispatcher* tscDispatcher = NULL;
SThreadLocalDispatcher * tscDispatcher = NULL;
static pthread_mutex_t rpcObjMutex; // mutex to protect open the rpc obj concurrently
static pthread_once_t tscinit = PTHREAD_ONCE_INIT;
......@@ -60,20 +60,20 @@ static pthread_mutex_t setConfMutex = PTHREAD_MUTEX_INITIALIZER;
static volatile int tscInitRes = 0;
/**
* Init the taosc async bulk write dispatcher.
* Init the thread local async bulk write dispatcher.
*
* @param batchSize the batchSize of async bulk write dispatcher.
* @param timeoutMs the timeout of batching in milliseconds.
*/
void tscInitAsyncDispatcher(int32_t batchSize, int32_t timeoutMs) {
tscDispatcher = createAsyncBulkWriteDispatcher(batchSize, timeoutMs);
tscDispatcher = createThreadLocalDispatcher(batchSize, timeoutMs);
}
/**
* Destroy the taosc async bulk write dispatcher.
* Destroy the thread local async bulk write dispatcher.
*/
void tscDestroyAsyncDispatcher() {
destroyAsyncDispatcher(tscDispatcher);
destroyThreadLocalDispatcher(tscDispatcher);
tscDispatcher = NULL;
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册