diff --git a/src/client/inc/tscLocalMerge.h b/src/client/inc/tscLocalMerge.h index 143922bb1fb6d6e10a157de5d90af2da5e221f76..3c0bde000030000330b33212c7e0c942d50d6a90 100644 --- a/src/client/inc/tscLocalMerge.h +++ b/src/client/inc/tscLocalMerge.h @@ -39,39 +39,29 @@ typedef struct SLocalDataSource { } SLocalDataSource; typedef struct SLocalMerger { - SLocalDataSource ** pLocalDataSrc; + SLocalDataSource **pLocalDataSrc; int32_t numOfBuffer; int32_t numOfCompleted; int32_t numOfVnode; - SLoserTreeInfo * pLoserTree; - tFilePage * pResultBuf; - int32_t nResultBufSize; - tFilePage * pTempBuffer; - struct SQLFunctionCtx *pCtx; - int32_t rowSize; // size of each intermediate result. - tOrderDescriptor * pDesc; - SColumnModel * resColModel; - SColumnModel* finalModel; - tExtMemBuffer ** pExtMemBuffer; // disk-based buffer - bool orderPrjOnSTable; // projection query on stable + SLoserTreeInfo *pLoserTree; + int32_t rowSize; // size of each intermediate result. + tOrderDescriptor *pDesc; + tExtMemBuffer **pExtMemBuffer; // disk-based buffer + char *buf; // temp buffer } SLocalMerger; typedef struct SRetrieveSupport { tExtMemBuffer ** pExtMemBuffer; // for build loser tree tOrderDescriptor *pOrderDescriptor; - SColumnModel* pFinalColModel; // colModel for final result - SColumnModel* pFFColModel; int32_t subqueryIndex; // index of current vnode in vnode list SSqlObj * pParentSql; tFilePage * localBuffer; // temp buffer, there is a buffer for each vnode to uint32_t numOfRetry; // record the number of retry times } SRetrieveSupport; -int32_t tscLocalReducerEnvCreate(SSqlObj *pSql, tExtMemBuffer ***pMemBuffer, tOrderDescriptor **pDesc, - SColumnModel **pFinalModel, SColumnModel** pFFModel, uint32_t nBufferSize); +int32_t tscLocalReducerEnvCreate(SQueryInfo* pQueryInfo, tExtMemBuffer ***pMemBuffer, int32_t numOfSub, tOrderDescriptor **pDesc, uint32_t nBufferSize, int64_t id); -void tscLocalReducerEnvDestroy(tExtMemBuffer **pMemBuffer, tOrderDescriptor *pDesc, SColumnModel *pFinalModel, SColumnModel* pFFModel, - int32_t numOfVnodes); +void tscLocalReducerEnvDestroy(tExtMemBuffer **pMemBuffer, tOrderDescriptor *pDesc, int32_t numOfVnodes); int32_t saveToBuffer(tExtMemBuffer *pMemoryBuf, tOrderDescriptor *pDesc, tFilePage *pPage, void *data, int32_t numOfRows, int32_t orderType); @@ -81,12 +71,10 @@ int32_t tscFlushTmpBuffer(tExtMemBuffer *pMemoryBuf, tOrderDescriptor *pDesc, tF /* * create local reducer to launch the second-stage reduce process at client site */ -void tscCreateLocalMerger(tExtMemBuffer **pMemBuffer, int32_t numOfBuffer, tOrderDescriptor *pDesc, - SColumnModel *finalModel, SColumnModel *pFFModel, SSqlObj* pSql); +int32_t tscCreateLocalMerger(tExtMemBuffer **pMemBuffer, int32_t numOfBuffer, tOrderDescriptor *pDesc, + SQueryInfo *pQueryInfo, SLocalMerger **pMerger, int64_t id); -void tscDestroyLocalMerger(SSqlObj *pSql); - -//int32_t tscDoLocalMerge(SSqlObj *pSql); +void tscDestroyLocalMerger(SLocalMerger* pLocalMerger); #ifdef __cplusplus } diff --git a/src/client/src/tscLocalMerge.c b/src/client/src/tscLocalMerge.c index e4a3ace6b5af128971a5bed21562082602754d97..f1e0196d6811b035c5d558f069b75abbc61a9779 100644 --- a/src/client/src/tscLocalMerge.c +++ b/src/client/src/tscLocalMerge.c @@ -14,13 +14,11 @@ */ #include "tscLocalMerge.h" -#include "tscSubquery.h" +//#include "tscSubquery.h" #include "os.h" #include "texpr.h" #include "tlosertree.h" #include "tscLog.h" -#include "tscUtil.h" -#include "tschemautil.h" #include "tsclient.h" #include "qUtil.h" @@ -59,77 +57,25 @@ int32_t treeComparator(const void *pLeft, const void *pRight, void *param) { } } -// todo merge with vnode side function -void tsCreateSQLFunctionCtx(SQueryInfo* pQueryInfo, SQLFunctionCtx* pCtx, SSchema* pSchema) { - size_t size = tscSqlExprNumOfExprs(pQueryInfo); - - for (int32_t i = 0; i < size; ++i) { - SExprInfo *pExpr = tscSqlExprGet(pQueryInfo, i); - - pCtx[i].order = pQueryInfo->order.order; - pCtx[i].functionId = pExpr->base.functionId; - - pCtx[i].order = pQueryInfo->order.order; - pCtx[i].functionId = pExpr->base.functionId; - - // input data format comes from pModel - pCtx[i].inputType = pSchema[i].type; - pCtx[i].inputBytes = pSchema[i].bytes; - - pCtx[i].outputBytes = pExpr->base.resBytes; - pCtx[i].outputType = pExpr->base.resType; - - // input buffer hold only one point data - pCtx[i].size = 1; - pCtx[i].hasNull = true; - pCtx[i].currentStage = MERGE_STAGE; - - // for top/bottom function, the output of timestamp is the first column - int32_t functionId = pExpr->base.functionId; - if (functionId == TSDB_FUNC_TOP || functionId == TSDB_FUNC_BOTTOM || functionId == TSDB_FUNC_DIFF) { - pCtx[i].ptsOutputBuf = pCtx[0].pOutput; - pCtx[i].param[2].i64 = pQueryInfo->order.order; - pCtx[i].param[2].nType = TSDB_DATA_TYPE_BIGINT; - pCtx[i].param[1].i64 = pQueryInfo->order.orderColId; - pCtx[i].param[0].i64 = pExpr->base.param[0].i64; // top/bot parameter - } else if (functionId == TSDB_FUNC_APERCT) { - pCtx[i].param[0].i64 = pExpr->base.param[0].i64; - pCtx[i].param[0].nType = pExpr->base.param[0].nType; - } else if (functionId == TSDB_FUNC_BLKINFO) { - pCtx[i].param[0].i64 = pExpr->base.param[0].i64; - pCtx[i].param[0].nType = pExpr->base.param[0].nType; - pCtx[i].numOfParams = 1; - } - - pCtx[i].interBufBytes = pExpr->base.interBytes; - pCtx[i].stableQuery = true; - } -} - -void tscCreateLocalMerger(tExtMemBuffer **pMemBuffer, int32_t numOfBuffer, tOrderDescriptor *pDesc, - SColumnModel *finalmodel, SColumnModel *pFFModel, SSqlObj *pSql) { - SSqlCmd* pCmd = &pSql->cmd; - SSqlRes* pRes = &pSql->res; - +int32_t tscCreateLocalMerger(tExtMemBuffer **pMemBuffer, int32_t numOfBuffer, tOrderDescriptor *pDesc, + SQueryInfo* pQueryInfo, SLocalMerger **pMerger, int64_t id) { if (pMemBuffer == NULL) { - tscLocalReducerEnvDestroy(pMemBuffer, pDesc, finalmodel, pFFModel, numOfBuffer); - tscError("pMemBuffer:%p is NULL", pMemBuffer); - pRes->code = TSDB_CODE_TSC_APP_ERROR; - return; + tscLocalReducerEnvDestroy(pMemBuffer, pDesc, numOfBuffer); + tscError("0x%"PRIx64" %p pMemBuffer is NULL", id, pMemBuffer); + return TSDB_CODE_TSC_APP_ERROR; } if (pDesc->pColumnModel == NULL) { - tscLocalReducerEnvDestroy(pMemBuffer, pDesc, finalmodel, pFFModel, numOfBuffer); - tscError("0x%"PRIx64" no local buffer or intermediate result format model", pSql->self); - pRes->code = TSDB_CODE_TSC_APP_ERROR; - return; + tscLocalReducerEnvDestroy(pMemBuffer, pDesc, numOfBuffer); + tscError("0x%"PRIx64" no local buffer or intermediate result format model", id); + return TSDB_CODE_TSC_APP_ERROR; } int32_t numOfFlush = 0; for (int32_t i = 0; i < numOfBuffer; ++i) { int32_t len = pMemBuffer[i]->fileMeta.flushoutData.nLength; if (len == 0) { - tscDebug("0x%"PRIx64" no data retrieved from orderOfVnode:%d", pSql->self, i + 1); + tscDebug("0x%"PRIx64" no data retrieved from orderOfVnode:%d", id, i + 1); continue; } @@ -137,41 +83,38 @@ void tscCreateLocalMerger(tExtMemBuffer **pMemBuffer, int32_t numOfBuffer, tOrde } if (numOfFlush == 0 || numOfBuffer == 0) { - tscLocalReducerEnvDestroy(pMemBuffer, pDesc, finalmodel, pFFModel, numOfBuffer); - pCmd->command = TSDB_SQL_RETRIEVE_EMPTY_RESULT; // no result, set the result empty - tscDebug("0x%"PRIx64" retrieved no data", pSql->self); - return; + tscLocalReducerEnvDestroy(pMemBuffer, pDesc, numOfBuffer); + tscDebug("0x%"PRIx64" no data to retrieve", id); + return TSDB_CODE_SUCCESS; } if (pDesc->pColumnModel->capacity >= pMemBuffer[0]->pageSize) { - tscError("0x%"PRIx64" Invalid value of buffer capacity %d and page size %d ", pSql->self, pDesc->pColumnModel->capacity, + tscError("0x%"PRIx64" Invalid value of buffer capacity %d and page size %d ", id, pDesc->pColumnModel->capacity, pMemBuffer[0]->pageSize); - tscLocalReducerEnvDestroy(pMemBuffer, pDesc, finalmodel, pFFModel, numOfBuffer); - pRes->code = TSDB_CODE_TSC_APP_ERROR; - return; + tscLocalReducerEnvDestroy(pMemBuffer, pDesc, numOfBuffer); + return TSDB_CODE_TSC_APP_ERROR; } size_t size = sizeof(SLocalMerger) + POINTER_BYTES * numOfFlush; - SLocalMerger *pMerger = (SLocalMerger *) calloc(1, size); - if (pMerger == NULL) { - tscError("0x%"PRIx64" failed to create local merge structure, out of memory", pSql->self); + *pMerger = (SLocalMerger *) calloc(1, size); + if ((*pMerger) == NULL) { + tscError("0x%"PRIx64" failed to create local merge structure, out of memory", id); - tscLocalReducerEnvDestroy(pMemBuffer, pDesc, finalmodel, pFFModel, numOfBuffer); - pRes->code = TSDB_CODE_TSC_OUT_OF_MEMORY; - return; + tscLocalReducerEnvDestroy(pMemBuffer, pDesc, numOfBuffer); + return TSDB_CODE_TSC_OUT_OF_MEMORY; } - pMerger->pExtMemBuffer = pMemBuffer; - pMerger->pLocalDataSrc = (SLocalDataSource **)&pMerger[1]; - assert(pMerger->pLocalDataSrc != NULL); + (*pMerger)->pExtMemBuffer = pMemBuffer; + (*pMerger)->pLocalDataSrc = (SLocalDataSource **)&pMerger[1]; + assert((*pMerger)->pLocalDataSrc != NULL); - pMerger->numOfBuffer = numOfFlush; - pMerger->numOfVnode = numOfBuffer; + (*pMerger)->numOfBuffer = numOfFlush; + (*pMerger)->numOfVnode = numOfBuffer; - pMerger->pDesc = pDesc; - tscDebug("0x%"PRIx64" the number of merged leaves is: %d", pSql->self, pMerger->numOfBuffer); + (*pMerger)->pDesc = pDesc; + tscDebug("0x%"PRIx64" the number of merged leaves is: %d", id, (*pMerger)->numOfBuffer); int32_t idx = 0; for (int32_t i = 0; i < numOfBuffer; ++i) { @@ -180,13 +123,12 @@ void tscCreateLocalMerger(tExtMemBuffer **pMemBuffer, int32_t numOfBuffer, tOrde for (int32_t j = 0; j < numOfFlushoutInFile; ++j) { SLocalDataSource *ds = (SLocalDataSource *)malloc(sizeof(SLocalDataSource) + pMemBuffer[0]->pageSize); if (ds == NULL) { - tscError("0x%"PRIx64" failed to create merge structure", pSql->self); - pRes->code = TSDB_CODE_TSC_OUT_OF_MEMORY; + tscError("0x%"PRIx64" failed to create merge structure", id); tfree(pMerger); - return; + return TSDB_CODE_TSC_OUT_OF_MEMORY; } - pMerger->pLocalDataSrc[idx] = ds; + (*pMerger)->pLocalDataSrc[idx] = ds; ds->pMemBuffer = pMemBuffer[i]; ds->flushoutIdx = j; @@ -194,7 +136,7 @@ void tscCreateLocalMerger(tExtMemBuffer **pMemBuffer, int32_t numOfBuffer, tOrde ds->pageId = 0; ds->rowIdx = 0; - tscDebug("0x%"PRIx64" load data from disk into memory, orderOfVnode:%d, total:%d", pSql->self, i + 1, idx + 1); + tscDebug("0x%"PRIx64" load data from disk into memory, orderOfVnode:%d, total:%d", id, i + 1, idx + 1); tExtMemBufferLoadData(pMemBuffer[i], &(ds->filePage), j, 0); #ifdef _DEBUG_VIEW printf("load data page into mem for build loser tree: %" PRIu64 " rows\n", ds->filePage.num); @@ -208,7 +150,7 @@ void tscCreateLocalMerger(tExtMemBuffer **pMemBuffer, int32_t numOfBuffer, tOrde #endif if (ds->filePage.num == 0) { // no data in this flush, the index does not increase - tscDebug("0x%"PRIx64" flush data is empty, ignore %d flush record", pSql->self, idx); + tscDebug("0x%"PRIx64" flush data is empty, ignore %d flush record", id, idx); tfree(ds); continue; } @@ -219,115 +161,54 @@ void tscCreateLocalMerger(tExtMemBuffer **pMemBuffer, int32_t numOfBuffer, tOrde // no data actually, no need to merge result. if (idx == 0) { - tfree(pMerger); - return; + tscDebug("0x%"PRIx64" retrieved no data", id); + tscLocalReducerEnvDestroy(pMemBuffer, pDesc, numOfBuffer); + return TSDB_CODE_SUCCESS; } - pMerger->numOfBuffer = idx; + (*pMerger)->numOfBuffer = idx; SCompareParam *param = malloc(sizeof(SCompareParam)); if (param == NULL) { - tfree(pMerger); - return; + tfree((*pMerger)); + return TSDB_CODE_TSC_OUT_OF_MEMORY; } - param->pLocalData = pMerger->pLocalDataSrc; - param->pDesc = pMerger->pDesc; - param->num = pMerger->pLocalDataSrc[0]->pMemBuffer->numOfElemsPerPage; - SQueryInfo *pQueryInfo = tscGetQueryInfo(pCmd, pCmd->clauseIndex); + param->pLocalData = (*pMerger)->pLocalDataSrc; + param->pDesc = (*pMerger)->pDesc; + param->num = (*pMerger)->pLocalDataSrc[0]->pMemBuffer->numOfElemsPerPage; param->groupOrderType = pQueryInfo->groupbyExpr.orderType; - pMerger->orderPrjOnSTable = tscOrderedProjectionQueryOnSTable(pQueryInfo, 0); - pRes->code = tLoserTreeCreate(&pMerger->pLoserTree, pMerger->numOfBuffer, param, treeComparator); - if (pMerger->pLoserTree == NULL || pRes->code != 0) { + int32_t code = tLoserTreeCreate(&(*pMerger)->pLoserTree, (*pMerger)->numOfBuffer, param, treeComparator); + if ((*pMerger)->pLoserTree == NULL || code != TSDB_CODE_SUCCESS) { tfree(param); - tfree(pMerger); - return; - } - - // the input data format follows the old format, but output in a new format. - // so, all the input must be parsed as old format - pMerger->pCtx = (SQLFunctionCtx *)calloc(tscSqlExprNumOfExprs(pQueryInfo), sizeof(SQLFunctionCtx)); - pMerger->rowSize = pMemBuffer[0]->nElemSize; - - tscFieldInfoUpdateOffset(pQueryInfo); - - if (pMerger->rowSize > pMemBuffer[0]->pageSize) { - assert(false); // todo fixed row size is larger than the minimum page size; + tfree((*pMerger)); + return code; } - // used to keep the latest input row - pMerger->pTempBuffer = (tFilePage *)calloc(1, pMerger->rowSize + sizeof(tFilePage)); - - pMerger->nResultBufSize = pMemBuffer[0]->pageSize * 16; - pMerger->pResultBuf = (tFilePage *)calloc(1, pMerger->nResultBufSize + sizeof(tFilePage)); + (*pMerger)->rowSize = pMemBuffer[0]->nElemSize; - pMerger->resColModel = finalmodel; - pMerger->resColModel->capacity = pMerger->nResultBufSize; - pMerger->finalModel = pFFModel; + // todo fixed row size is larger than the minimum page size; + assert((*pMerger)->rowSize <= pMemBuffer[0]->pageSize); - if (finalmodel->rowSize > 0) { - pMerger->resColModel->capacity /= finalmodel->rowSize; - } - - assert(finalmodel->rowSize > 0 && finalmodel->rowSize <= pMerger->rowSize); - - if (pMerger->pTempBuffer == NULL || pMerger->pLoserTree == NULL) { - tfree(pMerger->pTempBuffer); - tfree(pMerger->pLoserTree); + if ((*pMerger)->pLoserTree == NULL) { + tfree((*pMerger)->pLoserTree); tfree(param); - tfree(pMerger); - pRes->code = TSDB_CODE_TSC_OUT_OF_MEMORY; - return; - } - - pMerger->pTempBuffer->num = 0; - tscCreateResPointerInfo(pRes, pQueryInfo); - - SSchema* pschema = calloc(pDesc->pColumnModel->numOfCols, sizeof(SSchema)); - for(int32_t i = 0; i < pDesc->pColumnModel->numOfCols; ++i) { - pschema[i] = pDesc->pColumnModel->pFields[i].field; - } - - tsCreateSQLFunctionCtx(pQueryInfo, pMerger->pCtx, pschema); -// setCtxInputOutputBuffer(pQueryInfo, pMerger->pCtx, pMerger, pDesc); - - tfree(pschema); - - int32_t maxBufSize = 0; - for (int32_t k = 0; k < tscSqlExprNumOfExprs(pQueryInfo); ++k) { - SExprInfo *pExpr = tscSqlExprGet(pQueryInfo, k); - if (maxBufSize < pExpr->base.resBytes && pExpr->base.functionId == TSDB_FUNC_TAG) { - maxBufSize = pExpr->base.resBytes; - } + tfree((*pMerger)); + return TSDB_CODE_TSC_OUT_OF_MEMORY; } - // we change the capacity of schema to denote that there is only one row in temp buffer - pMerger->pDesc->pColumnModel->capacity = 1; - // restore the limitation value at the last stage if (tscOrderedProjectionQueryOnSTable(pQueryInfo, 0)) { pQueryInfo->limit.limit = pQueryInfo->clauseLimit; pQueryInfo->limit.offset = pQueryInfo->prjOffset; } - pRes->pLocalMerger = pMerger; - pRes->numOfGroups = 0; + // we change the capacity of schema to denote that there is only one row in temp buffer + (*pMerger)->pDesc->pColumnModel->capacity = 1; -// STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0); -// STableComInfo tinfo = tscGetTableInfo(pTableMetaInfo->pTableMeta); - -// TSKEY stime = (pQueryInfo->order.order == TSDB_ORDER_ASC)? pQueryInfo->window.skey : pQueryInfo->window.ekey; -// int64_t revisedSTime = taosTimeTruncate(stime, &pQueryInfo->interval, tinfo.precision); - -// if (pQueryInfo->fillType != TSDB_FILL_NONE) { -// SFillColInfo* pFillCol = createFillColInfo(pQueryInfo); -// pMerger->pFillInfo = -// taosCreateFillInfo(pQueryInfo->order.order, revisedSTime, pQueryInfo->groupbyExpr.numOfGroupCols, 4096, -// (int32_t)pQueryInfo->fieldsInfo.numOfOutput, pQueryInfo->interval.sliding, -// pQueryInfo->interval.slidingUnit, tinfo.precision, pQueryInfo->fillType, pFillCol, pSql); -// } + return TSDB_CODE_SUCCESS; } static int32_t tscFlushTmpBufferImpl(tExtMemBuffer *pMemoryBuf, tOrderDescriptor *pDesc, tFilePage *pPage, @@ -418,44 +299,29 @@ int32_t saveToBuffer(tExtMemBuffer *pMemoryBuf, tOrderDescriptor *pDesc, tFilePa return 0; } -void tscDestroyLocalMerger(SSqlObj *pSql) { - if (pSql == NULL) { - return; - } - - SSqlRes *pRes = &(pSql->res); - if (pRes->pLocalMerger == NULL) { +void tscDestroyLocalMerger(SLocalMerger* pLocalMerger) { + if (pLocalMerger == NULL) { return; } - // there is no more result, so we release all allocated resource - SLocalMerger *pLocalMerge = (SLocalMerger *)atomic_exchange_ptr(&pRes->pLocalMerger, NULL); - tfree(pLocalMerge->pResultBuf); - tfree(pLocalMerge->pCtx); - - if (pLocalMerge->pLoserTree) { - tfree(pLocalMerge->pLoserTree->param); - tfree(pLocalMerge->pLoserTree); + if (pLocalMerger->pLoserTree) { + tfree(pLocalMerger->pLoserTree->param); + tfree(pLocalMerger->pLoserTree); } - tscLocalReducerEnvDestroy(pLocalMerge->pExtMemBuffer, pLocalMerge->pDesc, pLocalMerge->resColModel, - pLocalMerge->finalModel, pLocalMerge->numOfVnode); - for (int32_t i = 0; i < pLocalMerge->numOfBuffer; ++i) { - tfree(pLocalMerge->pLocalDataSrc[i]); + tscLocalReducerEnvDestroy(pLocalMerger->pExtMemBuffer, pLocalMerger->pDesc, pLocalMerger->numOfVnode); + for (int32_t i = 0; i < pLocalMerger->numOfBuffer; ++i) { + tfree(pLocalMerger->pLocalDataSrc[i]); } - pLocalMerge->numOfBuffer = 0; - pLocalMerge->numOfCompleted = 0; - tfree(pLocalMerge->pTempBuffer); - - free(pLocalMerge); - - tscDebug("0x%"PRIx64" free local reducer finished", pSql->self); + pLocalMerger->numOfBuffer = 0; + pLocalMerger->numOfCompleted = 0; + tfree(pLocalMerger->buf); + free(pLocalMerger); } -static int32_t createOrderDescriptor(tOrderDescriptor **pOrderDesc, SSqlCmd *pCmd, SColumnModel *pModel) { - int32_t numOfGroupByCols = 0; - SQueryInfo *pQueryInfo = tscGetActiveQueryInfo(pCmd); +static int32_t createOrderDescriptor(tOrderDescriptor **pOrderDesc, SQueryInfo* pQueryInfo, SColumnModel *pModel) { + int32_t numOfGroupByCols = 0; if (pQueryInfo->groupbyExpr.numOfGroupCols > 0) { numOfGroupByCols = pQueryInfo->groupbyExpr.numOfGroupCols; @@ -525,32 +391,25 @@ static int32_t createOrderDescriptor(tOrderDescriptor **pOrderDesc, SSqlCmd *pCm } } -int32_t tscLocalReducerEnvCreate(SSqlObj *pSql, tExtMemBuffer ***pMemBuffer, tOrderDescriptor **pOrderDesc, - SColumnModel **pFinalModel, SColumnModel** pFFModel, uint32_t nBufferSizes) { - SSqlCmd *pCmd = &pSql->cmd; - SSqlRes *pRes = &pSql->res; - - SSchema * pSchema = NULL; +int32_t tscLocalReducerEnvCreate(SQueryInfo *pQueryInfo, tExtMemBuffer ***pMemBuffer, int32_t numOfSub, + tOrderDescriptor **pOrderDesc, uint32_t nBufferSizes, int64_t id) { + SSchema *pSchema = NULL; SColumnModel *pModel = NULL; - *pFinalModel = NULL; - SQueryInfo * pQueryInfo = tscGetActiveQueryInfo(pCmd); STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0); - (*pMemBuffer) = (tExtMemBuffer **)malloc(POINTER_BYTES * pSql->subState.numOfSub); + (*pMemBuffer) = (tExtMemBuffer **)malloc(POINTER_BYTES * numOfSub); if (*pMemBuffer == NULL) { - tscError("0x%"PRIx64" failed to allocate memory", pSql->self); - pRes->code = TSDB_CODE_TSC_OUT_OF_MEMORY; - return pRes->code; + tscError("0x%"PRIx64" failed to allocate memory", id); + return TSDB_CODE_TSC_OUT_OF_MEMORY; } size_t size = tscSqlExprNumOfExprs(pQueryInfo); pSchema = (SSchema *)calloc(1, sizeof(SSchema) * size); if (pSchema == NULL) { - tscError("0x%"PRIx64" failed to allocate memory", pSql->self); - pRes->code = TSDB_CODE_TSC_OUT_OF_MEMORY; - return pRes->code; + tscError("0x%"PRIx64" failed to allocate memory", id); + return TSDB_CODE_TSC_OUT_OF_MEMORY; } int32_t rlen = 0; @@ -570,6 +429,7 @@ int32_t tscLocalReducerEnvCreate(SSqlObj *pSql, tExtMemBuffer ***pMemBuffer, tOr } pModel = createColumnModel(pSchema, (int32_t)size, capacity); + tfree(pSchema); int32_t pg = DEFAULT_PAGE_SIZE; int32_t overhead = sizeof(tFilePage); @@ -577,95 +437,26 @@ int32_t tscLocalReducerEnvCreate(SSqlObj *pSql, tExtMemBuffer ***pMemBuffer, tOr pg *= 2; } - size_t numOfSubs = pSql->subState.numOfSub; - assert(numOfSubs <= pTableMetaInfo->vgroupList->numOfVgroups); - for (int32_t i = 0; i < numOfSubs; ++i) { + assert(numOfSub <= pTableMetaInfo->vgroupList->numOfVgroups); + for (int32_t i = 0; i < numOfSub; ++i) { (*pMemBuffer)[i] = createExtMemBuffer(nBufferSizes, rlen, pg, pModel); (*pMemBuffer)[i]->flushModel = MULTIPLE_APPEND_MODEL; } - if (createOrderDescriptor(pOrderDesc, pCmd, pModel) != TSDB_CODE_SUCCESS) { - pRes->code = TSDB_CODE_TSC_OUT_OF_MEMORY; - tfree(pSchema); - return pRes->code; - } - - // final result depends on the fields number - memset(pSchema, 0, sizeof(SSchema) * size); - - for (int32_t i = 0; i < size; ++i) { - SExprInfo *pExpr = tscSqlExprGet(pQueryInfo, i); - - SSchema p1 = {0}; - if (pExpr->base.colInfo.colIndex == TSDB_TBNAME_COLUMN_INDEX) { - p1 = *tGetTbnameColumnSchema(); - } else if (TSDB_COL_IS_UD_COL(pExpr->base.colInfo.flag)) { - p1.bytes = pExpr->base.resBytes; - p1.type = (uint8_t) pExpr->base.resType; - tstrncpy(p1.name, pExpr->base.aliasName, tListLen(p1.name)); - } else { - p1 = *tscGetTableColumnSchema(pTableMetaInfo->pTableMeta, pExpr->base.colInfo.colIndex); - } - - int32_t inter = 0; - int16_t type = -1; - int16_t bytes = 0; - - // the final result size and type in the same as query on single table. - // so here, set the flag to be false; - int32_t functionId = pExpr->base.functionId; - if (functionId >= TSDB_FUNC_TS && functionId <= TSDB_FUNC_DIFF) { - type = pModel->pFields[i].field.type; - bytes = pModel->pFields[i].field.bytes; - } else { - if (functionId == TSDB_FUNC_FIRST_DST) { - functionId = TSDB_FUNC_FIRST; - } else if (functionId == TSDB_FUNC_LAST_DST) { - functionId = TSDB_FUNC_LAST; - } else if (functionId == TSDB_FUNC_STDDEV_DST) { - functionId = TSDB_FUNC_STDDEV; - } - - int32_t ret = getResultDataInfo(p1.type, p1.bytes, functionId, 0, &type, &bytes, &inter, 0, false); - assert(ret == TSDB_CODE_SUCCESS); - } - - pSchema[i].type = (uint8_t)type; - pSchema[i].bytes = bytes; - strcpy(pSchema[i].name, pModel->pFields[i].field.name); - } - - *pFinalModel = createColumnModel(pSchema, (int32_t)size, capacity); - - memset(pSchema, 0, sizeof(SSchema) * size); - size = tscNumOfFields(pQueryInfo); - - for(int32_t i = 0; i < size; ++i) { - SInternalField* pField = tscFieldInfoGetInternalField(&pQueryInfo->fieldsInfo, i); - pSchema[i].bytes = pField->field.bytes; - pSchema[i].type = pField->field.type; - tstrncpy(pSchema[i].name, pField->field.name, tListLen(pSchema[i].name)); + if (createOrderDescriptor(pOrderDesc, pQueryInfo, pModel) != TSDB_CODE_SUCCESS) { + return TSDB_CODE_TSC_OUT_OF_MEMORY; } - *pFFModel = createColumnModel(pSchema, (int32_t) size, capacity); - - tfree(pSchema); return TSDB_CODE_SUCCESS; } /** * @param pMemBuffer * @param pDesc - * @param pFinalModel * @param numOfVnodes */ -void tscLocalReducerEnvDestroy(tExtMemBuffer **pMemBuffer, tOrderDescriptor *pDesc, SColumnModel *pFinalModel, SColumnModel *pFFModel, - int32_t numOfVnodes) { - destroyColumnModel(pFinalModel); - destroyColumnModel(pFFModel); - +void tscLocalReducerEnvDestroy(tExtMemBuffer **pMemBuffer, tOrderDescriptor *pDesc, int32_t numOfVnodes) { tOrderDescDestroy(pDesc); - for (int32_t i = 0; i < numOfVnodes; ++i) { pMemBuffer[i] = destoryExtMemBuffer(pMemBuffer[i]); } @@ -877,10 +668,12 @@ static bool isAllSourcesCompleted(SLocalMerger *pLocalMerge) { return (pLocalMerge->numOfBuffer == pLocalMerge->numOfCompleted); } -void tscInitResObjForLocalQuery(SSqlObj *pObj, int32_t numOfRes, int32_t rowLen) { - SSqlRes *pRes = &pObj->res; +void tscInitResObjForLocalQuery(SSqlObj *pSql, int32_t numOfRes, int32_t rowLen) { + SSqlRes *pRes = &pSql->res; if (pRes->pLocalMerger != NULL) { - tscDestroyLocalMerger(pObj); + tscDestroyLocalMerger(pRes->pLocalMerger); + pRes->pLocalMerger = NULL; + tscDebug("0x%"PRIx64" free local reducer finished", pSql->self); } pRes->qId = 1; // hack to pass the safety check in fetch_row function @@ -891,14 +684,12 @@ void tscInitResObjForLocalQuery(SSqlObj *pObj, int32_t numOfRes, int32_t rowLen) pRes->pLocalMerger = (SLocalMerger *)calloc(1, sizeof(SLocalMerger)); /* - * we need one additional byte space - * the sprintf function needs one additional space to put '\0' at the end of string + * One more byte space is required, since the sprintf function needs one additional space to put '\0' at + * the end of string */ - size_t allocSize = numOfRes * rowLen + sizeof(tFilePage) + 1; - pRes->pLocalMerger->pResultBuf = (tFilePage *)calloc(1, allocSize); - - pRes->pLocalMerger->pResultBuf->num = numOfRes; - pRes->data = pRes->pLocalMerger->pResultBuf->data; + size_t size = numOfRes * rowLen + 1; + pRes->pLocalMerger->buf = calloc(1, size); + pRes->data = pRes->pLocalMerger->buf; } int32_t doArithmeticCalculate(SQueryInfo* pQueryInfo, tFilePage* pOutput, int32_t rowSize, int32_t finalRowSize) { @@ -944,8 +735,8 @@ int32_t doArithmeticCalculate(SQueryInfo* pQueryInfo, tFilePage* pOutput, int32_ return offset; } -#define COLMODEL_GET_VAL(data, schema, allrow, rowId, colId) \ - (data + (schema)->pFields[colId].offset * (allrow) + (rowId) * (schema)->pFields[colId].field.bytes) +#define COLMODEL_GET_VAL(data, schema, rowId, colId) \ + (data + (schema)->pFields[colId].offset * ((schema)->capacity) + (rowId) * (schema)->pFields[colId].field.bytes) static void appendOneRowToDataBlock(SSDataBlock *pBlock, char *buf, SColumnModel *pModel, int32_t rowIndex, int32_t maxRows) { @@ -953,7 +744,7 @@ static void appendOneRowToDataBlock(SSDataBlock *pBlock, char *buf, SColumnModel SColumnInfoData* pColInfo = taosArrayGet(pBlock->pDataBlock, i); char* p = pColInfo->pData + pBlock->info.rows * pColInfo->info.bytes; - char *src = COLMODEL_GET_VAL(buf, pModel, maxRows, rowIndex, i); + char *src = COLMODEL_GET_VAL(buf, pModel, rowIndex, i); memmove(p, src, pColInfo->info.bytes); } @@ -970,8 +761,6 @@ SSDataBlock* doMultiwayMergeSort(void* param, bool* newgroup) { SLocalMerger *pMerger = pInfo->pMerge; SLoserTreeInfo *pTree = pMerger->pLoserTree; - SColumnModel *pModel = pMerger->pDesc->pColumnModel; - tFilePage *tmpBuffer = pMerger->pTempBuffer; pInfo->binfo.pRes->info.rows = 0; @@ -984,7 +773,7 @@ SSDataBlock* doMultiwayMergeSort(void* param, bool* newgroup) { printf("chosen data in pTree[0] = %d\n", pTree->pNode[0].index); #endif - assert((pTree->pNode[0].index < pMerger->numOfBuffer) && (pTree->pNode[0].index >= 0) && tmpBuffer->num == 0); + assert((pTree->pNode[0].index < pMerger->numOfBuffer) && (pTree->pNode[0].index >= 0)); // chosen from loser tree SLocalDataSource *pOneDataSrc = pMerger->pLocalDataSrc[pTree->pNode[0].index]; @@ -998,7 +787,7 @@ SSDataBlock* doMultiwayMergeSort(void* param, bool* newgroup) { SColumnInfoData *pColInfo = taosArrayGet(pInfo->binfo.pRes->pDataBlock, pIndex->colIndex); char *newRow = - COLMODEL_GET_VAL(pOneDataSrc->filePage.data, pModel, pOneDataSrc->pMemBuffer->pColumnModel->capacity, + COLMODEL_GET_VAL(pOneDataSrc->filePage.data, pOneDataSrc->pMemBuffer->pColumnModel, pOneDataSrc->rowIdx, pIndex->colIndex); char * data = pInfo->prevRow[i]; @@ -1021,7 +810,7 @@ SSDataBlock* doMultiwayMergeSort(void* param, bool* newgroup) { SColumnInfoData *pColInfo = taosArrayGet(pInfo->binfo.pRes->pDataBlock, pIndex->colIndex); char *curCol = - COLMODEL_GET_VAL(pOneDataSrc->filePage.data, pModel, pOneDataSrc->pMemBuffer->pColumnModel->capacity, + COLMODEL_GET_VAL(pOneDataSrc->filePage.data, pOneDataSrc->pMemBuffer->pColumnModel, pOneDataSrc->rowIdx, pIndex->colIndex); memcpy(pInfo->prevRow[i], curCol, pColInfo->info.bytes); } @@ -1033,7 +822,8 @@ SSDataBlock* doMultiwayMergeSort(void* param, bool* newgroup) { return pInfo->binfo.pRes; } - appendOneRowToDataBlock(pInfo->binfo.pRes, pOneDataSrc->filePage.data, pModel, pOneDataSrc->rowIdx, pOneDataSrc->pMemBuffer->pColumnModel->capacity); + appendOneRowToDataBlock(pInfo->binfo.pRes, pOneDataSrc->filePage.data, pOneDataSrc->pMemBuffer->pColumnModel, + pOneDataSrc->rowIdx, pOneDataSrc->pMemBuffer->pColumnModel->capacity); #if defined(_DEBUG_VIEW) printf("chosen row:\t"); diff --git a/src/client/src/tscServer.c b/src/client/src/tscServer.c index 9528a553b24dc4eb6211bbef71a96013dc86b2a0..215c96784eab76ee18eaf8f6728da0bb5e8134da 100644 --- a/src/client/src/tscServer.c +++ b/src/client/src/tscServer.c @@ -1615,6 +1615,11 @@ int tscProcessRetrieveLocalMergeRsp(SSqlObj *pSql) { return code; } + if (pRes->pLocalMerger == NULL) { // no result from subquery, so abort here directly. + (*pSql->fp)(pSql->param, pSql, pRes->numOfRows); + return code; + } + // global aggregation may be the upstream for parent query SQueryInfo *pQueryInfo = tscGetActiveQueryInfo(pCmd); if (pQueryInfo->pQInfo == NULL) { diff --git a/src/client/src/tscSubquery.c b/src/client/src/tscSubquery.c index 67eea432e650f4237fd53106470cbaf91d50e7c1..5e78901d6f6aac6175d40aa8759eb9b6137ae0bd 100644 --- a/src/client/src/tscSubquery.c +++ b/src/client/src/tscSubquery.c @@ -2422,8 +2422,6 @@ int32_t tscHandleMasterSTableQuery(SSqlObj *pSql) { tExtMemBuffer **pMemoryBuf = NULL; tOrderDescriptor *pDesc = NULL; - SColumnModel *pModel = NULL; - SColumnModel *pFinalModel = NULL; pRes->qId = 0x1; // hack the qhandle check @@ -2442,9 +2440,9 @@ int32_t tscHandleMasterSTableQuery(SSqlObj *pSql) { assert(pState->numOfSub > 0); - int32_t ret = tscLocalReducerEnvCreate(pSql, &pMemoryBuf, &pDesc, &pModel, &pFinalModel, nBufferSize); + int32_t ret = tscLocalReducerEnvCreate(pQueryInfo, &pMemoryBuf, pSql->subState.numOfSub, &pDesc, nBufferSize, pSql->self); if (ret != 0) { - pRes->code = TSDB_CODE_TSC_OUT_OF_MEMORY; + pRes->code = ret; tscAsyncResultOnError(pSql); tfree(pMemoryBuf); return ret; @@ -2455,7 +2453,7 @@ int32_t tscHandleMasterSTableQuery(SSqlObj *pSql) { if (pSql->pSubs == NULL) { tfree(pSql->pSubs); pRes->code = TSDB_CODE_TSC_OUT_OF_MEMORY; - tscLocalReducerEnvDestroy(pMemoryBuf, pDesc, pModel, pFinalModel,pState->numOfSub); + tscLocalReducerEnvDestroy(pMemoryBuf, pDesc,pState->numOfSub); tscAsyncResultOnError(pSql); return ret; @@ -2498,8 +2496,6 @@ int32_t tscHandleMasterSTableQuery(SSqlObj *pSql) { trs->subqueryIndex = i; trs->pParentSql = pSql; - trs->pFinalColModel = pModel; - trs->pFFColModel = pFinalModel; SSqlObj *pNew = tscCreateSTableSubquery(pSql, trs, NULL); if (pNew == NULL) { @@ -2524,13 +2520,13 @@ int32_t tscHandleMasterSTableQuery(SSqlObj *pSql) { tscError("0x%"PRIx64" failed to prepare subquery structure and launch subqueries", pSql->self); pRes->code = TSDB_CODE_TSC_OUT_OF_MEMORY; - tscLocalReducerEnvDestroy(pMemoryBuf, pDesc, pModel, pFinalModel, pState->numOfSub); + tscLocalReducerEnvDestroy(pMemoryBuf, pDesc, pState->numOfSub); doCleanupSubqueries(pSql, i); return pRes->code; // free all allocated resource } if (pRes->code == TSDB_CODE_TSC_QUERY_CANCELLED) { - tscLocalReducerEnvDestroy(pMemoryBuf, pDesc, pModel, pFinalModel, pState->numOfSub); + tscLocalReducerEnvDestroy(pMemoryBuf, pDesc, pState->numOfSub); doCleanupSubqueries(pSql, i); return pRes->code; } @@ -2695,7 +2691,7 @@ void tscHandleSubqueryError(SRetrieveSupport *trsupport, SSqlObj *pSql, int numO tstrerror(pParentSql->res.code)); // release allocated resource - tscLocalReducerEnvDestroy(trsupport->pExtMemBuffer, trsupport->pOrderDescriptor, trsupport->pFinalColModel, trsupport->pFFColModel, + tscLocalReducerEnvDestroy(trsupport->pExtMemBuffer, trsupport->pOrderDescriptor, pState->numOfSub); tscFreeRetrieveSup(pSql); @@ -2770,19 +2766,25 @@ static void tscAllDataRetrievedFromDnode(SRetrieveSupport *trsupport, SSqlObj* p SQueryInfo *pPQueryInfo = tscGetQueryInfo(&pParentSql->cmd, 0); tscClearInterpInfo(pPQueryInfo); - tscCreateLocalMerger(trsupport->pExtMemBuffer, pState->numOfSub, pDesc, trsupport->pFinalColModel, trsupport->pFFColModel, pParentSql); + code = tscCreateLocalMerger(trsupport->pExtMemBuffer, pState->numOfSub, pDesc, pPQueryInfo, &pParentSql->res.pLocalMerger, pParentSql->self); + pParentSql->res.code = code; + + if (code == TSDB_CODE_SUCCESS && trsupport->pExtMemBuffer == NULL) { + pParentSql->cmd.command = TSDB_SQL_RETRIEVE_EMPTY_RESULT; // no result, set the result empty + } else { + pParentSql->cmd.command = TSDB_SQL_RETRIEVE_LOCALMERGE; + } + + tscCreateResPointerInfo(&pParentSql->res, pPQueryInfo); + tscDebug("0x%"PRIx64" build loser tree completed", pParentSql->self); pParentSql->res.precision = pSql->res.precision; pParentSql->res.numOfRows = 0; pParentSql->res.row = 0; - - tscFreeRetrieveSup(pSql); + pParentSql->res.numOfGroups = 0; - // set the command flag must be after the semaphore been correctly set. - if (pParentSql->cmd.command != TSDB_SQL_RETRIEVE_EMPTY_RESULT) { - pParentSql->cmd.command = TSDB_SQL_RETRIEVE_LOCALMERGE; - } + tscFreeRetrieveSup(pSql); if (pParentSql->res.code == TSDB_CODE_SUCCESS) { (*pParentSql->fp)(pParentSql->param, pParentSql, 0); diff --git a/src/client/src/tscUtil.c b/src/client/src/tscUtil.c index 420b78f64dccfa6bac9302dc2c04c2ccd9ce44e5..95acce3d771a316f2b5ba4c3a77ad4b613eb446a 100644 --- a/src/client/src/tscUtil.c +++ b/src/client/src/tscUtil.c @@ -828,11 +828,10 @@ void tscResetSqlCmd(SSqlCmd* pCmd, bool removeMeta) { } void tscFreeSqlResult(SSqlObj* pSql) { - tscDestroyLocalMerger(pSql); - SSqlRes* pRes = &pSql->res; - tscDestroyResPointerInfo(pRes); + tscDestroyLocalMerger(pRes->pLocalMerger); + tscDestroyResPointerInfo(pRes); memset(&pSql->res, 0, sizeof(SSqlRes)); } diff --git a/src/vnode/src/vnodeRead.c b/src/vnode/src/vnodeRead.c index b28bdbf130aaef8b3e3ffc8cda0653e46d0efc29..d7d3da66d076cdf09ef6d5efd9b0b60fbc6d0c22 100644 --- a/src/vnode/src/vnodeRead.c +++ b/src/vnode/src/vnodeRead.c @@ -354,7 +354,7 @@ static int32_t vnodeProcessFetchMsg(SVnodeObj *pVnode, SVReadMsg *pRead) { // kill current query and free corresponding resources. if (pRetrieve->free == 1) { - vWarn("vgId:%d, QInfo:%"PRIu64 "-%p, retrieve msg received to kill query and free qhandle", pVnode->vgId, pRetrieve->qId, *handle); + vWarn("vgId:%d, QInfo:%"PRIx64 "-%p, retrieve msg received to kill query and free qhandle", pVnode->vgId, pRetrieve->qId, *handle); qKillQuery(*handle); qReleaseQInfo(pVnode->qMgmt, (void **)&handle, true); diff --git a/tests/examples/c/CMakeLists.txt b/tests/examples/c/CMakeLists.txt index 954fe468b1b8fc88ce93fa2474e2f69a33415e6f..7f941b8c87a2c256a82aa37f9e937a41cd9a4c77 100644 --- a/tests/examples/c/CMakeLists.txt +++ b/tests/examples/c/CMakeLists.txt @@ -5,6 +5,8 @@ IF (TD_LINUX) AUX_SOURCE_DIRECTORY(. SRC) ADD_EXECUTABLE(demo apitest.c) TARGET_LINK_LIBRARIES(demo taos_static trpc tutil pthread ) + ADD_EXECUTABLE(subscribe subscribe.c) + TARGET_LINK_LIBRARIES(subscribe taos_static trpc tutil pthread ) ADD_EXECUTABLE(epoll epoll.c) TARGET_LINK_LIBRARIES(epoll taos_static trpc tutil pthread ) ENDIF ()