提交 30f63bea 编写于 作者: Z zhihaop

fix & feat: (fix) memory leaks on global dispatcher, (feat) config abwd using...

fix & feat: (fix) memory leaks on global dispatcher, (feat) config abwd using taos_option and taos.cfg
上级 b0e06019
......@@ -311,14 +311,11 @@ keepColumnName 1
# unit Hour. Latency of data migration
# keepTimeOffset 0
# enable taosc async batching insertion
# asyncBatchEnable 1
# enable thread local write batching.
# writeBatchThreadLocal 1
# enable thread local batch dispatcher
# asyncBatchThreadLocal 1
# taosc write batch size, maximum 4096, 0 means disabled write batching.
# writeBatchSize 96
# taosc async insertion batch size, maximum 4096
# asyncBatchSize 256
# taosc async batching timeout in milliseconds, maximum 2048
# asyncBatchTimeout 5
\ No newline at end of file
# taosc write batch timeout in milliseconds, maximum 2048
# writeBatchTimeout 10
\ No newline at end of file
......@@ -23,6 +23,7 @@ extern "C" {
#include "tthread.h"
// forward declaration.
typedef struct STscObj STscObj;
typedef struct SSqlObj SSqlObj;
typedef struct SDispatcherTimeoutManager SDispatcherTimeoutManager;
......@@ -33,6 +34,9 @@ typedef struct SDispatcherTimeoutManager SDispatcherTimeoutManager;
* communications to the server and directly improving the throughput of small object asynchronous writes.
*/
typedef struct SAsyncBatchWriteDispatcher {
// the client object.
STscObj* pClient;
// the timeout manager.
SDispatcherTimeoutManager* timeoutManager;
......@@ -75,7 +79,10 @@ typedef struct SDispatcherManager {
bool isThreadLocal;
// the global dispatcher, if thread local enabled, global will be set to NULL.
SAsyncBatchWriteDispatcher* global;
SAsyncBatchWriteDispatcher* pGlobal;
// the client object.
STscObj* pClient;
} SDispatcherManager;
......@@ -146,12 +153,13 @@ void dispatcherExecute(SSqlObj** polls, size_t nPolls);
/**
* Create the async batch write dispatcher.
*
* @param pClient the client object.
* @param batchSize When user submit an insert sql to `taos_query_a`, the SSqlObj* will be buffered instead of executing
* it. If the number of the buffered rows reach `batchSize`, all the SSqlObj* will be merged and sent to vnodes.
* @param timeout The SSqlObj* will be sent to vnodes no more than `timeout` milliseconds. But the actual time
* vnodes received the SSqlObj* depends on the network quality.
*/
SAsyncBatchWriteDispatcher* createSAsyncBatchWriteDispatcher(int32_t batchSize, int32_t timeoutMs);
SAsyncBatchWriteDispatcher* createSAsyncBatchWriteDispatcher(STscObj* pClient, int32_t batchSize, int32_t timeoutMs);
/**
* Destroy the async auto batch dispatcher.
......@@ -182,13 +190,14 @@ bool dispatcherTryDispatch(SAsyncBatchWriteDispatcher* dispatcher, SSqlObj* pSql
/**
* Create the manager of SAsyncBatchWriteDispatcher.
*
*
* @param pClient the client object.
* @param batchSize the batchSize of SAsyncBatchWriteDispatcher.
* @param timeoutMs the timeoutMs of SAsyncBatchWriteDispatcher.
* @param isThreadLocal specifies whether the dispatcher is thread local.
* @return the SAsyncBatchWriteDispatcher manager.
*/
SDispatcherManager* createDispatcherManager(int32_t batchSize, int32_t timeoutMs, bool isThreadLocal);
SDispatcherManager* createDispatcherManager(STscObj* pClient, int32_t batchSize, int32_t timeoutMs, bool isThreadLocal);
/**
* Destroy the SDispatcherManager.
......
......@@ -45,6 +45,7 @@ typedef enum {
// forward declaration
struct SSqlInfo;
typedef struct SDispatcherManager SDispatcherManager;
typedef void (*__async_cb_func_t)(void *param, TAOS_RES *tres, int32_t numOfRows);
typedef void (*_freeSqlSupporter)(void **);
......@@ -352,7 +353,8 @@ typedef struct STscObj {
SRpcCorEpSet *tscCorMgmtEpSet;
pthread_mutex_t mutex;
int32_t numOfObj; // number of sqlObj from this tscObj
SDispatcherManager*dispatcherManager;
SReqOrigin from;
} STscObj;
......@@ -510,20 +512,6 @@ int32_t taos_unused_session(TAOS* taos);
void waitForQueryRsp(void *param, TAOS_RES *tres, int code);
/**
* Init the manager of async batch write dispatcher.
*
* @param batchSize the batchSize of async batch write dispatcher.
* @param timeoutMs the timeout of batching in milliseconds.
* @param isThreadLocal specifies whether the dispatcher is thread local.
*/
void tscInitDispatcherManager(int32_t batchSize, int32_t timeoutMs, bool isThreadLocal);
/**
* Destroy the manager of async batch write dispatcher.
*/
void tscDestroyDispatcherManager();
void doAsyncQuery(STscObj *pObj, SSqlObj *pSql, __async_cb_func_t fp, void *param, const char *sqlstr, size_t sqlLen);
void tscImportDataFromFile(SSqlObj *pSql);
......@@ -552,9 +540,6 @@ extern SHashObj *tscVgroupMap;
extern SHashObj *tscTableMetaMap;
extern SCacheObj *tscVgroupListBuf;
// forward declaration.
typedef struct SDispatcherManager SDispatcherManager;
extern SDispatcherManager *tscDispatcherManager;
extern int tscObjRef;
extern void *tscTmr;
extern void *tscQhandle;
......
......@@ -395,8 +395,8 @@ void doAsyncQuery(STscObj* pObj, SSqlObj* pSql, __async_cb_func_t fp, void* para
return;
}
if (tscDispatcherManager != NULL) {
SAsyncBatchWriteDispatcher * dispatcher = dispatcherAcquire(tscDispatcherManager);
if (pObj->dispatcherManager != NULL) {
SAsyncBatchWriteDispatcher * dispatcher = dispatcherAcquire(pObj->dispatcherManager);
if (dispatcherTryDispatch(dispatcher, pSql)) {
taosReleaseRef(tscObjRef, pSql->self);
tscDebug("sql obj %p has been buffer in insert buffer", pSql);
......
......@@ -116,6 +116,7 @@ int32_t dispatcherBatchBuilder(SSqlObj** polls, size_t nPolls, SSqlObj** batch)
}
tscDebug("create batch call back context: %p", context);
// initialize the callback context.
context->nHandlers = nPolls;
......@@ -328,12 +329,15 @@ static void* timeoutManagerCallback(void* arg) {
return NULL;
}
SAsyncBatchWriteDispatcher* createSAsyncBatchWriteDispatcher(int32_t batchSize, int32_t timeoutMs) {
SAsyncBatchWriteDispatcher* createSAsyncBatchWriteDispatcher(STscObj* pClient, int32_t batchSize, int32_t timeoutMs) {
SAsyncBatchWriteDispatcher* dispatcher = calloc(1, sizeof(SAsyncBatchWriteDispatcher) + batchSize * sizeof(SSqlObj*));
if (!dispatcher) {
return NULL;
}
assert(pClient != NULL);
dispatcher->pClient = pClient;
dispatcher->currentSize = 0;
dispatcher->bufferSize = 0;
dispatcher->batchSize = batchSize;
......@@ -459,12 +463,15 @@ static void destroyDispatcher(void* arg) {
destroySAsyncBatchWriteDispatcher(dispatcher);
}
SDispatcherManager* createDispatcherManager(int32_t batchSize, int32_t timeoutMs, bool isThreadLocal) {
SDispatcherManager* createDispatcherManager(STscObj* pClient, int32_t batchSize, int32_t timeoutMs, bool isThreadLocal) {
SDispatcherManager* dispatcher = calloc(1, sizeof(SDispatcherManager));
if (!dispatcher) {
return NULL;
}
assert(pClient != NULL);
dispatcher->pClient = pClient;
dispatcher->batchSize = batchSize;
dispatcher->timeoutMs = timeoutMs;
dispatcher->isThreadLocal = isThreadLocal;
......@@ -475,8 +482,8 @@ SDispatcherManager* createDispatcherManager(int32_t batchSize, int32_t timeoutMs
return NULL;
}
} else {
dispatcher->global = createSAsyncBatchWriteDispatcher(batchSize, timeoutMs);
if (!dispatcher->global) {
dispatcher->pGlobal = createSAsyncBatchWriteDispatcher(pClient, batchSize, timeoutMs);
if (!dispatcher->pGlobal) {
free(dispatcher);
return NULL;
}
......@@ -486,7 +493,7 @@ SDispatcherManager* createDispatcherManager(int32_t batchSize, int32_t timeoutMs
SAsyncBatchWriteDispatcher* dispatcherAcquire(SDispatcherManager* manager) {
if (!manager->isThreadLocal) {
return manager->global;
return manager->pGlobal;
}
SAsyncBatchWriteDispatcher* value = pthread_getspecific(manager->key);
......@@ -494,7 +501,7 @@ SAsyncBatchWriteDispatcher* dispatcherAcquire(SDispatcherManager* manager) {
return value;
}
value = createSAsyncBatchWriteDispatcher(manager->batchSize, manager->timeoutMs);
value = createSAsyncBatchWriteDispatcher(manager->pClient, manager->batchSize, manager->timeoutMs);
if (value) {
pthread_setspecific(manager->key, value);
return value;
......@@ -507,8 +514,10 @@ void destroyDispatcherManager(SDispatcherManager* manager) {
if (manager) {
if (manager->isThreadLocal) {
pthread_key_delete(manager->key);
} else {
destroySAsyncBatchWriteDispatcher(manager->global);
}
if (manager->pGlobal) {
destroySAsyncBatchWriteDispatcher(manager->pGlobal);
}
free(manager);
}
......
......@@ -21,6 +21,7 @@
#include "tnote.h"
#include "trpc.h"
#include "tscLog.h"
#include "tscBatchWrite.h"
#include "tscSubquery.h"
#include "tscUtil.h"
#include "tsclient.h"
......@@ -163,15 +164,30 @@ static SSqlObj *taosConnectImpl(const char *ip, const char *user, const char *pa
pSql->fp = fp;
pSql->param = param;
pSql->cmd.command = TSDB_SQL_CONNECT;
pObj->dispatcherManager = NULL;
if (tsWriteBatchSize > 1 && !tscEmbedded) {
pObj->dispatcherManager = createDispatcherManager(pObj, tsWriteBatchSize, tsWriteBatchTimeout, tsWriteBatchThreadLocal);
if (!pObj->dispatcherManager) {
terrno = TSDB_CODE_TSC_OUT_OF_MEMORY;
tscReleaseRpc(pRpcObj);
free(pSql);
free(pObj);
return NULL;
}
}
if (TSDB_CODE_SUCCESS != tscAllocPayload(&pSql->cmd, TSDB_DEFAULT_PAYLOAD_SIZE)) {
terrno = TSDB_CODE_TSC_OUT_OF_MEMORY;
if (pObj->dispatcherManager) {
destroyDispatcherManager(pObj->dispatcherManager);
}
tscReleaseRpc(pRpcObj);
free(pSql);
free(pObj);
return NULL;
}
if (taos != NULL) {
*taos = pObj;
}
......
......@@ -50,7 +50,6 @@ void *tscRpcCache; // cache to keep rpc obj
int32_t tscNumOfThreads = 1; // num of rpc threads
char tscLogFileName[] = "taoslog";
int tscLogFileNum = 10;
SDispatcherManager *tscDispatcherManager = NULL;
static pthread_mutex_t rpcObjMutex; // mutex to protect open the rpc obj concurrently
static pthread_once_t tscinit = PTHREAD_ONCE_INIT;
......@@ -59,19 +58,6 @@ static pthread_mutex_t setConfMutex = PTHREAD_MUTEX_INITIALIZER;
// pthread_once can not return result code, so result code is set to a global variable.
static volatile int tscInitRes = 0;
void tscInitDispatcherManager(int32_t batchSize, int32_t timeoutMs, bool isThreadLocal) {
if (tsAsyncBatchEnable) {
tscDispatcherManager = createDispatcherManager(batchSize, timeoutMs, isThreadLocal);
}
}
void tscDestroyDispatcherManager() {
if (tscDispatcherManager) {
destroyDispatcherManager(tscDispatcherManager);
tscDispatcherManager = NULL;
}
}
void tscCheckDiskUsage(void *UNUSED_PARAM(para), void *UNUSED_PARAM(param)) {
taosGetDisk();
taosTmrReset(tscCheckDiskUsage, 20 * 1000, NULL, tscTmr, &tscCheckDiskUsageTmr);
......@@ -230,8 +216,6 @@ void taos_init_imp(void) {
#endif
tscDebug("starting to initialize client ...");
tscDebug("Local End Point is:%s", tsLocalEp);
tscInitDispatcherManager(tsAsyncBatchSize, tsAsyncBatchTimeout, tsAsyncBatchThreadLocal);
}
taosSetCoreDump();
......@@ -296,9 +280,6 @@ void taos_cleanup(void) {
#ifdef LUA_EMBEDDED
scriptEnvPoolCleanup();
#endif
if (tsAsyncBatchEnable) {
tscDestroyDispatcherManager();
}
}
int32_t id = tscObjRef;
......@@ -339,6 +320,49 @@ void taos_cleanup(void) {
taosTmrCleanUp(p);
}
/**
* Set the option value (int32, uint16, int16, int8).
* @param cfg the config.
* @param str the value string.
* @return whether is set or not.
*/
static bool taos_set_option_int(SGlobalCfg * cfg, const char* str) {
if (cfg->cfgStatus <= TAOS_CFG_CSTATUS_OPTION) {
char* p = NULL;
errno = 0;
long value = strtol(str, &p, 10);
if (errno != 0 || p == str) {
tscError("failed to parse option: %s, value: %s", cfg->option, str);
return false;
}
if ((float) value < cfg->minValue || (float) value > cfg->maxValue) {
tscError("failed to set option: %s, setValue: %ld, minValue: %f, maxValue: %f", cfg->option, value, cfg->minValue, cfg->maxValue);
return false;
}
if (cfg->valType == TAOS_CFG_VTYPE_INT32) {
*((int32_t*) cfg->ptr) = (int32_t) value;
} else if (cfg->valType == TAOS_CFG_VTYPE_UINT16) {
*((uint16_t*) cfg->ptr) = (uint16_t) value;
} else if (cfg->valType == TAOS_CFG_VTYPE_INT16) {
*((int16_t*) cfg->ptr) = (int16_t) value;
} else if (cfg->valType == TAOS_CFG_VTYPE_INT8) {
*((int8_t*) cfg->ptr) = (int8_t) value;
} else {
tscError("failed to set option: %s, type expected %d", cfg->option, cfg->valType);
return false;
}
cfg->cfgStatus = TAOS_CFG_CSTATUS_OPTION;
tscDebug("config option: %s has set to %s", cfg->option, str);
return true;
}
tscWarn("config option: %s, is configured by %s", cfg->option, tsCfgStatusStr[cfg->cfgStatus]);
return false;
}
static int taos_options_imp(TSDB_OPTION option, const char *pStr) {
SGlobalCfg *cfg = NULL;
......@@ -493,6 +517,27 @@ static int taos_options_imp(TSDB_OPTION option, const char *pStr) {
tsCfgStatusStr[cfg->cfgStatus], (char *)cfg->ptr);
}
break;
case TSDB_WRITE_BATCH_SIZE: {
cfg = taosGetConfigOption("writeBatchSize");
assert(cfg != NULL);
taos_set_option_int(cfg, pStr);
break;
}
case TSDB_WRITE_BATCH_TIMEOUT: {
cfg = taosGetConfigOption("writeBatchTimeout");
assert(cfg != NULL);
taos_set_option_int(cfg, pStr);
break;
}
case TSDB_WRITE_BATCH_THREAD_LOCAL: {
cfg = taosGetConfigOption("writeBatchThreadLocal");
assert(cfg != NULL);
taos_set_option_int(cfg, pStr);
break;
}
default:
// TODO return the correct error code to client in the format for taos_errstr()
......
......@@ -20,6 +20,7 @@
#include "texpr.h"
#include "tkey.h"
#include "tmd5.h"
#include "tscBatchWrite.h"
#include "tscGlobalmerge.h"
#include "tscLog.h"
#include "tscProfile.h"
......@@ -2350,6 +2351,9 @@ void tscCloseTscObj(void *param) {
tscReleaseRpc(pObj->pRpcObj);
pthread_mutex_destroy(&pObj->mutex);
tscReleaseClusterInfo(pObj->clusterId);
destroyDispatcherManager(pObj->dispatcherManager);
pObj->dispatcherManager = NULL;
tfree(pObj);
}
......
......@@ -92,10 +92,9 @@ extern int32_t tsRetryStreamCompDelay;
extern float tsStreamComputDelayRatio; // the delayed computing ration of the whole time window
extern int32_t tsProjectExecInterval;
extern int64_t tsMaxRetentWindow;
extern bool tsAsyncBatchEnable;
extern bool tsAsyncBatchThreadLocal;
extern int32_t tsAsyncBatchSize;
extern int32_t tsAsyncBatchTimeout;
extern bool tsWriteBatchThreadLocal;
extern int32_t tsWriteBatchSize;
extern int32_t tsWriteBatchTimeout;
// db parameters in client
extern int32_t tsCacheBlockSize;
......
......@@ -127,11 +127,10 @@ int8_t tsSortWhenGroupBy = 1;
int32_t tsProjectExecInterval = 10000; // every 10sec, the projection will be executed once
int64_t tsMaxRetentWindow = 24 * 3600L; // maximum time window tolerance
// The taosc async insertion batching feature.
bool tsAsyncBatchEnable = true;
bool tsAsyncBatchThreadLocal = true; // if thread local enable, each thread will allocate a dispatcher.
int32_t tsAsyncBatchSize = 96; // suggest: 64 - 512
int32_t tsAsyncBatchTimeout = 10; // suggest: 5 - 200 (unit: milliseconds)
// The tsc async write batching feature (using ABWD).
bool tsWriteBatchThreadLocal = true; // if thread local enable, each thread will allocate a dispatcher.
int32_t tsWriteBatchSize = 96; // suggest: 64 - 512, 0 means disable batching.
int32_t tsWriteBatchTimeout = 10; // suggest: 5 - 200 (unit: milliseconds)
// the maximum allowed query buffer size during query processing for each data node.
// -1 no limit (default)
......@@ -1855,41 +1854,31 @@ static void doInitGlobalConfig(void) {
cfg.ptrLength = 0;
cfg.unitType = TAOS_CFG_UTYPE_NONE;
taosInitConfigOption(cfg);
cfg.option = "asyncBatchEnable";
cfg.ptr = &tsAsyncBatchEnable;
cfg.valType = TAOS_CFG_VTYPE_INT8;
cfg.cfgType = TSDB_CFG_CTYPE_B_CONFIG;
cfg.minValue = 0;
cfg.maxValue = 1;
cfg.ptrLength = 0;
cfg.unitType = TAOS_CFG_UTYPE_NONE;
taosInitConfigOption(cfg);
cfg.option = "asyncBatchThreadLocal";
cfg.ptr = &tsAsyncBatchThreadLocal;
cfg.option = "writeBatchThreadLocal";
cfg.ptr = &tsWriteBatchThreadLocal;
cfg.valType = TAOS_CFG_VTYPE_INT8;
cfg.cfgType = TSDB_CFG_CTYPE_B_CONFIG;
cfg.cfgType = TSDB_CFG_CTYPE_B_CONFIG | TSDB_CFG_CTYPE_B_SHOW | TSDB_CFG_CTYPE_B_CLIENT;
cfg.minValue = 0;
cfg.maxValue = 1;
cfg.ptrLength = 0;
cfg.unitType = TAOS_CFG_UTYPE_NONE;
taosInitConfigOption(cfg);
cfg.option = "asyncBatchSize";
cfg.ptr = &tsAsyncBatchSize;
cfg.option = "writeBatchSize";
cfg.ptr = &tsWriteBatchSize;
cfg.valType = TAOS_CFG_VTYPE_INT32;
cfg.cfgType = TSDB_CFG_CTYPE_B_CONFIG;
cfg.minValue = 1;
cfg.cfgType = TSDB_CFG_CTYPE_B_CONFIG | TSDB_CFG_CTYPE_B_SHOW | TSDB_CFG_CTYPE_B_CLIENT;
cfg.minValue = 0;
cfg.maxValue = 4096;
cfg.ptrLength = 0;
cfg.unitType = TAOS_CFG_UTYPE_NONE;
taosInitConfigOption(cfg);
cfg.option = "asyncBatchTimeout";
cfg.ptr = &tsAsyncBatchTimeout;
cfg.option = "writeBatchTimeout";
cfg.ptr = &tsWriteBatchTimeout;
cfg.valType = TAOS_CFG_VTYPE_INT32;
cfg.cfgType = TSDB_CFG_CTYPE_B_CONFIG;
cfg.cfgType = TSDB_CFG_CTYPE_B_CONFIG | TSDB_CFG_CTYPE_B_SHOW | TSDB_CFG_CTYPE_B_CLIENT;
cfg.minValue = 1;
cfg.maxValue = 2048;
cfg.ptrLength = 0;
......
......@@ -60,7 +60,10 @@ typedef enum {
TSDB_OPTION_TIMEZONE,
TSDB_OPTION_CONFIGDIR,
TSDB_OPTION_SHELL_ACTIVITY_TIMER,
TSDB_MAX_OPTIONS
TSDB_MAX_OPTIONS,
TSDB_WRITE_BATCH_SIZE,
TSDB_WRITE_BATCH_TIMEOUT,
TSDB_WRITE_BATCH_THREAD_LOCAL
} TSDB_OPTION;
typedef struct taosField {
......
......@@ -20,7 +20,7 @@
extern "C" {
#endif
#define TSDB_CFG_MAX_NUM 140
#define TSDB_CFG_MAX_NUM 139
#define TSDB_CFG_PRINT_LEN 23
#define TSDB_CFG_OPTION_LEN 24
#define TSDB_CFG_VALUE_LEN 41
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册