diff --git a/src/client/inc/tscUtil.h b/src/client/inc/tscUtil.h index e8d9e1ff486ff856900e0cbab3570bbe4b2993ea..6c7cbbc615b76ee64c507f1476675155fd42a6e9 100644 --- a/src/client/inc/tscUtil.h +++ b/src/client/inc/tscUtil.h @@ -302,9 +302,8 @@ int32_t tscCreateTableMetaFromCChildMeta(STableMeta* pChild, const char* name); STableMeta* tscTableMetaDup(STableMeta* pTableMeta); int32_t tscCreateQueryFromQueryInfo(SQueryInfo* pQueryInfo, SQueryAttr* pQueryAttr, void* addr); -void tsCreateSQLFunctionCtx(SQueryInfo* pQueryInfo, SQLFunctionCtx* pCtx, SSchemaEx* pSchema); -void* createQueryInfoFromQueryNode(SQueryInfo* pQueryInfo, SExprInfo* pExprs, STableGroupInfo* pTableGroupInfo, - uint64_t* qId, char* sql, void* addr); +void tsCreateSQLFunctionCtx(SQueryInfo* pQueryInfo, SQLFunctionCtx* pCtx, SSchema* pSchema); +void* createQueryInfoFromQueryNode(SQueryInfo* pQueryInfo, SExprInfo* pExprs, STableGroupInfo* pTableGroupInfo, SOperatorInfo* pOperator, char* sql, void* addr); void* malloc_throw(size_t size); void* calloc_throw(size_t nmemb, size_t size); diff --git a/src/client/src/tscLocalMerge.c b/src/client/src/tscLocalMerge.c index 5095a9355f3be482f798bcb9c3b86b7f8ab13c25..d181b9ea9610048f3089f1e63fef69941721c802 100644 --- a/src/client/src/tscLocalMerge.c +++ b/src/client/src/tscLocalMerge.c @@ -57,7 +57,7 @@ int32_t treeComparator(const void *pLeft, const void *pRight, void *param) { } // todo merge with vnode side function -void tsCreateSQLFunctionCtx(SQueryInfo* pQueryInfo, SQLFunctionCtx* pCtx, SSchemaEx* pSchema) { +void tsCreateSQLFunctionCtx(SQueryInfo* pQueryInfo, SQLFunctionCtx* pCtx, SSchema* pSchema) { size_t size = tscSqlExprNumOfExprs(pQueryInfo); for (int32_t i = 0; i < size; ++i) { @@ -69,16 +69,14 @@ void tsCreateSQLFunctionCtx(SQueryInfo* pQueryInfo, SQLFunctionCtx* pCtx, SSchem pCtx[i].order = pQueryInfo->order.order; pCtx[i].functionId = pExpr->base.functionId; - // input buffer hold only one point data - SSchema *s = &pSchema[i].field; - // input data format comes from pModel - pCtx[i].inputType = s->type; - pCtx[i].inputBytes = s->bytes; + pCtx[i].inputType = pSchema[i].type; + pCtx[i].inputBytes = pSchema[i].bytes; pCtx[i].outputBytes = pExpr->base.resBytes; pCtx[i].outputType = pExpr->base.resType; + // input buffer hold only one point data pCtx[i].size = 1; pCtx[i].hasNull = true; pCtx[i].currentStage = MERGE_STAGE; @@ -374,7 +372,13 @@ void tscCreateLocalMerger(tExtMemBuffer **pMemBuffer, int32_t numOfBuffer, tOrde pReducer->pTempBuffer->num = 0; tscCreateResPointerInfo(pRes, pQueryInfo); - tsCreateSQLFunctionCtx(pQueryInfo, pReducer->pCtx, pDesc->pColumnModel->pFields); + + SSchema* pschema = calloc(pDesc->pColumnModel->numOfCols, sizeof(SSchema)); + for(int32_t i = 0; i < pDesc->pColumnModel->numOfCols; ++i) { + pschema[i] = pDesc->pColumnModel->pFields[i].field; + } + + tsCreateSQLFunctionCtx(pQueryInfo, pReducer->pCtx, pschema); setCtxInputOutputBuffer(pQueryInfo, pReducer->pCtx, pReducer, pDesc); // we change the capacity of schema to denote that there is only one row in temp buffer diff --git a/src/client/src/tscSubquery.c b/src/client/src/tscSubquery.c index 1c5c36806a683398e652e7d0b75d0700478928f6..995eea30a383fbfbc0ec4bd096f3837533e055bc 100644 --- a/src/client/src/tscSubquery.c +++ b/src/client/src/tscSubquery.c @@ -24,6 +24,7 @@ #include "tschemautil.h" #include "tsclient.h" #include "qUtil.h" +#include "qPlan.h" typedef struct SInsertSupporter { SSqlObj* pSql; @@ -3216,8 +3217,7 @@ static UNUSED_FUNC bool tscHasRemainDataInSubqueryResultSet(SSqlObj *pSql) { return hasData; } -void* createQueryInfoFromQueryNode(SQueryInfo* pQueryInfo, SExprInfo* pExprs, STableGroupInfo* pTableGroupInfo, - uint64_t* qId, char* sql, void* addr) { +void* createQueryInfoFromQueryNode(SQueryInfo* pQueryInfo, SExprInfo* pExprs, STableGroupInfo* pTableGroupInfo, SOperatorInfo* pOperator, char* sql, void* addr) { assert(pQueryInfo != NULL); int16_t numOfOutput = pQueryInfo->fieldsInfo.numOfOutput; @@ -3230,9 +3230,13 @@ void* createQueryInfoFromQueryNode(SQueryInfo* pQueryInfo, SExprInfo* pExprs, ST pQInfo->signature = pQInfo; SQueryAttr *pQueryAttr = &pQInfo->query; - pQInfo->runtimeEnv.pQueryAttr = pQueryAttr; + SQueryRuntimeEnv* pRuntimeEnv = &pQInfo->runtimeEnv; + + pRuntimeEnv->pQueryAttr = pQueryAttr; tscCreateQueryFromQueryInfo(pQueryInfo, pQueryAttr, addr); + pQueryAttr->tableGroupInfo = *pTableGroupInfo; + // calculate the result row size for (int16_t col = 0; col < numOfOutput; ++col) { assert(pExprs[col].base.resBytes > 0); @@ -3244,12 +3248,6 @@ void* createQueryInfoFromQueryNode(SQueryInfo* pQueryInfo, SExprInfo* pExprs, ST } } -// doUpdateExprColumnIndex(pQueryAttr); -// int32_t ret = createFilterInfo(pQInfo, pQueryAttr); -// if (ret != TSDB_CODE_SUCCESS) { -// goto _cleanup; -// } - size_t numOfGroups = 0; if (pTableGroupInfo->pGroupList != NULL) { numOfGroups = taosArrayGetSize(pTableGroupInfo->pGroupList); @@ -3272,12 +3270,6 @@ void* createQueryInfoFromQueryNode(SQueryInfo* pQueryInfo, SExprInfo* pExprs, ST pthread_mutex_init(&pQInfo->lock, NULL); tsem_init(&pQInfo->ready, 0, 0); -// changeExecuteScanOrder(pQInfo, pQueryMsg, stableQuery); - - SQueryRuntimeEnv* pRuntimeEnv = &pQInfo->runtimeEnv; - - STimeWindow window = pQueryAttr->window; - int32_t index = 0; for(int32_t i = 0; i < numOfGroups; ++i) { SArray* pa = taosArrayGetP(pQueryAttr->tableGroupInfo.pGroupList, i); @@ -3290,6 +3282,7 @@ void* createQueryInfoFromQueryNode(SQueryInfo* pQueryInfo, SExprInfo* pExprs, ST taosArrayPush(pRuntimeEnv->tableqinfoGroupInfo.pGroupList, &p1); + STimeWindow window = pQueryAttr->window; for(int32_t j = 0; j < s; ++j) { STableKeyInfo* info = taosArrayGet(pa, j); window.skey = info->lastKey; @@ -3309,13 +3302,6 @@ void* createQueryInfoFromQueryNode(SQueryInfo* pQueryInfo, SExprInfo* pExprs, ST } } -// colIdCheck(pQueryAttr, pQInfo); - - pQInfo->qId = 0; - if (qId != NULL) { - *qId = pQInfo->qId; - } - // qDebug("qmsg:%p QInfo:%" PRIu64 "-%p created", pQueryMsg, pQInfo->qId, pQInfo); // return pQInfo; // if (pGroupbyExpr != NULL) { @@ -3334,9 +3320,13 @@ void* createQueryInfoFromQueryNode(SQueryInfo* pQueryInfo, SExprInfo* pExprs, ST tfree(pExprs); + SArray* pa = createExecOperatorPlan(pQueryAttr); + STsBufInfo bufInfo = {0}; - SQueryParam param = {0}; + SQueryParam param = {.pOperator = pa}; /*int32_t code = */initQInfo(&bufInfo, NULL, pQInfo, ¶m, NULL, 0); + + pQInfo->runtimeEnv.proot->upstream = pOperator; qTableQuery(pQInfo); return pQInfo; diff --git a/src/client/src/tscUtil.c b/src/client/src/tscUtil.c index 4e600f42fe8e7efa45f24615fdfd41c62671e873..209c3223aa4b274800ecd844b8561c8c0b0eb29a 100644 --- a/src/client/src/tscUtil.c +++ b/src/client/src/tscUtil.c @@ -539,6 +539,62 @@ static SColumnInfo* extractColumnInfoFromResult(STableMeta* pTableMeta, SArray* return pColInfo; } +typedef struct SDummyInputInfo { + SSDataBlock block; + SSqlRes *pRes; // refactor: remove it +} SDummyInputInfo; + +SSDataBlock* doGetDataBlock(void* param) { + SOperatorInfo *pOperator = (SOperatorInfo*) param; + + SDummyInputInfo *pInput = pOperator->info; + char* pData = pInput->pRes->data; + + SSDataBlock* pBlock = &pInput->block; + pBlock->info.rows = pInput->pRes->numOfRows; + if (pBlock->info.rows == 0) { + return NULL; + } + + int32_t offset = 0; + for(int32_t i = 0; i < pBlock->info.numOfCols; ++i) { + SColumnInfoData* pColData = taosArrayGet(pBlock->pDataBlock, i); + pColData->pData = pData + offset * pBlock->info.rows; + + offset += pColData->info.bytes; + } + + pInput->pRes->numOfRows = 0; + return pBlock; +} + +SOperatorInfo* createDummyInputOperator(char* pResult, SSchema* pSchema, int32_t numOfCols) { + assert(numOfCols > 0); + + SDummyInputInfo* pInfo = calloc(1, sizeof(SDummyInputInfo)); + pInfo->pRes = (SSqlRes*) pResult; + + pInfo->block.info.numOfCols = numOfCols; + pInfo->block.pDataBlock = taosArrayInit(numOfCols, sizeof(SColumnInfoData)); + for(int32_t i = 0; i < numOfCols; ++i) { + SColumnInfoData colData = {0}; + colData.info.bytes = pSchema[i].bytes; + colData.info.type = pSchema[i].type; + colData.info.colId = pSchema[i].colId; + + taosArrayPush(pInfo->block.pDataBlock, &colData); + } + + SOperatorInfo* pOptr = calloc(1, sizeof(SOperatorInfo)); + pOptr->name = "DummyInputOperator"; + pOptr->operatorType = OP_DummyInput; + pOptr->blockingOptr = false; + pOptr->info = pInfo; + pOptr->exec = doGetDataBlock; + + return pOptr; +} + void prepareInputDataFromUpstream(SSqlRes* pRes, SQueryInfo* pQueryInfo) { if (pQueryInfo->pDownstream != NULL && taosArrayGetSize(pQueryInfo->pDownstream) > 0) { // handle the following query process @@ -553,22 +609,29 @@ void prepareInputDataFromUpstream(SSqlRes* pRes, SQueryInfo* pQueryInfo) { int32_t numOfCols = taosArrayGetSize(px->colList); SQueriedTableInfo info = {.colList = colInfo, .numOfCols = numOfCols,}; - /*int32_t code = */createQueryFunc(&info, numOfOutput, &exprInfo, px->exprList->pData, NULL, px->type, NULL); - tsCreateSQLFunctionCtx(px, pCtx, NULL); - STableGroupInfo tableGroupInfo = {0}; - tableGroupInfo.numOfTables = 1; - tableGroupInfo.pGroupList = taosArrayInit(1, POINTER_BYTES); + /*int32_t code = */createQueryFunc(&info, numOfOutput, &exprInfo, px->exprList->pData, NULL, px->type, NULL); + SSchema* pSchema = tscGetTableSchema(px->pTableMetaInfo[0]->pTableMeta); + tsCreateSQLFunctionCtx(px, pCtx, pSchema); - SArray* group = taosArrayInit(1, sizeof(STableKeyInfo)); + STableGroupInfo tableGroupInfo = {.numOfTables = 1, .pGroupList = taosArrayInit(1, POINTER_BYTES),}; + tableGroupInfo.map = taosHashInit(1, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_NO_LOCK); STableKeyInfo tableKeyInfo = {.pTable = NULL, .lastKey = INT64_MIN}; + + SArray* group = taosArrayInit(1, sizeof(STableKeyInfo)); taosArrayPush(group, &tableKeyInfo); taosArrayPush(tableGroupInfo.pGroupList, &group); - SQInfo* pQInfo = createQueryInfoFromQueryNode(px, exprInfo, &tableGroupInfo, 0, NULL, NULL); - printf("%p\n", pQInfo); + SOperatorInfo* pSourceOptr = createDummyInputOperator((char*)pRes, pSchema, numOfOutput); + + SQInfo* pQInfo = createQueryInfoFromQueryNode(px, exprInfo, &tableGroupInfo, pSourceOptr, NULL, NULL); + //printf("%p\n", pQInfo); + SSDataBlock* pres = pQInfo->runtimeEnv.outputBuf; + + // build result + pRes->numOfRows = pres->info.rows; } } @@ -3121,6 +3184,10 @@ static int32_t createSecondaryExpr(SQueryAttr* pQueryAttr, SQueryInfo* pQueryInf } static int32_t createTagColumnInfo(SQueryAttr* pQueryAttr, SQueryInfo* pQueryInfo, STableMetaInfo* pTableMetaInfo) { + if (pTableMetaInfo->tagColList == NULL) { + return TSDB_CODE_SUCCESS; + } + pQueryAttr->numOfTags = (int32_t)taosArrayGetSize(pTableMetaInfo->tagColList); if (pQueryAttr->numOfTags == 0) { return TSDB_CODE_SUCCESS; diff --git a/src/query/inc/qExecutor.h b/src/query/inc/qExecutor.h index 5bbd40a861d134b65ea10336c7ea764d98498482..b66385add47dd093311aa3376f887d60cf070346 100644 --- a/src/query/inc/qExecutor.h +++ b/src/query/inc/qExecutor.h @@ -289,6 +289,7 @@ enum OPERATOR_TYPE_E { OP_Fill = 13, OP_MultiTableAggregate = 14, OP_MultiTableTimeInterval = 15, + OP_DummyInput = 16, //TODO remove it after fully refactor. }; typedef struct SOperatorInfo { diff --git a/src/query/src/qExecutor.c b/src/query/src/qExecutor.c index 7016d5f07fb310d30b7f3ed18c03843f29d71ce2..89dabe40dcb8fdd398fc4d62f2d4bc46a1528e66 100644 --- a/src/query/src/qExecutor.c +++ b/src/query/src/qExecutor.c @@ -35,13 +35,6 @@ #define SWITCH_ORDER(n) (((n) = ((n) == TSDB_ORDER_ASC) ? TSDB_ORDER_DESC : TSDB_ORDER_ASC)) -#define CHECK_IF_QUERY_KILLED(_q) \ - do { \ - if (isQueryKilled((_q)->qinfo)) { \ - longjmp((_q)->env, TSDB_CODE_TSC_QUERY_CANCELLED); \ - } \ - } while (0) - #define SDATA_BLOCK_INITIALIZER (SDataBlockInfo) {{0}, 0} #define TIME_WINDOW_COPY(_dst, _src) do {\ @@ -98,7 +91,6 @@ static UNUSED_FUNC void* u_realloc(void* p, size_t __size) { #define GET_NUM_OF_TABLEGROUP(q) taosArrayGetSize((q)->tableqinfoGroupInfo.pGroupList) #define QUERY_IS_INTERVAL_QUERY(_q) ((_q)->interval.interval > 0) - uint64_t queryHandleId = 0; int32_t getMaximumIdleDurationSec() { @@ -143,8 +135,8 @@ static void getNextTimeWindow(SQueryAttr* pQueryAttr, STimeWindow* tw) { } static void doSetTagValueToResultBuf(char* output, const char* val, int16_t type, int16_t bytes); -static void setResultOutputBuf(SQueryRuntimeEnv *pRuntimeEnv, SResultRow *pResult, SQLFunctionCtx* pCtx, - int32_t numOfCols, int32_t* rowCellInfoOffset); +static void setResultOutputBuf(SQueryRuntimeEnv* pRuntimeEnv, SResultRow* pResult, SQLFunctionCtx* pCtx, + int32_t numOfCols, int32_t* rowCellInfoOffset); void setResultRowOutputBufInitCtx(SQueryRuntimeEnv *pRuntimeEnv, SResultRow *pResult, SQLFunctionCtx* pCtx, int32_t numOfOutput, int32_t* rowCellInfoOffset); static bool functionNeedToExecute(SQueryRuntimeEnv *pRuntimeEnv, SQLFunctionCtx *pCtx, int32_t functionId); @@ -163,7 +155,6 @@ static STableIdInfo createTableIdInfo(STableQueryInfo* pTableQueryInfo); static void setTableScanFilterOperatorInfo(STableScanInfo* pTableScanInfo, SOperatorInfo* pDownstream); static int32_t getNumOfScanTimes(SQueryAttr* pQueryAttr); -//static bool isFixedOutputQuery(SQueryAttr* pQueryAttr); static SOperatorInfo* createDataBlocksOptScanInfo(void* pTsdbQueryHandle, SQueryRuntimeEnv* pRuntimeEnv, int32_t repeatTime, int32_t reverseTime); static SOperatorInfo* createTableScanOperator(void* pTsdbQueryHandle, SQueryRuntimeEnv* pRuntimeEnv, int32_t repeatTime); @@ -1732,7 +1723,9 @@ static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int32_t numOf SOperatorInfo* prev = pRuntimeEnv->pTableScanner; if (i == 0) { pRuntimeEnv->proot = createArithOperatorInfo(pRuntimeEnv, prev, pQueryAttr->pExpr1, pQueryAttr->numOfOutput); - setTableScanFilterOperatorInfo(pRuntimeEnv->pTableScanner->info, pRuntimeEnv->proot); + if (pRuntimeEnv->pTableScanner != NULL) { // TODO refactor + setTableScanFilterOperatorInfo(pRuntimeEnv->pTableScanner->info, pRuntimeEnv->proot); + } } else { prev = pRuntimeEnv->proot; pRuntimeEnv->proot = createArithOperatorInfo(pRuntimeEnv, prev, pQueryAttr->pExpr2, pQueryAttr->numOfExpr2); @@ -3949,7 +3942,7 @@ int32_t doInitQInfo(SQInfo *pQInfo, STSBuf *pTsBuf, SArray* prevResult, void *ts SQueryRuntimeEnv *pRuntimeEnv = &pQInfo->runtimeEnv; SQueryAttr *pQueryAttr = pQInfo->runtimeEnv.pQueryAttr; - pQueryAttr->tsdb = tsdb; + pQueryAttr->tsdb = tsdb; pRuntimeEnv->prevResult = prevResult; pRuntimeEnv->qinfo = pQInfo; @@ -3971,20 +3964,6 @@ int32_t doInitQInfo(SQInfo *pQInfo, STSBuf *pTsBuf, SArray* prevResult, void *ts pRuntimeEnv->cur.vgroupIndex = -1; setResultBufSize(pQueryAttr, &pRuntimeEnv->resultInfo); - /* - if (onlyQueryTags(pQueryAttr)) { -// pRuntimeEnv->resultInfo.capacity = 4096; -// pRuntimeEnv->proot = createTagScanOperatorInfo(pRuntimeEnv, pQueryAttr->pExpr1, pQueryAttr->numOfOutput); - } else if (pQueryAttr->queryBlockDist) { - pRuntimeEnv->pTableScanner = createTableBlockInfoScanOperator(pRuntimeEnv->pQueryHandle, pRuntimeEnv); - } else if (pQueryAttr->tsCompQuery || pQueryAttr->pointInterpQuery) { - pRuntimeEnv->pTableScanner = createTableSeqScanOperator(pRuntimeEnv->pQueryHandle, pRuntimeEnv); - } else if (needReverseScan(pQueryAttr)) { - pRuntimeEnv->pTableScanner = createDataBlocksOptScanInfo(pRuntimeEnv->pQueryHandle, pRuntimeEnv, getNumOfScanTimes(pQueryAttr), 1); - } else { - pRuntimeEnv->pTableScanner = createTableScanOperator(pRuntimeEnv->pQueryHandle, pRuntimeEnv, getNumOfScanTimes(pQueryAttr)); - } -*/ switch(tbScanner) { case OP_TableBlockInfoScan: { pRuntimeEnv->pTableScanner = createTableBlockInfoScanOperator(pRuntimeEnv->pQueryHandle, pRuntimeEnv); @@ -4002,8 +3981,7 @@ int32_t doInitQInfo(SQInfo *pQInfo, STSBuf *pTsBuf, SArray* prevResult, void *ts pRuntimeEnv->pTableScanner = createTableScanOperator(pRuntimeEnv->pQueryHandle, pRuntimeEnv, getNumOfScanTimes(pQueryAttr)); break; } - default: { - // do nothing + default: { // do nothing break; } } @@ -4531,14 +4509,19 @@ static SSDataBlock* doArithmeticOperation(void* param) { STableQueryInfo* pTableQueryInfo = pRuntimeEnv->current; // todo dynamic set tags - setTagValue(pOperator, pTableQueryInfo->pTable, pInfo->pCtx, pOperator->numOfOutput); + if (pTableQueryInfo != NULL) { + setTagValue(pOperator, pTableQueryInfo->pTable, pInfo->pCtx, pOperator->numOfOutput); + } // the pDataBlock are always the same one, no need to call this again setInputDataBlock(pOperator, pInfo->pCtx, pBlock, order); updateOutputBuf(pArithInfo, pBlock->info.rows); arithmeticApplyFunctions(pRuntimeEnv, pInfo->pCtx, pOperator->numOfOutput); - updateTableIdInfo(pTableQueryInfo, pBlock, pRuntimeEnv->pTableRetrieveTsMap, order); + + if (pTableQueryInfo != NULL) { // TODO refactor + updateTableIdInfo(pTableQueryInfo, pBlock, pRuntimeEnv->pTableRetrieveTsMap, order); + } pRes->info.rows = getNumOfResult(pRuntimeEnv, pInfo->pCtx, pOperator->numOfOutput); if (pRes->info.rows >= pRuntimeEnv->resultInfo.threshold) {