提交 31c2aa89 编写于 作者: H Haojun Liao

fix(query): do some internal refactor.

上级 0948216c
......@@ -405,7 +405,7 @@ static SColumnInfoData* getColInfoResult(void* metaHandle, int64_t suid, SArray*
terrno = TSDB_CODE_OUT_OF_MEMORY;
goto end;
}
ctx.index = 0;
ctx.cInfoList = taosArrayInit(4, sizeof(SColumnInfo));
if (ctx.cInfoList == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
......@@ -964,34 +964,98 @@ static int32_t optimizeTbnameInCondImpl(void* metaHandle, int64_t suid, SArray*
return -1;
}
static void genTagFilterDigest(const SNode* pTagCond, T_MD5_CTX* pContext) {
if (pTagCond == NULL) {
return;
}
char* payload = NULL;
int32_t len = 0;
nodesNodeToMsg(pTagCond, &payload, &len);
tMD5Init(pContext);
tMD5Update(pContext, (uint8_t*)payload, (uint32_t)len);
tMD5Final(pContext);
taosMemoryFree(payload);
}
static int32_t doFilterByTagCond(STableListInfo* pListInfo, SArray* res, SNode* pTagCond, void* metaHandle) {
if (pTagCond == NULL) {
return TSDB_CODE_SUCCESS;
}
terrno = TDB_CODE_SUCCESS;
SColumnInfoData* pColInfoData = getColInfoResult(metaHandle, pListInfo->suid, res, pTagCond);
if (terrno != TDB_CODE_SUCCESS) {
colDataDestroy(pColInfoData);
taosMemoryFreeClear(pColInfoData);
taosArrayDestroy(res);
qError("failed to getColInfoResult, code: %s", tstrerror(terrno));
return terrno;
}
int32_t i = 0;
int32_t len = taosArrayGetSize(res);
if (pColInfoData != NULL) {
bool* pResult = (bool*)pColInfoData->pData;
SArray* p = taosArrayInit(taosArrayGetSize(res), sizeof(uint64_t));
while (i < len && pColInfoData) {
int64_t* uid = taosArrayGet(res, i);
qDebug("tagfilter get uid:%" PRId64 ", res:%d", *uid, pResult[i]);
if (pResult[i]) {
taosArrayPush(p, uid);
}
i += 1;
}
taosArraySwap(res, p);
taosArrayDestroy(p);
}
colDataDestroy(pColInfoData);
taosMemoryFreeClear(pColInfoData);
return TSDB_CODE_SUCCESS;
}
int32_t getTableList(void* metaHandle, void* pVnode, SScanPhysiNode* pScanNode, SNode* pTagCond, SNode* pTagIndexCond,
STableListInfo* pListInfo) {
int32_t code = TSDB_CODE_SUCCESS;
size_t numOfTables = 0;
uint64_t tableUid = pScanNode->uid;
pListInfo->suid = pScanNode->suid;
SArray* res = taosArrayInit(8, sizeof(uint64_t));
if (pScanNode->tableType == TSDB_SUPER_TABLE) {
// try to retrieve the result from meta cache
// generate the cache key
T_MD5_CTX context = {0};
if (pTagIndexCond) {
char* payload = NULL;
int32_t len = 0;
nodesNodeToMsg(pTagCond, &payload, &len);
tMD5Init(&context);
tMD5Update(&context, (uint8_t*)payload, (uint32_t)len);
tMD5Final(&context);
if (pScanNode->tableType != TSDB_SUPER_TABLE) {
if (metaIsTableExist(metaHandle, tableUid)) {
taosArrayPush(res, &tableUid);
}
taosMemoryFree(payload);
code = doFilterByTagCond(pListInfo, res, pTagCond, metaHandle);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
} else {
// try to retrieve the result from meta cache
T_MD5_CTX context = {0};
genTagFilterDigest(pTagCond, &context);
bool acquired = false;
metaGetCachedTableUidList(metaHandle, pScanNode->suid, context.digest, tListLen(context.digest), res, &acquired);
if (!acquired) {
if (acquired) {
qDebug("retrieve table uid list from cache, numOfTables:%d", (int32_t)taosArrayGetSize(res));
goto _end;
}
if (!pTagCond) { // no tag condition exists, let's fetch all tables of this super table
ASSERT(pTagIndexCond == NULL);
vnodeGetCtbIdList(pVnode, pScanNode->suid, res);
} else {
// failed to find the result in the cache, let try to calculate the results
if (pTagIndexCond) {
SIndexMetaArg metaArg = {
......@@ -1003,63 +1067,29 @@ int32_t getTableList(void* metaHandle, void* pVnode, SScanPhysiNode* pScanNode,
qError("failed to get tableIds from index, reason:%s, suid:%" PRIu64, tstrerror(code), tableUid);
code = TDB_CODE_SUCCESS;
}
} else if (!pTagCond) {
vnodeGetCtbIdList(pVnode, pScanNode->suid, res);
}
// let's add the filter results into meta-cache
size_t numOfTables = taosArrayGetSize(res);
size_t size = numOfTables * sizeof(uint64_t) + sizeof(int32_t);
char* pPayload = taosMemoryMalloc(size);
*(int32_t*)pPayload = numOfTables;
if (numOfTables > 0) {
memcpy(pPayload + sizeof(int32_t), taosArrayGet(res, 0), numOfTables * sizeof(uint64_t));
}
metaUidFilterCachePut(metaHandle, pScanNode->suid, context.digest, tListLen(context.digest), pPayload,
size, 1);
} else {
qDebug("retrieve table uid list from cache, numOfTables:%d", (int32_t) taosArrayGetSize(res));
}
} else { // Create one table group.
if (metaIsTableExist(metaHandle, tableUid)) {
taosArrayPush(res, &tableUid);
}
}
if (pTagCond) {
terrno = TDB_CODE_SUCCESS;
SColumnInfoData* pColInfoData = getColInfoResult(metaHandle, pListInfo->suid, res, pTagCond);
if (terrno != TDB_CODE_SUCCESS) {
colDataDestroy(pColInfoData);
taosMemoryFreeClear(pColInfoData);
taosArrayDestroy(res);
qError("failed to getColInfoResult, code: %s", tstrerror(terrno));
return terrno;
code = doFilterByTagCond(pListInfo, res, pTagCond, metaHandle);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
int32_t i = 0;
int32_t j = 0;
int32_t len = taosArrayGetSize(res);
while (i < taosArrayGetSize(res) && j < len && pColInfoData) {
void* var = POINTER_SHIFT(pColInfoData->pData, j * pColInfoData->info.bytes);
// let's add the filter results into meta-cache
numOfTables = taosArrayGetSize(res);
size_t size = numOfTables * sizeof(uint64_t) + sizeof(int32_t);
char* pPayload = taosMemoryMalloc(size);
*(int32_t*)pPayload = numOfTables;
int64_t* uid = taosArrayGet(res, i);
qDebug("tagfilter get uid:%" PRId64 ", res:%d", *uid, *(bool*)var);
if (*(bool*)var == false) {
taosArrayRemove(res, i);
j++;
continue;
}
i++;
j++;
if (numOfTables > 0) {
memcpy(pPayload + sizeof(int32_t), taosArrayGet(res, 0), numOfTables * sizeof(uint64_t));
}
colDataDestroy(pColInfoData);
taosMemoryFreeClear(pColInfoData);
metaUidFilterCachePut(metaHandle, pScanNode->suid, context.digest, tListLen(context.digest), pPayload, size, 1);
}
size_t numOfTables = taosArrayGetSize(res);
_end:
numOfTables = taosArrayGetSize(res);
for (int i = 0; i < numOfTables; i++) {
STableKeyInfo info = {.uid = *(uint64_t*)taosArrayGet(res, i), .groupId = 0};
......
......@@ -708,24 +708,22 @@ int32_t avgCombine(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx) {
}
int32_t avgFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
SInputColumnInfoData* pInput = &pCtx->input;
SAvgRes* pAvgRes = GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx));
int32_t type = pAvgRes->type;
if (IS_SIGNED_NUMERIC_TYPE(type)) {
pAvgRes->result = pAvgRes->sum.isum / ((double)pAvgRes->count);
} else if (IS_UNSIGNED_NUMERIC_TYPE(type)) {
pAvgRes->result = pAvgRes->sum.usum / ((double)pAvgRes->count);
} else {
pAvgRes->result = pAvgRes->sum.dsum / ((double)pAvgRes->count);
}
// check for overflow
if (isinf(pAvgRes->result) || isnan(pAvgRes->result)) {
GET_RES_INFO(pCtx)->numOfRes = 0;
SResultRowEntryInfo* pEntryInfo = GET_RES_INFO(pCtx);
SAvgRes* pRes = GET_ROWCELL_INTERBUF(pEntryInfo);
int32_t type = pRes->type;
if (pRes->count > 0) {
if (IS_SIGNED_NUMERIC_TYPE(type)) {
pRes->result = pRes->sum.isum / ((double)pRes->count);
} else if (IS_UNSIGNED_NUMERIC_TYPE(type)) {
pRes->result = pRes->sum.usum / ((double)pRes->count);
} else {
pRes->result = pRes->sum.dsum / ((double)pRes->count);
}
}
pEntryInfo->numOfRes = (pRes->count > 0)? 1:0;
return functionFinalize(pCtx, pBlock);
}
......
......@@ -5,30 +5,32 @@
#run tsim/table/basic1.sim
#run tsim/trans/lossdata1.sim
#run tsim/trans/create_db.sim
run tsim/stable/alter_metrics.sim
run tsim/stable/tag_modify.sim
run tsim/stable/alter_comment.sim
run tsim/stable/column_drop.sim
run tsim/stable/column_modify.sim
run tsim/stable/tag_rename.sim
run tsim/stable/vnode3.sim
run tsim/stable/metrics.sim
run tsim/stable/alter_insert2.sim
run tsim/stable/show.sim
run tsim/stable/alter_import.sim
run tsim/stable/tag_add.sim
run tsim/stable/tag_drop.sim
run tsim/stable/column_add.sim
run tsim/stable/alter_count.sim
run tsim/stable/values.sim
run tsim/stable/dnode3.sim
run tsim/stable/alter_insert1.sim
run tsim/stable/refcount.sim
run tsim/stable/tag_filter.sim
run tsim/stable/disk.sim
run tsim/db/basic1.sim
#run tsim/stable/alter_metrics.sim
#run tsim/stable/tag_modify.sim
#run tsim/stable/alter_comment.sim
#run tsim/stable/column_drop.sim
#run tsim/stable/column_modify.sim
#run tsim/stable/tag_rename.sim
#run tsim/stable/vnode3.sim
#run tsim/stable/metrics.sim
#run tsim/stable/alter_insert2.sim
#run tsim/stable/alter_import.sim
#run tsim/stable/tag_add.sim
#run tsim/stable/tag_drop.sim
#run tsim/stable/column_add.sim
#run tsim/stable/alter_count.sim
#run tsim/stable/values.sim
#run tsim/stable/dnode3.sim
#run tsim/stable/alter_insert1.sim
#run tsim/stable/refcount.sim
#run tsim/stable/tag_filter.sim
#run tsim/stable/disk.sim
#run tsim/db/basic1.sim
run tsim/db/basic2.sim
run tsim/db/basic3.sim
run tsim/db/basic7.sim
run tsim/db/basic4.sim
run tsim/db/basic5.sim
run tsim/db/basic6.sim
run tsim/db/alter_replica_13.sim
run tsim/db/create_all_options.sim
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册