diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index 80f956d3dd5dbc29bccc0207dc48d7c261d13b4f..34b7fce33cbccc11a6699636f160d44bd0ee4665 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -658,6 +658,7 @@ void initExecTimeWindowInfo(SColumnInfoData* pColData, STimeWindow* pQueryWin void cleanupAggSup(SAggSupporter* pAggSup); void destroyBasicOperatorInfo(void* param, int32_t numOfOutput); void appendOneRowToDataBlock(SSDataBlock* pBlock, STupleHandle* pTupleHandle); +SInterval extractIntervalInfo(const STableScanPhysiNode* pTableScanNode); SSDataBlock* getSortedBlockData(SSortHandle* pHandle, SSDataBlock* pDataBlock, int32_t capacity, SArray* pColMatchInfo); SSDataBlock* loadNextDataBlock(void* param); diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index 593089f28abcd033b175273df88b31d3611cd9f5..fe5066a06585ec76fcb27cd221436224c23a12ce 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -4603,7 +4603,7 @@ static SResSchema createResSchema(int32_t type, int32_t bytes, int32_t slotId, i return s; } -static SColumn* createColumn(int32_t blockId, int32_t slotId, SDataType* pType) { +static SColumn* createColumn(int32_t blockId, int32_t slotId, int32_t colId, SDataType* pType) { SColumn* pCol = taosMemoryCalloc(1, sizeof(SColumn)); if (pCol == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; @@ -4611,9 +4611,10 @@ static SColumn* createColumn(int32_t blockId, int32_t slotId, SDataType* pType) } pCol->slotId = slotId; - pCol->bytes = pType->bytes; - pCol->type = pType->type; - pCol->scale = pType->scale; + pCol->colId = colId; + pCol->bytes = pType->bytes; + pCol->type = pType->type; + pCol->scale = pType->scale; pCol->precision = pType->precision; pCol->dataBlockId = blockId; @@ -4656,7 +4657,7 @@ SExprInfo* createExprInfo(SNodeList* pNodeList, SNodeList* pGroupKeys, int32_t* SDataType* pType = &pColNode->node.resType; pExp->base.resSchema = createResSchema(pType->type, pType->bytes, pTargetNode->slotId, pType->scale, pType->precision, pColNode->colName); - pExp->base.pParam[0].pCol = createColumn(pColNode->dataBlockId, pColNode->slotId, pType); + pExp->base.pParam[0].pCol = createColumn(pColNode->dataBlockId, pColNode->slotId, pColNode->colId, pType); pExp->base.pParam[0].type = FUNC_PARAM_TYPE_COLUMN; } else if (type == QUERY_NODE_VALUE) { pExp->pExpr->nodeType = QUERY_NODE_VALUE; @@ -4708,7 +4709,7 @@ SExprInfo* createExprInfo(SNodeList* pNodeList, SNodeList* pGroupKeys, int32_t* SColumnNode* pcn = (SColumnNode*)p1; pExp->base.pParam[j].type = FUNC_PARAM_TYPE_COLUMN; - pExp->base.pParam[j].pCol = createColumn(pcn->dataBlockId, pcn->slotId, &pcn->node.resType); + pExp->base.pParam[j].pCol = createColumn(pcn->dataBlockId, pcn->slotId, pcn->colId, &pcn->node.resType); } else if (p1->type == QUERY_NODE_VALUE) { SValueNode* pvn = (SValueNode*)p1; pExp->base.pParam[j].type = FUNC_PARAM_TYPE_VALUE; @@ -4788,7 +4789,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo SScanPhysiNode* pScanPhyNode = (SScanPhysiNode*)pPhyNode; // simple child table. STableScanPhysiNode* pTableScanNode = (STableScanPhysiNode*)pPhyNode; - int32_t numOfCols = 0; + int32_t numOfCols = 0; tsdbReaderT pDataReader = NULL; if (pHandle->vnode) { @@ -4797,6 +4798,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo doCreateTableGroup(pHandle->meta, pScanPhyNode->tableType, pScanPhyNode->uid, pTableGroupInfo, queryId, taskId); } + if (pDataReader == NULL && terrno != 0) { qDebug("pDataReader is NULL"); // return NULL; @@ -4816,12 +4818,8 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo } SInterval interval = extractIntervalInfo(pTableScanNode); - SOperatorInfo* pOperatorDumy = createTableScanOperatorInfo( - pDataReader, &cond, numOfCols, pTableScanNode->dataRequired, pTableScanNode->scanSeq, pColList, - pResBlockDumy, pScanPhyNode->node.pConditions, &interval, pTableScanNode->ratio, pTaskInfo); + SOperatorInfo* pOperatorDumy = createTableScanOperatorInfo(pTableScanNode, pDataReader, pHandle, pTaskInfo); - // int32_t code = doCreateTableGroup(pHandle->meta, pScanPhyNode->tableType, pScanPhyNode->uid, pTableGroupInfo, - // queryId, taskId); SArray* tableIdList = extractTableIdList(pTableGroupInfo); SSDataBlock* pResBlock = createResDataBlock(pDescNode); diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 96e5d7d1f2166438c315aa82c275116ef992a44c..1483b6b042e44c858bb01a4d3f8d636f776cd474 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -13,8 +13,8 @@ * along with this program. If not, see . */ -#include "filter.h" #include "function.h" +#include "filter.h" #include "functionMgt.h" #include "os.h" #include "querynodes.h" @@ -1408,35 +1408,33 @@ static SSDataBlock* doTagScan(SOperatorInfo* pOperator) { char str[512] = {0}; int32_t count = 0; SMetaReader mr = {0}; + metaReaderInit(&mr, pInfo->readHandle.meta, 0); while (pInfo->curPos < pInfo->pTableGroups->numOfTables && count < pOperator->resultInfo.capacity) { STableKeyInfo* item = taosArrayGet(pa, pInfo->curPos); + metaGetTableEntryByUid(&mr, item->uid); for (int32_t j = 0; j < pOperator->numOfExprs; ++j) { SColumnInfoData* pDst = taosArrayGet(pRes->pDataBlock, pExprInfo[j].base.resSchema.slotId); // refactor later if (fmIsScanPseudoColumnFunc(pExprInfo[j].pExpr->_function.functionId)) { - metaReaderInit(&mr, pInfo->readHandle.meta, 0); - metaGetTableEntryByUid(&mr, item->uid); - STR_TO_VARSTR(str, mr.me.name); - metaReaderClear(&mr); - colDataAppend(pDst, count, str, false); - // data = tsdbGetTableTagVal(item->pTable, pExprInfo[j].base.pColumns->info.colId, type, bytes); - // dst = pColInfo->pData + count * pExprInfo[j].base.resSchema.bytes; - // doSetTagValueToResultBuf(dst, data, type, bytes); + } else { // it is a tag value + const char* p = metaGetTableTagVal(&mr.me, pExprInfo[j].base.pParam[0].pCol->colId); + colDataAppend(pDst, count, p, (p == NULL)); } - - count += 1; } + count += 1; if (++pInfo->curPos >= pInfo->pTableGroups->numOfTables) { pOperator->status = OP_EXEC_DONE; } } + metaReaderClear(&mr); + // qDebug("QInfo:0x%"PRIx64" create tag values results completed, rows:%d", GET_TASKID(pRuntimeEnv), count); if (pOperator->status == OP_EXEC_DONE) { setTaskStatus(pTaskInfo, TASK_COMPLETED); diff --git a/source/libs/function/test/udf2.c b/source/libs/function/test/udf2.c index b3b60f93a4f418acfcd78ed8819342ee4df1d93a..ba39b09f56302964923cdfc5d073936bf46da3ee 100644 --- a/source/libs/function/test/udf2.c +++ b/source/libs/function/test/udf2.c @@ -19,17 +19,18 @@ int32_t udf2_destroy() { int32_t udf2_start(SUdfInterBuf *buf) { *(int64_t*)(buf->buf) = 0; - buf->bufLen = sizeof(int64_t); + buf->bufLen = sizeof(double); buf->numOfResult = 0; return 0; } int32_t udf2(SUdfDataBlock* block, SUdfInterBuf *interBuf, SUdfInterBuf *newInterBuf) { - int64_t sumSquares = *(int64_t*)interBuf->buf; + double sumSquares = *(double*)interBuf->buf; int8_t numOutput = 0; for (int32_t i = 0; i < block->numOfCols; ++i) { SUdfColumn* col = block->udfCols[i]; - if (col->colMeta.type != TSDB_DATA_TYPE_INT) { + if (!(col->colMeta.type == TSDB_DATA_TYPE_INT || + col->colMeta.type == TSDB_DATA_TYPE_DOUBLE)) { return TSDB_CODE_UDF_INVALID_INPUT; } } @@ -39,17 +40,29 @@ int32_t udf2(SUdfDataBlock* block, SUdfInterBuf *interBuf, SUdfInterBuf *newInte if (udfColDataIsNull(col, j)) { continue; } - - char* cell = udfColDataGetData(col, j); - int32_t num = *(int32_t*)cell; - sumSquares += num * num; + switch (col->colMeta.type) { + case TSDB_DATA_TYPE_INT: { + char* cell = udfColDataGetData(col, j); + int32_t num = *(int32_t*)cell; + sumSquares += num * num; + break; + } + case TSDB_DATA_TYPE_DOUBLE: { + char* cell = udfColDataGetData(col, j); + double num = *(double*)cell; + sumSquares += num * num; + break; + } + default: + break; + } numOutput = 1; } } if (numOutput == 1) { - *(int64_t*)(newInterBuf->buf) = sumSquares; - newInterBuf->bufLen = sizeof(int64_t); + *(double*)(newInterBuf->buf) = sumSquares; + newInterBuf->bufLen = sizeof(double); } newInterBuf->numOfResult = numOutput; return 0; @@ -60,7 +73,7 @@ int32_t udf2_finish(SUdfInterBuf* buf, SUdfInterBuf *resultData) { resultData->numOfResult = 0; return 0; } - int64_t sumSquares = *(int64_t*)(buf->buf); + double sumSquares = *(double*)(buf->buf); *(double*)(resultData->buf) = sqrt(sumSquares); resultData->bufLen = sizeof(double); resultData->numOfResult = 1; diff --git a/tests/script/tsim/query/udf.sim b/tests/script/tsim/query/udf.sim index 62904032148da25d6aa1e71cc281c4306832ec88..19e133e9496ea6161656d830432049eebe407e8c 100644 --- a/tests/script/tsim/query/udf.sim +++ b/tests/script/tsim/query/udf.sim @@ -3,7 +3,7 @@ system sh/stop_dnodes.sh system sh/deploy.sh -n dnode1 -i 1 system sh/cfg.sh -n dnode1 -c wallevel -v 2 system sh/cfg.sh -n dnode1 -c numOfMnodes -v 1 -system sh/cfg.sh -n dnode1 -c startUdfd -v 1 +system sh/cfg.sh -n dnode1 -c udf -v 1 print ========= start dnode1 as LEADER system sh/exec.sh -n dnode1 -s start