提交 5a6ab297 编写于 作者: S Shengliang Guan

Merge remote-tracking branch 'origin/3.0' into fix/dnode

...@@ -68,6 +68,7 @@ typedef struct { ...@@ -68,6 +68,7 @@ typedef struct {
} SysNameInfo; } SysNameInfo;
SysNameInfo taosGetSysNameInfo(); SysNameInfo taosGetSysNameInfo();
bool taosCheckCurrentInDll();
#ifdef __cplusplus #ifdef __cplusplus
} }
......
...@@ -592,7 +592,9 @@ void hbThreadFuncUnexpectedStopped(void) { ...@@ -592,7 +592,9 @@ void hbThreadFuncUnexpectedStopped(void) {
static void *hbThreadFunc(void *param) { static void *hbThreadFunc(void *param) {
setThreadName("hb"); setThreadName("hb");
#ifdef WINDOWS #ifdef WINDOWS
atexit(hbThreadFuncUnexpectedStopped); if (taosCheckCurrentInDll()) {
atexit(hbThreadFuncUnexpectedStopped);
}
#endif #endif
while (1) { while (1) {
int8_t threadStop = atomic_val_compare_exchange_8(&clientHbMgr.threadStop, 1, 2); int8_t threadStop = atomic_val_compare_exchange_8(&clientHbMgr.threadStop, 1, 2);
......
...@@ -1584,6 +1584,11 @@ int32_t buildSubmitReqFromDataBlock(SSubmitReq** pReq, const SArray* pDataBlocks ...@@ -1584,6 +1584,11 @@ int32_t buildSubmitReqFromDataBlock(SSubmitReq** pReq, const SArray* pDataBlocks
int32_t rowSize = pDataBlock->info.rowSize; int32_t rowSize = pDataBlock->info.rowSize;
int64_t groupId = pDataBlock->info.groupId; int64_t groupId = pDataBlock->info.groupId;
if (colNum <= 1) {
// invalid if only with TS col
continue;
}
if (rb.nCols != colNum) { if (rb.nCols != colNum) {
tdSRowSetTpInfo(&rb, colNum, pTSchema->flen); tdSRowSetTpInfo(&rb, colNum, pTSchema->flen);
} }
...@@ -1680,23 +1685,28 @@ int32_t buildSubmitReqFromDataBlock(SSubmitReq** pReq, const SArray* pDataBlocks ...@@ -1680,23 +1685,28 @@ int32_t buildSubmitReqFromDataBlock(SSubmitReq** pReq, const SArray* pDataBlocks
msgLen += pSubmitBlk->dataLen; msgLen += pSubmitBlk->dataLen;
} }
(*pReq)->length = msgLen; if (numOfBlks > 0) {
(*pReq)->length = msgLen;
(*pReq)->header.vgId = htonl(vgId);
(*pReq)->header.contLen = htonl(msgLen); (*pReq)->header.vgId = htonl(vgId);
(*pReq)->length = (*pReq)->header.contLen; (*pReq)->header.contLen = htonl(msgLen);
(*pReq)->numOfBlocks = htonl(numOfBlks); (*pReq)->length = (*pReq)->header.contLen;
SSubmitBlk* blk = (SSubmitBlk*)((*pReq) + 1); (*pReq)->numOfBlocks = htonl(numOfBlks);
while (numOfBlks--) { SSubmitBlk* blk = (SSubmitBlk*)((*pReq) + 1);
int32_t dataLen = blk->dataLen; while (numOfBlks--) {
blk->uid = htobe64(blk->uid); int32_t dataLen = blk->dataLen;
blk->suid = htobe64(blk->suid); blk->uid = htobe64(blk->uid);
blk->padding = htonl(blk->padding); blk->suid = htobe64(blk->suid);
blk->sversion = htonl(blk->sversion); blk->padding = htonl(blk->padding);
blk->dataLen = htonl(blk->dataLen); blk->sversion = htonl(blk->sversion);
blk->schemaLen = htonl(blk->schemaLen); blk->dataLen = htonl(blk->dataLen);
blk->numOfRows = htons(blk->numOfRows); blk->schemaLen = htonl(blk->schemaLen);
blk = (SSubmitBlk*)(blk->data + dataLen); blk->numOfRows = htons(blk->numOfRows);
blk = (SSubmitBlk*)(blk->data + dataLen);
}
} else {
// no valid rows
taosMemoryFreeClear(*pReq);
} }
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
......
...@@ -260,6 +260,7 @@ struct SSma { ...@@ -260,6 +260,7 @@ struct SSma {
#define SMA_CFG(s) (&(s)->pVnode->config) #define SMA_CFG(s) (&(s)->pVnode->config)
#define SMA_TSDB_CFG(s) (&(s)->pVnode->config.tsdbCfg) #define SMA_TSDB_CFG(s) (&(s)->pVnode->config.tsdbCfg)
#define SMA_RETENTION(s) ((SRetention *)&(s)->pVnode->config.tsdbCfg.retentions)
#define SMA_LOCKED(s) ((s)->locked) #define SMA_LOCKED(s) ((s)->locked)
#define SMA_META(s) ((s)->pVnode->pMeta) #define SMA_META(s) ((s)->pVnode->pMeta)
#define SMA_VID(s) TD_VID((s)->pVnode) #define SMA_VID(s) TD_VID((s)->pVnode)
......
...@@ -400,22 +400,24 @@ static FORCE_INLINE int32_t tdExecuteRSmaImpl(SSma *pSma, const void *pMsg, int3 ...@@ -400,22 +400,24 @@ static FORCE_INLINE int32_t tdExecuteRSmaImpl(SSma *pSma, const void *pMsg, int3
} }
if (taosArrayGetSize(pResult) > 0) { if (taosArrayGetSize(pResult) > 0) {
#if 1 #if 0
char flag[10] = {0}; char flag[10] = {0};
snprintf(flag, 10, "level %" PRIi8, level); snprintf(flag, 10, "level %" PRIi8, level);
blockDebugShowData(pResult, flag); blockDebugShowData(pResult, flag);
#endif #endif
STsdb *sinkTsdb = (level == TSDB_RETENTION_L1 ? pSma->pRSmaTsdb1 : pSma->pRSmaTsdb2); STsdb *sinkTsdb = (level == TSDB_RETENTION_L1 ? pSma->pRSmaTsdb1 : pSma->pRSmaTsdb2);
SSubmitReq *pReq = NULL; SSubmitReq *pReq = NULL;
if (buildSubmitReqFromDataBlock(&pReq, pResult, pTSchema, SMA_VID(pSma), suid) != 0) { if (buildSubmitReqFromDataBlock(&pReq, pResult, pTSchema, SMA_VID(pSma), suid) < 0) {
taosArrayDestroy(pResult); taosArrayDestroy(pResult);
return TSDB_CODE_FAILED; return TSDB_CODE_FAILED;
} }
if (tdProcessSubmitReq(sinkTsdb, INT64_MAX, pReq) != 0) {
if (pReq && tdProcessSubmitReq(sinkTsdb, INT64_MAX, pReq) < 0) {
taosArrayDestroy(pResult); taosArrayDestroy(pResult);
taosMemoryFreeClear(pReq); taosMemoryFreeClear(pReq);
return TSDB_CODE_FAILED; return TSDB_CODE_FAILED;
} }
taosMemoryFreeClear(pReq); taosMemoryFreeClear(pReq);
} else { } else {
smaDebug("vgId:%d, no rsma % " PRIi8 " data generated since %s", SMA_VID(pSma), level, tstrerror(terrno)); smaDebug("vgId:%d, no rsma % " PRIi8 " data generated since %s", SMA_VID(pSma), level, tstrerror(terrno));
...@@ -469,6 +471,12 @@ int32_t tdProcessRSmaSubmit(SSma *pSma, void *pMsg, int32_t inputType) { ...@@ -469,6 +471,12 @@ int32_t tdProcessRSmaSubmit(SSma *pSma, void *pMsg, int32_t inputType) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
SRetention *pRetention = SMA_RETENTION(pSma);
if (!RETENTION_VALID(pRetention + 1)) {
// return directly if retention level 1 is invalid
return TSDB_CODE_SUCCESS;
}
if (inputType == STREAM_DATA_TYPE_SUBMIT_BLOCK) { if (inputType == STREAM_DATA_TYPE_SUBMIT_BLOCK) {
STbUidStore uidStore = {0}; STbUidStore uidStore = {0};
tdFetchSubmitReqSuids(pMsg, &uidStore); tdFetchSubmitReqSuids(pMsg, &uidStore);
......
...@@ -1938,7 +1938,9 @@ void ctgCleanupCacheQueue(void) { ...@@ -1938,7 +1938,9 @@ void ctgCleanupCacheQueue(void) {
void* ctgUpdateThreadFunc(void* param) { void* ctgUpdateThreadFunc(void* param) {
setThreadName("catalog"); setThreadName("catalog");
#ifdef WINDOWS #ifdef WINDOWS
atexit(ctgUpdateThreadUnexpectedStopped); if (taosCheckCurrentInDll()) {
atexit(ctgUpdateThreadUnexpectedStopped);
}
#endif #endif
qInfo("catalog update thread started"); qInfo("catalog update thread started");
......
...@@ -696,10 +696,6 @@ typedef struct SSortedMergeOperatorInfo { ...@@ -696,10 +696,6 @@ typedef struct SSortedMergeOperatorInfo {
int32_t numOfResPerPage; int32_t numOfResPerPage;
char** groupVal; char** groupVal;
SArray *groupInfo; SArray *groupInfo;
bool hasGroupId;
uint64_t groupId;
STupleHandle* prefetchedTuple;
} SSortedMergeOperatorInfo; } SSortedMergeOperatorInfo;
typedef struct SSortOperatorInfo { typedef struct SSortOperatorInfo {
...@@ -712,10 +708,6 @@ typedef struct SSortOperatorInfo { ...@@ -712,10 +708,6 @@ typedef struct SSortOperatorInfo {
int64_t startTs; // sort start time int64_t startTs; // sort start time
uint64_t sortElapsed; // sort elapsed time, time to flush to disk not included. uint64_t sortElapsed; // sort elapsed time, time to flush to disk not included.
STupleHandle *prefetchedTuple;
bool hasGroupId;
uint64_t groupId;
} SSortOperatorInfo; } SSortOperatorInfo;
typedef struct STagFilterOperatorInfo { typedef struct STagFilterOperatorInfo {
......
...@@ -130,12 +130,6 @@ bool tsortIsNullVal(STupleHandle* pVHandle, int32_t colId); ...@@ -130,12 +130,6 @@ bool tsortIsNullVal(STupleHandle* pVHandle, int32_t colId);
*/ */
void* tsortGetValue(STupleHandle* pVHandle, int32_t colId); void* tsortGetValue(STupleHandle* pVHandle, int32_t colId);
/**
*
* @param pVHandle
* @return
*/
uint64_t tsortGetGroupId(STupleHandle* pVHandle);
/** /**
* *
* @param pSortHandle * @param pSortHandle
......
...@@ -3031,31 +3031,12 @@ SSDataBlock* getSortedMergeBlockData(SSortHandle* pHandle, SSDataBlock* pDataBlo ...@@ -3031,31 +3031,12 @@ SSDataBlock* getSortedMergeBlockData(SSortHandle* pHandle, SSDataBlock* pDataBlo
blockDataEnsureCapacity(p, capacity); blockDataEnsureCapacity(p, capacity);
while (1) { while (1) {
STupleHandle* pTupleHandle = NULL; STupleHandle* pTupleHandle = tsortNextTuple(pHandle);
if (pInfo->prefetchedTuple == NULL) {
pTupleHandle = tsortNextTuple(pHandle);
} else {
pTupleHandle = pInfo->prefetchedTuple;
pInfo->groupId = tsortGetGroupId(pTupleHandle);
pInfo->prefetchedTuple = NULL;
}
if (pTupleHandle == NULL) { if (pTupleHandle == NULL) {
break; break;
} }
uint64_t tupleGroupId = tsortGetGroupId(pTupleHandle); appendOneRowToDataBlock(p, pTupleHandle);
if (!pInfo->hasGroupId) {
pInfo->groupId = tupleGroupId;
pInfo->hasGroupId = true;
appendOneRowToDataBlock(p, pTupleHandle);
} else if (pInfo->groupId == tupleGroupId) {
appendOneRowToDataBlock(p, pTupleHandle);
} else {
pInfo->prefetchedTuple = pTupleHandle;
break;
}
if (p->info.rows >= capacity) { if (p->info.rows >= capacity) {
break; break;
} }
...@@ -3074,7 +3055,6 @@ SSDataBlock* getSortedMergeBlockData(SSortHandle* pHandle, SSDataBlock* pDataBlo ...@@ -3074,7 +3055,6 @@ SSDataBlock* getSortedMergeBlockData(SSortHandle* pHandle, SSDataBlock* pDataBlo
pDataBlock->info.rows = p->info.rows; pDataBlock->info.rows = p->info.rows;
pDataBlock->info.capacity = p->info.rows; pDataBlock->info.capacity = p->info.rows;
pDataBlock->info.groupId = pInfo->groupId;
} }
blockDataDestroy(p); blockDataDestroy(p);
...@@ -3340,7 +3320,7 @@ static SSDataBlock* getAggregateResult(SOperatorInfo* pOperator) { ...@@ -3340,7 +3320,7 @@ static SSDataBlock* getAggregateResult(SOperatorInfo* pOperator) {
doSetOperatorCompleted(pOperator); doSetOperatorCompleted(pOperator);
} }
size_t rows = blockDataGetNumOfRows(pInfo->pRes); // pInfo->pRes : NULL; size_t rows = blockDataGetNumOfRows(pInfo->pRes);
pOperator->resultInfo.totalRows += rows; pOperator->resultInfo.totalRows += rows;
return (rows == 0) ? NULL : pInfo->pRes; return (rows == 0) ? NULL : pInfo->pRes;
...@@ -4920,7 +4900,10 @@ SArray* extractColumnInfo(SNodeList* pNodeList) { ...@@ -4920,7 +4900,10 @@ SArray* extractColumnInfo(SNodeList* pNodeList) {
} }
SArray* extractPartitionColInfo(SNodeList* pNodeList) { SArray* extractPartitionColInfo(SNodeList* pNodeList) {
if (!pNodeList) return NULL; if(!pNodeList) {
return NULL;
}
size_t numOfCols = LIST_LENGTH(pNodeList); size_t numOfCols = LIST_LENGTH(pNodeList);
SArray* pList = taosArrayInit(numOfCols, sizeof(SColumn)); SArray* pList = taosArrayInit(numOfCols, sizeof(SColumn));
if (pList == NULL) { if (pList == NULL) {
......
...@@ -2175,25 +2175,13 @@ SSDataBlock* getSortedTableMergeScanBlockData(SSortHandle* pHandle, int32_t capa ...@@ -2175,25 +2175,13 @@ SSDataBlock* getSortedTableMergeScanBlockData(SSortHandle* pHandle, int32_t capa
pTupleHandle = tsortNextTuple(pHandle); pTupleHandle = tsortNextTuple(pHandle);
} else { } else {
pTupleHandle = pInfo->prefetchedTuple; pTupleHandle = pInfo->prefetchedTuple;
pInfo->groupId = tsortGetGroupId(pTupleHandle);
pInfo->prefetchedTuple = NULL;
} }
if (pTupleHandle == NULL) { if (pTupleHandle == NULL) {
break; break;
} }
uint64_t tupleGroupId = tsortGetGroupId(pTupleHandle); appendOneRowToDataBlock(p, pTupleHandle);
if (!pInfo->hasGroupId) {
pInfo->groupId = tupleGroupId;
pInfo->hasGroupId = true;
appendOneRowToDataBlock(p, pTupleHandle);
} else if (pInfo->groupId == tupleGroupId) {
appendOneRowToDataBlock(p, pTupleHandle);
} else {
pInfo->prefetchedTuple = pTupleHandle;
break;
}
if (p->info.rows >= capacity) { if (p->info.rows >= capacity) {
break; break;
......
...@@ -40,15 +40,13 @@ SOperatorInfo* createSortOperatorInfo(SOperatorInfo* downstream, SSDataBlock* pR ...@@ -40,15 +40,13 @@ SOperatorInfo* createSortOperatorInfo(SOperatorInfo* downstream, SSDataBlock* pR
initResultSizeInfo(pOperator, 1024); initResultSizeInfo(pOperator, 1024);
pInfo->pSortInfo = pSortInfo; pInfo->pSortInfo = pSortInfo;
pInfo->pColMatchInfo = pColMatchColInfo; pInfo->pColMatchInfo = pColMatchColInfo;
pInfo->hasGroupId = false; pOperator->name = "SortOperator";
pInfo->prefetchedTuple = NULL;
pOperator->name = "SortOperator";
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_SORT; pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_SORT;
pOperator->blocking = true; pOperator->blocking = true;
pOperator->status = OP_NOT_OPENED; pOperator->status = OP_NOT_OPENED;
pOperator->info = pInfo; pOperator->info = pInfo;
// lazy evaluation for the following parameter since the input datablock is not known till now. // lazy evaluation for the following parameter since the input datablock is not known till now.
// pInfo->bufPageSize = rowSize < 1024 ? 1024 * 2 : rowSize * 2; // there are headers, so pageSize = rowSize + // pInfo->bufPageSize = rowSize < 1024 ? 1024 * 2 : rowSize * 2; // there are headers, so pageSize = rowSize +
...@@ -97,31 +95,12 @@ SSDataBlock* getSortedBlockData(SSortHandle* pHandle, SSDataBlock* pDataBlock, i ...@@ -97,31 +95,12 @@ SSDataBlock* getSortedBlockData(SSortHandle* pHandle, SSDataBlock* pDataBlock, i
blockDataEnsureCapacity(p, capacity); blockDataEnsureCapacity(p, capacity);
while (1) { while (1) {
STupleHandle* pTupleHandle = NULL; STupleHandle* pTupleHandle = tsortNextTuple(pHandle);
if (pInfo->prefetchedTuple == NULL) {
pTupleHandle = tsortNextTuple(pHandle);
} else {
pTupleHandle = pInfo->prefetchedTuple;
pInfo->groupId = tsortGetGroupId(pTupleHandle);
pInfo->prefetchedTuple = NULL;
}
if (pTupleHandle == NULL) { if (pTupleHandle == NULL) {
break; break;
} }
uint64_t tupleGroupId = tsortGetGroupId(pTupleHandle); appendOneRowToDataBlock(p, pTupleHandle);
if (!pInfo->hasGroupId) {
pInfo->groupId = tupleGroupId;
pInfo->hasGroupId = true;
appendOneRowToDataBlock(p, pTupleHandle);
} else if (pInfo->groupId == tupleGroupId) {
appendOneRowToDataBlock(p, pTupleHandle);
} else {
pInfo->prefetchedTuple = pTupleHandle;
break;
}
if (p->info.rows >= capacity) { if (p->info.rows >= capacity) {
break; break;
} }
...@@ -140,7 +119,6 @@ SSDataBlock* getSortedBlockData(SSortHandle* pHandle, SSDataBlock* pDataBlock, i ...@@ -140,7 +119,6 @@ SSDataBlock* getSortedBlockData(SSortHandle* pHandle, SSDataBlock* pDataBlock, i
pDataBlock->info.rows = p->info.rows; pDataBlock->info.rows = p->info.rows;
pDataBlock->info.capacity = p->info.rows; pDataBlock->info.capacity = p->info.rows;
pDataBlock->info.groupId = pInfo->groupId;
} }
blockDataDestroy(p); blockDataDestroy(p);
...@@ -255,10 +233,7 @@ typedef struct SMultiwaySortMergeOperatorInfo { ...@@ -255,10 +233,7 @@ typedef struct SMultiwaySortMergeOperatorInfo {
SSDataBlock* pInputBlock; SSDataBlock* pInputBlock;
int64_t startTs; // sort start time int64_t startTs; // sort start time
uint64_t groupId;
bool hasGroupId;
uint64_t groupId;
STupleHandle* prefetchedTuple;
} SMultiwaySortMergeOperatorInfo; } SMultiwaySortMergeOperatorInfo;
int32_t doOpenMultiwaySortMergeOperator(SOperatorInfo* pOperator) { int32_t doOpenMultiwaySortMergeOperator(SOperatorInfo* pOperator) {
...@@ -312,31 +287,12 @@ SSDataBlock* getMultiwaySortedBlockData(SSortHandle* pHandle, SSDataBlock* pData ...@@ -312,31 +287,12 @@ SSDataBlock* getMultiwaySortedBlockData(SSortHandle* pHandle, SSDataBlock* pData
blockDataEnsureCapacity(p, capacity); blockDataEnsureCapacity(p, capacity);
while (1) { while (1) {
STupleHandle* pTupleHandle = NULL; STupleHandle* pTupleHandle = tsortNextTuple(pHandle);
if (pInfo->prefetchedTuple == NULL) {
pTupleHandle = tsortNextTuple(pHandle);
} else {
pTupleHandle = pInfo->prefetchedTuple;
pInfo->groupId = tsortGetGroupId(pTupleHandle);
pInfo->prefetchedTuple = NULL;
}
if (pTupleHandle == NULL) { if (pTupleHandle == NULL) {
break; break;
} }
uint64_t tupleGroupId = tsortGetGroupId(pTupleHandle); appendOneRowToDataBlock(p, pTupleHandle);
if (!pInfo->hasGroupId) {
pInfo->groupId = tupleGroupId;
pInfo->hasGroupId = true;
appendOneRowToDataBlock(p, pTupleHandle);
} else if (pInfo->groupId == tupleGroupId) {
appendOneRowToDataBlock(p, pTupleHandle);
} else {
pInfo->prefetchedTuple = pTupleHandle;
break;
}
if (p->info.rows >= capacity) { if (p->info.rows >= capacity) {
break; break;
} }
...@@ -432,14 +388,12 @@ SOperatorInfo* createMultiwaySortMergeOperatorInfo(SOperatorInfo** downStreams, ...@@ -432,14 +388,12 @@ SOperatorInfo* createMultiwaySortMergeOperatorInfo(SOperatorInfo** downStreams,
pOperator->blocking = false; pOperator->blocking = false;
pOperator->status = OP_NOT_OPENED; pOperator->status = OP_NOT_OPENED;
pOperator->info = pInfo; pOperator->info = pInfo;
pInfo->hasGroupId = false;
pInfo->prefetchedTuple = NULL;
pOperator->pTaskInfo = pTaskInfo; pOperator->pTaskInfo = pTaskInfo;
pInfo->bufPageSize = getProperSortPageSize(rowSize); pInfo->bufPageSize = getProperSortPageSize(rowSize);
uint32_t numOfSources = taosArrayGetSize(pSortInfo); uint32_t numOfSources = taosArrayGetSize(pSortInfo);
numOfSources = TMAX(2, numOfSources); numOfSources = TMAX(4, numOfSources);
pInfo->sortBufSize = numOfSources * pInfo->bufPageSize; pInfo->sortBufSize = numOfSources * pInfo->bufPageSize;
......
...@@ -1257,6 +1257,10 @@ static int32_t getAllIntervalWindow(SHashObj* pHashMap, SArray* resWins) { ...@@ -1257,6 +1257,10 @@ static int32_t getAllIntervalWindow(SHashObj* pHashMap, SArray* resWins) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
bool isCloseWindow(STimeWindow *pWin, STimeWindowAggSupp* pSup) {
return pWin->ekey < pSup->maxTs - pSup->waterMark;
}
static int32_t closeIntervalWindow(SHashObj* pHashMap, STimeWindowAggSupp* pSup, SInterval* pInterval, static int32_t closeIntervalWindow(SHashObj* pHashMap, STimeWindowAggSupp* pSup, SInterval* pInterval,
SArray* closeWins) { SArray* closeWins) {
void* pIte = NULL; void* pIte = NULL;
...@@ -1269,7 +1273,7 @@ static int32_t closeIntervalWindow(SHashObj* pHashMap, STimeWindowAggSupp* pSup, ...@@ -1269,7 +1273,7 @@ static int32_t closeIntervalWindow(SHashObj* pHashMap, STimeWindowAggSupp* pSup,
SResultRowInfo dumyInfo; SResultRowInfo dumyInfo;
dumyInfo.cur.pageId = -1; dumyInfo.cur.pageId = -1;
STimeWindow win = getActiveTimeWindow(NULL, &dumyInfo, ts, pInterval, pInterval->precision, NULL); STimeWindow win = getActiveTimeWindow(NULL, &dumyInfo, ts, pInterval, pInterval->precision, NULL);
if (win.ekey < pSup->maxTs - pSup->waterMark) { if (isCloseWindow(&win, pSup)) {
char keyBuf[GET_RES_WINDOW_KEY_LEN(sizeof(TSKEY))]; char keyBuf[GET_RES_WINDOW_KEY_LEN(sizeof(TSKEY))];
SET_RES_WINDOW_KEY(keyBuf, &ts, sizeof(TSKEY), groupId); SET_RES_WINDOW_KEY(keyBuf, &ts, sizeof(TSKEY), groupId);
taosHashRemove(pHashMap, keyBuf, keyLen); taosHashRemove(pHashMap, keyBuf, keyLen);
...@@ -2036,7 +2040,55 @@ _error: ...@@ -2036,7 +2040,55 @@ _error:
return NULL; return NULL;
} }
static void doHashInterval(SOperatorInfo* pOperatorInfo, SSDataBlock* pSDataBlock, int32_t tableGroupId, bool isFinalInterval(SStreamFinalIntervalOperatorInfo* pInfo) { return pInfo->pChildren != NULL; }
void compactFunctions(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx, int32_t numOfOutput,
SExecTaskInfo* pTaskInfo) {
for (int32_t k = 0; k < numOfOutput; ++k) {
if (fmIsWindowPseudoColumnFunc(pDestCtx[k].functionId)) {
continue;
}
int32_t code = TSDB_CODE_SUCCESS;
if (functionNeedToExecute(&pDestCtx[k]) && pDestCtx[k].fpSet.combine != NULL) {
code = pDestCtx[k].fpSet.combine(&pDestCtx[k], &pSourceCtx[k]);
if (code != TSDB_CODE_SUCCESS) {
qError("%s apply functions error, code: %s", GET_TASKID(pTaskInfo), tstrerror(code));
pTaskInfo->code = code;
longjmp(pTaskInfo->env, code);
}
}
}
}
static void rebuildIntervalWindow(SStreamFinalIntervalOperatorInfo* pInfo, SArray* pWinArray, int32_t groupId,
int32_t numOfOutput, SExecTaskInfo* pTaskInfo) {
int32_t size = taosArrayGetSize(pWinArray);
ASSERT(pInfo->pChildren);
for (int32_t i = 0; i < size; i++) {
STimeWindow* pParentWin = taosArrayGet(pWinArray, i);
SResultRow* pCurResult = NULL;
setTimeWindowOutputBuf(&pInfo->binfo.resultRowInfo, pParentWin, true, &pCurResult, 0, pInfo->binfo.pCtx,
numOfOutput, pInfo->binfo.rowCellInfoOffset, &pInfo->aggSup, pTaskInfo);
int32_t numOfChildren = taosArrayGetSize(pInfo->pChildren);
for (int32_t j = 0; j < numOfChildren; j++) {
SOperatorInfo* pChildOp = taosArrayGetP(pInfo->pChildren, j);
SIntervalAggOperatorInfo* pChInfo = pChildOp->info;
SResultRow* pChResult = NULL;
setTimeWindowOutputBuf(&pChInfo->binfo.resultRowInfo, pParentWin, true, &pChResult, 0, pChInfo->binfo.pCtx,
pChildOp->numOfExprs, pChInfo->binfo.rowCellInfoOffset, &pChInfo->aggSup, pTaskInfo);
compactFunctions(pInfo->binfo.pCtx, pChInfo->binfo.pCtx, numOfOutput, pTaskInfo);
}
}
}
bool isDeletedWindow(STimeWindow* pWin, uint64_t groupId, SAggSupporter* pSup) {
SET_RES_WINDOW_KEY(pSup->keyBuf, &pWin->skey, sizeof(int64_t), groupId);
SResultRowPosition* p1 = (SResultRowPosition*)taosHashGet(pSup->pResultRowHashTable,
pSup->keyBuf, GET_RES_WINDOW_KEY_LEN(sizeof(int64_t)));
return p1 == NULL;
}
static void doHashInterval(SOperatorInfo* pOperatorInfo, SSDataBlock* pSDataBlock, uint64_t tableGroupId,
SArray* pUpdated) { SArray* pUpdated) {
SStreamFinalIntervalOperatorInfo* pInfo = (SStreamFinalIntervalOperatorInfo*)pOperatorInfo->info; SStreamFinalIntervalOperatorInfo* pInfo = (SStreamFinalIntervalOperatorInfo*)pOperatorInfo->info;
SResultRowInfo* pResultRowInfo = &(pInfo->binfo.resultRowInfo); SResultRowInfo* pResultRowInfo = &(pInfo->binfo.resultRowInfo);
...@@ -2060,6 +2112,14 @@ static void doHashInterval(SOperatorInfo* pOperatorInfo, SSDataBlock* pSDataBloc ...@@ -2060,6 +2112,14 @@ static void doHashInterval(SOperatorInfo* pOperatorInfo, SSDataBlock* pSDataBloc
STimeWindow nextWin = getActiveTimeWindow(pInfo->aggSup.pResultBuf, pResultRowInfo, ts, &pInfo->interval, STimeWindow nextWin = getActiveTimeWindow(pInfo->aggSup.pResultBuf, pResultRowInfo, ts, &pInfo->interval,
pInfo->interval.precision, NULL); pInfo->interval.precision, NULL);
while (1) { while (1) {
if (isFinalInterval(pInfo) && isCloseWindow(&nextWin, &pInfo->twAggSup) &&
isDeletedWindow(&nextWin, tableGroupId, &pInfo->aggSup)) {
SArray* pUpWins = taosArrayInit(8, sizeof(STimeWindow));
taosArrayPush(pUpWins, &nextWin);
rebuildIntervalWindow(pInfo, pUpWins, pInfo->binfo.pRes->info.groupId,
pOperatorInfo->numOfExprs, pOperatorInfo->pTaskInfo);
taosArrayDestroy(pUpWins);
}
int32_t code = setTimeWindowOutputBuf(pResultRowInfo, &nextWin, true, &pResult, tableGroupId, pInfo->binfo.pCtx, int32_t code = setTimeWindowOutputBuf(pResultRowInfo, &nextWin, true, &pResult, tableGroupId, pInfo->binfo.pCtx,
numOfOutput, pInfo->binfo.rowCellInfoOffset, &pInfo->aggSup, pTaskInfo); numOfOutput, pInfo->binfo.rowCellInfoOffset, &pInfo->aggSup, pTaskInfo);
if (code != TSDB_CODE_SUCCESS || pResult == NULL) { if (code != TSDB_CODE_SUCCESS || pResult == NULL) {
...@@ -2089,47 +2149,6 @@ static void doHashInterval(SOperatorInfo* pOperatorInfo, SSDataBlock* pSDataBloc ...@@ -2089,47 +2149,6 @@ static void doHashInterval(SOperatorInfo* pOperatorInfo, SSDataBlock* pSDataBloc
} }
} }
bool isFinalInterval(SStreamFinalIntervalOperatorInfo* pInfo) { return pInfo->pChildren != NULL; }
void compactFunctions(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx, int32_t numOfOutput,
SExecTaskInfo* pTaskInfo) {
for (int32_t k = 0; k < numOfOutput; ++k) {
if (fmIsWindowPseudoColumnFunc(pDestCtx[k].functionId)) {
continue;
}
int32_t code = TSDB_CODE_SUCCESS;
if (functionNeedToExecute(&pDestCtx[k]) && pDestCtx[k].fpSet.combine != NULL) {
code = pDestCtx[k].fpSet.combine(&pDestCtx[k], &pSourceCtx[k]);
if (code != TSDB_CODE_SUCCESS) {
qError("%s apply functions error, code: %s", GET_TASKID(pTaskInfo), tstrerror(code));
pTaskInfo->code = code;
longjmp(pTaskInfo->env, code);
}
}
}
}
static void rebuildIntervalWindow(SStreamFinalIntervalOperatorInfo* pInfo, SArray* pWinArray, int32_t groupId,
int32_t numOfOutput, SExecTaskInfo* pTaskInfo) {
int32_t size = taosArrayGetSize(pWinArray);
ASSERT(pInfo->pChildren);
for (int32_t i = 0; i < size; i++) {
STimeWindow* pParentWin = taosArrayGet(pWinArray, i);
SResultRow* pCurResult = NULL;
setTimeWindowOutputBuf(&pInfo->binfo.resultRowInfo, pParentWin, true, &pCurResult, 0, pInfo->binfo.pCtx,
numOfOutput, pInfo->binfo.rowCellInfoOffset, &pInfo->aggSup, pTaskInfo);
int32_t numOfChildren = taosArrayGetSize(pInfo->pChildren);
for (int32_t j = 0; j < numOfChildren; j++) {
SOperatorInfo* pChildOp = taosArrayGetP(pInfo->pChildren, j);
SIntervalAggOperatorInfo* pChInfo = pChildOp->info;
SResultRow* pChResult = NULL;
setTimeWindowOutputBuf(&pChInfo->binfo.resultRowInfo, pParentWin, true, &pChResult, 0, pChInfo->binfo.pCtx,
pChildOp->numOfExprs, pChInfo->binfo.rowCellInfoOffset, &pChInfo->aggSup, pTaskInfo);
compactFunctions(pInfo->binfo.pCtx, pChInfo->binfo.pCtx, numOfOutput, pTaskInfo);
}
}
}
static void clearStreamIntervalOperator(SStreamFinalIntervalOperatorInfo* pInfo) { static void clearStreamIntervalOperator(SStreamFinalIntervalOperatorInfo* pInfo) {
taosHashClear(pInfo->aggSup.pResultRowHashTable); taosHashClear(pInfo->aggSup.pResultRowHashTable);
clearDiskbasedBuf(pInfo->aggSup.pResultBuf); clearDiskbasedBuf(pInfo->aggSup.pResultBuf);
...@@ -2169,6 +2188,7 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) { ...@@ -2169,6 +2188,7 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) {
SStreamFinalIntervalOperatorInfo* pInfo = pOperator->info; SStreamFinalIntervalOperatorInfo* pInfo = pOperator->info;
SOperatorInfo* downstream = pOperator->pDownstream[0]; SOperatorInfo* downstream = pOperator->pDownstream[0];
SArray* pUpdated = taosArrayInit(4, POINTER_BYTES); SArray* pUpdated = taosArrayInit(4, POINTER_BYTES);
TSKEY maxTs = INT64_MIN;
if (pOperator->status == OP_EXEC_DONE) { if (pOperator->status == OP_EXEC_DONE) {
return NULL; return NULL;
...@@ -2222,6 +2242,7 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) { ...@@ -2222,6 +2242,7 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) {
} }
setInputDataBlock(pOperator, pInfo->binfo.pCtx, pBlock, pInfo->order, MAIN_SCAN, true); setInputDataBlock(pOperator, pInfo->binfo.pCtx, pBlock, pInfo->order, MAIN_SCAN, true);
doHashInterval(pOperator, pBlock, pBlock->info.groupId, pUpdated);
if (isFinalInterval(pInfo)) { if (isFinalInterval(pInfo)) {
int32_t chIndex = getChildIndex(pBlock); int32_t chIndex = getChildIndex(pBlock);
int32_t size = taosArrayGetSize(pInfo->pChildren); int32_t size = taosArrayGetSize(pInfo->pChildren);
...@@ -2238,10 +2259,10 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) { ...@@ -2238,10 +2259,10 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) {
setInputDataBlock(pChildOp, pChInfo->binfo.pCtx, pBlock, pChInfo->order, MAIN_SCAN, true); setInputDataBlock(pChildOp, pChInfo->binfo.pCtx, pBlock, pChInfo->order, MAIN_SCAN, true);
doHashInterval(pChildOp, pBlock, pBlock->info.groupId, NULL); doHashInterval(pChildOp, pBlock, pBlock->info.groupId, NULL);
} }
doHashInterval(pOperator, pBlock, pBlock->info.groupId, pUpdated); maxTs = TMAX(maxTs, pBlock->info.window.ekey);
pInfo->twAggSup.maxTs = TMAX(pInfo->twAggSup.maxTs, pBlock->info.window.ekey);
} }
pInfo->twAggSup.maxTs = TMAX(pInfo->twAggSup.maxTs, maxTs);
if (isFinalInterval(pInfo)) { if (isFinalInterval(pInfo)) {
closeIntervalWindow(pInfo->aggSup.pResultRowHashTable, &pInfo->twAggSup, &pInfo->interval, pUpdated); closeIntervalWindow(pInfo->aggSup.pResultRowHashTable, &pInfo->twAggSup, &pInfo->interval, pUpdated);
} }
...@@ -2564,7 +2585,7 @@ int32_t updateSessionWindowInfo(SResultWindowInfo* pWinInfo, TSKEY* pTs, int32_t ...@@ -2564,7 +2585,7 @@ int32_t updateSessionWindowInfo(SResultWindowInfo* pWinInfo, TSKEY* pTs, int32_t
} }
static int32_t setWindowOutputBuf(SResultWindowInfo* pWinInfo, SResultRow** pResult, SqlFunctionCtx* pCtx, static int32_t setWindowOutputBuf(SResultWindowInfo* pWinInfo, SResultRow** pResult, SqlFunctionCtx* pCtx,
int32_t groupId, int32_t numOfOutput, int32_t* rowCellInfoOffset, uint64_t groupId, int32_t numOfOutput, int32_t* rowCellInfoOffset,
SStreamAggSupporter* pAggSup, SExecTaskInfo* pTaskInfo) { SStreamAggSupporter* pAggSup, SExecTaskInfo* pTaskInfo) {
assert(pWinInfo->win.skey <= pWinInfo->win.ekey); assert(pWinInfo->win.skey <= pWinInfo->win.ekey);
// too many time window in query // too many time window in query
...@@ -2642,7 +2663,7 @@ int32_t getNumCompactWindow(SArray* pWinInfos, int32_t startIndex, int64_t gap) ...@@ -2642,7 +2663,7 @@ int32_t getNumCompactWindow(SArray* pWinInfos, int32_t startIndex, int64_t gap)
return size - startIndex - 1; return size - startIndex - 1;
} }
void compactTimeWindow(SStreamSessionAggOperatorInfo* pInfo, int32_t startIndex, int32_t num, int32_t groupId, void compactTimeWindow(SStreamSessionAggOperatorInfo* pInfo, int32_t startIndex, int32_t num, uint64_t groupId,
int32_t numOfOutput, SExecTaskInfo* pTaskInfo, SHashObj* pStUpdated, SHashObj* pStDeleted) { int32_t numOfOutput, SExecTaskInfo* pTaskInfo, SHashObj* pStUpdated, SHashObj* pStDeleted) {
SResultWindowInfo* pCurWin = taosArrayGet(pInfo->streamAggSup.pCurWins, startIndex); SResultWindowInfo* pCurWin = taosArrayGet(pInfo->streamAggSup.pCurWins, startIndex);
SResultRow* pCurResult = NULL; SResultRow* pCurResult = NULL;
...@@ -2667,13 +2688,18 @@ void compactTimeWindow(SStreamSessionAggOperatorInfo* pInfo, int32_t startIndex, ...@@ -2667,13 +2688,18 @@ void compactTimeWindow(SStreamSessionAggOperatorInfo* pInfo, int32_t startIndex,
} }
} }
typedef struct SWinRes {
TSKEY ts;
uint64_t groupId;
} SWinRes;
static void doStreamSessionAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSDataBlock, SHashObj* pStUpdated, static void doStreamSessionAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSDataBlock, SHashObj* pStUpdated,
SHashObj* pStDeleted) { SHashObj* pStDeleted) {
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
SStreamSessionAggOperatorInfo* pInfo = pOperator->info; SStreamSessionAggOperatorInfo* pInfo = pOperator->info;
bool masterScan = true; bool masterScan = true;
int32_t numOfOutput = pOperator->numOfExprs; int32_t numOfOutput = pOperator->numOfExprs;
int64_t groupId = pSDataBlock->info.groupId; uint64_t groupId = pSDataBlock->info.groupId;
int64_t gap = pInfo->gap; int64_t gap = pInfo->gap;
int64_t code = TSDB_CODE_SUCCESS; int64_t code = TSDB_CODE_SUCCESS;
...@@ -2693,7 +2719,7 @@ static void doStreamSessionAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSData ...@@ -2693,7 +2719,7 @@ static void doStreamSessionAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSData
SStreamAggSupporter* pAggSup = &pInfo->streamAggSup; SStreamAggSupporter* pAggSup = &pInfo->streamAggSup;
for (int32_t i = 0; i < pSDataBlock->info.rows;) { for (int32_t i = 0; i < pSDataBlock->info.rows;) {
int32_t winIndex = 0; int32_t winIndex = 0;
SResultWindowInfo* pCurWin = getSessionTimeWindow(pAggSup, tsCols[i], pSDataBlock->info.groupId, gap, &winIndex); SResultWindowInfo* pCurWin = getSessionTimeWindow(pAggSup, tsCols[i], groupId, gap, &winIndex);
winRows = updateSessionWindowInfo(pCurWin, tsCols, pSDataBlock->info.rows, i, pInfo->gap, pStDeleted); winRows = updateSessionWindowInfo(pCurWin, tsCols, pSDataBlock->info.rows, i, pInfo->gap, pStDeleted);
code = doOneWindowAgg(pInfo, pSDataBlock, pCurWin, &pResult, i, winRows, numOfOutput, pTaskInfo); code = doOneWindowAgg(pInfo, pSDataBlock, pCurWin, &pResult, i, winRows, numOfOutput, pTaskInfo);
if (code != TSDB_CODE_SUCCESS || pResult == NULL) { if (code != TSDB_CODE_SUCCESS || pResult == NULL) {
...@@ -2709,7 +2735,8 @@ static void doStreamSessionAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSData ...@@ -2709,7 +2735,8 @@ static void doStreamSessionAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSData
} }
pCurWin->isClosed = false; pCurWin->isClosed = false;
if (pInfo->twAggSup.calTrigger == STREAM_TRIGGER_AT_ONCE) { if (pInfo->twAggSup.calTrigger == STREAM_TRIGGER_AT_ONCE) {
code = taosHashPut(pStUpdated, &pCurWin->pos, sizeof(SResultRowPosition), &(pCurWin->win.skey), sizeof(TSKEY)); SWinRes value = {.ts = pCurWin->win.skey, .groupId = groupId};
code = taosHashPut(pStUpdated, &pCurWin->pos, sizeof(SResultRowPosition), &value, sizeof(SWinRes));
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
longjmp(pTaskInfo->env, TSDB_CODE_QRY_OUT_OF_MEMORY); longjmp(pTaskInfo->env, TSDB_CODE_QRY_OUT_OF_MEMORY);
} }
...@@ -2736,7 +2763,7 @@ static void doClearSessionWindows(SStreamAggSupporter* pAggSup, SOptrBasicInfo* ...@@ -2736,7 +2763,7 @@ static void doClearSessionWindows(SStreamAggSupporter* pAggSup, SOptrBasicInfo*
} }
} }
static int32_t copyUpdateResult(SHashObj* pStUpdated, SArray* pUpdated, int32_t groupId) { static int32_t copyUpdateResult(SHashObj* pStUpdated, SArray* pUpdated) {
void* pData = NULL; void* pData = NULL;
size_t keyLen = 0; size_t keyLen = 0;
while ((pData = taosHashIterate(pStUpdated, pData)) != NULL) { while ((pData = taosHashIterate(pStUpdated, pData)) != NULL) {
...@@ -2746,9 +2773,9 @@ static int32_t copyUpdateResult(SHashObj* pStUpdated, SArray* pUpdated, int32_t ...@@ -2746,9 +2773,9 @@ static int32_t copyUpdateResult(SHashObj* pStUpdated, SArray* pUpdated, int32_t
if (pos == NULL) { if (pos == NULL) {
return TSDB_CODE_QRY_OUT_OF_MEMORY; return TSDB_CODE_QRY_OUT_OF_MEMORY;
} }
pos->groupId = groupId; pos->groupId = ((SWinRes*)pData)->groupId;
pos->pos = *(SResultRowPosition*)key; pos->pos = *(SResultRowPosition*)key;
*(int64_t*)pos->key = *(uint64_t*)pData; *(int64_t*)pos->key = ((SWinRes*)pData)->ts;
taosArrayPush(pUpdated, &pos); taosArrayPush(pUpdated, &pos);
} }
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
...@@ -2815,7 +2842,9 @@ int32_t closeSessionWindow(SHashObj* pHashMap, STimeWindowAggSupp* pTwSup, SArra ...@@ -2815,7 +2842,9 @@ int32_t closeSessionWindow(SHashObj* pHashMap, STimeWindowAggSupp* pTwSup, SArra
__get_win_info_ fn) { __get_win_info_ fn) {
// Todo(liuyao) save window to tdb // Todo(liuyao) save window to tdb
void **pIte = NULL; void **pIte = NULL;
size_t keyLen = 0;
while ((pIte = taosHashIterate(pHashMap, pIte)) != NULL) { while ((pIte = taosHashIterate(pHashMap, pIte)) != NULL) {
uint64_t* pGroupId = taosHashGetKey(pIte, &keyLen);
SArray *pWins = (SArray *) (*pIte); SArray *pWins = (SArray *) (*pIte);
int32_t size = taosArrayGetSize(pWins); int32_t size = taosArrayGetSize(pWins);
for (int32_t i = 0; i < size; i++) { for (int32_t i = 0; i < size; i++) {
...@@ -2825,7 +2854,7 @@ int32_t closeSessionWindow(SHashObj* pHashMap, STimeWindowAggSupp* pTwSup, SArra ...@@ -2825,7 +2854,7 @@ int32_t closeSessionWindow(SHashObj* pHashMap, STimeWindowAggSupp* pTwSup, SArra
if (!pSeWin->isClosed) { if (!pSeWin->isClosed) {
pSeWin->isClosed = true; pSeWin->isClosed = true;
if (pTwSup->calTrigger == STREAM_TRIGGER_WINDOW_CLOSE) { if (pTwSup->calTrigger == STREAM_TRIGGER_WINDOW_CLOSE) {
int32_t code = saveResult(pSeWin->win.skey, pSeWin->pos.pageId, pSeWin->pos.offset, 0, pClosed); int32_t code = saveResult(pSeWin->win.skey, pSeWin->pos.pageId, pSeWin->pos.offset, *pGroupId, pClosed);
pSeWin->isOutput = true; pSeWin->isOutput = true;
} }
} }
...@@ -2892,7 +2921,7 @@ static SSDataBlock* doStreamSessionAgg(SOperatorInfo* pOperator) { ...@@ -2892,7 +2921,7 @@ static SSDataBlock* doStreamSessionAgg(SOperatorInfo* pOperator) {
SStreamSessionAggOperatorInfo* pChildInfo = pChildOp->info; SStreamSessionAggOperatorInfo* pChildInfo = pChildOp->info;
doClearSessionWindows(&pChildInfo->streamAggSup, &pChildInfo->binfo, pBlock, 0, pChildOp->numOfExprs, doClearSessionWindows(&pChildInfo->streamAggSup, &pChildInfo->binfo, pBlock, 0, pChildOp->numOfExprs,
pChildInfo->gap, NULL); pChildInfo->gap, NULL);
rebuildTimeWindow(pInfo, pWins, pInfo->binfo.pRes->info.groupId, pOperator->numOfExprs, pOperator->pTaskInfo); rebuildTimeWindow(pInfo, pWins, pBlock->info.groupId, pOperator->numOfExprs, pOperator->pTaskInfo);
} }
taosArrayDestroy(pWins); taosArrayDestroy(pWins);
continue; continue;
...@@ -2916,7 +2945,7 @@ static SSDataBlock* doStreamSessionAgg(SOperatorInfo* pOperator) { ...@@ -2916,7 +2945,7 @@ static SSDataBlock* doStreamSessionAgg(SOperatorInfo* pOperator) {
closeSessionWindow(pInfo->streamAggSup.pResultRows, &pInfo->twAggSup, pUpdated, closeSessionWindow(pInfo->streamAggSup.pResultRows, &pInfo->twAggSup, pUpdated,
getSessionWinInfo); getSessionWinInfo);
copyUpdateResult(pStUpdated, pUpdated, pBInfo->pRes->info.groupId); copyUpdateResult(pStUpdated, pUpdated);
taosHashCleanup(pStUpdated); taosHashCleanup(pStUpdated);
finalizeUpdatedResult(pOperator->numOfExprs, pInfo->streamAggSup.pResultBuf, pUpdated, finalizeUpdatedResult(pOperator->numOfExprs, pInfo->streamAggSup.pResultBuf, pUpdated,
...@@ -3216,8 +3245,9 @@ static void doStreamStateAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSDataBl ...@@ -3216,8 +3245,9 @@ static void doStreamStateAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSDataBl
} }
pCurWin->winInfo.isClosed = false; pCurWin->winInfo.isClosed = false;
if (pInfo->twAggSup.calTrigger == STREAM_TRIGGER_AT_ONCE) { if (pInfo->twAggSup.calTrigger == STREAM_TRIGGER_AT_ONCE) {
code = taosHashPut(pSeUpdated, &pCurWin->winInfo.pos, sizeof(SResultRowPosition), &(pCurWin->winInfo.win.skey), SWinRes value = {.ts = pCurWin->winInfo.win.skey, .groupId = groupId};
sizeof(TSKEY)); code = taosHashPut(pSeUpdated, &pCurWin->winInfo.pos, sizeof(SResultRowPosition),
&value, sizeof(SWinRes));
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
longjmp(pTaskInfo->env, TSDB_CODE_QRY_OUT_OF_MEMORY); longjmp(pTaskInfo->env, TSDB_CODE_QRY_OUT_OF_MEMORY);
} }
...@@ -3274,7 +3304,7 @@ static SSDataBlock* doStreamStateAgg(SOperatorInfo* pOperator) { ...@@ -3274,7 +3304,7 @@ static SSDataBlock* doStreamStateAgg(SOperatorInfo* pOperator) {
closeSessionWindow(pInfo->streamAggSup.pResultRows, &pInfo->twAggSup, pUpdated, closeSessionWindow(pInfo->streamAggSup.pResultRows, &pInfo->twAggSup, pUpdated,
getStateWinInfo); getStateWinInfo);
copyUpdateResult(pSeUpdated, pUpdated, pBInfo->pRes->info.groupId); copyUpdateResult(pSeUpdated, pUpdated);
taosHashCleanup(pSeUpdated); taosHashCleanup(pSeUpdated);
finalizeUpdatedResult(pOperator->numOfExprs, pInfo->streamAggSup.pResultBuf, pUpdated, finalizeUpdatedResult(pOperator->numOfExprs, pInfo->streamAggSup.pResultBuf, pUpdated,
......
...@@ -557,59 +557,40 @@ static int32_t createInitialSources(SSortHandle* pHandle) { ...@@ -557,59 +557,40 @@ static int32_t createInitialSources(SSortHandle* pHandle) {
SSortSource* source = taosArrayGetP(pHandle->pOrderedSource, 0); SSortSource* source = taosArrayGetP(pHandle->pOrderedSource, 0);
taosArrayClear(pHandle->pOrderedSource); taosArrayClear(pHandle->pOrderedSource);
bool hasGroupId = false;
SSDataBlock* prefetchedDataBlock = NULL;
while (1) { while (1) {
SSDataBlock* pBlock = NULL; SSDataBlock* pBlock = pHandle->fetchfp(source->param);
if (prefetchedDataBlock == NULL) {
pBlock = pHandle->fetchfp(source->param);
} else {
pBlock = prefetchedDataBlock;
prefetchedDataBlock = NULL;
}
if (pBlock == NULL) { if (pBlock == NULL) {
break; break;
} }
if (!hasGroupId) { if (pHandle->pDataBlock == NULL) {
// calculate the buffer pages according to the total available buffers.
pHandle->pageSize = getProperSortPageSize(blockDataGetRowSize(pBlock)); pHandle->pageSize = getProperSortPageSize(blockDataGetRowSize(pBlock));
// todo, number of pages are set according to the total available sort buffer // todo, number of pages are set according to the total available sort buffer
pHandle->numOfPages = 1024; pHandle->numOfPages = 1024;
sortBufSize = pHandle->numOfPages * pHandle->pageSize; sortBufSize = pHandle->numOfPages * pHandle->pageSize;
hasGroupId = true;
pHandle->pDataBlock = createOneDataBlock(pBlock, false); pHandle->pDataBlock = createOneDataBlock(pBlock, false);
} }
if (pHandle->pDataBlock->info.groupId == pBlock->info.groupId) { if (pHandle->beforeFp != NULL) {
// perform the scalar function calculation before apply the sort pHandle->beforeFp(pBlock, pHandle->param);
if (pHandle->beforeFp != NULL) { }
pHandle->beforeFp(pBlock, pHandle->param);
}
int32_t code = blockDataMerge(pHandle->pDataBlock, pBlock); int32_t code = blockDataMerge(pHandle->pDataBlock, pBlock);
if (code != 0) { if (code != 0) {
return code; return code;
} }
size_t size = blockDataGetSize(pHandle->pDataBlock); size_t size = blockDataGetSize(pHandle->pDataBlock);
if (size > sortBufSize) { if (size > sortBufSize) {
// Perform the in-memory sort and then flush data in the buffer into disk. // Perform the in-memory sort and then flush data in the buffer into disk.
int64_t p = taosGetTimestampUs(); int64_t p = taosGetTimestampUs();
blockDataSort(pHandle->pDataBlock, pHandle->pSortInfo); blockDataSort(pHandle->pDataBlock, pHandle->pSortInfo);
int64_t el = taosGetTimestampUs() - p; int64_t el = taosGetTimestampUs() - p;
pHandle->sortElapsed += el; pHandle->sortElapsed += el;
doAddToBuf(pHandle->pDataBlock, pHandle); doAddToBuf(pHandle->pDataBlock, pHandle);
}
} else {
prefetchedDataBlock = pBlock;
pHandle->pDataBlock = createOneDataBlock(pBlock, false);
} }
} }
...@@ -758,10 +739,6 @@ void* tsortGetValue(STupleHandle* pVHandle, int32_t colIndex) { ...@@ -758,10 +739,6 @@ void* tsortGetValue(STupleHandle* pVHandle, int32_t colIndex) {
} }
} }
uint64_t tsortGetGroupId(STupleHandle* pVHandle) {
return pVHandle->pBlock->info.groupId;
}
SSortExecInfo tsortGetSortExecInfo(SSortHandle* pHandle) { SSortExecInfo tsortGetSortExecInfo(SSortHandle* pHandle) {
SSortExecInfo info = {0}; SSortExecInfo info = {0};
......
...@@ -945,3 +945,19 @@ SysNameInfo taosGetSysNameInfo() { ...@@ -945,3 +945,19 @@ SysNameInfo taosGetSysNameInfo() {
return info; return info;
#endif #endif
} }
bool taosCheckCurrentInDll() {
#ifdef WINDOWS
MEMORY_BASIC_INFORMATION mbi;
char path[PATH_MAX] = {0};
GetModuleFileName(((VirtualQuery(taosCheckCurrentInDll,&mbi,sizeof(mbi)) != 0) ? (HMODULE)mbi.AllocationBase : NULL), path, PATH_MAX);
int strLastIndex = strlen(path);
if ((path[strLastIndex-3] == 'd' || path[strLastIndex-3] == 'D') && (path[strLastIndex-2] == 'l' || path[strLastIndex-2] == 'L') && (path[strLastIndex-1] == 'l' || path[strLastIndex-1] == 'L')) {
return true;
}
return false;
#else
return false;
#endif
}
\ No newline at end of file
...@@ -829,7 +829,11 @@ void *taosCacheTimedRefresh(void *handle) { ...@@ -829,7 +829,11 @@ void *taosCacheTimedRefresh(void *handle) {
const int32_t SLEEP_DURATION = 500; // 500 ms const int32_t SLEEP_DURATION = 500; // 500 ms
int64_t count = 0; int64_t count = 0;
atexit(taosCacheRefreshWorkerUnexpectedStopped); #ifdef WINDOWS
if (taosCheckCurrentInDll()) {
atexit(taosCacheRefreshWorkerUnexpectedStopped);
}
#endif
while (1) { while (1) {
taosMsleep(SLEEP_DURATION); taosMsleep(SLEEP_DURATION);
......
...@@ -173,4 +173,39 @@ endi ...@@ -173,4 +173,39 @@ endi
sql select _wstartts, count(*) c1, count(d) c2 , sum(a) c3 , max(b) c4, min(c) c5, avg(d) from st interval(10s); sql select _wstartts, count(*) c1, count(d) c2 , sum(a) c3 , max(b) c4, min(c) c5, avg(d) from st interval(10s);
system sh/exec.sh -n dnode1 -s stop -x SIGINT
\ No newline at end of file sql create database test1 vgroups 1;
sql use test1;
sql create stable st(ts timestamp, a int, b int , c int) tags(ta int,tb int,tc int);
sql create table ts1 using st tags(1,1,1);
sql create table ts2 using st tags(2,2,2);
sql create stream stream_t2 trigger at_once into streamtST1 as select _wstartts, count(*) c1, count(a) c2 , sum(a) c3 , max(b) c5, min(c) c6 from st interval(10s) ;
sql insert into ts1 values(1648791211000,1,2,3);
sql insert into ts1 values(1648791222001,2,2,3);
sql insert into ts2 values(1648791211000,1,2,3);
sql insert into ts2 values(1648791222001,2,2,3);
$loop_count = 0
loop2:
sql select * from streamtST1;
sleep 300
$loop_count = $loop_count + 1
if $loop_count == 10 then
return -1
endi
# row 0
if $data01 != 2 then
print =====data01=$data01
goto loop2
endi
#rows 1
if $data11 != 2 then
print =====data11=$data11
goto loop2
endi
system sh/stop_dnodes.sh
\ No newline at end of file
...@@ -34,6 +34,7 @@ print =====rows=$rows ...@@ -34,6 +34,7 @@ print =====rows=$rows
goto loop0 goto loop0
endi endi
print =====loop0
sql create database test1 vgroups 1; sql create database test1 vgroups 1;
sql use test1; sql use test1;
...@@ -51,7 +52,7 @@ sql insert into ts2 values(1648791211000,1,2,3); ...@@ -51,7 +52,7 @@ sql insert into ts2 values(1648791211000,1,2,3);
$loop_count = 0 $loop_count = 0
loop0: loop1:
sleep 300 sleep 300
sql select * from streamt; sql select * from streamt;
...@@ -62,7 +63,62 @@ endi ...@@ -62,7 +63,62 @@ endi
if $rows != 2 then if $rows != 2 then
print =====rows=$rows print =====rows=$rows
goto loop0 goto loop1
endi endi
system sh/exec.sh -n dnode1 -s stop -x SIGINT print =====loop1
\ No newline at end of file
sql create database test2 vgroups 1;
sql use test2;
sql create stable st(ts timestamp,a int,b int,c int,id int) tags(ta int,tb int,tc int);
sql create table ts1 using st tags(1,1,1);
sql create table ts2 using st tags(2,2,2);
sql create stream stream_t2 trigger at_once into streamtST as select _wstartts, count(*) c1, count(a) c2 , sum(a) c3 , max(b) c5, min(c) c6, max(id) c7 from st partition by ta interval(10s) ;
sql insert into ts1 values(1648791211000,1,2,3,1);
sql insert into ts1 values(1648791222001,2,2,3,2);
sql insert into ts2 values(1648791211000,1,2,3,3);
sql insert into ts2 values(1648791222001,2,2,3,4);
sql insert into ts2 values(1648791222002,2,2,3,5);
sql insert into ts2 values(1648791222002,2,2,3,6);
sql insert into ts1 values(1648791211000,1,2,3,1);
sql insert into ts1 values(1648791222001,2,2,3,2);
sql insert into ts2 values(1648791211000,1,2,3,3);
sql insert into ts2 values(1648791222001,2,2,3,4);
$loop_count = 0
loop2:
sleep 300
sql select * from streamtST;
$loop_count = $loop_count + 1
if $loop_count == 10 then
return -1
endi
if $data01 != 1 then
print =====data01=$data01
goto loop2
endi
if $data02 != 1 then
print =====data02=$data02
goto loop2
endi
if $data03 != 1 then
print =====data03=$data03
goto loop2
endi
if $data04 != 2 then
print =====data04=$data04
goto loop2
endi
print =====loop2
system sh/stop_dnodes.sh
\ No newline at end of file
...@@ -5,7 +5,12 @@ set /a a=0 ...@@ -5,7 +5,12 @@ set /a a=0
if %1 == full ( if %1 == full (
echo Windows Taosd Full Test echo Windows Taosd Full Test
set /a exitNum=0 set /a exitNum=0
for /F "usebackq tokens=*" %%i in (fulltest.bat) do ( del /Q /F failed.txt
set caseFile="fulltest.bat"
if not "%2" == "" (
set caseFile="%2"
)
for /F "usebackq tokens=*" %%i in (!caseFile!) do (
for /f "tokens=1* delims= " %%a in ("%%i") do if not "%%a" == "@REM" ( for /f "tokens=1* delims= " %%a in ("%%i") do if not "%%a" == "@REM" (
set /a a+=1 set /a a+=1
echo !a! Processing %%i echo !a! Processing %%i
...@@ -13,7 +18,7 @@ if %1 == full ( ...@@ -13,7 +18,7 @@ if %1 == full (
set time1=!_timeTemp! set time1=!_timeTemp!
echo Start at !time! echo Start at !time!
call %%i ARG1 > result_!a!.txt 2>error_!a!.txt call %%i ARG1 > result_!a!.txt 2>error_!a!.txt
if errorlevel 1 ( call :colorEcho 0c "failed" &echo. && set /a exitNum=8 ) else ( call :colorEcho 0a "Success" &echo. ) if errorlevel 1 ( call :colorEcho 0c "failed" &echo. && set /a exitNum=8 && echo %%i >>failed.txt ) else ( call :colorEcho 0a "Success" &echo. )
) )
) )
exit !exitNum! exit !exitNum!
...@@ -77,4 +82,4 @@ for %%a in (%tt%) do ( ...@@ -77,4 +82,4 @@ for %%a in (%tt%) do (
set /a index=index+1 set /a index=index+1
) )
set /a _timeTemp=(%hh%*60+%mm%)*60+%ss% set /a _timeTemp=(%hh%*60+%mm%)*60+%ss%
goto :eof goto :eof
\ No newline at end of file
Subproject commit 0a81480420d6601bbdb57770ee64e40f24c4ea83 Subproject commit 3d5aa76f8c718dcffa100b45e4cbf313d499c356
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册