提交 6d593fe8 编写于 作者: H Haojun Liao

[td-3571]<enhance>: enable update the vgroup info during super table query.

上级 247cf369
...@@ -34,6 +34,7 @@ int tscKeepConn[TSDB_SQL_MAX] = {0}; ...@@ -34,6 +34,7 @@ int tscKeepConn[TSDB_SQL_MAX] = {0};
TSKEY tscGetSubscriptionProgress(void* sub, int64_t uid, TSKEY dflt); TSKEY tscGetSubscriptionProgress(void* sub, int64_t uid, TSKEY dflt);
void tscUpdateSubscriptionProgress(void* sub, int64_t uid, TSKEY ts); void tscUpdateSubscriptionProgress(void* sub, int64_t uid, TSKEY ts);
void tscSaveSubscriptionProgress(void* sub); void tscSaveSubscriptionProgress(void* sub);
static int32_t extractSTableQueryVgroupId(STableMetaInfo* pTableMetaInfo);
static int32_t minMsgSize() { return tsRpcHeadSize + 100; } static int32_t minMsgSize() { return tsRpcHeadSize + 100; }
static int32_t getWaitingTimeInterval(int32_t count) { static int32_t getWaitingTimeInterval(int32_t count) {
...@@ -78,7 +79,8 @@ static void tscEpSetHtons(SRpcEpSet *s) { ...@@ -78,7 +79,8 @@ static void tscEpSetHtons(SRpcEpSet *s) {
for (int32_t i = 0; i < s->numOfEps; i++) { for (int32_t i = 0; i < s->numOfEps; i++) {
s->port[i] = htons(s->port[i]); s->port[i] = htons(s->port[i]);
} }
} }
bool tscEpSetIsEqual(SRpcEpSet *s1, SRpcEpSet *s2) { bool tscEpSetIsEqual(SRpcEpSet *s1, SRpcEpSet *s2) {
if (s1->numOfEps != s2->numOfEps || s1->inUse != s2->inUse) { if (s1->numOfEps != s2->numOfEps || s1->inUse != s2->inUse) {
return false; return false;
...@@ -118,12 +120,15 @@ static void tscUpdateVgroupInfo(SSqlObj *pObj, SRpcEpSet *pEpSet) { ...@@ -118,12 +120,15 @@ static void tscUpdateVgroupInfo(SSqlObj *pObj, SRpcEpSet *pEpSet) {
return; return;
} }
int32_t vgId = pTableMetaInfo->pTableMeta->vgId; int32_t vgId = -1;
if (pTableMetaInfo->pTableMeta->tableType == TSDB_SUPER_TABLE) { if (pTableMetaInfo->pTableMeta->tableType == TSDB_SUPER_TABLE) {
assert(vgId == 0); vgId = extractSTableQueryVgroupId(pTableMetaInfo);
return; } else {
vgId = pTableMetaInfo->pTableMeta->vgId;
} }
assert(vgId > 0);
SNewVgroupInfo vgroupInfo = {.vgId = -1}; SNewVgroupInfo vgroupInfo = {.vgId = -1};
taosHashGetClone(tscVgroupMap, &vgId, sizeof(vgId), NULL, &vgroupInfo, sizeof(SNewVgroupInfo)); taosHashGetClone(tscVgroupMap, &vgId, sizeof(vgId), NULL, &vgroupInfo, sizeof(SNewVgroupInfo));
assert(vgroupInfo.numOfEps > 0 && vgroupInfo.vgId > 0); assert(vgroupInfo.numOfEps > 0 && vgroupInfo.vgId > 0);
...@@ -140,6 +145,27 @@ static void tscUpdateVgroupInfo(SSqlObj *pObj, SRpcEpSet *pEpSet) { ...@@ -140,6 +145,27 @@ static void tscUpdateVgroupInfo(SSqlObj *pObj, SRpcEpSet *pEpSet) {
taosHashPut(tscVgroupMap, &vgId, sizeof(vgId), &vgroupInfo, sizeof(SNewVgroupInfo)); taosHashPut(tscVgroupMap, &vgId, sizeof(vgId), &vgroupInfo, sizeof(SNewVgroupInfo));
} }
int32_t extractSTableQueryVgroupId(STableMetaInfo* pTableMetaInfo) {
assert(pTableMetaInfo != NULL);
int32_t vgIndex = pTableMetaInfo->vgroupIndex;
int32_t vgId = -1;
if (pTableMetaInfo->pVgroupTables == NULL) {
SVgroupsInfo *pVgroupInfo = pTableMetaInfo->vgroupList;
assert(pVgroupInfo->vgroups[vgIndex].vgId > 0 && vgIndex < pTableMetaInfo->vgroupList->numOfVgroups);
vgId = pVgroupInfo->vgroups[vgIndex].vgId;
} else {
int32_t numOfVgroups = (int32_t)taosArrayGetSize(pTableMetaInfo->pVgroupTables);
assert(vgIndex >= 0 && vgIndex < numOfVgroups);
SVgroupTableInfo *pTableIdList = taosArrayGet(pTableMetaInfo->pVgroupTables, vgIndex);
vgId = pTableIdList->vgInfo.vgId;
}
return vgId;
}
void tscProcessHeartBeatRsp(void *param, TAOS_RES *tres, int code) { void tscProcessHeartBeatRsp(void *param, TAOS_RES *tres, int code) {
STscObj *pObj = (STscObj *)param; STscObj *pObj = (STscObj *)param;
if (pObj == NULL) return; if (pObj == NULL) return;
...@@ -515,21 +541,22 @@ int tscBuildFetchMsg(SSqlObj *pSql, SSqlInfo *pInfo) { ...@@ -515,21 +541,22 @@ int tscBuildFetchMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
if (UTIL_TABLE_IS_SUPER_TABLE(pTableMetaInfo)) { if (UTIL_TABLE_IS_SUPER_TABLE(pTableMetaInfo)) {
int32_t vgIndex = pTableMetaInfo->vgroupIndex; int32_t vgIndex = pTableMetaInfo->vgroupIndex;
int32_t vgId = -1;
if (pTableMetaInfo->pVgroupTables == NULL) { if (pTableMetaInfo->pVgroupTables == NULL) {
SVgroupsInfo *pVgroupInfo = pTableMetaInfo->vgroupList; SVgroupsInfo *pVgroupInfo = pTableMetaInfo->vgroupList;
assert(pVgroupInfo->vgroups[vgIndex].vgId > 0 && vgIndex < pTableMetaInfo->vgroupList->numOfVgroups); assert(pVgroupInfo->vgroups[vgIndex].vgId > 0 && vgIndex < pTableMetaInfo->vgroupList->numOfVgroups);
vgId = pVgroupInfo->vgroups[vgIndex].vgId;
pRetrieveMsg->header.vgId = htonl(pVgroupInfo->vgroups[vgIndex].vgId);
tscDebug("%p build fetch msg from vgId:%d, vgIndex:%d, qId:%" PRIu64, pSql, pVgroupInfo->vgroups[vgIndex].vgId, vgIndex, pSql->res.qId);
} else { } else {
int32_t numOfVgroups = (int32_t)taosArrayGetSize(pTableMetaInfo->pVgroupTables); int32_t numOfVgroups = (int32_t)taosArrayGetSize(pTableMetaInfo->pVgroupTables);
assert(vgIndex >= 0 && vgIndex < numOfVgroups); assert(vgIndex >= 0 && vgIndex < numOfVgroups);
SVgroupTableInfo* pTableIdList = taosArrayGet(pTableMetaInfo->pVgroupTables, vgIndex); SVgroupTableInfo* pTableIdList = taosArrayGet(pTableMetaInfo->pVgroupTables, vgIndex);
vgId = pTableIdList->vgInfo.vgId;
pRetrieveMsg->header.vgId = htonl(pTableIdList->vgInfo.vgId);
tscDebug("%p build fetch msg from vgId:%d, vgIndex:%d, qId:%" PRIu64, pSql, pTableIdList->vgInfo.vgId, vgIndex, pSql->res.qId);
} }
pRetrieveMsg->header.vgId = htonl(vgId);
tscDebug("%p build fetch msg from vgId:%d, vgIndex:%d, qId:%" PRIu64, pSql, vgId, vgIndex, pSql->res.qId);
} else { } else {
STableMeta* pTableMeta = pTableMetaInfo->pTableMeta; STableMeta* pTableMeta = pTableMetaInfo->pTableMeta;
pRetrieveMsg->header.vgId = htonl(pTableMeta->vgId); pRetrieveMsg->header.vgId = htonl(pTableMeta->vgId);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册