diff --git a/include/common/tep.h b/include/common/tep.h index c8f45e4c82d94c0b7177777cc227b98c2c1414a0..69dd385a37e8d9e7b3e90e20bd21bce1f224186b 100644 --- a/include/common/tep.h +++ b/include/common/tep.h @@ -1,6 +1,10 @@ #ifndef TDENGINE_TEP_H #define TDENGINE_TEP_H +#ifdef __cplusplus +extern "C" { +#endif + #include "os.h" #include "tmsg.h" @@ -9,10 +13,16 @@ typedef struct SCorEpSet { SEpSet epSet; } SCorEpSet; -int taosGetFqdnPortFromEp(const char *ep, char *fqdn, uint16_t *port); +int taosGetFqdnPortFromEp(const char *ep, SEp *pEp); +void addEpIntoEpSet(SEpSet *pEpSet, const char *fqdn, uint16_t port); + bool isEpsetEqual(const SEpSet *s1, const SEpSet *s2); -void updateEpSet_s(SCorEpSet *pEpSet, SEpSet *pNewEpSet); +void updateEpSet_s(SCorEpSet *pEpSet, SEpSet *pNewEpSet); SEpSet getEpSet_s(SCorEpSet *pEpSet); +#ifdef __cplusplus +} +#endif + #endif // TDENGINE_TEP_H diff --git a/include/common/tmsg.h b/include/common/tmsg.h index b3d595d73b57e6da426575dc97a67b711fea72af..817e59f28a59a7658d8c44b8275222d1b396e5d6 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -154,10 +154,10 @@ typedef struct { #pragma pack(push, 1) // null-terminated string instead of char array to avoid too many memory consumption in case of more than 1M tableMeta -typedef struct { +typedef struct SEp { char fqdn[TSDB_FQDN_LEN]; uint16_t port; -} SEpAddr; +} SEp; typedef struct { int32_t contLen; @@ -266,8 +266,7 @@ typedef struct { typedef struct SEpSet { int8_t inUse; int8_t numOfEps; - uint16_t port[TSDB_MAX_REPLICA]; - char fqdn[TSDB_MAX_REPLICA][TSDB_FQDN_LEN]; + SEp eps[TSDB_MAX_REPLICA]; } SEpSet; static FORCE_INLINE int taosEncodeSEpSet(void** buf, const SEpSet* pEp) { @@ -275,8 +274,8 @@ static FORCE_INLINE int taosEncodeSEpSet(void** buf, const SEpSet* pEp) { tlen += taosEncodeFixedI8(buf, pEp->inUse); tlen += taosEncodeFixedI8(buf, pEp->numOfEps); for (int i = 0; i < TSDB_MAX_REPLICA; i++) { - tlen += taosEncodeFixedU16(buf, pEp->port[i]); - tlen += taosEncodeString(buf, pEp->fqdn[i]); + tlen += taosEncodeFixedU16(buf, pEp->eps[i].port); + tlen += taosEncodeString(buf, pEp->eps[i].fqdn); } return tlen; } @@ -285,8 +284,8 @@ static FORCE_INLINE void* taosDecodeSEpSet(void* buf, SEpSet* pEp) { buf = taosDecodeFixedI8(buf, &pEp->inUse); buf = taosDecodeFixedI8(buf, &pEp->numOfEps); for (int i = 0; i < TSDB_MAX_REPLICA; i++) { - buf = taosDecodeFixedU16(buf, &pEp->port[i]); - buf = taosDecodeStringTo(buf, pEp->fqdn[i]); + buf = taosDecodeFixedU16(buf, &pEp->eps[i].port); + buf = taosDecodeStringTo(buf, pEp->eps[i].fqdn); } return buf; } @@ -617,8 +616,7 @@ typedef struct { int32_t id; int8_t isMnode; int8_t align; - uint16_t port; - char fqdn[TSDB_FQDN_LEN]; + SEp ep; } SDnodeEp; typedef struct { @@ -691,24 +689,17 @@ typedef struct { char tableNames[]; } SMultiTableInfoReq; +// todo refactor typedef struct SVgroupInfo { int32_t vgId; uint32_t hashBegin; uint32_t hashEnd; - int8_t inUse; - int8_t numOfEps; - SEpAddr epAddr[TSDB_MAX_REPLICA]; + SEpSet epset; } SVgroupInfo; typedef struct { - int32_t vgId; - int8_t numOfEps; - SEpAddr epAddr[TSDB_MAX_REPLICA]; -} SVgroupMsg; - -typedef struct { - int32_t numOfVgroups; - SVgroupMsg vgroups[]; + int32_t numOfVgroups; + SVgroupInfo vgroups[]; } SVgroupsInfo; typedef struct { diff --git a/include/libs/qcom/query.h b/include/libs/qcom/query.h index 1925f0e3bda380b375b065b183ccce53cc2f994b..6cd33b1067438ddb812a417f9acd2e1e28b345af 100644 --- a/include/libs/qcom/query.h +++ b/include/libs/qcom/query.h @@ -128,20 +128,9 @@ typedef struct SMsgSendInfo { typedef struct SQueryNodeAddr { int32_t nodeId; // vgId or qnodeId - int8_t inUse; - int8_t numOfEps; - SEpAddr epAddr[TSDB_MAX_REPLICA]; + SEpSet epset; } SQueryNodeAddr; -static FORCE_INLINE void tConvertQueryAddrToEpSet(SEpSet* pEpSet, const SQueryNodeAddr* pAddr) { - pEpSet->inUse = pAddr->inUse; - pEpSet->numOfEps = pAddr->numOfEps; - for (int j = 0; j < TSDB_MAX_REPLICA; j++) { - pEpSet->port[j] = pAddr->epAddr[j].port; - memcpy(pEpSet->fqdn[j], pAddr->epAddr[j].fqdn, TSDB_FQDN_LEN); - } -} - int32_t initTaskQueue(); int32_t cleanupTaskQueue(); diff --git a/include/libs/transport/transport.h b/include/libs/transport/transport.h index a05a76931a03803e88ce9613550620aaab203d47..f5ffc125eaccfcd6431d29cc379b04ed88657d4d 100644 --- a/include/libs/transport/transport.h +++ b/include/libs/transport/transport.h @@ -20,21 +20,6 @@ extern "C" { #endif -//typedef struct SEpAddr { -// char fqdn[TSDB_FQDN_LEN]; -// uint16_t port; -//} SEpAddr; -// -//typedef struct SVgroup { -// int32_t vgId; -// int8_t numOfEps; -// SEpAddr epAddr[TSDB_MAX_REPLICA]; -//} SVgroup; -// -//typedef struct SVgroupsInfo { -// int32_t numOfVgroups; -// SVgroup vgroups[]; -//} SVgroupsInfo; #ifdef __cplusplus } diff --git a/source/client/src/clientImpl.c b/source/client/src/clientImpl.c index 179e46527f861185cbb48f5cff11f26db9f78178..37583ed3fe61d30ad712132a6f09e82f0b3dc1d8 100644 --- a/source/client/src/clientImpl.c +++ b/source/client/src/clientImpl.c @@ -101,7 +101,7 @@ TAOS *taos_connect_internal(const char *ip, const char *user, const char *pass, } if (port) { - epSet.epSet.port[0] = port; + epSet.epSet.eps[0].port = port; } } else { if (initEpSetFromCfg(tsFirst, tsSecond, &epSet) < 0) { @@ -701,7 +701,7 @@ int initEpSetFromCfg(const char *firstEp, const char *secondEp, SCorEpSet *pEpSe return -1; } - taosGetFqdnPortFromEp(firstEp, mgmtEpSet->fqdn[0], &(mgmtEpSet->port[0])); + taosGetFqdnPortFromEp(firstEp, &mgmtEpSet->eps[0]); mgmtEpSet->numOfEps++; } @@ -711,7 +711,7 @@ int initEpSetFromCfg(const char *firstEp, const char *secondEp, SCorEpSet *pEpSe return -1; } - taosGetFqdnPortFromEp(secondEp, mgmtEpSet->fqdn[mgmtEpSet->numOfEps], &(mgmtEpSet->port[mgmtEpSet->numOfEps])); + taosGetFqdnPortFromEp(secondEp, &mgmtEpSet->eps[mgmtEpSet->numOfEps]); mgmtEpSet->numOfEps++; } @@ -916,14 +916,7 @@ void* doFetchRow(SRequestObj* pRequest) { SShowReqInfo* pShowReqInfo = &pRequest->body.showInfo; SVgroupInfo* pVgroupInfo = taosArrayGet(pShowReqInfo->pArray, pShowReqInfo->currentIndex); - epSet.numOfEps = pVgroupInfo->numOfEps; - epSet.inUse = pVgroupInfo->inUse; - - for (int32_t i = 0; i < epSet.numOfEps; ++i) { - strncpy(epSet.fqdn[i], pVgroupInfo->epAddr[i].fqdn, tListLen(epSet.fqdn[i])); - epSet.port[i] = pVgroupInfo->epAddr[i].port; - } - + epSet = pVgroupInfo->epset; } else if (pRequest->type == TDMT_VND_SHOW_TABLES_FETCH) { pRequest->type = TDMT_VND_SHOW_TABLES; SShowReqInfo* pShowReqInfo = &pRequest->body.showInfo; @@ -940,14 +933,7 @@ void* doFetchRow(SRequestObj* pRequest) { pRequest->body.requestMsg.pData = pShowReq; SMsgSendInfo* body = buildMsgInfoImpl(pRequest); - - epSet.numOfEps = pVgroupInfo->numOfEps; - epSet.inUse = pVgroupInfo->inUse; - - for (int32_t i = 0; i < epSet.numOfEps; ++i) { - strncpy(epSet.fqdn[i], pVgroupInfo->epAddr[i].fqdn, tListLen(epSet.fqdn[i])); - epSet.port[i] = pVgroupInfo->epAddr[i].port; - } + epSet = pVgroupInfo->epset; int64_t transporterId = 0; STscObj *pTscObj = pRequest->pTscObj; diff --git a/source/client/src/clientMsgHandler.c b/source/client/src/clientMsgHandler.c index ec088eb0735824f3348287ec54fd476f00dc7adb..42d40b6b81ab9dd18c29aad898fe3233d92e7740 100644 --- a/source/client/src/clientMsgHandler.c +++ b/source/client/src/clientMsgHandler.c @@ -53,7 +53,7 @@ int processConnectRsp(void* param, const SDataBuf* pMsg, int32_t code) { assert(pConnect->epSet.numOfEps > 0); for(int32_t i = 0; i < pConnect->epSet.numOfEps; ++i) { - pConnect->epSet.port[i] = htons(pConnect->epSet.port[i]); + pConnect->epSet.eps[i].port = htons(pConnect->epSet.eps[i].port); } if (!isEpsetEqual(&pTscObj->pAppInfo->mgmtEp.epSet, &pConnect->epSet)) { @@ -61,7 +61,8 @@ int processConnectRsp(void* param, const SDataBuf* pMsg, int32_t code) { } for (int i = 0; i < pConnect->epSet.numOfEps; ++i) { - tscDebug("0x%" PRIx64 " epSet.fqdn[%d]:%s port:%d, connObj:0x%"PRIx64, pRequest->requestId, i, pConnect->epSet.fqdn[i], pConnect->epSet.port[i], pTscObj->id); + tscDebug("0x%" PRIx64 " epSet.fqdn[%d]:%s port:%d, connObj:0x%"PRIx64, pRequest->requestId, i, pConnect->epSet.eps[i].fqdn, + pConnect->epSet.eps[i].port, pTscObj->id); } pTscObj->connId = pConnect->connId; diff --git a/source/client/test/clientTests.cpp b/source/client/test/clientTests.cpp index d62d7cb826ee5b916e362bf9d547b98f769f87d1..9f20d6f74b3dedd39a84f080640bd5ba39e8cf4e 100644 --- a/source/client/test/clientTests.cpp +++ b/source/client/test/clientTests.cpp @@ -401,7 +401,16 @@ TEST(testCase, create_multiple_tables) { TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); ASSERT_NE(pConn, nullptr); - TAOS_RES* pRes = taos_query(pConn, "use abc1"); + TAOS_RES* pRes = taos_query(pConn, "create database if not exists abc1"); + if (taos_errno(pRes) != 0) { + printf("failed to create db, reason:%s\n", taos_errstr(pRes)); + taos_free_result(pRes); + taos_close(pConn); + return; + } + taos_free_result(pRes); + + pRes = taos_query(pConn, "use abc1"); if (taos_errno(pRes) != 0) { printf("failed to use db, reason:%s\n", taos_errstr(pRes)); taos_free_result(pRes); @@ -411,6 +420,13 @@ TEST(testCase, create_multiple_tables) { taos_free_result(pRes); + pRes = taos_query(pConn, "create stable if not exists st1 (ts timestamp, k int) tags(a int)"); + if (taos_errno(pRes) != 0) { + printf("failed to create stable tables, reason:%s\n", taos_errstr(pRes)); + } + + taos_free_result(pRes); + pRes = taos_query(pConn, "create table t_2 using st1 tags(1)"); if (taos_errno(pRes) != 0) { printf("failed to create multiple tables, reason:%s\n", taos_errstr(pRes)); @@ -528,6 +544,25 @@ TEST(testCase, generated_request_id_test) { taosHashCleanup(phash); } +TEST(testCase, insert_test) { + TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); + ASSERT_NE(pConn, nullptr); + + TAOS_RES* pRes = taos_query(pConn, "use abc1"); + taos_free_result(pRes); + + pRes = taos_query(pConn, "insert into t_2 values(now, 1)"); + if (taos_errno(pRes) != 0) { + printf("failed to create into table t_2, reason:%s\n", taos_errstr(pRes)); + taos_free_result(pRes); + ASSERT_TRUE(false); + } + + taos_free_result(pRes); + taos_close(pConn); +} + +#if 0 TEST(testCase, create_topic_Test) { TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); assert(pConn != NULL); @@ -552,25 +587,7 @@ TEST(testCase, create_topic_Test) { taos_close(pConn); } -TEST(testCase, insert_test) { - TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); - ASSERT_NE(pConn, nullptr); - - TAOS_RES* pRes = taos_query(pConn, "use abc1"); - taos_free_result(pRes); - pRes = taos_query(pConn, "insert into t_2 values(now, 1)"); - if (taos_errno(pRes) != 0) { - printf("failed to create multiple tables, reason:%s\n", taos_errstr(pRes)); - taos_free_result(pRes); - ASSERT_TRUE(false); - } - - taos_free_result(pRes); - taos_close(pConn); -} - -#if 0 TEST(testCase, tmq_subscribe_Test) { TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); assert(pConn != NULL); @@ -597,24 +614,6 @@ TEST(testCase, tmq_commit_TEST) { } #endif -//TEST(testCase, insert_test) { -// TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); -// ASSERT_NE(pConn, nullptr); -// -// TAOS_RES* pRes = taos_query(pConn, "use abc1"); -// taos_free_result(pRes); -// -// pRes = taos_query(pConn, "insert into t_2 values(now, 1)"); -// if (taos_errno(pRes) != 0) { -// printf("failed to create multiple tables, reason:%s\n", taos_errstr(pRes)); -// taos_free_result(pRes); -// ASSERT_TRUE(false); -// } -// -// taos_free_result(pRes); -// taos_close(pConn); -//} - TEST(testCase, projection_query_tables) { TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); ASSERT_NE(pConn, nullptr); @@ -666,66 +665,66 @@ TEST(testCase, projection_query_tables) { taos_close(pConn); } -//TEST(testCase, projection_query_stables) { -// TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); -// ASSERT_NE(pConn, nullptr); -// -// TAOS_RES* pRes = taos_query(pConn, "use abc1"); -// taos_free_result(pRes); -// -// pRes = taos_query(pConn, "select ts from m1"); -// if (taos_errno(pRes) != 0) { -// printf("failed to select from table, reason:%s\n", taos_errstr(pRes)); -// taos_free_result(pRes); -// ASSERT_TRUE(false); -// } -// -// TAOS_ROW pRow = NULL; -// TAOS_FIELD* pFields = taos_fetch_fields(pRes); -// int32_t numOfFields = taos_num_fields(pRes); -// -// char str[512] = {0}; -// while ((pRow = taos_fetch_row(pRes)) != NULL) { -// int32_t code = taos_print_row(str, pRow, pFields, numOfFields); -// printf("%s\n", str); -// } -// -// taos_free_result(pRes); -// taos_close(pConn); -//} - -//TEST(testCase, agg_query_tables) { -// TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); -// ASSERT_NE(pConn, nullptr); -// -// TAOS_RES* pRes = taos_query(pConn, "use dbv"); -// taos_free_result(pRes); -// -// pRes = taos_query(pConn, "create table tx using st tags(111111111111111)"); -// if (taos_errno(pRes) != 0) { -// printf("failed to create table, reason:%s\n", taos_errstr(pRes)); -// } -// taos_free_result(pRes); -// -// pRes = taos_query(pConn, "select count(*) from tu"); -// if (taos_errno(pRes) != 0) { -// printf("failed to select from table, reason:%s\n", taos_errstr(pRes)); -// taos_free_result(pRes); -// ASSERT_TRUE(false); -// } -// -// TAOS_ROW pRow = NULL; -// TAOS_FIELD* pFields = taos_fetch_fields(pRes); -// int32_t numOfFields = taos_num_fields(pRes); -// -// char str[512] = {0}; -// while ((pRow = taos_fetch_row(pRes)) != NULL) { -// int32_t code = taos_print_row(str, pRow, pFields, numOfFields); -// printf("%s\n", str); -// } -// -// taos_free_result(pRes); -// taos_close(pConn); -//} +TEST(testCase, projection_query_stables) { + TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); + ASSERT_NE(pConn, nullptr); + + TAOS_RES* pRes = taos_query(pConn, "use abc1"); + taos_free_result(pRes); + + pRes = taos_query(pConn, "select ts from st1"); + if (taos_errno(pRes) != 0) { + printf("failed to select from table, reason:%s\n", taos_errstr(pRes)); + taos_free_result(pRes); + ASSERT_TRUE(false); + } + + TAOS_ROW pRow = NULL; + TAOS_FIELD* pFields = taos_fetch_fields(pRes); + int32_t numOfFields = taos_num_fields(pRes); + + char str[512] = {0}; + while ((pRow = taos_fetch_row(pRes)) != NULL) { + int32_t code = taos_print_row(str, pRow, pFields, numOfFields); + printf("%s\n", str); + } + + taos_free_result(pRes); + taos_close(pConn); +} + +TEST(testCase, agg_query_tables) { + TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); + ASSERT_NE(pConn, nullptr); + + TAOS_RES* pRes = taos_query(pConn, "use abc1"); + taos_free_result(pRes); + + pRes = taos_query(pConn, "create table tx using st1 tags(111111111111111)"); + if (taos_errno(pRes) != 0) { + printf("failed to create table, reason:%s\n", taos_errstr(pRes)); + } + taos_free_result(pRes); + + pRes = taos_query(pConn, "select count(*) from tu"); + if (taos_errno(pRes) != 0) { + printf("failed to select from table, reason:%s\n", taos_errstr(pRes)); + taos_free_result(pRes); + ASSERT_TRUE(false); + } + + TAOS_ROW pRow = NULL; + TAOS_FIELD* pFields = taos_fetch_fields(pRes); + int32_t numOfFields = taos_num_fields(pRes); + + char str[512] = {0}; + while ((pRow = taos_fetch_row(pRes)) != NULL) { + int32_t code = taos_print_row(str, pRow, pFields, numOfFields); + printf("%s\n", str); + } + + taos_free_result(pRes); + taos_close(pConn); +} #pragma GCC diagnostic pop diff --git a/source/common/src/tep.c b/source/common/src/tep.c index 9cc99e7f517b9791417fbe743416294d125e29ba..cf38ab8dd915e21c7458f88767c15c47762e5eb4 100644 --- a/source/common/src/tep.c +++ b/source/common/src/tep.c @@ -2,32 +2,43 @@ #include "tglobal.h" #include "tlockfree.h" -int taosGetFqdnPortFromEp(const char *ep, char *fqdn, uint16_t *port) { - *port = 0; - strcpy(fqdn, ep); +int taosGetFqdnPortFromEp(const char *ep, SEp* pEp) { + pEp->port = 0; + strcpy(pEp->fqdn, ep); - char *temp = strchr(fqdn, ':'); + char *temp = strchr(pEp->fqdn, ':'); if (temp) { *temp = 0; - *port = atoi(temp+1); + pEp->port = atoi(temp+1); } - if (*port == 0) { - *port = tsServerPort; + if (pEp->port == 0) { + pEp->port = tsServerPort; return -1; } return 0; } +void addEpIntoEpSet(SEpSet *pEpSet, const char* fqdn, uint16_t port) { + if (pEpSet == NULL || fqdn == NULL || strlen(fqdn) == 0) { + return; + } + + int32_t index = pEpSet->numOfEps; + tstrncpy(pEpSet->eps[index].fqdn, fqdn, tListLen(pEpSet->eps[index].fqdn)); + pEpSet->eps[index].port = port; + pEpSet->numOfEps += 1; +} + bool isEpsetEqual(const SEpSet *s1, const SEpSet *s2) { if (s1->numOfEps != s2->numOfEps || s1->inUse != s2->inUse) { return false; } for (int32_t i = 0; i < s1->numOfEps; i++) { - if (s1->port[i] != s2->port[i] - || strncmp(s1->fqdn[i], s2->fqdn[i], TSDB_FQDN_LEN) != 0) + if (s1->eps[i].port != s2->eps[i].port + || strncmp(s1->eps[i].fqdn, s2->eps[i].fqdn, TSDB_FQDN_LEN) != 0) return false; } return true; diff --git a/source/common/src/tglobal.c b/source/common/src/tglobal.c index 7a0966df57162f60ed58db2026b7d12a8dac4c73..a353c018b77de8edfe8f79819c14b1d0df13d74f 100644 --- a/source/common/src/tglobal.c +++ b/source/common/src/tglobal.c @@ -1080,9 +1080,7 @@ static void doInitGlobalConfig(void) { void taosInitGlobalCfg() { pthread_once(&tsInitGlobalCfgOnce, doInitGlobalConfig); } int32_t taosCheckAndPrintCfg() { - char fqdn[TSDB_FQDN_LEN]; - uint16_t port; - + SEp ep = {0}; if (debugFlag & DEBUG_TRACE || debugFlag & DEBUG_DEBUG || debugFlag & DEBUG_DUMP) { taosSetAllDebugFlag(); } @@ -1097,15 +1095,15 @@ int32_t taosCheckAndPrintCfg() { if (tsFirst[0] == 0) { strcpy(tsFirst, tsLocalEp); } else { - taosGetFqdnPortFromEp(tsFirst, fqdn, &port); - snprintf(tsFirst, sizeof(tsFirst), "%s:%u", fqdn, port); + taosGetFqdnPortFromEp(tsFirst, &ep); + snprintf(tsFirst, sizeof(tsFirst), "%s:%u", ep.fqdn, ep.port); } if (tsSecond[0] == 0) { strcpy(tsSecond, tsLocalEp); } else { - taosGetFqdnPortFromEp(tsSecond, fqdn, &port); - snprintf(tsSecond, sizeof(tsSecond), "%s:%u", fqdn, port); + taosGetFqdnPortFromEp(tsSecond, &ep); + snprintf(tsSecond, sizeof(tsSecond), "%s:%u", ep.fqdn, ep.port); } taosCheckDataDirCfg(); diff --git a/source/dnode/mgmt/impl/src/dndMgmt.c b/source/dnode/mgmt/impl/src/dndMgmt.c index b127fb1d64dc08ac015b54a4f0eb75ebd312b2b3..6a882e87ec895859bc08d22a747bb3ec84360271 100644 --- a/source/dnode/mgmt/impl/src/dndMgmt.c +++ b/source/dnode/mgmt/impl/src/dndMgmt.c @@ -57,13 +57,13 @@ void dndGetDnodeEp(SDnode *pDnode, int32_t dnodeId, char *pEp, char *pFqdn, uint SDnodeEp *pDnodeEp = taosHashGet(pMgmt->dnodeHash, &dnodeId, sizeof(int32_t)); if (pDnodeEp != NULL) { if (pPort != NULL) { - *pPort = pDnodeEp->port; + *pPort = pDnodeEp->ep.port; } if (pFqdn != NULL) { - tstrncpy(pFqdn, pDnodeEp->fqdn, TSDB_FQDN_LEN); + tstrncpy(pFqdn, pDnodeEp->ep.fqdn, TSDB_FQDN_LEN); } if (pEp != NULL) { - snprintf(pEp, TSDB_EP_LEN, "%s:%u", pDnodeEp->fqdn, pDnodeEp->port); + snprintf(pEp, TSDB_EP_LEN, "%s:%u", pDnodeEp->ep.fqdn, pDnodeEp->ep.port); } } @@ -85,12 +85,12 @@ void dndSendRedirectRsp(SDnode *pDnode, SRpcMsg *pReq) { dDebug("RPC %p, req:%s is redirected, num:%d use:%d", pReq->handle, TMSG_INFO(msgType), epSet.numOfEps, epSet.inUse); for (int32_t i = 0; i < epSet.numOfEps; ++i) { - dDebug("mnode index:%d %s:%u", i, epSet.fqdn[i], epSet.port[i]); - if (strcmp(epSet.fqdn[i], pDnode->cfg.localFqdn) == 0 && epSet.port[i] == pDnode->cfg.serverPort) { + dDebug("mnode index:%d %s:%u", i, epSet.eps[i].fqdn, epSet.eps[i].port); + if (strcmp(epSet.eps[i].fqdn, pDnode->cfg.localFqdn) == 0 && epSet.eps[i].port == pDnode->cfg.serverPort) { epSet.inUse = (i + 1) % epSet.numOfEps; } - epSet.port[i] = htons(epSet.port[i]); + epSet.eps[i].port = htons(epSet.eps[i].port); } rpcSendRedirectRsp(pReq->handle, &epSet); @@ -104,7 +104,7 @@ static void dndUpdateMnodeEpSet(SDnode *pDnode, SEpSet *pEpSet) { pMgmt->mnodeEpSet = *pEpSet; for (int32_t i = 0; i < pEpSet->numOfEps; ++i) { - dInfo("mnode index:%d %s:%u", i, pEpSet->fqdn[i], pEpSet->port[i]); + dInfo("mnode index:%d %s:%u", i, pEpSet->eps[i].fqdn, pEpSet->eps[i].port); } taosWUnLockLatch(&pMgmt->latch); @@ -116,7 +116,7 @@ static void dndPrintDnodes(SDnode *pDnode) { dDebug("print dnode ep list, num:%d", pMgmt->dnodeEps->num); for (int32_t i = 0; i < pMgmt->dnodeEps->num; i++) { SDnodeEp *pEp = &pMgmt->dnodeEps->eps[i]; - dDebug("dnode:%d, fqdn:%s port:%u isMnode:%d", pEp->id, pEp->fqdn, pEp->port, pEp->isMnode); + dDebug("dnode:%d, fqdn:%s port:%u isMnode:%d", pEp->id, pEp->ep.fqdn, pEp->ep.port, pEp->isMnode); } } @@ -145,8 +145,8 @@ static void dndResetDnodes(SDnode *pDnode, SDnodeEps *pDnodeEps) { if (!pDnodeEp->isMnode) continue; if (mIndex >= TSDB_MAX_REPLICA) continue; pMgmt->mnodeEpSet.numOfEps++; - strcpy(pMgmt->mnodeEpSet.fqdn[mIndex], pDnodeEp->fqdn); - pMgmt->mnodeEpSet.port[mIndex] = pDnodeEp->port; + + pMgmt->mnodeEpSet.eps[mIndex] = pDnodeEp->ep; mIndex++; } @@ -167,7 +167,7 @@ static bool dndIsEpChanged(SDnode *pDnode, int32_t dnodeId, char *pEp) { SDnodeEp *pDnodeEp = taosHashGet(pMgmt->dnodeHash, &dnodeId, sizeof(int32_t)); if (pDnodeEp != NULL) { char epstr[TSDB_EP_LEN + 1]; - snprintf(epstr, TSDB_EP_LEN, "%s:%u", pDnodeEp->fqdn, pDnodeEp->port); + snprintf(epstr, TSDB_EP_LEN, "%s:%u", pDnodeEp->ep.fqdn, pDnodeEp->ep.port); changed = strcmp(pEp, epstr) != 0; } @@ -251,11 +251,12 @@ static int32_t dndReadDnodes(SDnode *pDnode) { SDnodeEp *pDnodeEp = &pMgmt->dnodeEps->eps[i]; - cJSON *dnodeId = cJSON_GetObjectItem(node, "id"); - if (!dnodeId || dnodeId->type != cJSON_Number) { + cJSON *did = cJSON_GetObjectItem(node, "id"); + if (!did || did->type != cJSON_Number) { dError("failed to read %s since dnodeId not found", pMgmt->file); goto PRASE_DNODE_OVER; } + pDnodeEp->id = dnodeId->valueint; cJSON *dnodeFqdn = cJSON_GetObjectItem(node, "fqdn"); @@ -263,14 +264,15 @@ static int32_t dndReadDnodes(SDnode *pDnode) { dError("failed to read %s since dnodeFqdn not found", pMgmt->file); goto PRASE_DNODE_OVER; } - tstrncpy(pDnodeEp->fqdn, dnodeFqdn->valuestring, TSDB_FQDN_LEN); + tstrncpy(pDnodeEp->ep.fqdn, dnodeFqdn->valuestring, TSDB_FQDN_LEN); cJSON *dnodePort = cJSON_GetObjectItem(node, "port"); if (!dnodePort || dnodePort->type != cJSON_Number) { dError("failed to read %s since dnodePort not found", pMgmt->file); goto PRASE_DNODE_OVER; } - pDnodeEp->port = dnodePort->valueint; + + pDnodeEp->ep.port = dnodePort->valueint; cJSON *isMnode = cJSON_GetObjectItem(node, "isMnode"); if (!isMnode || isMnode->type != cJSON_Number) { @@ -298,7 +300,8 @@ PRASE_DNODE_OVER: pMgmt->dnodeEps = calloc(1, sizeof(SDnodeEps) + sizeof(SDnodeEp)); pMgmt->dnodeEps->num = 1; pMgmt->dnodeEps->eps[0].isMnode = 1; - taosGetFqdnPortFromEp(pDnode->cfg.firstEp, pMgmt->dnodeEps->eps[0].fqdn, &pMgmt->dnodeEps->eps[0].port); + + taosGetFqdnPortFromEp(pDnode->cfg.firstEp, &(pMgmt->dnodeEps->eps[0].ep)); } dndResetDnodes(pDnode, pMgmt->dnodeEps); @@ -329,8 +332,8 @@ static int32_t dndWriteDnodes(SDnode *pDnode) { for (int32_t i = 0; i < pMgmt->dnodeEps->num; ++i) { SDnodeEp *pDnodeEp = &pMgmt->dnodeEps->eps[i]; len += snprintf(content + len, maxLen - len, " \"id\": %d,\n", pDnodeEp->id); - len += snprintf(content + len, maxLen - len, " \"fqdn\": \"%s\",\n", pDnodeEp->fqdn); - len += snprintf(content + len, maxLen - len, " \"port\": %u,\n", pDnodeEp->port); + len += snprintf(content + len, maxLen - len, " \"fqdn\": \"%s\",\n", pDnodeEp->ep.fqdn); + len += snprintf(content + len, maxLen - len, " \"port\": %u,\n", pDnodeEp->ep.port); len += snprintf(content + len, maxLen - len, " \"isMnode\": %d\n", pDnodeEp->isMnode); if (i < pMgmt->dnodeEps->num - 1) { len += snprintf(content + len, maxLen - len, " },{\n"); @@ -450,7 +453,7 @@ static void dndProcessStatusRsp(SDnode *pDnode, SRpcMsg *pRsp) { pDnodeEps->num = htonl(pDnodeEps->num); for (int32_t i = 0; i < pDnodeEps->num; ++i) { pDnodeEps->eps[i].id = htonl(pDnodeEps->eps[i].id); - pDnodeEps->eps[i].port = htons(pDnodeEps->eps[i].port); + pDnodeEps->eps[i].ep.port = htons(pDnodeEps->eps[i].ep.port); } dndUpdateDnodeEps(pDnode, pDnodeEps); diff --git a/source/dnode/mgmt/impl/test/sut/src/client.cpp b/source/dnode/mgmt/impl/test/sut/src/client.cpp index 3d61db8268b220f78433cb2fe36e9fe5583aa516..8403dbf034f5ba2fe9962f3cf1a6cdd1b6ca6995 100644 --- a/source/dnode/mgmt/impl/test/sut/src/client.cpp +++ b/source/dnode/mgmt/impl/test/sut/src/client.cpp @@ -13,6 +13,7 @@ * along with this program. If not, see . */ +#include "tep.h" #include "sut.h" static void processClientRsp(void* parent, SRpcMsg* pRsp, SEpSet* pEpSet) { @@ -61,11 +62,7 @@ void TestClient::Cleanup() { SRpcMsg* TestClient::SendReq(SRpcMsg* pReq) { SEpSet epSet = {0}; - epSet.inUse = 0; - epSet.numOfEps = 1; - epSet.port[0] = port; - memcpy(epSet.fqdn[0], fqdn, TSDB_FQDN_LEN); - + addEpIntoEpSet(&epSet, fqdn, port); rpcSendRequest(clientRpc, &epSet, pReq, NULL); tsem_wait(&sem); diff --git a/source/dnode/mnode/impl/src/mndDb.c b/source/dnode/mnode/impl/src/mndDb.c index 39be41a4e50436e6734aefb32fb3da5861025963..875adc929533e16cec25c7a38a41948f45bf8a50 100644 --- a/source/dnode/mnode/impl/src/mndDb.c +++ b/source/dnode/mnode/impl/src/mndDb.c @@ -840,18 +840,18 @@ static int32_t mndProcessUseDbReq(SMnodeMsg *pReq) { pInfo->vgId = htonl(pVgroup->vgId); pInfo->hashBegin = htonl(pVgroup->hashBegin); pInfo->hashEnd = htonl(pVgroup->hashEnd); - pInfo->numOfEps = pVgroup->replica; + pInfo->epset.numOfEps = pVgroup->replica; for (int32_t gid = 0; gid < pVgroup->replica; ++gid) { SVnodeGid *pVgid = &pVgroup->vnodeGid[gid]; - SEpAddr *pEpArrr = &pInfo->epAddr[gid]; + SEp * pEp = &pInfo->epset.eps[gid]; SDnodeObj *pDnode = mndAcquireDnode(pMnode, pVgid->dnodeId); if (pDnode != NULL) { - memcpy(pEpArrr->fqdn, pDnode->fqdn, TSDB_FQDN_LEN); - pEpArrr->port = htons(pDnode->port); + memcpy(pEp->fqdn, pDnode->fqdn, TSDB_FQDN_LEN); + pEp->port = htons(pDnode->port); } mndReleaseDnode(pMnode, pDnode); if (pVgid->role == TAOS_SYNC_STATE_LEADER) { - pInfo->inUse = gid; + pInfo->epset.inUse = gid; } } vindex++; diff --git a/source/dnode/mnode/impl/src/mndDnode.c b/source/dnode/mnode/impl/src/mndDnode.c index efdacf957b2f80975ae114feb5c9f4a15959cd9f..9a96bdb0dfa4049f75dffa9dd86c881eec863603 100644 --- a/source/dnode/mnode/impl/src/mndDnode.c +++ b/source/dnode/mnode/impl/src/mndDnode.c @@ -203,8 +203,8 @@ void mndReleaseDnode(SMnode *pMnode, SDnodeObj *pDnode) { } SEpSet mndGetDnodeEpset(SDnodeObj *pDnode) { - SEpSet epSet = {.inUse = 0, .numOfEps = 1, .port[0] = pDnode->port}; - memcpy(epSet.fqdn[0], pDnode->fqdn, TSDB_FQDN_LEN); + SEpSet epSet = {0}; + addEpIntoEpSet(&epSet, pDnode->fqdn, pDnode->port); return epSet; } @@ -261,8 +261,8 @@ static void mndGetDnodeData(SMnode *pMnode, SDnodeEps *pEps, int32_t maxEps) { SDnodeEp *pEp = &pEps->eps[numOfEps]; pEp->id = htonl(pDnode->id); - pEp->port = htons(pDnode->port); - memcpy(pEp->fqdn, pDnode->fqdn, TSDB_FQDN_LEN); + pEp->ep.port = htons(pDnode->port); + memcpy(pEp->ep.fqdn, pDnode->fqdn, TSDB_FQDN_LEN); pEp->isMnode = 0; if (mndIsMnode(pMnode, pDnode->id)) { pEp->isMnode = 1; diff --git a/source/dnode/mnode/impl/src/mndMnode.c b/source/dnode/mnode/impl/src/mndMnode.c index 75ed5b0a1ea2fda3ed49d91e8ced97985216880e..fbb4e5cef7e1c669c736ebf9e5437a7cefc1d178 100644 --- a/source/dnode/mnode/impl/src/mndMnode.c +++ b/source/dnode/mnode/impl/src/mndMnode.c @@ -237,8 +237,8 @@ void mndGetMnodeEpSet(SMnode *pMnode, SEpSet *pEpSet) { if (pIter == NULL) break; if (pObj->pDnode == NULL) break; - pEpSet->port[pEpSet->numOfEps] = htons(pObj->pDnode->port); - memcpy(pEpSet->fqdn[pEpSet->numOfEps], pObj->pDnode->fqdn, TSDB_FQDN_LEN); + pEpSet->eps[pEpSet->numOfEps].port = htons(pObj->pDnode->port); + memcpy(pEpSet->eps[pEpSet->numOfEps].fqdn, pObj->pDnode->fqdn, TSDB_FQDN_LEN); if (pObj->role == TAOS_SYNC_STATE_LEADER) { pEpSet->inUse = pEpSet->numOfEps; } diff --git a/source/dnode/mnode/impl/src/mndSubscribe.c b/source/dnode/mnode/impl/src/mndSubscribe.c index 78e9a7c17c24ac8610944c6a9db41f54a4e3878b..a72c8c78fc0c3ef288a4ec5ff7fc2dc08a262ab3 100644 --- a/source/dnode/mnode/impl/src/mndSubscribe.c +++ b/source/dnode/mnode/impl/src/mndSubscribe.c @@ -151,11 +151,13 @@ static int mndInitUnassignedVg(SMnode *pMnode, SMqTopicObj *pTopic, SArray *unas SArray *pArray; SArray* inner = taosArrayGet(pDag->pSubplans, 0); SSubplan *plan = taosArrayGetP(inner, 0); - plan->execNode.inUse = 0; - strcpy(plan->execNode.epAddr[0].fqdn, "localhost"); - plan->execNode.epAddr[0].port = 6030; + plan->execNode.nodeId = 2; - plan->execNode.numOfEps = 1; + SEpSet* pEpSet = &plan->execNode.epset; + + pEpSet->inUse = 0; + pEpSet->numOfEps = 0; + addEpIntoEpSet(pEpSet, "localhost", 6030); if (schedulerConvertDagToTaskList(pDag, &pArray) < 0) { return -1; @@ -167,7 +169,7 @@ static int mndInitUnassignedVg(SMnode *pMnode, SMqTopicObj *pTopic, SArray *unas CEp.status = 0; CEp.lastConsumerHbTs = CEp.lastVgHbTs = -1; STaskInfo* pTaskInfo = taosArrayGet(pArray, i); - tConvertQueryAddrToEpSet(&CEp.epSet, &pTaskInfo->addr); + CEp.epSet = pTaskInfo->addr.epset; /*mDebug("subscribe convert ep %d %s %s %s %s %s\n", CEp.epSet.numOfEps, CEp.epSet.fqdn[0], CEp.epSet.fqdn[1], CEp.epSet.fqdn[2], CEp.epSet.fqdn[3], CEp.epSet.fqdn[4]);*/ CEp.vgId = pTaskInfo->addr.nodeId; CEp.qmsgLen = pTaskInfo->msg->contentLen; diff --git a/source/dnode/mnode/impl/src/mndVgroup.c b/source/dnode/mnode/impl/src/mndVgroup.c index 04fedbb3ced08f3941284d8bf7b7bc8ca3d50e2b..07895c35c88895c707753b7774876526a330a7b1 100644 --- a/source/dnode/mnode/impl/src/mndVgroup.c +++ b/source/dnode/mnode/impl/src/mndVgroup.c @@ -424,9 +424,7 @@ SEpSet mndGetVgroupEpset(SMnode *pMnode, SVgObj *pVgroup) { epset.inUse = epset.numOfEps; } - epset.port[epset.numOfEps] = pDnode->port; - memcpy(&epset.fqdn[epset.numOfEps], pDnode->fqdn, TSDB_FQDN_LEN); - epset.numOfEps++; + addEpIntoEpSet(&epset, pDnode->fqdn, pDnode->port); mndReleaseDnode(pMnode, pDnode); } diff --git a/source/dnode/mnode/impl/test/db/db.cpp b/source/dnode/mnode/impl/test/db/db.cpp index 5d5947b644ff352e8c5af736220254de650994e9..964a483aac0203cd5c2547f90fac60aca4199aef 100644 --- a/source/dnode/mnode/impl/test/db/db.cpp +++ b/source/dnode/mnode/impl/test/db/db.cpp @@ -277,9 +277,9 @@ TEST_F(MndTestDb, 03_Create_Use_Restart_Use_Db) { EXPECT_GT(pInfo->vgId, 0); EXPECT_EQ(pInfo->hashBegin, 0); EXPECT_EQ(pInfo->hashEnd, UINT32_MAX / 2 - 1); - EXPECT_EQ(pInfo->inUse, 0); - EXPECT_EQ(pInfo->numOfEps, 1); - SEpAddr* pAddr = &pInfo->epAddr[0]; + EXPECT_EQ(pInfo->epset.inUse, 0); + EXPECT_EQ(pInfo->epset.numOfEps, 1); + SEp* pAddr = &pInfo->epset.eps[0]; pAddr->port = htons(pAddr->port); EXPECT_EQ(pAddr->port, 9030); EXPECT_STREQ(pAddr->fqdn, "localhost"); @@ -293,9 +293,9 @@ TEST_F(MndTestDb, 03_Create_Use_Restart_Use_Db) { EXPECT_GT(pInfo->vgId, 0); EXPECT_EQ(pInfo->hashBegin, UINT32_MAX / 2); EXPECT_EQ(pInfo->hashEnd, UINT32_MAX); - EXPECT_EQ(pInfo->inUse, 0); - EXPECT_EQ(pInfo->numOfEps, 1); - SEpAddr* pAddr = &pInfo->epAddr[0]; + EXPECT_EQ(pInfo->epset.inUse, 0); + EXPECT_EQ(pInfo->epset.numOfEps, 1); + SEp* pAddr = &pInfo->epset.eps[0]; pAddr->port = htons(pAddr->port); EXPECT_EQ(pAddr->port, 9030); EXPECT_STREQ(pAddr->fqdn, "localhost"); diff --git a/source/dnode/mnode/impl/test/profile/profile.cpp b/source/dnode/mnode/impl/test/profile/profile.cpp index 4ad979cdd33a8e4d128737eb8697f47314328c41..b245ab3c30d96ccb5ff2fc3747d40f81b6860cf5 100644 --- a/source/dnode/mnode/impl/test/profile/profile.cpp +++ b/source/dnode/mnode/impl/test/profile/profile.cpp @@ -44,7 +44,7 @@ TEST_F(MndTestProfile, 01_ConnectMsg) { pRsp->acctId = htonl(pRsp->acctId); pRsp->clusterId = htobe64(pRsp->clusterId); pRsp->connId = htonl(pRsp->connId); - pRsp->epSet.port[0] = htons(pRsp->epSet.port[0]); + pRsp->epSet.eps[0].port = htons(pRsp->epSet.eps[0].port); EXPECT_EQ(pRsp->acctId, 1); EXPECT_GT(pRsp->clusterId, 0); @@ -53,8 +53,8 @@ TEST_F(MndTestProfile, 01_ConnectMsg) { EXPECT_EQ(pRsp->epSet.inUse, 0); EXPECT_EQ(pRsp->epSet.numOfEps, 1); - EXPECT_EQ(pRsp->epSet.port[0], 9031); - EXPECT_STREQ(pRsp->epSet.fqdn[0], "localhost"); + EXPECT_EQ(pRsp->epSet.eps[0].port, 9031); + EXPECT_STREQ(pRsp->epSet.eps[0].fqdn, "localhost"); connId = pRsp->connId; } diff --git a/source/libs/catalog/src/catalog.c b/source/libs/catalog/src/catalog.c index 2197fdfd62757218070e56c947c8486bd6f471d8..dc056273747c97f330758a36f3be4efe88d469c8 100644 --- a/source/libs/catalog/src/catalog.c +++ b/source/libs/catalog/src/catalog.c @@ -65,7 +65,6 @@ int32_t ctgGetDBVgroupFromCache(struct SCatalog* pCatalog, const char *dbName, S int32_t ctgGetDBVgroupFromMnode(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, SBuildUseDBInput *input, SUseDbOutput *out) { char *msg = NULL; - SEpSet *pVnodeEpSet = NULL; int32_t msgLen = 0; ctgDebug("try to get db vgroup from mnode, db:%s", input->db); @@ -216,17 +215,6 @@ int32_t ctgGetTableTypeFromCache(struct SCatalog* pCatalog, const SName* pTableN return TSDB_CODE_SUCCESS; } - -void ctgGenEpSet(SEpSet *epSet, SVgroupInfo *vgroupInfo) { - epSet->inUse = 0; - epSet->numOfEps = vgroupInfo->numOfEps; - - for (int32_t i = 0; i < vgroupInfo->numOfEps; ++i) { - memcpy(&epSet->port[i], &vgroupInfo->epAddr[i].port, sizeof(epSet->port[i])); - memcpy(&epSet->fqdn[i], &vgroupInfo->epAddr[i].fqdn, sizeof(epSet->fqdn[i])); - } -} - int32_t ctgGetTableMetaFromMnodeImpl(struct SCatalog* pCatalog, void *pTransporter, const SEpSet* pMgmtEps, char* tbFullName, STableMetaOutput* output) { SBuildTableMetaInput bInput = {.vgId = 0, .dbName = NULL, .tableFullName = tbFullName}; char *msg = NULL; @@ -292,7 +280,6 @@ int32_t ctgGetTableMetaFromVnode(struct SCatalog* pCatalog, void *pTransporter, SBuildTableMetaInput bInput = {.vgId = vgroupInfo->vgId, .dbName = dbFullName, .tableFullName = (char *)tNameGetTableName(pTableName)}; char *msg = NULL; - SEpSet *pVnodeEpSet = NULL; int32_t msgLen = 0; int32_t code = queryBuildMsg[TMSG_INDEX(TDMT_VND_TABLE_META)](&bInput, &msg, 0, &msgLen); @@ -308,10 +295,7 @@ int32_t ctgGetTableMetaFromVnode(struct SCatalog* pCatalog, void *pTransporter, }; SRpcMsg rpcRsp = {0}; - SEpSet epSet; - - ctgGenEpSet(&epSet, vgroupInfo); - rpcSendRecv(pTransporter, &epSet, &rpcMsg, &rpcRsp); + rpcSendRecv(pTransporter, &vgroupInfo->epset, &rpcMsg, &rpcRsp); if (TSDB_CODE_SUCCESS != rpcRsp.code) { if (CTG_TABLE_NOT_EXIST(rpcRsp.code)) { diff --git a/source/libs/catalog/test/catalogTests.cpp b/source/libs/catalog/test/catalogTests.cpp index a01c3bcf5de3bce63bff4823cfdc182f3b29fb99..a15744a5c72e2b52713153f939c5fdeb0f73af46 100644 --- a/source/libs/catalog/test/catalogTests.cpp +++ b/source/libs/catalog/test/catalogTests.cpp @@ -195,10 +195,10 @@ void ctgTestBuildDBVgroup(SDBVgroupInfo *dbVgroup) { vgInfo.vgId = i + 1; vgInfo.hashBegin = i * hashUnit; vgInfo.hashEnd = hashUnit * (i + 1) - 1; - vgInfo.numOfEps = i % TSDB_MAX_REPLICA + 1; - vgInfo.inUse = i % vgInfo.numOfEps; - for (int32_t n = 0; n < vgInfo.numOfEps; ++n) { - SEpAddr *addr = &vgInfo.epAddr[n]; + vgInfo.epset.numOfEps = i % TSDB_MAX_REPLICA + 1; + vgInfo.epset.inUse = i % vgInfo.epset.numOfEps; + for (int32_t n = 0; n < vgInfo.epset.numOfEps; ++n) { + SEp *addr = &vgInfo.epset.eps[n]; strcpy(addr->fqdn, "a0"); addr->port = htons(n + 22); } @@ -229,10 +229,10 @@ void ctgTestPrepareDbVgroups(void *shandle, SEpSet *pEpSet, SRpcMsg *pMsg, SRpcM vg->vgId = htonl(i + 1); vg->hashBegin = htonl(i * hashUnit); vg->hashEnd = htonl(hashUnit * (i + 1) - 1); - vg->numOfEps = i % TSDB_MAX_REPLICA + 1; - vg->inUse = i % vg->numOfEps; - for (int32_t n = 0; n < vg->numOfEps; ++n) { - SEpAddr *addr = &vg->epAddr[n]; + vg->epset.numOfEps = i % TSDB_MAX_REPLICA + 1; + vg->epset.inUse = i % vg->epset.numOfEps; + for (int32_t n = 0; n < vg->epset.numOfEps; ++n) { + SEp *addr = &vg->epset.eps[n]; strcpy(addr->fqdn, "a0"); addr->port = htons(n + 22); } @@ -693,7 +693,7 @@ TEST(tableMeta, normalTable) { code = catalogGetTableHashVgroup(pCtg, mockPointer, (const SEpSet *)mockPointer, &n, &vgInfo); ASSERT_EQ(code, 0); ASSERT_EQ(vgInfo.vgId, 8); - ASSERT_EQ(vgInfo.numOfEps, 3); + ASSERT_EQ(vgInfo.epset.numOfEps, 3); ctgTestSetPrepareTableMeta(); @@ -983,7 +983,7 @@ TEST(tableDistVgroup, normalTable) { ASSERT_EQ(taosArrayGetSize((const SArray *)vgList), 1); vgInfo = (SVgroupInfo *)taosArrayGet(vgList, 0); ASSERT_EQ(vgInfo->vgId, 8); - ASSERT_EQ(vgInfo->numOfEps, 3); + ASSERT_EQ(vgInfo->epset.numOfEps, 3); catalogDestroy(); } @@ -1015,7 +1015,7 @@ TEST(tableDistVgroup, childTableCase) { ASSERT_EQ(taosArrayGetSize((const SArray *)vgList), 1); vgInfo = (SVgroupInfo *)taosArrayGet(vgList, 0); ASSERT_EQ(vgInfo->vgId, 9); - ASSERT_EQ(vgInfo->numOfEps, 4); + ASSERT_EQ(vgInfo->epset.numOfEps, 4); catalogDestroy(); } @@ -1046,13 +1046,13 @@ TEST(tableDistVgroup, superTableCase) { ASSERT_EQ(taosArrayGetSize((const SArray *)vgList), 10); vgInfo = (SVgroupInfo *)taosArrayGet(vgList, 0); ASSERT_EQ(vgInfo->vgId, 1); - ASSERT_EQ(vgInfo->numOfEps, 1); + ASSERT_EQ(vgInfo->epset.numOfEps, 1); vgInfo = (SVgroupInfo *)taosArrayGet(vgList, 1); ASSERT_EQ(vgInfo->vgId, 2); - ASSERT_EQ(vgInfo->numOfEps, 2); + ASSERT_EQ(vgInfo->epset.numOfEps, 2); vgInfo = (SVgroupInfo *)taosArrayGet(vgList, 2); ASSERT_EQ(vgInfo->vgId, 3); - ASSERT_EQ(vgInfo->numOfEps, 3); + ASSERT_EQ(vgInfo->epset.numOfEps, 3); catalogDestroy(); } @@ -1088,14 +1088,14 @@ TEST(dbVgroup, getSetDbVgroupCase) { code = catalogGetTableHashVgroup(pCtg, mockPointer, (const SEpSet *)mockPointer, &n, &vgInfo); ASSERT_EQ(code, 0); ASSERT_EQ(vgInfo.vgId, 8); - ASSERT_EQ(vgInfo.numOfEps, 3); + ASSERT_EQ(vgInfo.epset.numOfEps, 3); code = catalogGetTableDistVgroup(pCtg, mockPointer, (const SEpSet *)mockPointer, &n, &vgList); ASSERT_EQ(code, 0); ASSERT_EQ(taosArrayGetSize((const SArray *)vgList), 1); pvgInfo = (SVgroupInfo *)taosArrayGet(vgList, 0); ASSERT_EQ(pvgInfo->vgId, 8); - ASSERT_EQ(pvgInfo->numOfEps, 3); + ASSERT_EQ(pvgInfo->epset.numOfEps, 3); taosArrayDestroy(vgList); ctgTestBuildDBVgroup(&dbVgroup); @@ -1105,14 +1105,14 @@ TEST(dbVgroup, getSetDbVgroupCase) { code = catalogGetTableHashVgroup(pCtg, mockPointer, (const SEpSet *)mockPointer, &n, &vgInfo); ASSERT_EQ(code, 0); ASSERT_EQ(vgInfo.vgId, 7); - ASSERT_EQ(vgInfo.numOfEps, 2); + ASSERT_EQ(vgInfo.epset.numOfEps, 2); code = catalogGetTableDistVgroup(pCtg, mockPointer, (const SEpSet *)mockPointer, &n, &vgList); ASSERT_EQ(code, 0); ASSERT_EQ(taosArrayGetSize((const SArray *)vgList), 1); pvgInfo = (SVgroupInfo *)taosArrayGet(vgList, 0); ASSERT_EQ(pvgInfo->vgId, 8); - ASSERT_EQ(pvgInfo->numOfEps, 3); + ASSERT_EQ(pvgInfo->epset.numOfEps, 3); taosArrayDestroy(vgList); catalogDestroy(); diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index 8e30c0d7111b873abe7142e6d650396767d55ed6..df7341dc35cec17d2623aa25b882b22b3379dbb3 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -5163,14 +5163,9 @@ static SSDataBlock* doLoadRemoteData(void* param, bool* newgroup) { SDownstreamSource* pSource = taosArrayGet(pExchangeInfo->pSources, pExchangeInfo->current); - SEpSet epSet = {0}; - epSet.numOfEps = pSource->addr.numOfEps; - epSet.port[0] = pSource->addr.epAddr[0].port; - tstrncpy(epSet.fqdn[0], pSource->addr.epAddr[0].fqdn, tListLen(epSet.fqdn[0])); - int64_t startTs = taosGetTimestampUs(); qDebug("%s build fetch msg and send to vgId:%d, ep:%s, taskId:0x%" PRIx64 ", %d/%" PRIzu, - GET_TASKID(pTaskInfo), pSource->addr.nodeId, epSet.fqdn[0], pSource->taskId, pExchangeInfo->current, totalSources); + GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->addr.epset.eps[0].fqdn, pSource->taskId, pExchangeInfo->current, totalSources); pMsg->header.vgId = htonl(pSource->addr.nodeId); pMsg->sId = htobe64(pSource->schedId); @@ -5192,7 +5187,7 @@ static SSDataBlock* doLoadRemoteData(void* param, bool* newgroup) { pMsgSendInfo->fp = loadRemoteDataCallback; int64_t transporterId = 0; - int32_t code = asyncSendMsgToServer(pExchangeInfo->pTransporter, &epSet, &transporterId, pMsgSendInfo); + int32_t code = asyncSendMsgToServer(pExchangeInfo->pTransporter, &pSource->addr.epset, &transporterId, pMsgSendInfo); tsem_wait(&pExchangeInfo->ready); SRetrieveTableRsp* pRsp = pExchangeInfo->pRsp; diff --git a/source/libs/parser/src/astValidate.c b/source/libs/parser/src/astValidate.c index 0a2f2b20f2c77170d04b2c9cf7705dd2a1129b32..a56a6524fca771bb527bcc8acec31e0ce524513f 100644 --- a/source/libs/parser/src/astValidate.c +++ b/source/libs/parser/src/astValidate.c @@ -3644,6 +3644,7 @@ int32_t evaluateSqlNode(SSqlNode* pNode, int32_t tsPrecision, SMsgBuf* pMsgBuf) return TSDB_CODE_SUCCESS; } +//TODO remove it int32_t setTableVgroupList(SParseContext *pCtx, SName* name, SVgroupsInfo **pVgList) { SArray* vgroupList = NULL; int32_t code = catalogGetTableDistVgroup(pCtx->pCatalog, pCtx->pTransporter, &pCtx->mgmtEpSet, name, &vgroupList); @@ -3651,21 +3652,17 @@ int32_t setTableVgroupList(SParseContext *pCtx, SName* name, SVgroupsInfo **pVgL return code; } - int32_t vgroupNum = taosArrayGetSize(vgroupList); + size_t vgroupNum = taosArrayGetSize(vgroupList); - SVgroupsInfo *vgList = calloc(1, sizeof(SVgroupsInfo) + sizeof(SVgroupMsg) * vgroupNum); - + SVgroupsInfo *vgList = calloc(1, sizeof(SVgroupsInfo) + sizeof(SVgroupInfo) * vgroupNum); vgList->numOfVgroups = vgroupNum; for (int32_t i = 0; i < vgroupNum; ++i) { SVgroupInfo *vg = taosArrayGet(vgroupList, i); - vgList->vgroups[i].vgId = vg->vgId; - vgList->vgroups[i].numOfEps = vg->numOfEps; - memcpy(vgList->vgroups[i].epAddr, vg->epAddr, sizeof(vgList->vgroups[i].epAddr)); + vgList->vgroups[i] = *vg; } *pVgList = vgList; - taosArrayDestroy(vgroupList); return TSDB_CODE_SUCCESS; diff --git a/source/libs/parser/src/dCDAstProcess.c b/source/libs/parser/src/dCDAstProcess.c index 06a9a3d16ea9155bc7b63574eb462e8dda203753..3ae89bca0a7f661d7c9074db6b957976aee6b0f3 100644 --- a/source/libs/parser/src/dCDAstProcess.c +++ b/source/libs/parser/src/dCDAstProcess.c @@ -58,13 +58,7 @@ static int32_t setShowInfo(SShowInfo* pShowInfo, SParseContext* pCtx, void** out SVgroupInfo* info = taosArrayGet(array, 0); pShowReq->head.vgId = htonl(info->vgId); - pEpSet->numOfEps = info->numOfEps; - pEpSet->inUse = info->inUse; - - for (int32_t i = 0; i < pEpSet->numOfEps; ++i) { - strncpy(pEpSet->fqdn[i], info->epAddr[i].fqdn, tListLen(pEpSet->fqdn[i])); - pEpSet->port[i] = info->epAddr[i].port; - } + *pEpSet = info->epset; *outputLen = sizeof(SVShowTablesReq); *output = pShowReq; diff --git a/source/libs/parser/src/parserUtil.c b/source/libs/parser/src/parserUtil.c index a87e138ed0f201ce098b3f2fc132d6cf4d0a63c1..71ee53dfea4b30bee5c1695d5855fa1f1b49cc4b 100644 --- a/source/libs/parser/src/parserUtil.c +++ b/source/libs/parser/src/parserUtil.c @@ -1426,35 +1426,6 @@ bool isQueryWithLimit(SQueryStmtInfo* pQueryInfo) { return false; } -SVgroupsInfo* vgroupInfoClone(SVgroupsInfo *vgroupList) { - if (vgroupList == NULL) { - return NULL; - } - - size_t size = sizeof(SVgroupsInfo) + sizeof(SVgroupMsg) * vgroupList->numOfVgroups; - SVgroupsInfo* pNew = malloc(size); - if (pNew == NULL) { - return NULL; - } - - pNew->numOfVgroups = vgroupList->numOfVgroups; - - for(int32_t i = 0; i < vgroupList->numOfVgroups; ++i) { - SVgroupMsg* pNewVInfo = &pNew->vgroups[i]; - - SVgroupMsg* pvInfo = &vgroupList->vgroups[i]; - pNewVInfo->vgId = pvInfo->vgId; - pNewVInfo->numOfEps = pvInfo->numOfEps; - - for(int32_t j = 0; j < pvInfo->numOfEps; ++j) { - pNewVInfo->epAddr[j].port = pvInfo->epAddr[j].port; - tstrncpy(pNewVInfo->epAddr[j].fqdn, pvInfo->epAddr[j].fqdn, TSDB_FQDN_LEN); - } - } - - return pNew; -} - void* vgroupInfoClear(SVgroupsInfo *vgroupList) { if (vgroupList == NULL) { return NULL; @@ -1505,19 +1476,6 @@ STableMeta* tableMetaDup(const STableMeta* pTableMeta) { return p; } -SVgroupsInfo* vgroupsInfoDup(SVgroupsInfo* pVgroupsInfo) { - assert(pVgroupsInfo != NULL); - - size_t size = sizeof(SVgroupMsg) * pVgroupsInfo->numOfVgroups + sizeof(SVgroupsInfo); - SVgroupsInfo* pInfo = calloc(1, size); - pInfo->numOfVgroups = pVgroupsInfo->numOfVgroups; - for (int32_t m = 0; m < pVgroupsInfo->numOfVgroups; ++m) { - memcpy(&pInfo->vgroups[m], &pVgroupsInfo->vgroups[m], sizeof(SVgroupMsg)); - } - - return pInfo; -} - int32_t getNumOfOutput(SFieldInfo* pFieldInfo) { return pFieldInfo->numOfOutput; } diff --git a/source/libs/parser/test/mockCatalogService.cpp b/source/libs/parser/test/mockCatalogService.cpp index e2b8766700ecaaca79267c0c0a102ad5a9383200..00d64bd12ac735c58efbd07290b693185f9ef3ef 100644 --- a/source/libs/parser/test/mockCatalogService.cpp +++ b/source/libs/parser/test/mockCatalogService.cpp @@ -15,6 +15,7 @@ #include "mockCatalogService.h" +#include "tep.h" #include #include #include @@ -39,7 +40,16 @@ public: virtual TableBuilder& setVgid(int16_t vgid) { schema()->vgId = vgid; - meta_->vgs.emplace_back(SVgroupInfo{.vgId = vgid, .hashBegin = 0, .hashEnd = 0, .inUse = 0, .numOfEps = 3, .epAddr = {{"dnode_1", 6030}, {"dnode_2", 6030}, {"dnode_3", 6030}}}); + + SVgroupInfo vgroup = {.vgId = vgid, .hashBegin = 0, .hashEnd = 0, }; + + vgroup.epset.eps[0] = (SEp){"dnode_1", 6030}; + vgroup.epset.eps[1] = (SEp){"dnode_2", 6030}; + vgroup.epset.eps[2] = (SEp){"dnode_3", 6030}; + vgroup.epset.inUse = 0; + vgroup.epset.numOfEps = 3; + + meta_->vgs.emplace_back(vgroup); return *this; } @@ -112,9 +122,7 @@ public: int32_t catalogGetTableHashVgroup(const SName* pTableName, SVgroupInfo* vgInfo) const { // todo vgInfo->vgId = 1; - vgInfo->numOfEps = 1; - vgInfo->epAddr[0].port = 6030; - strcpy(vgInfo->epAddr[0].fqdn, "node1"); + addEpIntoEpSet(&vgInfo->epset, "node1", 6030); return 0; } @@ -133,9 +141,16 @@ public: meta_[db][tbname].reset(new MockTableMeta()); meta_[db][tbname]->schema.reset(table.release()); meta_[db][tbname]->schema->uid = id_++; - meta_[db][tbname]->vgs.emplace_back((SVgroupInfo){.vgId = vgid, .hashBegin = 0, .hashEnd = 0, .inUse = 0, .numOfEps = 3, .epAddr = {{"dnode_1", 6030}, {"dnode_2", 6030}, {"dnode_3", 6030}}}); + + SVgroupInfo vgroup = {.vgId = vgid, .hashBegin = 0, .hashEnd = 0,}; + addEpIntoEpSet(&vgroup.epset, "dnode_1", 6030); + addEpIntoEpSet(&vgroup.epset, "dnode_2", 6030); + addEpIntoEpSet(&vgroup.epset, "dnode_3", 6030); + vgroup.epset.inUse = 0; + + meta_[db][tbname]->vgs.emplace_back(vgroup); // super table - meta_[db][stbname]->vgs.emplace_back((SVgroupInfo){.vgId = vgid, .hashBegin = 0, .hashEnd = 0, .inUse = 0, .numOfEps = 3, .epAddr = {{"dnode_1", 6030}, {"dnode_2", 6030}, {"dnode_3", 6030}}}); + meta_[db][stbname]->vgs.emplace_back(vgroup); } void showTables() const { diff --git a/source/libs/planner/src/physicalPlan.c b/source/libs/planner/src/physicalPlan.c index 12c0d0780b677ebfc6457bdbf561dbba05cbc968..697504681f96147d51472b39fc79d1efaa21c211 100644 --- a/source/libs/planner/src/physicalPlan.c +++ b/source/libs/planner/src/physicalPlan.c @@ -251,24 +251,9 @@ static SSubplan* initSubplan(SPlanContext* pCxt, int32_t type) { return subplan; } -static void vgroupInfoToEpSet(const SVgroupInfo* vg, SQueryNodeAddr* execNode) { - execNode->nodeId = vg->vgId; - execNode->inUse = vg->inUse; - execNode->numOfEps = vg->numOfEps; - for (int8_t i = 0; i < vg->numOfEps; ++i) { - execNode->epAddr[i] = vg->epAddr[i]; - } - return; -} - -static void vgroupMsgToEpSet(const SVgroupMsg* vg, SQueryNodeAddr* execNode) { - execNode->nodeId = vg->vgId; - execNode->inUse = 0; // todo - execNode->numOfEps = vg->numOfEps; - for (int8_t i = 0; i < vg->numOfEps; ++i) { - execNode->epAddr[i] = vg->epAddr[i]; - } - return; +static void vgroupInfoToNodeAddr(const SVgroupInfo* vg, SQueryNodeAddr* pNodeAddr) { + pNodeAddr->nodeId = vg->vgId; + pNodeAddr->epset = vg->epset; } static uint64_t splitSubplanByTable(SPlanContext* pCxt, SQueryPlanNode* pPlanNode, SQueryTableInfo* pTableInfo) { @@ -277,7 +262,8 @@ static uint64_t splitSubplanByTable(SPlanContext* pCxt, SQueryPlanNode* pPlanNod STORE_CURRENT_SUBPLAN(pCxt); SSubplan* subplan = initSubplan(pCxt, QUERY_TYPE_SCAN); subplan->msgType = TDMT_VND_QUERY; - vgroupMsgToEpSet(&(pTableInfo->pMeta->vgroupList->vgroups[i]), &subplan->execNode); + + vgroupInfoToNodeAddr(&(pTableInfo->pMeta->vgroupList->vgroups[i]), &subplan->execNode); subplan->pNode = createMultiTableScanNode(pPlanNode, pTableInfo); subplan->pDataSink = createDataDispatcher(pCxt, pPlanNode, subplan->pNode); RECOVERY_CURRENT_SUBPLAN(pCxt); @@ -297,11 +283,12 @@ static bool needMultiNodeScan(SQueryTableInfo* pTable) { return (TSDB_SUPER_TABLE == pTable->pMeta->pTableMeta->tableType); } -static SPhyNode* createSingleTableScanNode(SQueryPlanNode* pPlanNode, SQueryTableInfo* pTable, SSubplan* subplan) { - vgroupMsgToEpSet(&(pTable->pMeta->vgroupList->vgroups[0]), &subplan->execNode); - +// TODO: the SVgroupInfo index +static SPhyNode* createSingleTableScanNode(SQueryPlanNode* pPlanNode, SQueryTableInfo* pTableInfo, SSubplan* subplan) { + SVgroupsInfo* pVgroupsInfo = pTableInfo->pMeta->vgroupList; + vgroupInfoToNodeAddr(&(pVgroupsInfo->vgroups[0]), &subplan->execNode); int32_t type = (pPlanNode->info.type == QNODE_TABLESCAN)? OP_TableScan:OP_StreamScan; - return createUserTableScanNode(pPlanNode, pTable, type); + return createUserTableScanNode(pPlanNode, pTableInfo, type); } static SPhyNode* createTableScanNode(SPlanContext* pCxt, SQueryPlanNode* pPlanNode) { @@ -374,7 +361,7 @@ static void splitModificationOpSubPlan(SPlanContext* pCxt, SQueryPlanNode* pPlan SSubplan* subplan = initSubplan(pCxt, QUERY_TYPE_MODIFY); SVgDataBlocks* blocks = (SVgDataBlocks*)taosArrayGetP(pPayload->payload, i); - vgroupInfoToEpSet(&blocks->vg, &subplan->execNode); + subplan->execNode.epset = blocks->vg.epset; subplan->pDataSink = createDataInserter(pCxt, blocks, NULL); subplan->pNode = NULL; subplan->type = QUERY_TYPE_MODIFY; diff --git a/source/libs/planner/src/physicalPlanJson.c b/source/libs/planner/src/physicalPlanJson.c index 4fbec534f6f9546a8985bc1c7b2da7ce728f8be3..8d58b262aabdbc67a8ef2224cbeead38394c0524 100644 --- a/source/libs/planner/src/physicalPlanJson.c +++ b/source/libs/planner/src/physicalPlanJson.c @@ -736,7 +736,7 @@ static const char* jkEpAddrFqdn = "Fqdn"; static const char* jkEpAddrPort = "Port"; static bool epAddrToJson(const void* obj, cJSON* json) { - const SEpAddr* ep = (const SEpAddr*)obj; + const SEp* ep = (const SEp*)obj; bool res = cJSON_AddStringToObject(json, jkEpAddrFqdn, ep->fqdn); if (res) { res = cJSON_AddNumberToObject(json, jkEpAddrPort, ep->port); @@ -745,7 +745,7 @@ static bool epAddrToJson(const void* obj, cJSON* json) { } static bool epAddrFromJson(const cJSON* json, void* obj) { - SEpAddr* ep = (SEpAddr*)obj; + SEp* ep = (SEp*)obj; copyString(json, jkEpAddrFqdn, ep->fqdn); ep->port = getNumber(json, jkEpAddrPort); return true; @@ -763,11 +763,11 @@ static bool queryNodeAddrToJson(const void* obj, cJSON* json) { bool res = cJSON_AddNumberToObject(json, jkNodeAddrId, pAddr->nodeId); if (res) { - res = cJSON_AddNumberToObject(json, jkNodeAddrInUse, pAddr->inUse); + res = cJSON_AddNumberToObject(json, jkNodeAddrInUse, pAddr->epset.inUse); } if (res) { - res = addRawArray(json, jkNodeAddrEpAddrs, epAddrToJson, pAddr->epAddr, sizeof(SEpAddr), pAddr->numOfEps); + res = addRawArray(json, jkNodeAddrEpAddrs, epAddrToJson, pAddr->epset.eps, sizeof(SEp), pAddr->epset.numOfEps); } return res; } @@ -776,11 +776,11 @@ static bool queryNodeAddrFromJson(const cJSON* json, void* obj) { SQueryNodeAddr* pAddr = (SQueryNodeAddr*) obj; pAddr->nodeId = getNumber(json, jkNodeAddrId); - pAddr->inUse = getNumber(json, jkNodeAddrInUse); + pAddr->epset.inUse = getNumber(json, jkNodeAddrInUse); int32_t numOfEps = 0; - bool res = fromRawArray(json, jkNodeAddrEpAddrs, epAddrFromJson, pAddr->epAddr, sizeof(SEpAddr), &numOfEps); - pAddr->numOfEps = numOfEps; + bool res = fromRawArray(json, jkNodeAddrEpAddrs, epAddrFromJson, pAddr->epset.eps, sizeof(SEp), &numOfEps); + pAddr->epset.numOfEps = numOfEps; return res; } diff --git a/source/libs/planner/test/phyPlanTests.cpp b/source/libs/planner/test/phyPlanTests.cpp index edf5fa5a811c3141b9cfb813041c0a5171bfce4b..f4bdf57572cbd97403d66146fb064b3a35179257 100644 --- a/source/libs/planner/test/phyPlanTests.cpp +++ b/source/libs/planner/test/phyPlanTests.cpp @@ -124,12 +124,10 @@ private: } void copyStorageMeta(SVgroupsInfo** dst, const std::vector& src) { - *dst = (SVgroupsInfo*)myCalloc(1, sizeof(SVgroupsInfo) + sizeof(SVgroupMsg) * src.size()); + *dst = (SVgroupsInfo*)myCalloc(1, sizeof(SVgroupsInfo) + sizeof(SVgroupInfo) * src.size()); (*dst)->numOfVgroups = src.size(); for (int32_t i = 0; i < src.size(); ++i) { - (*dst)->vgroups[i].vgId = src[i].vgId; - (*dst)->vgroups[i].numOfEps = src[i].numOfEps; - memcpy((*dst)->vgroups[i].epAddr, src[i].epAddr, src[i].numOfEps); + (*dst)->vgroups[i] = src[i]; } } diff --git a/source/libs/qcom/src/querymsg.c b/source/libs/qcom/src/querymsg.c index 52e632ffbb28c3a9f0b6febcbd9c69be3f0b4106..38ce005094207b351154e368898570987295824c 100644 --- a/source/libs/qcom/src/querymsg.c +++ b/source/libs/qcom/src/querymsg.c @@ -127,8 +127,8 @@ int32_t queryProcessUseDBRsp(void* output, char *msg, int32_t msgSize) { pRsp->vgroupInfo[i].hashBegin = ntohl(pRsp->vgroupInfo[i].hashBegin); pRsp->vgroupInfo[i].hashEnd = ntohl(pRsp->vgroupInfo[i].hashEnd); - for (int32_t n = 0; n < pRsp->vgroupInfo[i].numOfEps; ++n) { - pRsp->vgroupInfo[i].epAddr[n].port = ntohs(pRsp->vgroupInfo[i].epAddr[n].port); + for (int32_t n = 0; n < pRsp->vgroupInfo[i].epset.numOfEps; ++n) { + pRsp->vgroupInfo[i].epset.eps[n].port = ntohs(pRsp->vgroupInfo[i].epset.eps[n].port); } if (0 != taosHashPut(pOut->dbVgroup.vgInfo, &pRsp->vgroupInfo[i].vgId, sizeof(pRsp->vgroupInfo[i].vgId), &pRsp->vgroupInfo[i], sizeof(pRsp->vgroupInfo[i]))) { diff --git a/source/libs/scheduler/src/scheduler.c b/source/libs/scheduler/src/scheduler.c index 223fa300dfb43225cf4432eed342881d00146ed8..7a843765b60294ef2bc74bc837d2edc7dc5eedf8 100644 --- a/source/libs/scheduler/src/scheduler.c +++ b/source/libs/scheduler/src/scheduler.c @@ -417,13 +417,13 @@ int32_t schSetTaskCandidateAddrs(SSchJob *pJob, SSchTask *pTask) { SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY); } - if (pTask->plan->execNode.numOfEps > 0) { + if (pTask->plan->execNode.epset.numOfEps > 0) { if (NULL == taosArrayPush(pTask->candidateAddrs, &pTask->plan->execNode)) { SCH_TASK_ELOG("taosArrayPush execNode to candidate addrs failed, errno:%d", errno); SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY); } - SCH_TASK_DLOG("use execNode from plan as candidate addr, numOfEps:%d", pTask->plan->execNode.numOfEps); + SCH_TASK_DLOG("use execNode from plan as candidate addr, numOfEps:%d", pTask->plan->execNode.epset.numOfEps); return TSDB_CODE_SUCCESS; } @@ -446,7 +446,7 @@ int32_t schSetTaskCandidateAddrs(SSchJob *pJob, SSchTask *pTask) { } if (addNum <= 0) { - SCH_TASK_ELOG("no available execNode as candidate addr, nodeNum:%d", nodeNum); + SCH_TASK_ELOG("no available execNode as candidates, nodeNum:%d", nodeNum); return TSDB_CODE_QRY_INVALID_INPUT; } @@ -1050,31 +1050,19 @@ _return: SCH_RET(code); } -void schConvertAddrToEpSet(SQueryNodeAddr *addr, SEpSet *epSet) { - epSet->inUse = addr->inUse; - epSet->numOfEps = addr->numOfEps; - - for (int8_t i = 0; i < epSet->numOfEps; ++i) { - strncpy(epSet->fqdn[i], addr->epAddr[i].fqdn, sizeof(addr->epAddr[i].fqdn)); - epSet->port[i] = addr->epAddr[i].port; - } -} - int32_t schBuildAndSendMsg(SSchJob *pJob, SSchTask *pTask, SQueryNodeAddr *addr, int32_t msgType) { uint32_t msgSize = 0; void *msg = NULL; int32_t code = 0; bool isCandidateAddr = false; - SEpSet epSet; - if (NULL == addr) { addr = taosArrayGet(pTask->candidateAddrs, atomic_load_8(&pTask->candidateIdx)); isCandidateAddr = true; } - schConvertAddrToEpSet(addr, &epSet); - + SEpSet epSet = addr->epset; + switch (msgType) { case TDMT_VND_CREATE_TABLE: case TDMT_VND_SUBMIT: { @@ -1218,8 +1206,6 @@ int32_t schLaunchTask(SSchJob *pJob, SSchTask *pTask) { SCH_TASK_ELOG("subplanToString error, code:%x, msg:%p, len:%d", code, pTask->msg, pTask->msgLen); SCH_ERR_JRET(code); } - -// printf("physical plan:%s\n", pTask->msg); } SCH_ERR_JRET(schSetTaskCandidateAddrs(pJob, pTask)); @@ -1300,7 +1286,7 @@ void schDropJobAllTasks(SSchJob *pJob) { int32_t schExecJobImpl(void *transport, SArray *pNodeList, SQueryDag* pDag, struct SSchJob** job, bool syncSchedule) { qDebug("QID:0x%"PRIx64" job started", pDag->queryId); - if (pNodeList && taosArrayGetSize(pNodeList) <= 0) { + if (pNodeList == NULL || (pNodeList && taosArrayGetSize(pNodeList) <= 0)) { qDebug("QID:0x%"PRIx64" input exec nodeList is empty", pDag->queryId); } diff --git a/source/libs/scheduler/test/schedulerTests.cpp b/source/libs/scheduler/test/schedulerTests.cpp index 58714c51ccfe64317980e3e1885df14661b67cdb..6e44d99c15f4bacfbe3515f049525e9d87ab50d4 100644 --- a/source/libs/scheduler/test/schedulerTests.cpp +++ b/source/libs/scheduler/test/schedulerTests.cpp @@ -29,7 +29,6 @@ #pragma GCC diagnostic push #pragma GCC diagnostic ignored "-Wwrite-strings" -#pragma GCC diagnostic ignored "-Wliteral-suffix" #pragma GCC diagnostic ignored "-Wunused-function" #pragma GCC diagnostic ignored "-Wunused-variable" #pragma GCC diagnostic ignored "-Wsign-compare" @@ -92,11 +91,11 @@ void schtBuildQueryDag(SQueryDag *dag) { scanPlan->id.templateId = 0x0000000000000002; scanPlan->id.subplanId = 0x0000000000000003; scanPlan->type = QUERY_TYPE_SCAN; - scanPlan->execNode.numOfEps = 1; + scanPlan->execNode.nodeId = 1; - scanPlan->execNode.inUse = 0; - scanPlan->execNode.epAddr[0].port = 6030; - strcpy(scanPlan->execNode.epAddr[0].fqdn, "ep0"); + scanPlan->execNode.epset.inUse = 0; + addEpIntoEpSet(&scanPlan->execNode.epset, "ep0", 6030); + scanPlan->pChildren = NULL; scanPlan->level = 1; scanPlan->pParents = taosArrayInit(1, POINTER_BYTES); @@ -108,7 +107,8 @@ void schtBuildQueryDag(SQueryDag *dag) { mergePlan->id.subplanId = 0x5555555555; mergePlan->type = QUERY_TYPE_MERGE; mergePlan->level = 0; - mergePlan->execNode.numOfEps = 0; + mergePlan->execNode.epset.numOfEps = 0; + mergePlan->pChildren = taosArrayInit(1, POINTER_BYTES); mergePlan->pParents = NULL; mergePlan->pNode = (SPhyNode*)calloc(1, sizeof(SPhyNode)); @@ -144,11 +144,11 @@ void schtBuildInsertDag(SQueryDag *dag) { insertPlan[0].id.subplanId = 0x0000000000000004; insertPlan[0].type = QUERY_TYPE_MODIFY; insertPlan[0].level = 0; - insertPlan[0].execNode.numOfEps = 1; + insertPlan[0].execNode.nodeId = 1; - insertPlan[0].execNode.inUse = 0; - insertPlan[0].execNode.epAddr[0].port = 6030; - strcpy(insertPlan[0].execNode.epAddr[0].fqdn, "ep0"); + insertPlan[0].execNode.epset.inUse = 0; + addEpIntoEpSet(&insertPlan[0].execNode.epset, "ep0", 6030); + insertPlan[0].pChildren = NULL; insertPlan[0].pParents = NULL; insertPlan[0].pNode = NULL; @@ -160,11 +160,11 @@ void schtBuildInsertDag(SQueryDag *dag) { insertPlan[1].id.subplanId = 0x0000000000000005; insertPlan[1].type = QUERY_TYPE_MODIFY; insertPlan[1].level = 0; - insertPlan[1].execNode.numOfEps = 1; + insertPlan[1].execNode.nodeId = 1; - insertPlan[1].execNode.inUse = 1; - insertPlan[1].execNode.epAddr[0].port = 6030; - strcpy(insertPlan[1].execNode.epAddr[0].fqdn, "ep1"); + insertPlan[1].execNode.epset.inUse = 0; + addEpIntoEpSet(&insertPlan[1].execNode.epset, "ep0", 6030); + insertPlan[1].pChildren = NULL; insertPlan[1].pParents = NULL; insertPlan[1].pNode = NULL; @@ -371,9 +371,9 @@ void* schtRunJobThread(void *aa) { while (!schtTestStop) { schtBuildQueryDag(&dag); - SArray *qnodeList = taosArrayInit(1, sizeof(SEpAddr)); + SArray *qnodeList = taosArrayInit(1, sizeof(SEp)); - SEpAddr qnodeAddr = {0}; + SEp qnodeAddr = {0}; strcpy(qnodeAddr.fqdn, "qnode0.ep"); qnodeAddr.port = 6031; taosArrayPush(qnodeList, &qnodeAddr); @@ -523,9 +523,9 @@ TEST(queryTest, normalCase) { schtInitLogFile(); - SArray *qnodeList = taosArrayInit(1, sizeof(SEpAddr)); + SArray *qnodeList = taosArrayInit(1, sizeof(SEp)); - SEpAddr qnodeAddr = {0}; + SEp qnodeAddr = {0}; strcpy(qnodeAddr.fqdn, "qnode0.ep"); qnodeAddr.port = 6031; taosArrayPush(qnodeList, &qnodeAddr); @@ -627,9 +627,9 @@ TEST(insertTest, normalCase) { schtInitLogFile(); - SArray *qnodeList = taosArrayInit(1, sizeof(SEpAddr)); + SArray *qnodeList = taosArrayInit(1, sizeof(SEp)); - SEpAddr qnodeAddr = {0}; + SEp qnodeAddr = {0}; strcpy(qnodeAddr.fqdn, "qnode0.ep"); qnodeAddr.port = 6031; taosArrayPush(qnodeList, &qnodeAddr); diff --git a/source/libs/transport/src/rpcMain.c b/source/libs/transport/src/rpcMain.c index b0927e9ecfbb61da5508fb4e1151621e600897e4..a286482fc14cb8845fa1bc253851de7c0162e6f4 100644 --- a/source/libs/transport/src/rpcMain.c +++ b/source/libs/transport/src/rpcMain.c @@ -814,9 +814,9 @@ static SRpcConn *rpcSetupConnToServer(SRpcReqContext *pContext) { SEpSet * pEpSet = &pContext->epSet; pConn = - rpcGetConnFromCache(pRpc->pCache, pEpSet->fqdn[pEpSet->inUse], pEpSet->port[pEpSet->inUse], pContext->connType); + rpcGetConnFromCache(pRpc->pCache, pEpSet->eps[pEpSet->inUse].fqdn, pEpSet->eps[pEpSet->inUse].port, pContext->connType); if (pConn == NULL || pConn->user[0] == 0) { - pConn = rpcOpenConn(pRpc, pEpSet->fqdn[pEpSet->inUse], pEpSet->port[pEpSet->inUse], pContext->connType); + pConn = rpcOpenConn(pRpc, pEpSet->eps[pEpSet->inUse].fqdn, pEpSet->eps[pEpSet->inUse].port, pContext->connType); } if (pConn) { @@ -1188,7 +1188,7 @@ static void rpcProcessIncomingMsg(SRpcConn *pConn, SRpcHead *pHead, SRpcReqConte // for UDP, port may be changed by server, the port in epSet shall be used for cache if (pHead->code != TSDB_CODE_RPC_TOO_SLOW) { - rpcAddConnIntoCache(pRpc->pCache, pConn, pConn->peerFqdn, pContext->epSet.port[pContext->epSet.inUse], + rpcAddConnIntoCache(pRpc->pCache, pConn, pConn->peerFqdn, pContext->epSet.eps[pContext->epSet.inUse].port, pConn->connType); } else { rpcCloseConn(pConn); @@ -1202,9 +1202,9 @@ static void rpcProcessIncomingMsg(SRpcConn *pConn, SRpcHead *pHead, SRpcReqConte tDebug("%s, redirect is received, numOfEps:%d inUse:%d", pConn->info, pContext->epSet.numOfEps, pContext->epSet.inUse); for (int i = 0; i < pContext->epSet.numOfEps; ++i) { - pContext->epSet.port[i] = htons(pContext->epSet.port[i]); - tDebug("%s, redirect is received, index:%d ep:%s:%u", pConn->info, i, pContext->epSet.fqdn[i], - pContext->epSet.port[i]); + pContext->epSet.eps[i].port = htons(pContext->epSet.eps[i].port); + tDebug("%s, redirect is received, index:%d ep:%s:%u", pConn->info, i, pContext->epSet.eps[i].fqdn, + pContext->epSet.eps[i].port); } } rpcSendReqToServer(pRpc, pContext); diff --git a/source/libs/transport/test/rclient.c b/source/libs/transport/test/rclient.c index 84814f39fc07d9a22635cd534748c433f2458b7a..4c1fd0c32792b44171373c43592eac761cc78458 100644 --- a/source/libs/transport/test/rclient.c +++ b/source/libs/transport/test/rclient.c @@ -13,6 +13,7 @@ * along with this program. If not, see . */ +#include #include "os.h" #include "rpcLog.h" #include "taoserror.h" @@ -86,12 +87,9 @@ int main(int argc, char *argv[]) { pthread_attr_t thattr; // server info - epSet.numOfEps = 1; epSet.inUse = 0; - epSet.port[0] = 7000; - epSet.port[1] = 7000; - strcpy(epSet.fqdn[0], serverIp); - strcpy(epSet.fqdn[1], "192.168.0.1"); + addEpIntoEpSet(&epSet, serverIp, 7000); + addEpIntoEpSet(&epSet, "192.168.0.1", 7000); // client info memset(&rpcInit, 0, sizeof(rpcInit)); @@ -109,9 +107,9 @@ int main(int argc, char *argv[]) { for (int i = 1; i < argc; ++i) { if (strcmp(argv[i], "-p") == 0 && i < argc - 1) { - epSet.port[0] = atoi(argv[++i]); + epSet.eps[0].port = atoi(argv[++i]); } else if (strcmp(argv[i], "-i") == 0 && i < argc - 1) { - tstrncpy(epSet.fqdn[0], argv[++i], sizeof(epSet.fqdn[0])); + tstrncpy(epSet.eps[0].fqdn, argv[++i], sizeof(epSet.eps[0].fqdn)); } else if (strcmp(argv[i], "-t") == 0 && i < argc - 1) { rpcInit.numOfThreads = atoi(argv[++i]); } else if (strcmp(argv[i], "-m") == 0 && i < argc - 1) { @@ -135,7 +133,7 @@ int main(int argc, char *argv[]) { } else { printf("\nusage: %s [options] \n", argv[0]); printf(" [-i ip]: first server IP address, default is:%s\n", serverIp); - printf(" [-p port]: server port number, default is:%d\n", epSet.port[0]); + printf(" [-p port]: server port number, default is:%d\n", epSet.eps[0].port); printf(" [-t threads]: number of rpc threads, default is:%d\n", rpcInit.numOfThreads); printf(" [-s sessions]: number of rpc sessions, default is:%d\n", rpcInit.sessions); printf(" [-m msgSize]: message body size, default is:%d\n", msgSize);