diff --git a/src/client/inc/tscLocalMerge.h b/src/client/inc/tscLocalMerge.h index ce67344b03a015419e485aa28e54575c2cf60045..2c7c2f51d02ea1b9943004fc62f8e48f2034d0fe 100644 --- a/src/client/inc/tscLocalMerge.h +++ b/src/client/inc/tscLocalMerge.h @@ -84,9 +84,9 @@ typedef struct SRetrieveSupport { } SRetrieveSupport; int32_t tscLocalReducerEnvCreate(SSqlObj *pSql, tExtMemBuffer ***pMemBuffer, tOrderDescriptor **pDesc, - SColumnModel **pFinalModel, uint32_t nBufferSize); + SColumnModel **pFinalModel, SColumnModel** pFFModel, uint32_t nBufferSize); -void tscLocalReducerEnvDestroy(tExtMemBuffer **pMemBuffer, tOrderDescriptor *pDesc, SColumnModel *pFinalModel, +void tscLocalReducerEnvDestroy(tExtMemBuffer **pMemBuffer, tOrderDescriptor *pDesc, SColumnModel *pFinalModel, SColumnModel* pFFModel, int32_t numOfVnodes); int32_t saveToBuffer(tExtMemBuffer *pMemoryBuf, tOrderDescriptor *pDesc, tFilePage *pPage, void *data, diff --git a/src/client/inc/tscUtil.h b/src/client/inc/tscUtil.h index 223fb5d226690f480f6d8196180ec61b3f2fbd92..bde27d2932a5bacd09864c76ee81faa6adef04a7 100644 --- a/src/client/inc/tscUtil.h +++ b/src/client/inc/tscUtil.h @@ -282,6 +282,7 @@ int tscSetMgmtEpSetFromCfg(const char *first, const char *second); bool tscSetSqlOwner(SSqlObj* pSql); void tscClearSqlOwner(SSqlObj* pSql); +int32_t doArithmeticCalculate(SQueryInfo* pQueryInfo, tFilePage* pOutput, int32_t rowSize, int32_t finalRowSize); void* malloc_throw(size_t size); void* calloc_throw(size_t nmemb, size_t size); diff --git a/src/client/src/tscLocalMerge.c b/src/client/src/tscLocalMerge.c index b07c7ca66dde0a9b568436f997572829b81cb9fa..bf7791a326d8707b85388e23c1a1d432cec52dd0 100644 --- a/src/client/src/tscLocalMerge.c +++ b/src/client/src/tscLocalMerge.c @@ -30,8 +30,6 @@ typedef struct SCompareParam { int32_t groupOrderType; } SCompareParam; -static void doArithmeticCalculate(SQueryInfo* pQueryInfo, tFilePage* pOutput, int32_t rowSize, int32_t finalRowSize); - int32_t treeComparator(const void *pLeft, const void *pRight, void *param) { int32_t pLeftIdx = *(int32_t *)pLeft; int32_t pRightIdx = *(int32_t *)pRight; @@ -174,14 +172,14 @@ void tscCreateLocalReducer(tExtMemBuffer **pMemBuffer, int32_t numOfBuffer, tOrd SSqlRes* pRes = &pSql->res; if (pMemBuffer == NULL) { - tscLocalReducerEnvDestroy(pMemBuffer, pDesc, finalmodel, numOfBuffer); + tscLocalReducerEnvDestroy(pMemBuffer, pDesc, finalmodel, pFFModel, numOfBuffer); tscError("%p pMemBuffer is NULL", pMemBuffer); pRes->code = TSDB_CODE_TSC_APP_ERROR; return; } if (pDesc->pColumnModel == NULL) { - tscLocalReducerEnvDestroy(pMemBuffer, pDesc, finalmodel, numOfBuffer); + tscLocalReducerEnvDestroy(pMemBuffer, pDesc, finalmodel, pFFModel, numOfBuffer); tscError("%p no local buffer or intermediate result format model", pSql); pRes->code = TSDB_CODE_TSC_APP_ERROR; return; @@ -199,7 +197,7 @@ void tscCreateLocalReducer(tExtMemBuffer **pMemBuffer, int32_t numOfBuffer, tOrd } if (numOfFlush == 0 || numOfBuffer == 0) { - tscLocalReducerEnvDestroy(pMemBuffer, pDesc, finalmodel, numOfBuffer); + tscLocalReducerEnvDestroy(pMemBuffer, pDesc, finalmodel, pFFModel, numOfBuffer); tscDebug("%p retrieved no data", pSql); return; } @@ -208,7 +206,7 @@ void tscCreateLocalReducer(tExtMemBuffer **pMemBuffer, int32_t numOfBuffer, tOrd tscError("%p Invalid value of buffer capacity %d and page size %d ", pSql, pDesc->pColumnModel->capacity, pMemBuffer[0]->pageSize); - tscLocalReducerEnvDestroy(pMemBuffer, pDesc, finalmodel, numOfBuffer); + tscLocalReducerEnvDestroy(pMemBuffer, pDesc, finalmodel, pFFModel, numOfBuffer); pRes->code = TSDB_CODE_TSC_APP_ERROR; return; } @@ -219,7 +217,7 @@ void tscCreateLocalReducer(tExtMemBuffer **pMemBuffer, int32_t numOfBuffer, tOrd if (pReducer == NULL) { tscError("%p failed to create local merge structure, out of memory", pSql); - tscLocalReducerEnvDestroy(pMemBuffer, pDesc, finalmodel, numOfBuffer); + tscLocalReducerEnvDestroy(pMemBuffer, pDesc, finalmodel, pFFModel, numOfBuffer); pRes->code = TSDB_CODE_TSC_OUT_OF_MEMORY; return; } @@ -336,6 +334,8 @@ void tscCreateLocalReducer(tExtMemBuffer **pMemBuffer, int32_t numOfBuffer, tOrd pReducer->resColModel = finalmodel; pReducer->resColModel->capacity = pReducer->nResultBufSize; + pReducer->finalModel = pFFModel; + assert(pReducer->finalRowSize > 0); if (pReducer->finalRowSize > 0) { pReducer->resColModel->capacity /= pReducer->finalRowSize; @@ -533,7 +533,7 @@ void tscDestroyLocalReducer(SSqlObj *pSql) { tfree(pLocalReducer->pFinalRes); tfree(pLocalReducer->discardData); - tscLocalReducerEnvDestroy(pLocalReducer->pExtMemBuffer, pLocalReducer->pDesc, pLocalReducer->resColModel, + tscLocalReducerEnvDestroy(pLocalReducer->pExtMemBuffer, pLocalReducer->pDesc, pLocalReducer->resColModel, pLocalReducer->finalModel, pLocalReducer->numOfVnode); for (int32_t i = 0; i < pLocalReducer->numOfBuffer; ++i) { tfree(pLocalReducer->pLocalDataSrc[i]); @@ -657,7 +657,7 @@ bool isSameGroup(SSqlCmd *pCmd, SLocalReducer *pReducer, char *pPrev, tFilePage } int32_t tscLocalReducerEnvCreate(SSqlObj *pSql, tExtMemBuffer ***pMemBuffer, tOrderDescriptor **pOrderDesc, - SColumnModel **pFinalModel, uint32_t nBufferSizes) { + SColumnModel **pFinalModel, SColumnModel** pFFModel, uint32_t nBufferSizes) { SSqlCmd *pCmd = &pSql->cmd; SSqlRes *pRes = &pSql->res; @@ -755,6 +755,18 @@ int32_t tscLocalReducerEnvCreate(SSqlObj *pSql, tExtMemBuffer ***pMemBuffer, tOr *pFinalModel = createColumnModel(pSchema, (int32_t)size, capacity); + memset(pSchema, 0, sizeof(SSchema) * size); + size = tscNumOfFields(pQueryInfo); + + for(int32_t i = 0; i < size; ++i) { + SInternalField* pField = tscFieldInfoGetInternalField(&pQueryInfo->fieldsInfo, i); + pSchema[i].bytes = pField->field.bytes; + pSchema[i].type = pField->field.type; + tstrncpy(pSchema[i].name, pField->field.name, tListLen(pSchema[i].name)); + } + + *pFFModel = createColumnModel(pSchema, (int32_t) size, capacity); + tfree(pSchema); return TSDB_CODE_SUCCESS; } @@ -765,9 +777,11 @@ int32_t tscLocalReducerEnvCreate(SSqlObj *pSql, tExtMemBuffer ***pMemBuffer, tOr * @param pFinalModel * @param numOfVnodes */ -void tscLocalReducerEnvDestroy(tExtMemBuffer **pMemBuffer, tOrderDescriptor *pDesc, SColumnModel *pFinalModel, +void tscLocalReducerEnvDestroy(tExtMemBuffer **pMemBuffer, tOrderDescriptor *pDesc, SColumnModel *pFinalModel, SColumnModel *pFFModel, int32_t numOfVnodes) { destroyColumnModel(pFinalModel); + destroyColumnModel(pFFModel); + tOrderDescDestroy(pDesc); for (int32_t i = 0; i < numOfVnodes; ++i) { @@ -875,10 +889,10 @@ static void genFinalResWithoutFill(SSqlRes* pRes, SLocalReducer *pLocalReducer, if (pQueryInfo->limit.offset > 0) { if (pQueryInfo->limit.offset < pRes->numOfRows) { int32_t prevSize = (int32_t)pBeforeFillData->num; - tColModelErase(pLocalReducer->resColModel, pBeforeFillData, prevSize, 0, (int32_t)pQueryInfo->limit.offset - 1); + tColModelErase(pLocalReducer->finalModel, pBeforeFillData, prevSize, 0, (int32_t)pQueryInfo->limit.offset - 1); /* remove the hole in column model */ - tColModelCompact(pLocalReducer->resColModel, pBeforeFillData, prevSize); + tColModelCompact(pLocalReducer->finalModel, pBeforeFillData, prevSize); pRes->numOfRows -= pQueryInfo->limit.offset; pQueryInfo->limit.offset = 0; @@ -900,7 +914,7 @@ static void genFinalResWithoutFill(SSqlRes* pRes, SLocalReducer *pLocalReducer, pRes->numOfRows -= overflow; pBeforeFillData->num -= overflow; - tColModelCompact(pLocalReducer->resColModel, pBeforeFillData, prevSize); + tColModelCompact(pLocalReducer->finalModel, pBeforeFillData, prevSize); // set remain data to be discarded, and reset the interpolation information savePrevRecordAndSetupFillInfo(pLocalReducer, pQueryInfo, pLocalReducer->pFillInfo); @@ -1242,7 +1256,7 @@ bool genFinalResults(SSqlObj *pSql, SLocalReducer *pLocalReducer, bool noMoreCur tColModelCompact(pModel, pResBuf, pModel->capacity); if (tscIsSecondStageQuery(pQueryInfo)) { - doArithmeticCalculate(pQueryInfo, pResBuf, pModel->rowSize, pLocalReducer->finalRowSize); + pLocalReducer->finalRowSize = doArithmeticCalculate(pQueryInfo, pResBuf, pModel->rowSize, pLocalReducer->finalRowSize); } #ifdef _DEBUG_VIEW @@ -1612,7 +1626,7 @@ void tscInitResObjForLocalQuery(SSqlObj *pObj, int32_t numOfRes, int32_t rowLen) pRes->data = pRes->pLocalReducer->pResultBuf->data; } -void doArithmeticCalculate(SQueryInfo* pQueryInfo, tFilePage* pOutput, int32_t rowSize, int32_t finalRowSize) { +int32_t doArithmeticCalculate(SQueryInfo* pQueryInfo, tFilePage* pOutput, int32_t rowSize, int32_t finalRowSize) { char* pbuf = calloc(1, pOutput->num * rowSize); size_t size = tscNumOfFields(pQueryInfo); @@ -1647,8 +1661,10 @@ void doArithmeticCalculate(SQueryInfo* pQueryInfo, tFilePage* pOutput, int32_t r } assert(finalRowSize <= rowSize); - memcpy(pOutput->data, pbuf, pOutput->num * finalRowSize); + memcpy(pOutput->data, pbuf, pOutput->num * offset); tfree(pbuf); tfree(arithSup.data); + + return offset; } \ No newline at end of file diff --git a/src/client/src/tscServer.c b/src/client/src/tscServer.c index e8b6cb284ee40a3b2200e8d54913dca4ca158309..1b1388238ca7775d1f8ee0127448c6e9d0dd22c2 100644 --- a/src/client/src/tscServer.c +++ b/src/client/src/tscServer.c @@ -547,7 +547,7 @@ static int32_t tscEstimateQueryMsgSize(SSqlCmd *pCmd, int32_t clauseIndex) { int32_t srcColListSize = (int32_t)(taosArrayGetSize(pQueryInfo->colList) * sizeof(SColumnInfo)); size_t numOfExprs = tscSqlExprNumOfExprs(pQueryInfo); - int32_t exprSize = (int32_t)(sizeof(SSqlFuncMsg) * numOfExprs); + int32_t exprSize = (int32_t)(sizeof(SSqlFuncMsg) * numOfExprs * 2); int32_t tsBufSize = (pQueryInfo->tsBuf != NULL) ? pQueryInfo->tsBuf->fileSize : 0; @@ -787,8 +787,10 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) { pSqlFuncExpr = (SSqlFuncMsg *)pMsg; } - if(tscIsSecondStageQuery(pQueryInfo)) { - size_t output = tscNumOfFields(pQueryInfo); + size_t output = tscNumOfFields(pQueryInfo); + + if ((tscIsSecondStageQuery(pQueryInfo) || UTIL_TABLE_IS_NORMAL_TABLE(pTableMetaInfo) || + UTIL_TABLE_IS_CHILD_TABLE(pTableMetaInfo)) && (output != tscSqlExprNumOfExprs(pQueryInfo))) { pQueryMsg->secondStageOutput = htonl((int32_t) output); SSqlFuncMsg *pSqlFuncExpr1 = (SSqlFuncMsg *)pMsg; diff --git a/src/client/src/tscSubquery.c b/src/client/src/tscSubquery.c index bc522d4007e24a01ed1bf61846f924c912cf3387..028f17800bcdb296b2fe679531c2c1b117df89c7 100644 --- a/src/client/src/tscSubquery.c +++ b/src/client/src/tscSubquery.c @@ -1644,6 +1644,7 @@ int32_t tscHandleMasterSTableQuery(SSqlObj *pSql) { tExtMemBuffer ** pMemoryBuf = NULL; tOrderDescriptor *pDesc = NULL; SColumnModel *pModel = NULL; + SColumnModel *pFinalModel = NULL; pRes->qhandle = 0x1; // hack the qhandle check @@ -1662,7 +1663,7 @@ int32_t tscHandleMasterSTableQuery(SSqlObj *pSql) { assert(pState->numOfSub > 0); - int32_t ret = tscLocalReducerEnvCreate(pSql, &pMemoryBuf, &pDesc, &pModel, nBufferSize); + int32_t ret = tscLocalReducerEnvCreate(pSql, &pMemoryBuf, &pDesc, &pModel, &pFinalModel, nBufferSize); if (ret != 0) { pRes->code = TSDB_CODE_TSC_OUT_OF_MEMORY; tscQueueAsyncRes(pSql); @@ -1677,7 +1678,7 @@ int32_t tscHandleMasterSTableQuery(SSqlObj *pSql) { if (pSql->pSubs == NULL) { tfree(pSql->pSubs); pRes->code = TSDB_CODE_TSC_OUT_OF_MEMORY; - tscLocalReducerEnvDestroy(pMemoryBuf, pDesc, pModel, pState->numOfSub); + tscLocalReducerEnvDestroy(pMemoryBuf, pDesc, pModel, pFinalModel,pState->numOfSub); tscQueueAsyncRes(pSql); return ret; @@ -1707,6 +1708,7 @@ int32_t tscHandleMasterSTableQuery(SSqlObj *pSql) { trs->subqueryIndex = i; trs->pParentSql = pSql; trs->pFinalColModel = pModel; + trs->pFFColModel = pFinalModel; SSqlObj *pNew = tscCreateSTableSubquery(pSql, trs, NULL); if (pNew == NULL) { @@ -1730,13 +1732,13 @@ int32_t tscHandleMasterSTableQuery(SSqlObj *pSql) { tscError("%p failed to prepare subquery structure and launch subqueries", pSql); pRes->code = TSDB_CODE_TSC_OUT_OF_MEMORY; - tscLocalReducerEnvDestroy(pMemoryBuf, pDesc, pModel, pState->numOfSub); + tscLocalReducerEnvDestroy(pMemoryBuf, pDesc, pModel, pFinalModel, pState->numOfSub); doCleanupSubqueries(pSql, i); return pRes->code; // free all allocated resource } if (pRes->code == TSDB_CODE_TSC_QUERY_CANCELLED) { - tscLocalReducerEnvDestroy(pMemoryBuf, pDesc, pModel, pState->numOfSub); + tscLocalReducerEnvDestroy(pMemoryBuf, pDesc, pModel, pFinalModel, pState->numOfSub); doCleanupSubqueries(pSql, i); return pRes->code; } @@ -1876,7 +1878,7 @@ void tscHandleSubqueryError(SRetrieveSupport *trsupport, SSqlObj *pSql, int numO tstrerror(pParentSql->res.code)); // release allocated resource - tscLocalReducerEnvDestroy(trsupport->pExtMemBuffer, trsupport->pOrderDescriptor, trsupport->pFinalColModel, + tscLocalReducerEnvDestroy(trsupport->pExtMemBuffer, trsupport->pOrderDescriptor, trsupport->pFinalColModel, trsupport->pFFColModel, pState->numOfSub); tscFreeRetrieveSup(pSql); @@ -2312,10 +2314,10 @@ static void doBuildResFromSubqueries(SSqlObj* pSql) { return; } - int32_t totalSize = tscGetResRowLength(pQueryInfo->exprList); + int32_t rowSize = tscGetResRowLength(pQueryInfo->exprList); - assert(numOfRes * totalSize > 0); - char* tmp = realloc(pRes->pRsp, numOfRes * totalSize); + assert(numOfRes * rowSize > 0); + char* tmp = realloc(pRes->pRsp, numOfRes * rowSize + sizeof(tFilePage)); if (tmp == NULL) { pRes->code = TSDB_CODE_TSC_OUT_OF_MEMORY; return; @@ -2323,9 +2325,12 @@ static void doBuildResFromSubqueries(SSqlObj* pSql) { pRes->pRsp = tmp; } - pRes->data = pRes->pRsp; + tFilePage* pFilePage = (tFilePage*) pRes->pRsp; + pFilePage->num = numOfRes; + pRes->data = pFilePage->data; char* data = pRes->data; + int16_t bytes = 0; size_t numOfExprs = tscSqlExprNumOfExprs(pQueryInfo); @@ -2352,6 +2357,16 @@ static void doBuildResFromSubqueries(SSqlObj* pSql) { pRes->numOfRows = numOfRes; pRes->numOfClauseTotal += numOfRes; + + int32_t finalRowSize = 0; + for(int32_t i = 0; i < tscNumOfFields(pQueryInfo); ++i) { + TAOS_FIELD* pField = tscFieldInfoGetField(&pQueryInfo->fieldsInfo, i); + finalRowSize += pField->bytes; + } + + doArithmeticCalculate(pQueryInfo, pFilePage, rowSize, finalRowSize); + + pRes->data = pFilePage->data; } void tscBuildResFromSubqueries(SSqlObj *pSql) { diff --git a/src/query/src/qFill.c b/src/query/src/qFill.c index 17642892196d2aa6efa64cc674746b0284424c3a..ca1203cb17da52b8ffe638e29913b06bdfcade29 100644 --- a/src/query/src/qFill.c +++ b/src/query/src/qFill.c @@ -332,7 +332,7 @@ static void doFillOneRowResult(SFillInfo* pFillInfo, tFilePage** data, char** sr point1 = (SPoint){.key = *(TSKEY*)(prev), .val = prev + pCol->col.offset}; point2 = (SPoint){.key = ts, .val = srcData[i] + pFillInfo->index * bytes}; - point = (SPoint){.key = pFillInfo->start, .val = val1}; + point = (SPoint){.key = pFillInfo->currentKey, .val = val1}; taosGetLinearInterpolationVal(type, &point1, &point2, &point); } } else { diff --git a/tests/script/general/parser/join_multivnode.sim b/tests/script/general/parser/join_multivnode.sim index a5e71260b487ddd87a6df795c2f5bdfb9d72eca3..6bca0e75c40be80f6aa7ef7f25562fa019fe2020 100644 --- a/tests/script/general/parser/join_multivnode.sim +++ b/tests/script/general/parser/join_multivnode.sim @@ -136,9 +136,8 @@ sql select join_mt0.ts, join_mt1.t1, join_mt0.t1, join_mt1.tbname, join_mt0.tbna #1970-01-01 08:01:40.790 | 10 | 945.000000000 | 90 | true | true | 0 | sql_error select count(join_mt0.c1), sum(join_mt1.c2), first(join_mt0.c5), last(join_mt1.c7), first(join_mt1.c7) from join_mt0, join_mt1 where join_mt0.t1=join_mt1.t1 and join_mt0.ts=join_mt1.ts interval(10a) group by join_mt0.t1 order by join_mt0.ts desc limit 20 offset 19; - sql select count(join_mt0.c1), sum(join_mt0.c2)/count(*), avg(c2), first(join_mt0.c5), last(c7) from join_mt0 interval(10a) group by join_mt0.t1 order by join_mt0.ts desc; -if $rows != 100 then +if $rows != 300 then return -1 endi @@ -147,7 +146,7 @@ if $data00 != @70-01-01 08:01:40.990@ then return -1 endi -if $data01 != 30 then +if $data01 != 10 then return -1 endi @@ -168,7 +167,7 @@ if $data05 != 1 then return -1 endi -if $data06 != 2 then +if $data06 != 0 then return -1 endi @@ -177,7 +176,7 @@ if $data10 != @70-01-01 08:01:40.980@ then return -1 endi -if $data11 != 30 then +if $data11 != 10 then return -1 endi @@ -198,7 +197,7 @@ if $data15 != 1 then return -1 endi -if $data16 != 2 then +if $data16 != 0 then return -1 endi