未验证 提交 2de465b6 编写于 作者: H Haojun Liao 提交者: GitHub

Merge pull request #19947 from taosdata/enh/stream_buffer_param1

Enh/stream buffer param1
...@@ -1772,6 +1772,7 @@ typedef struct { ...@@ -1772,6 +1772,7 @@ typedef struct {
SArray* pTags; // array of SField SArray* pTags; // array of SField
// 3.0.20 // 3.0.20
int64_t checkpointFreq; // ms int64_t checkpointFreq; // ms
int64_t deleteMark;
int8_t igUpdate; int8_t igUpdate;
} SCMCreateStreamReq; } SCMCreateStreamReq;
......
...@@ -110,6 +110,7 @@ int32_t streamStateGetParName(SStreamState* pState, int64_t groupId, void** pVal ...@@ -110,6 +110,7 @@ int32_t streamStateGetParName(SStreamState* pState, int64_t groupId, void** pVal
#if 0 #if 0
char* streamStateSessionDump(SStreamState* pState); char* streamStateSessionDump(SStreamState* pState);
char* streamStateIntervalDump(SStreamState* pState);
#endif #endif
#ifdef __cplusplus #ifdef __cplusplus
......
...@@ -5425,6 +5425,7 @@ int32_t tSerializeSCMCreateStreamReq(void *buf, int32_t bufLen, const SCMCreateS ...@@ -5425,6 +5425,7 @@ int32_t tSerializeSCMCreateStreamReq(void *buf, int32_t bufLen, const SCMCreateS
if (tEncodeI32(&encoder, pField->bytes) < 0) return -1; if (tEncodeI32(&encoder, pField->bytes) < 0) return -1;
if (tEncodeCStr(&encoder, pField->name) < 0) return -1; if (tEncodeCStr(&encoder, pField->name) < 0) return -1;
} }
if (tEncodeI64(&encoder, pReq->deleteMark) < 0) return -1;
if (tEncodeI8(&encoder, pReq->igUpdate) < 0) return -1; if (tEncodeI8(&encoder, pReq->igUpdate) < 0) return -1;
tEndEncode(&encoder); tEndEncode(&encoder);
...@@ -5487,6 +5488,7 @@ int32_t tDeserializeSCMCreateStreamReq(void *buf, int32_t bufLen, SCMCreateStrea ...@@ -5487,6 +5488,7 @@ int32_t tDeserializeSCMCreateStreamReq(void *buf, int32_t bufLen, SCMCreateStrea
} }
} }
if (tDecodeI64(&decoder, &pReq->deleteMark) < 0) return -1;
if (tDecodeI8(&decoder, &pReq->igUpdate) < 0) return -1; if (tDecodeI8(&decoder, &pReq->igUpdate) < 0) return -1;
tEndDecode(&decoder); tEndDecode(&decoder);
......
...@@ -297,6 +297,7 @@ static int32_t mndBuildStreamObjFromCreateReq(SMnode *pMnode, SStreamObj *pObj, ...@@ -297,6 +297,7 @@ static int32_t mndBuildStreamObjFromCreateReq(SMnode *pMnode, SStreamObj *pObj,
pObj->triggerParam = pCreate->maxDelay; pObj->triggerParam = pCreate->maxDelay;
pObj->watermark = pCreate->watermark; pObj->watermark = pCreate->watermark;
pObj->fillHistory = pCreate->fillHistory; pObj->fillHistory = pCreate->fillHistory;
pObj->deleteMark = pCreate->deleteMark;
pObj->igCheckUpdate = pCreate->igUpdate; pObj->igCheckUpdate = pCreate->igUpdate;
memcpy(pObj->sourceDb, pCreate->sourceDB, TSDB_DB_FNAME_LEN); memcpy(pObj->sourceDb, pCreate->sourceDB, TSDB_DB_FNAME_LEN);
...@@ -346,6 +347,7 @@ static int32_t mndBuildStreamObjFromCreateReq(SMnode *pMnode, SStreamObj *pObj, ...@@ -346,6 +347,7 @@ static int32_t mndBuildStreamObjFromCreateReq(SMnode *pMnode, SStreamObj *pObj,
.triggerType = pObj->trigger == STREAM_TRIGGER_MAX_DELAY ? STREAM_TRIGGER_WINDOW_CLOSE : pObj->trigger, .triggerType = pObj->trigger == STREAM_TRIGGER_MAX_DELAY ? STREAM_TRIGGER_WINDOW_CLOSE : pObj->trigger,
.watermark = pObj->watermark, .watermark = pObj->watermark,
.igExpired = pObj->igExpired, .igExpired = pObj->igExpired,
.deleteMark = pObj->deleteMark,
.igCheckUpdate = pObj->igCheckUpdate, .igCheckUpdate = pObj->igCheckUpdate,
}; };
......
...@@ -101,7 +101,8 @@ STqOffsetStore* tqOffsetOpen(STQ* pTq) { ...@@ -101,7 +101,8 @@ STqOffsetStore* tqOffsetOpen(STQ* pTq) {
} }
char* fname = tqOffsetBuildFName(pStore->pTq->path, 0); char* fname = tqOffsetBuildFName(pStore->pTq->path, 0);
if (tqOffsetRestoreFromFile(pStore, fname) < 0) { if (tqOffsetRestoreFromFile(pStore, fname) < 0) {
ASSERT(0); taosMemoryFree(fname);
return NULL;
} }
taosMemoryFree(fname); taosMemoryFree(fname);
return pStore; return pStore;
......
...@@ -21,12 +21,34 @@ ...@@ -21,12 +21,34 @@
static int vnodeEncodeInfo(const SVnodeInfo *pInfo, char **ppData); static int vnodeEncodeInfo(const SVnodeInfo *pInfo, char **ppData);
static int vnodeCommitImpl(SCommitInfo *pInfo); static int vnodeCommitImpl(SCommitInfo *pInfo);
#define WAIT_TIME_MILI_SEC 50
int vnodeBegin(SVnode *pVnode) { int vnodeBegin(SVnode *pVnode) {
// alloc buffer pool // alloc buffer pool
int32_t nTry = 0;
taosThreadMutexLock(&pVnode->mutex); taosThreadMutexLock(&pVnode->mutex);
while (pVnode->pPool == NULL) { while (pVnode->pPool == NULL) {
taosThreadCondWait(&pVnode->poolNotEmpty, &pVnode->mutex); vInfo("vgId:%d no free buffer pool on %d try, wait %d ms...", TD_VID(pVnode), ++nTry, WAIT_TIME_MILI_SEC);
struct timeval tv;
struct timespec ts;
taosGetTimeOfDay(&tv);
ts.tv_nsec = tv.tv_usec * 1000 + WAIT_TIME_MILI_SEC * 1000000;
if (ts.tv_nsec > 999999999l) {
ts.tv_sec = tv.tv_sec + 1;
ts.tv_nsec -= 1000000000l;
} else {
ts.tv_sec = tv.tv_sec;
}
int32_t rc = taosThreadCondTimedWait(&pVnode->poolNotEmpty, &pVnode->mutex, &ts);
if (rc && rc != ETIMEDOUT) {
terrno = TAOS_SYSTEM_ERROR(rc);
taosThreadMutexUnlock(&pVnode->mutex);
return -1;
}
} }
pVnode->inUse = pVnode->pPool; pVnode->inUse = pVnode->pPool;
......
...@@ -312,10 +312,13 @@ int32_t vnodeProcessWriteMsg(SVnode *pVnode, SRpcMsg *pMsg, int64_t version, SRp ...@@ -312,10 +312,13 @@ int32_t vnodeProcessWriteMsg(SVnode *pVnode, SRpcMsg *pMsg, int64_t version, SRp
walApplyVer(pVnode->pWal, version); walApplyVer(pVnode->pWal, version);
/*vInfo("vgId:%d, push msg begin", pVnode->config.vgId);*/
if (tqPushMsg(pVnode->pTq, pMsg->pCont, pMsg->contLen, pMsg->msgType, version) < 0) { if (tqPushMsg(pVnode->pTq, pMsg->pCont, pMsg->contLen, pMsg->msgType, version) < 0) {
/*vInfo("vgId:%d, push msg end", pVnode->config.vgId);*/
vError("vgId:%d, failed to push msg to TQ since %s", TD_VID(pVnode), tstrerror(terrno)); vError("vgId:%d, failed to push msg to TQ since %s", TD_VID(pVnode), tstrerror(terrno));
return -1; return -1;
} }
/*vInfo("vgId:%d, push msg end", pVnode->config.vgId);*/
// commit if need // commit if need
if (needCommit) { if (needCommit) {
...@@ -1019,7 +1022,7 @@ _exit: ...@@ -1019,7 +1022,7 @@ _exit:
atomic_add_fetch_64(&pVnode->statis.nBatchInsert, statis.nBatchInsert); atomic_add_fetch_64(&pVnode->statis.nBatchInsert, statis.nBatchInsert);
atomic_add_fetch_64(&pVnode->statis.nBatchInsertSuccess, statis.nBatchInsertSuccess); atomic_add_fetch_64(&pVnode->statis.nBatchInsertSuccess, statis.nBatchInsertSuccess);
vDebug("vgId:%d, submit success, index:%" PRId64, pVnode->config.vgId, version); vDebug("vgId:%d %s done, index:%" PRId64, TD_VID(pVnode), __func__, version);
return 0; return 0;
} }
......
...@@ -119,8 +119,8 @@ static void doKeepNewWindowStartInfo(SWindowRowsSup* pRowSup, const int64_t* tsL ...@@ -119,8 +119,8 @@ static void doKeepNewWindowStartInfo(SWindowRowsSup* pRowSup, const int64_t* tsL
pRowSup->groupId = groupId; pRowSup->groupId = groupId;
} }
FORCE_INLINE int32_t getForwardStepsInBlock(int32_t numOfRows, __block_search_fn_t searchFn, TSKEY ekey, FORCE_INLINE int32_t getForwardStepsInBlock(int32_t numOfRows, __block_search_fn_t searchFn, TSKEY ekey, int32_t pos,
int32_t pos, int32_t order, int64_t* pData) { int32_t order, int64_t* pData) {
int32_t forwardRows = 0; int32_t forwardRows = 0;
if (order == TSDB_ORDER_ASC) { if (order == TSDB_ORDER_ASC) {
...@@ -4815,6 +4815,12 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) { ...@@ -4815,6 +4815,12 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) {
blockDataEnsureCapacity(pInfo->binfo.pRes, pOperator->resultInfo.capacity); blockDataEnsureCapacity(pInfo->binfo.pRes, pOperator->resultInfo.capacity);
taosHashCleanup(pUpdatedMap); taosHashCleanup(pUpdatedMap);
#if 0
char* pBuf = streamStateIntervalDump(pInfo->pState);
qDebug("===stream===interval state%s", pBuf);
taosMemoryFree(pBuf);
#endif
doBuildDeleteResult(pInfo, pInfo->pDelWins, &pInfo->delIndex, pInfo->pDelRes); doBuildDeleteResult(pInfo, pInfo->pDelWins, &pInfo->delIndex, pInfo->pDelRes);
if (pInfo->pDelRes->info.rows > 0) { if (pInfo->pDelRes->info.rows > 0) {
printDataBlock(pInfo->pDelRes, "single interval delete"); printDataBlock(pInfo->pDelRes, "single interval delete");
......
...@@ -544,6 +544,7 @@ stream_options(A) ::= stream_options(B) TRIGGER MAX_DELAY duration_literal(C). ...@@ -544,6 +544,7 @@ stream_options(A) ::= stream_options(B) TRIGGER MAX_DELAY duration_literal(C).
stream_options(A) ::= stream_options(B) WATERMARK duration_literal(C). { ((SStreamOptions*)B)->pWatermark = releaseRawExprNode(pCxt, C); A = B; } stream_options(A) ::= stream_options(B) WATERMARK duration_literal(C). { ((SStreamOptions*)B)->pWatermark = releaseRawExprNode(pCxt, C); A = B; }
stream_options(A) ::= stream_options(B) IGNORE EXPIRED NK_INTEGER(C). { ((SStreamOptions*)B)->ignoreExpired = taosStr2Int8(C.z, NULL, 10); A = B; } stream_options(A) ::= stream_options(B) IGNORE EXPIRED NK_INTEGER(C). { ((SStreamOptions*)B)->ignoreExpired = taosStr2Int8(C.z, NULL, 10); A = B; }
stream_options(A) ::= stream_options(B) FILL_HISTORY NK_INTEGER(C). { ((SStreamOptions*)B)->fillHistory = taosStr2Int8(C.z, NULL, 10); A = B; } stream_options(A) ::= stream_options(B) FILL_HISTORY NK_INTEGER(C). { ((SStreamOptions*)B)->fillHistory = taosStr2Int8(C.z, NULL, 10); A = B; }
stream_options(A) ::= stream_options(B) DELETE_MARK duration_literal(C). { ((SStreamOptions*)B)->pDeleteMark = releaseRawExprNode(pCxt, C); A = B; }
stream_options(A) ::= stream_options(B) IGNORE UPDATE NK_INTEGER(C). { ((SStreamOptions*)B)->ignoreUpdate = taosStr2Int8(C.z, NULL, 10); A = B; } stream_options(A) ::= stream_options(B) IGNORE UPDATE NK_INTEGER(C). { ((SStreamOptions*)B)->ignoreUpdate = taosStr2Int8(C.z, NULL, 10); A = B; }
subtable_opt(A) ::= . { A = NULL; } subtable_opt(A) ::= . { A = NULL; }
......
...@@ -666,6 +666,9 @@ static uint8_t getPrecisionFromCurrStmt(SNode* pCurrStmt, uint8_t defaultVal) { ...@@ -666,6 +666,9 @@ static uint8_t getPrecisionFromCurrStmt(SNode* pCurrStmt, uint8_t defaultVal) {
if (isSetOperator(pCurrStmt)) { if (isSetOperator(pCurrStmt)) {
return ((SSetOperator*)pCurrStmt)->precision; return ((SSetOperator*)pCurrStmt)->precision;
} }
if (NULL != pCurrStmt && QUERY_NODE_CREATE_STREAM_STMT == nodeType(pCurrStmt)) {
return getPrecisionFromCurrStmt(((SCreateStreamStmt*)pCurrStmt)->pQuery, defaultVal);
}
return defaultVal; return defaultVal;
} }
...@@ -5512,16 +5515,6 @@ static bool crossTableWithUdaf(SSelectStmt* pSelect) { ...@@ -5512,16 +5515,6 @@ static bool crossTableWithUdaf(SSelectStmt* pSelect) {
} }
static int32_t checkCreateStream(STranslateContext* pCxt, SCreateStreamStmt* pStmt) { static int32_t checkCreateStream(STranslateContext* pCxt, SCreateStreamStmt* pStmt) {
if (NULL != pStmt->pOptions->pWatermark &&
(DEAL_RES_ERROR == translateValue(pCxt, (SValueNode*)pStmt->pOptions->pWatermark))) {
return pCxt->errCode;
}
if (NULL != pStmt->pOptions->pDelay &&
(DEAL_RES_ERROR == translateValue(pCxt, (SValueNode*)pStmt->pOptions->pDelay))) {
return pCxt->errCode;
}
if (NULL == pStmt->pQuery) { if (NULL == pStmt->pQuery) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
...@@ -5714,6 +5707,17 @@ static int32_t buildCreateStreamQuery(STranslateContext* pCxt, SCreateStreamStmt ...@@ -5714,6 +5707,17 @@ static int32_t buildCreateStreamQuery(STranslateContext* pCxt, SCreateStreamStmt
return code; return code;
} }
static int32_t translateStreamOptions(STranslateContext* pCxt, SCreateStreamStmt* pStmt) {
pCxt->pCurrStmt = (SNode*)pStmt;
SStreamOptions* pOptions = pStmt->pOptions;
if ((NULL != pOptions->pWatermark && (DEAL_RES_ERROR == translateValue(pCxt, (SValueNode*)pOptions->pWatermark))) ||
(NULL != pOptions->pDeleteMark && (DEAL_RES_ERROR == translateValue(pCxt, (SValueNode*)pOptions->pDeleteMark))) ||
(NULL != pOptions->pDelay && (DEAL_RES_ERROR == translateValue(pCxt, (SValueNode*)pOptions->pDelay)))) {
return pCxt->errCode;
}
return TSDB_CODE_SUCCESS;
}
static int32_t buildCreateStreamReq(STranslateContext* pCxt, SCreateStreamStmt* pStmt, SCMCreateStreamReq* pReq) { static int32_t buildCreateStreamReq(STranslateContext* pCxt, SCreateStreamStmt* pStmt, SCMCreateStreamReq* pReq) {
pReq->igExists = pStmt->ignoreExists; pReq->igExists = pStmt->ignoreExists;
...@@ -5735,10 +5739,16 @@ static int32_t buildCreateStreamReq(STranslateContext* pCxt, SCreateStreamStmt* ...@@ -5735,10 +5739,16 @@ static int32_t buildCreateStreamReq(STranslateContext* pCxt, SCreateStreamStmt*
} }
} }
if (TSDB_CODE_SUCCESS == code) {
code = translateStreamOptions(pCxt, pStmt);
}
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
pReq->triggerType = pStmt->pOptions->triggerType; pReq->triggerType = pStmt->pOptions->triggerType;
pReq->maxDelay = (NULL != pStmt->pOptions->pDelay ? ((SValueNode*)pStmt->pOptions->pDelay)->datum.i : 0); pReq->maxDelay = (NULL != pStmt->pOptions->pDelay ? ((SValueNode*)pStmt->pOptions->pDelay)->datum.i : 0);
pReq->watermark = (NULL != pStmt->pOptions->pWatermark ? ((SValueNode*)pStmt->pOptions->pWatermark)->datum.i : 0); pReq->watermark = (NULL != pStmt->pOptions->pWatermark ? ((SValueNode*)pStmt->pOptions->pWatermark)->datum.i : 0);
pReq->deleteMark =
(NULL != pStmt->pOptions->pDeleteMark ? ((SValueNode*)pStmt->pOptions->pDeleteMark)->datum.i : 0);
pReq->fillHistory = pStmt->pOptions->fillHistory; pReq->fillHistory = pStmt->pOptions->fillHistory;
pReq->igExpired = pStmt->pOptions->ignoreExpired; pReq->igExpired = pStmt->pOptions->ignoreExpired;
pReq->igUpdate = pStmt->pOptions->ignoreUpdate; pReq->igUpdate = pStmt->pOptions->ignoreUpdate;
......
因为 它太大了无法显示 source diff 。你可以改为 查看blob
...@@ -258,7 +258,6 @@ int32_t streamMetaAbort(SStreamMeta* pMeta) { ...@@ -258,7 +258,6 @@ int32_t streamMetaAbort(SStreamMeta* pMeta) {
int32_t streamLoadTasks(SStreamMeta* pMeta) { int32_t streamLoadTasks(SStreamMeta* pMeta) {
TBC* pCur = NULL; TBC* pCur = NULL;
if (tdbTbcOpen(pMeta->pTaskDb, &pCur, NULL) < 0) { if (tdbTbcOpen(pMeta->pTaskDb, &pCur, NULL) < 0) {
ASSERT(0);
return -1; return -1;
} }
......
...@@ -107,8 +107,6 @@ static inline int stateKeyCmpr(const void* pKey1, int kLen1, const void* pKey2, ...@@ -107,8 +107,6 @@ static inline int stateKeyCmpr(const void* pKey1, int kLen1, const void* pKey2,
} }
SStreamState* streamStateOpen(char* path, SStreamTask* pTask, bool specPath, int32_t szPage, int32_t pages) { SStreamState* streamStateOpen(char* path, SStreamTask* pTask, bool specPath, int32_t szPage, int32_t pages) {
szPage = szPage < 0 ? 4096 : szPage;
pages = pages < 0 ? 256 : pages;
SStreamState* pState = taosMemoryCalloc(1, sizeof(SStreamState)); SStreamState* pState = taosMemoryCalloc(1, sizeof(SStreamState));
if (pState == NULL) { if (pState == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
...@@ -128,6 +126,28 @@ SStreamState* streamStateOpen(char* path, SStreamTask* pTask, bool specPath, int ...@@ -128,6 +126,28 @@ SStreamState* streamStateOpen(char* path, SStreamTask* pTask, bool specPath, int
memset(statePath, 0, 1024); memset(statePath, 0, 1024);
tstrncpy(statePath, path, 1024); tstrncpy(statePath, path, 1024);
} }
char cfgPath[1030];
sprintf(cfgPath, "%s/cfg", statePath);
char cfg[1024];
memset(cfg, 0, 1024);
TdFilePtr pCfgFile = taosOpenFile(cfgPath, TD_FILE_READ);
if (pCfgFile != NULL) {
int64_t size;
taosFStatFile(pCfgFile, &size, NULL);
taosReadFile(pCfgFile, cfg, size);
sscanf(cfg, "%d\n%d\n", &szPage, &pages);
} else {
taosMulModeMkDir(statePath, 0755);
pCfgFile = taosOpenFile(cfgPath, TD_FILE_WRITE | TD_FILE_CREATE);
szPage = szPage < 0 ? 4096 : szPage;
pages = pages < 0 ? 256 : pages;
sprintf(cfg, "%d\n%d\n", szPage, pages);
taosWriteFile(pCfgFile, cfg, strlen(cfg));
}
taosCloseFile(&pCfgFile);
if (tdbOpen(statePath, szPage, pages, &pState->pTdbState->db, 1) < 0) { if (tdbOpen(statePath, szPage, pages, &pState->pTdbState->db, 1) < 0) {
goto _err; goto _err;
} }
...@@ -879,4 +899,47 @@ char* streamStateSessionDump(SStreamState* pState) { ...@@ -879,4 +899,47 @@ char* streamStateSessionDump(SStreamState* pState) {
streamStateFreeCur(pCur); streamStateFreeCur(pCur);
return dumpBuf; return dumpBuf;
} }
char* streamStateIntervalDump(SStreamState* pState) {
SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur));
if (pCur == NULL) {
return NULL;
}
pCur->number = pState->number;
if (tdbTbcOpen(pState->pTdbState->pStateDb, &pCur->pCur, NULL) < 0) {
streamStateFreeCur(pCur);
return NULL;
}
tdbTbcMoveToFirst(pCur->pCur);
SWinKey key = {0};
void* buf = NULL;
int32_t bufSize = 0;
int32_t code = streamStateGetKVByCur(pCur, &key, (const void **)&buf, &bufSize);
if (code != 0) {
streamStateFreeCur(pCur);
return NULL;
}
int32_t size = 2048;
char* dumpBuf = taosMemoryCalloc(size, 1);
int64_t len = 0;
len += snprintf(dumpBuf + len, size - len, "||s:%15" PRId64 ",", key.ts);
// len += snprintf(dumpBuf + len, size - len, "e:%15" PRId64 ",", key.win.ekey);
len += snprintf(dumpBuf + len, size - len, "g:%15" PRId64 "||", key.groupId);
while (1) {
tdbTbcMoveToNext(pCur->pCur);
key = (SWinKey){0};
code = streamStateGetKVByCur(pCur, &key, NULL, 0);
if (code != 0) {
streamStateFreeCur(pCur);
return dumpBuf;
}
len += snprintf(dumpBuf + len, size - len, "||s:%15" PRId64 ",", key.ts);
// len += snprintf(dumpBuf + len, size - len, "e:%15" PRId64 ",", key.win.ekey);
len += snprintf(dumpBuf + len, size - len, "g:%15" PRId64 "||", key.groupId);
}
streamStateFreeCur(pCur);
return dumpBuf;
}
#endif #endif
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册