diff --git a/cmake/taosadapter_CMakeLists.txt.in b/cmake/taosadapter_CMakeLists.txt.in index 6cdff79629b2948111584e93b33336144c6fd214..7a1241337732cd0faa72ed59b4ad5583d3d79632 100644 --- a/cmake/taosadapter_CMakeLists.txt.in +++ b/cmake/taosadapter_CMakeLists.txt.in @@ -2,7 +2,7 @@ # taosadapter ExternalProject_Add(taosadapter GIT_REPOSITORY https://github.com/taosdata/taosadapter.git - GIT_TAG 88d26c3 + GIT_TAG 766dcc4 SOURCE_DIR "${TD_SOURCE_DIR}/tools/taosadapter" BINARY_DIR "" #BUILD_IN_SOURCE TRUE diff --git a/cmake/taostools_CMakeLists.txt.in b/cmake/taostools_CMakeLists.txt.in index f90a163cb4bf9283aa99ded308aaa7a5037501fb..e71598ae5a1255c68c3945ac27c9158390e6e022 100644 --- a/cmake/taostools_CMakeLists.txt.in +++ b/cmake/taostools_CMakeLists.txt.in @@ -2,7 +2,7 @@ # taos-tools ExternalProject_Add(taos-tools GIT_REPOSITORY https://github.com/taosdata/taos-tools.git - GIT_TAG 58f58ee + GIT_TAG c9cc20f SOURCE_DIR "${TD_SOURCE_DIR}/tools/taos-tools" BINARY_DIR "" #BUILD_IN_SOURCE TRUE diff --git a/cmake/taosws_CMakeLists.txt.in b/cmake/taosws_CMakeLists.txt.in index c50e73144f115b4bc4050d29b56f38958f3fe8dc..ad4c150505b8a19ad7185607bc42a76510b06594 100644 --- a/cmake/taosws_CMakeLists.txt.in +++ b/cmake/taosws_CMakeLists.txt.in @@ -2,7 +2,7 @@ # taosws-rs ExternalProject_Add(taosws-rs GIT_REPOSITORY https://github.com/taosdata/taosws-rs.git - GIT_TAG 24b199e + GIT_TAG 9fa7e2f SOURCE_DIR "${TD_SOURCE_DIR}/tools/taosws-rs" BINARY_DIR "" #BUILD_IN_SOURCE TRUE diff --git a/docs/en/14-reference/06-taosdump.md b/docs/en/14-reference/06-taosdump.md index 96e68d0edbf9a2f6880cc557580d3dfb20def947..2105ba83fad9700674e28609016b07ef6de66833 100644 --- a/docs/en/14-reference/06-taosdump.md +++ b/docs/en/14-reference/06-taosdump.md @@ -29,7 +29,7 @@ There are two ways to install taosdump: 1. backing up all databases: specify `-A` or `-all-databases` parameter. 2. backup multiple specified databases: use `-D db1,db2,... ` parameters; -3. back up some super or normal tables in the specified database: use `-dbname stbname1 stbname2 tbname1 tbname2 ... ` parameters. Note that the first parameter of this input sequence is the database name, and only one database is supported. The second and subsequent parameters are the names of super or normal tables in that database, separated by spaces. +3. back up some super or normal tables in the specified database: use `dbname stbname1 stbname2 tbname1 tbname2 ... ` parameters. Note that the first parameter of this input sequence is the database name, and only one database is supported. The second and subsequent parameters are the names of super or normal tables in that database, separated by spaces. 4. back up the system log database: TDengine clusters usually contain a system database named `log`. The data in this database is the data that TDengine runs itself, and the taosdump will not back up the log database by default. If users need to back up the log database, users can use the `-a` or `-allow-sys` command-line parameter. 5. Loose mode backup: taosdump version 1.4.1 onwards provides `-n` and `-L` parameters for backing up data without using escape characters and "loose" mode, which can reduce the number of backups if table names, column names, tag names do not use escape characters. This can also reduce the backup data time and backup data footprint. If you are unsure about using `-n` and `-L` conditions, please use the default parameters for "strict" mode backup. See the [official documentation](/taos-sql/escape) for a description of escaped characters. @@ -104,7 +104,10 @@ Usage: taosdump [OPTION...] dbname [tbname ...] use letter and number only. Default is NOT. -n, --no-escape No escape char '`'. Default is using it. -T, --thread-num=THREAD_NUM Number of thread for dump in file. Default is - 5. + 8. + -C, --cloud=CLOUD_DSN specify a DSN to access TDengine cloud service + -R, --restful Use RESTful interface to connect TDengine + -t, --timeout=SECONDS The timeout seconds for websocket to interact. -g, --debug Print debug info. -?, --help Give this help list --usage Give a short usage message diff --git a/docs/en/20-third-party/09-emq-broker.md b/docs/en/20-third-party/09-emq-broker.md index 8dfa09e6c7d1615ea3a1145428efe920588b687e..0900dd3d7571dc0ab8d93174aa2d7b5eccf1fbf5 100644 --- a/docs/en/20-third-party/09-emq-broker.md +++ b/docs/en/20-third-party/09-emq-broker.md @@ -16,6 +16,7 @@ The following preparations are required for EMQX to add TDengine data sources co Depending on the current operating system, users can download the installation package from the [EMQX official website](https://www.emqx.io/downloads) and execute the installation. After installation, use `sudo emqx start` or `sudo systemctl start emqx` to start the EMQX service. +Note: this chapter is based on EMQX v4.4.5. Other version of EMQX probably change its user interface, configuration methods or functions. ## Create Database and Table @@ -31,7 +32,7 @@ Note: The table schema is based on the blog [(In Chinese) Data Transfer, Storage ## Configuring EMQX Rules -Since the configuration interface of EMQX differs from version to version, here is v4.4.3 as an example. For other versions, please refer to the corresponding official documentation. +Since the configuration interface of EMQX differs from version to version, here is v4.4.5 as an example. For other versions, please refer to the corresponding official documentation. ### Login EMQX Dashboard diff --git a/docs/zh/14-reference/06-taosdump.md b/docs/zh/14-reference/06-taosdump.md index 95ee20bfbae3f0f57c51b8fcd3bd9b35b4764f3d..625499a94926ac3f86e4d34976c70a0bfe7b0954 100644 --- a/docs/zh/14-reference/06-taosdump.md +++ b/docs/zh/14-reference/06-taosdump.md @@ -107,7 +107,10 @@ Usage: taosdump [OPTION...] dbname [tbname ...] use letter and number only. Default is NOT. -n, --no-escape No escape char '`'. Default is using it. -T, --thread-num=THREAD_NUM Number of thread for dump in file. Default is - 5. + 8. + -C, --cloud=CLOUD_DSN specify a DSN to access TDengine cloud service + -R, --restful Use RESTful interface to connect TDengine + -t, --timeout=SECONDS The timeout seconds for websocket to interact. -g, --debug Print debug info. -?, --help Give this help list --usage Give a short usage message diff --git a/docs/zh/20-third-party/09-emq-broker.md b/docs/zh/20-third-party/09-emq-broker.md index 84b1027f6b080cc5d5f8ac98f9aa6ea8be428f28..dd98374558080a0ea11cbc22ede58b66a3984191 100644 --- a/docs/zh/20-third-party/09-emq-broker.md +++ b/docs/zh/20-third-party/09-emq-broker.md @@ -17,6 +17,7 @@ MQTT 是流行的物联网数据传输协议,[EMQX](https://github.com/emqx/em 用户可以根据当前的操作系统,到 EMQX 官网下载安装包,并执行安装。下载地址如下:。安装后使用 `sudo emqx start` 或 `sudo systemctl start emqx` 启动 EMQX 服务。 +注意:本文基于 EMQX v4.4.5 版本,其他版本由于相关配置界面、配置方法以及功能可能随着版本升级有所区别。 ## 创建数据库和表 @@ -32,7 +33,7 @@ CREATE TABLE sensor_data (ts TIMESTAMP, temperature FLOAT, humidity FLOAT, volum ## 配置 EMQX 规则 -由于 EMQX 不同版本配置界面所有不同,这里仅以 v4.4.3 为例,其他版本请参考相应官网文档。 +由于 EMQX 不同版本配置界面所有不同,这里仅以 v4.4.5 为例,其他版本请参考相应官网文档。 ### 登录 EMQX Dashboard diff --git a/include/common/tmsg.h b/include/common/tmsg.h index dcd35f10f7249aea8f3afdaa9fe27d9b2d899c0a..bfb80ec8f845ccf11b0600da93305c3c65b616c0 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -227,8 +227,7 @@ typedef struct SSubmitBlk { int32_t sversion; // data schema version int32_t dataLen; // data part length, not including the SSubmitBlk head int32_t schemaLen; // schema length, if length is 0, no schema exists - int16_t numOfRows; // total number of rows in current submit block - int16_t padding; // TODO just for padding here + int32_t numOfRows; // total number of rows in current submit block char data[]; } SSubmitBlk; @@ -256,7 +255,7 @@ typedef struct { int32_t sversion; // data schema version int32_t dataLen; // data part length, not including the SSubmitBlk head int32_t schemaLen; // schema length, if length is 0, no schema exists - int16_t numOfRows; // total number of rows in current submit block + int32_t numOfRows; // total number of rows in current submit block // head of SSubmitBlk int32_t numOfBlocks; const void* pMsg; diff --git a/include/os/osEnv.h b/include/os/osEnv.h index a3f92a0b2914dca8afe5e63e9c121694039d52ac..798bfc197ef13bf95b3246fedbb27dafd881c2dd 100644 --- a/include/os/osEnv.h +++ b/include/os/osEnv.h @@ -49,6 +49,8 @@ void osDefaultInit(); void osUpdate(); void osCleanup(); bool osLogSpaceAvailable(); +bool osDataSpaceAvailable(); +bool osTempSpaceAvailable(); void osSetTimezone(const char *timezone); void osSetSystemLocale(const char *inLocale, const char *inCharSet); diff --git a/source/client/src/clientMain.c b/source/client/src/clientMain.c index c2d621b3114d9ce38971f7dc58f88a16e3062941..6785390952817abfafb45be900bbf20bca86eb76 100644 --- a/source/client/src/clientMain.c +++ b/source/client/src/clientMain.c @@ -76,7 +76,7 @@ void taos_cleanup(void) { cleanupTaskQueue(); taosConvDestroy(); - + tscInfo("all local resources released"); taosCleanupCfg(); taosCloseLog(); @@ -680,7 +680,7 @@ void retrieveMetaCallback(SMetaData *pResultMeta, void *param, int32_t code) { code = qAnalyseSqlSemantic(pWrapper->pCtx, &pWrapper->catalogReq, pResultMeta, pQuery); pRequest->stableQuery = pQuery->stableQuery; if (pQuery->pRoot) { - pRequest->stmtType = pQuery->pRoot->type; + pRequest->stmtType = pQuery->pRoot->type; } } @@ -785,9 +785,9 @@ void doAsyncQuery(SRequestObj *pRequest, bool updateMetaForce) { STscObj *pTscObj = pRequest->pTscObj; SAppClusterSummary *pActivity = &pTscObj->pAppInfo->summary; if (NULL == pQuery->pRoot) { - atomic_add_fetch_64((int64_t *)&pActivity->numOfInsertsReq, 1); + atomic_add_fetch_64((int64_t *)&pActivity->numOfInsertsReq, 1); } else if (QUERY_NODE_SELECT_STMT == pQuery->pRoot->type) { - atomic_add_fetch_64((int64_t *)&pActivity->numOfQueryReq, 1); + atomic_add_fetch_64((int64_t *)&pActivity->numOfQueryReq, 1); } } @@ -809,6 +809,7 @@ void doAsyncQuery(SRequestObj *pRequest, bool updateMetaForce) { code = catalogAsyncGetAllMeta(pCxt->pCatalog, &conn, &catalogReq, retrieveMetaCallback, pWrapper, &pRequest->body.queryJob); + pCxt = NULL; if (code == TSDB_CODE_SUCCESS) { return; } @@ -816,6 +817,8 @@ void doAsyncQuery(SRequestObj *pRequest, bool updateMetaForce) { _error: tscError("0x%" PRIx64 " error happens, code:%d - %s, reqId:0x%" PRIx64, pRequest->self, code, tstrerror(code), pRequest->requestId); + taosMemoryFree(pCxt); + terrno = code; pRequest->code = code; pRequest->body.queryFp(pRequest->body.param, pRequest, code); @@ -857,7 +860,7 @@ static void fetchCallback(void *pResult, void *param, int32_t code) { STscObj *pTscObj = pRequest->pTscObj; SAppClusterSummary *pActivity = &pTscObj->pAppInfo->summary; - atomic_add_fetch_64((int64_t *)&pActivity->fetchBytes, pRequest->body.resInfo.payloadLen); + atomic_add_fetch_64((int64_t *)&pActivity->fetchBytes, pRequest->body.resInfo.payloadLen); } pRequest->body.fetchFp(pRequest->body.param, pRequest, pResultInfo->numOfRows); diff --git a/source/client/src/clientMsgHandler.c b/source/client/src/clientMsgHandler.c index db33532aef36d7b4f3fc7f67502a92728b44ea57..4217cf08b325b7a32c3227b8dcbabcc19b2968f2 100644 --- a/source/client/src/clientMsgHandler.c +++ b/source/client/src/clientMsgHandler.c @@ -30,6 +30,10 @@ int32_t genericRspCallback(void* param, SDataBuf* pMsg, int32_t code) { SRequestObj* pRequest = param; setErrno(pRequest, code); + if (NEED_CLIENT_RM_TBLMETA_REQ(pRequest->type)) { + removeMeta(pRequest->pTscObj, pRequest->targetTableList); + } + taosMemoryFree(pMsg->pData); if (pRequest->body.queryFp != NULL) { pRequest->body.queryFp(pRequest->body.param, pRequest, code); diff --git a/source/client/src/tmq.c b/source/client/src/tmq.c index 994cf9fdc17e6f828afd68387e04a38cdb8437d0..c68c3fad955e5f5e3b64651c2eb044a0192af1e1 100644 --- a/source/client/src/tmq.c +++ b/source/client/src/tmq.c @@ -3144,10 +3144,9 @@ int taos_write_raw_block(TAOS* taos, int rows, char* pData, const char* tbname) blk->uid = htobe64(uid); blk->suid = htobe64(suid); - blk->padding = htonl(blk->padding); blk->sversion = htonl(pTableMeta->sversion); blk->schemaLen = htonl(schemaLen); - blk->numOfRows = htons(rows); + blk->numOfRows = htonl(rows); blk->dataLen = htonl(dataLen); subReq->length = sizeof(SSubmitReq) + sizeof(SSubmitBlk) + schemaLen + dataLen; subReq->numOfBlocks = 1; @@ -3373,10 +3372,9 @@ static int32_t tmqWriteRaw(TAOS* taos, void* data, int32_t dataLen) { blk->uid = htobe64(uid); blk->suid = htobe64(suid); - blk->padding = htonl(blk->padding); blk->sversion = htonl(pSW->version); blk->schemaLen = htonl(schemaLen); - blk->numOfRows = htons(rows); + blk->numOfRows = htonl(rows); blk->dataLen = htonl(dataLen); subReq->length += sizeof(SSubmitBlk) + schemaLen + dataLen; subReq->numOfBlocks++; diff --git a/source/common/src/tdatablock.c b/source/common/src/tdatablock.c index 8a240018fac69eb19400c509383bc43dedd5bfac..51c21eafa9d2bae9d84b5c2f2c88559f7f1a0383 100644 --- a/source/common/src/tdatablock.c +++ b/source/common/src/tdatablock.c @@ -2028,11 +2028,10 @@ int32_t buildSubmitReqFromDataBlock(SSubmitReq** pReq, const SArray* pDataBlocks int32_t dataLen = blk->dataLen; blk->uid = htobe64(blk->uid); blk->suid = htobe64(blk->suid); - blk->padding = htonl(blk->padding); blk->sversion = htonl(blk->sversion); blk->dataLen = htonl(blk->dataLen); blk->schemaLen = htonl(blk->schemaLen); - blk->numOfRows = htons(blk->numOfRows); + blk->numOfRows = htonl(blk->numOfRows); blk = (SSubmitBlk*)(blk->data + dataLen); } } else { diff --git a/source/common/src/tglobal.c b/source/common/src/tglobal.c index efadcad350f0c869870bb0429b0e531a2d13f0da..8dd8a29c865163e86e1004667c77a722b2201fd7 100644 --- a/source/common/src/tglobal.c +++ b/source/common/src/tglobal.c @@ -452,7 +452,7 @@ static void taosSetClientLogCfg(SConfig *pCfg) { SConfigItem *pItem = cfgGetItem(pCfg, "logDir"); tstrncpy(tsLogDir, cfgGetItem(pCfg, "logDir")->str, PATH_MAX); taosExpandDir(tsLogDir, tsLogDir, PATH_MAX); - tsLogSpace.reserved = cfgGetItem(pCfg, "minimalLogDirGB")->fval; + tsLogSpace.reserved = (int64_t)(((double)cfgGetItem(pCfg, "minimalLogDirGB")->fval) * 1024 * 1024 * 1024); tsNumOfLogLines = cfgGetItem(pCfg, "numOfLogLines")->i32; tsAsyncLog = cfgGetItem(pCfg, "asyncLog")->bval; tsLogKeepDays = cfgGetItem(pCfg, "logKeepDays")->i32; @@ -502,7 +502,7 @@ static int32_t taosSetClientCfg(SConfig *pCfg) { tstrncpy(tsTempDir, cfgGetItem(pCfg, "tempDir")->str, PATH_MAX); taosExpandDir(tsTempDir, tsTempDir, PATH_MAX); - tsTempSpace.reserved = cfgGetItem(pCfg, "minimalTmpDirGB")->fval; + tsTempSpace.reserved = (int64_t)(((double)cfgGetItem(pCfg, "minimalTmpDirGB")->fval) * 1024 * 1024 * 1024); if (taosMulMkDir(tsTempDir) != 0) { uError("failed to create tempDir:%s since %s", tsTempDir, terrstr()); return -1; @@ -540,7 +540,7 @@ static void taosSetSystemCfg(SConfig *pCfg) { } static int32_t taosSetServerCfg(SConfig *pCfg) { - tsDataSpace.reserved = cfgGetItem(pCfg, "minimalDataDirGB")->fval; + tsDataSpace.reserved = (int64_t)(((double)cfgGetItem(pCfg, "minimalDataDirGB")->fval) * 1024 * 1024 * 1024); tsNumOfSupportVnodes = cfgGetItem(pCfg, "supportVnodes")->i32; tsMaxShellConns = cfgGetItem(pCfg, "maxShellConns")->i32; tsStatusInterval = cfgGetItem(pCfg, "statusInterval")->i32; @@ -739,15 +739,15 @@ int32_t taosSetCfg(SConfig *pCfg, char *name) { } case 'i': { if (strcasecmp("minimalTmpDirGB", name) == 0) { - tsTempSpace.reserved = cfgGetItem(pCfg, "minimalTmpDirGB")->fval; + tsTempSpace.reserved = (int64_t)(((double)cfgGetItem(pCfg, "minimalTmpDirGB")->fval) * 1024 * 1024 * 1024); } else if (strcasecmp("minimalDataDirGB", name) == 0) { - tsDataSpace.reserved = cfgGetItem(pCfg, "minimalDataDirGB")->fval; + tsDataSpace.reserved = (int64_t)(((double)cfgGetItem(pCfg, "minimalDataDirGB")->fval) * 1024 * 1024 * 1024); } else if (strcasecmp("minSlidingTime", name) == 0) { tsMinSlidingTime = cfgGetItem(pCfg, "minSlidingTime")->i32; } else if (strcasecmp("minIntervalTime", name) == 0) { tsMinIntervalTime = cfgGetItem(pCfg, "minIntervalTime")->i32; } else if (strcasecmp("minimalLogDirGB", name) == 0) { - tsLogSpace.reserved = cfgGetItem(pCfg, "minimalLogDirGB")->fval; + tsLogSpace.reserved = (int64_t)(((double)cfgGetItem(pCfg, "minimalLogDirGB")->fval) * 1024 * 1024 * 1024); } break; } diff --git a/source/common/src/tmsg.c b/source/common/src/tmsg.c index 05b27546eb016cb684eaacac4afd9853d668a1d9..fb9c64a880393198c62a233625812186b09240ae 100644 --- a/source/common/src/tmsg.c +++ b/source/common/src/tmsg.c @@ -76,7 +76,7 @@ int32_t tGetSubmitMsgNext(SSubmitMsgIter *pIter, SSubmitBlk **pPBlock) { pIter->sversion = htonl((*pPBlock)->sversion); pIter->dataLen = htonl((*pPBlock)->dataLen); pIter->schemaLen = htonl((*pPBlock)->schemaLen); - pIter->numOfRows = htons((*pPBlock)->numOfRows); + pIter->numOfRows = htonl((*pPBlock)->numOfRows); } return 0; } diff --git a/source/common/src/ttszip.c b/source/common/src/ttszip.c index 03353b0de633c32cfbd5ff89ce800617d9603f57..c86bf08e8168f00f882333878e11af90a0e65627 100644 --- a/source/common/src/ttszip.c +++ b/source/common/src/ttszip.c @@ -30,6 +30,12 @@ static int32_t STSBufUpdateHeader(STSBuf* pTSBuf, STSBufFileHeader* pHeader); * @return */ STSBuf* tsBufCreate(bool autoDelete, int32_t order) { + if (!osTempSpaceAvailable()) { + terrno = TSDB_CODE_TSC_NO_DISKSPACE; + // tscError("tmp file created failed since %s", terrstr()); + return NULL; + } + STSBuf* pTSBuf = taosMemoryCalloc(1, sizeof(STSBuf)); if (pTSBuf == NULL) { return NULL; diff --git a/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c b/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c index e4d6de849cd2e64b24645b00ee9a8d030421864a..1a226abe5c8487b463a1b39fcaa4e5f4f45ff30a 100644 --- a/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c +++ b/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c @@ -176,7 +176,11 @@ static int32_t vmPutMsgToQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg, EQueueType qtyp taosWriteQitem(pVnode->pFetchQ, pMsg); break; case WRITE_QUEUE: - if ((pMsg->msgType == TDMT_VND_SUBMIT) && (grantCheck(TSDB_GRANT_STORAGE) != TSDB_CODE_SUCCESS)) { + if (!osDataSpaceAvailable()) { + terrno = TSDB_CODE_VND_NO_DISKSPACE; + code = terrno; + dError("vgId:%d, msg:%p put into vnode-write queue failed since %s", pVnode->vgId, pMsg, terrstr()); + } else if ((pMsg->msgType == TDMT_VND_SUBMIT) && (grantCheck(TSDB_GRANT_STORAGE) != TSDB_CODE_SUCCESS)) { terrno = TSDB_CODE_VND_NO_WRITE_AUTH; code = terrno; dDebug("vgId:%d, msg:%p put into vnode-write queue failed since %s", pVnode->vgId, pMsg, terrstr()); diff --git a/source/dnode/mgmt/node_mgmt/src/dmEnv.c b/source/dnode/mgmt/node_mgmt/src/dmEnv.c index 119d5218271d143f5a9ab85a1b3b740c7d65190c..5d72ce3b18b671b9a3581b7947a18670f5d3f762 100644 --- a/source/dnode/mgmt/node_mgmt/src/dmEnv.c +++ b/source/dnode/mgmt/node_mgmt/src/dmEnv.c @@ -49,8 +49,26 @@ static int32_t dmInitMonitor() { return 0; } +static bool dmCheckDiskSpace() { + osUpdate(); + if (!osDataSpaceAvailable()) { + dError("free disk size: %f GB, too little, require %f GB at least at least , quit", (double)tsDataSpace.size.avail / 1024.0 / 1024.0 / 1024.0, (double)tsDataSpace.reserved / 1024.0 / 1024.0 / 1024.0); + return false; + } + if (!osLogSpaceAvailable()) { + dError("free disk size: %f GB, too little, require %f GB at least at least, quit", (double)tsLogSpace.size.avail / 1024.0 / 1024.0 / 1024.0, (double)tsLogSpace.reserved / 1024.0 / 1024.0 / 1024.0); + return false; + } + if (!osTempSpaceAvailable()) { + dError("free disk size: %f GB, too little, require %f GB at least at least, quit", (double)tsTempSpace.size.avail / 1024.0 / 1024.0 / 1024.0, (double)tsTempSpace.reserved / 1024.0 / 1024.0 / 1024.0); + return false; + } + return true; +} + int32_t dmInit(int8_t rtype) { dInfo("start to init dnode env"); + if (!dmCheckDiskSpace()) return -1; if (dmCheckRepeatInit(dmInstance()) != 0) return -1; if (dmInitSystem() != 0) return -1; if (dmInitMonitor() != 0) return -1; diff --git a/source/dnode/mgmt/node_mgmt/src/dmNodes.c b/source/dnode/mgmt/node_mgmt/src/dmNodes.c index ecbb695e02edaef6a2546a608d0dedbee61afc91..99ffd73a7a71c8fb9ab1ff43cc5ecc008d23eecd 100644 --- a/source/dnode/mgmt/node_mgmt/src/dmNodes.c +++ b/source/dnode/mgmt/node_mgmt/src/dmNodes.c @@ -265,6 +265,7 @@ static void dmWatchNodes(SDnode *pDnode) { } int32_t dmRunDnode(SDnode *pDnode) { + int count = 0; if (dmOpenNodes(pDnode) != 0) { dError("failed to open nodes since %s", terrstr()); return -1; @@ -274,7 +275,6 @@ int32_t dmRunDnode(SDnode *pDnode) { dError("failed to start nodes since %s", terrstr()); return -1; } - while (1) { if (pDnode->stop) { dInfo("TDengine is about to stop"); @@ -285,6 +285,9 @@ int32_t dmRunDnode(SDnode *pDnode) { } dmWatchNodes(pDnode); + if (count == 0) osUpdate(); + count %= 10; + count++; taosMsleep(100); } } diff --git a/source/dnode/mnode/impl/src/mndStb.c b/source/dnode/mnode/impl/src/mndStb.c index 0fea4007254ff5bb0f16d0e3c4f5760a69f74990..3e747b66c820828fd2f086318794ef0d3e2c88b4 100644 --- a/source/dnode/mnode/impl/src/mndStb.c +++ b/source/dnode/mnode/impl/src/mndStb.c @@ -531,7 +531,7 @@ int32_t mndCheckCreateStbReq(SMCreateStbReq *pCreate) { } if (pCreate->numOfColumns < TSDB_MIN_COLUMNS || pCreate->numOfColumns > TSDB_MAX_COLUMNS) { - terrno = TSDB_CODE_MND_INVALID_STB_OPTION; + terrno = TSDB_CODE_PAR_INVALID_COLUMNS_NUM; return -1; } @@ -542,7 +542,7 @@ int32_t mndCheckCreateStbReq(SMCreateStbReq *pCreate) { SField *pField = taosArrayGet(pCreate->pColumns, 0); if (pField->type != TSDB_DATA_TYPE_TIMESTAMP) { - terrno = TSDB_CODE_MND_INVALID_STB_OPTION; + terrno = TSDB_CODE_PAR_INVALID_FIRST_COLUMN; return -1; } diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index 5e8867fdfbfe2732f22aa6a5caa2987750637373..b8501db9fb2a372e9a8ad7a919a904c9296fdea9 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -437,7 +437,6 @@ static int32_t mndCreateStbForStream(SMnode *pMnode, STrans *pTrans, const SStre pField->bytes = 8; if (mndCheckCreateStbReq(&createReq) != 0) { - terrno = TSDB_CODE_INVALID_MSG; goto _OVER; } diff --git a/source/dnode/mnode/impl/src/mndSync.c b/source/dnode/mnode/impl/src/mndSync.c index 810dcb904964ded49b9deb961458ac237ce5b071..b8ff7be46a8098fc3bc88eab0fcaea6884b507b2 100644 --- a/source/dnode/mnode/impl/src/mndSync.c +++ b/source/dnode/mnode/impl/src/mndSync.c @@ -149,7 +149,7 @@ int32_t mndSnapshotStartWrite(struct SSyncFSM *pFsm, void *pParam, void **ppWrit int32_t mndSnapshotStopWrite(struct SSyncFSM *pFsm, void *pWriter, bool isApply, SSnapshot *pSnapshot) { mInfo("stop to apply snapshot to sdb, apply:%d, index:%" PRId64 " term:%" PRIu64 " config:%" PRId64, isApply, - pSnapshot->lastApplyIndex, pSnapshot->lastApplyTerm, pSnapshot->lastApplyIndex); + pSnapshot->lastApplyIndex, pSnapshot->lastApplyTerm, pSnapshot->lastConfigIndex); SMnode *pMnode = pFsm->data; return sdbStopWrite(pMnode->pSdb, pWriter, isApply, pSnapshot->lastApplyIndex, pSnapshot->lastApplyTerm, pSnapshot->lastConfigIndex); diff --git a/source/dnode/vnode/src/tq/tqSink.c b/source/dnode/vnode/src/tq/tqSink.c index 850394b15d934ea477c4fe719c601e1a30ebea7c..6b4d5a2f3ebb4f272180c3165ddd16c055b39364 100644 --- a/source/dnode/vnode/src/tq/tqSink.c +++ b/source/dnode/vnode/src/tq/tqSink.c @@ -117,7 +117,7 @@ SSubmitReq* tdBlockToSubmit(const SArray* pBlocks, const STSchema* pTSchema, boo for (int32_t i = 0; i < sz; i++) { SSDataBlock* pDataBlock = taosArrayGet(pBlocks, i); - blkHead->numOfRows = htons(pDataBlock->info.rows); + blkHead->numOfRows = htonl(pDataBlock->info.rows); blkHead->sversion = htonl(pTSchema->version); // TODO blkHead->suid = htobe64(suid); diff --git a/source/dnode/vnode/src/tsdb/tsdbWrite.c b/source/dnode/vnode/src/tsdb/tsdbWrite.c index a221bc17957b36a43e3847a33346ae1b05f8794c..383652531e211504983444d9d783ddf9189f5161 100644 --- a/source/dnode/vnode/src/tsdb/tsdbWrite.c +++ b/source/dnode/vnode/src/tsdb/tsdbWrite.c @@ -111,7 +111,7 @@ int tsdbScanAndConvertSubmitMsg(STsdb *pTsdb, SSubmitReq *pMsg) { // pBlock->sversion = htonl(pBlock->sversion); // pBlock->dataLen = htonl(pBlock->dataLen); // pBlock->schemaLen = htonl(pBlock->schemaLen); - // pBlock->numOfRows = htons(pBlock->numOfRows); + // pBlock->numOfRows = htonl(pBlock->numOfRows); #if 0 if (pBlock->tid <= 0 || pBlock->tid >= pMeta->maxTables) { diff --git a/source/dnode/vnode/src/vnd/vnodeSync.c b/source/dnode/vnode/src/vnd/vnodeSync.c index f3550bf2eed2a637f2654708f37f6bfa29ea4d21..52698d0be7c25ee9367d0bcecf3168c9f64c4909 100644 --- a/source/dnode/vnode/src/vnd/vnodeSync.c +++ b/source/dnode/vnode/src/vnd/vnodeSync.c @@ -630,7 +630,7 @@ static int32_t vnodeSnapshotStopWrite(struct SSyncFSM *pFsm, void *pWriter, bool #ifdef USE_TSDB_SNAPSHOT SVnode *pVnode = pFsm->data; vInfo("vgId:%d, stop write vnode snapshot, apply:%d, index:%" PRId64 " term:%" PRIu64 " config:%" PRId64, pVnode->config.vgId, isApply, - pSnapshot->lastApplyIndex, pSnapshot->lastApplyTerm, pSnapshot->lastApplyIndex); + pSnapshot->lastApplyIndex, pSnapshot->lastApplyTerm, pSnapshot->lastConfigIndex); int32_t code = vnodeSnapWriterClose(pWriter, !isApply, pSnapshot); vInfo("vgId:%d, apply vnode snapshot finished, code:0x%x", pVnode->config.vgId, code); diff --git a/source/dnode/vnode/test/tsdbSmaTest.cpp b/source/dnode/vnode/test/tsdbSmaTest.cpp index 2a8c7a583ccb4f5aa720f2fd83db85a19f76b93b..33623f1bdd206a2bf3871650c591efbbd96cd3c4 100644 --- a/source/dnode/vnode/test/tsdbSmaTest.cpp +++ b/source/dnode/vnode/test/tsdbSmaTest.cpp @@ -395,9 +395,8 @@ TEST(testCase, tSma_Data_Insert_Query_Test) { pBlk->uid = htobe64(tbUid); pBlk->suid = htobe64(tbUid); pBlk->sversion = htonl(schemaVer); - pBlk->padding = htonl(0); pBlk->schemaLen = htonl(0); - pBlk->numOfRows = htons(mockRowNum); + pBlk->numOfRows = htonl(mockRowNum); pBlk->dataLen = htonl(mockRowNum * mockRowLen); for (uint32_t r = 0; r < mockRowNum; ++r) { pRow = (STSRow *)POINTER_SHIFT(pBlk, sizeof(SSubmitBlk) + r * mockRowLen); diff --git a/source/libs/catalog/src/ctgRemote.c b/source/libs/catalog/src/ctgRemote.c index 0f97b5c5b196d4a5c6ef2ad48860cacfa88dccf6..4652c66ebc4df6dbe35b69afc12b8b067c113759 100644 --- a/source/libs/catalog/src/ctgRemote.c +++ b/source/libs/catalog/src/ctgRemote.c @@ -13,24 +13,25 @@ * along with this program. If not, see . */ -#include "trpc.h" -#include "query.h" -#include "tname.h" #include "catalogInt.h" +#include "query.h" #include "systable.h" +#include "tname.h" #include "tref.h" +#include "trpc.h" -int32_t ctgHandleBatchRsp(SCtgJob* pJob, SCtgTaskCallbackParam* cbParam, SDataBuf *pMsg, int32_t rspCode) { - int32_t code = 0; - SArray* pTaskId = cbParam->taskId; +int32_t ctgHandleBatchRsp(SCtgJob* pJob, SCtgTaskCallbackParam* cbParam, SDataBuf* pMsg, int32_t rspCode) { + int32_t code = 0; + SArray* pTaskId = cbParam->taskId; SCatalog* pCtg = pJob->pCtg; - int32_t taskNum = taosArrayGetSize(pTaskId); - SDataBuf taskMsg = *pMsg; - int32_t offset = 0; + int32_t taskNum = taosArrayGetSize(pTaskId); + SDataBuf taskMsg = *pMsg; + int32_t offset = 0; int32_t msgNum = (TSDB_CODE_SUCCESS == rspCode && pMsg->pData && (pMsg->len > 0)) ? ntohl(*(int32_t*)pMsg->pData) : 0; ASSERT(taskNum == msgNum || 0 == msgNum); - ctgDebug("QID:0x%" PRIx64 " ctg got batch %d rsp %s", pJob->queryId, cbParam->batchId, TMSG_INFO(cbParam->reqType + 1)); + ctgDebug("QID:0x%" PRIx64 " ctg got batch %d rsp %s", pJob->queryId, cbParam->batchId, + TMSG_INFO(cbParam->reqType + 1)); offset += sizeof(msgNum); SBatchRsp rsp = {0}; @@ -39,10 +40,10 @@ int32_t ctgHandleBatchRsp(SCtgJob* pJob, SCtgTaskCallbackParam* cbParam, SDataBu ctgError("taosHashInit %d batch failed", taskNum); CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY); } - + for (int32_t i = 0; i < taskNum; ++i) { - int32_t* taskId = taosArrayGet(pTaskId, i); - SCtgTask *pTask = taosArrayGet(pJob->pTasks, *taskId); + int32_t* taskId = taosArrayGet(pTaskId, i); + SCtgTask* pTask = taosArrayGet(pJob->pTasks, *taskId); if (msgNum > 0) { rsp.reqType = ntohl(*(int32_t*)((char*)pMsg->pData + offset)); offset += sizeof(rsp.reqType); @@ -52,7 +53,7 @@ int32_t ctgHandleBatchRsp(SCtgJob* pJob, SCtgTaskCallbackParam* cbParam, SDataBu offset += sizeof(rsp.rspCode); rsp.msg = ((char*)pMsg->pData) + offset; offset += rsp.msgLen; - + taskMsg.msgType = rsp.reqType; taskMsg.pData = rsp.msg; taskMsg.len = rsp.msgLen; @@ -64,9 +65,10 @@ int32_t ctgHandleBatchRsp(SCtgJob* pJob, SCtgTaskCallbackParam* cbParam, SDataBu } pTask->pBatchs = pBatchs; - - ctgDebug("QID:0x%" PRIx64 " ctg task %d start to handle rsp %s", pJob->queryId, pTask->taskId, TMSG_INFO(taskMsg.msgType + 1)); - + + ctgDebug("QID:0x%" PRIx64 " ctg task %d start to handle rsp %s", pJob->queryId, pTask->taskId, + TMSG_INFO(taskMsg.msgType + 1)); + (*gCtgAsyncFps[pTask->type].handleRspFp)(pTask, rsp.reqType, &taskMsg, (rsp.rspCode ? rsp.rspCode : rspCode)); } @@ -78,23 +80,22 @@ _return: CTG_RET(code); } - int32_t ctgProcessRspMsg(void* out, int32_t reqType, char* msg, int32_t msgSize, int32_t rspCode, char* target) { int32_t code = 0; - + switch (reqType) { case TDMT_MND_QNODE_LIST: { if (TSDB_CODE_SUCCESS != rspCode) { qError("error rsp for qnode list, error:%s", tstrerror(rspCode)); CTG_ERR_RET(rspCode); } - + code = queryProcessMsgRsp[TMSG_INDEX(reqType)](out, msg, msgSize); if (code) { qError("Process qnode list rsp failed, error:%s", tstrerror(rspCode)); CTG_ERR_RET(code); } - + qDebug("Got qnode list from mnode, listNum:%d", (int32_t)taosArrayGetSize(out)); break; } @@ -103,13 +104,13 @@ int32_t ctgProcessRspMsg(void* out, int32_t reqType, char* msg, int32_t msgSize, qError("error rsp for dnode list, error:%s", tstrerror(rspCode)); CTG_ERR_RET(rspCode); } - + code = queryProcessMsgRsp[TMSG_INDEX(reqType)](out, msg, msgSize); if (code) { qError("Process dnode list rsp failed, error:%s", tstrerror(rspCode)); CTG_ERR_RET(code); } - + qDebug("Got dnode list from mnode, listNum:%d", (int32_t)taosArrayGetSize(*(SArray**)out)); break; } @@ -118,13 +119,13 @@ int32_t ctgProcessRspMsg(void* out, int32_t reqType, char* msg, int32_t msgSize, qError("error rsp for use db, error:%s, dbFName:%s", tstrerror(rspCode), target); CTG_ERR_RET(rspCode); } - + code = queryProcessMsgRsp[TMSG_INDEX(reqType)](out, msg, msgSize); if (code) { qError("Process use db rsp failed, error:%s, dbFName:%s", tstrerror(code), target); CTG_ERR_RET(code); } - + qDebug("Got db vgInfo from mnode, dbFName:%s", target); break; } @@ -133,13 +134,13 @@ int32_t ctgProcessRspMsg(void* out, int32_t reqType, char* msg, int32_t msgSize, qError("error rsp for get db cfg, error:%s, db:%s", tstrerror(rspCode), target); CTG_ERR_RET(rspCode); } - + code = queryProcessMsgRsp[TMSG_INDEX(reqType)](out, msg, msgSize); if (code) { qError("Process get db cfg rsp failed, error:%s, db:%s", tstrerror(code), target); CTG_ERR_RET(code); } - + qDebug("Got db cfg from mnode, dbFName:%s", target); break; } @@ -148,13 +149,13 @@ int32_t ctgProcessRspMsg(void* out, int32_t reqType, char* msg, int32_t msgSize, qError("error rsp for get index, error:%s, indexName:%s", tstrerror(rspCode), target); CTG_ERR_RET(rspCode); } - + code = queryProcessMsgRsp[TMSG_INDEX(reqType)](out, msg, msgSize); if (code) { qError("Process get index rsp failed, error:%s, indexName:%s", tstrerror(code), target); CTG_ERR_RET(code); } - + qDebug("Got index from mnode, indexName:%s", target); break; } @@ -163,13 +164,13 @@ int32_t ctgProcessRspMsg(void* out, int32_t reqType, char* msg, int32_t msgSize, qError("error rsp for get table index, error:%s, tbFName:%s", tstrerror(rspCode), target); CTG_ERR_RET(rspCode); } - + code = queryProcessMsgRsp[TMSG_INDEX(reqType)](out, msg, msgSize); if (code) { qError("Process get table index rsp failed, error:%s, tbFName:%s", tstrerror(code), target); CTG_ERR_RET(code); } - + qDebug("Got table index from mnode, tbFName:%s", target); break; } @@ -178,13 +179,13 @@ int32_t ctgProcessRspMsg(void* out, int32_t reqType, char* msg, int32_t msgSize, qError("error rsp for get udf, error:%s, funcName:%s", tstrerror(rspCode), target); CTG_ERR_RET(rspCode); } - + code = queryProcessMsgRsp[TMSG_INDEX(reqType)](out, msg, msgSize); if (code) { qError("Process get udf rsp failed, error:%s, funcName:%s", tstrerror(code), target); CTG_ERR_RET(code); } - + qDebug("Got udf from mnode, funcName:%s", target); break; } @@ -193,13 +194,13 @@ int32_t ctgProcessRspMsg(void* out, int32_t reqType, char* msg, int32_t msgSize, qError("error rsp for get user auth, error:%s, user:%s", tstrerror(rspCode), target); CTG_ERR_RET(rspCode); } - + code = queryProcessMsgRsp[TMSG_INDEX(reqType)](out, msg, msgSize); if (code) { qError("Process get user auth rsp failed, error:%s, user:%s", tstrerror(code), target); CTG_ERR_RET(code); } - + qDebug("Got user auth from mnode, user:%s", target); break; } @@ -210,17 +211,17 @@ int32_t ctgProcessRspMsg(void* out, int32_t reqType, char* msg, int32_t msgSize, qDebug("stablemeta not exist in mnode, tbFName:%s", target); return TSDB_CODE_SUCCESS; } - + qError("error rsp for stablemeta from mnode, error:%s, tbFName:%s", tstrerror(rspCode), target); CTG_ERR_RET(rspCode); } - + code = queryProcessMsgRsp[TMSG_INDEX(reqType)](out, msg, msgSize); if (code) { qError("Process mnode stablemeta rsp failed, error:%s, tbFName:%s", tstrerror(code), target); CTG_ERR_RET(code); } - + qDebug("Got table meta from mnode, tbFName:%s", target); break; } @@ -231,17 +232,17 @@ int32_t ctgProcessRspMsg(void* out, int32_t reqType, char* msg, int32_t msgSize, qDebug("tablemeta not exist in vnode, tbFName:%s", target); return TSDB_CODE_SUCCESS; } - + qError("error rsp for table meta from vnode, code:%s, tbFName:%s", tstrerror(rspCode), target); CTG_ERR_RET(rspCode); } - + code = queryProcessMsgRsp[TMSG_INDEX(reqType)](out, msg, msgSize); if (code) { qError("Process vnode tablemeta rsp failed, code:%s, tbFName:%s", tstrerror(code), target); CTG_ERR_RET(code); } - + qDebug("Got table meta from vnode, tbFName:%s", target); break; } @@ -250,13 +251,13 @@ int32_t ctgProcessRspMsg(void* out, int32_t reqType, char* msg, int32_t msgSize, qError("error rsp for table cfg from vnode, code:%s, tbFName:%s", tstrerror(rspCode), target); CTG_ERR_RET(rspCode); } - + code = queryProcessMsgRsp[TMSG_INDEX(reqType)](out, msg, msgSize); if (code) { qError("Process vnode tb cfg rsp failed, code:%s, tbFName:%s", tstrerror(code), target); CTG_ERR_RET(code); } - + qDebug("Got table cfg from vnode, tbFName:%s", target); break; } @@ -265,28 +266,28 @@ int32_t ctgProcessRspMsg(void* out, int32_t reqType, char* msg, int32_t msgSize, qError("error rsp for stb cfg from mnode, error:%s, tbFName:%s", tstrerror(rspCode), target); CTG_ERR_RET(rspCode); } - + code = queryProcessMsgRsp[TMSG_INDEX(reqType)](out, msg, msgSize); if (code) { qError("Process mnode stb cfg rsp failed, error:%s, tbFName:%s", tstrerror(code), target); CTG_ERR_RET(code); } - + qDebug("Got stb cfg from mnode, tbFName:%s", target); break; - } + } case TDMT_MND_SERVER_VERSION: { if (TSDB_CODE_SUCCESS != rspCode) { qError("error rsp for svr ver from mnode, error:%s", tstrerror(rspCode)); CTG_ERR_RET(rspCode); } - + code = queryProcessMsgRsp[TMSG_INDEX(reqType)](out, msg, msgSize); if (code) { qError("Process svr ver rsp failed, error:%s", tstrerror(code)); CTG_ERR_RET(code); } - + qDebug("Got svr ver from mnode"); break; } @@ -295,7 +296,7 @@ int32_t ctgProcessRspMsg(void* out, int32_t reqType, char* msg, int32_t msgSize, qError("Got error rsp, error:%s", tstrerror(rspCode)); CTG_ERR_RET(rspCode); } - + qError("invalid req type %s", TMSG_INFO(reqType)); return TSDB_CODE_APP_ERROR; } @@ -303,12 +304,11 @@ int32_t ctgProcessRspMsg(void* out, int32_t reqType, char* msg, int32_t msgSize, return TSDB_CODE_SUCCESS; } - -int32_t ctgHandleMsgCallback(void *param, SDataBuf *pMsg, int32_t rspCode) { +int32_t ctgHandleMsgCallback(void* param, SDataBuf* pMsg, int32_t rspCode) { SCtgTaskCallbackParam* cbParam = (SCtgTaskCallbackParam*)param; - int32_t code = 0; - SCtgJob* pJob = NULL; - + int32_t code = 0; + SCtgJob* pJob = NULL; + CTG_API_JENTER(); pJob = taosAcquireRef(gCtgMgmt.jobPool, cbParam->refId); @@ -322,13 +322,15 @@ int32_t ctgHandleMsgCallback(void *param, SDataBuf *pMsg, int32_t rspCode) { if (TDMT_VND_BATCH_META == cbParam->reqType || TDMT_MND_BATCH_META == cbParam->reqType) { CTG_ERR_JRET(ctgHandleBatchRsp(pJob, cbParam, pMsg, rspCode)); } else { - int32_t *taskId = taosArrayGet(cbParam->taskId, 0); - SCtgTask *pTask = taosArrayGet(pJob->pTasks, *taskId); + int32_t* taskId = taosArrayGet(cbParam->taskId, 0); + SCtgTask* pTask = taosArrayGet(pJob->pTasks, *taskId); - qDebug("QID:0x%" PRIx64 " ctg task %d start to handle rsp %s", pJob->queryId, pTask->taskId, TMSG_INFO(cbParam->reqType + 1)); + qDebug("QID:0x%" PRIx64 " ctg task %d start to handle rsp %s", pJob->queryId, pTask->taskId, + TMSG_INFO(cbParam->reqType + 1)); #if CTG_BATCH_FETCH - SHashObj* pBatchs = taosHashInit(CTG_DEFAULT_BATCH_NUM, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_NO_LOCK); + SHashObj* pBatchs = + taosHashInit(CTG_DEFAULT_BATCH_NUM, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_NO_LOCK); if (NULL == pBatchs) { ctgError("taosHashInit %d batch failed", CTG_DEFAULT_BATCH_NUM); CTG_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY); @@ -339,10 +341,10 @@ int32_t ctgHandleMsgCallback(void *param, SDataBuf *pMsg, int32_t rspCode) { CTG_ERR_JRET((*gCtgAsyncFps[pTask->type].handleRspFp)(pTask, cbParam->reqType, pMsg, rspCode)); #if CTG_BATCH_FETCH - CTG_ERR_JRET(ctgLaunchBatchs(pJob->pCtg, pJob, pBatchs)); -#endif + CTG_ERR_JRET(ctgLaunchBatchs(pJob->pCtg, pJob, pBatchs)); +#endif } - + _return: taosMemoryFree(pMsg->pData); @@ -354,16 +356,16 @@ _return: CTG_API_LEAVE(code); } - -int32_t ctgMakeMsgSendInfo(SCtgJob* pJob, SArray* pTaskId, int32_t batchId, int32_t msgType, SMsgSendInfo **pMsgSendInfo) { +int32_t ctgMakeMsgSendInfo(SCtgJob* pJob, SArray* pTaskId, int32_t batchId, int32_t msgType, + SMsgSendInfo** pMsgSendInfo) { int32_t code = 0; - SMsgSendInfo *msgSendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo)); + SMsgSendInfo* msgSendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo)); if (NULL == msgSendInfo) { qError("calloc %d failed", (int32_t)sizeof(SMsgSendInfo)); CTG_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY); } - SCtgTaskCallbackParam *param = taosMemoryCalloc(1, sizeof(SCtgTaskCallbackParam)); + SCtgTaskCallbackParam* param = taosMemoryCalloc(1, sizeof(SCtgTaskCallbackParam)); if (NULL == param) { qError("calloc %d failed", (int32_t)sizeof(SCtgTaskCallbackParam)); CTG_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY); @@ -391,10 +393,10 @@ _return: CTG_RET(code); } -int32_t ctgAsyncSendMsg(SCatalog* pCtg, SRequestConnInfo *pConn, SCtgJob* pJob, SArray* pTaskId, - int32_t batchId, char* dbFName, int32_t vgId, int32_t msgType, void *msg, uint32_t msgSize) { - int32_t code = 0; - SMsgSendInfo *pMsgSendInfo = NULL; +int32_t ctgAsyncSendMsg(SCatalog* pCtg, SRequestConnInfo* pConn, SCtgJob* pJob, SArray* pTaskId, int32_t batchId, + char* dbFName, int32_t vgId, int32_t msgType, void* msg, uint32_t msgSize) { + int32_t code = 0; + SMsgSendInfo* pMsgSendInfo = NULL; CTG_ERR_JRET(ctgMakeMsgSendInfo(pJob, pTaskId, batchId, msgType, &pMsgSendInfo)); ctgUpdateSendTargetInfo(pMsgSendInfo, msgType, dbFName, vgId); @@ -426,22 +428,23 @@ _return: CTG_RET(code); } -int32_t ctgAddBatch(SCatalog* pCtg, int32_t vgId, SRequestConnInfo *pConn, SCtgTask* pTask, int32_t msgType, void *msg, uint32_t msgSize) { - int32_t code = 0; - SHashObj* pBatchs = pTask->pBatchs; - SCtgJob* pJob = pTask->pJob; +int32_t ctgAddBatch(SCatalog* pCtg, int32_t vgId, SRequestConnInfo* pConn, SCtgTask* pTask, int32_t msgType, void* msg, + uint32_t msgSize) { + int32_t code = 0; + SHashObj* pBatchs = pTask->pBatchs; + SCtgJob* pJob = pTask->pJob; SCtgBatch* pBatch = taosHashGet(pBatchs, &vgId, sizeof(vgId)); - int32_t taskNum = taosArrayGetSize(pTask->pJob->pTasks); - SCtgBatch newBatch = {0}; - SBatchMsg req = {0}; - + int32_t taskNum = taosArrayGetSize(pTask->pJob->pTasks); + SCtgBatch newBatch = {0}; + SBatchMsg req = {0}; + if (NULL == pBatch) { newBatch.pMsgs = taosArrayInit(taskNum, sizeof(SBatchMsg)); newBatch.pTaskIds = taosArrayInit(taskNum, sizeof(int32_t)); if (NULL == newBatch.pMsgs || NULL == newBatch.pTaskIds) { CTG_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY); } - + newBatch.conn = *pConn; req.msgType = msgType; @@ -475,7 +478,8 @@ int32_t ctgAddBatch(SCatalog* pCtg, int32_t vgId, SRequestConnInfo *pConn, SCtgT CTG_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY); } - ctgDebug("task %d %s req added to batch %d, target vgId %d", pTask->taskId, TMSG_INFO(msgType), newBatch.batchId, vgId); + ctgDebug("task %d %s req added to batch %d, target vgId %d", pTask->taskId, TMSG_INFO(msgType), newBatch.batchId, + vgId); return TSDB_CODE_SUCCESS; } @@ -504,7 +508,8 @@ int32_t ctgAddBatch(SCatalog* pCtg, int32_t vgId, SRequestConnInfo *pConn, SCtgT } } - ctgDebug("task %d %s req added to batch %d, target vgId %d", pTask->taskId, TMSG_INFO(msgType), pBatch->batchId, vgId); + ctgDebug("task %d %s req added to batch %d, target vgId %d", pTask->taskId, TMSG_INFO(msgType), pBatch->batchId, + vgId); return TSDB_CODE_SUCCESS; @@ -512,24 +517,24 @@ _return: ctgFreeBatch(&newBatch); taosMemoryFree(msg); - + return code; } int32_t ctgBuildBatchReqMsg(SCtgBatch* pBatch, int32_t vgId, void** msg) { - *msg = taosMemoryMalloc(pBatch->msgSize); + *msg = taosMemoryCalloc(1, pBatch->msgSize); if (NULL == (*msg)) { CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY); } - int32_t offset = 0; - int32_t num = taosArrayGetSize(pBatch->pMsgs); - SBatchReq *pBatchReq = (SBatchReq*)(*msg); + int32_t offset = 0; + int32_t num = taosArrayGetSize(pBatch->pMsgs); + SBatchReq* pBatchReq = (SBatchReq*)(*msg); pBatchReq->header.vgId = htonl(vgId); pBatchReq->msgNum = htonl(num); offset += sizeof(SBatchReq); - + for (int32_t i = 0; i < num; ++i) { SBatchMsg* pReq = taosArrayGet(pBatch->pMsgs, i); *(int32_t*)((char*)(*msg) + offset) = htonl(pReq->msgType); @@ -547,23 +552,23 @@ int32_t ctgBuildBatchReqMsg(SCtgBatch* pBatch, int32_t vgId, void** msg) { return TSDB_CODE_SUCCESS; } -int32_t ctgLaunchBatchs(SCatalog* pCtg, SCtgJob *pJob, SHashObj* pBatchs) { +int32_t ctgLaunchBatchs(SCatalog* pCtg, SCtgJob* pJob, SHashObj* pBatchs) { int32_t code = 0; - void* msg = NULL; - void* p = taosHashIterate(pBatchs, NULL); + void* msg = NULL; + void* p = taosHashIterate(pBatchs, NULL); while (NULL != p) { - size_t len = 0; - int32_t* vgId = taosHashGetKey(p, &len); + size_t len = 0; + int32_t* vgId = taosHashGetKey(p, &len); SCtgBatch* pBatch = (SCtgBatch*)p; ctgDebug("QID:0x%" PRIx64 " ctg start to launch batch %d", pJob->queryId, pBatch->batchId); - + CTG_ERR_JRET(ctgBuildBatchReqMsg(pBatch, *vgId, &msg)); - code = ctgAsyncSendMsg(pCtg, &pBatch->conn, pJob, pBatch->pTaskIds, pBatch->batchId, - pBatch->dbFName, *vgId, pBatch->msgType, msg, pBatch->msgSize); + code = ctgAsyncSendMsg(pCtg, &pBatch->conn, pJob, pBatch->pTaskIds, pBatch->batchId, pBatch->dbFName, *vgId, + pBatch->msgType, msg, pBatch->msgSize); pBatch->pTaskIds = NULL; CTG_ERR_JRET(code); - + p = taosHashIterate(pBatchs, p); } @@ -575,16 +580,15 @@ _return: taosHashCancelIterate(pBatchs, p); } taosMemoryFree(msg); - + CTG_RET(code); } - -int32_t ctgGetQnodeListFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, SArray *out, SCtgTask* pTask) { - char *msg = NULL; +int32_t ctgGetQnodeListFromMnode(SCatalog* pCtg, SRequestConnInfo* pConn, SArray* out, SCtgTask* pTask) { + char* msg = NULL; int32_t msgLen = 0; int32_t reqType = TDMT_MND_QNODE_LIST; - void*(*mallocFp)(int32_t) = pTask ? taosMemoryMalloc : rpcMallocCont; + void* (*mallocFp)(int32_t) = pTask ? taosMemoryMalloc : rpcMallocCont; ctgDebug("try to get qnode list from mnode, mgmtEpInUse:%d", pConn->mgmtEps.inUse); @@ -609,14 +613,14 @@ int32_t ctgGetQnodeListFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, SArray CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY); } taosArrayPush(pTaskId, &pTask->taskId); - + CTG_RET(ctgAsyncSendMsg(pCtg, pConn, pTask->pJob, pTaskId, -1, NULL, 0, reqType, msg, msgLen)); #endif } - + SRpcMsg rpcMsg = { .msgType = reqType, - .pCont = msg, + .pCont = msg, .contLen = msgLen, }; @@ -630,11 +634,11 @@ int32_t ctgGetQnodeListFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, SArray return TSDB_CODE_SUCCESS; } -int32_t ctgGetDnodeListFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, SArray **out, SCtgTask* pTask) { - char *msg = NULL; +int32_t ctgGetDnodeListFromMnode(SCatalog* pCtg, SRequestConnInfo* pConn, SArray** out, SCtgTask* pTask) { + char* msg = NULL; int32_t msgLen = 0; int32_t reqType = TDMT_MND_DNODE_LIST; - void*(*mallocFp)(int32_t) = pTask ? taosMemoryMalloc : rpcMallocCont; + void* (*mallocFp)(int32_t) = pTask ? taosMemoryMalloc : rpcMallocCont; ctgDebug("try to get dnode list from mnode, mgmtEpInUse:%d", pConn->mgmtEps.inUse); @@ -655,14 +659,14 @@ int32_t ctgGetDnodeListFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, SArray CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY); } taosArrayPush(pTaskId, &pTask->taskId); - + CTG_RET(ctgAsyncSendMsg(pCtg, pConn, pTask->pJob, pTaskId, -1, NULL, 0, reqType, msg, msgLen)); #endif } - + SRpcMsg rpcMsg = { .msgType = reqType, - .pCont = msg, + .pCont = msg, .contLen = msgLen, }; @@ -676,12 +680,12 @@ int32_t ctgGetDnodeListFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, SArray return TSDB_CODE_SUCCESS; } - -int32_t ctgGetDBVgInfoFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, SBuildUseDBInput *input, SUseDbOutput *out, SCtgTask* pTask) { - char *msg = NULL; +int32_t ctgGetDBVgInfoFromMnode(SCatalog* pCtg, SRequestConnInfo* pConn, SBuildUseDBInput* input, SUseDbOutput* out, + SCtgTask* pTask) { + char* msg = NULL; int32_t msgLen = 0; int32_t reqType = TDMT_MND_USE_DB; - void*(*mallocFp)(int32_t) = pTask ? taosMemoryMalloc : rpcMallocCont; + void* (*mallocFp)(int32_t) = pTask ? taosMemoryMalloc : rpcMallocCont; ctgDebug("try to get db vgInfo from mnode, dbFName:%s", input->db); @@ -706,14 +710,14 @@ int32_t ctgGetDBVgInfoFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, SBuildU CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY); } taosArrayPush(pTaskId, &pTask->taskId); - + CTG_RET(ctgAsyncSendMsg(pCtg, pConn, pTask->pJob, pTaskId, -1, NULL, 0, reqType, msg, msgLen)); #endif } - + SRpcMsg rpcMsg = { .msgType = reqType, - .pCont = msg, + .pCont = msg, .contLen = msgLen, }; @@ -721,21 +725,22 @@ int32_t ctgGetDBVgInfoFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, SBuildU rpcSendRecv(pConn->pTrans, &pConn->mgmtEps, &rpcMsg, &rpcRsp); CTG_ERR_RET(ctgProcessRspMsg(out, reqType, rpcRsp.pCont, rpcRsp.contLen, rpcRsp.code, input->db)); - + rpcFreeCont(rpcRsp.pCont); - + return TSDB_CODE_SUCCESS; } -int32_t ctgGetDBCfgFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, const char *dbFName, SDbCfgInfo *out, SCtgTask* pTask) { - char *msg = NULL; +int32_t ctgGetDBCfgFromMnode(SCatalog* pCtg, SRequestConnInfo* pConn, const char* dbFName, SDbCfgInfo* out, + SCtgTask* pTask) { + char* msg = NULL; int32_t msgLen = 0; int32_t reqType = TDMT_MND_GET_DB_CFG; - void*(*mallocFp)(int32_t) = pTask ? taosMemoryMalloc : rpcMallocCont; + void* (*mallocFp)(int32_t) = pTask ? taosMemoryMalloc : rpcMallocCont; ctgDebug("try to get db cfg from mnode, dbFName:%s", dbFName); - int32_t code = queryBuildMsg[TMSG_INDEX(reqType)]((void *)dbFName, &msg, 0, &msgLen, mallocFp); + int32_t code = queryBuildMsg[TMSG_INDEX(reqType)]((void*)dbFName, &msg, 0, &msgLen, mallocFp); if (code) { ctgError("Build get db cfg msg failed, code:%x, db:%s", code, dbFName); CTG_ERR_RET(code); @@ -756,14 +761,14 @@ int32_t ctgGetDBCfgFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, const char CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY); } taosArrayPush(pTaskId, &pTask->taskId); - + CTG_RET(ctgAsyncSendMsg(pCtg, pConn, pTask->pJob, pTaskId, -1, NULL, 0, reqType, msg, msgLen)); #endif } - + SRpcMsg rpcMsg = { .msgType = TDMT_MND_GET_DB_CFG, - .pCont = msg, + .pCont = msg, .contLen = msgLen, }; @@ -777,15 +782,16 @@ int32_t ctgGetDBCfgFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, const char return TSDB_CODE_SUCCESS; } -int32_t ctgGetIndexInfoFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, const char *indexName, SIndexInfo *out, SCtgTask* pTask) { - char *msg = NULL; +int32_t ctgGetIndexInfoFromMnode(SCatalog* pCtg, SRequestConnInfo* pConn, const char* indexName, SIndexInfo* out, + SCtgTask* pTask) { + char* msg = NULL; int32_t msgLen = 0; int32_t reqType = TDMT_MND_GET_INDEX; - void*(*mallocFp)(int32_t) = pTask ? taosMemoryMalloc : rpcMallocCont; + void* (*mallocFp)(int32_t) = pTask ? taosMemoryMalloc : rpcMallocCont; ctgDebug("try to get index from mnode, indexName:%s", indexName); - int32_t code = queryBuildMsg[TMSG_INDEX(reqType)]((void *)indexName, &msg, 0, &msgLen, mallocFp); + int32_t code = queryBuildMsg[TMSG_INDEX(reqType)]((void*)indexName, &msg, 0, &msgLen, mallocFp); if (code) { ctgError("Build get index msg failed, code:%x, db:%s", code, indexName); CTG_ERR_RET(code); @@ -800,20 +806,20 @@ int32_t ctgGetIndexInfoFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, const #if CTG_BATCH_FETCH CTG_RET(ctgAddBatch(pCtg, 0, pConn, pTask, reqType, msg, msgLen)); -#else +#else SArray* pTaskId = taosArrayInit(1, sizeof(int32_t)); if (NULL == pTaskId) { CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY); } taosArrayPush(pTaskId, &pTask->taskId); - + CTG_RET(ctgAsyncSendMsg(pCtg, pConn, pTask->pJob, pTaskId, -1, NULL, 0, reqType, msg, msgLen)); #endif } - + SRpcMsg rpcMsg = { .msgType = reqType, - .pCont = msg, + .pCont = msg, .contLen = msgLen, }; @@ -823,21 +829,22 @@ int32_t ctgGetIndexInfoFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, const CTG_ERR_RET(ctgProcessRspMsg(out, reqType, rpcRsp.pCont, rpcRsp.contLen, rpcRsp.code, (char*)indexName)); rpcFreeCont(rpcRsp.pCont); - + return TSDB_CODE_SUCCESS; } -int32_t ctgGetTbIndexFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, SName *name, STableIndex* out, SCtgTask* pTask) { - char *msg = NULL; +int32_t ctgGetTbIndexFromMnode(SCatalog* pCtg, SRequestConnInfo* pConn, SName* name, STableIndex* out, + SCtgTask* pTask) { + char* msg = NULL; int32_t msgLen = 0; int32_t reqType = TDMT_MND_GET_TABLE_INDEX; - void*(*mallocFp)(int32_t) = pTask ? taosMemoryMalloc : rpcMallocCont; + void* (*mallocFp)(int32_t) = pTask ? taosMemoryMalloc : rpcMallocCont; char tbFName[TSDB_TABLE_FNAME_LEN]; tNameExtractFullName(name, tbFName); ctgDebug("try to get tb index from mnode, tbFName:%s", tbFName); - int32_t code = queryBuildMsg[TMSG_INDEX(reqType)]((void *)tbFName, &msg, 0, &msgLen, mallocFp); + int32_t code = queryBuildMsg[TMSG_INDEX(reqType)]((void*)tbFName, &msg, 0, &msgLen, mallocFp); if (code) { ctgError("Build get index msg failed, code:%s, tbFName:%s", tstrerror(code), tbFName); CTG_ERR_RET(code); @@ -848,25 +855,25 @@ int32_t ctgGetTbIndexFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, SName *n if (NULL == pOut) { CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY); } - + CTG_ERR_RET(ctgUpdateMsgCtx(&pTask->msgCtx, reqType, pOut, (char*)tbFName)); #if CTG_BATCH_FETCH CTG_RET(ctgAddBatch(pCtg, 0, pConn, pTask, reqType, msg, msgLen)); -#else +#else SArray* pTaskId = taosArrayInit(1, sizeof(int32_t)); if (NULL == pTaskId) { CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY); } taosArrayPush(pTaskId, &pTask->taskId); - + CTG_RET(ctgAsyncSendMsg(pCtg, pConn, pTask->pJob, pTaskId, -1, NULL, 0, reqType, msg, msgLen)); #endif } - + SRpcMsg rpcMsg = { .msgType = reqType, - .pCont = msg, + .pCont = msg, .contLen = msgLen, }; @@ -876,19 +883,20 @@ int32_t ctgGetTbIndexFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, SName *n CTG_ERR_RET(ctgProcessRspMsg(out, reqType, rpcRsp.pCont, rpcRsp.contLen, rpcRsp.code, (char*)tbFName)); rpcFreeCont(rpcRsp.pCont); - + return TSDB_CODE_SUCCESS; } -int32_t ctgGetUdfInfoFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, const char *funcName, SFuncInfo *out, SCtgTask* pTask) { - char *msg = NULL; +int32_t ctgGetUdfInfoFromMnode(SCatalog* pCtg, SRequestConnInfo* pConn, const char* funcName, SFuncInfo* out, + SCtgTask* pTask) { + char* msg = NULL; int32_t msgLen = 0; int32_t reqType = TDMT_MND_RETRIEVE_FUNC; - void*(*mallocFp)(int32_t) = pTask ? taosMemoryMalloc : rpcMallocCont; + void* (*mallocFp)(int32_t) = pTask ? taosMemoryMalloc : rpcMallocCont; ctgDebug("try to get udf info from mnode, funcName:%s", funcName); - int32_t code = queryBuildMsg[TMSG_INDEX(reqType)]((void *)funcName, &msg, 0, &msgLen, mallocFp); + int32_t code = queryBuildMsg[TMSG_INDEX(reqType)]((void*)funcName, &msg, 0, &msgLen, mallocFp); if (code) { ctgError("Build get udf msg failed, code:%x, db:%s", code, funcName); CTG_ERR_RET(code); @@ -909,14 +917,14 @@ int32_t ctgGetUdfInfoFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, const ch CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY); } taosArrayPush(pTaskId, &pTask->taskId); - + CTG_RET(ctgAsyncSendMsg(pCtg, pConn, pTask->pJob, pTaskId, -1, NULL, 0, reqType, msg, msgLen)); #endif } - + SRpcMsg rpcMsg = { .msgType = reqType, - .pCont = msg, + .pCont = msg, .contLen = msgLen, }; @@ -930,15 +938,16 @@ int32_t ctgGetUdfInfoFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, const ch return TSDB_CODE_SUCCESS; } -int32_t ctgGetUserDbAuthFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, const char *user, SGetUserAuthRsp *out, SCtgTask* pTask) { - char *msg = NULL; +int32_t ctgGetUserDbAuthFromMnode(SCatalog* pCtg, SRequestConnInfo* pConn, const char* user, SGetUserAuthRsp* out, + SCtgTask* pTask) { + char* msg = NULL; int32_t msgLen = 0; int32_t reqType = TDMT_MND_GET_USER_AUTH; - void*(*mallocFp)(int32_t) = pTask ? taosMemoryMalloc : rpcMallocCont; + void* (*mallocFp)(int32_t) = pTask ? taosMemoryMalloc : rpcMallocCont; ctgDebug("try to get user auth from mnode, user:%s", user); - int32_t code = queryBuildMsg[TMSG_INDEX(reqType)]((void *)user, &msg, 0, &msgLen, mallocFp); + int32_t code = queryBuildMsg[TMSG_INDEX(reqType)]((void*)user, &msg, 0, &msgLen, mallocFp); if (code) { ctgError("Build get user auth msg failed, code:%x, db:%s", code, user); CTG_ERR_RET(code); @@ -953,20 +962,20 @@ int32_t ctgGetUserDbAuthFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, const #if CTG_BATCH_FETCH CTG_RET(ctgAddBatch(pCtg, 0, pConn, pTask, reqType, msg, msgLen)); -#else +#else SArray* pTaskId = taosArrayInit(1, sizeof(int32_t)); if (NULL == pTaskId) { CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY); } taosArrayPush(pTaskId, &pTask->taskId); - + CTG_RET(ctgAsyncSendMsg(pCtg, pConn, pTask->pJob, pTaskId, -1, NULL, 0, reqType, msg, msgLen)); #endif } - + SRpcMsg rpcMsg = { .msgType = reqType, - .pCont = msg, + .pCont = msg, .contLen = msgLen, }; @@ -976,20 +985,20 @@ int32_t ctgGetUserDbAuthFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, const CTG_ERR_RET(ctgProcessRspMsg(out, reqType, rpcRsp.pCont, rpcRsp.contLen, rpcRsp.code, (char*)user)); rpcFreeCont(rpcRsp.pCont); - + return TSDB_CODE_SUCCESS; } - -int32_t ctgGetTbMetaFromMnodeImpl(SCatalog* pCtg, SRequestConnInfo *pConn, char *dbFName, char* tbName, STableMetaOutput* out, SCtgTask* pTask) { +int32_t ctgGetTbMetaFromMnodeImpl(SCatalog* pCtg, SRequestConnInfo* pConn, char* dbFName, char* tbName, + STableMetaOutput* out, SCtgTask* pTask) { SBuildTableInput bInput = {.vgId = 0, .dbFName = dbFName, .tbName = tbName}; - char *msg = NULL; - SEpSet *pVnodeEpSet = NULL; - int32_t msgLen = 0; - int32_t reqType = TDMT_MND_TABLE_META; - char tbFName[TSDB_TABLE_FNAME_LEN]; + char* msg = NULL; + SEpSet* pVnodeEpSet = NULL; + int32_t msgLen = 0; + int32_t reqType = TDMT_MND_TABLE_META; + char tbFName[TSDB_TABLE_FNAME_LEN]; sprintf(tbFName, "%s.%s", dbFName, tbName); - void*(*mallocFp)(int32_t) = pTask ? taosMemoryMalloc : rpcMallocCont; + void* (*mallocFp)(int32_t) = pTask ? taosMemoryMalloc : rpcMallocCont; ctgDebug("try to get table meta from mnode, tbFName:%s", tbFName); @@ -1007,26 +1016,26 @@ int32_t ctgGetTbMetaFromMnodeImpl(SCatalog* pCtg, SRequestConnInfo *pConn, char CTG_ERR_RET(ctgUpdateMsgCtx(&pTask->msgCtx, reqType, pOut, tbFName)); #if CTG_BATCH_FETCH CTG_RET(ctgAddBatch(pCtg, 0, pConn, pTask, reqType, msg, msgLen)); -#else +#else SArray* pTaskId = taosArrayInit(1, sizeof(int32_t)); if (NULL == pTaskId) { CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY); } taosArrayPush(pTaskId, &pTask->taskId); - + CTG_RET(ctgAsyncSendMsg(pCtg, pConn, pTask->pJob, pTaskId, -1, NULL, 0, reqType, msg, msgLen)); #endif } SRpcMsg rpcMsg = { .msgType = reqType, - .pCont = msg, + .pCont = msg, .contLen = msgLen, }; SRpcMsg rpcRsp = {0}; rpcSendRecv(pConn->pTrans, &pConn->mgmtEps, &rpcMsg, &rpcRsp); - + CTG_ERR_RET(ctgProcessRspMsg(out, reqType, rpcRsp.pCont, rpcRsp.contLen, rpcRsp.code, tbFName)); rpcFreeCont(rpcRsp.pCont); @@ -1034,27 +1043,30 @@ int32_t ctgGetTbMetaFromMnodeImpl(SCatalog* pCtg, SRequestConnInfo *pConn, char return TSDB_CODE_SUCCESS; } -int32_t ctgGetTbMetaFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, const SName* pTableName, STableMetaOutput* out, SCtgTask* pTask) { +int32_t ctgGetTbMetaFromMnode(SCatalog* pCtg, SRequestConnInfo* pConn, const SName* pTableName, STableMetaOutput* out, + SCtgTask* pTask) { char dbFName[TSDB_DB_FNAME_LEN]; tNameGetFullDbName(pTableName, dbFName); - return ctgGetTbMetaFromMnodeImpl(pCtg, pConn, dbFName, (char *)pTableName->tname, out, pTask); + return ctgGetTbMetaFromMnodeImpl(pCtg, pConn, dbFName, (char*)pTableName->tname, out, pTask); } -int32_t ctgGetTbMetaFromVnode(SCatalog* pCtg, SRequestConnInfo *pConn, const SName* pTableName, SVgroupInfo *vgroupInfo, STableMetaOutput* out, SCtgTask* pTask) { +int32_t ctgGetTbMetaFromVnode(SCatalog* pCtg, SRequestConnInfo* pConn, const SName* pTableName, SVgroupInfo* vgroupInfo, + STableMetaOutput* out, SCtgTask* pTask) { char dbFName[TSDB_DB_FNAME_LEN]; tNameGetFullDbName(pTableName, dbFName); int32_t reqType = TDMT_VND_TABLE_META; - char tbFName[TSDB_TABLE_FNAME_LEN]; + char tbFName[TSDB_TABLE_FNAME_LEN]; sprintf(tbFName, "%s.%s", dbFName, pTableName->tname); - void*(*mallocFp)(int32_t) = pTask ? taosMemoryMalloc : rpcMallocCont; + void* (*mallocFp)(int32_t) = pTask ? taosMemoryMalloc : rpcMallocCont; SEp* pEp = &vgroupInfo->epSet.eps[vgroupInfo->epSet.inUse]; - ctgDebug("try to get table meta from vnode, vgId:%d, ep num:%d, ep %s:%d, tbFName:%s", - vgroupInfo->vgId, vgroupInfo->epSet.numOfEps, pEp->fqdn, pEp->port, tbFName); + ctgDebug("try to get table meta from vnode, vgId:%d, ep num:%d, ep %s:%d, tbFName:%s", vgroupInfo->vgId, + vgroupInfo->epSet.numOfEps, pEp->fqdn, pEp->port, tbFName); - SBuildTableInput bInput = {.vgId = vgroupInfo->vgId, .dbFName = dbFName, .tbName = (char *)tNameGetTableName(pTableName)}; - char *msg = NULL; + SBuildTableInput bInput = { + .vgId = vgroupInfo->vgId, .dbFName = dbFName, .tbName = (char*)tNameGetTableName(pTableName)}; + char* msg = NULL; int32_t msgLen = 0; int32_t code = queryBuildMsg[TMSG_INDEX(reqType)](&bInput, &msg, 0, &msgLen, mallocFp); @@ -1070,16 +1082,16 @@ int32_t ctgGetTbMetaFromVnode(SCatalog* pCtg, SRequestConnInfo *pConn, const SNa } CTG_ERR_RET(ctgUpdateMsgCtx(&pTask->msgCtx, reqType, pOut, tbFName)); - SRequestConnInfo vConn = {.pTrans = pConn->pTrans, - .requestId = pConn->requestId, - .requestObjRefId = pConn->requestObjRefId, - .mgmtEps = vgroupInfo->epSet}; + SRequestConnInfo vConn = {.pTrans = pConn->pTrans, + .requestId = pConn->requestId, + .requestObjRefId = pConn->requestObjRefId, + .mgmtEps = vgroupInfo->epSet}; #if CTG_BATCH_FETCH CTG_RET(ctgAddBatch(pCtg, vgroupInfo->vgId, &vConn, pTask, reqType, msg, msgLen)); -#else +#else SCtgTbMetaCtx* ctx = (SCtgTbMetaCtx*)pTask->taskCtx; - char dbFName[TSDB_DB_FNAME_LEN]; + char dbFName[TSDB_DB_FNAME_LEN]; tNameGetFullDbName(ctx->pName, dbFName); SArray* pTaskId = taosArrayInit(1, sizeof(int32_t)); if (NULL == pTaskId) { @@ -1087,40 +1099,41 @@ int32_t ctgGetTbMetaFromVnode(SCatalog* pCtg, SRequestConnInfo *pConn, const SNa } taosArrayPush(pTaskId, &pTask->taskId); - CTG_RET(ctgAsyncSendMsg(pCtg, &vConn, pTask->pJob, pTaskId, -1, dbFName, ctx->vgId, reqType, msg, msgLen)); -#endif + CTG_RET(ctgAsyncSendMsg(pCtg, &vConn, pTask->pJob, pTaskId, -1, dbFName, ctx->vgId, reqType, msg, msgLen)); +#endif } SRpcMsg rpcMsg = { .msgType = reqType, - .pCont = msg, + .pCont = msg, .contLen = msgLen, }; SRpcMsg rpcRsp = {0}; rpcSendRecv(pConn->pTrans, &vgroupInfo->epSet, &rpcMsg, &rpcRsp); - CTG_ERR_RET(ctgProcessRspMsg(out, reqType, rpcRsp.pCont, rpcRsp.contLen, rpcRsp.code, tbFName)); + CTG_ERR_RET(ctgProcessRspMsg(out, reqType, rpcRsp.pCont, rpcRsp.contLen, rpcRsp.code, tbFName)); rpcFreeCont(rpcRsp.pCont); return TSDB_CODE_SUCCESS; } -int32_t ctgGetTableCfgFromVnode(SCatalog* pCtg, SRequestConnInfo *pConn, const SName* pTableName, SVgroupInfo *vgroupInfo, STableCfg **out, SCtgTask* pTask) { - char *msg = NULL; +int32_t ctgGetTableCfgFromVnode(SCatalog* pCtg, SRequestConnInfo* pConn, const SName* pTableName, + SVgroupInfo* vgroupInfo, STableCfg** out, SCtgTask* pTask) { + char* msg = NULL; int32_t msgLen = 0; int32_t reqType = TDMT_VND_TABLE_CFG; - char tbFName[TSDB_TABLE_FNAME_LEN]; + char tbFName[TSDB_TABLE_FNAME_LEN]; tNameExtractFullName(pTableName, tbFName); - void*(*mallocFp)(int32_t) = pTask ? taosMemoryMalloc : rpcMallocCont; + void* (*mallocFp)(int32_t) = pTask ? taosMemoryMalloc : rpcMallocCont; char dbFName[TSDB_DB_FNAME_LEN]; tNameGetFullDbName(pTableName, dbFName); SBuildTableInput bInput = {.vgId = vgroupInfo->vgId, .dbFName = dbFName, .tbName = (char*)pTableName->tname}; SEp* pEp = &vgroupInfo->epSet.eps[vgroupInfo->epSet.inUse]; - ctgDebug("try to get table cfg from vnode, vgId:%d, ep num:%d, ep %s:%d, tbFName:%s", - vgroupInfo->vgId, vgroupInfo->epSet.numOfEps, pEp->fqdn, pEp->port, tbFName); + ctgDebug("try to get table cfg from vnode, vgId:%d, ep num:%d, ep %s:%d, tbFName:%s", vgroupInfo->vgId, + vgroupInfo->epSet.numOfEps, pEp->fqdn, pEp->port, tbFName); int32_t code = queryBuildMsg[TMSG_INDEX(reqType)](&bInput, &msg, 0, &msgLen, mallocFp); if (code) { @@ -1131,29 +1144,29 @@ int32_t ctgGetTableCfgFromVnode(SCatalog* pCtg, SRequestConnInfo *pConn, const S if (pTask) { CTG_ERR_RET(ctgUpdateMsgCtx(&pTask->msgCtx, reqType, NULL, (char*)tbFName)); - SRequestConnInfo vConn = {.pTrans = pConn->pTrans, - .requestId = pConn->requestId, - .requestObjRefId = pConn->requestObjRefId, - .mgmtEps = vgroupInfo->epSet}; + SRequestConnInfo vConn = {.pTrans = pConn->pTrans, + .requestId = pConn->requestId, + .requestObjRefId = pConn->requestObjRefId, + .mgmtEps = vgroupInfo->epSet}; #if CTG_BATCH_FETCH CTG_RET(ctgAddBatch(pCtg, vgroupInfo->vgId, &vConn, pTask, reqType, msg, msgLen)); #else SCtgTbCfgCtx* ctx = (SCtgTbCfgCtx*)pTask->taskCtx; - char dbFName[TSDB_DB_FNAME_LEN]; + char dbFName[TSDB_DB_FNAME_LEN]; tNameGetFullDbName(ctx->pName, dbFName); SArray* pTaskId = taosArrayInit(1, sizeof(int32_t)); if (NULL == pTaskId) { CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY); } taosArrayPush(pTaskId, &pTask->taskId); - + CTG_RET(ctgAsyncSendMsg(pCtg, &vConn, pTask->pJob, pTaskId, -1, dbFName, ctx->pVgInfo->vgId, reqType, msg, msgLen)); -#endif +#endif } - + SRpcMsg rpcMsg = { .msgType = reqType, - .pCont = msg, + .pCont = msg, .contLen = msgLen, }; @@ -1163,18 +1176,18 @@ int32_t ctgGetTableCfgFromVnode(SCatalog* pCtg, SRequestConnInfo *pConn, const S CTG_ERR_RET(ctgProcessRspMsg(out, reqType, rpcRsp.pCont, rpcRsp.contLen, rpcRsp.code, (char*)tbFName)); rpcFreeCont(rpcRsp.pCont); - + return TSDB_CODE_SUCCESS; } - -int32_t ctgGetTableCfgFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, const SName* pTableName, STableCfg **out, SCtgTask* pTask) { - char *msg = NULL; +int32_t ctgGetTableCfgFromMnode(SCatalog* pCtg, SRequestConnInfo* pConn, const SName* pTableName, STableCfg** out, + SCtgTask* pTask) { + char* msg = NULL; int32_t msgLen = 0; int32_t reqType = TDMT_MND_TABLE_CFG; - char tbFName[TSDB_TABLE_FNAME_LEN]; + char tbFName[TSDB_TABLE_FNAME_LEN]; tNameExtractFullName(pTableName, tbFName); - void*(*mallocFp)(int32_t) = pTask ? taosMemoryMalloc : rpcMallocCont; + void* (*mallocFp)(int32_t) = pTask ? taosMemoryMalloc : rpcMallocCont; char dbFName[TSDB_DB_FNAME_LEN]; tNameGetFullDbName(pTableName, dbFName); SBuildTableInput bInput = {.vgId = 0, .dbFName = dbFName, .tbName = (char*)pTableName->tname}; @@ -1191,20 +1204,20 @@ int32_t ctgGetTableCfgFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, const S CTG_ERR_RET(ctgUpdateMsgCtx(&pTask->msgCtx, reqType, NULL, (char*)tbFName)); #if CTG_BATCH_FETCH CTG_RET(ctgAddBatch(pCtg, 0, pConn, pTask, reqType, msg, msgLen)); -#else +#else SArray* pTaskId = taosArrayInit(1, sizeof(int32_t)); if (NULL == pTaskId) { CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY); } taosArrayPush(pTaskId, &pTask->taskId); - + CTG_RET(ctgAsyncSendMsg(pCtg, pConn, pTask->pJob, pTaskId, -1, NULL, 0, reqType, msg, msgLen)); #endif } - + SRpcMsg rpcMsg = { .msgType = reqType, - .pCont = msg, + .pCont = msg, .contLen = msgLen, }; @@ -1214,15 +1227,15 @@ int32_t ctgGetTableCfgFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, const S CTG_ERR_RET(ctgProcessRspMsg(out, reqType, rpcRsp.pCont, rpcRsp.contLen, rpcRsp.code, (char*)tbFName)); rpcFreeCont(rpcRsp.pCont); - + return TSDB_CODE_SUCCESS; } -int32_t ctgGetSvrVerFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, char **out, SCtgTask* pTask) { - char *msg = NULL; +int32_t ctgGetSvrVerFromMnode(SCatalog* pCtg, SRequestConnInfo* pConn, char** out, SCtgTask* pTask) { + char* msg = NULL; int32_t msgLen = 0; int32_t reqType = TDMT_MND_SERVER_VERSION; - void*(*mallocFp)(int32_t) = pTask ? taosMemoryMalloc : rpcMallocCont; + void* (*mallocFp)(int32_t) = pTask ? taosMemoryMalloc : rpcMallocCont; qDebug("try to get svr ver from mnode"); @@ -1237,20 +1250,20 @@ int32_t ctgGetSvrVerFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, char **ou #if CTG_BATCH_FETCH CTG_RET(ctgAddBatch(pCtg, 0, pConn, pTask, reqType, msg, msgLen)); -#else +#else SArray* pTaskId = taosArrayInit(1, sizeof(int32_t)); if (NULL == pTaskId) { CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY); } taosArrayPush(pTaskId, &pTask->taskId); - + CTG_RET(ctgAsyncSendMsg(pCtg, pConn, pTask->pJob, pTaskId, -1, NULL, 0, reqType, msg, msgLen)); #endif } - + SRpcMsg rpcMsg = { .msgType = reqType, - .pCont = msg, + .pCont = msg, .contLen = msgLen, }; @@ -1260,8 +1273,6 @@ int32_t ctgGetSvrVerFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, char **ou CTG_ERR_RET(ctgProcessRspMsg(out, reqType, rpcRsp.pCont, rpcRsp.contLen, rpcRsp.code, NULL)); rpcFreeCont(rpcRsp.pCont); - + return TSDB_CODE_SUCCESS; } - - diff --git a/source/libs/executor/src/dataInserter.c b/source/libs/executor/src/dataInserter.c index 1c08fafaa3c5f2b9e4fd12fb68c0d5a5f343a4c7..b30fce49886006fc608b576a550cdfcf4fc9d9bd 100644 --- a/source/libs/executor/src/dataInserter.c +++ b/source/libs/executor/src/dataInserter.c @@ -225,7 +225,7 @@ int32_t dataBlockToSubmit(SDataInserterHandle* pInserter, SSubmitReq** pReq) { } blkHead->dataLen = htonl(dataLen); - blkHead->numOfRows = htons(rows); + blkHead->numOfRows = htonl(rows); ret->length += sizeof(SSubmitBlk) + dataLen; blkHead = POINTER_SHIFT(blkHead, sizeof(SSubmitBlk) + dataLen); diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index c0afd7adac39a077e8b6dc91dc05ce513ea7f384..01670efbb1ed75757595c768bc7d197c766a1413 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -3398,7 +3398,12 @@ int32_t doInitAggInfoSup(SAggSupporter* pAggSup, SqlFunctionCtx* pCtx, int32_t n uint32_t defaultBufsz = 0; getBufferPgSize(pAggSup->resultRowSize, &defaultPgsz, &defaultBufsz); - int32_t code = createDiskbasedBuf(&pAggSup->pResultBuf, defaultPgsz, defaultBufsz, pKey, TD_TMP_DIR_PATH); + if (!osTempSpaceAvailable()) { + terrno = TSDB_CODE_NO_AVAIL_DISK; + qError("Init stream agg supporter failed since %s", terrstr(terrno)); + return terrno; + } + int32_t code = createDiskbasedBuf(&pAggSup->pResultBuf, defaultPgsz, defaultBufsz, pKey, tsTempDir); if (code != TSDB_CODE_SUCCESS) { return code; } @@ -4662,7 +4667,12 @@ int32_t initStreamAggSupporter(SStreamAggSupporter* pSup, const char* pKey, SqlF if (bufSize <= pageSize) { bufSize = pageSize * 4; } - int32_t code = createDiskbasedBuf(&pSup->pResultBuf, pageSize, bufSize, pKey, TD_TMP_DIR_PATH); + if (!osTempSpaceAvailable()) { + terrno = TSDB_CODE_NO_AVAIL_DISK; + qError("Init stream agg supporter failed since %s", terrstr(terrno)); + return terrno; + } + int32_t code = createDiskbasedBuf(&pSup->pResultBuf, pageSize, bufSize, pKey, tsTempDir); for (int32_t i = 0; i < numOfOutput; ++i) { pCtx[i].pBuf = pSup->pResultBuf; } diff --git a/source/libs/executor/src/groupoperator.c b/source/libs/executor/src/groupoperator.c index c1e5252089c7a03e4b56cb487e0a926431e1296d..938134c1679c985842f00244a8752aea54e13b87 100644 --- a/source/libs/executor/src/groupoperator.c +++ b/source/libs/executor/src/groupoperator.c @@ -764,7 +764,13 @@ SOperatorInfo* createPartitionOperatorInfo(SOperatorInfo* downstream, SPartition uint32_t defaultBufsz = 0; getBufferPgSize(pResBlock->info.rowSize, &defaultPgsz, &defaultBufsz); - int32_t code = createDiskbasedBuf(&pInfo->pBuf, defaultPgsz, defaultBufsz, pTaskInfo->id.str, TD_TMP_DIR_PATH); + if (!osTempSpaceAvailable()) { + terrno = TSDB_CODE_NO_AVAIL_DISK; + pTaskInfo->code = terrno; + qError("Create partition operator info failed since %s", terrstr(terrno)); + goto _error; + } + int32_t code = createDiskbasedBuf(&pInfo->pBuf, defaultPgsz, defaultBufsz, pTaskInfo->id.str, tsTempDir); if (code != TSDB_CODE_SUCCESS) { goto _error; } diff --git a/source/libs/executor/src/tlinearhash.c b/source/libs/executor/src/tlinearhash.c index 00a9f3ae6c8ff087a9031c7c0d70e81bb3c88504..ad97d79f7e7ad28da3cf51aab33010303e11f509 100644 --- a/source/libs/executor/src/tlinearhash.c +++ b/source/libs/executor/src/tlinearhash.c @@ -247,7 +247,12 @@ SLHashObj* tHashInit(int32_t inMemPages, int32_t pageSize, _hash_fn_t fn, int32_ return NULL; } - int32_t code = createDiskbasedBuf(&pHashObj->pBuf, pageSize, inMemPages * pageSize, 0, TD_TMP_DIR_PATH); + if (!osTempSpaceAvailable()) { + terrno = TSDB_CODE_NO_AVAIL_DISK; + printf("tHash Init failed since %s", terrstr(terrno)); + return NULL; + } + int32_t code = createDiskbasedBuf(&pHashObj->pBuf, pageSize, inMemPages * pageSize, 0, tsTempDir); if (code != 0) { terrno = code; return NULL; diff --git a/source/libs/executor/src/tsort.c b/source/libs/executor/src/tsort.c index 4f525441b96b6169219dc66a29ff58fc377787f2..48af951773814d9979eb6d349670753ad4b036eb 100644 --- a/source/libs/executor/src/tsort.c +++ b/source/libs/executor/src/tsort.c @@ -159,7 +159,12 @@ static int32_t doAddToBuf(SSDataBlock* pDataBlock, SSortHandle* pHandle) { int32_t start = 0; if (pHandle->pBuf == NULL) { - int32_t code = createDiskbasedBuf(&pHandle->pBuf, pHandle->pageSize, pHandle->numOfPages * pHandle->pageSize, "doAddToBuf", TD_TMP_DIR_PATH); + if (!osTempSpaceAvailable()) { + terrno = TSDB_CODE_NO_AVAIL_DISK; + qError("Add to buf failed since %s", terrstr(terrno)); + return terrno; + } + int32_t code = createDiskbasedBuf(&pHandle->pBuf, pHandle->pageSize, pHandle->numOfPages * pHandle->pageSize, "doAddToBuf", tsTempDir); dBufSetPrintInfo(pHandle->pBuf); if (code != TSDB_CODE_SUCCESS) { return code; @@ -233,7 +238,13 @@ static int32_t sortComparInit(SMsortComparParam* cmpParam, SArray* pSources, int } else { // multi-pass internal merge sort is required if (pHandle->pBuf == NULL) { - code = createDiskbasedBuf(&pHandle->pBuf, pHandle->pageSize, pHandle->numOfPages * pHandle->pageSize, "sortComparInit", TD_TMP_DIR_PATH); + if (!osTempSpaceAvailable()) { + terrno = TSDB_CODE_NO_AVAIL_DISK; + code = terrno; + qError("Sort compare init failed since %s", terrstr(terrno)); + return code; + } + code = createDiskbasedBuf(&pHandle->pBuf, pHandle->pageSize, pHandle->numOfPages * pHandle->pageSize, "sortComparInit", tsTempDir); dBufSetPrintInfo(pHandle->pBuf); if (code != TSDB_CODE_SUCCESS) { return code; diff --git a/source/libs/function/src/builtins.c b/source/libs/function/src/builtins.c index ae0765140252b672166f2fd2a721f2655481dbea..19c834955165de3e489d12e1bf1a91914fffa392 100644 --- a/source/libs/function/src/builtins.c +++ b/source/libs/function/src/builtins.c @@ -1461,15 +1461,14 @@ static int32_t translateInterp(SFunctionNode* pFunc, char* pErrBuf, int32_t len) } SValueNode* pValue = (SValueNode*)nodesListGetNode(pFunc->pParameterList, i); - int32_t ret = convertStringToTimestamp(paraType, pValue->datum.p, dbPrec, &timeVal[i - 1]); + int32_t ret = convertStringToTimestamp(paraType, pValue->datum.p, dbPrec, &timeVal[i - 1]); if (ret != TSDB_CODE_SUCCESS) { return invaildFuncParaValueErrMsg(pErrBuf, len, pFunc->functionName); } } if (timeVal[0] > timeVal[1]) { - return buildFuncErrMsg(pErrBuf, len, TSDB_CODE_FUNC_FUNTION_ERROR, - "INTERP function invalid time range"); + return buildFuncErrMsg(pErrBuf, len, TSDB_CODE_FUNC_FUNTION_ERROR, "INTERP function invalid time range"); } } @@ -2136,7 +2135,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { { .name = "apercentile", .type = FUNCTION_TYPE_APERCENTILE, - .classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_TIMELINE_FUNC, + .classification = FUNC_MGT_AGG_FUNC, .translateFunc = translateApercentile, .getEnvFunc = getApercentileFuncEnv, .initFunc = apercentileFunctionSetup, diff --git a/source/libs/function/src/tpercentile.c b/source/libs/function/src/tpercentile.c index 057d2bc7dc0c87b72e07780f440ced86870c9432..517253dc01691754425bd93c40bfef2a2750eed5 100644 --- a/source/libs/function/src/tpercentile.c +++ b/source/libs/function/src/tpercentile.c @@ -13,6 +13,7 @@ * along with this program. If not, see . */ +#include "taoserror.h" #include "tglobal.h" #include "tcompare.h" @@ -257,7 +258,14 @@ tMemBucket *tMemBucketCreate(int16_t nElemSize, int16_t dataType, double minval, resetSlotInfo(pBucket); - int32_t ret = createDiskbasedBuf(&pBucket->pBuffer, pBucket->bufPageSize, pBucket->bufPageSize * 512, "1", TD_TMP_DIR_PATH); + if (!osTempSpaceAvailable()) { + terrno = TSDB_CODE_NO_AVAIL_DISK; + // qError("MemBucket create disk based Buf failed since %s", terrstr(terrno)); + tMemBucketDestroy(pBucket); + return NULL; + } + + int32_t ret = createDiskbasedBuf(&pBucket->pBuffer, pBucket->bufPageSize, pBucket->bufPageSize * 512, "1", tsTempDir); if (ret != 0) { tMemBucketDestroy(pBucket); return NULL; diff --git a/source/libs/function/src/udfd.c b/source/libs/function/src/udfd.c index 24749729c07e9e5edb57ff5ed8e2ca5bfe882f0e..1cbc78df48b1cbeb5d1645dcd945168f21d25ba6 100644 --- a/source/libs/function/src/udfd.c +++ b/source/libs/function/src/udfd.c @@ -412,22 +412,31 @@ void udfdProcessRpcRsp(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet) { udf->outputLen = pFuncInfo->outputLen; udf->bufSize = pFuncInfo->bufSize; + if (!osTempSpaceAvailable()) { + terrno = TSDB_CODE_NO_AVAIL_DISK; + msgInfo->code = terrno; + fnError("udfd create shared library failed since %s", terrstr(terrno)); + goto _return; + } + char path[PATH_MAX] = {0}; #ifdef WINDOWS - snprintf(path, sizeof(path), "%s%s.dll", TD_TMP_DIR_PATH, pFuncInfo->name); + snprintf(path, sizeof(path), "%s%s.dll", tsTempDir, pFuncInfo->name); #else - snprintf(path, sizeof(path), "%s/lib%s.so", TD_TMP_DIR_PATH, pFuncInfo->name); + snprintf(path, sizeof(path), "%s/lib%s.so", tsTempDir, pFuncInfo->name); #endif TdFilePtr file = taosOpenFile(path, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_READ | TD_FILE_TRUNC | TD_FILE_AUTO_DEL); if (file == NULL) { fnError("udfd write udf shared library: %s failed, error: %d %s", path, errno, strerror(errno)); msgInfo->code = TSDB_CODE_FILE_CORRUPTED; + goto _return; } int64_t count = taosWriteFile(file, pFuncInfo->pCode, pFuncInfo->codeSize); if (count != pFuncInfo->codeSize) { fnError("udfd write udf shared library failed"); msgInfo->code = TSDB_CODE_FILE_CORRUPTED; + goto _return; } taosCloseFile(&file); strncpy(udf->path, path, strlen(path)); @@ -686,7 +695,6 @@ void udfdAllocBuffer(uv_handle_t *handle, size_t suggestedSize, uv_buf_t *buf) { buf->len = 0; } } - fnDebug("allocate buf. input buf cap - len - total : %d - %d - %d", ctx->inputCap, ctx->inputLen, ctx->inputTotal); } bool isUdfdUvMsgComplete(SUdfdUvConn *pipe) { diff --git a/source/libs/parser/inc/parInsertData.h b/source/libs/parser/inc/parInsertData.h index ea78735d5e3da99d3bece2f8a62cf611b3aa9aff..ddfcd3a249afe1767cd50bdb433ce2f712354af8 100644 --- a/source/libs/parser/inc/parInsertData.h +++ b/source/libs/parser/inc/parInsertData.h @@ -122,7 +122,7 @@ static FORCE_INLINE int32_t setBlockInfo(SSubmitBlk *pBlocks, STableDataBlocks * pBlocks->sversion = dataBuf->pTableMeta->sversion; pBlocks->schemaLen = dataBuf->createTbReqLen; - if (pBlocks->numOfRows + numOfRows >= INT16_MAX) { + if (pBlocks->numOfRows + numOfRows >= INT32_MAX) { return TSDB_CODE_TSC_INVALID_OPERATION; } else { pBlocks->numOfRows += numOfRows; diff --git a/source/libs/parser/src/parInsert.c b/source/libs/parser/src/parInsert.c index 73a4a3472af77f1293950f0f9a6a2670eb79cd5d..85f73f0663aff9990d8298a0a4789eea4a2b6502 100644 --- a/source/libs/parser/src/parInsert.c +++ b/source/libs/parser/src/parInsert.c @@ -292,11 +292,10 @@ static void buildMsgHeader(STableDataBlocks* src, SVgDataBlocks* blocks) { int32_t schemaLen = blk->schemaLen; blk->uid = htobe64(blk->uid); blk->suid = htobe64(blk->suid); - blk->padding = htonl(blk->padding); blk->sversion = htonl(blk->sversion); blk->dataLen = htonl(blk->dataLen); blk->schemaLen = htonl(blk->schemaLen); - blk->numOfRows = htons(blk->numOfRows); + blk->numOfRows = htonl(blk->numOfRows); blk = (SSubmitBlk*)(blk->data + schemaLen + dataLen); } } @@ -1267,7 +1266,7 @@ static int32_t parseValuesClause(SInsertParseContext* pCxt, STableDataBlocks* da SSubmitBlk* pBlocks = (SSubmitBlk*)(dataBuf->pData); if (TSDB_CODE_SUCCESS != setBlockInfo(pBlocks, dataBuf, numOfRows)) { - return buildInvalidOperationMsg(&pCxt->msg, "too many rows in sql, total number of rows should be less than 32767"); + return buildInvalidOperationMsg(&pCxt->msg, "too many rows in sql, total number of rows should be less than INT32_MAX"); } dataBuf->numOfTables = 1; @@ -1339,7 +1338,7 @@ static int32_t parseDataFromFile(SInsertParseContext* pCxt, SToken filePath, STa SSubmitBlk* pBlocks = (SSubmitBlk*)(dataBuf->pData); if (TSDB_CODE_SUCCESS != setBlockInfo(pBlocks, dataBuf, numOfRows)) { - return buildInvalidOperationMsg(&pCxt->msg, "too many rows in sql, total number of rows should be less than 32767"); + return buildInvalidOperationMsg(&pCxt->msg, "too many rows in sql, total number of rows should be less than INT32_MAX"); } dataBuf->numOfTables = 1; @@ -1986,7 +1985,7 @@ int32_t qBindStmtColsValue(void* pBlock, TAOS_MULTI_BIND* bind, char* msgBuf, in SSubmitBlk* pBlocks = (SSubmitBlk*)(pDataBlock->pData); if (TSDB_CODE_SUCCESS != setBlockInfo(pBlocks, pDataBlock, bind->num)) { - return buildInvalidOperationMsg(&pBuf, "too many rows in sql, total number of rows should be less than 32767"); + return buildInvalidOperationMsg(&pBuf, "too many rows in sql, total number of rows should be less than INT32_MAX"); } return TSDB_CODE_SUCCESS; @@ -2074,7 +2073,7 @@ int32_t qBindStmtSingleColValue(void* pBlock, TAOS_MULTI_BIND* bind, char* msgBu SSubmitBlk* pBlocks = (SSubmitBlk*)(pDataBlock->pData); if (TSDB_CODE_SUCCESS != setBlockInfo(pBlocks, pDataBlock, bind->num)) { - return buildInvalidOperationMsg(&pBuf, "too many rows in sql, total number of rows should be less than 32767"); + return buildInvalidOperationMsg(&pBuf, "too many rows in sql, total number of rows should be less than INT32_MAX"); } } @@ -2444,7 +2443,7 @@ int32_t smlBindData(void* handle, SArray* tags, SArray* colsSchema, SArray* cols SSubmitBlk* pBlocks = (SSubmitBlk*)(pDataBlock->pData); if (TSDB_CODE_SUCCESS != setBlockInfo(pBlocks, pDataBlock, rowNum)) { - return buildInvalidOperationMsg(&pBuf, "too many rows in sql, total number of rows should be less than 32767"); + return buildInvalidOperationMsg(&pBuf, "too many rows in sql, total number of rows should be less than INT32_MAX"); } return TSDB_CODE_SUCCESS; diff --git a/source/libs/parser/src/parTranslater.c b/source/libs/parser/src/parTranslater.c index f53f1f27b70bd1acf7a5902cd3fdc683c5327051..d96cdca8a9405e891dca09431d6dac6388fdc25d 100644 --- a/source/libs/parser/src/parTranslater.c +++ b/source/libs/parser/src/parTranslater.c @@ -5845,11 +5845,16 @@ static int32_t createTagValFromExpr(STranslateContext* pCxt, SDataType targetDt, } static int32_t createTagValFromVal(STranslateContext* pCxt, SDataType targetDt, SNode* pNode, SValueNode** pVal) { - *pVal = (SValueNode*)nodesCloneNode(pNode); - if (NULL == *pVal) { + SValueNode* pTempVal = (SValueNode*)nodesCloneNode(pNode); + if (NULL == pTempVal) { return TSDB_CODE_OUT_OF_MEMORY; } - return DEAL_RES_ERROR == translateValueImpl(pCxt, *pVal, targetDt, true) ? pCxt->errCode : TSDB_CODE_SUCCESS; + if (DEAL_RES_ERROR == translateValueImpl(pCxt, pTempVal, targetDt, true)) { + nodesDestroyNode((SNode*)pTempVal); + return pCxt->errCode; + } + *pVal = pTempVal; + return TSDB_CODE_SUCCESS; } static int32_t createTagVal(STranslateContext* pCxt, uint8_t precision, SSchema* pSchema, SNode* pNode, diff --git a/source/libs/parser/test/parInsertTest.cpp b/source/libs/parser/test/parInsertTest.cpp index 22a1be25794e4c0ee19fd136cfc17586aea66c3c..f3f87e945aea3c93158d1e57ff9fb48ad7d64bf8 100644 --- a/source/libs/parser/test/parInsertTest.cpp +++ b/source/libs/parser/test/parInsertTest.cpp @@ -110,15 +110,15 @@ class InsertTest : public Test { SSubmitBlk* blk = (SSubmitBlk*)(submit + 1); for (int32_t i = 0; i < numOfBlocks; ++i) { cout << "Block:" << i << endl; - cout << "\tuid:" << be64toh(blk->uid) << ", tid:" << be64toh(blk->suid) << ", padding:" << ntohl(blk->padding) - << ", sversion:" << ntohl(blk->sversion) << ", dataLen:" << ntohl(blk->dataLen) - << ", schemaLen:" << ntohl(blk->schemaLen) << ", numOfRows:" << ntohs(blk->numOfRows) << endl; + cout << "\tuid:" << be64toh(blk->uid) << ", tid:" << be64toh(blk->suid) << ", sversion:" << ntohl(blk->sversion) + << ", dataLen:" << ntohl(blk->dataLen) << ", schemaLen:" << ntohl(blk->schemaLen) + << ", numOfRows:" << ntohl(blk->numOfRows) << endl; blk = (SSubmitBlk*)(blk->data + ntohl(blk->dataLen)); } } } - void checkReslut(int32_t numOfTables, int16_t numOfRows1, int16_t numOfRows2 = -1) { + void checkReslut(int32_t numOfTables, int32_t numOfRows1, int32_t numOfRows2 = -1) { SVnodeModifOpStmt* pStmt = getVnodeModifStmt(res_); ASSERT_EQ(pStmt->payloadType, PAYLOAD_TYPE_KV); ASSERT_EQ(pStmt->insertType, TSDB_QUERY_TYPE_INSERT); @@ -134,7 +134,7 @@ class InsertTest : public Test { int32_t numOfBlocks = ntohl(submit->numOfBlocks); SSubmitBlk* blk = (SSubmitBlk*)(submit + 1); for (int32_t i = 0; i < numOfBlocks; ++i) { - ASSERT_EQ(ntohs(blk->numOfRows), (0 == i ? numOfRows1 : (numOfRows2 > 0 ? numOfRows2 : numOfRows1))); + ASSERT_EQ(ntohl(blk->numOfRows), (0 == i ? numOfRows1 : (numOfRows2 > 0 ? numOfRows2 : numOfRows1))); blk = (SSubmitBlk*)(blk->data + ntohl(blk->dataLen)); } } diff --git a/source/libs/sync/src/syncAppendEntries.c b/source/libs/sync/src/syncAppendEntries.c index 0f63510b12f26f4ff867599afb274a6cea276883..a7bc4df281719f7ee50d672518829d18a9abf65c 100644 --- a/source/libs/sync/src/syncAppendEntries.c +++ b/source/libs/sync/src/syncAppendEntries.c @@ -573,7 +573,7 @@ int32_t syncNodeOnAppendEntriesSnapshot2Cb(SSyncNode* ths, SyncAppendEntriesBatc // fsync once SSyncLogStoreData* pData = ths->pLogStore->data; SWal* pWal = pData->pWal; - walFsync(pWal, true); + walFsync(pWal, false); // update match index matchIndex = pMsg->prevLogIndex + pMsg->dataCount; @@ -694,7 +694,7 @@ int32_t syncNodeOnAppendEntriesSnapshot2Cb(SSyncNode* ths, SyncAppendEntriesBatc // fsync once SSyncLogStoreData* pData = ths->pLogStore->data; SWal* pWal = pData->pWal; - walFsync(pWal, true); + walFsync(pWal, false); } // prepare response msg diff --git a/source/libs/sync/src/syncRaftLog.c b/source/libs/sync/src/syncRaftLog.c index 0e17c1db80b11d54555c700f7d419a12e4778c20..bf440f04a06f7c26dfcae36a110a123c39c5bf49 100644 --- a/source/libs/sync/src/syncRaftLog.c +++ b/source/libs/sync/src/syncRaftLog.c @@ -206,7 +206,7 @@ static int32_t raftLogAppendEntry(struct SSyncLogStore* pLogStore, SSyncRaftEntr SWal* pWal = pData->pWal; SyncIndex index = 0; - SWalSyncInfo syncMeta; + SWalSyncInfo syncMeta = {0}; syncMeta.isWeek = pEntry->isWeak; syncMeta.seqNum = pEntry->seqNum; syncMeta.term = pEntry->term; @@ -444,7 +444,7 @@ int32_t logStoreAppendEntry(SSyncLogStore* pLogStore, SSyncRaftEntry* pEntry) { SWal* pWal = pData->pWal; SyncIndex index = 0; - SWalSyncInfo syncMeta; + SWalSyncInfo syncMeta = {0}; syncMeta.isWeek = pEntry->isWeak; syncMeta.seqNum = pEntry->seqNum; syncMeta.term = pEntry->term; diff --git a/source/libs/sync/src/syncReplication.c b/source/libs/sync/src/syncReplication.c index f02c013d31706b7fdb0a2add11418bb241201657..12d6797349b55b6e75d89dc74bbac5939f714dc1 100644 --- a/source/libs/sync/src/syncReplication.c +++ b/source/libs/sync/src/syncReplication.c @@ -132,8 +132,9 @@ int32_t syncNodeAppendEntriesPeersSnapshot2(SSyncNode* pSyncNode) { SyncIndex preLogIndex = syncNodeGetPreIndex(pSyncNode, nextIndex); SyncTerm preLogTerm = syncNodeGetPreTerm(pSyncNode, nextIndex); if (preLogTerm == SYNC_TERM_INVALID) { - // SyncIndex newNextIndex = syncNodeGetLastIndex(pSyncNode) + 1; - SyncIndex newNextIndex = nextIndex + 1; + SyncIndex newNextIndex = syncNodeGetLastIndex(pSyncNode) + 1; + // SyncIndex newNextIndex = nextIndex + 1; + syncIndexMgrSetIndex(pSyncNode->pNextIndex, pDestId, newNextIndex); syncIndexMgrSetIndex(pSyncNode->pMatchIndex, pDestId, SYNC_INDEX_INVALID); sError("vgId:%d, sync get pre term error, nextIndex:%" PRId64 ", update next-index:%" PRId64 @@ -224,8 +225,9 @@ int32_t syncNodeAppendEntriesPeersSnapshot(SSyncNode* pSyncNode) { SyncIndex preLogIndex = syncNodeGetPreIndex(pSyncNode, nextIndex); SyncTerm preLogTerm = syncNodeGetPreTerm(pSyncNode, nextIndex); if (preLogTerm == SYNC_TERM_INVALID) { - // SyncIndex newNextIndex = syncNodeGetLastIndex(pSyncNode) + 1; - SyncIndex newNextIndex = nextIndex + 1; + SyncIndex newNextIndex = syncNodeGetLastIndex(pSyncNode) + 1; + // SyncIndex newNextIndex = nextIndex + 1; + syncIndexMgrSetIndex(pSyncNode->pNextIndex, pDestId, newNextIndex); syncIndexMgrSetIndex(pSyncNode->pMatchIndex, pDestId, SYNC_INDEX_INVALID); sError("vgId:%d, sync get pre term error, nextIndex:%" PRId64 ", update next-index:%" PRId64 diff --git a/source/os/src/osEnv.c b/source/os/src/osEnv.c index 6ae3d8a0c0d655ae6be8bf1a23b36309962b7a65..8f6800c7bed8d1d987ae10aec8a67023792bf9dc 100644 --- a/source/os/src/osEnv.c +++ b/source/os/src/osEnv.c @@ -105,6 +105,10 @@ void osCleanup() {} bool osLogSpaceAvailable() { return tsLogSpace.reserved <= tsLogSpace.size.avail; } +bool osDataSpaceAvailable() { return tsDataSpace.reserved <= tsDataSpace.size.avail; } + +bool osTempSpaceAvailable() { return tsTempSpace.reserved <= tsTempSpace.size.avail; } + void osSetTimezone(const char *timezone) { taosSetSystemTimezone(timezone, tsTimezoneStr, &tsDaylight, &tsTimezone); } void osSetSystemLocale(const char *inLocale, const char *inCharSet) { diff --git a/source/util/src/tlog.c b/source/util/src/tlog.c index 73d89523ce2cc450e58d4de4ae69c0e5a2787493..2e8239c68f0861486d2d6175d698dc76ed92b128 100644 --- a/source/util/src/tlog.c +++ b/source/util/src/tlog.c @@ -691,9 +691,14 @@ static void taosWriteLog(SLogBuff *pLogBuf) { static void *taosAsyncOutputLog(void *param) { SLogBuff *pLogBuf = (SLogBuff *)param; setThreadName("log"); - + int32_t count = 0; while (1) { + count += tsWriteInterval; taosMsleep(tsWriteInterval); + if (count > 1000) { + osUpdate(); + count = 0; + } // Polling the buffer taosWriteLog(pLogBuf); diff --git a/tests/script/tsim/sync/start3replica.sim b/tests/script/tsim/sync/start3replica.sim new file mode 100644 index 0000000000000000000000000000000000000000..f66021f88a897bc0b77dec857c8b8febc23145c9 --- /dev/null +++ b/tests/script/tsim/sync/start3replica.sim @@ -0,0 +1,19 @@ +system sh/stop_dnodes.sh +system sh/deploy.sh -n dnode1 -i 1 +system sh/deploy.sh -n dnode2 -i 2 +system sh/deploy.sh -n dnode3 -i 3 +system sh/deploy.sh -n dnode4 -i 4 + +system sh/cfg.sh -n dnode1 -c supportVnodes -v 0 + +system sh/exec.sh -n dnode1 -s start +system sh/exec.sh -n dnode2 -s start +system sh/exec.sh -n dnode3 -s start +system sh/exec.sh -n dnode4 -s start + +sql connect +sql create dnode $hostname port 7200 +sql create dnode $hostname port 7300 +sql create dnode $hostname port 7400 + +