diff --git a/src/dnode/inc/dnodeWrite.h b/src/dnode/inc/dnodeWrite.h index 78af597132ba635af7f61898e9b8350b240107c8..ec0aea73ac043e468fa163c67bff5b7f276acd12 100644 --- a/src/dnode/inc/dnodeWrite.h +++ b/src/dnode/inc/dnodeWrite.h @@ -38,7 +38,7 @@ void dnodeWriteData(SShellSubmitMsg *pSubmit, void *pConn, void (*callback)(SShe * Create table with specified configuration and open it * if table already exist, update its schema and tag */ -int32_t dnodeCreateTable(SDCreateTableMsg *table); +int32_t dnodeCreateTable(SDCreateTableMsg *pTable); /* * Remove table from local repository diff --git a/src/dnode/src/dnodeMgmt.c b/src/dnode/src/dnodeMgmt.c index d26574f7cb3e7bca5205259127852e7f89d2f38f..a5e42845764eff7e7ea1469c64b4a3f8a54ed590 100644 --- a/src/dnode/src/dnodeMgmt.c +++ b/src/dnode/src/dnodeMgmt.c @@ -33,7 +33,7 @@ void (*dnodeProcessStatusRspFp)(int8_t *pCont, int32_t contLen, int8_t msgTyp void (*dnodeSendMsgToMnodeFp)(int8_t msgType, void *pCont, int32_t contLen) = NULL; void (*dnodeSendRspToMnodeFp)(void *handle, int32_t code, void *pCont, int contLen) = NULL; -static int32_t (*dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_MAX])(int8_t *pCont, int32_t contLen, int8_t msgType, void *pConn); +static void (*dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_MAX])(int8_t *pCont, int32_t contLen, int8_t msgType, void *pConn); static void dnodeInitProcessShellMsg(); static void dnodeSendMsgToMnodeQueueFp(SSchedMsg *sched) { @@ -41,17 +41,20 @@ static void dnodeSendMsgToMnodeQueueFp(SSchedMsg *sched) { int32_t code = *(int32_t *) (sched->msg - 8); int8_t msgType = *(int8_t *) (sched->msg - 9); void *handle = sched->ahandle; - int8_t *pCont = sched->msg; + int8_t *pCont = sched->msg; mgmtProcessMsgFromDnode(msgType, pCont, contLen, handle, code); - rpcFreeCont(sched->msg); } void dnodeSendMsgToMnode(int8_t msgType, void *pCont, int32_t contLen) { - dTrace("msg:%s is sent to mnode", taosMsg[msgType]); + dTrace("msg:%d:%s is sent to mnode", msgType, taosMsg[msgType]); if (dnodeSendMsgToMnodeFp) { dnodeSendMsgToMnodeFp(msgType, pCont, contLen); } else { + if (pCont == NULL) { + pCont = rpcMallocCont(1); + contLen = 0; + } SSchedMsg schedMsg = {0}; schedMsg.fp = dnodeSendMsgToMnodeQueueFp; schedMsg.msg = pCont; @@ -63,13 +66,19 @@ void dnodeSendMsgToMnode(int8_t msgType, void *pCont, int32_t contLen) { } void dnodeSendRspToMnode(void *pConn, int8_t msgType, int32_t code, void *pCont, int32_t contLen) { - dTrace("rsp:%s is sent to mnode", taosMsg[msgType]); + dTrace("rsp:%d:%s is sent to mnode, pConn:%p", msgType, taosMsg[msgType], pConn); if (dnodeSendRspToMnodeFp) { dnodeSendRspToMnodeFp(pConn, code, pCont, contLen); } else { + //hack way + if (pCont == NULL) { + pCont = rpcMallocCont(1); + contLen = 0; + } SSchedMsg schedMsg = {0}; - schedMsg.fp = dnodeSendMsgToMnodeQueueFp; - schedMsg.msg = pCont; + schedMsg.fp = dnodeSendMsgToMnodeQueueFp; + schedMsg.msg = pCont; + schedMsg.ahandle = pConn; *(int32_t *) (pCont - 4) = contLen; *(int32_t *) (pCont - 8) = code; *(int8_t *) (pCont - 9) = msgType; @@ -95,65 +104,79 @@ void dnodeInitMgmtIp() { void dnodeProcessMsgFromMgmt(int8_t msgType, void *pCont, int32_t contLen, void *pConn, int32_t code) { if (msgType < 0 || msgType >= TSDB_MSG_TYPE_MAX) { dError("invalid msg type:%d", msgType); + return; + } + + dTrace("msg:%d:%s is received from mgmt, pConn:%p", msgType, taosMsg[msgType], pConn); + + if (msgType == TSDB_MSG_TYPE_STATUS_RSP && dnodeProcessStatusRspFp != NULL) { + dnodeProcessStatusRspFp(pCont, contLen, msgType, pConn); + } + if (dnodeProcessMgmtMsgFp[msgType]) { + (*dnodeProcessMgmtMsgFp[msgType])(pCont, contLen, msgType, pConn); } else { - if (msgType == TSDB_MSG_TYPE_STATUS_RSP && dnodeProcessStatusRspFp != NULL) { - dnodeProcessStatusRspFp(pCont, contLen, msgType, pConn); - } - if (dnodeProcessMgmtMsgFp[msgType]) { - (*dnodeProcessMgmtMsgFp[msgType])(pCont, contLen, msgType, pConn); - } else { - dError("%s is not processed", taosMsg[msgType]); - } + dError("%s is not processed", taosMsg[msgType]); } + + //rpcFreeCont(pCont); } -int32_t dnodeProcessTableCfgRsp(int8_t *pCont, int32_t contLen, int8_t msgType, void *pConn) { +void dnodeProcessTableCfgRsp(int8_t *pCont, int32_t contLen, int8_t msgType, void *pConn) { int32_t code = htonl(*((int32_t *) pCont)); if (code == TSDB_CODE_SUCCESS) { SDCreateTableMsg *table = (SDCreateTableMsg *) (pCont + sizeof(int32_t)); - return dnodeCreateTable(table); + dnodeCreateTable(table); } else if (code == TSDB_CODE_INVALID_TABLE_ID) { SDRemoveTableMsg *table = (SDRemoveTableMsg *) (pCont + sizeof(int32_t)); int32_t vnode = htonl(table->vnode); int32_t sid = htonl(table->sid); uint64_t uid = htobe64(table->uid); dError("vnode:%d, sid:%d table is not configured, remove it", vnode, sid); - return dnodeDropTable(vnode, sid, uid); + dnodeDropTable(vnode, sid, uid); } else { dError("code:%d invalid message", code); - return TSDB_CODE_INVALID_MSG; } } -int32_t dnodeProcessCreateTableRequest(int8_t *pCont, int32_t contLen, int8_t msgType, void *pConn) { +void dnodeProcessCreateTableRequest(int8_t *pCont, int32_t contLen, int8_t msgType, void *pConn) { SDCreateTableMsg *pTable = (SDCreateTableMsg *) pCont; - pTable->vnode = htonl(pTable->vnode); - pTable->sid = htonl(pTable->sid); - pTable->uid = htobe64(pTable->uid); - pTable->superTableUid = htobe64(pTable->superTableUid); - pTable->tableType = htonl(pTable->tableType); - pTable->sversion = htonl(pTable->sversion); pTable->numOfColumns = htons(pTable->numOfColumns); pTable->numOfTags = htons(pTable->numOfTags); + pTable->sid = htonl(pTable->sid); + pTable->sversion = htonl(pTable->sversion); pTable->tagDataLen = htonl(pTable->tagDataLen); pTable->sqlDataLen = htonl(pTable->sqlDataLen); + pTable->contLen = htonl(pTable->contLen); + pTable->numOfVPeers = htonl(pTable->numOfVPeers); + pTable->uid = htobe64(pTable->uid); + pTable->superTableUid = htobe64(pTable->superTableUid); pTable->createdTime = htobe64(pTable->createdTime); + for (int i = 0; i < pTable->numOfVPeers; ++i) { + pTable->vpeerDesc[i].ip = htonl(pTable->vpeerDesc[i].ip); + pTable->vpeerDesc[i].vnode = htonl(pTable->vpeerDesc[i].vnode); + } + + int32_t totalCols = pTable->numOfColumns + pTable->numOfTags; + SSchema *pSchema = (SSchema *) pTable->data; + for (int32_t col = 0; col < totalCols; ++col) { + pSchema->bytes = htons(pSchema->bytes); + pSchema->colId = htons(pSchema->colId); + pSchema++; + } + int32_t code = dnodeCreateTable(pTable); dnodeSendRspToMnode(pConn, msgType + 1, code, NULL, 0); - - return code; } -int32_t dnodeProcessAlterStreamRequest(int8_t *pCont, int32_t contLen, int8_t msgType, void *pConn) { +void dnodeProcessAlterStreamRequest(int8_t *pCont, int32_t contLen, int8_t msgType, void *pConn) { SAlterStreamMsg *stream = (SAlterStreamMsg *) pCont; int32_t code = dnodeCreateStream(stream); dnodeSendRspToMnode(pConn, msgType + 1, code, NULL, 0); - return code; } -int32_t dnodeProcessRemoveTableRequest(int8_t *pCont, int32_t contLen, int8_t msgType, void *pConn) { +void dnodeProcessRemoveTableRequest(int8_t *pCont, int32_t contLen, int8_t msgType, void *pConn) { SDRemoveTableMsg *table = (SDRemoveTableMsg *) pCont; int32_t vnode = htonl(table->vnode); int32_t sid = htonl(table->sid); @@ -162,28 +185,26 @@ int32_t dnodeProcessRemoveTableRequest(int8_t *pCont, int32_t contLen, int8_t ms dPrint("vnode:%d, sid:%d table is not configured, remove it", vnode, sid); int32_t code = dnodeDropTable(vnode, sid, uid); dnodeSendRspToMnode(pConn, msgType + 1, code, NULL, 0); - return code; } -int32_t dnodeProcessVPeerCfgRsp(int8_t *pCont, int32_t contLen, int8_t msgType, void *pConn) { +void dnodeProcessVPeerCfgRsp(int8_t *pCont, int32_t contLen, int8_t msgType, void *pConn) { int32_t code = htonl(*((int32_t *) pCont)); if (code == TSDB_CODE_SUCCESS) { SVPeersMsg *vpeer = (SVPeersMsg *) (pCont + sizeof(int32_t)); int32_t vnode = htonl(vpeer->vnode); - return dnodeCreateVnode(vnode, vpeer); + dnodeCreateVnode(vnode, vpeer); } else if (code == TSDB_CODE_INVALID_VNODE_ID) { SFreeVnodeMsg *vpeer = (SFreeVnodeMsg *) (pCont + sizeof(int32_t)); int32_t vnode = htonl(vpeer->vnode); dError("vnode:%d, not exist, remove it", vnode); - return dnodeDropVnode(vnode); + dnodeDropVnode(vnode); } else { dError("code:%d invalid message", code); - return TSDB_CODE_INVALID_MSG; } } -int32_t dnodeProcessVPeersMsg(int8_t *pCont, int32_t contLen, int8_t msgType, void *pConn) { +void dnodeProcessVPeersMsg(int8_t *pCont, int32_t contLen, int8_t msgType, void *pConn) { SVPeersMsg *vpeer = (SVPeersMsg *) pCont; int32_t vnode = htonl(vpeer->vnode); @@ -191,10 +212,9 @@ int32_t dnodeProcessVPeersMsg(int8_t *pCont, int32_t contLen, int8_t msgType, vo int32_t code = dnodeCreateVnode(vnode, vpeer); dnodeSendRspToMnode(pConn, msgType + 1, code, NULL, 0); - return code; } -int32_t dnodeProcessFreeVnodeRequest(int8_t *pCont, int32_t contLen, int8_t msgType, void *pConn) { +void dnodeProcessFreeVnodeRequest(int8_t *pCont, int32_t contLen, int8_t msgType, void *pConn) { SFreeVnodeMsg *vpeer = (SFreeVnodeMsg *) pCont; int32_t vnode = htonl(vpeer->vnode); @@ -202,15 +222,12 @@ int32_t dnodeProcessFreeVnodeRequest(int8_t *pCont, int32_t contLen, int8_t msgT int32_t code = dnodeDropVnode(vnode); dnodeSendRspToMnode(pConn, msgType + 1, code, NULL, 0); - - return code; } -int32_t dnodeProcessDnodeCfgRequest(int8_t *pCont, int32_t contLen, int8_t msgType, void *pConn) { +void dnodeProcessDnodeCfgRequest(int8_t *pCont, int32_t contLen, int8_t msgType, void *pConn) { SCfgDnodeMsg *pCfg = (SCfgDnodeMsg *)pCont; int32_t code = tsCfgDynamicOptions(pCfg->config); dnodeSendRspToMnode(pConn, msgType + 1, code, NULL, 0); - return code; } void dnodeSendVpeerCfgMsg(int32_t vnode) { diff --git a/src/dnode/src/dnodeWrite.c b/src/dnode/src/dnodeWrite.c index 59ede2711b15ff1c1384d05f408e4837d95d80e8..f466b4f3f0e59ff911d6ce82a2f542698c930ff3 100644 --- a/src/dnode/src/dnodeWrite.c +++ b/src/dnode/src/dnodeWrite.c @@ -17,6 +17,7 @@ #include "os.h" #include "taoserror.h" #include "tlog.h" +#include "tutil.h" #include "dnodeWrite.h" #include "dnodeVnodeMgmt.h" @@ -33,7 +34,34 @@ void dnodeWriteData(SShellSubmitMsg *pSubmit, void *pConn, void (*callback)(SShe //TODO: submit implementation } -int32_t dnodeCreateTable(SDCreateTableMsg *table) { +int32_t dnodeCreateTable(SDCreateTableMsg *pTable) { + if (pTable->tableType == TSDB_TABLE_TYPE_CHILD_TABLE) { + dTrace("table:%s, start to create child table, stable:%s", pTable->tableId, pTable->superTableId); + } else if (pTable->tableType == TSDB_TABLE_TYPE_NORMAL_TABLE){ + dTrace("table:%s, start to create normal table", pTable->tableId); + } else if (pTable->tableType == TSDB_TABLE_TYPE_STREAM_TABLE){ + dTrace("table:%s, start to create stream table", pTable->tableId); + } else { + dError("table:%s, invalid table type:%d", pTable->tableType); + } + + for (int i = 0; i < pTable->numOfVPeers; ++i) { + dTrace("table:%s ip:%s vnode:%d sid:%d", pTable->tableId, taosIpStr(pTable->vpeerDesc[i].ip), + pTable->vpeerDesc[i].vnode, pTable->sid); + } + + SSchema *pSchema = (SSchema *) pTable->data; + for (int32_t col = 0; col < pTable->numOfColumns; ++col) { + dTrace("table:%s col index:%d colId:%d bytes:%d type:%d name:%s", + pTable->tableId, col, pSchema->colId, pSchema->bytes, pSchema->type, pSchema->name); + pSchema++; + } + for (int32_t col = 0; col < pTable->numOfTags; ++col) { + dTrace("table:%s tag index:%d colId:%d bytes:%d type:%d name:%s", + pTable->tableId, col, pSchema->colId, pSchema->bytes, pSchema->type, pSchema->name); + pSchema++; + } + return TSDB_CODE_SUCCESS; } diff --git a/src/inc/mnode.h b/src/inc/mnode.h index 22d3b340d7efcefab6ab96bb9d539e35eb914b50..ca6294f623f6a6bdc5e6981068cd0690d0aef153 100644 --- a/src/inc/mnode.h +++ b/src/inc/mnode.h @@ -100,6 +100,7 @@ typedef struct { typedef struct { char tableId[TSDB_TABLE_ID_LEN + 1]; int8_t type; + int8_t dirty; uint64_t uid; int32_t sid; int32_t vgId; @@ -111,6 +112,7 @@ struct _vg_obj; typedef struct SSuperTableObj { char tableId[TSDB_TABLE_ID_LEN + 1]; int8_t type; + int8_t dirty; uint64_t uid; int32_t sid; int32_t vgId; diff --git a/src/inc/taosmsg.h b/src/inc/taosmsg.h index e6202673bc0019f0dc9677ec76b180d22d3eb110..2661712adb81a1b3141fe8b3069195d0e0438173 100644 --- a/src/inc/taosmsg.h +++ b/src/inc/taosmsg.h @@ -230,21 +230,27 @@ typedef struct SSchema { } SSchema; typedef struct { - int32_t vnode; - int32_t sid; - uint64_t uid; - uint64_t superTableUid; - int32_t tableType; - int32_t sversion; - int16_t numOfColumns; - int16_t numOfTags; - int32_t tagDataLen; - int32_t sqlDataLen; - int32_t contLen; - uint64_t createdTime; - char tableId[TSDB_TABLE_ID_LEN + 1]; - char superTableId[TSDB_TABLE_ID_LEN + 1]; - char data[]; + int32_t vnode; //the index of vnode + uint32_t ip; +} SVPeerDesc; + +typedef struct { + int8_t tableType; + int16_t numOfColumns; + int16_t numOfTags; + int32_t sid; + int32_t sversion; + int32_t tagDataLen; + int32_t sqlDataLen; + int32_t contLen; + int32_t numOfVPeers; + uint64_t uid; + uint64_t superTableUid; + uint64_t createdTime; + SVPeerDesc vpeerDesc[TSDB_MAX_MPEERS]; + char tableId[TSDB_TABLE_ID_LEN + 1]; + char superTableId[TSDB_TABLE_ID_LEN + 1]; + char data[]; } SDCreateTableMsg; typedef struct { @@ -607,12 +613,6 @@ typedef struct { char cipheringKey[TSDB_KEY_LEN]; } SSecIe; -typedef struct { - uint32_t dnode; //the ip of dnode - int32_t vnode; //the index of vnode - uint32_t ip; -} SVPeerDesc; - typedef struct { int32_t numOfVPeers; SVPeerDesc vpeerDesc[]; diff --git a/src/mnode/inc/mgmtChildTable.h b/src/mnode/inc/mgmtChildTable.h index 79672d96d07f8355408e96a4810461e58927a377..d589e9bc97bacba3639a6cb856e790ddc2e7c17a 100644 --- a/src/mnode/inc/mgmtChildTable.h +++ b/src/mnode/inc/mgmtChildTable.h @@ -31,11 +31,11 @@ void mgmtCleanUpChildTables(); void * mgmtGetChildTable(char *tableId); -int32_t mgmtCreateChildTable(SDbObj *pDb, SCreateTableMsg *pCreate, SVgObj *pVgroup, int32_t sid); +int32_t mgmtCreateChildTable(SCreateTableMsg *pCreate, int32_t contLen, SVgObj *pVgroup, int32_t sid, + SDCreateTableMsg **pDCreateOut, STableInfo **pTableOut); int32_t mgmtDropChildTable(SDbObj *pDb, SChildTableObj *pTable); int32_t mgmtAlterChildTable(SDbObj *pDb, SAlterTableMsg *pAlter); int32_t mgmtModifyChildTableTagValueByName(SChildTableObj *pTable, char *tagName, char *nContent); -SCreateTableMsg *mgmtBuildCreateChildTableMsg(SChildTableObj *pTable); int32_t mgmtGetChildTableMeta(SDbObj *pDb, SChildTableObj *pTable, STableMeta *pMeta, bool usePublicIp); diff --git a/src/mnode/inc/mgmtDnodeInt.h b/src/mnode/inc/mgmtDnodeInt.h index 2e0ff6ab4c21a9bd3dc60700c568e33f4d5f4195..fd1ba85b172c3b87c53949514e91508721bc73df 100644 --- a/src/mnode/inc/mgmtDnodeInt.h +++ b/src/mnode/inc/mgmtDnodeInt.h @@ -26,11 +26,11 @@ extern "C" { extern void *mgmtStatusTimer; -void mgmtSendCreateTableMsg(STableInfo *pTable, SRpcIpSet *ipSet, void *ahandle); +void mgmtSendCreateTableMsg(SDCreateTableMsg *pCreate, SRpcIpSet *ipSet, void *ahandle); void mgmtSendRemoveTableMsg(STableInfo *pTable, SRpcIpSet *ipSet, void *ahandle); void mgmtSendAlterStreamMsg(STableInfo *pTable, SRpcIpSet *ipSet, void *ahandle); void mgmtSendCreateVnodeMsg(SVgObj *pVgroup, int32_t vnode, SRpcIpSet *ipSet, void *ahandle); -void mgmtSendCreateVgroupMsg(SVgObj *pVgroup, SRpcIpSet *ipSet, void *ahandle); +void mgmtSendCreateVgroupMsg(SVgObj *pVgroup, void *table_info); void mgmtSendOneFreeVnodeMsg(int32_t vnode, SRpcIpSet *ipSet, void *ahandle); diff --git a/src/mnode/inc/mgmtNormalTable.h b/src/mnode/inc/mgmtNormalTable.h index 66de6e4648805b45afc79198816c3945f768457a..270e4998ae792e2422516d6861496837dd34183c 100644 --- a/src/mnode/inc/mgmtNormalTable.h +++ b/src/mnode/inc/mgmtNormalTable.h @@ -29,7 +29,7 @@ void mgmtCleanUpNormalTables(); void * mgmtGetNormalTable(char *tableId); -int32_t mgmtCreateNormalTable(SDbObj *pDb, SCreateTableMsg *pCreate, SVgObj *pVgroup, int32_t sid); +int32_t mgmtCreateNormalTable(SCreateTableMsg *pCreate, SVgObj *pVgroup, int32_t sid, SDCreateTableMsg **pDCreateOut, STableInfo **pTableOut); int32_t mgmtDropNormalTable(SDbObj *pDb, SNormalTableObj *pTable); int32_t mgmtAddNormalTableColumn(SNormalTableObj *pTable, SSchema schema[], int32_t ncols); int32_t mgmtDropNormalTableColumnByName(SNormalTableObj *pTable, char *colName); diff --git a/src/mnode/inc/mgmtShell.h b/src/mnode/inc/mgmtShell.h index 0a4952675939aca7f357b158c7df959df2372af8..bda949cb42baa2c801efabbc2b6c70892bc7f63f 100644 --- a/src/mnode/inc/mgmtShell.h +++ b/src/mnode/inc/mgmtShell.h @@ -28,13 +28,13 @@ int32_t mgmtInitShell(); void mgmtCleanUpShell(); extern int32_t (*mgmtCheckRedirectMsg)(void *pConn); -extern int32_t (*mgmtProcessAlterAcctMsg)(void *pCont, int32_t contLen, void *ahandle); -extern int32_t (*mgmtProcessCreateDnodeMsg)(void *pCont, int32_t contLen, void *ahandle); -extern int32_t (*mgmtProcessCfgMnodeMsg)(void *pCont, int32_t contLen, void *ahandle); -extern int32_t (*mgmtProcessDropMnodeMsg)(void *pCont, int32_t contLen, void *ahandle); -extern int32_t (*mgmtProcessDropDnodeMsg)(void *pCont, int32_t contLen, void *ahandle); -extern int32_t (*mgmtProcessDropAcctMsg)(void *pCont, int32_t contLen, void *ahandle); -extern int32_t (*mgmtProcessCreateAcctMsg)(void *pCont, int32_t contLen, void *ahandle); +extern void (*mgmtProcessAlterAcctMsg)(void *pCont, int32_t contLen, void *ahandle); +extern void (*mgmtProcessCreateDnodeMsg)(void *pCont, int32_t contLen, void *ahandle); +extern void (*mgmtProcessCfgMnodeMsg)(void *pCont, int32_t contLen, void *ahandle); +extern void (*mgmtProcessDropMnodeMsg)(void *pCont, int32_t contLen, void *ahandle); +extern void (*mgmtProcessDropDnodeMsg)(void *pCont, int32_t contLen, void *ahandle); +extern void (*mgmtProcessDropAcctMsg)(void *pCont, int32_t contLen, void *ahandle); +extern void (*mgmtProcessCreateAcctMsg)(void *pCont, int32_t contLen, void *ahandle); #ifdef __cplusplus } diff --git a/src/mnode/inc/mgmtTable.h b/src/mnode/inc/mgmtTable.h index c3c003fe0fa18891caa6a60b6a26527812760096..d5f91a4959dfa5046be26bec864dfc444e119cd0 100644 --- a/src/mnode/inc/mgmtTable.h +++ b/src/mnode/inc/mgmtTable.h @@ -31,7 +31,7 @@ STableInfo* mgmtGetTableByPos(uint32_t dnodeIp, int32_t vnode, int32_t sid); int32_t mgmtGetTableMeta(SDbObj *pDb, STableInfo *pTable, STableMeta *pMeta, bool usePublicIp); int32_t mgmtRetrieveMetricMeta(void *pConn, char **pStart, SSuperTableMetaMsg *pInfo); -int32_t mgmtCreateTable(SDbObj *pDb, SCreateTableMsg *pCreate, void *thandle); +void mgmtCreateTable(SCreateTableMsg *pCreate, int32_t contLen, void *thandle); int32_t mgmtDropTable(SDbObj *pDb, char *tableId, int32_t ignore); int32_t mgmtAlterTable(SDbObj *pDb, SAlterTableMsg *pAlter); int32_t mgmtGetShowTableMeta(STableMeta *pMeta, SShowObj *pShow, void *pConn); @@ -40,8 +40,8 @@ void mgmtCleanUpMeters(); void mgmtAddTableIntoSuperTable(SSuperTableObj *pStable); void mgmtRemoveTableFromSuperTable(SSuperTableObj *pStable); +void mgmtSetTableDirty(STableInfo *pTable, bool isDirty); -SDCreateTableMsg *mgmtBuildCreateTableMsg(STableInfo *pTable); SDRemoveTableMsg *mgmtBuildRemoveTableMsg(STableInfo *pTable); SDRemoveSuperTableMsg *mgmtBuildRemoveSuperTableMsg(STableInfo *pTable); diff --git a/src/mnode/inc/mgmtVgroup.h b/src/mnode/inc/mgmtVgroup.h index 249d7d133cf8b7077ff6cf523d666351c9507b5c..7c76a1b189c8d29a2e7121b995ed89a2b9f3b86b 100644 --- a/src/mnode/inc/mgmtVgroup.h +++ b/src/mnode/inc/mgmtVgroup.h @@ -37,7 +37,7 @@ int32_t mgmtGetVgroupMeta(STableMeta *pMeta, SShowObj *pShow, void *pConn); int32_t mgmtRetrieveVgroups(SShowObj *pShow, char *data, int32_t rows, void *pConn); void mgmtSetVgroupIdPool(); -SVgObj *mgmtGetAvailVgroup(SDbObj *pDb, int32_t *sid); +SVgObj *mgmtGetAvailableVgroup(SDbObj *pDb); void mgmtAddTableIntoVgroup(SVgObj *pVgroup, STableInfo *pTable); void mgmtRemoveTableFromVgroup(SVgObj *pVgroup, STableInfo *pTable); diff --git a/src/mnode/src/mgmtChildTable.c b/src/mnode/src/mgmtChildTable.c index 9f2acd779228bc8edeeec2fa687469eb4ef4d383..8cd88a73199fe3e9abce15344b568554541c77b9 100644 --- a/src/mnode/src/mgmtChildTable.c +++ b/src/mnode/src/mgmtChildTable.c @@ -274,38 +274,50 @@ void mgmtCleanUpChildTables() { sdbCloseTable(tsChildTableSdb); } -SCreateTableMsg *mgmtBuildCreateChildTableMsg(SChildTableObj *pTable) { -// SCreateTableMsg *pCreateTable = (SCreateTableMsg *) pMsg; -// memcpy(pCreateTable->tableId, pTable->tableId, TSDB_TABLE_ID_LEN); -// memcpy(pCreateTable->superTableId, pTable->superTable->tableId, TSDB_TABLE_ID_LEN); -// pCreateTable->vnode = htonl(vnode); -// pCreateTable->sid = htonl(pTable->sid); -// pCreateTable->uid = pTable->uid; -// pCreateTable->createdTime = htobe64(pTable->createdTime); -// pCreateTable->sversion = htonl(pTable->superTable->sversion); -// pCreateTable->numOfColumns = htons(pTable->superTable->numOfColumns); -// pCreateTable->numOfTags = htons(pTable->superTable->numOfTags); -// -// SSchema *pSchema = pTable->superTable->schema; -// int32_t totalCols = pCreateTable->numOfColumns + pCreateTable->numOfTags; -// -// for (int32_t col = 0; col < totalCols; ++col) { -// SMColumn *colData = &((SMColumn *) (pCreateTable->data))[col]; -// colData->type = pSchema[col].type; -// colData->bytes = htons(pSchema[col].bytes); -// colData->colId = htons(pSchema[col].colId); -// } -// -// int32_t totalColsSize = sizeof(SMColumn *) * totalCols; -// pMsg = pCreateTable->data + totalColsSize + tagDataLen; -// -// memcpy(pCreateTable->data + totalColsSize, pTagData, tagDataLen); -// pCreateTable->tagDataLen = htonl(tagDataLen); +static void *mgmtBuildCreateChildTableMsg(SChildTableObj *pTable, SVgObj *pVgroup, void *pTagData, int32_t tagDataLen) { + int32_t totalCols = pTable->superTable->numOfColumns + pTable->superTable->numOfTags; + int32_t contLen = sizeof(SCreateTableMsg) + totalCols * sizeof(SSchema) + tagDataLen; - return NULL; + SDCreateTableMsg *pCreateTable = rpcMallocCont(contLen); + if (pCreateTable == NULL) { + return NULL; + } + + memcpy(pCreateTable->tableId, pTable->tableId, TSDB_TABLE_ID_LEN); + memcpy(pCreateTable->superTableId, pTable->superTable->tableId, TSDB_TABLE_ID_LEN); + pCreateTable->tableType = pTable->type; + pCreateTable->numOfColumns = htons(pTable->superTable->numOfColumns); + pCreateTable->numOfTags = htons(pTable->superTable->numOfTags); + pCreateTable->sid = htonl(pTable->sid); + pCreateTable->sversion = htonl(pTable->superTable->sversion); + pCreateTable->tagDataLen = htonl(tagDataLen); + pCreateTable->sqlDataLen = 0; + pCreateTable->contLen = htonl(contLen); + pCreateTable->numOfVPeers = htonl(pVgroup->numOfVnodes); + pCreateTable->uid = htobe64(pTable->uid); + pCreateTable->superTableUid = htobe64(pTable->superTable->uid); + pCreateTable->createdTime = htobe64(pTable->createdTime); + + for (int i = 0; i < pVgroup->numOfVnodes; ++i) { + pCreateTable->vpeerDesc[i].ip = htonl(pVgroup->vnodeGid[i].ip); + pCreateTable->vpeerDesc[i].vnode = htonl(pVgroup->vnodeGid[i].vnode); + } + + SSchema *pSchema = (SSchema *) pCreateTable->data; + memcpy(pSchema, pTable->superTable->schema, totalCols * sizeof(SSchema)); + for (int32_t col = 0; col < totalCols; ++col) { + pSchema->bytes = htons(pSchema->bytes); + pSchema->colId = htons(pSchema->colId); + pSchema++; + } + + memcpy(pCreateTable->data + totalCols * sizeof(SSchema), pTagData, tagDataLen); + + return pCreateTable; } -int32_t mgmtCreateChildTable(SDbObj *pDb, SCreateTableMsg *pCreate, SVgObj *pVgroup, int32_t sid) { +int32_t mgmtCreateChildTable(SCreateTableMsg *pCreate, int32_t contLen, SVgObj *pVgroup, int32_t sid, + SDCreateTableMsg **pDCreateOut, STableInfo **pTableOut) { int32_t numOfTables = sdbGetNumOfRows(tsChildTableSdb); if (numOfTables >= tsMaxTables) { mError("table:%s, numOfTables:%d exceed maxTables:%d", pCreate->tableId, numOfTables, tsMaxTables); @@ -321,6 +333,7 @@ int32_t mgmtCreateChildTable(SDbObj *pDb, SCreateTableMsg *pCreate, SVgObj *pVgr SChildTableObj *pTable = (SChildTableObj *) calloc(sizeof(SChildTableObj), 1); if (pTable == NULL) { + mError("table:%s, failed to alloc memory", pCreate->tableId); return TSDB_CODE_SERV_OUT_OF_MEMORY; } strcpy(pTable->tableId, pCreate->tableId); @@ -333,21 +346,23 @@ int32_t mgmtCreateChildTable(SDbObj *pDb, SCreateTableMsg *pCreate, SVgObj *pVgr pTable->uid = (((uint64_t) pTable->vgId) << 40) + ((((uint64_t) pTable->sid) & ((1ul << 24) - 1ul)) << 16) + ((uint64_t) sdbGetVersion() & ((1ul << 16) - 1ul)); - int32_t size = mgmtGetTagsLength(pSuperTable, INT_MAX) + (uint32_t) TSDB_TABLE_ID_LEN; - SSchema * schema = (SSchema *) calloc(1, size); - if (schema == NULL) { - free(pTable); - mError("table:%s, corresponding super table schema is null", pCreate->tableId); - return TSDB_CODE_INVALID_TABLE; - } - if (sdbInsertRow(tsChildTableSdb, pTable, 0) < 0) { mError("table:%s, update sdb error", pCreate->tableId); return TSDB_CODE_SDB_ERROR; } - mTrace("table:%s, create table in vgroup, vgId:%d sid:%d vnode:%d uid:%" PRIu64 " db:%s", - pTable->tableId, pVgroup->vgId, sid, pVgroup->vnodeGid[0].vnode, pTable->uid, pDb->name); + pTagData += (TSDB_TABLE_ID_LEN + 1); + int32_t tagDataLen = contLen - sizeof(SCreateTableMsg) - TSDB_TABLE_ID_LEN - 1; + *pDCreateOut = mgmtBuildCreateChildTableMsg(pTable, pVgroup, pTagData, tagDataLen); + if (*pDCreateOut == NULL) { + mError("table:%s, failed to build create table message", pCreate->tableId); + return TSDB_CODE_SERV_OUT_OF_MEMORY; + } + + *pTableOut = pTable; + + mTrace("table:%s, create table in vgroup, vgroup:%d sid:%d vnode:%d uid:%" PRIu64 , + pTable->tableId, pVgroup->vgId, sid, pVgroup->vnodeGid[0].vnode, pTable->uid); return TSDB_CODE_SUCCESS; } diff --git a/src/mnode/src/mgmtDnodeInt.c b/src/mnode/src/mgmtDnodeInt.c index c051af171e16cf0169df437f58906bfb7711e53c..bed5ea6afc8f9b9e87f8149448657b96187777c2 100644 --- a/src/mnode/src/mgmtDnodeInt.c +++ b/src/mnode/src/mgmtDnodeInt.c @@ -42,14 +42,17 @@ static void mgmtSendMsgToDnodeQueueFp(SSchedMsg *sched) { int8_t *pCont = sched->msg; dnodeProcessMsgFromMgmt(msgType, pCont, contLen, ahandle, code); - rpcFreeCont(sched->msg); } void mgmtSendMsgToDnode(SRpcIpSet *ipSet, int8_t msgType, void *pCont, int32_t contLen, void *ahandle) { - mTrace("msg:%s is sent to dnode", taosMsg[msgType]); + mTrace("msg:%d:%s is sent to dnode, ahandle:%p", msgType, taosMsg[msgType], ahandle); if (mgmtSendMsgToDnodeFp) { mgmtSendMsgToDnodeFp(ipSet, msgType, pCont, contLen, ahandle); } else { + if (pCont == NULL) { + pCont = rpcMallocCont(1); + contLen = 0; + } SSchedMsg schedMsg = {0}; schedMsg.fp = mgmtSendMsgToDnodeQueueFp; schedMsg.msg = pCont; @@ -62,10 +65,14 @@ void mgmtSendMsgToDnode(SRpcIpSet *ipSet, int8_t msgType, void *pCont, int32_t c } void mgmtSendRspToDnode(void *pConn, int8_t msgType, int32_t code, void *pCont, int32_t contLen) { - mTrace("rsp:%s is sent to dnode", taosMsg[msgType]); + mTrace("rsp:%d:%s is sent to dnode", msgType, taosMsg[msgType]); if (mgmtSendRspToDnodeFp) { mgmtSendRspToDnodeFp(pConn, code, pCont, contLen); } else { + if (pCont == NULL) { + pCont = rpcMallocCont(1); + contLen = 0; + } SSchedMsg schedMsg = {0}; schedMsg.fp = mgmtSendMsgToDnodeQueueFp; schedMsg.msg = pCont; @@ -126,16 +133,27 @@ static void mgmtProcessVnodeCfgMsg(int8_t msgType, int8_t *pCont, int32_t contLe } static void mgmtProcessCreateTableRsp(int8_t msgType, int8_t *pCont, int32_t contLen, void *thandle, int32_t code) { - mTrace("create table rsp received, handle:%p code:%d", thandle, code); + mTrace("create table rsp received, thandle:%p code:%d", thandle, code); if (thandle == NULL) return; SProcessInfo *info = thandle; - if (info->type == TSDB_PROCESS_CREATE_TABLE) { - rpcSendResponse(info->thandle, code, NULL, 0); + assert(info->type == TSDB_PROCESS_CREATE_TABLE); + STableInfo *pTable = info->ahandle; + + if (code != TSDB_CODE_SUCCESS) { + mError("table:%s, failed to create in dnode, code:%d, set it dirty", pTable->tableId); + mgmtSetTableDirty(pTable, true); } else { - mError("create table rsp received, handle:%p code:%d, invalid type:%d", info->type); - rpcSendResponse(info->thandle, TSDB_CODE_INVALID_MSG, NULL, 0); + mTrace("table:%s, created in dnode", pTable->tableId); + mgmtSetTableDirty(pTable, false); } + + rpcSendResponse(info->thandle, code, NULL, 0); +} + +void mgmtSendCreateTableMsg(SDCreateTableMsg *pCreate, SRpcIpSet *ipSet, void *ahandle) { + mTrace("table:%s, send create table msg, ahandle:%p", pCreate->tableId, ahandle); + mgmtSendMsgToDnode(ipSet, TSDB_MSG_TYPE_DNODE_CREATE_TABLE, pCreate, htonl(pCreate->contLen), ahandle); } static void mgmtProcessRemoveTableRsp(int8_t msgType, int8_t *pCont, int32_t contLen, void *thandle, int32_t code) { @@ -147,89 +165,83 @@ static void mgmtProcessFreeVnodeRsp(int8_t msgType, int8_t *pCont, int32_t contL } static void mgmtProcessCreateVnodeRsp(int8_t msgType, int8_t *pCont, int32_t contLen, void *ahandle, int32_t code) { - mTrace("create vnode received, vgroup_info:%p code:%d", ahandle, code); - if (ahandle == NULL) { + SProcessInfo *vgprocess = ahandle; + mTrace("create vnode received, vgprocess:%p code:%d", vgprocess, code); + + if (vgprocess == NULL) { rpcFreeCont(pCont); return; } - SProcessInfo *vgroup_info = ahandle; - assert(vgroup_info->type == TSDB_PROCESS_CREATE_VGROUP); - - vgroup_info->received++; - SVgObj *pVgroup = vgroup_info->ahandle; - mTrace("vgroup:%d, received:%d numOfVnodes:%d table_info:%p", - pVgroup->vgId, vgroup_info->received, pVgroup->numOfVnodes, vgroup_info->thandle); - if (vgroup_info->received == pVgroup->numOfVnodes) { - SProcessInfo *table_info = vgroup_info->thandle; - assert(table_info->type == TSDB_PROCESS_CREATE_VGROUP_AND_TABLE); - - STableInfo *pTable = table_info->ahandle; - SProcessInfo *info = calloc(1, sizeof(SProcessInfo)); - info->type = TSDB_PROCESS_CREATE_TABLE; - info->thandle = table_info->thandle; - info->ahandle = pTable; + assert(vgprocess->type == TSDB_PROCESS_CREATE_VGROUP); + + vgprocess->received++; + SVgObj *pVgroup = vgprocess->ahandle; + + mTrace("vgroup:%d, received:%d numOfVnodes:%d vgprocess:%p tbprocess::%p", + pVgroup->vgId, vgprocess->received, pVgroup->numOfVnodes, vgprocess->thandle); + if (vgprocess->received == pVgroup->numOfVnodes) { + + STableInfo *pTable = vgprocess->ahandle; + SProcessInfo *tbprocess = calloc(1, sizeof(SProcessInfo)); + tbprocess->type = TSDB_PROCESS_CREATE_TABLE; + tbprocess->thandle = tbprocess->thandle; + tbprocess->ahandle = pTable; SRpcIpSet ipSet = mgmtGetIpSetFromVgroup(pVgroup); - mgmtSendCreateTableMsg(pTable, &ipSet, info); + mgmtSendCreateTableMsg(pTable, &ipSet, tbprocess); - free(vgroup_info); - free(table_info); + free(vgprocess); } rpcFreeCont(pCont); } -void mgmtSendCreateVgroupMsg(SVgObj *pVgroup, SRpcIpSet *ipSet, void *table_info) { - SProcessInfo *vgroup_info = calloc(1, sizeof(SProcessInfo)); - vgroup_info->type = TSDB_PROCESS_CREATE_VGROUP; - vgroup_info->thandle = table_info; - vgroup_info->ahandle = pVgroup; - - mTrace("vgroup:%d, send create all vnodes msg, table_info:%p vgroup_info:%p ", pVgroup->vgId, table_info, vgroup_info); +void mgmtSendCreateVgroupMsg(SVgObj *pVgroup, void *table_info) { + mTrace("vgroup:%d, send create all vnodes msg, table_info:%p", pVgroup->vgId, table_info); for (int i = 0; i < pVgroup->numOfVnodes; ++i) { SRpcIpSet ipSet = mgmtGetIpSetFromIp(pVgroup->vnodeGid[i].ip); - mgmtSendCreateVnodeMsg(pVgroup, pVgroup->vnodeGid[i].vnode, &ipSet, vgroup_info); + mgmtSendCreateVnodeMsg(pVgroup, pVgroup->vnodeGid[i].vnode, &ipSet, table_info); } } -void mgmtSendCreateVnodeMsg(SVgObj *pVgroup, int32_t vnode, SRpcIpSet *ipSet, void *vgroup_info) { - mTrace("vgroup:%d, send create vnode:%d msg, vgroup_info:%p", pVgroup->vgId, vnode, vgroup_info); - +void mgmtSendCreateVnodeMsg(SVgObj *pVgroup, int32_t vnode, SRpcIpSet *ipSet, void *table_info) { + mTrace("vgroup:%d, send create vnode:%d msg, table_info:%p", pVgroup->vgId, vnode, table_info); SVPeersMsg *pVpeer = mgmtBuildVpeersMsg(pVgroup, vnode); if (pVpeer != NULL) { - mgmtSendMsgToDnode(ipSet, TSDB_MSG_TYPE_DNODE_VPEERS, pVpeer, sizeof(SVPeersMsg), vgroup_info); + mgmtSendMsgToDnode(ipSet, TSDB_MSG_TYPE_DNODE_VPEERS, pVpeer, sizeof(SVPeersMsg), table_info); } } -void mgmtProcessMsgFromDnode(char msgType, void *pCont, int32_t contLen, void *handle, int32_t code) { +void mgmtProcessMsgFromDnode(char msgType, void *pCont, int32_t contLen, void *pConn, int32_t code) { + if (msgType < 0 || msgType >= TSDB_MSG_TYPE_MAX) { + mError("invalid msg type:%d", msgType); + return; + } + + mTrace("msg:%d:%s is received from dnode, pConn:%p", msgType, taosMsg[msgType], pConn); + if (msgType == TSDB_MSG_TYPE_TABLE_CFG) { - mgmtProcessTableCfgMsg(msgType, pCont, contLen, handle); + mgmtProcessTableCfgMsg(msgType, pCont, contLen, pConn); } else if (msgType == TSDB_MSG_TYPE_VNODE_CFG) { - mgmtProcessVnodeCfgMsg(msgType, pCont, contLen, handle); + mgmtProcessVnodeCfgMsg(msgType, pCont, contLen, pConn); } else if (msgType == TSDB_MSG_TYPE_DNODE_CREATE_TABLE_RSP) { - mgmtProcessCreateTableRsp(msgType, pCont, contLen, handle, code); + mgmtProcessCreateTableRsp(msgType, pCont, contLen, pConn, code); } else if (msgType == TSDB_MSG_TYPE_DNODE_REMOVE_TABLE_RSP) { - mgmtProcessRemoveTableRsp(msgType, pCont, contLen, handle, code); + mgmtProcessRemoveTableRsp(msgType, pCont, contLen, pConn, code); } else if (msgType == TSDB_MSG_TYPE_DNODE_VPEERS_RSP) { - mgmtProcessCreateVnodeRsp(msgType, pCont, contLen, handle, code); + mgmtProcessCreateVnodeRsp(msgType, pCont, contLen, pConn, code); } else if (msgType == TSDB_MSG_TYPE_DNODE_FREE_VNODE_RSP) { - mgmtProcessFreeVnodeRsp(msgType, pCont, contLen, handle, code); + mgmtProcessFreeVnodeRsp(msgType, pCont, contLen, pConn, code); } else if (msgType == TSDB_MSG_TYPE_DNODE_CFG_RSP) { } else if (msgType == TSDB_MSG_TYPE_ALTER_STREAM_RSP) { } else { mError("%s from dnode is not processed", taosMsg[(int8_t)msgType]); } -} - -void mgmtSendCreateTableMsg(STableInfo *pTable, SRpcIpSet *ipSet, void *ahandle) { - mTrace("table:%s, sid:%d send create table msg, handle:%p", pTable->tableId, pTable->sid); - SDCreateTableMsg *pCreate = mgmtBuildCreateTableMsg(pTable); - if (pCreate != NULL) { - mgmtSendMsgToDnode(ipSet, TSDB_MSG_TYPE_DNODE_CREATE_TABLE, pCreate, htonl(pCreate->contLen), ahandle); - } + //rpcFreeCont(pCont); } + void mgmtSendRemoveTableMsg(STableInfo *pTable, SRpcIpSet *ipSet, void *ahandle) { mTrace("table:%s, sid:%d send remove table msg, handle:%p", pTable->tableId, pTable->sid); diff --git a/src/mnode/src/mgmtNormalTable.c b/src/mnode/src/mgmtNormalTable.c index a9ab627298b0dcdb8b7fc35e88ec71f00b00c45d..4bff3c1e96e8e42388af266f4c2bff7b8bb96f01 100644 --- a/src/mnode/src/mgmtNormalTable.c +++ b/src/mnode/src/mgmtNormalTable.c @@ -319,7 +319,7 @@ int8_t *mgmtBuildCreateNormalTableMsg(SNormalTableObj *pTable) { return NULL; } -int32_t mgmtCreateNormalTable(SDbObj *pDb, SCreateTableMsg *pCreate, SVgObj *pVgroup, int32_t sid) { +int32_t mgmtCreateNormalTable(SCreateTableMsg *pCreate, SVgObj *pVgroup, int32_t sid, SDCreateTableMsg **pDCreateOut, STableInfo **pTableOut) { int32_t numOfTables = sdbGetNumOfRows(tsNormalTableSdb); if (numOfTables >= TSDB_MAX_NORMAL_TABLES) { mError("normal table:%s, numOfTables:%d exceed maxTables:%d", pCreate->tableId, numOfTables, TSDB_MAX_NORMAL_TABLES); @@ -373,8 +373,8 @@ int32_t mgmtCreateNormalTable(SDbObj *pDb, SCreateTableMsg *pCreate, SVgObj *pVg return TSDB_CODE_SDB_ERROR; } - mTrace("table:%s, create table in vgroup, vgId:%d sid:%d vnode:%d uid:%" PRIu64 " db:%s", - pTable->tableId, pVgroup->vgId, sid, pVgroup->vnodeGid[0].vnode, pTable->uid, pDb->name); + mTrace("table:%s, create table in vgroup, vgroup:%d sid:%d vnode:%d uid:%" PRIu64 , + pTable->tableId, pVgroup->vgId, sid, pVgroup->vnodeGid[0].vnode, pTable->uid); return TSDB_CODE_SUCCESS; } diff --git a/src/mnode/src/mgmtShell.c b/src/mnode/src/mgmtShell.c index 8fbf2241b622f9ed9981029e6cf60764a82dd8ca..e9244e9e234b0c173ffd57871d69e13e82581177 100644 --- a/src/mnode/src/mgmtShell.c +++ b/src/mnode/src/mgmtShell.c @@ -51,9 +51,9 @@ void *tsShellConnServer = NULL; static void mgmtInitProcessShellMsg(); static void mgmtProcessMsgFromShell(char type, void *pCont, int contLen, void *ahandle, int32_t code); -static int32_t (*mgmtProcessShellMsg[TSDB_MSG_TYPE_MAX])(void *pCont, int32_t contLen, void *ahandle); -static int32_t mgmtProcessUnSupportMsg(void *pCont, int32_t contLen, void *ahandle); -static int32_t mgmtRetriveUserAuthInfo(char *user, char *spi, char *encrypt, char *secret, char *ckey); +static void (*mgmtProcessShellMsg[TSDB_MSG_TYPE_MAX])(void *pCont, int32_t contLen, void *ahandle); +static void mgmtProcessUnSupportMsg(void *pCont, int32_t contLen, void *ahandle); +static int mgmtRetriveUserAuthInfo(char *user, char *spi, char *encrypt, char *secret, char *ckey); void mgmtProcessTranRequest(SSchedMsg *sched) { int8_t msgType = *(int8_t *) (sched->msg); @@ -117,7 +117,7 @@ void mgmtCleanUpShell() { } } -int32_t mgmtProcessTableMetaMsg(void *pCont, int32_t contLen, void *ahandle) { +void mgmtProcessTableMetaMsg(void *pCont, int32_t contLen, void *ahandle) { SRpcConnInfo connInfo; rpcGetConnInfo(ahandle, &connInfo); @@ -125,7 +125,7 @@ int32_t mgmtProcessTableMetaMsg(void *pCont, int32_t contLen, void *ahandle) { SUserObj *pUser = mgmtGetUser(connInfo.user); if (pUser == NULL) { rpcSendResponse(ahandle, TSDB_CODE_INVALID_USER, NULL, 0); - return TSDB_CODE_INVALID_USER; + return; } STableInfoMsg *pInfo = pCont; @@ -134,7 +134,7 @@ int32_t mgmtProcessTableMetaMsg(void *pCont, int32_t contLen, void *ahandle) { SDbObj* pDb = mgmtGetDbByTableId(pInfo->tableId); if (pDb == NULL || pDb->dropStatus != TSDB_DB_STATUS_READY) { rpcSendResponse(ahandle, TSDB_CODE_INVALID_DB, NULL, 0); - return TSDB_CODE_INVALID_DB; + return; } STableInfo *pTable = mgmtGetTable(pInfo->tableId); @@ -142,38 +142,33 @@ int32_t mgmtProcessTableMetaMsg(void *pCont, int32_t contLen, void *ahandle) { // on demand create table from super table if meter does not exists if (pTable == NULL && pInfo->createFlag == 1) { // write operation needs to redirect to master mnode - if (mgmtCheckRedirectMsg(ahandle) != 0) { - return TSDB_CODE_REDIRECT; + if (mgmtCheckRedirectMsg(ahandle) != TSDB_CODE_SUCCESS) { + return; } SCreateTableMsg *pCreateMsg = calloc(1, sizeof(SCreateTableMsg) + sizeof(STagData)); if (pCreateMsg == NULL) { rpcSendResponse(ahandle, TSDB_CODE_SERV_OUT_OF_MEMORY, NULL, 0); - return TSDB_CODE_SERV_OUT_OF_MEMORY; + return; } memcpy(pCreateMsg->schema, pInfo->tags, sizeof(STagData)); strcpy(pCreateMsg->tableId, pInfo->tableId); - int32_t code = mgmtCreateTable(pDb, pCreateMsg, NULL); + mgmtCreateTable(pCreateMsg, contLen, NULL); char stableName[TSDB_TABLE_ID_LEN] = {0}; strncpy(stableName, pInfo->tags, TSDB_TABLE_ID_LEN); - mTrace("table:%s is auto created by %s from %s, code:%d", pCreateMsg->tableId, pUser->user, stableName, code); + mTrace("table:%s is auto created by %s from %s", pCreateMsg->tableId, pUser->user, stableName); tfree(pCreateMsg); - if (code != TSDB_CODE_SUCCESS) { - rpcSendResponse(ahandle, code, NULL, 0); - return code; - } - pTable = mgmtGetTable(pInfo->tableId); } if (pTable == NULL) { rpcSendResponse(ahandle, TSDB_CODE_INVALID_TABLE, NULL, 0); - return TSDB_CODE_INVALID_TABLE; + return; } STableMeta *pMeta = rpcMallocCont(sizeof(STableMeta) + sizeof(SSchema) * TSDB_MAX_COLUMNS); @@ -186,11 +181,9 @@ int32_t mgmtProcessTableMetaMsg(void *pCont, int32_t contLen, void *ahandle) { pMeta->contLen = htons(pMeta->contLen); rpcSendResponse(ahandle, TSDB_CODE_SUCCESS, pMeta, pMeta->contLen); } - - return TSDB_CODE_SUCCESS; } -int32_t mgmtProcessMultiTableMetaMsg(void *pCont, int32_t contLen, void *ahandle) { +void mgmtProcessMultiTableMetaMsg(void *pCont, int32_t contLen, void *ahandle) { SRpcConnInfo connInfo; rpcGetConnInfo(ahandle, &connInfo); @@ -198,7 +191,7 @@ int32_t mgmtProcessMultiTableMetaMsg(void *pCont, int32_t contLen, void *ahandle SUserObj *pUser = mgmtGetUser(connInfo.user); if (pUser == NULL) { rpcSendResponse(ahandle, TSDB_CODE_INVALID_USER, NULL, 0); - return TSDB_CODE_INVALID_USER; + return; } SMultiTableInfoMsg *pInfo = pCont; @@ -208,7 +201,7 @@ int32_t mgmtProcessMultiTableMetaMsg(void *pCont, int32_t contLen, void *ahandle SMultiTableMeta *pMultiMeta = rpcMallocCont(totalMallocLen); if (pMultiMeta == NULL) { rpcSendResponse(ahandle, TSDB_CODE_SERV_OUT_OF_MEMORY, NULL, 0); - return TSDB_CODE_SERV_OUT_OF_MEMORY; + return; } pMultiMeta->contLen = sizeof(SMultiTableMeta); @@ -245,10 +238,9 @@ int32_t mgmtProcessMultiTableMetaMsg(void *pCont, int32_t contLen, void *ahandle } rpcSendResponse(ahandle, TSDB_CODE_SUCCESS, pMultiMeta, pMultiMeta->contLen); - return TSDB_CODE_SUCCESS; } -int32_t mgmtProcessSuperTableMetaMsg(void *pCont, int32_t contLen, void *ahandle) { +void mgmtProcessSuperTableMetaMsg(void *pCont, int32_t contLen, void *ahandle) { SRpcConnInfo connInfo; rpcGetConnInfo(ahandle, &connInfo); @@ -258,29 +250,27 @@ int32_t mgmtProcessSuperTableMetaMsg(void *pCont, int32_t contLen, void *ahandle STableInfo *pTable = mgmtGetSuperTable(pInfo->tableId); if (pTable == NULL) { rpcSendResponse(ahandle, TSDB_CODE_INVALID_TABLE, NULL, 0); - return TSDB_CODE_INVALID_TABLE; + return; } SSuperTableInfoRsp *pRsp = mgmtGetSuperTableVgroup((SSuperTableObj *) pTable); if (pRsp != NULL) { int32_t msgLen = sizeof(SSuperTableObj) + htonl(pRsp->numOfDnodes) * sizeof(int32_t); rpcSendResponse(ahandle, TSDB_CODE_SUCCESS, pRsp, msgLen); - return TSDB_CODE_SUCCESS; } else { rpcSendResponse(ahandle, TSDB_CODE_SUCCESS, NULL, 0); - return TSDB_CODE_SUCCESS; } } -int32_t mgmtProcessCreateDbMsg(void *pCont, int32_t contLen, void *ahandle) { - if (mgmtCheckRedirectMsg(ahandle) != 0) { - return TSDB_CODE_REDIRECT; +void mgmtProcessCreateDbMsg(void *pCont, int32_t contLen, void *ahandle) { + if (mgmtCheckRedirectMsg(ahandle) != TSDB_CODE_SUCCESS) { + return; } SUserObj *pUser = mgmtGetUserFromConn(ahandle); if (pUser == NULL) { rpcSendResponse(ahandle, TSDB_CODE_INVALID_USER, NULL, 0); - return TSDB_CODE_INVALID_USER; + return; } SCreateDbMsg *pCreate = (SCreateDbMsg *) pCont; @@ -309,18 +299,17 @@ int32_t mgmtProcessCreateDbMsg(void *pCont, int32_t contLen, void *ahandle) { } rpcSendResponse(ahandle, code, NULL, 0); - return code; } -int32_t mgmtProcessAlterDbMsg(void *pCont, int32_t contLen, void *ahandle) { - if (mgmtCheckRedirectMsg(ahandle) != 0) { - return TSDB_CODE_REDIRECT; +void mgmtProcessAlterDbMsg(void *pCont, int32_t contLen, void *ahandle) { + if (mgmtCheckRedirectMsg(ahandle) != TSDB_CODE_SUCCESS) { + return; } SUserObj *pUser = mgmtGetUserFromConn(ahandle); if (pUser == NULL) { rpcSendResponse(ahandle, TSDB_CODE_INVALID_USER, NULL, 0); - return TSDB_CODE_INVALID_USER; + return; } SAlterDbMsg *pAlter = (SAlterDbMsg *) pCont; @@ -339,18 +328,17 @@ int32_t mgmtProcessAlterDbMsg(void *pCont, int32_t contLen, void *ahandle) { } rpcSendResponse(ahandle, code, NULL, 0); - return code; } -int32_t mgmtProcessKillQueryMsg(void *pCont, int32_t contLen, void *ahandle) { - if (mgmtCheckRedirectMsg(ahandle) != 0) { - return TSDB_CODE_REDIRECT; +void mgmtProcessKillQueryMsg(void *pCont, int32_t contLen, void *ahandle) { + if (mgmtCheckRedirectMsg(ahandle) != TSDB_CODE_SUCCESS) { + return; } SUserObj *pUser = mgmtGetUserFromConn(ahandle); if (pUser == NULL) { rpcSendResponse(ahandle, TSDB_CODE_INVALID_USER, NULL, 0); - return TSDB_CODE_INVALID_USER; + return; } SKillQueryMsg *pKill = (SKillQueryMsg *) pCont; @@ -363,18 +351,17 @@ int32_t mgmtProcessKillQueryMsg(void *pCont, int32_t contLen, void *ahandle) { } rpcSendResponse(ahandle, code, NULL, 0); - return code; } -int32_t mgmtProcessKillStreamMsg(void *pCont, int32_t contLen, void *ahandle) { - if (mgmtCheckRedirectMsg(ahandle) != 0) { - return TSDB_CODE_REDIRECT; +void mgmtProcessKillStreamMsg(void *pCont, int32_t contLen, void *ahandle) { + if (mgmtCheckRedirectMsg(ahandle) != TSDB_CODE_SUCCESS) { + return; } SUserObj *pUser = mgmtGetUserFromConn(ahandle); if (pUser == NULL) { rpcSendResponse(ahandle, TSDB_CODE_INVALID_USER, NULL, 0); - return TSDB_CODE_INVALID_USER; + return; } SKillStreamMsg *pKill = (SKillStreamMsg *) pCont; @@ -387,18 +374,17 @@ int32_t mgmtProcessKillStreamMsg(void *pCont, int32_t contLen, void *ahandle) { } rpcSendResponse(ahandle, code, NULL, 0); - return code; } -int32_t mgmtProcessKillConnectionMsg(void *pCont, int32_t contLen, void *ahandle) { - if (mgmtCheckRedirectMsg(ahandle) != 0) { - return TSDB_CODE_REDIRECT; +void mgmtProcessKillConnectionMsg(void *pCont, int32_t contLen, void *ahandle) { + if (mgmtCheckRedirectMsg(ahandle) != TSDB_CODE_SUCCESS) { + return; } SUserObj *pUser = mgmtGetUserFromConn(ahandle); if (pUser == NULL) { rpcSendResponse(ahandle, TSDB_CODE_INVALID_USER, NULL, 0); - return TSDB_CODE_INVALID_USER; + return; } SKillConnectionMsg *pKill = (SKillConnectionMsg *) pCont; @@ -411,18 +397,17 @@ int32_t mgmtProcessKillConnectionMsg(void *pCont, int32_t contLen, void *ahandle } rpcSendResponse(ahandle, code, NULL, 0); - return code; } -int32_t mgmtProcessCreateUserMsg(void *pCont, int32_t contLen, void *ahandle) { - if (mgmtCheckRedirectMsg(ahandle) != 0) { - return TSDB_CODE_REDIRECT; +void mgmtProcessCreateUserMsg(void *pCont, int32_t contLen, void *ahandle) { + if (mgmtCheckRedirectMsg(ahandle) != TSDB_CODE_SUCCESS) { + return; } SUserObj *pUser = mgmtGetUserFromConn(ahandle); if (pUser == NULL) { rpcSendResponse(ahandle, TSDB_CODE_INVALID_USER, NULL, 0); - return TSDB_CODE_INVALID_USER; + return; } int32_t code; @@ -437,30 +422,29 @@ int32_t mgmtProcessCreateUserMsg(void *pCont, int32_t contLen, void *ahandle) { } rpcSendResponse(ahandle, code, NULL, 0); - return code; } -int32_t mgmtProcessAlterUserMsg(void *pCont, int32_t contLen, void *ahandle) { - if (mgmtCheckRedirectMsg(ahandle) != 0) { - return TSDB_CODE_REDIRECT; +void mgmtProcessAlterUserMsg(void *pCont, int32_t contLen, void *ahandle) { + if (mgmtCheckRedirectMsg(ahandle) != TSDB_CODE_SUCCESS) { + return; } SUserObj *pOperUser = mgmtGetUserFromConn(ahandle); if (pOperUser == NULL) { rpcSendResponse(ahandle, TSDB_CODE_INVALID_USER, NULL, 0); - return TSDB_CODE_INVALID_USER; + return; } SAlterUserMsg *pAlter = pCont; SUserObj *pUser = mgmtGetUser(pAlter->user); if (pUser == NULL) { rpcSendResponse(ahandle, TSDB_CODE_INVALID_USER, NULL, 0); - return TSDB_CODE_INVALID_USER; + return; } if (strcmp(pUser->user, "monitor") == 0 || (strcmp(pUser->user + 1, pUser->acct) == 0 && pUser->user[0] == '_')) { rpcSendResponse(ahandle, TSDB_CODE_NO_RIGHTS, NULL, 0); - return TSDB_CODE_NO_RIGHTS; + return; } int code; @@ -490,7 +474,7 @@ int32_t mgmtProcessAlterUserMsg(void *pCont, int32_t contLen, void *ahandle) { } rpcSendResponse(ahandle, code, NULL, 0); - return code; + return; } if ((pAlter->flag & TSDB_ALTER_USER_PRIVILEGES) != 0) { @@ -539,35 +523,34 @@ int32_t mgmtProcessAlterUserMsg(void *pCont, int32_t contLen, void *ahandle) { } rpcSendResponse(ahandle, code, NULL, 0); - return code; + return; } code = TSDB_CODE_NO_RIGHTS; rpcSendResponse(ahandle, code, NULL, 0); - return code; } -int32_t mgmtProcessDropUserMsg(void *pCont, int32_t contLen, void *ahandle) { - if (mgmtCheckRedirectMsg(ahandle) != 0) { - return TSDB_CODE_REDIRECT; +void mgmtProcessDropUserMsg(void *pCont, int32_t contLen, void *ahandle) { + if (mgmtCheckRedirectMsg(ahandle) != TSDB_CODE_SUCCESS) { + return ; } SUserObj *pOperUser = mgmtGetUserFromConn(ahandle); if (pOperUser == NULL) { rpcSendResponse(ahandle, TSDB_CODE_INVALID_USER, NULL, 0); - return TSDB_CODE_INVALID_USER; + return ; } SDropUserMsg *pDrop = pCont; SUserObj *pUser = mgmtGetUser(pDrop->user); if (pUser == NULL) { rpcSendResponse(ahandle, TSDB_CODE_INVALID_USER, NULL, 0); - return TSDB_CODE_INVALID_USER; + return ; } if (strcmp(pUser->user, "monitor") == 0 || (strcmp(pUser->user + 1, pUser->acct) == 0 && pUser->user[0] == '_')) { rpcSendResponse(ahandle, TSDB_CODE_NO_RIGHTS, NULL, 0); - return TSDB_CODE_NO_RIGHTS; + return ; } bool hasRight = false; @@ -598,18 +581,17 @@ int32_t mgmtProcessDropUserMsg(void *pCont, int32_t contLen, void *ahandle) { } rpcSendResponse(ahandle, code, NULL, 0); - return code; } -int32_t mgmtProcessDropDbMsg(void *pCont, int32_t contLen, void *ahandle) { - if (mgmtCheckRedirectMsg(ahandle) != 0) { - return TSDB_CODE_REDIRECT; +void mgmtProcessDropDbMsg(void *pCont, int32_t contLen, void *ahandle) { + if (mgmtCheckRedirectMsg(ahandle) != TSDB_CODE_SUCCESS) { + return ; } SUserObj *pUser = mgmtGetUserFromConn(ahandle); if (pUser == NULL) { rpcSendResponse(ahandle, TSDB_CODE_INVALID_USER, NULL, 0); - return TSDB_CODE_INVALID_USER; + return ; } int32_t code; @@ -624,7 +606,6 @@ int32_t mgmtProcessDropDbMsg(void *pCont, int32_t contLen, void *ahandle) { } rpcSendResponse(ahandle, code, NULL, 0); - return code; } static void mgmtInitShowMsgFp() { @@ -665,11 +646,11 @@ static void mgmtInitShowMsgFp() { mgmtRetrieveFp[TSDB_MGMT_TABLE_VNODES] = mgmtRetrieveVnodes; } -int32_t mgmtProcessShowMsg(void *pCont, int32_t contLen, void *ahandle) { +void mgmtProcessShowMsg(void *pCont, int32_t contLen, void *ahandle) { SShowMsg *pShowMsg = pCont; if (pShowMsg->type == TSDB_MGMT_TABLE_DNODE || TSDB_MGMT_TABLE_GRANTS || TSDB_MGMT_TABLE_SCORES) { - if (mgmtCheckRedirectMsg(ahandle) != 0) { - return TSDB_CODE_REDIRECT; + if (mgmtCheckRedirectMsg(ahandle) != TSDB_CODE_SUCCESS) { + return; } } @@ -677,7 +658,7 @@ int32_t mgmtProcessShowMsg(void *pCont, int32_t contLen, void *ahandle) { SShowRsp *pShowRsp = rpcMallocCont(size); if (pShowRsp == NULL) { rpcSendResponse(ahandle, TSDB_CODE_SERV_OUT_OF_MEMORY, NULL, 0); - return TSDB_CODE_SERV_OUT_OF_MEMORY; + return; } int32_t code; @@ -707,10 +688,9 @@ int32_t mgmtProcessShowMsg(void *pCont, int32_t contLen, void *ahandle) { } rpcSendResponse(ahandle, code, pShowRsp, size); - return code; } -int32_t mgmtProcessRetrieveMsg(void *pCont, int32_t contLen, void *ahandle) { +void mgmtProcessRetrieveMsg(void *pCont, int32_t contLen, void *ahandle) { int32_t rowsToRead = 0; int32_t size = 0; int32_t rowsRead = 0; @@ -724,14 +704,14 @@ int32_t mgmtProcessRetrieveMsg(void *pCont, int32_t contLen, void *ahandle) { if (!mgmtCheckQhandle(pRetrieve->qhandle)) { mError("retrieve:%p, qhandle:%p is invalid", pRetrieve, pRetrieve->qhandle); rpcSendResponse(ahandle, TSDB_CODE_INVALID_QHANDLE, NULL, 0); - return TSDB_CODE_INVALID_QHANDLE; + return; } SShowObj *pShow = (SShowObj *)pRetrieve->qhandle; if (pShow->signature != (void *)pShow) { mError("pShow:%p, signature:%p, query memory is corrupted", pShow, pShow->signature); rpcSendResponse(ahandle, TSDB_CODE_MEMORY_CORRUPTED, NULL, 0); - return TSDB_CODE_MEMORY_CORRUPTED; + return; } else { if ((pRetrieve->free & TSDB_QUERY_TYPE_FREE_RESOURCE) != TSDB_QUERY_TYPE_FREE_RESOURCE) { rowsToRead = pShow->numOfRows - pShow->numOfReads; @@ -758,7 +738,7 @@ int32_t mgmtProcessRetrieveMsg(void *pCont, int32_t contLen, void *ahandle) { if (rowsRead < 0) { rowsRead = 0; // TSDB_CODE_ACTION_IN_PROGRESS; rpcFreeCont(pRsp); - return TSDB_CODE_ACTION_IN_PROGRESS; + return; } pRsp->numOfRows = htonl(rowsRead); @@ -769,61 +749,51 @@ int32_t mgmtProcessRetrieveMsg(void *pCont, int32_t contLen, void *ahandle) { if (rowsToRead == 0) { mgmtFreeQhandle(pShow); } - - return TSDB_CODE_SUCCESS; } -int32_t mgmtProcessCreateTableMsg(void *pCont, int32_t contLen, void *ahandle) { - if (mgmtCheckRedirectMsg(ahandle) != 0) { - return TSDB_CODE_REDIRECT; +void mgmtProcessCreateTableMsg(void *pCont, int32_t contLen, void *ahandle) { + SCreateTableMsg *pCreate = (SCreateTableMsg *) pCont; + pCreate->numOfColumns = htons(pCreate->numOfColumns); + pCreate->numOfTags = htons(pCreate->numOfTags); + pCreate->sqlLen = htons(pCreate->sqlLen); + + SSchema *pSchema = pCreate->schema; + for (int32_t i = 0; i < pCreate->numOfColumns + pCreate->numOfTags; ++i) { + pSchema->bytes = htons(pSchema->bytes); + pSchema->colId = i; + pSchema++; + } + + if (mgmtCheckRedirectMsg(ahandle) != TSDB_CODE_SUCCESS) { + mError("table:%s, failed to create table, need redirect message", pCreate->tableId); + return; } SUserObj *pUser = mgmtGetUserFromConn(ahandle); if (pUser == NULL) { + mError("table:%s, failed to create table, invalid user", pCreate->tableId); rpcSendResponse(ahandle, TSDB_CODE_INVALID_USER, NULL, 0); - return TSDB_CODE_INVALID_USER; + return; } - SCreateTableMsg *pCreate = (SCreateTableMsg *) pCont; - SSchema *pSchema; - int32_t code; - if (!pUser->writeAuth) { - code = TSDB_CODE_NO_RIGHTS; - } else { - pCreate->numOfColumns = htons(pCreate->numOfColumns); - pCreate->numOfTags = htons(pCreate->numOfTags); - pCreate->sqlLen = htons(pCreate->sqlLen); - pSchema = pCreate->schema; - for (int32_t i = 0; i < pCreate->numOfColumns + pCreate->numOfTags; ++i) { - pSchema->bytes = htons(pSchema->bytes); - pSchema->colId = i; - pSchema++; - } - - SDbObj *pDb = mgmtGetDb(pCreate->db); - if (pDb) { - code = mgmtCreateTable(pDb, pCreate, ahandle); - } else { - code = TSDB_CODE_DB_NOT_SELECTED; - } + mError("table:%s, failed to create table, no rights", pCreate->tableId); + rpcSendResponse(ahandle, TSDB_CODE_NO_RIGHTS, NULL, 0); + return; } - if (code != TSDB_CODE_ACTION_IN_PROGRESS) { - rpcSendResponse(ahandle, code, NULL, 0); - } - return code; + mgmtCreateTable(pCreate, contLen, ahandle); } -int32_t mgmtProcessDropTableMsg(void *pCont, int32_t contLen, void *ahandle) { - if (mgmtCheckRedirectMsg(ahandle) != 0) { - return TSDB_CODE_REDIRECT; +void mgmtProcessDropTableMsg(void *pCont, int32_t contLen, void *ahandle) { + if (mgmtCheckRedirectMsg(ahandle) != TSDB_CODE_SUCCESS) { + return; } SUserObj *pUser = mgmtGetUserFromConn(ahandle); if (pUser == NULL) { rpcSendResponse(ahandle, TSDB_CODE_INVALID_USER, NULL, 0); - return TSDB_CODE_INVALID_USER; + return; } SDropTableMsg *pDrop = (SDropTableMsg *) pCont; @@ -846,18 +816,17 @@ int32_t mgmtProcessDropTableMsg(void *pCont, int32_t contLen, void *ahandle) { if (code != TSDB_CODE_SUCCESS) { rpcSendResponse(ahandle, code, NULL, 0); } - return code; } -int32_t mgmtProcessAlterTableMsg(void *pCont, int32_t contLen, void *ahandle) { - if (mgmtCheckRedirectMsg(ahandle) != 0) { - return TSDB_CODE_REDIRECT; +void mgmtProcessAlterTableMsg(void *pCont, int32_t contLen, void *ahandle) { + if (mgmtCheckRedirectMsg(ahandle) != TSDB_CODE_SUCCESS) { + return; } SUserObj *pUser = mgmtGetUserFromConn(ahandle); if (pUser == NULL) { rpcSendResponse(ahandle, TSDB_CODE_INVALID_USER, NULL, 0); - return TSDB_CODE_INVALID_USER; + return; } SAlterTableMsg *pAlter = (SAlterTableMsg *) pCont; @@ -892,18 +861,17 @@ int32_t mgmtProcessAlterTableMsg(void *pCont, int32_t contLen, void *ahandle) { if (code != TSDB_CODE_SUCCESS) { rpcSendResponse(ahandle, code, NULL, 0); } - return code; } -int32_t mgmtProcessCfgDnodeMsg(void *pCont, int32_t contLen, void *ahandle) { - if (mgmtCheckRedirectMsg(ahandle) != 0) { - return TSDB_CODE_REDIRECT; +void mgmtProcessCfgDnodeMsg(void *pCont, int32_t contLen, void *ahandle) { + if (mgmtCheckRedirectMsg(ahandle) != TSDB_CODE_SUCCESS) { + return; } SUserObj *pUser = mgmtGetUserFromConn(ahandle); if (pUser == NULL) { rpcSendResponse(ahandle, TSDB_CODE_INVALID_USER, NULL, 0); - return TSDB_CODE_INVALID_USER; + return; } SCfgDnodeMsg *pCfg = (SCfgDnodeMsg *)pCont; @@ -920,10 +888,9 @@ int32_t mgmtProcessCfgDnodeMsg(void *pCont, int32_t contLen, void *ahandle) { } rpcSendResponse(ahandle, code, NULL, 0); - return code; } -int32_t mgmtProcessHeartBeatMsg(void *pCont, int32_t contLen, void *ahandle) { +void mgmtProcessHeartBeatMsg(void *pCont, int32_t contLen, void *ahandle) { SHeartBeatMsg *pHBMsg = (SHeartBeatMsg *) pCont; mgmtSaveQueryStreamList(pHBMsg); @@ -931,7 +898,7 @@ int32_t mgmtProcessHeartBeatMsg(void *pCont, int32_t contLen, void *ahandle) { if (pHBRsp == NULL) { rpcSendResponse(ahandle, TSDB_CODE_SERV_OUT_OF_MEMORY, NULL, 0); rpcFreeCont(pCont); - return TSDB_CODE_SERV_OUT_OF_MEMORY; + return; } SRpcConnInfo connInfo; @@ -962,10 +929,9 @@ int32_t mgmtProcessHeartBeatMsg(void *pCont, int32_t contLen, void *ahandle) { pHBRsp->killConnection = 0; rpcSendResponse(ahandle, TSDB_CODE_SUCCESS, pHBRsp, sizeof(SHeartBeatMsg)); - return TSDB_CODE_SUCCESS; } -int32_t mgmtRetriveUserAuthInfo(char *user, char *spi, char *encrypt, char *secret, char *ckey) { +int mgmtRetriveUserAuthInfo(char *user, char *spi, char *encrypt, char *secret, char *ckey) { *spi = 0; *encrypt = 0; *ckey = 0; @@ -980,7 +946,7 @@ int32_t mgmtRetriveUserAuthInfo(char *user, char *spi, char *encrypt, char *secr } } -static int32_t mgmtProcessConnectMsg(void *pCont, int32_t contLen, void *thandle) { +static void mgmtProcessConnectMsg(void *pCont, int32_t contLen, void *thandle) { SConnectMsg *pConnectMsg = (SConnectMsg *) pCont; SRpcConnInfo connInfo; rpcGetConnInfo(thandle, &connInfo); @@ -1052,8 +1018,6 @@ connect_over: mLPrint("user:%s login from %s, code:%d", connInfo.user, taosIpStr(connInfo.clientIp), code); rpcSendResponse(thandle, code, pConnectRsp, sizeof(SConnectRsp)); } - - return code; } /** @@ -1145,15 +1109,14 @@ static int32_t mgmtCheckRedirectMsgImp(void *pConn) { int32_t (*mgmtCheckRedirectMsg)(void *pConn) = mgmtCheckRedirectMsgImp; -static int32_t mgmtProcessUnSupportMsg(void *pCont, int32_t contLen, void *ahandle) { +static void mgmtProcessUnSupportMsg(void *pCont, int32_t contLen, void *ahandle) { rpcSendResponse(ahandle, TSDB_CODE_OPS_NOT_SUPPORT, NULL, 0); - return TSDB_CODE_OPS_NOT_SUPPORT; } -int32_t (*mgmtProcessAlterAcctMsg)(void *pCont, int32_t contLen, void *ahandle) = mgmtProcessUnSupportMsg; -int32_t (*mgmtProcessCreateDnodeMsg)(void *pCont, int32_t contLen, void *ahandle) = mgmtProcessUnSupportMsg; -int32_t (*mgmtProcessCfgMnodeMsg)(void *pCont, int32_t contLen, void *ahandle) = mgmtProcessUnSupportMsg; -int32_t (*mgmtProcessDropMnodeMsg)(void *pCont, int32_t contLen, void *ahandle) = mgmtProcessUnSupportMsg; -int32_t (*mgmtProcessDropDnodeMsg)(void *pCont, int32_t contLen, void *ahandle) = mgmtProcessUnSupportMsg; -int32_t (*mgmtProcessDropAcctMsg)(void *pCont, int32_t contLen, void *ahandle) = mgmtProcessUnSupportMsg; -int32_t (*mgmtProcessCreateAcctMsg)(void *pCont, int32_t contLen, void *ahandle) = mgmtProcessUnSupportMsg; \ No newline at end of file +void (*mgmtProcessAlterAcctMsg)(void *pCont, int32_t contLen, void *ahandle) = mgmtProcessUnSupportMsg; +void (*mgmtProcessCreateDnodeMsg)(void *pCont, int32_t contLen, void *ahandle) = mgmtProcessUnSupportMsg; +void (*mgmtProcessCfgMnodeMsg)(void *pCont, int32_t contLen, void *ahandle) = mgmtProcessUnSupportMsg; +void (*mgmtProcessDropMnodeMsg)(void *pCont, int32_t contLen, void *ahandle) = mgmtProcessUnSupportMsg; +void (*mgmtProcessDropDnodeMsg)(void *pCont, int32_t contLen, void *ahandle) = mgmtProcessUnSupportMsg; +void (*mgmtProcessDropAcctMsg)(void *pCont, int32_t contLen, void *ahandle) = mgmtProcessUnSupportMsg; +void (*mgmtProcessCreateAcctMsg)(void *pCont, int32_t contLen, void *ahandle) = mgmtProcessUnSupportMsg; \ No newline at end of file diff --git a/src/mnode/src/mgmtTable.c b/src/mnode/src/mgmtTable.c index 6472c8737d8d6786cf01bbe1bd9240cc9afda8ae..9a8a97d28f725debf1724c3326cf36bc810cb832 100644 --- a/src/mnode/src/mgmtTable.c +++ b/src/mnode/src/mgmtTable.c @@ -111,67 +111,135 @@ int32_t mgmtGetTableMeta(SDbObj *pDb, STableInfo *pTable, STableMeta *pMeta, boo return TSDB_CODE_SUCCESS; } -int32_t mgmtCreateTable(SDbObj *pDb, SCreateTableMsg *pCreate, void *thandle) { +void mgmtProcessCreateVgroup(SCreateTableMsg *pCreate, int32_t contLen, void *thandle) { + SDbObj *pDb = mgmtGetDb(pCreate->db); + if (pDb == NULL) { + mError("table:%s, failed to create vgroup, db not found", pCreate->tableId); + rpcSendResponse(thandle, TSDB_CODE_INVALID_DB, NULL, 0); + return; + } + + SVgObj *pVgroup = mgmtCreateVgroup(pDb); + if (pVgroup == NULL) { + mError("table:%s, failed to alloc vnode to vgroup", pCreate->tableId); + rpcSendResponse(thandle, TSDB_CODE_NO_ENOUGH_DNODES, NULL, 0); + return; + } + + void *cont = rpcMallocCont(contLen); + if (cont == NULL) { + mError("table:%s, failed to create table, can not alloc memory", pCreate->tableId); + rpcSendResponse(thandle, TSDB_CODE_SERV_OUT_OF_MEMORY, NULL, 0); + return; + } + + memcpy(cont, pCreate, contLen); + + SProcessInfo *info = calloc(1, sizeof(SProcessInfo)); + info->type = TSDB_PROCESS_CREATE_VGROUP; + info->thandle = thandle; + info->ahandle = pVgroup; + info->cont = cont; + info->contLen = contLen; + + mgmtSendCreateVgroupMsg(pVgroup, info); +} + +void mgmtProcessCreateTable(SVgObj *pVgroup, SCreateTableMsg *pCreate, int32_t contLen, void *thandle) { + assert(pVgroup != NULL); + + int32_t sid = taosAllocateId(pVgroup->idPool); + if (sid < 0) { + mTrace("table:%s, no enough sid in vgroup:%d, start to create a new vgroup", pCreate->tableId, pVgroup->vgId); + mgmtProcessCreateVgroup(pCreate, contLen, thandle); + return; + } + + int32_t code; + STableInfo *pTable; + SDCreateTableMsg *pDCreate = NULL; + + if (pCreate->numOfColumns == 0) { + mTrace("table:%s, start to create child table, vgroup:%d sid:%d", pCreate->tableId, pVgroup->vgId, sid); + code = mgmtCreateChildTable(pCreate, contLen, pVgroup, sid, &pDCreate, &pTable); + } else { + mTrace("table:%s, start to create normal table, vgroup:%d sid:%d", pCreate->tableId, pVgroup->vgId, sid); + code = mgmtCreateNormalTable(pCreate, pVgroup, sid, &pDCreate, &pTable); + } + + if (code != TSDB_CODE_SUCCESS) { + mTrace("table:%s, failed to create table in vgroup:%d sid:%d ", pCreate->tableId, pVgroup->vgId, sid); + rpcSendResponse(thandle, code, NULL, 0); + return; + } + + assert(pDCreate != NULL); + assert(pTable != NULL); + + SProcessInfo *info = calloc(1, sizeof(SProcessInfo)); + info->type = TSDB_PROCESS_CREATE_TABLE; + info->ahandle = pTable; + info->thandle = thandle; + SRpcIpSet ipSet = mgmtGetIpSetFromVgroup(pVgroup); + + mgmtSendCreateTableMsg(pDCreate, &ipSet, info); +} + +void mgmtCreateTable(SCreateTableMsg *pCreate, int32_t contLen, void *thandle) { + SDbObj *pDb = mgmtGetDb(pCreate->db); + if (pDb == NULL) { + mError("table:%s, failed to create table, db not selected", pCreate->tableId); + rpcSendResponse(thandle, TSDB_CODE_DB_NOT_SELECTED, NULL, 0); + return; + } + STableInfo *pTable = mgmtGetTable(pCreate->tableId); if (pTable != NULL) { if (pCreate->igExists) { - return TSDB_CODE_SUCCESS; + mTrace("table:%s, table is alredy exist, think it success", pCreate->tableId); + rpcSendResponse(thandle, TSDB_CODE_SUCCESS, NULL, 0); } else { - return TSDB_CODE_TABLE_ALREADY_EXIST; + mError("table:%s, failed to create table, table already exist", pCreate->tableId); + rpcSendResponse(thandle, TSDB_CODE_TABLE_ALREADY_EXIST, NULL, 0); } + return; } SAcctObj *pAcct = mgmtGetAcct(pDb->cfg.acct); assert(pAcct != NULL); + int32_t code = mgmtCheckTableLimit(pAcct, pCreate); - if (code != 0) { - mError("table:%s, exceed the limit", pCreate->tableId); - return code; + if (code != TSDB_CODE_SUCCESS) { + mError("table:%s, failed to create table, table num exceed the limit", pCreate->tableId); + rpcSendResponse(thandle, code, NULL, 0); + return; } if (mgmtCheckExpired()) { - mError("failed to create meter:%s, reason:grant expired", pCreate->tableId); - return TSDB_CODE_GRANT_EXPIRED; + mError("table:%s, failed to create table, grant expired", pCreate->tableId); + rpcSendResponse(thandle, TSDB_CODE_GRANT_EXPIRED, NULL, 0); + return; } - if (pCreate->numOfTags == 0) { - int32_t grantCode = mgmtCheckTimeSeries(pCreate->numOfColumns); - if (grantCode != 0) { - mError("table:%s, grant expired", pCreate->tableId); - return grantCode; - } + if (pCreate->numOfTags != 0) { + mTrace("table:%s, start to create super table, tags:%d columns:%d", pCreate->tableId, pCreate->numOfTags, pCreate->numOfColumns); + mgmtCreateSuperTable(pDb, pCreate); + return; + } - int32_t sid; - SVgObj *pVgroup = mgmtGetAvailVgroup(pDb, &sid); - if (pVgroup == NULL) { - code = mgmtCreateVgroup(pDb); - if (code == TSDB_CODE_SUCCESS) { - SProcessInfo *info = calloc(1, sizeof(SProcessInfo)); - info->type = TSDB_PROCESS_CREATE_VGROUP_AND_TABLE; - info->ahandle = pTable; - info->thandle = thandle; - SRpcIpSet ipSet = mgmtGetIpSetFromVgroup(pVgroup); - mgmtSendCreateVgroupMsg(pVgroup, &ipSet, info); - return TSDB_CODE_ACTION_IN_PROGRESS; - } - } else { - if (pCreate->numOfColumns == 0) { - code = mgmtCreateChildTable(pDb, pCreate, pVgroup, sid); - } else { - code = mgmtCreateNormalTable(pDb, pCreate, pVgroup, sid); - } - if (code == TSDB_CODE_SUCCESS) { - SProcessInfo *info = calloc(1, sizeof(SProcessInfo)); - info->type = TSDB_PROCESS_CREATE_TABLE; - info->ahandle = pTable; - info->thandle = thandle; - SRpcIpSet ipSet = mgmtGetIpSetFromVgroup(pVgroup); - mgmtSendCreateTableMsg(pTable, &ipSet, info); - return TSDB_CODE_ACTION_IN_PROGRESS; - } - } + code = mgmtCheckTimeSeries(pCreate->numOfColumns); + if (code != TSDB_CODE_SUCCESS) { + mError("table:%s, failed to create table, timeseries exceed the limit", pCreate->tableId); + return; + } + + SVgObj *pVgroup = mgmtGetAvailableVgroup(pDb); + if (pVgroup == NULL) { + mTrace("table:%s, no avaliable vgroup, start to create a new one", pCreate->tableId, pVgroup->vgId); + mgmtProcessCreateVgroup(pCreate, contLen, thandle); } else { - return mgmtCreateSuperTable(pDb, pCreate); + mTrace("table:%s, try to create table in vgroup:%d", pCreate->tableId, pVgroup->vgId); + mgmtProcessCreateTable(pVgroup, pCreate, contLen, thandle); } } @@ -409,30 +477,13 @@ int32_t mgmtRetrieveShowTables(SShowObj *pShow, char *data, int32_t rows, void * return numOfRows; } -SDCreateTableMsg *mgmtBuildCreateTableMsg(STableInfo *pTable) { - SDCreateTableMsg *pCreate = NULL; - if (pTable->type == TSDB_TABLE_TYPE_NORMAL_TABLE) { - pCreate = mgmtBuildCreateNormalTableMsg((SNormalTableObj *) pTable); - } else if (pTable->type == TSDB_TABLE_TYPE_CHILD_TABLE) { - pCreate = mgmtBuildCreateChildTableMsg((SChildTableObj *) pTable); - } else if (pTable->type == TSDB_TABLE_TYPE_STREAM_TABLE) { - pCreate = mgmtBuildCreateNormalTableMsg((SNormalTableObj *) pTable); - } else { - } - - return pCreate; -} - SDRemoveTableMsg *mgmtBuildRemoveTableMsg(STableInfo *pTable) { SDRemoveTableMsg *pRemove = NULL; - if (pTable->type == TSDB_TABLE_TYPE_NORMAL_TABLE) { - pRemove = mgmtBuildCreateNormalTableMsg((SNormalTableObj *) pTable); - } else if (pTable->type == TSDB_TABLE_TYPE_CHILD_TABLE) { - pRemove = mgmtBuildCreateChildTableMsg((SChildTableObj *) pTable); - } else if (pTable->type == TSDB_TABLE_TYPE_STREAM_TABLE) { - pRemove = mgmtBuildCreateNormalTableMsg((SNormalTableObj *) pTable); - } else { - } + return pRemove; +} + +void mgmtSetTableDirty(STableInfo *pTable, bool isDirty) { + pTable->dirty = isDirty; } \ No newline at end of file diff --git a/src/mnode/src/mgmtVgroup.c b/src/mnode/src/mgmtVgroup.c index 79792ee20e68449c02f7c69bbcf82c0640207db1..657706be547b8569cd9e5ee7936cfc2fc9e72ed0 100644 --- a/src/mnode/src/mgmtVgroup.c +++ b/src/mnode/src/mgmtVgroup.c @@ -116,65 +116,25 @@ SVgObj *mgmtGetVgroup(int32_t vgId) { return (SVgObj *)sdbGetRow(tsVgroupSdb, &vgId); } -//TODO: temp way for debug usage -SVgObj *mgmtGetAvailVgroup(SDbObj *pDb, int32_t *sid) { - SVgObj *pVgroup = pDb->pHead; - - if (pVgroup == NULL) { - pVgroup = mgmtCreateVgroup(pDb); +int32_t mgmtAllocateSid(SDbObj *pDb, SVgObj *pVgroup) { + int32_t sid = taosAllocateId(pVgroup->idPool); + if (sid < 0) { + mWarn("table:%s, vgroup:%d run out of ID, num:%d", pDb->name, pVgroup->vgId, taosIdPoolNumOfUsed(pVgroup->idPool)); + pDb->vgStatus = TSDB_VG_STATUS_IN_PROGRESS; + mgmtCreateVgroup(pDb); + terrno = TSDB_CODE_ACTION_IN_PROGRESS; } - *sid = taosAllocateId(pVgroup->idPool); - return pVgroup; - -// if (pDb->vgStatus) -// -// if (pDb->vgStatus == TSDB_VG_STATUS_IN_PROGRESS) { -// terrno = TSDB_CODE_ACTION_IN_PROGRESS; -// return NULL; -// } -// -// if (pDb->vgStatus == TSDB_VG_STATUS_FULL) { -// mError("db:%s, vgroup is full", pDb->name); -// terrno = TSDB_CODE_NO_ENOUGH_DNODES; -// return NULL; -// } -// -// if (pDb->vgStatus == TSDB_VG_STATUS_NO_DISK_PERMISSIONS || -// pDb->vgStatus == TSDB_VG_STATUS_SERVER_NO_PACE || -// pDb->vgStatus == TSDB_VG_STATUS_SERV_OUT_OF_MEMORY || -// pDb->vgStatus == TSDB_VG_STATUS_INIT_FAILED ) { -// mError("db:%s, vgroup init failed, reason:%d %s", pDb->name, pDb->vgStatus, taosGetVgroupStatusStr(pDb->vgStatus)); -// terrno = pDb->vgStatus; -// return NULL; -// } -// -// if (pVgroup == NULL) { -// //TODO -// //pDb->vgStatus = TSDB_VG_STATUS_IN_PROGRESS; -// pDb->vgStatus = TSDB_VG_STATUS_READY; -// mgmtCreateVgroup(pDb); -// mTrace("db:%s, vgroup malloced, wait for create progress finished", pDb->name); -// terrno = TSDB_CODE_ACTION_IN_PROGRESS; -// return NULL; -// } -// -// terrno = 0; -// return pVgroup; + terrno = 0; + return sid; } -//int32_t mgmtAllocateSid(SDbObj *pDb, SVgObj *pVgroup) { -// int32_t sid = taosAllocateId(pVgroup->idPool); -// if (sid < 0) { -// mWarn("table:%s, vgroup:%d run out of ID, num:%d", pDb->name, pVgroup->vgId, taosIdPoolNumOfUsed(pVgroup->idPool)); -// pDb->vgStatus = TSDB_VG_STATUS_IN_PROGRESS; -// mgmtCreateVgroup(pDb); -// terrno = TSDB_CODE_ACTION_IN_PROGRESS; -// } -// -// terrno = 0; -// return sid; -//} +/* + * TODO: check if there is enough sids + */ +SVgObj *mgmtGetAvailableVgroup(SDbObj *pDb) { + return pDb->pHead; +} void mgmtProcessVgTimer(void *handle, void *tmrId) { SDbObj *pDb = (SDbObj *)handle;