提交 42f27e99 编写于 作者: 5 54liuyao

steam reprocess window

上级 753acd2d
...@@ -48,6 +48,7 @@ enum { ...@@ -48,6 +48,7 @@ enum {
typedef enum EStreamType { typedef enum EStreamType {
STREAM_NORMAL = 1, STREAM_NORMAL = 1,
STREAM_INVERT, STREAM_INVERT,
STREAM_REPROCESS,
STREAM_INVALID, STREAM_INVALID,
} EStreamType; } EStreamType;
......
...@@ -193,7 +193,6 @@ typedef struct SScanPhysiNode { ...@@ -193,7 +193,6 @@ typedef struct SScanPhysiNode {
} SScanPhysiNode; } SScanPhysiNode;
typedef SScanPhysiNode STagScanPhysiNode; typedef SScanPhysiNode STagScanPhysiNode;
typedef SScanPhysiNode SStreamScanPhysiNode;
typedef struct SSystemTableScanPhysiNode { typedef struct SSystemTableScanPhysiNode {
SScanPhysiNode scan; SScanPhysiNode scan;
...@@ -217,6 +216,7 @@ typedef struct STableScanPhysiNode { ...@@ -217,6 +216,7 @@ typedef struct STableScanPhysiNode {
} STableScanPhysiNode; } STableScanPhysiNode;
typedef STableScanPhysiNode STableSeqScanPhysiNode; typedef STableScanPhysiNode STableSeqScanPhysiNode;
typedef STableScanPhysiNode SStreamScanPhysiNode;
typedef struct SProjectPhysiNode { typedef struct SProjectPhysiNode {
SPhysiNode node; SPhysiNode node;
......
...@@ -1447,6 +1447,10 @@ void blockDebugShowData(const SArray* dataBlocks) { ...@@ -1447,6 +1447,10 @@ void blockDebugShowData(const SArray* dataBlocks) {
for (int32_t k = 0; k < colNum; k++) { for (int32_t k = 0; k < colNum; k++) {
SColumnInfoData* pColInfoData = taosArrayGet(pDataBlock->pDataBlock, k); SColumnInfoData* pColInfoData = taosArrayGet(pDataBlock->pDataBlock, k);
void* var = POINTER_SHIFT(pColInfoData->pData, j * pColInfoData->info.bytes); void* var = POINTER_SHIFT(pColInfoData->pData, j * pColInfoData->info.bytes);
if (pColInfoData->hasNull) {
printf(" %15s |", "NULL");
continue;
}
switch (pColInfoData->info.type) { switch (pColInfoData->info.type) {
case TSDB_DATA_TYPE_TIMESTAMP: case TSDB_DATA_TYPE_TIMESTAMP:
formatTimestamp(pBuf, *(uint64_t*)var, TSDB_TIME_PRECISION_MILLI); formatTimestamp(pBuf, *(uint64_t*)var, TSDB_TIME_PRECISION_MILLI);
...@@ -1464,6 +1468,9 @@ void blockDebugShowData(const SArray* dataBlocks) { ...@@ -1464,6 +1468,9 @@ void blockDebugShowData(const SArray* dataBlocks) {
case TSDB_DATA_TYPE_UBIGINT: case TSDB_DATA_TYPE_UBIGINT:
printf(" %15lu |", *(uint64_t*)var); printf(" %15lu |", *(uint64_t*)var);
break; break;
case TSDB_DATA_TYPE_DOUBLE:
printf(" %15f |", *(double*)var);
break;
} }
} }
printf("\n"); printf("\n");
......
...@@ -945,6 +945,7 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int32_t parallel) { ...@@ -945,6 +945,7 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int32_t parallel) {
.reader = pStreamReader, .reader = pStreamReader,
.meta = pTq->pVnode->pMeta, .meta = pTq->pVnode->pMeta,
.pMsgCb = &pTq->pVnode->msgCb, .pMsgCb = &pTq->pVnode->msgCb,
.vnode = pTq->pVnode,
}; };
pTask->exec.runners[i].inputHandle = pStreamReader; pTask->exec.runners[i].inputHandle = pStreamReader;
pTask->exec.runners[i].executor = qCreateStreamExecTaskInfo(pTask->exec.qmsg, &handle); pTask->exec.runners[i].executor = qCreateStreamExecTaskInfo(pTask->exec.qmsg, &handle);
......
...@@ -364,9 +364,18 @@ typedef struct STagScanInfo { ...@@ -364,9 +364,18 @@ typedef struct STagScanInfo {
STableGroupInfo *pTableGroups; STableGroupInfo *pTableGroups;
} STagScanInfo; } STagScanInfo;
typedef enum EStreamScanMode {
STREAM_SCAN_FROM_READERHANDLE = 1,
STREAM_SCAN_FROM_RES,
STREAM_SCAN_FROM_UPDATERES,
STREAM_SCAN_FROM_DATAREADER,
} EStreamScanMode;
typedef struct SStreamBlockScanInfo { typedef struct SStreamBlockScanInfo {
SArray* pBlockLists; // multiple SSDatablock. SArray* pBlockLists; // multiple SSDatablock.
SSDataBlock* pRes; // result SSDataBlock SSDataBlock* pRes; // result SSDataBlock
SSDataBlock* pUpdateRes; // update SSDataBlock
int32_t updateResIndex;
int32_t blockType; // current block type int32_t blockType; // current block type
int32_t validBlockIndex; // Is current data has returned? int32_t validBlockIndex; // Is current data has returned?
SColumnInfo* pCols; // the output column info SColumnInfo* pCols; // the output column info
...@@ -376,8 +385,12 @@ typedef struct SStreamBlockScanInfo { ...@@ -376,8 +385,12 @@ typedef struct SStreamBlockScanInfo {
SArray* pColMatchInfo; // SArray* pColMatchInfo; //
SNode* pCondition; SNode* pCondition;
SArray* tsArray; SArray* tsArray;
SUpdateInfo* pUpdateInfo; SUpdateInfo* pUpdateInfo;
int32_t primaryTsIndex; // primary time stamp slot id int32_t primaryTsIndex; // primary time stamp slot id
void* pDataReader;
EStreamScanMode scanMode;
SOperatorInfo* pOperatorDumy;
SInterval interval; // if the upstream is an interval operator, the interval info is also kept here.
} SStreamBlockScanInfo; } SStreamBlockScanInfo;
typedef struct SSysTableScanInfo { typedef struct SSysTableScanInfo {
...@@ -678,8 +691,9 @@ SOperatorInfo* createGroupOperatorInfo(SOperatorInfo* downstream, SExprInfo* pEx ...@@ -678,8 +691,9 @@ SOperatorInfo* createGroupOperatorInfo(SOperatorInfo* downstream, SExprInfo* pEx
SExprInfo* pScalarExprInfo, int32_t numOfScalarExpr, SExecTaskInfo* pTaskInfo, SExprInfo* pScalarExprInfo, int32_t numOfScalarExpr, SExecTaskInfo* pTaskInfo,
const STableGroupInfo* pTableGroupInfo); const STableGroupInfo* pTableGroupInfo);
SOperatorInfo* createDataBlockInfoScanOperator(void* dataReader, SExecTaskInfo* pTaskInfo); SOperatorInfo* createDataBlockInfoScanOperator(void* dataReader, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createStreamScanOperatorInfo(void* streamReadHandle, SSDataBlock* pResBlock, SArray* pColList, SOperatorInfo* createStreamScanOperatorInfo(void* streamReadHandle, void* pDataReader, SSDataBlock* pResBlock,
SArray* pTableIdList, SExecTaskInfo* pTaskInfo, SNode* pConditions); SArray* pColList, SArray* pTableIdList, SExecTaskInfo* pTaskInfo,
SNode* pConditions, SOperatorInfo* pOperatorDumy, SInterval* pInterval);
SOperatorInfo* createFillOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExpr, int32_t numOfCols, SOperatorInfo* createFillOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExpr, int32_t numOfCols,
SInterval* pInterval, STimeWindow* pWindow, SSDataBlock* pResBlock, int32_t fillType, SNodeListNode* fillVal, SInterval* pInterval, STimeWindow* pWindow, SSDataBlock* pResBlock, int32_t fillType, SNodeListNode* fillVal,
...@@ -733,6 +747,15 @@ bool aggDecodeResultRow(SOperatorInfo* pOperator, SAggSupporter* pSup, SOptrBasi ...@@ -733,6 +747,15 @@ bool aggDecodeResultRow(SOperatorInfo* pOperator, SAggSupporter* pSup, SOptrBasi
int32_t length); int32_t length);
void aggEncodeResultRow(SOperatorInfo* pOperator, SAggSupporter* pSup, SOptrBasicInfo* pInfo, char** result, void aggEncodeResultRow(SOperatorInfo* pOperator, SAggSupporter* pSup, SOptrBasicInfo* pInfo, char** result,
int32_t* length); int32_t* length);
STimeWindow getActiveTimeWindow(SDiskbasedBuf* pBuf, SResultRowInfo* pResultRowInfo, int64_t ts,
SInterval* pInterval, int32_t precision, STimeWindow* win);
int32_t getNumOfRowsInTimeWindow(SDataBlockInfo* pDataBlockInfo, TSKEY* pPrimaryColumn, int32_t startPos,
TSKEY ekey, __block_search_fn_t searchFn, STableQueryInfo* item,
int32_t order);
int32_t binarySearchForKey(char* pValue, int num, TSKEY key, int order);
void doClearWindow(SIntervalAggOperatorInfo* pInfo, char* pData, int16_t bytes,
uint64_t groupId, int32_t numOfOutput);
#ifdef __cplusplus #ifdef __cplusplus
} }
......
...@@ -344,6 +344,28 @@ SResultRow* getNewResultRow_rv(SDiskbasedBuf* pResultBuf, int64_t tableGroupId, ...@@ -344,6 +344,28 @@ SResultRow* getNewResultRow_rv(SDiskbasedBuf* pResultBuf, int64_t tableGroupId,
return pResultRow; return pResultRow;
} }
void doClearWindow(SIntervalAggOperatorInfo* pInfo, char* pData, int16_t bytes,
uint64_t groupId, int32_t numOfOutput) {
SAggSupporter* pSup = &pInfo->aggSup;
SET_RES_WINDOW_KEY(pSup->keyBuf, pData, bytes, groupId);
SResultRowPosition* p1 =
(SResultRowPosition*)taosHashGet(pSup->pResultRowHashTable, pSup->keyBuf,
GET_RES_WINDOW_KEY_LEN(bytes));
SResultRow* pResult = getResultRowByPos(pSup->pResultBuf, p1);
SqlFunctionCtx* pCtx = pInfo->binfo.pCtx;
for (int32_t i = 0; i < numOfOutput; ++i) {
pCtx[i].resultInfo = getResultCell(pResult, i, pInfo->binfo.rowCellInfoOffset);
struct SResultRowEntryInfo* pResInfo = pCtx[i].resultInfo;
if (fmIsWindowPseudoColumnFunc(pCtx[i].functionId)) {
continue;
}
pResInfo->initialized = false;
if (pCtx[i].functionId != -1) {
pCtx[i].fpSet.init(&pCtx[i], pResInfo);
}
}
}
/** /**
* the struct of key in hash table * the struct of key in hash table
* +----------+---------------+ * +----------+---------------+
...@@ -4763,18 +4785,48 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo ...@@ -4763,18 +4785,48 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
return createExchangeOperatorInfo(pHandle->pMsgCb->clientRpc, pExchange->pSrcEndPoints, pResBlock, pTaskInfo); return createExchangeOperatorInfo(pHandle->pMsgCb->clientRpc, pExchange->pSrcEndPoints, pResBlock, pTaskInfo);
} else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN == type) { } else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN == type) {
SScanPhysiNode* pScanPhyNode = (SScanPhysiNode*)pPhyNode; // simple child table. SScanPhysiNode* pScanPhyNode = (SScanPhysiNode*)pPhyNode; // simple child table.
STableScanPhysiNode* pTableScanNode = (STableScanPhysiNode*)pPhyNode;
int32_t code = doCreateTableGroup(pHandle->meta, pScanPhyNode->tableType, pScanPhyNode->uid, pTableGroupInfo, int32_t numOfCols = 0;
queryId, taskId);
SArray* tableIdList = extractTableIdList(pTableGroupInfo); tsdbReaderT pDataReader = NULL;
if (pHandle->vnode) {
pDataReader = doCreateDataReader(pTableScanNode, pHandle, pTableGroupInfo, (uint64_t)queryId, taskId);
} else {
doCreateTableGroup(pHandle->meta, pScanPhyNode->tableType, pScanPhyNode->uid, pTableGroupInfo,
queryId, taskId);
}
if (pDataReader == NULL && terrno != 0) {
qDebug("pDataReader is NULL");
// return NULL;
} else {
qDebug("pDataReader is not NULL");
}
SDataBlockDescNode* pDescNode = pScanPhyNode->node.pOutputDataBlockDesc; SDataBlockDescNode* pDescNode = pScanPhyNode->node.pOutputDataBlockDesc;
SArray* pColList = extractColMatchInfo(pScanPhyNode->pScanCols, pDescNode, &numOfCols, COL_MATCH_FROM_COL_ID);
SSDataBlock* pResBlockDumy = createResDataBlock(pDescNode);
SQueryTableDataCond cond = {0};
int32_t code = initQueryTableDataCond(&cond, pTableScanNode);
if (code != TSDB_CODE_SUCCESS) {
return NULL;
}
SInterval interval = extractIntervalInfo(pTableScanNode);
SOperatorInfo* pOperatorDumy = createTableScanOperatorInfo(
pDataReader, &cond, numOfCols, pTableScanNode->dataRequired, pTableScanNode->scanSeq, pColList,
pResBlockDumy, pScanPhyNode->node.pConditions, &interval, pTableScanNode->ratio, pTaskInfo);
// int32_t code = doCreateTableGroup(pHandle->meta, pScanPhyNode->tableType, pScanPhyNode->uid, pTableGroupInfo,
// queryId, taskId);
SArray* tableIdList = extractTableIdList(pTableGroupInfo);
SSDataBlock* pResBlock = createResDataBlock(pDescNode); SSDataBlock* pResBlock = createResDataBlock(pDescNode);
int32_t numOfCols = 0;
SArray* pCols = extractColMatchInfo(pScanPhyNode->pScanCols, pDescNode, &numOfCols, COL_MATCH_FROM_COL_ID); SArray* pCols = extractColMatchInfo(pScanPhyNode->pScanCols, pDescNode, &numOfCols, COL_MATCH_FROM_COL_ID);
SOperatorInfo* pOperator = createStreamScanOperatorInfo(pHandle->reader, pResBlock, pCols, tableIdList, pTaskInfo, SOperatorInfo* pOperator = createStreamScanOperatorInfo(pHandle->reader, pDataReader, pResBlock, pCols, tableIdList, pTaskInfo,
pScanPhyNode->node.pConditions); pScanPhyNode->node.pConditions, pOperatorDumy, &interval);
taosArrayDestroy(tableIdList); taosArrayDestroy(tableIdList);
return pOperator; return pOperator;
} else if (QUERY_NODE_PHYSICAL_PLAN_SYSTABLE_SCAN == type) { } else if (QUERY_NODE_PHYSICAL_PLAN_SYSTABLE_SCAN == type) {
......
...@@ -515,7 +515,40 @@ static void doClearBufferedBlocks(SStreamBlockScanInfo* pInfo) { ...@@ -515,7 +515,40 @@ static void doClearBufferedBlocks(SStreamBlockScanInfo* pInfo) {
taosArrayClear(pInfo->pBlockLists); taosArrayClear(pInfo->pBlockLists);
} }
static SSDataBlock* getUpdateDataBlock(SStreamBlockScanInfo* pInfo) { static bool prepareDataScan(SStreamBlockScanInfo* pInfo) {
SSDataBlock* pSDB = pInfo->pUpdateRes;
if (pInfo->updateResIndex < pSDB->info.rows) {
SColumnInfoData* pColDataInfo = taosArrayGet(pSDB->pDataBlock, 0);
TSKEY *tsCols = (TSKEY*)pColDataInfo->pData;
SResultRowInfo dumyInfo;
dumyInfo.cur.pageId = -1;
STimeWindow win = getActiveTimeWindow(NULL, &dumyInfo, tsCols[pInfo->updateResIndex], &pInfo->interval,
pInfo->interval.precision, NULL);
STableScanInfo* pTableScanInfo = pInfo->pOperatorDumy->info;
pTableScanInfo->cond.twindow = win;
tsdbResetReadHandle(pTableScanInfo->dataReader, &pTableScanInfo->cond);
pInfo->updateResIndex += getNumOfRowsInTimeWindow(&pSDB->info, tsCols, pInfo->updateResIndex,
win.ekey, binarySearchForKey, NULL, TSDB_ORDER_ASC);
pTableScanInfo->scanTimes = 0;
return true;
} else {
return false;
}
}
static SSDataBlock* doDataScan(SStreamBlockScanInfo* pInfo) {
SSDataBlock* pResult = NULL;
pResult = doTableScan(pInfo->pOperatorDumy);
if (pResult == NULL) {
if (prepareDataScan(pInfo)) {
// scan next window data
pResult = doTableScan(pInfo->pOperatorDumy);
}
}
return pResult;
}
static SSDataBlock* getUpdateDataBlock(SStreamBlockScanInfo* pInfo, bool invertible) {
SColumnInfoData* pColDataInfo = taosArrayGet(pInfo->pRes->pDataBlock, pInfo->primaryTsIndex); SColumnInfoData* pColDataInfo = taosArrayGet(pInfo->pRes->pDataBlock, pInfo->primaryTsIndex);
TSKEY* ts = (TSKEY*)pColDataInfo->pData; TSKEY* ts = (TSKEY*)pColDataInfo->pData;
for (int32_t i = 0; i < pInfo->pRes->info.rows; i++) { for (int32_t i = 0; i < pInfo->pRes->info.rows; i++) {
...@@ -523,13 +556,19 @@ static SSDataBlock* getUpdateDataBlock(SStreamBlockScanInfo* pInfo) { ...@@ -523,13 +556,19 @@ static SSDataBlock* getUpdateDataBlock(SStreamBlockScanInfo* pInfo) {
taosArrayPush(pInfo->tsArray, ts + i); taosArrayPush(pInfo->tsArray, ts + i);
} }
} }
if (taosArrayGetSize(pInfo->tsArray) > 0) { int32_t size = taosArrayGetSize(pInfo->tsArray);
if (size > 0 && invertible) {
// TODO(liuyao) get from tsdb // TODO(liuyao) get from tsdb
// SSDataBlock* p = createOneDataBlock(pInfo->pRes, true); // SSDataBlock* p = createOneDataBlock(pInfo->pRes, true);
// p->info.type = STREAM_INVERT; // p->info.type = STREAM_INVERT;
// taosArrayClear(pInfo->tsArray); // taosArrayClear(pInfo->tsArray);
// return p; // return p;
return NULL; SSDataBlock* p = createOneDataBlock(pInfo->pRes, false);
taosArraySet(p->pDataBlock, 0, pInfo->tsArray);
p->info.rows = size;
p->info.type = STREAM_REPROCESS;
taosArrayClear(pInfo->tsArray);
return p;
} }
return NULL; return NULL;
} }
...@@ -556,14 +595,23 @@ static SSDataBlock* doStreamBlockScan(SOperatorInfo* pOperator) { ...@@ -556,14 +595,23 @@ static SSDataBlock* doStreamBlockScan(SOperatorInfo* pOperator) {
int32_t current = pInfo->validBlockIndex++; int32_t current = pInfo->validBlockIndex++;
return taosArrayGetP(pInfo->pBlockLists, current); return taosArrayGetP(pInfo->pBlockLists, current);
} else { } else {
if (total > 0) { if (pInfo->scanMode == STREAM_SCAN_FROM_RES) {
ASSERT(total == 2); blockDataDestroy(pInfo->pUpdateRes);
SSDataBlock* pRes = taosArrayGetP(pInfo->pBlockLists, 0); pInfo->scanMode = STREAM_SCAN_FROM_READERHANDLE;
SSDataBlock* pUpRes = taosArrayGetP(pInfo->pBlockLists, 1); return pInfo->pRes;
blockDataDestroy(pUpRes); } else if (pInfo->scanMode == STREAM_SCAN_FROM_UPDATERES) {
taosArrayClear(pInfo->pBlockLists); blockDataCleanup(pInfo->pRes);
return pRes; pInfo->scanMode = STREAM_SCAN_FROM_DATAREADER;
return pInfo->pUpdateRes;
} else if (pInfo->scanMode == STREAM_SCAN_FROM_DATAREADER) {
SSDataBlock* pSDB = doDataScan(pInfo);
if (pSDB == NULL) {
pInfo->scanMode = STREAM_SCAN_FROM_READERHANDLE;
} else {
return pSDB;
}
} }
SDataBlockInfo* pBlockInfo = &pInfo->pRes->info; SDataBlockInfo* pBlockInfo = &pInfo->pRes->info;
blockDataCleanup(pInfo->pRes); blockDataCleanup(pInfo->pRes);
...@@ -629,12 +677,18 @@ static SSDataBlock* doStreamBlockScan(SOperatorInfo* pOperator) { ...@@ -629,12 +677,18 @@ static SSDataBlock* doStreamBlockScan(SOperatorInfo* pOperator) {
if (rows == 0) { if (rows == 0) {
pOperator->status = OP_EXEC_DONE; pOperator->status = OP_EXEC_DONE;
} else { } else if (pInfo->interval.interval > 0) {
SSDataBlock* upRes = getUpdateDataBlock(pInfo); SSDataBlock* upRes = getUpdateDataBlock(pInfo, true); //TODO(liuyao) get invertible from plan
if (upRes) { if (upRes) {
taosArrayPush(pInfo->pBlockLists, &(pInfo->pRes)); pInfo->pUpdateRes = upRes;
taosArrayPush(pInfo->pBlockLists, &upRes); if (upRes->info.type = STREAM_REPROCESS) {
return upRes; pInfo->updateResIndex = 0;
prepareDataScan(pInfo);
pInfo->scanMode = STREAM_SCAN_FROM_UPDATERES;
} else if (upRes->info.type = STREAM_INVERT) {
pInfo->scanMode = STREAM_SCAN_FROM_RES;
return upRes;
}
} }
} }
...@@ -642,8 +696,10 @@ static SSDataBlock* doStreamBlockScan(SOperatorInfo* pOperator) { ...@@ -642,8 +696,10 @@ static SSDataBlock* doStreamBlockScan(SOperatorInfo* pOperator) {
} }
} }
SOperatorInfo* createStreamScanOperatorInfo(void* streamReadHandle, SSDataBlock* pResBlock, SArray* pColList, SOperatorInfo* createStreamScanOperatorInfo(void* streamReadHandle, void* pDataReader,
SArray* pTableIdList, SExecTaskInfo* pTaskInfo, SNode* pCondition) { SSDataBlock* pResBlock, SArray* pColList, SArray* pTableIdList,
SExecTaskInfo* pTaskInfo, SNode* pCondition, SOperatorInfo* pOperatorDumy,
SInterval* pInterval) {
SStreamBlockScanInfo* pInfo = taosMemoryCalloc(1, sizeof(SStreamBlockScanInfo)); SStreamBlockScanInfo* pInfo = taosMemoryCalloc(1, sizeof(SStreamBlockScanInfo));
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
if (pInfo == NULL || pOperator == NULL) { if (pInfo == NULL || pOperator == NULL) {
...@@ -683,7 +739,7 @@ SOperatorInfo* createStreamScanOperatorInfo(void* streamReadHandle, SSDataBlock* ...@@ -683,7 +739,7 @@ SOperatorInfo* createStreamScanOperatorInfo(void* streamReadHandle, SSDataBlock*
} }
pInfo->primaryTsIndex = 0; // TODO(liuyao) get it from physical plan pInfo->primaryTsIndex = 0; // TODO(liuyao) get it from physical plan
pInfo->pUpdateInfo = updateInfoInit(60000, 0, 100); // TODO(liuyao) get it from physical plan pInfo->pUpdateInfo = updateInfoInitP(pInterval, 10000); // TODO(liuyao) get watermark from physical plan
if (pInfo->pUpdateInfo == NULL) { if (pInfo->pUpdateInfo == NULL) {
taosMemoryFreeClear(pInfo); taosMemoryFreeClear(pInfo);
taosMemoryFreeClear(pOperator); taosMemoryFreeClear(pOperator);
...@@ -693,6 +749,10 @@ SOperatorInfo* createStreamScanOperatorInfo(void* streamReadHandle, SSDataBlock* ...@@ -693,6 +749,10 @@ SOperatorInfo* createStreamScanOperatorInfo(void* streamReadHandle, SSDataBlock*
pInfo->readerHandle = streamReadHandle; pInfo->readerHandle = streamReadHandle;
pInfo->pRes = pResBlock; pInfo->pRes = pResBlock;
pInfo->pCondition = pCondition; pInfo->pCondition = pCondition;
pInfo->pDataReader = pDataReader;
pInfo->scanMode = STREAM_SCAN_FROM_READERHANDLE;
pInfo->pOperatorDumy = pOperatorDumy;
pInfo->interval = *pInterval;
pOperator->name = "StreamBlockScanOperator"; pOperator->name = "StreamBlockScanOperator";
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN; pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN;
......
...@@ -82,7 +82,7 @@ static void getInitialStartTimeWindow(SInterval* pInterval, int32_t precision, T ...@@ -82,7 +82,7 @@ static void getInitialStartTimeWindow(SInterval* pInterval, int32_t precision, T
} }
// get the correct time window according to the handled timestamp // get the correct time window according to the handled timestamp
static STimeWindow getActiveTimeWindow(SDiskbasedBuf* pBuf, SResultRowInfo* pResultRowInfo, int64_t ts, STimeWindow getActiveTimeWindow(SDiskbasedBuf* pBuf, SResultRowInfo* pResultRowInfo, int64_t ts,
SInterval* pInterval, int32_t precision, STimeWindow* win) { SInterval* pInterval, int32_t precision, STimeWindow* win) {
STimeWindow w = {0}; STimeWindow w = {0};
...@@ -186,7 +186,7 @@ static FORCE_INLINE int32_t getForwardStepsInBlock(int32_t numOfRows, __block_se ...@@ -186,7 +186,7 @@ static FORCE_INLINE int32_t getForwardStepsInBlock(int32_t numOfRows, __block_se
return forwardStep; return forwardStep;
} }
static int32_t binarySearchForKey(char* pValue, int num, TSKEY key, int order) { int32_t binarySearchForKey(char* pValue, int num, TSKEY key, int order) {
int32_t midPos = -1; int32_t midPos = -1;
int32_t numOfRows; int32_t numOfRows;
...@@ -249,7 +249,7 @@ static int32_t binarySearchForKey(char* pValue, int num, TSKEY key, int order) { ...@@ -249,7 +249,7 @@ static int32_t binarySearchForKey(char* pValue, int num, TSKEY key, int order) {
return midPos; return midPos;
} }
static int32_t getNumOfRowsInTimeWindow(SDataBlockInfo* pDataBlockInfo, TSKEY* pPrimaryColumn, int32_t startPos, int32_t getNumOfRowsInTimeWindow(SDataBlockInfo* pDataBlockInfo, TSKEY* pPrimaryColumn, int32_t startPos,
TSKEY ekey, __block_search_fn_t searchFn, STableQueryInfo* item, TSKEY ekey, __block_search_fn_t searchFn, STableQueryInfo* item,
int32_t order) { int32_t order) {
assert(startPos >= 0 && startPos < pDataBlockInfo->rows); assert(startPos >= 0 && startPos < pDataBlockInfo->rows);
...@@ -988,6 +988,20 @@ static void setInverFunction(SqlFunctionCtx* pCtx, int32_t num, EStreamType type ...@@ -988,6 +988,20 @@ static void setInverFunction(SqlFunctionCtx* pCtx, int32_t num, EStreamType type
} }
} }
} }
static void doClearWindows(SIntervalAggOperatorInfo* pInfo, int32_t numOfOutput, SSDataBlock* pBlock) {
SColumnInfoData* pColDataInfo = taosArrayGet(pBlock->pDataBlock, pInfo->primaryTsIndex);
TSKEY *tsCols = (TSKEY*)pColDataInfo->pData;
int32_t step = 0;
for (int32_t i = 0; i < pBlock->info.rows; i += step) {
SResultRowInfo dumyInfo;
dumyInfo.cur.pageId = -1;
STimeWindow win = getActiveTimeWindow(NULL, &dumyInfo, tsCols[i], &pInfo->interval,
pInfo->interval.precision, NULL);
step = getNumOfRowsInTimeWindow(&pBlock->info, tsCols, i,
win.ekey, binarySearchForKey, NULL, TSDB_ORDER_ASC);
doClearWindow(pInfo, (char*)&win.skey, sizeof(TKEY), pBlock->info.groupId, numOfOutput);
}
}
static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) { static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) {
SIntervalAggOperatorInfo* pInfo = pOperator->info; SIntervalAggOperatorInfo* pInfo = pOperator->info;
...@@ -1028,6 +1042,10 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) { ...@@ -1028,6 +1042,10 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) {
if (pInfo->invertible) { if (pInfo->invertible) {
setInverFunction(pInfo->binfo.pCtx, pOperator->numOfExprs, pBlock->info.type); setInverFunction(pInfo->binfo.pCtx, pOperator->numOfExprs, pBlock->info.type);
} }
if (pBlock->info.type == STREAM_REPROCESS) {
doClearWindows(pInfo, pOperator->numOfExprs, pBlock);
continue;
}
pUpdated = hashIntervalAgg(pOperator, &pInfo->binfo.resultRowInfo, pBlock, 0); pUpdated = hashIntervalAgg(pOperator, &pInfo->binfo.resultRowInfo, pBlock, 0);
} }
......
...@@ -1142,9 +1142,9 @@ static int32_t jsonToPhysiTableScanNode(const SJson* pJson, void* pObj) { ...@@ -1142,9 +1142,9 @@ static int32_t jsonToPhysiTableScanNode(const SJson* pJson, void* pObj) {
return code; return code;
} }
static int32_t physiStreamScanNodeToJson(const void* pObj, SJson* pJson) { return physiScanNodeToJson(pObj, pJson); } static int32_t physiStreamScanNodeToJson(const void* pObj, SJson* pJson) { return physiTableScanNodeToJson(pObj, pJson); }
static int32_t jsonToPhysiStreamScanNode(const SJson* pJson, void* pObj) { return jsonToPhysiScanNode(pJson, pObj); } static int32_t jsonToPhysiStreamScanNode(const SJson* pJson, void* pObj) { return jsonToPhysiTableScanNode(pJson, pObj); }
static const char* jkSysTableScanPhysiPlanMnodeEpSet = "MnodeEpSet"; static const char* jkSysTableScanPhysiPlanMnodeEpSet = "MnodeEpSet";
static const char* jkSysTableScanPhysiPlanShowRewrite = "ShowRewrite"; static const char* jkSysTableScanPhysiPlanShowRewrite = "ShowRewrite";
......
...@@ -460,9 +460,13 @@ static int32_t createTableScanPhysiNode(SPhysiPlanContext* pCxt, SSubplan* pSubp ...@@ -460,9 +460,13 @@ static int32_t createTableScanPhysiNode(SPhysiPlanContext* pCxt, SSubplan* pSubp
memcpy(pTableScan->scanSeq, pScanLogicNode->scanSeq, sizeof(pScanLogicNode->scanSeq)); memcpy(pTableScan->scanSeq, pScanLogicNode->scanSeq, sizeof(pScanLogicNode->scanSeq));
pTableScan->scanRange = pScanLogicNode->scanRange; pTableScan->scanRange = pScanLogicNode->scanRange;
pTableScan->ratio = pScanLogicNode->ratio; pTableScan->ratio = pScanLogicNode->ratio;
vgroupInfoToNodeAddr(pScanLogicNode->pVgroupList->vgroups, &pSubplan->execNode); if (pScanLogicNode->pVgroupList) {
taosArrayPush(pCxt->pExecNodeList, &pSubplan->execNode); vgroupInfoToNodeAddr(pScanLogicNode->pVgroupList->vgroups, &pSubplan->execNode);
pSubplan->execNodeStat.tableNum = pScanLogicNode->pVgroupList->vgroups[0].numOfTable; pSubplan->execNodeStat.tableNum = pScanLogicNode->pVgroupList->vgroups[0].numOfTable;
}
if (pCxt->pExecNodeList) {
taosArrayPush(pCxt->pExecNodeList, &pSubplan->execNode);
}
tNameGetFullDbName(&pScanLogicNode->tableName, pSubplan->dbFName); tNameGetFullDbName(&pScanLogicNode->tableName, pSubplan->dbFName);
pTableScan->dataRequired = pScanLogicNode->dataRequired; pTableScan->dataRequired = pScanLogicNode->dataRequired;
pTableScan->pDynamicScanFuncs = nodesCloneList(pScanLogicNode->pDynamicScanFuncs); pTableScan->pDynamicScanFuncs = nodesCloneList(pScanLogicNode->pDynamicScanFuncs);
...@@ -505,13 +509,12 @@ static int32_t createSystemTableScanPhysiNode(SPhysiPlanContext* pCxt, SSubplan* ...@@ -505,13 +509,12 @@ static int32_t createSystemTableScanPhysiNode(SPhysiPlanContext* pCxt, SSubplan*
static int32_t createStreamScanPhysiNode(SPhysiPlanContext* pCxt, SSubplan* pSubplan, SScanLogicNode* pScanLogicNode, static int32_t createStreamScanPhysiNode(SPhysiPlanContext* pCxt, SSubplan* pSubplan, SScanLogicNode* pScanLogicNode,
SPhysiNode** pPhyNode) { SPhysiNode** pPhyNode) {
SStreamScanPhysiNode* pScan = int32_t res = createTableScanPhysiNode(pCxt, pSubplan, pScanLogicNode, pPhyNode);
(SStreamScanPhysiNode*)makePhysiNode(pCxt, pScanLogicNode->pMeta->tableInfo.precision, if (res == TSDB_CODE_SUCCESS) {
(SLogicNode*)pScanLogicNode, QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN); ENodeType type = QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN;
if (NULL == pScan) { setNodeType(*pPhyNode, type);
return TSDB_CODE_OUT_OF_MEMORY;
} }
return createScanPhysiNodeFinalize(pCxt, pScanLogicNode, (SScanPhysiNode*)pScan, pPhyNode); return res;
} }
static int32_t createScanPhysiNode(SPhysiPlanContext* pCxt, SSubplan* pSubplan, SScanLogicNode* pScanLogicNode, static int32_t createScanPhysiNode(SPhysiPlanContext* pCxt, SSubplan* pSubplan, SScanLogicNode* pScanLogicNode,
...@@ -786,7 +789,7 @@ static int32_t doCreateExchangePhysiNode(SPhysiPlanContext* pCxt, SExchangeLogic ...@@ -786,7 +789,7 @@ static int32_t doCreateExchangePhysiNode(SPhysiPlanContext* pCxt, SExchangeLogic
} }
static int32_t createStreamScanPhysiNodeByExchange(SPhysiPlanContext* pCxt, SExchangeLogicNode* pExchangeLogicNode, static int32_t createStreamScanPhysiNodeByExchange(SPhysiPlanContext* pCxt, SExchangeLogicNode* pExchangeLogicNode,
SPhysiNode** pPhyNode) { SPhysiNode** pPhyNode) {
SStreamScanPhysiNode* pScan = (SStreamScanPhysiNode*)makePhysiNode( SScanPhysiNode* pScan = (SScanPhysiNode*)makePhysiNode(
pCxt, pExchangeLogicNode->precision, (SLogicNode*)pExchangeLogicNode, QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN); pCxt, pExchangeLogicNode->precision, (SLogicNode*)pExchangeLogicNode, QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN);
if (NULL == pScan) { if (NULL == pScan) {
return TSDB_CODE_OUT_OF_MEMORY; return TSDB_CODE_OUT_OF_MEMORY;
......
...@@ -154,7 +154,7 @@ int32_t streamExecTask(SStreamTask* pTask, SMsgCb* pMsgCb, const void* input, in ...@@ -154,7 +154,7 @@ int32_t streamExecTask(SStreamTask* pTask, SMsgCb* pMsgCb, const void* input, in
// sink // sink
if (pTask->sinkType == TASK_SINK__TABLE) { if (pTask->sinkType == TASK_SINK__TABLE) {
/*blockDebugShowData(pRes);*/ blockDebugShowData(pRes);
pTask->tbSink.tbSinkFunc(pTask, pTask->tbSink.vnode, 0, pRes); pTask->tbSink.tbSinkFunc(pTask, pTask->tbSink.vnode, 0, pRes);
} else if (pTask->sinkType == TASK_SINK__SMA) { } else if (pTask->sinkType == TASK_SINK__SMA) {
pTask->smaSink.smaSink(pTask->ahandle, pTask->smaSink.smaId, pRes); pTask->smaSink.smaSink(pTask->ahandle, pTask->smaSink.smaId, pRes);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册