diff --git a/src/client/inc/tsclient.h b/src/client/inc/tsclient.h index e1180c414345da3e668d8eaece895c3c011f2004..3a420567a978d0adf6b97a0feaf16ac73ec8405f 100644 --- a/src/client/inc/tsclient.h +++ b/src/client/inc/tsclient.h @@ -52,12 +52,20 @@ typedef struct STableComInfo { int32_t rowSize; } STableComInfo; +typedef struct SCMCorVgroupInfo { + int32_t version; + int8_t inUse; + int8_t numOfIps; + SIpAddr ipAddr[TSDB_MAX_REPLICA]; +} SCMCorVgroupInfo; + typedef struct STableMeta { STableComInfo tableInfo; uint8_t tableType; int16_t sversion; int16_t tversion; - SCMVgroupInfo vgroupInfo; + SCMVgroupInfo vgroupInfo; + SCMCorVgroupInfo corVgroupInfo; int32_t sid; // the index of one table in a virtual node uint64_t uid; // unique id of a table SSchema schema[]; // if the table is TSDB_CHILD_TABLE, schema is acquired by super table meta info diff --git a/src/client/src/tscSchemaUtil.c b/src/client/src/tscSchemaUtil.c index b391a5760ef4e9914b5a94fe0729a109f1342c63..52342b36500d0eba6b1145cdf1f305d54ce40010 100644 --- a/src/client/src/tscSchemaUtil.c +++ b/src/client/src/tscSchemaUtil.c @@ -140,7 +140,15 @@ struct SSchema tscGetTbnameColumnSchema() { strcpy(s.name, TSQL_TBNAME_L); return s; } - +static void tscInitCorVgroupInfo(SCMCorVgroupInfo *corVgroupInfo, SCMVgroupInfo *vgroupInfo) { + corVgroupInfo->version = 0; + corVgroupInfo->inUse = 0; + corVgroupInfo->numOfIps = vgroupInfo->numOfIps; + for (int32_t i = 0; i < corVgroupInfo->numOfIps; i++) { + strncpy(corVgroupInfo->ipAddr[i].fqdn, vgroupInfo->ipAddr[i].fqdn, TSDB_FQDN_LEN); + corVgroupInfo->ipAddr[i].port = vgroupInfo->ipAddr[i].port; + } +} STableMeta* tscCreateTableMetaFromMsg(STableMetaMsg* pTableMetaMsg, size_t* size) { assert(pTableMetaMsg != NULL); @@ -157,9 +165,9 @@ STableMeta* tscCreateTableMetaFromMsg(STableMetaMsg* pTableMetaMsg, size_t* size pTableMeta->sid = pTableMetaMsg->sid; pTableMeta->uid = pTableMetaMsg->uid; pTableMeta->vgroupInfo = pTableMetaMsg->vgroup; - //init version here - pTableMeta->vgroupInfo.version = 0; - + + tscInitCorVgroupInfo(&pTableMeta->corVgroupInfo, &pTableMeta->vgroupInfo); + pTableMeta->sversion = pTableMetaMsg->sversion; pTableMeta->tversion = pTableMetaMsg->tversion; diff --git a/src/client/src/tscServer.c b/src/client/src/tscServer.c index 7269173c8c6ce23300a22327979f78b79317b01f..cb97307b0462e70ae6731fc543be19a7067877ea 100644 --- a/src/client/src/tscServer.c +++ b/src/client/src/tscServer.c @@ -45,6 +45,20 @@ void tscSaveSubscriptionProgress(void* sub); static int32_t minMsgSize() { return tsRpcHeadSize + 100; } +static void tscSetDnodeIpList(SSqlObj* pSql, SCMVgroupInfo* pVgroupInfo) { + SRpcIpSet* pIpList = &pSql->ipList; + pIpList->inUse = 0; + if (pVgroupInfo == NULL) { + pIpList->numOfIps = 0; + return; + } + + pIpList->numOfIps = pVgroupInfo->numOfIps; + for(int32_t i = 0; i < pVgroupInfo->numOfIps; ++i) { + strcpy(pIpList->fqdn[i], pVgroupInfo->ipAddr[i].fqdn); + pIpList->port[i] = pVgroupInfo->ipAddr[i].port; + } +} void tscIpSetCopy(SRpcIpSet *dst, SRpcIpSet *src) { dst->numOfIps = src->numOfIps; dst->inUse = src->inUse; @@ -83,12 +97,9 @@ void tscUpdateMgmtIpList(SRpcIpSet *pIpSet) { tscIpSetCopy(mgmtIpSet, pIpSet); taosCorEndWrite(&tscMgmtIpSet.version); } -static void tscDumpIpSetFromVgroupInfo(SCMVgroupInfo *pVgroupInfo, SRpcIpSet *pIpSet, int32_t *vgId) { +static void tscDumpIpSetFromVgroupInfo(SCMCorVgroupInfo *pVgroupInfo, SRpcIpSet *pIpSet) { if (pVgroupInfo == NULL) { return;} - //taosCorBeginRead(&pVgroupInfo->version); - if (vgId) { - *vgId = pVgroupInfo->vgId; - } + taosCorBeginRead(&pVgroupInfo->version); int8_t inUse = pVgroupInfo->inUse; pIpSet->inUse = (inUse >= 0 && inUse < TSDB_MAX_REPLICA) ? inUse: 0; pIpSet->numOfIps = pVgroupInfo->numOfIps; @@ -96,16 +107,16 @@ static void tscDumpIpSetFromVgroupInfo(SCMVgroupInfo *pVgroupInfo, SRpcIpSet *pI strncpy(pIpSet->fqdn[i], pVgroupInfo->ipAddr[i].fqdn, TSDB_FQDN_LEN); pIpSet->port[i] = pVgroupInfo->ipAddr[i].port; } - //taosCorEndRead(&pVgroupInfo->version); + taosCorEndRead(&pVgroupInfo->version); } static void tscUpdateVgroupInfo(SSqlObj *pObj, SRpcIpSet *pIpSet) { SSqlCmd *pCmd = &pObj->cmd; STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0); if (pTableMetaInfo == NULL || pTableMetaInfo->pTableMeta == NULL) { return;} - SCMVgroupInfo *pVgroupInfo = &pTableMetaInfo->pTableMeta->vgroupInfo; + SCMCorVgroupInfo *pVgroupInfo = &pTableMetaInfo->pTableMeta->corVgroupInfo; - //taosCorBeginWrite(&pVgroupInfo->version); + taosCorBeginWrite(&pVgroupInfo->version); //TODO(dengyihao), dont care vgid pVgroupInfo->inUse = pIpSet->inUse; pVgroupInfo->numOfIps = pIpSet->numOfIps; @@ -113,7 +124,7 @@ static void tscUpdateVgroupInfo(SSqlObj *pObj, SRpcIpSet *pIpSet) { strncpy(pVgroupInfo->ipAddr[i].fqdn, pIpSet->fqdn[i], TSDB_FQDN_LEN); pVgroupInfo->ipAddr[i].port = pIpSet->port[i]; } - //taosCorEndWrite(&pVgroupInfo->version); + taosCorEndWrite(&pVgroupInfo->version); } void tscPrintMgmtIp() { SRpcIpSet dump; @@ -577,7 +588,7 @@ int tscBuildSubmitMsg(SSqlObj *pSql, SSqlInfo *pInfo) { // pSql->cmd.payloadLen is set during copying data into payload pSql->cmd.msgType = TSDB_MSG_TYPE_SUBMIT; - tscDumpIpSetFromVgroupInfo(&pTableMeta->vgroupInfo, &pSql->ipList, NULL); + tscDumpIpSetFromVgroupInfo(&pTableMeta->corVgroupInfo, &pSql->ipList); tscDebug("%p build submit msg, vgId:%d numOfTables:%d numberOfIP:%d", pSql, vgId, pSql->cmd.numOfTablesInSubmit, pSql->ipList.numOfIps); @@ -619,9 +630,10 @@ static char *doSerializeTableInfo(SQueryTableMsg* pQueryMsg, SSqlObj *pSql, char } else { pVgroupInfo = &pTableMeta->vgroupInfo; } - int32_t vgId = 0; - tscDumpIpSetFromVgroupInfo(pVgroupInfo, &pSql->ipList, &vgId); - pQueryMsg->head.vgId = htonl(vgId); + tscSetDnodeIpList(pSql, pVgroupInfo); + if (pVgroupInfo != NULL) { + pQueryMsg->head.vgId = htonl(pVgroupInfo->vgId); + } STableIdInfo *pTableIdInfo = (STableIdInfo *)pMsg; pTableIdInfo->tid = htonl(pTableMeta->sid); @@ -639,10 +651,9 @@ static char *doSerializeTableInfo(SQueryTableMsg* pQueryMsg, SSqlObj *pSql, char SVgroupTableInfo* pTableIdList = taosArrayGet(pTableMetaInfo->pVgroupTables, index); - // set the vgroup info - int32_t vgId = 0; - tscDumpIpSetFromVgroupInfo(&pTableIdList->vgInfo, &pSql->ipList, &vgId); - pQueryMsg->head.vgId = htonl(vgId); + // set the vgroup info + tscSetDnodeIpList(pSql, &pTableIdList->vgInfo); + pQueryMsg->head.vgId = htonl(pTableIdList->vgInfo.vgId); int32_t numOfTables = taosArrayGetSize(pTableIdList->itemList); pQueryMsg->numOfTables = htonl(numOfTables); // set the number of tables @@ -1374,7 +1385,7 @@ int tscBuildUpdateTagMsg(SSqlObj* pSql, SSqlInfo *pInfo) { SQueryInfo * pQueryInfo = tscGetQueryInfoDetail(pCmd, 0); STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0); - tscDumpIpSetFromVgroupInfo(&pTableMetaInfo->pTableMeta->vgroupInfo, &pSql->ipList, NULL); + tscDumpIpSetFromVgroupInfo(&pTableMetaInfo->pTableMeta->corVgroupInfo, &pSql->ipList); return TSDB_CODE_SUCCESS; } @@ -1900,8 +1911,6 @@ int tscProcessSTableVgroupRsp(SSqlObj *pSql) { for (int32_t j = 0; j < pInfo->vgroupList->numOfVgroups; ++j) { //just init, no need to lock SCMVgroupInfo *pVgroups = &pInfo->vgroupList->vgroups[j]; - pVgroups->version = 0; - pVgroups->inUse = 0; pVgroups->vgId = htonl(pVgroups->vgId); assert(pVgroups->numOfIps >= 1); for (int32_t k = 0; k < pVgroups->numOfIps; ++k) { diff --git a/src/inc/taosmsg.h b/src/inc/taosmsg.h index fec3d77d8e51628b159ac54c1331c3e6e138f83f..b7afaf1e065074018d9725796434776dae1a9158 100644 --- a/src/inc/taosmsg.h +++ b/src/inc/taosmsg.h @@ -647,9 +647,7 @@ typedef struct SCMSTableVgroupMsg { } SCMSTableVgroupMsg, SCMSTableVgroupRspMsg; typedef struct { - int32_t version; int32_t vgId; - int8_t inUse; int8_t numOfIps; SIpAddr ipAddr[TSDB_MAX_REPLICA]; } SCMVgroupInfo;