未验证 提交 a3666964 编写于 作者: S Shengliang Guan 提交者: GitHub

Merge pull request #18596 from taosdata/feature/stream

refactor(stream): distributed checkpoint
......@@ -137,6 +137,7 @@ extern int64_t tsWalFsyncDataSizeLimit;
// internal
extern int32_t tsTransPullupInterval;
extern int32_t tsMqRebalanceInterval;
extern int32_t tsStreamCheckpointTickInterval;
extern int32_t tsTtlUnit;
extern int32_t tsTtlPushInterval;
extern int32_t tsGrantHBInterval;
......
......@@ -1148,6 +1148,13 @@ typedef struct {
int32_t tSerializeSMTimerMsg(void* buf, int32_t bufLen, SMTimerReq* pReq);
int32_t tDeserializeSMTimerMsg(void* buf, int32_t bufLen, SMTimerReq* pReq);
typedef struct {
int64_t tick;
} SMStreamTickReq;
int32_t tSerializeSMStreamTickMsg(void* buf, int32_t bufLen, SMStreamTickReq* pReq);
int32_t tDeserializeSMStreamTickMsg(void* buf, int32_t bufLen, SMStreamTickReq* pReq);
typedef struct {
int32_t id;
uint16_t port; // node sync Port
......@@ -1748,6 +1755,8 @@ typedef struct {
int64_t watermark;
int32_t numOfTags;
SArray* pTags; // array of SField
// 3.0.20
int64_t checkpointFreq; // ms
} SCMCreateStreamReq;
typedef struct {
......@@ -1947,6 +1956,12 @@ typedef struct {
SHashObj* rebSubHash; // SHashObj<key, SMqRebSubscribe>
} SMqDoRebalanceMsg;
typedef struct {
int64_t streamId;
int64_t checkpointId;
char streamName[TSDB_STREAM_FNAME_LEN];
} SMStreamDoCheckpointMsg;
typedef struct {
int64_t status;
} SMVSubscribeRsp;
......
......@@ -172,6 +172,8 @@ enum {
TD_DEF_MSG_TYPE(TDMT_MND_SERVER_VERSION, "server-version", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_UPTIME_TIMER, "uptime-timer", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_TMQ_LOST_CONSUMER_CLEAR, "lost-consumer-clear", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_STREAM_CHECKPOINT_TIMER, "stream-checkpoint-tmr", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_STREAM_BEGIN_CHECKPOINT, "stream-begin-checkpoint", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_MAX_MSG, "mnd-max", NULL, NULL)
TD_NEW_MSG_SEG(TDMT_VND_MSG)
......@@ -241,8 +243,11 @@ enum {
TD_DEF_MSG_TYPE(TDMT_STREAM_TASK_DISPATCH, "stream-task-dispatch", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_STREAM_UNUSED1, "stream-unused1", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_STREAM_RETRIEVE, "stream-retrieve", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_STREAM_RECOVER_FINISH, "vnode-stream-finish", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_STREAM_TASK_CHECK, "vnode-stream-task-check", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_STREAM_RECOVER_FINISH, "stream-recover-finish", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_STREAM_TASK_CHECK, "stream-task-check", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_STREAM_TASK_CHECKPOINT, "stream-checkpoint", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_STREAM_TASK_REPORT_CHECKPOINT, "stream-report-checkpoint", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_STREAM_TASK_RESTORE_CHECKPOINT, "stream-restore-checkpoint", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_STREAM_MAX_MSG, "stream-max", NULL, NULL)
TD_NEW_MSG_SEG(TDMT_MON_MSG)
......@@ -282,6 +287,7 @@ enum {
TD_DEF_MSG_TYPE(TDMT_VND_STREAM_TRIGGER, "vnode-stream-trigger", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_STREAM_RECOVER_NONBLOCKING_STAGE, "vnode-stream-recover1", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_STREAM_RECOVER_BLOCKING_STAGE, "vnode-stream-recover2", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_STREAM_CHECK_POINT_SOURCE, "vnode-stream-checkpoint-source", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_STREAM_MAX_MSG, "vnd-stream-max", NULL, NULL)
TD_NEW_MSG_SEG(TDMT_VND_TMQ_MSG)
......
......@@ -275,31 +275,6 @@ typedef struct {
SEpSet epSet;
} SStreamChildEpInfo;
typedef struct {
int32_t srcNodeId;
int32_t srcChildId;
int64_t stateSaveVer;
int64_t stateProcessedVer;
} SStreamCheckpointInfo;
typedef struct {
int64_t streamId;
int64_t checkTs;
int32_t checkpointId; // incremental
int32_t taskId;
SArray* checkpointVer; // SArray<SStreamCheckpointInfo>
} SStreamMultiVgCheckpointInfo;
typedef struct {
int32_t taskId;
int32_t checkpointId; // incremental
} SStreamCheckpointKey;
typedef struct {
int32_t taskId;
SArray* checkpointVer;
} SStreamRecoveringState;
typedef struct SStreamTask {
int64_t streamId;
int32_t taskId;
......@@ -364,6 +339,10 @@ typedef struct SStreamTask {
int64_t checkReqId;
SArray* checkReqIds; // shuffle
int32_t refCnt;
int64_t checkpointingId;
int32_t checkpointAlignCnt;
} SStreamTask;
int32_t tEncodeStreamEpInfo(SEncoder* pEncoder, const SStreamChildEpInfo* pInfo);
......@@ -509,6 +488,60 @@ typedef struct {
int32_t tEncodeSStreamRecoverFinishReq(SEncoder* pEncoder, const SStreamRecoverFinishReq* pReq);
int32_t tDecodeSStreamRecoverFinishReq(SDecoder* pDecoder, SStreamRecoverFinishReq* pReq);
typedef struct {
int64_t streamId;
int64_t checkpointId;
int32_t taskId;
int32_t nodeId;
int64_t expireTime;
} SStreamCheckpointSourceReq;
typedef struct {
int64_t streamId;
int64_t checkpointId;
int32_t taskId;
int32_t nodeId;
int64_t expireTime;
} SStreamCheckpointSourceRsp;
int32_t tEncodeSStreamCheckpointSourceReq(SEncoder* pEncoder, const SStreamCheckpointSourceReq* pReq);
int32_t tDecodeSStreamCheckpointSourceReq(SDecoder* pDecoder, SStreamCheckpointSourceReq* pReq);
int32_t tEncodeSStreamCheckpointSourceRsp(SEncoder* pEncoder, const SStreamCheckpointSourceRsp* pRsp);
int32_t tDecodeSStreamCheckpointSourceRsp(SDecoder* pDecoder, SStreamCheckpointSourceRsp* pRsp);
typedef struct {
SMsgHead msgHead;
int64_t streamId;
int64_t checkpointId;
int32_t downstreamTaskId;
int32_t downstreamNodeId;
int32_t upstreamTaskId;
int32_t upstreamNodeId;
int32_t childId;
int64_t expireTime;
int8_t taskLevel;
} SStreamCheckpointReq;
typedef struct {
SMsgHead msgHead;
int64_t streamId;
int64_t checkpointId;
int32_t downstreamTaskId;
int32_t downstreamNodeId;
int32_t upstreamTaskId;
int32_t upstreamNodeId;
int32_t childId;
int64_t expireTime;
int8_t taskLevel;
} SStreamCheckpointRsp;
int32_t tEncodeSStreamCheckpointReq(SEncoder* pEncoder, const SStreamCheckpointReq* pReq);
int32_t tDecodeSStreamCheckpointReq(SDecoder* pDecoder, SStreamCheckpointReq* pReq);
int32_t tEncodeSStreamCheckpointRsp(SEncoder* pEncoder, const SStreamCheckpointRsp* pRsp);
int32_t tDecodeSStreamCheckpointRsp(SDecoder* pDecoder, SStreamCheckpointRsp* pRsp);
typedef struct {
int64_t streamId;
int32_t downstreamTaskId;
......@@ -598,18 +631,22 @@ void streamMetaClose(SStreamMeta* streamMeta);
int32_t streamMetaAddTask(SStreamMeta* pMeta, int64_t ver, SStreamTask* pTask);
int32_t streamMetaAddSerializedTask(SStreamMeta* pMeta, int64_t startVer, char* msg, int32_t msgLen);
int32_t streamMetaRemoveTask(SStreamMeta* pMeta, int32_t taskId);
SStreamTask* streamMetaGetTask(SStreamMeta* pMeta, int32_t taskId);
SStreamTask* streamMetaAcquireTask(SStreamMeta* pMeta, int32_t taskId);
void streamMetaReleaseTask(SStreamMeta* pMeta, SStreamTask* pTask);
void streamMetaRemoveTask1(SStreamMeta* pMeta, int32_t taskId);
void streamMetaRemoveTask(SStreamMeta* pMeta, int32_t taskId);
int32_t streamMetaBegin(SStreamMeta* pMeta);
int32_t streamMetaCommit(SStreamMeta* pMeta);
int32_t streamMetaRollBack(SStreamMeta* pMeta);
int32_t streamLoadTasks(SStreamMeta* pMeta);
// checkpoint
int32_t streamProcessCheckpointSourceReq(SStreamMeta* pMeta, SStreamTask* pTask, SStreamCheckpointSourceReq* pReq);
int32_t streamProcessCheckpointReq(SStreamMeta* pMeta, SStreamTask* pTask, SStreamCheckpointReq* pReq);
int32_t streamProcessCheckpointRsp(SStreamMeta* pMeta, SStreamTask* pTask, SStreamCheckpointRsp* pRsp);
#ifdef __cplusplus
}
#endif
......
......@@ -167,6 +167,7 @@ int64_t tsWalFsyncDataSizeLimit = (100 * 1024 * 1024L);
// internal
int32_t tsTransPullupInterval = 2;
int32_t tsMqRebalanceInterval = 2;
int32_t tsStreamCheckpointTickInterval = 1;
int32_t tsTtlUnit = 86400;
int32_t tsTtlPushInterval = 86400;
int32_t tsGrantHBInterval = 60;
......
......@@ -3748,6 +3748,31 @@ int32_t tDeserializeSMTimerMsg(void *buf, int32_t bufLen, SMTimerReq *pReq) {
return 0;
}
int32_t tSerializeSMStreamTickMsg(void *buf, int32_t bufLen, SMStreamTickReq *pReq) {
SEncoder encoder = {0};
tEncoderInit(&encoder, buf, bufLen);
if (tStartEncode(&encoder) < 0) return -1;
if (tEncodeI64(&encoder, pReq->tick) < 0) return -1;
tEndEncode(&encoder);
int32_t tlen = encoder.pos;
tEncoderClear(&encoder);
return tlen;
}
int32_t tDeserializeSMStreamTickMsg(void *buf, int32_t bufLen, SMStreamTickReq *pReq) {
SDecoder decoder = {0};
tDecoderInit(&decoder, buf, bufLen);
if (tStartDecode(&decoder) < 0) return -1;
if (tDecodeI64(&decoder, &pReq->tick) < 0) return -1;
tEndDecode(&decoder);
tDecoderClear(&decoder);
return 0;
}
int32_t tEncodeSReplica(SEncoder *pEncoder, SReplica *pReplica) {
if (tEncodeI32(pEncoder, pReplica->id) < 0) return -1;
if (tEncodeU16(pEncoder, pReplica->port) < 0) return -1;
......
......@@ -640,6 +640,10 @@ typedef struct {
SArray* tasks; // SArray<SArray<SStreamTask>>
SSchemaWrapper outputSchema;
SSchemaWrapper tagSchema;
// 3.0.20
int64_t checkpointFreq; // ms
int64_t currentTick; // do not serialize
} SStreamObj;
int32_t tEncodeSStreamObj(SEncoder* pEncoder, const SStreamObj* pObj);
......@@ -653,15 +657,6 @@ typedef struct {
SArray* childInfo; // SArray<SStreamChildEpInfo>
} SStreamCheckpointObj;
#if 0
typedef struct {
int64_t uid;
int64_t streamId;
int8_t status;
int8_t stage;
} SStreamRecoverObj;
#endif
#ifdef __cplusplus
}
#endif
......
......@@ -76,6 +76,9 @@ int32_t tEncodeSStreamObj(SEncoder *pEncoder, const SStreamObj *pObj) {
if (tEncodeSSchemaWrapper(pEncoder, &pObj->outputSchema) < 0) return -1;
// 3.0.20
if (tEncodeI64(pEncoder, pObj->checkpointFreq) < 0) return -1;
tEndEncode(pEncoder);
return pEncoder->pos;
}
......@@ -139,6 +142,9 @@ int32_t tDecodeSStreamObj(SDecoder *pDecoder, SStreamObj *pObj) {
if (tDecodeSSchemaWrapper(pDecoder, &pObj->outputSchema) < 0) return -1;
// 3.0.20
if (tDecodeI64(pDecoder, &pObj->checkpointFreq) < 0) return -1;
tEndDecode(pDecoder);
return 0;
}
......
......@@ -85,6 +85,21 @@ static void *mndBuildTimerMsg(int32_t *pContLen) {
return pReq;
}
static void *mndBuildCheckpointTickMsg(int32_t *pContLen, int64_t sec) {
SMStreamTickReq timerReq = {
.tick = sec,
};
int32_t contLen = tSerializeSMStreamTickMsg(NULL, 0, &timerReq);
if (contLen <= 0) return NULL;
void *pReq = rpcMallocCont(contLen);
if (pReq == NULL) return NULL;
tSerializeSMStreamTickMsg(pReq, contLen, &timerReq);
*pContLen = contLen;
return pReq;
}
static void mndPullupTrans(SMnode *pMnode) {
int32_t contLen = 0;
void *pReq = mndBuildTimerMsg(&contLen);
......@@ -105,7 +120,24 @@ static void mndCalMqRebalance(SMnode *pMnode) {
int32_t contLen = 0;
void *pReq = mndBuildTimerMsg(&contLen);
if (pReq != NULL) {
SRpcMsg rpcMsg = {.msgType = TDMT_MND_TMQ_TIMER, .pCont = pReq, .contLen = contLen};
SRpcMsg rpcMsg = {
.msgType = TDMT_MND_TMQ_TIMER,
.pCont = pReq,
.contLen = contLen,
};
tmsgPutToQueue(&pMnode->msgCb, READ_QUEUE, &rpcMsg);
}
}
static void mndStreamCheckpointTick(SMnode *pMnode, int64_t sec) {
int32_t contLen = 0;
void *pReq = mndBuildCheckpointTickMsg(&contLen, sec);
if (pReq != NULL) {
SRpcMsg rpcMsg = {
.msgType = TDMT_MND_STREAM_CHECKPOINT_TIMER,
.pCont = pReq,
.contLen = contLen,
};
tmsgPutToQueue(&pMnode->msgCb, READ_QUEUE, &rpcMsg);
}
}
......@@ -224,6 +256,12 @@ static void *mndThreadFp(void *param) {
mndCalMqRebalance(pMnode);
}
#if 0
if (sec % tsStreamCheckpointTickInterval == 0) {
mndStreamCheckpointTick(pMnode, sec);
}
#endif
if (sec % tsTelemInterval == (TMIN(60, (tsTelemInterval - 1)))) {
mndPullupTelem(pMnode);
}
......
......@@ -36,6 +36,8 @@ static int32_t mndStreamActionDelete(SSdb *pSdb, SStreamObj *pStream);
static int32_t mndStreamActionUpdate(SSdb *pSdb, SStreamObj *pStream, SStreamObj *pNewStream);
static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq);
static int32_t mndProcessDropStreamReq(SRpcMsg *pReq);
static int32_t mndProcessStreamCheckpointTmr(SRpcMsg *pReq);
static int32_t mndProcessStreamDoCheckpoint(SRpcMsg *pReq);
/*static int32_t mndProcessRecoverStreamReq(SRpcMsg *pReq);*/
static int32_t mndProcessStreamMetaReq(SRpcMsg *pReq);
static int32_t mndGetStreamMeta(SRpcMsg *pReq, SShowObj *pShow, STableMetaRsp *pMeta);
......@@ -62,6 +64,10 @@ int32_t mndInitStream(SMnode *pMnode) {
mndSetMsgHandle(pMnode, TDMT_STREAM_TASK_DEPLOY_RSP, mndTransProcessRsp);
mndSetMsgHandle(pMnode, TDMT_STREAM_TASK_DROP_RSP, mndTransProcessRsp);
mndSetMsgHandle(pMnode, TDMT_MND_STREAM_CHECKPOINT_TIMER, mndProcessStreamCheckpointTmr);
mndSetMsgHandle(pMnode, TDMT_MND_STREAM_BEGIN_CHECKPOINT, mndProcessStreamDoCheckpoint);
mndSetMsgHandle(pMnode, TDMT_STREAM_TASK_REPORT_CHECKPOINT, mndTransProcessRsp);
mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_STREAMS, mndRetrieveStream);
mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_STREAMS, mndCancelGetNextStream);
mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_STREAM_TASKS, mndRetrieveStreamTask);
......@@ -680,93 +686,183 @@ _OVER:
tFreeStreamObj(&streamObj);
return code;
}
static int32_t mndProcessDropStreamReq(SRpcMsg *pReq) {
static int32_t mndProcessStreamCheckpointTmr(SRpcMsg *pReq) {
SMnode *pMnode = pReq->info.node;
SSdb *pSdb = pMnode->pSdb;
void *pIter = NULL;
SStreamObj *pStream = NULL;
/*SDbObj *pDb = NULL;*/
/*SUserObj *pUser = NULL;*/
SMDropStreamReq dropReq = {0};
if (tDeserializeSMDropStreamReq(pReq->pCont, pReq->contLen, &dropReq) < 0) {
ASSERT(0);
terrno = TSDB_CODE_INVALID_MSG;
return -1;
// iterate all stream obj
while (1) {
pIter = sdbFetch(pSdb, SDB_STREAM, pIter, (void **)&pStream);
if (pIter == NULL) break;
// incr tick
int64_t currentTick = atomic_add_fetch_64(&pStream->currentTick, 1);
// if >= checkpointFreq, build msg TDMT_MND_STREAM_BEGIN_CHECKPOINT, put into write q
if (currentTick >= pStream->checkpointFreq) {
atomic_store_64(&pStream->currentTick, 0);
SMStreamDoCheckpointMsg *pMsg = rpcMallocCont(sizeof(SMStreamDoCheckpointMsg));
pMsg->streamId = pStream->uid;
pMsg->checkpointId = tGenIdPI64();
memcpy(pMsg->streamName, pStream->name, TSDB_STREAM_FNAME_LEN);
SRpcMsg rpcMsg = {
.msgType = TDMT_MND_STREAM_BEGIN_CHECKPOINT,
.pCont = pMsg,
.contLen = sizeof(SMStreamDoCheckpointMsg),
};
tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg);
}
}
pStream = mndAcquireStream(pMnode, dropReq.name);
return 0;
}
if (pStream == NULL) {
if (dropReq.igNotExists) {
mInfo("stream:%s, not exist, ignore not exist is set", dropReq.name);
sdbRelease(pMnode->pSdb, pStream);
return 0;
} else {
terrno = TSDB_CODE_MND_STREAM_NOT_EXIST;
return -1;
}
}
static int32_t mndBuildStreamCheckpointSourceReq(void **pBuf, int32_t *pLen, const SStreamTask *pTask,
SMStreamDoCheckpointMsg *pMsg) {
SStreamCheckpointSourceReq req = {0};
req.checkpointId = pMsg->checkpointId;
req.nodeId = pTask->nodeId;
req.expireTime = -1;
req.streamId = pTask->streamId;
req.taskId = pTask->taskId;
if (mndCheckDbPrivilegeByName(pMnode, pReq->info.conn.user, MND_OPER_WRITE_DB, pStream->targetDb) != 0) {
int32_t code;
int32_t blen;
tEncodeSize(tEncodeSStreamCheckpointSourceReq, &req, blen, code);
if (code < 0) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_DB_INSIDE, pReq, "drop-stream");
mndTransSetDbName(pTrans, pStream->sourceDb, pStream->targetDb);
if (pTrans == NULL) {
mError("stream:%s, failed to drop since %s", dropReq.name, terrstr());
sdbRelease(pMnode->pSdb, pStream);
int32_t tlen = sizeof(SMsgHead) + blen;
void *buf = taosMemoryMalloc(tlen);
if (buf == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
mInfo("trans:%d, used to drop stream:%s", pTrans->id, dropReq.name);
// drop all tasks
if (mndDropStreamTasks(pMnode, pTrans, pStream) < 0) {
mError("stream:%s, failed to drop task since %s", dropReq.name, terrstr());
sdbRelease(pMnode->pSdb, pStream);
mndTransDrop(pTrans);
void *abuf = POINTER_SHIFT(buf, sizeof(SMsgHead));
SEncoder encoder;
tEncoderInit(&encoder, abuf, tlen);
tEncodeSStreamCheckpointSourceReq(&encoder, &req);
SMsgHead *pMsgHead = (SMsgHead *)buf;
pMsgHead->contLen = htonl(tlen);
pMsgHead->vgId = htonl(pTask->nodeId);
tEncoderClear(&encoder);
*pBuf = buf;
*pLen = tlen;
return 0;
}
static int32_t mndProcessStreamDoCheckpoint(SRpcMsg *pReq) {
SMnode *pMnode = pReq->info.node;
SSdb *pSdb = pMnode->pSdb;
SMStreamDoCheckpointMsg *pMsg = (SMStreamDoCheckpointMsg *)pReq->pCont;
SStreamObj *pStream = mndAcquireStream(pMnode, pMsg->streamName);
if (pStream == NULL || pStream->uid != pMsg->streamId) {
mError("start checkpointing failed since stream %s not found", pMsg->streamName);
return -1;
}
// drop stream
if (mndPersistDropStreamLog(pMnode, pTrans, pStream) < 0) {
sdbRelease(pMnode->pSdb, pStream);
mndTransDrop(pTrans);
return -1;
// build new transaction:
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_DB_INSIDE, pReq, "stream-checkpoint");
if (pTrans == NULL) return -1;
mndTransSetDbName(pTrans, pStream->sourceDb, pStream->targetDb);
taosRLockLatch(&pStream->lock);
// 1. redo action: broadcast checkpoint source msg for all source vg
int32_t totLevel = taosArrayGetSize(pStream->tasks);
for (int32_t i = 0; i < totLevel; i++) {
SArray *pLevel = taosArrayGetP(pStream->tasks, i);
SStreamTask *pTask = taosArrayGetP(pLevel, 0);
if (pTask->taskLevel == TASK_LEVEL__SOURCE) {
int32_t sz = taosArrayGetSize(pLevel);
for (int32_t j = 0; j < sz; j++) {
SStreamTask *pTask = taosArrayGetP(pLevel, j);
ASSERT(pTask->nodeId > 0);
SVgObj *pVgObj = mndAcquireVgroup(pMnode, pTask->nodeId);
if (pVgObj == NULL) {
ASSERT(0);
taosRUnLockLatch(&pStream->lock);
mndReleaseStream(pMnode, pStream);
mndTransDrop(pTrans);
return -1;
}
void *buf;
int32_t tlen;
if (mndBuildStreamCheckpointSourceReq(&buf, &tlen, pTask, pMsg) < 0) {
taosRUnLockLatch(&pStream->lock);
mndReleaseStream(pMnode, pStream);
mndTransDrop(pTrans);
return -1;
}
STransAction action = {0};
action.epSet = mndGetVgroupEpset(pMnode, pVgObj);
action.pCont = buf;
action.contLen = tlen;
action.msgType = TDMT_VND_STREAM_CHECK_POINT_SOURCE;
mndReleaseVgroup(pMnode, pVgObj);
if (mndTransAppendRedoAction(pTrans, &action) != 0) {
taosMemoryFree(buf);
taosRUnLockLatch(&pStream->lock);
mndReleaseStream(pMnode, pStream);
mndTransDrop(pTrans);
return -1;
}
}
}
}
// 2. reset tick
atomic_store_64(&pStream->currentTick, 0);
// 3. commit log: stream checkpoint info
taosRUnLockLatch(&pStream->lock);
if (mndTransPrepare(pMnode, pTrans) != 0) {
mError("trans:%d, failed to prepare drop stream trans since %s", pTrans->id, terrstr());
sdbRelease(pMnode->pSdb, pStream);
mError("failed to prepare trans rebalance since %s", terrstr());
mndTransDrop(pTrans);
mndReleaseStream(pMnode, pStream);
return -1;
}
sdbRelease(pMnode->pSdb, pStream);
mndReleaseStream(pMnode, pStream);
mndTransDrop(pTrans);
return TSDB_CODE_ACTION_IN_PROGRESS;
return 0;
}
#if 0
static int32_t mndProcessRecoverStreamReq(SRpcMsg *pReq) {
static int32_t mndProcessDropStreamReq(SRpcMsg *pReq) {
SMnode *pMnode = pReq->info.node;
SStreamObj *pStream = NULL;
/*SDbObj *pDb = NULL;*/
/*SUserObj *pUser = NULL;*/
SMRecoverStreamReq recoverReq = {0};
if (tDeserializeSMRecoverStreamReq(pReq->pCont, pReq->contLen, &recoverReq) < 0) {
SMDropStreamReq dropReq = {0};
if (tDeserializeSMDropStreamReq(pReq->pCont, pReq->contLen, &dropReq) < 0) {
ASSERT(0);
terrno = TSDB_CODE_INVALID_MSG;
return -1;
}
pStream = mndAcquireStream(pMnode, recoverReq.name);
pStream = mndAcquireStream(pMnode, dropReq.name);
if (pStream == NULL) {
if (recoverReq.igNotExists) {
mInfo("stream:%s, not exist, ignore not exist is set", recoverReq.name);
if (dropReq.igNotExists) {
mInfo("stream:%s, not exist, ignore not exist is set", dropReq.name);
sdbRelease(pMnode->pSdb, pStream);
return 0;
} else {
......@@ -779,39 +875,42 @@ static int32_t mndProcessRecoverStreamReq(SRpcMsg *pReq) {
return -1;
}
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_NOTHING, pReq);
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_DB_INSIDE, pReq, "drop-stream");
mndTransSetDbName(pTrans, pStream->sourceDb, pStream->targetDb);
if (pTrans == NULL) {
mError("stream:%s, failed to recover since %s", recoverReq.name, terrstr());
mError("stream:%s, failed to drop since %s", dropReq.name, terrstr());
sdbRelease(pMnode->pSdb, pStream);
return -1;
}
mInfo("trans:%d, used to drop stream:%s", pTrans->id, recoverReq.name);
mInfo("trans:%d, used to drop stream:%s", pTrans->id, dropReq.name);
// broadcast to recover all tasks
if (mndRecoverStreamTasks(pMnode, pTrans, pStream) < 0) {
mError("stream:%s, failed to recover task since %s", recoverReq.name, terrstr());
// drop all tasks
if (mndDropStreamTasks(pMnode, pTrans, pStream) < 0) {
mError("stream:%s, failed to drop task since %s", dropReq.name, terrstr());
sdbRelease(pMnode->pSdb, pStream);
mndTransDrop(pTrans);
return -1;
}
// update stream status
if (mndSetStreamRecover(pMnode, pTrans, pStream) < 0) {
// drop stream
if (mndPersistDropStreamLog(pMnode, pTrans, pStream) < 0) {
sdbRelease(pMnode->pSdb, pStream);
mndTransDrop(pTrans);
return -1;
}
if (mndTransPrepare(pMnode, pTrans) != 0) {
mError("trans:%d, failed to prepare recover stream trans since %s", pTrans->id, terrstr());
mError("trans:%d, failed to prepare drop stream trans since %s", pTrans->id, terrstr());
sdbRelease(pMnode->pSdb, pStream);
mndTransDrop(pTrans);
return -1;
}
sdbRelease(pMnode->pSdb, pStream);
mndTransDrop(pTrans);
return TSDB_CODE_ACTION_IN_PROGRESS;
}
#endif
int32_t mndDropStreamByDb(SMnode *pMnode, STrans *pTrans, SDbObj *pDb) {
SSdb *pSdb = pMnode->pSdb;
......@@ -847,13 +946,6 @@ int32_t mndDropStreamByDb(SMnode *pMnode, STrans *pTrans, SDbObj *pDb) {
}
}
#if 0
if (mndSetDropOffsetStreamLogs(pMnode, pTrans, pStream) < 0) {
sdbRelease(pSdb, pStream);
goto END;
}
#endif
sdbRelease(pSdb, pStream);
}
......
......@@ -440,9 +440,9 @@ static int32_t mndDoRebalance(SMnode *pMnode, const SMqRebInputObj *pInput, SMqR
}
static int32_t mndPersistRebResult(SMnode *pMnode, SRpcMsg *pMsg, const SMqRebOutputObj *pOutput) {
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_DB_INSIDE, pMsg, "persist-reb");
mndTransSetDbName(pTrans, pOutput->pSub->dbName, NULL);
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_DB_INSIDE, pMsg, "tmq-reb");
if (pTrans == NULL) return -1;
mndTransSetDbName(pTrans, pOutput->pSub->dbName, NULL);
// make txn:
// 1. redo action: action to all vg
......@@ -523,28 +523,6 @@ static int32_t mndPersistRebResult(SMnode *pMnode, SRpcMsg *pMsg, const SMqRebOu
tDeleteSMqConsumerObj(pConsumerNew);
taosMemoryFree(pConsumerNew);
}
#if 0
if (consumerNum) {
char topic[TSDB_TOPIC_FNAME_LEN];
char cgroup[TSDB_CGROUP_LEN];
mndSplitSubscribeKey(pOutput->pSub->key, topic, cgroup, true);
SMqTopicObj *pTopic = mndAcquireTopic(pMnode, topic);
if (pTopic) {
// TODO make topic complete
SMqTopicObj topicObj = {0};
memcpy(&topicObj, pTopic, sizeof(SMqTopicObj));
topicObj.refConsumerCnt = pTopic->refConsumerCnt - consumerNum;
// TODO is that correct?
pTopic->refConsumerCnt = topicObj.refConsumerCnt;
mInfo("subscribe topic %s unref %d consumer cgroup %s, refcnt %d", pTopic->name, consumerNum, cgroup,
topicObj.refConsumerCnt);
if (mndSetTopicCommitLogs(pMnode, pTrans, &topicObj) != 0) {
ASSERT(0);
goto REB_FAIL;
}
}
}
#endif
// 4. TODO commit log: modification log
......
......@@ -168,7 +168,7 @@ int32_t sndProcessTaskDeployReq(SSnode *pSnode, char *msg, int32_t msgLen) {
int32_t sndProcessTaskDropReq(SSnode *pSnode, char *msg, int32_t msgLen) {
SVDropStreamTaskReq *pReq = (SVDropStreamTaskReq *)msg;
streamMetaRemoveTask1(pSnode->pMeta, pReq->taskId);
streamMetaRemoveTask(pSnode->pMeta, pReq->taskId);
return 0;
}
......
......@@ -1425,7 +1425,7 @@ int32_t tqProcessTaskDispatchRsp(STQ* pTq, SRpcMsg* pMsg) {
int32_t tqProcessTaskDropReq(STQ* pTq, int64_t version, char* msg, int32_t msgLen) {
SVDropStreamTaskReq* pReq = (SVDropStreamTaskReq*)msg;
streamMetaRemoveTask1(pTq->pStreamMeta, pReq->taskId);
streamMetaRemoveTask(pTq->pStreamMeta, pReq->taskId);
return 0;
}
......
......@@ -17,7 +17,6 @@
#define _STREAM_INC_H_
#include "executor.h"
#include "tref.h"
#include "tstream.h"
#ifdef __cplusplus
......@@ -25,9 +24,8 @@ extern "C" {
#endif
typedef struct {
int8_t inited;
int32_t refPool;
void* timer;
int8_t inited;
void* timer;
} SStreamGlobalEnv;
static SStreamGlobalEnv streamEnv;
......
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "streamInc.h"
int32_t tEncodeSStreamCheckpointSourceReq(SEncoder* pEncoder, const SStreamCheckpointSourceReq* pReq) {
if (tStartEncode(pEncoder) < 0) return -1;
if (tEncodeI64(pEncoder, pReq->streamId) < 0) return -1;
if (tEncodeI64(pEncoder, pReq->checkpointId) < 0) return -1;
if (tEncodeI32(pEncoder, pReq->taskId) < 0) return -1;
if (tEncodeI32(pEncoder, pReq->nodeId) < 0) return -1;
if (tEncodeI64(pEncoder, pReq->expireTime) < 0) return -1;
tEndEncode(pEncoder);
return pEncoder->pos;
}
int32_t tDecodeSStreamCheckpointSourceReq(SDecoder* pDecoder, SStreamCheckpointSourceReq* pReq) {
if (tStartDecode(pDecoder) < 0) return -1;
if (tDecodeI64(pDecoder, &pReq->streamId) < 0) return -1;
if (tDecodeI64(pDecoder, &pReq->checkpointId) < 0) return -1;
if (tDecodeI32(pDecoder, &pReq->taskId) < 0) return -1;
if (tDecodeI32(pDecoder, &pReq->nodeId) < 0) return -1;
if (tDecodeI64(pDecoder, &pReq->expireTime) < 0) return -1;
tEndDecode(pDecoder);
return 0;
}
int32_t tEncodeSStreamCheckpointSourceRsp(SEncoder* pEncoder, const SStreamCheckpointSourceRsp* pRsp) {
if (tStartEncode(pEncoder) < 0) return -1;
if (tEncodeI64(pEncoder, pRsp->streamId) < 0) return -1;
if (tEncodeI64(pEncoder, pRsp->checkpointId) < 0) return -1;
if (tEncodeI32(pEncoder, pRsp->taskId) < 0) return -1;
if (tEncodeI32(pEncoder, pRsp->nodeId) < 0) return -1;
if (tEncodeI64(pEncoder, pRsp->expireTime) < 0) return -1;
tEndEncode(pEncoder);
return pEncoder->pos;
}
int32_t tDecodeSStreamCheckpointSourceRsp(SDecoder* pDecoder, SStreamCheckpointSourceRsp* pRsp) {
if (tStartDecode(pDecoder) < 0) return -1;
if (tDecodeI64(pDecoder, &pRsp->streamId) < 0) return -1;
if (tDecodeI64(pDecoder, &pRsp->checkpointId) < 0) return -1;
if (tDecodeI32(pDecoder, &pRsp->taskId) < 0) return -1;
if (tDecodeI32(pDecoder, &pRsp->nodeId) < 0) return -1;
if (tDecodeI64(pDecoder, &pRsp->expireTime) < 0) return -1;
tEndDecode(pDecoder);
return 0;
}
int32_t tEncodeSStreamCheckpointReq(SEncoder* pEncoder, const SStreamCheckpointReq* pReq) {
if (tStartEncode(pEncoder) < 0) return -1;
if (tEncodeI64(pEncoder, pReq->streamId) < 0) return -1;
if (tEncodeI64(pEncoder, pReq->checkpointId) < 0) return -1;
if (tEncodeI32(pEncoder, pReq->downstreamTaskId) < 0) return -1;
if (tEncodeI32(pEncoder, pReq->downstreamNodeId) < 0) return -1;
if (tEncodeI64(pEncoder, pReq->upstreamTaskId) < 0) return -1;
if (tEncodeI64(pEncoder, pReq->upstreamNodeId) < 0) return -1;
if (tEncodeI32(pEncoder, pReq->childId) < 0) return -1;
if (tEncodeI64(pEncoder, pReq->expireTime) < 0) return -1;
if (tEncodeI8(pEncoder, pReq->taskLevel) < 0) return -1;
tEndEncode(pEncoder);
return pEncoder->pos;
}
int32_t tDecodeSStreamCheckpointReq(SDecoder* pDecoder, SStreamCheckpointReq* pReq) {
if (tStartDecode(pDecoder) < 0) return -1;
if (tDecodeI64(pDecoder, &pReq->streamId) < 0) return -1;
if (tDecodeI64(pDecoder, &pReq->checkpointId) < 0) return -1;
if (tDecodeI32(pDecoder, &pReq->downstreamTaskId) < 0) return -1;
if (tDecodeI32(pDecoder, &pReq->downstreamNodeId) < 0) return -1;
if (tDecodeI32(pDecoder, &pReq->upstreamTaskId) < 0) return -1;
if (tDecodeI32(pDecoder, &pReq->upstreamNodeId) < 0) return -1;
if (tDecodeI32(pDecoder, &pReq->childId) < 0) return -1;
if (tDecodeI64(pDecoder, &pReq->expireTime) < 0) return -1;
if (tDecodeI8(pDecoder, &pReq->taskLevel) < 0) return -1;
tEndDecode(pDecoder);
return 0;
}
int32_t tEncodeSStreamCheckpointRsp(SEncoder* pEncoder, const SStreamCheckpointRsp* pRsp) {
if (tStartEncode(pEncoder) < 0) return -1;
if (tEncodeI64(pEncoder, pRsp->streamId) < 0) return -1;
if (tEncodeI64(pEncoder, pRsp->checkpointId) < 0) return -1;
if (tEncodeI32(pEncoder, pRsp->downstreamTaskId) < 0) return -1;
if (tEncodeI32(pEncoder, pRsp->downstreamNodeId) < 0) return -1;
if (tEncodeI64(pEncoder, pRsp->upstreamTaskId) < 0) return -1;
if (tEncodeI64(pEncoder, pRsp->upstreamNodeId) < 0) return -1;
if (tEncodeI32(pEncoder, pRsp->childId) < 0) return -1;
if (tEncodeI64(pEncoder, pRsp->expireTime) < 0) return -1;
if (tEncodeI8(pEncoder, pRsp->taskLevel) < 0) return -1;
tEndEncode(pEncoder);
return pEncoder->pos;
}
int32_t tDecodeSStreamCheckpointRsp(SDecoder* pDecoder, SStreamCheckpointRsp* pRsp) {
if (tStartDecode(pDecoder) < 0) return -1;
if (tDecodeI64(pDecoder, &pRsp->streamId) < 0) return -1;
if (tDecodeI64(pDecoder, &pRsp->checkpointId) < 0) return -1;
if (tDecodeI32(pDecoder, &pRsp->downstreamTaskId) < 0) return -1;
if (tDecodeI32(pDecoder, &pRsp->downstreamNodeId) < 0) return -1;
if (tDecodeI32(pDecoder, &pRsp->upstreamTaskId) < 0) return -1;
if (tDecodeI32(pDecoder, &pRsp->upstreamNodeId) < 0) return -1;
if (tDecodeI32(pDecoder, &pRsp->childId) < 0) return -1;
if (tDecodeI64(pDecoder, &pRsp->expireTime) < 0) return -1;
if (tDecodeI8(pDecoder, &pRsp->taskLevel) < 0) return -1;
tEndDecode(pDecoder);
return 0;
}
static int32_t streamAlignCheckpoint(SStreamTask* pTask, int64_t checkpointId, int32_t childId) {
if (pTask->checkpointingId == 0) {
pTask->checkpointingId = checkpointId;
pTask->checkpointAlignCnt = taosArrayGetSize(pTask->childEpInfo);
}
ASSERT(pTask->checkpointingId == checkpointId);
return atomic_sub_fetch_32(&pTask->checkpointAlignCnt, 1);
}
static int32_t streamDoCheckpoint(SStreamMeta* pMeta, SStreamTask* pTask, int64_t checkpointId) {
// commit tdb state
streamStateCommit(pTask->pState);
// commit non-tdb state
// copy and save new state
// report to mnode
// send checkpoint req to downstream
return 0;
}
static int32_t streamDoSourceCheckpoint(SStreamMeta* pMeta, SStreamTask* pTask, int64_t checkpointId) {
// ref wal
// set status checkpointing
// do checkpoint
return 0;
}
int32_t streamProcessCheckpointSourceReq(SStreamMeta* pMeta, SStreamTask* pTask, SStreamCheckpointSourceReq* pReq) {
int32_t code;
int64_t checkpointId = pReq->checkpointId;
code = streamDoSourceCheckpoint(pMeta, pTask, checkpointId);
if (code < 0) {
// rsp error
return -1;
}
return 0;
}
int32_t streamProcessCheckpointReq(SStreamMeta* pMeta, SStreamTask* pTask, SStreamCheckpointReq* pReq) {
int32_t code;
int64_t checkpointId = pReq->checkpointId;
int32_t childId = pReq->childId;
if (taosArrayGetSize(pTask->childEpInfo) > 0) {
code = streamAlignCheckpoint(pTask, checkpointId, childId);
if (code > 0) {
return 0;
}
if (code < 0) {
ASSERT(0);
return -1;
}
}
code = streamDoCheckpoint(pMeta, pTask, checkpointId);
if (code < 0) {
// rsp error
return -1;
}
// send rsp to all children
return 0;
}
int32_t streamProcessCheckpointRsp(SStreamMeta* pMeta, SStreamTask* pTask, SStreamCheckpointRsp* pRsp) {
// recover step2, scan from wal
// unref wal
// set status normal
return 0;
}
......@@ -202,7 +202,7 @@ void streamMetaReleaseTask(SStreamMeta* pMeta, SStreamTask* pTask) {
}
}
void streamMetaRemoveTask1(SStreamMeta* pMeta, int32_t taskId) {
void streamMetaRemoveTask(SStreamMeta* pMeta, int32_t taskId) {
SStreamTask** ppTask = (SStreamTask**)taosHashGet(pMeta->pTasks, &taskId, sizeof(int32_t));
if (ppTask) {
SStreamTask* pTask = *ppTask;
......@@ -219,35 +219,6 @@ void streamMetaRemoveTask1(SStreamMeta* pMeta, int32_t taskId) {
}
}
int32_t streamMetaRemoveTask(SStreamMeta* pMeta, int32_t taskId) {
SStreamTask** ppTask = (SStreamTask**)taosHashGet(pMeta->pTasks, &taskId, sizeof(int32_t));
if (ppTask) {
SStreamTask* pTask = *ppTask;
taosHashRemove(pMeta->pTasks, &taskId, sizeof(int32_t));
atomic_store_8(&pTask->taskStatus, TASK_STATUS__DROPPING);
if (tdbTbDelete(pMeta->pTaskDb, &taskId, sizeof(int32_t), &pMeta->txn) < 0) {
/*return -1;*/
}
if (pTask->triggerParam != 0) {
taosTmrStop(pTask->timer);
}
while (1) {
int8_t schedStatus =
atomic_val_compare_exchange_8(&pTask->schedStatus, TASK_SCHED_STATUS__INACTIVE, TASK_SCHED_STATUS__DROPPING);
if (schedStatus != TASK_SCHED_STATUS__ACTIVE) {
tFreeSStreamTask(pTask);
break;
}
taosMsleep(10);
}
}
return 0;
}
int32_t streamMetaBegin(SStreamMeta* pMeta) {
if (tdbTxnOpen(&pMeta->txn, 0, tdbDefaultMalloc, tdbDefaultFree, NULL, TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED) <
0) {
......
......@@ -325,46 +325,3 @@ int32_t tDecodeSStreamRecoverFinishReq(SDecoder* pDecoder, SStreamRecoverFinishR
tEndDecode(pDecoder);
return 0;
}
int32_t tEncodeSStreamCheckpointInfo(SEncoder* pEncoder, const SStreamCheckpointInfo* pCheckpoint) {
if (tEncodeI32(pEncoder, pCheckpoint->srcNodeId) < 0) return -1;
if (tEncodeI32(pEncoder, pCheckpoint->srcChildId) < 0) return -1;
if (tEncodeI64(pEncoder, pCheckpoint->stateProcessedVer) < 0) return -1;
return 0;
}
int32_t tDecodeSStreamCheckpointInfo(SDecoder* pDecoder, SStreamCheckpointInfo* pCheckpoint) {
if (tDecodeI32(pDecoder, &pCheckpoint->srcNodeId) < 0) return -1;
if (tDecodeI32(pDecoder, &pCheckpoint->srcChildId) < 0) return -1;
if (tDecodeI64(pDecoder, &pCheckpoint->stateProcessedVer) < 0) return -1;
return 0;
}
int32_t tEncodeSStreamMultiVgCheckpointInfo(SEncoder* pEncoder, const SStreamMultiVgCheckpointInfo* pCheckpoint) {
if (tEncodeI64(pEncoder, pCheckpoint->streamId) < 0) return -1;
if (tEncodeI64(pEncoder, pCheckpoint->checkTs) < 0) return -1;
if (tEncodeI32(pEncoder, pCheckpoint->checkpointId) < 0) return -1;
if (tEncodeI32(pEncoder, pCheckpoint->taskId) < 0) return -1;
int32_t sz = taosArrayGetSize(pCheckpoint->checkpointVer);
if (tEncodeI32(pEncoder, sz) < 0) return -1;
for (int32_t i = 0; i < sz; i++) {
SStreamCheckpointInfo* pOneVgCkpoint = taosArrayGet(pCheckpoint->checkpointVer, i);
if (tEncodeSStreamCheckpointInfo(pEncoder, pOneVgCkpoint) < 0) return -1;
}
return 0;
}
int32_t tDecodeSStreamMultiVgCheckpointInfo(SDecoder* pDecoder, SStreamMultiVgCheckpointInfo* pCheckpoint) {
if (tDecodeI64(pDecoder, &pCheckpoint->streamId) < 0) return -1;
if (tDecodeI64(pDecoder, &pCheckpoint->checkTs) < 0) return -1;
if (tDecodeI32(pDecoder, &pCheckpoint->checkpointId) < 0) return -1;
if (tDecodeI32(pDecoder, &pCheckpoint->taskId) < 0) return -1;
int32_t sz;
if (tDecodeI32(pDecoder, &sz) < 0) return -1;
for (int32_t i = 0; i < sz; i++) {
SStreamCheckpointInfo oneVgCheckpoint;
if (tDecodeSStreamCheckpointInfo(pDecoder, &oneVgCheckpoint) < 0) return -1;
taosArrayPush(pCheckpoint->checkpointVer, &oneVgCheckpoint);
}
return 0;
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册