diff --git a/source/libs/executor/src/executil.c b/source/libs/executor/src/executil.c index a72494dfad642fa6f2a1cb20e9b1c5d44434686d..5f6d218d0cff1098b580f3133383772852b0dd5c 100644 --- a/source/libs/executor/src/executil.c +++ b/source/libs/executor/src/executil.c @@ -118,7 +118,7 @@ void initGroupedResultInfo(SGroupResInfo* pGroupResInfo, SHashObj* pHashmap, int p->groupId = *(uint64_t*) key; p->pos = *(SResultRowPosition*) pData; memcpy(p->key, (char*)key + sizeof(uint64_t), keyLen - sizeof(uint64_t)); - + qDebug("page_groupRes, groupId:%"PRIu64",pageId:%d,offset:%d\n", p->groupId, p->pos.pageId, p->pos.offset); taosArrayPush(pGroupResInfo->pRows, &p); } @@ -608,6 +608,7 @@ static int32_t setSelectValueColumnInfo(SqlFunctionCtx* pCtx, int32_t numOfOutpu } } + qDebug("page_setSelect num:%d", num); if (p != NULL) { p->subsidiaries.pCtx = pValCtx; p->subsidiaries.num = num; @@ -664,7 +665,7 @@ SqlFunctionCtx* createSqlFunctionCtx(SExprInfo* pExprInfo, int32_t numOfOutput, } else if (pExpr->pExpr->nodeType == QUERY_NODE_COLUMN || pExpr->pExpr->nodeType == QUERY_NODE_OPERATOR || pExpr->pExpr->nodeType == QUERY_NODE_VALUE) { // for simple column, the result buffer needs to hold at least one element. - pCtx->resDataInfo.interBufSize = pFunct->resSchema.bytes; + pCtx->resDataInfo.interBufSize = pFunct->resSchema.bytes + sizeof(bool); // sizeof(bool) marks if data is null } pCtx->input.numOfInputCols = pFunct->numOfParams; diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index 2863f1139cd36d4a3cf0c4c0e032af87d69f8027..7d9da5ec5a1436fc02075804d01678fedc47aeba 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -275,6 +275,7 @@ SResultRow* doSetResultOutBufByKey(SDiskbasedBuf* pResultBuf, SResultRowInfo* pR // 1. close current opened time window if (pResultRowInfo->cur.pageId != -1 && ((pResult == NULL) || (pResult->pageId != pResultRowInfo->cur.pageId && pResult->offset != pResultRowInfo->cur.offset))) { + qDebug("page_1"); SResultRowPosition pos = pResultRowInfo->cur; SFilePage* pPage = getBufPage(pResultBuf, pos.pageId); releaseBufPage(pResultBuf, pPage); @@ -282,6 +283,7 @@ SResultRow* doSetResultOutBufByKey(SDiskbasedBuf* pResultBuf, SResultRowInfo* pR // allocate a new buffer page if (pResult == NULL) { + qDebug("page_2"); ASSERT(pSup->resultRowSize > 0); pResult = getNewResultRow(pResultBuf, groupId, pSup->resultRowSize); @@ -539,7 +541,7 @@ static int32_t doAggregateImpl(SOperatorInfo* pOperator, TSKEY startTs, SqlFunct if (pCtx[k].fpSet.process == NULL) { continue; } - + qDebug("page_process"); int32_t code = pCtx[k].fpSet.process(&pCtx[k]); if (code != TSDB_CODE_SUCCESS) { qError("%s aggregate function error happens, code: %s", GET_TASKID(pOperator->pTaskInfo), tstrerror(code)); @@ -1415,6 +1417,7 @@ void setExecutionContext(SOperatorInfo* pOperator, int32_t numOfOutput, uint64_t return; } + qDebug("page_setbuf, groupId:%"PRIu64, groupId); doSetTableGroupOutputBuf(pOperator, pAggInfo, numOfOutput, groupId); // record the current active group id @@ -1490,10 +1493,12 @@ int32_t doCopyToSDataBlock(SExecTaskInfo* pTaskInfo, SSDataBlock* pBlock, SExprI int32_t numOfExprs) { int32_t numOfRows = getNumOfTotalRes(pGroupResInfo); int32_t start = pGroupResInfo->index; + qDebug("\npage_copytoblock rows:%d", numOfRows); for (int32_t i = start; i < numOfRows; i += 1) { SResKeyPos* pPos = taosArrayGetP(pGroupResInfo->pRows, i); SFilePage* page = getBufPage(pBuf, pPos->pos.pageId); + qDebug("page_copytoblock pos pageId:%d, offset:%d", pPos->pos.pageId, pPos->pos.offset); SResultRow* pRow = (SResultRow*)((char*)page + pPos->pos.offset); @@ -1526,6 +1531,8 @@ int32_t doCopyToSDataBlock(SExecTaskInfo* pTaskInfo, SSDataBlock* pBlock, SExprI pCtx[j].resultInfo = getResultEntryInfo(pRow, j, rowCellOffset); if (pCtx[j].fpSet.finalize) { + qDebug("\npage_finalize %d", numOfExprs); + int32_t code = pCtx[j].fpSet.finalize(&pCtx[j], pBlock); if (TAOS_FAILED(code)) { qError("%s build result data block error, code %s", GET_TASKID(pTaskInfo), tstrerror(code)); @@ -1554,9 +1561,9 @@ int32_t doCopyToSDataBlock(SExecTaskInfo* pTaskInfo, SSDataBlock* pBlock, SExprI releaseBufPage(pBuf, page); pBlock->info.rows += pRow->numOfRows; - if (pBlock->info.rows >= pBlock->info.capacity) { // output buffer is full - break; - } +// if (pBlock->info.rows >= pBlock->info.capacity) { // output buffer is full +// break; +// } } qDebug("%s result generated, rows:%d, groupId:%" PRIu64, GET_TASKID(pTaskInfo), pBlock->info.rows, @@ -3417,11 +3424,12 @@ int32_t initAggInfo(SExprSupp* pSup, SAggSupporter* pAggSup, SExprInfo* pExprInf } void initResultSizeInfo(SOperatorInfo* pOperator, int32_t numOfRows) { + ASSERT(numOfRows != 0); pOperator->resultInfo.capacity = numOfRows; pOperator->resultInfo.threshold = numOfRows * 0.75; if (pOperator->resultInfo.threshold == 0) { - pOperator->resultInfo.capacity = numOfRows; + pOperator->resultInfo.threshold = numOfRows; } } diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index c51ef44154143a0ccfcf4fbe89395dab65a1f242..bc050b7a68efc48e66677f4028027cee2673876e 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -2051,6 +2051,7 @@ int32_t createScanTableListInfo(STableScanPhysiNode* pTableScanNode, SReadHandle qDebug("no table qualified for query, TID:0x%" PRIx64 ", QID:0x%" PRIx64, taskId, queryId); return TSDB_CODE_SUCCESS; } + pTableListInfo->needSortTableByGroupId = pTableScanNode->groupSort; code = generateGroupIdMap(pTableListInfo, pHandle, pTableScanNode->pGroupTags); if (code != TSDB_CODE_SUCCESS) { return code; diff --git a/source/libs/function/src/builtinsimpl.c b/source/libs/function/src/builtinsimpl.c index 37132f06b4b0765053a358a5d6cfb27e8b4ba006..91ceb067c5367dcb5c6bdbbe2a2875a86ecc421c 100644 --- a/source/libs/function/src/builtinsimpl.c +++ b/source/libs/function/src/builtinsimpl.c @@ -24,6 +24,7 @@ #include "tglobal.h" #include "thistogram.h" #include "tpercentile.h" +#include "query.h" #define HISTOGRAM_MAX_BINS_NUM 1000 #define MAVG_MAX_POINTS_NUM 1000 @@ -1458,7 +1459,7 @@ void setSelectivityValue(SqlFunctionCtx* pCtx, SSDataBlock* pBlock, const STuple int32_t offset = pTuplePos->offset; if (pTuplePos->pageId != -1) { - int32_t numOfCols = taosArrayGetSize(pCtx->pSrcBlock->pDataBlock); + int32_t numOfCols = pCtx->subsidiaries.num; SFilePage* pPage = getBufPage(pCtx->pBuf, pageId); bool* nullList = (bool*)((char*)pPage + offset); @@ -1469,21 +1470,18 @@ void setSelectivityValue(SqlFunctionCtx* pCtx, SSDataBlock* pBlock, const STuple SqlFunctionCtx* pc = pCtx->subsidiaries.pCtx[j]; SFunctParam* pFuncParam = &pc->pExpr->base.pParam[0]; - int32_t srcSlotId = pFuncParam->pCol->slotId; int32_t dstSlotId = pc->pExpr->base.resSchema.slotId; int32_t ps = 0; - for (int32_t k = 0; k < srcSlotId; ++k) { - SColumnInfoData* pSrcCol = taosArrayGet(pCtx->pSrcBlock->pDataBlock, k); - ps += pSrcCol->info.bytes; - } SColumnInfoData* pDstCol = taosArrayGet(pBlock->pDataBlock, dstSlotId); - if (nullList[srcSlotId]) { + ASSERT(pc->pExpr->base.resSchema.bytes == pDstCol->info.bytes); + if (nullList[j]) { colDataAppendNULL(pDstCol, rowIndex); } else { - colDataAppend(pDstCol, rowIndex, (pStart + ps), false); + colDataAppend(pDstCol, rowIndex, pStart, false); } + pStart += pDstCol->info.bytes; } releaseBufPage(pCtx->pBuf, pPage); @@ -2407,7 +2405,7 @@ bool getFirstLastFuncEnv(SFunctionNode* pFunc, SFuncExecEnv* pEnv) { bool getSelectivityFuncEnv(SFunctionNode* pFunc, SFuncExecEnv* pEnv) { SColumnNode* pNode = (SColumnNode*)nodesListGetNode(pFunc->pParameterList, 0); - pEnv->calcMemSize = pNode->node.resType.bytes; + pEnv->calcMemSize = pNode->node.resType.bytes + sizeof(bool); // sizeof(bool) marks if data is null return true; } @@ -3033,6 +3031,7 @@ void doAddIntoResult(SqlFunctionCtx* pCtx, void* pData, int32_t rowIndex, SSData // save the data of this tuple saveTupleData(pCtx, rowIndex, pSrcBlock, &pItem->tuplePos); + qDebug("page_saveTuple i:%d, item:%p,pageId:%d, offset:%d\n", pEntryInfo->numOfRes, pItem, pItem->tuplePos.pageId, pItem->tuplePos.offset); // allocate the buffer and keep the data of this row into the new allocated buffer pEntryInfo->numOfRes++; taosheapsort((void*)pItems, sizeof(STopBotResItem), pEntryInfo->numOfRes, (const void*)&type, topBotResComparFn, @@ -3051,6 +3050,7 @@ void doAddIntoResult(SqlFunctionCtx* pCtx, void* pData, int32_t rowIndex, SSData // save the data of this tuple by over writing the old data copyTupleData(pCtx, rowIndex, pSrcBlock, &pItem->tuplePos); + qDebug("page_copyTuple pageId:%d, offset:%d", pItem->tuplePos.pageId, pItem->tuplePos.offset); taosheapadjust((void*)pItems, sizeof(STopBotResItem), 0, pEntryInfo->numOfRes - 1, (const void*)&type, topBotResComparFn, NULL, !isTopQuery); } @@ -3060,7 +3060,11 @@ void doAddIntoResult(SqlFunctionCtx* pCtx, void* pData, int32_t rowIndex, SSData void saveTupleData(SqlFunctionCtx* pCtx, int32_t rowIndex, const SSDataBlock* pSrcBlock, STuplePos* pPos) { SFilePage* pPage = NULL; - int32_t completeRowSize = pSrcBlock->info.rowSize + (int32_t) taosArrayGetSize(pSrcBlock->pDataBlock) * sizeof(bool); + int32_t completeRowSize = pCtx->subsidiaries.num * sizeof(bool); + for (int32_t j = 0; j < pCtx->subsidiaries.num; ++j) { + SqlFunctionCtx* pc = pCtx->subsidiaries.pCtx[j]; + completeRowSize += pc->pExpr->base.resSchema.bytes; + } if (pCtx->curBufPage == -1) { pPage = getNewBufPage(pCtx->pBuf, 0, &pCtx->curBufPage); @@ -3078,19 +3082,22 @@ void saveTupleData(SqlFunctionCtx* pCtx, int32_t rowIndex, const SSDataBlock* pS // keep the current row data, extract method int32_t offset = 0; bool* nullList = (bool*)((char*)pPage + pPage->num); - char* pStart = (char*)(nullList + sizeof(bool) * (int32_t) taosArrayGetSize(pSrcBlock->pDataBlock)); - for (int32_t i = 0; i < (int32_t) taosArrayGetSize(pSrcBlock->pDataBlock); ++i) { - SColumnInfoData* pCol = taosArrayGet(pSrcBlock->pDataBlock, i); - bool isNull = colDataIsNull_s(pCol, rowIndex); - if (isNull) { - nullList[i] = true; + char* pStart = (char*)(nullList + sizeof(bool) * pCtx->subsidiaries.num); + for (int32_t i = 0; i < pCtx->subsidiaries.num; ++i) { + SqlFunctionCtx* pc = pCtx->subsidiaries.pCtx[i]; + + SFunctParam* pFuncParam = &pc->pExpr->base.pParam[0]; + int32_t srcSlotId = pFuncParam->pCol->slotId; + + SColumnInfoData* pCol = taosArrayGet(pSrcBlock->pDataBlock, srcSlotId); + if ((nullList[i] = colDataIsNull_s(pCol, rowIndex)) == true) { offset += pCol->info.bytes; continue; } char* p = colDataGetData(pCol, rowIndex); if (IS_VAR_DATA_TYPE(pCol->info.type)) { - memcpy(pStart + offset, p, varDataTLen(p)); + memcpy(pStart + offset, p, (pCol->info.type == TSDB_DATA_TYPE_JSON) ? getJsonValueLen(p): varDataTLen(p)); } else { memcpy(pStart + offset, p, pCol->info.bytes); } @@ -3108,14 +3115,18 @@ void saveTupleData(SqlFunctionCtx* pCtx, int32_t rowIndex, const SSDataBlock* pS void copyTupleData(SqlFunctionCtx* pCtx, int32_t rowIndex, const SSDataBlock* pSrcBlock, STuplePos* pPos) { SFilePage* pPage = getBufPage(pCtx->pBuf, pPos->pageId); - int32_t numOfCols = taosArrayGetSize(pSrcBlock->pDataBlock); + int32_t numOfCols = pCtx->subsidiaries.num; bool* nullList = (bool*)((char*)pPage + pPos->offset); char* pStart = (char*)(nullList + numOfCols * sizeof(bool)); int32_t offset = 0; for (int32_t i = 0; i < numOfCols; ++i) { - SColumnInfoData* pCol = taosArrayGet(pSrcBlock->pDataBlock, i); + SqlFunctionCtx* pc = pCtx->subsidiaries.pCtx[i]; + SFunctParam* pFuncParam = &pc->pExpr->base.pParam[0]; + int32_t srcSlotId = pFuncParam->pCol->slotId; + + SColumnInfoData* pCol = taosArrayGet(pSrcBlock->pDataBlock, srcSlotId); if ((nullList[i] = colDataIsNull_s(pCol, rowIndex)) == true) { offset += pCol->info.bytes; continue; @@ -3123,7 +3134,7 @@ void copyTupleData(SqlFunctionCtx* pCtx, int32_t rowIndex, const SSDataBlock* pS char* p = colDataGetData(pCol, rowIndex); if (IS_VAR_DATA_TYPE(pCol->info.type)) { - memcpy(pStart + offset, p, varDataTLen(p)); + memcpy(pStart + offset, p, (pCol->info.type == TSDB_DATA_TYPE_JSON) ? getJsonValueLen(p): varDataTLen(p)); } else { memcpy(pStart + offset, p, pCol->info.bytes); } @@ -3154,7 +3165,7 @@ int32_t topBotFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) { } else { colDataAppend(pCol, currentRow, (const char*)&pItem->v.i, false); } - + qDebug("page_finalize i:%d,item:%p,pageId:%d, offset:%d\n", i, pItem, pItem->tuplePos.pageId, pItem->tuplePos.offset); setSelectivityValue(pCtx, pBlock, &pRes->pItems[i].tuplePos, currentRow); currentRow += 1; } diff --git a/source/util/src/tpagedbuf.c b/source/util/src/tpagedbuf.c index cdf2629671b2ea880ed5b7b8e03c2c4907028bde..985fa5693f6d9ebebe9a51766bd9cad029a6c0d8 100644 --- a/source/util/src/tpagedbuf.c +++ b/source/util/src/tpagedbuf.c @@ -440,6 +440,7 @@ void* getNewBufPage(SDiskbasedBuf* pBuf, int32_t groupId, int32_t* pageId) { } ((void**)pi->pData)[0] = pi; + uDebug("page_getNewBufPage pageId:%d, offset:%"PRId64, pi->pageId, pi->offset); return (void*)(GET_DATA_PAYLOAD(pi)); } @@ -463,6 +464,7 @@ void* getBufPage(SDiskbasedBuf* pBuf, int32_t id) { lruListMoveToFront(pBuf->lruList, (*pi)); (*pi)->used = true; + uDebug("page_getBufPage1 pageId:%d, offset:%"PRId64, (*pi)->pageId, (*pi)->offset); return (void*)(GET_DATA_PAYLOAD(*pi)); } else { // not in memory assert((*pi)->pData == NULL && (*pi)->pn == NULL && (((*pi)->length >= 0 && (*pi)->offset >= 0) || ((*pi)->length == -1 && (*pi)->offset == -1))); @@ -494,7 +496,7 @@ void* getBufPage(SDiskbasedBuf* pBuf, int32_t id) { return NULL; } } - + uDebug("page_getBufPage2 pageId:%d, offset:%"PRId64, (*pi)->pageId, (*pi)->offset); return (void*)(GET_DATA_PAYLOAD(*pi)); } } @@ -506,8 +508,10 @@ void releaseBufPage(SDiskbasedBuf* pBuf, void* page) { } void releaseBufPageInfo(SDiskbasedBuf* pBuf, SPageInfo* pi) { - assert(pi->pData != NULL && pi->used == true); + uDebug("page_releaseBufPageInfo pageId:%d, used:%d, offset:%"PRId64, pi->pageId, pi->used, pi->offset); + assert(pi->pData != NULL && pi->used == true); +// assert(pi->pData != NULL); pi->used = false; pBuf->statis.releasePages += 1; } diff --git a/tests/system-test/2-query/json_tag.py b/tests/system-test/2-query/json_tag.py index 6c36ffc192cc6623e8333f258b7ba0025da45612..25daeec6f1849c43ce9e4ba8890c425238a4330b 100644 --- a/tests/system-test/2-query/json_tag.py +++ b/tests/system-test/2-query/json_tag.py @@ -38,7 +38,10 @@ class TDTestCase: tdSql.init(conn.cursor(), logSql) def run(self): - tdSql.prepare() + # tdSql.prepare() + tdSql.execute('drop database if exists db') + tdSql.execute('create database db vgroups 1') + tdSql.execute('use db') print("============== STEP 1 ===== prepare data & validate json string") tdSql.error("create table if not exists jsons1(ts timestamp, dataInt int, dataBool bool, dataStr nchar(50), dataStrBin binary(150)) tags(jtag json, tagint int)") tdSql.error("create table if not exists jsons1(ts timestamp, data json) tags(tagint int)")