提交 de57e2e5 编写于 作者: H Haojun Liao

[td-2859]refactor.

上级 77dfb9fe
......@@ -1120,13 +1120,16 @@ static void doExecuteFinalMerge( SLocalMerger *pLocalMerge, int32_t numOfExpr, b
}
static void savePrevOrderColumns(SMultiwayMergeInfo* pInfo, SSDataBlock* pBlock, int32_t rowIndex) {
int32_t size = taosArrayGetSize(pInfo->orderColumnList);
int32_t size = pInfo->pMerge->pDesc->orderInfo.numOfCols;
for(int32_t i = 0; i < size; ++i) {
int32_t index = *(int16_t*)taosArrayGet(pInfo->orderColumnList, i);
int32_t index = pInfo->pMerge->pDesc->orderInfo.colIndex[i];
// int32_t index = *(int16_t*)taosArrayGet(pInfo->orderColumnList, i);
SColumnInfoData* pColInfo = taosArrayGet(pBlock->pDataBlock, index);
memcpy(pInfo->prevRow[i], pColInfo->pData + pColInfo->info.bytes * rowIndex, pColInfo->info.bytes);
}
pInfo->hasPrev = true;
}
static void doExecuteFinalMergeRv(SMultiwayMergeInfo* pInfo, int32_t numOfExpr, SSDataBlock* pBlock, bool needInit) {
......@@ -1141,24 +1144,37 @@ static void doExecuteFinalMergeRv(SMultiwayMergeInfo* pInfo, int32_t numOfExpr,
continue;
}
pCtx[j].size = 1;
aAggs[functionId].mergeFunc(&pCtx[j]);
}
} else {
// todo finalize the result
for (int32_t j = 0; j < numOfExpr; ++j) {
for(int32_t j = 0; j < numOfExpr; ++j) {
int32_t functionId = pCtx[j].functionId;
if (functionId == TSDB_FUNC_TAG_DUMMY || functionId == TSDB_FUNC_TS_DUMMY) {
continue;
}
pCtx[j].size = 1;
aAggs[functionId].xFinalize(&pCtx[j]);
}
pInfo->binfo.pRes->info.rows += 1;
for(int32_t j = 0; j < numOfExpr; ++j) {
pCtx[j].pOutput += pCtx[j].outputBytes;
pCtx[j].pInput += pCtx[j].inputBytes;
}
for (int32_t j = 0; j < numOfExpr; ++j) {
int32_t functionId = pCtx[j].functionId;
if (functionId == TSDB_FUNC_TAG_DUMMY || functionId == TSDB_FUNC_TS_DUMMY) {
continue;
}
pCtx[j].size = 1;
aAggs[functionId].mergeFunc(&pCtx[j]);
}
pInfo->binfo.pRes->info.rows += 1;
}
} else {
for (int32_t j = 0; j < numOfExpr; ++j) {
......@@ -1167,13 +1183,24 @@ static void doExecuteFinalMergeRv(SMultiwayMergeInfo* pInfo, int32_t numOfExpr,
continue;
}
pCtx[j].size = 1;
aAggs[functionId].mergeFunc(&pCtx[j]);
}
}
savePrevOrderColumns(pInfo, pBlock, i);
pInfo->hasPrev = true;
}
for(int32_t j = 0; j < numOfExpr; ++j) {
int32_t functionId = pCtx[j].functionId;
if (functionId == TSDB_FUNC_TAG_DUMMY || functionId == TSDB_FUNC_TS_DUMMY) {
continue;
}
aAggs[functionId].xFinalize(&pCtx[j]);
}
pInfo->binfo.pRes->info.rows += 1;
}
static void handleUnprocessedRow(SSqlCmd *pCmd, SLocalMerger *pLocalMerge, tFilePage *tmpBuffer) {
......@@ -1891,12 +1918,6 @@ SSDataBlock* doGlobalAggregate(void* param) {
pOperator->status = OP_EXEC_DONE;
setQueryStatus(pRuntimeEnv, QUERY_COMPLETED);
// finalizeQueryResult(pOperator, pAggInfo->binfo.pCtx, &pAggInfo->binfo.resultRowInfo, pAggInfo->binfo.rowCellInfoOffset);
pAggInfo->binfo.pRes->info.rows = getNumOfResult(pRuntimeEnv, pAggInfo->binfo.pCtx, pOperator->numOfOutput);
pOperator->status = OP_EXEC_DONE;
setQueryStatus(pRuntimeEnv, QUERY_COMPLETED);
return pAggInfo->binfo.pRes;
}
......@@ -525,17 +525,17 @@ void tscSetResRawPtr(SSqlRes* pRes, SQueryInfo* pQueryInfo) {
void tscSetResRawPtrRv(SSqlRes* pRes, SQueryInfo* pQueryInfo, SSDataBlock* pBlock) {
assert(pRes->numOfCols > 0);
int32_t offset = 0;
// int32_t offset = 0;
for (int32_t i = 0; i < pQueryInfo->fieldsInfo.numOfOutput; ++i) {
SInternalField* pInfo = (SInternalField*)TARRAY_GET_ELEM(pQueryInfo->fieldsInfo.internalField, i);
SColumnInfoData* pColData = taosArrayGet(pBlock->pDataBlock, i);
pRes->urow[i] = pColData->pData + offset * pColData->info.bytes;
pRes->urow[i] = pColData->pData/* + offset * pColData->info.bytes*/;
pRes->length[i] = pInfo->field.bytes;
offset += pInfo->field.bytes;
//offset += pInfo->field.bytes;
// generated the user-defined column result
if (pInfo->pExpr->pExpr == NULL && TSDB_COL_IS_UD_COL(pInfo->pExpr->base.colInfo.flag)) {
......@@ -3337,6 +3337,7 @@ static int32_t createGlobalAggregateExpr(SQueryAttr* pQueryAttr, SQueryInfo* pQu
pse->colType = pExpr->base.resType;
pse->colBytes = pExpr->base.resBytes;
pse->colInfo.flag = TSDB_COL_NORMAL;
for (int32_t j = 0; j < pExpr->base.numOfParams; ++j) {
tVariantAssign(&pse->param[j], &pExpr->base.param[j]);
......
......@@ -1734,8 +1734,9 @@ static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int32_t numOf
}
case OP_GlobalAggregate: {
pRuntimeEnv->proot = createGlobalAggregateOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot, pQueryAttr->pExpr3,
pQueryAttr->numOfExpr3, &pQueryAttr->order.orderColId, 1);
pRuntimeEnv->proot =
createGlobalAggregateOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot, pQueryAttr->pExpr3,
pQueryAttr->numOfExpr3, &pQueryAttr->order.orderColId, 1);
break;
}
......@@ -3861,8 +3862,8 @@ static SFillColInfo* createFillColInfo(SExprInfo* pExpr, int32_t numOfOutput, in
return pFillCol;
}
int32_t doInitQInfo(SQInfo *pQInfo, STSBuf *pTsBuf, SArray* prevResult, void *tsdb, int32_t tbScanner, SArray* pOperator,
void* param) {
int32_t doInitQInfo(SQInfo* pQInfo, STSBuf* pTsBuf, SArray* prevResult, void* tsdb, int32_t tbScanner,
SArray* pOperator, void* param) {
SQueryRuntimeEnv *pRuntimeEnv = &pQInfo->runtimeEnv;
SQueryAttr *pQueryAttr = pQInfo->runtimeEnv.pQueryAttr;
......@@ -4299,13 +4300,15 @@ SOperatorInfo* createDataBlocksOptScanInfo(void* pTsdbQueryHandle, SQueryRuntime
return pOptr;
}
SOperatorInfo* createGlobalAggregateOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput,
int32_t* orderColumn, int32_t numOfOrder) {
SOperatorInfo* createGlobalAggregateOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream,
SExprInfo* pExpr, int32_t numOfOutput, int32_t* orderColumn,
int32_t numOfOrder) {
SMultiwayMergeInfo* pInfo = calloc(1, sizeof(SMultiwayMergeInfo));
SQueryAttr* pQueryAttr = pRuntimeEnv->pQueryAttr;
int32_t numOfRows =
(int32_t)(GET_ROW_PARAM_FOR_MULTIOUTPUT(pQueryAttr, pQueryAttr->topBotQuery, pQueryAttr->stableQuery));
// SQueryAttr* pQueryAttr = pRuntimeEnv->pQueryAttr;
int32_t numOfRows = 4096;
// int32_t numOfRows =
// (int32_t)(GET_ROW_PARAM_FOR_MULTIOUTPUT(pQueryAttr, pQueryAttr->topBotQuery, pQueryAttr->stableQuery));
pInfo->binfo.pRes = createOutputBuf(pExpr, numOfOutput, numOfRows);
pInfo->binfo.pCtx = createSQLFunctionCtx(pRuntimeEnv, pExpr, numOfOutput, &pInfo->binfo.rowCellInfoOffset);
......@@ -4322,7 +4325,9 @@ SOperatorInfo* createGlobalAggregateOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv,
pInfo->prevRow[i] = (char*)pInfo->prevRow + offset;
int32_t index = orderColumn[i];
offset += pExpr[index].base.resBytes;
if (index != INT32_MIN) {
offset += pExpr[index].base.resBytes;
}
}
pInfo->orderColumnList = taosArrayFromList(orderColumn, numOfOrder, sizeof(int32_t));
......
......@@ -485,25 +485,6 @@ int32_t compare_aRv(SSDataBlock* pBlock, int16_t* colIndex, int32_t numOfCols, i
return ret;
}
}
// char *f1 = COLMODEL_GET_VAL(data1, pDescriptor->pColumnModel, numOfRows1, s1, colIdx);
// char *f2 = COLMODEL_GET_VAL(data2, pDescriptor->pColumnModel, numOfRows2, s2, colIdx);
// if (pDescriptor->pColumnModel->pFields[colIdx].field.type == TSDB_DATA_TYPE_TIMESTAMP) {
// int32_t ret = primaryKeyComparator(*(int64_t *)f1, *(int64_t *)f2, colIdx, pDescriptor->tsOrder);
// if (ret == 0) {
// continue;
// } else {
// return ret;
// }
// } else {
// SSchemaEx *pSchema = &pDescriptor->pColumnModel->pFields[colIdx];
// int32_t ret = columnValueAscendingComparator(f1, f2, pSchema->field.type, pSchema->field.bytes);
// if (ret == 0) {
// continue;
// } else {
// return ret;
// }
// }
}
return 0;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册