From 67fd99e75629bd82bfef62eb5f4f71f93c1629f4 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Sun, 11 Apr 2021 23:04:48 +0800 Subject: [PATCH] [td-225] update the local cached epset in sqlObj. --- src/client/src/tscSchemaUtil.c | 3 ++- src/client/src/tscServer.c | 43 +++++++++++++++++++++++++--------- 2 files changed, 34 insertions(+), 12 deletions(-) diff --git a/src/client/src/tscSchemaUtil.c b/src/client/src/tscSchemaUtil.c index 67352ca71c..2ea382132b 100644 --- a/src/client/src/tscSchemaUtil.c +++ b/src/client/src/tscSchemaUtil.c @@ -144,8 +144,9 @@ SNewVgroupInfo createNewVgroupInfo(SVgroupMsg *pVgroupMsg) { SNewVgroupInfo info = {0}; info.numOfEps = pVgroupMsg->numOfEps; info.vgId = pVgroupMsg->vgId; - info.inUse = 0; + info.inUse = 0; // 0 is the default value of inUse in case of multiple replica + assert(info.numOfEps >= 1 && info.vgId >= 1); for(int32_t i = 0; i < pVgroupMsg->numOfEps; ++i) { tstrncpy(info.ep[i].fqdn, pVgroupMsg->epAddr[i].fqdn, TSDB_FQDN_LEN); info.ep[i].port = pVgroupMsg->epAddr[i].port; diff --git a/src/client/src/tscServer.c b/src/client/src/tscServer.c index cfdf2d75dd..8889e25177 100644 --- a/src/client/src/tscServer.c +++ b/src/client/src/tscServer.c @@ -113,8 +113,8 @@ static void tscDumpEpSetFromVgroupInfo(SRpcEpSet *pEpSet, SNewVgroupInfo *pVgrou } } -static void tscUpdateVgroupInfo(SSqlObj *pObj, SRpcEpSet *pEpSet) { - SSqlCmd *pCmd = &pObj->cmd; +static void tscUpdateVgroupInfo(SSqlObj *pSql, SRpcEpSet *pEpSet) { + SSqlCmd *pCmd = &pSql->cmd; STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0); if (pTableMetaInfo == NULL || pTableMetaInfo->pTableMeta == NULL) { return; @@ -143,6 +143,12 @@ static void tscUpdateVgroupInfo(SSqlObj *pObj, SRpcEpSet *pEpSet) { tscDebug("after: EndPoint in use:%d, numOfEps:%d", vgroupInfo.inUse, vgroupInfo.numOfEps); taosHashPut(tscVgroupMap, &vgId, sizeof(vgId), &vgroupInfo, sizeof(SNewVgroupInfo)); + + // Update the local cached epSet info cached by SqlObj + int32_t inUse = pSql->epSet.inUse; + tscDumpEpSetFromVgroupInfo(&pSql->epSet, &vgroupInfo); + tscDebug("%p update the epSet in SqlObj, in use before:%d, after:%d", pSql, inUse, pSql->epSet.inUse); + } int32_t extractSTableQueryVgroupId(STableMetaInfo* pTableMetaInfo) { @@ -2007,7 +2013,7 @@ int tscProcessTableMetaRsp(SSqlObj *pSql) { (vgroupInfo.inUse < 0)) { // vgroup info exists, compare with it vgroupInfo = createNewVgroupInfo(&pMetaMsg->vgroup); taosHashPut(tscVgroupMap, &vgId, sizeof(vgId), &vgroupInfo, sizeof(vgroupInfo)); - tscDebug("add new VgroupInfo, vgId:%d, total:%d", vgId, (int32_t) taosHashGetSize(tscVgroupMap)); + tscDebug("add new VgroupInfo, vgId:%d, total cached:%d", vgId, (int32_t) taosHashGetSize(tscVgroupMap)); } } @@ -2159,18 +2165,33 @@ int tscProcessSTableVgroupRsp(SSqlObj *pSql) { tscError("%p empty vgroup info", pSql); } else { for (int32_t j = 0; j < pInfo->vgroupList->numOfVgroups; ++j) { - //just init, no need to lock - SVgroupInfo *pVgroups = &pInfo->vgroupList->vgroups[j]; + // just init, no need to lock + SVgroupInfo *pVgroup = &pInfo->vgroupList->vgroups[j]; SVgroupMsg *vmsg = &pVgroupMsg->vgroups[j]; - pVgroups->vgId = htonl(vmsg->vgId); - pVgroups->numOfEps = vmsg->numOfEps; + vmsg->vgId = htonl(vmsg->vgId); + vmsg->numOfEps = vmsg->numOfEps; + for (int32_t k = 0; k < vmsg->numOfEps; ++k) { + vmsg->epAddr[k].port = htons(vmsg->epAddr[k].port); + } + + SNewVgroupInfo newVi = createNewVgroupInfo(vmsg); + pVgroup->numOfEps = newVi.numOfEps; + pVgroup->vgId = newVi.vgId; + for (int32_t k = 0; k < vmsg->numOfEps; ++k) { + pVgroup->epAddr[k].port = newVi.ep[k].port; + pVgroup->epAddr[k].fqdn = strndup(newVi.ep[k].fqdn, TSDB_FQDN_LEN); + } - assert(pVgroups->numOfEps >= 1 && pVgroups->vgId >= 1); + // check if current buffer contains the vgroup info. + // If not, add it + SNewVgroupInfo existVgroupInfo = {.inUse = -1}; + taosHashGetClone(tscVgroupMap, &newVi.vgId, sizeof(newVi.vgId), NULL, &existVgroupInfo, sizeof(SNewVgroupInfo)); - for (int32_t k = 0; k < pVgroups->numOfEps; ++k) { - pVgroups->epAddr[k].port = htons(vmsg->epAddr[k].port); - pVgroups->epAddr[k].fqdn = strndup(vmsg->epAddr[k].fqdn, tListLen(vmsg->epAddr[k].fqdn)); + if (((existVgroupInfo.inUse >= 0) && !vgroupInfoIdentical(&existVgroupInfo, vmsg)) || + (existVgroupInfo.inUse < 0)) { // vgroup info exists, compare with it + taosHashPut(tscVgroupMap, &newVi.vgId, sizeof(newVi.vgId), &newVi, sizeof(newVi)); + tscDebug("add new VgroupInfo, vgId:%d, total cached:%d", newVi.vgId, (int32_t) taosHashGetSize(tscVgroupMap)); } } } -- GitLab