提交 93c7e07d 编写于 作者: H Haojun Liao

[td-225]

上级 de57e2e5
......@@ -327,7 +327,7 @@ void tscCreateLocalMerger(tExtMemBuffer **pMemBuffer, int32_t numOfBuffer, tOrde
pMerger->pCtx = (SQLFunctionCtx *)calloc(tscSqlExprNumOfExprs(pQueryInfo), sizeof(SQLFunctionCtx));
pMerger->rowSize = pMemBuffer[0]->nElemSize;
tscRestoreFuncForSTableQuery(pQueryInfo);
// tscRestoreFuncForSTableQuery(pQueryInfo);
tscFieldInfoUpdateOffset(pQueryInfo);
if (pMerger->rowSize > pMemBuffer[0]->pageSize) {
......@@ -1162,7 +1162,9 @@ static void doExecuteFinalMergeRv(SMultiwayMergeInfo* pInfo, int32_t numOfExpr,
for(int32_t j = 0; j < numOfExpr; ++j) {
pCtx[j].pOutput += pCtx[j].outputBytes;
pCtx[j].pInput += pCtx[j].inputBytes;
pCtx[j].pInput += pCtx[j].inputBytes;
aAggs[pCtx[j].functionId].init(&pCtx[j]);
}
for (int32_t j = 0; j < numOfExpr; ++j) {
......@@ -1174,7 +1176,6 @@ static void doExecuteFinalMergeRv(SMultiwayMergeInfo* pInfo, int32_t numOfExpr,
pCtx[j].size = 1;
aAggs[functionId].mergeFunc(&pCtx[j]);
}
}
} else {
for (int32_t j = 0; j < numOfExpr; ++j) {
......@@ -1190,17 +1191,6 @@ static void doExecuteFinalMergeRv(SMultiwayMergeInfo* pInfo, int32_t numOfExpr,
savePrevOrderColumns(pInfo, pBlock, i);
}
for(int32_t j = 0; j < numOfExpr; ++j) {
int32_t functionId = pCtx[j].functionId;
if (functionId == TSDB_FUNC_TAG_DUMMY || functionId == TSDB_FUNC_TS_DUMMY) {
continue;
}
aAggs[functionId].xFinalize(&pCtx[j]);
}
pInfo->binfo.pRes->info.rows += 1;
}
static void handleUnprocessedRow(SSqlCmd *pCmd, SLocalMerger *pLocalMerge, tFilePage *tmpBuffer) {
......@@ -1311,15 +1301,14 @@ bool needToMergeRv(SSDataBlock* pBlock, SLocalMerger *pLocalMerge, int32_t index
int32_t ret = 0;
tOrderDescriptor *pDesc = pLocalMerge->pDesc;
if (pDesc->orderInfo.numOfCols > 0) {
if (pDesc->tsOrder == TSDB_ORDER_ASC) { // asc
// todo refactor comparator
// if (pDesc->tsOrder == TSDB_ORDER_ASC) { // asc
ret = compare_aRv(pBlock, pDesc->orderInfo.colIndex, pDesc->orderInfo.numOfCols, index, buf, TSDB_ORDER_ASC);
} else { // desc
// } else { // desc
// ret = compare_d(pLocalMerge->pDesc, 1, 0, pLocalMerge->prevRowOfInput, 1, 0, tmpBuffer->data);
}
// }
}
/* if ret == 0, means the result belongs to the same group */
// if ret == 0, means the result belongs to the same group
return (ret == 0);
}
......@@ -1912,12 +1901,75 @@ SSDataBlock* doGlobalAggregate(void* param) {
// not belongs to the same group, return the result of current group;
setInputDataBlock(pOperator, pAggInfo->binfo.pCtx, pBlock, TSDB_ORDER_ASC);
updateOutputBuf(&pAggInfo->binfo, &pAggInfo->bufCapacity, pBlock->info.rows);
doExecuteFinalMergeRv(pAggInfo, pOperator->numOfOutput, pBlock, false);
}
for(int32_t j = 0; j < pOperator->numOfOutput; ++j) {
int32_t functionId = pAggInfo->binfo.pCtx[j].functionId;
if (functionId == TSDB_FUNC_TAG_DUMMY || functionId == TSDB_FUNC_TS_DUMMY) {
continue;
}
aAggs[functionId].xFinalize(&pAggInfo->binfo.pCtx[j]);
}
pAggInfo->binfo.pRes->info.rows += 1;
pOperator->status = OP_EXEC_DONE;
setQueryStatus(pRuntimeEnv, QUERY_COMPLETED);
return pAggInfo->binfo.pRes;
}
SSDataBlock* doSLimit(void* param) {
SOperatorInfo* pOperator = (SOperatorInfo*)param;
if (pOperator->status == OP_EXEC_DONE) {
return NULL;
}
SSLimitOperatorInfo *pInfo = pOperator->info;
SQueryRuntimeEnv *pRuntimeEnv = pOperator->pRuntimeEnv;
SSDataBlock* pBlock = NULL;
while (1) {
pBlock = pOperator->upstream->exec(pOperator->upstream);
if (pBlock == NULL) {
setQueryStatus(pOperator->pRuntimeEnv, QUERY_COMPLETED);
pOperator->status = OP_EXEC_DONE;
return NULL;
}
if (pRuntimeEnv->currentOffset == 0) {
break;
} else if (pRuntimeEnv->currentOffset >= pBlock->info.rows) {
pRuntimeEnv->currentOffset -= pBlock->info.rows;
} else {
int32_t remain = (int32_t)(pBlock->info.rows - pRuntimeEnv->currentOffset);
pBlock->info.rows = remain;
for (int32_t i = 0; i < pBlock->info.numOfCols; ++i) {
SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, i);
int16_t bytes = pColInfoData->info.bytes;
memmove(pColInfoData->pData, pColInfoData->pData + bytes * pRuntimeEnv->currentOffset, remain * bytes);
}
pRuntimeEnv->currentOffset = 0;
break;
}
}
if (pInfo->total + pBlock->info.rows >= pInfo->limit) {
pBlock->info.rows = (int32_t)(pInfo->limit - pInfo->total);
pInfo->total = pInfo->limit;
setQueryStatus(pOperator->pRuntimeEnv, QUERY_COMPLETED);
pOperator->status = OP_EXEC_DONE;
} else {
pInfo->total += pBlock->info.rows;
}
return pBlock;
}
......@@ -3434,6 +3434,7 @@ int32_t tscCreateQueryFromQueryInfo(SQueryInfo* pQueryInfo, SQueryAttr* pQueryAt
pQueryAttr->numOfCols = numOfCols;
pQueryAttr->numOfOutput = numOfOutput;
pQueryAttr->limit = pQueryInfo->limit;
pQueryAttr->slimit = pQueryInfo->slimit;
pQueryAttr->order = pQueryInfo->order;
pQueryAttr->fillType = pQueryInfo->fillType;
pQueryAttr->groupbyColumn = tscGroupbyColumn(pQueryInfo);
......
......@@ -181,6 +181,7 @@ typedef struct SSDataBlock {
// execution of query in a data node.
typedef struct SQueryAttr {
SLimitVal limit;
SLimitVal slimit;
bool stableQuery; // super table query or not
bool topBotQuery; // TODO used bitwise flag
......@@ -412,10 +413,13 @@ typedef struct SArithOperatorInfo {
uint32_t seed;
} SArithOperatorInfo;
typedef struct SLimitOperatorInfo {
int64_t limit;
int64_t total;
} SLimitOperatorInfo;
typedef struct SSLimitOperatorInfo {
int64_t limit;
int64_t total;
char **prevRow;
bool hasPrev;
SArray *orderColumnList;
} SSLimitOperatorInfo;
typedef struct SFillOperatorInfo {
SFillInfo *pFillInfo;
......@@ -442,6 +446,7 @@ struct SLocalMerger;
typedef struct SMultiwayMergeInfo {
struct SLocalMerger *pMerge;
SOptrBasicInfo binfo;
int32_t bufCapacity;
int64_t seed;
char **prevRow;
bool hasPrev;
......@@ -465,12 +470,16 @@ SOperatorInfo* createTagScanOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SExprInf
SOperatorInfo* createTableBlockInfoScanOperator(void* pTsdbQueryHandle, SQueryRuntimeEnv* pRuntimeEnv);
SOperatorInfo* createMultiwaySortOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SExprInfo* pExpr, int32_t numOfOutput,
int32_t numOfRows, void* merger);
SOperatorInfo* createGlobalAggregateOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput, int32_t* orderColumn, int32_t numOfOrder);
SOperatorInfo* createGlobalAggregateOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput);
SOperatorInfo* createSLimitOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput);
SSDataBlock* doGlobalAggregate(void* param);
SSDataBlock* doSLimit(void* param);
SSDataBlock* createOutputBuf(SExprInfo* pExpr, int32_t numOfOutput, int32_t numOfRows);
void setInputDataBlock(SOperatorInfo* pOperator, SQLFunctionCtx* pCtx, SSDataBlock* pBlock, int32_t order);
int32_t getNumOfResult(SQueryRuntimeEnv *pRuntimeEnv, SQLFunctionCtx* pCtx, int32_t numOfOutput);
void finalizeQueryResult(SOperatorInfo* pOperator, SQLFunctionCtx* pCtx, SResultRowInfo* pResultRowInfo, int32_t* rowCellInfoOffset);
void updateOutputBuf(SOptrBasicInfo* pBInfo, int32_t *bufCapacity, int32_t numOfInputRows);
void freeParam(SQueryParam *param);
int32_t convertQueryMsg(SQueryTableMsg *pQueryMsg, SQueryParam* param);
......
......@@ -1728,7 +1728,7 @@ static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int32_t numOf
}
case OP_MultiwaySort: {
pRuntimeEnv->proot = createMultiwaySortOperatorInfo(pRuntimeEnv, pQueryAttr->pExpr3, pQueryAttr->numOfExpr3,
pRuntimeEnv->proot = createMultiwaySortOperatorInfo(pRuntimeEnv, pQueryAttr->pExpr1, pQueryAttr->numOfOutput,
4096, merger); // TODO hack it
break;
}
......@@ -1736,7 +1736,12 @@ static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int32_t numOf
case OP_GlobalAggregate: {
pRuntimeEnv->proot =
createGlobalAggregateOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot, pQueryAttr->pExpr3,
pQueryAttr->numOfExpr3, &pQueryAttr->order.orderColId, 1);
pQueryAttr->numOfExpr3);
break;
}
case OP_SLimit: {
pRuntimeEnv->proot = createSLimitOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot);
break;
}
......@@ -2925,12 +2930,11 @@ void setDefaultOutputBuf(SQueryRuntimeEnv *pRuntimeEnv, SOptrBasicInfo *pInfo, i
initCtxOutputBuffer(pCtx, pDataBlock->info.numOfCols);
}
void updateOutputBuf(SArithOperatorInfo* pInfo, int32_t numOfInputRows) {
SOptrBasicInfo* pBInfo = &pInfo->binfo;
void updateOutputBuf(SOptrBasicInfo* pBInfo, int32_t *bufCapacity, int32_t numOfInputRows) {
SSDataBlock* pDataBlock = pBInfo->pRes;
int32_t newSize = pDataBlock->info.rows + numOfInputRows;
if (pInfo->bufCapacity < newSize) {
if ((*bufCapacity) < newSize) {
for(int32_t i = 0; i < pDataBlock->info.numOfCols; ++i) {
SColumnInfoData *pColInfo = taosArrayGet(pDataBlock->pDataBlock, i);
char* p = realloc(pColInfo->pData, newSize * pColInfo->info.bytes);
......@@ -2939,7 +2943,7 @@ void updateOutputBuf(SArithOperatorInfo* pInfo, int32_t numOfInputRows) {
// it starts from the tail of the previously generated results.
pBInfo->pCtx[i].pOutput = pColInfo->pData;
pInfo->bufCapacity = newSize;
(*bufCapacity) = newSize;
} else {
// longjmp
}
......@@ -4300,56 +4304,75 @@ SOperatorInfo* createDataBlocksOptScanInfo(void* pTsdbQueryHandle, SQueryRuntime
return pOptr;
}
SArray* getOrderCheckColumns(SQueryAttr* pQuery) {
int32_t numOfCols = pQuery->pGroupbyExpr->numOfGroupCols;
SArray* pOrderColumns = NULL;
if (numOfCols > 0) {
pOrderColumns = taosArrayDup(pQuery->pGroupbyExpr->columnInfo);
}
if (pQuery->interval.interval > 0) {
if (pOrderColumns == NULL) {
pOrderColumns = taosArrayInit(1, sizeof(SColIndex));
}
SColIndex colIndex = {.colIndex = 0, .colId = 0, .flag = TSDB_COL_NORMAL};
taosArrayPush(pOrderColumns, &colIndex);
}
return pOrderColumns;
}
SOperatorInfo* createGlobalAggregateOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream,
SExprInfo* pExpr, int32_t numOfOutput, int32_t* orderColumn,
int32_t numOfOrder) {
SExprInfo* pExpr, int32_t numOfOutput) {
SMultiwayMergeInfo* pInfo = calloc(1, sizeof(SMultiwayMergeInfo));
// SQueryAttr* pQueryAttr = pRuntimeEnv->pQueryAttr;
int32_t numOfRows = 4096;
// int32_t numOfRows =
// (int32_t)(GET_ROW_PARAM_FOR_MULTIOUTPUT(pQueryAttr, pQueryAttr->topBotQuery, pQueryAttr->stableQuery));
pInfo->binfo.pRes = createOutputBuf(pExpr, numOfOutput, numOfRows);
pInfo->bufCapacity = 4096;
pInfo->binfo.pRes = createOutputBuf(pExpr, numOfOutput, pInfo->bufCapacity);
pInfo->binfo.pCtx = createSQLFunctionCtx(pRuntimeEnv, pExpr, numOfOutput, &pInfo->binfo.rowCellInfoOffset);
pInfo->orderColumnList = getOrderCheckColumns(pRuntimeEnv->pQueryAttr);
// TODO refactor
int32_t len = 0;
for(int32_t i = 0; i < numOfOutput; ++i) {
len += pExpr[i].base.resBytes;
}
pInfo->prevRow = taosArrayInit(numOfOrder, (POINTER_BYTES * numOfOrder + len));
int32_t numOfCols = pInfo->orderColumnList != NULL? taosArrayGetSize(pInfo->orderColumnList):0;
pInfo->prevRow = calloc(1, (POINTER_BYTES * numOfCols + len));
int32_t offset = POINTER_BYTES * numOfOutput;
for(int32_t i = 0; i < numOfOrder; ++i) {
for(int32_t i = 0; i < numOfCols; ++i) {
pInfo->prevRow[i] = (char*)pInfo->prevRow + offset;
int32_t index = orderColumn[i];
if (index != INT32_MIN) {
offset += pExpr[index].base.resBytes;
}
SColIndex* index = taosArrayGet(pInfo->orderColumnList, i);
offset += pExpr[index->colIndex].base.resBytes;
}
pInfo->orderColumnList = taosArrayFromList(orderColumn, numOfOrder, sizeof(int32_t));
initResultRowInfo(&pInfo->binfo.resultRowInfo, 8, TSDB_DATA_TYPE_INT);
pInfo->seed = rand();
setDefaultOutputBuf(pRuntimeEnv, &pInfo->binfo, pInfo->seed);
SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo));
pOperator->name = "GlobalAggregate";
pOperator->name = "GlobalAggregate";
pOperator->operatorType = OP_GlobalAggregate;
pOperator->blockingOptr = true;
pOperator->status = OP_IN_EXECUTING;
pOperator->info = pInfo;
pOperator->upstream = upstream;
pOperator->pExpr = pExpr;
pOperator->numOfOutput = numOfOutput;
pOperator->pRuntimeEnv = pRuntimeEnv;
pOperator->exec = doGlobalAggregate;
pOperator->cleanup = destroyBasicOperatorInfo;
pOperator->status = OP_IN_EXECUTING;
pOperator->info = pInfo;
pOperator->upstream = upstream;
pOperator->pExpr = pExpr;
pOperator->numOfOutput = numOfOutput;
pOperator->pRuntimeEnv = pRuntimeEnv;
pOperator->exec = doGlobalAggregate;
pOperator->cleanup = destroyBasicOperatorInfo;
return pOperator;
}
......@@ -4492,7 +4515,7 @@ static SSDataBlock* doArithmeticOperation(void* param) {
// the pDataBlock are always the same one, no need to call this again
setInputDataBlock(pOperator, pInfo->pCtx, pBlock, order);
updateOutputBuf(pArithInfo, pBlock->info.rows);
updateOutputBuf(&pArithInfo->binfo, &pArithInfo->bufCapacity, pBlock->info.rows);
arithmeticApplyFunctions(pRuntimeEnv, pInfo->pCtx, pOperator->numOfOutput);
......@@ -5120,9 +5143,28 @@ SOperatorInfo* createFillOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorIn
return pOperator;
}
SOperatorInfo* createSLimitOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream) {
SLimitOperatorInfo* pInfo = calloc(1, sizeof(SLimitOperatorInfo));
pInfo->limit = pRuntimeEnv->pQueryAttr->limit.limit;
SOperatorInfo* createSLimitOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput) {
SSLimitOperatorInfo* pInfo = calloc(1, sizeof(SSLimitOperatorInfo));
pInfo->limit = pRuntimeEnv->pQueryAttr->slimit.limit;
pInfo->orderColumnList = getOrderCheckColumns(pRuntimeEnv->pQueryAttr);
// TODO refactor
int32_t len = 0;
for(int32_t i = 0; i < numOfOutput; ++i) {
len += pExpr[i].base.resBytes;
}
int32_t numOfCols = pInfo->orderColumnList != NULL? taosArrayGetSize(pInfo->orderColumnList):0;
pInfo->prevRow = calloc(1, (POINTER_BYTES * numOfCols + len));
int32_t offset = POINTER_BYTES * numOfOutput;
for(int32_t i = 0; i < numOfCols; ++i) {
pInfo->prevRow[i] = (char*)pInfo->prevRow + offset;
SColIndex* index = taosArrayGet(pInfo->orderColumnList, i);
offset += pExpr[index->colIndex].base.resBytes;
}
SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo));
......@@ -5131,7 +5173,7 @@ SOperatorInfo* createSLimitOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperator
pOperator->blockingOptr = false;
pOperator->status = OP_IN_EXECUTING;
pOperator->upstream = upstream;
pOperator->exec = doLimit;
pOperator->exec = doSLimit;
pOperator->info = pInfo;
pOperator->pRuntimeEnv = pRuntimeEnv;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册