diff --git a/src/client/inc/tsclient.h b/src/client/inc/tsclient.h index ef8184990df0d4cb23b928ba1acb4bf9f44e23e5..b5455ed1fb3c1c81a662376b0f8308a1a64c46ba 100644 --- a/src/client/inc/tsclient.h +++ b/src/client/inc/tsclient.h @@ -52,12 +52,20 @@ typedef struct STableComInfo { int32_t rowSize; } STableComInfo; +typedef struct SCMCorVgroupInfo { + int32_t version; + int8_t inUse; + int8_t numOfEps; + SEpAddr epAddr[TSDB_MAX_REPLICA]; +} SCMCorVgroupInfo; + typedef struct STableMeta { STableComInfo tableInfo; uint8_t tableType; int16_t sversion; int16_t tversion; - SCMVgroupInfo vgroupInfo; + SCMVgroupInfo vgroupInfo; + SCMCorVgroupInfo corVgroupInfo; int32_t sid; // the index of one table in a virtual node uint64_t uid; // unique id of a table SSchema schema[]; // if the table is TSDB_CHILD_TABLE, schema is acquired by super table meta info @@ -456,7 +464,8 @@ extern void * tscQhandle; extern int tscKeepConn[]; extern int tsInsertHeadSize; extern int tscNumOfThreads; -extern SRpcEpSet tscMgmtEpSet; + +extern SRpcCorEpSet tscMgmtEpSet; extern int (*tscBuildMsg[TSDB_SQL_MAX])(SSqlObj *pSql, SSqlInfo *pInfo); diff --git a/src/client/src/tscSchemaUtil.c b/src/client/src/tscSchemaUtil.c index 934a562387f26a95ec1508c5cfbd4633137cac9e..9b8f48b109eb773a4808ec95231e175d613ec6c0 100644 --- a/src/client/src/tscSchemaUtil.c +++ b/src/client/src/tscSchemaUtil.c @@ -140,7 +140,15 @@ struct SSchema tscGetTbnameColumnSchema() { strcpy(s.name, TSQL_TBNAME_L); return s; } - +static void tscInitCorVgroupInfo(SCMCorVgroupInfo *corVgroupInfo, SCMVgroupInfo *vgroupInfo) { + corVgroupInfo->version = 0; + corVgroupInfo->inUse = 0; + corVgroupInfo->numOfEps = vgroupInfo->numOfEps; + for (int32_t i = 0; i < corVgroupInfo->numOfEps; i++) { + strncpy(corVgroupInfo->epAddr[i].fqdn, vgroupInfo->epAddr[i].fqdn, TSDB_FQDN_LEN); + corVgroupInfo->epAddr[i].port = vgroupInfo->epAddr[i].port; + } +} STableMeta* tscCreateTableMetaFromMsg(STableMetaMsg* pTableMetaMsg, size_t* size) { assert(pTableMetaMsg != NULL); @@ -157,6 +165,9 @@ STableMeta* tscCreateTableMetaFromMsg(STableMetaMsg* pTableMetaMsg, size_t* size pTableMeta->sid = pTableMetaMsg->sid; pTableMeta->uid = pTableMetaMsg->uid; pTableMeta->vgroupInfo = pTableMetaMsg->vgroup; + + tscInitCorVgroupInfo(&pTableMeta->corVgroupInfo, &pTableMeta->vgroupInfo); + pTableMeta->sversion = pTableMetaMsg->sversion; pTableMeta->tversion = pTableMetaMsg->tversion; diff --git a/src/client/src/tscServer.c b/src/client/src/tscServer.c index 398dea9d097da089a48a613ca3aeba194c38abb5..d9922b8718b832de9f37073f27d44a9502e5da6c 100644 --- a/src/client/src/tscServer.c +++ b/src/client/src/tscServer.c @@ -26,10 +26,11 @@ #include "ttime.h" #include "ttimer.h" #include "tutil.h" +#include "tlockfree.h" #define TSC_MGMT_VNODE 999 -SRpcEpSet tscMgmtEpSet; +SRpcCorEpSet tscMgmtEpSet; SRpcEpSet tscDnodeEpSet; int (*tscBuildMsg[TSDB_SQL_MAX])(SSqlObj *pSql, SSqlInfo *pInfo) = {0}; @@ -51,37 +52,78 @@ static void tscSetDnodeEpSet(SSqlObj* pSql, SCMVgroupInfo* pVgroupInfo) { pEpSet->numOfEps = 0; return; } - + pEpSet->numOfEps = pVgroupInfo->numOfEps; for(int32_t i = 0; i < pVgroupInfo->numOfEps; ++i) { strcpy(pEpSet->fqdn[i], pVgroupInfo->epAddr[i].fqdn); pEpSet->port[i] = pVgroupInfo->epAddr[i].port; } } - -void tscPrintMgmtEp() { - if (tscMgmtEpSet.numOfEps <= 0) { - tscError("invalid mnode EP list:%d", tscMgmtEpSet.numOfEps); - } else { - for (int i = 0; i < tscMgmtEpSet.numOfEps; ++i) { - tscDebug("mnode index:%d %s:%d", i, tscMgmtEpSet.fqdn[i], tscMgmtEpSet.port[i]); - } +static void tscDumpMgmtEpSet(SRpcEpSet *epSet) { + taosCorBeginRead(&tscMgmtEpSet.version); + *epSet = tscMgmtEpSet.epSet; + taosCorEndRead(&tscMgmtEpSet.version); +} +static void tscEpSetHtons(SRpcEpSet *s) { + for (int32_t i = 0; i < s->numOfEps; i++) { + s->port[i] = htons(s->port[i]); + } +} +bool tscEpSetIsEqual(SRpcEpSet *s1, SRpcEpSet *s2) { + if (s1->numOfEps != s2->numOfEps || s1->inUse != s2->inUse) { + return false; + } + for (int32_t i = 0; i < s1->numOfEps; i++) { + if (s1->port[i] != s2->port[i] + || strncmp(s1->fqdn[i], s2->fqdn[i], TSDB_FQDN_LEN) != 0) + return false; + } + return true; +} +void tscUpdateMgmtEpSet(SRpcEpSet *pEpSet) { + // no need to update if equal + taosCorBeginWrite(&tscMgmtEpSet.version); + tscMgmtEpSet.epSet = *pEpSet; + taosCorEndWrite(&tscMgmtEpSet.version); +} +static void tscDumpEpSetFromVgroupInfo(SCMCorVgroupInfo *pVgroupInfo, SRpcEpSet *pEpSet) { + if (pVgroupInfo == NULL) { return;} + taosCorBeginRead(&pVgroupInfo->version); + int8_t inUse = pVgroupInfo->inUse; + pEpSet->inUse = (inUse >= 0 && inUse < TSDB_MAX_REPLICA) ? inUse: 0; + pEpSet->numOfEps = pVgroupInfo->numOfEps; + for (int32_t i = 0; i < pVgroupInfo->numOfEps; ++i) { + strncpy(pEpSet->fqdn[i], pVgroupInfo->epAddr[i].fqdn, TSDB_FQDN_LEN); + pEpSet->port[i] = pVgroupInfo->epAddr[i].port; } + taosCorEndRead(&pVgroupInfo->version); } -void tscSetMgmtEpSet(SRpcEpSet *pEpSet) { - tscMgmtEpSet.numOfEps = pEpSet->numOfEps; - tscMgmtEpSet.inUse = pEpSet->inUse; - for (int32_t i = 0; i < tscMgmtEpSet.numOfEps; ++i) { - tscMgmtEpSet.port[i] = htons(pEpSet->port[i]); +static void tscUpdateVgroupInfo(SSqlObj *pObj, SRpcEpSet *pEpSet) { + SSqlCmd *pCmd = &pObj->cmd; + STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0); + if (pTableMetaInfo == NULL || pTableMetaInfo->pTableMeta == NULL) { return;} + SCMCorVgroupInfo *pVgroupInfo = &pTableMetaInfo->pTableMeta->corVgroupInfo; + + taosCorBeginWrite(&pVgroupInfo->version); + //TODO(dengyihao), dont care vgid + pVgroupInfo->inUse = pEpSet->inUse; + pVgroupInfo->numOfEps = pEpSet->numOfEps; + for (int32_t i = 0; pVgroupInfo->numOfEps; i++) { + strncpy(pVgroupInfo->epAddr[i].fqdn, pEpSet->fqdn[i], TSDB_FQDN_LEN); + pVgroupInfo->epAddr[i].port = pEpSet->port[i]; } + taosCorEndWrite(&pVgroupInfo->version); } - -void tscUpdateEpSet(void *ahandle, SRpcEpSet *pEpSet) { - tscMgmtEpSet = *pEpSet; - tscDebug("mnode EP list is changed for ufp is called, numOfEps:%d inUse:%d", tscMgmtEpSet.numOfEps, tscMgmtEpSet.inUse); - for (int32_t i = 0; i < tscMgmtEpSet.numOfEps; ++i) { - tscDebug("index:%d fqdn:%s port:%d", i, tscMgmtEpSet.fqdn[i], tscMgmtEpSet.port[i]); +void tscPrintMgmtEp() { + SRpcEpSet dump; + tscDumpMgmtEpSet(&dump); + if (dump.numOfEps <= 0) { + tscError("invalid mnode EP list:%d", dump.numOfEps); + } else { + for (int i = 0; i < dump.numOfEps; ++i) { + tscDebug("mnode index:%d %s:%d", i, dump.fqdn[i], dump.port[i]); + } } } @@ -95,7 +137,9 @@ void tscUpdateEpSet(void *ahandle, SRpcEpSet *pEpSet) { UNUSED_FUNC static int32_t tscGetMgmtConnMaxRetryTimes() { int32_t factor = 2; - return tscMgmtEpSet.numOfEps * factor; + SRpcEpSet dump; + tscDumpMgmtEpSet(&dump); + return dump.numOfEps * factor; } void tscProcessHeartBeatRsp(void *param, TAOS_RES *tres, int code) { @@ -111,9 +155,11 @@ void tscProcessHeartBeatRsp(void *param, TAOS_RES *tres, int code) { if (code == 0) { SCMHeartBeatRsp *pRsp = (SCMHeartBeatRsp *)pRes->pRsp; - SRpcEpSet * pEpSet = &pRsp->epSet; - if (pEpSet->numOfEps > 0) - tscSetMgmtEpSet(pEpSet); + SRpcEpSet * epSet = &pRsp->epSet; + if (epSet->numOfEps > 0) { + tscEpSetHtons(epSet); + tscUpdateMgmtEpSet(epSet); + } pSql->pTscObj->connId = htonl(pRsp->connId); @@ -185,7 +231,7 @@ int tscSendMsgToServer(SSqlObj *pSql) { // set the mgmt ip list if (pSql->cmd.command >= TSDB_SQL_MGMT) { - pSql->epSet = tscMgmtEpSet; + tscDumpMgmtEpSet(&pSql->epSet); } memcpy(pMsg, pSql->cmd.payload, pSql->cmd.payloadLen); @@ -236,10 +282,16 @@ void tscProcessMsgFromServer(SRpcMsg *rpcMsg, SRpcEpSet *pEpSet) { return; } - if (pCmd->command < TSDB_SQL_MGMT) { - if (pEpSet) pSql->epSet = *pEpSet; - } else { - if (pEpSet) tscMgmtEpSet = *pEpSet; + if (pEpSet) { + //SRpcEpSet dump; + tscEpSetHtons(pEpSet); + if (tscEpSetIsEqual(&pSql->epSet, pEpSet)) { + if(pCmd->command < TSDB_SQL_MGMT) { + tscUpdateVgroupInfo(pSql, pEpSet); + } else { + tscUpdateMgmtEpSet(pEpSet); + } + } } if (rpcMsg->pCont == NULL) { @@ -421,7 +473,8 @@ int tscProcessSql(SSqlObj *pSql) { return pSql->res.code; } } else if (pCmd->command < TSDB_SQL_LOCAL) { - pSql->epSet = tscMgmtEpSet; + + //pSql->epSet = tscMgmtEpSet; } else { // local handler return (*tscProcessMsgRsp[pCmd->command])(pSql); } @@ -525,8 +578,8 @@ int tscBuildSubmitMsg(SSqlObj *pSql, SSqlInfo *pInfo) { // pSql->cmd.payloadLen is set during copying data into payload pSql->cmd.msgType = TSDB_MSG_TYPE_SUBMIT; - tscSetDnodeEpSet(pSql, &pTableMeta->vgroupInfo); - + tscDumpEpSetFromVgroupInfo(&pTableMeta->corVgroupInfo, &pSql->epSet); + tscDebug("%p build submit msg, vgId:%d numOfTables:%d numberOfEP:%d", pSql, vgId, pSql->cmd.numOfTablesInSubmit, pSql->epSet.numOfEps); return TSDB_CODE_SUCCESS; @@ -567,11 +620,11 @@ static char *doSerializeTableInfo(SQueryTableMsg* pQueryMsg, SSqlObj *pSql, char } else { pVgroupInfo = &pTableMeta->vgroupInfo; } - tscSetDnodeEpSet(pSql, pVgroupInfo); + if (pVgroupInfo != NULL) { pQueryMsg->head.vgId = htonl(pVgroupInfo->vgId); - } + } STableIdInfo *pTableIdInfo = (STableIdInfo *)pMsg; pTableIdInfo->tid = htonl(pTableMeta->sid); @@ -588,8 +641,8 @@ static char *doSerializeTableInfo(SQueryTableMsg* pQueryMsg, SSqlObj *pSql, char tscDebug("%p query on stable, vgIndex:%d, numOfVgroups:%d", pSql, index, numOfVgroups); SVgroupTableInfo* pTableIdList = taosArrayGet(pTableMetaInfo->pVgroupTables, index); - - // set the vgroup info + + // set the vgroup info tscSetDnodeEpSet(pSql, &pTableIdList->vgInfo); pQueryMsg->head.vgId = htonl(pTableIdList->vgInfo.vgId); @@ -1136,11 +1189,11 @@ int32_t tscBuildShowMsg(SSqlObj *pSql, SSqlInfo *pInfo) { pShowMsg->payloadLen = htons(pPattern->n); } } else { - SSQLToken *pIpAddr = &pShowInfo->prefix; - assert(pIpAddr->n > 0 && pIpAddr->type > 0); + SSQLToken *pEpAddr = &pShowInfo->prefix; + assert(pEpAddr->n > 0 && pEpAddr->type > 0); - strncpy(pShowMsg->payload, pIpAddr->z, pIpAddr->n); - pShowMsg->payloadLen = htons(pIpAddr->n); + strncpy(pShowMsg->payload, pEpAddr->z, pEpAddr->n); + pShowMsg->payloadLen = htons(pEpAddr->n); } pCmd->payloadLen = sizeof(SCMShowMsg) + pShowMsg->payloadLen; @@ -1323,7 +1376,7 @@ int tscBuildUpdateTagMsg(SSqlObj* pSql, SSqlInfo *pInfo) { SQueryInfo * pQueryInfo = tscGetQueryInfoDetail(pCmd, 0); STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0); - tscSetDnodeEpSet(pSql, &pTableMetaInfo->pTableMeta->vgroupInfo); + tscDumpEpSetFromVgroupInfo(&pTableMetaInfo->pTableMeta->corVgroupInfo, &pSql->epSet); return TSDB_CODE_SUCCESS; } @@ -1847,13 +1900,14 @@ int tscProcessSTableVgroupRsp(SSqlObj *pSql) { memcpy(pInfo->vgroupList, pVgroupInfo, size); for (int32_t j = 0; j < pInfo->vgroupList->numOfVgroups; ++j) { + //just init, no need to lock SCMVgroupInfo *pVgroups = &pInfo->vgroupList->vgroups[j]; - pVgroups->vgId = htonl(pVgroups->vgId); assert(pVgroups->numOfEps >= 1); for (int32_t k = 0; k < pVgroups->numOfEps; ++k) { pVgroups->epAddr[k].port = htons(pVgroups->epAddr[k].port); + } pMsg += size; @@ -1946,8 +2000,10 @@ int tscProcessConnectRsp(SSqlObj *pSql) { assert(len <= sizeof(pObj->db)); tstrncpy(pObj->db, temp, sizeof(pObj->db)); - if (pConnect->epSet.numOfEps > 0) - tscSetMgmtEpSet(&pConnect->epSet); + if (pConnect->epSet.numOfEps > 0) { + tscEpSetHtons(&pConnect->epSet); + tscUpdateMgmtEpSet(&pConnect->epSet); + } strcpy(pObj->sversion, pConnect->serverVersion); pObj->writeAuth = pConnect->writeAuth; diff --git a/src/client/src/tscSql.c b/src/client/src/tscSql.c index 6a14d3a65e33b4f5edf8feafcbb909723bf8bdb2..5848b7b82f086a7fdc77a93fdf8a2c64b8c39696 100644 --- a/src/client/src/tscSql.c +++ b/src/client/src/tscSql.c @@ -63,7 +63,7 @@ SSqlObj *taosConnectImpl(const char *ip, const char *user, const char *pass, con if (ip) { if (tscSetMgmtEpSetFromCfg(ip, NULL) < 0) return NULL; - if (port) tscMgmtEpSet.port[0] = port; + if (port) tscMgmtEpSet.epSet.port[0] = port; } void *pDnodeConn = NULL; diff --git a/src/client/src/tscSystem.c b/src/client/src/tscSystem.c index 7722acf0d07bdddc994787f8343fff7309053086..5662b4a8856e638ebb41c0e21b5e23647494224c 100644 --- a/src/client/src/tscSystem.c +++ b/src/client/src/tscSystem.c @@ -41,7 +41,7 @@ int tscNumOfThreads; static pthread_once_t tscinit = PTHREAD_ONCE_INIT; void taosInitNote(int numOfNoteLines, int maxNotes, char* lable); -void tscUpdateEpSet(void *ahandle, SRpcEpSet *pEpSet); +//void tscUpdateEpSet(void *ahandle, SRpcEpSet *pEpSet); void tscCheckDiskUsage(void *UNUSED_PARAM(para), void* UNUSED_PARAM(param)) { taosGetDisk(); diff --git a/src/client/src/tscUtil.c b/src/client/src/tscUtil.c index 4e6133663eb0d7d2ae7908eb2622a665e6010008..957bdeeb7f0e329ca1cad49e17daf031d0850ee6 100644 --- a/src/client/src/tscUtil.c +++ b/src/client/src/tscUtil.c @@ -2146,16 +2146,19 @@ char* strdup_throw(const char* str) { } int tscSetMgmtEpSetFromCfg(const char *first, const char *second) { - tscMgmtEpSet.numOfEps = 0; - tscMgmtEpSet.inUse = 0; + // init mgmt ip set + tscMgmtEpSet.version = 0; + SRpcEpSet *mgmtEpSet = &(tscMgmtEpSet.epSet); + mgmtEpSet->numOfEps = 0; + mgmtEpSet->inUse = 0; if (first && first[0] != 0) { if (strlen(first) >= TSDB_EP_LEN) { terrno = TSDB_CODE_TSC_INVALID_FQDN; return -1; } - taosGetFqdnPortFromEp(first, tscMgmtEpSet.fqdn[tscMgmtEpSet.numOfEps], &tscMgmtEpSet.port[tscMgmtEpSet.numOfEps]); - tscMgmtEpSet.numOfEps++; + taosGetFqdnPortFromEp(first, mgmtEpSet->fqdn[mgmtEpSet->numOfEps], &(mgmtEpSet->port[mgmtEpSet->numOfEps])); + mgmtEpSet->numOfEps++; } if (second && second[0] != 0) { @@ -2163,11 +2166,11 @@ int tscSetMgmtEpSetFromCfg(const char *first, const char *second) { terrno = TSDB_CODE_TSC_INVALID_FQDN; return -1; } - taosGetFqdnPortFromEp(second, tscMgmtEpSet.fqdn[tscMgmtEpSet.numOfEps], &tscMgmtEpSet.port[tscMgmtEpSet.numOfEps]); - tscMgmtEpSet.numOfEps++; + taosGetFqdnPortFromEp(second, mgmtEpSet->fqdn[mgmtEpSet->numOfEps], &(mgmtEpSet->port[mgmtEpSet->numOfEps])); + mgmtEpSet->numOfEps++; } - if ( tscMgmtEpSet.numOfEps == 0) { + if (mgmtEpSet->numOfEps == 0) { terrno = TSDB_CODE_TSC_INVALID_FQDN; return -1; } diff --git a/src/inc/trpc.h b/src/inc/trpc.h index 1af6a5eb0f83cfefb949a6db91194ec8b002b944..bdee917b5e8203743a79ca27d8fbc987569c35ab 100644 --- a/src/inc/trpc.h +++ b/src/inc/trpc.h @@ -35,6 +35,11 @@ typedef struct SRpcEpSet { char fqdn[TSDB_MAX_REPLICA][TSDB_FQDN_LEN]; } SRpcEpSet; +typedef struct SRpcCorEpSet { + int32_t version; + SRpcEpSet epSet; +} SRpcCorEpSet; + typedef struct SRpcConnInfo { uint32_t clientIp; uint16_t clientPort; diff --git a/src/util/inc/tlockfree.h b/src/util/inc/tlockfree.h index e425d71d270bd53a891e3585e12a2b762e8a81f6..a81f597832350c0d529670970d7eb80405321629 100644 --- a/src/util/inc/tlockfree.h +++ b/src/util/inc/tlockfree.h @@ -75,7 +75,7 @@ void taosRUnLockLatch(SRWLatch *pLatch); // copy on read #define taosCorBeginRead(x) for (uint32_t i_ = 1; 1; ++i_) { \ - int32_t old_ = atomic_load_32(x); \ + int32_t old_ = atomic_add_fetch_32((x), 0); \ if (old_ & 0x00000001) { \ if (i_ % 1000 == 0) { \ sched_yield(); \ @@ -84,7 +84,7 @@ void taosRUnLockLatch(SRWLatch *pLatch); } #define taosCorEndRead(x) \ - if (atomic_load_32(x) == old_) { \ + if (atomic_add_fetch_32((x), 0) == old_) { \ break; \ } \ }