提交 b90807ce 编写于 作者: H Haojun Liao

[td-1373]

上级 e06d7083
......@@ -215,7 +215,7 @@ SQueryInfo *tscGetQueryInfoDetailSafely(SSqlCmd *pCmd, int32_t subClauseIndex);
void tscClearTableMetaInfo(STableMetaInfo* pTableMetaInfo, bool removeFromCache);
STableMetaInfo* tscAddTableMetaInfo(SQueryInfo* pQueryInfo, const char* name, STableMeta* pTableMeta,
SVgroupsInfo* vgroupList, SArray* pTagCols);
SVgroupsInfo* vgroupList, SArray* pTagCols, SArray* pVgroupTables);
STableMetaInfo* tscAddEmptyMetaInfo(SQueryInfo *pQueryInfo);
int32_t tscAddSubqueryInfo(SSqlCmd *pCmd);
......@@ -224,6 +224,8 @@ void tscInitQueryInfo(SQueryInfo* pQueryInfo);
void tscClearSubqueryInfo(SSqlCmd* pCmd);
void tscFreeVgroupTableInfo(SArray* pVgroupTables);
SArray* tscCloneVgroupTableInfo(SArray* pVgroupTables);
void tscRemoveVgroupTableGroup(SArray* pVgroupTable, int32_t index);
int tscGetSTableVgroupInfo(SSqlObj* pSql, int32_t clauseIndex);
int tscGetTableMeta(SSqlObj* pSql, STableMetaInfo* pTableMetaInfo);
......
......@@ -2283,7 +2283,7 @@ int tscGetSTableVgroupInfo(SSqlObj *pSql, int32_t clauseIndex) {
for (int32_t i = 0; i < pQueryInfo->numOfTables; ++i) {
STableMetaInfo *pMInfo = tscGetMetaInfo(pQueryInfo, i);
STableMeta *pTableMeta = taosCacheAcquireByData(tscMetaCache, pMInfo->pTableMeta);
tscAddTableMetaInfo(pNewQueryInfo, pMInfo->name, pTableMeta, NULL, pMInfo->tagColList);
tscAddTableMetaInfo(pNewQueryInfo, pMInfo->name, pTableMeta, NULL, pMInfo->tagColList, pMInfo->pVgroupTables);
}
if ((code = tscAllocPayload(&pNew->cmd, TSDB_DEFAULT_PAYLOAD_SIZE)) != TSDB_CODE_SUCCESS) {
......
......@@ -360,6 +360,31 @@ static int32_t tscLaunchRealSubqueries(SSqlObj* pSql) {
pExpr->numOfParams = 1;
}
int32_t num = 0;
int32_t *list = NULL;
tsBufGetVnodeIdList(pSupporter->pTSBuf, &num, &list);
for(int32_t k = 0; k < taosArrayGetSize(pTableMetaInfo->pVgroupTables);) {
SVgroupTableInfo* p = taosArrayGet(pTableMetaInfo->pVgroupTables, k);
bool found = false;
for(int32_t f = 0; f < num; ++f) {
if (p->vgInfo.vgId == list[f]) {
found = true;
break;
}
}
if (!found) {
tscRemoveVgroupTableGroup(pTableMetaInfo->pVgroupTables, k);
} else {
k++;
}
}
taosTFree(list);
assert(taosArrayGetSize(pTableMetaInfo->pVgroupTables) > 0);
size_t numOfCols = taosArrayGetSize(pNewQueryInfo->colList);
tscDebug("%p subquery:%p tableIndex:%d, vgroupIndex:%d, type:%d, exprInfo:%" PRIzu ", colList:%" PRIzu ", fieldsInfo:%d, name:%s",
pSql, pNew, 0, pTableMetaInfo->vgroupIndex, pNewQueryInfo->type, taosArrayGetSize(pNewQueryInfo->exprList),
......@@ -828,6 +853,8 @@ static void tsCompRetrieveCallback(void* param, TAOS_RES* tres, int32_t numOfRow
// launch the query the retrieve actual results from vnode along with the filtered timestamp
SQueryInfo* pPQueryInfo = tscGetQueryInfoDetail(&pParentSql->cmd, pParentSql->cmd.clauseIndex);
updateQueryTimeRange(pPQueryInfo, &win);
//update the vgroup that involved in real data query
tscLaunchRealSubqueries(pParentSql);
}
......
......@@ -1680,19 +1680,64 @@ void tscClearSubqueryInfo(SSqlCmd* pCmd) {
}
void tscFreeVgroupTableInfo(SArray* pVgroupTables) {
if (pVgroupTables != NULL) {
size_t num = taosArrayGetSize(pVgroupTables);
for (size_t i = 0; i < num; i++) {
SVgroupTableInfo* pInfo = taosArrayGet(pVgroupTables, i);
if (pVgroupTables == NULL) {
return;
}
for(int32_t j = 0; j < pInfo->vgInfo.numOfEps; ++j) {
taosTFree(pInfo->vgInfo.epAddr[j].fqdn);
}
size_t num = taosArrayGetSize(pVgroupTables);
for (size_t i = 0; i < num; i++) {
SVgroupTableInfo* pInfo = taosArrayGet(pVgroupTables, i);
for(int32_t j = 0; j < pInfo->vgInfo.numOfEps; ++j) {
taosTFree(pInfo->vgInfo.epAddr[j].fqdn);
}
taosArrayDestroy(pInfo->itemList);
}
taosArrayDestroy(pVgroupTables);
}
void tscRemoveVgroupTableGroup(SArray* pVgroupTable, int32_t index) {
assert(pVgroupTable != NULL && index >= 0);
size_t size = taosArrayGetSize(pVgroupTable);
assert(size > index);
SVgroupTableInfo* pInfo = taosArrayGet(pVgroupTable, index);
for(int32_t j = 0; j < pInfo->vgInfo.numOfEps; ++j) {
taosTFree(pInfo->vgInfo.epAddr[j].fqdn);
}
taosArrayDestroy(pInfo->itemList);
taosArrayDestroy(pInfo->itemList);
taosArrayRemove(pVgroupTable, index);
}
SArray* tscCloneVgroupTableInfo(SArray* pVgroupTables) {
if (pVgroupTables == NULL) {
return NULL;
}
size_t num = taosArrayGetSize(pVgroupTables);
SArray* pa = taosArrayInit(num, sizeof(SVgroupTableInfo));
for (size_t i = 0; i < num; i++) {
SVgroupTableInfo* pInfo = taosArrayGet(pVgroupTables, i);
for(int32_t j = 0; j < pInfo->vgInfo.numOfEps; ++j) {
taosTFree(pInfo->vgInfo.epAddr[j].fqdn);
}
taosArrayDestroy(pVgroupTables);
SVgroupTableInfo info = {0};
info.vgInfo = pInfo->vgInfo;
info.itemList = taosArrayClone(pInfo->itemList);
taosArrayPush(pa, &info);
}
return pa;
}
void clearAllTableMetaInfo(SQueryInfo* pQueryInfo, const char* address, bool removeFromCache) {
......@@ -1710,7 +1755,7 @@ void clearAllTableMetaInfo(SQueryInfo* pQueryInfo, const char* address, bool rem
}
STableMetaInfo* tscAddTableMetaInfo(SQueryInfo* pQueryInfo, const char* name, STableMeta* pTableMeta,
SVgroupsInfo* vgroupList, SArray* pTagCols) {
SVgroupsInfo* vgroupList, SArray* pTagCols, SArray* pVgroupTables) {
void* pAlloc = realloc(pQueryInfo->pTableMetaInfo, (pQueryInfo->numOfTables + 1) * POINTER_BYTES);
if (pAlloc == NULL) {
terrno = TSDB_CODE_TSC_OUT_OF_MEMORY;
......@@ -1744,13 +1789,15 @@ STableMetaInfo* tscAddTableMetaInfo(SQueryInfo* pQueryInfo, const char* name, ST
if (pTagCols != NULL) {
tscColumnListCopy(pTableMetaInfo->tagColList, pTagCols, -1);
}
pTableMetaInfo->pVgroupTables = tscCloneVgroupTableInfo(pVgroupTables);
pQueryInfo->numOfTables += 1;
return pTableMetaInfo;
}
STableMetaInfo* tscAddEmptyMetaInfo(SQueryInfo* pQueryInfo) {
return tscAddTableMetaInfo(pQueryInfo, NULL, NULL, NULL, NULL);
return tscAddTableMetaInfo(pQueryInfo, NULL, NULL, NULL, NULL, NULL);
}
void tscClearTableMetaInfo(STableMetaInfo* pTableMetaInfo, bool removeFromCache) {
......@@ -1824,7 +1871,7 @@ SSqlObj* createSimpleSubObj(SSqlObj* pSql, void (*fp)(), void* param, int32_t cm
assert(pSql->cmd.clauseIndex == 0);
STableMetaInfo* pMasterTableMetaInfo = tscGetTableMetaInfoFromCmd(&pSql->cmd, pSql->cmd.clauseIndex, 0);
tscAddTableMetaInfo(pQueryInfo, pMasterTableMetaInfo->name, NULL, NULL, NULL);
tscAddTableMetaInfo(pQueryInfo, pMasterTableMetaInfo->name, NULL, NULL, NULL, NULL);
registerSqlObj(pNew);
return pNew;
......@@ -1989,14 +2036,16 @@ SSqlObj* createSubqueryObj(SSqlObj* pSql, int16_t tableIndex, void (*fp)(), void
STableMeta* pTableMeta = taosCacheAcquireByData(tscMetaCache, pTableMetaInfo->pTableMeta); // get by name may failed due to the cache cleanup
assert(pTableMeta != NULL);
pFinalInfo = tscAddTableMetaInfo(pNewQueryInfo, name, pTableMeta, pTableMetaInfo->vgroupList, pTableMetaInfo->tagColList);
pFinalInfo = tscAddTableMetaInfo(pNewQueryInfo, name, pTableMeta, pTableMetaInfo->vgroupList,
pTableMetaInfo->tagColList, pTableMetaInfo->pVgroupTables);
} else { // transfer the ownership of pTableMeta to the newly create sql object.
STableMetaInfo* pPrevInfo = tscGetTableMetaInfoFromCmd(&pPrevSql->cmd, pPrevSql->cmd.clauseIndex, 0);
STableMeta* pPrevTableMeta = taosCacheTransfer(tscMetaCache, (void**)&pPrevInfo->pTableMeta);
SVgroupsInfo* pVgroupsInfo = pPrevInfo->vgroupList;
pFinalInfo = tscAddTableMetaInfo(pNewQueryInfo, name, pPrevTableMeta, pVgroupsInfo, pTableMetaInfo->tagColList);
pFinalInfo = tscAddTableMetaInfo(pNewQueryInfo, name, pPrevTableMeta, pVgroupsInfo, pTableMetaInfo->tagColList,
pTableMetaInfo->pVgroupTables);
}
if (pFinalInfo->pTableMeta == NULL) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册