diff --git a/src/client/inc/tscLocalMerge.h b/src/client/inc/tscLocalMerge.h index 581cd37cbd53cb87847fc5a13c88b03eb797d93a..45fa11d143ab593f294d6db603b6e4c40f0daab8 100644 --- a/src/client/inc/tscLocalMerge.h +++ b/src/client/inc/tscLocalMerge.h @@ -62,6 +62,8 @@ typedef struct SLocalMerger { bool discard; int32_t offset; // limit offset value bool orderPrjOnSTable; // projection query on stable + char* tagBuf; // max tag buffer + int32_t tagBufLen; } SLocalMerger; typedef struct SRetrieveSupport { diff --git a/src/client/inc/tscUtil.h b/src/client/inc/tscUtil.h index b8291a923a1da01160d123ca8ef3f80aa18d12a9..31ba3db88f05fdedde2286b32965fe7917e705b5 100644 --- a/src/client/inc/tscUtil.h +++ b/src/client/inc/tscUtil.h @@ -98,10 +98,9 @@ typedef struct SMergeTsCtx { int8_t compared; }SMergeTsCtx; - typedef struct SVgroupTableInfo { SVgroupInfo vgInfo; - SArray* itemList; //SArray + SArray *itemList; // SArray } SVgroupTableInfo; static FORCE_INLINE SQueryInfo* tscGetQueryInfo(SSqlCmd* pCmd, int32_t subClauseIndex) { @@ -321,7 +320,7 @@ STableMeta* tscTableMetaDup(STableMeta* pTableMeta); int32_t tscCreateQueryFromQueryInfo(SQueryInfo* pQueryInfo, SQueryAttr* pQueryAttr, void* addr); void tsCreateSQLFunctionCtx(SQueryInfo* pQueryInfo, SQLFunctionCtx* pCtx, SSchema* pSchema); -void* createQueryInfoFromQueryNode(SQueryInfo* pQueryInfo, SExprInfo* pExprs, STableGroupInfo* pTableGroupInfo, SOperatorInfo* pOperator, char* sql, void* addr); +void* createQueryInfoFromQueryNode(SQueryInfo* pQueryInfo, SExprInfo* pExprs, STableGroupInfo* pTableGroupInfo, SOperatorInfo* pOperator, char* sql, void* addr, int32_t stage); void* malloc_throw(size_t size); void* calloc_throw(size_t nmemb, size_t size); diff --git a/src/client/inc/tsclient.h b/src/client/inc/tsclient.h index 0856efe5185a9dda7d1efe7c63995f451dbda69e..7b66a34c807c8194c08a67b59ecbb8f27076c8fd 100644 --- a/src/client/inc/tsclient.h +++ b/src/client/inc/tsclient.h @@ -213,7 +213,7 @@ typedef struct SQueryInfo { int32_t round; // 0/1/.... int32_t bufLen; char* buf; - + SQInfo* pQInfo; // global merge operator SArray* pDSOperator; // data source operator SArray* pPhyOperator; // physical query execution plan SQueryAttr* pQueryAttr; // query object @@ -420,6 +420,8 @@ void tscRestoreFuncForSTableQuery(SQueryInfo *pQueryInfo); int32_t tscCreateResPointerInfo(SSqlRes *pRes, SQueryInfo *pQueryInfo); void tscSetResRawPtr(SSqlRes* pRes, SQueryInfo* pQueryInfo); +void tscSetResRawPtrRv(SSqlRes* pRes, SQueryInfo* pQueryInfo, SSDataBlock* pBlock); + void prepareInputDataFromUpstream(SSqlRes* pRes, SQueryInfo* pQueryInfo); void tscResetSqlCmd(SSqlCmd *pCmd, bool removeMeta); diff --git a/src/client/src/tscLocalMerge.c b/src/client/src/tscLocalMerge.c index f590082cf08dc91878f3cd29893144a4479d78fe..8baa2abf7dfd7d9b54b0b4370a16f011bd148fa1 100644 --- a/src/client/src/tscLocalMerge.c +++ b/src/client/src/tscLocalMerge.c @@ -30,6 +30,8 @@ typedef struct SCompareParam { int32_t groupOrderType; } SCompareParam; +bool needToMergeRv(SSDataBlock* pBlock, SLocalMerger *pLocalMerge, int32_t index, char **buf); + int32_t treeComparator(const void *pLeft, const void *pRight, void *param) { int32_t pLeftIdx = *(int32_t *)pLeft; int32_t pRightIdx = *(int32_t *)pRight; @@ -88,6 +90,7 @@ void tsCreateSQLFunctionCtx(SQueryInfo* pQueryInfo, SQLFunctionCtx* pCtx, SSchem pCtx[i].param[2].i64 = pQueryInfo->order.order; pCtx[i].param[2].nType = TSDB_DATA_TYPE_BIGINT; pCtx[i].param[1].i64 = pQueryInfo->order.orderColId; + pCtx[i].param[0].i64 = pExpr->base.param[0].i64; // top/bot parameter } else if (functionId == TSDB_FUNC_APERCT) { pCtx[i].param[0].i64 = pExpr->base.param[0].i64; pCtx[i].param[0].nType = pExpr->base.param[0].nType; @@ -126,17 +129,17 @@ void tsCreateSQLFunctionCtx(SQueryInfo* pQueryInfo, SQLFunctionCtx* pCtx, SSchem } } -static void setCtxInputOutputBuffer(SQueryInfo* pQueryInfo, SQLFunctionCtx *pCtx, SLocalMerger *pReducer, - tOrderDescriptor *pDesc) { +static UNUSED_FUNC void setCtxInputOutputBuffer(SQueryInfo *pQueryInfo, SQLFunctionCtx *pCtx, SLocalMerger *pMerger, + tOrderDescriptor *pDesc) { size_t size = tscSqlExprNumOfExprs(pQueryInfo); for (int32_t i = 0; i < size; ++i) { SExprInfo *pExpr = tscSqlExprGet(pQueryInfo, i); - pCtx[i].pOutput = pReducer->pResultBuf->data + pExpr->base.offset * pReducer->resColModel->capacity; + pCtx[i].pOutput = pMerger->pResultBuf->data + pExpr->base.offset * pMerger->resColModel->capacity; // input buffer hold only one point data int16_t offset = getColumnModelOffset(pDesc->pColumnModel, i); - pCtx[i].pInput = pReducer->pTempBuffer->data + offset; + pCtx[i].pInput = pMerger->pTempBuffer->data + offset; int32_t functionId = pCtx[i].functionId; if (functionId == TSDB_FUNC_TOP || functionId == TSDB_FUNC_BOTTOM || functionId == TSDB_FUNC_DIFF) { @@ -227,8 +230,8 @@ void tscCreateLocalMerger(tExtMemBuffer **pMemBuffer, int32_t numOfBuffer, tOrde size_t size = sizeof(SLocalMerger) + POINTER_BYTES * numOfFlush; - SLocalMerger *pReducer = (SLocalMerger *) calloc(1, size); - if (pReducer == NULL) { + SLocalMerger *pMerger = (SLocalMerger *) calloc(1, size); + if (pMerger == NULL) { tscError("%p failed to create local merge structure, out of memory", pSql); tscLocalReducerEnvDestroy(pMemBuffer, pDesc, finalmodel, pFFModel, numOfBuffer); @@ -236,15 +239,15 @@ void tscCreateLocalMerger(tExtMemBuffer **pMemBuffer, int32_t numOfBuffer, tOrde return; } - pReducer->pExtMemBuffer = pMemBuffer; - pReducer->pLocalDataSrc = (SLocalDataSource **)&pReducer[1]; - assert(pReducer->pLocalDataSrc != NULL); + pMerger->pExtMemBuffer = pMemBuffer; + pMerger->pLocalDataSrc = (SLocalDataSource **)&pMerger[1]; + assert(pMerger->pLocalDataSrc != NULL); - pReducer->numOfBuffer = numOfFlush; - pReducer->numOfVnode = numOfBuffer; + pMerger->numOfBuffer = numOfFlush; + pMerger->numOfVnode = numOfBuffer; - pReducer->pDesc = pDesc; - tscDebug("%p the number of merged leaves is: %d", pSql, pReducer->numOfBuffer); + pMerger->pDesc = pDesc; + tscDebug("%p the number of merged leaves is: %d", pSql, pMerger->numOfBuffer); int32_t idx = 0; for (int32_t i = 0; i < numOfBuffer; ++i) { @@ -255,11 +258,11 @@ void tscCreateLocalMerger(tExtMemBuffer **pMemBuffer, int32_t numOfBuffer, tOrde if (ds == NULL) { tscError("%p failed to create merge structure", pSql); pRes->code = TSDB_CODE_TSC_OUT_OF_MEMORY; - tfree(pReducer); + tfree(pMerger); return; } - pReducer->pLocalDataSrc[idx] = ds; + pMerger->pLocalDataSrc[idx] = ds; ds->pMemBuffer = pMemBuffer[i]; ds->flushoutIdx = j; @@ -292,84 +295,84 @@ void tscCreateLocalMerger(tExtMemBuffer **pMemBuffer, int32_t numOfBuffer, tOrde // no data actually, no need to merge result. if (idx == 0) { - tfree(pReducer); + tfree(pMerger); return; } - pReducer->numOfBuffer = idx; + pMerger->numOfBuffer = idx; SCompareParam *param = malloc(sizeof(SCompareParam)); if (param == NULL) { - tfree(pReducer); + tfree(pMerger); return; } - param->pLocalData = pReducer->pLocalDataSrc; - param->pDesc = pReducer->pDesc; - param->num = pReducer->pLocalDataSrc[0]->pMemBuffer->numOfElemsPerPage; + param->pLocalData = pMerger->pLocalDataSrc; + param->pDesc = pMerger->pDesc; + param->num = pMerger->pLocalDataSrc[0]->pMemBuffer->numOfElemsPerPage; SQueryInfo *pQueryInfo = tscGetQueryInfo(pCmd, pCmd->clauseIndex); param->groupOrderType = pQueryInfo->groupbyExpr.orderType; - pReducer->orderPrjOnSTable = tscOrderedProjectionQueryOnSTable(pQueryInfo, 0); + pMerger->orderPrjOnSTable = tscOrderedProjectionQueryOnSTable(pQueryInfo, 0); - pRes->code = tLoserTreeCreate(&pReducer->pLoserTree, pReducer->numOfBuffer, param, treeComparator); - if (pReducer->pLoserTree == NULL || pRes->code != 0) { + pRes->code = tLoserTreeCreate(&pMerger->pLoserTree, pMerger->numOfBuffer, param, treeComparator); + if (pMerger->pLoserTree == NULL || pRes->code != 0) { tfree(param); - tfree(pReducer); + tfree(pMerger); return; } // the input data format follows the old format, but output in a new format. // so, all the input must be parsed as old format - pReducer->pCtx = (SQLFunctionCtx *)calloc(tscSqlExprNumOfExprs(pQueryInfo), sizeof(SQLFunctionCtx)); - pReducer->rowSize = pMemBuffer[0]->nElemSize; + pMerger->pCtx = (SQLFunctionCtx *)calloc(tscSqlExprNumOfExprs(pQueryInfo), sizeof(SQLFunctionCtx)); + pMerger->rowSize = pMemBuffer[0]->nElemSize; tscRestoreFuncForSTableQuery(pQueryInfo); tscFieldInfoUpdateOffset(pQueryInfo); - if (pReducer->rowSize > pMemBuffer[0]->pageSize) { + if (pMerger->rowSize > pMemBuffer[0]->pageSize) { assert(false); // todo fixed row size is larger than the minimum page size; } - pReducer->hasPrevRow = false; - pReducer->hasUnprocessedRow = false; + pMerger->hasPrevRow = false; + pMerger->hasUnprocessedRow = false; - pReducer->prevRowOfInput = (char *)calloc(1, pReducer->rowSize); + pMerger->prevRowOfInput = (char *)calloc(1, pMerger->rowSize); // used to keep the latest input row - pReducer->pTempBuffer = (tFilePage *)calloc(1, pReducer->rowSize + sizeof(tFilePage)); - pReducer->discardData = (tFilePage *)calloc(1, pReducer->rowSize + sizeof(tFilePage)); - pReducer->discard = false; + pMerger->pTempBuffer = (tFilePage *)calloc(1, pMerger->rowSize + sizeof(tFilePage)); + pMerger->discardData = (tFilePage *)calloc(1, pMerger->rowSize + sizeof(tFilePage)); + pMerger->discard = false; - pReducer->nResultBufSize = pMemBuffer[0]->pageSize * 16; - pReducer->pResultBuf = (tFilePage *)calloc(1, pReducer->nResultBufSize + sizeof(tFilePage)); + pMerger->nResultBufSize = pMemBuffer[0]->pageSize * 16; + pMerger->pResultBuf = (tFilePage *)calloc(1, pMerger->nResultBufSize + sizeof(tFilePage)); - pReducer->resColModel = finalmodel; - pReducer->resColModel->capacity = pReducer->nResultBufSize; - pReducer->finalModel = pFFModel; + pMerger->resColModel = finalmodel; + pMerger->resColModel->capacity = pMerger->nResultBufSize; + pMerger->finalModel = pFFModel; if (finalmodel->rowSize > 0) { - pReducer->resColModel->capacity /= finalmodel->rowSize; + pMerger->resColModel->capacity /= finalmodel->rowSize; } - assert(finalmodel->rowSize > 0 && finalmodel->rowSize <= pReducer->rowSize); - pReducer->pFinalRes = calloc(1, pReducer->rowSize * pReducer->resColModel->capacity); + assert(finalmodel->rowSize > 0 && finalmodel->rowSize <= pMerger->rowSize); + pMerger->pFinalRes = calloc(1, pMerger->rowSize * pMerger->resColModel->capacity); - if (pReducer->pTempBuffer == NULL || pReducer->discardData == NULL || pReducer->pResultBuf == NULL || - pReducer->pFinalRes == NULL || pReducer->prevRowOfInput == NULL) { - tfree(pReducer->pTempBuffer); - tfree(pReducer->discardData); - tfree(pReducer->pResultBuf); - tfree(pReducer->pFinalRes); - tfree(pReducer->prevRowOfInput); - tfree(pReducer->pLoserTree); + if (pMerger->pTempBuffer == NULL || pMerger->discardData == NULL || pMerger->pResultBuf == NULL || + pMerger->pFinalRes == NULL || pMerger->prevRowOfInput == NULL) { + tfree(pMerger->pTempBuffer); + tfree(pMerger->discardData); + tfree(pMerger->pResultBuf); + tfree(pMerger->pFinalRes); + tfree(pMerger->prevRowOfInput); + tfree(pMerger->pLoserTree); tfree(param); - tfree(pReducer); + tfree(pMerger); pRes->code = TSDB_CODE_TSC_OUT_OF_MEMORY; return; } - pReducer->pTempBuffer->num = 0; + pMerger->pTempBuffer->num = 0; tscCreateResPointerInfo(pRes, pQueryInfo); @@ -378,13 +381,23 @@ void tscCreateLocalMerger(tExtMemBuffer **pMemBuffer, int32_t numOfBuffer, tOrde pschema[i] = pDesc->pColumnModel->pFields[i].field; } - tsCreateSQLFunctionCtx(pQueryInfo, pReducer->pCtx, pschema); - setCtxInputOutputBuffer(pQueryInfo, pReducer->pCtx, pReducer, pDesc); + tsCreateSQLFunctionCtx(pQueryInfo, pMerger->pCtx, pschema); +// setCtxInputOutputBuffer(pQueryInfo, pMerger->pCtx, pMerger, pDesc); tfree(pschema); + int32_t maxBufSize = 0; + for (int32_t k = 0; k < tscSqlExprNumOfExprs(pQueryInfo); ++k) { + SExprInfo *pExpr = tscSqlExprGet(pQueryInfo, k); + if (maxBufSize < pExpr->base.resBytes && pExpr->base.functionId == TSDB_FUNC_TAG) { + maxBufSize = pExpr->base.resBytes; + } + } + + pMerger->tagBuf = calloc(1, maxBufSize); + // we change the capacity of schema to denote that there is only one row in temp buffer - pReducer->pDesc->pColumnModel->capacity = 1; + pMerger->pDesc->pColumnModel->capacity = 1; // restore the limitation value at the last stage if (tscOrderedProjectionQueryOnSTable(pQueryInfo, 0)) { @@ -392,9 +405,9 @@ void tscCreateLocalMerger(tExtMemBuffer **pMemBuffer, int32_t numOfBuffer, tOrde pQueryInfo->limit.offset = pQueryInfo->prjOffset; } - pReducer->offset = (int32_t)pQueryInfo->limit.offset; + pMerger->offset = (int32_t)pQueryInfo->limit.offset; - pRes->pLocalMerger = pReducer; + pRes->pLocalMerger = pMerger; pRes->numOfGroups = 0; STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0); @@ -405,9 +418,10 @@ void tscCreateLocalMerger(tExtMemBuffer **pMemBuffer, int32_t numOfBuffer, tOrde if (pQueryInfo->fillType != TSDB_FILL_NONE) { SFillColInfo* pFillCol = createFillColInfo(pQueryInfo); - pReducer->pFillInfo = taosCreateFillInfo(pQueryInfo->order.order, revisedSTime, pQueryInfo->groupbyExpr.numOfGroupCols, - 4096, (int32_t)pQueryInfo->fieldsInfo.numOfOutput, pQueryInfo->interval.sliding, pQueryInfo->interval.slidingUnit, - tinfo.precision, pQueryInfo->fillType, pFillCol, pSql); + pMerger->pFillInfo = + taosCreateFillInfo(pQueryInfo->order.order, revisedSTime, pQueryInfo->groupbyExpr.numOfGroupCols, 4096, + (int32_t)pQueryInfo->fieldsInfo.numOfOutput, pQueryInfo->interval.sliding, + pQueryInfo->interval.slidingUnit, tinfo.precision, pQueryInfo->fillType, pFillCol, pSql); } } @@ -626,12 +640,12 @@ static int32_t createOrderDescriptor(tOrderDescriptor **pOrderDesc, SSqlCmd *pCm } } -bool isSameGroup(SSqlCmd *pCmd, SLocalMerger *pReducer, char *pPrev, tFilePage *tmpBuffer) { +bool isSameGroup(SSqlCmd *pCmd, SLocalMerger *pMerger, char *pPrev, tFilePage *tmpBuffer) { SQueryInfo *pQueryInfo = tscGetQueryInfo(pCmd, pCmd->clauseIndex); // disable merge procedure for column projection query - int16_t functionId = pReducer->pCtx[0].functionId; - if (pReducer->orderPrjOnSTable) { + int16_t functionId = pMerger->pCtx[0].functionId; + if (pMerger->orderPrjOnSTable) { return true; } @@ -639,7 +653,7 @@ bool isSameGroup(SSqlCmd *pCmd, SLocalMerger *pReducer, char *pPrev, tFilePage * return false; } - tOrderDescriptor *pOrderDesc = pReducer->pDesc; + tOrderDescriptor *pOrderDesc = pMerger->pDesc; SColumnOrderInfo* orderInfo = &pOrderDesc->orderInfo; // no group by columns, all data belongs to one group @@ -1065,12 +1079,9 @@ static void savePreviousRow(SLocalMerger *pLocalMerge, tFilePage *tmpBuffer) { pLocalMerge->hasPrevRow = true; } -static void doExecuteFinalMerge(SSqlCmd *pCmd, SLocalMerger *pLocalMerge, bool needInit) { +static void doExecuteFinalMerge( SLocalMerger *pLocalMerge, int32_t numOfExpr, bool needInit) { // the tag columns need to be set before all functions execution - SQueryInfo *pQueryInfo = tscGetQueryInfo(pCmd, pCmd->clauseIndex); - - size_t size = tscSqlExprNumOfExprs(pQueryInfo); - for (int32_t j = 0; j < size; ++j) { + for (int32_t j = 0; j < numOfExpr; ++j) { SQLFunctionCtx *pCtx = &pLocalMerge->pCtx[j]; // tags/tags_dummy function, the tag field of SQLFunctionCtx is from the input buffer @@ -1087,8 +1098,8 @@ static void doExecuteFinalMerge(SSqlCmd *pCmd, SLocalMerger *pLocalMerge, bool n } } else if (functionId == TSDB_FUNC_TOP || functionId == TSDB_FUNC_BOTTOM) { - SExprInfo *pExpr = tscSqlExprGet(pQueryInfo, j); - pCtx->param[0].i64 = pExpr->base.param[0].i64; +// SExprInfo *pExpr = tscSqlExprGet(pQueryInfo, j); // TODO this data is from +// pCtx->param[0].i64 = pExpr->base.param[0].i64; } pCtx->currentStage = MERGE_STAGE; @@ -1098,7 +1109,7 @@ static void doExecuteFinalMerge(SSqlCmd *pCmd, SLocalMerger *pLocalMerge, bool n } } - for (int32_t j = 0; j < size; ++j) { + for (int32_t j = 0; j < numOfExpr; ++j) { int32_t functionId = pLocalMerge->pCtx[j].functionId; if (functionId == TSDB_FUNC_TAG_DUMMY || functionId == TSDB_FUNC_TS_DUMMY) { continue; @@ -1108,19 +1119,79 @@ static void doExecuteFinalMerge(SSqlCmd *pCmd, SLocalMerger *pLocalMerge, bool n } } +static void savePrevOrderColumns(SMultiwayMergeInfo* pInfo, SSDataBlock* pBlock, int32_t rowIndex) { + int32_t size = taosArrayGetSize(pInfo->orderColumnList); + for(int32_t i = 0; i < size; ++i) { + int32_t index = *(int16_t*)taosArrayGet(pInfo->orderColumnList, i); + SColumnInfoData* pColInfo = taosArrayGet(pBlock->pDataBlock, index); + + memcpy(pInfo->prevRow[i], pColInfo->pData + pColInfo->info.bytes * rowIndex, pColInfo->info.bytes); + } +} + +static void doExecuteFinalMergeRv(SMultiwayMergeInfo* pInfo, int32_t numOfExpr, SSDataBlock* pBlock, bool needInit) { + SQLFunctionCtx* pCtx = pInfo->binfo.pCtx; + + for(int32_t i = 0; i < pBlock->info.rows; ++i) { + if (pInfo->hasPrev) { + if (needToMergeRv(pBlock, pInfo->pMerge, i, pInfo->prevRow)) { + for (int32_t j = 0; j < numOfExpr; ++j) { + int32_t functionId = pCtx[j].functionId; + if (functionId == TSDB_FUNC_TAG_DUMMY || functionId == TSDB_FUNC_TS_DUMMY) { + continue; + } + + aAggs[functionId].mergeFunc(&pCtx[j]); + } + } else { + // todo finalize the result + + for (int32_t j = 0; j < numOfExpr; ++j) { + int32_t functionId = pCtx[j].functionId; + if (functionId == TSDB_FUNC_TAG_DUMMY || functionId == TSDB_FUNC_TS_DUMMY) { + continue; + } + + pCtx[j].pOutput += pCtx[j].outputBytes; + pCtx[j].pInput += pCtx[j].inputBytes; + + aAggs[functionId].mergeFunc(&pCtx[j]); + } + + pInfo->binfo.pRes->info.rows += 1; + } + } else { + for (int32_t j = 0; j < numOfExpr; ++j) { + int32_t functionId = pCtx[j].functionId; + if (functionId == TSDB_FUNC_TAG_DUMMY || functionId == TSDB_FUNC_TS_DUMMY) { + continue; + } + + aAggs[functionId].mergeFunc(&pCtx[j]); + } + } + + savePrevOrderColumns(pInfo, pBlock, i); + pInfo->hasPrev = true; + } +} + static void handleUnprocessedRow(SSqlCmd *pCmd, SLocalMerger *pLocalMerge, tFilePage *tmpBuffer) { if (pLocalMerge->hasUnprocessedRow) { pLocalMerge->hasUnprocessedRow = false; - doExecuteFinalMerge(pCmd, pLocalMerge, true); + + SQueryInfo *pQueryInfo = tscGetQueryInfo(pCmd, pCmd->clauseIndex); + size_t size = tscSqlExprNumOfExprs(pQueryInfo); + + doExecuteFinalMerge(pLocalMerge, size, true); savePreviousRow(pLocalMerge, tmpBuffer); } } -static int64_t getNumOfResultLocal(SQueryInfo *pQueryInfo, SQLFunctionCtx *pCtx) { +static int64_t getNumOfResultLocal(SQLFunctionCtx *pCtx, int32_t numOfExprs) { int64_t maxOutput = 0; - size_t size = tscSqlExprNumOfExprs(pQueryInfo); - for (int32_t j = 0; j < size; ++j) { + for (int32_t j = 0; j < numOfExprs; ++j) { /* * ts, tag, tagprj function can not decide the output number of current query * the number of output result is decided by main output @@ -1145,53 +1216,36 @@ static int64_t getNumOfResultLocal(SQueryInfo *pQueryInfo, SQLFunctionCtx *pCtx) * filled with the same result, which is the tags, specified in group by clause * */ -static void fillMultiRowsOfTagsVal(SQueryInfo *pQueryInfo, int32_t numOfRes, SLocalMerger *pLocalMerge) { - int32_t maxBufSize = 0; // find the max tags column length to prepare the buffer - size_t size = tscSqlExprNumOfExprs(pQueryInfo); - - for (int32_t k = 0; k < size; ++k) { - SExprInfo *pExpr = tscSqlExprGet(pQueryInfo, k); - if (maxBufSize < pExpr->base.resBytes && pExpr->base.functionId == TSDB_FUNC_TAG) { - maxBufSize = pExpr->base.resBytes; - } - } - - assert(maxBufSize >= 0); - - char *buf = malloc((size_t)maxBufSize); - for (int32_t k = 0; k < size; ++k) { +static void fillMultiRowsOfTagsVal(SLocalMerger *pLocalMerge, int32_t numOfRes, int32_t numOfExprs) { + for (int32_t k = 0; k < numOfExprs; ++k) { SQLFunctionCtx *pCtx = &pLocalMerge->pCtx[k]; if (pCtx->functionId != TSDB_FUNC_TAG) { continue; } int32_t inc = numOfRes - 1; // tsdb_func_tag function only produce one row of result - memset(buf, 0, (size_t)maxBufSize); - memcpy(buf, pCtx->pOutput, (size_t)pCtx->outputBytes); + memset(pLocalMerge->tagBuf, 0, (size_t)pLocalMerge->tagBufLen); + memcpy(pLocalMerge->tagBuf, pCtx->pOutput, (size_t)pCtx->outputBytes); for (int32_t i = 0; i < inc; ++i) { pCtx->pOutput += pCtx->outputBytes; - memcpy(pCtx->pOutput, buf, (size_t)pCtx->outputBytes); + memcpy(pCtx->pOutput, pLocalMerge->tagBuf, (size_t)pCtx->outputBytes); } } - - free(buf); } -int32_t finalizeRes(SQueryInfo *pQueryInfo, SLocalMerger *pLocalMerge) { - size_t size = tscSqlExprNumOfExprs(pQueryInfo); - - for (int32_t k = 0; k < size; ++k) { +int32_t finalizeRes(SLocalMerger *pLocalMerge, int32_t numOfExprs) { + for (int32_t k = 0; k < numOfExprs; ++k) { SQLFunctionCtx* pCtx = &pLocalMerge->pCtx[k]; aAggs[pCtx->functionId].xFinalize(pCtx); } pLocalMerge->hasPrevRow = false; - int32_t numOfRes = (int32_t)getNumOfResultLocal(pQueryInfo, pLocalMerge->pCtx); + int32_t numOfRes = (int32_t)getNumOfResultLocal(pLocalMerge->pCtx, numOfExprs); pLocalMerge->pResultBuf->num += numOfRes; - fillMultiRowsOfTagsVal(pQueryInfo, numOfRes, pLocalMerge); + fillMultiRowsOfTagsVal(pLocalMerge, numOfRes, numOfExprs); return numOfRes; } @@ -1226,6 +1280,22 @@ bool needToMerge(SQueryInfo *pQueryInfo, SLocalMerger *pLocalMerge, tFilePage *t return (ret == 0); } +bool needToMergeRv(SSDataBlock* pBlock, SLocalMerger *pLocalMerge, int32_t index, char **buf) { + int32_t ret = 0; + tOrderDescriptor *pDesc = pLocalMerge->pDesc; + if (pDesc->orderInfo.numOfCols > 0) { + if (pDesc->tsOrder == TSDB_ORDER_ASC) { // asc + // todo refactor comparator + ret = compare_aRv(pBlock, pDesc->orderInfo.colIndex, pDesc->orderInfo.numOfCols, index, buf, TSDB_ORDER_ASC); + } else { // desc +// ret = compare_d(pLocalMerge->pDesc, 1, 0, pLocalMerge->prevRowOfInput, 1, 0, tmpBuffer->data); + } + } + + /* if ret == 0, means the result belongs to the same group */ + return (ret == 0); +} + static bool reachGroupResultLimit(SQueryInfo *pQueryInfo, SSqlRes *pRes) { return (pRes->numOfGroups >= pQueryInfo->slimit.limit && pQueryInfo->slimit.limit >= 0); } @@ -1310,6 +1380,24 @@ bool genFinalResults(SSqlObj *pSql, SLocalMerger *pLocalMerge, bool noMoreCurren return true; } +bool genFinalResultsRv(SSqlObj *pSql, SLocalMerger *pLocalMerge, bool noMoreCurrentGroupRes) { + SSqlCmd *pCmd = &pSql->cmd; + SSqlRes *pRes = &pSql->res; + + SQueryInfo * pQueryInfo = tscGetQueryInfo(pCmd, pCmd->clauseIndex); + tFilePage * pResBuf = pLocalMerge->pResultBuf; + SColumnModel *pModel = pLocalMerge->resColModel; + + pRes->code = TSDB_CODE_SUCCESS; + + tColModelCompact(pModel, pResBuf, pModel->capacity); + + // no interval query, no fill operation + genFinalResWithoutFill(pRes, pLocalMerge, pQueryInfo); + + return true; +} + void resetOutputBuf(SQueryInfo *pQueryInfo, SLocalMerger *pLocalMerge) {// reset output buffer to the beginning size_t t = tscSqlExprNumOfExprs(pQueryInfo); for (int32_t i = 0; i < t; ++i) { @@ -1437,7 +1525,7 @@ static void doProcessResultInNextWindow(SSqlObj *pSql, int32_t numOfRes) { } } - doExecuteFinalMerge(pCmd, pLocalMerge, true); + doExecuteFinalMerge(pLocalMerge, size, true); } int32_t tscDoLocalMerge(SSqlObj *pSql) { @@ -1445,8 +1533,9 @@ int32_t tscDoLocalMerge(SSqlObj *pSql) { SSqlRes *pRes = &pSql->res; tscResetForNextRetrieve(pRes); + assert(pSql->signature == pSql); - if (pSql->signature != pSql || pRes == NULL || pRes->pLocalMerger == NULL) { // all data has been processed + if (pRes->pLocalMerger == NULL) { // all data has been processed if (pRes->code == TSDB_CODE_SUCCESS) { return pRes->code; } @@ -1459,6 +1548,7 @@ int32_t tscDoLocalMerge(SSqlObj *pSql) { SQueryInfo *pQueryInfo = tscGetQueryInfo(pCmd, pCmd->clauseIndex); tFilePage *tmpBuffer = pLocalMerge->pTempBuffer; + int32_t numOfExprs = (int32_t) tscSqlExprNumOfExprs(pQueryInfo); if (doHandleLastRemainData(pSql)) { return TSDB_CODE_SUCCESS; } @@ -1481,6 +1571,7 @@ int32_t tscDoLocalMerge(SSqlObj *pSql) { #ifdef _DEBUG_VIEW printf("chosen data in pTree[0] = %d\n", pTree->pNode[0].index); #endif + assert((pTree->pNode[0].index < pLocalMerge->numOfBuffer) && (pTree->pNode[0].index >= 0) && tmpBuffer->num == 0); // chosen from loser tree @@ -1529,7 +1620,7 @@ int32_t tscDoLocalMerge(SSqlObj *pSql) { if (pLocalMerge->hasPrevRow) { if (needToMerge(pQueryInfo, pLocalMerge, tmpBuffer)) { // belong to the group of the previous row, continue process it - doExecuteFinalMerge(pCmd, pLocalMerge, false); + doExecuteFinalMerge(pLocalMerge, numOfExprs, false); // copy to buffer savePreviousRow(pLocalMerge, tmpBuffer); @@ -1538,7 +1629,7 @@ int32_t tscDoLocalMerge(SSqlObj *pSql) { * current row does not belong to the group of previous row. * so the processing of previous group is completed. */ - int32_t numOfRes = finalizeRes(pQueryInfo, pLocalMerge); + int32_t numOfRes = finalizeRes(pLocalMerge, numOfExprs); bool sameGroup = isSameGroup(pCmd, pLocalMerge, pLocalMerge->prevRowOfInput, tmpBuffer); tFilePage *pResBuf = pLocalMerge->pResultBuf; @@ -1601,7 +1692,7 @@ int32_t tscDoLocalMerge(SSqlObj *pSql) { } } } else { - doExecuteFinalMerge(pCmd, pLocalMerge, true); + doExecuteFinalMerge(pLocalMerge, numOfExprs,true); savePreviousRow(pLocalMerge, tmpBuffer); // copy the processed row to buffer } @@ -1610,7 +1701,7 @@ int32_t tscDoLocalMerge(SSqlObj *pSql) { } if (pLocalMerge->hasPrevRow) { - finalizeRes(pQueryInfo, pLocalMerge); + finalizeRes(pLocalMerge, numOfExprs); } if (pLocalMerge->pResultBuf->num) { @@ -1686,3 +1777,126 @@ int32_t doArithmeticCalculate(SQueryInfo* pQueryInfo, tFilePage* pOutput, int32_ return offset; } + +#define COLMODEL_GET_VAL(data, schema, allrow, rowId, colId) \ + (data + (schema)->pFields[colId].offset * (allrow) + (rowId) * (schema)->pFields[colId].field.bytes) + + +static void appendOneRowToDataBlock(SSDataBlock *pBlock, char *buf, SColumnModel *pModel, int32_t rowIndex, + int32_t maxRows) { + for (int32_t i = 0; i < pBlock->info.numOfCols; ++i) { + SColumnInfoData* pColInfo = taosArrayGet(pBlock->pDataBlock, i); + char* p = pColInfo->pData + pBlock->info.rows * pColInfo->info.bytes; + +// char *dst = COLMODEL_GET_VAL(dstPage->data, dstModel, dstModel->capacity, dstPage->num, col); + char *src = COLMODEL_GET_VAL(buf, pModel, maxRows, rowIndex, i); +// char* src = buf + rowIndex * pColInfo->info.bytes; + memmove(p, src, pColInfo->info.bytes); + } + + pBlock->info.rows += 1; +} + +static SSDataBlock* doMultiwaySort(void* param) { + SOperatorInfo* pOperator = (SOperatorInfo*) param; + if (pOperator->status == OP_EXEC_DONE) { + return NULL; + } + + SMultiwayMergeInfo *pInfo = pOperator->info; + + SLocalMerger *pMerger = pInfo->pMerge; + SLoserTreeInfo *pTree = pMerger->pLoserTree; + SColumnModel *pModel = pMerger->pDesc->pColumnModel; + tFilePage *tmpBuffer = pMerger->pTempBuffer; + + pInfo->binfo.pRes->info.rows = 0; + + while(1) { + if (isAllSourcesCompleted(pMerger)) { + break; + } + +#ifdef _DEBUG_VIEW + printf("chosen data in pTree[0] = %d\n", pTree->pNode[0].index); +#endif + + assert((pTree->pNode[0].index < pMerger->numOfBuffer) && (pTree->pNode[0].index >= 0) && tmpBuffer->num == 0); + + // chosen from loser tree + SLocalDataSource *pOneDataSrc = pMerger->pLocalDataSrc[pTree->pNode[0].index]; + appendOneRowToDataBlock(pInfo->binfo.pRes, pOneDataSrc->filePage.data, pModel, pOneDataSrc->rowIdx, pOneDataSrc->pMemBuffer->pColumnModel->capacity); + +#if defined(_DEBUG_VIEW) + printf("chosen row:\t"); + SSrcColumnInfo colInfo[256] = {0}; + tscGetSrcColumnInfo(colInfo, pQueryInfo); + + tColModelDisplayEx(pModel, tmpBuffer->data, tmpBuffer->num, pModel->capacity, colInfo); +#endif + + pOneDataSrc->rowIdx += 1; + adjustLoserTreeFromNewData(pMerger, pOneDataSrc, pTree); + + if (pInfo->binfo.pRes->info.rows >= 4096) { // TODO threshold + return pInfo->binfo.pRes; + } + } + + return (pInfo->binfo.pRes->info.rows > 0)? pInfo->binfo.pRes:NULL; +} + +SOperatorInfo *createMultiwaySortOperatorInfo(SQueryRuntimeEnv *pRuntimeEnv, SExprInfo *pExpr, int32_t numOfOutput, + int32_t numOfRows, void *merger) { + SMultiwayMergeInfo* pInfo = calloc(1, sizeof(SMultiwayMergeInfo)); + + pInfo->pMerge = merger; + pInfo->binfo.pRes = createOutputBuf(pExpr, numOfOutput, numOfRows); + + SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo)); + pOperator->name = "MultiwaySortOperator"; + pOperator->operatorType = OP_MultiwaySort; + pOperator->blockingOptr = false; + pOperator->status = OP_IN_EXECUTING; + pOperator->info = pInfo; + pOperator->pRuntimeEnv = pRuntimeEnv; + pOperator->numOfOutput = pRuntimeEnv->pQueryAttr->numOfCols; + pOperator->exec = doMultiwaySort; + + return pOperator; +} + +SSDataBlock* doGlobalAggregate(void* param) { + SOperatorInfo* pOperator = (SOperatorInfo*) param; + if (pOperator->status == OP_EXEC_DONE) { + return NULL; + } + + SMultiwayMergeInfo* pAggInfo = pOperator->info; + + SQueryRuntimeEnv *pRuntimeEnv = pOperator->pRuntimeEnv; + SOperatorInfo *upstream = pOperator->upstream; + + while(1) { + SSDataBlock* pBlock = upstream->exec(upstream); + if (pBlock == NULL) { + break; + } + + // not belongs to the same group, return the result of current group; + setInputDataBlock(pOperator, pAggInfo->binfo.pCtx, pBlock, TSDB_ORDER_ASC); + doExecuteFinalMergeRv(pAggInfo, pOperator->numOfOutput, pBlock, false); + } + + pOperator->status = OP_EXEC_DONE; + setQueryStatus(pRuntimeEnv, QUERY_COMPLETED); + +// finalizeQueryResult(pOperator, pAggInfo->binfo.pCtx, &pAggInfo->binfo.resultRowInfo, pAggInfo->binfo.rowCellInfoOffset); + pAggInfo->binfo.pRes->info.rows = getNumOfResult(pRuntimeEnv, pAggInfo->binfo.pCtx, pOperator->numOfOutput); + + pOperator->status = OP_EXEC_DONE; + setQueryStatus(pRuntimeEnv, QUERY_COMPLETED); + + return pAggInfo->binfo.pRes; +} + diff --git a/src/client/src/tscServer.c b/src/client/src/tscServer.c index 768a68217f1132d1d5b1003dce88f9bf30a91121..a3a8bc878466e53590acd1fd1ebbf3590cf28eb5 100644 --- a/src/client/src/tscServer.c +++ b/src/client/src/tscServer.c @@ -1573,12 +1573,40 @@ int tscProcessRetrieveLocalMergeRsp(SSqlObj *pSql) { return code; } - pRes->code = tscDoLocalMerge(pSql); + SQueryInfo *pQueryInfo = tscGetActiveQueryInfo(pCmd); + if (pQueryInfo->pQInfo == NULL) { + STableGroupInfo tableGroupInfo = {.numOfTables = 1, .pGroupList = taosArrayInit(1, POINTER_BYTES),}; + tableGroupInfo.map = taosHashInit(1, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_NO_LOCK); + + STableKeyInfo tableKeyInfo = {.pTable = NULL, .lastKey = INT64_MIN}; + + SArray* group = taosArrayInit(1, sizeof(STableKeyInfo)); + taosArrayPush(group, &tableKeyInfo); + taosArrayPush(tableGroupInfo.pGroupList, &group); + + SExprInfo* list = calloc(tscSqlExprNumOfExprs(pQueryInfo), sizeof(SExprInfo)); + for(int32_t i = 0; i < tscSqlExprNumOfExprs(pQueryInfo); ++i) { + SExprInfo* pExprInfo = tscSqlExprGet(pQueryInfo, i); + list[i] = *pExprInfo; + } + + pQueryInfo->pQInfo = createQueryInfoFromQueryNode(pQueryInfo, list, &tableGroupInfo, NULL, NULL, pRes->pLocalMerger, MERGE_STAGE); + } + + uint64_t localQueryId = 0; + SMultiwayMergeInfo* pInfo = (SMultiwayMergeInfo*) pQueryInfo->pQInfo->runtimeEnv.proot->info; + pInfo->pMerge = pRes->pLocalMerger; + + qTableQuery(pQueryInfo->pQInfo, &localQueryId); + SSDataBlock* p = pQueryInfo->pQInfo->runtimeEnv.outputBuf; + pRes->numOfRows = (p != NULL)? p->info.rows: 0; + + //pRes->code = tscDoLocalMerge(pSql); if (pRes->code == TSDB_CODE_SUCCESS && pRes->numOfRows > 0) { - SQueryInfo *pQueryInfo = tscGetActiveQueryInfo(pCmd); tscCreateResPointerInfo(pRes, pQueryInfo); - tscSetResRawPtr(pRes, pQueryInfo); + tscSetResRawPtrRv(pRes, pQueryInfo, p); +// tscSetResRawPtr(pRes, pQueryInfo); } pRes->row = 0; diff --git a/src/client/src/tscSubquery.c b/src/client/src/tscSubquery.c index d8ccceea149391b5b321d0130e3041cf55eb8e2f..1ae5041e780df7eae057d8ee2f656150c9ffd99d 100644 --- a/src/client/src/tscSubquery.c +++ b/src/client/src/tscSubquery.c @@ -3467,7 +3467,8 @@ static UNUSED_FUNC bool tscHasRemainDataInSubqueryResultSet(SSqlObj *pSql) { SQueryInfo *pQueryInfo1 = tscGetQueryInfo(&pSql->pSubs[i]->cmd, 0); if ((pRes1->row >= pRes1->numOfRows && tscHasReachLimitation(pQueryInfo1, pRes1) && - tscIsProjectionQuery(pQueryInfo1)) || (pRes1->numOfRows == 0)) { + tscIsProjectionQuery(pQueryInfo1)) || + (pRes1->numOfRows == 0)) { hasData = false; break; } @@ -3477,7 +3478,8 @@ static UNUSED_FUNC bool tscHasRemainDataInSubqueryResultSet(SSqlObj *pSql) { return hasData; } -void* createQueryInfoFromQueryNode(SQueryInfo* pQueryInfo, SExprInfo* pExprs, STableGroupInfo* pTableGroupInfo, SOperatorInfo* pOperator, char* sql, void* addr) { +void* createQueryInfoFromQueryNode(SQueryInfo* pQueryInfo, SExprInfo* pExprs, STableGroupInfo* pTableGroupInfo, + SOperatorInfo* pSourceOperator, char* sql, void* merger, int32_t stage) { assert(pQueryInfo != NULL); int16_t numOfOutput = pQueryInfo->fieldsInfo.numOfOutput; @@ -3493,7 +3495,7 @@ void* createQueryInfoFromQueryNode(SQueryInfo* pQueryInfo, SExprInfo* pExprs, ST SQueryRuntimeEnv* pRuntimeEnv = &pQInfo->runtimeEnv; pRuntimeEnv->pQueryAttr = pQueryAttr; - tscCreateQueryFromQueryInfo(pQueryInfo, pQueryAttr, addr); + tscCreateQueryFromQueryInfo(pQueryInfo, pQueryAttr, NULL); pQueryAttr->tableGroupInfo = *pTableGroupInfo; @@ -3580,15 +3582,17 @@ void* createQueryInfoFromQueryNode(SQueryInfo* pQueryInfo, SExprInfo* pExprs, ST tfree(pExprs); - SArray* pa = createExecOperatorPlan(pQueryAttr); + SArray* pa = NULL; + if (stage == MASTER_SCAN) { + pa = createExecOperatorPlan(pQueryAttr); + } else { + pa = createGlobalMergePlan(pQueryAttr); + } STsBufInfo bufInfo = {0}; SQueryParam param = {.pOperator = pa}; - /*int32_t code = */initQInfo(&bufInfo, NULL, pQInfo, ¶m, NULL, 0); - pQInfo->runtimeEnv.proot->upstream = pOperator; - - qTableQuery(pQInfo, NULL); - + /*int32_t code = */initQInfo(&bufInfo, NULL, pQInfo, ¶m, NULL, 0, merger); +// pQInfo->runtimeEnv.proot->upstream = pSourceOperator; return pQInfo; _cleanup: diff --git a/src/client/src/tscUtil.c b/src/client/src/tscUtil.c index 29fd7306a3ad61048fc33cab5e6a3de605cdb623..38594f8499b1914960ce99f4d33fcf597e472e77 100644 --- a/src/client/src/tscUtil.c +++ b/src/client/src/tscUtil.c @@ -522,6 +522,75 @@ void tscSetResRawPtr(SSqlRes* pRes, SQueryInfo* pQueryInfo) { } } +void tscSetResRawPtrRv(SSqlRes* pRes, SQueryInfo* pQueryInfo, SSDataBlock* pBlock) { + assert(pRes->numOfCols > 0); + + int32_t offset = 0; + + for (int32_t i = 0; i < pQueryInfo->fieldsInfo.numOfOutput; ++i) { + SInternalField* pInfo = (SInternalField*)TARRAY_GET_ELEM(pQueryInfo->fieldsInfo.internalField, i); + + SColumnInfoData* pColData = taosArrayGet(pBlock->pDataBlock, i); + + pRes->urow[i] = pColData->pData + offset * pColData->info.bytes; + pRes->length[i] = pInfo->field.bytes; + + offset += pInfo->field.bytes; + + // generated the user-defined column result + if (pInfo->pExpr->pExpr == NULL && TSDB_COL_IS_UD_COL(pInfo->pExpr->base.colInfo.flag)) { + if (pInfo->pExpr->base.param[1].nType == TSDB_DATA_TYPE_NULL) { + setNullN(pRes->urow[i], pInfo->field.type, pInfo->field.bytes, (int32_t) pRes->numOfRows); + } else { + if (pInfo->field.type == TSDB_DATA_TYPE_NCHAR || pInfo->field.type == TSDB_DATA_TYPE_BINARY) { + assert(pInfo->pExpr->base.param[1].nLen <= pInfo->field.bytes); + + for (int32_t k = 0; k < pRes->numOfRows; ++k) { + char* p = ((char**)pRes->urow)[i] + k * pInfo->field.bytes; + + memcpy(varDataVal(p), pInfo->pExpr->base.param[1].pz, pInfo->pExpr->base.param[1].nLen); + varDataSetLen(p, pInfo->pExpr->base.param[1].nLen); + } + } else { + for (int32_t k = 0; k < pRes->numOfRows; ++k) { + char* p = ((char**)pRes->urow)[i] + k * pInfo->field.bytes; + memcpy(p, &pInfo->pExpr->base.param[1].i64, pInfo->field.bytes); + } + } + } + + } else if (pInfo->field.type == TSDB_DATA_TYPE_NCHAR) { + // convert unicode to native code in a temporary buffer extra one byte for terminated symbol + pRes->buffer[i] = realloc(pRes->buffer[i], pInfo->field.bytes * pRes->numOfRows); + + // string terminated char for binary data + memset(pRes->buffer[i], 0, pInfo->field.bytes * pRes->numOfRows); + + char* p = pRes->urow[i]; + for (int32_t k = 0; k < pRes->numOfRows; ++k) { + char* dst = pRes->buffer[i] + k * pInfo->field.bytes; + + if (isNull(p, TSDB_DATA_TYPE_NCHAR)) { + memcpy(dst, p, varDataTLen(p)); + } else if (varDataLen(p) > 0) { + int32_t length = taosUcs4ToMbs(varDataVal(p), varDataLen(p), varDataVal(dst)); + varDataSetLen(dst, length); + + if (length == 0) { + tscError("charset:%s to %s. val:%s convert failed.", DEFAULT_UNICODE_ENCODEC, tsCharset, (char*)p); + } + } else { + varDataSetLen(dst, 0); + } + + p += pInfo->field.bytes; + } + + memcpy(pRes->urow[i], pRes->buffer[i], pInfo->field.bytes * pRes->numOfRows); + } + } +} + static SColumnInfo* extractColumnInfoFromResult(STableMeta* pTableMeta, SArray* pTableCols) { int32_t numOfCols = taosArrayGetSize(pTableCols); SColumnInfo* pColInfo = calloc(numOfCols, sizeof(SColumnInfo)); @@ -626,8 +695,7 @@ void prepareInputDataFromUpstream(SSqlRes* pRes, SQueryInfo* pQueryInfo) { SOperatorInfo* pSourceOptr = createDummyInputOperator((char*)pRes, pSchema, numOfOutput); - SQInfo* pQInfo = createQueryInfoFromQueryNode(px, exprInfo, &tableGroupInfo, pSourceOptr, NULL, NULL); - //printf("%p\n", pQInfo); + SQInfo* pQInfo = createQueryInfoFromQueryNode(px, exprInfo, &tableGroupInfo, pSourceOptr, NULL, NULL, MASTER_SCAN); SSDataBlock* pres = pQInfo->runtimeEnv.outputBuf; // build result @@ -3250,6 +3318,60 @@ static int32_t createSecondaryExpr(SQueryAttr* pQueryAttr, SQueryInfo* pQueryInf return TSDB_CODE_SUCCESS; } +static int32_t createGlobalAggregateExpr(SQueryAttr* pQueryAttr, SQueryInfo* pQueryInfo) { + assert(tscIsTwoStageSTableQuery(pQueryInfo, 0)); + + pQueryAttr->numOfExpr3 = tscNumOfFields(pQueryInfo); + pQueryAttr->pExpr3 = calloc(pQueryAttr->numOfExpr3, sizeof(SExprInfo)); + if (pQueryAttr->pExpr3 == NULL) { + return TSDB_CODE_TSC_OUT_OF_MEMORY; + } + + for (int32_t i = 0; i < pQueryAttr->numOfExpr3; ++i) { + SExprInfo* pExpr = &pQueryAttr->pExpr1[i]; + SSqlExpr* pse = &pQueryAttr->pExpr3[i].base; + + *pse = pExpr->base; + pse->colInfo.colId = pExpr->base.resColId; + pse->colInfo.colIndex = i; + + pse->colType = pExpr->base.resType; + pse->colBytes = pExpr->base.resBytes; + + for (int32_t j = 0; j < pExpr->base.numOfParams; ++j) { + tVariantAssign(&pse->param[j], &pExpr->base.param[j]); + } + } + { + for (int32_t i = 0; i < pQueryAttr->numOfExpr3; ++i) { + SExprInfo* pExpr = &pQueryAttr->pExpr1[i]; + SSqlExpr* pse = &pQueryAttr->pExpr3[i].base; + + // the final result size and type in the same as query on single table. + // so here, set the flag to be false; + int32_t inter = 0; + + int32_t functionId = pExpr->base.functionId; + if (functionId >= TSDB_FUNC_TS && functionId <= TSDB_FUNC_DIFF) { + continue; + } + + if (functionId == TSDB_FUNC_FIRST_DST) { + functionId = TSDB_FUNC_FIRST; + } else if (functionId == TSDB_FUNC_LAST_DST) { + functionId = TSDB_FUNC_LAST; + } else if (functionId == TSDB_FUNC_STDDEV_DST) { + functionId = TSDB_FUNC_STDDEV; + } + + getResultDataInfo(pExpr->base.colType, pExpr->base.colBytes, functionId, 0, &pse->resType, + &pse->resBytes, &inter, 0, false); + } + } + + return TSDB_CODE_SUCCESS; +} + static int32_t createTagColumnInfo(SQueryAttr* pQueryAttr, SQueryInfo* pQueryInfo, STableMetaInfo* pTableMetaInfo) { if (pTableMetaInfo->tagColList == NULL) { return TSDB_CODE_SUCCESS; @@ -3352,6 +3474,11 @@ int32_t tscCreateQueryFromQueryInfo(SQueryInfo* pQueryInfo, SQueryAttr* pQueryAt return code; } + // global aggregate query + if (pQueryAttr->stableQuery && (pQueryAttr->simpleAgg || pQueryAttr->interval.interval > 0) && tscIsTwoStageSTableQuery(pQueryInfo, 0)) { + createGlobalAggregateExpr(pQueryAttr, pQueryInfo); + } + // tag column info code = createTagColumnInfo(pQueryAttr, pQueryInfo, pTableMetaInfo); if (code != TSDB_CODE_SUCCESS) { diff --git a/src/query/inc/qAggMain.h b/src/query/inc/qAggMain.h index c59067c4b357aa084cd6f31c39f3038075d18a82..10aae17a833dd3804e124146d324e8a28b7f35f1 100644 --- a/src/query/inc/qAggMain.h +++ b/src/query/inc/qAggMain.h @@ -210,9 +210,6 @@ typedef struct SAggFunctionInfo { void (*xFunction)(SQLFunctionCtx *pCtx); // blocks version function void (*xFunctionF)(SQLFunctionCtx *pCtx, int32_t position); // single-row function version, todo merge with blockwise function - // some sql function require scan data twice or more, e.g.,stddev, percentile - void (*xNextStep)(SQLFunctionCtx *pCtx); - // finalizer must be called after all xFunction has been executed to generated final result. void (*xFinalize)(SQLFunctionCtx *pCtx); void (*mergeFunc)(SQLFunctionCtx *pCtx); diff --git a/src/query/inc/qExecutor.h b/src/query/inc/qExecutor.h index 4cc90bce33ffb31557f5149e9a64e50d6f9dc0e4..124ddd06ade45229785aabddd6f155e51781ed90 100644 --- a/src/query/inc/qExecutor.h +++ b/src/query/inc/qExecutor.h @@ -212,9 +212,13 @@ typedef struct SQueryAttr { int32_t maxTableColumnWidth; int32_t tagLen; // tag value length of current query SSqlGroupbyExpr* pGroupbyExpr; + SExprInfo* pExpr1; SExprInfo* pExpr2; int32_t numOfExpr2; + SExprInfo* pExpr3; + int32_t numOfExpr3; + SColumnInfo* colList; SColumnInfo* tagColList; int32_t numOfFilterCols; @@ -290,7 +294,8 @@ enum OPERATOR_TYPE_E { OP_MultiTableAggregate = 14, OP_MultiTableTimeInterval = 15, OP_DummyInput = 16, //TODO remove it after fully refactor. - OP_MultiwayMerge = 17, // multi-way merge process for partial results from different vnodes + OP_MultiwaySort = 17, // multi-way data merge into one input stream. + OP_GlobalAggregate = 18, // global merge for the multi-way data sources. }; typedef struct SOperatorInfo { @@ -412,10 +417,6 @@ typedef struct SLimitOperatorInfo { int64_t total; } SLimitOperatorInfo; -typedef struct SOffsetOperatorInfo { - int64_t offset; -} SOffsetOperatorInfo; - typedef struct SFillOperatorInfo { SFillInfo *pFillInfo; SSDataBlock *pRes; @@ -436,6 +437,17 @@ typedef struct SSWindowOperatorInfo { int32_t start; // start row index } SSWindowOperatorInfo; +struct SLocalMerger; + +typedef struct SMultiwayMergeInfo { + struct SLocalMerger *pMerge; + SOptrBasicInfo binfo; + int64_t seed; + char **prevRow; + bool hasPrev; + SArray *orderColumnList; +} SMultiwayMergeInfo; + SOperatorInfo* createDataBlocksOptScanInfo(void* pTsdbQueryHandle, SQueryRuntimeEnv* pRuntimeEnv, int32_t repeatTime, int32_t reverseTime); SOperatorInfo* createTableScanOperator(void* pTsdbQueryHandle, SQueryRuntimeEnv* pRuntimeEnv, int32_t repeatTime); SOperatorInfo* createTableSeqScanOperator(void* pTsdbQueryHandle, SQueryRuntimeEnv* pRuntimeEnv); @@ -451,7 +463,14 @@ SOperatorInfo* createMultiTableAggOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SO SOperatorInfo* createMultiTableTimeIntervalOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput); SOperatorInfo* createTagScanOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SExprInfo* pExpr, int32_t numOfOutput); SOperatorInfo* createTableBlockInfoScanOperator(void* pTsdbQueryHandle, SQueryRuntimeEnv* pRuntimeEnv); - +SOperatorInfo* createMultiwaySortOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SExprInfo* pExpr, int32_t numOfOutput, + int32_t numOfRows, void* merger); +SOperatorInfo* createGlobalAggregateOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput, int32_t* orderColumn, int32_t numOfOrder); +SSDataBlock* doGlobalAggregate(void* param); +SSDataBlock* createOutputBuf(SExprInfo* pExpr, int32_t numOfOutput, int32_t numOfRows); +void setInputDataBlock(SOperatorInfo* pOperator, SQLFunctionCtx* pCtx, SSDataBlock* pBlock, int32_t order); +int32_t getNumOfResult(SQueryRuntimeEnv *pRuntimeEnv, SQLFunctionCtx* pCtx, int32_t numOfOutput); +void finalizeQueryResult(SOperatorInfo* pOperator, SQLFunctionCtx* pCtx, SResultRowInfo* pResultRowInfo, int32_t* rowCellInfoOffset); void freeParam(SQueryParam *param); int32_t convertQueryMsg(SQueryTableMsg *pQueryMsg, SQueryParam* param); @@ -466,7 +485,7 @@ SQInfo *createQInfoImpl(SQueryTableMsg *pQueryMsg, SSqlGroupbyExpr *pGroupbyExpr SExprInfo *pSecExprs, STableGroupInfo *pTableGroupInfo, SColumnInfo* pTagCols, int32_t vgId, char* sql, uint64_t *qId); int32_t initQInfo(STsBufInfo* pTsBufInfo, void* tsdb, SQInfo* pQInfo, SQueryParam* param, char* start, - int32_t prevResultLen); + int32_t prevResultLen, void* merger); void freeColumnFilterInfo(SColumnFilterInfo* pFilter, int32_t numOfFilters); diff --git a/src/query/inc/qExtbuffer.h b/src/query/inc/qExtbuffer.h index df6e64ddd85c4ec6be693f262ea561ee23f3bf0b..98a7872ee2eea68ef17db4801d9ce629467c3b08 100644 --- a/src/query/inc/qExtbuffer.h +++ b/src/query/inc/qExtbuffer.h @@ -237,6 +237,9 @@ int32_t compare_a(tOrderDescriptor *, int32_t numOfRow1, int32_t s1, char *data1 int32_t compare_d(tOrderDescriptor *, int32_t numOfRow1, int32_t s1, char *data1, int32_t numOfRow2, int32_t s2, char *data2); +struct SSDataBlock; +int32_t compare_aRv(struct SSDataBlock* pBlock, int16_t* colIndex, int32_t numOfCols, int32_t rowIndex, char** buffer, int32_t order); + #ifdef __cplusplus } #endif diff --git a/src/query/inc/qPlan.h b/src/query/inc/qPlan.h index 911caefe9b829d244975e89c1dfe9569ac0e5a32..8f35565e4bccd4896ec49ddb30f7236ac4b4650c 100644 --- a/src/query/inc/qPlan.h +++ b/src/query/inc/qPlan.h @@ -19,5 +19,6 @@ //TODO refactor SArray* createTableScanPlan(SQueryAttr* pQueryAttr); SArray* createExecOperatorPlan(SQueryAttr* pQueryAttr); +SArray* createGlobalMergePlan(SQueryAttr* pQueryAttr); #endif // TDENGINE_QPLAN_H diff --git a/src/query/src/qAggMain.c b/src/query/src/qAggMain.c index f4be4016dac4387850fa417e45d00df201c42d36..cc13d64d2b500aea2bb3cfc31d2a84436b134e9e 100644 --- a/src/query/src/qAggMain.c +++ b/src/query/src/qAggMain.c @@ -375,12 +375,6 @@ int32_t isValidFunction(const char* name, int32_t len) { return -1; } -// set the query flag to denote that query is completed -static void no_next_step(SQLFunctionCtx *pCtx) { - SResultRowCellInfo *pResInfo = GET_RES_INFO(pCtx); - pResInfo->complete = true; -} - static bool function_setup(SQLFunctionCtx *pCtx) { SResultRowCellInfo *pResInfo = GET_RES_INFO(pCtx); if (pResInfo->initialized) { @@ -1540,7 +1534,7 @@ static void stddev_function_f(SQLFunctionCtx *pCtx, int32_t index) { } } -static void stddev_next_step(SQLFunctionCtx *pCtx) { +static UNUSED_FUNC void stddev_next_step(SQLFunctionCtx *pCtx) { /* * the stddevInfo and the average info struct share the same buffer area * And the position of each element in their struct is exactly the same matched @@ -2907,7 +2901,7 @@ static void percentile_finalizer(SQLFunctionCtx *pCtx) { doFinalizer(pCtx); } -static void percentile_next_step(SQLFunctionCtx *pCtx) { +static UNUSED_FUNC void percentile_next_step(SQLFunctionCtx *pCtx) { SResultRowCellInfo * pResInfo = GET_RES_INFO(pCtx); SPercentileInfo *pInfo = GET_ROWCELL_INTERBUF(pResInfo); @@ -4891,7 +4885,6 @@ SAggFunctionInfo aAggs[] = {{ function_setup, count_function, count_function_f, - no_next_step, doFinalizer, count_func_merge, countRequired, @@ -4905,7 +4898,6 @@ SAggFunctionInfo aAggs[] = {{ function_setup, sum_function, sum_function_f, - no_next_step, function_finalizer, sum_func_merge, statisRequired, @@ -4919,7 +4911,6 @@ SAggFunctionInfo aAggs[] = {{ function_setup, avg_function, avg_function_f, - no_next_step, avg_finalizer, avg_func_merge, statisRequired, @@ -4933,7 +4924,6 @@ SAggFunctionInfo aAggs[] = {{ min_func_setup, min_function, min_function_f, - no_next_step, function_finalizer, min_func_merge, statisRequired, @@ -4947,7 +4937,6 @@ SAggFunctionInfo aAggs[] = {{ max_func_setup, max_function, max_function_f, - no_next_step, function_finalizer, max_func_merge, statisRequired, @@ -4961,7 +4950,6 @@ SAggFunctionInfo aAggs[] = {{ function_setup, stddev_function, stddev_function_f, - stddev_next_step, stddev_finalizer, noop1, dataBlockRequired, @@ -4975,7 +4963,6 @@ SAggFunctionInfo aAggs[] = {{ percentile_function_setup, percentile_function, percentile_function_f, - percentile_next_step, percentile_finalizer, noop1, dataBlockRequired, @@ -4989,7 +4976,6 @@ SAggFunctionInfo aAggs[] = {{ apercentile_function_setup, apercentile_function, apercentile_function_f, - no_next_step, apercentile_finalizer, apercentile_func_merge, dataBlockRequired, @@ -5003,7 +4989,6 @@ SAggFunctionInfo aAggs[] = {{ function_setup, first_function, first_function_f, - no_next_step, function_finalizer, noop1, firstFuncRequired, @@ -5017,7 +5002,6 @@ SAggFunctionInfo aAggs[] = {{ function_setup, last_function, last_function_f, - no_next_step, function_finalizer, noop1, lastFuncRequired, @@ -5032,7 +5016,6 @@ SAggFunctionInfo aAggs[] = {{ first_last_function_setup, last_row_function, noop2, - no_next_step, last_row_finalizer, last_dist_func_merge, dataBlockRequired, @@ -5047,7 +5030,6 @@ SAggFunctionInfo aAggs[] = {{ top_bottom_function_setup, top_function, top_function_f, - no_next_step, top_bottom_func_finalizer, top_func_merge, dataBlockRequired, @@ -5062,7 +5044,6 @@ SAggFunctionInfo aAggs[] = {{ top_bottom_function_setup, bottom_function, bottom_function_f, - no_next_step, top_bottom_func_finalizer, bottom_func_merge, dataBlockRequired, @@ -5076,7 +5057,6 @@ SAggFunctionInfo aAggs[] = {{ spread_function_setup, spread_function, spread_function_f, - no_next_step, spread_function_finalizer, spread_func_merge, countRequired, @@ -5090,7 +5070,6 @@ SAggFunctionInfo aAggs[] = {{ twa_function_setup, twa_function, twa_function_f, - no_next_step, twa_function_finalizer, twa_function_copy, dataBlockRequired, @@ -5104,7 +5083,6 @@ SAggFunctionInfo aAggs[] = {{ leastsquares_function_setup, leastsquares_function, leastsquares_function_f, - no_next_step, leastsquares_finalizer, noop1, dataBlockRequired, @@ -5118,7 +5096,6 @@ SAggFunctionInfo aAggs[] = {{ function_setup, date_col_output_function, date_col_output_function_f, - no_next_step, doFinalizer, copy_function, noDataRequired, @@ -5132,7 +5109,6 @@ SAggFunctionInfo aAggs[] = {{ function_setup, noop1, noop2, - no_next_step, doFinalizer, copy_function, dataBlockRequired, @@ -5146,7 +5122,6 @@ SAggFunctionInfo aAggs[] = {{ function_setup, tag_function, noop2, - no_next_step, doFinalizer, copy_function, noDataRequired, @@ -5160,7 +5135,6 @@ SAggFunctionInfo aAggs[] = {{ ts_comp_function_setup, ts_comp_function, ts_comp_function_f, - no_next_step, ts_comp_finalize, copy_function, dataBlockRequired, @@ -5174,7 +5148,6 @@ SAggFunctionInfo aAggs[] = {{ function_setup, tag_function, tag_function_f, - no_next_step, doFinalizer, copy_function, noDataRequired, @@ -5188,7 +5161,6 @@ SAggFunctionInfo aAggs[] = {{ function_setup, col_project_function, col_project_function_f, - no_next_step, doFinalizer, copy_function, dataBlockRequired, @@ -5202,7 +5174,6 @@ SAggFunctionInfo aAggs[] = {{ function_setup, tag_project_function, tag_project_function_f, - no_next_step, doFinalizer, copy_function, noDataRequired, @@ -5216,7 +5187,6 @@ SAggFunctionInfo aAggs[] = {{ function_setup, arithmetic_function, arithmetic_function_f, - no_next_step, doFinalizer, copy_function, dataBlockRequired, @@ -5230,7 +5200,6 @@ SAggFunctionInfo aAggs[] = {{ diff_function_setup, diff_function, diff_function_f, - no_next_step, doFinalizer, noop1, dataBlockRequired, @@ -5245,7 +5214,6 @@ SAggFunctionInfo aAggs[] = {{ first_last_function_setup, first_dist_function, first_dist_function_f, - no_next_step, function_finalizer, first_dist_func_merge, firstDistFuncRequired, @@ -5259,7 +5227,6 @@ SAggFunctionInfo aAggs[] = {{ first_last_function_setup, last_dist_function, last_dist_function_f, - no_next_step, function_finalizer, last_dist_func_merge, lastDistFuncRequired, @@ -5273,7 +5240,6 @@ SAggFunctionInfo aAggs[] = {{ function_setup, stddev_dst_function, stddev_dst_function_f, - no_next_step, stddev_dst_finalizer, stddev_dst_merge, dataBlockRequired, @@ -5287,7 +5253,6 @@ SAggFunctionInfo aAggs[] = {{ function_setup, interp_function, do_sum_f, // todo filter handle - no_next_step, doFinalizer, copy_function, dataBlockRequired, @@ -5301,7 +5266,6 @@ SAggFunctionInfo aAggs[] = {{ rate_function_setup, rate_function, rate_function_f, - no_next_step, rate_finalizer, rate_func_copy, dataBlockRequired, @@ -5315,7 +5279,6 @@ SAggFunctionInfo aAggs[] = {{ rate_function_setup, irate_function, irate_function_f, - no_next_step, rate_finalizer, rate_func_copy, dataBlockRequired, @@ -5329,7 +5292,6 @@ SAggFunctionInfo aAggs[] = {{ rate_function_setup, rate_function, rate_function_f, - no_next_step, sumrate_finalizer, sumrate_func_merge, dataBlockRequired, @@ -5343,7 +5305,6 @@ SAggFunctionInfo aAggs[] = {{ rate_function_setup, irate_function, irate_function_f, - no_next_step, sumrate_finalizer, sumrate_func_merge, dataBlockRequired, @@ -5357,7 +5318,6 @@ SAggFunctionInfo aAggs[] = {{ rate_function_setup, rate_function, rate_function_f, - no_next_step, sumrate_finalizer, sumrate_func_merge, dataBlockRequired, @@ -5371,7 +5331,6 @@ SAggFunctionInfo aAggs[] = {{ rate_function_setup, irate_function, irate_function_f, - no_next_step, sumrate_finalizer, sumrate_func_merge, dataBlockRequired, @@ -5385,7 +5344,6 @@ SAggFunctionInfo aAggs[] = {{ function_setup, noop1, noop2, - no_next_step, noop1, noop1, dataBlockRequired, @@ -5399,7 +5357,6 @@ SAggFunctionInfo aAggs[] = {{ function_setup, blockInfo_func, noop2, - no_next_step, blockinfo_func_finalizer, block_func_merge, dataBlockRequired, diff --git a/src/query/src/qExecutor.c b/src/query/src/qExecutor.c index 3bd398cfa96e86ee6aec2bff742eeeffff4d7527..dc942e632d2c459752f406f7dc82456f6082b61a 100644 --- a/src/query/src/qExecutor.c +++ b/src/query/src/qExecutor.c @@ -180,7 +180,7 @@ static void doSetTableGroupOutputBuf(SQueryRuntimeEnv* pRuntimeEnv, SResultRowIn int32_t groupIndex); // setup the output buffer for each operator -static SSDataBlock* createOutputBuf(SExprInfo* pExpr, int32_t numOfOutput, int32_t numOfRows) { +SSDataBlock* createOutputBuf(SExprInfo* pExpr, int32_t numOfOutput, int32_t numOfRows) { const static int32_t minSize = 8; SSDataBlock *res = calloc(1, sizeof(SSDataBlock)); @@ -351,7 +351,6 @@ static SResultRow *doPrepareResultRowFromKey(SQueryRuntimeEnv *pRuntimeEnv, SRes prepareResultListBuffer(pResultRowInfo, pRuntimeEnv); SResultRow *pResult = NULL; - if (p1 == NULL) { pResult = getNewResultRow(pRuntimeEnv->pool); int32_t ret = initResultRow(pResult); @@ -879,7 +878,7 @@ static void doSetInputDataBlockInfo(SOperatorInfo* pOperator, SQLFunctionCtx* pC } } -static void setInputDataBlock(SOperatorInfo* pOperator, SQLFunctionCtx* pCtx, SSDataBlock* pBlock, int32_t order) { +void setInputDataBlock(SOperatorInfo* pOperator, SQLFunctionCtx* pCtx, SSDataBlock* pBlock, int32_t order) { if (pCtx[0].functionId == TSDB_FUNC_ARITHM) { SArithmeticSupport* pSupport = (SArithmeticSupport*) pCtx[0].param[1].pz; if (pSupport->colList == NULL) { @@ -1618,7 +1617,7 @@ static void* destroySQLFunctionCtx(SQLFunctionCtx* pCtx, int32_t numOfOutput) { return NULL; } -static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int32_t numOfTables, SArray* pOperator) { +static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int32_t numOfTables, SArray* pOperator, void* merger) { qDebug("QInfo:%"PRIu64" setup runtime env", GET_QID(pRuntimeEnv)); SQueryAttr *pQueryAttr = pRuntimeEnv->pQueryAttr; @@ -1728,80 +1727,24 @@ static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int32_t numOf break; } - default: { - assert(0); + case OP_MultiwaySort: { + pRuntimeEnv->proot = createMultiwaySortOperatorInfo(pRuntimeEnv, pQueryAttr->pExpr3, pQueryAttr->numOfExpr3, + 4096, merger); // TODO hack it + break; } - } - } - /* - if (onlyQueryTags(pQueryAttr)) { // do nothing for tags query - - } else if (QUERY_IS_INTERVAL_QUERY(pQueryAttr)) { - if (pQueryAttr->stableQuery) { - pRuntimeEnv->proot = createMultiTableTimeIntervalOperatorInfo(pRuntimeEnv, pRuntimeEnv->pTableScanner, - pQueryAttr->pExpr1, pQueryAttr->numOfOutput); - setTableScanFilterOperatorInfo(pRuntimeEnv->pTableScanner->info, pRuntimeEnv->proot); - } else { - pRuntimeEnv->proot = - createTimeIntervalOperatorInfo(pRuntimeEnv, pRuntimeEnv->pTableScanner, pQueryAttr->pExpr1, pQueryAttr->numOfOutput); - setTableScanFilterOperatorInfo(pRuntimeEnv->pTableScanner->info, pRuntimeEnv->proot); - if (pQueryAttr->pExpr2 != NULL) { - pRuntimeEnv->proot = - createArithOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot, pQueryAttr->pExpr2, pQueryAttr->numOfExpr2); + case OP_GlobalAggregate: { + pRuntimeEnv->proot = createGlobalAggregateOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot, pQueryAttr->pExpr3, + pQueryAttr->numOfExpr3, &pQueryAttr->order.orderColId, 1); + break; } - if (pQueryAttr->fillType != TSDB_FILL_NONE && !pQueryAttr->pointInterpQuery) { - SOperatorInfo* pInfo = pRuntimeEnv->proot; - pRuntimeEnv->proot = createFillOperatorInfo(pRuntimeEnv, pInfo, pInfo->pExpr, pInfo->numOfOutput); + default: { + assert(0); } } - - } else if (pQueryAttr->groupbyColumn) { - pRuntimeEnv->proot = - createGroupbyOperatorInfo(pRuntimeEnv, pRuntimeEnv->pTableScanner, pQueryAttr->pExpr1, pQueryAttr->numOfOutput); - setTableScanFilterOperatorInfo(pRuntimeEnv->pTableScanner->info, pRuntimeEnv->proot); - - if (pQueryAttr->pExpr2 != NULL) { - pRuntimeEnv->proot = createArithOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot, pQueryAttr->pExpr2, pQueryAttr->numOfExpr2); - } - } else if (pQueryAttr->sw.gap > 0) { - pRuntimeEnv->proot = createSWindowOperatorInfo(pRuntimeEnv, pRuntimeEnv->pTableScanner, pQueryAttr->pExpr1, pQueryAttr->numOfOutput); - setTableScanFilterOperatorInfo(pRuntimeEnv->pTableScanner->info, pRuntimeEnv->proot); - - if (pQueryAttr->pExpr2 != NULL) { - pRuntimeEnv->proot = createArithOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot, pQueryAttr->pExpr2, pQueryAttr->numOfExpr2); - } - } else if (pQueryAttr->simpleAgg) { - if (pQueryAttr->stableQuery && !pQueryAttr->tsCompQuery) { - pRuntimeEnv->proot = - createMultiTableAggOperatorInfo(pRuntimeEnv, pRuntimeEnv->pTableScanner, pQueryAttr->pExpr1, pQueryAttr->numOfOutput); - } else { - pRuntimeEnv->proot = - createAggregateOperatorInfo(pRuntimeEnv, pRuntimeEnv->pTableScanner, pQueryAttr->pExpr1, pQueryAttr->numOfOutput); - } - - setTableScanFilterOperatorInfo(pRuntimeEnv->pTableScanner->info, pRuntimeEnv->proot); - - if (pQueryAttr->pExpr2 != NULL && !pQueryAttr->stableQuery) { - pRuntimeEnv->proot = createArithOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot, pQueryAttr->pExpr2, pQueryAttr->numOfExpr2); - } - } else { // diff/add/multiply/subtract/division - if (!onlyQueryTags(pQueryAttr)) { - pRuntimeEnv->proot = - createArithOperatorInfo(pRuntimeEnv, pRuntimeEnv->pTableScanner, pQueryAttr->pExpr1, pQueryAttr->numOfOutput); - setTableScanFilterOperatorInfo(pRuntimeEnv->pTableScanner->info, pRuntimeEnv->proot); - } } - if (pQueryAttr->limit.offset > 0) { - pRuntimeEnv->proot = createOffsetOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot); - } - - if (pQueryAttr->limit.limit > 0) { - pRuntimeEnv->proot = createLimitOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot); - } -*/ return TSDB_CODE_SUCCESS; _clean: @@ -3918,7 +3861,8 @@ static SFillColInfo* createFillColInfo(SExprInfo* pExpr, int32_t numOfOutput, in return pFillCol; } -int32_t doInitQInfo(SQInfo *pQInfo, STSBuf *pTsBuf, SArray* prevResult, void *tsdb, int32_t tbScanner, SArray* pOperator) { +int32_t doInitQInfo(SQInfo *pQInfo, STSBuf *pTsBuf, SArray* prevResult, void *tsdb, int32_t tbScanner, SArray* pOperator, + void* param) { SQueryRuntimeEnv *pRuntimeEnv = &pQInfo->runtimeEnv; SQueryAttr *pQueryAttr = pQInfo->runtimeEnv.pQueryAttr; @@ -3981,7 +3925,7 @@ int32_t doInitQInfo(SQInfo *pQInfo, STSBuf *pTsBuf, SArray* prevResult, void *ts // create runtime environment int32_t numOfTables = (int32_t)pQueryAttr->tableGroupInfo.numOfTables; pQInfo->summary.tableInfoSize += (numOfTables * sizeof(STableQueryInfo)); - code = setupQueryRuntimeEnv(pRuntimeEnv, (int32_t) pQueryAttr->tableGroupInfo.numOfTables, pOperator); + code = setupQueryRuntimeEnv(pRuntimeEnv, (int32_t) pQueryAttr->tableGroupInfo.numOfTables, pOperator, param); if (code != TSDB_CODE_SUCCESS) { return code; } @@ -4355,6 +4299,55 @@ SOperatorInfo* createDataBlocksOptScanInfo(void* pTsdbQueryHandle, SQueryRuntime return pOptr; } +SOperatorInfo* createGlobalAggregateOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput, + int32_t* orderColumn, int32_t numOfOrder) { + SMultiwayMergeInfo* pInfo = calloc(1, sizeof(SMultiwayMergeInfo)); + + SQueryAttr* pQueryAttr = pRuntimeEnv->pQueryAttr; + int32_t numOfRows = + (int32_t)(GET_ROW_PARAM_FOR_MULTIOUTPUT(pQueryAttr, pQueryAttr->topBotQuery, pQueryAttr->stableQuery)); + + pInfo->binfo.pRes = createOutputBuf(pExpr, numOfOutput, numOfRows); + pInfo->binfo.pCtx = createSQLFunctionCtx(pRuntimeEnv, pExpr, numOfOutput, &pInfo->binfo.rowCellInfoOffset); + + // TODO refactor + int32_t len = 0; + for(int32_t i = 0; i < numOfOutput; ++i) { + len += pExpr[i].base.resBytes; + } + + pInfo->prevRow = taosArrayInit(numOfOrder, (POINTER_BYTES * numOfOrder + len)); + int32_t offset = POINTER_BYTES * numOfOutput; + for(int32_t i = 0; i < numOfOrder; ++i) { + pInfo->prevRow[i] = (char*)pInfo->prevRow + offset; + + int32_t index = orderColumn[i]; + offset += pExpr[index].base.resBytes; + } + + pInfo->orderColumnList = taosArrayFromList(orderColumn, numOfOrder, sizeof(int32_t)); + + initResultRowInfo(&pInfo->binfo.resultRowInfo, 8, TSDB_DATA_TYPE_INT); + + pInfo->seed = rand(); + setDefaultOutputBuf(pRuntimeEnv, &pInfo->binfo, pInfo->seed); + + SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo)); + pOperator->name = "GlobalAggregate"; + pOperator->operatorType = OP_GlobalAggregate; + pOperator->blockingOptr = true; + pOperator->status = OP_IN_EXECUTING; + pOperator->info = pInfo; + pOperator->upstream = upstream; + pOperator->pExpr = pExpr; + pOperator->numOfOutput = numOfOutput; + pOperator->pRuntimeEnv = pRuntimeEnv; + + pOperator->exec = doGlobalAggregate; + pOperator->cleanup = destroyBasicOperatorInfo; + return pOperator; +} + static int32_t getTableScanOrder(STableScanInfo* pTableScanInfo) { return pTableScanInfo->order; } @@ -5122,6 +5115,25 @@ SOperatorInfo* createFillOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorIn return pOperator; } +SOperatorInfo* createSLimitOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream) { + SLimitOperatorInfo* pInfo = calloc(1, sizeof(SLimitOperatorInfo)); + pInfo->limit = pRuntimeEnv->pQueryAttr->limit.limit; + + SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo)); + + pOperator->name = "SLimitOperator"; + pOperator->operatorType = OP_SLimit; + pOperator->blockingOptr = false; + pOperator->status = OP_IN_EXECUTING; + pOperator->upstream = upstream; + pOperator->exec = doLimit; + pOperator->info = pInfo; + pOperator->pRuntimeEnv = pRuntimeEnv; + + return pOperator; +} + + static SSDataBlock* doTagScan(void* param) { SOperatorInfo* pOperator = (SOperatorInfo*) param; if (pOperator->status == OP_EXEC_DONE) { @@ -6264,7 +6276,8 @@ bool isValidQInfo(void *param) { return (sig == (uint64_t)pQInfo); } -int32_t initQInfo(STsBufInfo* pTsBufInfo, void* tsdb, SQInfo* pQInfo, SQueryParam* param, char* start, int32_t prevResultLen) { +int32_t initQInfo(STsBufInfo* pTsBufInfo, void* tsdb, SQInfo* pQInfo, SQueryParam* param, char* start, + int32_t prevResultLen, void* merger) { int32_t code = TSDB_CODE_SUCCESS; SQueryRuntimeEnv* pRuntimeEnv = &pQInfo->runtimeEnv; @@ -6309,7 +6322,7 @@ int32_t initQInfo(STsBufInfo* pTsBufInfo, void* tsdb, SQInfo* pQInfo, SQueryPara } // filter the qualified - if ((code = doInitQInfo(pQInfo, pTsBuf, prevResult, tsdb, param->tableScanOperator, param->pOperator)) != TSDB_CODE_SUCCESS) { + if ((code = doInitQInfo(pQInfo, pTsBuf, prevResult, tsdb, param->tableScanOperator, param->pOperator, merger)) != TSDB_CODE_SUCCESS) { goto _error; } diff --git a/src/query/src/qExtbuffer.c b/src/query/src/qExtbuffer.c index a73f38528266bfa70790414a2963eb0a8290d7b9..79f86b160994220d2dc0baf8d7e488192fba4872 100644 --- a/src/query/src/qExtbuffer.c +++ b/src/query/src/qExtbuffer.c @@ -20,6 +20,7 @@ #include "taosdef.h" #include "taosmsg.h" #include "tulog.h" +#include "qExecutor.h" #define COLMODEL_GET_VAL(data, schema, allrow, rowId, colId) \ (data + (schema)->pFields[colId].offset * (allrow) + (rowId) * (schema)->pFields[colId].field.bytes) @@ -351,6 +352,18 @@ static FORCE_INLINE int32_t primaryKeyComparator(int64_t f1, int64_t f2, int32_t } } +static int32_t tsCompareFunc(TSKEY k1, TSKEY k2, int32_t order) { + if (k1 == k2) { + return 0; + } + + if (order == TSDB_ORDER_DESC) { + return (k1 < k2)? 1:-1; + } else { + return (k1 < k2)? -1:1; + } +} + static FORCE_INLINE int32_t columnValueAscendingComparator(char *f1, char *f2, int32_t type, int32_t bytes) { switch (type) { case TSDB_DATA_TYPE_INT: { @@ -451,6 +464,51 @@ int32_t compare_a(tOrderDescriptor *pDescriptor, int32_t numOfRows1, int32_t s1, return 0; } +int32_t compare_aRv(SSDataBlock* pBlock, int16_t* colIndex, int32_t numOfCols, int32_t rowIndex, char** buffer, int32_t order) { + for (int32_t i = 0; i < numOfCols; ++i) { + int32_t index = colIndex[i]; + SColumnInfoData* pColInfo = taosArrayGet(pBlock->pDataBlock, index); + + char* data = pColInfo->pData + rowIndex * pColInfo->info.bytes; + if (pColInfo->info.type == TSDB_DATA_TYPE_TIMESTAMP) { + int32_t ret = tsCompareFunc(GET_INT64_VAL(data), GET_INT64_VAL(buffer[i]), order); + if (ret == 0) { + continue; // The timestamps are identical + } else { + return ret; + } + } else { + int32_t ret = columnValueAscendingComparator(data, buffer[i], pColInfo->info.type, pColInfo->info.bytes); + if (ret == 0) { + continue; + } else { + return ret; + } + } +// char *f1 = COLMODEL_GET_VAL(data1, pDescriptor->pColumnModel, numOfRows1, s1, colIdx); +// char *f2 = COLMODEL_GET_VAL(data2, pDescriptor->pColumnModel, numOfRows2, s2, colIdx); + +// if (pDescriptor->pColumnModel->pFields[colIdx].field.type == TSDB_DATA_TYPE_TIMESTAMP) { +// int32_t ret = primaryKeyComparator(*(int64_t *)f1, *(int64_t *)f2, colIdx, pDescriptor->tsOrder); +// if (ret == 0) { +// continue; +// } else { +// return ret; +// } +// } else { +// SSchemaEx *pSchema = &pDescriptor->pColumnModel->pFields[colIdx]; +// int32_t ret = columnValueAscendingComparator(f1, f2, pSchema->field.type, pSchema->field.bytes); +// if (ret == 0) { +// continue; +// } else { +// return ret; +// } +// } + } + + return 0; +} + int32_t compare_d(tOrderDescriptor *pDescriptor, int32_t numOfRows1, int32_t s1, char *data1, int32_t numOfRows2, int32_t s2, char *data2) { assert(numOfRows1 == numOfRows2); diff --git a/src/query/src/qPlan.c b/src/query/src/qPlan.c index 8ddb2bbd612ed1c68332741264e7ade4a601c7c0..a7c842e343b158fc996056e81d71e06851698381 100644 --- a/src/query/src/qPlan.c +++ b/src/query/src/qPlan.c @@ -125,4 +125,37 @@ SArray* createExecOperatorPlan(SQueryAttr* pQueryAttr) { return plan; } +SArray* createGlobalMergePlan(SQueryAttr* pQueryAttr) { + SArray* plan = taosArrayInit(4, sizeof(int32_t)); + + if (!pQueryAttr->stableQuery) { + return plan; + } + + // todo: exchange operator? + int32_t op = OP_MultiwaySort; + taosArrayPush(plan, &op); + + // fill operator + if (pQueryAttr->fillType != TSDB_FILL_NONE && (!pQueryAttr->pointInterpQuery)) { + op = OP_Fill; + taosArrayPush(plan, &op); + } + + // arithmetic operator + if (!pQueryAttr->simpleAgg && pQueryAttr->interval.interval == 0) { + op = OP_Arithmetic; + taosArrayPush(plan, &op); + } else { + op = OP_GlobalAggregate; + taosArrayPush(plan, &op); + } + + // limit/offset operator + if (pQueryAttr->limit.limit > 0 || pQueryAttr->limit.offset > 0) { + op = OP_SLimit; + taosArrayPush(plan, &op); + } + return plan; +} diff --git a/src/query/src/queryMain.c b/src/query/src/queryMain.c index 64af517e9a1060e774438d92ac733e4dfa1eff1f..d7edd1531474992c9ecfe86545131efd3efa4808 100644 --- a/src/query/src/queryMain.c +++ b/src/query/src/queryMain.c @@ -172,7 +172,7 @@ int32_t qCreateQueryInfo(void* tsdb, int32_t vgId, SQueryTableMsg* pQueryMsg, qi goto _over; } - code = initQInfo(&pQueryMsg->tsBuf, tsdb, *pQInfo, ¶m, (char*)pQueryMsg, pQueryMsg->prevResultLen); + code = initQInfo(&pQueryMsg->tsBuf, tsdb, *pQInfo, ¶m, (char*)pQueryMsg, pQueryMsg->prevResultLen, NULL); _over: if (param.pGroupbyExpr != NULL) {