提交 80394112 编写于 作者: H Hongze Cheng

Merge branch '3.0' of https://github.com/taosdata/TDengine into feature/vnode

...@@ -135,9 +135,8 @@ typedef struct SQueryStmtInfo { ...@@ -135,9 +135,8 @@ typedef struct SQueryStmtInfo {
SArray *pUdfInfo; SArray *pUdfInfo;
struct SQueryStmtInfo *sibling; // sibling struct SQueryStmtInfo *sibling; // sibling
struct SQueryStmtInfo *pDownstream;
SMultiFunctionsDesc info; SMultiFunctionsDesc info;
SArray *pUpstream; // SArray<struct SQueryStmtInfo> SArray *pDownstream; // SArray<struct SQueryStmtInfo>
int32_t havingFieldNum; int32_t havingFieldNum;
int32_t exprListLevelIndex; int32_t exprListLevelIndex;
} SQueryStmtInfo; } SQueryStmtInfo;
......
...@@ -48,7 +48,7 @@ int main(int argc, char** argv) { ...@@ -48,7 +48,7 @@ int main(int argc, char** argv) {
} }
TEST(testCase, driverInit_Test) { taos_init(); } TEST(testCase, driverInit_Test) { taos_init(); }
#if 0
TEST(testCase, connect_Test) { TEST(testCase, connect_Test) {
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
if (pConn == NULL) { if (pConn == NULL) {
...@@ -551,7 +551,7 @@ TEST(testCase, generated_request_id_test) { ...@@ -551,7 +551,7 @@ TEST(testCase, generated_request_id_test) {
// taos_free_result(pRes); // taos_free_result(pRes);
// taos_close(pConn); // taos_close(pConn);
//} //}
#endif //#endif
TEST(testCase, projection_query_tables) { TEST(testCase, projection_query_tables) {
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
......
...@@ -502,7 +502,7 @@ int32_t dndProcessDropMnodeReq(SDnode *pDnode, SRpcMsg *pReq) { ...@@ -502,7 +502,7 @@ int32_t dndProcessDropMnodeReq(SDnode *pDnode, SRpcMsg *pReq) {
} }
SMnode *pMnode = dndAcquireMnode(pDnode); SMnode *pMnode = dndAcquireMnode(pDnode);
if (pMnode != NULL) { if (pMnode == NULL) {
terrno = TSDB_CODE_DND_MNODE_NOT_DEPLOYED; terrno = TSDB_CODE_DND_MNODE_NOT_DEPLOYED;
dError("failed to drop mnode since %s", terrstr()); dError("failed to drop mnode since %s", terrstr());
return -1; return -1;
......
...@@ -13,20 +13,10 @@ ...@@ -13,20 +13,10 @@
class DndTestMnode : public ::testing::Test { class DndTestMnode : public ::testing::Test {
protected: protected:
static void SetUpTestSuite() { static void SetUpTestSuite() { test.Init("/tmp/dnode_test_mnode", 9113); }
test.Init("/tmp/dnode_test_mnode", 9113); static void TearDownTestSuite() { test.Cleanup(); }
const char* fqdn = "localhost";
const char* firstEp = "localhost:9113";
server2.Start("/tmp/dnode_test_mnode2", fqdn, 9114, firstEp);
}
static void TearDownTestSuite() {
server2.Stop();
test.Cleanup();
}
static Testbase test; static Testbase test;
static TestServer server2;
public: public:
void SetUp() override {} void SetUp() override {}
...@@ -34,24 +24,6 @@ class DndTestMnode : public ::testing::Test { ...@@ -34,24 +24,6 @@ class DndTestMnode : public ::testing::Test {
}; };
Testbase DndTestMnode::test; Testbase DndTestMnode::test;
TestServer DndTestMnode::server2;
TEST_F(DndTestMnode, 01_Create_Dnode) {
int32_t contLen = sizeof(SCreateDnodeReq);
SCreateDnodeReq* pReq = (SCreateDnodeReq*)rpcMallocCont(contLen);
strcpy(pReq->fqdn, "localhost");
pReq->port = htonl(9114);
SRpcMsg* pRsp = test.SendReq(TDMT_MND_CREATE_DNODE, pReq, contLen);
ASSERT_NE(pRsp, nullptr);
ASSERT_EQ(pRsp->code, 0);
taosMsleep(1300);
test.SendShowMetaReq(TSDB_MGMT_TABLE_DNODE, "");
test.SendShowRetrieveReq();
EXPECT_EQ(test.GetShowRows(), 2);
}
TEST_F(DndTestMnode, 01_Create_Mnode) { TEST_F(DndTestMnode, 01_Create_Mnode) {
{ {
...@@ -101,100 +73,55 @@ TEST_F(DndTestMnode, 01_Create_Mnode) { ...@@ -101,100 +73,55 @@ TEST_F(DndTestMnode, 01_Create_Mnode) {
ASSERT_NE(pRsp, nullptr); ASSERT_NE(pRsp, nullptr);
ASSERT_EQ(pRsp->code, TSDB_CODE_DND_MNODE_ALREADY_DEPLOYED); ASSERT_EQ(pRsp->code, TSDB_CODE_DND_MNODE_ALREADY_DEPLOYED);
} }
// {
// int32_t contLen = sizeof(SDCreateMnodeReq);
// SDCreateMnodeReq* pReq = (SDCreateMnodeReq*)rpcMallocCont(contLen);
// pReq->dnodeId = htonl(1);
// pReq->replica = 2;
// pReq->replicas[0].id = htonl(1);
// pReq->replicas[0].port = htonl(9113);
// pReq->replicas[0].id = htonl(1);
// pReq->replicas[0].port = htonl(9113);
// strcpy(pReq->replicas[0].fqdn, "localhost");
// SRpcMsg* pRsp = test.SendReq(TDMT_DND_CREATE_MNODE, pReq, contLen);
// ASSERT_NE(pRsp, nullptr);
// ASSERT_EQ(pRsp->code, 0);
// }
// {
// int32_t contLen = sizeof(SDCreateMnodeReq);
// SDCreateMnodeReq* pReq = (SDCreateMnodeReq*)rpcMallocCont(contLen);
// pReq->dnodeId = htonl(1);
// SRpcMsg* pRsp = test.SendReq(TDMT_DND_CREATE_MNODE, pReq, contLen);
// ASSERT_NE(pRsp, nullptr);
// ASSERT_EQ(pRsp->code, TSDB_CODE_DND_MNODE_ALREADY_DEPLOYED);
// }
// test.Restart();
// {
// int32_t contLen = sizeof(SDCreateMnodeReq);
// SDCreateMnodeReq* pReq = (SDCreateMnodeReq*)rpcMallocCont(contLen);
// pReq->dnodeId = htonl(1);
// SRpcMsg* pRsp = test.SendReq(TDMT_DND_CREATE_MNODE, pReq, contLen);
// ASSERT_NE(pRsp, nullptr);
// ASSERT_EQ(pRsp->code, TSDB_CODE_DND_MNODE_ALREADY_DEPLOYED);
// }
} }
#if 0
TEST_F(DndTestMnode, 02_Alter_Mnode) { TEST_F(DndTestMnode, 02_Alter_Mnode) {
{ {
int32_t contLen = sizeof(SDCreateMnodeReq); int32_t contLen = sizeof(SDAlterMnodeReq);
SDCreateMnodeReq* pReq = (SDCreateMnodeReq*)rpcMallocCont(contLen); SDAlterMnodeReq* pReq = (SDAlterMnodeReq*)rpcMallocCont(contLen);
pReq->dnodeId = htonl(2); pReq->dnodeId = htonl(2);
pReq->replica = 1;
pReq->replicas[0].id = htonl(1);
pReq->replicas[0].port = htonl(9113);
strcpy(pReq->replicas[0].fqdn, "localhost");
SRpcMsg* pRsp = test.SendReq(TDMT_DND_CREATE_MNODE, pReq, contLen); SRpcMsg* pRsp = test.SendReq(TDMT_DND_ALTER_MNODE, pReq, contLen);
ASSERT_NE(pRsp, nullptr);
ASSERT_EQ(pRsp->code, TSDB_CODE_DND_MNODE_ID_INVALID);
}
{
int32_t contLen = sizeof(SDCreateMnodeReq);
SDCreateMnodeReq* pReq = (SDCreateMnodeReq*)rpcMallocCont(contLen);
pReq->dnodeId = htonl(1);
SRpcMsg* pRsp = test.SendReq(TDMT_DND_CREATE_MNODE, pReq, contLen);
ASSERT_NE(pRsp, nullptr); ASSERT_NE(pRsp, nullptr);
ASSERT_EQ(pRsp->code, 0); ASSERT_EQ(pRsp->code, TSDB_CODE_DND_MNODE_INVALID_OPTION);
} }
{ {
int32_t contLen = sizeof(SDCreateMnodeReq); int32_t contLen = sizeof(SDAlterMnodeReq);
SDCreateMnodeReq* pReq = (SDCreateMnodeReq*)rpcMallocCont(contLen); SDAlterMnodeReq* pReq = (SDAlterMnodeReq*)rpcMallocCont(contLen);
pReq->dnodeId = htonl(1); pReq->dnodeId = htonl(1);
pReq->replica = 1;
pReq->replicas[0].id = htonl(2);
pReq->replicas[0].port = htonl(9113);
strcpy(pReq->replicas[0].fqdn, "localhost");
SRpcMsg* pRsp = test.SendReq(TDMT_DND_CREATE_MNODE, pReq, contLen); SRpcMsg* pRsp = test.SendReq(TDMT_DND_ALTER_MNODE, pReq, contLen);
ASSERT_NE(pRsp, nullptr); ASSERT_NE(pRsp, nullptr);
ASSERT_EQ(pRsp->code, TSDB_CODE_DND_MNODE_ALREADY_DEPLOYED); ASSERT_EQ(pRsp->code, TSDB_CODE_DND_MNODE_INVALID_OPTION);
} }
test.Restart();
{ {
int32_t contLen = sizeof(SDCreateMnodeReq); int32_t contLen = sizeof(SDAlterMnodeReq);
SDCreateMnodeReq* pReq = (SDCreateMnodeReq*)rpcMallocCont(contLen); SDAlterMnodeReq* pReq = (SDAlterMnodeReq*)rpcMallocCont(contLen);
pReq->dnodeId = htonl(1); pReq->dnodeId = htonl(1);
pReq->replica = 1;
pReq->replicas[0].id = htonl(1);
pReq->replicas[0].port = htonl(9113);
strcpy(pReq->replicas[0].fqdn, "localhost");
SRpcMsg* pRsp = test.SendReq(TDMT_DND_CREATE_MNODE, pReq, contLen); SRpcMsg* pRsp = test.SendReq(TDMT_DND_ALTER_MNODE, pReq, contLen);
ASSERT_NE(pRsp, nullptr); ASSERT_NE(pRsp, nullptr);
ASSERT_EQ(pRsp->code, TSDB_CODE_DND_MNODE_ALREADY_DEPLOYED); ASSERT_EQ(pRsp->code, 0);
} }
} }
TEST_F(DndTestMnode, 03_Drop_Mnode) { TEST_F(DndTestMnode, 03_Drop_Mnode) {
{ {
int32_t contLen = sizeof(SDDropMnodeReq); int32_t contLen = sizeof(SDDropMnodeReq);
...@@ -204,7 +131,7 @@ TEST_F(DndTestMnode, 03_Drop_Mnode) { ...@@ -204,7 +131,7 @@ TEST_F(DndTestMnode, 03_Drop_Mnode) {
SRpcMsg* pRsp = test.SendReq(TDMT_DND_DROP_MNODE, pReq, contLen); SRpcMsg* pRsp = test.SendReq(TDMT_DND_DROP_MNODE, pReq, contLen);
ASSERT_NE(pRsp, nullptr); ASSERT_NE(pRsp, nullptr);
ASSERT_EQ(pRsp->code, TSDB_CODE_DND_MNODE_ID_INVALID); ASSERT_EQ(pRsp->code, TSDB_CODE_DND_MNODE_INVALID_OPTION);
} }
{ {
...@@ -229,28 +156,34 @@ TEST_F(DndTestMnode, 03_Drop_Mnode) { ...@@ -229,28 +156,34 @@ TEST_F(DndTestMnode, 03_Drop_Mnode) {
ASSERT_EQ(pRsp->code, TSDB_CODE_DND_MNODE_NOT_DEPLOYED); ASSERT_EQ(pRsp->code, TSDB_CODE_DND_MNODE_NOT_DEPLOYED);
} }
test.Restart();
{ {
int32_t contLen = sizeof(SDDropMnodeReq); int32_t contLen = sizeof(SDAlterMnodeReq);
SDDropMnodeReq* pReq = (SDDropMnodeReq*)rpcMallocCont(contLen); SDAlterMnodeReq* pReq = (SDAlterMnodeReq*)rpcMallocCont(contLen);
pReq->dnodeId = htonl(1); pReq->dnodeId = htonl(1);
pReq->replica = 1;
pReq->replicas[0].id = htonl(1);
pReq->replicas[0].port = htonl(9113);
strcpy(pReq->replicas[0].fqdn, "localhost");
SRpcMsg* pRsp = test.SendReq(TDMT_DND_DROP_MNODE, pReq, contLen); SRpcMsg* pRsp = test.SendReq(TDMT_DND_ALTER_MNODE, pReq, contLen);
ASSERT_NE(pRsp, nullptr); ASSERT_NE(pRsp, nullptr);
ASSERT_EQ(pRsp->code, TSDB_CODE_DND_MNODE_NOT_DEPLOYED); ASSERT_EQ(pRsp->code, TSDB_CODE_DND_MNODE_NOT_DEPLOYED);
} }
{ {
int32_t contLen = sizeof(SDCreateMnodeReq); int32_t contLen = sizeof(SDCreateMnodeReq);
SDCreateMnodeReq* pReq = (SDCreateMnodeReq*)rpcMallocCont(contLen); SDCreateMnodeReq* pReq = (SDCreateMnodeReq*)rpcMallocCont(contLen);
pReq->dnodeId = htonl(1); pReq->dnodeId = htonl(1);
pReq->replica = 2;
pReq->replicas[0].id = htonl(1);
pReq->replicas[0].port = htonl(9113);
strcpy(pReq->replicas[0].fqdn, "localhost");
SRpcMsg* pRsp = test.SendReq(TDMT_DND_CREATE_MNODE, pReq, contLen); SRpcMsg* pRsp = test.SendReq(TDMT_DND_CREATE_MNODE, pReq, contLen);
ASSERT_NE(pRsp, nullptr); ASSERT_NE(pRsp, nullptr);
ASSERT_EQ(pRsp->code, 0); ASSERT_EQ(pRsp->code, 0);
} }
} }
#endif \ No newline at end of file
\ No newline at end of file
...@@ -27,14 +27,14 @@ static SSdbRaw *mndMnodeActionEncode(SMnodeObj *pObj); ...@@ -27,14 +27,14 @@ static SSdbRaw *mndMnodeActionEncode(SMnodeObj *pObj);
static SSdbRow *mndMnodeActionDecode(SSdbRaw *pRaw); static SSdbRow *mndMnodeActionDecode(SSdbRaw *pRaw);
static int32_t mndMnodeActionInsert(SSdb *pSdb, SMnodeObj *pObj); static int32_t mndMnodeActionInsert(SSdb *pSdb, SMnodeObj *pObj);
static int32_t mndMnodeActionDelete(SSdb *pSdb, SMnodeObj *pObj); static int32_t mndMnodeActionDelete(SSdb *pSdb, SMnodeObj *pObj);
static int32_t mndMnodeActionUpdate(SSdb *pSdb, SMnodeObj *pOldMnode, SMnodeObj *pNewMnode); static int32_t mndMnodeActionUpdate(SSdb *pSdb, SMnodeObj *pOld, SMnodeObj *pNew);
static int32_t mndProcessCreateMnodeReq(SMnodeMsg *pMsg); static int32_t mndProcessCreateMnodeReq(SMnodeMsg *pReq);
static int32_t mndProcessDropMnodeReq(SMnodeMsg *pMsg); static int32_t mndProcessDropMnodeReq(SMnodeMsg *pReq);
static int32_t mndProcessCreateMnodeRsp(SMnodeMsg *pMsg); static int32_t mndProcessCreateMnodeRsp(SMnodeMsg *pRsp);
static int32_t mndProcessAlterMnodeRsp(SMnodeMsg *pMsg); static int32_t mndProcessAlterMnodeRsp(SMnodeMsg *pRsp);
static int32_t mndProcessDropMnodeRsp(SMnodeMsg *pMsg); static int32_t mndProcessDropMnodeRsp(SMnodeMsg *pRsp);
static int32_t mndGetMnodeMeta(SMnodeMsg *pMsg, SShowObj *pShow, STableMetaMsg *pMeta); static int32_t mndGetMnodeMeta(SMnodeMsg *pReq, SShowObj *pShow, STableMetaMsg *pMeta);
static int32_t mndRetrieveMnodes(SMnodeMsg *pMsg, SShowObj *pShow, char *data, int32_t rows); static int32_t mndRetrieveMnodes(SMnodeMsg *pReq, SShowObj *pShow, char *data, int32_t rows);
static void mndCancelGetNextMnode(SMnode *pMnode, void *pIter); static void mndCancelGetNextMnode(SMnode *pMnode, void *pIter);
int32_t mndInitMnode(SMnode *pMnode) { int32_t mndInitMnode(SMnode *pMnode) {
...@@ -65,7 +65,7 @@ void mndCleanupMnode(SMnode *pMnode) {} ...@@ -65,7 +65,7 @@ void mndCleanupMnode(SMnode *pMnode) {}
static SMnodeObj *mndAcquireMnode(SMnode *pMnode, int32_t mnodeId) { static SMnodeObj *mndAcquireMnode(SMnode *pMnode, int32_t mnodeId) {
SSdb *pSdb = pMnode->pSdb; SSdb *pSdb = pMnode->pSdb;
SMnodeObj *pObj = sdbAcquire(pSdb, SDB_MNODE, &mnodeId); SMnodeObj *pObj = sdbAcquire(pSdb, SDB_MNODE, &mnodeId);
if (pObj == NULL) { if (pObj == NULL && terrno == TSDB_CODE_SDB_OBJ_NOT_THERE) {
terrno = TSDB_CODE_MND_MNODE_NOT_EXIST; terrno = TSDB_CODE_MND_MNODE_NOT_EXIST;
} }
return pObj; return pObj;
...@@ -207,9 +207,9 @@ static int32_t mndMnodeActionDelete(SSdb *pSdb, SMnodeObj *pObj) { ...@@ -207,9 +207,9 @@ static int32_t mndMnodeActionDelete(SSdb *pSdb, SMnodeObj *pObj) {
return 0; return 0;
} }
static int32_t mndMnodeActionUpdate(SSdb *pSdb, SMnodeObj *pOldMnode, SMnodeObj *pNewMnode) { static int32_t mndMnodeActionUpdate(SSdb *pSdb, SMnodeObj *pOld, SMnodeObj *pNew) {
mTrace("mnode:%d, perform update action, old_row:%p new_row:%p", pOldMnode->id, pOldMnode, pNewMnode); mTrace("mnode:%d, perform update action, old_row:%p new_row:%p", pOld->id, pOld, pNew);
pOldMnode->updateTime = pNewMnode->updateTime; pOld->updateTime = pNew->updateTime;
return 0; return 0;
} }
...@@ -277,13 +277,13 @@ static int32_t mndSetCreateMnodeRedoActions(SMnode *pMnode, STrans *pTrans, SDno ...@@ -277,13 +277,13 @@ static int32_t mndSetCreateMnodeRedoActions(SMnode *pMnode, STrans *pTrans, SDno
void *pIter = NULL; void *pIter = NULL;
int32_t numOfReplicas = 0; int32_t numOfReplicas = 0;
SDCreateMnodeReq createMsg = {0}; SDCreateMnodeReq createReq = {0};
while (1) { while (1) {
SMnodeObj *pMObj = NULL; SMnodeObj *pMObj = NULL;
pIter = sdbFetch(pSdb, SDB_MNODE, pIter, (void **)&pMObj); pIter = sdbFetch(pSdb, SDB_MNODE, pIter, (void **)&pMObj);
if (pIter == NULL) break; if (pIter == NULL) break;
SReplica *pReplica = &createMsg.replicas[numOfReplicas]; SReplica *pReplica = &createReq.replicas[numOfReplicas];
pReplica->id = htonl(pMObj->id); pReplica->id = htonl(pMObj->id);
pReplica->port = htons(pMObj->pDnode->port); pReplica->port = htons(pMObj->pDnode->port);
memcpy(pReplica->fqdn, pMObj->pDnode->fqdn, TSDB_FQDN_LEN); memcpy(pReplica->fqdn, pMObj->pDnode->fqdn, TSDB_FQDN_LEN);
...@@ -292,13 +292,13 @@ static int32_t mndSetCreateMnodeRedoActions(SMnode *pMnode, STrans *pTrans, SDno ...@@ -292,13 +292,13 @@ static int32_t mndSetCreateMnodeRedoActions(SMnode *pMnode, STrans *pTrans, SDno
sdbRelease(pSdb, pMObj); sdbRelease(pSdb, pMObj);
} }
SReplica *pReplica = &createMsg.replicas[numOfReplicas]; SReplica *pReplica = &createReq.replicas[numOfReplicas];
pReplica->id = htonl(pDnode->id); pReplica->id = htonl(pDnode->id);
pReplica->port = htons(pDnode->port); pReplica->port = htons(pDnode->port);
memcpy(pReplica->fqdn, pDnode->fqdn, TSDB_FQDN_LEN); memcpy(pReplica->fqdn, pDnode->fqdn, TSDB_FQDN_LEN);
numOfReplicas++; numOfReplicas++;
createMsg.replica = numOfReplicas; createReq.replica = numOfReplicas;
while (1) { while (1) {
SMnodeObj *pMObj = NULL; SMnodeObj *pMObj = NULL;
...@@ -307,22 +307,23 @@ static int32_t mndSetCreateMnodeRedoActions(SMnode *pMnode, STrans *pTrans, SDno ...@@ -307,22 +307,23 @@ static int32_t mndSetCreateMnodeRedoActions(SMnode *pMnode, STrans *pTrans, SDno
STransAction action = {0}; STransAction action = {0};
SDAlterMnodeReq *pMsg = malloc(sizeof(SDAlterMnodeReq)); SDAlterMnodeReq *pReq = malloc(sizeof(SDAlterMnodeReq));
if (pMsg == NULL) { if (pReq == NULL) {
sdbCancelFetch(pSdb, pIter); sdbCancelFetch(pSdb, pIter);
sdbRelease(pSdb, pMObj); sdbRelease(pSdb, pMObj);
return -1; return -1;
} }
memcpy(pMsg, &createMsg, sizeof(SDAlterMnodeReq)); memcpy(pReq, &createReq, sizeof(SDAlterMnodeReq));
pMsg->dnodeId = htonl(pMObj->id); pReq->dnodeId = htonl(pMObj->id);
action.epSet = mndGetDnodeEpset(pMObj->pDnode); action.epSet = mndGetDnodeEpset(pMObj->pDnode);
action.pCont = pMsg; action.pCont = pReq;
action.contLen = sizeof(SDAlterMnodeReq); action.contLen = sizeof(SDAlterMnodeReq);
action.msgType = TDMT_DND_ALTER_MNODE; action.msgType = TDMT_DND_ALTER_MNODE;
action.acceptableCode = TSDB_CODE_DND_MNODE_ALREADY_DEPLOYED;
if (mndTransAppendRedoAction(pTrans, &action) != 0) { if (mndTransAppendRedoAction(pTrans, &action) != 0) {
free(pMsg); free(pReq);
sdbCancelFetch(pSdb, pIter); sdbCancelFetch(pSdb, pIter);
sdbRelease(pSdb, pMObj); sdbRelease(pSdb, pMObj);
return -1; return -1;
...@@ -335,17 +336,18 @@ static int32_t mndSetCreateMnodeRedoActions(SMnode *pMnode, STrans *pTrans, SDno ...@@ -335,17 +336,18 @@ static int32_t mndSetCreateMnodeRedoActions(SMnode *pMnode, STrans *pTrans, SDno
STransAction action = {0}; STransAction action = {0};
action.epSet = mndGetDnodeEpset(pDnode); action.epSet = mndGetDnodeEpset(pDnode);
SDCreateMnodeReq *pMsg = malloc(sizeof(SDCreateMnodeReq)); SDCreateMnodeReq *pReq = malloc(sizeof(SDCreateMnodeReq));
if (pMsg == NULL) return -1; if (pReq == NULL) return -1;
memcpy(pMsg, &createMsg, sizeof(SDAlterMnodeReq)); memcpy(pReq, &createReq, sizeof(SDAlterMnodeReq));
pMsg->dnodeId = htonl(pObj->id); pReq->dnodeId = htonl(pObj->id);
action.epSet = mndGetDnodeEpset(pDnode); action.epSet = mndGetDnodeEpset(pDnode);
action.pCont = pMsg; action.pCont = pReq;
action.contLen = sizeof(SDCreateMnodeReq); action.contLen = sizeof(SDCreateMnodeReq);
action.msgType = TDMT_DND_CREATE_MNODE; action.msgType = TDMT_DND_CREATE_MNODE;
action.acceptableCode = TSDB_CODE_DND_MNODE_ALREADY_DEPLOYED;
if (mndTransAppendRedoAction(pTrans, &action) != 0) { if (mndTransAppendRedoAction(pTrans, &action) != 0) {
free(pMsg); free(pReq);
return -1; return -1;
} }
} }
...@@ -353,39 +355,23 @@ static int32_t mndSetCreateMnodeRedoActions(SMnode *pMnode, STrans *pTrans, SDno ...@@ -353,39 +355,23 @@ static int32_t mndSetCreateMnodeRedoActions(SMnode *pMnode, STrans *pTrans, SDno
return 0; return 0;
} }
static int32_t mndCreateMnode(SMnode *pMnode, SMnodeMsg *pMsg, SDnodeObj *pDnode, SMCreateMnodeReq *pCreate) { static int32_t mndCreateMnode(SMnode *pMnode, SMnodeMsg *pReq, SDnodeObj *pDnode, SMCreateMnodeReq *pCreate) {
int32_t code = -1;
SMnodeObj mnodeObj = {0}; SMnodeObj mnodeObj = {0};
mnodeObj.id = pDnode->id; mnodeObj.id = pDnode->id;
mnodeObj.createdTime = taosGetTimestampMs(); mnodeObj.createdTime = taosGetTimestampMs();
mnodeObj.updateTime = mnodeObj.createdTime; mnodeObj.updateTime = mnodeObj.createdTime;
int32_t code = -1; STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, &pReq->rpcMsg);
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, &pMsg->rpcMsg); if (pTrans == NULL) goto CREATE_MNODE_OVER;
if (pTrans == NULL) {
mError("mnode:%d, failed to create since %s", pCreate->dnodeId, terrstr());
goto CREATE_MNODE_OVER;
}
mDebug("trans:%d, used to create mnode:%d", pTrans->id, pCreate->dnodeId);
if (mndSetCreateMnodeRedoLogs(pMnode, pTrans, &mnodeObj) != 0) { mDebug("trans:%d, used to create mnode:%d", pTrans->id, pCreate->dnodeId);
mError("trans:%d, failed to set redo log since %s", pTrans->id, terrstr()); if (mndSetCreateMnodeRedoLogs(pMnode, pTrans, &mnodeObj) != 0) goto CREATE_MNODE_OVER;
goto CREATE_MNODE_OVER; if (mndSetCreateMnodeCommitLogs(pMnode, pTrans, &mnodeObj) != 0) goto CREATE_MNODE_OVER;
} if (mndSetCreateMnodeRedoActions(pMnode, pTrans, pDnode, &mnodeObj) != 0) goto CREATE_MNODE_OVER;
if (mndSetCreateMnodeCommitLogs(pMnode, pTrans, &mnodeObj) != 0) {
mError("trans:%d, failed to set commit log since %s", pTrans->id, terrstr());
goto CREATE_MNODE_OVER;
}
if (mndSetCreateMnodeRedoActions(pMnode, pTrans, pDnode, &mnodeObj) != 0) {
mError("trans:%d, failed to set redo actions since %s", pTrans->id, terrstr());
goto CREATE_MNODE_OVER;
}
if (mndTransPrepare(pMnode, pTrans) != 0) { if (mndTransPrepare(pMnode, pTrans) != 0) goto CREATE_MNODE_OVER;
mError("trans:%d, failed to prepare since %s", pTrans->id, terrstr());
goto CREATE_MNODE_OVER;
}
code = 0; code = 0;
...@@ -394,9 +380,9 @@ CREATE_MNODE_OVER: ...@@ -394,9 +380,9 @@ CREATE_MNODE_OVER:
return code; return code;
} }
static int32_t mndProcessCreateMnodeReq(SMnodeMsg *pMsg) { static int32_t mndProcessCreateMnodeReq(SMnodeMsg *pReq) {
SMnode *pMnode = pMsg->pMnode; SMnode *pMnode = pReq->pMnode;
SMCreateMnodeReq *pCreate = pMsg->rpcMsg.pCont; SMCreateMnodeReq *pCreate = pReq->rpcMsg.pCont;
pCreate->dnodeId = htonl(pCreate->dnodeId); pCreate->dnodeId = htonl(pCreate->dnodeId);
...@@ -408,6 +394,9 @@ static int32_t mndProcessCreateMnodeReq(SMnodeMsg *pMsg) { ...@@ -408,6 +394,9 @@ static int32_t mndProcessCreateMnodeReq(SMnodeMsg *pMsg) {
mError("mnode:%d, mnode already exist", pObj->id); mError("mnode:%d, mnode already exist", pObj->id);
terrno = TSDB_CODE_MND_MNODE_ALREADY_EXIST; terrno = TSDB_CODE_MND_MNODE_ALREADY_EXIST;
return -1; return -1;
} else if (terrno != TSDB_CODE_MND_MNODE_NOT_EXIST) {
mError("qnode:%d, failed to create mnode since %s", pCreate->dnodeId, terrstr());
return -1;
} }
SDnodeObj *pDnode = mndAcquireDnode(pMnode, pCreate->dnodeId); SDnodeObj *pDnode = mndAcquireDnode(pMnode, pCreate->dnodeId);
...@@ -417,7 +406,7 @@ static int32_t mndProcessCreateMnodeReq(SMnodeMsg *pMsg) { ...@@ -417,7 +406,7 @@ static int32_t mndProcessCreateMnodeReq(SMnodeMsg *pMsg) {
return -1; return -1;
} }
int32_t code = mndCreateMnode(pMnode, pMsg, pDnode, pCreate); int32_t code = mndCreateMnode(pMnode, pReq, pDnode, pCreate);
mndReleaseDnode(pMnode, pDnode); mndReleaseDnode(pMnode, pDnode);
if (code != 0) { if (code != 0) {
...@@ -449,14 +438,14 @@ static int32_t mndSetDropMnodeRedoActions(SMnode *pMnode, STrans *pTrans, SDnode ...@@ -449,14 +438,14 @@ static int32_t mndSetDropMnodeRedoActions(SMnode *pMnode, STrans *pTrans, SDnode
void *pIter = NULL; void *pIter = NULL;
int32_t numOfReplicas = 0; int32_t numOfReplicas = 0;
SDAlterMnodeReq alterMsg = {0}; SDAlterMnodeReq alterReq = {0};
while (1) { while (1) {
SMnodeObj *pMObj = NULL; SMnodeObj *pMObj = NULL;
pIter = sdbFetch(pSdb, SDB_MNODE, pIter, (void **)&pMObj); pIter = sdbFetch(pSdb, SDB_MNODE, pIter, (void **)&pMObj);
if (pIter == NULL) break; if (pIter == NULL) break;
if (pMObj->id != pObj->id) { if (pMObj->id != pObj->id) {
SReplica *pReplica = &alterMsg.replicas[numOfReplicas]; SReplica *pReplica = &alterReq.replicas[numOfReplicas];
pReplica->id = htonl(pMObj->id); pReplica->id = htonl(pMObj->id);
pReplica->port = htons(pMObj->pDnode->port); pReplica->port = htons(pMObj->pDnode->port);
memcpy(pReplica->fqdn, pMObj->pDnode->fqdn, TSDB_FQDN_LEN); memcpy(pReplica->fqdn, pMObj->pDnode->fqdn, TSDB_FQDN_LEN);
...@@ -466,7 +455,7 @@ static int32_t mndSetDropMnodeRedoActions(SMnode *pMnode, STrans *pTrans, SDnode ...@@ -466,7 +455,7 @@ static int32_t mndSetDropMnodeRedoActions(SMnode *pMnode, STrans *pTrans, SDnode
sdbRelease(pSdb, pMObj); sdbRelease(pSdb, pMObj);
} }
alterMsg.replica = numOfReplicas; alterReq.replica = numOfReplicas;
while (1) { while (1) {
SMnodeObj *pMObj = NULL; SMnodeObj *pMObj = NULL;
...@@ -475,22 +464,23 @@ static int32_t mndSetDropMnodeRedoActions(SMnode *pMnode, STrans *pTrans, SDnode ...@@ -475,22 +464,23 @@ static int32_t mndSetDropMnodeRedoActions(SMnode *pMnode, STrans *pTrans, SDnode
if (pMObj->id != pObj->id) { if (pMObj->id != pObj->id) {
STransAction action = {0}; STransAction action = {0};
SDAlterMnodeReq *pMsg = malloc(sizeof(SDAlterMnodeReq)); SDAlterMnodeReq *pReq = malloc(sizeof(SDAlterMnodeReq));
if (pMsg == NULL) { if (pReq == NULL) {
sdbCancelFetch(pSdb, pIter); sdbCancelFetch(pSdb, pIter);
sdbRelease(pSdb, pMObj); sdbRelease(pSdb, pMObj);
return -1; return -1;
} }
memcpy(pMsg, &alterMsg, sizeof(SDAlterMnodeReq)); memcpy(pReq, &alterReq, sizeof(SDAlterMnodeReq));
pMsg->dnodeId = htonl(pMObj->id); pReq->dnodeId = htonl(pMObj->id);
action.epSet = mndGetDnodeEpset(pMObj->pDnode); action.epSet = mndGetDnodeEpset(pMObj->pDnode);
action.pCont = pMsg; action.pCont = pReq;
action.contLen = sizeof(SDAlterMnodeReq); action.contLen = sizeof(SDAlterMnodeReq);
action.msgType = TDMT_DND_ALTER_MNODE; action.msgType = TDMT_DND_ALTER_MNODE;
action.acceptableCode = TSDB_CODE_DND_MNODE_ALREADY_DEPLOYED;
if (mndTransAppendRedoAction(pTrans, &action) != 0) { if (mndTransAppendRedoAction(pTrans, &action) != 0) {
free(pMsg); free(pReq);
sdbCancelFetch(pSdb, pIter); sdbCancelFetch(pSdb, pIter);
sdbRelease(pSdb, pMObj); sdbRelease(pSdb, pMObj);
return -1; return -1;
...@@ -504,19 +494,20 @@ static int32_t mndSetDropMnodeRedoActions(SMnode *pMnode, STrans *pTrans, SDnode ...@@ -504,19 +494,20 @@ static int32_t mndSetDropMnodeRedoActions(SMnode *pMnode, STrans *pTrans, SDnode
STransAction action = {0}; STransAction action = {0};
action.epSet = mndGetDnodeEpset(pDnode); action.epSet = mndGetDnodeEpset(pDnode);
SDDropMnodeReq *pMsg = malloc(sizeof(SDDropMnodeReq)); SDDropMnodeReq *pReq = malloc(sizeof(SDDropMnodeReq));
if (pMsg == NULL) { if (pReq == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1; return -1;
} }
pMsg->dnodeId = htonl(pObj->id); pReq->dnodeId = htonl(pObj->id);
action.epSet = mndGetDnodeEpset(pDnode); action.epSet = mndGetDnodeEpset(pDnode);
action.pCont = pMsg; action.pCont = pReq;
action.contLen = sizeof(SDDropMnodeReq); action.contLen = sizeof(SDDropMnodeReq);
action.msgType = TDMT_DND_DROP_MNODE; action.msgType = TDMT_DND_DROP_MNODE;
action.acceptableCode = TSDB_CODE_DND_MNODE_NOT_DEPLOYED;
if (mndTransAppendRedoAction(pTrans, &action) != 0) { if (mndTransAppendRedoAction(pTrans, &action) != 0) {
free(pMsg); free(pReq);
return -1; return -1;
} }
} }
...@@ -524,35 +515,18 @@ static int32_t mndSetDropMnodeRedoActions(SMnode *pMnode, STrans *pTrans, SDnode ...@@ -524,35 +515,18 @@ static int32_t mndSetDropMnodeRedoActions(SMnode *pMnode, STrans *pTrans, SDnode
return 0; return 0;
} }
static int32_t mndDropMnode(SMnode *pMnode, SMnodeMsg *pMsg, SMnodeObj *pObj) { static int32_t mndDropMnode(SMnode *pMnode, SMnodeMsg *pReq, SMnodeObj *pObj) {
int32_t code = -1; int32_t code = -1;
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, &pMsg->rpcMsg);
if (pTrans == NULL) {
mError("mnode:%d, failed to drop since %s", pObj->id, terrstr());
goto DROP_MNODE_OVER;
}
mDebug("trans:%d, used to drop mnode:%d", pTrans->id, pObj->id); STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, &pReq->rpcMsg);
if (pTrans == NULL) goto DROP_MNODE_OVER;
if (mndSetDropMnodeRedoLogs(pMnode, pTrans, pObj) != 0) { mDebug("trans:%d, used to drop mnode:%d", pTrans->id, pObj->id);
mError("trans:%d, failed to set redo log since %s", pTrans->id, terrstr());
goto DROP_MNODE_OVER;
}
if (mndSetDropMnodeCommitLogs(pMnode, pTrans, pObj) != 0) {
mError("trans:%d, failed to set commit log since %s", pTrans->id, terrstr());
goto DROP_MNODE_OVER;
}
if (mndSetDropMnodeRedoActions(pMnode, pTrans, pObj->pDnode, pObj) != 0) {
mError("trans:%d, failed to set redo actions since %s", pTrans->id, terrstr());
goto DROP_MNODE_OVER;
}
if (mndTransPrepare(pMnode, pTrans) != 0) { if (mndSetDropMnodeRedoLogs(pMnode, pTrans, pObj) != 0) goto DROP_MNODE_OVER;
mError("trans:%d, failed to prepare since %s", pTrans->id, terrstr()); if (mndSetDropMnodeCommitLogs(pMnode, pTrans, pObj) != 0) goto DROP_MNODE_OVER;
goto DROP_MNODE_OVER; if (mndSetDropMnodeRedoActions(pMnode, pTrans, pObj->pDnode, pObj) != 0) goto DROP_MNODE_OVER;
} if (mndTransPrepare(pMnode, pTrans) != 0) goto DROP_MNODE_OVER;
code = 0; code = 0;
...@@ -561,9 +535,9 @@ DROP_MNODE_OVER: ...@@ -561,9 +535,9 @@ DROP_MNODE_OVER:
return code; return code;
} }
static int32_t mndProcessDropMnodeReq(SMnodeMsg *pMsg) { static int32_t mndProcessDropMnodeReq(SMnodeMsg *pReq) {
SMnode *pMnode = pMsg->pMnode; SMnode *pMnode = pReq->pMnode;
SMDropMnodeReq *pDrop = pMsg->rpcMsg.pCont; SMDropMnodeReq *pDrop = pReq->rpcMsg.pCont;
pDrop->dnodeId = htonl(pDrop->dnodeId); pDrop->dnodeId = htonl(pDrop->dnodeId);
mDebug("mnode:%d, start to drop", pDrop->dnodeId); mDebug("mnode:%d, start to drop", pDrop->dnodeId);
...@@ -577,12 +551,10 @@ static int32_t mndProcessDropMnodeReq(SMnodeMsg *pMsg) { ...@@ -577,12 +551,10 @@ static int32_t mndProcessDropMnodeReq(SMnodeMsg *pMsg) {
SMnodeObj *pObj = mndAcquireMnode(pMnode, pDrop->dnodeId); SMnodeObj *pObj = mndAcquireMnode(pMnode, pDrop->dnodeId);
if (pObj == NULL) { if (pObj == NULL) {
mError("mnode:%d, not exist", pDrop->dnodeId); mError("mnode:%d, not exist", pDrop->dnodeId);
terrno = TSDB_CODE_MND_DNODE_NOT_EXIST;
return -1; return -1;
} }
int32_t code = mndDropMnode(pMnode, pMsg, pObj); int32_t code = mndDropMnode(pMnode, pReq, pObj);
if (code != 0) { if (code != 0) {
mError("mnode:%d, failed to drop since %s", pMnode->dnodeId, terrstr()); mError("mnode:%d, failed to drop since %s", pMnode->dnodeId, terrstr());
return -1; return -1;
...@@ -592,23 +564,23 @@ static int32_t mndProcessDropMnodeReq(SMnodeMsg *pMsg) { ...@@ -592,23 +564,23 @@ static int32_t mndProcessDropMnodeReq(SMnodeMsg *pMsg) {
return TSDB_CODE_MND_ACTION_IN_PROGRESS; return TSDB_CODE_MND_ACTION_IN_PROGRESS;
} }
static int32_t mndProcessCreateMnodeRsp(SMnodeMsg *pMsg) { static int32_t mndProcessCreateMnodeRsp(SMnodeMsg *pRsp) {
mndTransProcessRsp(pMsg); mndTransProcessRsp(pRsp);
return 0; return 0;
} }
static int32_t mndProcessAlterMnodeRsp(SMnodeMsg *pMsg) { static int32_t mndProcessAlterMnodeRsp(SMnodeMsg *pRsp) {
mndTransProcessRsp(pMsg); mndTransProcessRsp(pRsp);
return 0; return 0;
} }
static int32_t mndProcessDropMnodeRsp(SMnodeMsg *pMsg) { static int32_t mndProcessDropMnodeRsp(SMnodeMsg *pRsp) {
mndTransProcessRsp(pMsg); mndTransProcessRsp(pRsp);
return 0; return 0;
} }
static int32_t mndGetMnodeMeta(SMnodeMsg *pMsg, SShowObj *pShow, STableMetaMsg *pMeta) { static int32_t mndGetMnodeMeta(SMnodeMsg *pReq, SShowObj *pShow, STableMetaMsg *pMeta) {
SMnode *pMnode = pMsg->pMnode; SMnode *pMnode = pReq->pMnode;
SSdb *pSdb = pMnode->pSdb; SSdb *pSdb = pMnode->pSdb;
int32_t cols = 0; int32_t cols = 0;
...@@ -660,8 +632,8 @@ static int32_t mndGetMnodeMeta(SMnodeMsg *pMsg, SShowObj *pShow, STableMetaMsg * ...@@ -660,8 +632,8 @@ static int32_t mndGetMnodeMeta(SMnodeMsg *pMsg, SShowObj *pShow, STableMetaMsg *
return 0; return 0;
} }
static int32_t mndRetrieveMnodes(SMnodeMsg *pMsg, SShowObj *pShow, char *data, int32_t rows) { static int32_t mndRetrieveMnodes(SMnodeMsg *pReq, SShowObj *pShow, char *data, int32_t rows) {
SMnode *pMnode = pMsg->pMnode; SMnode *pMnode = pReq->pMnode;
SSdb *pSdb = pMnode->pSdb; SSdb *pSdb = pMnode->pSdb;
int32_t numOfRows = 0; int32_t numOfRows = 0;
int32_t cols = 0; int32_t cols = 0;
......
...@@ -18,37 +18,25 @@ class MndTestMnode : public ::testing::Test { ...@@ -18,37 +18,25 @@ class MndTestMnode : public ::testing::Test {
public: public:
static void SetUpTestSuite() { static void SetUpTestSuite() {
test.Init("/tmp/mnode_test_mnode1", 9031); test.Init("/tmp/mnode_test_mnode1", 9028);
const char* fqdn = "localhost"; const char* fqdn = "localhost";
const char* firstEp = "localhost:9031"; const char* firstEp = "localhost:9028";
server2.Start("/tmp/mnode_test_mnode2", fqdn, 9032, firstEp); server2.Start("/tmp/mnode_test_mnode2", fqdn, 9029, firstEp);
server3.Start("/tmp/mnode_test_mnode3", fqdn, 9033, firstEp);
server4.Start("/tmp/mnode_test_mnode4", fqdn, 9034, firstEp);
server5.Start("/tmp/mnode_test_mnode5", fqdn, 9035, firstEp);
taosMsleep(300); taosMsleep(300);
} }
static void TearDownTestSuite() { static void TearDownTestSuite() {
server2.Stop(); server2.Stop();
server3.Stop();
server4.Stop();
server5.Stop();
test.Cleanup(); test.Cleanup();
} }
static Testbase test; static Testbase test;
static TestServer server2; static TestServer server2;
static TestServer server3;
static TestServer server4;
static TestServer server5;
}; };
Testbase MndTestMnode::test; Testbase MndTestMnode::test;
TestServer MndTestMnode::server2; TestServer MndTestMnode::server2;
TestServer MndTestMnode::server3;
TestServer MndTestMnode::server4;
TestServer MndTestMnode::server5;
TEST_F(MndTestMnode, 01_ShowDnode) { TEST_F(MndTestMnode, 01_ShowDnode) {
test.SendShowMetaReq(TSDB_MGMT_TABLE_MNODE, ""); test.SendShowMetaReq(TSDB_MGMT_TABLE_MNODE, "");
...@@ -64,7 +52,7 @@ TEST_F(MndTestMnode, 01_ShowDnode) { ...@@ -64,7 +52,7 @@ TEST_F(MndTestMnode, 01_ShowDnode) {
EXPECT_EQ(test.GetShowRows(), 1); EXPECT_EQ(test.GetShowRows(), 1);
CheckInt16(1); CheckInt16(1);
CheckBinary("localhost:9031", TSDB_EP_LEN); CheckBinary("localhost:9028", TSDB_EP_LEN);
CheckBinary("master", 12); CheckBinary("master", 12);
CheckInt64(0); CheckInt64(0);
CheckTimestamp(); CheckTimestamp();
...@@ -103,7 +91,7 @@ TEST_F(MndTestMnode, 04_Create_Mnode) { ...@@ -103,7 +91,7 @@ TEST_F(MndTestMnode, 04_Create_Mnode) {
SCreateDnodeReq* pReq = (SCreateDnodeReq*)rpcMallocCont(contLen); SCreateDnodeReq* pReq = (SCreateDnodeReq*)rpcMallocCont(contLen);
strcpy(pReq->fqdn, "localhost"); strcpy(pReq->fqdn, "localhost");
pReq->port = htonl(9032); pReq->port = htonl(9029);
SRpcMsg* pRsp = test.SendReq(TDMT_MND_CREATE_DNODE, pReq, contLen); SRpcMsg* pRsp = test.SendReq(TDMT_MND_CREATE_DNODE, pReq, contLen);
ASSERT_NE(pRsp, nullptr); ASSERT_NE(pRsp, nullptr);
...@@ -132,8 +120,8 @@ TEST_F(MndTestMnode, 04_Create_Mnode) { ...@@ -132,8 +120,8 @@ TEST_F(MndTestMnode, 04_Create_Mnode) {
CheckInt16(1); CheckInt16(1);
CheckInt16(2); CheckInt16(2);
CheckBinary("localhost:9031", TSDB_EP_LEN); CheckBinary("localhost:9028", TSDB_EP_LEN);
CheckBinary("localhost:9032", TSDB_EP_LEN); CheckBinary("localhost:9029", TSDB_EP_LEN);
CheckBinary("master", 12); CheckBinary("master", 12);
CheckBinary("slave", 12); CheckBinary("slave", 12);
CheckInt64(0); CheckInt64(0);
...@@ -158,144 +146,145 @@ TEST_F(MndTestMnode, 04_Create_Mnode) { ...@@ -158,144 +146,145 @@ TEST_F(MndTestMnode, 04_Create_Mnode) {
EXPECT_EQ(test.GetShowRows(), 1); EXPECT_EQ(test.GetShowRows(), 1);
CheckInt16(1); CheckInt16(1);
CheckBinary("localhost:9031", TSDB_EP_LEN); CheckBinary("localhost:9028", TSDB_EP_LEN);
CheckBinary("master", 12); CheckBinary("master", 12);
CheckInt64(0); CheckInt64(0);
CheckTimestamp(); CheckTimestamp();
} }
{
// drop mnode
int32_t contLen = sizeof(SMDropMnodeReq);
SMDropMnodeReq* pReq = (SMDropMnodeReq*)rpcMallocCont(contLen);
pReq->dnodeId = htonl(2);
SRpcMsg* pRsp = test.SendReq(TDMT_MND_DROP_MNODE, pReq, contLen);
ASSERT_NE(pRsp, nullptr);
ASSERT_EQ(pRsp->code, TSDB_CODE_MND_MNODE_NOT_EXIST);
}
}
TEST_F(MndTestMnode, 03_Create_Mnode_Rollback) {
{
// send message first, then dnode2 crash, result is returned, and rollback is started
int32_t contLen = sizeof(SMCreateMnodeReq);
SMCreateMnodeReq* pReq = (SMCreateMnodeReq*)rpcMallocCont(contLen);
pReq->dnodeId = htonl(2);
server2.Stop();
SRpcMsg* pRsp = test.SendReq(TDMT_MND_CREATE_MNODE, pReq, contLen);
ASSERT_NE(pRsp, nullptr);
ASSERT_EQ(pRsp->code, TSDB_CODE_RPC_NETWORK_UNAVAIL);
}
{
// continue send message, mnode is creating
int32_t contLen = sizeof(SMCreateMnodeReq);
SMCreateMnodeReq* pReq = (SMCreateMnodeReq*)rpcMallocCont(contLen);
pReq->dnodeId = htonl(2);
SRpcMsg* pRsp = test.SendReq(TDMT_MND_CREATE_MNODE, pReq, contLen);
ASSERT_NE(pRsp, nullptr);
ASSERT_EQ(pRsp->code, TSDB_CODE_SDB_OBJ_CREATING);
}
{
// continue send message, mnode is creating
int32_t contLen = sizeof(SMDropMnodeReq);
SMDropMnodeReq* pReq = (SMDropMnodeReq*)rpcMallocCont(contLen);
pReq->dnodeId = htonl(2);
SRpcMsg* pRsp = test.SendReq(TDMT_MND_DROP_MNODE, pReq, contLen);
ASSERT_NE(pRsp, nullptr);
ASSERT_EQ(pRsp->code, TSDB_CODE_SDB_OBJ_CREATING);
}
{
// server start, wait until the rollback finished
server2.DoStart();
taosMsleep(1000);
int32_t retry = 0;
int32_t retryMax = 20;
for (retry = 0; retry < retryMax; retry++) {
int32_t contLen = sizeof(SMCreateMnodeReq);
SMCreateMnodeReq* pReq = (SMCreateMnodeReq*)rpcMallocCont(contLen);
pReq->dnodeId = htonl(2);
SRpcMsg* pRsp = test.SendReq(TDMT_MND_CREATE_MNODE, pReq, contLen);
ASSERT_NE(pRsp, nullptr);
if (pRsp->code == TSDB_CODE_MND_MNODE_ALREADY_EXIST) break;
taosMsleep(1000);
}
ASSERT_NE(retry, retryMax);
}
} }
// {
// int32_t contLen = sizeof(SDropDnodeReq); TEST_F(MndTestMnode, 04_Drop_Mnode_Rollback) {
{
// SDropDnodeReq* pReq = (SDropDnodeReq*)rpcMallocCont(contLen); // send message first, then dnode2 crash, result is returned, and rollback is started
// pReq->dnodeId = htonl(2); int32_t contLen = sizeof(SMDropMnodeReq);
// SRpcMsg* pRsp = test.SendReq(TDMT_MND_DROP_DNODE, pReq, contLen); SMDropMnodeReq* pReq = (SMDropMnodeReq*)rpcMallocCont(contLen);
// ASSERT_NE(pRsp, nullptr); pReq->dnodeId = htonl(2);
// ASSERT_EQ(pRsp->code, 0);
// } server2.Stop();
SRpcMsg* pRsp = test.SendReq(TDMT_MND_DROP_MNODE, pReq, contLen);
// test.SendShowMetaReq(TSDB_MGMT_TABLE_DNODE, ""); ASSERT_NE(pRsp, nullptr);
// CHECK_META("show dnodes", 7); ASSERT_EQ(pRsp->code, TSDB_CODE_RPC_NETWORK_UNAVAIL);
// test.SendShowRetrieveReq(); }
// EXPECT_EQ(test.GetShowRows(), 1);
{
// CheckInt16(1); // continue send message, mnode is dropping
// CheckBinary("localhost:9031", TSDB_EP_LEN); int32_t contLen = sizeof(SMCreateMnodeReq);
// CheckInt16(0);
// CheckInt16(1); SMCreateMnodeReq* pReq = (SMCreateMnodeReq*)rpcMallocCont(contLen);
// CheckBinary("ready", 10); pReq->dnodeId = htonl(2);
// CheckTimestamp();
// CheckBinary("", 24); SRpcMsg* pRsp = test.SendReq(TDMT_MND_CREATE_MNODE, pReq, contLen);
ASSERT_NE(pRsp, nullptr);
// { ASSERT_EQ(pRsp->code, TSDB_CODE_SDB_OBJ_DROPPING);
// int32_t contLen = sizeof(SCreateDnodeReq); }
// SCreateDnodeReq* pReq = (SCreateDnodeReq*)rpcMallocCont(contLen); {
// strcpy(pReq->ep, "localhost:9033"); // continue send message, mnode is dropping
int32_t contLen = sizeof(SMDropMnodeReq);
// SRpcMsg* pRsp = test.SendReq(TDMT_MND_CREATE_DNODE, pReq, contLen);
// ASSERT_NE(pRsp, nullptr); SMDropMnodeReq* pReq = (SMDropMnodeReq*)rpcMallocCont(contLen);
// ASSERT_EQ(pRsp->code, 0); pReq->dnodeId = htonl(2);
// }
SRpcMsg* pRsp = test.SendReq(TDMT_MND_DROP_MNODE, pReq, contLen);
// { ASSERT_NE(pRsp, nullptr);
// int32_t contLen = sizeof(SCreateDnodeReq); ASSERT_EQ(pRsp->code, TSDB_CODE_SDB_OBJ_DROPPING);
}
// SCreateDnodeReq* pReq = (SCreateDnodeReq*)rpcMallocCont(contLen);
// strcpy(pReq->ep, "localhost:9034"); {
// server start, wait until the rollback finished
// SRpcMsg* pRsp = test.SendReq(TDMT_MND_CREATE_DNODE, pReq, contLen); server2.DoStart();
// ASSERT_NE(pRsp, nullptr); taosMsleep(1000);
// ASSERT_EQ(pRsp->code, 0);
// } int32_t retry = 0;
int32_t retryMax = 20;
// {
// int32_t contLen = sizeof(SCreateDnodeReq); for (retry = 0; retry < retryMax; retry++) {
int32_t contLen = sizeof(SMCreateMnodeReq);
// SCreateDnodeReq* pReq = (SCreateDnodeReq*)rpcMallocCont(contLen);
// strcpy(pReq->ep, "localhost:9035"); SMCreateMnodeReq* pReq = (SMCreateMnodeReq*)rpcMallocCont(contLen);
pReq->dnodeId = htonl(2);
// SRpcMsg* pRsp = test.SendReq(TDMT_MND_CREATE_DNODE, pReq, contLen);
// ASSERT_NE(pRsp, nullptr); SRpcMsg* pRsp = test.SendReq(TDMT_MND_CREATE_MNODE, pReq, contLen);
// ASSERT_EQ(pRsp->code, 0); ASSERT_NE(pRsp, nullptr);
// } if (pRsp->code == 0) break;
taosMsleep(1000);
// taosMsleep(1300); }
// test.SendShowMetaReq(TSDB_MGMT_TABLE_DNODE, "");
// CHECK_META("show dnodes", 7); ASSERT_NE(retry, retryMax);
// test.SendShowRetrieveReq(); }
// EXPECT_EQ(test.GetShowRows(), 4); }
\ No newline at end of file
// CheckInt16(1);
// CheckInt16(3);
// CheckInt16(4);
// CheckInt16(5);
// CheckBinary("localhost:9031", TSDB_EP_LEN);
// CheckBinary("localhost:9033", TSDB_EP_LEN);
// CheckBinary("localhost:9034", TSDB_EP_LEN);
// CheckBinary("localhost:9035", TSDB_EP_LEN);
// CheckInt16(0);
// CheckInt16(0);
// CheckInt16(0);
// CheckInt16(0);
// CheckInt16(1);
// CheckInt16(1);
// CheckInt16(1);
// CheckInt16(1);
// CheckBinary("ready", 10);
// CheckBinary("ready", 10);
// CheckBinary("ready", 10);
// CheckBinary("ready", 10);
// CheckTimestamp();
// CheckTimestamp();
// CheckTimestamp();
// CheckTimestamp();
// CheckBinary("", 24);
// CheckBinary("", 24);
// CheckBinary("", 24);
// CheckBinary("", 24);
// // restart
// uInfo("stop all server");
// test.Restart();
// server2.Restart();
// server3.Restart();
// server4.Restart();
// server5.Restart();
// taosMsleep(1300);
// test.SendShowMetaReq(TSDB_MGMT_TABLE_DNODE, "");
// CHECK_META("show dnodes", 7);
// test.SendShowRetrieveReq();
// EXPECT_EQ(test.GetShowRows(), 4);
// CheckInt16(1);
// CheckInt16(3);
// CheckInt16(4);
// CheckInt16(5);
// CheckBinary("localhost:9031", TSDB_EP_LEN);
// CheckBinary("localhost:9033", TSDB_EP_LEN);
// CheckBinary("localhost:9034", TSDB_EP_LEN);
// CheckBinary("localhost:9035", TSDB_EP_LEN);
// CheckInt16(0);
// CheckInt16(0);
// CheckInt16(0);
// CheckInt16(0);
// CheckInt16(1);
// CheckInt16(1);
// CheckInt16(1);
// CheckInt16(1);
// CheckBinary("ready", 10);
// CheckBinary("ready", 10);
// CheckBinary("ready", 10);
// CheckBinary("ready", 10);
// CheckTimestamp();
// CheckTimestamp();
// CheckTimestamp();
// CheckTimestamp();
// CheckBinary("", 24);
// CheckBinary("", 24);
// CheckBinary("", 24);
// CheckBinary("", 24);
// }
...@@ -98,7 +98,7 @@ int32_t checkForInvalidExpr(SQueryStmtInfo* pQueryInfo, SMsgBuf* pMsgBuf); ...@@ -98,7 +98,7 @@ int32_t checkForInvalidExpr(SQueryStmtInfo* pQueryInfo, SMsgBuf* pMsgBuf);
* @param msgBufLen * @param msgBufLen
* @return * @return
*/ */
int32_t qParserExtractRequestedMetaInfo(const SSqlInfo* pSqlInfo, SCatalogReq* pMetaInfo, char* msg, int32_t msgBufLen); int32_t qParserExtractRequestedMetaInfo(const SSqlInfo* pSqlInfo, SCatalogReq* pMetaInfo, SParseBasicCtx *pCtx, char* msg, int32_t msgBufLen);
/** /**
* Destroy the meta data request structure. * Destroy the meta data request structure.
......
...@@ -213,7 +213,7 @@ SQueryStmtInfo *createQueryInfo() { ...@@ -213,7 +213,7 @@ SQueryStmtInfo *createQueryInfo() {
pQueryInfo->slimit.limit = -1; pQueryInfo->slimit.limit = -1;
pQueryInfo->slimit.offset = 0; pQueryInfo->slimit.offset = 0;
pQueryInfo->pUpstream = taosArrayInit(4, POINTER_BYTES); pQueryInfo->pDownstream = taosArrayInit(4, POINTER_BYTES);
pQueryInfo->window = TSWINDOW_INITIALIZER; pQueryInfo->window = TSWINDOW_INITIALIZER;
pQueryInfo->exprList = calloc(10, POINTER_BYTES); pQueryInfo->exprList = calloc(10, POINTER_BYTES);
...@@ -247,8 +247,8 @@ static void destroyQueryInfoImpl(SQueryStmtInfo* pQueryInfo) { ...@@ -247,8 +247,8 @@ static void destroyQueryInfoImpl(SQueryStmtInfo* pQueryInfo) {
tfree(pQueryInfo->fillVal); tfree(pQueryInfo->fillVal);
tfree(pQueryInfo->buf); tfree(pQueryInfo->buf);
taosArrayDestroy(pQueryInfo->pUpstream); taosArrayDestroy(pQueryInfo->pDownstream);
pQueryInfo->pUpstream = NULL; pQueryInfo->pDownstream = NULL;
pQueryInfo->bufLen = 0; pQueryInfo->bufLen = 0;
} }
...@@ -256,9 +256,9 @@ void destroyQueryInfo(SQueryStmtInfo* pQueryInfo) { ...@@ -256,9 +256,9 @@ void destroyQueryInfo(SQueryStmtInfo* pQueryInfo) {
while (pQueryInfo != NULL) { while (pQueryInfo != NULL) {
SQueryStmtInfo* p = pQueryInfo->sibling; SQueryStmtInfo* p = pQueryInfo->sibling;
size_t numOfUpstream = taosArrayGetSize(pQueryInfo->pUpstream); size_t numOfUpstream = taosArrayGetSize(pQueryInfo->pDownstream);
for (int32_t i = 0; i < numOfUpstream; ++i) { for (int32_t i = 0; i < numOfUpstream; ++i) {
SQueryStmtInfo* pUpQueryInfo = taosArrayGetP(pQueryInfo->pUpstream, i); SQueryStmtInfo* pUpQueryInfo = taosArrayGetP(pQueryInfo->pDownstream, i);
destroyQueryInfoImpl(pUpQueryInfo); destroyQueryInfoImpl(pUpQueryInfo);
clearAllTableMetaInfo(pUpQueryInfo, false, 0); clearAllTableMetaInfo(pUpQueryInfo, false, 0);
tfree(pUpQueryInfo); tfree(pUpQueryInfo);
...@@ -288,7 +288,6 @@ static int32_t doValidateSubquery(SSqlNode* pSqlNode, int32_t index, SQueryStmtI ...@@ -288,7 +288,6 @@ static int32_t doValidateSubquery(SSqlNode* pSqlNode, int32_t index, SQueryStmtI
} }
pSub->pUdfInfo = pUdfInfo; pSub->pUdfInfo = pUdfInfo;
pSub->pDownstream = pQueryInfo;
int32_t code = validateSqlNode(p, pSub, pMsgBuf); int32_t code = validateSqlNode(p, pSub, pMsgBuf);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
return code; return code;
...@@ -311,7 +310,7 @@ static int32_t doValidateSubquery(SSqlNode* pSqlNode, int32_t index, SQueryStmtI ...@@ -311,7 +310,7 @@ static int32_t doValidateSubquery(SSqlNode* pSqlNode, int32_t index, SQueryStmtI
tstrncpy(pTableMetaInfo1->aliasName, subInfo->aliasName.z, subInfo->aliasName.n + 1); tstrncpy(pTableMetaInfo1->aliasName, subInfo->aliasName.z, subInfo->aliasName.n + 1);
} }
taosArrayPush(pQueryInfo->pUpstream, &pSub); taosArrayPush(pQueryInfo->pDownstream, &pSub);
// NOTE: order mix up in subquery not support yet. // NOTE: order mix up in subquery not support yet.
pQueryInfo->order = pSub->order; pQueryInfo->order = pSub->order;
...@@ -600,7 +599,7 @@ int32_t checkForUnsupportedQuery(SQueryStmtInfo* pQueryInfo, SMsgBuf* pMsgBuf) { ...@@ -600,7 +599,7 @@ int32_t checkForUnsupportedQuery(SQueryStmtInfo* pQueryInfo, SMsgBuf* pMsgBuf) {
return buildInvalidOperationMsg(pMsgBuf, msg1); return buildInvalidOperationMsg(pMsgBuf, msg1);
} }
if (f == FUNCTION_BLKINFO && taosArrayGetSize(pQueryInfo->pUpstream) > 0) { if (f == FUNCTION_BLKINFO && taosArrayGetSize(pQueryInfo->pDownstream) > 0) {
return buildInvalidOperationMsg(pMsgBuf, msg1); return buildInvalidOperationMsg(pMsgBuf, msg1);
} }
...@@ -1584,7 +1583,6 @@ int32_t validateSqlNode(SSqlNode* pSqlNode, SQueryStmtInfo* pQueryInfo, SMsgBuf* ...@@ -1584,7 +1583,6 @@ int32_t validateSqlNode(SSqlNode* pSqlNode, SQueryStmtInfo* pQueryInfo, SMsgBuf*
} }
pushDownAggFuncExprInfo(pQueryInfo); pushDownAggFuncExprInfo(pQueryInfo);
// addColumnNodeFromLowerLevel(pQueryInfo);
for(int32_t i = 0; i < 1; ++i) { for(int32_t i = 0; i < 1; ++i) {
SArray* functionList = extractFunctionList(pQueryInfo->exprList[i]); SArray* functionList = extractFunctionList(pQueryInfo->exprList[i]);
...@@ -3904,17 +3902,30 @@ int32_t qParserValidateSqlNode(SParseBasicCtx *pCtx, SSqlInfo* pInfo, SQueryStmt ...@@ -3904,17 +3902,30 @@ int32_t qParserValidateSqlNode(SParseBasicCtx *pCtx, SSqlInfo* pInfo, SQueryStmt
// TODO: check if the qnode info has been cached already // TODO: check if the qnode info has been cached already
req.qNodeRequired = true; req.qNodeRequired = true;
code = qParserExtractRequestedMetaInfo(pInfo, &req, msgBuf, msgBufLen); code = qParserExtractRequestedMetaInfo(pInfo, &req, pCtx, msgBuf, msgBufLen);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
return code; return code;
} }
// load the meta data from catalog // load the meta data from catalog
code = catalogGetAllMeta(pCtx->pCatalog, pCtx->pTransporter, &pCtx->mgmtEpSet, &req, &data); // code = catalogGetAllMeta(pCtx->pCatalog, pCtx->pTransporter, &pCtx->mgmtEpSet, &req, &data);
STableMeta* pmt = NULL;
SName* name = taosArrayGet(req.pTableName, 0);
code = catalogGetTableMeta(pCtx->pCatalog, pCtx->pTransporter, &pCtx->mgmtEpSet, name, &pmt);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
return code; return code;
} }
data.pTableMeta = taosArrayInit(1, POINTER_BYTES);
taosArrayPush(data.pTableMeta, &pmt);
pQueryInfo->pTableMetaInfo = calloc(1, POINTER_BYTES);
pQueryInfo->pTableMetaInfo[0] = calloc(1, sizeof(STableMetaInfo));
pQueryInfo->pTableMetaInfo[0]->pTableMeta = pmt;
pQueryInfo->pTableMetaInfo[0]->name = *name;
pQueryInfo->numOfTables = 1;
// evaluate the sqlnode // evaluate the sqlnode
STableMeta* pTableMeta = (STableMeta*) taosArrayGetP(data.pTableMeta, 0); STableMeta* pTableMeta = (STableMeta*) taosArrayGetP(data.pTableMeta, 0);
assert(pTableMeta != NULL); assert(pTableMeta != NULL);
......
...@@ -194,6 +194,18 @@ static int32_t doCheckDbOptions(SCreateDbMsg* pCreate, SMsgBuf* pMsgBuf) { ...@@ -194,6 +194,18 @@ static int32_t doCheckDbOptions(SCreateDbMsg* pCreate, SMsgBuf* pMsgBuf) {
TSDB_MIN_VNODES_PER_DB, TSDB_MAX_VNODES_PER_DB); TSDB_MIN_VNODES_PER_DB, TSDB_MAX_VNODES_PER_DB);
} }
val = htonl(pCreate->maxRows);
if (val < TSDB_MIN_MAX_ROW_FBLOCK || val > TSDB_MAX_MAX_ROW_FBLOCK) {
snprintf(msg, tListLen(msg), "invalid number of max rows in file block for DB:%d valid range: [%d, %d]", val,
TSDB_MIN_MAX_ROW_FBLOCK, TSDB_MAX_MAX_ROW_FBLOCK);
}
val = htonl(pCreate->minRows);
if (val < TSDB_MIN_MIN_ROW_FBLOCK || val > TSDB_MAX_MIN_ROW_FBLOCK) {
snprintf(msg, tListLen(msg), "invalid number of min rows in file block for DB:%d valid range: [%d, %d]", val,
TSDB_MIN_MIN_ROW_FBLOCK, TSDB_MAX_MIN_ROW_FBLOCK);
}
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
......
...@@ -61,7 +61,7 @@ int32_t parseQuerySql(SParseContext* pCxt, SQueryNode** pQuery) { ...@@ -61,7 +61,7 @@ int32_t parseQuerySql(SParseContext* pCxt, SQueryNode** pQuery) {
pDcl->nodeType = info.type; pDcl->nodeType = info.type;
} }
} else { } else {
SQueryStmtInfo* pQueryInfo = calloc(1, sizeof(SQueryStmtInfo)); SQueryStmtInfo* pQueryInfo = createQueryInfo();
if (pQueryInfo == NULL) { if (pQueryInfo == NULL) {
terrno = TSDB_CODE_QRY_OUT_OF_MEMORY; // set correct error code. terrno = TSDB_CODE_QRY_OUT_OF_MEMORY; // set correct error code.
return terrno; return terrno;
...@@ -89,7 +89,7 @@ int32_t qParserConvertSql(const char* pStr, size_t length, char** pConvertSql) { ...@@ -89,7 +89,7 @@ int32_t qParserConvertSql(const char* pStr, size_t length, char** pConvertSql) {
return 0; return 0;
} }
static int32_t getTableNameFromSqlNode(SSqlNode* pSqlNode, SArray* tableNameList, SMsgBuf* pMsgBuf); static int32_t getTableNameFromSqlNode(SSqlNode* pSqlNode, SArray* tableNameList, SParseBasicCtx *pCtx, SMsgBuf* pMsgBuf);
static int32_t tnameComparFn(const void* p1, const void* p2) { static int32_t tnameComparFn(const void* p1, const void* p2) {
SName* pn1 = (SName*)p1; SName* pn1 = (SName*)p1;
...@@ -113,7 +113,7 @@ static int32_t tnameComparFn(const void* p1, const void* p2) { ...@@ -113,7 +113,7 @@ static int32_t tnameComparFn(const void* p1, const void* p2) {
} }
} }
static int32_t getTableNameFromSubquery(SSqlNode* pSqlNode, SArray* tableNameList, SMsgBuf* pMsgBuf) { static int32_t getTableNameFromSubquery(SSqlNode* pSqlNode, SArray* tableNameList, SParseBasicCtx *pCtx, SMsgBuf* pMsgBuf) {
int32_t numOfSub = (int32_t)taosArrayGetSize(pSqlNode->from->list); int32_t numOfSub = (int32_t)taosArrayGetSize(pSqlNode->from->list);
for (int32_t j = 0; j < numOfSub; ++j) { for (int32_t j = 0; j < numOfSub; ++j) {
...@@ -123,12 +123,12 @@ static int32_t getTableNameFromSubquery(SSqlNode* pSqlNode, SArray* tableNameLis ...@@ -123,12 +123,12 @@ static int32_t getTableNameFromSubquery(SSqlNode* pSqlNode, SArray* tableNameLis
for (int32_t i = 0; i < num; ++i) { for (int32_t i = 0; i < num; ++i) {
SSqlNode* p = taosArrayGetP(sub->pSubquery->node, i); SSqlNode* p = taosArrayGetP(sub->pSubquery->node, i);
if (p->from->type == SQL_FROM_NODE_TABLES) { if (p->from->type == SQL_FROM_NODE_TABLES) {
int32_t code = getTableNameFromSqlNode(p, tableNameList, pMsgBuf); int32_t code = getTableNameFromSqlNode(p, tableNameList, pCtx, pMsgBuf);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
return code; return code;
} }
} else { } else {
getTableNameFromSubquery(p, tableNameList, pMsgBuf); getTableNameFromSubquery(p, tableNameList, pCtx, pMsgBuf);
} }
} }
} }
...@@ -136,7 +136,7 @@ static int32_t getTableNameFromSubquery(SSqlNode* pSqlNode, SArray* tableNameLis ...@@ -136,7 +136,7 @@ static int32_t getTableNameFromSubquery(SSqlNode* pSqlNode, SArray* tableNameLis
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t getTableNameFromSqlNode(SSqlNode* pSqlNode, SArray* tableNameList, SMsgBuf* pMsgBuf) { int32_t getTableNameFromSqlNode(SSqlNode* pSqlNode, SArray* tableNameList, SParseBasicCtx *pParseCtx, SMsgBuf* pMsgBuf) {
const char* msg1 = "invalid table name"; const char* msg1 = "invalid table name";
int32_t numOfTables = (int32_t) taosArrayGetSize(pSqlNode->from->list); int32_t numOfTables = (int32_t) taosArrayGetSize(pSqlNode->from->list);
...@@ -155,7 +155,11 @@ int32_t getTableNameFromSqlNode(SSqlNode* pSqlNode, SArray* tableNameList, SMsgB ...@@ -155,7 +155,11 @@ int32_t getTableNameFromSqlNode(SSqlNode* pSqlNode, SArray* tableNameList, SMsgB
} }
SName name = {0}; SName name = {0};
strndequote(name.tname, t->z, t->n); int32_t code = createSName(&name, t, pParseCtx, pMsgBuf);
if (code != TSDB_CODE_SUCCESS) {
return buildInvalidOperationMsg(pMsgBuf, msg1);
}
taosArrayPush(tableNameList, &name); taosArrayPush(tableNameList, &name);
} }
...@@ -166,7 +170,7 @@ static void freePtrElem(void* p) { ...@@ -166,7 +170,7 @@ static void freePtrElem(void* p) {
tfree(*(char**)p); tfree(*(char**)p);
} }
int32_t qParserExtractRequestedMetaInfo(const SSqlInfo* pSqlInfo, SCatalogReq* pMetaInfo, char* msg, int32_t msgBufLen) { int32_t qParserExtractRequestedMetaInfo(const SSqlInfo* pSqlInfo, SCatalogReq* pMetaInfo, SParseBasicCtx *pCtx, char* msg, int32_t msgBufLen) {
int32_t code = TSDB_CODE_SUCCESS; int32_t code = TSDB_CODE_SUCCESS;
SMsgBuf msgBuf = {.buf = msg, .len = msgBufLen}; SMsgBuf msgBuf = {.buf = msg, .len = msgBufLen};
...@@ -182,12 +186,12 @@ int32_t qParserExtractRequestedMetaInfo(const SSqlInfo* pSqlInfo, SCatalogReq* p ...@@ -182,12 +186,12 @@ int32_t qParserExtractRequestedMetaInfo(const SSqlInfo* pSqlInfo, SCatalogReq* p
// load the table meta in the FROM clause // load the table meta in the FROM clause
if (pSqlNode->from->type == SQL_FROM_NODE_TABLES) { if (pSqlNode->from->type == SQL_FROM_NODE_TABLES) {
code = getTableNameFromSqlNode(pSqlNode, pMetaInfo->pTableName, &msgBuf); code = getTableNameFromSqlNode(pSqlNode, pMetaInfo->pTableName, pCtx, &msgBuf);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
return code; return code;
} }
} else { } else {
code = getTableNameFromSubquery(pSqlNode, pMetaInfo->pTableName, &msgBuf); code = getTableNameFromSubquery(pSqlNode, pMetaInfo->pTableName, pCtx, &msgBuf);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
return code; return code;
} }
......
...@@ -77,12 +77,15 @@ void sqlCheck(const char* sql, bool valid) { ...@@ -77,12 +77,15 @@ void sqlCheck(const char* sql, bool valid) {
buf.len = 128; buf.len = 128;
buf.buf = msg; buf.buf = msg;
SParseBasicCtx ctx = {0};
ctx.db = "db1";
ctx.acctId = 1;
SSqlNode* pNode = (SSqlNode*)taosArrayGetP(((SArray*)info1.sub.node), 0); SSqlNode* pNode = (SSqlNode*)taosArrayGetP(((SArray*)info1.sub.node), 0);
int32_t code = evaluateSqlNode(pNode, TSDB_TIME_PRECISION_NANO, &buf); int32_t code = evaluateSqlNode(pNode, TSDB_TIME_PRECISION_NANO, &buf);
ASSERT_EQ(code, 0); ASSERT_EQ(code, 0);
SCatalogReq req = {0}; SCatalogReq req = {0};
int32_t ret = qParserExtractRequestedMetaInfo(&info1, &req, msg, 128); int32_t ret = qParserExtractRequestedMetaInfo(&info1, &req, &ctx, msg, 128);
ASSERT_EQ(ret, 0); ASSERT_EQ(ret, 0);
ASSERT_EQ(taosArrayGetSize(req.pTableName), 1); ASSERT_EQ(taosArrayGetSize(req.pTableName), 1);
...@@ -119,7 +122,11 @@ TEST(testCase, validateAST_test) { ...@@ -119,7 +122,11 @@ TEST(testCase, validateAST_test) {
ASSERT_EQ(code, 0); ASSERT_EQ(code, 0);
SCatalogReq req = {0}; SCatalogReq req = {0};
int32_t ret = qParserExtractRequestedMetaInfo(&info1, &req, msg, 128); SParseBasicCtx ctx = {0};
ctx.db = "db1";
ctx.acctId = 1;
int32_t ret = qParserExtractRequestedMetaInfo(&info1, &req, &ctx, msg, 128);
ASSERT_EQ(ret, 0); ASSERT_EQ(ret, 0);
ASSERT_EQ(taosArrayGetSize(req.pTableName), 1); ASSERT_EQ(taosArrayGetSize(req.pTableName), 1);
...@@ -177,7 +184,11 @@ TEST(testCase, function_Test) { ...@@ -177,7 +184,11 @@ TEST(testCase, function_Test) {
ASSERT_EQ(code, 0); ASSERT_EQ(code, 0);
SCatalogReq req = {0}; SCatalogReq req = {0};
int32_t ret = qParserExtractRequestedMetaInfo(&info1, &req, msg, 128); SParseBasicCtx ctx = {0};
ctx.db = "db1";
ctx.acctId = 1;
int32_t ret = qParserExtractRequestedMetaInfo(&info1, &req, &ctx, msg, 128);
ASSERT_EQ(ret, 0); ASSERT_EQ(ret, 0);
ASSERT_EQ(taosArrayGetSize(req.pTableName), 1); ASSERT_EQ(taosArrayGetSize(req.pTableName), 1);
...@@ -223,7 +234,11 @@ TEST(testCase, function_Test2) { ...@@ -223,7 +234,11 @@ TEST(testCase, function_Test2) {
ASSERT_EQ(code, 0); ASSERT_EQ(code, 0);
SCatalogReq req = {0}; SCatalogReq req = {0};
int32_t ret = qParserExtractRequestedMetaInfo(&info1, &req, msg, 128); SParseBasicCtx ctx = {0};
ctx.db = "db1";
ctx.acctId = 1;
int32_t ret = qParserExtractRequestedMetaInfo(&info1, &req, &ctx, msg, 128);
ASSERT_EQ(ret, 0); ASSERT_EQ(ret, 0);
ASSERT_EQ(taosArrayGetSize(req.pTableName), 1); ASSERT_EQ(taosArrayGetSize(req.pTableName), 1);
...@@ -269,7 +284,11 @@ TEST(testCase, function_Test3) { ...@@ -269,7 +284,11 @@ TEST(testCase, function_Test3) {
ASSERT_EQ(code, 0); ASSERT_EQ(code, 0);
SCatalogReq req = {0}; SCatalogReq req = {0};
int32_t ret = qParserExtractRequestedMetaInfo(&info1, &req, msg, 128); SParseBasicCtx ctx = {0};
ctx.db = "db1";
ctx.acctId = 1;
int32_t ret = qParserExtractRequestedMetaInfo(&info1, &req, &ctx, msg, 128);
ASSERT_EQ(ret, 0); ASSERT_EQ(ret, 0);
ASSERT_EQ(taosArrayGetSize(req.pTableName), 1); ASSERT_EQ(taosArrayGetSize(req.pTableName), 1);
...@@ -314,7 +333,11 @@ TEST(testCase, function_Test4) { ...@@ -314,7 +333,11 @@ TEST(testCase, function_Test4) {
ASSERT_EQ(code, 0); ASSERT_EQ(code, 0);
SCatalogReq req = {0}; SCatalogReq req = {0};
int32_t ret = qParserExtractRequestedMetaInfo(&info1, &req, msg, 128); SParseBasicCtx ctx = {0};
ctx.db = "db1";
ctx.acctId = 1;
int32_t ret = qParserExtractRequestedMetaInfo(&info1, &req, &ctx, msg, 128);
ASSERT_EQ(ret, 0); ASSERT_EQ(ret, 0);
ASSERT_EQ(taosArrayGetSize(req.pTableName), 1); ASSERT_EQ(taosArrayGetSize(req.pTableName), 1);
...@@ -362,7 +385,11 @@ TEST(testCase, function_Test5) { ...@@ -362,7 +385,11 @@ TEST(testCase, function_Test5) {
ASSERT_EQ(code, 0); ASSERT_EQ(code, 0);
SCatalogReq req = {0}; SCatalogReq req = {0};
int32_t ret = qParserExtractRequestedMetaInfo(&info1, &req, msg, 128); SParseBasicCtx ctx = {0};
ctx.db = "db1";
ctx.acctId = 1;
int32_t ret = qParserExtractRequestedMetaInfo(&info1, &req, &ctx, msg, 128);
ASSERT_EQ(ret, 0); ASSERT_EQ(ret, 0);
ASSERT_EQ(taosArrayGetSize(req.pTableName), 1); ASSERT_EQ(taosArrayGetSize(req.pTableName), 1);
...@@ -447,7 +474,11 @@ TEST(testCase, function_Test6) { ...@@ -447,7 +474,11 @@ TEST(testCase, function_Test6) {
ASSERT_EQ(code, 0); ASSERT_EQ(code, 0);
SCatalogReq req = {0}; SCatalogReq req = {0};
int32_t ret = qParserExtractRequestedMetaInfo(&info1, &req, msg, 128); SParseBasicCtx ctx = {0};
ctx.db = "db1";
ctx.acctId = 1;
int32_t ret = qParserExtractRequestedMetaInfo(&info1, &req, &ctx, msg, 128);
ASSERT_EQ(ret, 0); ASSERT_EQ(ret, 0);
ASSERT_EQ(taosArrayGetSize(req.pTableName), 1); ASSERT_EQ(taosArrayGetSize(req.pTableName), 1);
...@@ -525,7 +556,11 @@ TEST(testCase, function_Test6) { ...@@ -525,7 +556,11 @@ TEST(testCase, function_Test6) {
ASSERT_EQ(code, 0); ASSERT_EQ(code, 0);
SCatalogReq req = {0}; SCatalogReq req = {0};
int32_t ret = qParserExtractRequestedMetaInfo(&info1, &req, msg, 128); SParseBasicCtx ctx = {0};
ctx.db = "db1";
ctx.acctId = 1;
int32_t ret = qParserExtractRequestedMetaInfo(&info1, &req, &ctx, msg, 128);
ASSERT_EQ(ret, 0); ASSERT_EQ(ret, 0);
ASSERT_EQ(taosArrayGetSize(req.pTableName), 1); ASSERT_EQ(taosArrayGetSize(req.pTableName), 1);
...@@ -587,7 +622,11 @@ TEST(testCase, function_Test6) { ...@@ -587,7 +622,11 @@ TEST(testCase, function_Test6) {
ASSERT_EQ(code, 0); ASSERT_EQ(code, 0);
SCatalogReq req = {0}; SCatalogReq req = {0};
int32_t ret = qParserExtractRequestedMetaInfo(&info1, &req, msg, 128); SParseBasicCtx ctx = {0};
ctx.db = "db1";
ctx.acctId = 1;
int32_t ret = qParserExtractRequestedMetaInfo(&info1, &req, &ctx, msg, 128);
ASSERT_EQ(ret, 0); ASSERT_EQ(ret, 0);
ASSERT_EQ(taosArrayGetSize(req.pTableName), 1); ASSERT_EQ(taosArrayGetSize(req.pTableName), 1);
...@@ -636,7 +675,7 @@ TEST(testCase, function_Test6) { ...@@ -636,7 +675,7 @@ TEST(testCase, function_Test6) {
code = evaluateSqlNode(pNode, TSDB_TIME_PRECISION_NANO, &buf); code = evaluateSqlNode(pNode, TSDB_TIME_PRECISION_NANO, &buf);
ASSERT_EQ(code, 0); ASSERT_EQ(code, 0);
ret = qParserExtractRequestedMetaInfo(&info1, &req, msg, 128); ret = qParserExtractRequestedMetaInfo(&info1, &req, &ctx, msg, 128);
ASSERT_EQ(ret, 0); ASSERT_EQ(ret, 0);
ASSERT_EQ(taosArrayGetSize(req.pTableName), 1); ASSERT_EQ(taosArrayGetSize(req.pTableName), 1);
...@@ -666,7 +705,10 @@ TEST(testCase, function_Test6) { ...@@ -666,7 +705,10 @@ TEST(testCase, function_Test6) {
ASSERT_EQ(code, 0); ASSERT_EQ(code, 0);
SCatalogReq req = {0}; SCatalogReq req = {0};
int32_t ret = qParserExtractRequestedMetaInfo(&info1, &req, msg, 128); SParseBasicCtx ctx = {0};
ctx.db = "db1";
ctx.acctId = 1;
int32_t ret = qParserExtractRequestedMetaInfo(&info1, &req, &ctx, msg, 128);
ASSERT_EQ(ret, 0); ASSERT_EQ(ret, 0);
ASSERT_EQ(taosArrayGetSize(req.pTableName), 1); ASSERT_EQ(taosArrayGetSize(req.pTableName), 1);
...@@ -688,7 +730,7 @@ TEST(testCase, function_Test6) { ...@@ -688,7 +730,7 @@ TEST(testCase, function_Test6) {
code = evaluateSqlNode(pNode, TSDB_TIME_PRECISION_NANO, &buf); code = evaluateSqlNode(pNode, TSDB_TIME_PRECISION_NANO, &buf);
ASSERT_EQ(code, 0); ASSERT_EQ(code, 0);
ret = qParserExtractRequestedMetaInfo(&info1, &req, msg, 128); ret = qParserExtractRequestedMetaInfo(&info1, &req, &ctx, msg, 128);
ASSERT_EQ(ret, 0); ASSERT_EQ(ret, 0);
ASSERT_EQ(taosArrayGetSize(req.pTableName), 1); ASSERT_EQ(taosArrayGetSize(req.pTableName), 1);
......
...@@ -81,7 +81,8 @@ void generateLogicplan(const char* sql) { ...@@ -81,7 +81,8 @@ void generateLogicplan(const char* sql) {
ASSERT_EQ(code, 0); ASSERT_EQ(code, 0);
SCatalogReq req = {0}; SCatalogReq req = {0};
int32_t ret = qParserExtractRequestedMetaInfo(&info1, &req, msg, 128); SParseBasicCtx ctx = {0};
int32_t ret = qParserExtractRequestedMetaInfo(&info1, &req, &ctx, msg, 128);
ASSERT_EQ(ret, 0); ASSERT_EQ(ret, 0);
ASSERT_EQ(taosArrayGetSize(req.pTableName), 1); ASSERT_EQ(taosArrayGetSize(req.pTableName), 1);
...@@ -121,7 +122,9 @@ TEST(testCase, planner_test) { ...@@ -121,7 +122,9 @@ TEST(testCase, planner_test) {
ASSERT_EQ(code, 0); ASSERT_EQ(code, 0);
SCatalogReq req = {0}; SCatalogReq req = {0};
int32_t ret = qParserExtractRequestedMetaInfo(&info1, &req, msg, 128); SParseBasicCtx ctx = {0};
int32_t ret = qParserExtractRequestedMetaInfo(&info1, &req, &ctx, msg, 128);
ASSERT_EQ(ret, 0); ASSERT_EQ(ret, 0);
ASSERT_EQ(taosArrayGetSize(req.pTableName), 1); ASSERT_EQ(taosArrayGetSize(req.pTableName), 1);
......
...@@ -710,7 +710,11 @@ TEST(testCase, extractMeta_test) { ...@@ -710,7 +710,11 @@ TEST(testCase, extractMeta_test) {
char msg[128] = {0}; char msg[128] = {0};
SCatalogReq req = {0}; SCatalogReq req = {0};
int32_t ret = qParserExtractRequestedMetaInfo(&info1, &req, msg, 128);
SParseBasicCtx ctx = {0};
ctx.db = "db1";
ctx.acctId = 1;
int32_t ret = qParserExtractRequestedMetaInfo(&info1, &req, &ctx, msg, 128);
ASSERT_EQ(ret, 0); ASSERT_EQ(ret, 0);
ASSERT_EQ(taosArrayGetSize(req.pTableName), 1); ASSERT_EQ(taosArrayGetSize(req.pTableName), 1);
......
...@@ -53,8 +53,8 @@ typedef struct SQueryDistPlanNodeInfo { ...@@ -53,8 +53,8 @@ typedef struct SQueryDistPlanNodeInfo {
typedef struct SQueryTableInfo { typedef struct SQueryTableInfo {
char *tableName; // to be deleted char *tableName; // to be deleted
uint64_t uid; // to be deleted uint64_t uid; // to be deleted
STableMetaInfo* pMeta; STableMetaInfo *pMeta;
STimeWindow window; STimeWindow window;
} SQueryTableInfo; } SQueryTableInfo;
typedef struct SQueryPlanNode { typedef struct SQueryPlanNode {
......
...@@ -64,10 +64,11 @@ static int32_t createModificationOpPlan(const SQueryNode* pNode, SQueryPlanNode* ...@@ -64,10 +64,11 @@ static int32_t createModificationOpPlan(const SQueryNode* pNode, SQueryPlanNode*
} }
int32_t createSelectPlan(const SQueryStmtInfo* pSelect, SQueryPlanNode** pQueryPlan) { int32_t createSelectPlan(const SQueryStmtInfo* pSelect, SQueryPlanNode** pQueryPlan) {
SArray* upstream = createQueryPlanImpl(pSelect); SArray* pDownstream = createQueryPlanImpl(pSelect);
assert(taosArrayGetSize(upstream) == 1); assert(taosArrayGetSize(pDownstream) == 1);
*pQueryPlan = taosArrayGetP(upstream, 0);
taosArrayDestroy(upstream); *pQueryPlan = taosArrayGetP(pDownstream, 0);
taosArrayDestroy(pDownstream);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
...@@ -100,23 +101,21 @@ void destroyQueryPlan(SQueryPlanNode* pQueryNode) { ...@@ -100,23 +101,21 @@ void destroyQueryPlan(SQueryPlanNode* pQueryNode) {
//====================================================================================================================== //======================================================================================================================
static SQueryPlanNode* createQueryNode(int32_t type, const char* name, SQueryPlanNode** prev, int32_t numOfPrev, static SQueryPlanNode* createQueryNode(int32_t type, const char* name, SQueryPlanNode** pChildrenNode, int32_t numOfChildren,
SExprInfo** pExpr, int32_t numOfOutput, const void* pExtInfo) { SExprInfo** pExpr, int32_t numOfOutput, const void* pExtInfo) {
SQueryPlanNode* pNode = calloc(1, sizeof(SQueryPlanNode)); SQueryPlanNode* pNode = calloc(1, sizeof(SQueryPlanNode));
pNode->info.type = type; pNode->info.type = type;
pNode->info.name = strdup(name); pNode->info.name = strdup(name);
pNode->numOfExpr = numOfOutput; pNode->numOfExpr = numOfOutput;
pNode->pExpr = taosArrayInit(numOfOutput, POINTER_BYTES);
for(int32_t i = 0; i < numOfOutput; ++i) { pNode->pExpr = taosArrayInit(numOfOutput, POINTER_BYTES);
taosArrayPush(pNode->pExpr, &pExpr[i]); taosArrayAddBatch(pNode->pExpr, pExpr, numOfOutput);
} assert(pNode->numOfExpr == numOfOutput);
pNode->pChildren = taosArrayInit(4, POINTER_BYTES); pNode->pChildren = taosArrayInit(4, POINTER_BYTES);
for(int32_t i = 0; i < numOfPrev; ++i) { for(int32_t i = 0; i < numOfChildren; ++i) {
taosArrayPush(pNode->pChildren, &prev[i]); taosArrayPush(pNode->pChildren, &pChildrenNode[i]);
} }
switch(type) { switch(type) {
...@@ -184,8 +183,7 @@ static SQueryPlanNode* createQueryNode(int32_t type, const char* name, SQueryPla ...@@ -184,8 +183,7 @@ static SQueryPlanNode* createQueryNode(int32_t type, const char* name, SQueryPla
return pNode; return pNode;
} }
static SQueryPlanNode* doAddTableColumnNode(const SQueryStmtInfo* pQueryInfo, STableMetaInfo* pTableMetaInfo, SQueryTableInfo* info, static SQueryPlanNode* doAddTableColumnNode(const SQueryStmtInfo* pQueryInfo, SQueryTableInfo* info, SArray* pExprs, SArray* tableCols) {
SArray* pExprs, SArray* tableCols) {
if (pQueryInfo->info.onlyTagQuery) { if (pQueryInfo->info.onlyTagQuery) {
int32_t num = (int32_t) taosArrayGetSize(pExprs); int32_t num = (int32_t) taosArrayGetSize(pExprs);
SQueryPlanNode* pNode = createQueryNode(QNODE_TAGSCAN, "TableTagScan", NULL, 0, pExprs->pData, num, info); SQueryPlanNode* pNode = createQueryNode(QNODE_TAGSCAN, "TableTagScan", NULL, 0, pExprs->pData, num, info);
...@@ -193,16 +191,12 @@ static SQueryPlanNode* doAddTableColumnNode(const SQueryStmtInfo* pQueryInfo, ST ...@@ -193,16 +191,12 @@ static SQueryPlanNode* doAddTableColumnNode(const SQueryStmtInfo* pQueryInfo, ST
if (pQueryInfo->info.distinct) { if (pQueryInfo->info.distinct) {
pNode = createQueryNode(QNODE_DISTINCT, "Distinct", &pNode, 1, pExprs->pData, num, NULL); pNode = createQueryNode(QNODE_DISTINCT, "Distinct", &pNode, 1, pExprs->pData, num, NULL);
} }
return pNode; return pNode;
} }
SQueryPlanNode* pNode = createQueryNode(QNODE_TABLESCAN, "TableScan", NULL, 0, NULL, 0, info); SQueryPlanNode* pNode = createQueryNode(QNODE_TABLESCAN, "TableScan", NULL, 0, NULL, 0, info);
if (pQueryInfo->info.projectionQuery) { if (!pQueryInfo->info.projectionQuery) {
int32_t numOfOutput = (int32_t) taosArrayGetSize(pExprs);
pNode = createQueryNode(QNODE_PROJECT, "Projection", &pNode, 1, pExprs->pData, numOfOutput, NULL);
} else {
STableMetaInfo* pTableMetaInfo1 = getMetaInfo(pQueryInfo, 0); STableMetaInfo* pTableMetaInfo1 = getMetaInfo(pQueryInfo, 0);
// table source column projection, generate the projection expr // table source column projection, generate the projection expr
...@@ -262,7 +256,11 @@ static SQueryPlanNode* doCreateQueryPlanForSingleTableImpl(const SQueryStmtInfo* ...@@ -262,7 +256,11 @@ static SQueryPlanNode* doCreateQueryPlanForSingleTableImpl(const SQueryStmtInfo*
pNode = createQueryNode(QNODE_AGGREGATE, "Aggregate", &pNode, 1, p->pData, num, NULL); pNode = createQueryNode(QNODE_AGGREGATE, "Aggregate", &pNode, 1, p->pData, num, NULL);
} }
} else { } else {
pNode = createQueryNode(QNODE_PROJECT, "Projection", &pNode, 1, p->pData, num, NULL); // here we can push down the projection to tablescan operator.
pNode->numOfExpr = num;
pNode->pExpr = taosArrayInit(num, POINTER_BYTES);
taosArrayAddAll(pNode->pExpr, p);
// pNode = createQueryNode(QNODE_PROJECT, "Projection", &pNode, 1, p->pData, num, NULL);
} }
} }
...@@ -299,9 +297,11 @@ static SQueryPlanNode* doCreateQueryPlanForSingleTable(const SQueryStmtInfo* pQu ...@@ -299,9 +297,11 @@ static SQueryPlanNode* doCreateQueryPlanForSingleTable(const SQueryStmtInfo* pQu
tstrncpy(name, pTableMetaInfo->name.tname, TSDB_TABLE_FNAME_LEN); tstrncpy(name, pTableMetaInfo->name.tname, TSDB_TABLE_FNAME_LEN);
SQueryTableInfo info = {.tableName = strdup(name), .uid = pTableMetaInfo->pTableMeta->uid,}; SQueryTableInfo info = {.tableName = strdup(name), .uid = pTableMetaInfo->pTableMeta->uid,};
info.window = pQueryInfo->window;
info.pMeta = pTableMetaInfo;
// handle the only tag query // handle the only tag query
SQueryPlanNode* pNode = doAddTableColumnNode(pQueryInfo, pTableMetaInfo, &info, pExprs, tableCols); SQueryPlanNode* pNode = doAddTableColumnNode(pQueryInfo, &info, pExprs, tableCols);
if (pQueryInfo->info.onlyTagQuery) { if (pQueryInfo->info.onlyTagQuery) {
tfree(info.tableName); tfree(info.tableName);
return pNode; return pNode;
...@@ -326,23 +326,23 @@ static bool isAllAggExpr(SArray* pList) { ...@@ -326,23 +326,23 @@ static bool isAllAggExpr(SArray* pList) {
} }
SArray* createQueryPlanImpl(const SQueryStmtInfo* pQueryInfo) { SArray* createQueryPlanImpl(const SQueryStmtInfo* pQueryInfo) {
SArray* upstream = NULL; SArray* pDownstream = NULL;
if (pQueryInfo->pUpstream != NULL && taosArrayGetSize(pQueryInfo->pUpstream) > 0) { // subquery in the from clause if (pQueryInfo->pDownstream != NULL && taosArrayGetSize(pQueryInfo->pDownstream) > 0) { // subquery in the from clause
upstream = taosArrayInit(4, POINTER_BYTES); pDownstream = taosArrayInit(4, POINTER_BYTES);
size_t size = taosArrayGetSize(pQueryInfo->pUpstream); size_t size = taosArrayGetSize(pQueryInfo->pDownstream);
for(int32_t i = 0; i < size; ++i) { for(int32_t i = 0; i < size; ++i) {
SQueryStmtInfo* pq = taosArrayGet(pQueryInfo->pUpstream, i); SQueryStmtInfo* pq = taosArrayGet(pQueryInfo->pDownstream, i);
SArray* p = createQueryPlanImpl(pq); SArray* p = createQueryPlanImpl(pq);
taosArrayAddBatch(upstream, p->pData, (int32_t) taosArrayGetSize(p)); taosArrayAddBatch(pDownstream, p->pData, (int32_t) taosArrayGetSize(p));
} }
} }
if (pQueryInfo->numOfTables > 1) { // it is a join query if (pQueryInfo->numOfTables > 1) { // it is a join query
// 1. separate the select clause according to table // 1. separate the select clause according to table
taosArrayDestroy(upstream); taosArrayDestroy(pDownstream);
upstream = taosArrayInit(5, POINTER_BYTES); pDownstream = taosArrayInit(5, POINTER_BYTES);
for(int32_t i = 0; i < pQueryInfo->numOfTables; ++i) { for(int32_t i = 0; i < pQueryInfo->numOfTables; ++i) {
STableMetaInfo* pTableMetaInfo = pQueryInfo->pTableMetaInfo[i]; STableMetaInfo* pTableMetaInfo = pQueryInfo->pTableMetaInfo[i];
...@@ -365,30 +365,30 @@ SArray* createQueryPlanImpl(const SQueryStmtInfo* pQueryInfo) { ...@@ -365,30 +365,30 @@ SArray* createQueryPlanImpl(const SQueryStmtInfo* pQueryInfo) {
columnListCopy(tableColumnList, pQueryInfo->colList, uid); columnListCopy(tableColumnList, pQueryInfo->colList, uid);
// 4. add the projection query node // 4. add the projection query node
SQueryPlanNode* pNode = doAddTableColumnNode(pQueryInfo, pTableMetaInfo, &info, exprList, tableColumnList); SQueryPlanNode* pNode = doAddTableColumnNode(pQueryInfo, &info, exprList, tableColumnList);
columnListDestroy(tableColumnList); columnListDestroy(tableColumnList);
// dropAllExprInfo(exprList); // dropAllExprInfo(exprList);
taosArrayPush(upstream, &pNode); taosArrayPush(pDownstream, &pNode);
} }
// 3. add the join node here // 3. add the join node here
SQueryTableInfo info = {0}; SQueryTableInfo info = {0};
int32_t num = (int32_t) taosArrayGetSize(pQueryInfo->exprList[0]); int32_t num = (int32_t) taosArrayGetSize(pQueryInfo->exprList[0]);
SQueryPlanNode* pNode = createQueryNode(QNODE_JOIN, "Join", upstream->pData, pQueryInfo->numOfTables, SQueryPlanNode* pNode = createQueryNode(QNODE_JOIN, "Join", pDownstream->pData, pQueryInfo->numOfTables,
pQueryInfo->exprList[0]->pData, num, NULL); pQueryInfo->exprList[0]->pData, num, NULL);
// 4. add the aggregation or projection execution node // 4. add the aggregation or projection execution node
pNode = doCreateQueryPlanForSingleTableImpl(pQueryInfo, pNode, &info); pNode = doCreateQueryPlanForSingleTableImpl(pQueryInfo, pNode, &info);
upstream = taosArrayInit(5, POINTER_BYTES); pDownstream = taosArrayInit(5, POINTER_BYTES);
taosArrayPush(upstream, &pNode); taosArrayPush(pDownstream, &pNode);
} else { // only one table, normal query process } else { // only one table, normal query process
STableMetaInfo* pTableMetaInfo = pQueryInfo->pTableMetaInfo[0]; STableMetaInfo* pTableMetaInfo = pQueryInfo->pTableMetaInfo[0];
SQueryPlanNode* pNode = doCreateQueryPlanForSingleTable(pQueryInfo, pTableMetaInfo, pQueryInfo->exprList[0], pQueryInfo->colList); SQueryPlanNode* pNode = doCreateQueryPlanForSingleTable(pQueryInfo, pTableMetaInfo, pQueryInfo->exprList[0], pQueryInfo->colList);
upstream = taosArrayInit(5, POINTER_BYTES); pDownstream = taosArrayInit(5, POINTER_BYTES);
taosArrayPush(upstream, &pNode); taosArrayPush(pDownstream, &pNode);
} }
return upstream; return pDownstream;
} }
static void doDestroyQueryNode(SQueryPlanNode* pQueryNode) { static void doDestroyQueryNode(SQueryPlanNode* pQueryNode) {
...@@ -434,22 +434,23 @@ static int32_t doPrintPlan(char* buf, SQueryPlanNode* pQueryNode, int32_t level, ...@@ -434,22 +434,23 @@ static int32_t doPrintPlan(char* buf, SQueryPlanNode* pQueryNode, int32_t level,
switch(pQueryNode->info.type) { switch(pQueryNode->info.type) {
case QNODE_TABLESCAN: { case QNODE_TABLESCAN: {
SQueryTableInfo* pInfo = (SQueryTableInfo*)pQueryNode->pExtInfo; SQueryTableInfo* pInfo = (SQueryTableInfo*)pQueryNode->pExtInfo;
len1 = sprintf(buf + len, "%s #%" PRIu64 ") time_range: %" PRId64 " - %" PRId64, pInfo->tableName, pInfo->uid, len1 = sprintf(buf + len, "%s #%" PRIu64, pInfo->tableName, pInfo->uid);
pInfo->window.skey, pInfo->window.ekey);
assert(len1 > 0); assert(len1 > 0);
len += len1; len += len1;
for (int32_t i = 0; i < pQueryNode->numOfExpr; ++i) { len1 = sprintf(buf + len, " , cols:");
SColumn* pCol = taosArrayGetP(pQueryNode->pExpr, i); assert(len1 > 0);
len1 = sprintf(buf + len, " [%s #%d] ", pCol->name, pCol->info.colId); len += len1;
assert(len1 > 0);
len += len1;
}
len1 = sprintf(buf + len, "\n"); len = printExprInfo(buf, pQueryNode, len);
len1 = sprintf(buf + len, ")");
assert(len1 > 0); assert(len1 > 0);
// todo print filter info
len1 = sprintf(buf + len, ") filters:(nil)");
len += len1;
len1 = sprintf(buf + len, " time_range: %" PRId64 " - %" PRId64"\n", pInfo->window.skey, pInfo->window.ekey);
len += len1; len += len1;
break; break;
} }
......
...@@ -187,7 +187,8 @@ static SSubplan* initSubplan(SPlanContext* pCxt, int32_t type) { ...@@ -187,7 +187,8 @@ static SSubplan* initSubplan(SPlanContext* pCxt, int32_t type) {
SSubplan* subplan = validPointer(calloc(1, sizeof(SSubplan))); SSubplan* subplan = validPointer(calloc(1, sizeof(SSubplan)));
subplan->id = pCxt->nextId; subplan->id = pCxt->nextId;
++(pCxt->nextId.subplanId); ++(pCxt->nextId.subplanId);
subplan->type = type;
subplan->type = type;
subplan->level = 0; subplan->level = 0;
if (NULL != pCxt->pCurrentSubplan) { if (NULL != pCxt->pCurrentSubplan) {
subplan->level = pCxt->pCurrentSubplan->level + 1; subplan->level = pCxt->pCurrentSubplan->level + 1;
...@@ -275,6 +276,8 @@ static SPhyNode* createPhyNode(SPlanContext* pCxt, SQueryPlanNode* pPlanNode) { ...@@ -275,6 +276,8 @@ static SPhyNode* createPhyNode(SPlanContext* pCxt, SQueryPlanNode* pPlanNode) {
case QNODE_TABLESCAN: case QNODE_TABLESCAN:
node = createTableScanNode(pCxt, pPlanNode); node = createTableScanNode(pCxt, pPlanNode);
break; break;
case QNODE_PROJECT:
// node = create
case QNODE_MODIFY: case QNODE_MODIFY:
// Insert is not an operator in a physical plan. // Insert is not an operator in a physical plan.
break; break;
...@@ -335,7 +338,7 @@ int32_t createDag(SQueryPlanNode* pQueryNode, struct SCatalog* pCatalog, SQueryD ...@@ -335,7 +338,7 @@ int32_t createDag(SQueryPlanNode* pQueryNode, struct SCatalog* pCatalog, SQueryD
.pCatalog = pCatalog, .pCatalog = pCatalog,
.pDag = validPointer(calloc(1, sizeof(SQueryDag))), .pDag = validPointer(calloc(1, sizeof(SQueryDag))),
.pCurrentSubplan = NULL, .pCurrentSubplan = NULL,
.nextId = {0} // todo queryid .nextId = {.queryId = requestId},
}; };
*pDag = context.pDag; *pDag = context.pDag;
......
...@@ -230,9 +230,11 @@ static bool columnInfoToJson(const void* obj, cJSON* jCol) { ...@@ -230,9 +230,11 @@ static bool columnInfoToJson(const void* obj, cJSON* jCol) {
if (res) { if (res) {
res = cJSON_AddNumberToObject(jCol, jkColumnInfoBytes, col->bytes); res = cJSON_AddNumberToObject(jCol, jkColumnInfoBytes, col->bytes);
} }
if (res) {
res = addRawArray(jCol, jkColumnInfoFilterList, columnFilterInfoToJson, col->flist.filterInfo, sizeof(SColumnFilterInfo), col->flist.numOfFilters); if (res) { // TODO: temporarily disable it
// res = addRawArray(jCol, jkColumnInfoFilterList, columnFilterInfoToJson, col->flist.filterInfo, sizeof(SColumnFilterInfo), col->flist.numOfFilters);
} }
return res; return res;
} }
...@@ -794,7 +796,6 @@ static cJSON* subplanToJson(const SSubplan* subplan) { ...@@ -794,7 +796,6 @@ static cJSON* subplanToJson(const SSubplan* subplan) {
} }
// The 'type', 'level', 'execEpSet', 'pChildren' and 'pParents' fields do not need to be serialized. // The 'type', 'level', 'execEpSet', 'pChildren' and 'pParents' fields do not need to be serialized.
bool res = addObject(jSubplan, jkSubplanId, subplanIdToJson, &subplan->id); bool res = addObject(jSubplan, jkSubplanId, subplanIdToJson, &subplan->id);
if (res) { if (res) {
res = addObject(jSubplan, jkSubplanNode, phyNodeToJson, subplan->pNode); res = addObject(jSubplan, jkSubplanNode, phyNodeToJson, subplan->pNode);
...@@ -807,6 +808,7 @@ static cJSON* subplanToJson(const SSubplan* subplan) { ...@@ -807,6 +808,7 @@ static cJSON* subplanToJson(const SSubplan* subplan) {
cJSON_Delete(jSubplan); cJSON_Delete(jSubplan);
return NULL; return NULL;
} }
return jSubplan; return jSubplan;
} }
......
...@@ -64,6 +64,13 @@ int32_t qCreateQueryDag(const struct SQueryNode* pNode, struct SQueryDag** pDag, ...@@ -64,6 +64,13 @@ int32_t qCreateQueryDag(const struct SQueryNode* pNode, struct SQueryDag** pDag,
return code; return code;
} }
//
if (logicPlan->info.type != QNODE_MODIFY) {
// char* str = NULL;
// queryPlanToString(logicPlan, &str);
// printf("%s\n", str);
}
code = optimizeQueryPlan(logicPlan); code = optimizeQueryPlan(logicPlan);
if (TSDB_CODE_SUCCESS != code) { if (TSDB_CODE_SUCCESS != code) {
destroyQueryPlan(logicPlan); destroyQueryPlan(logicPlan);
......
...@@ -88,7 +88,6 @@ int32_t schBuildTaskRalation(SSchJob *job, SHashObj *planToTask) { ...@@ -88,7 +88,6 @@ int32_t schBuildTaskRalation(SSchJob *job, SHashObj *planToTask) {
SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR); SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
} }
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
...@@ -248,19 +247,20 @@ int32_t schSetTaskCandidateAddrs(SSchJob *job, SSchTask *task) { ...@@ -248,19 +247,20 @@ int32_t schSetTaskCandidateAddrs(SSchJob *job, SSchTask *task) {
} }
int32_t addNum = 0; int32_t addNum = 0;
int32_t nodeNum = taosArrayGetSize(job->nodeList);
if (job->nodeList) {
for (int32_t i = 0; i < nodeNum && addNum < SCH_MAX_CONDIDATE_EP_NUM; ++i) { int32_t nodeNum = (int32_t) taosArrayGetSize(job->nodeList);
SQueryNodeAddr *naddr = taosArrayGet(job->nodeList, i); for (int32_t i = 0; i < nodeNum && addNum < SCH_MAX_CONDIDATE_EP_NUM; ++i) {
SQueryNodeAddr *naddr = taosArrayGet(job->nodeList, i);
if (NULL == taosArrayPush(task->candidateAddrs, &task->plan->execNode)) {
qError("taosArrayPush failed"); if (NULL == taosArrayPush(task->candidateAddrs, &task->plan->execNode)) {
SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY); qError("taosArrayPush failed");
SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
}
++addNum;
} }
++addNum;
} }
/* /*
for (int32_t i = 0; i < job->dataSrcEps.numOfEps && addNum < SCH_MAX_CONDIDATE_EP_NUM; ++i) { for (int32_t i = 0; i < job->dataSrcEps.numOfEps && addNum < SCH_MAX_CONDIDATE_EP_NUM; ++i) {
strncpy(epSet->fqdn[epSet->numOfEps], job->dataSrcEps.fqdn[i], sizeof(job->dataSrcEps.fqdn[i])); strncpy(epSet->fqdn[epSet->numOfEps], job->dataSrcEps.fqdn[i], sizeof(job->dataSrcEps.fqdn[i]));
...@@ -279,8 +279,7 @@ int32_t schPushTaskToExecList(SSchJob *pJob, SSchTask *pTask) { ...@@ -279,8 +279,7 @@ int32_t schPushTaskToExecList(SSchJob *pJob, SSchTask *pTask) {
SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY); SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
} }
qDebug("add one task, taskId:0x%"PRIx64", numOfTasks:%d, reqId:0x%"PRIx64, pTask->taskId, taosHashGetSize(pJob->execTasks), qDebug("add one task, taskId:0x%"PRIx64", numOfTasks:%d, reqId:0x%"PRIx64, pTask->taskId, taosHashGetSize(pJob->execTasks), pJob->queryId);
pJob->queryId);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
...@@ -997,7 +996,7 @@ int32_t scheduleExecJob(void *transport, SArray *nodeList, SQueryDag* pDag, void ...@@ -997,7 +996,7 @@ int32_t scheduleExecJob(void *transport, SArray *nodeList, SQueryDag* pDag, void
} }
int32_t scheduleAsyncExecJob(void *transport, SArray *nodeList, SQueryDag* pDag, void** pJob) { int32_t scheduleAsyncExecJob(void *transport, SArray *nodeList, SQueryDag* pDag, void** pJob) {
if (NULL == transport || NULL == nodeList ||NULL == pDag || NULL == pDag->pSubplans || NULL == pJob) { if (NULL == transport || /*NULL == nodeList || */NULL == pDag || NULL == pDag->pSubplans || NULL == pJob) {
SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT); SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册