diff --git a/src/client/src/tscSchemaUtil.c b/src/client/src/tscSchemaUtil.c index 67352ca71cab03bb9cd2f6b920b97abebbe1420a..2ea382132b8de9a97bd75d1e80b03a6eb6131756 100644 --- a/src/client/src/tscSchemaUtil.c +++ b/src/client/src/tscSchemaUtil.c @@ -144,8 +144,9 @@ SNewVgroupInfo createNewVgroupInfo(SVgroupMsg *pVgroupMsg) { SNewVgroupInfo info = {0}; info.numOfEps = pVgroupMsg->numOfEps; info.vgId = pVgroupMsg->vgId; - info.inUse = 0; + info.inUse = 0; // 0 is the default value of inUse in case of multiple replica + assert(info.numOfEps >= 1 && info.vgId >= 1); for(int32_t i = 0; i < pVgroupMsg->numOfEps; ++i) { tstrncpy(info.ep[i].fqdn, pVgroupMsg->epAddr[i].fqdn, TSDB_FQDN_LEN); info.ep[i].port = pVgroupMsg->epAddr[i].port; diff --git a/src/client/src/tscServer.c b/src/client/src/tscServer.c index f1c1d2da719d63973953382734489aeca1e58258..8889e25177fe23668c67bbba363e0efa06a017d2 100644 --- a/src/client/src/tscServer.c +++ b/src/client/src/tscServer.c @@ -34,6 +34,7 @@ int tscKeepConn[TSDB_SQL_MAX] = {0}; TSKEY tscGetSubscriptionProgress(void* sub, int64_t uid, TSKEY dflt); void tscUpdateSubscriptionProgress(void* sub, int64_t uid, TSKEY ts); void tscSaveSubscriptionProgress(void* sub); +static int32_t extractSTableQueryVgroupId(STableMetaInfo* pTableMetaInfo); static int32_t minMsgSize() { return tsRpcHeadSize + 100; } static int32_t getWaitingTimeInterval(int32_t count) { @@ -78,7 +79,8 @@ static void tscEpSetHtons(SRpcEpSet *s) { for (int32_t i = 0; i < s->numOfEps; i++) { s->port[i] = htons(s->port[i]); } -} +} + bool tscEpSetIsEqual(SRpcEpSet *s1, SRpcEpSet *s2) { if (s1->numOfEps != s2->numOfEps || s1->inUse != s2->inUse) { return false; @@ -111,19 +113,22 @@ static void tscDumpEpSetFromVgroupInfo(SRpcEpSet *pEpSet, SNewVgroupInfo *pVgrou } } -static void tscUpdateVgroupInfo(SSqlObj *pObj, SRpcEpSet *pEpSet) { - SSqlCmd *pCmd = &pObj->cmd; +static void tscUpdateVgroupInfo(SSqlObj *pSql, SRpcEpSet *pEpSet) { + SSqlCmd *pCmd = &pSql->cmd; STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0); if (pTableMetaInfo == NULL || pTableMetaInfo->pTableMeta == NULL) { return; } - int32_t vgId = pTableMetaInfo->pTableMeta->vgId; + int32_t vgId = -1; if (pTableMetaInfo->pTableMeta->tableType == TSDB_SUPER_TABLE) { - assert(vgId == 0); - return; + vgId = extractSTableQueryVgroupId(pTableMetaInfo); + } else { + vgId = pTableMetaInfo->pTableMeta->vgId; } + assert(vgId > 0); + SNewVgroupInfo vgroupInfo = {.vgId = -1}; taosHashGetClone(tscVgroupMap, &vgId, sizeof(vgId), NULL, &vgroupInfo, sizeof(SNewVgroupInfo)); assert(vgroupInfo.numOfEps > 0 && vgroupInfo.vgId > 0); @@ -138,6 +143,33 @@ static void tscUpdateVgroupInfo(SSqlObj *pObj, SRpcEpSet *pEpSet) { tscDebug("after: EndPoint in use:%d, numOfEps:%d", vgroupInfo.inUse, vgroupInfo.numOfEps); taosHashPut(tscVgroupMap, &vgId, sizeof(vgId), &vgroupInfo, sizeof(SNewVgroupInfo)); + + // Update the local cached epSet info cached by SqlObj + int32_t inUse = pSql->epSet.inUse; + tscDumpEpSetFromVgroupInfo(&pSql->epSet, &vgroupInfo); + tscDebug("%p update the epSet in SqlObj, in use before:%d, after:%d", pSql, inUse, pSql->epSet.inUse); + +} + +int32_t extractSTableQueryVgroupId(STableMetaInfo* pTableMetaInfo) { + assert(pTableMetaInfo != NULL); + + int32_t vgIndex = pTableMetaInfo->vgroupIndex; + int32_t vgId = -1; + + if (pTableMetaInfo->pVgroupTables == NULL) { + SVgroupsInfo *pVgroupInfo = pTableMetaInfo->vgroupList; + assert(pVgroupInfo->vgroups[vgIndex].vgId > 0 && vgIndex < pTableMetaInfo->vgroupList->numOfVgroups); + vgId = pVgroupInfo->vgroups[vgIndex].vgId; + } else { + int32_t numOfVgroups = (int32_t)taosArrayGetSize(pTableMetaInfo->pVgroupTables); + assert(vgIndex >= 0 && vgIndex < numOfVgroups); + + SVgroupTableInfo *pTableIdList = taosArrayGet(pTableMetaInfo->pVgroupTables, vgIndex); + vgId = pTableIdList->vgInfo.vgId; + } + + return vgId; } void tscProcessHeartBeatRsp(void *param, TAOS_RES *tres, int code) { @@ -515,21 +547,22 @@ int tscBuildFetchMsg(SSqlObj *pSql, SSqlInfo *pInfo) { if (UTIL_TABLE_IS_SUPER_TABLE(pTableMetaInfo)) { int32_t vgIndex = pTableMetaInfo->vgroupIndex; + int32_t vgId = -1; + if (pTableMetaInfo->pVgroupTables == NULL) { SVgroupsInfo *pVgroupInfo = pTableMetaInfo->vgroupList; assert(pVgroupInfo->vgroups[vgIndex].vgId > 0 && vgIndex < pTableMetaInfo->vgroupList->numOfVgroups); - - pRetrieveMsg->header.vgId = htonl(pVgroupInfo->vgroups[vgIndex].vgId); - tscDebug("%p build fetch msg from vgId:%d, vgIndex:%d, qId:%" PRIu64, pSql, pVgroupInfo->vgroups[vgIndex].vgId, vgIndex, pSql->res.qId); + vgId = pVgroupInfo->vgroups[vgIndex].vgId; } else { int32_t numOfVgroups = (int32_t)taosArrayGetSize(pTableMetaInfo->pVgroupTables); assert(vgIndex >= 0 && vgIndex < numOfVgroups); SVgroupTableInfo* pTableIdList = taosArrayGet(pTableMetaInfo->pVgroupTables, vgIndex); - - pRetrieveMsg->header.vgId = htonl(pTableIdList->vgInfo.vgId); - tscDebug("%p build fetch msg from vgId:%d, vgIndex:%d, qId:%" PRIu64, pSql, pTableIdList->vgInfo.vgId, vgIndex, pSql->res.qId); + vgId = pTableIdList->vgInfo.vgId; } + + pRetrieveMsg->header.vgId = htonl(vgId); + tscDebug("%p build fetch msg from vgId:%d, vgIndex:%d, qId:%" PRIu64, pSql, vgId, vgIndex, pSql->res.qId); } else { STableMeta* pTableMeta = pTableMetaInfo->pTableMeta; pRetrieveMsg->header.vgId = htonl(pTableMeta->vgId); @@ -1980,7 +2013,7 @@ int tscProcessTableMetaRsp(SSqlObj *pSql) { (vgroupInfo.inUse < 0)) { // vgroup info exists, compare with it vgroupInfo = createNewVgroupInfo(&pMetaMsg->vgroup); taosHashPut(tscVgroupMap, &vgId, sizeof(vgId), &vgroupInfo, sizeof(vgroupInfo)); - tscDebug("add new VgroupInfo, vgId:%d, total:%d", vgId, (int32_t) taosHashGetSize(tscVgroupMap)); + tscDebug("add new VgroupInfo, vgId:%d, total cached:%d", vgId, (int32_t) taosHashGetSize(tscVgroupMap)); } } @@ -2132,18 +2165,33 @@ int tscProcessSTableVgroupRsp(SSqlObj *pSql) { tscError("%p empty vgroup info", pSql); } else { for (int32_t j = 0; j < pInfo->vgroupList->numOfVgroups; ++j) { - //just init, no need to lock - SVgroupInfo *pVgroups = &pInfo->vgroupList->vgroups[j]; + // just init, no need to lock + SVgroupInfo *pVgroup = &pInfo->vgroupList->vgroups[j]; SVgroupMsg *vmsg = &pVgroupMsg->vgroups[j]; - pVgroups->vgId = htonl(vmsg->vgId); - pVgroups->numOfEps = vmsg->numOfEps; + vmsg->vgId = htonl(vmsg->vgId); + vmsg->numOfEps = vmsg->numOfEps; + for (int32_t k = 0; k < vmsg->numOfEps; ++k) { + vmsg->epAddr[k].port = htons(vmsg->epAddr[k].port); + } + + SNewVgroupInfo newVi = createNewVgroupInfo(vmsg); + pVgroup->numOfEps = newVi.numOfEps; + pVgroup->vgId = newVi.vgId; + for (int32_t k = 0; k < vmsg->numOfEps; ++k) { + pVgroup->epAddr[k].port = newVi.ep[k].port; + pVgroup->epAddr[k].fqdn = strndup(newVi.ep[k].fqdn, TSDB_FQDN_LEN); + } - assert(pVgroups->numOfEps >= 1 && pVgroups->vgId >= 1); + // check if current buffer contains the vgroup info. + // If not, add it + SNewVgroupInfo existVgroupInfo = {.inUse = -1}; + taosHashGetClone(tscVgroupMap, &newVi.vgId, sizeof(newVi.vgId), NULL, &existVgroupInfo, sizeof(SNewVgroupInfo)); - for (int32_t k = 0; k < pVgroups->numOfEps; ++k) { - pVgroups->epAddr[k].port = htons(vmsg->epAddr[k].port); - pVgroups->epAddr[k].fqdn = strndup(vmsg->epAddr[k].fqdn, tListLen(vmsg->epAddr[k].fqdn)); + if (((existVgroupInfo.inUse >= 0) && !vgroupInfoIdentical(&existVgroupInfo, vmsg)) || + (existVgroupInfo.inUse < 0)) { // vgroup info exists, compare with it + taosHashPut(tscVgroupMap, &newVi.vgId, sizeof(newVi.vgId), &newVi, sizeof(newVi)); + tscDebug("add new VgroupInfo, vgId:%d, total cached:%d", newVi.vgId, (int32_t) taosHashGetSize(tscVgroupMap)); } } } diff --git a/src/rpc/src/rpcMain.c b/src/rpc/src/rpcMain.c index 133ae6d0abbc77e141c6aef7835428b841c4988e..7205bafd7d01ede8f11233e7554b7082ae3d53c7 100644 --- a/src/rpc/src/rpcMain.c +++ b/src/rpc/src/rpcMain.c @@ -295,7 +295,7 @@ void *rpcOpen(const SRpcInit *pInit) { return NULL; } } else { - pRpc->pCache = rpcOpenConnCache(pRpc->sessions, rpcCloseConn, pRpc->tmrCtrl, pRpc->idleTime); + pRpc->pCache = rpcOpenConnCache(pRpc->sessions, rpcCloseConn, pRpc->tmrCtrl, pRpc->idleTime*30); if ( pRpc->pCache == NULL ) { tError("%s failed to init connection cache", pRpc->label); rpcClose(pRpc); @@ -470,7 +470,7 @@ void rpcSendResponse(const SRpcMsg *pRsp) { taosTmrStopA(&pConn->pTimer); // set the idle timer to monitor the activity - taosTmrReset(rpcProcessIdleTimer, pRpc->idleTime, pConn, pRpc->tmrCtrl, &pConn->pIdleTimer); + taosTmrReset(rpcProcessIdleTimer, pRpc->idleTime*30, pConn, pRpc->tmrCtrl, &pConn->pIdleTimer); rpcSendMsgToPeer(pConn, msg, msgLen); // if not set to secured, set it expcet NOT_READY case, since client wont treat it as secured @@ -1367,7 +1367,7 @@ static void rpcProcessConnError(void *param, void *id) { tDebug("%s %p, connection error happens", pRpc->label, pContext->ahandle); - if (pContext->numOfTry >= pContext->epSet.numOfEps) { + if (pContext->numOfTry >= pContext->epSet.numOfEps || pContext->msgType == TSDB_MSG_TYPE_FETCH) { rpcMsg.msgType = pContext->msgType+1; rpcMsg.ahandle = pContext->ahandle; rpcMsg.code = pContext->code;