提交 6839ed22 编写于 作者: H Haojun Liao

fix(query): fix bug in tag filter.

上级 72287a3b
......@@ -1415,26 +1415,41 @@ int32_t metaGetTableTags(SMeta *pMeta, uint64_t suid, SArray *pUidTagInfo) {
if (numOfElems > 0) {
pSepecifiedUidMap = taosHashInit(numOfElems / 0.7, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK);
for (int i = 0; i < numOfElems; i++) {
int64_t *uid = taosArrayGet(pUidTagInfo, i);
taosHashPut(pSepecifiedUidMap, uid, sizeof(int64_t), 0, 0);
STUidTagInfo *pTagInfo = taosArrayGet(pUidTagInfo, i);
taosHashPut(pSepecifiedUidMap, &pTagInfo->uid, sizeof(uint64_t), &i, sizeof(int32_t));
}
}
while (1) {
tb_uid_t uid = metaCtbCursorNext(pCur);
if (uid == 0) {
break;
}
if (numOfElems == 0) { // all data needs to be added into the pUidTagInfo list
while (1) {
tb_uid_t uid = metaCtbCursorNext(pCur);
if (uid == 0) {
break;
}
if (numOfElems > 0 && taosHashGet(pSepecifiedUidMap, &uid, sizeof(int64_t)) == NULL) {
continue;
} else if (numOfElems == 0) {
STUidTagInfo info = {.uid = uid, .pTagVal = pCur->pVal};
info.pTagVal = taosMemoryMalloc(pCur->vLen);
memcpy(info.pTagVal, pCur->pVal, pCur->vLen);
taosArrayPush(pUidTagInfo, &info);
}
} else { // only the specified tables need to be added
while (1) {
tb_uid_t uid = metaCtbCursorNext(pCur);
if (uid == 0) {
break;
}
int32_t *index = taosHashGet(pSepecifiedUidMap, &uid, sizeof(uint64_t));
if (index == NULL) {
continue;
}
STUidTagInfo *pTagInfo = taosArrayGet(pUidTagInfo, *index);
if (pTagInfo->pTagVal == NULL) {
pTagInfo->pTagVal = taosMemoryMalloc(pCur->vLen);
memcpy(pTagInfo->pTagVal, pCur->pVal, pCur->vLen);
}
}
}
taosHashCleanup(pSepecifiedUidMap);
......
......@@ -48,7 +48,7 @@ static int32_t optimizeTbnameInCond(void* metaHandle, int64_t suid, SArray* pRes
static int32_t optimizeTbnameInCondImpl(void* metaHandle, SArray* pExistedUidList, SNode* pTagCond);
static int32_t getTableList(void* metaHandle, void* pVnode, SScanPhysiNode* pScanNode, SNode* pTagCond,
SNode* pTagIndexCond, STableListInfo* pListInfo);
static SSDataBlock* createTagValBlockForFilter(SArray* pColList, int32_t numOfTables, SArray* pUidTagList);
static SSDataBlock* createTagValBlockForFilter(SArray* pColList, int32_t numOfTables, SArray* pUidTagList, void* metaHandle);
static int64_t getLimit(const SNode* pLimit) { return NULL == pLimit ? -1 : ((SLimitNode*)pLimit)->limit; }
static int64_t getOffset(const SNode* pLimit) { return NULL == pLimit ? -1 : ((SLimitNode*)pLimit)->offset; }
......@@ -394,188 +394,6 @@ static int32_t createResultData(SDataType* pType, int32_t numOfRows, SScalarPara
return TSDB_CODE_SUCCESS;
}
static void getColInfoResult(void* metaHandle, int64_t suid, SArray* pUidList, SNode* pTagCond) {
int32_t code = TSDB_CODE_SUCCESS;
SArray* pBlockList = NULL;
SSDataBlock* pResBlock = NULL;
SHashObj* tags = NULL;
SScalarParam output = {0};
tagFilterAssist ctx = {0};
ctx.colHash = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_SMALLINT), false, HASH_NO_LOCK);
if (ctx.colHash == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
goto end;
}
ctx.cInfoList = taosArrayInit(4, sizeof(SColumnInfo));
if (ctx.cInfoList == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
goto end;
}
nodesRewriteExprPostOrder(&pTagCond, getColumn, (void*)&ctx);
SDataType type = {.type = TSDB_DATA_TYPE_BOOL, .bytes = sizeof(bool)};
// int64_t stt = taosGetTimestampUs();
SArray* pUidTagList = taosArrayInit(10, sizeof(STUidTagInfo));
int32_t filter = optimizeTbnameInCond(metaHandle, suid, pUidTagList, pTagCond);
if (filter == 0) { // tbname in filter is activated, do nothing and return
int32_t numOfRows = taosArrayGetSize(pUidTagList);
code = createResultData(&type, numOfRows, &output);
if (code != TSDB_CODE_SUCCESS) {
terrno = code;
qError("failed to create result, reason:%s", tstrerror(code));
goto end;
}
taosArrayEnsureCap(pUidList, numOfRows);
for(int32_t i = 0; i < numOfRows; ++i) {
STUidTagInfo* pInfo = taosArrayGet(pUidTagList, i);
taosArrayPush(pUidList, &pInfo->uid);
}
terrno = 0;
goto end;
} else {
// here we retrieve all tags from the vnode table-meta store
int32_t numOfExisted = taosArrayGetSize(pUidList);
if (numOfExisted) {
for(int32_t i = 0; i < numOfExisted; ++i) {
uint64_t* uid = taosArrayGet(pUidList, i);
STUidTagInfo info = {.uid = *uid};
taosArrayPush(pUidTagList, &info);
}
}
code = metaGetTableTags(metaHandle, suid, pUidTagList);
if (code != TSDB_CODE_SUCCESS) {
qError("failed to get table tags from meta, reason:%s, suid:%" PRIu64, tstrerror(code), suid);
terrno = code;
goto end;
}
}
pResBlock = createDataBlock();
if (pResBlock == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
goto end;
}
for (int32_t i = 0; i < taosArrayGetSize(ctx.cInfoList); ++i) {
SColumnInfoData colInfo = {0};
colInfo.info = *(SColumnInfo*)taosArrayGet(ctx.cInfoList, i);
blockDataAppendColInfo(pResBlock, &colInfo);
}
int32_t numOfTables = taosArrayGetSize(pUidTagList);
if (numOfTables == 0) {
goto end;
}
code = blockDataEnsureCapacity(pResBlock, numOfTables);
if (code != TSDB_CODE_SUCCESS) {
terrno = code;
goto end;
}
int32_t numOfCols = taosArrayGetSize(pResBlock->pDataBlock);
for (int32_t i = 0; i < numOfTables; i++) {
STUidTagInfo* p1 = taosArrayGet(pUidTagList, i);
for (int32_t j = 0; j < numOfCols; j++) {
SColumnInfoData* pColInfo = (SColumnInfoData*)taosArrayGet(pResBlock->pDataBlock, j);
if (pColInfo->info.colId == -1) { // tbname
char str[TSDB_TABLE_FNAME_LEN + VARSTR_HEADER_SIZE] = {0};
STR_TO_VARSTR(str, p1->name);
colDataAppend(pColInfo, i, str, false);
#if TAG_FILTER_DEBUG
qDebug("tagfilter uid:%ld, tbname:%s", *uid, str + 2);
#endif
} else {
STagVal tagVal = {0};
tagVal.cid = pColInfo->info.colId;
if (p1->pTagVal == NULL) {
colDataAppendNULL(pColInfo, i);
}
const char* p = metaGetTableTagVal(p1->pTagVal, pColInfo->info.type, &tagVal);
if (p == NULL || (pColInfo->info.type == TSDB_DATA_TYPE_JSON && ((STag*)p)->nTag == 0)) {
colDataAppendNULL(pColInfo, i);
} else if (pColInfo->info.type == TSDB_DATA_TYPE_JSON) {
colDataAppend(pColInfo, i, p, false);
} else if (IS_VAR_DATA_TYPE(pColInfo->info.type)) {
char* tmp = alloca(tagVal.nData + VARSTR_HEADER_SIZE + 1);
varDataSetLen(tmp, tagVal.nData);
memcpy(tmp + VARSTR_HEADER_SIZE, tagVal.pData, tagVal.nData);
colDataAppend(pColInfo, i, tmp, false);
#if TAG_FILTER_DEBUG
qDebug("tagfilter varch:%s", tmp + 2);
#endif
} else {
colDataAppend(pColInfo, i, (const char*)&tagVal.i64, false);
#if TAG_FILTER_DEBUG
if (pColInfo->info.type == TSDB_DATA_TYPE_INT) {
qDebug("tagfilter int:%d", *(int*)(&tagVal.i64));
} else if (pColInfo->info.type == TSDB_DATA_TYPE_DOUBLE) {
qDebug("tagfilter double:%f", *(double*)(&tagVal.i64));
}
#endif
}
}
}
}
pResBlock->info.rows = numOfTables;
// int64_t st1 = taosGetTimestampUs();
// qDebug("generate tag block rows:%d, cost:%ld us", rows, st1-st);
pBlockList = taosArrayInit(2, POINTER_BYTES);
taosArrayPush(pBlockList, &pResBlock);
code = createResultData(&type, numOfTables, &output);
if (code != TSDB_CODE_SUCCESS) {
terrno = code;
goto end;
}
code = scalarCalculate(pTagCond, pBlockList, &output);
if (code != TSDB_CODE_SUCCESS) {
qError("failed to calculate scalar, reason:%s", tstrerror(code));
terrno = code;
goto end;
}
taosArrayClear(pUidList);
bool* pResult = (bool*)output.columnData->pData;
for(int32_t i = 0; i < numOfTables; ++i) {
uint64_t uid = ((STUidTagInfo*)taosArrayGet(pUidTagList, i))->uid;
qDebug("tagfilter get uid:%" PRId64 ", res:%d", uid, pResult[i]);
if (pResult[i]) {
taosArrayPush(pUidList, &uid);
}
i += 1;
}
end:
taosHashCleanup(tags);
taosHashCleanup(ctx.colHash);
taosArrayDestroy(ctx.cInfoList);
blockDataDestroy(pResBlock);
taosArrayDestroy(pBlockList);
colDataDestroy(output.columnData);
taosMemoryFreeClear(output.columnData);
// return output.columnData;
}
static void releaseColInfoData(void* pCol) {
if (pCol) {
SColumnInfoData* col = (SColumnInfoData*)pCol;
......@@ -584,12 +402,17 @@ static void releaseColInfoData(void* pCol) {
}
}
void freeItem(void* p) {
STUidTagInfo *pInfo = p;
if (pInfo->pTagVal != NULL) {
taosMemoryFree(pInfo->pTagVal);
}
}
int32_t getColInfoResultForGroupby(void* metaHandle, SNodeList* group, STableListInfo* pTableListInfo) {
int32_t code = TSDB_CODE_SUCCESS;
SArray* pBlockList = NULL;
SSDataBlock* pResBlock = NULL;
SHashObj* tags = NULL;
SArray* uidList = NULL;
void* keyBuf = NULL;
SArray* groupData = NULL;
......@@ -618,21 +441,7 @@ int32_t getColInfoResultForGroupby(void* metaHandle, SNodeList* group, STableLis
REPLACE_NODE(pNode);
}
pResBlock = createDataBlock();
if (pResBlock == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto end;
}
for (int32_t i = 0; i < taosArrayGetSize(ctx.cInfoList); ++i) {
SColumnInfoData colInfo = {0};
colInfo.info = *(SColumnInfo*)taosArrayGet(ctx.cInfoList, i);
blockDataAppendColInfo(pResBlock, &colInfo);
}
SArray* pUidTagList = taosArrayInit(8, sizeof(STUidTagInfo));
uidList = taosArrayInit(rows, sizeof(uint64_t));
for (int32_t i = 0; i < rows; ++i) {
STableKeyInfo* pkeyInfo = taosArrayGet(pTableListInfo->pTableList, i);
STUidTagInfo info = {.uid = pkeyInfo->uid};
......@@ -640,15 +449,15 @@ int32_t getColInfoResultForGroupby(void* metaHandle, SNodeList* group, STableLis
}
// int64_t stt = taosGetTimestampUs();
tags = taosHashInit(32, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK);
code = metaGetTableTags(metaHandle, pTableListInfo->suid, pUidTagList);
if (code != TSDB_CODE_SUCCESS) {
goto end;
}
int32_t numOfTables = taosArrayGetSize(pUidTagList);
pResBlock = createTagValBlockForFilter(ctx.cInfoList, numOfTables, pUidTagList);
if (code != TSDB_CODE_SUCCESS) {
pResBlock = createTagValBlockForFilter(ctx.cInfoList, numOfTables, pUidTagList, metaHandle);
if (pResBlock == NULL) {
code = terrno;
goto end;
}
......@@ -759,12 +568,11 @@ int32_t getColInfoResultForGroupby(void* metaHandle, SNodeList* group, STableLis
end:
taosMemoryFreeClear(keyBuf);
taosHashCleanup(tags);
taosHashCleanup(ctx.colHash);
taosArrayDestroy(ctx.cInfoList);
blockDataDestroy(pResBlock);
taosArrayDestroy(pBlockList);
taosArrayDestroy(uidList);
taosArrayDestroyEx(pUidTagList, freeItem);
taosArrayDestroyP(groupData, releaseColInfoData);
return code;
}
......@@ -992,7 +800,7 @@ static void genTagFilterDigest(const SNode* pTagCond, T_MD5_CTX* pContext) {
taosMemoryFree(payload);
}
static SSDataBlock* createTagValBlockForFilter(SArray* pColList, int32_t numOfTables, SArray* pUidTagList) {
static SSDataBlock* createTagValBlockForFilter(SArray* pColList, int32_t numOfTables, SArray* pUidTagList, void* metaHandle) {
SSDataBlock* pResBlock = createDataBlock();
if (pResBlock == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
......@@ -1023,7 +831,12 @@ static SSDataBlock* createTagValBlockForFilter(SArray* pColList, int32_t numOfTa
if (pColInfo->info.colId == -1) { // tbname
char str[TSDB_TABLE_FNAME_LEN + VARSTR_HEADER_SIZE] = {0};
STR_TO_VARSTR(str, p1->name);
if (p1->name != NULL) {
STR_TO_VARSTR(str, p1->name);
} else { // name is not retrieved during filter
metaGetTableNameByUid(metaHandle, p1->uid, str);
}
colDataAppend(pColInfo, i, str, false);
#if TAG_FILTER_DEBUG
qDebug("tagfilter uid:%ld, tbname:%s", *uid, str + 2);
......@@ -1151,8 +964,9 @@ static int32_t doFilterByTagCond(STableListInfo* pListInfo, SArray* pUidList, SN
goto end;
}
pResBlock = createTagValBlockForFilter(ctx.cInfoList, numOfTables, pUidTagList);
if (code != TSDB_CODE_SUCCESS) {
pResBlock = createTagValBlockForFilter(ctx.cInfoList, numOfTables, pUidTagList, metaHandle);
if (pResBlock == NULL) {
code = terrno;
goto end;
}
......@@ -1163,7 +977,7 @@ static int32_t doFilterByTagCond(STableListInfo* pListInfo, SArray* pUidList, SN
code = createResultData(&type, numOfTables, &output);
if (code != TSDB_CODE_SUCCESS) {
terrno = code;
terrno = code k;
goto end;
}
......@@ -1181,11 +995,11 @@ static int32_t doFilterByTagCond(STableListInfo* pListInfo, SArray* pUidList, SN
taosArrayDestroy(ctx.cInfoList);
blockDataDestroy(pResBlock);
taosArrayDestroy(pBlockList);
taosArrayDestroyEx(pUidTagList, freeItem);
colDataDestroy(output.columnData);
taosMemoryFreeClear(output.columnData);
return TSDB_CODE_SUCCESS;
return code;
}
int32_t getTableList(void* metaHandle, void* pVnode, SScanPhysiNode* pScanNode, SNode* pTagCond, SNode* pTagIndexCond,
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册