diff --git a/include/libs/parser/parsenodes.h b/include/libs/parser/parsenodes.h index 18596a9e182fede027dac76ea2b9eb6ca866e902..ac8a10067d305c2f9b658f3d9c29551b2e608560 100644 --- a/include/libs/parser/parsenodes.h +++ b/include/libs/parser/parsenodes.h @@ -135,9 +135,8 @@ typedef struct SQueryStmtInfo { SArray *pUdfInfo; struct SQueryStmtInfo *sibling; // sibling - struct SQueryStmtInfo *pDownstream; SMultiFunctionsDesc info; - SArray *pUpstream; // SArray + SArray *pDownstream; // SArray int32_t havingFieldNum; int32_t exprListLevelIndex; } SQueryStmtInfo; diff --git a/source/client/test/clientTests.cpp b/source/client/test/clientTests.cpp index 4951ffa512b529408ee66a290ef02c7ebb6ab9a0..73c9fc5e9f12b1cdcec189c07efc7c127bb06df7 100644 --- a/source/client/test/clientTests.cpp +++ b/source/client/test/clientTests.cpp @@ -48,7 +48,7 @@ int main(int argc, char** argv) { } TEST(testCase, driverInit_Test) { taos_init(); } -#if 0 + TEST(testCase, connect_Test) { TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); if (pConn == NULL) { @@ -551,7 +551,7 @@ TEST(testCase, generated_request_id_test) { // taos_free_result(pRes); // taos_close(pConn); //} -#endif +//#endif TEST(testCase, projection_query_tables) { TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); diff --git a/source/dnode/mgmt/impl/src/dndMnode.c b/source/dnode/mgmt/impl/src/dndMnode.c index 3e9258029e01fb24138099a98bb5d63fe4d3a989..ae37967b3db960ca6005d03aba3a084b70d657b1 100644 --- a/source/dnode/mgmt/impl/src/dndMnode.c +++ b/source/dnode/mgmt/impl/src/dndMnode.c @@ -502,7 +502,7 @@ int32_t dndProcessDropMnodeReq(SDnode *pDnode, SRpcMsg *pReq) { } SMnode *pMnode = dndAcquireMnode(pDnode); - if (pMnode != NULL) { + if (pMnode == NULL) { terrno = TSDB_CODE_DND_MNODE_NOT_DEPLOYED; dError("failed to drop mnode since %s", terrstr()); return -1; diff --git a/source/dnode/mgmt/impl/test/mnode/qmnode.cpp b/source/dnode/mgmt/impl/test/mnode/dmnode.cpp similarity index 50% rename from source/dnode/mgmt/impl/test/mnode/qmnode.cpp rename to source/dnode/mgmt/impl/test/mnode/dmnode.cpp index 53b40c801ba032ceda72493cbd3ccecfbcaf40ae..841d00d14d24cfb7174f9c5f24fa49ba3b209d0a 100644 --- a/source/dnode/mgmt/impl/test/mnode/qmnode.cpp +++ b/source/dnode/mgmt/impl/test/mnode/dmnode.cpp @@ -13,20 +13,10 @@ class DndTestMnode : public ::testing::Test { protected: - static void SetUpTestSuite() { - test.Init("/tmp/dnode_test_mnode", 9113); - const char* fqdn = "localhost"; - const char* firstEp = "localhost:9113"; - - server2.Start("/tmp/dnode_test_mnode2", fqdn, 9114, firstEp); - } - static void TearDownTestSuite() { - server2.Stop(); - test.Cleanup(); - } + static void SetUpTestSuite() { test.Init("/tmp/dnode_test_mnode", 9113); } + static void TearDownTestSuite() { test.Cleanup(); } static Testbase test; - static TestServer server2; public: void SetUp() override {} @@ -34,24 +24,6 @@ class DndTestMnode : public ::testing::Test { }; Testbase DndTestMnode::test; -TestServer DndTestMnode::server2; - -TEST_F(DndTestMnode, 01_Create_Dnode) { - int32_t contLen = sizeof(SCreateDnodeReq); - - SCreateDnodeReq* pReq = (SCreateDnodeReq*)rpcMallocCont(contLen); - strcpy(pReq->fqdn, "localhost"); - pReq->port = htonl(9114); - - SRpcMsg* pRsp = test.SendReq(TDMT_MND_CREATE_DNODE, pReq, contLen); - ASSERT_NE(pRsp, nullptr); - ASSERT_EQ(pRsp->code, 0); - - taosMsleep(1300); - test.SendShowMetaReq(TSDB_MGMT_TABLE_DNODE, ""); - test.SendShowRetrieveReq(); - EXPECT_EQ(test.GetShowRows(), 2); -} TEST_F(DndTestMnode, 01_Create_Mnode) { { @@ -101,100 +73,55 @@ TEST_F(DndTestMnode, 01_Create_Mnode) { ASSERT_NE(pRsp, nullptr); ASSERT_EQ(pRsp->code, TSDB_CODE_DND_MNODE_ALREADY_DEPLOYED); } - - // { - // int32_t contLen = sizeof(SDCreateMnodeReq); - - // SDCreateMnodeReq* pReq = (SDCreateMnodeReq*)rpcMallocCont(contLen); - // pReq->dnodeId = htonl(1); - // pReq->replica = 2; - // pReq->replicas[0].id = htonl(1); - // pReq->replicas[0].port = htonl(9113); - // pReq->replicas[0].id = htonl(1); - // pReq->replicas[0].port = htonl(9113); - // strcpy(pReq->replicas[0].fqdn, "localhost"); - - // SRpcMsg* pRsp = test.SendReq(TDMT_DND_CREATE_MNODE, pReq, contLen); - // ASSERT_NE(pRsp, nullptr); - // ASSERT_EQ(pRsp->code, 0); - // } - - // { - // int32_t contLen = sizeof(SDCreateMnodeReq); - - // SDCreateMnodeReq* pReq = (SDCreateMnodeReq*)rpcMallocCont(contLen); - // pReq->dnodeId = htonl(1); - - // SRpcMsg* pRsp = test.SendReq(TDMT_DND_CREATE_MNODE, pReq, contLen); - // ASSERT_NE(pRsp, nullptr); - // ASSERT_EQ(pRsp->code, TSDB_CODE_DND_MNODE_ALREADY_DEPLOYED); - // } - - // test.Restart(); - - // { - // int32_t contLen = sizeof(SDCreateMnodeReq); - - // SDCreateMnodeReq* pReq = (SDCreateMnodeReq*)rpcMallocCont(contLen); - // pReq->dnodeId = htonl(1); - - // SRpcMsg* pRsp = test.SendReq(TDMT_DND_CREATE_MNODE, pReq, contLen); - // ASSERT_NE(pRsp, nullptr); - // ASSERT_EQ(pRsp->code, TSDB_CODE_DND_MNODE_ALREADY_DEPLOYED); - // } } - -#if 0 TEST_F(DndTestMnode, 02_Alter_Mnode) { - { - int32_t contLen = sizeof(SDCreateMnodeReq); + { + int32_t contLen = sizeof(SDAlterMnodeReq); - SDCreateMnodeReq* pReq = (SDCreateMnodeReq*)rpcMallocCont(contLen); + SDAlterMnodeReq* pReq = (SDAlterMnodeReq*)rpcMallocCont(contLen); pReq->dnodeId = htonl(2); + pReq->replica = 1; + pReq->replicas[0].id = htonl(1); + pReq->replicas[0].port = htonl(9113); + strcpy(pReq->replicas[0].fqdn, "localhost"); - SRpcMsg* pRsp = test.SendReq(TDMT_DND_CREATE_MNODE, pReq, contLen); - ASSERT_NE(pRsp, nullptr); - ASSERT_EQ(pRsp->code, TSDB_CODE_DND_MNODE_ID_INVALID); - } - - { - int32_t contLen = sizeof(SDCreateMnodeReq); - - SDCreateMnodeReq* pReq = (SDCreateMnodeReq*)rpcMallocCont(contLen); - pReq->dnodeId = htonl(1); - - SRpcMsg* pRsp = test.SendReq(TDMT_DND_CREATE_MNODE, pReq, contLen); + SRpcMsg* pRsp = test.SendReq(TDMT_DND_ALTER_MNODE, pReq, contLen); ASSERT_NE(pRsp, nullptr); - ASSERT_EQ(pRsp->code, 0); + ASSERT_EQ(pRsp->code, TSDB_CODE_DND_MNODE_INVALID_OPTION); } { - int32_t contLen = sizeof(SDCreateMnodeReq); + int32_t contLen = sizeof(SDAlterMnodeReq); - SDCreateMnodeReq* pReq = (SDCreateMnodeReq*)rpcMallocCont(contLen); + SDAlterMnodeReq* pReq = (SDAlterMnodeReq*)rpcMallocCont(contLen); pReq->dnodeId = htonl(1); + pReq->replica = 1; + pReq->replicas[0].id = htonl(2); + pReq->replicas[0].port = htonl(9113); + strcpy(pReq->replicas[0].fqdn, "localhost"); - SRpcMsg* pRsp = test.SendReq(TDMT_DND_CREATE_MNODE, pReq, contLen); + SRpcMsg* pRsp = test.SendReq(TDMT_DND_ALTER_MNODE, pReq, contLen); ASSERT_NE(pRsp, nullptr); - ASSERT_EQ(pRsp->code, TSDB_CODE_DND_MNODE_ALREADY_DEPLOYED); + ASSERT_EQ(pRsp->code, TSDB_CODE_DND_MNODE_INVALID_OPTION); } - test.Restart(); - { - int32_t contLen = sizeof(SDCreateMnodeReq); + int32_t contLen = sizeof(SDAlterMnodeReq); - SDCreateMnodeReq* pReq = (SDCreateMnodeReq*)rpcMallocCont(contLen); + SDAlterMnodeReq* pReq = (SDAlterMnodeReq*)rpcMallocCont(contLen); pReq->dnodeId = htonl(1); + pReq->replica = 1; + pReq->replicas[0].id = htonl(1); + pReq->replicas[0].port = htonl(9113); + strcpy(pReq->replicas[0].fqdn, "localhost"); - SRpcMsg* pRsp = test.SendReq(TDMT_DND_CREATE_MNODE, pReq, contLen); + SRpcMsg* pRsp = test.SendReq(TDMT_DND_ALTER_MNODE, pReq, contLen); ASSERT_NE(pRsp, nullptr); - ASSERT_EQ(pRsp->code, TSDB_CODE_DND_MNODE_ALREADY_DEPLOYED); + ASSERT_EQ(pRsp->code, 0); } } - TEST_F(DndTestMnode, 03_Drop_Mnode) { { int32_t contLen = sizeof(SDDropMnodeReq); @@ -204,7 +131,7 @@ TEST_F(DndTestMnode, 03_Drop_Mnode) { SRpcMsg* pRsp = test.SendReq(TDMT_DND_DROP_MNODE, pReq, contLen); ASSERT_NE(pRsp, nullptr); - ASSERT_EQ(pRsp->code, TSDB_CODE_DND_MNODE_ID_INVALID); + ASSERT_EQ(pRsp->code, TSDB_CODE_DND_MNODE_INVALID_OPTION); } { @@ -229,28 +156,34 @@ TEST_F(DndTestMnode, 03_Drop_Mnode) { ASSERT_EQ(pRsp->code, TSDB_CODE_DND_MNODE_NOT_DEPLOYED); } - test.Restart(); - { - int32_t contLen = sizeof(SDDropMnodeReq); + int32_t contLen = sizeof(SDAlterMnodeReq); - SDDropMnodeReq* pReq = (SDDropMnodeReq*)rpcMallocCont(contLen); + SDAlterMnodeReq* pReq = (SDAlterMnodeReq*)rpcMallocCont(contLen); pReq->dnodeId = htonl(1); + pReq->replica = 1; + pReq->replicas[0].id = htonl(1); + pReq->replicas[0].port = htonl(9113); + strcpy(pReq->replicas[0].fqdn, "localhost"); - SRpcMsg* pRsp = test.SendReq(TDMT_DND_DROP_MNODE, pReq, contLen); + SRpcMsg* pRsp = test.SendReq(TDMT_DND_ALTER_MNODE, pReq, contLen); ASSERT_NE(pRsp, nullptr); ASSERT_EQ(pRsp->code, TSDB_CODE_DND_MNODE_NOT_DEPLOYED); } + { int32_t contLen = sizeof(SDCreateMnodeReq); SDCreateMnodeReq* pReq = (SDCreateMnodeReq*)rpcMallocCont(contLen); pReq->dnodeId = htonl(1); + pReq->replica = 2; + pReq->replicas[0].id = htonl(1); + pReq->replicas[0].port = htonl(9113); + strcpy(pReq->replicas[0].fqdn, "localhost"); SRpcMsg* pRsp = test.SendReq(TDMT_DND_CREATE_MNODE, pReq, contLen); ASSERT_NE(pRsp, nullptr); ASSERT_EQ(pRsp->code, 0); } -} -#endif \ No newline at end of file +} \ No newline at end of file diff --git a/source/dnode/mnode/impl/src/mndMnode.c b/source/dnode/mnode/impl/src/mndMnode.c index e1add0a8054df67c6806c2fa407dffbb497d257c..c14d1f51f866f6a70ef0f8391d38ffe151e24984 100644 --- a/source/dnode/mnode/impl/src/mndMnode.c +++ b/source/dnode/mnode/impl/src/mndMnode.c @@ -27,14 +27,14 @@ static SSdbRaw *mndMnodeActionEncode(SMnodeObj *pObj); static SSdbRow *mndMnodeActionDecode(SSdbRaw *pRaw); static int32_t mndMnodeActionInsert(SSdb *pSdb, SMnodeObj *pObj); static int32_t mndMnodeActionDelete(SSdb *pSdb, SMnodeObj *pObj); -static int32_t mndMnodeActionUpdate(SSdb *pSdb, SMnodeObj *pOldMnode, SMnodeObj *pNewMnode); -static int32_t mndProcessCreateMnodeReq(SMnodeMsg *pMsg); -static int32_t mndProcessDropMnodeReq(SMnodeMsg *pMsg); -static int32_t mndProcessCreateMnodeRsp(SMnodeMsg *pMsg); -static int32_t mndProcessAlterMnodeRsp(SMnodeMsg *pMsg); -static int32_t mndProcessDropMnodeRsp(SMnodeMsg *pMsg); -static int32_t mndGetMnodeMeta(SMnodeMsg *pMsg, SShowObj *pShow, STableMetaMsg *pMeta); -static int32_t mndRetrieveMnodes(SMnodeMsg *pMsg, SShowObj *pShow, char *data, int32_t rows); +static int32_t mndMnodeActionUpdate(SSdb *pSdb, SMnodeObj *pOld, SMnodeObj *pNew); +static int32_t mndProcessCreateMnodeReq(SMnodeMsg *pReq); +static int32_t mndProcessDropMnodeReq(SMnodeMsg *pReq); +static int32_t mndProcessCreateMnodeRsp(SMnodeMsg *pRsp); +static int32_t mndProcessAlterMnodeRsp(SMnodeMsg *pRsp); +static int32_t mndProcessDropMnodeRsp(SMnodeMsg *pRsp); +static int32_t mndGetMnodeMeta(SMnodeMsg *pReq, SShowObj *pShow, STableMetaMsg *pMeta); +static int32_t mndRetrieveMnodes(SMnodeMsg *pReq, SShowObj *pShow, char *data, int32_t rows); static void mndCancelGetNextMnode(SMnode *pMnode, void *pIter); int32_t mndInitMnode(SMnode *pMnode) { @@ -65,7 +65,7 @@ void mndCleanupMnode(SMnode *pMnode) {} static SMnodeObj *mndAcquireMnode(SMnode *pMnode, int32_t mnodeId) { SSdb *pSdb = pMnode->pSdb; SMnodeObj *pObj = sdbAcquire(pSdb, SDB_MNODE, &mnodeId); - if (pObj == NULL) { + if (pObj == NULL && terrno == TSDB_CODE_SDB_OBJ_NOT_THERE) { terrno = TSDB_CODE_MND_MNODE_NOT_EXIST; } return pObj; @@ -207,9 +207,9 @@ static int32_t mndMnodeActionDelete(SSdb *pSdb, SMnodeObj *pObj) { return 0; } -static int32_t mndMnodeActionUpdate(SSdb *pSdb, SMnodeObj *pOldMnode, SMnodeObj *pNewMnode) { - mTrace("mnode:%d, perform update action, old_row:%p new_row:%p", pOldMnode->id, pOldMnode, pNewMnode); - pOldMnode->updateTime = pNewMnode->updateTime; +static int32_t mndMnodeActionUpdate(SSdb *pSdb, SMnodeObj *pOld, SMnodeObj *pNew) { + mTrace("mnode:%d, perform update action, old_row:%p new_row:%p", pOld->id, pOld, pNew); + pOld->updateTime = pNew->updateTime; return 0; } @@ -277,13 +277,13 @@ static int32_t mndSetCreateMnodeRedoActions(SMnode *pMnode, STrans *pTrans, SDno void *pIter = NULL; int32_t numOfReplicas = 0; - SDCreateMnodeReq createMsg = {0}; + SDCreateMnodeReq createReq = {0}; while (1) { SMnodeObj *pMObj = NULL; pIter = sdbFetch(pSdb, SDB_MNODE, pIter, (void **)&pMObj); if (pIter == NULL) break; - SReplica *pReplica = &createMsg.replicas[numOfReplicas]; + SReplica *pReplica = &createReq.replicas[numOfReplicas]; pReplica->id = htonl(pMObj->id); pReplica->port = htons(pMObj->pDnode->port); memcpy(pReplica->fqdn, pMObj->pDnode->fqdn, TSDB_FQDN_LEN); @@ -292,13 +292,13 @@ static int32_t mndSetCreateMnodeRedoActions(SMnode *pMnode, STrans *pTrans, SDno sdbRelease(pSdb, pMObj); } - SReplica *pReplica = &createMsg.replicas[numOfReplicas]; + SReplica *pReplica = &createReq.replicas[numOfReplicas]; pReplica->id = htonl(pDnode->id); pReplica->port = htons(pDnode->port); memcpy(pReplica->fqdn, pDnode->fqdn, TSDB_FQDN_LEN); numOfReplicas++; - createMsg.replica = numOfReplicas; + createReq.replica = numOfReplicas; while (1) { SMnodeObj *pMObj = NULL; @@ -307,22 +307,23 @@ static int32_t mndSetCreateMnodeRedoActions(SMnode *pMnode, STrans *pTrans, SDno STransAction action = {0}; - SDAlterMnodeReq *pMsg = malloc(sizeof(SDAlterMnodeReq)); - if (pMsg == NULL) { + SDAlterMnodeReq *pReq = malloc(sizeof(SDAlterMnodeReq)); + if (pReq == NULL) { sdbCancelFetch(pSdb, pIter); sdbRelease(pSdb, pMObj); return -1; } - memcpy(pMsg, &createMsg, sizeof(SDAlterMnodeReq)); + memcpy(pReq, &createReq, sizeof(SDAlterMnodeReq)); - pMsg->dnodeId = htonl(pMObj->id); + pReq->dnodeId = htonl(pMObj->id); action.epSet = mndGetDnodeEpset(pMObj->pDnode); - action.pCont = pMsg; + action.pCont = pReq; action.contLen = sizeof(SDAlterMnodeReq); action.msgType = TDMT_DND_ALTER_MNODE; + action.acceptableCode = TSDB_CODE_DND_MNODE_ALREADY_DEPLOYED; if (mndTransAppendRedoAction(pTrans, &action) != 0) { - free(pMsg); + free(pReq); sdbCancelFetch(pSdb, pIter); sdbRelease(pSdb, pMObj); return -1; @@ -335,17 +336,18 @@ static int32_t mndSetCreateMnodeRedoActions(SMnode *pMnode, STrans *pTrans, SDno STransAction action = {0}; action.epSet = mndGetDnodeEpset(pDnode); - SDCreateMnodeReq *pMsg = malloc(sizeof(SDCreateMnodeReq)); - if (pMsg == NULL) return -1; - memcpy(pMsg, &createMsg, sizeof(SDAlterMnodeReq)); - pMsg->dnodeId = htonl(pObj->id); + SDCreateMnodeReq *pReq = malloc(sizeof(SDCreateMnodeReq)); + if (pReq == NULL) return -1; + memcpy(pReq, &createReq, sizeof(SDAlterMnodeReq)); + pReq->dnodeId = htonl(pObj->id); action.epSet = mndGetDnodeEpset(pDnode); - action.pCont = pMsg; + action.pCont = pReq; action.contLen = sizeof(SDCreateMnodeReq); action.msgType = TDMT_DND_CREATE_MNODE; + action.acceptableCode = TSDB_CODE_DND_MNODE_ALREADY_DEPLOYED; if (mndTransAppendRedoAction(pTrans, &action) != 0) { - free(pMsg); + free(pReq); return -1; } } @@ -353,39 +355,23 @@ static int32_t mndSetCreateMnodeRedoActions(SMnode *pMnode, STrans *pTrans, SDno return 0; } -static int32_t mndCreateMnode(SMnode *pMnode, SMnodeMsg *pMsg, SDnodeObj *pDnode, SMCreateMnodeReq *pCreate) { +static int32_t mndCreateMnode(SMnode *pMnode, SMnodeMsg *pReq, SDnodeObj *pDnode, SMCreateMnodeReq *pCreate) { + int32_t code = -1; + SMnodeObj mnodeObj = {0}; mnodeObj.id = pDnode->id; mnodeObj.createdTime = taosGetTimestampMs(); mnodeObj.updateTime = mnodeObj.createdTime; - int32_t code = -1; - STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, &pMsg->rpcMsg); - if (pTrans == NULL) { - mError("mnode:%d, failed to create since %s", pCreate->dnodeId, terrstr()); - goto CREATE_MNODE_OVER; - } - mDebug("trans:%d, used to create mnode:%d", pTrans->id, pCreate->dnodeId); + STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, &pReq->rpcMsg); + if (pTrans == NULL) goto CREATE_MNODE_OVER; - if (mndSetCreateMnodeRedoLogs(pMnode, pTrans, &mnodeObj) != 0) { - mError("trans:%d, failed to set redo log since %s", pTrans->id, terrstr()); - goto CREATE_MNODE_OVER; - } - - if (mndSetCreateMnodeCommitLogs(pMnode, pTrans, &mnodeObj) != 0) { - mError("trans:%d, failed to set commit log since %s", pTrans->id, terrstr()); - goto CREATE_MNODE_OVER; - } - - if (mndSetCreateMnodeRedoActions(pMnode, pTrans, pDnode, &mnodeObj) != 0) { - mError("trans:%d, failed to set redo actions since %s", pTrans->id, terrstr()); - goto CREATE_MNODE_OVER; - } + mDebug("trans:%d, used to create mnode:%d", pTrans->id, pCreate->dnodeId); + if (mndSetCreateMnodeRedoLogs(pMnode, pTrans, &mnodeObj) != 0) goto CREATE_MNODE_OVER; + if (mndSetCreateMnodeCommitLogs(pMnode, pTrans, &mnodeObj) != 0) goto CREATE_MNODE_OVER; + if (mndSetCreateMnodeRedoActions(pMnode, pTrans, pDnode, &mnodeObj) != 0) goto CREATE_MNODE_OVER; - if (mndTransPrepare(pMnode, pTrans) != 0) { - mError("trans:%d, failed to prepare since %s", pTrans->id, terrstr()); - goto CREATE_MNODE_OVER; - } + if (mndTransPrepare(pMnode, pTrans) != 0) goto CREATE_MNODE_OVER; code = 0; @@ -394,9 +380,9 @@ CREATE_MNODE_OVER: return code; } -static int32_t mndProcessCreateMnodeReq(SMnodeMsg *pMsg) { - SMnode *pMnode = pMsg->pMnode; - SMCreateMnodeReq *pCreate = pMsg->rpcMsg.pCont; +static int32_t mndProcessCreateMnodeReq(SMnodeMsg *pReq) { + SMnode *pMnode = pReq->pMnode; + SMCreateMnodeReq *pCreate = pReq->rpcMsg.pCont; pCreate->dnodeId = htonl(pCreate->dnodeId); @@ -408,6 +394,9 @@ static int32_t mndProcessCreateMnodeReq(SMnodeMsg *pMsg) { mError("mnode:%d, mnode already exist", pObj->id); terrno = TSDB_CODE_MND_MNODE_ALREADY_EXIST; return -1; + } else if (terrno != TSDB_CODE_MND_MNODE_NOT_EXIST) { + mError("qnode:%d, failed to create mnode since %s", pCreate->dnodeId, terrstr()); + return -1; } SDnodeObj *pDnode = mndAcquireDnode(pMnode, pCreate->dnodeId); @@ -417,7 +406,7 @@ static int32_t mndProcessCreateMnodeReq(SMnodeMsg *pMsg) { return -1; } - int32_t code = mndCreateMnode(pMnode, pMsg, pDnode, pCreate); + int32_t code = mndCreateMnode(pMnode, pReq, pDnode, pCreate); mndReleaseDnode(pMnode, pDnode); if (code != 0) { @@ -449,14 +438,14 @@ static int32_t mndSetDropMnodeRedoActions(SMnode *pMnode, STrans *pTrans, SDnode void *pIter = NULL; int32_t numOfReplicas = 0; - SDAlterMnodeReq alterMsg = {0}; + SDAlterMnodeReq alterReq = {0}; while (1) { SMnodeObj *pMObj = NULL; pIter = sdbFetch(pSdb, SDB_MNODE, pIter, (void **)&pMObj); if (pIter == NULL) break; if (pMObj->id != pObj->id) { - SReplica *pReplica = &alterMsg.replicas[numOfReplicas]; + SReplica *pReplica = &alterReq.replicas[numOfReplicas]; pReplica->id = htonl(pMObj->id); pReplica->port = htons(pMObj->pDnode->port); memcpy(pReplica->fqdn, pMObj->pDnode->fqdn, TSDB_FQDN_LEN); @@ -466,7 +455,7 @@ static int32_t mndSetDropMnodeRedoActions(SMnode *pMnode, STrans *pTrans, SDnode sdbRelease(pSdb, pMObj); } - alterMsg.replica = numOfReplicas; + alterReq.replica = numOfReplicas; while (1) { SMnodeObj *pMObj = NULL; @@ -475,22 +464,23 @@ static int32_t mndSetDropMnodeRedoActions(SMnode *pMnode, STrans *pTrans, SDnode if (pMObj->id != pObj->id) { STransAction action = {0}; - SDAlterMnodeReq *pMsg = malloc(sizeof(SDAlterMnodeReq)); - if (pMsg == NULL) { + SDAlterMnodeReq *pReq = malloc(sizeof(SDAlterMnodeReq)); + if (pReq == NULL) { sdbCancelFetch(pSdb, pIter); sdbRelease(pSdb, pMObj); return -1; } - memcpy(pMsg, &alterMsg, sizeof(SDAlterMnodeReq)); + memcpy(pReq, &alterReq, sizeof(SDAlterMnodeReq)); - pMsg->dnodeId = htonl(pMObj->id); + pReq->dnodeId = htonl(pMObj->id); action.epSet = mndGetDnodeEpset(pMObj->pDnode); - action.pCont = pMsg; + action.pCont = pReq; action.contLen = sizeof(SDAlterMnodeReq); action.msgType = TDMT_DND_ALTER_MNODE; + action.acceptableCode = TSDB_CODE_DND_MNODE_ALREADY_DEPLOYED; if (mndTransAppendRedoAction(pTrans, &action) != 0) { - free(pMsg); + free(pReq); sdbCancelFetch(pSdb, pIter); sdbRelease(pSdb, pMObj); return -1; @@ -504,19 +494,20 @@ static int32_t mndSetDropMnodeRedoActions(SMnode *pMnode, STrans *pTrans, SDnode STransAction action = {0}; action.epSet = mndGetDnodeEpset(pDnode); - SDDropMnodeReq *pMsg = malloc(sizeof(SDDropMnodeReq)); - if (pMsg == NULL) { + SDDropMnodeReq *pReq = malloc(sizeof(SDDropMnodeReq)); + if (pReq == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; return -1; } - pMsg->dnodeId = htonl(pObj->id); + pReq->dnodeId = htonl(pObj->id); action.epSet = mndGetDnodeEpset(pDnode); - action.pCont = pMsg; + action.pCont = pReq; action.contLen = sizeof(SDDropMnodeReq); action.msgType = TDMT_DND_DROP_MNODE; + action.acceptableCode = TSDB_CODE_DND_MNODE_NOT_DEPLOYED; if (mndTransAppendRedoAction(pTrans, &action) != 0) { - free(pMsg); + free(pReq); return -1; } } @@ -524,35 +515,18 @@ static int32_t mndSetDropMnodeRedoActions(SMnode *pMnode, STrans *pTrans, SDnode return 0; } -static int32_t mndDropMnode(SMnode *pMnode, SMnodeMsg *pMsg, SMnodeObj *pObj) { +static int32_t mndDropMnode(SMnode *pMnode, SMnodeMsg *pReq, SMnodeObj *pObj) { int32_t code = -1; - STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, &pMsg->rpcMsg); - if (pTrans == NULL) { - mError("mnode:%d, failed to drop since %s", pObj->id, terrstr()); - goto DROP_MNODE_OVER; - } - mDebug("trans:%d, used to drop mnode:%d", pTrans->id, pObj->id); + STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, &pReq->rpcMsg); + if (pTrans == NULL) goto DROP_MNODE_OVER; - if (mndSetDropMnodeRedoLogs(pMnode, pTrans, pObj) != 0) { - mError("trans:%d, failed to set redo log since %s", pTrans->id, terrstr()); - goto DROP_MNODE_OVER; - } - - if (mndSetDropMnodeCommitLogs(pMnode, pTrans, pObj) != 0) { - mError("trans:%d, failed to set commit log since %s", pTrans->id, terrstr()); - goto DROP_MNODE_OVER; - } - - if (mndSetDropMnodeRedoActions(pMnode, pTrans, pObj->pDnode, pObj) != 0) { - mError("trans:%d, failed to set redo actions since %s", pTrans->id, terrstr()); - goto DROP_MNODE_OVER; - } + mDebug("trans:%d, used to drop mnode:%d", pTrans->id, pObj->id); - if (mndTransPrepare(pMnode, pTrans) != 0) { - mError("trans:%d, failed to prepare since %s", pTrans->id, terrstr()); - goto DROP_MNODE_OVER; - } + if (mndSetDropMnodeRedoLogs(pMnode, pTrans, pObj) != 0) goto DROP_MNODE_OVER; + if (mndSetDropMnodeCommitLogs(pMnode, pTrans, pObj) != 0) goto DROP_MNODE_OVER; + if (mndSetDropMnodeRedoActions(pMnode, pTrans, pObj->pDnode, pObj) != 0) goto DROP_MNODE_OVER; + if (mndTransPrepare(pMnode, pTrans) != 0) goto DROP_MNODE_OVER; code = 0; @@ -561,9 +535,9 @@ DROP_MNODE_OVER: return code; } -static int32_t mndProcessDropMnodeReq(SMnodeMsg *pMsg) { - SMnode *pMnode = pMsg->pMnode; - SMDropMnodeReq *pDrop = pMsg->rpcMsg.pCont; +static int32_t mndProcessDropMnodeReq(SMnodeMsg *pReq) { + SMnode *pMnode = pReq->pMnode; + SMDropMnodeReq *pDrop = pReq->rpcMsg.pCont; pDrop->dnodeId = htonl(pDrop->dnodeId); mDebug("mnode:%d, start to drop", pDrop->dnodeId); @@ -577,12 +551,10 @@ static int32_t mndProcessDropMnodeReq(SMnodeMsg *pMsg) { SMnodeObj *pObj = mndAcquireMnode(pMnode, pDrop->dnodeId); if (pObj == NULL) { mError("mnode:%d, not exist", pDrop->dnodeId); - terrno = TSDB_CODE_MND_DNODE_NOT_EXIST; return -1; } - int32_t code = mndDropMnode(pMnode, pMsg, pObj); - + int32_t code = mndDropMnode(pMnode, pReq, pObj); if (code != 0) { mError("mnode:%d, failed to drop since %s", pMnode->dnodeId, terrstr()); return -1; @@ -592,23 +564,23 @@ static int32_t mndProcessDropMnodeReq(SMnodeMsg *pMsg) { return TSDB_CODE_MND_ACTION_IN_PROGRESS; } -static int32_t mndProcessCreateMnodeRsp(SMnodeMsg *pMsg) { - mndTransProcessRsp(pMsg); +static int32_t mndProcessCreateMnodeRsp(SMnodeMsg *pRsp) { + mndTransProcessRsp(pRsp); return 0; } -static int32_t mndProcessAlterMnodeRsp(SMnodeMsg *pMsg) { - mndTransProcessRsp(pMsg); +static int32_t mndProcessAlterMnodeRsp(SMnodeMsg *pRsp) { + mndTransProcessRsp(pRsp); return 0; } -static int32_t mndProcessDropMnodeRsp(SMnodeMsg *pMsg) { - mndTransProcessRsp(pMsg); +static int32_t mndProcessDropMnodeRsp(SMnodeMsg *pRsp) { + mndTransProcessRsp(pRsp); return 0; } -static int32_t mndGetMnodeMeta(SMnodeMsg *pMsg, SShowObj *pShow, STableMetaMsg *pMeta) { - SMnode *pMnode = pMsg->pMnode; +static int32_t mndGetMnodeMeta(SMnodeMsg *pReq, SShowObj *pShow, STableMetaMsg *pMeta) { + SMnode *pMnode = pReq->pMnode; SSdb *pSdb = pMnode->pSdb; int32_t cols = 0; @@ -660,8 +632,8 @@ static int32_t mndGetMnodeMeta(SMnodeMsg *pMsg, SShowObj *pShow, STableMetaMsg * return 0; } -static int32_t mndRetrieveMnodes(SMnodeMsg *pMsg, SShowObj *pShow, char *data, int32_t rows) { - SMnode *pMnode = pMsg->pMnode; +static int32_t mndRetrieveMnodes(SMnodeMsg *pReq, SShowObj *pShow, char *data, int32_t rows) { + SMnode *pMnode = pReq->pMnode; SSdb *pSdb = pMnode->pSdb; int32_t numOfRows = 0; int32_t cols = 0; diff --git a/source/dnode/mnode/impl/test/mnode/mnode.cpp b/source/dnode/mnode/impl/test/mnode/mnode.cpp index 2246fdfe2c259e44a2b31adcf131451e45b1815a..f56b864cad7e47f3896f47d11916ff8e4e3332b3 100644 --- a/source/dnode/mnode/impl/test/mnode/mnode.cpp +++ b/source/dnode/mnode/impl/test/mnode/mnode.cpp @@ -18,37 +18,25 @@ class MndTestMnode : public ::testing::Test { public: static void SetUpTestSuite() { - test.Init("/tmp/mnode_test_mnode1", 9031); + test.Init("/tmp/mnode_test_mnode1", 9028); const char* fqdn = "localhost"; - const char* firstEp = "localhost:9031"; + const char* firstEp = "localhost:9028"; - server2.Start("/tmp/mnode_test_mnode2", fqdn, 9032, firstEp); - server3.Start("/tmp/mnode_test_mnode3", fqdn, 9033, firstEp); - server4.Start("/tmp/mnode_test_mnode4", fqdn, 9034, firstEp); - server5.Start("/tmp/mnode_test_mnode5", fqdn, 9035, firstEp); + server2.Start("/tmp/mnode_test_mnode2", fqdn, 9029, firstEp); taosMsleep(300); } static void TearDownTestSuite() { server2.Stop(); - server3.Stop(); - server4.Stop(); - server5.Stop(); test.Cleanup(); } static Testbase test; static TestServer server2; - static TestServer server3; - static TestServer server4; - static TestServer server5; }; Testbase MndTestMnode::test; TestServer MndTestMnode::server2; -TestServer MndTestMnode::server3; -TestServer MndTestMnode::server4; -TestServer MndTestMnode::server5; TEST_F(MndTestMnode, 01_ShowDnode) { test.SendShowMetaReq(TSDB_MGMT_TABLE_MNODE, ""); @@ -64,7 +52,7 @@ TEST_F(MndTestMnode, 01_ShowDnode) { EXPECT_EQ(test.GetShowRows(), 1); CheckInt16(1); - CheckBinary("localhost:9031", TSDB_EP_LEN); + CheckBinary("localhost:9028", TSDB_EP_LEN); CheckBinary("master", 12); CheckInt64(0); CheckTimestamp(); @@ -103,7 +91,7 @@ TEST_F(MndTestMnode, 04_Create_Mnode) { SCreateDnodeReq* pReq = (SCreateDnodeReq*)rpcMallocCont(contLen); strcpy(pReq->fqdn, "localhost"); - pReq->port = htonl(9032); + pReq->port = htonl(9029); SRpcMsg* pRsp = test.SendReq(TDMT_MND_CREATE_DNODE, pReq, contLen); ASSERT_NE(pRsp, nullptr); @@ -132,8 +120,8 @@ TEST_F(MndTestMnode, 04_Create_Mnode) { CheckInt16(1); CheckInt16(2); - CheckBinary("localhost:9031", TSDB_EP_LEN); - CheckBinary("localhost:9032", TSDB_EP_LEN); + CheckBinary("localhost:9028", TSDB_EP_LEN); + CheckBinary("localhost:9029", TSDB_EP_LEN); CheckBinary("master", 12); CheckBinary("slave", 12); CheckInt64(0); @@ -158,144 +146,145 @@ TEST_F(MndTestMnode, 04_Create_Mnode) { EXPECT_EQ(test.GetShowRows(), 1); CheckInt16(1); - CheckBinary("localhost:9031", TSDB_EP_LEN); + CheckBinary("localhost:9028", TSDB_EP_LEN); CheckBinary("master", 12); CheckInt64(0); CheckTimestamp(); } + + { + // drop mnode + int32_t contLen = sizeof(SMDropMnodeReq); + + SMDropMnodeReq* pReq = (SMDropMnodeReq*)rpcMallocCont(contLen); + pReq->dnodeId = htonl(2); + + SRpcMsg* pRsp = test.SendReq(TDMT_MND_DROP_MNODE, pReq, contLen); + ASSERT_NE(pRsp, nullptr); + ASSERT_EQ(pRsp->code, TSDB_CODE_MND_MNODE_NOT_EXIST); + } +} + +TEST_F(MndTestMnode, 03_Create_Mnode_Rollback) { + { + // send message first, then dnode2 crash, result is returned, and rollback is started + int32_t contLen = sizeof(SMCreateMnodeReq); + + SMCreateMnodeReq* pReq = (SMCreateMnodeReq*)rpcMallocCont(contLen); + pReq->dnodeId = htonl(2); + + server2.Stop(); + SRpcMsg* pRsp = test.SendReq(TDMT_MND_CREATE_MNODE, pReq, contLen); + ASSERT_NE(pRsp, nullptr); + ASSERT_EQ(pRsp->code, TSDB_CODE_RPC_NETWORK_UNAVAIL); + } + + { + // continue send message, mnode is creating + int32_t contLen = sizeof(SMCreateMnodeReq); + + SMCreateMnodeReq* pReq = (SMCreateMnodeReq*)rpcMallocCont(contLen); + pReq->dnodeId = htonl(2); + + SRpcMsg* pRsp = test.SendReq(TDMT_MND_CREATE_MNODE, pReq, contLen); + ASSERT_NE(pRsp, nullptr); + ASSERT_EQ(pRsp->code, TSDB_CODE_SDB_OBJ_CREATING); + } + + { + // continue send message, mnode is creating + int32_t contLen = sizeof(SMDropMnodeReq); + + SMDropMnodeReq* pReq = (SMDropMnodeReq*)rpcMallocCont(contLen); + pReq->dnodeId = htonl(2); + + SRpcMsg* pRsp = test.SendReq(TDMT_MND_DROP_MNODE, pReq, contLen); + ASSERT_NE(pRsp, nullptr); + ASSERT_EQ(pRsp->code, TSDB_CODE_SDB_OBJ_CREATING); + } + + { + // server start, wait until the rollback finished + server2.DoStart(); + taosMsleep(1000); + + int32_t retry = 0; + int32_t retryMax = 20; + + for (retry = 0; retry < retryMax; retry++) { + int32_t contLen = sizeof(SMCreateMnodeReq); + + SMCreateMnodeReq* pReq = (SMCreateMnodeReq*)rpcMallocCont(contLen); + pReq->dnodeId = htonl(2); + + SRpcMsg* pRsp = test.SendReq(TDMT_MND_CREATE_MNODE, pReq, contLen); + ASSERT_NE(pRsp, nullptr); + if (pRsp->code == TSDB_CODE_MND_MNODE_ALREADY_EXIST) break; + taosMsleep(1000); + } + + ASSERT_NE(retry, retryMax); + } } -// { -// int32_t contLen = sizeof(SDropDnodeReq); - -// SDropDnodeReq* pReq = (SDropDnodeReq*)rpcMallocCont(contLen); -// pReq->dnodeId = htonl(2); - -// SRpcMsg* pRsp = test.SendReq(TDMT_MND_DROP_DNODE, pReq, contLen); -// ASSERT_NE(pRsp, nullptr); -// ASSERT_EQ(pRsp->code, 0); -// } - -// test.SendShowMetaReq(TSDB_MGMT_TABLE_DNODE, ""); -// CHECK_META("show dnodes", 7); -// test.SendShowRetrieveReq(); -// EXPECT_EQ(test.GetShowRows(), 1); - -// CheckInt16(1); -// CheckBinary("localhost:9031", TSDB_EP_LEN); -// CheckInt16(0); -// CheckInt16(1); -// CheckBinary("ready", 10); -// CheckTimestamp(); -// CheckBinary("", 24); - -// { -// int32_t contLen = sizeof(SCreateDnodeReq); - -// SCreateDnodeReq* pReq = (SCreateDnodeReq*)rpcMallocCont(contLen); -// strcpy(pReq->ep, "localhost:9033"); - -// SRpcMsg* pRsp = test.SendReq(TDMT_MND_CREATE_DNODE, pReq, contLen); -// ASSERT_NE(pRsp, nullptr); -// ASSERT_EQ(pRsp->code, 0); -// } - -// { -// int32_t contLen = sizeof(SCreateDnodeReq); - -// SCreateDnodeReq* pReq = (SCreateDnodeReq*)rpcMallocCont(contLen); -// strcpy(pReq->ep, "localhost:9034"); - -// SRpcMsg* pRsp = test.SendReq(TDMT_MND_CREATE_DNODE, pReq, contLen); -// ASSERT_NE(pRsp, nullptr); -// ASSERT_EQ(pRsp->code, 0); -// } - -// { -// int32_t contLen = sizeof(SCreateDnodeReq); - -// SCreateDnodeReq* pReq = (SCreateDnodeReq*)rpcMallocCont(contLen); -// strcpy(pReq->ep, "localhost:9035"); - -// SRpcMsg* pRsp = test.SendReq(TDMT_MND_CREATE_DNODE, pReq, contLen); -// ASSERT_NE(pRsp, nullptr); -// ASSERT_EQ(pRsp->code, 0); -// } - -// taosMsleep(1300); -// test.SendShowMetaReq(TSDB_MGMT_TABLE_DNODE, ""); -// CHECK_META("show dnodes", 7); -// test.SendShowRetrieveReq(); -// EXPECT_EQ(test.GetShowRows(), 4); - -// CheckInt16(1); -// CheckInt16(3); -// CheckInt16(4); -// CheckInt16(5); -// CheckBinary("localhost:9031", TSDB_EP_LEN); -// CheckBinary("localhost:9033", TSDB_EP_LEN); -// CheckBinary("localhost:9034", TSDB_EP_LEN); -// CheckBinary("localhost:9035", TSDB_EP_LEN); -// CheckInt16(0); -// CheckInt16(0); -// CheckInt16(0); -// CheckInt16(0); -// CheckInt16(1); -// CheckInt16(1); -// CheckInt16(1); -// CheckInt16(1); -// CheckBinary("ready", 10); -// CheckBinary("ready", 10); -// CheckBinary("ready", 10); -// CheckBinary("ready", 10); -// CheckTimestamp(); -// CheckTimestamp(); -// CheckTimestamp(); -// CheckTimestamp(); -// CheckBinary("", 24); -// CheckBinary("", 24); -// CheckBinary("", 24); -// CheckBinary("", 24); - -// // restart -// uInfo("stop all server"); -// test.Restart(); -// server2.Restart(); -// server3.Restart(); -// server4.Restart(); -// server5.Restart(); - -// taosMsleep(1300); -// test.SendShowMetaReq(TSDB_MGMT_TABLE_DNODE, ""); -// CHECK_META("show dnodes", 7); -// test.SendShowRetrieveReq(); -// EXPECT_EQ(test.GetShowRows(), 4); - -// CheckInt16(1); -// CheckInt16(3); -// CheckInt16(4); -// CheckInt16(5); -// CheckBinary("localhost:9031", TSDB_EP_LEN); -// CheckBinary("localhost:9033", TSDB_EP_LEN); -// CheckBinary("localhost:9034", TSDB_EP_LEN); -// CheckBinary("localhost:9035", TSDB_EP_LEN); -// CheckInt16(0); -// CheckInt16(0); -// CheckInt16(0); -// CheckInt16(0); -// CheckInt16(1); -// CheckInt16(1); -// CheckInt16(1); -// CheckInt16(1); -// CheckBinary("ready", 10); -// CheckBinary("ready", 10); -// CheckBinary("ready", 10); -// CheckBinary("ready", 10); -// CheckTimestamp(); -// CheckTimestamp(); -// CheckTimestamp(); -// CheckTimestamp(); -// CheckBinary("", 24); -// CheckBinary("", 24); -// CheckBinary("", 24); -// CheckBinary("", 24); -// } + +TEST_F(MndTestMnode, 04_Drop_Mnode_Rollback) { + { + // send message first, then dnode2 crash, result is returned, and rollback is started + int32_t contLen = sizeof(SMDropMnodeReq); + + SMDropMnodeReq* pReq = (SMDropMnodeReq*)rpcMallocCont(contLen); + pReq->dnodeId = htonl(2); + + server2.Stop(); + SRpcMsg* pRsp = test.SendReq(TDMT_MND_DROP_MNODE, pReq, contLen); + ASSERT_NE(pRsp, nullptr); + ASSERT_EQ(pRsp->code, TSDB_CODE_RPC_NETWORK_UNAVAIL); + } + + { + // continue send message, mnode is dropping + int32_t contLen = sizeof(SMCreateMnodeReq); + + SMCreateMnodeReq* pReq = (SMCreateMnodeReq*)rpcMallocCont(contLen); + pReq->dnodeId = htonl(2); + + SRpcMsg* pRsp = test.SendReq(TDMT_MND_CREATE_MNODE, pReq, contLen); + ASSERT_NE(pRsp, nullptr); + ASSERT_EQ(pRsp->code, TSDB_CODE_SDB_OBJ_DROPPING); + } + + { + // continue send message, mnode is dropping + int32_t contLen = sizeof(SMDropMnodeReq); + + SMDropMnodeReq* pReq = (SMDropMnodeReq*)rpcMallocCont(contLen); + pReq->dnodeId = htonl(2); + + SRpcMsg* pRsp = test.SendReq(TDMT_MND_DROP_MNODE, pReq, contLen); + ASSERT_NE(pRsp, nullptr); + ASSERT_EQ(pRsp->code, TSDB_CODE_SDB_OBJ_DROPPING); + } + + { + // server start, wait until the rollback finished + server2.DoStart(); + taosMsleep(1000); + + int32_t retry = 0; + int32_t retryMax = 20; + + for (retry = 0; retry < retryMax; retry++) { + int32_t contLen = sizeof(SMCreateMnodeReq); + + SMCreateMnodeReq* pReq = (SMCreateMnodeReq*)rpcMallocCont(contLen); + pReq->dnodeId = htonl(2); + + SRpcMsg* pRsp = test.SendReq(TDMT_MND_CREATE_MNODE, pReq, contLen); + ASSERT_NE(pRsp, nullptr); + if (pRsp->code == 0) break; + taosMsleep(1000); + } + + ASSERT_NE(retry, retryMax); + } +} \ No newline at end of file diff --git a/source/libs/parser/inc/parserInt.h b/source/libs/parser/inc/parserInt.h index 8abb9830590091341eff70b3170fbbff14ce572b..10ec335fc8fa4c45ade72e66c33ae23e319008a5 100644 --- a/source/libs/parser/inc/parserInt.h +++ b/source/libs/parser/inc/parserInt.h @@ -98,7 +98,7 @@ int32_t checkForInvalidExpr(SQueryStmtInfo* pQueryInfo, SMsgBuf* pMsgBuf); * @param msgBufLen * @return */ -int32_t qParserExtractRequestedMetaInfo(const SSqlInfo* pSqlInfo, SCatalogReq* pMetaInfo, char* msg, int32_t msgBufLen); +int32_t qParserExtractRequestedMetaInfo(const SSqlInfo* pSqlInfo, SCatalogReq* pMetaInfo, SParseBasicCtx *pCtx, char* msg, int32_t msgBufLen); /** * Destroy the meta data request structure. diff --git a/source/libs/parser/src/astValidate.c b/source/libs/parser/src/astValidate.c index 50054690b74eacedde863f7514328f1e4e5027a6..3ca3d87a79b601899114dd71b28274a5345a9435 100644 --- a/source/libs/parser/src/astValidate.c +++ b/source/libs/parser/src/astValidate.c @@ -213,7 +213,7 @@ SQueryStmtInfo *createQueryInfo() { pQueryInfo->slimit.limit = -1; pQueryInfo->slimit.offset = 0; - pQueryInfo->pUpstream = taosArrayInit(4, POINTER_BYTES); + pQueryInfo->pDownstream = taosArrayInit(4, POINTER_BYTES); pQueryInfo->window = TSWINDOW_INITIALIZER; pQueryInfo->exprList = calloc(10, POINTER_BYTES); @@ -247,8 +247,8 @@ static void destroyQueryInfoImpl(SQueryStmtInfo* pQueryInfo) { tfree(pQueryInfo->fillVal); tfree(pQueryInfo->buf); - taosArrayDestroy(pQueryInfo->pUpstream); - pQueryInfo->pUpstream = NULL; + taosArrayDestroy(pQueryInfo->pDownstream); + pQueryInfo->pDownstream = NULL; pQueryInfo->bufLen = 0; } @@ -256,9 +256,9 @@ void destroyQueryInfo(SQueryStmtInfo* pQueryInfo) { while (pQueryInfo != NULL) { SQueryStmtInfo* p = pQueryInfo->sibling; - size_t numOfUpstream = taosArrayGetSize(pQueryInfo->pUpstream); + size_t numOfUpstream = taosArrayGetSize(pQueryInfo->pDownstream); for (int32_t i = 0; i < numOfUpstream; ++i) { - SQueryStmtInfo* pUpQueryInfo = taosArrayGetP(pQueryInfo->pUpstream, i); + SQueryStmtInfo* pUpQueryInfo = taosArrayGetP(pQueryInfo->pDownstream, i); destroyQueryInfoImpl(pUpQueryInfo); clearAllTableMetaInfo(pUpQueryInfo, false, 0); tfree(pUpQueryInfo); @@ -288,7 +288,6 @@ static int32_t doValidateSubquery(SSqlNode* pSqlNode, int32_t index, SQueryStmtI } pSub->pUdfInfo = pUdfInfo; - pSub->pDownstream = pQueryInfo; int32_t code = validateSqlNode(p, pSub, pMsgBuf); if (code != TSDB_CODE_SUCCESS) { return code; @@ -311,7 +310,7 @@ static int32_t doValidateSubquery(SSqlNode* pSqlNode, int32_t index, SQueryStmtI tstrncpy(pTableMetaInfo1->aliasName, subInfo->aliasName.z, subInfo->aliasName.n + 1); } - taosArrayPush(pQueryInfo->pUpstream, &pSub); + taosArrayPush(pQueryInfo->pDownstream, &pSub); // NOTE: order mix up in subquery not support yet. pQueryInfo->order = pSub->order; @@ -600,7 +599,7 @@ int32_t checkForUnsupportedQuery(SQueryStmtInfo* pQueryInfo, SMsgBuf* pMsgBuf) { return buildInvalidOperationMsg(pMsgBuf, msg1); } - if (f == FUNCTION_BLKINFO && taosArrayGetSize(pQueryInfo->pUpstream) > 0) { + if (f == FUNCTION_BLKINFO && taosArrayGetSize(pQueryInfo->pDownstream) > 0) { return buildInvalidOperationMsg(pMsgBuf, msg1); } @@ -1584,7 +1583,6 @@ int32_t validateSqlNode(SSqlNode* pSqlNode, SQueryStmtInfo* pQueryInfo, SMsgBuf* } pushDownAggFuncExprInfo(pQueryInfo); -// addColumnNodeFromLowerLevel(pQueryInfo); for(int32_t i = 0; i < 1; ++i) { SArray* functionList = extractFunctionList(pQueryInfo->exprList[i]); @@ -3904,17 +3902,30 @@ int32_t qParserValidateSqlNode(SParseBasicCtx *pCtx, SSqlInfo* pInfo, SQueryStmt // TODO: check if the qnode info has been cached already req.qNodeRequired = true; - code = qParserExtractRequestedMetaInfo(pInfo, &req, msgBuf, msgBufLen); + code = qParserExtractRequestedMetaInfo(pInfo, &req, pCtx, msgBuf, msgBufLen); if (code != TSDB_CODE_SUCCESS) { return code; } // load the meta data from catalog - code = catalogGetAllMeta(pCtx->pCatalog, pCtx->pTransporter, &pCtx->mgmtEpSet, &req, &data); +// code = catalogGetAllMeta(pCtx->pCatalog, pCtx->pTransporter, &pCtx->mgmtEpSet, &req, &data); + STableMeta* pmt = NULL; + + SName* name = taosArrayGet(req.pTableName, 0); + code = catalogGetTableMeta(pCtx->pCatalog, pCtx->pTransporter, &pCtx->mgmtEpSet, name, &pmt); if (code != TSDB_CODE_SUCCESS) { return code; } + data.pTableMeta = taosArrayInit(1, POINTER_BYTES); + taosArrayPush(data.pTableMeta, &pmt); + + pQueryInfo->pTableMetaInfo = calloc(1, POINTER_BYTES); + pQueryInfo->pTableMetaInfo[0] = calloc(1, sizeof(STableMetaInfo)); + pQueryInfo->pTableMetaInfo[0]->pTableMeta = pmt; + pQueryInfo->pTableMetaInfo[0]->name = *name; + pQueryInfo->numOfTables = 1; + // evaluate the sqlnode STableMeta* pTableMeta = (STableMeta*) taosArrayGetP(data.pTableMeta, 0); assert(pTableMeta != NULL); diff --git a/source/libs/parser/src/dCDAstProcess.c b/source/libs/parser/src/dCDAstProcess.c index 2ff4e35ea3db4b01e56f3ebc9d78d5fcf2f6eebf..06729813c1516b526fd1fc87bcc0df0fd9506df5 100644 --- a/source/libs/parser/src/dCDAstProcess.c +++ b/source/libs/parser/src/dCDAstProcess.c @@ -194,6 +194,18 @@ static int32_t doCheckDbOptions(SCreateDbMsg* pCreate, SMsgBuf* pMsgBuf) { TSDB_MIN_VNODES_PER_DB, TSDB_MAX_VNODES_PER_DB); } + val = htonl(pCreate->maxRows); + if (val < TSDB_MIN_MAX_ROW_FBLOCK || val > TSDB_MAX_MAX_ROW_FBLOCK) { + snprintf(msg, tListLen(msg), "invalid number of max rows in file block for DB:%d valid range: [%d, %d]", val, + TSDB_MIN_MAX_ROW_FBLOCK, TSDB_MAX_MAX_ROW_FBLOCK); + } + + val = htonl(pCreate->minRows); + if (val < TSDB_MIN_MIN_ROW_FBLOCK || val > TSDB_MAX_MIN_ROW_FBLOCK) { + snprintf(msg, tListLen(msg), "invalid number of min rows in file block for DB:%d valid range: [%d, %d]", val, + TSDB_MIN_MIN_ROW_FBLOCK, TSDB_MAX_MIN_ROW_FBLOCK); + } + return TSDB_CODE_SUCCESS; } diff --git a/source/libs/parser/src/parser.c b/source/libs/parser/src/parser.c index 225d8bb9978eb071516f7a3920888ae65d1e6930..d22f92517d8fac2b6feba91bb65bfe0f6e07ce13 100644 --- a/source/libs/parser/src/parser.c +++ b/source/libs/parser/src/parser.c @@ -61,7 +61,7 @@ int32_t parseQuerySql(SParseContext* pCxt, SQueryNode** pQuery) { pDcl->nodeType = info.type; } } else { - SQueryStmtInfo* pQueryInfo = calloc(1, sizeof(SQueryStmtInfo)); + SQueryStmtInfo* pQueryInfo = createQueryInfo(); if (pQueryInfo == NULL) { terrno = TSDB_CODE_QRY_OUT_OF_MEMORY; // set correct error code. return terrno; @@ -89,7 +89,7 @@ int32_t qParserConvertSql(const char* pStr, size_t length, char** pConvertSql) { return 0; } -static int32_t getTableNameFromSqlNode(SSqlNode* pSqlNode, SArray* tableNameList, SMsgBuf* pMsgBuf); +static int32_t getTableNameFromSqlNode(SSqlNode* pSqlNode, SArray* tableNameList, SParseBasicCtx *pCtx, SMsgBuf* pMsgBuf); static int32_t tnameComparFn(const void* p1, const void* p2) { SName* pn1 = (SName*)p1; @@ -113,7 +113,7 @@ static int32_t tnameComparFn(const void* p1, const void* p2) { } } -static int32_t getTableNameFromSubquery(SSqlNode* pSqlNode, SArray* tableNameList, SMsgBuf* pMsgBuf) { +static int32_t getTableNameFromSubquery(SSqlNode* pSqlNode, SArray* tableNameList, SParseBasicCtx *pCtx, SMsgBuf* pMsgBuf) { int32_t numOfSub = (int32_t)taosArrayGetSize(pSqlNode->from->list); for (int32_t j = 0; j < numOfSub; ++j) { @@ -123,12 +123,12 @@ static int32_t getTableNameFromSubquery(SSqlNode* pSqlNode, SArray* tableNameLis for (int32_t i = 0; i < num; ++i) { SSqlNode* p = taosArrayGetP(sub->pSubquery->node, i); if (p->from->type == SQL_FROM_NODE_TABLES) { - int32_t code = getTableNameFromSqlNode(p, tableNameList, pMsgBuf); + int32_t code = getTableNameFromSqlNode(p, tableNameList, pCtx, pMsgBuf); if (code != TSDB_CODE_SUCCESS) { return code; } } else { - getTableNameFromSubquery(p, tableNameList, pMsgBuf); + getTableNameFromSubquery(p, tableNameList, pCtx, pMsgBuf); } } } @@ -136,7 +136,7 @@ static int32_t getTableNameFromSubquery(SSqlNode* pSqlNode, SArray* tableNameLis return TSDB_CODE_SUCCESS; } -int32_t getTableNameFromSqlNode(SSqlNode* pSqlNode, SArray* tableNameList, SMsgBuf* pMsgBuf) { +int32_t getTableNameFromSqlNode(SSqlNode* pSqlNode, SArray* tableNameList, SParseBasicCtx *pParseCtx, SMsgBuf* pMsgBuf) { const char* msg1 = "invalid table name"; int32_t numOfTables = (int32_t) taosArrayGetSize(pSqlNode->from->list); @@ -155,7 +155,11 @@ int32_t getTableNameFromSqlNode(SSqlNode* pSqlNode, SArray* tableNameList, SMsgB } SName name = {0}; - strndequote(name.tname, t->z, t->n); + int32_t code = createSName(&name, t, pParseCtx, pMsgBuf); + if (code != TSDB_CODE_SUCCESS) { + return buildInvalidOperationMsg(pMsgBuf, msg1); + } + taosArrayPush(tableNameList, &name); } @@ -166,7 +170,7 @@ static void freePtrElem(void* p) { tfree(*(char**)p); } -int32_t qParserExtractRequestedMetaInfo(const SSqlInfo* pSqlInfo, SCatalogReq* pMetaInfo, char* msg, int32_t msgBufLen) { +int32_t qParserExtractRequestedMetaInfo(const SSqlInfo* pSqlInfo, SCatalogReq* pMetaInfo, SParseBasicCtx *pCtx, char* msg, int32_t msgBufLen) { int32_t code = TSDB_CODE_SUCCESS; SMsgBuf msgBuf = {.buf = msg, .len = msgBufLen}; @@ -182,12 +186,12 @@ int32_t qParserExtractRequestedMetaInfo(const SSqlInfo* pSqlInfo, SCatalogReq* p // load the table meta in the FROM clause if (pSqlNode->from->type == SQL_FROM_NODE_TABLES) { - code = getTableNameFromSqlNode(pSqlNode, pMetaInfo->pTableName, &msgBuf); + code = getTableNameFromSqlNode(pSqlNode, pMetaInfo->pTableName, pCtx, &msgBuf); if (code != TSDB_CODE_SUCCESS) { return code; } } else { - code = getTableNameFromSubquery(pSqlNode, pMetaInfo->pTableName, &msgBuf); + code = getTableNameFromSubquery(pSqlNode, pMetaInfo->pTableName, pCtx, &msgBuf); if (code != TSDB_CODE_SUCCESS) { return code; } diff --git a/source/libs/parser/test/parserTests.cpp b/source/libs/parser/test/parserTests.cpp index fe430c5f5e53b63ce8be36d896603200f894baf8..8758fdbc718c82286a6162c57bf28aa304db26c8 100644 --- a/source/libs/parser/test/parserTests.cpp +++ b/source/libs/parser/test/parserTests.cpp @@ -77,12 +77,15 @@ void sqlCheck(const char* sql, bool valid) { buf.len = 128; buf.buf = msg; + SParseBasicCtx ctx = {0}; + ctx.db = "db1"; + ctx.acctId = 1; SSqlNode* pNode = (SSqlNode*)taosArrayGetP(((SArray*)info1.sub.node), 0); int32_t code = evaluateSqlNode(pNode, TSDB_TIME_PRECISION_NANO, &buf); ASSERT_EQ(code, 0); SCatalogReq req = {0}; - int32_t ret = qParserExtractRequestedMetaInfo(&info1, &req, msg, 128); + int32_t ret = qParserExtractRequestedMetaInfo(&info1, &req, &ctx, msg, 128); ASSERT_EQ(ret, 0); ASSERT_EQ(taosArrayGetSize(req.pTableName), 1); @@ -119,7 +122,11 @@ TEST(testCase, validateAST_test) { ASSERT_EQ(code, 0); SCatalogReq req = {0}; - int32_t ret = qParserExtractRequestedMetaInfo(&info1, &req, msg, 128); + SParseBasicCtx ctx = {0}; + ctx.db = "db1"; + ctx.acctId = 1; + int32_t ret = qParserExtractRequestedMetaInfo(&info1, &req, &ctx, msg, 128); + ASSERT_EQ(ret, 0); ASSERT_EQ(taosArrayGetSize(req.pTableName), 1); @@ -177,7 +184,11 @@ TEST(testCase, function_Test) { ASSERT_EQ(code, 0); SCatalogReq req = {0}; - int32_t ret = qParserExtractRequestedMetaInfo(&info1, &req, msg, 128); + SParseBasicCtx ctx = {0}; + ctx.db = "db1"; + ctx.acctId = 1; + int32_t ret = qParserExtractRequestedMetaInfo(&info1, &req, &ctx, msg, 128); + ASSERT_EQ(ret, 0); ASSERT_EQ(taosArrayGetSize(req.pTableName), 1); @@ -223,7 +234,11 @@ TEST(testCase, function_Test2) { ASSERT_EQ(code, 0); SCatalogReq req = {0}; - int32_t ret = qParserExtractRequestedMetaInfo(&info1, &req, msg, 128); + SParseBasicCtx ctx = {0}; + ctx.db = "db1"; + ctx.acctId = 1; + int32_t ret = qParserExtractRequestedMetaInfo(&info1, &req, &ctx, msg, 128); + ASSERT_EQ(ret, 0); ASSERT_EQ(taosArrayGetSize(req.pTableName), 1); @@ -269,7 +284,11 @@ TEST(testCase, function_Test3) { ASSERT_EQ(code, 0); SCatalogReq req = {0}; - int32_t ret = qParserExtractRequestedMetaInfo(&info1, &req, msg, 128); + SParseBasicCtx ctx = {0}; + ctx.db = "db1"; + ctx.acctId = 1; + int32_t ret = qParserExtractRequestedMetaInfo(&info1, &req, &ctx, msg, 128); + ASSERT_EQ(ret, 0); ASSERT_EQ(taosArrayGetSize(req.pTableName), 1); @@ -314,7 +333,11 @@ TEST(testCase, function_Test4) { ASSERT_EQ(code, 0); SCatalogReq req = {0}; - int32_t ret = qParserExtractRequestedMetaInfo(&info1, &req, msg, 128); + SParseBasicCtx ctx = {0}; + ctx.db = "db1"; + ctx.acctId = 1; + int32_t ret = qParserExtractRequestedMetaInfo(&info1, &req, &ctx, msg, 128); + ASSERT_EQ(ret, 0); ASSERT_EQ(taosArrayGetSize(req.pTableName), 1); @@ -362,7 +385,11 @@ TEST(testCase, function_Test5) { ASSERT_EQ(code, 0); SCatalogReq req = {0}; - int32_t ret = qParserExtractRequestedMetaInfo(&info1, &req, msg, 128); + SParseBasicCtx ctx = {0}; + ctx.db = "db1"; + ctx.acctId = 1; + int32_t ret = qParserExtractRequestedMetaInfo(&info1, &req, &ctx, msg, 128); + ASSERT_EQ(ret, 0); ASSERT_EQ(taosArrayGetSize(req.pTableName), 1); @@ -447,7 +474,11 @@ TEST(testCase, function_Test6) { ASSERT_EQ(code, 0); SCatalogReq req = {0}; - int32_t ret = qParserExtractRequestedMetaInfo(&info1, &req, msg, 128); + SParseBasicCtx ctx = {0}; + ctx.db = "db1"; + ctx.acctId = 1; + int32_t ret = qParserExtractRequestedMetaInfo(&info1, &req, &ctx, msg, 128); + ASSERT_EQ(ret, 0); ASSERT_EQ(taosArrayGetSize(req.pTableName), 1); @@ -525,7 +556,11 @@ TEST(testCase, function_Test6) { ASSERT_EQ(code, 0); SCatalogReq req = {0}; - int32_t ret = qParserExtractRequestedMetaInfo(&info1, &req, msg, 128); + SParseBasicCtx ctx = {0}; + ctx.db = "db1"; + ctx.acctId = 1; + int32_t ret = qParserExtractRequestedMetaInfo(&info1, &req, &ctx, msg, 128); + ASSERT_EQ(ret, 0); ASSERT_EQ(taosArrayGetSize(req.pTableName), 1); @@ -587,7 +622,11 @@ TEST(testCase, function_Test6) { ASSERT_EQ(code, 0); SCatalogReq req = {0}; - int32_t ret = qParserExtractRequestedMetaInfo(&info1, &req, msg, 128); + SParseBasicCtx ctx = {0}; + ctx.db = "db1"; + ctx.acctId = 1; + int32_t ret = qParserExtractRequestedMetaInfo(&info1, &req, &ctx, msg, 128); + ASSERT_EQ(ret, 0); ASSERT_EQ(taosArrayGetSize(req.pTableName), 1); @@ -636,7 +675,7 @@ TEST(testCase, function_Test6) { code = evaluateSqlNode(pNode, TSDB_TIME_PRECISION_NANO, &buf); ASSERT_EQ(code, 0); - ret = qParserExtractRequestedMetaInfo(&info1, &req, msg, 128); + ret = qParserExtractRequestedMetaInfo(&info1, &req, &ctx, msg, 128); ASSERT_EQ(ret, 0); ASSERT_EQ(taosArrayGetSize(req.pTableName), 1); @@ -666,7 +705,10 @@ TEST(testCase, function_Test6) { ASSERT_EQ(code, 0); SCatalogReq req = {0}; - int32_t ret = qParserExtractRequestedMetaInfo(&info1, &req, msg, 128); + SParseBasicCtx ctx = {0}; + ctx.db = "db1"; + ctx.acctId = 1; + int32_t ret = qParserExtractRequestedMetaInfo(&info1, &req, &ctx, msg, 128); ASSERT_EQ(ret, 0); ASSERT_EQ(taosArrayGetSize(req.pTableName), 1); @@ -688,7 +730,7 @@ TEST(testCase, function_Test6) { code = evaluateSqlNode(pNode, TSDB_TIME_PRECISION_NANO, &buf); ASSERT_EQ(code, 0); - ret = qParserExtractRequestedMetaInfo(&info1, &req, msg, 128); + ret = qParserExtractRequestedMetaInfo(&info1, &req, &ctx, msg, 128); ASSERT_EQ(ret, 0); ASSERT_EQ(taosArrayGetSize(req.pTableName), 1); diff --git a/source/libs/parser/test/plannerTest.cpp b/source/libs/parser/test/plannerTest.cpp index 04c3a7d81ad8c14ae1cb0f4bc689684a39b18fae..8d9fbadfad4d454cb88bc16adaa50be5fc7f594c 100644 --- a/source/libs/parser/test/plannerTest.cpp +++ b/source/libs/parser/test/plannerTest.cpp @@ -81,7 +81,8 @@ void generateLogicplan(const char* sql) { ASSERT_EQ(code, 0); SCatalogReq req = {0}; - int32_t ret = qParserExtractRequestedMetaInfo(&info1, &req, msg, 128); + SParseBasicCtx ctx = {0}; + int32_t ret = qParserExtractRequestedMetaInfo(&info1, &req, &ctx, msg, 128); ASSERT_EQ(ret, 0); ASSERT_EQ(taosArrayGetSize(req.pTableName), 1); @@ -121,7 +122,9 @@ TEST(testCase, planner_test) { ASSERT_EQ(code, 0); SCatalogReq req = {0}; - int32_t ret = qParserExtractRequestedMetaInfo(&info1, &req, msg, 128); + SParseBasicCtx ctx = {0}; + + int32_t ret = qParserExtractRequestedMetaInfo(&info1, &req, &ctx, msg, 128); ASSERT_EQ(ret, 0); ASSERT_EQ(taosArrayGetSize(req.pTableName), 1); diff --git a/source/libs/parser/test/tokenizerTest.cpp b/source/libs/parser/test/tokenizerTest.cpp index 3ab6a6531c0128bae8ccd64f5a383c7797243a0c..ee01a50148db1d7d2597bea07241dc2085e3fac6 100644 --- a/source/libs/parser/test/tokenizerTest.cpp +++ b/source/libs/parser/test/tokenizerTest.cpp @@ -710,7 +710,11 @@ TEST(testCase, extractMeta_test) { char msg[128] = {0}; SCatalogReq req = {0}; - int32_t ret = qParserExtractRequestedMetaInfo(&info1, &req, msg, 128); + + SParseBasicCtx ctx = {0}; + ctx.db = "db1"; + ctx.acctId = 1; + int32_t ret = qParserExtractRequestedMetaInfo(&info1, &req, &ctx, msg, 128); ASSERT_EQ(ret, 0); ASSERT_EQ(taosArrayGetSize(req.pTableName), 1); diff --git a/source/libs/planner/inc/plannerInt.h b/source/libs/planner/inc/plannerInt.h index 31e057f4c0d936b4b1f2c653bc6b1f5f1d4d223a..63d2e9b85529d92293d85438379831c33492f271 100644 --- a/source/libs/planner/inc/plannerInt.h +++ b/source/libs/planner/inc/plannerInt.h @@ -53,8 +53,8 @@ typedef struct SQueryDistPlanNodeInfo { typedef struct SQueryTableInfo { char *tableName; // to be deleted uint64_t uid; // to be deleted - STableMetaInfo* pMeta; - STimeWindow window; + STableMetaInfo *pMeta; + STimeWindow window; } SQueryTableInfo; typedef struct SQueryPlanNode { diff --git a/source/libs/planner/src/logicPlan.c b/source/libs/planner/src/logicPlan.c index fa7b3776dc894f741dfccaf42f942043a5467672..9a9b40473bf02073e8a4a702137dc076caabad26 100644 --- a/source/libs/planner/src/logicPlan.c +++ b/source/libs/planner/src/logicPlan.c @@ -64,10 +64,11 @@ static int32_t createModificationOpPlan(const SQueryNode* pNode, SQueryPlanNode* } int32_t createSelectPlan(const SQueryStmtInfo* pSelect, SQueryPlanNode** pQueryPlan) { - SArray* upstream = createQueryPlanImpl(pSelect); - assert(taosArrayGetSize(upstream) == 1); - *pQueryPlan = taosArrayGetP(upstream, 0); - taosArrayDestroy(upstream); + SArray* pDownstream = createQueryPlanImpl(pSelect); + assert(taosArrayGetSize(pDownstream) == 1); + + *pQueryPlan = taosArrayGetP(pDownstream, 0); + taosArrayDestroy(pDownstream); return TSDB_CODE_SUCCESS; } @@ -100,23 +101,21 @@ void destroyQueryPlan(SQueryPlanNode* pQueryNode) { //====================================================================================================================== -static SQueryPlanNode* createQueryNode(int32_t type, const char* name, SQueryPlanNode** prev, int32_t numOfPrev, +static SQueryPlanNode* createQueryNode(int32_t type, const char* name, SQueryPlanNode** pChildrenNode, int32_t numOfChildren, SExprInfo** pExpr, int32_t numOfOutput, const void* pExtInfo) { SQueryPlanNode* pNode = calloc(1, sizeof(SQueryPlanNode)); pNode->info.type = type; pNode->info.name = strdup(name); - pNode->numOfExpr = numOfOutput; - pNode->pExpr = taosArrayInit(numOfOutput, POINTER_BYTES); - for(int32_t i = 0; i < numOfOutput; ++i) { - taosArrayPush(pNode->pExpr, &pExpr[i]); - } + pNode->pExpr = taosArrayInit(numOfOutput, POINTER_BYTES); + taosArrayAddBatch(pNode->pExpr, pExpr, numOfOutput); + assert(pNode->numOfExpr == numOfOutput); pNode->pChildren = taosArrayInit(4, POINTER_BYTES); - for(int32_t i = 0; i < numOfPrev; ++i) { - taosArrayPush(pNode->pChildren, &prev[i]); + for(int32_t i = 0; i < numOfChildren; ++i) { + taosArrayPush(pNode->pChildren, &pChildrenNode[i]); } switch(type) { @@ -184,8 +183,7 @@ static SQueryPlanNode* createQueryNode(int32_t type, const char* name, SQueryPla return pNode; } -static SQueryPlanNode* doAddTableColumnNode(const SQueryStmtInfo* pQueryInfo, STableMetaInfo* pTableMetaInfo, SQueryTableInfo* info, - SArray* pExprs, SArray* tableCols) { +static SQueryPlanNode* doAddTableColumnNode(const SQueryStmtInfo* pQueryInfo, SQueryTableInfo* info, SArray* pExprs, SArray* tableCols) { if (pQueryInfo->info.onlyTagQuery) { int32_t num = (int32_t) taosArrayGetSize(pExprs); SQueryPlanNode* pNode = createQueryNode(QNODE_TAGSCAN, "TableTagScan", NULL, 0, pExprs->pData, num, info); @@ -193,16 +191,12 @@ static SQueryPlanNode* doAddTableColumnNode(const SQueryStmtInfo* pQueryInfo, ST if (pQueryInfo->info.distinct) { pNode = createQueryNode(QNODE_DISTINCT, "Distinct", &pNode, 1, pExprs->pData, num, NULL); } - return pNode; } SQueryPlanNode* pNode = createQueryNode(QNODE_TABLESCAN, "TableScan", NULL, 0, NULL, 0, info); - if (pQueryInfo->info.projectionQuery) { - int32_t numOfOutput = (int32_t) taosArrayGetSize(pExprs); - pNode = createQueryNode(QNODE_PROJECT, "Projection", &pNode, 1, pExprs->pData, numOfOutput, NULL); - } else { + if (!pQueryInfo->info.projectionQuery) { STableMetaInfo* pTableMetaInfo1 = getMetaInfo(pQueryInfo, 0); // table source column projection, generate the projection expr @@ -262,7 +256,11 @@ static SQueryPlanNode* doCreateQueryPlanForSingleTableImpl(const SQueryStmtInfo* pNode = createQueryNode(QNODE_AGGREGATE, "Aggregate", &pNode, 1, p->pData, num, NULL); } } else { - pNode = createQueryNode(QNODE_PROJECT, "Projection", &pNode, 1, p->pData, num, NULL); + // here we can push down the projection to tablescan operator. + pNode->numOfExpr = num; + pNode->pExpr = taosArrayInit(num, POINTER_BYTES); + taosArrayAddAll(pNode->pExpr, p); +// pNode = createQueryNode(QNODE_PROJECT, "Projection", &pNode, 1, p->pData, num, NULL); } } @@ -299,9 +297,11 @@ static SQueryPlanNode* doCreateQueryPlanForSingleTable(const SQueryStmtInfo* pQu tstrncpy(name, pTableMetaInfo->name.tname, TSDB_TABLE_FNAME_LEN); SQueryTableInfo info = {.tableName = strdup(name), .uid = pTableMetaInfo->pTableMeta->uid,}; + info.window = pQueryInfo->window; + info.pMeta = pTableMetaInfo; // handle the only tag query - SQueryPlanNode* pNode = doAddTableColumnNode(pQueryInfo, pTableMetaInfo, &info, pExprs, tableCols); + SQueryPlanNode* pNode = doAddTableColumnNode(pQueryInfo, &info, pExprs, tableCols); if (pQueryInfo->info.onlyTagQuery) { tfree(info.tableName); return pNode; @@ -326,23 +326,23 @@ static bool isAllAggExpr(SArray* pList) { } SArray* createQueryPlanImpl(const SQueryStmtInfo* pQueryInfo) { - SArray* upstream = NULL; + SArray* pDownstream = NULL; - if (pQueryInfo->pUpstream != NULL && taosArrayGetSize(pQueryInfo->pUpstream) > 0) { // subquery in the from clause - upstream = taosArrayInit(4, POINTER_BYTES); + if (pQueryInfo->pDownstream != NULL && taosArrayGetSize(pQueryInfo->pDownstream) > 0) { // subquery in the from clause + pDownstream = taosArrayInit(4, POINTER_BYTES); - size_t size = taosArrayGetSize(pQueryInfo->pUpstream); + size_t size = taosArrayGetSize(pQueryInfo->pDownstream); for(int32_t i = 0; i < size; ++i) { - SQueryStmtInfo* pq = taosArrayGet(pQueryInfo->pUpstream, i); + SQueryStmtInfo* pq = taosArrayGet(pQueryInfo->pDownstream, i); SArray* p = createQueryPlanImpl(pq); - taosArrayAddBatch(upstream, p->pData, (int32_t) taosArrayGetSize(p)); + taosArrayAddBatch(pDownstream, p->pData, (int32_t) taosArrayGetSize(p)); } } if (pQueryInfo->numOfTables > 1) { // it is a join query // 1. separate the select clause according to table - taosArrayDestroy(upstream); - upstream = taosArrayInit(5, POINTER_BYTES); + taosArrayDestroy(pDownstream); + pDownstream = taosArrayInit(5, POINTER_BYTES); for(int32_t i = 0; i < pQueryInfo->numOfTables; ++i) { STableMetaInfo* pTableMetaInfo = pQueryInfo->pTableMetaInfo[i]; @@ -365,30 +365,30 @@ SArray* createQueryPlanImpl(const SQueryStmtInfo* pQueryInfo) { columnListCopy(tableColumnList, pQueryInfo->colList, uid); // 4. add the projection query node - SQueryPlanNode* pNode = doAddTableColumnNode(pQueryInfo, pTableMetaInfo, &info, exprList, tableColumnList); + SQueryPlanNode* pNode = doAddTableColumnNode(pQueryInfo, &info, exprList, tableColumnList); columnListDestroy(tableColumnList); // dropAllExprInfo(exprList); - taosArrayPush(upstream, &pNode); + taosArrayPush(pDownstream, &pNode); } // 3. add the join node here SQueryTableInfo info = {0}; int32_t num = (int32_t) taosArrayGetSize(pQueryInfo->exprList[0]); - SQueryPlanNode* pNode = createQueryNode(QNODE_JOIN, "Join", upstream->pData, pQueryInfo->numOfTables, + SQueryPlanNode* pNode = createQueryNode(QNODE_JOIN, "Join", pDownstream->pData, pQueryInfo->numOfTables, pQueryInfo->exprList[0]->pData, num, NULL); // 4. add the aggregation or projection execution node pNode = doCreateQueryPlanForSingleTableImpl(pQueryInfo, pNode, &info); - upstream = taosArrayInit(5, POINTER_BYTES); - taosArrayPush(upstream, &pNode); + pDownstream = taosArrayInit(5, POINTER_BYTES); + taosArrayPush(pDownstream, &pNode); } else { // only one table, normal query process STableMetaInfo* pTableMetaInfo = pQueryInfo->pTableMetaInfo[0]; SQueryPlanNode* pNode = doCreateQueryPlanForSingleTable(pQueryInfo, pTableMetaInfo, pQueryInfo->exprList[0], pQueryInfo->colList); - upstream = taosArrayInit(5, POINTER_BYTES); - taosArrayPush(upstream, &pNode); + pDownstream = taosArrayInit(5, POINTER_BYTES); + taosArrayPush(pDownstream, &pNode); } - return upstream; + return pDownstream; } static void doDestroyQueryNode(SQueryPlanNode* pQueryNode) { @@ -434,22 +434,23 @@ static int32_t doPrintPlan(char* buf, SQueryPlanNode* pQueryNode, int32_t level, switch(pQueryNode->info.type) { case QNODE_TABLESCAN: { SQueryTableInfo* pInfo = (SQueryTableInfo*)pQueryNode->pExtInfo; - len1 = sprintf(buf + len, "%s #%" PRIu64 ") time_range: %" PRId64 " - %" PRId64, pInfo->tableName, pInfo->uid, - pInfo->window.skey, pInfo->window.ekey); + len1 = sprintf(buf + len, "%s #%" PRIu64, pInfo->tableName, pInfo->uid); assert(len1 > 0); len += len1; - for (int32_t i = 0; i < pQueryNode->numOfExpr; ++i) { - SColumn* pCol = taosArrayGetP(pQueryNode->pExpr, i); - len1 = sprintf(buf + len, " [%s #%d] ", pCol->name, pCol->info.colId); - - assert(len1 > 0); - len += len1; - } + len1 = sprintf(buf + len, " , cols:"); + assert(len1 > 0); + len += len1; - len1 = sprintf(buf + len, "\n"); + len = printExprInfo(buf, pQueryNode, len); + len1 = sprintf(buf + len, ")"); assert(len1 > 0); + // todo print filter info + len1 = sprintf(buf + len, ") filters:(nil)"); + len += len1; + + len1 = sprintf(buf + len, " time_range: %" PRId64 " - %" PRId64"\n", pInfo->window.skey, pInfo->window.ekey); len += len1; break; } diff --git a/source/libs/planner/src/physicalPlan.c b/source/libs/planner/src/physicalPlan.c index e7468e44eb723c7cefbbc5f34979719c243bb770..461f16cdf0776f2cf40b8eaf6b32f133412c493c 100644 --- a/source/libs/planner/src/physicalPlan.c +++ b/source/libs/planner/src/physicalPlan.c @@ -187,7 +187,8 @@ static SSubplan* initSubplan(SPlanContext* pCxt, int32_t type) { SSubplan* subplan = validPointer(calloc(1, sizeof(SSubplan))); subplan->id = pCxt->nextId; ++(pCxt->nextId.subplanId); - subplan->type = type; + + subplan->type = type; subplan->level = 0; if (NULL != pCxt->pCurrentSubplan) { subplan->level = pCxt->pCurrentSubplan->level + 1; @@ -275,6 +276,8 @@ static SPhyNode* createPhyNode(SPlanContext* pCxt, SQueryPlanNode* pPlanNode) { case QNODE_TABLESCAN: node = createTableScanNode(pCxt, pPlanNode); break; + case QNODE_PROJECT: +// node = create case QNODE_MODIFY: // Insert is not an operator in a physical plan. break; @@ -335,7 +338,7 @@ int32_t createDag(SQueryPlanNode* pQueryNode, struct SCatalog* pCatalog, SQueryD .pCatalog = pCatalog, .pDag = validPointer(calloc(1, sizeof(SQueryDag))), .pCurrentSubplan = NULL, - .nextId = {0} // todo queryid + .nextId = {.queryId = requestId}, }; *pDag = context.pDag; diff --git a/source/libs/planner/src/physicalPlanJson.c b/source/libs/planner/src/physicalPlanJson.c index b25d9a3627d08222bfc8e8f8e7c39cae0a342646..faeed74f1a65fa05ada2c7506983b5d4f03bdf84 100644 --- a/source/libs/planner/src/physicalPlanJson.c +++ b/source/libs/planner/src/physicalPlanJson.c @@ -230,9 +230,11 @@ static bool columnInfoToJson(const void* obj, cJSON* jCol) { if (res) { res = cJSON_AddNumberToObject(jCol, jkColumnInfoBytes, col->bytes); } - if (res) { - res = addRawArray(jCol, jkColumnInfoFilterList, columnFilterInfoToJson, col->flist.filterInfo, sizeof(SColumnFilterInfo), col->flist.numOfFilters); + + if (res) { // TODO: temporarily disable it +// res = addRawArray(jCol, jkColumnInfoFilterList, columnFilterInfoToJson, col->flist.filterInfo, sizeof(SColumnFilterInfo), col->flist.numOfFilters); } + return res; } @@ -794,7 +796,6 @@ static cJSON* subplanToJson(const SSubplan* subplan) { } // The 'type', 'level', 'execEpSet', 'pChildren' and 'pParents' fields do not need to be serialized. - bool res = addObject(jSubplan, jkSubplanId, subplanIdToJson, &subplan->id); if (res) { res = addObject(jSubplan, jkSubplanNode, phyNodeToJson, subplan->pNode); @@ -807,6 +808,7 @@ static cJSON* subplanToJson(const SSubplan* subplan) { cJSON_Delete(jSubplan); return NULL; } + return jSubplan; } diff --git a/source/libs/planner/src/planner.c b/source/libs/planner/src/planner.c index b61c7c390f5363c75a08b051752bcb440de6b468..54bddd0e3f68a0d3c964d3d1e93a93d98cca6952 100644 --- a/source/libs/planner/src/planner.c +++ b/source/libs/planner/src/planner.c @@ -64,6 +64,13 @@ int32_t qCreateQueryDag(const struct SQueryNode* pNode, struct SQueryDag** pDag, return code; } + // + if (logicPlan->info.type != QNODE_MODIFY) { +// char* str = NULL; +// queryPlanToString(logicPlan, &str); +// printf("%s\n", str); + } + code = optimizeQueryPlan(logicPlan); if (TSDB_CODE_SUCCESS != code) { destroyQueryPlan(logicPlan); diff --git a/source/libs/scheduler/src/scheduler.c b/source/libs/scheduler/src/scheduler.c index df4f121773f814dfdaff4725026372414485feae..a07620f102d0ca312a4ddfe12904dc71c7b7c4cb 100644 --- a/source/libs/scheduler/src/scheduler.c +++ b/source/libs/scheduler/src/scheduler.c @@ -88,7 +88,6 @@ int32_t schBuildTaskRalation(SSchJob *job, SHashObj *planToTask) { SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR); } - return TSDB_CODE_SUCCESS; } @@ -248,19 +247,20 @@ int32_t schSetTaskCandidateAddrs(SSchJob *job, SSchTask *task) { } int32_t addNum = 0; - int32_t nodeNum = taosArrayGetSize(job->nodeList); - - for (int32_t i = 0; i < nodeNum && addNum < SCH_MAX_CONDIDATE_EP_NUM; ++i) { - SQueryNodeAddr *naddr = taosArrayGet(job->nodeList, i); - - if (NULL == taosArrayPush(task->candidateAddrs, &task->plan->execNode)) { - qError("taosArrayPush failed"); - SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY); + + if (job->nodeList) { + int32_t nodeNum = (int32_t) taosArrayGetSize(job->nodeList); + for (int32_t i = 0; i < nodeNum && addNum < SCH_MAX_CONDIDATE_EP_NUM; ++i) { + SQueryNodeAddr *naddr = taosArrayGet(job->nodeList, i); + + if (NULL == taosArrayPush(task->candidateAddrs, &task->plan->execNode)) { + qError("taosArrayPush failed"); + SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY); + } + + ++addNum; } - - ++addNum; } - /* for (int32_t i = 0; i < job->dataSrcEps.numOfEps && addNum < SCH_MAX_CONDIDATE_EP_NUM; ++i) { strncpy(epSet->fqdn[epSet->numOfEps], job->dataSrcEps.fqdn[i], sizeof(job->dataSrcEps.fqdn[i])); @@ -279,8 +279,7 @@ int32_t schPushTaskToExecList(SSchJob *pJob, SSchTask *pTask) { SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY); } - qDebug("add one task, taskId:0x%"PRIx64", numOfTasks:%d, reqId:0x%"PRIx64, pTask->taskId, taosHashGetSize(pJob->execTasks), - pJob->queryId); + qDebug("add one task, taskId:0x%"PRIx64", numOfTasks:%d, reqId:0x%"PRIx64, pTask->taskId, taosHashGetSize(pJob->execTasks), pJob->queryId); return TSDB_CODE_SUCCESS; } @@ -997,7 +996,7 @@ int32_t scheduleExecJob(void *transport, SArray *nodeList, SQueryDag* pDag, void } int32_t scheduleAsyncExecJob(void *transport, SArray *nodeList, SQueryDag* pDag, void** pJob) { - if (NULL == transport || NULL == nodeList ||NULL == pDag || NULL == pDag->pSubplans || NULL == pJob) { + if (NULL == transport || /*NULL == nodeList || */NULL == pDag || NULL == pDag->pSubplans || NULL == pJob) { SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT); }