diff --git a/src/client/src/tscServer.c b/src/client/src/tscServer.c index 68faf08dd6a20c7091d53e429c00389e95afffb3..e86a2afbcde2a550a2412ef2397e837f15301d3e 100644 --- a/src/client/src/tscServer.c +++ b/src/client/src/tscServer.c @@ -72,7 +72,7 @@ void tscSetMgmtIpListFromEdge() { if (tscMgmtIpList.numOfIps != 1) { tscMgmtIpList.numOfIps = 1; tscMgmtIpList.inUse = 0; - tscMgmtIpList.port = tsMgmtShellPort; + tscMgmtIpList.port = tsMnodeShellPort; tscMgmtIpList.ip[0] = inet_addr(tsMasterIp); tscTrace("edge mgmt IP list:"); tscPrintMgmtIp(); @@ -185,7 +185,7 @@ int tscSendMsgToServer(SSqlObj *pSql) { SSqlCmd* pCmd = &pSql->cmd; if (pSql->cmd.command < TSDB_SQL_MGMT) { - pSql->ipList->port = tsVnodeShellPort; + pSql->ipList->port = tsDnodeShellPort; tscPrint("%p msg:%s is sent to server %d", pSql, taosMsg[pSql->cmd.msgType], pSql->ipList->port); memcpy(pMsg, pSql->cmd.payload + tsRpcHeadSize, pSql->cmd.payloadLen); @@ -198,7 +198,7 @@ int tscSendMsgToServer(SSqlObj *pSql) { }; rpcSendRequest(pVnodeConn, pSql->ipList, &rpcMsg); } else { - pSql->ipList->port = tsMgmtShellPort; + pSql->ipList->port = tsMnodeShellPort; tscPrint("%p msg:%s is sent to server %d", pSql, taosMsg[pSql->cmd.msgType], pSql->ipList->port); memcpy(pMsg, pSql->cmd.payload, pSql->cmd.payloadLen); SRpcMsg rpcMsg = { @@ -306,7 +306,7 @@ void tscProcessMsgFromServer(SRpcMsg *rpcMsg) { } // ignore the error information returned from mnode when set ignore flag in sql - if (pRes->code == TSDB_CODE_DB_ALREADY_EXIST && pCmd->existsCheck && pRes->rspType == TSDB_MSG_TYPE_CREATE_DB_RSP) { + if (pRes->code == TSDB_CODE_DB_ALREADY_EXIST && pCmd->existsCheck && pRes->rspType == TSDB_MSG_TYPE_CM_CREATE_DB_RSP) { pRes->code = TSDB_CODE_SUCCESS; } @@ -1685,7 +1685,7 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) { int32_t tscBuildCreateDbMsg(SSqlObj *pSql, SSqlInfo *pInfo) { SSqlCmd *pCmd = &pSql->cmd; pCmd->payloadLen = sizeof(SCreateDbMsg); - pCmd->msgType = TSDB_MSG_TYPE_CREATE_DB; + pCmd->msgType = TSDB_MSG_TYPE_CM_CREATE_DB; if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, pCmd->payloadLen)) { tscError("%p failed to malloc for query msg", pSql); @@ -1711,7 +1711,7 @@ int32_t tscBuildCreateDnodeMsg(SSqlObj *pSql, SSqlInfo *pInfo) { SCreateDnodeMsg *pCreate = (SCreateDnodeMsg *)pCmd->payload; strncpy(pCreate->ip, pInfo->pDCLInfo->a[0].z, pInfo->pDCLInfo->a[0].n); - pCmd->msgType = TSDB_MSG_TYPE_CREATE_DNODE; + pCmd->msgType = TSDB_MSG_TYPE_CM_CREATE_DNODE; return TSDB_CODE_SUCCESS; } @@ -1757,7 +1757,7 @@ int32_t tscBuildAcctMsg(SSqlObj *pSql, SSqlInfo *pInfo) { } } - pCmd->msgType = TSDB_MSG_TYPE_CREATE_ACCT; + pCmd->msgType = TSDB_MSG_TYPE_CM_CREATE_ACCT; return TSDB_CODE_SUCCESS; } @@ -1785,9 +1785,9 @@ int32_t tscBuildUserMsg(SSqlObj *pSql, SSqlInfo *pInfo) { } if (pUser->type == TSDB_ALTER_USER_PASSWD || pUser->type == TSDB_ALTER_USER_PRIVILEGES) { - pCmd->msgType = TSDB_MSG_TYPE_ALTER_USER; + pCmd->msgType = TSDB_MSG_TYPE_CM_ALTER_USER; } else { - pCmd->msgType = TSDB_MSG_TYPE_CREATE_USER; + pCmd->msgType = TSDB_MSG_TYPE_CM_CREATE_USER; } return TSDB_CODE_SUCCESS; @@ -1821,7 +1821,7 @@ int32_t tscBuildDropDbMsg(SSqlObj *pSql, SSqlInfo *pInfo) { strncpy(pDropDbMsg->db, pMeterMetaInfo->name, tListLen(pDropDbMsg->db)); pDropDbMsg->ignoreNotExists = pInfo->pDCLInfo->existsCheck ? 1 : 0; - pCmd->msgType = TSDB_MSG_TYPE_DROP_DB; + pCmd->msgType = TSDB_MSG_TYPE_CM_DROP_DB; return TSDB_CODE_SUCCESS; } @@ -1839,7 +1839,7 @@ int32_t tscBuildDropTableMsg(SSqlObj *pSql, SSqlInfo *pInfo) { strcpy(pDropTableMsg->tableId, pMeterMetaInfo->name); pDropTableMsg->igNotExists = pInfo->pDCLInfo->existsCheck ? 1 : 0; - pCmd->msgType = TSDB_MSG_TYPE_DROP_TABLE; + pCmd->msgType = TSDB_MSG_TYPE_CM_DROP_TABLE; return TSDB_CODE_SUCCESS; } @@ -1854,7 +1854,7 @@ int32_t tscBuildDropDnodeMsg(SSqlObj *pSql, SSqlInfo *pInfo) { SDropDnodeMsg *pDrop = (SDropDnodeMsg *)pCmd->payload; SMeterMetaInfo *pMeterMetaInfo = tscGetMeterMetaInfo(pCmd, pCmd->clauseIndex, 0); strcpy(pDrop->ip, pMeterMetaInfo->name); - pCmd->msgType = TSDB_MSG_TYPE_DROP_DNODE; + pCmd->msgType = TSDB_MSG_TYPE_CM_DROP_DNODE; return TSDB_CODE_SUCCESS; } @@ -1862,7 +1862,7 @@ int32_t tscBuildDropDnodeMsg(SSqlObj *pSql, SSqlInfo *pInfo) { int32_t tscBuildDropAcctMsg(SSqlObj *pSql, SSqlInfo *pInfo) { SSqlCmd *pCmd = &pSql->cmd; pCmd->payloadLen = sizeof(SDropUserMsg); - pCmd->msgType = TSDB_MSG_TYPE_DROP_USER; + pCmd->msgType = TSDB_MSG_TYPE_CM_DROP_USER; if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, pCmd->payloadLen)) { tscError("%p failed to malloc for query msg", pSql); @@ -1888,7 +1888,7 @@ int32_t tscBuildUseDbMsg(SSqlObj *pSql, SSqlInfo *pInfo) { SUseDbMsg *pUseDbMsg = (SUseDbMsg*)pCmd->payload; SMeterMetaInfo *pMeterMetaInfo = tscGetMeterMetaInfo(pCmd, pCmd->clauseIndex, 0); strcpy(pUseDbMsg->db, pMeterMetaInfo->name); - pCmd->msgType = TSDB_MSG_TYPE_USE_DB; + pCmd->msgType = TSDB_MSG_TYPE_CM_USE_DB; return TSDB_CODE_SUCCESS; } @@ -1896,7 +1896,7 @@ int32_t tscBuildUseDbMsg(SSqlObj *pSql, SSqlInfo *pInfo) { int32_t tscBuildShowMsg(SSqlObj *pSql, SSqlInfo *pInfo) { STscObj *pObj = pSql->pTscObj; SSqlCmd *pCmd = &pSql->cmd; - pCmd->msgType = TSDB_MSG_TYPE_SHOW; + pCmd->msgType = TSDB_MSG_TYPE_CM_SHOW; pCmd->payloadLen = sizeof(SShowMsg) + 100; if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, pCmd->payloadLen)) { @@ -1948,13 +1948,13 @@ int32_t tscBuildKillMsg(SSqlObj *pSql, SSqlInfo *pInfo) { strncpy(pKill->queryId, pInfo->pDCLInfo->ip.z, pInfo->pDCLInfo->ip.n); switch (pCmd->command) { case TSDB_SQL_KILL_QUERY: - pCmd->msgType = TSDB_MSG_TYPE_KILL_QUERY; + pCmd->msgType = TSDB_MSG_TYPE_CM_KILL_QUERY; break; case TSDB_SQL_KILL_CONNECTION: - pCmd->msgType = TSDB_MSG_TYPE_KILL_CONNECTION; + pCmd->msgType = TSDB_MSG_TYPE_CM_KILL_CONN; break; case TSDB_SQL_KILL_STREAM: - pCmd->msgType = TSDB_MSG_TYPE_KILL_STREAM; + pCmd->msgType = TSDB_MSG_TYPE_CM_KILL_STREAM; break; } return TSDB_CODE_SUCCESS; @@ -2043,7 +2043,7 @@ int tscBuildCreateTableMsg(SSqlObj *pSql, SSqlInfo *pInfo) { msgLen = pMsg - (char*)pCreateTableMsg; pCmd->payloadLen = msgLen; - pCmd->msgType = TSDB_MSG_TYPE_CREATE_TABLE; + pCmd->msgType = TSDB_MSG_TYPE_CM_CREATE_TABLE; assert(msgLen + minMsgSize() <= size); return TSDB_CODE_SUCCESS; @@ -2098,7 +2098,7 @@ int tscBuildAlterTableMsg(SSqlObj *pSql, SSqlInfo *pInfo) { msgLen = pMsg - (char*)pAlterTableMsg; pCmd->payloadLen = msgLen; - pCmd->msgType = TSDB_MSG_TYPE_ALTER_TABLE; + pCmd->msgType = TSDB_MSG_TYPE_CM_ALTER_TABLE; assert(msgLen + minMsgSize() <= size); @@ -2108,7 +2108,7 @@ int tscBuildAlterTableMsg(SSqlObj *pSql, SSqlInfo *pInfo) { int tscAlterDbMsg(SSqlObj *pSql, SSqlInfo *pInfo) { SSqlCmd *pCmd = &pSql->cmd; pCmd->payloadLen = sizeof(SAlterDbMsg); - pCmd->msgType = TSDB_MSG_TYPE_ALTER_DB; + pCmd->msgType = TSDB_MSG_TYPE_CM_ALTER_DB; if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, pCmd->payloadLen)) { tscError("%p failed to malloc for query msg", pSql); @@ -2243,7 +2243,7 @@ int tscProcessEmptyResultRsp(SSqlObj *pSql) { return tscLocalResultCommonBuilder int tscBuildConnectMsg(SSqlObj *pSql, SSqlInfo *pInfo) { STscObj *pObj = pSql->pTscObj; SSqlCmd *pCmd = &pSql->cmd; - pCmd->msgType = TSDB_MSG_TYPE_CONNECT; + pCmd->msgType = TSDB_MSG_TYPE_CM_CONNECT; pCmd->payloadLen = sizeof(SConnectMsg); if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, pCmd->payloadLen)) { @@ -2297,7 +2297,7 @@ int tscBuildMeterMetaMsg(SSqlObj *pSql, SSqlInfo *pInfo) { msgLen = pMsg - (char*)pInfoMsg; pCmd->payloadLen = msgLen; - pCmd->msgType = TSDB_MSG_TYPE_TABLE_META; + pCmd->msgType = TSDB_MSG_TYPE_CM_TABLE_META; tfree(tmpData); @@ -2335,7 +2335,7 @@ int tscBuildMultiMeterMetaMsg(SSqlObj *pSql, SSqlInfo *pInfo) { tfree(tmpData); pCmd->payloadLen += sizeof(SMgmtHead) + sizeof(SMultiTableInfoMsg); - pCmd->msgType = TSDB_MSG_TYPE_MULTI_TABLE_META; + pCmd->msgType = TSDB_MSG_TYPE_CM_TABLES_META; assert(pCmd->payloadLen + minMsgSize() <= pCmd->allocSize); @@ -2509,7 +2509,7 @@ int tscBuildMetricMetaMsg(SSqlObj *pSql, SSqlInfo *pInfo) { msgLen = pMsg - pStart; pCmd->payloadLen = msgLen; - pCmd->msgType = TSDB_MSG_TYPE_STABLE_META; + pCmd->msgType = TSDB_MSG_TYPE_CM_STABLE_META; assert(msgLen + minMsgSize() <= size); return TSDB_CODE_SUCCESS; @@ -2566,7 +2566,7 @@ int tscBuildHeartBeatMsg(SSqlObj *pSql, SSqlInfo *pInfo) { msgLen = pMsg - pStart; pCmd->payloadLen = msgLen; - pCmd->msgType = TSDB_MSG_TYPE_HEARTBEAT; + pCmd->msgType = TSDB_MSG_TYPE_CM_HEARTBEAT; assert(msgLen + minMsgSize() <= size); return msgLen; diff --git a/src/client/src/tscSql.c b/src/client/src/tscSql.c index 465263a439773f59fc07bbfe8452c3dd521d6d1f..59e5127e40b8a2b535f18a6630db81f7c6e846bc 100644 --- a/src/client/src/tscSql.c +++ b/src/client/src/tscSql.c @@ -67,7 +67,7 @@ TAOS *taos_connect_imp(const char *ip, const char *user, const char *pass, const if (ip && ip[0]) { tscMgmtIpList.inUse = 0; - tscMgmtIpList.port = tsMgmtShellPort; + tscMgmtIpList.port = tsMnodeShellPort; tscMgmtIpList.numOfIps = 1; tscMgmtIpList.ip[0] = inet_addr(ip); @@ -82,7 +82,7 @@ TAOS *taos_connect_imp(const char *ip, const char *user, const char *pass, const } } - tscMgmtIpList.port = port ? port : tsMgmtShellPort; + tscMgmtIpList.port = port ? port : tsMnodeShellPort; pObj = (STscObj *)malloc(sizeof(STscObj)); if (NULL == pObj) { @@ -95,7 +95,7 @@ TAOS *taos_connect_imp(const char *ip, const char *user, const char *pass, const strncpy(pObj->user, user, TSDB_USER_LEN); taosEncryptPass((uint8_t *)pass, strlen(pass), pObj->pass); - pObj->mgmtPort = port ? port : tsMgmtShellPort; + pObj->mgmtPort = port ? port : tsMnodeShellPort; if (db) { int32_t len = strlen(db); diff --git a/src/client/src/tscSystem.c b/src/client/src/tscSystem.c index d7fe6f4ac8d6fe0462b065a2b2434d332b4ecfb7..d11f21be0fea6853b05cf1a24ed5727f1eaae74f 100644 --- a/src/client/src/tscSystem.c +++ b/src/client/src/tscSystem.c @@ -151,7 +151,7 @@ void taos_init_imp() { } tscMgmtIpList.inUse = 0; - tscMgmtIpList.port = tsMgmtShellPort; + tscMgmtIpList.port = tsMnodeShellPort; tscMgmtIpList.numOfIps = 1; tscMgmtIpList.ip[0] = inet_addr(tsMasterIp); diff --git a/src/dnode/inc/dnodeMgmt.h b/src/dnode/inc/dnodeMgmt.h index bc0ff164a21b735a49e352430eb7d5634ab69b8f..b8d01916fe0938676a09656813a91765e4b9507d 100644 --- a/src/dnode/inc/dnodeMgmt.h +++ b/src/dnode/inc/dnodeMgmt.h @@ -22,7 +22,7 @@ extern "C" { int32_t dnodeInitMgmt(); void dnodeCleanupMgmt(); -void dnodeMgmt(void *rpcMsg); +void dnodeMgmt(SRpcMsg *rpcMsg); void* dnodeGetVnode(int32_t vgId); int32_t dnodeGetVnodeStatus(void *pVnode); diff --git a/src/dnode/src/dnodeMClient.c b/src/dnode/src/dnodeMClient.c index 078a86432db109b70a445418d2280015a7f31ffd..aded8a1922dca412d2c4f9178e23d4cfc264f03e 100644 --- a/src/dnode/src/dnodeMClient.c +++ b/src/dnode/src/dnodeMClient.c @@ -26,7 +26,7 @@ static void dnodeProcessStatusRsp(SRpcMsg *pMsg); static void *tsDnodeMClientRpc; int32_t dnodeInitMClient() { - dnodeProcessMgmtRspFp[TSDB_MSG_TYPE_STATUS_RSP] = dnodeProcessStatusRsp; + dnodeProcessMgmtRspFp[TSDB_MSG_TYPE_DM_STATUS_RSP] = dnodeProcessStatusRsp; SRpcInit rpcInit; memset(&rpcInit, 0, sizeof(rpcInit)); @@ -34,7 +34,7 @@ int32_t dnodeInitMClient() { rpcInit.localPort = 0; rpcInit.label = "DND-MC"; rpcInit.numOfThreads = 1; - rpcInit.cfp = dnodeProcessRspFromMnode; + rpcInit.cfp = dnodeProcessRspFromMnode; rpcInit.sessions = TSDB_SESSIONS_PER_DNODE; rpcInit.connType = TAOS_CONN_CLIENT; rpcInit.idleTime = tsShellActivityTimer * 1000; diff --git a/src/dnode/src/dnodeMgmt.c b/src/dnode/src/dnodeMgmt.c index 69af4caa28f4dbb278ace340c5d9e3e19fe6f553..6240a56486ec9955294af20e769e86b2aad4e057 100644 --- a/src/dnode/src/dnodeMgmt.c +++ b/src/dnode/src/dnodeMgmt.c @@ -21,7 +21,7 @@ #include "tlog.h" #include "trpc.h" #include "tstatus.h" -//#include "tsdb.h" +#include "tsdb.h" #include "dnodeMgmt.h" #include "dnodeRead.h" #include "dnodeWrite.h" @@ -46,17 +46,21 @@ static int32_t dnodeOpenVnode(int32_t vgId); static void dnodeCleanupVnode(SVnodeObj *pVnode); static int32_t dnodeCreateVnode(SMDCreateVnodeMsg *cfg); static void dnodeDropVnode(SVnodeObj *pVnode); -static void dnodeProcesSMDCreateVnodeMsg(SRpcMsg *pMsg); -static void dnodeProcesSMDDropVnodeMsg(SRpcMsg *pMsg); +static void dnodeProcessCreateVnodeMsg(SRpcMsg *pMsg); +static void dnodeProcessDropVnodeMsg(SRpcMsg *pMsg); static void dnodeProcessAlterVnodeMsg(SRpcMsg *pMsg); +static void dnodeProcessAlterStreamMsg(SRpcMsg *pMsg); +static void dnodeProcessConfigDnodeMsg(SRpcMsg *pMsg); static void (*dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_MAX])(SRpcMsg *pMsg); static void * tsDnodeVnodesHash = NULL; int32_t dnodeInitMgmt() { - dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_MD_CREATE_VNODE] = dnodeProcesSMDCreateVnodeMsg; - dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_MD_DROP_VNODE] = dnodeProcesSMDDropVnodeMsg; + dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_MD_CREATE_VNODE] = dnodeProcessCreateVnodeMsg; + dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_MD_DROP_VNODE] = dnodeProcessDropVnodeMsg; dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_MD_ALTER_VNODE] = dnodeProcessAlterVnodeMsg; + dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_MD_ALTER_STREAM] = dnodeProcessAlterStreamMsg; + dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_MD_CONFIG_DNODE] = dnodeProcessConfigDnodeMsg; tsDnodeVnodesHash = taosInitIntHash(TSDB_MAX_VNODES, sizeof(SVnodeObj), taosHashInt); if (tsDnodeVnodesHash == NULL) { @@ -72,8 +76,7 @@ void dnodeCleanupMgmt() { taosCleanUpIntHash(tsDnodeVnodesHash); } -void dnodeMgmt(void *rpcMsg) { - SRpcMsg *pMsg = rpcMsg; +void dnodeMgmt(SRpcMsg *pMsg) { terrno = 0; if (dnodeProcessMgmtMsgFp[pMsg->msgType]) { @@ -91,7 +94,7 @@ void dnodeMgmt(void *rpcMsg) { } void *dnodeGetVnode(int32_t vgId) { - SVnodeObj *pVnode = taosGetIntHashData(tsDnodeVnodesHash, vgId); + SVnodeObj *pVnode = (SVnodeObj *) taosGetIntHashData(tsDnodeVnodesHash, vgId); if (pVnode == NULL) { terrno = TSDB_CODE_INVALID_VGROUP_ID; return NULL; @@ -140,123 +143,123 @@ static void dnodeCleanupVnodes() { } static int32_t dnodeOpenVnode(int32_t vgId) { -// char rootDir[TSDB_FILENAME_LEN] = {0}; -// sprintf(rootDir, "%s/vnode%d", tsDirectory, vgId); -// -// void *pTsdb = tsdbOpenRepo(rootDir); -// if (pTsdb != NULL) { -// return terrno; -// } -// -// SVnodeObj vnodeObj; -// vnodeObj.vgId = vgId; -// vnodeObj.status = TSDB_VN_STATUS_NOT_READY; -// vnodeObj.refCount = 1; -// vnodeObj.version = 0; -// vnodeObj.wworker = dnodeAllocateWriteWorker(); -// vnodeObj.rworker = dnodeAllocateReadWorker(); -// vnodeObj.wal = NULL; -// vnodeObj.tsdb = pTsdb; -// vnodeObj.replica = NULL; -// vnodeObj.events = NULL; -// vnodeObj.cq = NULL; -// -// taosAddIntHash(tsDnodeVnodesHash, vnodeObj.vgId, &vnodeObj); + char rootDir[TSDB_FILENAME_LEN] = {0}; + sprintf(rootDir, "%s/vnode%d", tsDirectory, vgId); + + void *pTsdb = tsdbOpenRepo(rootDir); + if (pTsdb != NULL) { + return terrno; + } + + SVnodeObj vnodeObj; + vnodeObj.vgId = vgId; + vnodeObj.status = TSDB_VN_STATUS_NOT_READY; + vnodeObj.refCount = 1; + vnodeObj.version = 0; + vnodeObj.wworker = dnodeAllocateWriteWorker(); + vnodeObj.rworker = dnodeAllocateReadWorker(); + vnodeObj.wal = NULL; + vnodeObj.tsdb = pTsdb; + vnodeObj.replica = NULL; + vnodeObj.events = NULL; + vnodeObj.cq = NULL; + + taosAddIntHash(tsDnodeVnodesHash, vnodeObj.vgId, (char *) (&vnodeObj)); return TSDB_CODE_SUCCESS; } static void dnodeCleanupVnode(SVnodeObj *pVnode) { -// pVnode->status = TSDB_VN_STATUS_NOT_READY; -// int32_t count = atomic_sub_fetch_32(&pVnode->refCount, 1); -// if (count > 0) { -// // wait refcount -// } -// -// // remove replica -// -// // remove read queue -// dnodeFreeReadWorker(pVnode->rworker); -// pVnode->rworker = NULL; -// -// // remove write queue -// dnodeFreeWriteWorker(pVnode->wworker); -// pVnode->wworker = NULL; -// -// // remove wal -// -// // remove tsdb -// if (pVnode->tsdb) { -// tsdbCloseRepo(pVnode->tsdb); -// pVnode->tsdb = NULL; -// } -// -// taosDeleteIntHash(tsDnodeVnodesHash, pVnode->vgId); + pVnode->status = TSDB_VN_STATUS_NOT_READY; + int32_t count = atomic_sub_fetch_32(&pVnode->refCount, 1); + if (count > 0) { + // wait refcount + } + + // remove replica + + // remove read queue + dnodeFreeReadWorker(pVnode->rworker); + pVnode->rworker = NULL; + + // remove write queue + dnodeFreeWriteWorker(pVnode->wworker); + pVnode->wworker = NULL; + + // remove wal + + // remove tsdb + if (pVnode->tsdb) { + tsdbCloseRepo(pVnode->tsdb); + pVnode->tsdb = NULL; + } + + taosDeleteIntHash(tsDnodeVnodesHash, pVnode->vgId); } static int32_t dnodeCreateVnode(SMDCreateVnodeMsg *pVnodeCfg) { -// STsdbCfg tsdbCfg; -// tsdbCfg.precision = pVnodeCfg->cfg.precision; -// tsdbCfg.tsdbId = pVnodeCfg->vnode; -// tsdbCfg.maxTables = pVnodeCfg->cfg.maxSessions; -// tsdbCfg.daysPerFile = pVnodeCfg->cfg.daysPerFile; -// tsdbCfg.minRowsPerFileBlock = -1; -// tsdbCfg.maxRowsPerFileBlock = -1; -// tsdbCfg.keep = -1; -// tsdbCfg.maxCacheSize = -1; - -// char rootDir[TSDB_FILENAME_LEN] = {0}; -// sprintf(rootDir, "%s/vnode%d", tsDirectory, pVnodeCfg->cfg.vgId); -// -// void *pTsdb = tsdbCreateRepo(rootDir, &tsdbCfg, NULL); -// if (pTsdb != NULL) { -// return terrno; -// } -// -// SVnodeObj vnodeObj; -// vnodeObj.vgId = pVnodeCfg->cfg.vgId; -// vnodeObj.status = TSDB_VN_STATUS_NOT_READY; -// vnodeObj.refCount = 1; -// vnodeObj.version = 0; -// vnodeObj.wworker = dnodeAllocateWriteWorker(); -// vnodeObj.rworker = dnodeAllocateReadWorker(); -// vnodeObj.wal = NULL; -// vnodeObj.tsdb = pTsdb; -// vnodeObj.replica = NULL; -// vnodeObj.events = NULL; -// vnodeObj.cq = NULL; -// -// taosAddIntHash(tsDnodeVnodesHash, vnodeObj.vgId, &vnodeObj); + STsdbCfg tsdbCfg; + tsdbCfg.precision = pVnodeCfg->cfg.precision; + tsdbCfg.tsdbId = pVnodeCfg->vnode; + tsdbCfg.maxTables = pVnodeCfg->cfg.maxSessions; + tsdbCfg.daysPerFile = pVnodeCfg->cfg.daysPerFile; + tsdbCfg.minRowsPerFileBlock = -1; + tsdbCfg.maxRowsPerFileBlock = -1; + tsdbCfg.keep = -1; + tsdbCfg.maxCacheSize = -1; + + char rootDir[TSDB_FILENAME_LEN] = {0}; + sprintf(rootDir, "%s/vnode%d", tsDirectory, pVnodeCfg->cfg.vgId); + + void *pTsdb = tsdbCreateRepo(rootDir, &tsdbCfg, NULL); + if (pTsdb != NULL) { + return terrno; + } + + SVnodeObj vnodeObj; + vnodeObj.vgId = pVnodeCfg->cfg.vgId; + vnodeObj.status = TSDB_VN_STATUS_NOT_READY; + vnodeObj.refCount = 1; + vnodeObj.version = 0; + vnodeObj.wworker = dnodeAllocateWriteWorker(); + vnodeObj.rworker = dnodeAllocateReadWorker(); + vnodeObj.wal = NULL; + vnodeObj.tsdb = pTsdb; + vnodeObj.replica = NULL; + vnodeObj.events = NULL; + vnodeObj.cq = NULL; + + taosAddIntHash(tsDnodeVnodesHash, vnodeObj.vgId, (char *) (&vnodeObj)); return TSDB_CODE_SUCCESS; } static void dnodeDropVnode(SVnodeObj *pVnode) { -// pVnode->status = TSDB_VN_STATUS_NOT_READY; -// -// int32_t count = atomic_sub_fetch_32(&pVnode->refCount, 1); -// if (count > 0) { -// // wait refcount -// } -// -// if (pVnode->tsdb) { -// tsdbDropRepo(pVnode->tsdb); -// pVnode->tsdb = NULL; -// } -// -// dnodeCleanupVnode(pVnode); + pVnode->status = TSDB_VN_STATUS_NOT_READY; + + int32_t count = atomic_sub_fetch_32(&pVnode->refCount, 1); + if (count > 0) { + // wait refcount + } + + if (pVnode->tsdb) { + tsdbDropRepo(pVnode->tsdb); + pVnode->tsdb = NULL; + } + + dnodeCleanupVnode(pVnode); } -static void dnodeProcesSMDCreateVnodeMsg(SRpcMsg *rpcMsg) { +static void dnodeProcessCreateVnodeMsg(SRpcMsg *rpcMsg) { SRpcMsg rpcRsp = {.handle = rpcMsg->handle, .pCont = NULL, .contLen = 0, .code = 0, .msgType = 0}; - SMDCreateVnodeMsg *pCreate = (SMDCreateVnodeMsg *) rpcMsg->pCont; + SMDCreateVnodeMsg *pCreate = rpcMsg->pCont; pCreate->vnode = htonl(pCreate->vnode); pCreate->cfg.vgId = htonl(pCreate->cfg.vgId); pCreate->cfg.maxSessions = htonl(pCreate->cfg.maxSessions); pCreate->cfg.daysPerFile = htonl(pCreate->cfg.daysPerFile); - SVnodeObj *pVnodeObj = taosGetIntHashData(tsDnodeVnodesHash, pCreate->cfg.vgId); + SVnodeObj *pVnodeObj = (SVnodeObj *) taosGetIntHashData(tsDnodeVnodesHash, pCreate->cfg.vgId); if (pVnodeObj != NULL) { rpcRsp.code = TSDB_CODE_SUCCESS; } else { @@ -267,13 +270,13 @@ static void dnodeProcesSMDCreateVnodeMsg(SRpcMsg *rpcMsg) { rpcFreeCont(rpcMsg->pCont); } -static void dnodeProcesSMDDropVnodeMsg(SRpcMsg *rpcMsg) { +static void dnodeProcessDropVnodeMsg(SRpcMsg *rpcMsg) { SRpcMsg rpcRsp = {.handle = rpcMsg->handle, .pCont = NULL, .contLen = 0, .code = 0, .msgType = 0}; - SMDDropVnodeMsg *pDrop = (SMDCreateVnodeMsg *) rpcMsg->pCont; + SMDDropVnodeMsg *pDrop = rpcMsg->pCont; pDrop->vgId = htonl(pDrop->vgId); - SVnodeObj *pVnodeObj = taosGetIntHashData(tsDnodeVnodesHash, pDrop->vgId); + SVnodeObj *pVnodeObj = (SVnodeObj *) taosGetIntHashData(tsDnodeVnodesHash, pDrop->vgId); if (pVnodeObj != NULL) { dnodeDropVnode(pVnodeObj); rpcRsp.code = TSDB_CODE_SUCCESS; @@ -288,13 +291,13 @@ static void dnodeProcesSMDDropVnodeMsg(SRpcMsg *rpcMsg) { static void dnodeProcessAlterVnodeMsg(SRpcMsg *rpcMsg) { SRpcMsg rpcRsp = {.handle = rpcMsg->handle, .pCont = NULL, .contLen = 0, .code = 0, .msgType = 0}; - SMDCreateVnodeMsg *pCreate = (SMDCreateVnodeMsg *) rpcMsg->pCont; + SMDCreateVnodeMsg *pCreate = rpcMsg->pCont; pCreate->vnode = htonl(pCreate->vnode); pCreate->cfg.vgId = htonl(pCreate->cfg.vgId); pCreate->cfg.maxSessions = htonl(pCreate->cfg.maxSessions); pCreate->cfg.daysPerFile = htonl(pCreate->cfg.daysPerFile); - SVnodeObj *pVnodeObj = taosGetIntHashData(tsDnodeVnodesHash, pCreate->cfg.vgId); + SVnodeObj *pVnodeObj = (SVnodeObj *) taosGetIntHashData(tsDnodeVnodesHash, pCreate->cfg.vgId); if (pVnodeObj != NULL) { rpcRsp.code = TSDB_CODE_SUCCESS; } else { @@ -304,3 +307,11 @@ static void dnodeProcessAlterVnodeMsg(SRpcMsg *rpcMsg) { rpcSendResponse(&rpcRsp); rpcFreeCont(rpcMsg->pCont); } + +static void dnodeProcessAlterStreamMsg(SRpcMsg *pMsg) { + +} + +static void dnodeProcessConfigDnodeMsg(SRpcMsg *pMsg) { + +} \ No newline at end of file diff --git a/src/dnode/src/dnodeMnode.c b/src/dnode/src/dnodeMnode.c index 2aa07f7c89dfb79f3ba4c562be9b89bf37c303ff..6e75ddc68ef197e5a12dda4b964d3e7cfb6e0ec6 100644 --- a/src/dnode/src/dnodeMnode.c +++ b/src/dnode/src/dnodeMnode.c @@ -27,18 +27,22 @@ static void *tsDnodeMnodeRpc = NULL; int32_t dnodeInitMnode() { dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_MD_CREATE_TABLE] = dnodeWrite; - dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_MD_DROP_TABLE] = dnodeWrite; - dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_MD_DROP_VNODE] = dnodeMgmt; + dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_MD_DROP_TABLE] = dnodeWrite; + dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_MD_ALTER_TABLE] = dnodeWrite; + dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_MD_DROP_STABLE] = dnodeWrite; + dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_MD_CREATE_VNODE] = dnodeMgmt; + dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_MD_DROP_VNODE] = dnodeMgmt; + dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_MD_ALTER_VNODE] = dnodeMgmt; + dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_MD_ALTER_STREAM] = dnodeMgmt; + dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_MD_CONFIG_DNODE] = dnodeMgmt; SRpcInit rpcInit; memset(&rpcInit, 0, sizeof(rpcInit)); rpcInit.localIp = tsAnyIp ? "0.0.0.0" : tsPrivateIp; - - // note: a new port shall be assigned - // rpcInit.localPort = tsDnodeMnodePort; + rpcInit.localPort = tsDnodeMnodePort; rpcInit.label = "DND-mgmt"; rpcInit.numOfThreads = 1; - rpcInit.cfp = dnodeProcessMsgFromMnode; + rpcInit.cfp = dnodeProcessMsgFromMnode; rpcInit.sessions = TSDB_SESSIONS_PER_DNODE; rpcInit.connType = TAOS_CONN_SERVER; rpcInit.idleTime = tsShellActivityTimer * 1500; diff --git a/src/dnode/src/dnodeShell.c b/src/dnode/src/dnodeShell.c index d570b0b6a773add96fb6fd8568755258375e5928..dd050d35a744b3b04cbc02ce315b11e09a3b94a1 100644 --- a/src/dnode/src/dnodeShell.c +++ b/src/dnode/src/dnodeShell.c @@ -30,8 +30,8 @@ static void dnodeProcessMsgFromShell(SRpcMsg *pMsg); static void *tsDnodeShellRpc = NULL; int32_t dnodeInitShell() { - dnodeProcessShellMsgFp[TSDB_MSG_TYPE_SUBMIT] = dnodeWrite; - dnodeProcessShellMsgFp[TSDB_MSG_TYPE_QUERY] = dnodeRead; + dnodeProcessShellMsgFp[TSDB_MSG_TYPE_SUBMIT] = dnodeWrite; + dnodeProcessShellMsgFp[TSDB_MSG_TYPE_QUERY] = dnodeRead; dnodeProcessShellMsgFp[TSDB_MSG_TYPE_RETRIEVE] = dnodeRead; int numOfThreads = tsNumOfCores * tsNumOfThreadsPerCore; @@ -43,10 +43,10 @@ int32_t dnodeInitShell() { SRpcInit rpcInit; memset(&rpcInit, 0, sizeof(rpcInit)); rpcInit.localIp = tsAnyIp ? "0.0.0.0" : tsPrivateIp; - rpcInit.localPort = tsVnodeShellPort; + rpcInit.localPort = tsDnodeShellPort; rpcInit.label = "DND-shell"; rpcInit.numOfThreads = numOfThreads; - rpcInit.cfp = dnodeProcessMsgFromShell; + rpcInit.cfp = dnodeProcessMsgFromShell; rpcInit.sessions = TSDB_SESSIONS_PER_DNODE; rpcInit.connType = TAOS_CONN_SERVER; rpcInit.idleTime = tsShellActivityTimer * 1500; diff --git a/src/dnode/src/dnodeWrite.c b/src/dnode/src/dnodeWrite.c index 70d09564eb6d8dfb5021a621796cbdce1e55b8f0..a62ad5d0d22d54394b5e58b94a51255f9f832e39 100644 --- a/src/dnode/src/dnodeWrite.c +++ b/src/dnode/src/dnodeWrite.c @@ -56,13 +56,17 @@ static void dnodeProcessWriteResult(SWriteMsg *pWrite); static void dnodeProcessSubmitMsg(SWriteMsg *pMsg); static void dnodeProcessCreateTableMsg(SWriteMsg *pMsg); static void dnodeProcessDropTableMsg(SWriteMsg *pMsg); +static void dnodeProcessAlterTableMsg(SWriteMsg *pMsg); +static void dnodeProcessDropStableMsg(SWriteMsg *pMsg); SWriteWorkerPool wWorkerPool; int32_t dnodeInitWrite() { - dnodeProcessWriteMsgFp[TSDB_MSG_TYPE_SUBMIT] = dnodeProcessSubmitMsg; + dnodeProcessWriteMsgFp[TSDB_MSG_TYPE_SUBMIT] = dnodeProcessSubmitMsg; dnodeProcessWriteMsgFp[TSDB_MSG_TYPE_MD_CREATE_TABLE] = dnodeProcessCreateTableMsg; - dnodeProcessWriteMsgFp[TSDB_MSG_TYPE_MD_DROP_TABLE] = dnodeProcessDropTableMsg; + dnodeProcessWriteMsgFp[TSDB_MSG_TYPE_MD_DROP_TABLE] = dnodeProcessDropTableMsg; + dnodeProcessWriteMsgFp[TSDB_MSG_TYPE_MD_ALTER_TABLE] = dnodeProcessAlterTableMsg; + dnodeProcessWriteMsgFp[TSDB_MSG_TYPE_MD_DROP_STABLE] = dnodeProcessDropStableMsg; wWorkerPool.max = tsNumOfCores; wWorkerPool.writeWorker = (SWriteWorker *)calloc(sizeof(SWriteWorker), wWorkerPool.max); @@ -253,3 +257,11 @@ static void dnodeProcessCreateTableMsg(SWriteMsg *pMsg) { static void dnodeProcessDropTableMsg(SWriteMsg *pMsg) { } + +static void dnodeProcessAlterTableMsg(SWriteMsg *pMsg) { + +} + +static void dnodeProcessDropStableMsg(SWriteMsg *pMsg) { + +} \ No newline at end of file diff --git a/src/inc/taosmsg.h b/src/inc/taosmsg.h index a2efd917df8575fe1afa02709ff6f1271a468b89..7a75c67268be1ab7369924dc7382f95162cb9b0a 100644 --- a/src/inc/taosmsg.h +++ b/src/inc/taosmsg.h @@ -38,7 +38,7 @@ extern "C" { #define TSDB_MSG_TYPE_RETRIEVE 7 #define TSDB_MSG_TYPE_RETRIEVE_RSP 8 -// message from mgmt to dnode +// message from mnode to dnode #define TSDB_MSG_TYPE_MD_CREATE_TABLE 9 #define TSDB_MSG_TYPE_MD_CREATE_TABLE_RSP 10 #define TSDB_MSG_TYPE_MD_DROP_TABLE 11 @@ -58,84 +58,76 @@ extern "C" { #define TSDB_MSG_TYPE_MD_CONFIG_DNODE 25 #define TSDB_MSG_TYPE_MD_CONFIG_DNODE_RSP 26 - +// message from client to mnode +#define TSDB_MSG_TYPE_CM_CONNECT 31 +#define TSDB_MSG_TYPE_CM_CONNECT_RSP 32 +#define TSDB_MSG_TYPE_CM_CREATE_ACCT 33 +#define TSDB_MSG_TYPE_CM_CREATE_ACCT_RSP 34 +#define TSDB_MSG_TYPE_CM_ALTER_ACCT 35 +#define TSDB_MSG_TYPE_CM_ALTER_ACCT_RSP 36 +#define TSDB_MSG_TYPE_CM_DROP_ACCT 37 +#define TSDB_MSG_TYPE_CM_DROP_ACCT_RSP 38 +#define TSDB_MSG_TYPE_CM_CREATE_USER 39 +#define TSDB_MSG_TYPE_CM_CREATE_USER_RSP 40 +#define TSDB_MSG_TYPE_CM_ALTER_USER 41 +#define TSDB_MSG_TYPE_CM_ALTER_USER_RSP 42 +#define TSDB_MSG_TYPE_CM_DROP_USER 43 +#define TSDB_MSG_TYPE_CM_DROP_USER_RSP 44 +#define TSDB_MSG_TYPE_CM_CREATE_DNODE 45 +#define TSDB_MSG_TYPE_CM_CREATE_DNODE_RSP 46 +#define TSDB_MSG_TYPE_CM_DROP_DNODE 47 +#define TSDB_MSG_TYPE_CM_DROP_DNODE_RSP 48 #define TSDB_MSG_TYPE_CM_CONFIG_DNODE TSDB_MSG_TYPE_MD_CONFIG_DNODE #define TSDB_MSG_TYPE_CM_CONFIG_DNODE_RSP TSDB_MSG_TYPE_MD_CONFIG_DNODE_RSP - -#define TSDB_MSG_TYPE_DM_CONFIG_VNODE 19 -#define TSDB_MSG_TYPE_DM_CONFIG_VNODE_RSP 20 - - -#define TSDB_MSG_TYPE_SDB_SYNC 21 -#define TSDB_MSG_TYPE_SDB_SYNC_RSP 22 -#define TSDB_MSG_TYPE_SDB_FORWARD 23 -#define TSDB_MSG_TYPE_SDB_FORWARD_RSP 24 -#define TSDB_MSG_TYPE_CONNECT 31 -#define TSDB_MSG_TYPE_CONNECT_RSP 32 -#define TSDB_MSG_TYPE_CREATE_ACCT 33 -#define TSDB_MSG_TYPE_CREATE_ACCT_RSP 34 -#define TSDB_MSG_TYPE_ALTER_ACCT 35 -#define TSDB_MSG_TYPE_ALTER_ACCT_RSP 36 -#define TSDB_MSG_TYPE_DROP_ACCT 37 -#define TSDB_MSG_TYPE_DROP_ACCT_RSP 38 -#define TSDB_MSG_TYPE_CREATE_USER 39 -#define TSDB_MSG_TYPE_CREATE_USER_RSP 40 -#define TSDB_MSG_TYPE_ALTER_USER 41 -#define TSDB_MSG_TYPE_ALTER_USER_RSP 42 -#define TSDB_MSG_TYPE_DROP_USER 43 -#define TSDB_MSG_TYPE_DROP_USER_RSP 44 -#define TSDB_MSG_TYPE_CREATE_MNODE 45 -#define TSDB_MSG_TYPE_CREATE_MNODE_RSP 46 -#define TSDB_MSG_TYPE_DROP_MNODE 47 -#define TSDB_MSG_TYPE_DROP_MNODE_RSP 48 -#define TSDB_MSG_TYPE_CREATE_DNODE 49 -#define TSDB_MSG_TYPE_CREATE_DNODE_RSP 50 -#define TSDB_MSG_TYPE_DROP_DNODE 51 -#define TSDB_MSG_TYPE_DROP_DNODE_RSP 52 -#define TSDB_MSG_TYPE_ALTER_DNODE 53 -#define TSDB_MSG_TYPE_ALTER_DNODE_RSP 54 -#define TSDB_MSG_TYPE_CREATE_DB 55 -#define TSDB_MSG_TYPE_CREATE_DB_RSP 56 -#define TSDB_MSG_TYPE_DROP_DB 57 -#define TSDB_MSG_TYPE_DROP_DB_RSP 58 -#define TSDB_MSG_TYPE_USE_DB 59 -#define TSDB_MSG_TYPE_USE_DB_RSP 60 -#define TSDB_MSG_TYPE_ALTER_DB 61 -#define TSDB_MSG_TYPE_ALTER_DB_RSP 62 -#define TSDB_MSG_TYPE_CREATE_TABLE 63 -#define TSDB_MSG_TYPE_CREATE_TABLE_RSP 64 -#define TSDB_MSG_TYPE_DROP_TABLE 65 -#define TSDB_MSG_TYPE_DROP_TABLE_RSP 66 -#define TSDB_MSG_TYPE_ALTER_TABLE 67 -#define TSDB_MSG_TYPE_ALTER_TABLE_RSP 68 - -#define TSDB_MSG_TYPE_TABLE_CFG 71 -#define TSDB_MSG_TYPE_TABLE_CFG_RSP 72 -#define TSDB_MSG_TYPE_TABLE_META 73 -#define TSDB_MSG_TYPE_TABLE_META_RSP 74 -#define TSDB_MSG_TYPE_STABLE_META 75 -#define TSDB_MSG_TYPE_STABLE_META_RSP 76 -#define TSDB_MSG_TYPE_MULTI_TABLE_META 77 -#define TSDB_MSG_TYPE_MULTI_TABLE_META_RSP 78 -#define TSDB_MSG_TYPE_ALTER_STREAM 79 -#define TSDB_MSG_TYPE_ALTER_STREAM_RSP 80 -#define TSDB_MSG_TYPE_SHOW 81 -#define TSDB_MSG_TYPE_SHOW_RSP 82 -#define TSDB_MSG_TYPE_CFG_MNODE 83 -#define TSDB_MSG_TYPE_CFG_MNODE_RSP 84 -#define TSDB_MSG_TYPE_KILL_QUERY 85 -#define TSDB_MSG_TYPE_KILL_QUERY_RSP 86 -#define TSDB_MSG_TYPE_KILL_STREAM 87 -#define TSDB_MSG_TYPE_KILL_STREAM_RSP 88 -#define TSDB_MSG_TYPE_KILL_CONNECTION 89 -#define TSDB_MSG_TYPE_KILL_CONNECTION_RSP 90 -#define TSDB_MSG_TYPE_HEARTBEAT 91 -#define TSDB_MSG_TYPE_HEARTBEAT_RSP 92 -#define TSDB_MSG_TYPE_STATUS 93 -#define TSDB_MSG_TYPE_STATUS_RSP 94 -#define TSDB_MSG_TYPE_GRANT 95 -#define TSDB_MSG_TYPE_GRANT_RSP 96 -#define TSDB_MSG_TYPE_MAX 97 +#define TSDB_MSG_TYPE_CM_CREATE_DB 49 +#define TSDB_MSG_TYPE_CM_CREATE_DB_RSP 50 +#define TSDB_MSG_TYPE_CM_DROP_DB 51 +#define TSDB_MSG_TYPE_CM_DROP_DB_RSP 52 +#define TSDB_MSG_TYPE_CM_USE_DB 53 +#define TSDB_MSG_TYPE_CM_USE_DB_RSP 54 +#define TSDB_MSG_TYPE_CM_ALTER_DB 55 +#define TSDB_MSG_TYPE_CM_ALTER_DB_RSP 56 +#define TSDB_MSG_TYPE_CM_CREATE_TABLE 57 +#define TSDB_MSG_TYPE_CM_CREATE_TABLE_RSP 58 +#define TSDB_MSG_TYPE_CM_DROP_TABLE 59 +#define TSDB_MSG_TYPE_CM_DROP_TABLE_RSP 60 +#define TSDB_MSG_TYPE_CM_ALTER_TABLE 61 +#define TSDB_MSG_TYPE_CM_ALTER_TABLE_RSP 62 +#define TSDB_MSG_TYPE_CM_TABLE_META 63 +#define TSDB_MSG_TYPE_CM_TABLE_META_RSP 64 +#define TSDB_MSG_TYPE_CM_STABLE_META 65 +#define TSDB_MSG_TYPE_CM_STABLE_META_RSP 66 +#define TSDB_MSG_TYPE_CM_TABLES_META 67 +#define TSDB_MSG_TYPE_CM_TABLES_META_RSP 68 +#define TSDB_MSG_TYPE_CM_ALTER_STREAM 69 +#define TSDB_MSG_TYPE_CM_ALTER_STREAM_RSP 70 +#define TSDB_MSG_TYPE_CM_SHOW 71 +#define TSDB_MSG_TYPE_CM_SHOW_RSP 72 +#define TSDB_MSG_TYPE_CM_KILL_QUERY 73 +#define TSDB_MSG_TYPE_CM_KILL_QUERY_RSP 74 +#define TSDB_MSG_TYPE_CM_KILL_STREAM 75 +#define TSDB_MSG_TYPE_CM_KILL_STREAM_RSP 76 +#define TSDB_MSG_TYPE_CM_KILL_CONN 77 +#define TSDB_MSG_TYPE_CM_KILL_CONN_RSP 78 +#define TSDB_MSG_TYPE_CM_HEARTBEAT 79 +#define TSDB_MSG_TYPE_CM_HEARTBEAT_RSP 80 + +// message from dnode to mnode +#define TSDB_MSG_TYPE_DM_CONFIG_TABLE 91 +#define TSDB_MSG_TYPE_DM_CONFIG_TABLE_RSP 92 +#define TSDB_MSG_TYPE_DM_CONFIG_VNODE 93 +#define TSDB_MSG_TYPE_DM_CONFIG_VNODE_RSP 94 +#define TSDB_MSG_TYPE_DM_STATUS 95 +#define TSDB_MSG_TYPE_DM_STATUS_RSP 96 +#define TSDB_MSG_TYPE_DM_GRANT 97 +#define TSDB_MSG_TYPE_DM_GRANT_RSP 98 + +#define TSDB_MSG_TYPE_SDB_SYNC 101 +#define TSDB_MSG_TYPE_SDB_SYNC_RSP 102 +#define TSDB_MSG_TYPE_SDB_FORWARD 103 +#define TSDB_MSG_TYPE_SDB_FORWARD_RSP 104 + +#define TSDB_MSG_TYPE_MAX 105 // IE type #define TSDB_IE_TYPE_SEC 1 diff --git a/src/kit/shell/src/shellDarwin.c b/src/kit/shell/src/shellDarwin.c index b624f5ee68535026580af25aa962a8f6a79f963e..13513426cdea898a550ae6ed141246e290e9e555 100644 --- a/src/kit/shell/src/shellDarwin.c +++ b/src/kit/shell/src/shellDarwin.c @@ -81,7 +81,7 @@ void shellParseArgument(int argc, char *argv[], struct arguments *arguments) { // for management port else if (strcmp(argv[i], "-P") == 0) { if (i < argc - 1) { - tsMgmtShellPort = atoi(argv[++i]); + tsMnodeShellPort = atoi(argv[++i]); } else { fprintf(stderr, "option -P requires an argument\n"); exit(EXIT_FAILURE); diff --git a/src/kit/shell/src/shellEngine.c b/src/kit/shell/src/shellEngine.c index 61fdb424e54435ce7dfa04b71219093d97e89bc7..0325e7f641ecea998927f909c02df724641890d3 100644 --- a/src/kit/shell/src/shellEngine.c +++ b/src/kit/shell/src/shellEngine.c @@ -68,7 +68,7 @@ TAOS *shellInit(struct arguments *args) { tsMeterMetaKeepTimer = 3000; // Connect to the database. - TAOS *con = taos_connect(args->host, args->user, args->password, args->database, tsMgmtShellPort); + TAOS *con = taos_connect(args->host, args->user, args->password, args->database, tsMnodeShellPort); if (con == NULL) { return con; } diff --git a/src/kit/shell/src/shellImport.c b/src/kit/shell/src/shellImport.c index dd04f935e7a30f6a8775b831c3ec726855f520f4..143a27a3710ae5cda3dee1bc987649612102f016 100644 --- a/src/kit/shell/src/shellImport.c +++ b/src/kit/shell/src/shellImport.c @@ -227,7 +227,7 @@ static void shellRunImportThreads(struct arguments* args) ShellThreadObj *pThread = threadObj + t; pThread->threadIndex = t; pThread->totalThreads = args->threadNum; - pThread->taos = taos_connect(args->host, args->user, args->password, args->database, tsMgmtShellPort); + pThread->taos = taos_connect(args->host, args->user, args->password, args->database, tsMnodeShellPort); if (pThread->taos == NULL) { fprintf(stderr, "ERROR: thread:%d failed connect to TDengine, error:%s\n", pThread->threadIndex, taos_errstr(pThread->taos)); exit(0); diff --git a/src/kit/shell/src/shellLinux.c b/src/kit/shell/src/shellLinux.c index 081b9eae319f3570ab11b67e075292648dd76161..cdf59a7293d8915153b6f148c86a52321b5384d9 100644 --- a/src/kit/shell/src/shellLinux.c +++ b/src/kit/shell/src/shellLinux.c @@ -62,7 +62,7 @@ static error_t parse_opt(int key, char *arg, struct argp_state *state) { if (arg) arguments->password = arg; break; case 'P': - tsMgmtShellPort = atoi(arg); + tsMnodeShellPort = atoi(arg); break; case 't': arguments->timezone = arg; diff --git a/src/kit/shell/src/shellWindows.c b/src/kit/shell/src/shellWindows.c index 61e6bcaf30b887a445134b28b0af022127b3e8ab..8863f4fa46c9b3baa6d9d390c6985ced1ce00758 100644 --- a/src/kit/shell/src/shellWindows.c +++ b/src/kit/shell/src/shellWindows.c @@ -61,7 +61,7 @@ void shellParseArgument(int argc, char *argv[], struct arguments *arguments) { // for management port else if (strcmp(argv[i], "-P") == 0) { if (i < argc - 1) { - tsMgmtShellPort = atoi(argv[++i]); + tsMnodeShellPort = atoi(argv[++i]); } else { fprintf(stderr, "option -P requires an argument\n"); exit(EXIT_FAILURE); diff --git a/src/mnode/src/mgmtDServer.c b/src/mnode/src/mgmtDServer.c index 27cbf230d55e5b5c7d8f4bdb0795a63e84f08278..3c5d365f72e8ac4ff7c6d572ae76d47bb47838a7 100644 --- a/src/mnode/src/mgmtDServer.c +++ b/src/mnode/src/mgmtDServer.c @@ -41,7 +41,7 @@ static void *tsMgmtDServerRpc; int32_t mgmtInitDServer() { SRpcInit rpcInit = {0}; rpcInit.localIp = tsAnyIp ? "0.0.0.0" : tsPrivateIp;; - rpcInit.localPort = tsMgmtDnodePort; + rpcInit.localPort = tsMnodeDnodePort; rpcInit.label = "MND-DS"; rpcInit.numOfThreads = 1; rpcInit.cfp = mgmtProcessMsgFromDnode; @@ -234,7 +234,7 @@ static int mgmtDServerRetrieveAuth(char *user, char *spi, char *encrypt, char *s // // mTrace("msg:%d:%s is received from dnode, pConn:%p", msgType, taosMsg[(int8_t)msgType], pConn); // -// if (msgType == TSDB_MSG_TYPE_TABLE_CFG) { +// if (msgType == TSDB_MSG_TYPE_DM_CONFIG_TABLE) { // mgmtProcessTableCfgMsg(msgType, pCont, contLen, pConn); // } else if (msgType == TSDB_MSG_TYPE_DM_CONFIG_VNODE) { // mgmtProcessVnodeCfgMsg(msgType, pCont, contLen, pConn); @@ -249,8 +249,8 @@ static int mgmtDServerRetrieveAuth(char *user, char *spi, char *encrypt, char *s // } else if (msgType == TSDB_MSG_TYPE_MD_DROP_STABLE) { // mgmtProcessDropStableRsp(msgType, pCont, contLen, pConn, code); // } else if (msgType == TSDB_MSG_TYPE_MD_CONFIG_DNODE_RSP) { -// } else if (msgType == TSDB_MSG_TYPE_ALTER_STREAM_RSP) { -// } else if (msgType == TSDB_MSG_TYPE_STATUS) { +// } else if (msgType == TSDB_MSG_TYPE_CM_ALTER_STREAM_RSP) { +// } else if (msgType == TSDB_MSG_TYPE_DM_STATUS) { // mgmtProcessDnodeStatus(msgType, pCont, contLen, pConn, code); // } else { // mError("%s from dnode is not processed", taosMsg[(int8_t)msgType]); diff --git a/src/mnode/src/mgmtDb.c b/src/mnode/src/mgmtDb.c index 750aa9a1c11b9626a41f8ae7213c4852a0644f45..2cbbfedbb079efefc54295f33fb6b2c6ae2271d9 100644 --- a/src/mnode/src/mgmtDb.c +++ b/src/mnode/src/mgmtDb.c @@ -108,9 +108,9 @@ int32_t mgmtInitDbs() { } } - mgmtAddShellMsgHandle(TSDB_MSG_TYPE_CREATE_DB, mgmtProcessCreateDbMsg); - mgmtAddShellMsgHandle(TSDB_MSG_TYPE_ALTER_DB, mgmtProcessAlterDbMsg); - mgmtAddShellMsgHandle(TSDB_MSG_TYPE_DROP_DB, mgmtProcessDropDbMsg); + mgmtAddShellMsgHandle(TSDB_MSG_TYPE_CM_CREATE_DB, mgmtProcessCreateDbMsg); + mgmtAddShellMsgHandle(TSDB_MSG_TYPE_CM_ALTER_DB, mgmtProcessAlterDbMsg); + mgmtAddShellMsgHandle(TSDB_MSG_TYPE_CM_DROP_DB, mgmtProcessDropDbMsg); mgmtAddShellShowMetaHandle(TSDB_MGMT_TABLE_DB, mgmtGetDbMeta); mgmtAddShellShowRetrieveHandle(TSDB_MGMT_TABLE_DB, mgmtRetrieveDbs); diff --git a/src/mnode/src/mgmtProfile.c b/src/mnode/src/mgmtProfile.c index eafa860596d9c5b3398237ebce82f8bf0c6464e5..2061fc3409c64ae2c9c881951c471797ce2be9c2 100644 --- a/src/mnode/src/mgmtProfile.c +++ b/src/mnode/src/mgmtProfile.c @@ -752,9 +752,9 @@ int32_t mgmtInitProfile() { mgmtAddShellShowRetrieveHandle(TSDB_MGMT_TABLE_CONNS, mgmtRetrieveConns); mgmtAddShellShowMetaHandle(TSDB_MGMT_TABLE_STREAMS, mgmtGetStreamMeta); mgmtAddShellShowRetrieveHandle(TSDB_MGMT_TABLE_STREAMS, mgmtRetrieveStreams); - mgmtAddShellMsgHandle(TSDB_MSG_TYPE_KILL_QUERY, mgmtProcessKillQueryMsg); - mgmtAddShellMsgHandle(TSDB_MSG_TYPE_KILL_STREAM, mgmtProcessKillStreamMsg); - mgmtAddShellMsgHandle(TSDB_MSG_TYPE_KILL_CONNECTION, mgmtProcessKillConnectionMsg); + mgmtAddShellMsgHandle(TSDB_MSG_TYPE_CM_KILL_QUERY, mgmtProcessKillQueryMsg); + mgmtAddShellMsgHandle(TSDB_MSG_TYPE_CM_KILL_STREAM, mgmtProcessKillStreamMsg); + mgmtAddShellMsgHandle(TSDB_MSG_TYPE_CM_KILL_CONN, mgmtProcessKillConnectionMsg); return 0; } diff --git a/src/mnode/src/mgmtShell.c b/src/mnode/src/mgmtShell.c index 31c620f5c09f8c9aa6cf51e5db6f4a2425a37783..380508874692b408c5d3c91655dff6201691d166 100644 --- a/src/mnode/src/mgmtShell.c +++ b/src/mnode/src/mgmtShell.c @@ -56,7 +56,7 @@ static SShowMetaFp tsMgmtShowMetaFp[TSDB_MGMT_TABLE_MAX] = {0}; static SShowRetrieveFp tsMgmtShowRetrieveFp[TSDB_MGMT_TABLE_MAX] = {0}; int32_t mgmtInitShell() { - mgmtAddShellMsgHandle(TSDB_MSG_TYPE_SHOW, mgmtProcessShowMsg); + mgmtAddShellMsgHandle(TSDB_MSG_TYPE_CM_SHOW, mgmtProcessShowMsg); mgmtAddShellMsgHandle(TSDB_MSG_TYPE_RETRIEVE, mgmtProcessRetrieveMsg); int32_t numOfThreads = tsNumOfCores * tsNumOfThreadsPerCore / 4.0; @@ -66,7 +66,7 @@ int32_t mgmtInitShell() { SRpcInit rpcInit = {0}; rpcInit.localIp = tsAnyIp ? "0.0.0.0" : tsPrivateIp; - rpcInit.localPort = tsMgmtShellPort; + rpcInit.localPort = tsMnodeShellPort; rpcInit.label = "MND-shell"; rpcInit.numOfThreads = numOfThreads; rpcInit.cfp = mgmtProcessMsgFromShell; @@ -81,8 +81,8 @@ int32_t mgmtInitShell() { return -1; } - mgmtAddShellMsgHandle(TSDB_MSG_TYPE_HEARTBEAT, mgmtProcessHeartBeatMsg); - mgmtAddShellMsgHandle(TSDB_MSG_TYPE_CONNECT, mgmtProcessConnectMsg); + mgmtAddShellMsgHandle(TSDB_MSG_TYPE_CM_HEARTBEAT, mgmtProcessHeartBeatMsg); + mgmtAddShellMsgHandle(TSDB_MSG_TYPE_CM_CONNECT, mgmtProcessConnectMsg); mPrint("server connection to shell is opened"); return 0; @@ -281,7 +281,7 @@ static void mgmtProcessHeartBeatMsg(SRpcMsg *rpcMsg) { rpcGetConnInfo(rpcMsg->handle, &connInfo); pHBRsp->ipList.inUse = 0; - pHBRsp->ipList.port = htons(tsMgmtShellPort); + pHBRsp->ipList.port = htons(tsMnodeShellPort); pHBRsp->ipList.numOfIps = 0; if (pSdbPublicIpList != NULL && pSdbIpList != NULL) { pHBRsp->ipList.numOfIps = htons(pSdbPublicIpList->numOfIps); @@ -375,7 +375,7 @@ static void mgmtProcessConnectMsg(SRpcMsg *rpcMsg) { pConnectRsp->writeAuth = pUser->writeAuth; pConnectRsp->superAuth = pUser->superAuth; pConnectRsp->ipList.inUse = 0; - pConnectRsp->ipList.port = htons(tsMgmtShellPort); + pConnectRsp->ipList.port = htons(tsMnodeShellPort); pConnectRsp->ipList.numOfIps = 0; if (pSdbPublicIpList != NULL && pSdbIpList != NULL) { pConnectRsp->ipList.numOfIps = htons(pSdbPublicIpList->numOfIps); @@ -420,10 +420,10 @@ static bool mgmtCheckMeterMetaMsgType(void *pMsg) { } static bool mgmtCheckMsgReadOnly(int8_t type, void *pCont) { - if ((type == TSDB_MSG_TYPE_TABLE_META && (!mgmtCheckMeterMetaMsgType(pCont))) || - type == TSDB_MSG_TYPE_STABLE_META || type == TSDB_MSG_TYPE_RETRIEVE || - type == TSDB_MSG_TYPE_SHOW || type == TSDB_MSG_TYPE_MULTI_TABLE_META || - type == TSDB_MSG_TYPE_CONNECT) { + if ((type == TSDB_MSG_TYPE_CM_TABLE_META && (!mgmtCheckMeterMetaMsgType(pCont))) || + type == TSDB_MSG_TYPE_CM_STABLE_META || type == TSDB_MSG_TYPE_RETRIEVE || + type == TSDB_MSG_TYPE_CM_SHOW || type == TSDB_MSG_TYPE_CM_TABLES_META || + type == TSDB_MSG_TYPE_CM_CONNECT) { return true; } diff --git a/src/mnode/src/mgmtTable.c b/src/mnode/src/mgmtTable.c index c4bbd9b3f2c415b5f87824333bc92e042125669a..48f38f799136c3d07922b12722902c0515082db9 100644 --- a/src/mnode/src/mgmtTable.c +++ b/src/mnode/src/mgmtTable.c @@ -71,12 +71,12 @@ int32_t mgmtInitTables() { mgmtSetVgroupIdPool(); - mgmtAddShellMsgHandle(TSDB_MSG_TYPE_CREATE_TABLE, mgmtProcessCreateTableMsg); - mgmtAddShellMsgHandle(TSDB_MSG_TYPE_DROP_TABLE, mgmtProcessDropTableMsg); - mgmtAddShellMsgHandle(TSDB_MSG_TYPE_ALTER_TABLE, mgmtProcessAlterTableMsg); - mgmtAddShellMsgHandle(TSDB_MSG_TYPE_TABLE_META, mgmtProcessTableMetaMsg); - mgmtAddShellMsgHandle(TSDB_MSG_TYPE_MULTI_TABLE_META, mgmtProcessMultiTableMetaMsg); - mgmtAddShellMsgHandle(TSDB_MSG_TYPE_STABLE_META, mgmtProcessSuperTableMetaMsg); + mgmtAddShellMsgHandle(TSDB_MSG_TYPE_CM_CREATE_TABLE, mgmtProcessCreateTableMsg); + mgmtAddShellMsgHandle(TSDB_MSG_TYPE_CM_DROP_TABLE, mgmtProcessDropTableMsg); + mgmtAddShellMsgHandle(TSDB_MSG_TYPE_CM_ALTER_TABLE, mgmtProcessAlterTableMsg); + mgmtAddShellMsgHandle(TSDB_MSG_TYPE_CM_TABLE_META, mgmtProcessTableMetaMsg); + mgmtAddShellMsgHandle(TSDB_MSG_TYPE_CM_TABLES_META, mgmtProcessMultiTableMetaMsg); + mgmtAddShellMsgHandle(TSDB_MSG_TYPE_CM_STABLE_META, mgmtProcessSuperTableMetaMsg); mgmtAddShellShowMetaHandle(TSDB_MGMT_TABLE_TABLE, mgmtGetShowTableMeta); mgmtAddShellShowRetrieveHandle(TSDB_MGMT_TABLE_TABLE, mgmtRetrieveShowTables); diff --git a/src/mnode/src/mgmtUser.c b/src/mnode/src/mgmtUser.c index e3ea2292d81a59e1f62ccb1a27d5128132044a73..2d898fa34c6ee9dff443e5e23cc1ba41de0c0672 100644 --- a/src/mnode/src/mgmtUser.c +++ b/src/mnode/src/mgmtUser.c @@ -83,9 +83,9 @@ int32_t mgmtInitUsers() { mgmtCreateUser(pAcct, "monitor", tsInternalPass); mgmtCreateUser(pAcct, "_root", tsInternalPass); - mgmtAddShellMsgHandle(TSDB_MSG_TYPE_CREATE_USER, mgmtProcessCreateUserMsg); - mgmtAddShellMsgHandle(TSDB_MSG_TYPE_ALTER_USER, mgmtProcessAlterUserMsg); - mgmtAddShellMsgHandle(TSDB_MSG_TYPE_DROP_USER, mgmtProcessDropUserMsg); + mgmtAddShellMsgHandle(TSDB_MSG_TYPE_CM_CREATE_USER, mgmtProcessCreateUserMsg); + mgmtAddShellMsgHandle(TSDB_MSG_TYPE_CM_ALTER_USER, mgmtProcessAlterUserMsg); + mgmtAddShellMsgHandle(TSDB_MSG_TYPE_CM_DROP_USER, mgmtProcessDropUserMsg); mgmtAddShellShowMetaHandle(TSDB_MGMT_TABLE_USER, mgmtGetUserMeta); mgmtAddShellShowRetrieveHandle(TSDB_MGMT_TABLE_USER, mgmtRetrieveUsers); diff --git a/src/mnode/src/mgmtVgroup.c b/src/mnode/src/mgmtVgroup.c index f8331ec968bf1cf23386d3024040097838072a48..2b158f3f35a1967fb4a233457a21c9e96b10b5c6 100644 --- a/src/mnode/src/mgmtVgroup.c +++ b/src/mnode/src/mgmtVgroup.c @@ -558,7 +558,7 @@ SVgObj *mgmtGetVgroupByVnode(uint32_t dnode, int32_t vnode) { } SRpcIpSet mgmtGetIpSetFromVgroup(SVgObj *pVgroup) { - SRpcIpSet ipSet = {.numOfIps = pVgroup->numOfVnodes, .inUse = 0, .port = tsMgmtDnodePort + 1}; + SRpcIpSet ipSet = {.numOfIps = pVgroup->numOfVnodes, .inUse = 0, .port = tsMnodeDnodePort + 1}; for (int i = 0; i < pVgroup->numOfVnodes; ++i) { ipSet.ip[i] = pVgroup->vnodeGid[i].ip; } @@ -566,7 +566,7 @@ SRpcIpSet mgmtGetIpSetFromVgroup(SVgObj *pVgroup) { } SRpcIpSet mgmtGetIpSetFromIp(uint32_t ip) { - SRpcIpSet ipSet = {.ip[0] = ip, .numOfIps = 1, .inUse = 0, .port = tsMgmtDnodePort + 1}; + SRpcIpSet ipSet = {.ip[0] = ip, .numOfIps = 1, .inUse = 0, .port = tsMnodeDnodePort + 1}; return ipSet; } diff --git a/src/rpc/src/rpcMain.c b/src/rpc/src/rpcMain.c index 505fba94ce51d2a398f3fb5369862f29dc5ad5e5..291cb5effcdd2978c1d7c0f331d2363d72dceae9 100755 --- a/src/rpc/src/rpcMain.c +++ b/src/rpc/src/rpcMain.c @@ -360,8 +360,8 @@ void rpcSendRequest(void *shandle, SRpcIpSet *pIpSet, SRpcMsg *pMsg) { // for TDengine, all the query, show commands shall have TCP connection char type = pMsg->msgType; if (type == TSDB_MSG_TYPE_QUERY || type == TSDB_MSG_TYPE_RETRIEVE || - type == TSDB_MSG_TYPE_STABLE_META || type == TSDB_MSG_TYPE_MULTI_TABLE_META || - type == TSDB_MSG_TYPE_SHOW ) + type == TSDB_MSG_TYPE_CM_STABLE_META || type == TSDB_MSG_TYPE_CM_TABLES_META || + type == TSDB_MSG_TYPE_CM_SHOW ) pContext->connType = RPC_CONN_TCPC; rpcSendReqToServer(pRpc, pContext); @@ -814,7 +814,7 @@ static void *rpcProcessMsgFromPeer(SRecvInfo *pRecv) { terrno = 0; pConn = rpcProcessMsgHead(pRpc, pRecv); - if (pHead->msgType < TSDB_MSG_TYPE_HEARTBEAT || (rpcDebugFlag & 16)) { + if (pHead->msgType < TSDB_MSG_TYPE_CM_HEARTBEAT || (rpcDebugFlag & 16)) { tTrace("%s %p, %s received from 0x%x:%hu, parse code:%x len:%d sig:0x%08x:0x%08x:%d", pRpc->label, pConn, taosMsg[pHead->msgType], pRecv->ip, pRecv->port, terrno, pRecv->msgLen, pHead->sourceId, pHead->destId, pHead->tranId, pHead->port); @@ -983,12 +983,12 @@ static void rpcSendMsgToPeer(SRpcConn *pConn, void *msg, int msgLen) { msgLen = rpcAddAuthPart(pConn, msg, msgLen); if ( rpcIsReq(pHead->msgType)) { - if (pHead->msgType < TSDB_MSG_TYPE_HEARTBEAT || (rpcDebugFlag & 16)) + if (pHead->msgType < TSDB_MSG_TYPE_CM_HEARTBEAT || (rpcDebugFlag & 16)) tTrace("%s %p, %s is sent to %s:%hu, len:%d sig:0x%08x:0x%08x:%d", pRpc->label, pConn, taosMsg[pHead->msgType], pConn->peerIpstr, pConn->peerPort, msgLen, pHead->sourceId, pHead->destId, pHead->tranId); } else { - if (pHead->msgType < TSDB_MSG_TYPE_HEARTBEAT || (rpcDebugFlag & 16)) + if (pHead->msgType < TSDB_MSG_TYPE_CM_HEARTBEAT || (rpcDebugFlag & 16)) tTrace( "%s %p, %s is sent to %s:%hu, code:%u len:%d sig:0x%08x:0x%08x:%d", pRpc->label, pConn, taosMsg[pHead->msgType], pConn->peerIpstr, pConn->peerPort, (uint8_t)pHead->content[0], msgLen, pHead->sourceId, pHead->destId, pHead->tranId); diff --git a/src/util/inc/tglobalcfg.h b/src/util/inc/tglobalcfg.h index bbb824cd3d9100f44d9abf7d5085a63e72b04eff..18523c36799390ecaecb24601c9cb2a32419f83a 100644 --- a/src/util/inc/tglobalcfg.h +++ b/src/util/inc/tglobalcfg.h @@ -58,9 +58,10 @@ extern char osName[]; extern char tsMasterIp[]; extern char tsSecondIp[]; -extern uint16_t tsMgmtDnodePort; -extern uint16_t tsMgmtShellPort; -extern uint16_t tsVnodeShellPort; +extern uint16_t tsMnodeDnodePort; +extern uint16_t tsMnodeShellPort; +extern uint16_t tsDnodeShellPort; +extern uint16_t tsDnodeMnodePort; extern uint16_t tsVnodeVnodePort; extern uint16_t tsMgmtMgmtPort; extern uint16_t tsMgmtSyncPort; diff --git a/src/util/src/tglobalcfg.c b/src/util/src/tglobalcfg.c index a49873d30a83c02c2faa4bebbca36190d31a389f..5ba878c5983fe3813bd533c8723eeddfad7c247c 100644 --- a/src/util/src/tglobalcfg.c +++ b/src/util/src/tglobalcfg.c @@ -61,9 +61,10 @@ int64_t tsMsPerDay[] = {86400000L, 86400000000L}; char tsMasterIp[TSDB_IPv4ADDR_LEN] = {0}; char tsSecondIp[TSDB_IPv4ADDR_LEN] = {0}; -uint16_t tsMgmtShellPort = 6030; // udp[6030-6034] tcp[6030] -uint16_t tsVnodeShellPort = 6035; // udp[6035-6039] tcp[6035] -uint16_t tsMgmtDnodePort = 6040; // udp[6040-6044] tcp[6040] +uint16_t tsMnodeShellPort = 6030; // udp[6030-6034] tcp[6030] +uint16_t tsDnodeShellPort = 6035; // udp[6035-6039] tcp[6035] +uint16_t tsMnodeDnodePort = 6040; // udp/tcp +uint16_t tsDnodeMnodePort = 6041; // udp/tcp uint16_t tsVnodeVnodePort = 6045; // tcp[6045] uint16_t tsMgmtMgmtPort = 6050; // udp, numOfVnodes fixed to 1, range udp[6050] uint16_t tsMgmtSyncPort = 6050; // tcp, range tcp[6050] @@ -492,13 +493,13 @@ static void doInitGlobalConfig() { tsInitConfigOption(cfg++, "httpPort", &tsHttpPort, TSDB_CFG_VTYPE_SHORT, TSDB_CFG_CTYPE_B_CONFIG | TSDB_CFG_CTYPE_B_SHOW, 1, 65535, 0, TSDB_CFG_UTYPE_NONE); - tsInitConfigOption(cfg++, "mgmtShellPort", &tsMgmtShellPort, TSDB_CFG_VTYPE_SHORT, + tsInitConfigOption(cfg++, "mgmtShellPort", &tsMnodeShellPort, TSDB_CFG_VTYPE_SHORT, TSDB_CFG_CTYPE_B_CONFIG | TSDB_CFG_CTYPE_B_SHOW | TSDB_CFG_CTYPE_B_CLIENT, 1, 65535, 0, TSDB_CFG_UTYPE_NONE); - tsInitConfigOption(cfg++, "vnodeShellPort", &tsVnodeShellPort, TSDB_CFG_VTYPE_SHORT, + tsInitConfigOption(cfg++, "vnodeShellPort", &tsDnodeShellPort, TSDB_CFG_VTYPE_SHORT, TSDB_CFG_CTYPE_B_CONFIG | TSDB_CFG_CTYPE_B_SHOW | TSDB_CFG_CTYPE_B_CLIENT, 1, 65535, 0, TSDB_CFG_UTYPE_NONE); - tsInitConfigOption(cfg++, "mgmtVnodePort", &tsMgmtDnodePort, TSDB_CFG_VTYPE_SHORT, + tsInitConfigOption(cfg++, "mgmtVnodePort", &tsMnodeDnodePort, TSDB_CFG_VTYPE_SHORT, TSDB_CFG_CTYPE_B_CONFIG | TSDB_CFG_CTYPE_B_SHOW | TSDB_CFG_CTYPE_B_CLUSTER, 1, 65535, 0, TSDB_CFG_UTYPE_NONE); tsInitConfigOption(cfg++, "vnodeVnodePort", &tsVnodeVnodePort, TSDB_CFG_VTYPE_SHORT, diff --git a/src/util/src/tstring.c b/src/util/src/tstring.c index a5ab7fbf67d9124e0fa2fa85b60a2e2f70a22eb5..17b00c5fe91218c209867cdffb1a33065f295990 100644 --- a/src/util/src/tstring.c +++ b/src/util/src/tstring.c @@ -26,23 +26,23 @@ char *taosMsg[] = { "create-table", "create-table-rsp", //10 - "remove-table", - "remove-table-rsp", + "drop-table", + "drop-table-rsp", + "alter-table", + "alter-table-rsp", "create-vnode", "create-vnode-rsp", - "free-vnode", - "free-vnode-rsp", - "cfg-dnode", - "cfg-dnode-rsp", - "alter-stream", - "alter-stream-rsp", //20 + "drop-vnode", + "drop-vnode-rsp", + "alter-vnode", + "alter-vnode-rsp", //20 - "sync", - "sync-rsp", - "forward", - "forward-rsp", "drop-stable", "drop-stable-rsp", + "alter-stream", + "alter-stream-rsp", + "config-dnode", + "config-dnode-rsp", "", "", "", @@ -63,37 +63,26 @@ char *taosMsg[] = { "alter-user-rsp", "drop-user", "drop-user-rsp", - "create-mnode", - "create-mnode-rsp", - "drop-mnode", - "drop-mnode-rsp", "create-dnode", - "create-dnode-rsp", //50 - + "create-dnode-rsp", "drop-dnode", "drop-dnode-rsp", - "alter-dnode", - "alter-dnode-rsp", "create-db", - "create-db-rsp", + "create-db-rsp", //50 + "drop-db", "drop-db-rsp", "use-db", - "use-db-rsp", //60 - + "use-db-rsp", "alter-db", "alter-db-rsp", "create-table", "create-table-rsp", "drop-table", - "drop-table-rsp", + "drop-table-rsp", //60 + "alter-table", "alter-table-rsp", - "cfg-vnode", - "cfg-vnode-rsp", //70 - - "cfg-table", - "cfg-table-rsp", "table-meta", "table-meta-rsp", "super-table-meta", @@ -101,24 +90,42 @@ char *taosMsg[] = { "multi-table-meta", "multi-table-meta-rsp", "alter-stream", - "alter-stream-rsp", //80 + "alter-stream-rsp", //70 "show", "show-rsp", - "cfg-mnode", - "cfg-mnode-rsp", "kill-query", "kill-query-rsp", "kill-stream", "kill-stream-rsp", "kill-connection", - "kill-connectoin-rsp", //90 - + "kill-connectoin-rsp", "heart-beat", - "heart-beat-rsp", + "heart-beat-rsp", //80 + + "", + "", + "", + "", + "", + "", + "", + "", //90 + + "config-table", + "config-table-rsp", + "config-vnode", + "config-vnode-rsp", "status", "status-rsp", "grant", "grant-rsp", + "", + "", //100 + + "sdb-sync", + "sdb-sync-rsp", + "sdb-forward", + "sdb-forward-rsp", "max" }; \ No newline at end of file