提交 8ba02fd7 编写于 作者: S Shengliang Guan

Merge remote-tracking branch 'origin/3.0' into fix/valgrind

...@@ -134,7 +134,7 @@ typedef struct SSyncFSM { ...@@ -134,7 +134,7 @@ typedef struct SSyncFSM {
int32_t (*FpSnapshotDoRead)(struct SSyncFSM* pFsm, void* pReader, void** ppBuf, int32_t* len); int32_t (*FpSnapshotDoRead)(struct SSyncFSM* pFsm, void* pReader, void** ppBuf, int32_t* len);
int32_t (*FpSnapshotStartWrite)(struct SSyncFSM* pFsm, void* pWriterParam, void** ppWriter); int32_t (*FpSnapshotStartWrite)(struct SSyncFSM* pFsm, void* pWriterParam, void** ppWriter);
int32_t (*FpSnapshotStopWrite)(struct SSyncFSM* pFsm, void* pWriter, bool isApply); int32_t (*FpSnapshotStopWrite)(struct SSyncFSM* pFsm, void* pWriter, bool isApply, SSnapshot* pSnapshot);
int32_t (*FpSnapshotDoWrite)(struct SSyncFSM* pFsm, void* pWriter, void* pBuf, int32_t len); int32_t (*FpSnapshotDoWrite)(struct SSyncFSM* pFsm, void* pWriter, void* pBuf, int32_t len);
} SSyncFSM; } SSyncFSM;
......
...@@ -1479,10 +1479,12 @@ static int32_t colDataMoveVarData(SColumnInfoData* pColInfoData, size_t start, s ...@@ -1479,10 +1479,12 @@ static int32_t colDataMoveVarData(SColumnInfoData* pColInfoData, size_t start, s
} }
beigin++; beigin++;
} }
if (dataOffset > 0) { if (dataOffset > 0) {
memmove(pColInfoData->pData, pColInfoData->pData + dataOffset, dataLen); memmove(pColInfoData->pData, pColInfoData->pData + dataOffset, dataLen);
memmove(pColInfoData->varmeta.offset, &pColInfoData->varmeta.offset[start], (end - start) * sizeof(int32_t));
} }
memmove(pColInfoData->varmeta.offset, &pColInfoData->varmeta.offset[start], (end - start) * sizeof(int32_t));
return dataLen; return dataLen;
} }
......
...@@ -143,7 +143,7 @@ int32_t mndSnapshotStartWrite(struct SSyncFSM *pFsm, void *pParam, void **ppWrit ...@@ -143,7 +143,7 @@ int32_t mndSnapshotStartWrite(struct SSyncFSM *pFsm, void *pParam, void **ppWrit
return sdbStartWrite(pMnode->pSdb, (SSdbIter **)ppWriter); return sdbStartWrite(pMnode->pSdb, (SSdbIter **)ppWriter);
} }
int32_t mndSnapshotStopWrite(struct SSyncFSM *pFsm, void *pWriter, bool isApply) { int32_t mndSnapshotStopWrite(struct SSyncFSM *pFsm, void *pWriter, bool isApply, SSnapshot *pSnapshot) {
mInfo("stop to apply snapshot to sdb, apply:%d", isApply); mInfo("stop to apply snapshot to sdb, apply:%d", isApply);
SMnode *pMnode = pFsm->data; SMnode *pMnode = pFsm->data;
return sdbStopWrite(pMnode->pSdb, pWriter, isApply); return sdbStopWrite(pMnode->pSdb, pWriter, isApply);
......
...@@ -191,7 +191,7 @@ int32_t vnodeSnapReaderClose(SVSnapReader *pReader); ...@@ -191,7 +191,7 @@ int32_t vnodeSnapReaderClose(SVSnapReader *pReader);
int32_t vnodeSnapRead(SVSnapReader *pReader, uint8_t **ppData, uint32_t *nData); int32_t vnodeSnapRead(SVSnapReader *pReader, uint8_t **ppData, uint32_t *nData);
// SVSnapWriter // SVSnapWriter
int32_t vnodeSnapWriterOpen(SVnode *pVnode, int64_t sver, int64_t ever, SVSnapWriter **ppWriter); int32_t vnodeSnapWriterOpen(SVnode *pVnode, int64_t sver, int64_t ever, SVSnapWriter **ppWriter);
int32_t vnodeSnapWriterClose(SVSnapWriter *pWriter, int8_t rollback); int32_t vnodeSnapWriterClose(SVSnapWriter *pWriter, int8_t rollback, SSnapshot *pSnapshot);
int32_t vnodeSnapWrite(SVSnapWriter *pWriter, uint8_t *pData, uint32_t nData); int32_t vnodeSnapWrite(SVSnapWriter *pWriter, uint8_t *pData, uint32_t nData);
// structs // structs
......
...@@ -173,7 +173,7 @@ _err: ...@@ -173,7 +173,7 @@ _err:
return code; return code;
} }
int32_t vnodeSnapWriterClose(SVSnapWriter *pWriter, int8_t rollback) { int32_t vnodeSnapWriterClose(SVSnapWriter *pWriter, int8_t rollback, SSnapshot *pSnapshot) {
int32_t code = 0; int32_t code = 0;
SVnode *pVnode = pWriter->pVnode; SVnode *pVnode = pWriter->pVnode;
......
...@@ -515,10 +515,10 @@ static int32_t vnodeSnapshotStartWrite(struct SSyncFSM *pFsm, void *pParam, void ...@@ -515,10 +515,10 @@ static int32_t vnodeSnapshotStartWrite(struct SSyncFSM *pFsm, void *pParam, void
#endif #endif
} }
static int32_t vnodeSnapshotStopWrite(struct SSyncFSM *pFsm, void *pWriter, bool isApply) { static int32_t vnodeSnapshotStopWrite(struct SSyncFSM *pFsm, void *pWriter, bool isApply, SSnapshot *pSnapshot) {
#ifdef USE_TSDB_SNAPSHOT #ifdef USE_TSDB_SNAPSHOT
SVnode *pVnode = pFsm->data; SVnode *pVnode = pFsm->data;
int32_t code = vnodeSnapWriterClose(pWriter, !isApply); int32_t code = vnodeSnapWriterClose(pWriter, !isApply, pSnapshot);
return code; return code;
#else #else
taosMemoryFree(pWriter); taosMemoryFree(pWriter);
......
...@@ -247,6 +247,16 @@ typedef struct SLoadRemoteDataInfo { ...@@ -247,6 +247,16 @@ typedef struct SLoadRemoteDataInfo {
uint64_t totalElapsed; // total elapsed time uint64_t totalElapsed; // total elapsed time
} SLoadRemoteDataInfo; } SLoadRemoteDataInfo;
typedef struct SLimitInfo {
SLimit limit;
SLimit slimit;
uint64_t currentGroupId;
int64_t remainGroupOffset;
int64_t numOfOutputGroups;
int64_t remainOffset;
int64_t numOfOutputRows;
} SLimitInfo;
typedef struct SExchangeInfo { typedef struct SExchangeInfo {
SArray* pSources; SArray* pSources;
SArray* pSourceDataInfo; SArray* pSourceDataInfo;
...@@ -257,6 +267,7 @@ typedef struct SExchangeInfo { ...@@ -257,6 +267,7 @@ typedef struct SExchangeInfo {
int32_t current; int32_t current;
SLoadRemoteDataInfo loadInfo; SLoadRemoteDataInfo loadInfo;
uint64_t self; uint64_t self;
SLimitInfo limitInfo;
} SExchangeInfo; } SExchangeInfo;
typedef struct SColMatchInfo { typedef struct SColMatchInfo {
...@@ -542,15 +553,7 @@ typedef struct SProjectOperatorInfo { ...@@ -542,15 +553,7 @@ typedef struct SProjectOperatorInfo {
SNode* pFilterNode; // filter info, which is push down by optimizer SNode* pFilterNode; // filter info, which is push down by optimizer
SSDataBlock* existDataBlock; SSDataBlock* existDataBlock;
SArray* pPseudoColInfo; SArray* pPseudoColInfo;
SLimit limit; SLimitInfo limitInfo;
SLimit slimit;
uint64_t groupId;
int64_t curSOffset;
int64_t curGroupOutput;
int64_t curOffset;
int64_t curOutput;
} SProjectOperatorInfo; } SProjectOperatorInfo;
typedef struct SIndefOperatorInfo { typedef struct SIndefOperatorInfo {
...@@ -791,6 +794,9 @@ int32_t initAggInfo(SExprSupp *pSup, SAggSupporter* pAggSup, SExprInfo* pExprInf ...@@ -791,6 +794,9 @@ int32_t initAggInfo(SExprSupp *pSup, SAggSupporter* pAggSup, SExprInfo* pExprInf
const char* pkey); const char* pkey);
void initResultSizeInfo(SOperatorInfo* pOperator, int32_t numOfRows); void initResultSizeInfo(SOperatorInfo* pOperator, int32_t numOfRows);
void doBuildResultDatablock(SOperatorInfo* pOperator, SOptrBasicInfo* pbInfo, SGroupResInfo* pGroupResInfo, SDiskbasedBuf* pBuf); void doBuildResultDatablock(SOperatorInfo* pOperator, SOptrBasicInfo* pbInfo, SGroupResInfo* pGroupResInfo, SDiskbasedBuf* pBuf);
int32_t handleLimitOffset(SOperatorInfo *pOperator, SLimitInfo* pLimitInfo, SSDataBlock* pBlock, bool holdDataInBuf);
bool hasLimitOffsetInfo(SLimitInfo* pLimitInfo);
void initLimitInfo(const SNode* pLimit, const SNode* pSLimit, SLimitInfo* pLimitInfo);
void doApplyFunctions(SExecTaskInfo* taskInfo, SqlFunctionCtx* pCtx, STimeWindow* pWin, SColumnInfoData* pTimeWindowData, int32_t offset, void doApplyFunctions(SExecTaskInfo* taskInfo, SqlFunctionCtx* pCtx, STimeWindow* pWin, SColumnInfoData* pTimeWindowData, int32_t offset,
int32_t forwardStep, TSKEY* tsCol, int32_t numOfTotal, int32_t numOfOutput, int32_t order); int32_t forwardStep, TSKEY* tsCol, int32_t numOfTotal, int32_t numOfOutput, int32_t order);
...@@ -837,7 +843,7 @@ SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SExprInfo* ...@@ -837,7 +843,7 @@ SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SExprInfo*
SOperatorInfo* createIndefinitOutputOperatorInfo(SOperatorInfo* downstream, SPhysiNode *pNode, SExecTaskInfo* pTaskInfo); SOperatorInfo* createIndefinitOutputOperatorInfo(SOperatorInfo* downstream, SPhysiNode *pNode, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createProjectOperatorInfo(SOperatorInfo* downstream, SProjectPhysiNode* pProjPhyNode, SExecTaskInfo* pTaskInfo); SOperatorInfo* createProjectOperatorInfo(SOperatorInfo* downstream, SProjectPhysiNode* pProjPhyNode, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createSortOperatorInfo(SOperatorInfo* downstream, SSortPhysiNode* pSortPhyNode, SExecTaskInfo* pTaskInfo); SOperatorInfo* createSortOperatorInfo(SOperatorInfo* downstream, SSortPhysiNode* pSortNode, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createMultiwayMergeOperatorInfo(SOperatorInfo** dowStreams, size_t numStreams, SMergePhysiNode* pMergePhysiNode, SExecTaskInfo* pTaskInfo); SOperatorInfo* createMultiwayMergeOperatorInfo(SOperatorInfo** dowStreams, size_t numStreams, SMergePhysiNode* pMergePhysiNode, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createSortedMergeOperatorInfo(SOperatorInfo** downstream, int32_t numOfDownstream, SExprInfo* pExprInfo, int32_t num, SArray* pSortInfo, SArray* pGroupInfo, SExecTaskInfo* pTaskInfo); SOperatorInfo* createSortedMergeOperatorInfo(SOperatorInfo** downstream, int32_t numOfDownstream, SExprInfo* pExprInfo, int32_t num, SArray* pSortInfo, SArray* pGroupInfo, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createLastrowScanOperator(SLastRowScanPhysiNode* pTableScanNode, SReadHandle* readHandle, SExecTaskInfo* pTaskInfo); SOperatorInfo* createLastrowScanOperator(SLastRowScanPhysiNode* pTableScanNode, SReadHandle* readHandle, SExecTaskInfo* pTaskInfo);
......
...@@ -899,3 +899,22 @@ STimeWindow getActiveTimeWindow(SDiskbasedBuf* pBuf, SResultRowInfo* pResultRowI ...@@ -899,3 +899,22 @@ STimeWindow getActiveTimeWindow(SDiskbasedBuf* pBuf, SResultRowInfo* pResultRowI
return w; return w;
} }
bool hasLimitOffsetInfo(SLimitInfo* pLimitInfo) {
return (pLimitInfo->limit.limit != -1 || pLimitInfo->limit.offset != -1 || pLimitInfo->slimit.limit != -1 ||
pLimitInfo->slimit.offset != -1);
}
static int64_t getLimit(const SNode* pLimit) { return NULL == pLimit ? -1 : ((SLimitNode*)pLimit)->limit; }
static int64_t getOffset(const SNode* pLimit) { return NULL == pLimit ? -1 : ((SLimitNode*)pLimit)->offset; }
void initLimitInfo(const SNode* pLimit, const SNode* pSLimit, SLimitInfo* pLimitInfo) {
SLimit limit = {.limit = getLimit(pLimit), .offset = getOffset(pLimit)};
SLimit slimit = {.limit = getLimit(pSLimit), .offset = getOffset(pSLimit)};
pLimitInfo->limit = limit;
pLimitInfo->slimit= slimit;
pLimitInfo->remainOffset = limit.offset;
pLimitInfo->remainGroupOffset = slimit.offset;
}
\ No newline at end of file
...@@ -43,6 +43,11 @@ ...@@ -43,6 +43,11 @@
#define GET_FORWARD_DIRECTION_FACTOR(ord) (((ord) == TSDB_ORDER_ASC) ? QUERY_ASC_FORWARD_STEP : QUERY_DESC_FORWARD_STEP) #define GET_FORWARD_DIRECTION_FACTOR(ord) (((ord) == TSDB_ORDER_ASC) ? QUERY_ASC_FORWARD_STEP : QUERY_DESC_FORWARD_STEP)
enum {
PROJECT_RETRIEVE_CONTINUE = 0x1,
PROJECT_RETRIEVE_DONE = 0x2,
};
#if 0 #if 0
static UNUSED_FUNC void *u_malloc (size_t __size) { static UNUSED_FUNC void *u_malloc (size_t __size) {
uint32_t v = taosRand(); uint32_t v = taosRand();
...@@ -2343,7 +2348,7 @@ static int32_t prepareLoadRemoteData(SOperatorInfo* pOperator) { ...@@ -2343,7 +2348,7 @@ static int32_t prepareLoadRemoteData(SOperatorInfo* pOperator) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
static SSDataBlock* doLoadRemoteData(SOperatorInfo* pOperator) { static SSDataBlock* doLoadRemoteDataImpl(SOperatorInfo* pOperator) {
SExchangeInfo* pExchangeInfo = pOperator->info; SExchangeInfo* pExchangeInfo = pOperator->info;
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
...@@ -2369,6 +2374,44 @@ static SSDataBlock* doLoadRemoteData(SOperatorInfo* pOperator) { ...@@ -2369,6 +2374,44 @@ static SSDataBlock* doLoadRemoteData(SOperatorInfo* pOperator) {
} }
} }
static SSDataBlock* doLoadRemoteData(SOperatorInfo* pOperator) {
SExchangeInfo* pExchangeInfo = pOperator->info;
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
if (pOperator->status == OP_EXEC_DONE) {
return NULL;
}
while(1) {
SSDataBlock* pBlock = doLoadRemoteDataImpl(pOperator);
if (pBlock == NULL) {
return NULL;
}
ASSERT(pBlock == pExchangeInfo->pResult);
SLimitInfo* pLimitInfo = &pExchangeInfo->limitInfo;
if (hasLimitOffsetInfo(pLimitInfo)) {
int32_t status = handleLimitOffset(pOperator, pLimitInfo, pExchangeInfo->pResult, false);
if (status == PROJECT_RETRIEVE_CONTINUE) {
continue;
} else if (status == PROJECT_RETRIEVE_DONE) {
size_t rows = pExchangeInfo->pResult->info.rows;
pExchangeInfo->limitInfo.numOfOutputRows += rows;
if (rows == 0) {
doSetOperatorCompleted(pOperator);
return NULL;
} else {
return pExchangeInfo->pResult;
}
}
} else {
return pExchangeInfo->pResult;
}
}
}
static int32_t initDataSource(int32_t numOfSources, SExchangeInfo* pInfo, const char* id) { static int32_t initDataSource(int32_t numOfSources, SExchangeInfo* pInfo, const char* id) {
pInfo->pSourceDataInfo = taosArrayInit(numOfSources, sizeof(SSourceDataInfo)); pInfo->pSourceDataInfo = taosArrayInit(numOfSources, sizeof(SSourceDataInfo));
if (pInfo->pSourceDataInfo == NULL) { if (pInfo->pSourceDataInfo == NULL) {
...@@ -2408,6 +2451,7 @@ static int32_t initExchangeOperator(SExchangePhysiNode* pExNode, SExchangeInfo* ...@@ -2408,6 +2451,7 @@ static int32_t initExchangeOperator(SExchangePhysiNode* pExNode, SExchangeInfo*
taosArrayPush(pInfo->pSources, pNode); taosArrayPush(pInfo->pSources, pNode);
} }
initLimitInfo(pExNode->node.pLimit, pExNode->node.pSlimit, &pInfo->limitInfo);
pInfo->self = taosAddRef(exchangeObjRefPool, pInfo); pInfo->self = taosAddRef(exchangeObjRefPool, pInfo);
return initDataSource(numOfSources, pInfo, id); return initDataSource(numOfSources, pInfo, id);
...@@ -3151,68 +3195,60 @@ int32_t aggDecodeResultRow(SOperatorInfo* pOperator, char* result) { ...@@ -3151,68 +3195,60 @@ int32_t aggDecodeResultRow(SOperatorInfo* pOperator, char* result) {
return TDB_CODE_SUCCESS; return TDB_CODE_SUCCESS;
} }
enum { int32_t handleLimitOffset(SOperatorInfo* pOperator, SLimitInfo* pLimitInfo, SSDataBlock* pBlock, bool holdDataInBuf) {
PROJECT_RETRIEVE_CONTINUE = 0x1, if (pLimitInfo->remainGroupOffset > 0) {
PROJECT_RETRIEVE_DONE = 0x2, if (pLimitInfo->currentGroupId == 0) { // it is the first group
}; pLimitInfo->currentGroupId = pBlock->info.groupId;
blockDataCleanup(pBlock);
static int32_t handleLimitOffset(SOperatorInfo* pOperator, SSDataBlock* pBlock) {
SProjectOperatorInfo* pProjectInfo = pOperator->info;
SOptrBasicInfo* pInfo = &pProjectInfo->binfo;
SSDataBlock* pRes = pInfo->pRes;
if (pProjectInfo->curSOffset > 0) {
if (pProjectInfo->groupId == 0) { // it is the first group
pProjectInfo->groupId = pBlock->info.groupId;
blockDataCleanup(pInfo->pRes);
return PROJECT_RETRIEVE_CONTINUE; return PROJECT_RETRIEVE_CONTINUE;
} else if (pProjectInfo->groupId != pBlock->info.groupId) { } else if (pLimitInfo->currentGroupId != pBlock->info.groupId) {
pProjectInfo->curSOffset -= 1; // now it is the data from a new group
pLimitInfo->remainGroupOffset -= 1;
// ignore data block in current group // ignore data block in current group
if (pProjectInfo->curSOffset > 0) { if (pLimitInfo->remainGroupOffset > 0) {
blockDataCleanup(pInfo->pRes); blockDataCleanup(pBlock);
return PROJECT_RETRIEVE_CONTINUE; return PROJECT_RETRIEVE_CONTINUE;
} }
} }
// set current group id of the project operator // set current group id of the project operator
pProjectInfo->groupId = pBlock->info.groupId; pLimitInfo->currentGroupId = pBlock->info.groupId;
} }
if (pProjectInfo->groupId != 0 && pProjectInfo->groupId != pBlock->info.groupId) { if (pLimitInfo->currentGroupId != 0 && pLimitInfo->currentGroupId != pBlock->info.groupId) {
pProjectInfo->curGroupOutput += 1; pLimitInfo->numOfOutputGroups += 1;
if ((pProjectInfo->slimit.limit > 0) && (pProjectInfo->slimit.limit <= pProjectInfo->curGroupOutput)) { if ((pLimitInfo->slimit.limit > 0) && (pLimitInfo->slimit.limit <= pLimitInfo->numOfOutputGroups)) {
pOperator->status = OP_EXEC_DONE; pOperator->status = OP_EXEC_DONE;
blockDataCleanup(pRes); blockDataCleanup(pBlock);
return PROJECT_RETRIEVE_DONE; return PROJECT_RETRIEVE_DONE;
} }
// reset the value for a new group data // reset the value for a new group data
pProjectInfo->curOffset = 0; pLimitInfo->numOfOutputRows = 0;
pProjectInfo->curOutput = 0; pLimitInfo->remainOffset = pLimitInfo->limit.offset;
} }
// here we reach the start position, according to the limit/offset requirements. // here we reach the start position, according to the limit/offset requirements.
// set current group id // set current group id
pProjectInfo->groupId = pBlock->info.groupId; pLimitInfo->currentGroupId = pBlock->info.groupId;
if (pProjectInfo->curOffset >= pRes->info.rows) { if (pLimitInfo->remainOffset >= pBlock->info.rows) {
pProjectInfo->curOffset -= pRes->info.rows; pLimitInfo->remainOffset -= pBlock->info.rows;
blockDataCleanup(pRes); blockDataCleanup(pBlock);
return PROJECT_RETRIEVE_CONTINUE; return PROJECT_RETRIEVE_CONTINUE;
} else if (pProjectInfo->curOffset < pRes->info.rows && pProjectInfo->curOffset > 0) { } else if (pLimitInfo->remainOffset < pBlock->info.rows && pLimitInfo->remainOffset > 0) {
blockDataTrimFirstNRows(pRes, pProjectInfo->curOffset); blockDataTrimFirstNRows(pBlock, pLimitInfo->remainOffset);
pProjectInfo->curOffset = 0; pLimitInfo->remainOffset = 0;
} }
// check for the limitation in each group // check for the limitation in each group
if (pProjectInfo->limit.limit >= 0 && pProjectInfo->curOutput + pRes->info.rows >= pProjectInfo->limit.limit) { if (pLimitInfo->limit.limit >= 0 && pLimitInfo->numOfOutputRows + pBlock->info.rows >= pLimitInfo->limit.limit) {
int32_t keepRows = (int32_t)(pProjectInfo->limit.limit - pProjectInfo->curOutput); int32_t keepRows = (int32_t)(pLimitInfo->limit.limit - pLimitInfo->numOfOutputRows);
blockDataKeepFirstNRows(pRes, keepRows); blockDataKeepFirstNRows(pBlock, keepRows);
if (pProjectInfo->slimit.limit > 0 && pProjectInfo->slimit.limit <= pProjectInfo->curGroupOutput) { if (pLimitInfo->slimit.limit > 0 && pLimitInfo->slimit.limit <= pLimitInfo->numOfOutputGroups) {
pOperator->status = OP_EXEC_DONE; pOperator->status = OP_EXEC_DONE;
} }
...@@ -3222,8 +3258,8 @@ static int32_t handleLimitOffset(SOperatorInfo* pOperator, SSDataBlock* pBlock) ...@@ -3222,8 +3258,8 @@ static int32_t handleLimitOffset(SOperatorInfo* pOperator, SSDataBlock* pBlock)
// todo optimize performance // todo optimize performance
// If there are slimit/soffset value exists, multi-round result can not be packed into one group, since the // If there are slimit/soffset value exists, multi-round result can not be packed into one group, since the
// they may not belong to the same group the limit/offset value is not valid in this case. // they may not belong to the same group the limit/offset value is not valid in this case.
if (pRes->info.rows >= pOperator->resultInfo.threshold || pProjectInfo->slimit.offset != -1 || if ((!holdDataInBuf) || (pBlock->info.rows >= pOperator->resultInfo.threshold) || pLimitInfo->slimit.offset != -1 ||
pProjectInfo->slimit.limit != -1) { pLimitInfo->slimit.limit != -1) {
return PROJECT_RETRIEVE_DONE; return PROJECT_RETRIEVE_DONE;
} else { // not full enough, continue to accumulate the output data in the buffer. } else { // not full enough, continue to accumulate the output data in the buffer.
return PROJECT_RETRIEVE_CONTINUE; return PROJECT_RETRIEVE_CONTINUE;
...@@ -3309,7 +3345,7 @@ static SSDataBlock* doProjectOperation(SOperatorInfo* pOperator) { ...@@ -3309,7 +3345,7 @@ static SSDataBlock* doProjectOperation(SOperatorInfo* pOperator) {
longjmp(pTaskInfo->env, code); longjmp(pTaskInfo->env, code);
} }
int32_t status = handleLimitOffset(pOperator, pBlock); int32_t status = handleLimitOffset(pOperator, &pProjectInfo->limitInfo, pInfo->pRes, true);
// filter shall be applied after apply functions and limit/offset on the result // filter shall be applied after apply functions and limit/offset on the result
doFilter(pProjectInfo->pFilterNode, pInfo->pRes); doFilter(pProjectInfo->pFilterNode, pInfo->pRes);
...@@ -3321,9 +3357,9 @@ static SSDataBlock* doProjectOperation(SOperatorInfo* pOperator) { ...@@ -3321,9 +3357,9 @@ static SSDataBlock* doProjectOperation(SOperatorInfo* pOperator) {
} }
} }
pProjectInfo->curOutput += pInfo->pRes->info.rows;
size_t rows = pInfo->pRes->info.rows; size_t rows = pInfo->pRes->info.rows;
pProjectInfo->limitInfo.numOfOutputRows += rows;
pOperator->resultInfo.totalRows += rows; pOperator->resultInfo.totalRows += rows;
if (pOperator->cost.openCost == 0) { if (pOperator->cost.openCost == 0) {
...@@ -3767,10 +3803,6 @@ static SArray* setRowTsColumnOutputInfo(SqlFunctionCtx* pCtx, int32_t numOfCols) ...@@ -3767,10 +3803,6 @@ static SArray* setRowTsColumnOutputInfo(SqlFunctionCtx* pCtx, int32_t numOfCols)
return pList; return pList;
} }
static int64_t getLimit(SNode* pLimit) { return NULL == pLimit ? -1 : ((SLimitNode*)pLimit)->limit; }
static int64_t getOffset(SNode* pLimit) { return NULL == pLimit ? -1 : ((SLimitNode*)pLimit)->offset; }
SOperatorInfo* createProjectOperatorInfo(SOperatorInfo* downstream, SProjectPhysiNode* pProjPhyNode, SOperatorInfo* createProjectOperatorInfo(SOperatorInfo* downstream, SProjectPhysiNode* pProjPhyNode,
SExecTaskInfo* pTaskInfo) { SExecTaskInfo* pTaskInfo) {
SProjectOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SProjectOperatorInfo)); SProjectOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SProjectOperatorInfo));
...@@ -3783,13 +3815,8 @@ SOperatorInfo* createProjectOperatorInfo(SOperatorInfo* downstream, SProjectPhys ...@@ -3783,13 +3815,8 @@ SOperatorInfo* createProjectOperatorInfo(SOperatorInfo* downstream, SProjectPhys
SExprInfo* pExprInfo = createExprInfo(pProjPhyNode->pProjections, NULL, &numOfCols); SExprInfo* pExprInfo = createExprInfo(pProjPhyNode->pProjections, NULL, &numOfCols);
SSDataBlock* pResBlock = createResDataBlock(pProjPhyNode->node.pOutputDataBlockDesc); SSDataBlock* pResBlock = createResDataBlock(pProjPhyNode->node.pOutputDataBlockDesc);
SLimit limit = {.limit = getLimit(pProjPhyNode->node.pLimit), .offset = getOffset(pProjPhyNode->node.pLimit)}; initLimitInfo(pProjPhyNode->node.pLimit, pProjPhyNode->node.pSlimit, &pInfo->limitInfo);
SLimit slimit = {.limit = getLimit(pProjPhyNode->node.pSlimit), .offset = getOffset(pProjPhyNode->node.pSlimit)};
pInfo->limit = limit;
pInfo->slimit = slimit;
pInfo->curOffset = limit.offset;
pInfo->curSOffset = slimit.offset;
pInfo->binfo.pRes = pResBlock; pInfo->binfo.pRes = pResBlock;
pInfo->pFilterNode = pProjPhyNode->node.pConditions; pInfo->pFilterNode = pProjPhyNode->node.pConditions;
......
...@@ -440,19 +440,19 @@ static void doHashPartition(SOperatorInfo* pOperator, SSDataBlock* pBlock) { ...@@ -440,19 +440,19 @@ static void doHashPartition(SOperatorInfo* pOperator, SSDataBlock* pBlock) {
recordNewGroupKeys(pInfo->pGroupCols, pInfo->pGroupColVals, pBlock, j); recordNewGroupKeys(pInfo->pGroupCols, pInfo->pGroupColVals, pBlock, j);
int32_t len = buildGroupKeys(pInfo->keyBuf, pInfo->pGroupColVals); int32_t len = buildGroupKeys(pInfo->keyBuf, pInfo->pGroupColVals);
SDataGroupInfo* pGInfo = NULL; SDataGroupInfo* pGroupInfo = NULL;
void *pPage = getCurrentDataGroupInfo(pInfo, &pGInfo, len); void *pPage = getCurrentDataGroupInfo(pInfo, &pGroupInfo, len);
pGInfo->numOfRows += 1; pGroupInfo->numOfRows += 1;
if (pGInfo->groupId == 0) {
pGInfo->groupId = calcGroupId(pInfo->keyBuf, len); // group id
if (pGroupInfo->groupId == 0) {
pGroupInfo->groupId = calcGroupId(pInfo->keyBuf, len);
} }
// number of rows // number of rows
int32_t* rows = (int32_t*) pPage; int32_t* rows = (int32_t*) pPage;
// group id
size_t numOfCols = pOperator->exprSupp.numOfExprs; size_t numOfCols = pOperator->exprSupp.numOfExprs;
for(int32_t i = 0; i < numOfCols; ++i) { for(int32_t i = 0; i < numOfCols; ++i) {
SExprInfo* pExpr = &pOperator->exprSupp.pExprInfo[i]; SExprInfo* pExpr = &pOperator->exprSupp.pExprInfo[i];
...@@ -603,7 +603,13 @@ static void clearPartitionOperator(SPartitionOperatorInfo* pInfo) { ...@@ -603,7 +603,13 @@ static void clearPartitionOperator(SPartitionOperatorInfo* pInfo) {
static int compareDataGroupInfo(const void* group1, const void* group2) { static int compareDataGroupInfo(const void* group1, const void* group2) {
const SDataGroupInfo* pGroupInfo1 = group1; const SDataGroupInfo* pGroupInfo1 = group1;
const SDataGroupInfo* pGroupInfo2 = group2; const SDataGroupInfo* pGroupInfo2 = group2;
return pGroupInfo1->groupId - pGroupInfo2->groupId;
if (pGroupInfo1->groupId == pGroupInfo2->groupId) {
ASSERT(0);
return 0;
}
return (pGroupInfo1->groupId < pGroupInfo2->groupId)? -1:1;
} }
static SSDataBlock* buildPartitionResult(SOperatorInfo* pOperator) { static SSDataBlock* buildPartitionResult(SOperatorInfo* pOperator) {
......
...@@ -22,31 +22,31 @@ static int32_t getExplainExecInfo(SOperatorInfo* pOptr, void** pOptrExplain ...@@ -22,31 +22,31 @@ static int32_t getExplainExecInfo(SOperatorInfo* pOptr, void** pOptrExplain
static void destroyOrderOperatorInfo(void* param, int32_t numOfOutput); static void destroyOrderOperatorInfo(void* param, int32_t numOfOutput);
SOperatorInfo* createSortOperatorInfo(SOperatorInfo* downstream, SSortPhysiNode* pSortPhyNode, // todo add limit/offset impl
SExecTaskInfo* pTaskInfo) { SOperatorInfo* createSortOperatorInfo(SOperatorInfo* downstream, SSortPhysiNode* pSortNode, SExecTaskInfo* pTaskInfo) {
SSortOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SSortOperatorInfo)); SSortOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SSortOperatorInfo));
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
if (pInfo == NULL || pOperator == NULL /* || rowSize > 100 * 1024 * 1024*/) { if (pInfo == NULL || pOperator == NULL /* || rowSize > 100 * 1024 * 1024*/) {
goto _error; goto _error;
} }
SDataBlockDescNode* pDescNode = pSortPhyNode->node.pOutputDataBlockDesc; SDataBlockDescNode* pDescNode = pSortNode->node.pOutputDataBlockDesc;
int32_t numOfCols = 0; int32_t numOfCols = 0;
SSDataBlock* pResBlock = createResDataBlock(pDescNode); SSDataBlock* pResBlock = createResDataBlock(pDescNode);
SExprInfo* pExprInfo = createExprInfo(pSortPhyNode->pExprs, NULL, &numOfCols); SExprInfo* pExprInfo = createExprInfo(pSortNode->pExprs, NULL, &numOfCols);
int32_t numOfOutputCols = 0; int32_t numOfOutputCols = 0;
SArray* pColMatchColInfo = SArray* pColMatchColInfo =
extractColMatchInfo(pSortPhyNode->pTargets, pDescNode, &numOfOutputCols, COL_MATCH_FROM_SLOT_ID); extractColMatchInfo(pSortNode->pTargets, pDescNode, &numOfOutputCols, COL_MATCH_FROM_SLOT_ID);
pOperator->exprSupp.pCtx = createSqlFunctionCtx(pExprInfo, numOfCols, &pOperator->exprSupp.rowEntryInfoOffset); pOperator->exprSupp.pCtx = createSqlFunctionCtx(pExprInfo, numOfCols, &pOperator->exprSupp.rowEntryInfoOffset);
pInfo->binfo.pRes = pResBlock; pInfo->binfo.pRes = pResBlock;
initResultSizeInfo(pOperator, 1024); initResultSizeInfo(pOperator, 1024);
pInfo->pSortInfo = createSortInfo(pSortPhyNode->pSortKeys); pInfo->pSortInfo = createSortInfo(pSortNode->pSortKeys);
pInfo->pCondition = pSortPhyNode->node.pConditions; pInfo->pCondition = pSortNode->node.pConditions;
pInfo->pColMatchInfo = pColMatchColInfo; pInfo->pColMatchInfo = pColMatchColInfo;
pOperator->name = "SortOperator"; pOperator->name = "SortOperator";
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_SORT; pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_SORT;
......
...@@ -374,14 +374,14 @@ cJSON *snapshotSender2Json(SSyncSnapshotSender *pSender) { ...@@ -374,14 +374,14 @@ cJSON *snapshotSender2Json(SSyncSnapshotSender *pSender) {
char *snapshotSender2Str(SSyncSnapshotSender *pSender) { char *snapshotSender2Str(SSyncSnapshotSender *pSender) {
cJSON *pJson = snapshotSender2Json(pSender); cJSON *pJson = snapshotSender2Json(pSender);
char * serialized = cJSON_Print(pJson); char *serialized = cJSON_Print(pJson);
cJSON_Delete(pJson); cJSON_Delete(pJson);
return serialized; return serialized;
} }
char *snapshotSender2SimpleStr(SSyncSnapshotSender *pSender, char *event) { char *snapshotSender2SimpleStr(SSyncSnapshotSender *pSender, char *event) {
int32_t len = 256; int32_t len = 256;
char * s = taosMemoryMalloc(len); char *s = taosMemoryMalloc(len);
SRaftId destId = pSender->pSyncNode->replicasId[pSender->replicaIndex]; SRaftId destId = pSender->pSyncNode->replicasId[pSender->replicaIndex];
char host[64]; char host[64];
...@@ -434,8 +434,8 @@ void snapshotReceiverDestroy(SSyncSnapshotReceiver *pReceiver) { ...@@ -434,8 +434,8 @@ void snapshotReceiverDestroy(SSyncSnapshotReceiver *pReceiver) {
if (pReceiver != NULL) { if (pReceiver != NULL) {
// close writer // close writer
if (pReceiver->pWriter != NULL) { if (pReceiver->pWriter != NULL) {
int32_t ret = int32_t ret = pReceiver->pSyncNode->pFsm->FpSnapshotStopWrite(pReceiver->pSyncNode->pFsm, pReceiver->pWriter,
pReceiver->pSyncNode->pFsm->FpSnapshotStopWrite(pReceiver->pSyncNode->pFsm, pReceiver->pWriter, false); false, &(pReceiver->snapshot));
ASSERT(ret == 0); ASSERT(ret == 0);
pReceiver->pWriter = NULL; pReceiver->pWriter = NULL;
} }
...@@ -483,8 +483,8 @@ static void snapshotReceiverDoStart(SSyncSnapshotReceiver *pReceiver, SyncSnapsh ...@@ -483,8 +483,8 @@ static void snapshotReceiverDoStart(SSyncSnapshotReceiver *pReceiver, SyncSnapsh
static void snapshotReceiverForceStop(SSyncSnapshotReceiver *pReceiver) { static void snapshotReceiverForceStop(SSyncSnapshotReceiver *pReceiver) {
// force close, abandon incomplete data // force close, abandon incomplete data
if (pReceiver->pWriter != NULL) { if (pReceiver->pWriter != NULL) {
int32_t ret = int32_t ret = pReceiver->pSyncNode->pFsm->FpSnapshotStopWrite(pReceiver->pSyncNode->pFsm, pReceiver->pWriter, false,
pReceiver->pSyncNode->pFsm->FpSnapshotStopWrite(pReceiver->pSyncNode->pFsm, pReceiver->pWriter, false); &(pReceiver->snapshot));
ASSERT(ret == 0); ASSERT(ret == 0);
pReceiver->pWriter = NULL; pReceiver->pWriter = NULL;
} }
...@@ -524,8 +524,8 @@ int32_t snapshotReceiverStart(SSyncSnapshotReceiver *pReceiver, SyncSnapshotSend ...@@ -524,8 +524,8 @@ int32_t snapshotReceiverStart(SSyncSnapshotReceiver *pReceiver, SyncSnapshotSend
// FpSnapshotStopWrite should not be called, assert writer == NULL // FpSnapshotStopWrite should not be called, assert writer == NULL
int32_t snapshotReceiverStop(SSyncSnapshotReceiver *pReceiver) { int32_t snapshotReceiverStop(SSyncSnapshotReceiver *pReceiver) {
if (pReceiver->pWriter != NULL) { if (pReceiver->pWriter != NULL) {
int32_t ret = int32_t ret = pReceiver->pSyncNode->pFsm->FpSnapshotStopWrite(pReceiver->pSyncNode->pFsm, pReceiver->pWriter, false,
pReceiver->pSyncNode->pFsm->FpSnapshotStopWrite(pReceiver->pSyncNode->pFsm, pReceiver->pWriter, false); &(pReceiver->snapshot));
ASSERT(ret == 0); ASSERT(ret == 0);
pReceiver->pWriter = NULL; pReceiver->pWriter = NULL;
} }
...@@ -574,7 +574,8 @@ static int32_t snapshotReceiverFinish(SSyncSnapshotReceiver *pReceiver, SyncSnap ...@@ -574,7 +574,8 @@ static int32_t snapshotReceiverFinish(SSyncSnapshotReceiver *pReceiver, SyncSnap
} }
// stop writer, apply data // stop writer, apply data
code = pReceiver->pSyncNode->pFsm->FpSnapshotStopWrite(pReceiver->pSyncNode->pFsm, pReceiver->pWriter, true); code = pReceiver->pSyncNode->pFsm->FpSnapshotStopWrite(pReceiver->pSyncNode->pFsm, pReceiver->pWriter, true,
&(pReceiver->snapshot));
if (code != 0) { if (code != 0) {
syncNodeErrorLog(pReceiver->pSyncNode, "snapshot stop writer true error"); syncNodeErrorLog(pReceiver->pSyncNode, "snapshot stop writer true error");
ASSERT(0); ASSERT(0);
...@@ -646,7 +647,7 @@ cJSON *snapshotReceiver2Json(SSyncSnapshotReceiver *pReceiver) { ...@@ -646,7 +647,7 @@ cJSON *snapshotReceiver2Json(SSyncSnapshotReceiver *pReceiver) {
cJSON_AddStringToObject(pFromId, "addr", u64buf); cJSON_AddStringToObject(pFromId, "addr", u64buf);
{ {
uint64_t u64 = pReceiver->fromId.addr; uint64_t u64 = pReceiver->fromId.addr;
cJSON * pTmp = pFromId; cJSON *pTmp = pFromId;
char host[128] = {0}; char host[128] = {0};
uint16_t port; uint16_t port;
syncUtilU642Addr(u64, host, sizeof(host), &port); syncUtilU642Addr(u64, host, sizeof(host), &port);
...@@ -679,14 +680,14 @@ cJSON *snapshotReceiver2Json(SSyncSnapshotReceiver *pReceiver) { ...@@ -679,14 +680,14 @@ cJSON *snapshotReceiver2Json(SSyncSnapshotReceiver *pReceiver) {
char *snapshotReceiver2Str(SSyncSnapshotReceiver *pReceiver) { char *snapshotReceiver2Str(SSyncSnapshotReceiver *pReceiver) {
cJSON *pJson = snapshotReceiver2Json(pReceiver); cJSON *pJson = snapshotReceiver2Json(pReceiver);
char * serialized = cJSON_Print(pJson); char *serialized = cJSON_Print(pJson);
cJSON_Delete(pJson); cJSON_Delete(pJson);
return serialized; return serialized;
} }
char *snapshotReceiver2SimpleStr(SSyncSnapshotReceiver *pReceiver, char *event) { char *snapshotReceiver2SimpleStr(SSyncSnapshotReceiver *pReceiver, char *event) {
int32_t len = 256; int32_t len = 256;
char * s = taosMemoryMalloc(len); char *s = taosMemoryMalloc(len);
SRaftId fromId = pReceiver->fromId; SRaftId fromId = pReceiver->fromId;
char host[128]; char host[128];
......
...@@ -125,7 +125,7 @@ int32_t SnapshotStartWrite(struct SSyncFSM* pFsm, void* pParam, void** ppWriter) ...@@ -125,7 +125,7 @@ int32_t SnapshotStartWrite(struct SSyncFSM* pFsm, void* pParam, void** ppWriter)
return 0; return 0;
} }
int32_t SnapshotStopWrite(struct SSyncFSM* pFsm, void* pWriter, bool isApply) { int32_t SnapshotStopWrite(struct SSyncFSM* pFsm, void* pWriter, bool isApply, SSnapshot *pSnapshot) {
char logBuf[256] = {0}; char logBuf[256] = {0};
snprintf(logBuf, sizeof(logBuf), "==callback== ==SnapshotStopWrite== pFsm:%p, pWriter:%p, isApply:%d", pFsm, pWriter, snprintf(logBuf, sizeof(logBuf), "==callback== ==SnapshotStopWrite== pFsm:%p, pWriter:%p, isApply:%d", pFsm, pWriter,
isApply); isApply);
......
...@@ -30,7 +30,7 @@ int32_t SnapshotStopRead(struct SSyncFSM* pFsm, void* pReader) { return 0; } ...@@ -30,7 +30,7 @@ int32_t SnapshotStopRead(struct SSyncFSM* pFsm, void* pReader) { return 0; }
int32_t SnapshotDoRead(struct SSyncFSM* pFsm, void* pReader, void** ppBuf, int32_t* len) { return 0; } int32_t SnapshotDoRead(struct SSyncFSM* pFsm, void* pReader, void** ppBuf, int32_t* len) { return 0; }
int32_t SnapshotStartWrite(struct SSyncFSM* pFsm, void* pParam, void** ppWriter) { return 0; } int32_t SnapshotStartWrite(struct SSyncFSM* pFsm, void* pParam, void** ppWriter) { return 0; }
int32_t SnapshotStopWrite(struct SSyncFSM* pFsm, void* pWriter, bool isApply) { return 0; } int32_t SnapshotStopWrite(struct SSyncFSM* pFsm, void* pWriter, bool isApply, SSnapshot *pSnapshot) { return 0; }
int32_t SnapshotDoWrite(struct SSyncFSM* pFsm, void* pWriter, void* pBuf, int32_t len) { return 0; } int32_t SnapshotDoWrite(struct SSyncFSM* pFsm, void* pWriter, void* pBuf, int32_t len) { return 0; }
SSyncSnapshotReceiver* createReceiver() { SSyncSnapshotReceiver* createReceiver() {
......
...@@ -126,7 +126,7 @@ int32_t SnapshotStartWrite(struct SSyncFSM* pFsm, void* pParam, void** ppWriter) ...@@ -126,7 +126,7 @@ int32_t SnapshotStartWrite(struct SSyncFSM* pFsm, void* pParam, void** ppWriter)
return 0; return 0;
} }
int32_t SnapshotStopWrite(struct SSyncFSM* pFsm, void* pWriter, bool isApply) { int32_t SnapshotStopWrite(struct SSyncFSM* pFsm, void* pWriter, bool isApply, SSnapshot *pSnapshot) {
if (isApply) { if (isApply) {
gSnapshotLastApplyIndex = gFinishLastApplyIndex; gSnapshotLastApplyIndex = gFinishLastApplyIndex;
gSnapshotLastApplyTerm = gFinishLastApplyTerm; gSnapshotLastApplyTerm = gFinishLastApplyTerm;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册