提交 875291b7 编写于 作者: dengyihao's avatar dengyihao

add checkpoint

上级 0c61b425
...@@ -69,6 +69,7 @@ int32_t mndInitStream(SMnode *pMnode) { ...@@ -69,6 +69,7 @@ int32_t mndInitStream(SMnode *pMnode) {
mndSetMsgHandle(pMnode, TDMT_STREAM_TASK_DROP_RSP, mndTransProcessRsp); mndSetMsgHandle(pMnode, TDMT_STREAM_TASK_DROP_RSP, mndTransProcessRsp);
mndSetMsgHandle(pMnode, TDMT_STREAM_TASK_PAUSE_RSP, mndTransProcessRsp); mndSetMsgHandle(pMnode, TDMT_STREAM_TASK_PAUSE_RSP, mndTransProcessRsp);
mndSetMsgHandle(pMnode, TDMT_STREAM_TASK_RESUME_RSP, mndTransProcessRsp); mndSetMsgHandle(pMnode, TDMT_STREAM_TASK_RESUME_RSP, mndTransProcessRsp);
mndSetMsgHandle(pMnode, TDMT_VND_STREAM_CHECK_POINT_SOURCE_RSP, mndTransProcessRsp);
mndSetMsgHandle(pMnode, TDMT_MND_STREAM_CHECKPOINT_TIMER, mndProcessStreamCheckpointTmr); mndSetMsgHandle(pMnode, TDMT_MND_STREAM_CHECKPOINT_TIMER, mndProcessStreamCheckpointTmr);
mndSetMsgHandle(pMnode, TDMT_MND_STREAM_BEGIN_CHECKPOINT, mndProcessStreamDoCheckpoint); mndSetMsgHandle(pMnode, TDMT_MND_STREAM_BEGIN_CHECKPOINT, mndProcessStreamDoCheckpoint);
...@@ -887,7 +888,7 @@ static int32_t mndProcessStreamDoCheckpoint(SRpcMsg *pReq) { ...@@ -887,7 +888,7 @@ static int32_t mndProcessStreamDoCheckpoint(SRpcMsg *pReq) {
if (pTrans == NULL) return -1; if (pTrans == NULL) return -1;
mndTransSetDbName(pTrans, pStream->sourceDb, pStream->targetDb); mndTransSetDbName(pTrans, pStream->sourceDb, pStream->targetDb);
if (mndTrancCheckConflict(pMnode, pTrans) != 0) { if (mndTrancCheckConflict(pMnode, pTrans) != 0) {
mError("failed to checkpoin since stream %s not found", tstrerror(TSDB_CODE_MND_TRANS_CONFLICT)); mError("failed to checkpoint since stream %s", tstrerror(TSDB_CODE_MND_TRANS_CONFLICT));
mndReleaseStream(pMnode, pStream); mndReleaseStream(pMnode, pStream);
mndTransDrop(pTrans); mndTransDrop(pTrans);
return -1; return -1;
......
...@@ -1492,7 +1492,7 @@ int32_t tqStartStreamTasks(STQ* pTq) { ...@@ -1492,7 +1492,7 @@ int32_t tqStartStreamTasks(STQ* pTq) {
int32_t numOfTasks = taosArrayGetSize(pMeta->pTaskList); int32_t numOfTasks = taosArrayGetSize(pMeta->pTaskList);
if (numOfTasks == 0) { if (numOfTasks == 0) {
tqInfo("vgId:%d no stream tasks exist", vgId); tqInfo("vgId:%d no stream tasks exist", vgId);
taosWUnLockLatch(&pTq->pStreamMeta->lock); taosWUnLockLatch(&pMeta->lock);
return 0; return 0;
} }
...@@ -1500,7 +1500,7 @@ int32_t tqStartStreamTasks(STQ* pTq) { ...@@ -1500,7 +1500,7 @@ int32_t tqStartStreamTasks(STQ* pTq) {
if (pMeta->walScanCounter > 1) { if (pMeta->walScanCounter > 1) {
tqDebug("vgId:%d wal read task has been launched, remain scan times:%d", vgId, pMeta->walScanCounter); tqDebug("vgId:%d wal read task has been launched, remain scan times:%d", vgId, pMeta->walScanCounter);
taosWUnLockLatch(&pTq->pStreamMeta->lock); taosWUnLockLatch(&pMeta->lock);
return 0; return 0;
} }
...@@ -1508,7 +1508,7 @@ int32_t tqStartStreamTasks(STQ* pTq) { ...@@ -1508,7 +1508,7 @@ int32_t tqStartStreamTasks(STQ* pTq) {
if (pRunReq == NULL) { if (pRunReq == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
tqError("vgId:%d failed to create msg to start wal scanning to launch stream tasks, code:%s", vgId, terrstr()); tqError("vgId:%d failed to create msg to start wal scanning to launch stream tasks, code:%s", vgId, terrstr());
taosWUnLockLatch(&pTq->pStreamMeta->lock); taosWUnLockLatch(&pMeta->lock);
return -1; return -1;
} }
...@@ -1519,8 +1519,17 @@ int32_t tqStartStreamTasks(STQ* pTq) { ...@@ -1519,8 +1519,17 @@ int32_t tqStartStreamTasks(STQ* pTq) {
SRpcMsg msg = {.msgType = TDMT_STREAM_TASK_RUN, .pCont = pRunReq, .contLen = sizeof(SStreamTaskRunReq)}; SRpcMsg msg = {.msgType = TDMT_STREAM_TASK_RUN, .pCont = pRunReq, .contLen = sizeof(SStreamTaskRunReq)};
tmsgPutToQueue(&pTq->pVnode->msgCb, STREAM_QUEUE, &msg); tmsgPutToQueue(&pTq->pVnode->msgCb, STREAM_QUEUE, &msg);
taosWUnLockLatch(&pTq->pStreamMeta->lock); taosWUnLockLatch(&pMeta->lock);
return 0; return 0;
} }
int32_t tqProcessStreamCheckPointReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) { return 0; } int32_t tqProcessStreamCheckPointReq(STQ* pTq, int64_t sversion, char* pMsg, int32_t msgLen) {
\ No newline at end of file int32_t vgId = TD_VID(pTq->pVnode);
SStreamMeta* pMeta = pTq->pStreamMeta;
char* msg = POINTER_SHIFT(pMsg, sizeof(SMsgHead));
int32_t len = msgLen - sizeof(SMsgHead);
taosWLockLatch(&pMeta->lock);
taosWUnLockLatch(&pMeta->lock);
return 0;
}
\ No newline at end of file
...@@ -375,14 +375,10 @@ int32_t vnodeProcessWriteMsg(SVnode *pVnode, SRpcMsg *pMsg, int64_t version, SRp ...@@ -375,14 +375,10 @@ int32_t vnodeProcessWriteMsg(SVnode *pVnode, SRpcMsg *pMsg, int64_t version, SRp
break; break;
/* TQ */ /* TQ */
case TDMT_VND_TMQ_SUBSCRIBE: case TDMT_VND_TMQ_SUBSCRIBE:
if (tqProcessSubscribeReq(pVnode->pTq, version, pReq, len) < 0) { if (tqProcessSubscribeReq(pVnode->pTq, version, pReq, len) < 0) goto _err;
goto _err;
}
break; break;
case TDMT_VND_TMQ_DELETE_SUB: case TDMT_VND_TMQ_DELETE_SUB:
if (tqProcessDeleteSubReq(pVnode->pTq, version, pMsg->pCont, pMsg->contLen) < 0) { if (tqProcessDeleteSubReq(pVnode->pTq, version, pMsg->pCont, pMsg->contLen) < 0) goto _err;
goto _err;
}
break; break;
case TDMT_VND_TMQ_COMMIT_OFFSET: case TDMT_VND_TMQ_COMMIT_OFFSET:
if (tqProcessOffsetCommitReq(pVnode->pTq, version, pReq, pMsg->contLen - sizeof(SMsgHead)) < 0) { if (tqProcessOffsetCommitReq(pVnode->pTq, version, pReq, pMsg->contLen - sizeof(SMsgHead)) < 0) {
...@@ -434,6 +430,10 @@ int32_t vnodeProcessWriteMsg(SVnode *pVnode, SRpcMsg *pMsg, int64_t version, SRp ...@@ -434,6 +430,10 @@ int32_t vnodeProcessWriteMsg(SVnode *pVnode, SRpcMsg *pMsg, int64_t version, SRp
goto _err; goto _err;
} }
} break; } break;
case TDMT_VND_STREAM_CHECK_POINT_SOURCE:
if (tqProcessStreamCheckPointReq(pVnode->pTq, version, pReq, len) < 0) goto _err;
break;
case TDMT_VND_ALTER_CONFIRM: case TDMT_VND_ALTER_CONFIRM:
needCommit = pVnode->config.hashChange; needCommit = pVnode->config.hashChange;
if (vnodeProcessAlterConfirmReq(pVnode, version, pReq, len, pRsp) < 0) { if (vnodeProcessAlterConfirmReq(pVnode, version, pReq, len, pRsp) < 0) {
...@@ -455,6 +455,7 @@ int32_t vnodeProcessWriteMsg(SVnode *pVnode, SRpcMsg *pMsg, int64_t version, SRp ...@@ -455,6 +455,7 @@ int32_t vnodeProcessWriteMsg(SVnode *pVnode, SRpcMsg *pMsg, int64_t version, SRp
case TDMT_VND_COMPACT: case TDMT_VND_COMPACT:
vnodeProcessCompactVnodeReq(pVnode, version, pReq, len, pRsp); vnodeProcessCompactVnodeReq(pVnode, version, pReq, len, pRsp);
goto _exit; goto _exit;
default: default:
vError("vgId:%d, unprocessed msg, %d", TD_VID(pVnode), pMsg->msgType); vError("vgId:%d, unprocessed msg, %d", TD_VID(pVnode), pMsg->msgType);
return -1; return -1;
......
...@@ -1106,21 +1106,18 @@ int32_t streamStateClear_rocksdb(SStreamState* pState) { ...@@ -1106,21 +1106,18 @@ int32_t streamStateClear_rocksdb(SStreamState* pState) {
int sLen = stateKeyEncode(&sKey, sKeyStr); int sLen = stateKeyEncode(&sKey, sKeyStr);
int eLen = stateKeyEncode(&eKey, eKeyStr); int eLen = stateKeyEncode(&eKey, eKeyStr);
char toStringStart[128] = {0};
char toStringEnd[128] = {0};
if (qDebugFlag & DEBUG_TRACE) {
stateKeyToString(&sKey, toStringStart);
stateKeyToString(&eKey, toStringEnd);
}
char* err = NULL; char* err = NULL;
rocksdb_delete_range_cf(pState->pTdbState->rocksdb, pState->pTdbState->writeOpts, pState->pTdbState->pHandle[1], rocksdb_delete_range_cf(pState->pTdbState->rocksdb, pState->pTdbState->writeOpts, pState->pTdbState->pHandle[1],
sKeyStr, sLen, eKeyStr, eLen, &err); sKeyStr, sLen, eKeyStr, eLen, &err);
// rocksdb_compact_range_cf(pState->pTdbState->rocksdb, pState->pTdbState->pHandle[0], sKeyStr, sLen, eKeyStr,
// eLen);
if (err != NULL) { if (err != NULL) {
char toStringStart[128] = {0};
char toStringEnd[128] = {0};
stateKeyToString(&sKey, toStringStart);
stateKeyToString(&eKey, toStringEnd);
qWarn("failed to delete range cf(state) start: %s, end:%s, reason:%s", toStringStart, toStringEnd, err); qWarn("failed to delete range cf(state) start: %s, end:%s, reason:%s", toStringStart, toStringEnd, err);
taosMemoryFree(err); taosMemoryFree(err);
} else {
rocksdb_compact_range_cf(pState->pTdbState->rocksdb, pState->pTdbState->pHandle[1], sKeyStr, sLen, eKeyStr, eLen);
} }
return 0; return 0;
...@@ -1768,28 +1765,31 @@ _end: ...@@ -1768,28 +1765,31 @@ _end:
} }
int32_t streamStateSessionClear_rocksdb(SStreamState* pState) { int32_t streamStateSessionClear_rocksdb(SStreamState* pState) {
qDebug("streamStateSessionClear_rocksdb"); qDebug("streamStateSessionClear_rocksdb");
SSessionKey key = {.win.skey = 0, .win.ekey = 0, .groupId = 0}; SStateSessionKey skey = {.key.win.skey = 0, .key.win.ekey = 0, .key.groupId = 0, .opNum = 0};
SStreamStateCur* pCur = streamStateSessionSeekKeyCurrentNext_rocksdb(pState, &key); SStateSessionKey ekey = {
.key.win.skey = INT64_MAX, .key.win.ekey = INT64_MAX, .key.groupId = UINT64_MAX, .opNum = INT64_MAX};
while (1) {
SSessionKey delKey = {0};
void* buf = NULL;
int32_t size = 0;
int32_t code = streamStateSessionGetKVByCur_rocksdb(pCur, &delKey, &buf, &size);
if (code == 0 && size > 0) {
memset(buf, 0, size);
// refactor later
streamStateSessionPut_rocksdb(pState, &delKey, buf, size);
} else {
taosMemoryFreeClear(buf);
break;
}
taosMemoryFreeClear(buf);
streamStateCurNext_rocksdb(pState, pCur); char skeyStr[128] = {0}, ekeyStr[128] = {0};
int slen = stateSessionKeyEncode(&skey, skeyStr);
int elen = stateSessionKeyEncode(&ekey, ekeyStr);
char* err = NULL;
rocksdb_delete_range_cf(pState->pTdbState->rocksdb, pState->pTdbState->writeOpts, pState->pTdbState->pHandle[3],
skeyStr, slen, ekeyStr, elen, &err);
if (err != NULL) {
char toStringStart[128] = {0};
char toStringEnd[128] = {0};
stateSessionKeyToString(&skey, toStringStart);
stateSessionKeyToString(&ekey, toStringEnd);
qWarn("failed to delete range cf(state) start: %s, end:%s, reason:%s", toStringStart, toStringEnd, err);
taosMemoryFree(err);
} else {
rocksdb_compact_range_cf(pState->pTdbState->rocksdb, pState->pTdbState->pHandle[3], skeyStr, slen, ekeyStr, elen);
} }
streamStateFreeCur(pCur);
return -1; return 0;
} }
int32_t streamStateStateAddIfNotExist_rocksdb(SStreamState* pState, SSessionKey* key, char* pKeyData, int32_t streamStateStateAddIfNotExist_rocksdb(SStreamState* pState, SSessionKey* key, char* pKeyData,
int32_t keyDataLen, state_key_cmpr_fn fn, void** pVal, int32_t* pVLen) { int32_t keyDataLen, state_key_cmpr_fn fn, void** pVal, int32_t* pVLen) {
......
...@@ -13,7 +13,6 @@ ...@@ -13,7 +13,6 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>. * along with this program. If not, see <http://www.gnu.org/licenses/>.
*/ */
#if 0
#include "streamInc.h" #include "streamInc.h"
int32_t tEncodeSStreamCheckpointSourceReq(SEncoder* pEncoder, const SStreamCheckpointSourceReq* pReq) { int32_t tEncodeSStreamCheckpointSourceReq(SEncoder* pEncoder, const SStreamCheckpointSourceReq* pReq) {
...@@ -193,4 +192,3 @@ int32_t streamProcessCheckpointRsp(SStreamMeta* pMeta, SStreamTask* pTask, SStre ...@@ -193,4 +192,3 @@ int32_t streamProcessCheckpointRsp(SStreamMeta* pMeta, SStreamTask* pTask, SStre
// set status normal // set status normal
return 0; return 0;
} }
#endif
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册