diff --git a/documentation20/webdocs/markdowndocs/architecture-ch.md b/documentation20/webdocs/markdowndocs/architecture-ch.md index d4705ccb05c092d8da38072368a167466bd78968..c9bfa30830fe7b2f3cd1364b589326255560ad83 100644 --- a/documentation20/webdocs/markdowndocs/architecture-ch.md +++ b/documentation20/webdocs/markdowndocs/architecture-ch.md @@ -4,16 +4,99 @@ ### 物联网典型场景 在典型的物联网、车联网、运维监测场景中,往往有多种不同类型的数据采集设备,采集一个到多个不同的物理量。而同一种采集设备类型,往往又有多个具体的采集设备分布在不同的地点。大数据处理系统就是要将各种采集的数据汇总,然后进行计算和分析。对于同一类设备,其采集的数据都是很规则的。以智能电表为例,假设每个智能电表采集电流、电压、相位三个量,其采集的数据类似如下的表格: -| Device ID | Time Stamp | current | voltage | phase | location | groupId | -| :-------: | :-----------: | :-----: | :-----: | :---: | :--------------: | :-----: | -| d1001 | 1538548685000 | 10.3 | 219 | 0.31 | Beijing.Chaoyang | 2 | -| d1002 | 1538548684000 | 10.2 | 220 | 0.23 | Beijing.Chaoyang | 3 | -| d1003 | 1538548686500 | 11.5 | 221 | 0.35 | Beijing.Haidian | 3 | -| d1004 | 1538548685500 | 13.4 | 223 | 0.29 | Beijing.Haidian | 2 | -| d1001 | 1538548695000 | 12.6 | 218 | 0.33 | Beijing.Chaoyang | 2 | -| d1004 | 1538548696600 | 11.8 | 221 | 0.28 | Beijing.Haidian | 2 | -| d1002 | 1538548696650 | 10.3 | 218 | 0.25 | Beijing.Chaoyang | 3 | -| d1001 | 1538548696800 | 12.3 | 221 | 0.31 | Beijing.Chaoyang | 2 | +
+ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + +
设备ID时间戳采集量标签
Device IDTime StampcurrentvoltagephaselocationgroupId
d1001153854868500010.32190.31Beijing.Chaoyang2
d1002153854868400010.22200.23Beijing.Chaoyang3
d1003153854868650011.52210.35Beijing.Haidian3
d1004153854868550013.42230.29Beijing.Haidian2
d1001153854869500012.62180.33Beijing.Chaoyang2
d1004153854869660011.82210.28Beijing.Haidian2
d1002153854869665010.32180.25Beijing.Chaoyang3
d1001153854869680012.32210.31Beijing.Chaoyang2
表1:智能电表数据示例
@@ -221,7 +304,7 @@ TDengine采用时间驱动缓存管理策略(First-In-First-Out,FIFO), TDengine通过查询函数向用户提供毫秒级的数据获取能力。直接将最近到达的数据保存在缓存中,可以更加快速地响应用户针对最近一条或一批数据的查询分析,整体上提供更快的数据库查询响应能力。从这个意义上来说,**可通过设置合适的配置参数将TDengine作为数据缓存来使用,而不需要再部署Redis或其他额外的缓存系统**,可有效地简化系统架构,降低运维的成本。需要注意的是,TDengine重启以后系统的缓存将被清空,之前缓存的数据均会被批量写入磁盘,缓存的数据将不会像专门的Key-value缓存系统再将之前缓存的数据重新加载到缓存中。 -每个vnode有自己独立的内存,而且由多个固定大小的内存块组成,不同vnode之间完全隔离。数据写入时,类似于日志的写法,数据被顺序追加写入内存,但每个vnode维护有自己的skip list,便于迅速查找。当一半以上的内存块写满时,启动落盘操作,而且后续写的操作在新的内存块进行。这样,一个vnode里有一半内存块是保留有最近的数据的,以达到缓存、快速查找的目的。一个vnode的内存块的个数由配置参数blocks决定,内存块的大小由配置参数cache决定。 +每个vnode有自己独立的内存,而且由多个固定大小的内存块组成,不同vnode之间完全隔离。数据写入时,类似于日志的写法,数据被顺序追加写入内存,但每个vnode维护有自己的skip list,便于迅速查找。当三分之一以上的内存块写满时,启动落盘操作,而且后续写的操作在新的内存块进行。这样,一个vnode里有三分之一内存块是保留有最近的数据的,以达到缓存、快速查找的目的。一个vnode的内存块的个数由配置参数blocks决定,内存块的大小由配置参数cache决定。 ### 持久化存储 TDengine采用数据驱动的方式让缓存中的数据写入硬盘进行持久化存储。当vnode中缓存的数据达到一定规模时,为了不阻塞后续数据的写入,TDengine也会拉起落盘线程将缓存的数据写入持久化存储。TDengine在数据落盘时会打开新的数据库日志文件,在落盘成功后则会删除老的数据库日志文件,避免日志文件无限制的增长。 diff --git a/src/balance/src/balance.c b/src/balance/src/balance.c index 4c687cb134141e38aef1cb9b17dcb45e89278e1b..df78f4fe270d34d17e436e80322f6c81b68d0d2c 100644 --- a/src/balance/src/balance.c +++ b/src/balance/src/balance.c @@ -571,8 +571,8 @@ static void balanceCheckDnodeAccess() { if (pDnode->status != TAOS_DN_STATUS_DROPPING && pDnode->status != TAOS_DN_STATUS_OFFLINE) { pDnode->status = TAOS_DN_STATUS_OFFLINE; pDnode->offlineReason = TAOS_DN_OFF_STATUS_MSG_TIMEOUT; - mInfo("dnode:%d, set to offline state, access seq:%d, last seq:%d", pDnode->dnodeId, tsAccessSquence, - pDnode->lastAccess); + mInfo("dnode:%d, set to offline state, access seq:%d last seq:%d laststat:%d", pDnode->dnodeId, tsAccessSquence, + pDnode->lastAccess, pDnode->status); balanceSetVgroupOffline(pDnode); } } diff --git a/src/client/src/tscUtil.c b/src/client/src/tscUtil.c index bb9725a74432ac2c06adc86513b62ee32b4b8125..c452b51050b856eea09ae4813b9671c68659d540 100644 --- a/src/client/src/tscUtil.c +++ b/src/client/src/tscUtil.c @@ -1238,8 +1238,7 @@ void tscColumnListDestroy(SArray* pColumnList) { * */ static int32_t validateQuoteToken(SStrToken* pToken) { - strdequote(pToken->z); - pToken->n = (uint32_t)strtrim(pToken->z); + tscDequoteAndTrimToken(pToken); int32_t k = tSQLGetToken(pToken->z, &pToken->type); @@ -1254,8 +1253,6 @@ static int32_t validateQuoteToken(SStrToken* pToken) { } void tscDequoteAndTrimToken(SStrToken* pToken) { - assert(pToken->type == TK_STRING); - uint32_t first = 0, last = pToken->n; // trim leading spaces @@ -1367,7 +1364,8 @@ int32_t tscValidateName(SStrToken* pToken) { } else { pStr[firstPartLen] = TS_PATH_DELIMITER[0]; memmove(&pStr[firstPartLen + 1], pToken->z, pToken->n); - pStr[firstPartLen + sizeof(TS_PATH_DELIMITER[0]) + pToken->n] = 0; + uint32_t offset = (uint32_t)(pToken->z - (pStr + firstPartLen + 1)); + memset(pToken->z + pToken->n - offset, ' ', offset); } pToken->n += (firstPartLen + sizeof(TS_PATH_DELIMITER[0])); pToken->z = pStr; diff --git a/src/common/inc/tglobal.h b/src/common/inc/tglobal.h index dbcf50ba77e09eb06d1baf3d48dc4ed2568c1fcf..4087f638a95b063fcf6081db2556758643ef1c9a 100644 --- a/src/common/inc/tglobal.h +++ b/src/common/inc/tglobal.h @@ -44,6 +44,7 @@ extern int32_t tsMaxShellConns; extern int32_t tsShellActivityTimer; extern uint32_t tsMaxTmrCtrl; extern float tsNumOfThreadsPerCore; +extern int32_t tsNumOfCommitThreads; extern float tsRatioOfQueryThreads; // todo remove it extern int8_t tsDaylight; extern char tsTimezone[]; diff --git a/src/common/src/tglobal.c b/src/common/src/tglobal.c index 3bc1e4d0cc1c4de3365a1324001704f53584ab3b..4495c3d9288a5df0b8944aa444625c143b5c7670 100644 --- a/src/common/src/tglobal.c +++ b/src/common/src/tglobal.c @@ -51,6 +51,7 @@ int32_t tsMaxShellConns = 5000; int32_t tsMaxConnections = 5000; int32_t tsShellActivityTimer = 3; // second float tsNumOfThreadsPerCore = 1.0f; +int32_t tsNumOfCommitThreads = 1; float tsRatioOfQueryThreads = 0.5f; int8_t tsDaylight = 0; char tsTimezone[TSDB_TIMEZONE_LEN] = {0}; @@ -426,6 +427,16 @@ static void doInitGlobalConfig(void) { cfg.unitType = TAOS_CFG_UTYPE_NONE; taosInitConfigOption(cfg); + cfg.option = "numOfCommitThreads"; + cfg.ptr = &tsNumOfCommitThreads; + cfg.valType = TAOS_CFG_VTYPE_INT32; + cfg.cfgType = TSDB_CFG_CTYPE_B_CONFIG; + cfg.minValue = 1; + cfg.maxValue = 100; + cfg.ptrLength = 0; + cfg.unitType = TAOS_CFG_UTYPE_NONE; + taosInitConfigOption(cfg); + cfg.option = "ratioOfQueryThreads"; cfg.ptr = &tsRatioOfQueryThreads; cfg.valType = TAOS_CFG_VTYPE_FLOAT; diff --git a/src/dnode/src/dnodeMInfos.c b/src/dnode/src/dnodeMInfos.c index c985db371d96be5d78d71a50f9602485157a84a4..cefe44aebe7f87803141ce3d75c45dca18463849 100644 --- a/src/dnode/src/dnodeMInfos.c +++ b/src/dnode/src/dnodeMInfos.c @@ -77,15 +77,15 @@ void dnodeUpdateMInfos(SMnodeInfos *minfos) { void dnodeUpdateEpSetForPeer(SRpcEpSet *ep) { if (ep->numOfEps <= 0) { - dError("mnode EP list for peer is changed, but content is invalid, discard it"); + dError("minfos is changed, but content is invalid, discard it"); return; } pthread_mutex_lock(&tsMInfosMutex); - dInfo("mnode EP list for peer is changed, numOfEps:%d inUse:%d", ep->numOfEps, ep->inUse); + dInfo("minfos is changed, numOfEps:%d inUse:%d", ep->numOfEps, ep->inUse); for (int i = 0; i < ep->numOfEps; ++i) { ep->port[i] -= TSDB_PORT_DNODEDNODE; - dInfo("mnode index:%d %s:%u", i, ep->fqdn[i], ep->port[i]); + dInfo("minfo:%d %s:%u", i, ep->fqdn[i], ep->port[i]); } tsMEpSet = *ep; pthread_mutex_unlock(&tsMInfosMutex); diff --git a/src/inc/taosdef.h b/src/inc/taosdef.h index fa8af2c67e5dff0cbc0a66a4fca3bfa9f84cbe26..875092b88da1aa03b7eb017e14fff3148f3644a8 100644 --- a/src/inc/taosdef.h +++ b/src/inc/taosdef.h @@ -431,6 +431,8 @@ void tsDataSwap(void *pLeft, void *pRight, int32_t type, int32_t size, void* buf #define TSDB_PORT_HTTP 11 #define TSDB_PORT_ARBITRATOR 12 +#define TSDB_MAX_WAL_SIZE (1024*1024) + typedef enum { TAOS_QTYPE_RPC = 0, TAOS_QTYPE_FWD = 1, diff --git a/src/inc/taoserror.h b/src/inc/taoserror.h index 01891234efeae1972a3884c57b99d204a335e7d5..1919747a0b019239e5d6b717921c6a957728d455 100644 --- a/src/inc/taoserror.h +++ b/src/inc/taoserror.h @@ -237,7 +237,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_QRY_NOT_READY, 0, 0x0707, "Query not TAOS_DEFINE_ERROR(TSDB_CODE_QRY_HAS_RSP, 0, 0x0708, "Query should response") TAOS_DEFINE_ERROR(TSDB_CODE_QRY_IN_EXEC, 0, 0x0709, "Multiple retrieval of this query") TAOS_DEFINE_ERROR(TSDB_CODE_QRY_TOO_MANY_TIMEWINDOW, 0, 0x070A, "Too many time window in query") -TAOS_DEFINE_ERROR(TSDB_CODE_QRY_NOT_ENOUGH_BUFFER, 0, 0x070B, "Query buffer limit has reached") +TAOS_DEFINE_ERROR(TSDB_CODE_QRY_NOT_ENOUGH_BUFFER, 0, 0x070B, "Query buffer limit has reached") // grant TAOS_DEFINE_ERROR(TSDB_CODE_GRANT_EXPIRED, 0, 0x0800, "License expired") @@ -261,6 +261,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_SYN_INVALID_VERSION, 0, 0x0902, "Invalid Sy // wal TAOS_DEFINE_ERROR(TSDB_CODE_WAL_APP_ERROR, 0, 0x1000, "Unexpected generic error in wal") TAOS_DEFINE_ERROR(TSDB_CODE_WAL_FILE_CORRUPTED, 0, 0x1001, "WAL file is corrupted") +TAOS_DEFINE_ERROR(TSDB_CODE_WAL_SIZE_LIMIT, 0, 0x1002, "WAL size exceeds limit") // http TAOS_DEFINE_ERROR(TSDB_CODE_HTTP_SERVER_OFFLINE, 0, 0x1100, "http server is not onlin") diff --git a/src/inc/tsdb.h b/src/inc/tsdb.h index 02ca99e4b85f5dfedb8b868803f2135c36eb01df..d7515a14956d7029d909241b2985023d33e58622 100644 --- a/src/inc/tsdb.h +++ b/src/inc/tsdb.h @@ -321,6 +321,10 @@ void tsdbCleanupQueryHandle(TsdbQueryHandleT queryHandle); */ void tsdbReportStat(void *repo, int64_t *totalPoints, int64_t *totalStorage, int64_t *compStorage); +int tsdbInitCommitQueue(int nthreads); +void tsdbDestroyCommitQueue(); +int tsdbSyncCommit(TSDB_REPO_T *repo); + #ifdef __cplusplus } #endif diff --git a/src/inc/twal.h b/src/inc/twal.h index c32bb870213fbcb183af599b0ac94b37f54e8906..8dd3a8a91209e840abeb9560f94a52ce362492a9 100644 --- a/src/inc/twal.h +++ b/src/inc/twal.h @@ -54,15 +54,17 @@ typedef int32_t FWalWrite(void *ahandle, void *pHead, int32_t qtype, void *pMsg) int32_t walInit(); void walCleanUp(); -twalh walOpen(char *path, SWalCfg *pCfg); -int32_t walAlter(twalh pWal, SWalCfg *pCfg); -void walStop(twalh); -void walClose(twalh); -int32_t walRenew(twalh); -int32_t walWrite(twalh, SWalHead *); -void walFsync(twalh, bool forceFsync); -int32_t walRestore(twalh, void *pVnode, FWalWrite writeFp); -int32_t walGetWalFile(twalh, char *fileName, int64_t *fileId); +twalh walOpen(char *path, SWalCfg *pCfg); +int32_t walAlter(twalh pWal, SWalCfg *pCfg); +void walStop(twalh); +void walClose(twalh); +int32_t walRenew(twalh); +void walRemoveOneOldFile(twalh); +void walRemoveAllOldFiles(twalh); +int32_t walWrite(twalh, SWalHead *); +void walFsync(twalh, bool forceFsync); +int32_t walRestore(twalh, void *pVnode, FWalWrite writeFp); +int32_t walGetWalFile(twalh, char *fileName, int64_t *fileId); uint64_t walGetVersion(twalh); #ifdef __cplusplus diff --git a/src/kit/taosdemo/taosdemo.c b/src/kit/taosdemo/taosdemo.c index 6178d91044d9613ba7c8ab5bb32f55754d231360..53e7d2398450fe22a11da1e7254e0c2ab5f02ea4 100644 --- a/src/kit/taosdemo/taosdemo.c +++ b/src/kit/taosdemo/taosdemo.c @@ -475,6 +475,7 @@ typedef struct { tsem_t mutex_sem; int notFinished; tsem_t lock_sem; + int counter; } info; typedef struct { @@ -766,6 +767,7 @@ int main(int argc, char *argv[]) { t_info->data_of_rate = rate; t_info->end_table_id = i < b ? last + a : last + a - 1; last = t_info->end_table_id + 1; + t_info->counter = 0; tsem_init(&(t_info->mutex_sem), 0, 1); t_info->notFinished = t_info->end_table_id - t_info->start_table_id + 1; @@ -788,14 +790,14 @@ int main(int argc, char *argv[]) { printf("ASYNC Insert with %d connections:\n", threads); } - fprintf(fp, "|%10.d | %10.2f | %10.2f | %10.4f |\n\n", - ntables * nrecords_per_table, ntables * nrecords_per_table / t, - (ntables * nrecords_per_table) / (t * nrecords_per_request), + fprintf(fp, "|%"PRIu64" | %10.2f | %10.2f | %10.4f |\n\n", + (int64_t)ntables * nrecords_per_table, ntables * nrecords_per_table / t, + ((int64_t)ntables * nrecords_per_table) / (t * nrecords_per_request), t * 1000); - printf("Spent %.4f seconds to insert %lld records with %d record(s) per request: %.2f records/second\n", - t, (long long int)ntables * nrecords_per_table, nrecords_per_request, - ((long long int)ntables * nrecords_per_table) / t); + printf("Spent %.4f seconds to insert %"PRIu64" records with %d record(s) per request: %.2f records/second\n", + t, (int64_t)ntables * nrecords_per_table, nrecords_per_request, + (int64_t)ntables * nrecords_per_table / t); for (int i = 0; i < threads; i++) { info *t_info = infos + i; @@ -1284,71 +1286,39 @@ void *syncWrite(void *sarg) { void *asyncWrite(void *sarg) { info *winfo = (info *)sarg; - - sTable *tb_infos = (sTable *)malloc(sizeof(sTable) * (winfo->end_table_id - winfo->start_table_id + 1)); - - for (int tID = winfo->start_table_id; tID <= winfo->end_table_id; tID++) { - sTable *tb_info = tb_infos + tID - winfo->start_table_id; - tb_info->data_type = winfo->datatype; - tb_info->ncols_per_record = winfo->ncols_per_record; - tb_info->taos = winfo->taos; - sprintf(tb_info->tb_name, "%s.%s%d", winfo->db_name, winfo->tb_prefix, tID); - tb_info->timestamp = winfo->start_time; - tb_info->counter = 0; - tb_info->target = winfo->nrecords_per_table; - tb_info->len_of_binary = winfo->len_of_binary; - tb_info->nrecords_per_request = winfo->nrecords_per_request; - tb_info->mutex_sem = &(winfo->mutex_sem); - tb_info->notFinished = &(winfo->notFinished); - tb_info->lock_sem = &(winfo->lock_sem); - tb_info->data_of_order = winfo->data_of_order; - tb_info->data_of_rate = winfo->data_of_rate; - - /* char buff[BUFFER_SIZE] = "\0"; */ - /* sprintf(buff, "insert into %s values (0, 0)", tb_info->tb_name); */ - /* queryDB(tb_info->taos,buff); */ - - taos_query_a(winfo->taos, "show databases", callBack, tb_info); - } + taos_query_a(winfo->taos, "show databases", callBack, winfo); tsem_wait(&(winfo->lock_sem)); - free(tb_infos); return NULL; } void callBack(void *param, TAOS_RES *res, int code) { - sTable *tb_info = (sTable *)param; - char **datatype = tb_info->data_type; - int ncols_per_record = tb_info->ncols_per_record; - int len_of_binary = tb_info->len_of_binary; - int64_t tmp_time = tb_info->timestamp; - - if (code < 0) { - fprintf(stderr, "failed to insert data %d:reason; %s\n", code, taos_errstr(res)); - taos_free_result(res); - taos_cleanup(); - exit(EXIT_FAILURE); - } + info* winfo = (info*)param; + char **datatype = winfo->datatype; + int ncols_per_record = winfo->ncols_per_record; + int len_of_binary = winfo->len_of_binary; - // If finished; - if (tb_info->counter >= tb_info->target) { - tsem_wait(tb_info->mutex_sem); - (*(tb_info->notFinished))--; - if (*(tb_info->notFinished) == 0) tsem_post(tb_info->lock_sem); - tsem_post(tb_info->mutex_sem); + int64_t tmp_time = winfo->start_time; + char *buffer = calloc(1, BUFFER_SIZE); + char *data = calloc(1, MAX_DATA_SIZE); + char *pstr = buffer; + pstr += sprintf(pstr, "insert into %s.%s%d values", winfo->db_name, winfo->tb_prefix, winfo->start_table_id); + if (winfo->counter >= winfo->nrecords_per_table) { + winfo->start_table_id++; + winfo->counter = 0; + } + if (winfo->start_table_id > winfo->end_table_id) { + tsem_post(&winfo->lock_sem); + free(buffer); + free(data); taos_free_result(res); return; } - - char buffer[BUFFER_SIZE] = "\0"; - char data[MAX_DATA_SIZE]; - char *pstr = buffer; - pstr += sprintf(pstr, "insert into %s values", tb_info->tb_name); - - for (int i = 0; i < tb_info->nrecords_per_request; i++) { + + for (int i = 0; i < winfo->nrecords_per_request; i++) { int rand_num = rand() % 100; - if (tb_info->data_of_order ==1 && rand_num < tb_info->data_of_rate) + if (winfo->data_of_order ==1 && rand_num < winfo->data_of_rate) { int64_t d = tmp_time - rand() % 1000000 + rand_num; generateData(data, datatype, ncols_per_record, d, len_of_binary); @@ -1357,16 +1327,15 @@ void callBack(void *param, TAOS_RES *res, int code) { generateData(data, datatype, ncols_per_record, tmp_time += 1000, len_of_binary); } pstr += sprintf(pstr, "%s", data); - tb_info->counter++; + winfo->counter++; - if (tb_info->counter >= tb_info->target) { + if (winfo->counter >= winfo->nrecords_per_table) { break; } } - - tb_info->timestamp = tmp_time; - - taos_query_a(tb_info->taos, buffer, callBack, tb_info); + taos_query_a(winfo->taos, buffer, callBack, winfo); + free(buffer); + free(data); taos_free_result(res); } diff --git a/src/mnode/src/mnodeDnode.c b/src/mnode/src/mnodeDnode.c index 37e00fc4e3201ee6198c6f160bb445cdcb8759f4..7e34e0837389e8a288ba70a2b18d4627f91219ae 100644 --- a/src/mnode/src/mnodeDnode.c +++ b/src/mnode/src/mnodeDnode.c @@ -584,7 +584,7 @@ static int32_t mnodeProcessDnodeStatusMsg(SMnodeMsg *pMsg) { return TSDB_CODE_MND_CLUSTER_CFG_INCONSISTENT; } - mDebug("dnode:%d, from offline to online", pDnode->dnodeId); + mInfo("dnode:%d, from offline to online", pDnode->dnodeId); pDnode->status = TAOS_DN_STATUS_READY; pDnode->offlineReason = TAOS_DN_OFF_ONLINE; balanceSyncNotify(); diff --git a/src/mnode/src/mnodePeer.c b/src/mnode/src/mnodePeer.c index 2a04f541c51483ccba93369a65061507f83ead23..e43f8c1b786e6126253cdb40c0184bc34dc4dab2 100644 --- a/src/mnode/src/mnodePeer.c +++ b/src/mnode/src/mnodePeer.c @@ -63,9 +63,9 @@ int32_t mnodeProcessPeerReq(SMnodeMsg *pMsg) { for (int32_t i = 0; i < epSet->numOfEps; ++i) { if (strcmp(epSet->fqdn[i], tsLocalFqdn) == 0 && htons(epSet->port[i]) == tsServerPort + TSDB_PORT_DNODEDNODE) { epSet->inUse = (i + 1) % epSet->numOfEps; - mDebug("mnode index:%d ep:%s:%u, set inUse to %d", i, epSet->fqdn[i], htons(epSet->port[i]), epSet->inUse); + mDebug("mpeer:%d ep:%s:%u, set inUse to %d", i, epSet->fqdn[i], htons(epSet->port[i]), epSet->inUse); } else { - mDebug("mnode index:%d ep:%s:%u", i, epSet->fqdn[i], htons(epSet->port[i])); + mDebug("mpeer:%d ep:%s:%u", i, epSet->fqdn[i], htons(epSet->port[i])); } } diff --git a/src/query/src/qExecutor.c b/src/query/src/qExecutor.c index 17e39f031e329a786db2bca7a27322887d7c8d43..98a4cf2c423dd7b997a3fb59718b93dfae9a3ebf 100644 --- a/src/query/src/qExecutor.c +++ b/src/query/src/qExecutor.c @@ -7233,7 +7233,7 @@ void qCleanupQueryMgmt(void* pQMgmt) { pthread_mutex_destroy(&pQueryMgmt->lock); tfree(pQueryMgmt); - qDebug("vgId:%d queryMgmt cleanup completed", vgId); + qDebug("vgId:%d, queryMgmt cleanup completed", vgId); } void** qRegisterQInfo(void* pMgmt, uint64_t qInfo) { diff --git a/src/sync/inc/syncInt.h b/src/sync/inc/syncInt.h index 240b401bdaef252e36109490658b39ede749a8fe..93c6efc20a8d79a7d6436680f01cee87dadda853 100644 --- a/src/sync/inc/syncInt.h +++ b/src/sync/inc/syncInt.h @@ -35,6 +35,8 @@ extern "C" { #define TAOS_SMSG_SYNC_MUST 6 #define TAOS_SMSG_STATUS 7 +#define SYNC_MAX_SIZE (TSDB_MAX_WAL_SIZE + sizeof(SWalHead) + sizeof(SSyncHead) + 16) + #define nodeRole pNode->peerInfo[pNode->selfIndex]->role #define nodeVersion pNode->peerInfo[pNode->selfIndex]->version #define nodeSStatus pNode->peerInfo[pNode->selfIndex]->sstatus diff --git a/src/sync/src/syncMain.c b/src/sync/src/syncMain.c index 9dcd0fd632396baf5f8bf68c3df24dae72d45a02..5c92801cc389df6745377d81b4110ccf8fbc8441 100644 --- a/src/sync/src/syncMain.c +++ b/src/sync/src/syncMain.c @@ -79,7 +79,7 @@ int32_t syncInit() { info.numOfThreads = tsSyncTcpThreads; info.serverIp = 0; info.port = tsSyncPort; - info.bufferSize = 640000; + info.bufferSize = SYNC_MAX_SIZE; info.processBrokenLink = syncProcessBrokenLink; info.processIncomingMsg = syncProcessPeerMsg; info.processIncomingConn = syncProcessIncommingConnection; @@ -486,7 +486,7 @@ static SSyncPeer *syncAddPeer(SSyncNode *pNode, const SNodeInfo *pInfo) { pPeer->ip = ip; pPeer->port = pInfo->nodePort; pPeer->fqdn[sizeof(pPeer->fqdn) - 1] = 0; - snprintf(pPeer->id, sizeof(pPeer->id), "vgId:%d peer:%s:%u", pNode->vgId, pPeer->fqdn, pPeer->port); + snprintf(pPeer->id, sizeof(pPeer->id), "vgId:%d, peer:%s:%u", pNode->vgId, pPeer->fqdn, pPeer->port); pPeer->peerFd = -1; pPeer->syncFd = -1; @@ -850,7 +850,7 @@ static void syncProcessForwardFromPeer(char *cont, SSyncPeer *pPeer) { SSyncNode *pNode = pPeer->pSyncNode; SWalHead * pHead = (SWalHead *)cont; - sDebug("%s, forward is received, ver:%" PRIu64, pPeer->id, pHead->version); + sDebug("%s, forward is received, hver:%" PRIu64 ", len:%d", pPeer->id, pHead->version, pHead->len); if (nodeRole == TAOS_SYNC_ROLE_SLAVE) { // nodeVersion = pHead->version; @@ -859,7 +859,7 @@ static void syncProcessForwardFromPeer(char *cont, SSyncPeer *pPeer) { if (nodeSStatus != TAOS_SYNC_STATUS_INIT) { syncSaveIntoBuffer(pPeer, pHead); } else { - sError("%s, forward discarded, ver:%" PRIu64, pPeer->id, pHead->version); + sError("%s, forward discarded, hver:%" PRIu64, pPeer->id, pHead->version); } } } @@ -890,10 +890,11 @@ static int32_t syncReadPeerMsg(SSyncPeer *pPeer, SSyncHead *pHead, char *cont) { // head.len = htonl(head.len); if (pHead->len < 0) { - sError("%s, invalid pkt length, len:%d", pPeer->id, pHead->len); + sError("%s, invalid pkt length, hlen:%d", pPeer->id, pHead->len); return -1; } + assert(pHead->len <= TSDB_MAX_WAL_SIZE); int32_t bytes = taosReadMsg(pPeer->peerFd, cont, pHead->len); if (bytes != pHead->len) { sError("%s, failed to read, bytes:%d len:%d", pPeer->id, bytes, pHead->len); diff --git a/src/sync/src/syncRetrieve.c b/src/sync/src/syncRetrieve.c index 82a4627ea55a9f0caf041b9ee41b4b3e50c229e4..968f5becaddc7b06c06171a4d91cc0e0ffffc0da 100644 --- a/src/sync/src/syncRetrieve.c +++ b/src/sync/src/syncRetrieve.c @@ -244,7 +244,7 @@ static int32_t syncCheckLastWalChanges(SSyncPeer *pPeer, uint32_t *pEvent) { } static int32_t syncRetrieveLastWal(SSyncPeer *pPeer, char *name, uint64_t fversion, int64_t offset, uint32_t *pEvent) { - SWalHead *pHead = malloc(640000); + SWalHead *pHead = malloc(SYNC_MAX_SIZE); int32_t code = -1; int32_t bytes = 0; int32_t sfd; diff --git a/src/sync/src/tarbitrator.c b/src/sync/src/tarbitrator.c index 496bf074357487ab1c87072948b2e26be056dc58..b7f819a3cd8f890d3580352322d8313f7cb2828e 100644 --- a/src/sync/src/tarbitrator.c +++ b/src/sync/src/tarbitrator.c @@ -86,7 +86,7 @@ int32_t main(int32_t argc, char *argv[]) { info.numOfThreads = 1; info.serverIp = 0; info.port = tsArbitratorPort; - info.bufferSize = 640000; + info.bufferSize = SYNC_MAX_SIZE; info.processBrokenLink = arbProcessBrokenLink; info.processIncomingMsg = arbProcessPeerMsg; info.processIncomingConn = arbProcessIncommingConnection; @@ -128,7 +128,7 @@ static void arbProcessIncommingConnection(int32_t connFd, uint32_t sourceIp) { } firstPkt.fqdn[sizeof(firstPkt.fqdn) - 1] = 0; - snprintf(pNode->id, sizeof(pNode->id), "vgId:%d peer:%s:%d", firstPkt.sourceId, firstPkt.fqdn, firstPkt.port); + snprintf(pNode->id, sizeof(pNode->id), "vgId:%d, peer:%s:%d", firstPkt.sourceId, firstPkt.fqdn, firstPkt.port); if (firstPkt.syncHead.vgId) { sDebug("%s, vgId in head is not zero, close the connection", pNode->id); tfree(pNode); diff --git a/src/tsdb/inc/tsdbMain.h b/src/tsdb/inc/tsdbMain.h index 7d40d7f00a02af2a38e4fe3ac4bcb52e31d42477..0962e7c2cbb7fa0576fe1f00495ebbcbd23959d3 100644 --- a/src/tsdb/inc/tsdbMain.h +++ b/src/tsdb/inc/tsdbMain.h @@ -220,8 +220,7 @@ typedef struct { SMemTable* mem; SMemTable* imem; STsdbFileH* tsdbFileH; - int commit; - pthread_t commitThread; + sem_t readyToCommit; pthread_mutex_t mutex; bool repoLocked; } STsdbRepo; @@ -440,6 +439,7 @@ void* tsdbAllocBytes(STsdbRepo* pRepo, int bytes); int tsdbAsyncCommit(STsdbRepo* pRepo); int tsdbLoadDataFromCache(STable* pTable, SSkipListIterator* pIter, TSKEY maxKey, int maxRowsToRead, SDataCols* pCols, TKEY* filterKeys, int nFilterKeys, bool keepDup, SMergeInfo* pMergeInfo); +void* tsdbCommitData(STsdbRepo* pRepo); static FORCE_INLINE SDataRow tsdbNextIterRow(SSkipListIterator* pIter) { if (pIter == NULL) return NULL; @@ -588,6 +588,9 @@ int tsdbScanSCompBlock(STsdbScanHandle* pScanHandle, int idx); int tsdbCloseScanFile(STsdbScanHandle* pScanHandle); void tsdbFreeScanHandle(STsdbScanHandle* pScanHandle); +// ------------------ tsdbCommitQueue.c +int tsdbScheduleCommit(STsdbRepo *pRepo); + #ifdef __cplusplus } #endif diff --git a/src/tsdb/src/tsdbBuffer.c b/src/tsdb/src/tsdbBuffer.c index dcc9d4ca1be724faba27a7479c340d5881f38432..7cea27658c80d689972e3cb0f5dda3269a34b720 100644 --- a/src/tsdb/src/tsdbBuffer.c +++ b/src/tsdb/src/tsdbBuffer.c @@ -110,7 +110,7 @@ void tsdbCloseBufPool(STsdbRepo *pRepo) { } } - tsdbDebug("vgId:%d buffer pool is closed", REPO_ID(pRepo)); + tsdbDebug("vgId:%d, buffer pool is closed", REPO_ID(pRepo)); } SListNode *tsdbAllocBufBlockFromPool(STsdbRepo *pRepo) { @@ -134,7 +134,7 @@ SListNode *tsdbAllocBufBlockFromPool(STsdbRepo *pRepo) { pBufBlock->offset = 0; pBufBlock->remain = pBufPool->bufBlockSize; - tsdbDebug("vgId:%d buffer block is allocated, blockId:%" PRId64, REPO_ID(pRepo), pBufBlock->blockId); + tsdbDebug("vgId:%d, buffer block is allocated, blockId:%" PRId64, REPO_ID(pRepo), pBufBlock->blockId); return pNode; } diff --git a/src/tsdb/src/tsdbCommitQueue.c b/src/tsdb/src/tsdbCommitQueue.c new file mode 100644 index 0000000000000000000000000000000000000000..3c158a2201c1641d57b8c0902b2b3a9c1c828c9e --- /dev/null +++ b/src/tsdb/src/tsdbCommitQueue.c @@ -0,0 +1,147 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#include "os.h" +#include "tlist.h" +#include "tsdbMain.h" + +typedef struct { + bool stop; + pthread_mutex_t lock; + pthread_cond_t queueNotEmpty; + int nthreads; + SList * queue; + pthread_t * threads; +} SCommitQueue; + +typedef struct { + STsdbRepo *pRepo; +} SCommitReq; + +static void *tsdbLoopCommit(void *arg); + +SCommitQueue tsCommitQueue = {0}; + +int tsdbInitCommitQueue(int nthreads) { + SCommitQueue *pQueue = &tsCommitQueue; + + if (nthreads < 1) nthreads = 1; + + pQueue->stop = false; + pQueue->nthreads = nthreads; + + pQueue->queue = tdListNew(0); + if (pQueue->queue == NULL) { + terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; + return -1; + } + + pQueue->threads = (pthread_t *)calloc(nthreads, sizeof(pthread_t)); + if (pQueue->threads == NULL) { + terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; + tdListFree(pQueue->queue); + return -1; + } + + pthread_mutex_init(&(pQueue->lock), NULL); + pthread_cond_init(&(pQueue->queueNotEmpty), NULL); + + for (int i = 0; i < nthreads; i++) { + pthread_create(pQueue->threads + i, NULL, tsdbLoopCommit, NULL); + } + + return 0; +} + +void tsdbDestroyCommitQueue() { + SCommitQueue *pQueue = &tsCommitQueue; + + pthread_mutex_lock(&(pQueue->lock)); + + if (pQueue->stop) { + pthread_mutex_unlock(&(pQueue->lock)); + return; + } + + pQueue->stop = true; + pthread_cond_broadcast(&(pQueue->queueNotEmpty)); + + pthread_mutex_unlock(&(pQueue->lock)); + + for (size_t i = 0; i < pQueue->nthreads; i++) { + pthread_join(pQueue->threads[i], NULL); + } + + free(pQueue->threads); + tdListFree(pQueue->queue); + pthread_cond_destroy(&(pQueue->queueNotEmpty)); + pthread_mutex_destroy(&(pQueue->lock)); +} + +int tsdbScheduleCommit(STsdbRepo *pRepo) { + SCommitQueue *pQueue = &tsCommitQueue; + + SListNode *pNode = (SListNode *)calloc(1, sizeof(SListNode) + sizeof(SCommitReq)); + if (pNode == NULL) { + terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; + return -1; + } + + ((SCommitReq *)pNode->data)->pRepo = pRepo; + + pthread_mutex_lock(&(pQueue->lock)); + + ASSERT(!pQueue->stop); + + tdListAppendNode(pQueue->queue, pNode); + pthread_cond_signal(&(pQueue->queueNotEmpty)); + + pthread_mutex_unlock(&(pQueue->lock)); + return 0; +} + +static void *tsdbLoopCommit(void *arg) { + SCommitQueue *pQueue = &tsCommitQueue; + SListNode * pNode = NULL; + STsdbRepo * pRepo = NULL; + + while (true) { + pthread_mutex_lock(&(pQueue->lock)); + + while (true) { + pNode = tdListPopHead(pQueue->queue); + if (pNode == NULL) { + if (pQueue->stop) { + pthread_mutex_unlock(&(pQueue->lock)); + goto _exit; + } else { + pthread_cond_wait(&(pQueue->queueNotEmpty), &(pQueue->lock)); + } + } else { + break; + } + } + + pthread_mutex_unlock(&(pQueue->lock)); + + pRepo = ((SCommitReq *)pNode->data)->pRepo; + + tsdbCommitData(pRepo); + listNodeFree(pNode); + } + +_exit: + return NULL; +} diff --git a/src/tsdb/src/tsdbMain.c b/src/tsdb/src/tsdbMain.c index e2523d114582f67244e0a56c485bf320a6b6d52f..2ded3b668b47ac3dcf35fef287bae8f056977cb2 100644 --- a/src/tsdb/src/tsdbMain.c +++ b/src/tsdb/src/tsdbMain.c @@ -163,7 +163,7 @@ void tsdbCloseRepo(TSDB_REPO_T *repo, int toCommit) { if (toCommit) { tsdbAsyncCommit(pRepo); - if (pRepo->commit) pthread_join(pRepo->commitThread, NULL); + sem_wait(&(pRepo->readyToCommit)); } tsdbUnRefMemTable(pRepo, pRepo->mem); tsdbUnRefMemTable(pRepo, pRepo->imem); @@ -675,6 +675,12 @@ static STsdbRepo *tsdbNewRepo(char *rootDir, STsdbAppH *pAppH, STsdbCfg *pCfg) { goto _err; } + code = sem_init(&(pRepo->readyToCommit), 0, 1); + if (code != 0) { + terrno = TAOS_SYSTEM_ERROR(code); + goto _err; + } + pRepo->repoLocked = false; pRepo->rootDir = strdup(rootDir); @@ -719,6 +725,7 @@ static void tsdbFreeRepo(STsdbRepo *pRepo) { // tsdbFreeMemTable(pRepo->mem); // tsdbFreeMemTable(pRepo->imem); tfree(pRepo->rootDir); + sem_destroy(&(pRepo->readyToCommit)); pthread_mutex_destroy(&pRepo->mutex); free(pRepo); } diff --git a/src/tsdb/src/tsdbMemTable.c b/src/tsdb/src/tsdbMemTable.c index 081929a87570f5263566643cf135cfb781b161a8..5680abcc6f4bf7874b59583cbfe3815c8a72a0a2 100644 --- a/src/tsdb/src/tsdbMemTable.c +++ b/src/tsdb/src/tsdbMemTable.c @@ -24,7 +24,6 @@ static void tsdbFreeMemTable(SMemTable *pMemTable); static STableData *tsdbNewTableData(STsdbCfg *pCfg, STable *pTable); static void tsdbFreeTableData(STableData *pTableData); static char * tsdbGetTsTupleKey(const void *data); -static void * tsdbCommitData(void *arg); static int tsdbCommitMeta(STsdbRepo *pRepo); static void tsdbEndCommit(STsdbRepo *pRepo); static int tsdbHasDataToCommit(SCommitIter *iters, int nIters, TSKEY minKey, TSKEY maxKey); @@ -262,40 +261,28 @@ void *tsdbAllocBytes(STsdbRepo *pRepo, int bytes) { int tsdbAsyncCommit(STsdbRepo *pRepo) { SMemTable *pIMem = pRepo->imem; - int code = 0; - if (pIMem != NULL) { - ASSERT(pRepo->commit); - tsdbDebug("vgId:%d waiting for the commit thread", REPO_ID(pRepo)); - code = pthread_join(pRepo->commitThread, NULL); - tsdbDebug("vgId:%d commit thread is finished", REPO_ID(pRepo)); - if (code != 0) { - tsdbError("vgId:%d failed to thread join since %s", REPO_ID(pRepo), strerror(errno)); - terrno = TAOS_SYSTEM_ERROR(errno); - return -1; - } - pRepo->commit = 0; - } - - ASSERT(pRepo->commit == 0); if (pRepo->mem != NULL) { + sem_wait(&(pRepo->readyToCommit)); + if (pRepo->appH.notifyStatus) pRepo->appH.notifyStatus(pRepo->appH.appH, TSDB_STATUS_COMMIT_START); if (tsdbLockRepo(pRepo) < 0) return -1; pRepo->imem = pRepo->mem; pRepo->mem = NULL; - pRepo->commit = 1; - code = pthread_create(&pRepo->commitThread, NULL, tsdbCommitData, (void *)pRepo); - if (code != 0) { - tsdbError("vgId:%d failed to create commit thread since %s", REPO_ID(pRepo), strerror(errno)); - terrno = TAOS_SYSTEM_ERROR(code); - tsdbUnlockRepo(pRepo); - return -1; - } + tsdbScheduleCommit(pRepo); if (tsdbUnlockRepo(pRepo) < 0) return -1; } - if (pIMem && tsdbUnRefMemTable(pRepo, pIMem) < 0) return -1; + if (tsdbUnRefMemTable(pRepo, pIMem) < 0) return -1; + + return 0; +} +int tsdbSyncCommit(TSDB_REPO_T *repo) { + STsdbRepo *pRepo = (STsdbRepo *)repo; + tsdbAsyncCommit(pRepo); + sem_wait(&(pRepo->readyToCommit)); + sem_post(&(pRepo->readyToCommit)); return 0; } @@ -419,6 +406,68 @@ int tsdbLoadDataFromCache(STable *pTable, SSkipListIterator *pIter, TSKEY maxKey return 0; } +void *tsdbCommitData(STsdbRepo *pRepo) { + SMemTable * pMem = pRepo->imem; + STsdbCfg * pCfg = &pRepo->config; + SDataCols * pDataCols = NULL; + STsdbMeta * pMeta = pRepo->tsdbMeta; + SCommitIter *iters = NULL; + SRWHelper whelper = {0}; + ASSERT(pMem != NULL); + + tsdbInfo("vgId:%d start to commit! keyFirst %" PRId64 " keyLast %" PRId64 " numOfRows %" PRId64, REPO_ID(pRepo), + pMem->keyFirst, pMem->keyLast, pMem->numOfRows); + + // Create the iterator to read from cache + if (pMem->numOfRows > 0) { + iters = tsdbCreateCommitIters(pRepo); + if (iters == NULL) { + tsdbError("vgId:%d failed to create commit iterator since %s", REPO_ID(pRepo), tstrerror(terrno)); + goto _exit; + } + + if (tsdbInitWriteHelper(&whelper, pRepo) < 0) { + tsdbError("vgId:%d failed to init write helper since %s", REPO_ID(pRepo), tstrerror(terrno)); + goto _exit; + } + + if ((pDataCols = tdNewDataCols(pMeta->maxRowBytes, pMeta->maxCols, pCfg->maxRowsPerFileBlock)) == NULL) { + terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; + tsdbError("vgId:%d failed to init data cols with maxRowBytes %d maxCols %d maxRowsPerFileBlock %d since %s", + REPO_ID(pRepo), pMeta->maxCols, pMeta->maxRowBytes, pCfg->maxRowsPerFileBlock, tstrerror(terrno)); + goto _exit; + } + + int sfid = (int)(TSDB_KEY_FILEID(pMem->keyFirst, pCfg->daysPerFile, pCfg->precision)); + int efid = (int)(TSDB_KEY_FILEID(pMem->keyLast, pCfg->daysPerFile, pCfg->precision)); + + // Loop to commit to each file + for (int fid = sfid; fid <= efid; fid++) { + if (tsdbCommitToFile(pRepo, fid, iters, &whelper, pDataCols) < 0) { + tsdbError("vgId:%d failed to commit to file %d since %s", REPO_ID(pRepo), fid, tstrerror(terrno)); + goto _exit; + } + } + } + + // Commit to update meta file + if (tsdbCommitMeta(pRepo) < 0) { + tsdbError("vgId:%d failed to commit data while committing meta data since %s", REPO_ID(pRepo), tstrerror(terrno)); + goto _exit; + } + + tsdbFitRetention(pRepo); + +_exit: + tdFreeDataCols(pDataCols); + tsdbDestroyCommitIters(iters, pMem->maxTables); + tsdbDestroyHelper(&whelper); + tsdbEndCommit(pRepo); + tsdbInfo("vgId:%d commit over", pRepo->config.tsdbId); + + return NULL; +} + // ---------------- LOCAL FUNCTIONS ---------------- static void tsdbFreeBytes(STsdbRepo *pRepo, void *ptr, int bytes) { ASSERT(pRepo->mem != NULL); @@ -529,69 +578,6 @@ static void tsdbFreeTableData(STableData *pTableData) { static char *tsdbGetTsTupleKey(const void *data) { return dataRowTuple((SDataRow)data); } -static void *tsdbCommitData(void *arg) { - STsdbRepo * pRepo = (STsdbRepo *)arg; - SMemTable * pMem = pRepo->imem; - STsdbCfg * pCfg = &pRepo->config; - SDataCols * pDataCols = NULL; - STsdbMeta * pMeta = pRepo->tsdbMeta; - SCommitIter *iters = NULL; - SRWHelper whelper = {0}; - ASSERT(pRepo->commit == 1); - ASSERT(pMem != NULL); - - tsdbInfo("vgId:%d start to commit! keyFirst %" PRId64 " keyLast %" PRId64 " numOfRows %" PRId64, REPO_ID(pRepo), - pMem->keyFirst, pMem->keyLast, pMem->numOfRows); - - // Create the iterator to read from cache - if (pMem->numOfRows > 0) { - iters = tsdbCreateCommitIters(pRepo); - if (iters == NULL) { - tsdbError("vgId:%d failed to create commit iterator since %s", REPO_ID(pRepo), tstrerror(terrno)); - goto _exit; - } - - if (tsdbInitWriteHelper(&whelper, pRepo) < 0) { - tsdbError("vgId:%d failed to init write helper since %s", REPO_ID(pRepo), tstrerror(terrno)); - goto _exit; - } - - if ((pDataCols = tdNewDataCols(pMeta->maxRowBytes, pMeta->maxCols, pCfg->maxRowsPerFileBlock)) == NULL) { - terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; - tsdbError("vgId:%d failed to init data cols with maxRowBytes %d maxCols %d maxRowsPerFileBlock %d since %s", - REPO_ID(pRepo), pMeta->maxCols, pMeta->maxRowBytes, pCfg->maxRowsPerFileBlock, tstrerror(terrno)); - goto _exit; - } - - int sfid = (int)(TSDB_KEY_FILEID(pMem->keyFirst, pCfg->daysPerFile, pCfg->precision)); - int efid = (int)(TSDB_KEY_FILEID(pMem->keyLast, pCfg->daysPerFile, pCfg->precision)); - - // Loop to commit to each file - for (int fid = sfid; fid <= efid; fid++) { - if (tsdbCommitToFile(pRepo, fid, iters, &whelper, pDataCols) < 0) { - tsdbError("vgId:%d failed to commit to file %d since %s", REPO_ID(pRepo), fid, tstrerror(terrno)); - goto _exit; - } - } - } - - // Commit to update meta file - if (tsdbCommitMeta(pRepo) < 0) { - tsdbError("vgId:%d failed to commit data while committing meta data since %s", REPO_ID(pRepo), tstrerror(terrno)); - goto _exit; - } - - tsdbFitRetention(pRepo); - -_exit: - tdFreeDataCols(pDataCols); - tsdbDestroyCommitIters(iters, pMem->maxTables); - tsdbDestroyHelper(&whelper); - tsdbEndCommit(pRepo); - tsdbInfo("vgId:%d commit over", pRepo->config.tsdbId); - - return NULL; -} static int tsdbCommitMeta(STsdbRepo *pRepo) { SMemTable *pMem = pRepo->imem; @@ -642,8 +628,8 @@ _err: } static void tsdbEndCommit(STsdbRepo *pRepo) { - ASSERT(pRepo->commit == 1); if (pRepo->appH.notifyStatus) pRepo->appH.notifyStatus(pRepo->appH.appH, TSDB_STATUS_COMMIT_OVER); + sem_post(&(pRepo->readyToCommit)); } static int tsdbHasDataToCommit(SCommitIter *iters, int nIters, TSKEY minKey, TSKEY maxKey) { diff --git a/src/tsdb/src/tsdbRead.c b/src/tsdb/src/tsdbRead.c index 021c10ab6a53eac66d120d8b01574598ec6689ff..8bbdf4e3628dee0095a36fd8c044b007926d7919 100644 --- a/src/tsdb/src/tsdbRead.c +++ b/src/tsdb/src/tsdbRead.c @@ -2348,7 +2348,8 @@ void filterPrepare(void* expr, void* param) { if (size < (uint32_t)pSchema->bytes) { size = pSchema->bytes; } - pInfo->q = calloc(1, size + TSDB_NCHAR_SIZE); // to make sure tonchar does not cause invalid write, since the '\0' needs at least sizeof(wchar_t) space. + // to make sure tonchar does not cause invalid write, since the '\0' needs at least sizeof(wchar_t) space. + pInfo->q = calloc(1, size + TSDB_NCHAR_SIZE + VARSTR_HEADER_SIZE); tVariantDump(pCond, pInfo->q, pSchema->type, true); } } diff --git a/src/util/src/tskiplist.c b/src/util/src/tskiplist.c index 45e3308bc8c87b506fa907bdbbe71d3f264707b3..5ecc8ce119a925df47832684f58af34b79e8d773 100644 --- a/src/util/src/tskiplist.c +++ b/src/util/src/tskiplist.c @@ -24,8 +24,8 @@ static SSkipListNode * getPriorNode(SSkipList *pSkipList, const char *val, in static void tSkipListRemoveNodeImpl(SSkipList *pSkipList, SSkipListNode *pNode); static void tSkipListCorrectLevel(SSkipList *pSkipList); static SSkipListIterator *doCreateSkipListIterator(SSkipList *pSkipList, int32_t order); -static void tSkipListDoInsert(SSkipList *pSkipList, SSkipListNode **forward, SSkipListNode *pNode); -static bool tSkipListGetPosToPut(SSkipList *pSkipList, SSkipListNode **forward, void *pData); +static void tSkipListDoInsert(SSkipList *pSkipList, SSkipListNode **backward, SSkipListNode *pNode); +static bool tSkipListGetPosToPut(SSkipList *pSkipList, SSkipListNode **backward, void *pData); static SSkipListNode * tSkipListNewNode(uint8_t level); #define tSkipListFreeNode(n) tfree((n)) @@ -108,17 +108,17 @@ void tSkipListDestroy(SSkipList *pSkipList) { SSkipListNode *tSkipListPut(SSkipList *pSkipList, void *pData) { if (pSkipList == NULL || pData == NULL) return NULL; - SSkipListNode *forward[MAX_SKIP_LIST_LEVEL] = {0}; + SSkipListNode *backward[MAX_SKIP_LIST_LEVEL] = {0}; uint8_t dupMode = SL_DUP_MODE(pSkipList); SSkipListNode *pNode = NULL; tSkipListWLock(pSkipList); - bool hasDup = tSkipListGetPosToPut(pSkipList, forward, pData); + bool hasDup = tSkipListGetPosToPut(pSkipList, backward, pData); if (hasDup && (dupMode == SL_DISCARD_DUP_KEY || dupMode == SL_UPDATE_DUP_KEY)) { if (dupMode == SL_UPDATE_DUP_KEY) { - pNode = SL_NODE_GET_FORWARD_POINTER(forward[0], 0); + pNode = SL_NODE_GET_BACKWARD_POINTER(backward[0], 0); atomic_store_ptr(&(pNode->pData), pData); } } else { @@ -126,7 +126,7 @@ SSkipListNode *tSkipListPut(SSkipList *pSkipList, void *pData) { if (pNode != NULL) { pNode->pData = pData; - tSkipListDoInsert(pSkipList, forward, pNode); + tSkipListDoInsert(pSkipList, backward, pNode); } } @@ -310,7 +310,7 @@ void tSkipListPrint(SSkipList *pSkipList, int16_t nlevel) { } } -static void tSkipListDoInsert(SSkipList *pSkipList, SSkipListNode **forward, SSkipListNode *pNode) { +static void tSkipListDoInsert(SSkipList *pSkipList, SSkipListNode **backward, SSkipListNode *pNode) { for (int32_t i = 0; i < pNode->level; ++i) { if (i >= pSkipList->level) { SL_NODE_GET_FORWARD_POINTER(pNode, i) = pSkipList->pTail; @@ -318,14 +318,14 @@ static void tSkipListDoInsert(SSkipList *pSkipList, SSkipListNode **forward, SSk SL_NODE_GET_FORWARD_POINTER(pSkipList->pHead, i) = pNode; SL_NODE_GET_BACKWARD_POINTER(pSkipList->pTail, i) = pNode; } else { - SSkipListNode *x = forward[i]; - SL_NODE_GET_BACKWARD_POINTER(pNode, i) = x; + SSkipListNode *x = backward[i]; + SL_NODE_GET_FORWARD_POINTER(pNode, i) = x; - SSkipListNode *next = SL_NODE_GET_FORWARD_POINTER(x, i); - SL_NODE_GET_BACKWARD_POINTER(next, i) = pNode; + SSkipListNode *prev = SL_NODE_GET_BACKWARD_POINTER(x, i); + SL_NODE_GET_FORWARD_POINTER(prev, i) = pNode; - SL_NODE_GET_FORWARD_POINTER(pNode, i) = next; - SL_NODE_GET_FORWARD_POINTER(x, i) = pNode; + SL_NODE_GET_BACKWARD_POINTER(x, i) = pNode; + SL_NODE_GET_BACKWARD_POINTER(pNode, i) = prev; } } @@ -371,57 +371,57 @@ static FORCE_INLINE int tSkipListUnlock(SSkipList *pSkipList) { return 0; } -static bool tSkipListGetPosToPut(SSkipList *pSkipList, SSkipListNode **forward, void *pData) { +static bool tSkipListGetPosToPut(SSkipList *pSkipList, SSkipListNode **backward, void *pData) { int compare = 0; bool hasDupKey = false; char * pDataKey = pSkipList->keyFn(pData); if (pSkipList->size == 0) { for (int i = 0; i < pSkipList->level; i++) { - forward[i] = pSkipList->pHead; + backward[i] = pSkipList->pTail; } } else { char *pKey = NULL; - // Compare min key - pKey = SL_GET_MIN_KEY(pSkipList); + // Compare max key + pKey = SL_GET_MAX_KEY(pSkipList); compare = pSkipList->comparFn(pDataKey, pKey); - if (compare <= 0) { + if (compare >= 0) { for (int i = 0; i < pSkipList->level; i++) { - forward[i] = pSkipList->pHead; + backward[i] = pSkipList->pTail; } return (compare == 0); } - // Compare max key - pKey = SL_GET_MAX_KEY(pSkipList); + // Compare min key + pKey = SL_GET_MIN_KEY(pSkipList); compare = pSkipList->comparFn(pDataKey, pKey); - if (compare > 0) { + if (compare < 0) { for (int i = 0; i < pSkipList->level; i++) { - forward[i] = SL_NODE_GET_BACKWARD_POINTER(pSkipList->pTail, i); + backward[i] = SL_NODE_GET_FORWARD_POINTER(pSkipList->pHead, i); } return (compare == 0); } - SSkipListNode *px = pSkipList->pHead; + SSkipListNode *px = pSkipList->pTail; for (int i = pSkipList->level - 1; i >= 0; --i) { - SSkipListNode *p = SL_NODE_GET_FORWARD_POINTER(px, i); - while (p != pSkipList->pTail) { + SSkipListNode *p = SL_NODE_GET_BACKWARD_POINTER(px, i); + while (p != pSkipList->pHead) { pKey = SL_GET_NODE_KEY(pSkipList, p); compare = pSkipList->comparFn(pKey, pDataKey); - if (compare >= 0) { + if (compare <= 0) { if (compare == 0 && !hasDupKey) hasDupKey = true; break; } else { px = p; - p = SL_NODE_GET_FORWARD_POINTER(px, i); + p = SL_NODE_GET_BACKWARD_POINTER(px, i); } } - forward[i] = px; + backward[i] = px; } } diff --git a/src/util/src/tutil.c b/src/util/src/tutil.c index 099b9d9530cddf946886608fec9543fb7f07b1f5..451976f563740b4ac933766d5c0c8f4075ad42d8 100644 --- a/src/util/src/tutil.c +++ b/src/util/src/tutil.c @@ -377,7 +377,8 @@ int taosCheckVersion(char *input_client_version, char *input_server_version, int for(int32_t i = 0; i < comparedSegments; ++i) { if (clientVersionNumber[i] != serverVersionNumber[i]) { - uError("the %d-th number of server version:%s not matched with client version:%s", i, server_version, version); + uError("the %d-th number of server version:%s not matched with client version:%s", i, server_version, + client_version); return TSDB_CODE_TSC_INVALID_VERSION; } } diff --git a/src/vnode/src/vnodeMain.c b/src/vnode/src/vnodeMain.c index bd44ce8e1cd65761a084686e4281cd237720d762..199619e8514faac9e663914ddf59a3ea0462afde 100644 --- a/src/vnode/src/vnodeMain.c +++ b/src/vnode/src/vnodeMain.c @@ -67,10 +67,17 @@ int32_t vnodeInitResources() { return TSDB_CODE_VND_OUT_OF_MEMORY; } + if (tsdbInitCommitQueue(tsNumOfCommitThreads) < 0) { + vError("failed to init vnode commit queue"); + return terrno; + } + return TSDB_CODE_SUCCESS; } void vnodeCleanupResources() { + tsdbDestroyCommitQueue(); + if (tsVnodesHash != NULL) { vDebug("vnode list is cleanup"); taosHashCleanup(tsVnodesHash); @@ -308,6 +315,8 @@ int32_t vnodeOpen(int32_t vnode, char *rootDir) { pVnode->version = walGetVersion(pVnode->wal); } + tsdbSyncCommit(pVnode->tsdb); + walRemoveAllOldFiles(pVnode->wal); walRenew(pVnode->wal); SSyncInfo syncInfo; @@ -583,6 +592,7 @@ static int vnodeProcessTsdbStatus(void *arg, int status) { if (status == TSDB_STATUS_COMMIT_OVER) { vDebug("vgId:%d, commit over, fver:%" PRIu64 " vver:%" PRIu64, pVnode->vgId, pVnode->fversion, pVnode->version); + walRemoveOneOldFile(pVnode->wal); return vnodeSaveVersion(pVnode); } diff --git a/src/vnode/src/vnodeRead.c b/src/vnode/src/vnodeRead.c index 55ddf3a34b42ea82ee169d2f25aeea23b3c0c972..9fa3a11c9c659b44a4edc12110969feb6779a8b7 100644 --- a/src/vnode/src/vnodeRead.c +++ b/src/vnode/src/vnodeRead.c @@ -229,7 +229,7 @@ static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SVReadMsg *pRead) { if (handle == NULL) { // failed to register qhandle pRsp->code = terrno; terrno = 0; - vError("vgId:%d QInfo:%p register qhandle failed, return to app, code:%s", pVnode->vgId, (void *)pQInfo, + vError("vgId:%d, QInfo:%p register qhandle failed, return to app, code:%s", pVnode->vgId, (void *)pQInfo, tstrerror(pRsp->code)); qDestroyQueryInfo(pQInfo); // destroy it directly return pRsp->code; diff --git a/src/vnode/src/vnodeWrite.c b/src/vnode/src/vnodeWrite.c index 99594607ac0a19e31f438e0a02320ee189d55253..e67c544fb23456d8775077e8a86f1745508bd0fc 100644 --- a/src/vnode/src/vnodeWrite.c +++ b/src/vnode/src/vnodeWrite.c @@ -217,6 +217,11 @@ int32_t vnodeWriteToWQueue(void *vparam, void *wparam, int32_t qtype, void *rpar if (code != TSDB_CODE_SUCCESS) return code; } + if (pHead->len > TSDB_MAX_WAL_SIZE) { + vError("vgId:%d, wal len:%d exceeds limit, hver:%" PRIu64, pVnode->vgId, pHead->len, pHead->version); + return TSDB_CODE_WAL_SIZE_LIMIT; + } + int32_t size = sizeof(SVWriteMsg) + sizeof(SWalHead) + pHead->len; SVWriteMsg *pWrite = taosAllocateQitem(size); if (pWrite == NULL) { diff --git a/src/wal/inc/walInt.h b/src/wal/inc/walInt.h index d1e977225929d8e34b42fbe6aec53c3f41bb7273..36311c8f5d92fe0ba97793224df59d4dd33b5da6 100644 --- a/src/wal/inc/walInt.h +++ b/src/wal/inc/walInt.h @@ -34,7 +34,7 @@ extern int32_t wDebugFlag; #define WAL_PREFIX "wal" #define WAL_PREFIX_LEN 3 #define WAL_REFRESH_MS 1000 -#define WAL_MAX_SIZE (1024 * 1024) +#define WAL_MAX_SIZE (TSDB_MAX_WAL_SIZE + sizeof(SWalHead) + 16) #define WAL_SIGNATURE ((uint32_t)(0xFAFBFDFE)) #define WAL_PATH_LEN (TSDB_FILENAME_LEN + 12) #define WAL_FILE_LEN (TSDB_FILENAME_LEN + 32) diff --git a/src/wal/src/walMgmt.c b/src/wal/src/walMgmt.c index de666c85e832aa52ce33d442f543c0512d91adb8..fb49f38217d62b9181f0eed92155434b516ab3e0 100644 --- a/src/wal/src/walMgmt.c +++ b/src/wal/src/walMgmt.c @@ -128,16 +128,7 @@ void walClose(void *handle) { taosClose(pWal->fd); if (pWal->keep != TAOS_WAL_KEEP) { - int64_t fileId = -1; - while (walGetNextFile(pWal, &fileId) >= 0) { - snprintf(pWal->name, sizeof(pWal->name), "%s/%s%" PRId64, pWal->path, WAL_PREFIX, fileId); - - if (remove(pWal->name) < 0) { - wError("vgId:%d, wal:%p file:%s, failed to remove", pWal->vgId, pWal, pWal->name); - } else { - wDebug("vgId:%d, wal:%p file:%s, it is removed", pWal->vgId, pWal, pWal->name); - } - } + walRemoveAllOldFiles(pWal); } else { wDebug("vgId:%d, wal:%p file:%s, it is closed and kept", pWal->vgId, pWal, pWal->name); } diff --git a/src/wal/src/walWrite.c b/src/wal/src/walWrite.c index 9681f4b898d0661d25650067362be429854e8c94..48021eecfc3523466f1e8e878cfb6b5344c8e9c4 100644 --- a/src/wal/src/walWrite.c +++ b/src/wal/src/walWrite.c @@ -58,24 +58,48 @@ int32_t walRenew(void *handle) { wDebug("vgId:%d, file:%s, it is created", pWal->vgId, pWal->name); } - if (pWal->keep != TAOS_WAL_KEEP) { - // remove the oldest wal file - int64_t oldFileId = -1; - if (walGetOldFile(pWal, pWal->fileId, WAL_FILE_NUM, &oldFileId) == 0) { - char walName[WAL_FILE_LEN] = {0}; - snprintf(walName, sizeof(walName), "%s/%s%" PRId64, pWal->path, WAL_PREFIX, oldFileId); - - if (remove(walName) < 0) { - wError("vgId:%d, file:%s, failed to remove since %s", pWal->vgId, walName, strerror(errno)); - } else { - wDebug("vgId:%d, file:%s, it is removed", pWal->vgId, walName); - } + pthread_mutex_unlock(&pWal->mutex); + + return code; +} + +void walRemoveOneOldFile(void *handle) { + SWal *pWal = handle; + if (pWal == NULL) return; + if (pWal->keep == TAOS_WAL_KEEP) return; + + pthread_mutex_lock(&pWal->mutex); + + // remove the oldest wal file + int64_t oldFileId = -1; + if (walGetOldFile(pWal, pWal->fileId, WAL_FILE_NUM, &oldFileId) == 0) { + char walName[WAL_FILE_LEN] = {0}; + snprintf(walName, sizeof(walName), "%s/%s%" PRId64, pWal->path, WAL_PREFIX, oldFileId); + + if (remove(walName) < 0) { + wError("vgId:%d, file:%s, failed to remove since %s", pWal->vgId, walName, strerror(errno)); + } else { + wInfo("vgId:%d, file:%s, it is removed", pWal->vgId, walName); } } pthread_mutex_unlock(&pWal->mutex); +} - return code; +void walRemoveAllOldFiles(void *handle) { + if (handle == NULL) return; + + SWal * pWal = handle; + int64_t fileId = -1; + while (walGetNextFile(pWal, &fileId) >= 0) { + snprintf(pWal->name, sizeof(pWal->name), "%s/%s%" PRId64, pWal->path, WAL_PREFIX, fileId); + + if (remove(pWal->name) < 0) { + wError("vgId:%d, wal:%p file:%s, failed to remove", pWal->vgId, pWal, pWal->name); + } else { + wInfo("vgId:%d, wal:%p file:%s, it is removed", pWal->vgId, pWal, pWal->name); + } + } } int32_t walWrite(void *handle, SWalHead *pHead) { diff --git a/tests/pytest/fulltest.sh b/tests/pytest/fulltest.sh index a48dbdc48024ea7348f11b70129ebbf1a486e53d..15cadf38e701488f911345fb01553aeab9f11560 100755 --- a/tests/pytest/fulltest.sh +++ b/tests/pytest/fulltest.sh @@ -154,7 +154,7 @@ python3 ./test.py -f query/queryConnection.py python3 ./test.py -f query/queryCountCSVData.py python3 ./test.py -f query/natualInterval.py python3 ./test.py -f query/bug1471.py -python3 ./test.py -f query/dataLossTest.py +#python3 ./test.py -f query/dataLossTest.py #stream python3 ./test.py -f stream/metric_1.py diff --git a/tests/pytest/tools/insert.json b/tests/pytest/tools/insert.json new file mode 100644 index 0000000000000000000000000000000000000000..c3fa78076b2a25f73ebc50f6a35bcc5afddb246d --- /dev/null +++ b/tests/pytest/tools/insert.json @@ -0,0 +1,50 @@ +{ + "filetype":"insert", + "cfgdir": "/etc/taos", + "host": "127.0.0.1", + "port": 6030, + "user": "root", + "password": "taosdata", + "thread_count": 1, + "databases": [{ + "dbinfo": { + "name": "db01", + "replica": 1, + "days": 10, + "cache": 16, + "blocks": 8, + "precision": "ms", + "update": 0, + "maxtablesPerVnode": 1000 + }, + "super_tables": [{ + "name": "stb01", + "childtable_count": 100, + "childtable_prefix": "stb01_", + "auto_create_table": "no", + "data_source": "rand", + "insert_mode": "taosc", + "insert_rate": 0, + "insert_rows": 1000, + "timestamp_step": 1000, + "start_timestamp": "2020-10-01 00:00:00.000", + "sample_format": "csv", + "sample_file": "/home/data/sample.csv", + "tags_file": "", + "columns": [{ + "type": "SMALLINT" + }, { + "type": "BOOL" + }, { + "type": "BINARY", + "len": 6 + }], + "tags": [{ + "type": "INT" + },{ + "type": "BINARY", + "len": 4 + }] + }] + }] +} diff --git a/tests/pytest/tools/lowa.py b/tests/pytest/tools/lowa.py new file mode 100644 index 0000000000000000000000000000000000000000..523229dd463d54c5b2cd23a9a3d4d547858a3b5c --- /dev/null +++ b/tests/pytest/tools/lowa.py @@ -0,0 +1,66 @@ +################################################################### +# Copyright (c) 2016 by TAOS Technologies, Inc. +# All rights reserved. +# +# This file is proprietary and confidential to TAOS Technologies. +# No part of this file may be reproduced, stored, transmitted, +# disclosed or used in any form or by any means other than as +# expressly provided by the written permission from Jianhui Tao +# +################################################################### + +# -*- coding: utf-8 -*- + +import sys +import os +from util.log import * +from util.cases import * +from util.sql import * +from util.dnodes import * + + +class TDTestCase: + def init(self, conn, logSql): + tdLog.debug("start to execute %s" % __file__) + tdSql.init(conn.cursor(), logSql) + + self.numberOfTables = 10000 + self.numberOfRecords = 100 + + def getBuildPath(self): + selfPath = os.path.dirname(os.path.realpath(__file__)) + + if ("community" in selfPath): + projPath = selfPath[:selfPath.find("community")] + else: + projPath = selfPath[:selfPath.find("tests")] + + for root, dirs, files in os.walk(projPath): + if ("taosd" in files): + rootRealPath = os.path.dirname(os.path.realpath(root)) + if ("packaging" not in rootRealPath): + buildPath = root[:len(root)-len("/build/bin")] + break + return buildPath + + def run(self): + tdSql.prepare() + buildPath = self.getBuildPath() + if (buildPath == ""): + tdLog.exit("taosd not found!") + else: + tdLog.info("taosd found in %s" % buildPath) + binPath = buildPath+ "/build/bin/" + os.system("yes | %slowa -f tools/insert.json" % binPath) + + tdSql.execute("use db01") + tdSql.query("select count(*) from stb01") + tdSql.checkData(0, 0, 100000) + + def stop(self): + tdSql.close() + tdLog.success("%s successfully executed" % __file__) + + +tdCases.addWindows(__file__, TDTestCase()) +tdCases.addLinux(__file__, TDTestCase()) diff --git a/tests/script/general/wal/kill.sim b/tests/script/general/wal/kill.sim new file mode 100644 index 0000000000000000000000000000000000000000..7f103874a561c2bb2534996ab30d30ab0e8907a3 --- /dev/null +++ b/tests/script/general/wal/kill.sim @@ -0,0 +1,77 @@ +system sh/stop_dnodes.sh +system sh/deploy.sh -n dnode1 -i 1 + +print ============== deploy +system sh/exec.sh -n dnode1 -s start +sleep 3001 +sql connect + +sql create database d1 +sql use d1 + +sql create table t1 (ts timestamp, i int) +sql insert into t1 values(now, 1); + +print =============== step3 +sleep 3000 +sql select * from t1; +print rows: $rows +if $rows != 1 then + return -1 +endi +system sh/exec.sh -n dnode1 -s stop -x SIGKILL +sleep 3000 + +print =============== step4 +system sh/exec.sh -n dnode1 -s start -x SIGKILL +sleep 3000 +sql select * from t1; +print rows: $rows +if $rows != 1 then + return -1 +endi +system sh/exec.sh -n dnode1 -s stop -x SIGKILL +sleep 3000 + +print =============== step5 +system sh/exec.sh -n dnode1 -s start -x SIGKILL +sleep 3000 +sql select * from t1; +print rows: $rows +if $rows != 1 then + return -1 +endi +system sh/exec.sh -n dnode1 -s stop -x SIGKILL +sleep 3000 + +print =============== step6 +system sh/exec.sh -n dnode1 -s start -x SIGKILL +sleep 3000 +sql select * from t1; +print rows: $rows +if $rows != 1 then + return -1 +endi +system sh/exec.sh -n dnode1 -s stop -x SIGKILL +sleep 3000 + +print =============== step7 +system sh/exec.sh -n dnode1 -s start -x SIGKILL +sleep 3000 +sql select * from t1; +print rows: $rows +if $rows != 1 then + return -1 +endi +system sh/exec.sh -n dnode1 -s stop -x SIGKILL +sleep 3000 + +print =============== step8 +system sh/exec.sh -n dnode1 -s start -x SIGKILL +sleep 3000 +sql select * from t1; +print rows: $rows +if $rows != 1 then + return -1 +endi +system sh/exec.sh -n dnode1 -s stop -x SIGKILL diff --git a/tests/script/general/wal/maxtables.sim b/tests/script/general/wal/maxtables.sim new file mode 100644 index 0000000000000000000000000000000000000000..e504c7e92e3447f7d29dbb2dc03456c3775ced2c --- /dev/null +++ b/tests/script/general/wal/maxtables.sim @@ -0,0 +1,46 @@ +system sh/stop_dnodes.sh +system sh/deploy.sh -n dnode1 -i 1 +system sh/cfg.sh -n dnode1 -c maxTablesPerVnode -v 100 +system sh/cfg.sh -n dnode1 -c maxVgroupsPerDb -v 1 +system sh/cfg.sh -n dnode1 -c tableIncStepPerVnode -v 2 + + +print ============== deploy +system sh/exec.sh -n dnode1 -s start +sleep 3001 +sql connect + +sql create database d1 +sql use d1 +sql create table st (ts timestamp, tbcol int) TAGS(tgcol int) + +$i = 0 +while $i < 100 + $tb = t . $i + sql create table $tb using st tags( $i ) + sql insert into $tb values (now , $i ) + $i = $i + 1 +endw + +sql_error sql create table tt (ts timestamp, i int) + +print =============== step3 +sql select * from st; +if $rows != 100 then + return -1 +endi + +system sh/exec.sh -n dnode1 -s stop -x SIGINT +system sh/cfg.sh -n dnode1 -c maxTablesPerVnode -v 4 +sleep 3000 + +print =============== step4 +system sh/exec.sh -n dnode1 -s start +sleep 3000 + +sql select * from st; +if $rows != 100 then + return -1 +endi + +system sh/exec.sh -n dnode1 -s stop -x SIGINT diff --git a/tests/script/general/wal/sync.sim b/tests/script/general/wal/sync.sim new file mode 100644 index 0000000000000000000000000000000000000000..abaf22f91921e5bcc8effbe6fc1c66766ff92a3f --- /dev/null +++ b/tests/script/general/wal/sync.sim @@ -0,0 +1,124 @@ +system sh/stop_dnodes.sh + +system sh/deploy.sh -n dnode1 -i 1 +system sh/deploy.sh -n dnode2 -i 2 +system sh/deploy.sh -n dnode3 -i 3 + +system sh/cfg.sh -n dnode1 -c numOfMnodes -v 3 +system sh/cfg.sh -n dnode2 -c numOfMnodes -v 3 +system sh/cfg.sh -n dnode3 -c numOfMnodes -v 3 + +system sh/cfg.sh -n dnode1 -c mnodeEqualVnodeNum -v 4 +system sh/cfg.sh -n dnode2 -c mnodeEqualVnodeNum -v 4 +system sh/cfg.sh -n dnode3 -c mnodeEqualVnodeNum -v 4 + +system sh/cfg.sh -n dnode1 -c http -v 1 +system sh/cfg.sh -n dnode2 -c http -v 1 +system sh/cfg.sh -n dnode3 -c http -v 1 + +system sh/cfg.sh -n dnode1 -c maxTablesPerVnode -v 20000 +system sh/cfg.sh -n dnode2 -c maxTablesPerVnode -v 20000 +system sh/cfg.sh -n dnode3 -c maxTablesPerVnode -v 20000 + +system sh/cfg.sh -n dnode1 -c replica -v 3 +system sh/cfg.sh -n dnode2 -c replica -v 3 +system sh/cfg.sh -n dnode3 -c replica -v 3 + +system sh/cfg.sh -n dnode1 -c maxSQLLength -v 940032 +system sh/cfg.sh -n dnode2 -c maxSQLLength -v 940032 +system sh/cfg.sh -n dnode3 -c maxSQLLength -v 940032 + +print ============== deploy + +system sh/exec.sh -n dnode1 -s start +sleep 5001 +sql connect + +sql create dnode $hostname2 +sql create dnode $hostname3 +system sh/exec.sh -n dnode2 -s start +system sh/exec.sh -n dnode3 -s start + +print =============== step1 +$x = 0 +show1: + $x = $x + 1 + sleep 2000 + if $x == 5 then + return -1 + endi +sql show mnodes -x show1 +$mnode1Role = $data2_1 +print mnode1Role $mnode1Role +$mnode2Role = $data2_2 +print mnode2Role $mnode2Role +$mnode3Role = $data2_3 +print mnode3Role $mnode3Role + +if $mnode1Role != master then + goto show1 +endi +if $mnode2Role != slave then + goto show1 +endi +if $mnode3Role != slave then + goto show1 +endi + +print =============== step2 +sql create database d1 replica 3 +sql use d1 + +sql create table table_rest (ts timestamp, i int) +print sql length is 870KB +restful d1 table_rest 1591072800 30000 +restful d1 table_rest 1591172800 30000 +restful d1 table_rest 1591272800 30000 +restful d1 table_rest 1591372800 30000 +restful d1 table_rest 1591472800 30000 +restful d1 table_rest 1591572800 30000 +restful d1 table_rest 1591672800 30000 +restful d1 table_rest 1591772800 30000 +restful d1 table_rest 1591872800 30000 +restful d1 table_rest 1591972800 30000 + +sql select * from table_rest; +print rows: $rows +if $rows != 300000 then + return -1 +endi + +print =============== step3 +system sh/exec.sh -n dnode1 -s stop -x SIGINT +sleep 5000 +sql select * from table_rest; +print rows: $rows +if $rows != 300000 then + return -1 +endi +system sh/exec.sh -n dnode1 -s start -x SIGINT +sleep 5000 + +print =============== step4 +system sh/exec.sh -n dnode2 -s stop -x SIGINT +sleep 5000 +sql select * from table_rest; +print rows: $rows +if $rows != 300000 then + return -1 +endi +system sh/exec.sh -n dnode2 -s start -x SIGINT +sleep 5000 + +print =============== step5 +system sh/exec.sh -n dnode3 -s stop -x SIGINT +sleep 5000 +sql select * from table_rest; +print rows: $rows +if $rows != 300000 then + return -1 +endi + +system sh/exec.sh -n dnode1 -s stop -x SIGINT +system sh/exec.sh -n dnode2 -s stop -x SIGINT +system sh/exec.sh -n dnode3 -s stop -x SIGINT \ No newline at end of file diff --git a/tests/script/jenkins/basic.txt b/tests/script/jenkins/basic.txt index 1b2fe37c71f26486ba847006ecf3aa373b5f55c1..daf92679bd61d8317b648b0285e46a4c956a9d94 100644 --- a/tests/script/jenkins/basic.txt +++ b/tests/script/jenkins/basic.txt @@ -236,6 +236,10 @@ cd ../../../debug; make ./test.sh -f general/vector/table_query.sim ./test.sh -f general/vector/table_time.sim +./test.sh -f general/wal/sync.sim +./test.sh -f general/wal/kill.sim +./test.sh -f general/wal/maxtables.sim + ./test.sh -f unique/account/account_create.sim ./test.sh -f unique/account/account_delete.sim ./test.sh -f unique/account/account_len.sim