提交 cbe712b4 编写于 作者: H Haojun Liao

[td-225]

上级 93c7e07d
...@@ -1119,22 +1119,29 @@ static void doExecuteFinalMerge( SLocalMerger *pLocalMerge, int32_t numOfExpr, b ...@@ -1119,22 +1119,29 @@ static void doExecuteFinalMerge( SLocalMerger *pLocalMerge, int32_t numOfExpr, b
} }
} }
static void savePrevOrderColumns(SMultiwayMergeInfo* pInfo, SSDataBlock* pBlock, int32_t rowIndex) { //TODO it is not ordered, fix it
int32_t size = pInfo->pMerge->pDesc->orderInfo.numOfCols; static void savePrevOrderColumns(char** prevRow, SArray* pColumnList, SSDataBlock* pBlock, int32_t rowIndex, bool* hasPrev) {
int32_t size = (int32_t) taosArrayGetSize(pColumnList);
for(int32_t i = 0; i < size; ++i) { for(int32_t i = 0; i < size; ++i) {
int32_t index = pInfo->pMerge->pDesc->orderInfo.colIndex[i]; SColIndex* index = taosArrayGet(pColumnList, i);
// int32_t index = *(int16_t*)taosArrayGet(pInfo->orderColumnList, i); SColumnInfoData* pColInfo = taosArrayGet(pBlock->pDataBlock, index->colIndex);
SColumnInfoData* pColInfo = taosArrayGet(pBlock->pDataBlock, index); assert(index->colId == pColInfo->info.colId);
memcpy(pInfo->prevRow[i], pColInfo->pData + pColInfo->info.bytes * rowIndex, pColInfo->info.bytes); memcpy(prevRow[i], pColInfo->pData + pColInfo->info.bytes * rowIndex, pColInfo->info.bytes);
} }
pInfo->hasPrev = true; (*hasPrev) = true;
} }
static void doExecuteFinalMergeRv(SMultiwayMergeInfo* pInfo, int32_t numOfExpr, SSDataBlock* pBlock, bool needInit) { static void doExecuteFinalMergeRv(SMultiwayMergeInfo* pInfo, int32_t numOfExpr, SSDataBlock* pBlock, bool needInit) {
SQLFunctionCtx* pCtx = pInfo->binfo.pCtx; SQLFunctionCtx* pCtx = pInfo->binfo.pCtx;
char** add = calloc(pBlock->info.numOfCols, POINTER_BYTES);
for(int32_t i = 0; i < pBlock->info.numOfCols; ++i) {
add[i] = pCtx[i].pInput;
}
for(int32_t i = 0; i < pBlock->info.rows; ++i) { for(int32_t i = 0; i < pBlock->info.rows; ++i) {
if (pInfo->hasPrev) { if (pInfo->hasPrev) {
if (needToMergeRv(pBlock, pInfo->pMerge, i, pInfo->prevRow)) { if (needToMergeRv(pBlock, pInfo->pMerge, i, pInfo->prevRow)) {
...@@ -1160,6 +1167,23 @@ static void doExecuteFinalMergeRv(SMultiwayMergeInfo* pInfo, int32_t numOfExpr, ...@@ -1160,6 +1167,23 @@ static void doExecuteFinalMergeRv(SMultiwayMergeInfo* pInfo, int32_t numOfExpr,
pInfo->binfo.pRes->info.rows += 1; pInfo->binfo.pRes->info.rows += 1;
if (i == 0) {
for(int32_t j = 0; j < numOfExpr; ++j) {
pCtx[j].pOutput += pCtx[j].outputBytes;
aAggs[pCtx[j].functionId].init(&pCtx[j]);
}
for (int32_t j = 0; j < numOfExpr; ++j) {
int32_t functionId = pCtx[j].functionId;
if (functionId == TSDB_FUNC_TAG_DUMMY || functionId == TSDB_FUNC_TS_DUMMY) {
continue;
}
pCtx[j].size = 1;
aAggs[functionId].mergeFunc(&pCtx[j]);
}
}
for(int32_t j = 0; j < numOfExpr; ++j) { for(int32_t j = 0; j < numOfExpr; ++j) {
pCtx[j].pOutput += pCtx[j].outputBytes; pCtx[j].pOutput += pCtx[j].outputBytes;
pCtx[j].pInput += pCtx[j].inputBytes; pCtx[j].pInput += pCtx[j].inputBytes;
...@@ -1188,9 +1212,16 @@ static void doExecuteFinalMergeRv(SMultiwayMergeInfo* pInfo, int32_t numOfExpr, ...@@ -1188,9 +1212,16 @@ static void doExecuteFinalMergeRv(SMultiwayMergeInfo* pInfo, int32_t numOfExpr,
aAggs[functionId].mergeFunc(&pCtx[j]); aAggs[functionId].mergeFunc(&pCtx[j]);
} }
} }
savePrevOrderColumns(pInfo->prevRow, pInfo->orderColumnList, pBlock, i, &pInfo->hasPrev);
}
savePrevOrderColumns(pInfo, pBlock, i); {
for(int32_t i = 0; i < pBlock->info.numOfCols; ++i) {
pCtx[i].pInput = add[i];
}
} }
tfree(add);
} }
static void handleUnprocessedRow(SSqlCmd *pCmd, SLocalMerger *pLocalMerge, tFilePage *tmpBuffer) { static void handleUnprocessedRow(SSqlCmd *pCmd, SLocalMerger *pLocalMerge, tFilePage *tmpBuffer) {
...@@ -1804,16 +1835,14 @@ static void appendOneRowToDataBlock(SSDataBlock *pBlock, char *buf, SColumnModel ...@@ -1804,16 +1835,14 @@ static void appendOneRowToDataBlock(SSDataBlock *pBlock, char *buf, SColumnModel
SColumnInfoData* pColInfo = taosArrayGet(pBlock->pDataBlock, i); SColumnInfoData* pColInfo = taosArrayGet(pBlock->pDataBlock, i);
char* p = pColInfo->pData + pBlock->info.rows * pColInfo->info.bytes; char* p = pColInfo->pData + pBlock->info.rows * pColInfo->info.bytes;
// char *dst = COLMODEL_GET_VAL(dstPage->data, dstModel, dstModel->capacity, dstPage->num, col);
char *src = COLMODEL_GET_VAL(buf, pModel, maxRows, rowIndex, i); char *src = COLMODEL_GET_VAL(buf, pModel, maxRows, rowIndex, i);
// char* src = buf + rowIndex * pColInfo->info.bytes;
memmove(p, src, pColInfo->info.bytes); memmove(p, src, pColInfo->info.bytes);
} }
pBlock->info.rows += 1; pBlock->info.rows += 1;
} }
static SSDataBlock* doMultiwaySort(void* param) { SSDataBlock* doMultiwaySort(void* param) {
SOperatorInfo* pOperator = (SOperatorInfo*) param; SOperatorInfo* pOperator = (SOperatorInfo*) param;
if (pOperator->status == OP_EXEC_DONE) { if (pOperator->status == OP_EXEC_DONE) {
return NULL; return NULL;
...@@ -1841,6 +1870,50 @@ static SSDataBlock* doMultiwaySort(void* param) { ...@@ -1841,6 +1870,50 @@ static SSDataBlock* doMultiwaySort(void* param) {
// chosen from loser tree // chosen from loser tree
SLocalDataSource *pOneDataSrc = pMerger->pLocalDataSrc[pTree->pNode[0].index]; SLocalDataSource *pOneDataSrc = pMerger->pLocalDataSrc[pTree->pNode[0].index];
bool sameGroup = true;
if (pInfo->hasPrev) {
int32_t numOfCols = (int32_t)taosArrayGetSize(pInfo->orderColumnList);
// if this row belongs to current result set group
for (int32_t i = 0; i < numOfCols; ++i) {
SColIndex * pIndex = taosArrayGet(pInfo->orderColumnList, i);
SColumnInfoData *pColInfo = taosArrayGet(pInfo->binfo.pRes->pDataBlock, pIndex->colIndex);
char *newRow =
COLMODEL_GET_VAL(pOneDataSrc->filePage.data, pModel, pOneDataSrc->pMemBuffer->pColumnModel->capacity,
pOneDataSrc->rowIdx, pIndex->colIndex);
char * data = pInfo->prevRow[i];
int32_t ret = columnValueAscendingComparator(data, newRow, pColInfo->info.type, pColInfo->info.bytes);
if (ret == 0) {
continue;
} else {
sameGroup = false;
break;
}
}
}
if (!sameGroup || !pInfo->hasPrev) { //save the data
int32_t numOfCols = (int32_t)taosArrayGetSize(pInfo->orderColumnList);
for (int32_t i = 0; i < numOfCols; ++i) {
SColIndex * pIndex = taosArrayGet(pInfo->orderColumnList, i);
SColumnInfoData *pColInfo = taosArrayGet(pInfo->binfo.pRes->pDataBlock, pIndex->colIndex);
char *curCol =
COLMODEL_GET_VAL(pOneDataSrc->filePage.data, pModel, pOneDataSrc->pMemBuffer->pColumnModel->capacity,
pOneDataSrc->rowIdx, pIndex->colIndex);
memcpy(pInfo->prevRow[i], curCol, pColInfo->info.bytes);
}
pInfo->hasPrev = true;
}
if (!sameGroup && pInfo->binfo.pRes->info.rows > 0) {
return pInfo->binfo.pRes;
}
appendOneRowToDataBlock(pInfo->binfo.pRes, pOneDataSrc->filePage.data, pModel, pOneDataSrc->rowIdx, pOneDataSrc->pMemBuffer->pColumnModel->capacity); appendOneRowToDataBlock(pInfo->binfo.pRes, pOneDataSrc->filePage.data, pModel, pOneDataSrc->rowIdx, pOneDataSrc->pMemBuffer->pColumnModel->capacity);
#if defined(_DEBUG_VIEW) #if defined(_DEBUG_VIEW)
...@@ -1854,7 +1927,7 @@ static SSDataBlock* doMultiwaySort(void* param) { ...@@ -1854,7 +1927,7 @@ static SSDataBlock* doMultiwaySort(void* param) {
pOneDataSrc->rowIdx += 1; pOneDataSrc->rowIdx += 1;
adjustLoserTreeFromNewData(pMerger, pOneDataSrc, pTree); adjustLoserTreeFromNewData(pMerger, pOneDataSrc, pTree);
if (pInfo->binfo.pRes->info.rows >= 4096) { // TODO threshold if (pInfo->binfo.pRes->info.rows >= pInfo->bufCapacity) {
return pInfo->binfo.pRes; return pInfo->binfo.pRes;
} }
} }
...@@ -1862,26 +1935,6 @@ static SSDataBlock* doMultiwaySort(void* param) { ...@@ -1862,26 +1935,6 @@ static SSDataBlock* doMultiwaySort(void* param) {
return (pInfo->binfo.pRes->info.rows > 0)? pInfo->binfo.pRes:NULL; return (pInfo->binfo.pRes->info.rows > 0)? pInfo->binfo.pRes:NULL;
} }
SOperatorInfo *createMultiwaySortOperatorInfo(SQueryRuntimeEnv *pRuntimeEnv, SExprInfo *pExpr, int32_t numOfOutput,
int32_t numOfRows, void *merger) {
SMultiwayMergeInfo* pInfo = calloc(1, sizeof(SMultiwayMergeInfo));
pInfo->pMerge = merger;
pInfo->binfo.pRes = createOutputBuf(pExpr, numOfOutput, numOfRows);
SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo));
pOperator->name = "MultiwaySortOperator";
pOperator->operatorType = OP_MultiwaySort;
pOperator->blockingOptr = false;
pOperator->status = OP_IN_EXECUTING;
pOperator->info = pInfo;
pOperator->pRuntimeEnv = pRuntimeEnv;
pOperator->numOfOutput = pRuntimeEnv->pQueryAttr->numOfCols;
pOperator->exec = doMultiwaySort;
return pOperator;
}
SSDataBlock* doGlobalAggregate(void* param) { SSDataBlock* doGlobalAggregate(void* param) {
SOperatorInfo* pOperator = (SOperatorInfo*) param; SOperatorInfo* pOperator = (SOperatorInfo*) param;
if (pOperator->status == OP_EXEC_DONE) { if (pOperator->status == OP_EXEC_DONE) {
...@@ -1890,15 +1943,55 @@ SSDataBlock* doGlobalAggregate(void* param) { ...@@ -1890,15 +1943,55 @@ SSDataBlock* doGlobalAggregate(void* param) {
SMultiwayMergeInfo* pAggInfo = pOperator->info; SMultiwayMergeInfo* pAggInfo = pOperator->info;
SQueryRuntimeEnv *pRuntimeEnv = pOperator->pRuntimeEnv; // SQueryRuntimeEnv *pRuntimeEnv = pOperator->pRuntimeEnv;
SOperatorInfo *upstream = pOperator->upstream; SOperatorInfo *upstream = pOperator->upstream;
{
if (pAggInfo->hasDataBlockForNewGroup) {
// not belongs to the same group, return the result of current group;
setInputDataBlock(pOperator, pAggInfo->binfo.pCtx, pAggInfo->pExistBlock, TSDB_ORDER_ASC);
updateOutputBuf(&pAggInfo->binfo, &pAggInfo->bufCapacity, pAggInfo->pExistBlock->info.rows);
doExecuteFinalMergeRv(pAggInfo, pOperator->numOfOutput, pAggInfo->pExistBlock, false);
savePrevOrderColumns(pAggInfo->groupPrevRow, pAggInfo->groupColumnList, pAggInfo->pExistBlock, 0,
&pAggInfo->hasPrev);
pAggInfo->pExistBlock = NULL;
pAggInfo->hasDataBlockForNewGroup = false;
}
}
while(1) { while(1) {
SSDataBlock* pBlock = upstream->exec(upstream); SSDataBlock* pBlock = upstream->exec(upstream);
if (pBlock == NULL) { if (pBlock == NULL) {
break; break;
} }
if (pAggInfo->hasPrev) {
bool sameGroup = true;
int32_t numOfCols = (int32_t) taosArrayGetSize(pAggInfo->groupColumnList);
for (int32_t i = 0; i < numOfCols; ++i) {
SColIndex *pIndex = taosArrayGet(pAggInfo->groupColumnList, i);
SColumnInfoData *pColInfo = taosArrayGet(pAggInfo->binfo.pRes->pDataBlock, pIndex->colIndex);
char *data = pAggInfo->groupPrevRow[i];
int32_t ret = columnValueAscendingComparator(data, pColInfo->pData, pColInfo->info.type, pColInfo->info.bytes);
if (ret == 0) {
continue;
} else {
sameGroup = false;
break;
}
}
if (!sameGroup) {
pAggInfo->hasDataBlockForNewGroup = true;
pAggInfo->pExistBlock = pBlock;
savePrevOrderColumns(pAggInfo->prevRow, pAggInfo->groupColumnList, pBlock, 0, &pAggInfo->hasPrev);
break;
}
}
// not belongs to the same group, return the result of current group; // not belongs to the same group, return the result of current group;
setInputDataBlock(pOperator, pAggInfo->binfo.pCtx, pBlock, TSDB_ORDER_ASC); setInputDataBlock(pOperator, pAggInfo->binfo.pCtx, pBlock, TSDB_ORDER_ASC);
updateOutputBuf(&pAggInfo->binfo, &pAggInfo->bufCapacity, pBlock->info.rows); updateOutputBuf(&pAggInfo->binfo, &pAggInfo->bufCapacity, pBlock->info.rows);
...@@ -1911,13 +2004,14 @@ SSDataBlock* doGlobalAggregate(void* param) { ...@@ -1911,13 +2004,14 @@ SSDataBlock* doGlobalAggregate(void* param) {
if (functionId == TSDB_FUNC_TAG_DUMMY || functionId == TSDB_FUNC_TS_DUMMY) { if (functionId == TSDB_FUNC_TAG_DUMMY || functionId == TSDB_FUNC_TS_DUMMY) {
continue; continue;
} }
aAggs[functionId].xFinalize(&pAggInfo->binfo.pCtx[j]); aAggs[functionId].xFinalize(&pAggInfo->binfo.pCtx[j]);
} }
pAggInfo->binfo.pRes->info.rows += 1; pAggInfo->binfo.pRes->info.rows += 1;
pOperator->status = OP_EXEC_DONE; // pOperator->status = OP_EXEC_DONE;
setQueryStatus(pRuntimeEnv, QUERY_COMPLETED); // setQueryStatus(pRuntimeEnv, QUERY_COMPLETED);
return pAggInfo->binfo.pRes; return pAggInfo->binfo.pRes;
} }
...@@ -1929,7 +2023,6 @@ SSDataBlock* doSLimit(void* param) { ...@@ -1929,7 +2023,6 @@ SSDataBlock* doSLimit(void* param) {
} }
SSLimitOperatorInfo *pInfo = pOperator->info; SSLimitOperatorInfo *pInfo = pOperator->info;
SQueryRuntimeEnv *pRuntimeEnv = pOperator->pRuntimeEnv;
SSDataBlock* pBlock = NULL; SSDataBlock* pBlock = NULL;
while (1) { while (1) {
...@@ -1940,34 +2033,96 @@ SSDataBlock* doSLimit(void* param) { ...@@ -1940,34 +2033,96 @@ SSDataBlock* doSLimit(void* param) {
return NULL; return NULL;
} }
if (pRuntimeEnv->currentOffset == 0) { if (pInfo->currentGroupOffset == 0) {
break; if (pInfo->currentOffset == 0) { // TODO refactor
} else if (pRuntimeEnv->currentOffset >= pBlock->info.rows) { break;
pRuntimeEnv->currentOffset -= pBlock->info.rows; } else if (pInfo->currentOffset >= pBlock->info.rows) {
pInfo->currentOffset -= pBlock->info.rows;
} else {
int32_t remain = (int32_t)(pBlock->info.rows - pInfo->currentOffset);
pBlock->info.rows = remain;
// move the remain rows of this data block to the front.
for (int32_t i = 0; i < pBlock->info.numOfCols; ++i) {
SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, i);
int16_t bytes = pColInfoData->info.bytes;
memmove(pColInfoData->pData, pColInfoData->pData + bytes * pInfo->currentOffset, remain * bytes);
}
pInfo->currentOffset = 0;
break;
}
} else { } else {
int32_t remain = (int32_t)(pBlock->info.rows - pRuntimeEnv->currentOffset); if (pInfo->hasPrev) {
pBlock->info.rows = remain; // Check if current data block belongs to current result group or not
// if (needToMergeRv(pBlock, pInfo->pMerger, 0, pInfo->prevRow)) {
bool sameGroup = true;
int32_t numOfCols = (int32_t) taosArrayGetSize(pInfo->orderColumnList);
for (int32_t i = 0; i < numOfCols; ++i) {
SColIndex *pIndex = taosArrayGet(pInfo->orderColumnList, i);
SColumnInfoData *pColInfo = taosArrayGet(pBlock->pDataBlock, pIndex->colIndex);
char *data = pInfo->prevRow[i];
int32_t ret = columnValueAscendingComparator(data, pColInfo->pData, pColInfo->info.type, pColInfo->info.bytes);
if (ret == 0) {
continue;
} else {
sameGroup = false;
break;
}
}
for (int32_t i = 0; i < pBlock->info.numOfCols; ++i) { if (sameGroup) {
SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, i); continue; // ignore the data block of the same group and try next
} else {
savePrevOrderColumns(pInfo->prevRow, pInfo->orderColumnList, pBlock, 0, &pInfo->hasPrev);
pInfo->currentOffset = pInfo->limit.offset; // set the offset value for a new group
pInfo->rowsTotal = 0;
if ((--pInfo->currentGroupOffset) == 0) {
if (pInfo->currentOffset == 0) { // TODO refactor
break;
} else if (pInfo->currentOffset >= pBlock->info.rows) {
pInfo->currentOffset -= pBlock->info.rows;
} else {
int32_t remain = (int32_t)(pBlock->info.rows - pInfo->currentOffset);
pBlock->info.rows = remain;
// move the remain rows of this data block to the front.
for (int32_t i = 0; i < pBlock->info.numOfCols; ++i) {
SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, i);
int16_t bytes = pColInfoData->info.bytes;
memmove(pColInfoData->pData, pColInfoData->pData + bytes * pInfo->currentOffset, remain * bytes);
}
int16_t bytes = pColInfoData->info.bytes; pInfo->currentOffset = 0;
memmove(pColInfoData->pData, pColInfoData->pData + bytes * pRuntimeEnv->currentOffset, remain * bytes); break;
}
}
}
} else {
savePrevOrderColumns(pInfo->prevRow, pInfo->orderColumnList, pBlock, 0, &pInfo->hasPrev);
} }
}
}
pRuntimeEnv->currentOffset = 0; if (!pInfo->hasPrev || !needToMergeRv(pBlock, pInfo->pMerger, 0, pInfo->prevRow)) {
break; pInfo->groupTotal += 1;
if (pInfo->groupTotal > pInfo->slimit.limit) { // reach the group limit, abort
return NULL;
} }
} }
if (pInfo->total + pBlock->info.rows >= pInfo->limit) { if (pInfo->limit.limit > 0 && (pInfo->rowsTotal + pBlock->info.rows >= pInfo->limit.limit)) {
pBlock->info.rows = (int32_t)(pInfo->limit - pInfo->total); pBlock->info.rows = (int32_t)(pInfo->limit.limit - pInfo->rowsTotal);
pInfo->total = pInfo->limit; pInfo->rowsTotal = pInfo->limit.limit;
setQueryStatus(pOperator->pRuntimeEnv, QUERY_COMPLETED); setQueryStatus(pOperator->pRuntimeEnv, QUERY_COMPLETED);
pOperator->status = OP_EXEC_DONE; pOperator->status = OP_EXEC_DONE;
} else { } else {
pInfo->total += pBlock->info.rows; pInfo->rowsTotal += pBlock->info.rows;
} }
return pBlock; return pBlock;
......
...@@ -1594,8 +1594,8 @@ int tscProcessRetrieveLocalMergeRsp(SSqlObj *pSql) { ...@@ -1594,8 +1594,8 @@ int tscProcessRetrieveLocalMergeRsp(SSqlObj *pSql) {
} }
uint64_t localQueryId = 0; uint64_t localQueryId = 0;
SMultiwayMergeInfo* pInfo = (SMultiwayMergeInfo*) pQueryInfo->pQInfo->runtimeEnv.proot->info; // SMultiwayMergeInfo* pInfo = (SMultiwayMergeInfo*) pQueryInfo->pQInfo->runtimeEnv.proot->info;
pInfo->pMerge = pRes->pLocalMerger; // pInfo->pMerge = pRes->pLocalMerger;
qTableQuery(pQueryInfo->pQInfo, &localQueryId); qTableQuery(pQueryInfo->pQInfo, &localQueryId);
SSDataBlock* p = pQueryInfo->pQInfo->runtimeEnv.outputBuf; SSDataBlock* p = pQueryInfo->pQInfo->runtimeEnv.outputBuf;
......
...@@ -413,9 +413,22 @@ typedef struct SArithOperatorInfo { ...@@ -413,9 +413,22 @@ typedef struct SArithOperatorInfo {
uint32_t seed; uint32_t seed;
} SArithOperatorInfo; } SArithOperatorInfo;
typedef struct SSLimitOperatorInfo { typedef struct SLimitOperatorInfo {
int64_t limit; int64_t limit;
int64_t total; int64_t total;
} SLimitOperatorInfo;
typedef struct SSLimitOperatorInfo {
int64_t groupTotal;
int64_t currentGroupOffset;
int64_t rowsTotal;
int64_t currentOffset;
SLimitVal limit;
SLimitVal slimit;
struct SLocalMerger *pMerger;
char **prevRow; char **prevRow;
bool hasPrev; bool hasPrev;
SArray *orderColumnList; SArray *orderColumnList;
...@@ -449,8 +462,15 @@ typedef struct SMultiwayMergeInfo { ...@@ -449,8 +462,15 @@ typedef struct SMultiwayMergeInfo {
int32_t bufCapacity; int32_t bufCapacity;
int64_t seed; int64_t seed;
char **prevRow; char **prevRow;
bool hasPrev;
SArray *orderColumnList; SArray *orderColumnList;
char **groupPrevRow;
SArray *groupColumnList;
bool hasDataBlockForNewGroup;
SSDataBlock *pExistBlock;
bool hasPrev;
bool groupMix;
} SMultiwayMergeInfo; } SMultiwayMergeInfo;
SOperatorInfo* createDataBlocksOptScanInfo(void* pTsdbQueryHandle, SQueryRuntimeEnv* pRuntimeEnv, int32_t repeatTime, int32_t reverseTime); SOperatorInfo* createDataBlocksOptScanInfo(void* pTsdbQueryHandle, SQueryRuntimeEnv* pRuntimeEnv, int32_t repeatTime, int32_t reverseTime);
...@@ -469,12 +489,14 @@ SOperatorInfo* createMultiTableTimeIntervalOperatorInfo(SQueryRuntimeEnv* pRunti ...@@ -469,12 +489,14 @@ SOperatorInfo* createMultiTableTimeIntervalOperatorInfo(SQueryRuntimeEnv* pRunti
SOperatorInfo* createTagScanOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SExprInfo* pExpr, int32_t numOfOutput); SOperatorInfo* createTagScanOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SExprInfo* pExpr, int32_t numOfOutput);
SOperatorInfo* createTableBlockInfoScanOperator(void* pTsdbQueryHandle, SQueryRuntimeEnv* pRuntimeEnv); SOperatorInfo* createTableBlockInfoScanOperator(void* pTsdbQueryHandle, SQueryRuntimeEnv* pRuntimeEnv);
SOperatorInfo* createMultiwaySortOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SExprInfo* pExpr, int32_t numOfOutput, SOperatorInfo* createMultiwaySortOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SExprInfo* pExpr, int32_t numOfOutput,
int32_t numOfRows, void* merger); int32_t numOfRows, void* merger, bool groupMix);
SOperatorInfo* createGlobalAggregateOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput); SOperatorInfo* createGlobalAggregateOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput, void* param);
SOperatorInfo* createSLimitOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput); SOperatorInfo* createSLimitOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput, void* merger);
SSDataBlock* doGlobalAggregate(void* param); SSDataBlock* doGlobalAggregate(void* param);
SSDataBlock* doMultiwaySort(void* param);
SSDataBlock* doSLimit(void* param); SSDataBlock* doSLimit(void* param);
SSDataBlock* createOutputBuf(SExprInfo* pExpr, int32_t numOfOutput, int32_t numOfRows); SSDataBlock* createOutputBuf(SExprInfo* pExpr, int32_t numOfOutput, int32_t numOfRows);
void setInputDataBlock(SOperatorInfo* pOperator, SQLFunctionCtx* pCtx, SSDataBlock* pBlock, int32_t order); void setInputDataBlock(SOperatorInfo* pOperator, SQLFunctionCtx* pCtx, SSDataBlock* pBlock, int32_t order);
int32_t getNumOfResult(SQueryRuntimeEnv *pRuntimeEnv, SQLFunctionCtx* pCtx, int32_t numOfOutput); int32_t getNumOfResult(SQueryRuntimeEnv *pRuntimeEnv, SQLFunctionCtx* pCtx, int32_t numOfOutput);
......
...@@ -240,6 +240,8 @@ int32_t compare_d(tOrderDescriptor *, int32_t numOfRow1, int32_t s1, char *data1 ...@@ -240,6 +240,8 @@ int32_t compare_d(tOrderDescriptor *, int32_t numOfRow1, int32_t s1, char *data1
struct SSDataBlock; struct SSDataBlock;
int32_t compare_aRv(struct SSDataBlock* pBlock, int16_t* colIndex, int32_t numOfCols, int32_t rowIndex, char** buffer, int32_t order); int32_t compare_aRv(struct SSDataBlock* pBlock, int16_t* colIndex, int32_t numOfCols, int32_t rowIndex, char** buffer, int32_t order);
int32_t columnValueAscendingComparator(char *f1, char *f2, int32_t type, int32_t bytes);
#ifdef __cplusplus #ifdef __cplusplus
} }
#endif #endif
......
...@@ -1728,20 +1728,25 @@ static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int32_t numOf ...@@ -1728,20 +1728,25 @@ static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int32_t numOf
} }
case OP_MultiwaySort: { case OP_MultiwaySort: {
bool groupMix = true;
if(pQueryAttr->slimit.offset != 0 || pQueryAttr->slimit.limit != -1) {
groupMix = false;
}
pRuntimeEnv->proot = createMultiwaySortOperatorInfo(pRuntimeEnv, pQueryAttr->pExpr1, pQueryAttr->numOfOutput, pRuntimeEnv->proot = createMultiwaySortOperatorInfo(pRuntimeEnv, pQueryAttr->pExpr1, pQueryAttr->numOfOutput,
4096, merger); // TODO hack it 4096, merger, groupMix); // TODO hack it
break; break;
} }
case OP_GlobalAggregate: { case OP_GlobalAggregate: {
pRuntimeEnv->proot = pRuntimeEnv->proot =
createGlobalAggregateOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot, pQueryAttr->pExpr3, createGlobalAggregateOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot, pQueryAttr->pExpr3,
pQueryAttr->numOfExpr3); pQueryAttr->numOfExpr3, merger);
break; break;
} }
case OP_SLimit: { case OP_SLimit: {
pRuntimeEnv->proot = createSLimitOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot); pRuntimeEnv->proot = createSLimitOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot, pQueryAttr->pExpr3,
pQueryAttr->numOfExpr3, merger);
break; break;
} }
...@@ -4321,22 +4326,56 @@ SArray* getOrderCheckColumns(SQueryAttr* pQuery) { ...@@ -4321,22 +4326,56 @@ SArray* getOrderCheckColumns(SQueryAttr* pQuery) {
taosArrayPush(pOrderColumns, &colIndex); taosArrayPush(pOrderColumns, &colIndex);
} }
{
numOfCols = (int32_t) taosArrayGetSize(pOrderColumns);
for(int32_t i = 0; i < numOfCols; ++i) {
SColIndex* index = taosArrayGet(pOrderColumns, i);
for(int32_t j = 0; j < pQuery->numOfOutput; ++j) {
if (index->colId == pQuery->pExpr1[j].base.colInfo.colId) {
index->colIndex = j;
index->colId = pQuery->pExpr1[j].base.resColId;
}
}
}
}
return pOrderColumns; return pOrderColumns;
} }
SArray* getResultGroupCheckColumns(SQueryAttr* pQuery) {
int32_t numOfCols = pQuery->pGroupbyExpr->numOfGroupCols;
SArray* pOrderColumns = NULL;
if (numOfCols > 0) {
pOrderColumns = taosArrayDup(pQuery->pGroupbyExpr->columnInfo);
}
for(int32_t i = 0; i < numOfCols; ++i) {
SColIndex* index = taosArrayGet(pOrderColumns, i);
for(int32_t j = 0; j < pQuery->numOfOutput; ++j) {
if (index->colId == pQuery->pExpr1[j].base.colInfo.colId) {
index->colIndex = j;
index->colId = pQuery->pExpr1[j].base.resColId;
}
}
}
return pOrderColumns;
}
SOperatorInfo* createGlobalAggregateOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SOperatorInfo* createGlobalAggregateOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream,
SExprInfo* pExpr, int32_t numOfOutput) { SExprInfo* pExpr, int32_t numOfOutput, void* param) {
SMultiwayMergeInfo* pInfo = calloc(1, sizeof(SMultiwayMergeInfo)); SMultiwayMergeInfo* pInfo = calloc(1, sizeof(SMultiwayMergeInfo));
// int32_t numOfRows = // int32_t numOfRows =
// (int32_t)(GET_ROW_PARAM_FOR_MULTIOUTPUT(pQueryAttr, pQueryAttr->topBotQuery, pQueryAttr->stableQuery)); // (int32_t)(GET_ROW_PARAM_FOR_MULTIOUTPUT(pQueryAttr, pQueryAttr->topBotQuery, pQueryAttr->stableQuery));
pInfo->pMerge = param;
pInfo->bufCapacity = 4096; pInfo->bufCapacity = 4096;
pInfo->binfo.pRes = createOutputBuf(pExpr, numOfOutput, pInfo->bufCapacity); pInfo->binfo.pRes = createOutputBuf(pExpr, numOfOutput, pInfo->bufCapacity);
pInfo->binfo.pCtx = createSQLFunctionCtx(pRuntimeEnv, pExpr, numOfOutput, &pInfo->binfo.rowCellInfoOffset); pInfo->binfo.pCtx = createSQLFunctionCtx(pRuntimeEnv, pExpr, numOfOutput, &pInfo->binfo.rowCellInfoOffset);
pInfo->orderColumnList = getOrderCheckColumns(pRuntimeEnv->pQueryAttr); pInfo->orderColumnList = getOrderCheckColumns(pRuntimeEnv->pQueryAttr);
pInfo->groupColumnList = getResultGroupCheckColumns(pRuntimeEnv->pQueryAttr);
// TODO refactor // TODO refactor
int32_t len = 0; int32_t len = 0;
...@@ -4344,7 +4383,7 @@ SOperatorInfo* createGlobalAggregateOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, ...@@ -4344,7 +4383,7 @@ SOperatorInfo* createGlobalAggregateOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv,
len += pExpr[i].base.resBytes; len += pExpr[i].base.resBytes;
} }
int32_t numOfCols = pInfo->orderColumnList != NULL? taosArrayGetSize(pInfo->orderColumnList):0; int32_t numOfCols = (pInfo->orderColumnList != NULL)? taosArrayGetSize(pInfo->orderColumnList):0;
pInfo->prevRow = calloc(1, (POINTER_BYTES * numOfCols + len)); pInfo->prevRow = calloc(1, (POINTER_BYTES * numOfCols + len));
int32_t offset = POINTER_BYTES * numOfOutput; int32_t offset = POINTER_BYTES * numOfOutput;
...@@ -4355,6 +4394,17 @@ SOperatorInfo* createGlobalAggregateOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, ...@@ -4355,6 +4394,17 @@ SOperatorInfo* createGlobalAggregateOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv,
offset += pExpr[index->colIndex].base.resBytes; offset += pExpr[index->colIndex].base.resBytes;
} }
numOfCols = (pInfo->groupColumnList != NULL)? taosArrayGetSize(pInfo->groupColumnList):0;
pInfo->groupPrevRow = calloc(1, (POINTER_BYTES * numOfCols + len));
offset = POINTER_BYTES * numOfOutput;
for(int32_t i = 0; i < numOfCols; ++i) {
pInfo->groupPrevRow[i] = (char*)pInfo->groupPrevRow + offset;
SColIndex* index = taosArrayGet(pInfo->groupColumnList, i);
offset += pExpr[index->colIndex].base.resBytes;
}
initResultRowInfo(&pInfo->binfo.resultRowInfo, 8, TSDB_DATA_TYPE_INT); initResultRowInfo(&pInfo->binfo.resultRowInfo, 8, TSDB_DATA_TYPE_INT);
pInfo->seed = rand(); pInfo->seed = rand();
...@@ -4376,6 +4426,48 @@ SOperatorInfo* createGlobalAggregateOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, ...@@ -4376,6 +4426,48 @@ SOperatorInfo* createGlobalAggregateOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv,
return pOperator; return pOperator;
} }
SOperatorInfo *createMultiwaySortOperatorInfo(SQueryRuntimeEnv *pRuntimeEnv, SExprInfo *pExpr, int32_t numOfOutput,
int32_t numOfRows, void *merger, bool groupMix) {
SMultiwayMergeInfo* pInfo = calloc(1, sizeof(SMultiwayMergeInfo));
pInfo->pMerge = merger;
pInfo->groupMix = groupMix;
pInfo->bufCapacity = numOfRows;
pInfo->orderColumnList = getResultGroupCheckColumns(pRuntimeEnv->pQueryAttr);
pInfo->binfo.pRes = createOutputBuf(pExpr, numOfOutput, numOfRows);
{
int32_t len = 0;
for(int32_t i = 0; i < numOfOutput; ++i) {
len += pExpr[i].base.resBytes;
}
int32_t numOfCols = (pInfo->orderColumnList != NULL)? taosArrayGetSize(pInfo->orderColumnList):0;
pInfo->prevRow = calloc(1, (POINTER_BYTES * numOfCols + len));
int32_t offset = POINTER_BYTES * numOfOutput;
for(int32_t i = 0; i < numOfCols; ++i) {
pInfo->prevRow[i] = (char*)pInfo->prevRow + offset;
SColIndex* index = taosArrayGet(pInfo->orderColumnList, i);
offset += pExpr[index->colIndex].base.resBytes;
}
}
SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo));
pOperator->name = "MultiwaySortOperator";
pOperator->operatorType = OP_MultiwaySort;
pOperator->blockingOptr = false;
pOperator->status = OP_IN_EXECUTING;
pOperator->info = pInfo;
pOperator->pRuntimeEnv = pRuntimeEnv;
pOperator->numOfOutput = pRuntimeEnv->pQueryAttr->numOfCols;
pOperator->exec = doMultiwaySort;
return pOperator;
}
static int32_t getTableScanOrder(STableScanInfo* pTableScanInfo) { static int32_t getTableScanOrder(STableScanInfo* pTableScanInfo) {
return pTableScanInfo->order; return pTableScanInfo->order;
} }
...@@ -5143,11 +5235,18 @@ SOperatorInfo* createFillOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorIn ...@@ -5143,11 +5235,18 @@ SOperatorInfo* createFillOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorIn
return pOperator; return pOperator;
} }
SOperatorInfo* createSLimitOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput) { SOperatorInfo* createSLimitOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput, void* pMerger) {
SSLimitOperatorInfo* pInfo = calloc(1, sizeof(SSLimitOperatorInfo)); SSLimitOperatorInfo* pInfo = calloc(1, sizeof(SSLimitOperatorInfo));
pInfo->limit = pRuntimeEnv->pQueryAttr->slimit.limit;
pInfo->orderColumnList = getOrderCheckColumns(pRuntimeEnv->pQueryAttr); SQueryAttr* pQueryAttr = pRuntimeEnv->pQueryAttr;
pInfo->orderColumnList = getResultGroupCheckColumns(pQueryAttr);
pInfo->pMerger = pMerger;
pInfo->slimit = pQueryAttr->slimit;
pInfo->limit = pQueryAttr->limit;
pInfo->currentGroupOffset = pQueryAttr->slimit.offset;
pInfo->currentOffset = pQueryAttr->limit.offset;
// TODO refactor // TODO refactor
int32_t len = 0; int32_t len = 0;
......
...@@ -364,7 +364,7 @@ static int32_t tsCompareFunc(TSKEY k1, TSKEY k2, int32_t order) { ...@@ -364,7 +364,7 @@ static int32_t tsCompareFunc(TSKEY k1, TSKEY k2, int32_t order) {
} }
} }
static FORCE_INLINE int32_t columnValueAscendingComparator(char *f1, char *f2, int32_t type, int32_t bytes) { int32_t columnValueAscendingComparator(char *f1, char *f2, int32_t type, int32_t bytes) {
switch (type) { switch (type) {
case TSDB_DATA_TYPE_INT: { case TSDB_DATA_TYPE_INT: {
int32_t first = *(int32_t *) f1; int32_t first = *(int32_t *) f1;
......
...@@ -152,7 +152,7 @@ SArray* createGlobalMergePlan(SQueryAttr* pQueryAttr) { ...@@ -152,7 +152,7 @@ SArray* createGlobalMergePlan(SQueryAttr* pQueryAttr) {
} }
// limit/offset operator // limit/offset operator
if (pQueryAttr->limit.limit > 0 || pQueryAttr->limit.offset > 0) { if (pQueryAttr->slimit.limit > 0 || pQueryAttr->slimit.offset > 0) {
op = OP_SLimit; op = OP_SLimit;
taosArrayPush(plan, &op); taosArrayPush(plan, &op);
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册