diff --git a/src/client/src/tscSchemaUtil.c b/src/client/src/tscSchemaUtil.c index 934a562387f26a95ec1508c5cfbd4633137cac9e..b391a5760ef4e9914b5a94fe0729a109f1342c63 100644 --- a/src/client/src/tscSchemaUtil.c +++ b/src/client/src/tscSchemaUtil.c @@ -157,6 +157,9 @@ STableMeta* tscCreateTableMetaFromMsg(STableMetaMsg* pTableMetaMsg, size_t* size pTableMeta->sid = pTableMetaMsg->sid; pTableMeta->uid = pTableMetaMsg->uid; pTableMeta->vgroupInfo = pTableMetaMsg->vgroup; + //init version here + pTableMeta->vgroupInfo.version = 0; + pTableMeta->sversion = pTableMetaMsg->sversion; pTableMeta->tversion = pTableMetaMsg->tversion; diff --git a/src/client/src/tscServer.c b/src/client/src/tscServer.c index e8a03a64828853ee0508c881349408abe7c66faa..4a11288a857221abec171db782926a18f0e55044 100644 --- a/src/client/src/tscServer.c +++ b/src/client/src/tscServer.c @@ -51,8 +51,7 @@ static void tscDumpMgmtIpSet(SRpcIpSet *ipSet) { } bool tscIpSetIsEqual(SRpcIpSet *s1, SRpcIpSet *s2) { - if (s1->numOfIps != s2->numOfIps - || s1->inUse != s1->inUse) { + if (s1->numOfIps != s2->numOfIps /*|| s1->inUse != s1->inUse*/) { return false; } for (int32_t i = 0; i < s1->numOfIps; i++) { @@ -80,21 +79,38 @@ void tscSetMgmtIpList(SRpcIpSet *pIpSet) { } taosCorEndWrite(&tscMgmtIpSet.version); } -static void tscSetDnodeIpList(SSqlObj* pSql, SCMVgroupInfo* pVgroupInfo) { - SRpcIpSet* pIpList = &pSql->ipList; - pIpList->inUse = 0; - if (pVgroupInfo == NULL) { - pIpList->numOfIps = 0; - return; +static void tscDumpIpSetFromVgroupInfo(SCMVgroupInfo *pVgroupInfo, SRpcIpSet *pIpSet, int32_t *vgId) { + if (pVgroupInfo == NULL) { return;} + taosCorBeginRead(&pVgroupInfo->version); + if (vgId) { + *vgId = pVgroupInfo->vgId; } - - pIpList->numOfIps = pVgroupInfo->numOfIps; - for(int32_t i = 0; i < pVgroupInfo->numOfIps; ++i) { - strcpy(pIpList->fqdn[i], pVgroupInfo->ipAddr[i].fqdn); - pIpList->port[i] = pVgroupInfo->ipAddr[i].port; + pIpSet->inUse = 0; + pIpSet->numOfIps = pVgroupInfo->numOfIps; + for (int32_t i = 0; i < pVgroupInfo->numOfIps; ++i) { + strncpy(pIpSet->fqdn[i], pVgroupInfo->ipAddr[i].fqdn, TSDB_FQDN_LEN); + pIpSet->port[i] = pVgroupInfo->ipAddr[i].port; } + taosCorEndRead(&pVgroupInfo->version); +} +static void tscSetVgroupInfoWithIpSet(SCMVgroupInfo *pVgroupInfo, SRpcIpSet *pIpSet) { + taosCorBeginWrite(&pVgroupInfo->version); + //TODO(dengyihao), dont care vgid + pVgroupInfo->numOfIps = pIpSet->numOfIps; + for (int32_t i = 0; pVgroupInfo->numOfIps; i++) { + strncpy(pVgroupInfo->ipAddr[i].fqdn, pIpSet->fqdn[i], TSDB_FQDN_LEN); + pVgroupInfo->ipAddr[i].port = pIpSet->port[i]; + } + taosCorEndWrite(&pVgroupInfo->version); +} +static void tscSetVgroupInfo(SSqlObj *pObj, SRpcIpSet *pIpSet) { + if (tscIpSetIsEqual(&pObj->ipList, pIpSet)) { + return; + } + SSqlCmd *pCmd = &pObj->cmd; + STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0); + tscSetVgroupInfoWithIpSet(&pTableMetaInfo->pTableMeta->vgroupInfo, pIpSet); } - void tscPrintMgmtIp() { SRpcIpSet dump; tscDumpMgmtIpSet(&dump); @@ -263,7 +279,7 @@ void tscProcessMsgFromServer(SRpcMsg *rpcMsg, SRpcIpSet *pIpSet) { } if (pCmd->command < TSDB_SQL_MGMT) { - if (pIpSet) pSql->ipList = *pIpSet; + if (pIpSet) tscSetVgroupInfo(pSql, pIpSet); } else { if (pIpSet) tscSetMgmtIpList(pIpSet); } @@ -553,7 +569,7 @@ int tscBuildSubmitMsg(SSqlObj *pSql, SSqlInfo *pInfo) { // pSql->cmd.payloadLen is set during copying data into payload pSql->cmd.msgType = TSDB_MSG_TYPE_SUBMIT; - tscSetDnodeIpList(pSql, &pTableMeta->vgroupInfo); + tscDumpIpSetFromVgroupInfo(&pTableMeta->vgroupInfo, &pSql->ipList, NULL); tscDebug("%p build submit msg, vgId:%d numOfTables:%d numberOfIP:%d", pSql, vgId, pSql->cmd.numOfTablesInSubmit, pSql->ipList.numOfIps); @@ -595,11 +611,9 @@ static char *doSerializeTableInfo(SQueryTableMsg* pQueryMsg, SSqlObj *pSql, char } else { pVgroupInfo = &pTableMeta->vgroupInfo; } - - tscSetDnodeIpList(pSql, pVgroupInfo); - if (pVgroupInfo != NULL) { - pQueryMsg->head.vgId = htonl(pVgroupInfo->vgId); - } + int32_t vgId = 0; + tscDumpIpSetFromVgroupInfo(pVgroupInfo, &pSql->ipList, &vgId); + pQueryMsg->head.vgId = htonl(vgId); STableIdInfo *pTableIdInfo = (STableIdInfo *)pMsg; pTableIdInfo->tid = htonl(pTableMeta->sid); @@ -618,8 +632,9 @@ static char *doSerializeTableInfo(SQueryTableMsg* pQueryMsg, SSqlObj *pSql, char SVgroupTableInfo* pTableIdList = taosArrayGet(pTableMetaInfo->pVgroupTables, index); // set the vgroup info - tscSetDnodeIpList(pSql, &pTableIdList->vgInfo); - pQueryMsg->head.vgId = htonl(pTableIdList->vgInfo.vgId); + int32_t vgId = 0; + tscDumpIpSetFromVgroupInfo(&pTableIdList->vgInfo, &pSql->ipList, &vgId); + pQueryMsg->head.vgId = htonl(vgId); int32_t numOfTables = taosArrayGetSize(pTableIdList->itemList); pQueryMsg->numOfTables = htonl(numOfTables); // set the number of tables @@ -1351,7 +1366,7 @@ int tscBuildUpdateTagMsg(SSqlObj* pSql, SSqlInfo *pInfo) { SQueryInfo * pQueryInfo = tscGetQueryInfoDetail(pCmd, 0); STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0); - tscSetDnodeIpList(pSql, &pTableMetaInfo->pTableMeta->vgroupInfo); + tscDumpIpSetFromVgroupInfo(&pTableMetaInfo->pTableMeta->vgroupInfo, &pSql->ipList, NULL); return TSDB_CODE_SUCCESS; } @@ -1875,11 +1890,11 @@ int tscProcessSTableVgroupRsp(SSqlObj *pSql) { memcpy(pInfo->vgroupList, pVgroupInfo, size); for (int32_t j = 0; j < pInfo->vgroupList->numOfVgroups; ++j) { + //just init, no need to lock SCMVgroupInfo *pVgroups = &pInfo->vgroupList->vgroups[j]; - + pVgroups->version = 0; pVgroups->vgId = htonl(pVgroups->vgId); assert(pVgroups->numOfIps >= 1); - for (int32_t k = 0; k < pVgroups->numOfIps; ++k) { pVgroups->ipAddr[k].port = htons(pVgroups->ipAddr[k].port); } diff --git a/src/inc/taosmsg.h b/src/inc/taosmsg.h index b7afaf1e065074018d9725796434776dae1a9158..c12aed2bccb2c620c696b370406a9b4ba5405d76 100644 --- a/src/inc/taosmsg.h +++ b/src/inc/taosmsg.h @@ -647,6 +647,7 @@ typedef struct SCMSTableVgroupMsg { } SCMSTableVgroupMsg, SCMSTableVgroupRspMsg; typedef struct { + int32_t version; int32_t vgId; int8_t numOfIps; SIpAddr ipAddr[TSDB_MAX_REPLICA];