From 10f78bb5fbaa1b13942936b0fa17436c2d22dd85 Mon Sep 17 00:00:00 2001 From: slguan Date: Tue, 18 Feb 2020 23:23:58 +0800 Subject: [PATCH] dnodeMgmt.c --- src/client/src/tscSql.c | 6 +- .../java/com/taosdata/jdbc/TSDBError.java | 2 +- src/dnode/inc/dnodeMgmt.h | 4 +- src/dnode/inc/dnodeUtil.h | 39 -- src/dnode/inc/dnodeVnodeMgmt.h | 27 +- src/dnode/inc/dnodeWrite.h | 37 +- src/dnode/src/dnodeMgmt.c | 551 +++--------------- src/dnode/src/dnodeShell.c | 2 +- src/dnode/src/dnodeSystem.c | 4 +- src/dnode/src/dnodeUtil.c | 25 - src/dnode/src/dnodeVnodeMgmt.c | 16 + src/dnode/src/dnodeWrite.c | 46 +- src/inc/dnode.h | 1 - src/inc/mnode.h | 1 + src/inc/taoserror.h | 1 + src/inc/taosmsg.h | 66 +-- src/mnode/src/mgmtDnodeInt.c | 22 +- src/mnode/src/mgmtVgroup.c | 4 +- 18 files changed, 187 insertions(+), 667 deletions(-) delete mode 100644 src/dnode/inc/dnodeUtil.h delete mode 100644 src/dnode/src/dnodeUtil.c diff --git a/src/client/src/tscSql.c b/src/client/src/tscSql.c index 85b9462660..3b30c9ccb6 100644 --- a/src/client/src/tscSql.c +++ b/src/client/src/tscSql.c @@ -1034,7 +1034,7 @@ static int tscParseTblNameList(SSqlObj *pSql, const char *tblNameList, int32_t t pCmd->command = TSDB_SQL_MULTI_META; pCmd->count = 0; - int code = TSDB_CODE_INVALID_METER_ID; + int code = TSDB_CODE_INVALID_TABLE_ID; char *str = (char *)tblNameList; SQueryInfo *pQueryInfo = NULL; @@ -1070,7 +1070,7 @@ static int tscParseTblNameList(SSqlObj *pSql, const char *tblNameList, int32_t t // Check if the table name available or not if (tscValidateName(&sToken) != TSDB_CODE_SUCCESS) { - code = TSDB_CODE_INVALID_METER_ID; + code = TSDB_CODE_INVALID_TABLE_ID; sprintf(pCmd->payload, "table name is invalid"); return code; } @@ -1080,7 +1080,7 @@ static int tscParseTblNameList(SSqlObj *pSql, const char *tblNameList, int32_t t } if (++pCmd->count > TSDB_MULTI_METERMETA_MAX_NUM) { - code = TSDB_CODE_INVALID_METER_ID; + code = TSDB_CODE_INVALID_TABLE_ID; sprintf(pCmd->payload, "tables over the max number"); return code; } diff --git a/src/connector/jdbc/src/main/java/com/taosdata/jdbc/TSDBError.java b/src/connector/jdbc/src/main/java/com/taosdata/jdbc/TSDBError.java index 7d5b02606e..bb1b2afd07 100644 --- a/src/connector/jdbc/src/main/java/com/taosdata/jdbc/TSDBError.java +++ b/src/connector/jdbc/src/main/java/com/taosdata/jdbc/TSDBError.java @@ -48,7 +48,7 @@ public enum TSDBError { TSDB_CODE_INVALID_VALUE(24, "invalid value"), TSDB_CODE_REDIRECT(25, "service not available"), TSDB_CODE_ALREADY_THERE(26, "already there"), - TSDB_CODE_INVALID_METER_ID(27, "invalid meter ID"), + TSDB_CODE_INVALID_TABLE_ID(27, "invalid meter ID"), TSDB_CODE_INVALID_SQL(28, "invalid SQL"), // this message often comes with additional info which will vary based on the specific error situation TSDB_CODE_NETWORK_UNAVAIL(29, "failed to connect to server"), TSDB_CODE_INVALID_MSG_LEN(30, "invalid msg len"), diff --git a/src/dnode/inc/dnodeMgmt.h b/src/dnode/inc/dnodeMgmt.h index 30bc8a2ad3..4ee0d57cfa 100644 --- a/src/dnode/inc/dnodeMgmt.h +++ b/src/dnode/inc/dnodeMgmt.h @@ -25,12 +25,10 @@ extern "C" { #include "tsched.h" #include "dnode.h" -int dnodeProcessCreateTableRequest(char *pMsg, int msgLen, SMgmtObj *pMgmtObj); -int dnodeProcessRemoveTableRequest(char *pMsg, int msgLen, SMgmtObj *pMgmtObj); void dnodeDistributeMsgFromMgmt(int8_t *pCont, int32_t contLen, int32_t msgType, void *pConn); -extern void *dmQhandle; +extern void *tsDnodeMgmtQhandle; void dnodeSendVpeerCfgMsg(int32_t vnode); void dnodeSendMeterCfgMsg(int32_t vnode, int32_t sid); diff --git a/src/dnode/inc/dnodeUtil.h b/src/dnode/inc/dnodeUtil.h deleted file mode 100644 index dfb34b4b74..0000000000 --- a/src/dnode/inc/dnodeUtil.h +++ /dev/null @@ -1,39 +0,0 @@ -/* - * Copyright (c) 2019 TAOS Data, Inc. - * - * This program is free software: you can use, redistribute, and/or modify - * it under the terms of the GNU Affero General Public License, version 3 - * or later ("AGPL"), as published by the Free Software Foundation. - * - * This program is distributed in the hope that it will be useful, but WITHOUT - * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or - * FITNESS FOR A PARTICULAR PURPOSE. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see . - */ - -#ifndef TDENGINE_DNODE_UTIL_H -#define TDENGINE_DNODE_UTIL_H - -#ifdef __cplusplus -extern "C" { -#endif - -#include -#include -#include "taosdef.h" -#include "taosmsg.h" -#include "tstatus.h" - -EVnodeStatus dnodeGetVnodeStatus(int32_t vnode); - -bool dnodeCheckVnodeExist(int32_t vnode); - -void *dnodeGetVnodeObj(int32_t vnode); - -#ifdef __cplusplus -} -#endif - -#endif diff --git a/src/dnode/inc/dnodeVnodeMgmt.h b/src/dnode/inc/dnodeVnodeMgmt.h index 9fee09166b..7078dd78fb 100644 --- a/src/dnode/inc/dnodeVnodeMgmt.h +++ b/src/dnode/inc/dnodeVnodeMgmt.h @@ -21,9 +21,10 @@ extern "C" { #endif #include - +#include #include "taosdef.h" #include "taosmsg.h" +#include "tstatus.h" /* * Open all Vnodes in the local data directory @@ -38,34 +39,34 @@ int32_t dnodeCleanupVnodes(); /* * Check if vnode already exists */ -int32_t dnodeCheckVnodeExist(int vid); +bool dnodeCheckVnodeExist(int32_t vid); /* * Create vnode with specified configuration and open it + * if exist, config it */ -//tsdb_repo_t* dnodeCreateVnode(int vid, SVnodeCfg *cfg); -void* dnodeCreateVnode(int vid, SVnodeCfg *cfg); +void* dnodeCreateVnode(int32_t vnode, SVPeersMsg *cfg); /* - * Modify vnode configuration information + * Remove vnode from local repository */ -int32_t dnodeConfigVnode(int vid, SVnodeCfg *cfg); +int32_t dnodeDropVnode(int32_t vnode); /* - * Modify vnode replication information + * Get the vnode object that has been opened */ -int32_t dnodeConfigVnodePeers(int vid/*, SVpeerCfgMsg *cfg*/); +//tsdb_repo_t* dnodeGetVnode(int vid); +void* dnodeGetVnode(int vid); /* - * Remove vnode from local repository + * get the status of vnode */ -int32_t dnodeDropVnode(int vid); +EVnodeStatus dnodeGetVnodeStatus(int32_t vnode); /* - * Get the vnode object that has been opened + * Check if vnode already exists, and table exist in this vnode */ -//tsdb_repo_t* dnodeGetVnode(int vid); -void* dnodeGetVnode(int vid); +bool dnodeCheckTableExist(int32_t vnode, int32_t sid, int64_t uid); #ifdef __cplusplus } diff --git a/src/dnode/inc/dnodeWrite.h b/src/dnode/inc/dnodeWrite.h index 6f74dee879..78af597132 100644 --- a/src/dnode/inc/dnodeWrite.h +++ b/src/dnode/inc/dnodeWrite.h @@ -35,45 +35,26 @@ extern "C" { void dnodeWriteData(SShellSubmitMsg *pSubmit, void *pConn, void (*callback)(SShellSubmitRspMsg *rsp, void *pConn)); /* - * Create noraml table with specified configuration and open it + * Create table with specified configuration and open it + * if table already exist, update its schema and tag */ -int32_t dnodeCreateNormalTable(SCreateNormalTableMsg *table); +int32_t dnodeCreateTable(SDCreateTableMsg *table); /* - * Create stream table with specified configuration and open it - */ -int32_t dnodeCreateStreamTable(SCreateStreamTableMsg *table); - -/* - * Create child table with specified configuration and open it - */ -int32_t dnodeCreateChildTable(SCreateChildTableMsg *table); - -/* - * Modify normal table configuration information - * - */ -int32_t dnodeAlterNormalTable(SCreateNormalTableMsg *table); - -/* - * Modify stream table configuration information + * Remove table from local repository */ -int32_t dnodeAlterStreamTable(SCreateStreamTableMsg *table); +int32_t dnodeDropTable(int32_t vnode, int32_t sid, uint64_t uid); /* - * Modify child table configuration information + * Create stream + * if stream already exist, update it */ -int32_t dnodeAlterChildTable(SCreateChildTableMsg *table); +int32_t dnodeCreateStream(SAlterStreamMsg *stream); /* * Remove all child tables of supertable from local repository */ -int32_t dnodeDropSuperTable(int vid, int sid, int64_t uid); - -/* - * Remove table from local repository - */ -int32_t dnodeDropTable(int vid, int sid, int64_t uid); +int32_t dnodeDropSuperTable(uint64_t stableUid); #ifdef __cplusplus } diff --git a/src/dnode/src/dnodeMgmt.c b/src/dnode/src/dnodeMgmt.c index 72728f90ad..ef4f26a889 100644 --- a/src/dnode/src/dnodeMgmt.c +++ b/src/dnode/src/dnodeMgmt.c @@ -16,84 +16,47 @@ #define _DEFAULT_SOURCE #include "os.h" - -#include "dnode.h" -#include "dnodeSystem.h" -#include "dnodeMgmt.h" - #include "taosmsg.h" #include "tlog.h" #include "trpc.h" #include "tsched.h" #include "tsystem.h" - -SMgmtObj mgmtObj; -extern uint64_t tsCreatedTime; - -int dnodeProcessVPeersMsg(char *msg, int msgLen, SMgmtObj *pMgmtObj); -int vnodeProcessCreateMeterMsg(char *pMsg, int msgLen); -int dnodeProcessFreeVnodeRequest(char *pMsg, int msgLen, SMgmtObj *pMgmtObj); -int dnodeProcessVPeerCfgRsp(char *msg, int msgLen, SMgmtObj *pMgmtObj); -int dnodeProcessTableCfgRsp(char *msg, int msgLen, SMgmtObj *pMgmtObj); -int dnodeProcessDnodeCfgRequest(char *cont, int contLen, SMgmtObj *pMgmtObj); -int dnodeProcessAlterStreamRequest(char *pMsg, int msgLen, SMgmtObj *pObj); -void vnodeUpdateHeadFile(int vnode, int oldTables, int newTables); -void vnodeOpenVnode(int vnode); -void vnodeCleanUpOneVnode(int vnode); +#include "mnode.h" +#include "dnode.h" +#include "dnodeSystem.h" +#include "dnodeMgmt.h" +#include "dnodeWrite.h" +#include "dnodeVnodeMgmt.h" static int (*dnodeProcessShellMsgFp[TSDB_MSG_TYPE_MAX])(int8_t *pCont, int32_t contLen, void *pConn); static void dnodeInitProcessShellMsg(); -char *taosBuildRspMsgToMnodeWithSizeImp(SMgmtObj *pObj, char type, int size) { - char *pStart = (char *)malloc(size); - if (pStart == NULL) { - return NULL; - } +void taosSendMsgToMnodeImpFp(SSchedMsg *sched) { + char msgType = *sched->msg; + char *content = sched->msg + sizeof(int32_t); - *pStart = type; - return pStart + 1; + mgmtProcessMsgFromDnode(content, 0, msgType, NULL); + rpcFreeCont(sched->msg); } -char *(*taosBuildRspMsgToMnodeWithSize)(SMgmtObj *pObj, char type, int size) = taosBuildRspMsgToMnodeWithSizeImp; -char *taosBuildReqMsgToMnodeWithSizeImp(SMgmtObj *pObj, char type, int size) { - char *pStart = (char *)malloc(size); - if (pStart == NULL) { - return NULL; - } - - *pStart = type; - return pStart + 1; -} -char *(*taosBuildReqMsgToMnodeWithSize)(SMgmtObj *pObj, char type, int size) = taosBuildReqMsgToMnodeWithSizeImp; - -char *taosBuildRspMsgToMnodeImp(SMgmtObj *pObj, char type) { - return taosBuildRspMsgToMnodeWithSizeImp(pObj, type, 256); -} -char *(*taosBuildRspMsgToMnode)(SMgmtObj *pObj, char type) = taosBuildRspMsgToMnodeImp; - -char *taosBuildReqMsgToMnodeImp(SMgmtObj *pObj, char type) { - return taosBuildReqMsgToMnodeWithSizeImp(pObj, type, 256); -} -char *(*taosBuildReqMsgToMnode)(SMgmtObj *pObj, char type) = taosBuildReqMsgToMnodeImp; - -int taosSendMsgToMnodeImp(SMgmtObj *pObj, char *msg, int msgLen) { - dTrace("msg:%s is sent to mnode", taosMsg[(uint8_t)(*(msg-1))]); +int32_t taosSendMsgToMnodeImp(int8_t *msg, int32_t msgLen) { + dTrace("msg:%s is sent to mnode", taosMsg[(int32_t)(*(msg-sizeof(int32_t)))]); /* * Lite version has no message header, so minus one */ SSchedMsg schedMsg; - schedMsg.fp = mgmtProcessMsgFromDnodeSpec; - schedMsg.msg = msg - 1; + schedMsg.fp = taosSendMsgToMnodeImpFp; + schedMsg.msg = msg - sizeof(int32_t); schedMsg.ahandle = NULL; schedMsg.thandle = NULL; - taosScheduleTask(dmQhandle, &schedMsg); + taosScheduleTask(tsDnodeMgmtQhandle, &schedMsg); return 0; } -int (*taosSendMsgToMnode)(SMgmtObj *pObj, char *msg, int msgLen) = taosSendMsgToMnodeImp; +int32_t (*taosSendMsgToMnode)(char *msg, int32_t msgLen) = taosSendMsgToMnodeImp; -int taosSendSimpleRspToMnodeImp(SMgmtObj *pObj, char rsptype, char code) { +int32_t taosSendSimpleRspToMnodeImp(int32_t rsptype, int32_t code) { char *pStart = taosBuildRspMsgToMnode(0, rsptype); if (pStart == NULL) { return 0; @@ -104,7 +67,7 @@ int taosSendSimpleRspToMnodeImp(SMgmtObj *pObj, char rsptype, char code) { return 0; } -int (*taosSendSimpleRspToMnode)(SMgmtObj *pObj, char rsptype, char code) = taosSendSimpleRspToMnodeImp; +int (*taosSendSimpleRspToMnode)(int32_t rsptype, int32_t code) = taosSendSimpleRspToMnodeImp; int32_t dnodeInitMgmtImp() { dnodeInitProcessShellMsg(); @@ -121,7 +84,7 @@ void dnodeProcessMsgFromMgmtImp(SSchedMsg *sched) { int32_t msgType = *(int32_t*)(sched->msg); int8_t *content = sched->msg + sizeof(int32_t); - dTrace("msg:%s is received from mgmt", taosMsg[msgType]); + dTrace("msg:%s is received from mnode", taosMsg[msgType]); dnodeDistributeMsgFromMgmt(content, 0, msgType, NULL); free(sched->msg); @@ -139,444 +102,118 @@ void dnodeDistributeMsgFromMgmt(int8_t *pCont, int32_t contLen, int32_t msgType, } } -int dnodeProcessTableCfgRsp(char *pMsg, int msgLen, SMgmtObj *pObj) { - int code = *pMsg; +int32_t dnodeProcessTableCfgRsp(char *pMsg, int msgLen, SMgmtObj *pObj) { + int32_t code = htonl(*((int32_t *) pMsg)); - if (code == 0) { - vnodeProcessCreateMeterMsg(pMsg + 1, msgLen - 1); + if (code == TSDB_CODE_SUCCESS) { + SDCreateTableMsg *table = (SDCreateTableMsg *) (pMsg + sizeof(int32_t)); + return dnodeCreateTable(table); + } else if (code == TSDB_CODE_INVALID_TABLE_ID) { + SDRemoveTableMsg *table = (SDRemoveTableMsg *) (pMsg + sizeof(int32_t)); + int32_t vnode = htonl(table->vnode); + int32_t sid = htonl(table->sid); + uint64_t uid = htobe64(table->uid); + dError("vnode:%d, sid:%d table is not configured, remove it", vnode, sid); + return dnodeDropTable(vnode, sid, uid); } else { - STaosRsp *pRsp; - pRsp = (STaosRsp *)pMsg; - int32_t *pint = (int32_t *)pRsp->more; - int vnode = htonl(*pint); - int sid = htonl(*(pint + 1)); - dError("vid:%d, sid:%d, code:%d, meter is not configured, remove it", vnode, sid, code); - int ret = vnodeRemoveMeterObj(vnode, sid); - dTrace("vid:%d, sid:%d, meter delete ret:%d", vnode, sid, ret); + dError("code:%d invalid message", code); + return TSDB_CODE_INVALID_MSG; } - - return 0; } -int dnodeProcessCreateTableRequest(char *pMsg, int msgLen, SMgmtObj *pObj) { - SCreateMsg *pCreate; - int code = 0; - int vid; - SVnodeObj * pVnode; - - pCreate = (SCreateMsg *)pMsg; - vid = htons(pCreate->vnode); - - if (vid >= TSDB_MAX_VNODES || vid < 0) { - dError("vid:%d, vnode is out of range", vid); - code = TSDB_CODE_INVALID_VNODE_ID; - goto _over; - } - - pVnode = vnodeList + vid; - if (pVnode->cfg.maxSessions <= 0) { - dError("vid:%d, not activated", vid); - code = TSDB_CODE_NOT_ACTIVE_VNODE; - goto _over; - } - -// if (pVnode->syncStatus == TSDB_VN_SYNC_STATUS_SYNCING) { -// code = vnodeSaveCreateMsgIntoQueue(pVnode, pMsg, msgLen); -// dTrace("vid:%d, create msg is saved into sync queue", vid); -// } else { - code = vnodeProcessCreateMeterMsg(pMsg, msgLen); -// } - -_over: - taosSendSimpleRspToMnode(pObj, TSDB_MSG_TYPE_DNODE_CREATE_CHILD_TABLE_RSP, code); - +int32_t dnodeProcessCreateTableRequest(int8_t *pCont, int32_t contLen, void *pConn) { + SDCreateTableMsg *table = (SDCreateTableMsg *) pCont; + int32_t code = dnodeCreateTable(table); + rpcSendSimpleRsp(pConn, code); return code; } -int dnodeProcessAlterStreamRequest(char *pMsg, int msgLen, SMgmtObj *pObj) { - SAlterStreamMsg *pAlter; - int code = 0; - int vid, sid; - SVnodeObj * pVnode; - - pAlter = (SAlterStreamMsg *)pMsg; - vid = htons(pAlter->vnode); - sid = htonl(pAlter->sid); - - if (vid >= TSDB_MAX_VNODES || vid < 0) { - dError("vid:%d, vnode is out of range", vid); - code = TSDB_CODE_INVALID_VNODE_ID; - goto _over; - } - - pVnode = vnodeList + vid; - if (pVnode->cfg.maxSessions <= 0 || pVnode->pCachePool == NULL) { - dError("vid:%d is not activated yet", pAlter->vnode); - code = TSDB_CODE_NOT_ACTIVE_VNODE; - goto _over; - } - - if (pAlter->sid >= pVnode->cfg.maxSessions || pAlter->sid < 0) { - dError("vid:%d sid:%d uid:%" PRIu64 ", sid is out of range", pAlter->vnode, pAlter->sid, pAlter->uid); - code = TSDB_CODE_INVALID_TABLE_ID; - goto _over; - } - - SMeterObj *pMeterObj = vnodeList[vid].meterList[sid]; - if (pMeterObj == NULL || sid != pMeterObj->sid || vid != pMeterObj->vnode) { - dError("vid:%d sid:%d, not active table", vid, sid); - code = TSDB_CODE_NOT_ACTIVE_TABLE; - goto _over; - } - - pMeterObj->status = pAlter->status; - if (pMeterObj->status == 1) { - if (pAlter->stime > pMeterObj->lastKey) // starting time can be specified - pMeterObj->lastKey = pAlter->stime; - vnodeCreateStream(pMeterObj); - } else { - vnodeRemoveStream(pMeterObj); - } - - vnodeSaveMeterObjToFile(pMeterObj); - -_over: - taosSendSimpleRspToMnode(pObj, TSDB_MSG_TYPE_ALTER_STREAM_RSP, code); - +int32_t dnodeProcessAlterStreamRequest(int8_t *pCont, int32_t contLen, void *pConn) { + SDCreateTableMsg *table = (SDCreateTableMsg *) pCont; + int32_t code = dnodeCreateTable(table); + rpcSendSimpleRsp(pConn, code); return code; } -int vnodeProcessCreateMeterMsg(char *pMsg, int msgLen) { - int code; - SMeterObj * pObj = NULL; - SConnSec connSec; - SCreateMsg *pCreate = (SCreateMsg *)pMsg; - - pCreate->vnode = htons(pCreate->vnode); - pCreate->sid = htonl(pCreate->sid); - pCreate->lastCreate = htobe64(pCreate->lastCreate); - pCreate->timeStamp = htobe64(pCreate->timeStamp); - - if (pCreate->vnode >= TSDB_MAX_VNODES || pCreate->vnode < 0) { - dError("vid:%d is out of range", pCreate->vnode); - code = TSDB_CODE_INVALID_VNODE_ID; - goto _create_over; - } - - SVnodeObj *pVnode = vnodeList + pCreate->vnode; - if (pVnode->pCachePool == NULL) { - dError("vid:%d is not activated yet", pCreate->vnode); - vnodeSendVpeerCfgMsg(pCreate->vnode); - code = TSDB_CODE_NOT_ACTIVE_VNODE; - goto _create_over; - } - - if (pCreate->sid >= pVnode->cfg.maxSessions || pCreate->sid < 0) { - dError("vid:%d sid:%d id:%s, sid is out of range", pCreate->vnode, pCreate->sid, pCreate->meterId); - code = TSDB_CODE_INVALID_TABLE_ID; - goto _create_over; - } - - pCreate->numOfColumns = htons(pCreate->numOfColumns); - if (pCreate->numOfColumns <= 0) { - dTrace("vid:%d sid:%d id:%s, numOfColumns is out of range", pCreate->vnode, pCreate->sid, pCreate->meterId); - code = TSDB_CODE_OTHERS; - goto _create_over; - } - - pCreate->sqlLen = htons(pCreate->sqlLen); - pObj = (SMeterObj *)calloc(1, sizeof(SMeterObj) + pCreate->sqlLen + 1); - if (pObj == NULL) { - dError("vid:%d sid:%d id:%s, no memory to allocate meterObj", pCreate->vnode, pCreate->sid, pCreate->meterId); - code = TSDB_CODE_NO_RESOURCE; - goto _create_over; - } - - /* - * memory alignment may cause holes in SColumn struct which are not assigned any value - * therefore, we could not use memcmp to compare whether two SColumns are equal or not. - * So, we need to set the memory to 0 when allocating memory. - */ - pObj->schema = (SColumn *)calloc(1, pCreate->numOfColumns * sizeof(SColumn)); - - pObj->vnode = pCreate->vnode; - pObj->sid = pCreate->sid; - pObj->uid = pCreate->uid; - memcpy(pObj->meterId, pCreate->meterId, TSDB_TABLE_ID_LEN); - pObj->numOfColumns = pCreate->numOfColumns; - pObj->timeStamp = pCreate->timeStamp; - pObj->sversion = htonl(pCreate->sversion); - pObj->maxBytes = 0; - - for (int i = 0; i < pObj->numOfColumns; ++i) { - pObj->schema[i].type = pCreate->schema[i].type; - pObj->schema[i].bytes = htons(pCreate->schema[i].bytes); - pObj->schema[i].colId = htons(pCreate->schema[i].colId); - pObj->bytesPerPoint += pObj->schema[i].bytes; - if (pObj->maxBytes < pObj->schema[i].bytes) pObj->maxBytes = pObj->schema[i].bytes; - } - - if (pCreate->sqlLen > 0) { - pObj->sqlLen = pCreate->sqlLen; - pObj->pSql = ((char *)pObj) + sizeof(SMeterObj); - memcpy(pObj->pSql, (char *)pCreate->schema + pCreate->numOfColumns * sizeof(SMColumn), pCreate->sqlLen); - pObj->pSql[pCreate->sqlLen] = 0; - } - - pObj->pointsPerFileBlock = pVnode->cfg.rowsInFileBlock; - - if (sizeof(TSKEY) != pObj->schema[0].bytes) { - dError("key length is not matched, required key length:%d", sizeof(TSKEY)); - code = TSDB_CODE_OTHERS; - goto _create_over; - } - - // security info shall be saved here - connSec.spi = pCreate->spi; - connSec.encrypt = pCreate->encrypt; - memcpy(connSec.secret, pCreate->secret, TSDB_KEY_LEN); - memcpy(connSec.cipheringKey, pCreate->cipheringKey, TSDB_KEY_LEN); - - code = vnodeCreateMeterObj(pObj, &connSec); - -_create_over: - if (code != TSDB_CODE_SUCCESS) { - dTrace("vid:%d sid:%d id:%s, failed to create meterObj", pCreate->vnode, pCreate->sid, pCreate->meterId); - tfree(pObj); - } +int32_t dnodeProcessRemoveTableRequest(int8_t *pCont, int32_t contLen, void *pConn) { + SDRemoveTableMsg *table = (SDRemoveTableMsg *) pCont; + int32_t vnode = htonl(table->vnode); + int32_t sid = htonl(table->sid); + uint64_t uid = htobe64(table->uid); + dPrint("vnode:%d, sid:%d table is not configured, remove it", vnode, sid); + int32_t code = dnodeDropTable(vnode, sid, uid); + rpcSendSimpleRsp(pConn, code); return code; } -int dnodeProcessRemoveTableRequest(char *pMsg, int msgLen, SMgmtObj *pMgmtObj) { - SMeterObj * pObj; - SRemoveMeterMsg *pRemove; - int code = 0; - - pRemove = (SRemoveMeterMsg *)pMsg; - pRemove->vnode = htons(pRemove->vnode); - pRemove->sid = htonl(pRemove->sid); - - if (pRemove->vnode < 0 || pRemove->vnode >= TSDB_MAX_VNODES) { - dWarn("vid:%d sid:%d, already removed", pRemove->vnode, pRemove->sid); - goto _remove_over; - } - - if (vnodeList[pRemove->vnode].meterList == NULL) goto _remove_over; - - pObj = vnodeList[pRemove->vnode].meterList[pRemove->sid]; - if (pObj == NULL) goto _remove_over; - - if (memcmp(pObj->meterId, pRemove->meterId, TSDB_TABLE_ID_LEN) != 0) { - dWarn("vid:%d sid:%d id:%s, remove ID:%s, meter ID not matched", pObj->vnode, pObj->sid, pObj->meterId, - pRemove->meterId); - goto _remove_over; - } - - if (vnodeRemoveMeterObj(pRemove->vnode, pRemove->sid) == TSDB_CODE_ACTION_IN_PROGRESS) { - code = TSDB_CODE_ACTION_IN_PROGRESS; - goto _remove_over; - } - - dTrace("vid:%d sid:%d id:%s, meterObj is removed", pRemove->vnode, pRemove->sid, pRemove->meterId); - -_remove_over: - taosSendSimpleRspToMnode(pMgmtObj, TSDB_MSG_TYPE_REMOVE_RSP, code); - return 0; -} - -int vnodeProcessVPeerCfg(char *msg, int msgLen, SMgmtObj *pMgmtObj) { - SVPeersMsg *pMsg = (SVPeersMsg *)msg; - int i, vnode; - - vnode = htonl(pMsg->vnode); - if (vnode >= TSDB_MAX_VNODES) { - dError("vid:%d, vnode is out of range", vnode); - return -1; - } - - if (vnodeList[vnode].vnodeStatus == TSDB_VN_STATUS_CREATING) { - dTrace("vid:%d, vnode is still under creating", vnode); - return 0; - } - - SVnodeCfg *pCfg = &pMsg->cfg; - pCfg->vgId = htonl(pCfg->vgId); - pCfg->maxSessions = htonl(pCfg->maxSessions); - pCfg->cacheBlockSize = htonl(pCfg->cacheBlockSize); - pCfg->cacheNumOfBlocks.totalBlocks = htonl(pCfg->cacheNumOfBlocks.totalBlocks); - pCfg->daysPerFile = htonl(pCfg->daysPerFile); - pCfg->daysToKeep1 = htonl(pCfg->daysToKeep1); - pCfg->daysToKeep2 = htonl(pCfg->daysToKeep2); - pCfg->daysToKeep = htonl(pCfg->daysToKeep); - pCfg->commitTime = htonl(pCfg->commitTime); - pCfg->blocksPerMeter = htons(pCfg->blocksPerMeter); - pCfg->rowsInFileBlock = htonl(pCfg->rowsInFileBlock); - - if (pCfg->replications > 0) { - dPrint("vid:%d, vpeer cfg received, replica:%d session:%d, vnodeList replica:%d session:%d, acct:%s db:%s", - vnode, pCfg->replications, pCfg->maxSessions, vnodeList[vnode].cfg.replications, vnodeList[vnode].cfg.maxSessions, - pCfg->acct, pCfg->db); - for (i = 0; i < pCfg->replications; ++i) { - pMsg->vpeerDesc[i].vnode = htonl(pMsg->vpeerDesc[i].vnode); - pMsg->vpeerDesc[i].ip = htonl(pMsg->vpeerDesc[i].ip); - dPrint("vid:%d, vpeer:%d ip:0x%x vid:%d ", vnode, i, pMsg->vpeerDesc[i].ip, pMsg->vpeerDesc[i].vnode); - } - } - - if (vnodeList[vnode].cfg.maxSessions == 0) { - dPrint("vid:%d, vnode is empty", vnode); - if (pCfg->maxSessions > 0) { - if (vnodeList[vnode].vnodeStatus == TSDB_VN_STATUS_OFFLINE) { - dPrint("vid:%d, status:%s, start to create vnode", vnode, taosGetVnodeStatusStr(vnodeList[vnode].vnodeStatus)); - return vnodeCreateVnode(vnode, pCfg, pMsg->vpeerDesc); - } else { - dPrint("vid:%d, status:%s, cannot preform create vnode operation", vnode, taosGetVnodeStatusStr(vnodeList[vnode].vnodeStatus)); - return TSDB_CODE_INVALID_VNODE_STATUS; - } - } - } else { - dPrint("vid:%d, vnode is not empty", vnode); - if (pCfg->maxSessions > 0) { - if (vnodeList[vnode].vnodeStatus == TSDB_VN_STATUS_DELETING) { - dPrint("vid:%d, status:%s, wait vnode delete finished", vnode, taosGetVnodeStatusStr(vnodeList[vnode].vnodeStatus)); - } else { - dPrint("vid:%d, status:%s, start to update vnode", vnode, taosGetVnodeStatusStr(vnodeList[vnode].vnodeStatus)); - - if (pCfg->maxSessions != vnodeList[vnode].cfg.maxSessions) { - vnodeCleanUpOneVnode(vnode); - } - - vnodeConfigVPeers(vnode, pCfg->replications, pMsg->vpeerDesc); - vnodeSaveVnodeCfg(vnode, pCfg, pMsg->vpeerDesc); - - /* - if (pCfg->maxSessions != vnodeList[vnode].cfg.maxSessions) { - vnodeUpdateHeadFile(vnode, vnodeList[vnode].cfg.maxSessions, pCfg->maxSessions); - vnodeList[vnode].cfg.maxSessions = pCfg->maxSessions; - vnodeOpenVnode(vnode); - } - */ - } - return 0; - } else { - dPrint("vid:%d, status:%s, start to delete vnode", vnode, taosGetVnodeStatusStr(vnodeList[vnode].vnodeStatus)); - vnodeRemoveVnode(vnode); - } - } - - return 0; -} - -int dnodeProcessVPeerCfgRsp(char *msg, int msgLen, SMgmtObj *pMgmtObj) { - STaosRsp *pRsp; - - pRsp = (STaosRsp *)msg; +int32_t dnodeProcessVPeerCfgRsp(int8_t *pCont, int32_t contLen, void *pConn) { + int32_t code = htonl(*((int32_t *) pCont)); - if (pRsp->code == 0) { - vnodeProcessVPeerCfg(pRsp->more, msgLen - sizeof(STaosRsp), pMgmtObj); + if (code == TSDB_CODE_SUCCESS) { + SVPeersMsg *vpeer = (SVPeersMsg *) (pCont + sizeof(int32_t)); + int32_t vnode = htonl(vpeer->vnode); + return dnodeCreateVnode(vnode, vpeer); + } else if (code == TSDB_CODE_INVALID_VNODE_ID) { + SFreeVnodeMsg *vpeer = (SFreeVnodeMsg *) (pCont + sizeof(int32_t)); + int32_t vnode = htonl(vpeer->vnode); + dError("vnode:%d, not exist, remove it", vnode); + return dnodeDropVnode(vnode); } else { - int32_t *pint = (int32_t *)pRsp->more; - int vnode = htonl(*pint); - if (vnode < TSDB_MAX_VNODES && vnodeList[vnode].lastKey != 0) { - dError("vnode:%d not configured, it shall be empty, code:%d", vnode, pRsp->code); - vnodeRemoveVnode(vnode); - } else { - dError("vnode:%d is invalid, code:%d", vnode, pRsp->code); - } + dError("code:%d invalid message", code); + return TSDB_CODE_INVALID_MSG; } - - return 0; } -int dnodeProcessVPeersMsg(char *msg, int msgLen, SMgmtObj *pMgmtObj) { - int code = 0; - - code = vnodeProcessVPeerCfg(msg, msgLen, pMgmtObj); - - char * pStart; - STaosRsp * pRsp; - SVPeersMsg *pVPeersMsg = (SVPeersMsg *)msg; - - pStart = taosBuildRspMsgToMnode(pMgmtObj, TSDB_MSG_TYPE_DNODE_VPEERS_RSP); - if (pStart == NULL) return -1; - - pRsp = (STaosRsp *)pStart; - pRsp->code = code; - memcpy(pRsp->more, pVPeersMsg->cfg.db, TSDB_DB_NAME_LEN); +int32_t dnodeProcessVPeersMsg(int8_t *pCont, int32_t contLen, void *pConn) { + SVPeersMsg *vpeer = (SVPeersMsg *) pCont; + int32_t vnode = htonl(vpeer->vnode); - msgLen = sizeof(STaosRsp) + TSDB_DB_NAME_LEN; - taosSendMsgToMnode(pMgmtObj, pStart, msgLen); + dPrint("vnode:%d, start to config", vnode); + int32_t code = dnodeCreateVnode(vnode, vpeer); + rpcSendSimpleRsp(pConn, code); return code; } -int dnodeProcessFreeVnodeRequest(char *pMsg, int msgLen, SMgmtObj *pMgmtObj) { - SFreeVnodeMsg *pFree; +int32_t dnodeProcessFreeVnodeRequest(int8_t *pCont, int32_t contLen, void *pConn) { + SFreeVnodeMsg *vpeer = (SFreeVnodeMsg *) pCont; + int32_t vnode = htonl(vpeer->vnode); - pFree = (SFreeVnodeMsg *)pMsg; - pFree->vnode = htons(pFree->vnode); + dPrint("vnode:%d, remove it", vnode); - if (pFree->vnode < 0 || pFree->vnode >= TSDB_MAX_VNODES) { - dWarn("vid:%d, out of range", pFree->vnode); - return -1; - } - - dTrace("vid:%d, receive free vnode message", pFree->vnode); - int32_t code = vnodeRemoveVnode(pFree->vnode); - assert(code == TSDB_CODE_SUCCESS || code == TSDB_CODE_ACTION_IN_PROGRESS); + int32_t code = dnodeDropVnode(vnode); + rpcSendSimpleRsp(pConn, code); - taosSendSimpleRspToMnode(pMgmtObj, TSDB_MSG_TYPE_DNODE_FREE_VNODE_RSP, code); - return 0; + return code; } -int dnodeProcessDnodeCfgRequest(char *cont, int contLen, SMgmtObj *pMgmtObj) { - SCfgMsg *pCfg = (SCfgMsg *)cont; - - int code = tsCfgDynamicOptions(pCfg->config); - - taosSendSimpleRspToMnode(pMgmtObj, TSDB_MSG_TYPE_DNODE_CFG_RSP, code); - - return 0; +int32_t dnodeProcessDnodeCfgRequest(int8_t *pCont, int32_t contLen, void *pConn) { + SCfgMsg *pCfg = (SCfgMsg *)pCont; + int32_t code = tsCfgDynamicOptions(pCfg->config); + rpcSendSimpleRsp(pConn, code); + return code; } void dnodeSendVpeerCfgMsg(int32_t vnode) { - char * pMsg, *pStart; - int msgLen; - SVpeerCfgMsg *pCfg; - SMgmtObj * pObj = &mgmtObj; - - pStart = taosBuildReqMsgToMnode(pObj, TSDB_MSG_TYPE_VNODE_CFG); - if (pStart == NULL) return; - pMsg = pStart; - - pCfg = (SVpeerCfgMsg *)pMsg; - pCfg->vnode = htonl(vnode); - pMsg += sizeof(SVpeerCfgMsg); + SVpeerCfgMsg *cfg = (SVpeerCfgMsg *) rpcMallocCont(sizeof(SVpeerCfgMsg)); + if (cfg == NULL) { + return; + } - msgLen = pMsg - pStart; - taosSendMsgToMnode(pObj, pStart, msgLen); + cfg->vnode = htonl(vnode); + taosSendMsgToMnode(cfg, sizeof(SMeterCfgMsg)); } void dnodeSendMeterCfgMsg(int32_t vnode, int32_t sid) { - char * pMsg, *pStart; - int msgLen; - SMeterCfgMsg *pCfg; - SMgmtObj * pObj = &mgmtObj; - - pStart = taosBuildReqMsgToMnode(pObj, TSDB_MSG_TYPE_TABLE_CFG); - if (pStart == NULL) return -1; - pMsg = pStart; - - pCfg = (SMeterCfgMsg *)pMsg; - pCfg->vnode = htonl(vnode); - pCfg->sid = htonl(sid); - pMsg += sizeof(SMeterCfgMsg); - - msgLen = pMsg - pStart; - return taosSendMsgToMnode(pObj, pStart, msgLen); -} + SMeterCfgMsg *cfg = (SMeterCfgMsg *) rpcMallocCont(sizeof(SMeterCfgMsg)); + if (cfg == NULL) { + return; + } + cfg->vnode = htonl(vnode); + taosSendMsgToMnode(cfg, sizeof(SMeterCfgMsg)); +} void dnodeInitProcessShellMsg() { dnodeProcessShellMsgFp[TSDB_MSG_TYPE_DNODE_CREATE_CHILD_TABLE] = dnodeProcessCreateTableRequest; diff --git a/src/dnode/src/dnodeShell.c b/src/dnode/src/dnodeShell.c index c873f11eaa..d801e869d3 100644 --- a/src/dnode/src/dnodeShell.c +++ b/src/dnode/src/dnodeShell.c @@ -29,7 +29,7 @@ #include "dnodeRead.h" #include "dnodeSystem.h" #include "dnodeShell.h" -#include "dnodeUtil.h" +#include "dnodeVnodeMgmt.h" #include "dnodeWrite.h" static void dnodeProcessRetrieveRequest(int8_t *pCont, int32_t contLen, void *pConn); diff --git a/src/dnode/src/dnodeSystem.c b/src/dnode/src/dnodeSystem.c index 3d77cf2e83..e9081866b3 100644 --- a/src/dnode/src/dnodeSystem.c +++ b/src/dnode/src/dnodeSystem.c @@ -53,7 +53,7 @@ static int32_t dnodeInitTmrCtl(); void *tsStatusTimer = NULL; void *vnodeTmrCtrl; void **tsRpcQhandle; -void *dmQhandle; +void *tsDnodeMgmtQhandle; void *tsQueryQhandle; int32_t tsVnodePeers = TSDB_VNODES_SUPPORT - 1; int32_t tsMaxQueues; @@ -298,7 +298,7 @@ static int32_t dnodeInitRpcQHandle() { tsRpcQhandle[i] = taosInitScheduler(tsSessionsPerVnode, 1, "dnode"); } - dmQhandle = taosInitScheduler(tsSessionsPerVnode, 1, "mgmt"); + tsDnodeMgmtQhandle = taosInitScheduler(tsSessionsPerVnode, 1, "mgmt"); return 0; } diff --git a/src/dnode/src/dnodeUtil.c b/src/dnode/src/dnodeUtil.c deleted file mode 100644 index b1d2fc0cb6..0000000000 --- a/src/dnode/src/dnodeUtil.c +++ /dev/null @@ -1,25 +0,0 @@ -/* - * Copyright (c) 2019 TAOS Data, Inc. - * - * This program is free software: you can use, redistribute, and/or modify - * it under the terms of the GNU Affero General Public License, version 3 - * or later ("AGPL"), as published by the Free Software Foundation. - * - * This program is distributed in the hope that it will be useful, but WITHOUT - * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or - * FITNESS FOR A PARTICULAR PURPOSE. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see . - */ - -#define _DEFAULT_SOURCE -#include "dnodeUtil.h" - -EVnodeStatus dnodeGetVnodeStatus(int32_t vnode) { - return TSDB_VN_STATUS_MASTER; -} - -bool dnodeCheckVnodeExist(int32_t vnode) { - return true; -} diff --git a/src/dnode/src/dnodeVnodeMgmt.c b/src/dnode/src/dnodeVnodeMgmt.c index 4a2012aa35..ee92991203 100644 --- a/src/dnode/src/dnodeVnodeMgmt.c +++ b/src/dnode/src/dnodeVnodeMgmt.c @@ -14,4 +14,20 @@ */ #define _DEFAULT_SOURCE +#include "os.h" +#include "tlog.h" +#include "taoserror.h" +#include "dnodeVnodeMgmt.h" + +EVnodeStatus dnodeGetVnodeStatus(int32_t vnode) { + return TSDB_VN_STATUS_MASTER; +} + +bool dnodeCheckVnodeExist(int32_t vnode) { + return true; +} + +bool dnodeCheckTableExist(int32_t vnode, int32_t sid, int64_t uid) { + return true; +} diff --git a/src/dnode/src/dnodeWrite.c b/src/dnode/src/dnodeWrite.c index 588ba49483..d70e2b80b5 100644 --- a/src/dnode/src/dnodeWrite.c +++ b/src/dnode/src/dnodeWrite.c @@ -18,6 +18,7 @@ #include "taoserror.h" #include "tlog.h" #include "dnodeWrite.h" +#include "dnodeVnodeMgmt.h" void dnodeWriteData(SShellSubmitMsg *pSubmit, void *pConn, void (*callback)(SShellSubmitRspMsg *rsp, void *pConn)) { SShellSubmitRspMsg result = {0}; @@ -32,35 +33,38 @@ void dnodeWriteData(SShellSubmitMsg *pSubmit, void *pConn, void (*callback)(SShe //TODO: submit implementation } -int32_t dnodeCreateNormalTable(SCreateNormalTableMsg *table) { - return 0; +int32_t dnodeCreateTable(SDCreateTableMsg *table) { + return TSDB_CODE_SUCCESS; } -int32_t dnodeCreateStreamTable(SCreateStreamTableMsg *table) { - return 0; -} - -int32_t dnodeCreateChildTable(SCreateChildTableMsg *table) { - return 0; -} -int32_t dnodeAlterNormalTable(SCreateNormalTableMsg *table) { - return 0; +/* + * Remove table from local repository + */ +int32_t dnodeDropTable(int32_t vnode, int32_t sid, uint64_t uid) { + return TSDB_CODE_SUCCESS; } -int32_t dnodeAlterStreamTable(SCreateStreamTableMsg *table) { - return 0; -} +/* + * Create stream + * if stream already exist, update it + */ +int32_t dnodeCreateStream(SAlterStreamMsg *stream) { + int32_t vnode = htonl(stream->vnode); + int32_t sid = htonl(stream->sid); + uint64_t uid = htobe64(stream->uid); -int32_t dnodeAlterChildTable(SCreateChildTableMsg *table) { - return 0; -} + if (!dnodeCheckTableExist(vnode, sid, uid)) { + return TSDB_CODE_INVALID_TABLE; + } -int32_t dnodeDropSuperTable(int vid, int sid, int64_t uid) { - return 0; + //TODO create or remove stream } -int32_t dnodeDropTable(int vid, int sid, int64_t uid) { - return 0; +/* + * Remove all child tables of supertable from local repository + */ +int32_t dnodeDropSuperTable(uint64_t stableUid) { + return TSDB_CODE_SUCCESS; } diff --git a/src/inc/dnode.h b/src/inc/dnode.h index 27594686d4..0970328806 100644 --- a/src/inc/dnode.h +++ b/src/inc/dnode.h @@ -49,7 +49,6 @@ extern char *(*taosBuildRspMsgToMnodeWithSize)(SMgmtObj *pObj, char type, int si extern char *(*taosBuildReqMsgToMnodeWithSize)(SMgmtObj *pObj, char type, int size); extern char *(*taosBuildRspMsgToMnode)(SMgmtObj *pObj, char type); extern char *(*taosBuildReqMsgToMnode)(SMgmtObj *pObj, char type); -extern int (*taosSendMsgToMnode)(SMgmtObj *pObj, char *msg, int msgLen); extern int (*taosSendSimpleRspToMnode)(SMgmtObj *pObj, char rsptype, char code); extern void (*dnodeInitMgmtIp)(); extern int (*dnodeInitMgmt)(); diff --git a/src/inc/mnode.h b/src/inc/mnode.h index 101df94805..37d4bca28a 100644 --- a/src/inc/mnode.h +++ b/src/inc/mnode.h @@ -335,6 +335,7 @@ typedef struct { char payload[]; /* payload for wildcard match in show tables */ } SShowObj; +void mgmtProcessMsgFromDnode(int8_t *pCont, int32_t contLen, int32_t msgType, void *pConn); #ifdef __cplusplus } diff --git a/src/inc/taoserror.h b/src/inc/taoserror.h index dcc224ef05..debb1a9a6f 100644 --- a/src/inc/taoserror.h +++ b/src/inc/taoserror.h @@ -161,6 +161,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_INVALID_VNODE_STATUS, 0, 116, "invalid vnode s TAOS_DEFINE_ERROR(TSDB_CODE_FAILED_TO_LOCK_RESOURCES, 0, 117, "failed to lock resources") TAOS_DEFINE_ERROR(TSDB_CODE_TABLE_ID_MISMATCH, 0, 118, "table id mismatch") TAOS_DEFINE_ERROR(TSDB_CODE_QUERY_CACHE_ERASED, 0, 119, "query cache erased") +TAOS_DEFINE_ERROR(TSDB_CODE_INVALID_MSG, 0, 120, "invalid message") #ifdef TAOS_ERROR_C }; diff --git a/src/inc/taosmsg.h b/src/inc/taosmsg.h index 0b41a1b949..8d0a6bb4aa 100644 --- a/src/inc/taosmsg.h +++ b/src/inc/taosmsg.h @@ -313,72 +313,28 @@ typedef struct SSchema { short bytes; } SSchema; -typedef struct SMColumn { +typedef struct { int8_t type; int16_t colId; int16_t bytes; -} SMColumn; - -typedef struct { - int32_t size; - int8_t* data; -} SVariableMsg; - -typedef struct { - short vnode; - int32_t sid; - uint64_t uid; - char spi; - char encrypt; - char meterId[TSDB_TABLE_ID_LEN]; - char secret[TSDB_KEY_LEN]; - char cipheringKey[TSDB_KEY_LEN]; - uint64_t timeStamp; - uint64_t lastCreate; - short numOfColumns; - short sqlLen; // SQL string is after schema - char reserved[16]; - int32_t sversion; - SMColumn schema[]; -} SCreateMsg; +} SDTableColumn; typedef struct { int32_t vnode; int32_t sid; uint64_t uid; - char tableId[TSDB_TABLE_ID_LEN + 1]; - char superTableId[TSDB_TABLE_ID_LEN + 1]; - uint64_t createdTime; + uint64_t superTableUid; + int32_t tableType; int32_t sversion; int16_t numOfColumns; int16_t numOfTags; int32_t tagDataLen; - int8_t data[]; -} SCreateChildTableMsg; - -typedef struct { - int32_t vnode; - int32_t sid; - uint64_t uid; - char tableId[TSDB_TABLE_ID_LEN + 1]; + int32_t sqlDataLen; uint64_t createdTime; - int32_t sversion; - int16_t numOfColumns; - int8_t data[]; -} SCreateNormalTableMsg; - -typedef struct { - int32_t vnode; - int32_t sid; - uint64_t uid; char tableId[TSDB_TABLE_ID_LEN + 1]; - uint64_t createdTime; - int32_t sversion; - int16_t numOfColumns; - int32_t sqlLen; + char superTableId[TSDB_TABLE_ID_LEN + 1]; int8_t data[]; -} SCreateStreamTableMsg; - +} SDCreateTableMsg; typedef struct { char db[TSDB_TABLE_ID_LEN]; @@ -468,10 +424,10 @@ typedef struct { int32_t sid; uint64_t uid; char meterId[TSDB_TABLE_ID_LEN]; -} SRemoveMeterMsg; +} SDRemoveTableMsg; typedef struct { - short vnode; + int32_t vnode; } SFreeVnodeMsg; typedef struct SColIndexEx { @@ -923,11 +879,11 @@ typedef struct { } SKillQuery, SKillStream, SKillConnection; typedef struct { - short vnode; + int32_t vnode; int32_t sid; uint64_t uid; uint64_t stime; // stream starting time - char status; + int32_t status; } SAlterStreamMsg; #pragma pack(pop) diff --git a/src/mnode/src/mgmtDnodeInt.c b/src/mnode/src/mgmtDnodeInt.c index 84090da5cf..a2657dfc40 100644 --- a/src/mnode/src/mgmtDnodeInt.c +++ b/src/mnode/src/mgmtDnodeInt.c @@ -78,7 +78,7 @@ int mgmtProcessMeterCfgMsg(char *cont, int contLen, SDnodeObj *pObj) { pMsg = mgmtBuildCreateMeterIe(pTable, pMsg, vnode); } else { mTrace("dnode:%s, vnode:%d sid:%d, meter not there", taosIpStr(pObj->privateIp), vnode, sid); - *pMsg = TSDB_CODE_INVALID_METER_ID; + *pMsg = TSDB_CODE_INVALID_TABLE_ID; pMsg++; *(int32_t *)pMsg = htonl(vnode); @@ -307,7 +307,7 @@ int32_t mgmtSendCreateNormalTableMsg(SNormalTableObj *pTable, SVgObj *pVgroup) { } int mgmtSendRemoveMeterMsgToDnode(STabObj *pTable, SVgObj *pVgroup) { - SRemoveMeterMsg *pRemove; + SDRemoveTableMsg *pRemove; char * pMsg, *pStart; int i, msgLen = 0; SDnodeObj * pObj; @@ -326,12 +326,12 @@ int mgmtSendRemoveMeterMsgToDnode(STabObj *pTable, SVgObj *pVgroup) { if (pStart == NULL) continue; pMsg = pStart; - pRemove = (SRemoveMeterMsg *)pMsg; + pRemove = (SDRemoveTableMsg *)pMsg; pRemove->vnode = htons(pVgroup->vnodeGid[i].vnode); pRemove->sid = htonl(pTable->gid.sid); memcpy(pRemove->meterId, pTable->meterId, TSDB_TABLE_ID_LEN); - pMsg += sizeof(SRemoveMeterMsg); + pMsg += sizeof(SDRemoveTableMsg); msgLen = pMsg - pStart; taosSendMsgToDnode(pObj, pStart, msgLen); @@ -559,7 +559,7 @@ int mgmtSendCfgDnodeMsg(char *cont) { * functions for communicate between dnode and mnode */ -extern void *dmQhandle; +extern void *tsDnodeMgmtQhandle; void * mgmtStatusTimer = NULL; void mgmtProcessMsgFromDnode(char *content, int msgLen, int msgType, SDnodeObj *pObj); @@ -608,7 +608,7 @@ int32_t taosSendMsgToDnodeImp(SDnodeObj *pObj, char *msg, int32_t msgLen) { schedMsg.msg = msg - 1; schedMsg.ahandle = NULL; schedMsg.thandle = NULL; - taosScheduleTask(dmQhandle, &schedMsg); + taosScheduleTask(tsDnodeMgmtQhandle, &schedMsg); return 0; } @@ -682,13 +682,3 @@ void mgmtProcessDnodeStatusImp(void *handle, void *tmrId) { */ } void (*mgmtProcessDnodeStatus)(void *handle, void *tmrId) = mgmtProcessDnodeStatusImp; - -void mgmtProcessMsgFromDnodeSpecImp(SSchedMsg *sched) { - char msgType = *sched->msg; - char *content = sched->msg + 1; - mTrace("msg:%s is received from dnode", taosMsg[(uint8_t)msgType]); - - mgmtProcessMsgFromDnode(content, 0, msgType, mgmtGetDnode(0)); - free(sched->msg); -} -void (*mgmtProcessMsgFromDnodeSpec)(SSchedMsg *sched) = mgmtProcessMsgFromDnodeSpecImp; diff --git a/src/mnode/src/mgmtVgroup.c b/src/mnode/src/mgmtVgroup.c index ce3175abcc..ffd7ca4aa2 100644 --- a/src/mnode/src/mgmtVgroup.c +++ b/src/mnode/src/mgmtVgroup.c @@ -296,11 +296,11 @@ int mgmtGetVgroupMeta(SMeterMeta *pMeta, SShowObj *pShow, SConnObj *pConn) { if (pShow->payloadLen > 0 ) { pTable = mgmtGetTable(pShow->payload); if (NULL == pTable) { - return TSDB_CODE_INVALID_METER_ID; + return TSDB_CODE_INVALID_TABLE_ID; } pVgroup = mgmtGetVgroup(pTable->gid.vgId); - if (NULL == pVgroup) return TSDB_CODE_INVALID_METER_ID; + if (NULL == pVgroup) return TSDB_CODE_INVALID_TABLE_ID; maxReplica = pVgroup->numOfVnodes > maxReplica ? pVgroup->numOfVnodes : maxReplica; } else { -- GitLab