diff --git a/cmake/cmake.version b/cmake/cmake.version index 05094f10cccea02ecb6a9cc4b9283335cb4424b6..1a83126cd7d1ce3257cc6d3c811bd4e2f49a0371 100644 --- a/cmake/cmake.version +++ b/cmake/cmake.version @@ -2,7 +2,7 @@ IF (DEFINED VERNUMBER) SET(TD_VER_NUMBER ${VERNUMBER}) ELSE () - SET(TD_VER_NUMBER "3.0.1.4") + SET(TD_VER_NUMBER "3.0.1.5") ENDIF () IF (DEFINED VERCOMPATIBLE) diff --git a/docs/en/28-releases/01-tdengine.md b/docs/en/28-releases/01-tdengine.md index 808a21474ebc843b2e6b6081507b64c510f79797..14096bd40011ef514b522fe29060c85a69ab0670 100644 --- a/docs/en/28-releases/01-tdengine.md +++ b/docs/en/28-releases/01-tdengine.md @@ -6,6 +6,10 @@ description: TDengine release history, Release Notes and download links. import Release from "/components/ReleaseV3"; +## 3.0.1.5 + + + ## 3.0.1.4 diff --git a/docs/en/28-releases/02-tools.md b/docs/en/28-releases/02-tools.md index 5d6693ae73dbe9348c6dcfad6f88ed1602acd957..a7446be4e39489e02e333f03c2c4f6f47503617a 100644 --- a/docs/en/28-releases/02-tools.md +++ b/docs/en/28-releases/02-tools.md @@ -6,6 +6,10 @@ description: taosTools release history, Release Notes, download links. import Release from "/components/ReleaseV3"; +## 2.2.6 + + + ## 2.2.4 diff --git a/docs/zh/07-develop/07-tmq.mdx b/docs/zh/07-develop/07-tmq.mdx index 380dca7dcbbb698203f507b717aa4736af736b8b..1f5a089aaa2a051e238eedc0315c37cad643b33f 100644 --- a/docs/zh/07-develop/07-tmq.mdx +++ b/docs/zh/07-develop/07-tmq.mdx @@ -420,7 +420,18 @@ let mut consumer = tmq.build()?; -Python 使用以下配置项创建一个 Consumer 实例。 +Python 语言下引入 `taos` 库的 `TaosConsumer` 类,创建一个 Consumer 示例: + +```python +from taos.tmq import TaosConsumer + +# Syntax: `consumer = TaosConsumer(*topics, **args)` +# +# Example: +consumer = TaosConsumer('topic1', 'topic2', td_connect_ip = "127.0.0.1", group_id = "local") +``` + +其中,元组类型参数被视为 *Topics*,字典类型参数用于以下订阅配置设置: | 参数名称 | 类型 | 参数说明 | 备注 | | :----------------------------: | :----: | -------------------------------------------------------- | ------------------------------------------- | @@ -430,12 +441,12 @@ Python 使用以下配置项创建一个 Consumer 实例。 | `td_connect_port` | string | 用于创建连接,同 `taos_connect` | | | `group_id` | string | 消费组 ID,同一消费组共享消费进度 | **必填项**。最大长度:192。 | | `client_id` | string | 客户端 ID | 最大长度:192。 | -| `auto_offset_reset` | string | 消费组订阅的初始位置 | 可选:`earliest`, `latest`, `none`(default) | -| `enable_auto_commit` | string | 启用自动提交 | 合法值:`true`, `false`。 | -| `auto_commit_interval_ms` | string | 以毫秒为单位的自动提交时间间隔 | | +| `auto_offset_reset` | string | 消费组订阅的初始位置 | 可选:`earliest`(default), `latest`, `none` | +| `enable_auto_commit` | string | 启用自动提交 | 合法值:`true`, `false`,默认为 true | +| `auto_commit_interval_ms` | string | 以毫秒为单位的自动提交时间间隔 | 默认值:5000 ms | | `enable_heartbeat_background` | string | 启用后台心跳,启用后即使长时间不 poll 消息也不会造成离线 | 合法值:`true`, `false` | -| `experimental_snapshot_enable` | string | 从 WAL 开始消费,还是从 TSBS 开始消费 | 合法值:`true`, `false` | -| `msg_with_table_name` | string | 是否允许从消息中解析表名 | 合法值:`true`, `false` | +| `experimental_snapshot_enable` | string | 是否允许从 TSDB 消费数据 | 合法值:`true`, `false` | +| `msg_with_table_name` | string | 是否允许从消息中解析表名,不适用于列订阅 | 合法值:`true`, `false` | | `timeout` | int | 消费者拉取数据的超时时间 | | diff --git a/docs/zh/28-releases/01-tdengine.md b/docs/zh/28-releases/01-tdengine.md index 59e0d0f76174d567eeb5b597256a78ff098a351e..a5b35342508d2e9f234bdf9fb9daae48ee3e90c7 100644 --- a/docs/zh/28-releases/01-tdengine.md +++ b/docs/zh/28-releases/01-tdengine.md @@ -6,6 +6,10 @@ description: TDengine 发布历史、Release Notes 及下载链接 import Release from "/components/ReleaseV3"; +## 3.0.1.5 + + + ## 3.0.1.4 diff --git a/docs/zh/28-releases/02-tools.md b/docs/zh/28-releases/02-tools.md index 83ccdec3873bd6df23beaddd3408b3a3ede6559e..e86481435c1a9cc8c7c99fdae11464fcb4d87caf 100644 --- a/docs/zh/28-releases/02-tools.md +++ b/docs/zh/28-releases/02-tools.md @@ -6,6 +6,10 @@ description: taosTools 的发布历史、Release Notes 和下载链接 import Release from "/components/ReleaseV3"; +## 2.2.6 + + + ## 2.2.4 diff --git a/include/common/tglobal.h b/include/common/tglobal.h index 2d196be44cd6bd626541551a322dc962370a2c40..4a2e14eb78a6d79aff12279c4170167ea6e80b0b 100644 --- a/include/common/tglobal.h +++ b/include/common/tglobal.h @@ -45,14 +45,6 @@ extern bool tsPrintAuth; extern int64_t tsTickPerMin[3]; extern int32_t tsCountAlwaysReturnValue; -// multi-process -extern int32_t tsMultiProcess; -extern int32_t tsMnodeShmSize; -extern int32_t tsVnodeShmSize; -extern int32_t tsQnodeShmSize; -extern int32_t tsSnodeShmSize; -extern int32_t tsNumOfShmThreads; - // queue & threads extern int32_t tsNumOfRpcThreads; extern int32_t tsNumOfCommitThreads; diff --git a/include/common/tmsg.h b/include/common/tmsg.h index e25fbe6201fcd6c51bff78f3e098a3fb9d0b60b4..42bdc3af16761b1d6a06cd7fc4bdadc05b5546fa 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -343,6 +343,8 @@ typedef struct { } SSchemaWrapper; static FORCE_INLINE SSchemaWrapper* tCloneSSchemaWrapper(const SSchemaWrapper* pSchemaWrapper) { + if (pSchemaWrapper->pSchema == NULL) return NULL; + SSchemaWrapper* pSW = (SSchemaWrapper*)taosMemoryMalloc(sizeof(SSchemaWrapper)); if (pSW == NULL) return pSW; pSW->nCols = pSchemaWrapper->nCols; @@ -352,6 +354,7 @@ static FORCE_INLINE SSchemaWrapper* tCloneSSchemaWrapper(const SSchemaWrapper* p taosMemoryFree(pSW); return NULL; } + memcpy(pSW->pSchema, pSchemaWrapper->pSchema, pSW->nCols * sizeof(SSchema)); return pSW; } diff --git a/include/common/tmsgdef.h b/include/common/tmsgdef.h index 1a8865bf83028a6d7dda87702d25de18e662e653..7b4e930485ccee575c0ad7e1b84e080610cfc169 100644 --- a/include/common/tmsgdef.h +++ b/include/common/tmsgdef.h @@ -239,13 +239,6 @@ enum { TD_DEF_MSG_TYPE(TDMT_STREAM_MAX_MSG, "stream-max", NULL, NULL) TD_NEW_MSG_SEG(TDMT_MON_MSG) - TD_DEF_MSG_TYPE(TDMT_MON_MM_INFO, "monitor-minfo", NULL, NULL) - TD_DEF_MSG_TYPE(TDMT_MON_VM_INFO, "monitor-vinfo", NULL, NULL) - TD_DEF_MSG_TYPE(TDMT_MON_QM_INFO, "monitor-qinfo", NULL, NULL) - TD_DEF_MSG_TYPE(TDMT_MON_SM_INFO, "monitor-sinfo", NULL, NULL) - TD_DEF_MSG_TYPE(TDMT_MON_VM_LOAD, "monitor-vload", NULL, NULL) - TD_DEF_MSG_TYPE(TDMT_MON_MM_LOAD, "monitor-mload", NULL, NULL) - TD_DEF_MSG_TYPE(TDMT_MON_QM_LOAD, "monitor-qload", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MON_MAX_MSG, "monitor-max", NULL, NULL) TD_NEW_MSG_SEG(TDMT_SYNC_MSG) diff --git a/include/dnode/mgmt/dnode.h b/include/dnode/mgmt/dnode.h index fe1692c50fc181c969b3dcea0dd751868666468d..82823e3f5762cf96479f740554e1520801969351 100644 --- a/include/dnode/mgmt/dnode.h +++ b/include/dnode/mgmt/dnode.h @@ -25,10 +25,9 @@ extern "C" { /** * @brief Initialize the dnode * - * @param rtype for internal debug usage, default is 0 * @return int32_t 0 for success and -1 for failure */ -int32_t dmInit(int8_t rtype); +int32_t dmInit(); /** * @brief Cleanup the dnode diff --git a/include/os/os.h b/include/os/os.h index 28a3ebee81228380c35f5d400235641ae6df2b6f..e780611c41235b159c119780ee60cd0a0eb5593f 100644 --- a/include/os/os.h +++ b/include/os/os.h @@ -94,10 +94,8 @@ extern "C" { #include "osLz4.h" #include "osMath.h" #include "osMemory.h" -#include "osProc.h" #include "osRand.h" #include "osSemaphore.h" -#include "osShm.h" #include "osSignal.h" #include "osSleep.h" #include "osSocket.h" diff --git a/include/os/osEnv.h b/include/os/osEnv.h index d4e94d6173a813e44e68355bb6a0ead964823ba1..c1fdc9e404c35dba510dafb76e2130ecbcc6ae05 100644 --- a/include/os/osEnv.h +++ b/include/os/osEnv.h @@ -60,6 +60,7 @@ bool osTempSpaceSufficient(); void osSetTimezone(const char *timezone); void osSetSystemLocale(const char *inLocale, const char *inCharSet); +void osSetProcPath(int32_t argc, char **argv); #ifdef __cplusplus } diff --git a/include/os/osProc.h b/include/os/osProc.h deleted file mode 100644 index f09b695ef4e55f0ab9bb9b0113cc1befff58daa0..0000000000000000000000000000000000000000 --- a/include/os/osProc.h +++ /dev/null @@ -1,34 +0,0 @@ -/* - * Copyright (c) 2019 TAOS Data, Inc. - * - * This program is free software: you can use, redistribute, and/or modify - * it under the terms of the GNU Affero General Public License, version 3 - * or later ("AGPL"), as published by the Free Software Foundation. - * - * This program is distributed in the hope that it will be useful, but WITHOUT - * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or - * FITNESS FOR A PARTICULAR PURPOSE. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see . - */ - -#ifndef _TD_OS_PROC_H_ -#define _TD_OS_PROC_H_ - -#ifdef __cplusplus -extern "C" { -#endif - -int32_t taosNewProc(char **args); -void taosWaitProc(int32_t pid); -void taosKillProc(int32_t pid); -bool taosProcExist(int32_t pid); -void taosSetProcName(int32_t argc, char **argv, const char *name); -void taosSetProcPath(int32_t argc, char **argv); - -#ifdef __cplusplus -} -#endif - -#endif /*_TD_OS_PROC_H_*/ diff --git a/include/os/osShm.h b/include/os/osShm.h deleted file mode 100644 index 87efb1ab16e8978294b2a286d8c1093d406c82a8..0000000000000000000000000000000000000000 --- a/include/os/osShm.h +++ /dev/null @@ -1,37 +0,0 @@ -/* - * Copyright (c) 2019 TAOS Data, Inc. - * - * This program is free software: you can use, redistribute, and/or modify - * it under the terms of the GNU Affero General Public License, version 3 - * or later ("AGPL"), as published by the Free Software Foundation. - * - * This program is distributed in the hope that it will be useful, but WITHOUT - * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or - * FITNESS FOR A PARTICULAR PURPOSE. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see . - */ - -#ifndef _TD_OS_SHM_H_ -#define _TD_OS_SHM_H_ - -#ifdef __cplusplus -extern "C" { -#endif - -typedef struct { - int32_t id; - int32_t size; - void *ptr; -} SShm; - -int32_t taosCreateShm(SShm *pShm, int32_t key, int32_t shmsize); -void taosDropShm(SShm *pShm); -int32_t taosAttachShm(SShm *pShm); - -#ifdef __cplusplus -} -#endif - -#endif /*_TD_OS_SHM_H_*/ diff --git a/source/common/src/tglobal.c b/source/common/src/tglobal.c index 2b64d3f1eed3fdf0d9a378881bdd9a6bc0e8a8ee..3edeeee49a166c250a9cb4d2c280d51b34fd0347 100644 --- a/source/common/src/tglobal.c +++ b/source/common/src/tglobal.c @@ -40,14 +40,6 @@ int32_t tsMaxShellConns = 50000; int32_t tsShellActivityTimer = 3; // second bool tsPrintAuth = false; -// multi process -int32_t tsMultiProcess = 0; -int32_t tsMnodeShmSize = TSDB_MAX_MSG_SIZE * 2 + 1024; -int32_t tsVnodeShmSize = TSDB_MAX_MSG_SIZE * 10 + 1024; -int32_t tsQnodeShmSize = TSDB_MAX_MSG_SIZE * 4 + 1024; -int32_t tsSnodeShmSize = TSDB_MAX_MSG_SIZE * 4 + 1024; -int32_t tsNumOfShmThreads = 1; - // queue & threads int32_t tsNumOfRpcThreads = 1; int32_t tsNumOfCommitThreads = 2; @@ -352,13 +344,6 @@ static int32_t taosAddServerCfg(SConfig *pCfg) { if (cfgAddBool(pCfg, "printAuth", tsPrintAuth, 0) != 0) return -1; if (cfgAddInt32(pCfg, "queryRspPolicy", tsQueryRspPolicy, 0, 1, 0) != 0) return -1; - if (cfgAddInt32(pCfg, "multiProcess", tsMultiProcess, 0, 2, 0) != 0) return -1; - if (cfgAddInt32(pCfg, "mnodeShmSize", tsMnodeShmSize, TSDB_MAX_MSG_SIZE * 2 + 1024, INT32_MAX, 0) != 0) return -1; - if (cfgAddInt32(pCfg, "vnodeShmSize", tsVnodeShmSize, TSDB_MAX_MSG_SIZE * 2 + 1024, INT32_MAX, 0) != 0) return -1; - if (cfgAddInt32(pCfg, "qnodeShmSize", tsQnodeShmSize, TSDB_MAX_MSG_SIZE * 2 + 1024, INT32_MAX, 0) != 0) return -1; - if (cfgAddInt32(pCfg, "snodeShmSize", tsSnodeShmSize, TSDB_MAX_MSG_SIZE * 2 + 1024, INT32_MAX, 0) != 0) return -1; - if (cfgAddInt32(pCfg, "numOfShmThreads", tsNumOfShmThreads, 1, 1024, 0) != 0) return -1; - tsNumOfRpcThreads = tsNumOfCores / 2; tsNumOfRpcThreads = TRANGE(tsNumOfRpcThreads, 1, 4); if (cfgAddInt32(pCfg, "numOfRpcThreads", tsNumOfRpcThreads, 1, 1024, 0) != 0) return -1; @@ -698,14 +683,6 @@ static int32_t taosSetServerCfg(SConfig *pCfg) { tsQueryBufferSize = cfgGetItem(pCfg, "queryBufferSize")->i32; tsPrintAuth = cfgGetItem(pCfg, "printAuth")->bval; -#if !defined(WINDOWS) && !defined(DARWIN) - tsMultiProcess = cfgGetItem(pCfg, "multiProcess")->bval; -#endif - tsMnodeShmSize = cfgGetItem(pCfg, "mnodeShmSize")->i32; - tsVnodeShmSize = cfgGetItem(pCfg, "vnodeShmSize")->i32; - tsQnodeShmSize = cfgGetItem(pCfg, "qnodeShmSize")->i32; - tsSnodeShmSize = cfgGetItem(pCfg, "snodeShmSize")->i32; - tsNumOfRpcThreads = cfgGetItem(pCfg, "numOfRpcThreads")->i32; tsNumOfCommitThreads = cfgGetItem(pCfg, "numOfCommitThreads")->i32; tsNumOfMnodeReadThreads = cfgGetItem(pCfg, "numOfMnodeReadThreads")->i32; @@ -903,12 +880,6 @@ int32_t taosSetCfg(SConfig *pCfg, char *name) { } break; } - case 'n': { - if (strcasecmp("mnodeShmSize", name) == 0) { - tsMnodeShmSize = cfgGetItem(pCfg, "mnodeShmSize")->i32; - } - break; - } case 'o': { if (strcasecmp("monitor", name) == 0) { tsEnableMonitor = cfgGetItem(pCfg, "monitor")->bval; @@ -932,11 +903,7 @@ int32_t taosSetCfg(SConfig *pCfg, char *name) { break; } case 'u': { - if (strcasecmp("multiProcess", name) == 0) { -#if !defined(WINDOWS) && !defined(DARWIN) - tsMultiProcess = cfgGetItem(pCfg, "multiProcess")->bval; -#endif - } else if (strcasecmp("udfDebugFlag", name) == 0) { + if (strcasecmp("udfDebugFlag", name) == 0) { udfDebugFlag = cfgGetItem(pCfg, "udfDebugFlag")->i32; } break; @@ -999,8 +966,6 @@ int32_t taosSetCfg(SConfig *pCfg, char *name) { if (tsQueryBufferSize >= 0) { tsQueryBufferSizeBytes = tsQueryBufferSize * 1048576UL; } - } else if (strcasecmp("qnodeShmSize", name) == 0) { - tsQnodeShmSize = cfgGetItem(pCfg, "qnodeShmSize")->i32; } else if (strcasecmp("qDebugFlag", name) == 0) { qDebugFlag = cfgGetItem(pCfg, "qDebugFlag")->i32; } else if (strcasecmp("queryPlannerTrace", name) == 0) { @@ -1041,8 +1006,6 @@ int32_t taosSetCfg(SConfig *pCfg, char *name) { tsNumOfSupportVnodes = cfgGetItem(pCfg, "supportVnodes")->i32; } else if (strcasecmp("statusInterval", name) == 0) { tsStatusInterval = cfgGetItem(pCfg, "statusInterval")->i32; - } else if (strcasecmp("snodeShmSize", name) == 0) { - tsSnodeShmSize = cfgGetItem(pCfg, "snodeShmSize")->i32; } else if (strcasecmp("serverPort", name) == 0) { tstrncpy(tsLocalFqdn, cfgGetItem(pCfg, "fqdn")->str, TSDB_FQDN_LEN); tsServerPort = (uint16_t)cfgGetItem(pCfg, "serverPort")->i32; @@ -1110,9 +1073,7 @@ int32_t taosSetCfg(SConfig *pCfg, char *name) { break; } case 'v': { - if (strcasecmp("vnodeShmSize", name) == 0) { - tsVnodeShmSize = cfgGetItem(pCfg, "vnodeShmSize")->i32; - } else if (strcasecmp("vDebugFlag", name) == 0) { + if (strcasecmp("vDebugFlag", name) == 0) { vDebugFlag = cfgGetItem(pCfg, "vDebugFlag")->i32; } break; @@ -1235,6 +1196,7 @@ int32_t taosInitCfg(const char *cfgDir, const char **envCmd, const char *envFile if (cfgLoadFromArray(tsCfg, pArgs) != 0) { uError("failed to load cfg from array since %s", terrstr()); cfgCleanup(tsCfg); + tsCfg = NULL; return -1; } diff --git a/source/dnode/mgmt/exe/dmMain.c b/source/dnode/mgmt/exe/dmMain.c index acc3fc56581420a1a2d253dd1db40fc760a2c7c9..188677656a512be254815709532611013b805745 100644 --- a/source/dnode/mgmt/exe/dmMain.c +++ b/source/dnode/mgmt/exe/dmMain.c @@ -25,7 +25,6 @@ #define DM_SDB_INFO "Dump sdb info." #define DM_ENV_CMD "The env cmd variable string to use when configuring the server, such as: -e 'TAOS_FQDN=td1'." #define DM_ENV_FILE "The env variable file path to use when configuring the server, default is './.env', .env text can be 'TAOS_FQDN=td1'." -#define DM_NODE_TYPE "Startup type of the node, default is 0." #define DM_MACHINE_CODE "Get machine code." #define DM_VERSION "Print program version." #define DM_EMAIL "" @@ -44,7 +43,6 @@ static struct { char apolloUrl[PATH_MAX]; const char **envCmd; SArray *pArgs; // SConfigPair - EDndNodeType ntype; } global = {0}; static void dmStopDnode(int signum, void *info, void *ctx) { dmStop(); } @@ -59,13 +57,6 @@ static void dmSetSignalHandle() { taosSetSignal(SIGTSTP, dmStopDnode); taosSetSignal(SIGQUIT, dmStopDnode); #endif - - if (!tsMultiProcess) { - } else if (global.ntype == DNODE || global.ntype == NODE_END) { - taosIgnSignal(SIGCHLD); - } else { - taosKillChildOnParentStopped(); - } } static int32_t dmParseArgs(int32_t argc, char const *argv[]) { @@ -91,12 +82,6 @@ static int32_t dmParseArgs(int32_t argc, char const *argv[]) { global.dumpSdb = true; } else if (strcmp(argv[i], "-E") == 0) { tstrncpy(global.envFile, argv[++i], PATH_MAX); - } else if (strcmp(argv[i], "-n") == 0) { - global.ntype = atoi(argv[++i]); - if (global.ntype <= DNODE || global.ntype > NODE_END) { - printf("'-n' range is [1 - %d], default is 0\n", NODE_END - 1); - return -1; - } } else if (strcmp(argv[i], "-k") == 0) { global.generateGrant = true; } else if (strcmp(argv[i], "-C") == 0) { @@ -142,7 +127,6 @@ static void dmPrintHelp() { printf("%s%s%s%s\n", indent, "-C,", indent, DM_DMP_CFG); printf("%s%s%s%s\n", indent, "-e,", indent, DM_ENV_CMD); printf("%s%s%s%s\n", indent, "-E,", indent, DM_ENV_FILE); - printf("%s%s%s%s\n", indent, "-n,", indent, DM_NODE_TYPE); printf("%s%s%s%s\n", indent, "-k,", indent, DM_MACHINE_CODE); printf("%s%s%s%s\n", indent, "-V,", indent, DM_VERSION); @@ -155,17 +139,7 @@ static void dmDumpCfg() { } static int32_t dmInitLog() { - char logName[12] = {0}; - snprintf(logName, sizeof(logName), "%slog", dmNodeLogName(global.ntype)); - return taosCreateLog(logName, 1, configDir, global.envCmd, global.envFile, global.apolloUrl, global.pArgs, 0); -} - -static void dmSetProcInfo(int32_t argc, char **argv) { - taosSetProcPath(argc, argv); - if (global.ntype != DNODE && global.ntype != NODE_END) { - const char *name = dmNodeProcName(global.ntype); - taosSetProcName(argc, argv, name); - } + return taosCreateLog("taosdlog", 1, configDir, global.envCmd, global.envFile, global.apolloUrl, global.pArgs, 0); } static void taosCleanupArgs() { @@ -234,6 +208,7 @@ int mainWindows(int argc, char **argv) { taosCleanupCfg(); taosCloseLog(); taosCleanupArgs(); + taosConvDestroy(); return 0; } @@ -242,13 +217,14 @@ int mainWindows(int argc, char **argv) { taosCleanupCfg(); taosCloseLog(); taosCleanupArgs(); + taosConvDestroy(); return 0; } - dmSetProcInfo(argc, (char **)argv); + osSetProcPath(argc, (char **)argv); taosCleanupArgs(); - if (dmInit(global.ntype) != 0) { + if (dmInit() != 0) { dError("failed to init dnode since %s", terrstr()); return -1; } diff --git a/source/dnode/mgmt/mgmt_mnode/inc/mmInt.h b/source/dnode/mgmt/mgmt_mnode/inc/mmInt.h index 17cbde437b2e94d8d111c76600ddef8fb7767818..6d06535447d812b0ae83589bc2d07e48feaf4687 100644 --- a/source/dnode/mgmt/mgmt_mnode/inc/mmInt.h +++ b/source/dnode/mgmt/mgmt_mnode/inc/mmInt.h @@ -34,7 +34,6 @@ typedef struct SMnodeMgmt { SSingleWorker readWorker; SSingleWorker writeWorker; SSingleWorker syncWorker; - SSingleWorker monitorWorker; bool stopped; int32_t refCount; TdThreadRwlock lock; @@ -48,8 +47,6 @@ int32_t mmWriteFile(const char *path, const SMnodeOpt *pOption); SArray *mmGetMsgHandles(); int32_t mmProcessCreateReq(const SMgmtInputOpt *pInput, SRpcMsg *pMsg); int32_t mmProcessDropReq(const SMgmtInputOpt *pInput, SRpcMsg *pMsg); -int32_t mmProcessGetMonitorInfoReq(SMnodeMgmt *pMgmt, SRpcMsg *pMsg); -int32_t mmProcessGetLoadsReq(SMnodeMgmt *pMgmt, SRpcMsg *pMsg); // mmWorker.c int32_t mmStartWorker(SMnodeMgmt *pMgmt); @@ -59,7 +56,6 @@ int32_t mmPutMsgToSyncQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg); int32_t mmPutMsgToReadQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg); int32_t mmPutMsgToQueryQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg); int32_t mmPutMsgToFetchQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg); -int32_t mmPutMsgToMonitorQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg); int32_t mmPutMsgToQueue(SMnodeMgmt *pMgmt, EQueueType qtype, SRpcMsg *pRpc); #ifdef __cplusplus diff --git a/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c b/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c index 105b6e542eb38bbcf4d5e06dc6132ffb2ccc3971..7fb700e776cd422fc447e980da35200848065c22 100644 --- a/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c +++ b/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c @@ -20,58 +20,11 @@ void mmGetMonitorInfo(SMnodeMgmt *pMgmt, SMonMmInfo *pInfo) { mndGetMonitorInfo(pMgmt->pMnode, &pInfo->cluster, &pInfo->vgroup, &pInfo->stb, &pInfo->grant); } -int32_t mmProcessGetMonitorInfoReq(SMnodeMgmt *pMgmt, SRpcMsg *pMsg) { - SMonMmInfo mmInfo = {0}; - mmGetMonitorInfo(pMgmt, &mmInfo); - dmGetMonitorSystemInfo(&mmInfo.sys); - monGetLogs(&mmInfo.log); - - int32_t rspLen = tSerializeSMonMmInfo(NULL, 0, &mmInfo); - if (rspLen < 0) { - terrno = TSDB_CODE_INVALID_MSG; - return -1; - } - - void *pRsp = rpcMallocCont(rspLen); - if (pRsp == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - return -1; - } - - tSerializeSMonMmInfo(pRsp, rspLen, &mmInfo); - pMsg->info.rsp = pRsp; - pMsg->info.rspLen = rspLen; - tFreeSMonMmInfo(&mmInfo); - return 0; -} - void mmGetMnodeLoads(SMnodeMgmt *pMgmt, SMonMloadInfo *pInfo) { pInfo->isMnode = 1; mndGetLoad(pMgmt->pMnode, &pInfo->load); } -int32_t mmProcessGetLoadsReq(SMnodeMgmt *pMgmt, SRpcMsg *pMsg) { - SMonMloadInfo mloads = {0}; - mmGetMnodeLoads(pMgmt, &mloads); - - int32_t rspLen = tSerializeSMonMloadInfo(NULL, 0, &mloads); - if (rspLen < 0) { - terrno = TSDB_CODE_INVALID_MSG; - return -1; - } - - void *pRsp = rpcMallocCont(rspLen); - if (pRsp == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - return -1; - } - - tSerializeSMonMloadInfo(pRsp, rspLen, &mloads); - pMsg->info.rsp = pRsp; - pMsg->info.rspLen = rspLen; - return 0; -} - int32_t mmProcessCreateReq(const SMgmtInputOpt *pInput, SRpcMsg *pMsg) { const STraceId *trace = &pMsg->info.traceId; SDCreateMnodeReq createReq = {0}; @@ -230,9 +183,6 @@ SArray *mmGetMsgHandles() { if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_HASHRANGE_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_COMPACT_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_MON_MM_INFO, mmPutMsgToMonitorQueue, 0) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_MON_MM_LOAD, mmPutMsgToMonitorQueue, 0) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_SYNC_TIMEOUT, mmPutMsgToSyncQueue, 1) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_SYNC_PING, mmPutMsgToSyncQueue, 1) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_SYNC_PING_REPLY, mmPutMsgToSyncQueue, 1) == NULL) goto _OVER; diff --git a/source/dnode/mgmt/mgmt_mnode/src/mmWorker.c b/source/dnode/mgmt/mgmt_mnode/src/mmWorker.c index 849d3ef39019fcb5ce9dfb987cac4772e91839a7..16e6f67409ddea913b422779bda28e1063adec48 100644 --- a/source/dnode/mgmt/mgmt_mnode/src/mmWorker.c +++ b/source/dnode/mgmt/mgmt_mnode/src/mmWorker.c @@ -46,22 +46,12 @@ static inline void mmSendRsp(SRpcMsg *pMsg, int32_t code) { static void mmProcessRpcMsg(SQueueInfo *pInfo, SRpcMsg *pMsg) { SMnodeMgmt *pMgmt = pInfo->ahandle; - int32_t code = -1; + pMsg->info.node = pMgmt->pMnode; const STraceId *trace = &pMsg->info.traceId; dGTrace("msg:%p, get from mnode queue", pMsg); - switch (pMsg->msgType) { - case TDMT_MON_MM_INFO: - code = mmProcessGetMonitorInfoReq(pMgmt, pMsg); - break; - case TDMT_MON_MM_LOAD: - code = mmProcessGetLoadsReq(pMgmt, pMsg); - break; - default: - pMsg->info.node = pMgmt->pMnode; - code = mndProcessRpcMsg(pMsg); - } + int32_t code = mndProcessRpcMsg(pMsg); if (IsReq(pMsg) && pMsg->info.handle != NULL && code != TSDB_CODE_ACTION_IN_PROGRESS) { if (code != 0 && terrno != 0) code = terrno; @@ -136,10 +126,6 @@ int32_t mmPutMsgToFetchQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg) { return mmPutMsgToWorker(pMgmt, &pMgmt->fetchWorker, pMsg); } -int32_t mmPutMsgToMonitorQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg) { - return mmPutMsgToWorker(pMgmt, &pMgmt->monitorWorker, pMsg); -} - int32_t mmPutMsgToQueue(SMnodeMgmt *pMgmt, EQueueType qtype, SRpcMsg *pRpc) { SSingleWorker *pWorker = NULL; switch (qtype) { @@ -237,18 +223,6 @@ int32_t mmStartWorker(SMnodeMgmt *pMgmt) { return -1; } - SSingleWorkerCfg mCfg = { - .min = 1, - .max = 1, - .name = "mnode-monitor", - .fp = (FItem)mmProcessRpcMsg, - .param = pMgmt, - }; - if (tSingleWorkerInit(&pMgmt->monitorWorker, &mCfg) != 0) { - dError("failed to start mnode mnode-monitor worker since %s", terrstr()); - return -1; - } - dDebug("mnode workers are initialized"); return 0; } @@ -256,7 +230,6 @@ int32_t mmStartWorker(SMnodeMgmt *pMgmt) { void mmStopWorker(SMnodeMgmt *pMgmt) { while (pMgmt->refCount > 0) taosMsleep(10); - tSingleWorkerCleanup(&pMgmt->monitorWorker); tSingleWorkerCleanup(&pMgmt->queryWorker); tSingleWorkerCleanup(&pMgmt->fetchWorker); tSingleWorkerCleanup(&pMgmt->readWorker); diff --git a/source/dnode/mgmt/mgmt_qnode/inc/qmInt.h b/source/dnode/mgmt/mgmt_qnode/inc/qmInt.h index 56bf56f25e85a57140566bb20b9b252dae5b521a..c6c239bdcc8b3fa81830c5bed96f237767aabbb2 100644 --- a/source/dnode/mgmt/mgmt_qnode/inc/qmInt.h +++ b/source/dnode/mgmt/mgmt_qnode/inc/qmInt.h @@ -32,14 +32,12 @@ typedef struct SQnodeMgmt { const char *name; SSingleWorker queryWorker; SSingleWorker fetchWorker; - SSingleWorker monitorWorker; } SQnodeMgmt; // qmHandle.c SArray *qmGetMsgHandles(); int32_t qmProcessCreateReq(const SMgmtInputOpt *pInput, SRpcMsg *pMsg); int32_t qmProcessDropReq(const SMgmtInputOpt *pInput, SRpcMsg *pMsg); -int32_t qmProcessGetMonitorInfoReq(SQnodeMgmt *pMgmt, SRpcMsg *pMsg); // qmWorker.c int32_t qmPutRpcMsgToQueue(SQnodeMgmt *pMgmt, EQueueType qtype, SRpcMsg *pMsg); @@ -49,7 +47,6 @@ int32_t qmStartWorker(SQnodeMgmt *pMgmt); void qmStopWorker(SQnodeMgmt *pMgmt); int32_t qmPutNodeMsgToQueryQueue(SQnodeMgmt *pMgmt, SRpcMsg *pMsg); int32_t qmPutNodeMsgToFetchQueue(SQnodeMgmt *pMgmt, SRpcMsg *pMsg); -int32_t qmPutNodeMsgToMonitorQueue(SQnodeMgmt *pMgmt, SRpcMsg *pMsg); int32_t qndPreprocessQueryMsg(SQnode *pQnode, SRpcMsg *pMsg); diff --git a/source/dnode/mgmt/mgmt_qnode/src/qmHandle.c b/source/dnode/mgmt/mgmt_qnode/src/qmHandle.c index 0573be90ea84cd2173d5492e5aaa2d6f6cf076b5..5017ad7b741a5854e02433061f70b1967a038e62 100644 --- a/source/dnode/mgmt/mgmt_qnode/src/qmHandle.c +++ b/source/dnode/mgmt/mgmt_qnode/src/qmHandle.c @@ -29,31 +29,6 @@ void qmGetQnodeLoads(SQnodeMgmt *pMgmt, SQnodeLoad *pInfo) { pInfo->dnodeId = pMgmt->pData->dnodeId; } -int32_t qmProcessGetMonitorInfoReq(SQnodeMgmt *pMgmt, SRpcMsg *pMsg) { - SMonQmInfo qmInfo = {0}; - qmGetMonitorInfo(pMgmt, &qmInfo); - dmGetMonitorSystemInfo(&qmInfo.sys); - monGetLogs(&qmInfo.log); - - int32_t rspLen = tSerializeSMonQmInfo(NULL, 0, &qmInfo); - if (rspLen < 0) { - terrno = TSDB_CODE_INVALID_MSG; - return -1; - } - - void *pRsp = rpcMallocCont(rspLen); - if (pRsp == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - return -1; - } - - tSerializeSMonQmInfo(pRsp, rspLen, &qmInfo); - pMsg->info.rsp = pRsp; - pMsg->info.rspLen = rspLen; - tFreeSMonQmInfo(&qmInfo); - return 0; -} - int32_t qmProcessCreateReq(const SMgmtInputOpt *pInput, SRpcMsg *pMsg) { SDCreateQnodeReq createReq = {0}; if (tDeserializeSCreateDropMQSNodeReq(pMsg->pCont, pMsg->contLen, &createReq) != 0) { @@ -103,8 +78,6 @@ SArray *qmGetMsgHandles() { SArray *pArray = taosArrayInit(16, sizeof(SMgmtHandle)); if (pArray == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_MON_QM_INFO, qmPutNodeMsgToMonitorQueue, 0) == NULL) goto _OVER; - // Requests handled by VNODE if (dmSetMgmtHandle(pArray, TDMT_SCH_QUERY, qmPutNodeMsgToQueryQueue, 1) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_SCH_MERGE_QUERY, qmPutNodeMsgToQueryQueue, 1) == NULL) goto _OVER; diff --git a/source/dnode/mgmt/mgmt_qnode/src/qmWorker.c b/source/dnode/mgmt/mgmt_qnode/src/qmWorker.c index 66386b0ee0bc7fac510a0e9ceb1d1bab4ac649be..edbe9882a43232d48eee63966cc6ea3a3f61536e 100644 --- a/source/dnode/mgmt/mgmt_qnode/src/qmWorker.c +++ b/source/dnode/mgmt/mgmt_qnode/src/qmWorker.c @@ -28,18 +28,9 @@ static inline void qmSendRsp(SRpcMsg *pMsg, int32_t code) { static void qmProcessQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) { SQnodeMgmt *pMgmt = pInfo->ahandle; - int32_t code = -1; dTrace("msg:%p, get from qnode queue", pMsg); - switch (pMsg->msgType) { - case TDMT_MON_QM_INFO: - code = qmProcessGetMonitorInfoReq(pMgmt, pMsg); - break; - default: - code = qndProcessQueryMsg(pMgmt->pQnode, pInfo->timestamp, pMsg); - break; - } - + int32_t code = qndProcessQueryMsg(pMgmt->pQnode, pInfo->timestamp, pMsg); if (IsReq(pMsg) && code != TSDB_CODE_ACTION_IN_PROGRESS) { if (code != 0 && terrno != 0) code = terrno; qmSendRsp(pMsg, code); @@ -66,10 +57,6 @@ int32_t qmPutNodeMsgToFetchQueue(SQnodeMgmt *pMgmt, SRpcMsg *pMsg) { return qmPutNodeMsgToWorker(&pMgmt->fetchWorker, pMsg); } -int32_t qmPutNodeMsgToMonitorQueue(SQnodeMgmt *pMgmt, SRpcMsg *pMsg) { - return qmPutNodeMsgToWorker(&pMgmt->monitorWorker, pMsg); -} - int32_t qmPutRpcMsgToQueue(SQnodeMgmt *pMgmt, EQueueType qtype, SRpcMsg *pRpc) { SRpcMsg *pMsg = taosAllocateQitem(sizeof(SRpcMsg), RPC_QITEM); if (pMsg == NULL) return -1; @@ -136,24 +123,11 @@ int32_t qmStartWorker(SQnodeMgmt *pMgmt) { return -1; } - SSingleWorkerCfg mCfg = { - .min = 1, - .max = 1, - .name = "qnode-monitor", - .fp = (FItem)qmProcessQueue, - .param = pMgmt, - }; - if (tSingleWorkerInit(&pMgmt->monitorWorker, &mCfg) != 0) { - dError("failed to start qnode-monitor worker since %s", terrstr()); - return -1; - } - dDebug("qnode workers are initialized"); return 0; } void qmStopWorker(SQnodeMgmt *pMgmt) { - tSingleWorkerCleanup(&pMgmt->monitorWorker); tSingleWorkerCleanup(&pMgmt->queryWorker); tSingleWorkerCleanup(&pMgmt->fetchWorker); dDebug("qnode workers are closed"); diff --git a/source/dnode/mgmt/mgmt_snode/inc/smInt.h b/source/dnode/mgmt/mgmt_snode/inc/smInt.h index fbf63dda430cc6ead7fa9ea43e3ab010f265d512..4efe1c997b578a3397f1ab0fbcc39b612af123a8 100644 --- a/source/dnode/mgmt/mgmt_snode/inc/smInt.h +++ b/source/dnode/mgmt/mgmt_snode/inc/smInt.h @@ -33,14 +33,12 @@ typedef struct SSnodeMgmt { int8_t uniqueWorkerInUse; SArray *uniqueWorkers; // SArray SSingleWorker sharedWorker; - SSingleWorker monitorWorker; } SSnodeMgmt; // smHandle.c SArray *smGetMsgHandles(); int32_t smProcessCreateReq(const SMgmtInputOpt *pInput, SRpcMsg *pMsg); int32_t smProcessDropReq(const SMgmtInputOpt *pInput, SRpcMsg *pMsg); -int32_t smProcessGetMonitorInfoReq(SSnodeMgmt *pMgmt, SRpcMsg *pMsg); // smWorker.c int32_t smStartWorker(SSnodeMgmt *pMgmt); @@ -49,7 +47,6 @@ int32_t smPutNodeMsgToMgmtQueue(SSnodeMgmt *pMgmt, SRpcMsg *pMsg); int32_t smPutNodeMsgToUniqueQueue(SSnodeMgmt *pMgmt, SRpcMsg *pMsg); int32_t smPutNodeMsgToSharedQueue(SSnodeMgmt *pMgmt, SRpcMsg *pMsg); int32_t smPutNodeMsgToExecQueue(SSnodeMgmt *pMgmt, SRpcMsg *pMsg); -int32_t smPutNodeMsgToMonitorQueue(SSnodeMgmt *pMgmt, SRpcMsg *pMsg); #ifdef __cplusplus } diff --git a/source/dnode/mgmt/mgmt_snode/src/smHandle.c b/source/dnode/mgmt/mgmt_snode/src/smHandle.c index c6e3b3611f5e264f14a7da7261e2f1b89ff58c28..65c96767ab33edf6f1ddcb04aa76d276dc17e92f 100644 --- a/source/dnode/mgmt/mgmt_snode/src/smHandle.c +++ b/source/dnode/mgmt/mgmt_snode/src/smHandle.c @@ -18,31 +18,6 @@ void smGetMonitorInfo(SSnodeMgmt *pMgmt, SMonSmInfo *smInfo) {} -int32_t smProcessGetMonitorInfoReq(SSnodeMgmt *pMgmt, SRpcMsg *pMsg) { - SMonSmInfo smInfo = {0}; - smGetMonitorInfo(pMgmt, &smInfo); - dmGetMonitorSystemInfo(&smInfo.sys); - monGetLogs(&smInfo.log); - - int32_t rspLen = tSerializeSMonSmInfo(NULL, 0, &smInfo); - if (rspLen < 0) { - terrno = TSDB_CODE_INVALID_MSG; - return -1; - } - - void *pRsp = rpcMallocCont(rspLen); - if (pRsp == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - return -1; - } - - tSerializeSMonSmInfo(pRsp, rspLen, &smInfo); - pMsg->info.rsp = pRsp; - pMsg->info.rspLen = rspLen; - tFreeSMonSmInfo(&smInfo); - return 0; -} - int32_t smProcessCreateReq(const SMgmtInputOpt *pInput, SRpcMsg *pMsg) { SDCreateSnodeReq createReq = {0}; if (tDeserializeSCreateDropMQSNodeReq(pMsg->pCont, pMsg->contLen, &createReq) != 0) { @@ -92,8 +67,6 @@ SArray *smGetMsgHandles() { SArray *pArray = taosArrayInit(4, sizeof(SMgmtHandle)); if (pArray == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_MON_SM_INFO, smPutNodeMsgToMonitorQueue, 0) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_DEPLOY, smPutNodeMsgToMgmtQueue, 1) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_DROP, smPutNodeMsgToMgmtQueue, 1) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_RUN, smPutNodeMsgToSharedQueue, 1) == NULL) goto _OVER; diff --git a/source/dnode/mgmt/mgmt_snode/src/smWorker.c b/source/dnode/mgmt/mgmt_snode/src/smWorker.c index 19c1b9b5c7afd9d1726b364ee69d94619f471d2d..ad56d57f69472f5101c7149118fbb95c4a05b5f2 100644 --- a/source/dnode/mgmt/mgmt_snode/src/smWorker.c +++ b/source/dnode/mgmt/mgmt_snode/src/smWorker.c @@ -26,27 +26,6 @@ static inline void smSendRsp(SRpcMsg *pMsg, int32_t code) { tmsgSendRsp(&rsp); } -static void smProcessMonitorQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) { - SSnodeMgmt *pMgmt = pInfo->ahandle; - int32_t code = -1; - dTrace("msg:%p, get from snode-monitor queue", pMsg); - - if (pMsg->msgType == TDMT_MON_SM_INFO) { - code = smProcessGetMonitorInfoReq(pMgmt, pMsg); - } else { - terrno = TSDB_CODE_MSG_NOT_PROCESSED; - } - - if (IsReq(pMsg)) { - if (code != 0 && terrno != 0) code = terrno; - smSendRsp(pMsg, code); - } - - dTrace("msg:%p, is freed, code:0x%x", pMsg, code); - rpcFreeCont(pMsg->pCont); - taosFreeQitem(pMsg); -} - static void smProcessUniqueQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) { SSnodeMgmt *pMgmt = pInfo->ahandle; @@ -123,24 +102,11 @@ int32_t smStartWorker(SSnodeMgmt *pMgmt) { return -1; } - SSingleWorkerCfg mCfg = { - .min = 1, - .max = 1, - .name = "snode-monitor", - .fp = (FItem)smProcessMonitorQueue, - .param = pMgmt, - }; - if (tSingleWorkerInit(&pMgmt->monitorWorker, &mCfg) != 0) { - dError("failed to start snode-monitor worker since %s", terrstr()); - return -1; - } - dDebug("snode workers are initialized"); return 0; } void smStopWorker(SSnodeMgmt *pMgmt) { - tSingleWorkerCleanup(&pMgmt->monitorWorker); for (int32_t i = 0; i < taosArrayGetSize(pMgmt->uniqueWorkers); i++) { SMultiWorker *pWorker = taosArrayGetP(pMgmt->uniqueWorkers, i); tMultiWorkerCleanup(pWorker); @@ -175,14 +141,6 @@ int32_t smPutNodeMsgToMgmtQueue(SSnodeMgmt *pMgmt, SRpcMsg *pMsg) { return 0; } -int32_t smPutNodeMsgToMonitorQueue(SSnodeMgmt *pMgmt, SRpcMsg *pMsg) { - SSingleWorker *pWorker = &pMgmt->monitorWorker; - - dTrace("msg:%p, put into worker %s", pMsg, pWorker->name); - taosWriteQitem(pWorker->queue, pMsg); - return 0; -} - int32_t smPutNodeMsgToUniqueQueue(SSnodeMgmt *pMgmt, SRpcMsg *pMsg) { int32_t index = smGetSWIdFromMsg(pMsg); SMultiWorker *pWorker = taosArrayGetP(pMgmt->uniqueWorkers, index); diff --git a/source/dnode/mgmt/mgmt_vnode/inc/vmInt.h b/source/dnode/mgmt/mgmt_vnode/inc/vmInt.h index 32649464c1561011695510ed1094bb0b5cf96091..1bd68e6d419dab5e7f61f35209de6c3175f7a7f4 100644 --- a/source/dnode/mgmt/mgmt_vnode/inc/vmInt.h +++ b/source/dnode/mgmt/mgmt_vnode/inc/vmInt.h @@ -38,7 +38,6 @@ typedef struct SVnodeMgmt { SWWorkerPool writePool; SWWorkerPool applyPool; SSingleWorker mgmtWorker; - SSingleWorker monitorWorker; SHashObj *hash; TdThreadRwlock lock; SVnodesStat state; @@ -89,8 +88,6 @@ void vmCloseVnode(SVnodeMgmt *pMgmt, SVnodeObj *pVnode); SArray *vmGetMsgHandles(); int32_t vmProcessCreateVnodeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg); int32_t vmProcessDropVnodeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg); -int32_t vmProcessGetMonitorInfoReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg); -int32_t vmProcessGetLoadsReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg); // vmFile.c int32_t vmGetVnodeListFromFile(SVnodeMgmt *pMgmt, SWrapperCfg **ppCfgs, int32_t *numOfVnodes); @@ -114,7 +111,6 @@ int32_t vmPutMsgToFetchQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg); int32_t vmPutMsgToStreamQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg); int32_t vmPutMsgToMergeQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg); int32_t vmPutMsgToMgmtQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg); -int32_t vmPutMsgToMonitorQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg); #ifdef __cplusplus } diff --git a/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c b/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c index fe9f114084d17f7edd82e5bda558703c9e5a8ceb..c92b6c4c4d09b8780d8b8c798762bb9600fa306a 100644 --- a/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c +++ b/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c @@ -83,54 +83,6 @@ void vmGetMonitorInfo(SVnodeMgmt *pMgmt, SMonVmInfo *pInfo) { taosArrayDestroy(pVloads); } -int32_t vmProcessGetMonitorInfoReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) { - SMonVmInfo vmInfo = {0}; - vmGetMonitorInfo(pMgmt, &vmInfo); - dmGetMonitorSystemInfo(&vmInfo.sys); - monGetLogs(&vmInfo.log); - - int32_t rspLen = tSerializeSMonVmInfo(NULL, 0, &vmInfo); - if (rspLen < 0) { - terrno = TSDB_CODE_INVALID_MSG; - return -1; - } - - void *pRsp = rpcMallocCont(rspLen); - if (pRsp == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - return -1; - } - - tSerializeSMonVmInfo(pRsp, rspLen, &vmInfo); - pMsg->info.rsp = pRsp; - pMsg->info.rspLen = rspLen; - tFreeSMonVmInfo(&vmInfo); - return 0; -} - -int32_t vmProcessGetLoadsReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) { - SMonVloadInfo vloads = {0}; - vmGetVnodeLoads(pMgmt, &vloads, false); - - int32_t rspLen = tSerializeSMonVloadInfo(NULL, 0, &vloads); - if (rspLen < 0) { - terrno = TSDB_CODE_INVALID_MSG; - return -1; - } - - void *pRsp = rpcMallocCont(rspLen); - if (pRsp == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - return -1; - } - - tSerializeSMonVloadInfo(pRsp, rspLen, &vloads); - pMsg->info.rsp = pRsp; - pMsg->info.rspLen = rspLen; - tFreeSMonVloadInfo(&vloads); - return 0; -} - static void vmGenerateVnodeCfg(SCreateVnodeReq *pCreate, SVnodeCfg *pCfg) { memcpy(pCfg, &vnodeCfgDefault, sizeof(SVnodeCfg)); @@ -348,9 +300,6 @@ SArray *vmGetMsgHandles() { SArray *pArray = taosArrayInit(32, sizeof(SMgmtHandle)); if (pArray == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_MON_VM_INFO, vmPutMsgToMonitorQueue, 0) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_MON_VM_LOAD, vmPutMsgToMonitorQueue, 0) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_VND_SUBMIT, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_SCH_QUERY, vmPutMsgToQueryQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_SCH_MERGE_QUERY, vmPutMsgToQueryQueue, 0) == NULL) goto _OVER; diff --git a/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c b/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c index e08f5963cdb877f21252e08b180d4c5c47fa39ab..fd1dbe00cef612a1264454043a6972a8fe8f183a 100644 --- a/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c +++ b/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c @@ -34,12 +34,6 @@ static void vmProcessMgmtQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) { dGTrace("msg:%p, get from vnode-mgmt queue", pMsg); switch (pMsg->msgType) { - case TDMT_MON_VM_INFO: - code = vmProcessGetMonitorInfoReq(pMgmt, pMsg); - break; - case TDMT_MON_VM_LOAD: - code = vmProcessGetLoadsReq(pMgmt, pMsg); - break; case TDMT_DND_CREATE_VNODE: code = vmProcessCreateVnodeReq(pMgmt, pMsg); break; @@ -258,13 +252,6 @@ int32_t vmPutMsgToMgmtQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) { return 0; } -int32_t vmPutMsgToMonitorQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) { - const STraceId *trace = &pMsg->info.traceId; - dGTrace("msg:%p, put into vnode-monitor queue", pMsg); - taosWriteQitem(pMgmt->monitorWorker.queue, pMsg); - return 0; -} - int32_t vmPutRpcMsgToQueue(SVnodeMgmt *pMgmt, EQueueType qtype, SRpcMsg *pRpc) { SRpcMsg *pMsg = taosAllocateQitem(sizeof(SRpcMsg), RPC_QITEM); if (pMsg == NULL) { @@ -410,21 +397,11 @@ int32_t vmStartWorker(SVnodeMgmt *pMgmt) { }; if (tSingleWorkerInit(&pMgmt->mgmtWorker, &mgmtCfg) != 0) return -1; - SSingleWorkerCfg monitorCfg = { - .min = 1, - .max = 1, - .name = "vnode-monitor", - .fp = (FItem)vmProcessMgmtQueue, - .param = pMgmt, - }; - if (tSingleWorkerInit(&pMgmt->monitorWorker, &monitorCfg) != 0) return -1; - dDebug("vnode workers are initialized"); return 0; } void vmStopWorker(SVnodeMgmt *pMgmt) { - tSingleWorkerCleanup(&pMgmt->monitorWorker); tSingleWorkerCleanup(&pMgmt->mgmtWorker); tWWorkerCleanup(&pMgmt->writePool); tWWorkerCleanup(&pMgmt->applyPool); diff --git a/source/dnode/mgmt/node_mgmt/inc/dmMgmt.h b/source/dnode/mgmt/node_mgmt/inc/dmMgmt.h index adde0557965fb7651c66a8b4791d4a671db91201..7e85e6b72239260d13aba44306309cd2528c3364 100644 --- a/source/dnode/mgmt/node_mgmt/inc/dmMgmt.h +++ b/source/dnode/mgmt/node_mgmt/inc/dmMgmt.h @@ -25,44 +25,6 @@ extern "C" { #endif -typedef struct SMgmtWrapper SMgmtWrapper; - -#define SINGLE_PROC 0 -#define CHILD_PROC 1 -#define PARENT_PROC 2 -#define TEST_PROC 3 -#define OnlyInSingleProc(wrapper) ((wrapper)->proc.ptype == SINGLE_PROC) -#define OnlyInChildProc(wrapper) ((wrapper)->proc.ptype == CHILD_PROC) -#define OnlyInParentProc(wrapper) ((wrapper)->proc.ptype == PARENT_PROC) -#define InChildProc(wrapper) ((wrapper)->proc.ptype & CHILD_PROC) -#define InParentProc(wrapper) ((wrapper)->proc.ptype & PARENT_PROC) - -typedef struct { - int32_t head; - int32_t tail; - int32_t total; - int32_t avail; - int32_t items; - char name[8]; - TdThreadMutex mutex; - tsem_t sem; - char pBuffer[]; -} SProcQueue; - -typedef struct { - SMgmtWrapper *wrapper; - const char *name; - SHashObj *hash; - SProcQueue *pqueue; - SProcQueue *cqueue; - TdThread pthread; - TdThread cthread; - SShm shm; - int32_t pid; - EDndProcType ptype; - bool stop; -} SProc; - typedef struct SMgmtWrapper { SMgmtFunc func; struct SDnode *pDnode; @@ -74,7 +36,6 @@ typedef struct SMgmtWrapper { EDndNodeType ntype; bool deployed; bool required; - SProc proc; NodeMsgFp msgFps[TDMT_MAX]; } SMgmtWrapper; @@ -111,8 +72,6 @@ typedef struct SUdfdData { typedef struct SDnode { int8_t once; bool stop; - EDndProcType ptype; - EDndNodeType rtype; EDndRunStatus status; SStartupInfo startup; SDnodeTrans trans; @@ -128,7 +87,7 @@ SDnode *dmInstance(); void dmReportStartup(const char *pName, const char *pDesc); // dmMgmt.c -int32_t dmInitDnode(SDnode *pDnode, EDndNodeType rtype); +int32_t dmInitDnode(SDnode *pDnode); void dmCleanupDnode(SDnode *pDnode); SMgmtWrapper *dmAcquireWrapper(SDnode *pDnode, EDndNodeType nType); int32_t dmMarkWrapper(SMgmtWrapper *pWrapper); @@ -145,16 +104,6 @@ void dmStopNode(SMgmtWrapper *pWrapper); void dmCloseNode(SMgmtWrapper *pWrapper); int32_t dmRunDnode(SDnode *pDnode); -// dmProc.c -int32_t dmInitProc(struct SMgmtWrapper *pWrapper); -void dmCleanupProc(struct SMgmtWrapper *pWrapper); -int32_t dmRunProc(SProc *proc); -void dmStopProc(SProc *proc); -void dmRemoveProcRpcHandle(SProc *proc, void *handle); -void dmCloseProcRpcHandles(SProc *proc); -int32_t dmPutToProcCQueue(SProc *proc, SRpcMsg *pMsg, EProcFuncType ftype); -void dmPutToProcPQueue(SProc *proc, SRpcMsg *pMsg, EProcFuncType ftype); - // dmTransport.c int32_t dmInitServer(SDnode *pDnode); void dmCleanupServer(SDnode *pDnode); diff --git a/source/dnode/mgmt/node_mgmt/src/dmEnv.c b/source/dnode/mgmt/node_mgmt/src/dmEnv.c index 5a7f149bc6e22f544ea5414be07f0829c264bb5f..a222ad3f7da738c8f2f9a84ac4f9463ae309b5d5 100644 --- a/source/dnode/mgmt/node_mgmt/src/dmEnv.c +++ b/source/dnode/mgmt/node_mgmt/src/dmEnv.c @@ -53,13 +53,19 @@ static bool dmCheckDiskSpace() { osUpdate(); // sufficiency if (!osDataSpaceSufficient()) { - dWarn("free data disk size: %f GB, not sufficient, expected %f GB at least", (double)tsDataSpace.size.avail / 1024.0 / 1024.0 / 1024.0, (double)tsDataSpace.reserved / 1024.0 / 1024.0 / 1024.0); + dWarn("free data disk size: %f GB, not sufficient, expected %f GB at least", + (double)tsDataSpace.size.avail / 1024.0 / 1024.0 / 1024.0, + (double)tsDataSpace.reserved / 1024.0 / 1024.0 / 1024.0); } if (!osLogSpaceSufficient()) { - dWarn("free log disk size: %f GB, not sufficient, expected %f GB at least", (double)tsLogSpace.size.avail / 1024.0 / 1024.0 / 1024.0, (double)tsLogSpace.reserved / 1024.0 / 1024.0 / 1024.0); + dWarn("free log disk size: %f GB, not sufficient, expected %f GB at least", + (double)tsLogSpace.size.avail / 1024.0 / 1024.0 / 1024.0, + (double)tsLogSpace.reserved / 1024.0 / 1024.0 / 1024.0); } if (!osTempSpaceSufficient()) { - dWarn("free temp disk size: %f GB, not sufficient, expected %f GB at least", (double)tsTempSpace.size.avail / 1024.0 / 1024.0 / 1024.0, (double)tsTempSpace.reserved / 1024.0 / 1024.0 / 1024.0); + dWarn("free temp disk size: %f GB, not sufficient, expected %f GB at least", + (double)tsTempSpace.size.avail / 1024.0 / 1024.0 / 1024.0, + (double)tsTempSpace.reserved / 1024.0 / 1024.0 / 1024.0); } // availability bool ret = true; @@ -82,7 +88,7 @@ static bool dmCheckDiskSpace() { } static bool dmCheckDataDirVersion() { - char checkDataDirJsonFileName[PATH_MAX]; + char checkDataDirJsonFileName[PATH_MAX] = {0}; snprintf(checkDataDirJsonFileName, PATH_MAX, "%s/dnode/dnodeCfg.json", tsDataDir); if (taosCheckExistFile(checkDataDirJsonFileName)) { dError("The default data directory %s contains old data of tdengine 2.x, please clear it before running!", @@ -92,14 +98,14 @@ static bool dmCheckDataDirVersion() { return true; } -int32_t dmInit(int8_t rtype) { +int32_t dmInit() { dInfo("start to init dnode env"); if (!dmCheckDataDirVersion()) return -1; if (!dmCheckDiskSpace()) return -1; if (dmCheckRepeatInit(dmInstance()) != 0) return -1; if (dmInitSystem() != 0) return -1; if (dmInitMonitor() != 0) return -1; - if (dmInitDnode(dmInstance(), rtype) != 0) return -1; + if (dmInitDnode(dmInstance()) != 0) return -1; dInfo("dnode env is initialized"); return 0; @@ -126,8 +132,8 @@ void dmCleanup() { taosStopCacheRefreshWorker(); dInfo("dnode env is cleaned up"); - taosCloseLog(); taosCleanupCfg(); + taosCloseLog(); } void dmStop() { @@ -174,7 +180,6 @@ static int32_t dmProcessCreateNodeReq(EDndNodeType ntype, SRpcMsg *pMsg) { } pWrapper->deployed = true; pWrapper->required = true; - pWrapper->proc.ptype = pDnode->ptype; } taosThreadMutexUnlock(&pDnode->mutex); diff --git a/source/dnode/mgmt/node_mgmt/src/dmMgmt.c b/source/dnode/mgmt/node_mgmt/src/dmMgmt.c index 75c76a1f54cfbf45cee7fa9006d7588fe5644d8e..2c9020b668550a914cdd7e235ad2c5242af36057 100644 --- a/source/dnode/mgmt/node_mgmt/src/dmMgmt.c +++ b/source/dnode/mgmt/node_mgmt/src/dmMgmt.c @@ -26,47 +26,14 @@ static bool dmRequireNode(SDnode *pDnode, SMgmtWrapper *pWrapper) { int32_t code = (*pWrapper->func.requiredFp)(&input, &required); if (!required) { dDebug("node:%s, does not require startup", pWrapper->name); - } - - if (pWrapper->ntype == DNODE) { - if (pDnode->rtype != DNODE && pDnode->rtype != NODE_END) { - required = false; - dDebug("node:%s, does not require startup in child process", pWrapper->name); - } } else { - if (OnlyInChildProc(pWrapper)) { - if (pWrapper->ntype != pDnode->rtype) { - dDebug("node:%s, does not require startup in child process", pWrapper->name); - required = false; - } - } - } - - if (required) { dDebug("node:%s, required to startup", pWrapper->name); } return required; } -static int32_t dmInitVars(SDnode *pDnode, EDndNodeType rtype) { - pDnode->rtype = rtype; - - if (tsMultiProcess == 0) { - pDnode->ptype = DND_PROC_SINGLE; - dInfo("dnode will run in single-process mode"); - } else if (tsMultiProcess > 1) { - pDnode->ptype = DND_PROC_TEST; - dInfo("dnode will run in multi-process test mode"); - } else if (pDnode->rtype == DNODE || pDnode->rtype == NODE_END) { - pDnode->ptype = DND_PROC_PARENT; - dInfo("dnode will run in parent-process mode"); - } else { - pDnode->ptype = DND_PROC_CHILD; - SMgmtWrapper *pWrapper = &pDnode->wrappers[pDnode->rtype]; - dInfo("dnode will run in child-process mode, node:%s", dmNodeName(pDnode->rtype)); - } - +static int32_t dmInitVars(SDnode *pDnode) { SDnodeData *pData = &pDnode->data; pData->dnodeId = 0; pData->clusterId = 0; @@ -127,12 +94,12 @@ static void dmClearVars(SDnode *pDnode) { memset(&pDnode->mutex, 0, sizeof(pDnode->mutex)); } -int32_t dmInitDnode(SDnode *pDnode, EDndNodeType rtype) { +int32_t dmInitDnode(SDnode *pDnode) { dDebug("start to create dnode"); int32_t code = -1; char path[PATH_MAX + 100] = {0}; - if (dmInitVars(pDnode, rtype) != 0) { + if (dmInitVars(pDnode) != 0) { goto _OVER; } @@ -147,13 +114,6 @@ int32_t dmInitDnode(SDnode *pDnode, EDndNodeType rtype) { pWrapper->pDnode = pDnode; pWrapper->name = dmNodeName(ntype); pWrapper->ntype = ntype; - pWrapper->proc.wrapper = pWrapper; - pWrapper->proc.shm.id = -1; - pWrapper->proc.pid = -1; - pWrapper->proc.ptype = pDnode->ptype; - if (ntype == DNODE) { - pWrapper->proc.ptype = DND_PROC_SINGLE; - } taosThreadRwlockInit(&pWrapper->lock, NULL); snprintf(path, sizeof(path), "%s%s%s", tsDataDir, TD_DIRSEP, pWrapper->name); @@ -164,11 +124,6 @@ int32_t dmInitDnode(SDnode *pDnode, EDndNodeType rtype) { } pWrapper->required = dmRequireNode(pDnode, pWrapper); - - if (ntype != DNODE && dmReadShmFile(pWrapper->path, pWrapper->name, pDnode->rtype, &pWrapper->proc.shm) != 0) { - dError("node:%s, failed to read shm file since %s", pWrapper->name, terrstr()); - goto _OVER; - } } if (dmInitMsgHandle(pDnode) != 0) { @@ -176,16 +131,14 @@ int32_t dmInitDnode(SDnode *pDnode, EDndNodeType rtype) { goto _OVER; } - if (pDnode->ptype == SINGLE_PROC || (pDnode->ptype & PARENT_PROC)) { - pDnode->lockfile = dmCheckRunning(tsDataDir); - if (pDnode->lockfile == NULL) { - goto _OVER; - } + pDnode->lockfile = dmCheckRunning(tsDataDir); + if (pDnode->lockfile == NULL) { + goto _OVER; + } - if (dmInitServer(pDnode) != 0) { - dError("failed to init transport since %s", terrstr()); - goto _OVER; - } + if (dmInitServer(pDnode) != 0) { + dError("failed to init transport since %s", terrstr()); + goto _OVER; } if (dmInitClient(pDnode) != 0) { @@ -248,7 +201,7 @@ int32_t dmMarkWrapper(SMgmtWrapper *pWrapper) { int32_t code = 0; taosThreadRwlockRdlock(&pWrapper->lock); - if (pWrapper->deployed || (InParentProc(pWrapper) && pWrapper->required)) { + if (pWrapper->deployed) { int32_t refCount = atomic_add_fetch_32(&pWrapper->refCount, 1); // dTrace("node:%s, is marked, ref:%d", pWrapper->name, refCount); } else { diff --git a/source/dnode/mgmt/node_mgmt/src/dmMonitor.c b/source/dnode/mgmt/node_mgmt/src/dmMonitor.c index be1de02e6a1f1b1c5dbaaf9a02a8370881287619..4ab56ed682c4c868d6db45dded6345e81d8c8915 100644 --- a/source/dnode/mgmt/node_mgmt/src/dmMonitor.c +++ b/source/dnode/mgmt/node_mgmt/src/dmMonitor.c @@ -17,18 +17,6 @@ #include "dmMgmt.h" #include "dmNodes.h" -#define dmSendLocalRecv(pDnode, mtype, func, pInfo) \ - SRpcMsg rsp = {0}; \ - SRpcMsg req = {.msgType = mtype}; \ - SEpSet epset = {.inUse = 0, .numOfEps = 1}; \ - tstrncpy(epset.eps[0].fqdn, tsLocalFqdn, TSDB_FQDN_LEN); \ - epset.eps[0].port = tsServerPort; \ - rpcSendRecv(pDnode->trans.clientRpc, &epset, &req, &rsp); \ - if (rsp.code == 0 && rsp.contLen > 0) { \ - func(rsp.pCont, rsp.contLen, pInfo); \ - } \ - rpcFreeCont(rsp.pCont); - static void dmGetMonitorBasicInfo(SDnode *pDnode, SMonBasicInfo *pInfo) { pInfo->protocol = 1; pInfo->dnode_id = pDnode->data.dnodeId; @@ -59,9 +47,7 @@ static void dmGetMmMonitorInfo(SDnode *pDnode) { SMgmtWrapper *pWrapper = &pDnode->wrappers[MNODE]; if (dmMarkWrapper(pWrapper) == 0) { SMonMmInfo mmInfo = {0}; - if (tsMultiProcess) { - dmSendLocalRecv(pDnode, TDMT_MON_MM_INFO, tDeserializeSMonMmInfo, &mmInfo); - } else if (pWrapper->pMgmt != NULL) { + if (pWrapper->pMgmt != NULL) { mmGetMonitorInfo(pWrapper->pMgmt, &mmInfo); } dmReleaseWrapper(pWrapper); @@ -74,9 +60,7 @@ static void dmGetVmMonitorInfo(SDnode *pDnode) { SMgmtWrapper *pWrapper = &pDnode->wrappers[VNODE]; if (dmMarkWrapper(pWrapper) == 0) { SMonVmInfo vmInfo = {0}; - if (tsMultiProcess) { - dmSendLocalRecv(pDnode, TDMT_MON_VM_INFO, tDeserializeSMonVmInfo, &vmInfo); - } else if (pWrapper->pMgmt != NULL) { + if (pWrapper->pMgmt != NULL) { vmGetMonitorInfo(pWrapper->pMgmt, &vmInfo); } dmReleaseWrapper(pWrapper); @@ -89,9 +73,7 @@ static void dmGetQmMonitorInfo(SDnode *pDnode) { SMgmtWrapper *pWrapper = &pDnode->wrappers[QNODE]; if (dmMarkWrapper(pWrapper) == 0) { SMonQmInfo qmInfo = {0}; - if (tsMultiProcess) { - dmSendLocalRecv(pDnode, TDMT_MON_QM_INFO, tDeserializeSMonQmInfo, &qmInfo); - } else if (pWrapper->pMgmt != NULL) { + if (pWrapper->pMgmt != NULL) { qmGetMonitorInfo(pWrapper->pMgmt, &qmInfo); } dmReleaseWrapper(pWrapper); @@ -104,9 +86,7 @@ static void dmGetSmMonitorInfo(SDnode *pDnode) { SMgmtWrapper *pWrapper = &pDnode->wrappers[SNODE]; if (dmMarkWrapper(pWrapper) == 0) { SMonSmInfo smInfo = {0}; - if (tsMultiProcess) { - dmSendLocalRecv(pDnode, TDMT_MON_SM_INFO, tDeserializeSMonSmInfo, &smInfo); - } else if (pWrapper->pMgmt != NULL) { + if (pWrapper->pMgmt != NULL) { smGetMonitorInfo(pWrapper->pMgmt, &smInfo); } dmReleaseWrapper(pWrapper); @@ -132,9 +112,7 @@ void dmGetVnodeLoads(SMonVloadInfo *pInfo) { SDnode *pDnode = dmInstance(); SMgmtWrapper *pWrapper = &pDnode->wrappers[VNODE]; if (dmMarkWrapper(pWrapper) == 0) { - if (tsMultiProcess) { - dmSendLocalRecv(pDnode, TDMT_MON_VM_LOAD, tDeserializeSMonVloadInfo, pInfo); - } else if (pWrapper->pMgmt != NULL) { + if (pWrapper->pMgmt != NULL) { vmGetVnodeLoads(pWrapper->pMgmt, pInfo, false); } dmReleaseWrapper(pWrapper); @@ -145,9 +123,7 @@ void dmGetMnodeLoads(SMonMloadInfo *pInfo) { SDnode *pDnode = dmInstance(); SMgmtWrapper *pWrapper = &pDnode->wrappers[MNODE]; if (dmMarkWrapper(pWrapper) == 0) { - if (tsMultiProcess) { - dmSendLocalRecv(pDnode, TDMT_MON_MM_LOAD, tDeserializeSMonMloadInfo, pInfo); - } else if (pWrapper->pMgmt != NULL) { + if (pWrapper->pMgmt != NULL) { mmGetMnodeLoads(pWrapper->pMgmt, pInfo); } dmReleaseWrapper(pWrapper); @@ -158,9 +134,7 @@ void dmGetQnodeLoads(SQnodeLoad *pInfo) { SDnode *pDnode = dmInstance(); SMgmtWrapper *pWrapper = &pDnode->wrappers[QNODE]; if (dmMarkWrapper(pWrapper) == 0) { - if (tsMultiProcess) { - dmSendLocalRecv(pDnode, TDMT_MON_QM_LOAD, tDeserializeSQnodeLoad, pInfo); - } else if (pWrapper->pMgmt != NULL) { + if (pWrapper->pMgmt != NULL) { qmGetQnodeLoads(pWrapper->pMgmt, pInfo); } dmReleaseWrapper(pWrapper); diff --git a/source/dnode/mgmt/node_mgmt/src/dmNodes.c b/source/dnode/mgmt/node_mgmt/src/dmNodes.c index ec5f3589e018e55df5921230d5f6936326dc8651..6893e486bb29837f599b4f2226efbc25b61fdb85 100644 --- a/source/dnode/mgmt/node_mgmt/src/dmNodes.c +++ b/source/dnode/mgmt/node_mgmt/src/dmNodes.c @@ -16,53 +16,6 @@ #define _DEFAULT_SOURCE #include "dmMgmt.h" -static int32_t dmCreateShm(SMgmtWrapper *pWrapper) { - int32_t shmsize = tsMnodeShmSize; - if (pWrapper->ntype == VNODE) { - shmsize = tsVnodeShmSize; - } else if (pWrapper->ntype == QNODE) { - shmsize = tsQnodeShmSize; - } else if (pWrapper->ntype == SNODE) { - shmsize = tsSnodeShmSize; - } else if (pWrapper->ntype == MNODE) { - shmsize = tsMnodeShmSize; - } else { - return -1; - } - - if (taosCreateShm(&pWrapper->proc.shm, pWrapper->ntype, shmsize) != 0) { - terrno = TAOS_SYSTEM_ERROR(terrno); - dError("node:%s, failed to create shm size:%d since %s", pWrapper->name, shmsize, terrstr()); - return -1; - } - - dInfo("node:%s, shm:%d is created, size:%d", pWrapper->name, pWrapper->proc.shm.id, shmsize); - return 0; -} - -static int32_t dmNewProc(SMgmtWrapper *pWrapper, EDndNodeType ntype) { - char tstr[8] = {0}; - char *args[6] = {0}; - snprintf(tstr, sizeof(tstr), "%d", ntype); - args[1] = "-c"; - args[2] = configDir; - args[3] = "-n"; - args[4] = tstr; - args[5] = NULL; - - int32_t pid = taosNewProc(args); - if (pid <= 0) { - terrno = TAOS_SYSTEM_ERROR(errno); - dError("node:%s, failed to exec in new process since %s", pWrapper->name, terrstr()); - return -1; - } - - taosIgnSignal(SIGCHLD); - pWrapper->proc.pid = pid; - dInfo("node:%s, continue running in new pid:%d", pWrapper->name, pid); - return 0; -} - int32_t dmOpenNode(SMgmtWrapper *pWrapper) { SDnode *pDnode = pWrapper->pDnode; @@ -75,64 +28,14 @@ int32_t dmOpenNode(SMgmtWrapper *pWrapper) { SMgmtOutputOpt output = {0}; SMgmtInputOpt input = dmBuildMgmtInputOpt(pWrapper); - if (pWrapper->ntype == DNODE || InChildProc(pWrapper)) { - tmsgSetDefault(&input.msgCb); - } - - if (OnlyInSingleProc(pWrapper)) { - dInfo("node:%s, start to open", pWrapper->name); - if ((*pWrapper->func.openFp)(&input, &output) != 0) { - dError("node:%s, failed to open since %s", pWrapper->name, terrstr()); - return -1; - } - dInfo("node:%s, has been opened", pWrapper->name); - pWrapper->deployed = true; - } - - if (InParentProc(pWrapper)) { - dDebug("node:%s, start to open", pWrapper->name); - if (dmCreateShm(pWrapper) != 0) { - return -1; - } - if (dmWriteShmFile(pWrapper->path, pWrapper->name, &pWrapper->proc.shm) != 0) { - return -1; - } - - if (OnlyInParentProc(pWrapper)) { - if (dmInitProc(pWrapper) != 0) { - dError("node:%s, failed to init proc since %s", pWrapper->name, terrstr()); - return -1; - } - if (pDnode->rtype == NODE_END) { - dInfo("node:%s, should be started manually in child process", pWrapper->name); - } else { - if (dmNewProc(pWrapper, pWrapper->ntype) != 0) { - return -1; - } - } - if (dmRunProc(&pWrapper->proc) != 0) { - dError("node:%s, failed to run proc since %s", pWrapper->name, terrstr()); - return -1; - } - } - dDebug("node:%s, has been opened in parent process", pWrapper->name); - } - - if (InChildProc(pWrapper)) { - dDebug("node:%s, start to open", pWrapper->name); - if ((*pWrapper->func.openFp)(&input, &output) != 0) { - dError("node:%s, failed to open since %s", pWrapper->name, terrstr()); - return -1; - } - if (dmInitProc(pWrapper) != 0) { - return -1; - } - if (dmRunProc(&pWrapper->proc) != 0) { - return -1; - } - dDebug("node:%s, has been opened in child process", pWrapper->name); - pWrapper->deployed = true; + dInfo("node:%s, start to open", pWrapper->name); + tmsgSetDefault(&input.msgCb); + if ((*pWrapper->func.openFp)(&input, &output) != 0) { + dError("node:%s, failed to open since %s", pWrapper->name, terrstr()); + return -1; } + dInfo("node:%s, has been opened", pWrapper->name); + pWrapper->deployed = true; if (output.pMgmt != NULL) { pWrapper->pMgmt = output.pMgmt; @@ -143,7 +46,6 @@ int32_t dmOpenNode(SMgmtWrapper *pWrapper) { } int32_t dmStartNode(SMgmtWrapper *pWrapper) { - if (OnlyInParentProc(pWrapper)) return 0; if (pWrapper->func.startFp != NULL) { dDebug("node:%s, start to start", pWrapper->name); if ((*pWrapper->func.startFp)(pWrapper->pMgmt) != 0) { @@ -173,17 +75,6 @@ void dmCloseNode(SMgmtWrapper *pWrapper) { taosMsleep(10); } - if (OnlyInParentProc(pWrapper)) { - int32_t pid = pWrapper->proc.pid; - if (pid > 0 && taosProcExist(pid)) { - dInfo("node:%s, send kill signal to the child pid:%d", pWrapper->name, pid); - taosKillProc(pid); - dInfo("node:%s, wait for child pid:%d to stop", pWrapper->name, pid); - taosWaitProc(pid); - dInfo("node:%s, child pid:%d is stopped", pWrapper->name, pid); - } - } - taosThreadRwlockWrlock(&pWrapper->lock); if (pWrapper->pMgmt != NULL) { (*pWrapper->func.closeFp)(pWrapper->pMgmt); @@ -191,10 +82,6 @@ void dmCloseNode(SMgmtWrapper *pWrapper) { } taosThreadRwlockUnlock(&pWrapper->lock); - if (!OnlyInSingleProc(pWrapper)) { - dmCleanupProc(pWrapper); - } - dInfo("node:%s, has been closed", pWrapper->name); } @@ -241,29 +128,8 @@ static void dmCloseNodes(SDnode *pDnode) { } } -static void dmWatchNodes(SDnode *pDnode) { - if (pDnode->ptype != PARENT_PROC) return; - if (pDnode->rtype == NODE_END) return; - - taosThreadMutexLock(&pDnode->mutex); - for (EDndNodeType ntype = DNODE + 1; ntype < NODE_END; ++ntype) { - SMgmtWrapper *pWrapper = &pDnode->wrappers[ntype]; - SProc *proc = &pWrapper->proc; - - if (!pWrapper->required) continue; - if (!OnlyInParentProc(pWrapper)) continue; - - if (proc->pid <= 0 || !taosProcExist(proc->pid)) { - dError("node:%s, pid:%d is killed and needs to restart", pWrapper->name, proc->pid); - dmCloseProcRpcHandles(&pWrapper->proc); - dmNewProc(pWrapper, ntype); - } - } - taosThreadMutexUnlock(&pDnode->mutex); -} - int32_t dmRunDnode(SDnode *pDnode) { - int count = 0; + int32_t count = 0; if (dmOpenNodes(pDnode) != 0) { dError("failed to open nodes since %s", terrstr()); return -1; @@ -273,6 +139,7 @@ int32_t dmRunDnode(SDnode *pDnode) { dError("failed to start nodes since %s", terrstr()); return -1; } + while (1) { if (pDnode->stop) { dInfo("TDengine is about to stop"); @@ -282,8 +149,8 @@ int32_t dmRunDnode(SDnode *pDnode) { return 0; } - dmWatchNodes(pDnode); if (count == 0) osUpdate(); + count %= 10; count++; taosMsleep(100); diff --git a/source/dnode/mgmt/node_mgmt/src/dmProc.c b/source/dnode/mgmt/node_mgmt/src/dmProc.c deleted file mode 100644 index 5cf73cf1fd6d154744c2d4ea550522b60552e881..0000000000000000000000000000000000000000 --- a/source/dnode/mgmt/node_mgmt/src/dmProc.c +++ /dev/null @@ -1,486 +0,0 @@ -/* - * Copyright (c) 2019 TAOS Data, Inc. - * - * This program is free software: you can use, redistribute, and/or modify - * it under the terms of the GNU Affero General Public License, version 3 - * or later ("AGPL"), as published by the Free Software Foundation. - * - * This program is distributed in the hope that it will be useful, but WITHOUT - * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or - * FITNESS FOR A PARTICULAR PURPOSE. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see . - */ - -#define _DEFAULT_SOURCE -#include "dmMgmt.h" - -static inline int32_t CEIL8(int32_t v) { return ceil((float)(v) / 8) * 8; } - -static int32_t dmInitProcMutex(SProcQueue *queue) { - TdThreadMutexAttr mattr = {0}; - - if (taosThreadMutexAttrInit(&mattr) != 0) { - terrno = TAOS_SYSTEM_ERROR(errno); - dError("node:%s, failed to init mutex while init attr since %s", queue->name, terrstr()); - return -1; - } - - if (taosThreadMutexAttrSetPshared(&mattr, PTHREAD_PROCESS_SHARED) != 0) { - taosThreadMutexAttrDestroy(&mattr); - terrno = TAOS_SYSTEM_ERROR(errno); - dError("node:%s, failed to init mutex while set shared since %s", queue->name, terrstr()); - return -1; - } - - if (taosThreadMutexInit(&queue->mutex, &mattr) != 0) { - taosThreadMutexAttrDestroy(&mattr); - terrno = TAOS_SYSTEM_ERROR(errno); - dError("node:%s, failed to init mutex since %s", queue->name, terrstr()); - return -1; - } - - taosThreadMutexAttrDestroy(&mattr); - return 0; -} - -static int32_t dmInitProcSem(SProcQueue *queue) { - if (tsem_init(&queue->sem, 1, 0) != 0) { - terrno = TAOS_SYSTEM_ERROR(errno); - dError("node:%s, failed to init sem since %s", queue->name, terrstr()); - return -1; - } - - return 0; -} - -static SProcQueue *dmInitProcQueue(SProc *proc, char *ptr, int32_t size) { - SProcQueue *queue = (SProcQueue *)(ptr); - - int32_t bufSize = size - CEIL8(sizeof(SProcQueue)); - if (bufSize <= 1024) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - return NULL; - } - - if (proc->ptype & DND_PROC_PARENT) { - memset(ptr, 0, sizeof(SProcQueue)); - if (dmInitProcMutex(queue) != 0) { - return NULL; - } - - if (dmInitProcSem(queue) != 0) { - return NULL; - } - - tstrncpy(queue->name, proc->name, sizeof(queue->name)); - - taosThreadMutexLock(&queue->mutex); - // queue->head = 0; - // queue->tail = 0; - queue->total = bufSize; - queue->avail = bufSize; - // queue->items = 0; - taosThreadMutexUnlock(&queue->mutex); - } - - return queue; -} - -static void dmCleanupProcQueue(SProcQueue *queue) {} - -static inline int32_t dmPushToProcQueue(SProc *proc, SProcQueue *queue, SRpcMsg *pMsg, EProcFuncType ftype) { - const void *pHead = pMsg; - const void *pBody = pMsg->pCont; - const int16_t rawHeadLen = sizeof(SRpcMsg); - const int32_t rawBodyLen = pMsg->contLen; - const int16_t headLen = CEIL8(rawHeadLen); - const int32_t bodyLen = CEIL8(rawBodyLen); - const int32_t fullLen = headLen + bodyLen + 8; - const int64_t handle = (int64_t)pMsg->info.handle; - - if (fullLen > queue->total) { - terrno = TSDB_CODE_OUT_OF_RANGE; - return -1; - } - - taosThreadMutexLock(&queue->mutex); - if (fullLen > queue->avail) { - taosThreadMutexUnlock(&queue->mutex); - terrno = TSDB_CODE_OUT_OF_SHM_MEM; - return -1; - } - - if (ftype == DND_FUNC_REQ && IsReq(pMsg) && pMsg->code == 0 && handle != 0 && pMsg->info.noResp == 0) { - if (taosHashPut(proc->hash, &handle, sizeof(int64_t), &pMsg->info, sizeof(SRpcConnInfo)) != 0) { - taosThreadMutexUnlock(&queue->mutex); - return -1; - } - } - - const int32_t pos = queue->tail; - if (queue->tail < queue->total) { - *(int16_t *)(queue->pBuffer + queue->tail) = rawHeadLen; - *(int8_t *)(queue->pBuffer + queue->tail + 2) = (int8_t)ftype; - *(int32_t *)(queue->pBuffer + queue->tail + 4) = rawBodyLen; - } else { - *(int16_t *)(queue->pBuffer) = rawHeadLen; - *(int8_t *)(queue->pBuffer + 2) = (int8_t)ftype; - *(int32_t *)(queue->pBuffer + 4) = rawBodyLen; - } - - if (queue->tail < queue->head) { - memcpy(queue->pBuffer + queue->tail + 8, pHead, rawHeadLen); - if (rawBodyLen > 0) memcpy(queue->pBuffer + queue->tail + 8 + headLen, pBody, rawBodyLen); - queue->tail = queue->tail + 8 + headLen + bodyLen; - } else { - int32_t remain = queue->total - queue->tail; - if (remain == 0) { - memcpy(queue->pBuffer + 8, pHead, rawHeadLen); - if (rawBodyLen > 0) memcpy(queue->pBuffer + 8 + headLen, pBody, rawBodyLen); - queue->tail = 8 + headLen + bodyLen; - } else if (remain == 8) { - memcpy(queue->pBuffer, pHead, rawHeadLen); - if (rawBodyLen > 0) memcpy(queue->pBuffer + headLen, pBody, rawBodyLen); - queue->tail = headLen + bodyLen; - } else if (remain < 8 + headLen) { - memcpy(queue->pBuffer + queue->tail + 8, pHead, remain - 8); - memcpy(queue->pBuffer, (char *)pHead + remain - 8, rawHeadLen - (remain - 8)); - if (rawBodyLen > 0) memcpy(queue->pBuffer + headLen - (remain - 8), pBody, rawBodyLen); - queue->tail = headLen - (remain - 8) + bodyLen; - } else if (remain < 8 + headLen + bodyLen) { - memcpy(queue->pBuffer + queue->tail + 8, pHead, rawHeadLen); - if (rawBodyLen > 0) memcpy(queue->pBuffer + queue->tail + 8 + headLen, pBody, remain - 8 - headLen); - if (rawBodyLen > 0) - memcpy(queue->pBuffer, (char *)pBody + remain - 8 - headLen, rawBodyLen - (remain - 8 - headLen)); - queue->tail = bodyLen - (remain - 8 - headLen); - } else { - memcpy(queue->pBuffer + queue->tail + 8, pHead, rawHeadLen); - if (rawBodyLen > 0) memcpy(queue->pBuffer + queue->tail + headLen + 8, pBody, rawBodyLen); - queue->tail = queue->tail + headLen + bodyLen + 8; - } - } - - queue->avail -= fullLen; - queue->items++; - taosThreadMutexUnlock(&queue->mutex); - tsem_post(&queue->sem); - - dTrace("node:%s, push %s msg:%p type:%d handle:%p len:%d code:0x%x, pos:%d remain:%d", queue->name, dmFuncStr(ftype), - pMsg, pMsg->msgType, pMsg->info.handle, pMsg->contLen, pMsg->code, pos, queue->items); - return 0; -} - -static inline int32_t dmPopFromProcQueue(SProcQueue *queue, SRpcMsg **ppMsg, EProcFuncType *pFuncType) { - tsem_wait(&queue->sem); - - taosThreadMutexLock(&queue->mutex); - if (queue->total - queue->avail <= 0) { - taosThreadMutexUnlock(&queue->mutex); - terrno = TSDB_CODE_OUT_OF_SHM_MEM; - return 0; - } - - int16_t rawHeadLen = 0; - int8_t ftype = 0; - int32_t rawBodyLen = 0; - if (queue->head < queue->total) { - rawHeadLen = *(int16_t *)(queue->pBuffer + queue->head); - ftype = *(int8_t *)(queue->pBuffer + queue->head + 2); - rawBodyLen = *(int32_t *)(queue->pBuffer + queue->head + 4); - } else { - rawHeadLen = *(int16_t *)(queue->pBuffer); - ftype = *(int8_t *)(queue->pBuffer + 2); - rawBodyLen = *(int32_t *)(queue->pBuffer + 4); - } - int16_t headLen = CEIL8(rawHeadLen); - int32_t bodyLen = CEIL8(rawBodyLen); - - void *pHead = taosAllocateQitem(headLen, DEF_QITEM); - void *pBody = NULL; - if (bodyLen > 0) pBody = rpcMallocCont(bodyLen); - if (pHead == NULL || (bodyLen > 0 && pBody == NULL)) { - taosThreadMutexUnlock(&queue->mutex); - tsem_post(&queue->sem); - taosFreeQitem(pHead); - rpcFreeCont(pBody); - terrno = TSDB_CODE_OUT_OF_MEMORY; - return -1; - } - - const int32_t pos = queue->head; - if (queue->head < queue->tail) { - memcpy(pHead, queue->pBuffer + queue->head + 8, headLen); - if (bodyLen > 0) memcpy(pBody, queue->pBuffer + queue->head + 8 + headLen, bodyLen); - queue->head = queue->head + 8 + headLen + bodyLen; - } else { - int32_t remain = queue->total - queue->head; - if (remain == 0) { - memcpy(pHead, queue->pBuffer + 8, headLen); - if (bodyLen > 0) memcpy(pBody, queue->pBuffer + 8 + headLen, bodyLen); - queue->head = 8 + headLen + bodyLen; - } else if (remain == 8) { - memcpy(pHead, queue->pBuffer, headLen); - if (bodyLen > 0) memcpy(pBody, queue->pBuffer + headLen, bodyLen); - queue->head = headLen + bodyLen; - } else if (remain < 8 + headLen) { - memcpy(pHead, queue->pBuffer + queue->head + 8, remain - 8); - memcpy((char *)pHead + remain - 8, queue->pBuffer, headLen - (remain - 8)); - if (bodyLen > 0) memcpy(pBody, queue->pBuffer + headLen - (remain - 8), bodyLen); - queue->head = headLen - (remain - 8) + bodyLen; - } else if (remain < 8 + headLen + bodyLen) { - memcpy(pHead, queue->pBuffer + queue->head + 8, headLen); - if (bodyLen > 0) memcpy(pBody, queue->pBuffer + queue->head + 8 + headLen, remain - 8 - headLen); - if (bodyLen > 0) memcpy((char *)pBody + remain - 8 - headLen, queue->pBuffer, bodyLen - (remain - 8 - headLen)); - queue->head = bodyLen - (remain - 8 - headLen); - } else { - memcpy(pHead, queue->pBuffer + queue->head + 8, headLen); - if (bodyLen > 0) memcpy(pBody, queue->pBuffer + queue->head + headLen + 8, bodyLen); - queue->head = queue->head + headLen + bodyLen + 8; - } - } - - queue->avail = queue->avail + headLen + bodyLen + 8; - queue->items--; - taosThreadMutexUnlock(&queue->mutex); - - *ppMsg = pHead; - (*ppMsg)->pCont = pBody; - *pFuncType = (EProcFuncType)ftype; - - dTrace("node:%s, pop %s msg:%p type:%d handle:%p len:%d code:0x%x, pos:%d remain:%d", queue->name, dmFuncStr(ftype), - (*ppMsg), (*ppMsg)->msgType, (*ppMsg)->info.handle, (*ppMsg)->contLen, (*ppMsg)->code, pos, queue->items); - return 1; -} - -int32_t dmInitProc(struct SMgmtWrapper *pWrapper) { - SProc *proc = &pWrapper->proc; - if (proc->name != NULL) return 0; - - proc->wrapper = pWrapper; - proc->name = pWrapper->name; - - SShm *shm = &proc->shm; - int32_t cstart = 0; - int32_t csize = CEIL8(shm->size / 2); - int32_t pstart = csize; - int32_t psize = CEIL8(shm->size - pstart); - if (pstart + psize > shm->size) { - psize -= 8; - } - - proc->hash = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK); - proc->cqueue = dmInitProcQueue(proc, (char *)shm->ptr + cstart, csize); - proc->pqueue = dmInitProcQueue(proc, (char *)shm->ptr + pstart, psize); - if (proc->cqueue == NULL || proc->pqueue == NULL || proc->hash == NULL) { - dmCleanupProcQueue(proc->cqueue); - dmCleanupProcQueue(proc->pqueue); - taosHashCleanup(proc->hash); - return -1; - } - - dDebug("node:%s, proc is initialized, cqueue:%p pqueue:%p", proc->name, proc->cqueue, proc->pqueue); - return 0; -} - -static void *dmConsumChildQueue(void *param) { - SProc *proc = param; - SMgmtWrapper *pWrapper = proc->wrapper; - SProcQueue *queue = proc->cqueue; - int32_t numOfMsgs = 0; - int32_t code = 0; - EProcFuncType ftype = DND_FUNC_REQ; - SRpcMsg *pMsg = NULL; - - dDebug("node:%s, start to consume from cqueue", proc->name); - do { - numOfMsgs = dmPopFromProcQueue(queue, &pMsg, &ftype); - if (numOfMsgs == 0) { - dDebug("node:%s, get no msg from cqueue and exit thread", proc->name); - break; - } - - if (numOfMsgs < 0) { - dError("node:%s, get no msg from cqueue since %s", proc->name, terrstr()); - taosMsleep(1); - continue; - } - - if (ftype != DND_FUNC_REQ) { - dError("node:%s, invalid ftype:%d from cqueue", proc->name, ftype); - rpcFreeCont(pMsg->pCont); - taosFreeQitem(pMsg); - continue; - } - - code = dmProcessNodeMsg(pWrapper, pMsg); - if (code != 0) { - dError("node:%s, failed to process msg:%p since %s, put into pqueue", proc->name, pMsg, terrstr()); - SRpcMsg rsp = {.code = (terrno != 0 ? terrno : code), .info = pMsg->info}; - dmPutToProcPQueue(proc, &rsp, DND_FUNC_RSP); - rpcFreeCont(pMsg->pCont); - taosFreeQitem(pMsg); - } - } while (1); - - return NULL; -} - -static void *dmConsumParentQueue(void *param) { - SProc *proc = param; - SMgmtWrapper *pWrapper = proc->wrapper; - SProcQueue *queue = proc->pqueue; - int32_t numOfMsgs = 0; - int32_t code = 0; - EProcFuncType ftype = DND_FUNC_REQ; - SRpcMsg *pMsg = NULL; - - dDebug("node:%s, start to consume from pqueue", proc->name); - do { - numOfMsgs = dmPopFromProcQueue(queue, &pMsg, &ftype); - if (numOfMsgs == 0) { - dDebug("node:%s, get no msg from pqueue and exit thread", proc->name); - break; - } - - if (numOfMsgs < 0) { - dError("node:%s, get no msg from pqueue since %s", proc->name, terrstr()); - taosMsleep(1); - continue; - } - - if (ftype == DND_FUNC_RSP) { - dmRemoveProcRpcHandle(proc, pMsg->info.handle); - rpcSendResponse(pMsg); - } else if (ftype == DND_FUNC_REGIST) { - rpcRegisterBrokenLinkArg(pMsg); - } else if (ftype == DND_FUNC_RELEASE) { - dmRemoveProcRpcHandle(proc, pMsg->info.handle); - rpcReleaseHandle(&pMsg->info, TAOS_CONN_SERVER); - } else { - dError("node:%s, invalid ftype:%d from pqueue", proc->name, ftype); - rpcFreeCont(pMsg->pCont); - } - - taosFreeQitem(pMsg); - } while (1); - - return NULL; -} - -int32_t dmRunProc(SProc *proc) { - TdThreadAttr thAttr = {0}; - taosThreadAttrInit(&thAttr); - taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE); - - if (proc->ptype & DND_PROC_PARENT) { - if (taosThreadCreate(&proc->pthread, &thAttr, dmConsumParentQueue, proc) != 0) { - terrno = TAOS_SYSTEM_ERROR(errno); - dError("node:%s, failed to create pthread since %s", proc->name, terrstr()); - return -1; - } - dDebug("node:%s, thread:%" PRId64 " is created to consume pqueue", proc->name, taosGetPthreadId(proc->pthread)); - } - - if (proc->ptype & DND_PROC_CHILD) { - if (taosThreadCreate(&proc->cthread, &thAttr, dmConsumChildQueue, proc) != 0) { - terrno = TAOS_SYSTEM_ERROR(errno); - dError("node:%s, failed to create cthread since %s", proc->name, terrstr()); - return -1; - } - dDebug("node:%s, thread:%" PRId64 " is created to consume cqueue", proc->name, taosGetPthreadId(proc->cthread)); - } - - taosThreadAttrDestroy(&thAttr); - return 0; -} - -void dmStopProc(SProc *proc) { - proc->stop = true; - if (taosCheckPthreadValid(proc->pthread)) { - dDebug("node:%s, start to join pthread:%" PRId64 "", proc->name, taosGetPthreadId(proc->pthread)); - tsem_post(&proc->pqueue->sem); - taosThreadJoin(proc->pthread, NULL); - taosThreadClear(&proc->pthread); - } - - if (taosCheckPthreadValid(proc->cthread)) { - dDebug("node:%s, start to join cthread:%" PRId64 "", proc->name, taosGetPthreadId(proc->cthread)); - tsem_post(&proc->cqueue->sem); - taosThreadJoin(proc->cthread, NULL); - taosThreadClear(&proc->cthread); - } -} - -void dmCleanupProc(struct SMgmtWrapper *pWrapper) { - SProc *proc = &pWrapper->proc; - if (proc->name == NULL) return; - - dDebug("node:%s, start to cleanup proc", pWrapper->name); - dmStopProc(proc); - dmCleanupProcQueue(proc->cqueue); - dmCleanupProcQueue(proc->pqueue); - taosHashCleanup(proc->hash); - proc->hash = NULL; - dDebug("node:%s, proc is cleaned up", pWrapper->name); -} - -void dmRemoveProcRpcHandle(SProc *proc, void *handle) { - int64_t h = (int64_t)handle; - taosThreadMutexLock(&proc->cqueue->mutex); - taosHashRemove(proc->hash, &h, sizeof(int64_t)); - taosThreadMutexUnlock(&proc->cqueue->mutex); -} - -void dmCloseProcRpcHandles(SProc *proc) { - taosThreadMutexLock(&proc->cqueue->mutex); - SRpcHandleInfo *pInfo = taosHashIterate(proc->hash, NULL); - while (pInfo != NULL) { - dError("node:%s, the child process dies and send an offline rsp to handle:%p", proc->name, pInfo->handle); - SRpcMsg rpcMsg = {.code = TSDB_CODE_NODE_OFFLINE, .info = *pInfo}; - rpcSendResponse(&rpcMsg); - pInfo = taosHashIterate(proc->hash, pInfo); - } - taosHashClear(proc->hash); - taosThreadMutexUnlock(&proc->cqueue->mutex); -} - -void dmPutToProcPQueue(SProc *proc, SRpcMsg *pMsg, EProcFuncType ftype) { - int32_t retry = 0; - while (1) { - if (dmPushToProcQueue(proc, proc->pqueue, pMsg, ftype) == 0) { - break; - } - - if (terrno != TSDB_CODE_OUT_OF_SHM_MEM) { - pMsg->code = terrno; - if (pMsg->contLen > 0) { - rpcFreeCont(pMsg->pCont); - pMsg->pCont = NULL; - pMsg->contLen = 0; - } - dError("node:%s, failed to push %s msg:%p type:%d handle:%p then discard data and return error", proc->name, - dmFuncStr(ftype), pMsg, pMsg->msgType, pMsg->info.handle); - } else { - dError("node:%s, failed to push %s msg:%p type:%d handle:%p len:%d since %s, retry:%d", proc->name, - dmFuncStr(ftype), pMsg, pMsg->msgType, pMsg->info.handle, pMsg->contLen, terrstr(), retry); - retry++; - taosMsleep(retry); - } - } - - rpcFreeCont(pMsg->pCont); - pMsg->pCont = NULL; - pMsg->contLen = 0; -} - -int32_t dmPutToProcCQueue(SProc *proc, SRpcMsg *pMsg, EProcFuncType ftype) { - int32_t code = dmPushToProcQueue(proc, proc->cqueue, pMsg, ftype); - if (code == 0) { - dTrace("msg:%p, is freed after push to cqueue", pMsg); - rpcFreeCont(pMsg->pCont); - taosFreeQitem(pMsg); - } - return code; -} diff --git a/source/dnode/mgmt/node_mgmt/src/dmTransport.c b/source/dnode/mgmt/node_mgmt/src/dmTransport.c index 542baaec091b5d9bf8ce04a10ee4edc8463fc203..7ad24be258ee23b547543f0296cc27ea281ee89f 100644 --- a/source/dnode/mgmt/node_mgmt/src/dmTransport.c +++ b/source/dnode/mgmt/node_mgmt/src/dmTransport.c @@ -17,14 +17,7 @@ #include "dmMgmt.h" #include "qworker.h" -static inline void dmSendRsp(SRpcMsg *pMsg) { - SMgmtWrapper *pWrapper = pMsg->info.wrapper; - if (InChildProc(pWrapper)) { - dmPutToProcPQueue(&pWrapper->proc, pMsg, DND_FUNC_RSP); - } else { - rpcSendResponse(pMsg); - } -} +static inline void dmSendRsp(SRpcMsg *pMsg) { rpcSendResponse(pMsg); } static inline void dmBuildMnodeRedirectRsp(SDnode *pDnode, SRpcMsg *pMsg) { SEpSet epSet = {0}; @@ -157,11 +150,7 @@ static void dmProcessRpcMsg(SDnode *pDnode, SRpcMsg *pRpc, SEpSet *pEpSet) { memcpy(pMsg, pRpc, sizeof(SRpcMsg)); dGTrace("msg:%p, is created, type:%s handle:%p", pMsg, TMSG_INFO(pRpc->msgType), pMsg->info.handle); - if (InParentProc(pWrapper)) { - code = dmPutToProcCQueue(&pWrapper->proc, pMsg, DND_FUNC_REQ); - } else { - code = dmProcessNodeMsg(pWrapper, pMsg); - } + code = dmProcessNodeMsg(pWrapper, pMsg); _OVER: if (code != 0) { @@ -233,24 +222,9 @@ static inline int32_t dmSendReq(const SEpSet *pEpSet, SRpcMsg *pMsg) { } } -static inline void dmRegisterBrokenLinkArg(SRpcMsg *pMsg) { - SMgmtWrapper *pWrapper = pMsg->info.wrapper; - if (InChildProc(pWrapper)) { - dmPutToProcPQueue(&pWrapper->proc, pMsg, DND_FUNC_REGIST); - } else { - rpcRegisterBrokenLinkArg(pMsg); - } -} +static inline void dmRegisterBrokenLinkArg(SRpcMsg *pMsg) { rpcRegisterBrokenLinkArg(pMsg); } -static inline void dmReleaseHandle(SRpcHandleInfo *pHandle, int8_t type) { - SMgmtWrapper *pWrapper = pHandle->wrapper; - if (InChildProc(pWrapper)) { - SRpcMsg msg = {.code = type, .info = *pHandle}; - dmPutToProcPQueue(&pWrapper->proc, &msg, DND_FUNC_RELEASE); - } else { - rpcReleaseHandle(pHandle, type); - } -} +static inline void dmReleaseHandle(SRpcHandleInfo *pHandle, int8_t type) { rpcReleaseHandle(pHandle, type); } static bool rpcRfp(int32_t code, tmsg_t msgType) { if (code == TSDB_CODE_RPC_REDIRECT || code == TSDB_CODE_RPC_NETWORK_UNAVAIL || code == TSDB_CODE_NODE_NOT_DEPLOYED || diff --git a/source/dnode/mgmt/node_util/inc/dmUtil.h b/source/dnode/mgmt/node_util/inc/dmUtil.h index 2f9c0aff2f9bd94638587fa18362bbb4a6312934..8719e988e7acd44548708a83f9e846d981170712 100644 --- a/source/dnode/mgmt/node_util/inc/dmUtil.h +++ b/source/dnode/mgmt/node_util/inc/dmUtil.h @@ -83,20 +83,6 @@ typedef enum { DND_ENV_CLEANUP, } EDndEnvStatus; -typedef enum { - DND_PROC_SINGLE, - DND_PROC_CHILD, - DND_PROC_PARENT, - DND_PROC_TEST, -} EDndProcType; - -typedef enum { - DND_FUNC_REQ = 1, - DND_FUNC_RSP = 2, - DND_FUNC_REGIST = 3, - DND_FUNC_RELEASE = 4, -} EProcFuncType; - typedef int32_t (*ProcessCreateNodeFp)(EDndNodeType ntype, SRpcMsg *pMsg); typedef int32_t (*ProcessDropNodeFp)(EDndNodeType ntype, SRpcMsg *pMsg); typedef void (*SendMonitorReportFp)(); @@ -165,11 +151,7 @@ typedef struct { // dmUtil.c const char *dmStatStr(EDndRunStatus stype); -const char *dmNodeLogName(EDndNodeType ntype); -const char *dmNodeProcName(EDndNodeType ntype); const char *dmNodeName(EDndNodeType ntype); -const char *dmProcStr(EDndProcType ptype); -const char *dmFuncStr(EProcFuncType etype); void *dmSetMgmtHandle(SArray *pArray, tmsg_t msgType, void *nodeMsgFp, bool needCheckVgId); void dmGetMonitorSystemInfo(SMonSysInfo *pInfo); @@ -177,8 +159,6 @@ void dmGetMonitorSystemInfo(SMonSysInfo *pInfo); int32_t dmReadFile(const char *path, const char *name, bool *pDeployed); int32_t dmWriteFile(const char *path, const char *name, bool deployed); TdFilePtr dmCheckRunning(const char *dataDir); -int32_t dmReadShmFile(const char *path, const char *name, EDndNodeType runType, SShm *pShm); -int32_t dmWriteShmFile(const char *path, const char *name, const SShm *pShm); // dmEps.c int32_t dmReadEps(SDnodeData *pData); diff --git a/source/dnode/mgmt/node_util/src/dmFile.c b/source/dnode/mgmt/node_util/src/dmFile.c index 9ec17a18b57ac2ce28dfb7c65704748719bc2e0a..d387fe4a3f52400e3dcb5521d4c7a94c8802d607 100644 --- a/source/dnode/mgmt/node_util/src/dmFile.c +++ b/source/dnode/mgmt/node_util/src/dmFile.c @@ -148,114 +148,3 @@ TdFilePtr dmCheckRunning(const char *dataDir) { dDebug("lock file:%s to prevent repeated starts", filepath); return pFile; } - -int32_t dmReadShmFile(const char *path, const char *name, EDndNodeType runType, SShm *pShm) { - int32_t code = -1; - char content[MAXLEN + 1] = {0}; - char file[PATH_MAX] = {0}; - cJSON *root = NULL; - TdFilePtr pFile = NULL; - - snprintf(file, sizeof(file), "%s%sshmfile", path, TD_DIRSEP); - pFile = taosOpenFile(file, TD_FILE_READ); - if (pFile == NULL) { - code = 0; - goto _OVER; - } - - if (taosReadFile(pFile, content, MAXLEN) > 0) { - root = cJSON_Parse(content); - if (root == NULL) { - terrno = TSDB_CODE_INVALID_JSON_FORMAT; - dError("node:%s, failed to read %s since invalid json format", name, file); - goto _OVER; - } - - cJSON *shmid = cJSON_GetObjectItem(root, "shmid"); - if (shmid && shmid->type == cJSON_Number) { - pShm->id = shmid->valueint; - } - - cJSON *shmsize = cJSON_GetObjectItem(root, "shmsize"); - if (shmsize && shmsize->type == cJSON_Number) { - pShm->size = shmsize->valueint; - } - } - - if (!tsMultiProcess || runType == DNODE || runType == NODE_END) { - if (pShm->id >= 0) { - dDebug("node:%s, shmid:%d, is closed, size:%d", name, pShm->id, pShm->size); - taosDropShm(pShm); - } - } else { - if (taosAttachShm(pShm) != 0) { - terrno = TAOS_SYSTEM_ERROR(errno); - dError("shmid:%d, failed to attach shm since %s", pShm->id, terrstr()); - goto _OVER; - } - dInfo("node:%s, shmid:%d is attached, size:%d", name, pShm->id, pShm->size); - } - - dDebug("node:%s, successed to load %s", name, file); - code = 0; - -_OVER: - if (root != NULL) cJSON_Delete(root); - if (pFile != NULL) taosCloseFile(&pFile); - - return code; -} - -int32_t dmWriteShmFile(const char *path, const char *name, const SShm *pShm) { - int32_t code = -1; - int32_t len = 0; - char content[MAXLEN + 1] = {0}; - char file[PATH_MAX] = {0}; - char realfile[PATH_MAX] = {0}; - TdFilePtr pFile = NULL; - - snprintf(file, sizeof(file), "%s%sshmfile.bak", path, TD_DIRSEP); - snprintf(realfile, sizeof(realfile), "%s%sshmfile", path, TD_DIRSEP); - - pFile = taosOpenFile(file, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_TRUNC); - if (pFile == NULL) { - terrno = TAOS_SYSTEM_ERROR(errno); - dError("node:%s, failed to open file:%s since %s", name, file, terrstr()); - goto _OVER; - } - - len += snprintf(content + len, MAXLEN - len, "{\n"); - len += snprintf(content + len, MAXLEN - len, " \"shmid\":%d,\n", pShm->id); - len += snprintf(content + len, MAXLEN - len, " \"shmsize\":%d\n", pShm->size); - len += snprintf(content + len, MAXLEN - len, "}\n"); - - if (taosWriteFile(pFile, content, len) != len) { - terrno = TAOS_SYSTEM_ERROR(errno); - dError("node:%s, failed to write file:%s since %s", name, file, terrstr()); - goto _OVER; - } - - if (taosFsyncFile(pFile) != 0) { - terrno = TAOS_SYSTEM_ERROR(errno); - dError("node:%s, failed to fsync file:%s since %s", name, file, terrstr()); - goto _OVER; - } - - taosCloseFile(&pFile); - - if (taosRenameFile(file, realfile) != 0) { - terrno = TAOS_SYSTEM_ERROR(errno); - dError("node:%s, failed to rename %s to %s since %s", name, file, realfile, terrstr()); - return -1; - } - - dInfo("node:%s, successed to write %s", name, realfile); - code = 0; - -_OVER: - if (pFile != NULL) { - taosCloseFile(&pFile); - } - - return code; -} diff --git a/source/dnode/mgmt/node_util/src/dmUtil.c b/source/dnode/mgmt/node_util/src/dmUtil.c index f5364b3195ebf9637df6fc86cbe536041df14756..80bb8debd28db2a6f15fbeb243702b267dd38676 100644 --- a/source/dnode/mgmt/node_util/src/dmUtil.c +++ b/source/dnode/mgmt/node_util/src/dmUtil.c @@ -29,36 +29,6 @@ const char *dmStatStr(EDndRunStatus stype) { } } -const char *dmNodeLogName(EDndNodeType ntype) { - switch (ntype) { - case VNODE: - return "vnode"; - case QNODE: - return "qnode"; - case SNODE: - return "snode"; - case MNODE: - return "mnode"; - default: - return "taosd"; - } -} - -const char *dmNodeProcName(EDndNodeType ntype) { - switch (ntype) { - case VNODE: - return "taosv"; - case QNODE: - return "taosq"; - case SNODE: - return "taoss"; - case MNODE: - return "taosm"; - default: - return "taosd"; - } -} - const char *dmNodeName(EDndNodeType ntype) { switch (ntype) { case VNODE: @@ -74,36 +44,6 @@ const char *dmNodeName(EDndNodeType ntype) { } } -const char *dmProcStr(EDndProcType etype) { - switch (etype) { - case DND_PROC_SINGLE: - return "start"; - case DND_PROC_CHILD: - return "stop"; - case DND_PROC_PARENT: - return "child"; - case DND_PROC_TEST: - return "test"; - default: - return "UNKNOWN"; - } -} - -const char *dmFuncStr(EProcFuncType etype) { - switch (etype) { - case DND_FUNC_REQ: - return "req"; - case DND_FUNC_RSP: - return "rsp"; - case DND_FUNC_REGIST: - return "regist"; - case DND_FUNC_RELEASE: - return "release"; - default: - return "UNKNOWN"; - } -} - void *dmSetMgmtHandle(SArray *pArray, tmsg_t msgType, void *nodeMsgFp, bool needCheckVgId) { SMgmtHandle handle = { .msgType = msgType, diff --git a/source/dnode/mgmt/test/sut/src/server.cpp b/source/dnode/mgmt/test/sut/src/server.cpp index de35b06b05bb2034340f441036f01e6f928c5b08..98c59a1614303c4c8a342d1d63ab39e9116214fd 100644 --- a/source/dnode/mgmt/test/sut/src/server.cpp +++ b/source/dnode/mgmt/test/sut/src/server.cpp @@ -16,7 +16,7 @@ #include "sut.h" void* serverLoop(void* param) { - dmInit(0); + dmInit(); dmRun(); dmCleanup(); return NULL; diff --git a/source/dnode/vnode/src/meta/metaEntry.c b/source/dnode/vnode/src/meta/metaEntry.c index b7bdadec03a184a8debc7c2f9a0366fa1ec80734..72f7365a1e3304c8fa5c387eea24a968ebb87c14 100644 --- a/source/dnode/vnode/src/meta/metaEntry.c +++ b/source/dnode/vnode/src/meta/metaEntry.c @@ -21,7 +21,7 @@ int metaEncodeEntry(SEncoder *pCoder, const SMetaEntry *pME) { if (tEncodeI64(pCoder, pME->version) < 0) return -1; if (tEncodeI8(pCoder, pME->type) < 0) return -1; if (tEncodeI64(pCoder, pME->uid) < 0) return -1; - if (tEncodeCStr(pCoder, pME->name) < 0) return -1; + if (pME->name == NULL || tEncodeCStr(pCoder, pME->name) < 0) return -1; if (pME->type == TSDB_SUPER_TABLE) { if (tEncodeI8(pCoder, pME->flags) < 0) return -1; // TODO: need refactor? diff --git a/source/libs/catalog/inc/catalogInt.h b/source/libs/catalog/inc/catalogInt.h index fe71987103aedbee555b67e99c95a7fb6fb2fb0e..0786321686a5ae6f5b7f7937f4c5ec425f6d6c31 100644 --- a/source/libs/catalog/inc/catalogInt.h +++ b/source/libs/catalog/inc/catalogInt.h @@ -505,6 +505,7 @@ typedef struct SCtgOperation { #define CTG_FLAG_UNKNOWN_STB 0x4 #define CTG_FLAG_SYS_DB 0x8 #define CTG_FLAG_FORCE_UPDATE 0x10 +#define CTG_FLAG_ONLY_CACHE 0x20 #define CTG_FLAG_SET(_flag, _v) ((_flag) |= (_v)) @@ -783,7 +784,7 @@ void ctgFreeQNode(SCtgQNode* node); void ctgClearHandle(SCatalog* pCtg); void ctgFreeTbCacheImpl(SCtgTbCache* pCache); int32_t ctgRemoveTbMeta(SCatalog* pCtg, SName* pTableName); -int32_t ctgGetTbHashVgroup(SCatalog* pCtg, SRequestConnInfo* pConn, const SName* pTableName, SVgroupInfo* pVgroup); +int32_t ctgGetTbHashVgroup(SCatalog* pCtg, SRequestConnInfo* pConn, const SName* pTableName, SVgroupInfo* pVgroup, bool* exists); SName* ctgGetFetchName(SArray* pNames, SCtgFetch* pFetch); extern SCatalogMgmt gCtgMgmt; diff --git a/source/libs/catalog/src/catalog.c b/source/libs/catalog/src/catalog.c index 218a86ed5cad6979afd483394906969df5ff0dc0..5997f63a16e0d669467a18a53eb84e9e6bcfebd9 100644 --- a/source/libs/catalog/src/catalog.c +++ b/source/libs/catalog/src/catalog.c @@ -23,12 +23,21 @@ SCatalogMgmt gCtgMgmt = {0}; int32_t ctgGetDBVgInfo(SCatalog* pCtg, SRequestConnInfo* pConn, const char* dbFName, SCtgDBCache** dbCache, - SDBVgInfo** pInfo) { + SDBVgInfo** pInfo, bool* exists) { int32_t code = 0; CTG_ERR_RET(ctgAcquireVgInfoFromCache(pCtg, dbFName, dbCache)); if (*dbCache) { + if (exists) { + *exists = true; + } + + return TSDB_CODE_SUCCESS; + } + + if (exists) { + *exists = false; return TSDB_CODE_SUCCESS; } @@ -94,7 +103,7 @@ int32_t ctgRefreshTbMeta(SCatalog* pCtg, SRequestConnInfo* pConn, SCtgTbMetaCtx* int32_t code = 0; if (!CTG_FLAG_IS_SYS_DB(ctx->flag)) { - CTG_ERR_RET(ctgGetTbHashVgroup(pCtg, pConn, ctx->pName, &vgroupInfo)); + CTG_ERR_RET(ctgGetTbHashVgroup(pCtg, pConn, ctx->pName, &vgroupInfo, NULL)); } STableMetaOutput moutput = {0}; @@ -194,7 +203,7 @@ int32_t ctgGetTbMeta(SCatalog* pCtg, SRequestConnInfo* pConn, SCtgTbMetaCtx* ctx STableMetaOutput* output = NULL; CTG_ERR_RET(ctgGetTbMetaFromCache(pCtg, pConn, ctx, pTableMeta)); - if (*pTableMeta) { + if (*pTableMeta || (ctx->flag & CTG_FLAG_ONLY_CACHE)) { goto _return; } @@ -415,7 +424,7 @@ int32_t ctgGetTbCfg(SCatalog* pCtg, SRequestConnInfo* pConn, SName* pTableName, CTG_ERR_RET(ctgGetTableCfgFromMnode(pCtg, pConn, pTableName, pCfg, NULL)); } else { SVgroupInfo vgroupInfo = {0}; - CTG_ERR_RET(ctgGetTbHashVgroup(pCtg, pConn, pTableName, &vgroupInfo)); + CTG_ERR_RET(ctgGetTbHashVgroup(pCtg, pConn, pTableName, &vgroupInfo, NULL)); CTG_ERR_RET(ctgGetTableCfgFromVnode(pCtg, pConn, pTableName, &vgroupInfo, pCfg, NULL)); } @@ -441,7 +450,7 @@ int32_t ctgGetTbDistVgInfo(SCatalog* pCtg, SRequestConnInfo* pConn, SName* pTabl tNameGetFullDbName(pTableName, db); SHashObj* vgHash = NULL; - CTG_ERR_JRET(ctgGetDBVgInfo(pCtg, pConn, db, &dbCache, &vgInfo)); + CTG_ERR_JRET(ctgGetDBVgInfo(pCtg, pConn, db, &dbCache, &vgInfo, NULL)); if (dbCache) { vgHash = dbCache->vgCache.vgInfo->vgHash; @@ -501,7 +510,7 @@ _return: CTG_RET(code); } -int32_t ctgGetTbHashVgroup(SCatalog* pCtg, SRequestConnInfo* pConn, const SName* pTableName, SVgroupInfo* pVgroup) { +int32_t ctgGetTbHashVgroup(SCatalog* pCtg, SRequestConnInfo* pConn, const SName* pTableName, SVgroupInfo* pVgroup, bool* exists) { if (IS_SYS_DBNAME(pTableName->dbname)) { ctgError("no valid vgInfo for db, dbname:%s", pTableName->dbname); CTG_ERR_RET(TSDB_CODE_CTG_INVALID_INPUT); @@ -513,8 +522,13 @@ int32_t ctgGetTbHashVgroup(SCatalog* pCtg, SRequestConnInfo* pConn, const SName* tNameGetFullDbName(pTableName, db); SDBVgInfo* vgInfo = NULL; - CTG_ERR_JRET(ctgGetDBVgInfo(pCtg, pConn, db, &dbCache, &vgInfo)); + CTG_ERR_JRET(ctgGetDBVgInfo(pCtg, pConn, db, &dbCache, &vgInfo, exists)); + if (exists && false == *exists) { + ctgDebug("db %s vgInfo not in cache", pTableName->dbname); + return TSDB_CODE_SUCCESS; + } + CTG_ERR_JRET(ctgGetVgInfoFromHashValue(pCtg, vgInfo ? vgInfo : dbCache->vgCache.vgInfo, pTableName, pVgroup)); _return: @@ -737,7 +751,7 @@ int32_t catalogGetDBVgInfo(SCatalog* pCtg, SRequestConnInfo* pConn, const char* SArray* vgList = NULL; SHashObj* vgHash = NULL; SDBVgInfo* vgInfo = NULL; - CTG_ERR_JRET(ctgGetDBVgInfo(pCtg, pConn, dbFName, &dbCache, &vgInfo)); + CTG_ERR_JRET(ctgGetDBVgInfo(pCtg, pConn, dbFName, &dbCache, &vgInfo, NULL)); if (dbCache) { vgHash = dbCache->vgCache.vgInfo->vgHash; } else { @@ -880,6 +894,17 @@ int32_t catalogGetTableMeta(SCatalog* pCtg, SRequestConnInfo* pConn, const SName CTG_API_LEAVE(ctgGetTbMeta(pCtg, pConn, &ctx, pTableMeta)); } +int32_t catalogGetCachedTableMeta(SCatalog* pCtg, SRequestConnInfo* pConn, const SName* pTableName, STableMeta** pTableMeta) { + CTG_API_ENTER(); + + SCtgTbMetaCtx ctx = {0}; + ctx.pName = (SName*)pTableName; + ctx.flag = CTG_FLAG_UNKNOWN_STB | CTG_FLAG_ONLY_CACHE; + + CTG_API_LEAVE(ctgGetTbMeta(pCtg, pConn, &ctx, pTableMeta)); +} + + int32_t catalogGetSTableMeta(SCatalog* pCtg, SRequestConnInfo* pConn, const SName* pTableName, STableMeta** pTableMeta) { CTG_API_ENTER(); @@ -891,6 +916,18 @@ int32_t catalogGetSTableMeta(SCatalog* pCtg, SRequestConnInfo* pConn, const SNam CTG_API_LEAVE(ctgGetTbMeta(pCtg, pConn, &ctx, pTableMeta)); } +int32_t catalogGetCachedSTableMeta(SCatalog* pCtg, SRequestConnInfo* pConn, const SName* pTableName, + STableMeta** pTableMeta) { + CTG_API_ENTER(); + + SCtgTbMetaCtx ctx = {0}; + ctx.pName = (SName*)pTableName; + ctx.flag = CTG_FLAG_STB | CTG_FLAG_ONLY_CACHE; + + CTG_API_LEAVE(ctgGetTbMeta(pCtg, pConn, &ctx, pTableMeta)); +} + + int32_t catalogUpdateTableMeta(SCatalog* pCtg, STableMetaRsp* pMsg) { CTG_API_ENTER(); @@ -1009,7 +1046,14 @@ int32_t catalogGetTableHashVgroup(SCatalog* pCtg, SRequestConnInfo* pConn, const SVgroupInfo* pVgroup) { CTG_API_ENTER(); - CTG_API_LEAVE(ctgGetTbHashVgroup(pCtg, pConn, pTableName, pVgroup)); + CTG_API_LEAVE(ctgGetTbHashVgroup(pCtg, pConn, pTableName, pVgroup, NULL)); +} + +int32_t catalogGetCachedTableHashVgroup(SCatalog* pCtg, SRequestConnInfo* pConn, const SName* pTableName, + SVgroupInfo* pVgroup, bool* exists) { + CTG_API_ENTER(); + + CTG_API_LEAVE(ctgGetTbHashVgroup(pCtg, pConn, pTableName, pVgroup, exists)); } int32_t catalogGetAllMeta(SCatalog* pCtg, SRequestConnInfo* pConn, const SCatalogReq* pReq, SMetaData* pRsp) { diff --git a/source/libs/tdb/src/db/tdbBtree.c b/source/libs/tdb/src/db/tdbBtree.c index 8f11fb88a2399f84af8b32e41fb9477c2d61485c..ba0b3dbcf8206a54b68bfffbb0c4a14584dc93b9 100644 --- a/source/libs/tdb/src/db/tdbBtree.c +++ b/source/libs/tdb/src/db/tdbBtree.c @@ -1094,6 +1094,7 @@ static int tdbBtreeEncodePayload(SPage *pPage, SCell *pCell, int nHeader, const // fetch next ofp, a new ofp and make it dirty ret = tdbFetchOvflPage(&pgno, &nextOfp, pTxn, pBt); if (ret < 0) { + tdbFree(pBuf); return -1; } } @@ -1135,6 +1136,11 @@ static int tdbBtreeEncodePayload(SPage *pPage, SCell *pCell, int nHeader, const memcpy(pBuf, ((SCell *)pVal) + vLen - nLeft, bytes); memcpy(pBuf + bytes, &pgno, sizeof(pgno)); + if (ofp == NULL) { + tdbFree(pBuf); + return -1; + } + ret = tdbPageInsertCell(ofp, 0, pBuf, bytes + sizeof(pgno), 0); if (ret < 0) { tdbFree(pBuf); diff --git a/source/libs/tdb/src/db/tdbPCache.c b/source/libs/tdb/src/db/tdbPCache.c index 93f972b29e3d8d20ddb8d53e3c7ce3ced8f7b57f..bdbd6c2f3dbb904eae7b91c6cb935caa9b20aa2c 100644 --- a/source/libs/tdb/src/db/tdbPCache.c +++ b/source/libs/tdb/src/db/tdbPCache.c @@ -105,7 +105,7 @@ static int tdbPCacheAlterImpl(SPCache *pCache, int32_t nPage) { for (int32_t iPage = pCache->nPages; iPage < nPage; iPage++) { if (tdbPageCreate(pCache->szPage, &aPage[iPage], tdbDefaultMalloc, NULL) < 0) { // TODO: handle error - tdbOsFree(pCache->aPage); + tdbOsFree(aPage); return -1; } diff --git a/source/os/src/osEnv.c b/source/os/src/osEnv.c index 616ab7875dd9634b4ad4258c51320b589dd744ae..ac1881fc6d843b3ad215ceb809d44e5811c53328 100644 --- a/source/os/src/osEnv.c +++ b/source/os/src/osEnv.c @@ -105,3 +105,5 @@ void osSetSystemLocale(const char *inLocale, const char *inCharSet) { memcpy(tsLocale, inLocale, strlen(inLocale) + 1); memcpy(tsCharset, inCharSet, strlen(inCharSet) + 1); } + +void osSetProcPath(int32_t argc, char **argv) { tsProcPath = argv[0]; } diff --git a/source/os/src/osProc.c b/source/os/src/osProc.c deleted file mode 100644 index f060ab48c9bb9fd45b385c36a77db1e0adba0e6f..0000000000000000000000000000000000000000 --- a/source/os/src/osProc.c +++ /dev/null @@ -1,80 +0,0 @@ -/* - * Copyright (c) 2019 TAOS Data, Inc. - * - * This program is free software: you can use, redistribute, and/or modify - * it under the terms of the GNU Affero General Public License, version 3 - * or later ("AGPL"), as published by the Free Software Foundation. - * - * This program is distributed in the hope that it will be useful, but WITHOUT - * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or - * FITNESS FOR A PARTICULAR PURPOSE. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see . - */ - -#define ALLOW_FORBID_FUNC -#define _DEFAULT_SOURCE -#include "os.h" - -int32_t taosNewProc(char **args) { -#ifdef WINDOWS - assert(0); - return 0; -#else - int32_t pid = fork(); - if (pid == 0) { - args[0] = tsProcPath; - // close(STDIN_FILENO); - // close(STDOUT_FILENO); - // close(STDERR_FILENO); - return execvp(tsProcPath, args); - } else { - return pid; - } -#endif -} - -void taosWaitProc(int32_t pid) { -#ifdef WINDOWS - assert(0); -#else - int32_t status = -1; - waitpid(pid, &status, 0); -#endif -} - -void taosKillProc(int32_t pid) { -#ifdef WINDOWS - assert(0); -#else - kill(pid, SIGINT); -#endif -} - -bool taosProcExist(int32_t pid) { -#ifdef WINDOWS - assert(0); - return false; -#else - int32_t p = getpgid(pid); - return p >= 0; -#endif -} - -// the length of the new name must be less than the original name to take effect -void taosSetProcName(int32_t argc, char **argv, const char *name) { - setThreadName(name); - - for (int32_t i = 0; i < argc; ++i) { - int32_t len = strlen(argv[i]); - for (int32_t j = 0; j < len; ++j) { - argv[i][j] = 0; - } - if (i == 0) { - tstrncpy(argv[0], name, len + 1); - } - } -} - -void taosSetProcPath(int32_t argc, char **argv) { tsProcPath = argv[0]; } diff --git a/source/os/src/osShm.c b/source/os/src/osShm.c deleted file mode 100644 index cb09e2fb38c5edcc00ae753245d98b9ca498e825..0000000000000000000000000000000000000000 --- a/source/os/src/osShm.c +++ /dev/null @@ -1,106 +0,0 @@ -/* - * Copyright (c) 2019 TAOS Data, Inc. - * - * This program is free software: you can use, redistribute, and/or modify - * it under the terms of the GNU Affero General Public License, version 3 - * or later ("AGPL"), as published by the Free Software Foundation. - * - * This program is distributed in the hope that it will be useful, but WITHOUT - * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or - * FITNESS FOR A PARTICULAR PURPOSE. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see . - */ - -#define ALLOW_FORBID_FUNC -#define _DEFAULT_SOURCE -#include "os.h" - -#define MAX_SHMIDS 6 - -static int32_t shmids[MAX_SHMIDS] = {0}; - -static void taosDeleteCreatedShms() { -#if defined(WINDOWS) - assert(0); -#else - for (int32_t i = 0; i < MAX_SHMIDS; ++i) { - int32_t shmid = shmids[i] - 1; - if (shmid >= 0) { - shmctl(shmid, IPC_RMID, NULL); - } - } -#endif -} - -int32_t taosCreateShm(SShm* pShm, int32_t key, int32_t shmsize) { -#if defined(WINDOWS) - assert(0); -#else - pShm->id = -1; - -#if 1 - key_t __shkey = IPC_PRIVATE; - int32_t __shmflag = IPC_CREAT | IPC_EXCL | 0600; -#else - key_t __shkey = 0X95270000 + key; - int32_t __shmflag = IPC_CREAT | 0600; -#endif - - int32_t shmid = shmget(__shkey, shmsize, __shmflag); - if (shmid < 0) { - return -1; - } - - void* shmptr = shmat(shmid, NULL, 0); - if (shmptr == NULL) { - return -1; - } - - pShm->id = shmid; - pShm->size = shmsize; - pShm->ptr = shmptr; - -#if 0 - if (key >= 0 && key < MAX_SHMIDS) { - shmids[key] = pShm->id + 1; - } - atexit(taosDeleteCreatedShms); -#else - shmctl(pShm->id, IPC_RMID, NULL); -#endif - -#endif - return 0; -} - -void taosDropShm(SShm* pShm) { -#if defined(WINDOWS) - assert(0); -#else - if (pShm->id >= 0) { - if (pShm->ptr != NULL) { - shmdt(pShm->ptr); - } - shmctl(pShm->id, IPC_RMID, NULL); - } - pShm->id = -1; - pShm->size = 0; - pShm->ptr = NULL; -#endif -} - -int32_t taosAttachShm(SShm* pShm) { -#if defined(WINDOWS) - assert(0); -#else - errno = 0; - - void* ptr = shmat(pShm->id, NULL, 0); - if (errno == 0) { - pShm->ptr = ptr; - } -#endif - return errno; -} diff --git a/source/util/test/CMakeLists.txt b/source/util/test/CMakeLists.txt index 6e42ef7e75eac38a5a072bfef8521152fd74ae06..2e307771b7ed6c5473c4edc0b1947de199f82f10 100644 --- a/source/util/test/CMakeLists.txt +++ b/source/util/test/CMakeLists.txt @@ -45,14 +45,6 @@ INCLUDE_DIRECTORIES(${TD_SOURCE_DIR}/src/util/inc) # add_executable(encodeTest "encodeTest.cpp") # target_link_libraries(encodeTest os util gtest gtest_main) -# queueTest -# add_executable(procTest "procTest.cpp") -# target_link_libraries(procTest os util transport sut gtest_main) -# add_test( -# NAME procTest -# COMMAND procTest -# ) - # cfgTest add_executable(cfgTest "cfgTest.cpp") target_link_libraries(cfgTest os util gtest_main) diff --git a/source/util/test/procTest.cpp b/source/util/test/procTest.cpp deleted file mode 100644 index 53d3fa2c4bc118d57d9a8d08ecc4810f83aa5eda..0000000000000000000000000000000000000000 --- a/source/util/test/procTest.cpp +++ /dev/null @@ -1,264 +0,0 @@ -/** - * @file queue.cpp - * @author slguan (slguan@taosdata.com) - * @brief UTIL module queue tests - * @version 1.0 - * @date 2022-01-27 - * - * @copyright Copyright (c) 2022 - * - */ -#if 0 -#include -#include "tlog.h" -#include "tprocess.h" -#include "tqueue.h" - -typedef struct STestMsg { - uint16_t msgType; - void *pCont; - int contLen; - int32_t code; - void *handle; // rpc handle returned to app - void *ahandle; // app handle set by client - int noResp; // has response or not(default 0, 0: resp, 1: no resp); - int persistHandle; // persist handle or not -} STestMsg; - -class UtilTesProc : public ::testing::Test { - public: - void SetUp() override { - shm.id = -1; - for (int32_t i = 0; i < 4000; ++i) { - body[i] = i % 26 + 'a'; - } - head.pCont = body; - head.code = 1; - head.msgType = 2; - head.noResp = 3; - head.persistHandle = 4; - - taosRemoveDir(TD_TMP_DIR_PATH "td"); - taosMkDir(TD_TMP_DIR_PATH "td"); - tstrncpy(tsLogDir, TD_TMP_DIR_PATH "td", PATH_MAX); - if (taosInitLog("taosdlog", 1) != 0) { - printf("failed to init log file\n"); - } - } - void TearDown() override { taosDropShm(&shm); } - - public: - static STestMsg head; - static char body[4000]; - static SShm shm; - static void SetUpTestSuite() {} - static void TearDownTestSuite() {} -}; - -SShm UtilTesProc::shm; -char UtilTesProc::body[4000]; -STestMsg UtilTesProc::head; - -TEST_F(UtilTesProc, 00_Init_Cleanup) { - ASSERT_EQ(taosCreateShm(&shm, 1234, 1024 * 1024 * 2), 0); - - shm.size = 1023; - SProcCfg cfg = {(ProcConsumeFp)NULL, - (ProcMallocFp)taosAllocateQitem, - (ProcFreeFp)taosFreeQitem, - (ProcMallocFp)taosMemoryMalloc, - (ProcFreeFp)taosMemoryMalloc, - (ProcConsumeFp)NULL, - (ProcMallocFp)taosMemoryMalloc, - (ProcFreeFp)taosMemoryFree, - (ProcMallocFp)taosMemoryMalloc, - (ProcFreeFp)taosMemoryMalloc, - shm, - &shm, - "1234"}; - SProc *proc = dmInitProc(&cfg); - ASSERT_EQ(proc, nullptr); - - shm.size = 2468; - cfg.shm = shm; - proc = dmInitProc(&cfg); - ASSERT_NE(proc, nullptr); - - ASSERT_EQ(dmRunProc(proc), 0); - dmCleanupProc(proc); - taosDropShm(&shm); -} - -void ConsumeChild1(void *parent, void *pHead, int16_t headLen, void *pBody, int32_t bodyLen, EProcFuncType ftype) { - STestMsg msg; - memcpy(&msg, pHead, headLen); - char body[2000] = {0}; - memcpy(body, pBody, bodyLen); - - uDebug("====> parent:%" PRId64 " ftype:%d, headLen:%d bodyLen:%d head:%d:%d:%d:%d body:%s <====", (int64_t)parent, - ftype, headLen, bodyLen, msg.code, msg.msgType, msg.noResp, msg.persistHandle, body); - taosMemoryFree(pBody); - taosFreeQitem(pHead); -} - -TEST_F(UtilTesProc, 01_Push_Pop_Child) { - shm.size = 3000; - ASSERT_EQ(taosCreateShm(&shm, 1235, shm.size), 0); - SProcCfg cfg = {(ProcConsumeFp)ConsumeChild1, - (ProcMallocFp)taosAllocateQitem, - (ProcFreeFp)taosFreeQitem, - (ProcMallocFp)taosMemoryMalloc, - (ProcFreeFp)taosMemoryFree, - (ProcConsumeFp)NULL, - (ProcMallocFp)taosMemoryMalloc, - (ProcFreeFp)taosMemoryFree, - (ProcMallocFp)taosMemoryMalloc, - (ProcFreeFp)taosMemoryFree, - shm, - (void *)((int64_t)1235), - "1235_c"}; - SProc *cproc = dmInitProc(&cfg); - ASSERT_NE(cproc, nullptr); - - ASSERT_NE(dmPutToProcCQueue(cproc, &head, 0, body, 0, 0, 0, DND_FUNC_RSP), 0); - ASSERT_NE(dmPutToProcCQueue(cproc, &head, 0, body, 0, 0, 0, DND_FUNC_REGIST), 0); - ASSERT_NE(dmPutToProcCQueue(cproc, &head, 0, body, 0, 0, 0, DND_FUNC_RELEASE), 0); - ASSERT_NE(dmPutToProcCQueue(cproc, NULL, 12, body, 0, 0, 0, DND_FUNC_REQ), 0); - ASSERT_NE(dmPutToProcCQueue(cproc, &head, 0, body, 0, 0, 0, DND_FUNC_REQ), 0); - ASSERT_NE(dmPutToProcCQueue(cproc, &head, shm.size, body, 0, 0, 0, DND_FUNC_REQ), 0); - ASSERT_NE(dmPutToProcCQueue(cproc, &head, sizeof(STestMsg), body, shm.size, 0, 0, DND_FUNC_REQ), 0); - - for (int32_t j = 0; j < 1000; j++) { - int32_t i = 0; - for (i = 0; i < 20; ++i) { - ASSERT_EQ(dmPutToProcCQueue(cproc, &head, sizeof(STestMsg), body, i, 0, 0, DND_FUNC_REQ), 0); - } - ASSERT_NE(dmPutToProcCQueue(cproc, &head, sizeof(STestMsg), body, i, 0, 0, DND_FUNC_REQ), 0); - - cfg.isChild = true; - cfg.name = "1235_p"; - SProc *pproc = dmInitProc(&cfg); - ASSERT_NE(pproc, nullptr); - dmRunProc(pproc); - dmCleanupProc(pproc); - } - - dmCleanupProc(cproc); - taosDropShm(&shm); -} - -void ConsumeParent1(void *parent, void *pHead, int16_t headLen, void *pBody, int32_t bodyLen, EProcFuncType ftype) { - STestMsg msg; - memcpy(&msg, pHead, headLen); - char body[2000] = {0}; - memcpy(body, pBody, bodyLen); - - uDebug("----> parent:%" PRId64 " ftype:%d, headLen:%d bodyLen:%d head:%d:%d:%d:%d body:%s <----", (int64_t)parent, - ftype, headLen, bodyLen, msg.code, msg.msgType, msg.noResp, msg.persistHandle, body); - taosMemoryFree(pBody); - taosMemoryFree(pHead); -} - -TEST_F(UtilTesProc, 02_Push_Pop_Parent) { - shm.size = 3000; - ASSERT_EQ(taosCreateShm(&shm, 1236, shm.size), 0); - SProcCfg cfg = {(ProcConsumeFp)NULL, - (ProcMallocFp)taosAllocateQitem, - (ProcFreeFp)taosFreeQitem, - (ProcMallocFp)taosMemoryMalloc, - (ProcFreeFp)taosMemoryFree, - (ProcConsumeFp)ConsumeParent1, - (ProcMallocFp)taosMemoryMalloc, - (ProcFreeFp)taosMemoryFree, - (ProcMallocFp)taosMemoryMalloc, - (ProcFreeFp)taosMemoryFree, - shm, - (void *)((int64_t)1236), - "1236_c"}; - SProc *cproc = dmInitProc(&cfg); - ASSERT_NE(cproc, nullptr); - - cfg.name = "1236_p"; - cfg.isChild = true; - SProc *pproc = dmInitProc(&cfg); - ASSERT_NE(pproc, nullptr); - - for (int32_t j = 0; j < 1000; j++) { - int32_t i = 0; - for (i = 0; i < 20; ++i) { - dmPutToProcPQueue(pproc, &head, sizeof(STestMsg), body, i, DND_FUNC_REQ); - } - - dmRunProc(cproc); - dmStopProc(cproc); - } - - dmCleanupProc(pproc); - dmCleanupProc(cproc); - taosDropShm(&shm); -} - -void ConsumeChild3(void *parent, void *pHead, int16_t headLen, void *pBody, int32_t bodyLen, EProcFuncType ftype) { - STestMsg msg; - memcpy(&msg, pHead, headLen); - char body[2000] = {0}; - memcpy(body, pBody, bodyLen); - - uDebug("====> parent:%" PRId64 " ftype:%d, headLen:%d bodyLen:%d handle:%" PRId64 " body:%s <====", (int64_t)parent, - ftype, headLen, bodyLen, (int64_t)msg.handle, body); - taosMemoryFree(pBody); - taosFreeQitem(pHead); -} - -void processHandle(void *handle) { uDebug("----> remove handle:%" PRId64 " <----", (int64_t)handle); } - -TEST_F(UtilTesProc, 03_Handle) { - // uDebugFlag = 207; - shm.size = 3000; - ASSERT_EQ(taosCreateShm(&shm, 1237, shm.size), 0); - SProcCfg cfg = {(ProcConsumeFp)ConsumeChild3, - (ProcMallocFp)taosAllocateQitem, - (ProcFreeFp)taosFreeQitem, - (ProcMallocFp)taosMemoryMalloc, - (ProcFreeFp)taosMemoryFree, - (ProcConsumeFp)NULL, - (ProcMallocFp)taosMemoryMalloc, - (ProcFreeFp)taosMemoryFree, - (ProcMallocFp)taosMemoryMalloc, - (ProcFreeFp)taosMemoryFree, - shm, - (void *)((int64_t)1235), - "1237_p"}; - SProc *cproc = dmInitProc(&cfg); - ASSERT_NE(cproc, nullptr); - - for (int32_t j = 0; j < 1; j++) { - int32_t i = 0; - for (i = 0; i < 20; ++i) { - head.handle = (void *)((int64_t)i); - ASSERT_EQ(dmPutToProcCQueue(cproc, &head, sizeof(STestMsg), body, i, (void *)((int64_t)i), i, DND_FUNC_REQ), 0); - } - - cfg.isChild = true; - cfg.name = "child_queue"; - SProc *pproc = dmInitProc(&cfg); - ASSERT_NE(pproc, nullptr); - dmRunProc(pproc); - dmCleanupProc(pproc); - - int64_t ref = 0; - - ref = dmRemoveProcRpcHandle(cproc, (void *)((int64_t)3)); - EXPECT_EQ(ref, 3); - ref = dmRemoveProcRpcHandle(cproc, (void *)((int64_t)5)); - EXPECT_EQ(ref, 5); - ref = dmRemoveProcRpcHandle(cproc, (void *)((int64_t)6)); - EXPECT_EQ(ref, 6); - dmCloseProcRpcHandles(cproc, processHandle); - } - - dmCleanupProc(cproc); - taosDropShm(&shm); -} - -#endif \ No newline at end of file diff --git a/tests/script/jenkins/basic.txt b/tests/script/jenkins/basic.txt index 81625d55e5680c7688dd47305271714fd5c8524f..8ad2deeba9bd16f93d885e2111cca32a0ed4ad97 100644 --- a/tests/script/jenkins/basic.txt +++ b/tests/script/jenkins/basic.txt @@ -299,18 +299,6 @@ ./test.sh -f tsim/stable/values.sim ./test.sh -f tsim/stable/vnode3.sim -# --- for multi process mode -./test.sh -f tsim/user/basic.sim -m -./test.sh -f tsim/db/basic3.sim -m -./test.sh -f tsim/db/error1.sim -m -./test.sh -f tsim/insert/backquote.sim -m -# unsupport ./test.sh -f tsim/parser/fourArithmetic-basic.sim -m -./test.sh -f tsim/query/interval-offset.sim -m -# unsupport ./test.sh -f tsim/tmq/basic3.sim -m -./test.sh -f tsim/stable/vnode3.sim -m -./test.sh -f tsim/qnode/basic1.sim -m -# unsupport ./test.sh -f tsim/mnode/basic1.sim -m - # --- sma ./test.sh -f tsim/sma/drop_sma.sim ./test.sh -f tsim/sma/tsmaCreateInsertQuery.sim diff --git a/tests/script/sh/deploy.sh b/tests/script/sh/deploy.sh index 5f497a248f7aadd1e060cd44ee9073d792bf14ba..606afe164b9657e63ae68c2edb1231073031d96d 100755 --- a/tests/script/sh/deploy.sh +++ b/tests/script/sh/deploy.sh @@ -9,7 +9,6 @@ UNAME_BIN=`which uname` OS_TYPE=`$UNAME_BIN` NODE_NAME= NODE= -MULTIPROCESS=0 while getopts "n:i:m" arg do case $arg in @@ -19,9 +18,6 @@ do i) NODE=$OPTARG ;; - m) - MULTIPROCESS=1 - ;; ?) echo "unkonw argument" ;; @@ -148,5 +144,4 @@ echo "numOfLogLines 20000000" >> $TAOS_CFG echo "asyncLog 0" >> $TAOS_CFG echo "locale en_US.UTF-8" >> $TAOS_CFG echo "telemetryReporting 0" >> $TAOS_CFG -echo "multiProcess ${MULTIPROCESS}" >> $TAOS_CFG echo " " >> $TAOS_CFG diff --git a/tests/script/test.sh b/tests/script/test.sh index 0ffe8cf8f16e9cf6114e3dd4348de56a4f8a497c..15305679876625b150aa4d53038f025e0acb7ff6 100755 --- a/tests/script/test.sh +++ b/tests/script/test.sh @@ -15,7 +15,6 @@ VALGRIND=0 UNIQUE=0 UNAME_BIN=`which uname` OS_TYPE=`$UNAME_BIN` -MULTIPROCESS=0 while getopts "f:avum" arg do case $arg in @@ -28,9 +27,6 @@ do u) UNIQUE=1 ;; - m) - MULTIPROCESS=1 - ;; ?) echo "unknow argument" ;; @@ -126,22 +122,12 @@ ulimit -c unlimited if [ -n "$FILE_NAME" ]; then echo "------------------------------------------------------------------------" if [ $VALGRIND -eq 1 ]; then - if [[ $MULTIPROCESS -eq 1 ]];then - FLAG="-m -v" - else - FLAG="-v" - fi - + FLAG="-v" echo valgrind --tool=memcheck --leak-check=full --show-reachable=no --track-origins=yes --child-silent-after-fork=yes --show-leak-kinds=all --num-callers=20 -v --workaround-gcc296-bugs=yes --log-file=${LOG_DIR}/valgrind-tsim.log $PROGRAM -c $CFG_DIR -f $FILE_NAME $FLAG valgrind --tool=memcheck --leak-check=full --show-reachable=no --track-origins=yes --child-silent-after-fork=yes --show-leak-kinds=all --num-callers=20 -v --workaround-gcc296-bugs=yes --log-file=${LOG_DIR}/valgrind-tsim.log $PROGRAM -c $CFG_DIR -f $FILE_NAME $FLAG else - if [[ $MULTIPROCESS -eq 1 ]];then - echo "ExcuteCmd(multiprocess):" $PROGRAM -m -c $CFG_DIR -f $FILE_NAME - $PROGRAM -m -c $CFG_DIR -f $FILE_NAME - else - echo "ExcuteCmd(singleprocess):" $PROGRAM -c $CFG_DIR -f $FILE_NAME - $PROGRAM -c $CFG_DIR -f $FILE_NAME - fi + echo "ExcuteCmd:" $PROGRAM -c $CFG_DIR -f $FILE_NAME + $PROGRAM -c $CFG_DIR -f $FILE_NAME fi else echo "ExcuteCmd:" $PROGRAM -c $CFG_DIR -f basicSuite.sim diff --git a/utils/tsim/inc/simInt.h b/utils/tsim/inc/simInt.h index f9e96fc67b20eee8b05619475c3d577cfa5b659c..9438e11fbd3fbfbf6b67e2b5afbfcb78d37a7144 100644 --- a/utils/tsim/inc/simInt.h +++ b/utils/tsim/inc/simInt.h @@ -155,7 +155,6 @@ extern int32_t simScriptSucced; extern int32_t simDebugFlag; extern char simScriptDir[]; extern bool abortExecution; -extern bool useMultiProcess; extern bool useValgrind; SScript *simParseScript(char *fileName); diff --git a/utils/tsim/src/simExe.c b/utils/tsim/src/simExe.c index 582b6c054afaabe2af736e907569a48dad7b74d6..7218ea39f86eb40586b8c3b1815db5d574090eec 100644 --- a/utils/tsim/src/simExe.c +++ b/utils/tsim/src/simExe.c @@ -442,10 +442,6 @@ bool simExecuteSystemCmd(SScript *script, char *option) { simReplaceStr(buf, ".sh", ".bat"); #endif - if (useMultiProcess) { - simReplaceStr(buf, "deploy.sh", "deploy.sh -m"); - } - if (useValgrind) { replaced = simReplaceStr(buf, "exec.sh", "exec.sh -v"); } diff --git a/utils/tsim/src/simMain.c b/utils/tsim/src/simMain.c index e58a22cf682d1cbc5be9a32057caee25ef00c254..cd4a5117b2a5904b934775d4a24a23059714a1a7 100644 --- a/utils/tsim/src/simMain.c +++ b/utils/tsim/src/simMain.c @@ -18,7 +18,6 @@ bool simExecSuccess = false; bool abortExecution = false; -bool useMultiProcess = false; bool useValgrind = false; void simHandleSignal(int32_t signo, void *sigInfo, void *context) { @@ -34,8 +33,6 @@ int32_t main(int32_t argc, char *argv[]) { tstrncpy(configDir, argv[++i], 128); } else if (strcmp(argv[i], "-f") == 0 && i < argc - 1) { tstrncpy(scriptFile, argv[++i], MAX_FILE_NAME_LEN); - } else if (strcmp(argv[i], "-m") == 0) { - useMultiProcess = true; } else if (strcmp(argv[i], "-v") == 0) { useValgrind = true; } else {