提交 f2b96fb4 编写于 作者: dengyihao's avatar dengyihao

add checkpoint

上级 9d69b284
......@@ -72,7 +72,7 @@ typedef struct {
typedef struct {
int64_t suid;
char* qmsg; // SubPlanToString
char* qmsg; // SubPlanToString
SNode* node;
} STqExecTb;
......@@ -98,18 +98,21 @@ typedef enum tq_handle_status {
} tq_handle_status;
typedef struct {
char subKey[TSDB_SUBSCRIBE_KEY_LEN];
int64_t consumerId;
int32_t epoch;
int8_t fetchMeta;
int64_t snapshotVer;
SWalReader* pWalReader;
SWalRef* pRef;
// STqPushHandle pushHandle; // push
STqExecHandle execHandle; // exec
SRpcMsg* msg;
tq_handle_status status;
char subKey[TSDB_SUBSCRIBE_KEY_LEN];
int64_t consumerId;
int32_t epoch;
int8_t fetchMeta;
int64_t snapshotVer;
SWalReader* pWalReader;
SWalRef* pRef;
// STqPushHandle pushHandle; // push
STqExecHandle execHandle; // exec
SRpcMsg* msg;
tq_handle_status status;
} STqHandle;
typedef struct {
int64_t snapshotVer;
} SStreamHandle;
struct STQ {
SVnode* pVnode;
......
......@@ -88,17 +88,20 @@ int32_t streamTaskSnapRead(SStreamTaskReader* pReader, uint8_t** ppData) {
if (tdbTbcGet(pReader->pCur, &pKey, &kLen, &pVal, &vLen)) {
goto _exit;
}
SStreamTask* pTask = taosMemoryCalloc(1, sizeof(SStreamTask));
if (pTask == NULL) {
return -1;
}
// tDecoderInit(&decoder, (uint8_t*)pVal, vLen);
// tDecodeSTqHandle(&decoder, &handle);
// tDecoderClear(&decoder);
if (handle.snapshotVer <= pReader->sver && handle.snapshotVer >= pReader->ever) {
tdbTbcMoveToNext(pReader->pCur);
break;
} else {
tdbTbcMoveToNext(pReader->pCur);
SDecoder decoder;
tDecoderInit(&decoder, (uint8_t*)pVal, vLen);
code = tDecodeStreamTask(&decoder, pTask);
if (code < 0) {
tDecoderClear(&decoder);
taosMemoryFree(pTask);
goto _err;
}
tDecoderClear(&decoder);
}
*ppData = taosMemoryMalloc(sizeof(SSnapDataHdr) + vLen);
......@@ -115,13 +118,12 @@ int32_t streamTaskSnapRead(SStreamTaskReader* pReader, uint8_t** ppData) {
tqInfo("vgId:%d, vnode snapshot tq read data, version:%" PRId64 " subKey: %s vLen:%d", TD_VID(pReader->pTq->pVnode),
handle.snapshotVer, handle.subKey, vLen);
return code;
_exit:
return code;
_err:
tqError("vgId:%d, vnode snapshot tq read data failed since %s", TD_VID(pReader->pTq->pVnode), tstrerror(code));
return code;
return 0;
}
// STqSnapWriter ========================================
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册