diff --git a/src/dnode/src/dnodeMClient.c b/src/dnode/src/dnodeMClient.c index 7be1bd6e30e36ea072edf287e25cf83bb0cf0ade..d8220fe846350665f90782e0ae65004d815b9dd2 100644 --- a/src/dnode/src/dnodeMClient.c +++ b/src/dnode/src/dnodeMClient.c @@ -60,7 +60,7 @@ static void dnodeProcessRspFromMnode(SRpcMsg *pMsg) { if (dnodeProcessMgmtRspFp[pMsg->msgType]) { (*dnodeProcessMgmtRspFp[pMsg->msgType])(pMsg); } else { - dError("%s is not processed", taosMsg[pMsg->msgType]); + dError("%s is not processed in mclient", taosMsg[pMsg->msgType]); } rpcFreeCont(pMsg->pCont); diff --git a/src/dnode/src/dnodeMnode.c b/src/dnode/src/dnodeMnode.c index 6e75ddc68ef197e5a12dda4b964d3e7cfb6e0ec6..971e7b37f518d0b3cbe4b4c6df72579e996a9461 100644 --- a/src/dnode/src/dnodeMnode.c +++ b/src/dnode/src/dnodeMnode.c @@ -81,7 +81,7 @@ static void dnodeProcessMsgFromMnode(SRpcMsg *pMsg) { if (dnodeProcessMgmtMsgFp[pMsg->msgType]) { (*dnodeProcessMgmtMsgFp[pMsg->msgType])(pMsg); } else { - dError("%s is not processed", taosMsg[pMsg->msgType]); + dError("%s is not processed in mserver", taosMsg[pMsg->msgType]); rspMsg.code = TSDB_CODE_MSG_NOT_PROCESSED; rpcSendResponse(&rspMsg); rpcFreeCont(pMsg->pCont); diff --git a/src/dnode/src/dnodeWrite.c b/src/dnode/src/dnodeWrite.c index a0a0a1b5fd0c88227bb0b0fc9872e1ebad27f70c..0aca6a46511f6721717a9a2d6c2625feeba14f55 100644 --- a/src/dnode/src/dnodeWrite.c +++ b/src/dnode/src/dnodeWrite.c @@ -84,27 +84,33 @@ void dnodeCleanupWrite() { } void dnodeWrite(SRpcMsg *pMsg) { + int32_t queuedMsgNum = 0; int32_t leftLen = pMsg->contLen; char *pCont = (char *) pMsg->pCont; - int32_t contLen = 0; - int32_t numOfVnodes = 0; - int32_t vgId = 0; SRpcContext *pRpcContext = NULL; - // parse head, get number of vnodes; + int32_t numOfVnodes = 0; + if (pMsg->msgType == TSDB_MSG_TYPE_SUBMIT) { + // TODO parse head, get number of vnodes; + numOfVnodes = 1; + } else { + numOfVnodes = 1; + } - if ( numOfVnodes > 1) { + if (numOfVnodes > 1) { pRpcContext = calloc(sizeof(SRpcContext), 1); pRpcContext->numOfVnodes = numOfVnodes; } while (leftLen > 0) { - // todo: parse head, get vgId, contLen + SWriteMsgHead *pHead = (SWriteMsgHead *) pCont; + int32_t vgId = htonl(pHead->vgId); + int32_t contLen = htonl(pHead->contLen); - // get pVnode from vgId void *pVnode = dnodeGetVnode(vgId); if (pVnode == NULL) { - + leftLen -= contLen; + pCont -= contLen; continue; } @@ -118,10 +124,22 @@ void dnodeWrite(SRpcMsg *pMsg) { taos_queue queue = dnodeGetVnodeWworker(pVnode); taosWriteQitem(queue, &writeMsg); - + // next vnode leftLen -= contLen; - pCont -= contLen; + pCont -= contLen; + queuedMsgNum++; + } + + if (queuedMsgNum == 0) { + SRpcMsg rpcRsp = { + .handle = pMsg->handle, + .pCont = NULL, + .contLen = 0, + .code = TSDB_CODE_INVALID_VGROUP_ID, + .msgType = 0 + }; + rpcSendResponse(&rpcRsp); } } diff --git a/src/inc/taosmsg.h b/src/inc/taosmsg.h index 225c25ff08c4c9533ed92cf89001b7f88f5c661e..34ef1eb67ca34c3f4977e8224e11ab0cd7f41a1b 100644 --- a/src/inc/taosmsg.h +++ b/src/inc/taosmsg.h @@ -244,6 +244,14 @@ typedef struct { } SVnodeDesc; typedef struct { + int32_t contLen; + int32_t vgId; +} SWriteMsgHead; + +typedef struct { + int32_t contLen; + int32_t vgId; + int8_t tableType; int16_t numOfColumns; int16_t numOfTags; @@ -251,7 +259,6 @@ typedef struct { int32_t sversion; int32_t tagDataLen; int32_t sqlDataLen; - int32_t contLen; int32_t numOfVPeers; uint64_t uid; uint64_t superTableUid; @@ -337,6 +344,7 @@ typedef struct { } SMgmtHead; typedef struct { + int32_t vgId; int32_t sid; int32_t numOfVPeers; uint64_t uid; diff --git a/src/mnode/src/mgmtChildTable.c b/src/mnode/src/mgmtChildTable.c index 0c7518eef73d32844aecd7b10846465c9edabe41..b4854f4d565a6f3c1d3bddde193c1d05afad9cdf 100644 --- a/src/mnode/src/mgmtChildTable.c +++ b/src/mnode/src/mgmtChildTable.c @@ -359,7 +359,7 @@ int32_t mgmtCreateChildTable(SCMCreateTableMsg *pCreate, int32_t contLen, SVgObj *pTableOut = (STableInfo *) pTable; - mTrace("table:%s, create table in vgroup, vgroup:%d sid:%d vnode:%d uid:%" PRIu64 , + mTrace("table:%s, create ctable in vgroup, vgroup:%d sid:%d vnode:%d uid:%" PRIu64 , pTable->tableId, pVgroup->vgId, sid, pVgroup->vnodeGid[0].vnode, pTable->uid); return TSDB_CODE_SUCCESS; diff --git a/src/mnode/src/mgmtDClient.c b/src/mnode/src/mgmtDClient.c index 59b2402b8cf045c8e72078d9dd6c3bdd358c6576..22884cc7d005c0d595d1bf0229c4e9b1e8f373b6 100644 --- a/src/mnode/src/mgmtDClient.c +++ b/src/mnode/src/mgmtDClient.c @@ -78,7 +78,7 @@ static void mgmtProcessRspFromDnode(SRpcMsg *rpcMsg) { if (mgmtProcessDnodeRspFp[rpcMsg->msgType]) { (*mgmtProcessDnodeRspFp[rpcMsg->msgType])(rpcMsg); } else { - dError("%s is not processed", taosMsg[rpcMsg->msgType]); + mError("%s is not processed in dclient", taosMsg[rpcMsg->msgType]); } rpcFreeCont(rpcMsg->pCont); diff --git a/src/mnode/src/mgmtDServer.c b/src/mnode/src/mgmtDServer.c index faf6c480c3437eeed7f830ec09ea1286c63c78d5..98a2e387285889bbcf5fdcf94e1e700c7fa3a6c7 100644 --- a/src/mnode/src/mgmtDServer.c +++ b/src/mnode/src/mgmtDServer.c @@ -76,7 +76,7 @@ static void mgmtProcessMsgFromDnode(SRpcMsg *rpcMsg) { if (mgmtProcessDnodeMsgFp[rpcMsg->msgType]) { (*mgmtProcessDnodeMsgFp[rpcMsg->msgType])(rpcMsg); } else { - mError("%s is not processed", taosMsg[rpcMsg->msgType]); + mError("%s is not processed in dserver", taosMsg[rpcMsg->msgType]); } rpcFreeCont(rpcMsg->pCont); diff --git a/src/mnode/src/mgmtNormalTable.c b/src/mnode/src/mgmtNormalTable.c index d6623ee7009a8be93d0af69642fa6d04c74758cf..d32452ae1384fc433fe8b558e5ece9bc01e875db 100644 --- a/src/mnode/src/mgmtNormalTable.c +++ b/src/mnode/src/mgmtNormalTable.c @@ -393,7 +393,7 @@ int32_t mgmtCreateNormalTable(SCMCreateTableMsg *pCreate, int32_t contLen, SVgOb *pTableOut = (STableInfo *) pTable; - mTrace("table:%s, create table in vgroup, vgroup:%d sid:%d vnode:%d uid:%" PRIu64 , + mTrace("table:%s, create ntable in vgroup, vgroup:%d sid:%d vnode:%d uid:%" PRIu64 , pTable->tableId, pVgroup->vgId, sid, pVgroup->vnodeGid[0].vnode, pTable->uid); return TSDB_CODE_SUCCESS; diff --git a/src/mnode/src/mgmtShell.c b/src/mnode/src/mgmtShell.c index 058173b13f889ebe159f46d5f9e6526077491863..3898d62fdbbf6b31bbc30b5dfbc27e8478cf5578 100644 --- a/src/mnode/src/mgmtShell.c +++ b/src/mnode/src/mgmtShell.c @@ -145,7 +145,6 @@ static void mgmtProcessMsgFromShell(SRpcMsg *rpcMsg) { SUserObj *pUser = mgmtGetUserFromConn(rpcMsg->handle); if (pUser == NULL) { - mError("thandle:%p, failed to retrieve user info", rpcMsg->handle); mgmtSendSimpleResp(rpcMsg->handle, TSDB_CODE_INVALID_USER); rpcFreeCont(rpcMsg->pCont); return; @@ -242,7 +241,7 @@ static void mgmtProcessRetrieveMsg(SQueuedMsg *pMsg) { } SShowObj *pShow = (SShowObj *)pRetrieve->qhandle; - if (!mgmtCheckQhandle(pShow)) { + if (!mgmtCheckQhandle(pRetrieve->qhandle)) { mError("pShow:%p, query memory is corrupted", pShow); mgmtSendSimpleResp(pMsg->thandle, TSDB_CODE_MEMORY_CORRUPTED); return; @@ -466,7 +465,7 @@ static bool mgmtCheckMsgReadOnly(int8_t type, void *pCont) { } static void mgmtProcessUnSupportMsg(SRpcMsg *rpcMsg) { - mError("%s is not processed", taosMsg[rpcMsg->msgType]); + mError("%s is not processed in shell", taosMsg[rpcMsg->msgType]); SRpcMsg rpcRsp = { .msgType = 0, .pCont = 0, diff --git a/src/mnode/src/mgmtTable.c b/src/mnode/src/mgmtTable.c index b1cdbef0b3fe02c6ba9a9ffeba2a3e955af3d8d6..ee0f19b89bc3e1474acc3d76d2018f65f77ca8b5 100644 --- a/src/mnode/src/mgmtTable.c +++ b/src/mnode/src/mgmtTable.c @@ -181,6 +181,8 @@ static void mgmtCreateTable(SVgObj *pVgroup, SQueuedMsg *pMsg) { .code = 0, .msgType = TSDB_MSG_TYPE_MD_CREATE_TABLE }; + + pMsg->ahandle = pTable; mgmtSendMsgToDnode(&ipSet, &rpcMsg); } @@ -441,14 +443,11 @@ void mgmtSetTableDirty(STableInfo *pTable, bool isDirty) { } void mgmtProcessCreateTableMsg(SQueuedMsg *pMsg) { + if (mgmtCheckRedirect(pMsg->thandle)) return; + SCMCreateTableMsg *pCreate = pMsg->pCont; mTrace("thandle:%p, start to create table:%s", pMsg->thandle, pCreate->tableId); - if (mgmtCheckRedirect(pMsg->thandle)) { - mError("thandle:%p, failed to create table:%s, need redirect", pMsg->thandle, pCreate->tableId); - return; - } - if (mgmtCheckExpired()) { mError("thandle:%p, failed to create table:%s, grant expired", pCreate->tableId); mgmtSendSimpleResp(pMsg->thandle, TSDB_CODE_GRANT_EXPIRED); @@ -469,8 +468,8 @@ void mgmtProcessCreateTableMsg(SQueuedMsg *pMsg) { return; } - SDbObj *pDb = mgmtGetDb(pCreate->db); - if (pDb == NULL) { + pMsg->pDb = mgmtGetDb(pCreate->db); + if (pMsg->pDb == NULL) { mError("thandle:%p, failed to create table:%s, db not selected", pMsg->thandle, pCreate->tableId); mgmtSendSimpleResp(pMsg->thandle, TSDB_CODE_DB_NOT_SELECTED); return; @@ -492,7 +491,7 @@ void mgmtProcessCreateTableMsg(SQueuedMsg *pMsg) { if (pCreate->numOfTags != 0) { mTrace("thandle:%p, start to create super table:%s, tags:%d columns:%d", pMsg->thandle, pCreate->tableId, pCreate->numOfTags, pCreate->numOfColumns); - code = mgmtCreateSuperTable(pDb, pCreate); + code = mgmtCreateSuperTable(pMsg->pDb, pCreate); mgmtSendSimpleResp(pMsg->thandle, code); return; } @@ -508,13 +507,13 @@ void mgmtProcessCreateTableMsg(SQueuedMsg *pMsg) { memcpy(newMsg, pMsg, sizeof(SQueuedMsg)); pMsg->pCont = NULL; - SVgObj *pVgroup = mgmtGetAvailableVgroup(pDb); + SVgObj *pVgroup = mgmtGetAvailableVgroup(pMsg->pDb); if (pVgroup == NULL) { - mTrace("thandle:%p, table:%s start to create a new vgroup", pMsg->thandle, pCreate->tableId); - mgmtCreateVgroup(pMsg); + mTrace("thandle:%p, table:%s start to create a new vgroup", newMsg->thandle, pCreate->tableId); + mgmtCreateVgroup(newMsg); } else { - mTrace("thandle:%p, create table:%s in vgroup:%d", pMsg->thandle, pCreate->tableId, pVgroup->vgId); - mgmtCreateTable(pVgroup, pMsg); + mTrace("thandle:%p, create table:%s in vgroup:%d", newMsg->thandle, pCreate->tableId, pVgroup->vgId); + mgmtCreateTable(pVgroup, newMsg); } } @@ -769,7 +768,7 @@ static void mgmtProcessCreateTableRsp(SRpcMsg *rpcMsg) { mgmtSetTableDirty(pTable, true); //sdbDeleteRow(tsVgroupSdb, pVgroup); mgmtSendSimpleResp(queueMsg->thandle, rpcMsg->code); - mError("table:%s, failed to create in dnode, code:%d, set it dirty", pTable->tableId, rpcMsg->code); + mError("table:%s, failed to create in dnode, reason:%s, set it dirty", pTable->tableId, tstrerror(rpcMsg->code)); mgmtSetTableDirty(pTable, true); } else { mTrace("table:%s, created in dnode", pTable->tableId); diff --git a/src/mnode/src/mgmtVgroup.c b/src/mnode/src/mgmtVgroup.c index 8e3dd78ab90252c5e67fac95d234b37356b91252..31e721d22c762c9819644f1cc0c7ca92f7d28ef3 100644 --- a/src/mnode/src/mgmtVgroup.c +++ b/src/mnode/src/mgmtVgroup.c @@ -182,6 +182,8 @@ void mgmtCreateVgroup(SQueuedMsg *pMsg) { taosIpStr(pVgroup->vnodeGid[i].ip), pVgroup->vnodeGid[i].vnode); } + pMsg->ahandle = pVgroup; + pMsg->expected = pVgroup->numOfVnodes; mgmtSendCreateVgroupMsg(pVgroup, pMsg); } @@ -561,7 +563,7 @@ SRpcIpSet mgmtGetIpSetFromVgroup(SVgObj *pVgroup) { SRpcIpSet ipSet = { .numOfIps = pVgroup->numOfVnodes, .inUse = 0, - .port = tsMnodeDnodePort + .port = tsDnodeMnodePort }; for (int i = 0; i < pVgroup->numOfVnodes; ++i) { ipSet.ip[i] = pVgroup->vnodeGid[i].ip; @@ -574,7 +576,7 @@ SRpcIpSet mgmtGetIpSetFromIp(uint32_t ip) { .ip[0] = ip, .numOfIps = 1, .inUse = 0, - .port = tsMnodeDnodePort + .port = tsDnodeMnodePort }; return ipSet; }