未验证 提交 5571ebee 编写于 作者: L Liu Jicong 提交者: GitHub

Merge pull request #17542 from taosdata/feature/stream

enh(stream): internal optimize
...@@ -1730,13 +1730,14 @@ typedef struct { ...@@ -1730,13 +1730,14 @@ typedef struct {
char name[TSDB_STREAM_FNAME_LEN]; char name[TSDB_STREAM_FNAME_LEN];
char sourceDB[TSDB_DB_FNAME_LEN]; char sourceDB[TSDB_DB_FNAME_LEN];
char targetStbFullName[TSDB_TABLE_FNAME_LEN]; char targetStbFullName[TSDB_TABLE_FNAME_LEN];
int8_t igExists;
char* sql; char* sql;
char* ast; char* ast;
int8_t igExists;
int8_t triggerType; int8_t triggerType;
int8_t igExpired;
int8_t fillHistory; // process data inserted before creating stream
int64_t maxDelay; int64_t maxDelay;
int64_t watermark; int64_t watermark;
int8_t igExpired;
int32_t numOfTags; int32_t numOfTags;
SArray* pTags; // array of SField SArray* pTags; // array of SField
} SCMCreateStreamReq; } SCMCreateStreamReq;
......
...@@ -36,6 +36,7 @@ typedef struct SStreamTask SStreamTask; ...@@ -36,6 +36,7 @@ typedef struct SStreamTask SStreamTask;
enum { enum {
STREAM_STATUS__NORMAL = 0, STREAM_STATUS__NORMAL = 0,
STREAM_STATUS__STOP, STREAM_STATUS__STOP,
STREAM_STATUS__INIT,
STREAM_STATUS__FAILED, STREAM_STATUS__FAILED,
STREAM_STATUS__RECOVER, STREAM_STATUS__RECOVER,
}; };
...@@ -291,6 +292,9 @@ typedef struct SStreamTask { ...@@ -291,6 +292,9 @@ typedef struct SStreamTask {
int64_t recoverSnapVer; int64_t recoverSnapVer;
int64_t startVer; int64_t startVer;
// fill history
int8_t fillHistory;
// children info // children info
SArray* childEpInfo; // SArray<SStreamChildEpInfo*> SArray* childEpInfo; // SArray<SStreamChildEpInfo*>
int32_t nextCheckId; int32_t nextCheckId;
...@@ -534,7 +538,7 @@ typedef struct SStreamMeta { ...@@ -534,7 +538,7 @@ typedef struct SStreamMeta {
SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandFunc); SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandFunc);
void streamMetaClose(SStreamMeta* streamMeta); void streamMetaClose(SStreamMeta* streamMeta);
int32_t streamMetaAddTask(SStreamMeta* pMeta, SStreamTask* pTask); // int32_t streamMetaAddTask(SStreamMeta* pMeta, SStreamTask* pTask);
int32_t streamMetaAddSerializedTask(SStreamMeta* pMeta, int64_t startVer, char* msg, int32_t msgLen); int32_t streamMetaAddSerializedTask(SStreamMeta* pMeta, int64_t startVer, char* msg, int32_t msgLen);
int32_t streamMetaRemoveTask(SStreamMeta* pMeta, int32_t taskId); int32_t streamMetaRemoveTask(SStreamMeta* pMeta, int32_t taskId);
SStreamTask* streamMetaGetTask(SStreamMeta* pMeta, int32_t taskId); SStreamTask* streamMetaGetTask(SStreamMeta* pMeta, int32_t taskId);
......
...@@ -17,6 +17,7 @@ ...@@ -17,6 +17,7 @@
#include "taosdef.h" #include "taosdef.h"
#include "tarray.h" #include "tarray.h"
#include "tcommon.h"
#include "tmsg.h" #include "tmsg.h"
#include "tscalablebf.h" #include "tscalablebf.h"
...@@ -24,6 +25,11 @@ ...@@ -24,6 +25,11 @@
extern "C" { extern "C" {
#endif #endif
typedef struct SUpdateKey {
int64_t tbUid;
TSKEY ts;
} SUpdateKey;
typedef struct SUpdateInfo { typedef struct SUpdateInfo {
SArray *pTsBuckets; SArray *pTsBuckets;
uint64_t numBuckets; uint64_t numBuckets;
...@@ -41,6 +47,7 @@ typedef struct SUpdateInfo { ...@@ -41,6 +47,7 @@ typedef struct SUpdateInfo {
SUpdateInfo *updateInfoInitP(SInterval *pInterval, int64_t watermark); SUpdateInfo *updateInfoInitP(SInterval *pInterval, int64_t watermark);
SUpdateInfo *updateInfoInit(int64_t interval, int32_t precision, int64_t watermark); SUpdateInfo *updateInfoInit(int64_t interval, int32_t precision, int64_t watermark);
void updateInfoFillBlockData(SUpdateInfo *pInfo, SSDataBlock *pBlock, int32_t primaryTsCol);
bool updateInfoIsUpdated(SUpdateInfo *pInfo, uint64_t tableId, TSKEY ts); bool updateInfoIsUpdated(SUpdateInfo *pInfo, uint64_t tableId, TSKEY ts);
bool updateInfoIsTableInserted(SUpdateInfo *pInfo, int64_t tbUid); bool updateInfoIsTableInserted(SUpdateInfo *pInfo, int64_t tbUid);
void updateInfoSetScanRange(SUpdateInfo *pInfo, STimeWindow *pWin, uint64_t groupId, uint64_t version); void updateInfoSetScanRange(SUpdateInfo *pInfo, STimeWindow *pWin, uint64_t groupId, uint64_t version);
......
...@@ -50,6 +50,9 @@ uint64_t MurmurHash3_64(const char *key, uint32_t len); ...@@ -50,6 +50,9 @@ uint64_t MurmurHash3_64(const char *key, uint32_t len);
uint32_t taosIntHash_32(const char *key, uint32_t len); uint32_t taosIntHash_32(const char *key, uint32_t len);
uint32_t taosIntHash_64(const char *key, uint32_t len); uint32_t taosIntHash_64(const char *key, uint32_t len);
uint32_t taosFastHash(const char *key, uint32_t len);
uint32_t taosDJB2Hash(const char *key, uint32_t len);
_hash_fn_t taosGetDefaultHashFunction(int32_t type); _hash_fn_t taosGetDefaultHashFunction(int32_t type);
_equal_fn_t taosGetDefaultEqualFunction(int32_t type); _equal_fn_t taosGetDefaultEqualFunction(int32_t type);
......
...@@ -613,6 +613,7 @@ typedef struct { ...@@ -613,6 +613,7 @@ typedef struct {
// config // config
int8_t igExpired; int8_t igExpired;
int8_t trigger; int8_t trigger;
int8_t fillHistory;
int64_t triggerParam; int64_t triggerParam;
int64_t watermark; int64_t watermark;
// source and target // source and target
......
...@@ -18,6 +18,7 @@ ...@@ -18,6 +18,7 @@
#include "mndConsumer.h" #include "mndConsumer.h"
int32_t tEncodeSStreamObj(SEncoder *pEncoder, const SStreamObj *pObj) { int32_t tEncodeSStreamObj(SEncoder *pEncoder, const SStreamObj *pObj) {
if (tStartEncode(pEncoder) < 0) return -1;
if (tEncodeCStr(pEncoder, pObj->name) < 0) return -1; if (tEncodeCStr(pEncoder, pObj->name) < 0) return -1;
if (tEncodeI64(pEncoder, pObj->createTime) < 0) return -1; if (tEncodeI64(pEncoder, pObj->createTime) < 0) return -1;
...@@ -31,6 +32,7 @@ int32_t tEncodeSStreamObj(SEncoder *pEncoder, const SStreamObj *pObj) { ...@@ -31,6 +32,7 @@ int32_t tEncodeSStreamObj(SEncoder *pEncoder, const SStreamObj *pObj) {
if (tEncodeI8(pEncoder, pObj->igExpired) < 0) return -1; if (tEncodeI8(pEncoder, pObj->igExpired) < 0) return -1;
if (tEncodeI8(pEncoder, pObj->trigger) < 0) return -1; if (tEncodeI8(pEncoder, pObj->trigger) < 0) return -1;
if (tEncodeI8(pEncoder, pObj->fillHistory) < 0) return -1;
if (tEncodeI64(pEncoder, pObj->triggerParam) < 0) return -1; if (tEncodeI64(pEncoder, pObj->triggerParam) < 0) return -1;
if (tEncodeI64(pEncoder, pObj->watermark) < 0) return -1; if (tEncodeI64(pEncoder, pObj->watermark) < 0) return -1;
...@@ -74,10 +76,12 @@ int32_t tEncodeSStreamObj(SEncoder *pEncoder, const SStreamObj *pObj) { ...@@ -74,10 +76,12 @@ int32_t tEncodeSStreamObj(SEncoder *pEncoder, const SStreamObj *pObj) {
if (tEncodeSSchemaWrapper(pEncoder, &pObj->outputSchema) < 0) return -1; if (tEncodeSSchemaWrapper(pEncoder, &pObj->outputSchema) < 0) return -1;
tEndEncode(pEncoder);
return pEncoder->pos; return pEncoder->pos;
} }
int32_t tDecodeSStreamObj(SDecoder *pDecoder, SStreamObj *pObj) { int32_t tDecodeSStreamObj(SDecoder *pDecoder, SStreamObj *pObj) {
if (tStartDecode(pDecoder) < 0) return -1;
if (tDecodeCStrTo(pDecoder, pObj->name) < 0) return -1; if (tDecodeCStrTo(pDecoder, pObj->name) < 0) return -1;
if (tDecodeI64(pDecoder, &pObj->createTime) < 0) return -1; if (tDecodeI64(pDecoder, &pObj->createTime) < 0) return -1;
...@@ -91,6 +95,7 @@ int32_t tDecodeSStreamObj(SDecoder *pDecoder, SStreamObj *pObj) { ...@@ -91,6 +95,7 @@ int32_t tDecodeSStreamObj(SDecoder *pDecoder, SStreamObj *pObj) {
if (tDecodeI8(pDecoder, &pObj->igExpired) < 0) return -1; if (tDecodeI8(pDecoder, &pObj->igExpired) < 0) return -1;
if (tDecodeI8(pDecoder, &pObj->trigger) < 0) return -1; if (tDecodeI8(pDecoder, &pObj->trigger) < 0) return -1;
if (tDecodeI8(pDecoder, &pObj->fillHistory) < 0) return -1;
if (tDecodeI64(pDecoder, &pObj->triggerParam) < 0) return -1; if (tDecodeI64(pDecoder, &pObj->triggerParam) < 0) return -1;
if (tDecodeI64(pDecoder, &pObj->watermark) < 0) return -1; if (tDecodeI64(pDecoder, &pObj->watermark) < 0) return -1;
...@@ -134,6 +139,7 @@ int32_t tDecodeSStreamObj(SDecoder *pDecoder, SStreamObj *pObj) { ...@@ -134,6 +139,7 @@ int32_t tDecodeSStreamObj(SDecoder *pDecoder, SStreamObj *pObj) {
if (tDecodeSSchemaWrapper(pDecoder, &pObj->outputSchema) < 0) return -1; if (tDecodeSSchemaWrapper(pDecoder, &pObj->outputSchema) < 0) return -1;
tEndDecode(pDecoder);
return 0; return 0;
} }
......
...@@ -239,6 +239,7 @@ int32_t mndAddShuffleSinkTasksToStream(SMnode* pMnode, SStreamObj* pStream) { ...@@ -239,6 +239,7 @@ int32_t mndAddShuffleSinkTasksToStream(SMnode* pMnode, SStreamObj* pStream) {
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1; return -1;
} }
pTask->fillHistory = pStream->fillHistory;
mndAddTaskToTaskSet(tasks, pTask); mndAddTaskToTaskSet(tasks, pTask);
pTask->nodeId = pVgroup->vgId; pTask->nodeId = pVgroup->vgId;
...@@ -270,6 +271,7 @@ int32_t mndAddFixedSinkTaskToStream(SMnode* pMnode, SStreamObj* pStream) { ...@@ -270,6 +271,7 @@ int32_t mndAddFixedSinkTaskToStream(SMnode* pMnode, SStreamObj* pStream) {
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1; return -1;
} }
pTask->fillHistory = pStream->fillHistory;
mndAddTaskToTaskSet(tasks, pTask); mndAddTaskToTaskSet(tasks, pTask);
ASSERT(pStream->fixedSinkVg.vgId == pStream->fixedSinkVgId); ASSERT(pStream->fixedSinkVg.vgId == pStream->fixedSinkVgId);
...@@ -356,6 +358,7 @@ int32_t mndScheduleStream(SMnode* pMnode, SStreamObj* pStream) { ...@@ -356,6 +358,7 @@ int32_t mndScheduleStream(SMnode* pMnode, SStreamObj* pStream) {
qDestroyQueryPlan(pPlan); qDestroyQueryPlan(pPlan);
return -1; return -1;
} }
pInnerTask->fillHistory = pStream->fillHistory;
mndAddTaskToTaskSet(taskInnerLevel, pInnerTask); mndAddTaskToTaskSet(taskInnerLevel, pInnerTask);
pInnerTask->childEpInfo = taosArrayInit(0, sizeof(void*)); pInnerTask->childEpInfo = taosArrayInit(0, sizeof(void*));
...@@ -422,6 +425,7 @@ int32_t mndScheduleStream(SMnode* pMnode, SStreamObj* pStream) { ...@@ -422,6 +425,7 @@ int32_t mndScheduleStream(SMnode* pMnode, SStreamObj* pStream) {
qDestroyQueryPlan(pPlan); qDestroyQueryPlan(pPlan);
return -1; return -1;
} }
pTask->fillHistory = pStream->fillHistory;
mndAddTaskToTaskSet(taskSourceLevel, pTask); mndAddTaskToTaskSet(taskSourceLevel, pTask);
pTask->triggerParam = 0; pTask->triggerParam = 0;
......
...@@ -143,8 +143,10 @@ SSdbRow *mndStreamActionDecode(SSdbRaw *pRaw) { ...@@ -143,8 +143,10 @@ SSdbRow *mndStreamActionDecode(SSdbRaw *pRaw) {
SDecoder decoder; SDecoder decoder;
tDecoderInit(&decoder, buf, tlen + 1); tDecoderInit(&decoder, buf, tlen + 1);
if (tDecodeSStreamObj(&decoder, pStream) < 0) { if (tDecodeSStreamObj(&decoder, pStream) < 0) {
tDecoderClear(&decoder);
goto STREAM_DECODE_OVER; goto STREAM_DECODE_OVER;
} }
tDecoderClear(&decoder);
terrno = TSDB_CODE_SUCCESS; terrno = TSDB_CODE_SUCCESS;
...@@ -280,6 +282,7 @@ static int32_t mndBuildStreamObjFromCreateReq(SMnode *pMnode, SStreamObj *pObj, ...@@ -280,6 +282,7 @@ static int32_t mndBuildStreamObjFromCreateReq(SMnode *pMnode, SStreamObj *pObj,
pObj->trigger = pCreate->triggerType; pObj->trigger = pCreate->triggerType;
pObj->triggerParam = pCreate->maxDelay; pObj->triggerParam = pCreate->maxDelay;
pObj->watermark = pCreate->watermark; pObj->watermark = pCreate->watermark;
pObj->fillHistory = pCreate->fillHistory;
memcpy(pObj->sourceDb, pCreate->sourceDB, TSDB_DB_FNAME_LEN); memcpy(pObj->sourceDb, pCreate->sourceDB, TSDB_DB_FNAME_LEN);
SDbObj *pSourceDb = mndAcquireDb(pMnode, pCreate->sourceDB); SDbObj *pSourceDb = mndAcquireDb(pMnode, pCreate->sourceDB);
...@@ -686,7 +689,7 @@ static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq) { ...@@ -686,7 +689,7 @@ static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq) {
mError("stream:%s, failed to create since %s", createStreamReq.name, terrstr()); mError("stream:%s, failed to create since %s", createStreamReq.name, terrstr());
goto _OVER; goto _OVER;
} }
mndTransSetDbName(pTrans, createStreamReq.sourceDB, streamObj.targetDb); // hack way mndTransSetDbName(pTrans, createStreamReq.sourceDB, streamObj.targetDb);
mInfo("trans:%d, used to create stream:%s", pTrans->id, createStreamReq.name); mInfo("trans:%d, used to create stream:%s", pTrans->id, createStreamReq.name);
// create stb for stream // create stb for stream
......
...@@ -162,6 +162,8 @@ typedef struct { ...@@ -162,6 +162,8 @@ typedef struct {
SQueryTableDataCond tableCond; SQueryTableDataCond tableCond;
int64_t recoverStartVer; int64_t recoverStartVer;
int64_t recoverEndVer; int64_t recoverEndVer;
int64_t fillHistoryVer1;
int64_t fillHistoryVer2;
SStreamState* pState; SStreamState* pState;
} SStreamTaskInfo; } SStreamTaskInfo;
......
...@@ -569,8 +569,10 @@ int32_t projectApplyFunctions(SExprInfo* pExpr, SSDataBlock* pResult, SSDataBloc ...@@ -569,8 +569,10 @@ int32_t projectApplyFunctions(SExprInfo* pExpr, SSDataBlock* pResult, SSDataBloc
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
pResult->info.groupId = pSrcBlock->info.groupId; if (pResult != pSrcBlock) {
memcpy(pResult->info.parTbName, pSrcBlock->info.parTbName, TSDB_TABLE_NAME_LEN); pResult->info.groupId = pSrcBlock->info.groupId;
memcpy(pResult->info.parTbName, pSrcBlock->info.parTbName, TSDB_TABLE_NAME_LEN);
}
// if the source equals to the destination, it is to create a new column as the result of scalar // if the source equals to the destination, it is to create a new column as the result of scalar
// function or some operators. // function or some operators.
......
...@@ -284,7 +284,6 @@ static bool doLoadBlockSMA(STableScanInfo* pTableScanInfo, SSDataBlock* pBlock, ...@@ -284,7 +284,6 @@ static bool doLoadBlockSMA(STableScanInfo* pTableScanInfo, SSDataBlock* pBlock,
return true; return true;
} }
static void doSetTagColumnData(STableScanInfo* pTableScanInfo, SSDataBlock* pBlock, SExecTaskInfo* pTaskInfo) { static void doSetTagColumnData(STableScanInfo* pTableScanInfo, SSDataBlock* pBlock, SExecTaskInfo* pTaskInfo) {
if (pTableScanInfo->pseudoSup.numOfExprs > 0) { if (pTableScanInfo->pseudoSup.numOfExprs > 0) {
SExprSupp* pSup = &pTableScanInfo->pseudoSup; SExprSupp* pSup = &pTableScanInfo->pseudoSup;
...@@ -743,7 +742,8 @@ SOperatorInfo* createTableScanOperatorInfo(STableScanPhysiNode* pTableScanNode, ...@@ -743,7 +742,8 @@ SOperatorInfo* createTableScanOperatorInfo(STableScanPhysiNode* pTableScanNode,
SDataBlockDescNode* pDescNode = pTableScanNode->scan.node.pOutputDataBlockDesc; SDataBlockDescNode* pDescNode = pTableScanNode->scan.node.pOutputDataBlockDesc;
int32_t numOfCols = 0; int32_t numOfCols = 0;
pInfo->pColMatchInfo = extractColMatchInfo(pTableScanNode->scan.pScanCols, pDescNode, &numOfCols, COL_MATCH_FROM_COL_ID); pInfo->pColMatchInfo =
extractColMatchInfo(pTableScanNode->scan.pScanCols, pDescNode, &numOfCols, COL_MATCH_FROM_COL_ID);
int32_t code = initQueryTableDataCond(&pInfo->cond, pTableScanNode); int32_t code = initQueryTableDataCond(&pInfo->cond, pTableScanNode);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
...@@ -1713,9 +1713,12 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) { ...@@ -1713,9 +1713,12 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) {
} }
#endif #endif
#if 1
if (pTaskInfo->streamInfo.recoverStep == STREAM_RECOVER_STEP__PREPARE) { if (pTaskInfo->streamInfo.recoverStep == STREAM_RECOVER_STEP__PREPARE) {
STableScanInfo* pTSInfo = pInfo->pTableScanOp->info; STableScanInfo* pTSInfo = pInfo->pTableScanOp->info;
memcpy(&pTSInfo->cond, &pTaskInfo->streamInfo.tableCond, sizeof(SQueryTableDataCond)); memcpy(&pTSInfo->cond, &pTaskInfo->streamInfo.tableCond, sizeof(SQueryTableDataCond));
pTSInfo->cond.startVersion = -1;
pTSInfo->cond.endVersion = pTaskInfo->streamInfo.fillHistoryVer1;
pTSInfo->scanTimes = 0; pTSInfo->scanTimes = 0;
pTSInfo->currentGroupId = -1; pTSInfo->currentGroupId = -1;
pTaskInfo->streamInfo.recoverStep = STREAM_RECOVER_STEP__SCAN; pTaskInfo->streamInfo.recoverStep = STREAM_RECOVER_STEP__SCAN;
...@@ -1724,12 +1727,14 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) { ...@@ -1724,12 +1727,14 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) {
if (pTaskInfo->streamInfo.recoverStep == STREAM_RECOVER_STEP__SCAN) { if (pTaskInfo->streamInfo.recoverStep == STREAM_RECOVER_STEP__SCAN) {
SSDataBlock* pBlock = doTableScan(pInfo->pTableScanOp); SSDataBlock* pBlock = doTableScan(pInfo->pTableScanOp);
if (pBlock != NULL) { if (pBlock != NULL) {
calBlockTbName(&pInfo->tbnameCalSup, pBlock);
updateInfoFillBlockData(pInfo->pUpdateInfo, pBlock, pInfo->primaryTsIndex);
return pBlock; return pBlock;
} }
// TODO fill in bloom filter
pTaskInfo->streamInfo.recoverStep = STREAM_RECOVER_STEP__NONE; pTaskInfo->streamInfo.recoverStep = STREAM_RECOVER_STEP__NONE;
return NULL; return NULL;
} }
#endif
size_t total = taosArrayGetSize(pInfo->pBlockLists); size_t total = taosArrayGetSize(pInfo->pBlockLists);
// TODO: refactor // TODO: refactor
...@@ -2115,7 +2120,7 @@ SOperatorInfo* createRawScanOperatorInfo(SReadHandle* pHandle, SExecTaskInfo* pT ...@@ -2115,7 +2120,7 @@ SOperatorInfo* createRawScanOperatorInfo(SReadHandle* pHandle, SExecTaskInfo* pT
pOperator->fpSet = createOperatorFpSet(NULL, doRawScan, NULL, NULL, destroyRawScanOperatorInfo, NULL, NULL, NULL); pOperator->fpSet = createOperatorFpSet(NULL, doRawScan, NULL, NULL, destroyRawScanOperatorInfo, NULL, NULL, NULL);
return pOperator; return pOperator;
_end: _end:
taosMemoryFree(pInfo); taosMemoryFree(pInfo);
taosMemoryFree(pOperator); taosMemoryFree(pOperator);
pTaskInfo->code = code; pTaskInfo->code = code;
......
...@@ -3149,6 +3149,7 @@ static void doStreamIntervalAggImpl(SOperatorInfo* pOperatorInfo, SSDataBlock* p ...@@ -3149,6 +3149,7 @@ static void doStreamIntervalAggImpl(SOperatorInfo* pOperatorInfo, SSDataBlock* p
static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) { static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) {
SStreamIntervalOperatorInfo* pInfo = pOperator->info; SStreamIntervalOperatorInfo* pInfo = pOperator->info;
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
SOperatorInfo* downstream = pOperator->pDownstream[0]; SOperatorInfo* downstream = pOperator->pDownstream[0];
TSKEY maxTs = INT64_MIN; TSKEY maxTs = INT64_MIN;
...@@ -3191,6 +3192,7 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) { ...@@ -3191,6 +3192,7 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) {
} else { } else {
deleteIntervalDiscBuf(pInfo->pState, pInfo->pPullDataMap, pInfo->twAggSup.maxTs - pInfo->twAggSup.deleteMark, deleteIntervalDiscBuf(pInfo->pState, pInfo->pPullDataMap, pInfo->twAggSup.maxTs - pInfo->twAggSup.deleteMark,
&pInfo->interval, &pInfo->delKey); &pInfo->interval, &pInfo->delKey);
streamStateCommit(pTaskInfo->streamInfo.pState);
} }
return NULL; return NULL;
} else { } else {
...@@ -5387,6 +5389,7 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) { ...@@ -5387,6 +5389,7 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) {
deleteIntervalDiscBuf(pInfo->pState, NULL, pInfo->twAggSup.maxTs - pInfo->twAggSup.deleteMark, &pInfo->interval, deleteIntervalDiscBuf(pInfo->pState, NULL, pInfo->twAggSup.maxTs - pInfo->twAggSup.deleteMark, &pInfo->interval,
&pInfo->delKey); &pInfo->delKey);
doSetOperatorCompleted(pOperator); doSetOperatorCompleted(pOperator);
streamStateCommit(pTaskInfo->streamInfo.pState);
return NULL; return NULL;
} }
......
...@@ -94,7 +94,7 @@ int32_t streamMetaAddSerializedTask(SStreamMeta* pMeta, int64_t startVer, char* ...@@ -94,7 +94,7 @@ int32_t streamMetaAddSerializedTask(SStreamMeta* pMeta, int64_t startVer, char*
SDecoder decoder; SDecoder decoder;
tDecoderInit(&decoder, (uint8_t*)msg, msgLen); tDecoderInit(&decoder, (uint8_t*)msg, msgLen);
if (tDecodeSStreamTask(&decoder, pTask) < 0) { if (tDecodeSStreamTask(&decoder, pTask) < 0) {
ASSERT(0); tDecoderClear(&decoder);
goto FAIL; goto FAIL;
} }
tDecoderClear(&decoder); tDecoderClear(&decoder);
...@@ -113,6 +113,13 @@ int32_t streamMetaAddSerializedTask(SStreamMeta* pMeta, int64_t startVer, char* ...@@ -113,6 +113,13 @@ int32_t streamMetaAddSerializedTask(SStreamMeta* pMeta, int64_t startVer, char*
ASSERT(0); ASSERT(0);
goto FAIL; goto FAIL;
} }
if (pTask->fillHistory) {
// pipeline exec
// if finished, dispatch a stream-prepare-finished msg to downstream task
// set status normal
}
return 0; return 0;
FAIL: FAIL:
...@@ -120,6 +127,7 @@ FAIL: ...@@ -120,6 +127,7 @@ FAIL:
return -1; return -1;
} }
#if 0
int32_t streamMetaAddTask(SStreamMeta* pMeta, SStreamTask* pTask) { int32_t streamMetaAddTask(SStreamMeta* pMeta, SStreamTask* pTask) {
void* buf = NULL; void* buf = NULL;
if (pMeta->expandFunc(pMeta->ahandle, pTask) < 0) { if (pMeta->expandFunc(pMeta->ahandle, pTask) < 0) {
...@@ -149,6 +157,7 @@ int32_t streamMetaAddTask(SStreamMeta* pMeta, SStreamTask* pTask) { ...@@ -149,6 +157,7 @@ int32_t streamMetaAddTask(SStreamMeta* pMeta, SStreamTask* pTask) {
return 0; return 0;
} }
#endif
SStreamTask* streamMetaGetTask(SStreamMeta* pMeta, int32_t taskId) { SStreamTask* streamMetaGetTask(SStreamMeta* pMeta, int32_t taskId) {
SStreamTask** ppTask = (SStreamTask**)taosHashGet(pMeta->pTasks, &taskId, sizeof(int32_t)); SStreamTask** ppTask = (SStreamTask**)taosHashGet(pMeta->pTasks, &taskId, sizeof(int32_t));
......
...@@ -49,7 +49,7 @@ int32_t tDecodeStreamEpInfo(SDecoder* pDecoder, SStreamChildEpInfo* pInfo) { ...@@ -49,7 +49,7 @@ int32_t tDecodeStreamEpInfo(SDecoder* pDecoder, SStreamChildEpInfo* pInfo) {
} }
int32_t tEncodeSStreamTask(SEncoder* pEncoder, const SStreamTask* pTask) { int32_t tEncodeSStreamTask(SEncoder* pEncoder, const SStreamTask* pTask) {
/*if (tStartEncode(pEncoder) < 0) return -1;*/ if (tStartEncode(pEncoder) < 0) return -1;
if (tEncodeI64(pEncoder, pTask->streamId) < 0) return -1; if (tEncodeI64(pEncoder, pTask->streamId) < 0) return -1;
if (tEncodeI32(pEncoder, pTask->taskId) < 0) return -1; if (tEncodeI32(pEncoder, pTask->taskId) < 0) return -1;
if (tEncodeI32(pEncoder, pTask->totalLevel) < 0) return -1; if (tEncodeI32(pEncoder, pTask->totalLevel) < 0) return -1;
...@@ -64,6 +64,10 @@ int32_t tEncodeSStreamTask(SEncoder* pEncoder, const SStreamTask* pTask) { ...@@ -64,6 +64,10 @@ int32_t tEncodeSStreamTask(SEncoder* pEncoder, const SStreamTask* pTask) {
if (tEncodeI32(pEncoder, pTask->nodeId) < 0) return -1; if (tEncodeI32(pEncoder, pTask->nodeId) < 0) return -1;
if (tEncodeSEpSet(pEncoder, &pTask->epSet) < 0) return -1; if (tEncodeSEpSet(pEncoder, &pTask->epSet) < 0) return -1;
if (tEncodeI64(pEncoder, pTask->recoverSnapVer) < 0) return -1;
if (tEncodeI64(pEncoder, pTask->startVer) < 0) return -1;
if (tEncodeI8(pEncoder, pTask->fillHistory) < 0) return -1;
int32_t epSz = taosArrayGetSize(pTask->childEpInfo); int32_t epSz = taosArrayGetSize(pTask->childEpInfo);
if (tEncodeI32(pEncoder, epSz) < 0) return -1; if (tEncodeI32(pEncoder, epSz) < 0) return -1;
for (int32_t i = 0; i < epSz; i++) { for (int32_t i = 0; i < epSz; i++) {
...@@ -93,12 +97,12 @@ int32_t tEncodeSStreamTask(SEncoder* pEncoder, const SStreamTask* pTask) { ...@@ -93,12 +97,12 @@ int32_t tEncodeSStreamTask(SEncoder* pEncoder, const SStreamTask* pTask) {
} }
if (tEncodeI64(pEncoder, pTask->triggerParam) < 0) return -1; if (tEncodeI64(pEncoder, pTask->triggerParam) < 0) return -1;
/*tEndEncode(pEncoder);*/ tEndEncode(pEncoder);
return pEncoder->pos; return pEncoder->pos;
} }
int32_t tDecodeSStreamTask(SDecoder* pDecoder, SStreamTask* pTask) { int32_t tDecodeSStreamTask(SDecoder* pDecoder, SStreamTask* pTask) {
/*if (tStartDecode(pDecoder) < 0) return -1;*/ if (tStartDecode(pDecoder) < 0) return -1;
if (tDecodeI64(pDecoder, &pTask->streamId) < 0) return -1; if (tDecodeI64(pDecoder, &pTask->streamId) < 0) return -1;
if (tDecodeI32(pDecoder, &pTask->taskId) < 0) return -1; if (tDecodeI32(pDecoder, &pTask->taskId) < 0) return -1;
if (tDecodeI32(pDecoder, &pTask->totalLevel) < 0) return -1; if (tDecodeI32(pDecoder, &pTask->totalLevel) < 0) return -1;
...@@ -113,6 +117,10 @@ int32_t tDecodeSStreamTask(SDecoder* pDecoder, SStreamTask* pTask) { ...@@ -113,6 +117,10 @@ int32_t tDecodeSStreamTask(SDecoder* pDecoder, SStreamTask* pTask) {
if (tDecodeI32(pDecoder, &pTask->nodeId) < 0) return -1; if (tDecodeI32(pDecoder, &pTask->nodeId) < 0) return -1;
if (tDecodeSEpSet(pDecoder, &pTask->epSet) < 0) return -1; if (tDecodeSEpSet(pDecoder, &pTask->epSet) < 0) return -1;
if (tDecodeI64(pDecoder, &pTask->recoverSnapVer) < 0) return -1;
if (tDecodeI64(pDecoder, &pTask->startVer) < 0) return -1;
if (tDecodeI8(pDecoder, &pTask->fillHistory) < 0) return -1;
int32_t epSz; int32_t epSz;
if (tDecodeI32(pDecoder, &epSz) < 0) return -1; if (tDecodeI32(pDecoder, &epSz) < 0) return -1;
pTask->childEpInfo = taosArrayInit(epSz, sizeof(void*)); pTask->childEpInfo = taosArrayInit(epSz, sizeof(void*));
...@@ -150,7 +158,7 @@ int32_t tDecodeSStreamTask(SDecoder* pDecoder, SStreamTask* pTask) { ...@@ -150,7 +158,7 @@ int32_t tDecodeSStreamTask(SDecoder* pDecoder, SStreamTask* pTask) {
} }
if (tDecodeI64(pDecoder, &pTask->triggerParam) < 0) return -1; if (tDecodeI64(pDecoder, &pTask->triggerParam) < 0) return -1;
/*tEndDecode(pDecoder);*/ tEndDecode(pDecoder);
return 0; return 0;
} }
......
...@@ -14,6 +14,7 @@ ...@@ -14,6 +14,7 @@
*/ */
#include "query.h" #include "query.h"
#include "tdatablock.h"
#include "tencode.h" #include "tencode.h"
#include "tstreamUpdate.h" #include "tstreamUpdate.h"
#include "ttime.h" #include "ttime.h"
...@@ -162,15 +163,42 @@ bool updateInfoIsTableInserted(SUpdateInfo *pInfo, int64_t tbUid) { ...@@ -162,15 +163,42 @@ bool updateInfoIsTableInserted(SUpdateInfo *pInfo, int64_t tbUid) {
return false; return false;
} }
void updateInfoFillBlockData(SUpdateInfo *pInfo, SSDataBlock *pBlock, int32_t primaryTsCol) {
if (pBlock == NULL || pBlock->info.rows == 0) return;
TSKEY maxTs = -1;
int64_t tbUid = pBlock->info.uid;
SColumnInfoData *pColDataInfo = taosArrayGet(pBlock->pDataBlock, primaryTsCol);
for (int32_t i = 0; i < pBlock->info.rows; i++) {
TSKEY ts = ((TSKEY *)pColDataInfo->pData)[i];
maxTs = TMAX(maxTs, ts);
SScalableBf *pSBf = getSBf(pInfo, ts);
if (pSBf) {
tScalableBfPut(pSBf, &ts, sizeof(TSKEY));
}
}
TSKEY *pMaxTs = taosHashGet(pInfo->pMap, &tbUid, sizeof(int64_t));
if (pMaxTs == NULL || *pMaxTs > tbUid) {
taosHashPut(pInfo->pMap, &tbUid, sizeof(int64_t), &maxTs, sizeof(TSKEY));
}
}
bool updateInfoIsUpdated(SUpdateInfo *pInfo, uint64_t tableId, TSKEY ts) { bool updateInfoIsUpdated(SUpdateInfo *pInfo, uint64_t tableId, TSKEY ts) {
int32_t res = TSDB_CODE_FAILED; int32_t res = TSDB_CODE_FAILED;
SUpdateKey updateKey = {
.tbUid = tableId,
.ts = ts,
};
TSKEY *pMapMaxTs = taosHashGet(pInfo->pMap, &tableId, sizeof(uint64_t)); TSKEY *pMapMaxTs = taosHashGet(pInfo->pMap, &tableId, sizeof(uint64_t));
uint64_t index = ((uint64_t)tableId) % pInfo->numBuckets; uint64_t index = ((uint64_t)tableId) % pInfo->numBuckets;
TSKEY maxTs = *(TSKEY *)taosArrayGet(pInfo->pTsBuckets, index); TSKEY maxTs = *(TSKEY *)taosArrayGet(pInfo->pTsBuckets, index);
if (ts < maxTs - pInfo->watermark) { if (ts < maxTs - pInfo->watermark) {
// this window has been closed. // this window has been closed.
if (pInfo->pCloseWinSBF) { if (pInfo->pCloseWinSBF) {
res = tScalableBfPut(pInfo->pCloseWinSBF, &ts, sizeof(TSKEY)); res = tScalableBfPut(pInfo->pCloseWinSBF, &updateKey, sizeof(SUpdateKey));
if (res == TSDB_CODE_SUCCESS) { if (res == TSDB_CODE_SUCCESS) {
return false; return false;
} else { } else {
...@@ -183,7 +211,7 @@ bool updateInfoIsUpdated(SUpdateInfo *pInfo, uint64_t tableId, TSKEY ts) { ...@@ -183,7 +211,7 @@ bool updateInfoIsUpdated(SUpdateInfo *pInfo, uint64_t tableId, TSKEY ts) {
SScalableBf *pSBf = getSBf(pInfo, ts); SScalableBf *pSBf = getSBf(pInfo, ts);
// pSBf may be a null pointer // pSBf may be a null pointer
if (pSBf) { if (pSBf) {
res = tScalableBfPut(pSBf, &ts, sizeof(TSKEY)); res = tScalableBfPut(pSBf, &updateKey, sizeof(SUpdateKey));
} }
int32_t size = taosHashGetSize(pInfo->pMap); int32_t size = taosHashGetSize(pInfo->pMap);
......
...@@ -57,8 +57,10 @@ SBloomFilter *tBloomFilterInit(uint64_t expectedEntries, double errorRate) { ...@@ -57,8 +57,10 @@ SBloomFilter *tBloomFilterInit(uint64_t expectedEntries, double errorRate) {
// ln(2) = 0.693147180559945 // ln(2) = 0.693147180559945
pBF->hashFunctions = (uint32_t)ceil(lnRate / 0.693147180559945); pBF->hashFunctions = (uint32_t)ceil(lnRate / 0.693147180559945);
pBF->hashFn1 = taosGetDefaultHashFunction(TSDB_DATA_TYPE_TIMESTAMP); /*pBF->hashFn1 = taosGetDefaultHashFunction(TSDB_DATA_TYPE_TIMESTAMP);*/
pBF->hashFn2 = taosGetDefaultHashFunction(TSDB_DATA_TYPE_NCHAR); /*pBF->hashFn2 = taosGetDefaultHashFunction(TSDB_DATA_TYPE_NCHAR);*/
pBF->hashFn1 = taosFastHash;
pBF->hashFn2 = taosDJB2Hash;
pBF->buffer = taosMemoryCalloc(pBF->numUnits, sizeof(uint64_t)); pBF->buffer = taosMemoryCalloc(pBF->numUnits, sizeof(uint64_t));
if (pBF->buffer == NULL) { if (pBF->buffer == NULL) {
tBloomFilterDestroy(pBF); tBloomFilterDestroy(pBF);
...@@ -144,4 +146,4 @@ _error: ...@@ -144,4 +146,4 @@ _error:
return NULL; return NULL;
} }
bool tBloomFilterIsFull(const SBloomFilter *pBF) { return pBF->size >= pBF->expectedEntries; } bool tBloomFilterIsFull(const SBloomFilter *pBF) { return pBF->size >= pBF->expectedEntries; }
\ No newline at end of file
...@@ -32,6 +32,23 @@ ...@@ -32,6 +32,23 @@
(h) ^= (h) >> 16; \ (h) ^= (h) >> 16; \
} while (0) } while (0)
uint32_t taosFastHash(const char *key, uint32_t len) {
uint32_t result = 0x55555555;
for (uint32_t i = 0; i < len; i++) {
result ^= (uint8_t)key[i];
result = ROTL32(result, 5);
}
return result;
}
uint32_t taosDJB2Hash(const char *key, uint32_t len) {
uint32_t hash = 5381;
for (uint32_t i = 0; i < len; i++) {
hash = ((hash << 5) + hash) + (uint8_t)key[i]; /* hash * 33 + c */
}
return hash;
}
uint32_t MurmurHash3_32(const char *key, uint32_t len) { uint32_t MurmurHash3_32(const char *key, uint32_t len) {
const uint8_t *data = (const uint8_t *)key; const uint8_t *data = (const uint8_t *)key;
const int32_t nblocks = len >> 2u; const int32_t nblocks = len >> 2u;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册