提交 75182bb0 编写于 作者: C Cary Xu

feat: tsma exp wnd clear

上级 bac85eab
......@@ -2455,6 +2455,28 @@ int32_t tDecodeSVGetTsmaExpWndsReq(SDecoder* pCoder, SVGetTsmaExpWndsReq* pReq);
int32_t tEncodeSVGetTSmaExpWndsRsp(SEncoder* pCoder, const SVGetTsmaExpWndsRsp* pReq);
int32_t tDecodeSVGetTsmaExpWndsRsp(SDecoder* pCoder, SVGetTsmaExpWndsRsp* pReq);
typedef struct {
int64_t nKeys; // n consecutive keys since skey
int64_t skey;
} SVTsmaExpWndItem;
typedef struct {
int64_t indexUid;
int64_t version; // tsma result version
int64_t nItems;
SVTsmaExpWndItem items[];
} SVClrTsmaExpWndsReq;
typedef struct {
int64_t indexUid;
int32_t code;
} SVClrTsmaExpWndsRsp;
int32_t tEncodeSVClrTsmaExpWndsReq(SEncoder* pCoder, const SVClrTsmaExpWndsReq* pReq);
int32_t tDecodeSVClrTsmaExpWndsReq(SDecoder* pCoder, SVClrTsmaExpWndsReq* pReq);
int32_t tEncodeSVClrTsmaExpWndsRsp(SEncoder* pCoder, const SVClrTsmaExpWndsRsp* pReq);
int32_t tDecodeSVClrTsmaExpWndsRsp(SDecoder* pCoder, SVClrTsmaExpWndsRsp* pReq);
typedef struct {
int idx;
} SMCreateFullTextReq;
......
......@@ -190,6 +190,7 @@ enum {
TD_DEF_MSG_TYPE(TDMT_VND_DROP_SMA, "vnode-drop-sma", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_SUBMIT_RSMA, "vnode-submit-rsma", SSubmitReq, SSubmitRsp)
TD_DEF_MSG_TYPE(TDMT_VND_GET_TSMA_EXP_WNDS, "vnode-get-tsma-expired-windows", SVGetTsmaExpWndsReq, SVGetTsmaExpWndsRsp)
TD_DEF_MSG_TYPE(TDMT_VND_CLR_TSMA_EXP_WNDS, "vnode-clr-tsma-expired-windows", SVClrTsmaExpWndsReq, SVClrTsmaExpWndsRsp)
TD_DEF_MSG_TYPE(TDMT_VND_DELETE, "delete-data", SVDeleteReq, SVDeleteRsp)
TD_DEF_MSG_TYPE(TDMT_VND_ALTER_CONFIG, "alter-config", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_ALTER_REPLICA, "alter-replica", NULL, NULL)
......
......@@ -2419,7 +2419,7 @@ int32_t tDeserializeSTableIndexReq(void *buf, int32_t bufLen, STableIndexReq *pR
return 0;
}
int32_t tSerializeSTableIndexInfo(SEncoder *pEncoder, STableIndexInfo* pInfo) {
int32_t tSerializeSTableIndexInfo(SEncoder *pEncoder, STableIndexInfo *pInfo) {
if (tEncodeI8(pEncoder, pInfo->intervalUnit) < 0) return -1;
if (tEncodeI8(pEncoder, pInfo->slidingUnit) < 0) return -1;
if (tEncodeI64(pEncoder, pInfo->interval) < 0) return -1;
......@@ -2440,7 +2440,7 @@ int32_t tSerializeSTableIndexRsp(void *buf, int32_t bufLen, const STableIndexRsp
if (tEncodeI32(&encoder, num) < 0) return -1;
if (num > 0) {
for (int32_t i = 0; i < num; ++i) {
STableIndexInfo* pInfo = (STableIndexInfo*)taosArrayGet(pRsp->pIndex, i);
STableIndexInfo *pInfo = (STableIndexInfo *)taosArrayGet(pRsp->pIndex, i);
if (tSerializeSTableIndexInfo(&encoder, pInfo) < 0) return -1;
}
}
......@@ -2489,7 +2489,6 @@ int32_t tDeserializeSTableIndexRsp(void *buf, int32_t bufLen, STableIndexRsp *pR
return 0;
}
int32_t tSerializeSShowReq(void *buf, int32_t bufLen, SShowReq *pReq) {
SEncoder encoder = {0};
tEncoderInit(&encoder, buf, bufLen);
......@@ -3856,7 +3855,7 @@ int32_t tEncodeTSma(SEncoder *pCoder, const STSma *pSma) {
if (tEncodeCStr(pCoder, pSma->tagsFilter) < 0) return -1;
}
if (pSma->numOfVgroups) { // only needed in dstVgroup
if (pSma->numOfVgroups) { // only needed in dstVgroup
for (int32_t v = 0; v < pSma->numOfVgroups; ++v) {
if (tEncodeI32(pCoder, pSma->pVgEpSet[v].vgId) < 0) return -1;
if (tEncodeI8(pCoder, pSma->pVgEpSet[v].epSet.inUse) < 0) return -1;
......@@ -3903,7 +3902,7 @@ int32_t tDecodeTSma(SDecoder *pCoder, STSma *pSma) {
} else {
pSma->tagsFilter = NULL;
}
if (pSma->numOfVgroups > 0) { // only needed in dstVgroup
if (pSma->numOfVgroups > 0) { // only needed in dstVgroup
pSma->pVgEpSet = (SVgEpSet *)tDecoderMalloc(pCoder, pSma->numOfVgroups * sizeof(SVgEpSet));
if (!pSma->pVgEpSet) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
......@@ -4018,6 +4017,49 @@ int32_t tDecodeSVGetTsmaExpWndsRsp(SDecoder *pCoder, SVGetTsmaExpWndsRsp *pReq)
return 0;
}
int32_t tEncodeSVClrTsmaExpWndsReq(SEncoder *pCoder, const SVClrTsmaExpWndsReq *pReq) {
if (tStartEncode(pCoder) < 0) return -1;
if (tEncodeI64(pCoder, pReq->indexUid) < 0) return -1;
if (tEncodeI64(pCoder, pReq->version) < 0) return -1;
if (tEncodeI64v(pCoder, pReq->nItems) < 0) return -1;
for (int64_t n = 0; pReq->nItems; ++n) {
if (tEncodeI64v(pCoder, pReq->items[n].nKeys) < 0) return -1;
if (tEncodeI64(pCoder, pReq->items[n].skey) < 0) return -1;
}
tEndEncode(pCoder);
return 0;
}
int32_t tDecodeSVClrTsmaExpWndsReq(SDecoder *pCoder, SVClrTsmaExpWndsReq *pReq) {
if (tStartDecode(pCoder) < 0) return -1;
if (tDecodeI64(pCoder, &pReq->indexUid) < 0) return -1;
if (tDecodeI64(pCoder, &pReq->version) < 0) return -1;
if (tDecodeI64v(pCoder, &pReq->nItems) < 0) return -1;
for (int64_t i = 0; i < pReq->nItems; ++i) {
if (tDecodeI64v(pCoder, &pReq->items[i].nKeys) < 0) return -1;
if (tDecodeI64(pCoder, &pReq->items[i].skey) < 0) return -1;
}
tEndDecode(pCoder);
return 0;
}
int32_t tEncodeSVClrTsmaExpWndsRsp(SEncoder *pCoder, const SVClrTsmaExpWndsRsp *pReq) {
if (tStartEncode(pCoder) < 0) return -1;
if (tEncodeI64(pCoder, pReq->indexUid) < 0) return -1;
if (tEncodeI32(pCoder, pReq->code) < 0) return -1;
tEndEncode(pCoder);
return 0;
}
int32_t tDecodeSVClrTsmaExpWndsRsp(SDecoder *pCoder, SVClrTsmaExpWndsRsp *pReq) {
if (tStartDecode(pCoder) < 0) return -1;
if (tDecodeI64(pCoder, &pReq->indexUid) < 0) return -1;
if (tDecodeI32(pCoder, &pReq->code) < 0) return -1;
tEndDecode(pCoder);
return 0;
}
int32_t tSerializeSVDeleteReq(void *buf, int32_t bufLen, SVDeleteReq *pReq) {
int32_t headLen = sizeof(SMsgHead);
if (buf != NULL) {
......
......@@ -348,6 +348,7 @@ SArray *vmGetMsgHandles() {
if (dmSetMgmtHandle(pArray, TDMT_VND_CONSUME, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_DELETE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_QUERY_HEARTBEAT, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_CLR_TSMA_EXP_WNDS, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_TRIGGER, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_DEPLOY, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
......
......@@ -25,6 +25,7 @@ static int32_t vnodeProcessSubmitReq(SVnode *pVnode, int64_t version, void *pReq
static int32_t vnodeProcessCreateTSmaReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp);
static int32_t vnodeProcessAlterConfirmReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp);
static int32_t vnodeProcessWriteMsg(SVnode *pVnode, int64_t version, SRpcMsg *pMsg, SRpcMsg *pRsp);
static int32_t vnodeProcessExpWndsClrReq(SVnode *pVnode, void *pReq, int32_t len, SRpcMsg *pRsp);
int32_t vnodePreprocessReq(SVnode *pVnode, SRpcMsg *pMsg) {
int32_t code = 0;
......@@ -174,7 +175,8 @@ int32_t vnodeProcessWriteReq(SVnode *pVnode, SRpcMsg *pMsg, int64_t version, SRp
}
vTrace("vgId:%d, process %s request success, version: %" PRId64, TD_VID(pVnode), TMSG_INFO(pMsg->msgType), version);
#if 1
#if 0
if (tqPushMsg(pVnode->pTq, pMsg->pCont, pMsg->contLen, pMsg->msgType, version) < 0) {
vError("vgId:%d, failed to push msg to TQ since %s", TD_VID(pVnode), tstrerror(terrno));
return -1;
......@@ -225,6 +227,7 @@ int32_t vnodeProcessFetchMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo) {
vTrace("message in fetch queue is processing");
char *msgstr = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
int32_t msgLen = pMsg->contLen - sizeof(SMsgHead);
switch (pMsg->msgType) {
case TDMT_VND_FETCH:
return qWorkerProcessFetchMsg(pVnode, pVnode->pQuery, pMsg, 0);
......@@ -236,13 +239,10 @@ int32_t vnodeProcessFetchMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo) {
return qWorkerProcessDropMsg(pVnode, pVnode->pQuery, pMsg, 0);
case TDMT_VND_QUERY_HEARTBEAT:
return qWorkerProcessHbMsg(pVnode, pVnode->pQuery, pMsg, 0);
case TDMT_VND_TABLE_META:
return vnodeGetTableMeta(pVnode, pMsg);
case TDMT_VND_CONSUME:
return tqProcessPollReq(pVnode->pTq, pMsg, pInfo->workerId);
case TDMT_STREAM_TASK_RUN:
return tqProcessTaskRunReq(pVnode->pTq, pMsg);
case TDMT_STREAM_TASK_DISPATCH:
......@@ -253,6 +253,8 @@ int32_t vnodeProcessFetchMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo) {
return tqProcessTaskDispatchRsp(pVnode->pTq, pMsg);
case TDMT_STREAM_TASK_RECOVER_RSP:
return tqProcessTaskRecoverRsp(pVnode->pTq, pMsg);
case TDMT_VND_CLR_TSMA_EXP_WNDS:
return vnodeProcessExpWndsClrReq(pVnode, pMsg, msgLen, NULL);
default:
vError("unknown msg type:%d in fetch queue", pMsg->msgType);
return TSDB_CODE_VND_APP_ERROR;
......@@ -896,3 +898,42 @@ static int32_t vnodeProcessAlterConfirmReq(SVnode *pVnode, int64_t version, void
return 0;
}
static int32_t vnodeProcessExpWndsClrReq(SVnode *pVnode, void *pReq, int32_t len, SRpcMsg *pRsp) {
SVClrTsmaExpWndsReq req = {0};
SDecoder coder = {0};
if (pRsp) {
pRsp->msgType = TDMT_VND_CLR_TSMA_EXP_WNDS_RSP;
pRsp->code = TSDB_CODE_SUCCESS;
pRsp->pCont = NULL;
pRsp->contLen = 0;
}
// decode and process
tDecoderInit(&coder, pReq, len);
if (tDecodeSVClrTsmaExpWndsReq(&coder, &req) < 0) {
terrno = TSDB_CODE_MSG_DECODE_ERROR;
if (pRsp) pRsp->code = terrno;
goto _err;
}
ASSERT(0);
// if (tdProcess(pVnode->pSma, version, (const char *)&req) < 0) {
// if (pRsp) pRsp->code = terrno;
// goto _err;
// }
tDecoderClear(&coder);
vDebug("vgId:%d, success to process expWnds clear for tsma %" PRIi64 " version %" PRIi64, TD_VID(pVnode),
req.indexUid, req.version);
return 0;
_err:
tDecoderClear(&coder);
vError("vgId:%d, success to process expWnds clear for tsma %" PRIi64 " version %" PRIi64 " since %s", TD_VID(pVnode),
req.indexUid, req.version, terrstr());
return -1;
}
\ No newline at end of file
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册