提交 18afc81a 编写于 作者: H Hongze Cheng

finish delete func

上级 b99fd131
...@@ -2972,6 +2972,17 @@ typedef struct { ...@@ -2972,6 +2972,17 @@ typedef struct {
int32_t tEncodeSVDeleteRsp(SEncoder* pCoder, const SVDeleteRsp* pReq); int32_t tEncodeSVDeleteRsp(SEncoder* pCoder, const SVDeleteRsp* pReq);
int32_t tDecodeSVDeleteRsp(SDecoder* pCoder, SVDeleteRsp* pReq); int32_t tDecodeSVDeleteRsp(SDecoder* pCoder, SVDeleteRsp* pReq);
typedef struct SDeleteRes {
uint64_t suid;
SArray* uidList;
int64_t skey;
int64_t ekey;
int64_t affectedRows;
} SDeleteRes;
int32_t tEncodeDeleteRes(SEncoder* pCoder, const SDeleteRes* pRes);
int32_t tDecodeDeleteRes(SDecoder* pCoder, SDeleteRes* pRes);
#pragma pack(pop) #pragma pack(pop)
#ifdef __cplusplus #ifdef __cplusplus
......
...@@ -20,9 +20,9 @@ ...@@ -20,9 +20,9 @@
extern "C" { extern "C" {
#endif #endif
#include "executor.h"
#include "tmsgcb.h" #include "tmsgcb.h"
#include "trpc.h" #include "trpc.h"
#include "executor.h"
enum { enum {
NODE_TYPE_VNODE = 1, NODE_TYPE_VNODE = 1,
...@@ -31,14 +31,6 @@ enum { ...@@ -31,14 +31,6 @@ enum {
NODE_TYPE_MNODE, NODE_TYPE_MNODE,
}; };
typedef struct SDeleteRes {
uint64_t suid;
SArray* uidList;
int64_t skey;
int64_t ekey;
int64_t affectedRows;
} SDeleteRes;
typedef struct SQWorkerCfg { typedef struct SQWorkerCfg {
uint32_t maxSchedulerNum; uint32_t maxSchedulerNum;
uint32_t maxTaskNum; uint32_t maxTaskNum;
...@@ -47,19 +39,19 @@ typedef struct SQWorkerCfg { ...@@ -47,19 +39,19 @@ typedef struct SQWorkerCfg {
typedef struct { typedef struct {
uint64_t cacheDataSize; uint64_t cacheDataSize;
uint64_t queryProcessed; uint64_t queryProcessed;
uint64_t cqueryProcessed; uint64_t cqueryProcessed;
uint64_t fetchProcessed; uint64_t fetchProcessed;
uint64_t dropProcessed; uint64_t dropProcessed;
uint64_t hbProcessed; uint64_t hbProcessed;
uint64_t deleteProcessed; uint64_t deleteProcessed;
uint64_t numOfQueryInQueue; uint64_t numOfQueryInQueue;
uint64_t numOfFetchInQueue; uint64_t numOfFetchInQueue;
uint64_t timeInQueryQueue; uint64_t timeInQueryQueue;
uint64_t timeInFetchQueue; uint64_t timeInFetchQueue;
uint64_t numOfErrors; uint64_t numOfErrors;
} SQWorkerStat; } SQWorkerStat;
......
...@@ -2300,7 +2300,6 @@ int32_t tDeserializeSServerVerRsp(void *buf, int32_t bufLen, SServerVerRsp *pRsp ...@@ -2300,7 +2300,6 @@ int32_t tDeserializeSServerVerRsp(void *buf, int32_t bufLen, SServerVerRsp *pRsp
return 0; return 0;
} }
int32_t tSerializeSQnodeListRsp(void *buf, int32_t bufLen, SQnodeListRsp *pRsp) { int32_t tSerializeSQnodeListRsp(void *buf, int32_t bufLen, SQnodeListRsp *pRsp) {
SEncoder encoder = {0}; SEncoder encoder = {0};
tEncoderInit(&encoder, buf, bufLen); tEncoderInit(&encoder, buf, bufLen);
...@@ -2387,7 +2386,6 @@ int32_t tDeserializeSDnodeListRsp(void *buf, int32_t bufLen, SDnodeListRsp *pRsp ...@@ -2387,7 +2386,6 @@ int32_t tDeserializeSDnodeListRsp(void *buf, int32_t bufLen, SDnodeListRsp *pRsp
void tFreeSDnodeListRsp(SDnodeListRsp *pRsp) { taosArrayDestroy(pRsp->dnodeList); } void tFreeSDnodeListRsp(SDnodeListRsp *pRsp) { taosArrayDestroy(pRsp->dnodeList); }
int32_t tSerializeSCompactDbReq(void *buf, int32_t bufLen, SCompactDbReq *pReq) { int32_t tSerializeSCompactDbReq(void *buf, int32_t bufLen, SCompactDbReq *pReq) {
SEncoder encoder = {0}; SEncoder encoder = {0};
tEncoderInit(&encoder, buf, bufLen); tEncoderInit(&encoder, buf, bufLen);
...@@ -2909,20 +2907,19 @@ int32_t tDeserializeSShowVariablesReq(void *buf, int32_t bufLen, SShowVariablesR ...@@ -2909,20 +2907,19 @@ int32_t tDeserializeSShowVariablesReq(void *buf, int32_t bufLen, SShowVariablesR
return 0; return 0;
} }
int32_t tEncodeSVariablesInfo(SEncoder* pEncoder, SVariablesInfo* pInfo) { int32_t tEncodeSVariablesInfo(SEncoder *pEncoder, SVariablesInfo *pInfo) {
if (tEncodeCStr(pEncoder, pInfo->name) < 0) return -1; if (tEncodeCStr(pEncoder, pInfo->name) < 0) return -1;
if (tEncodeCStr(pEncoder, pInfo->value) < 0) return -1; if (tEncodeCStr(pEncoder, pInfo->value) < 0) return -1;
return 0; return 0;
} }
int32_t tDecodeSVariablesInfo(SDecoder* pDecoder, SVariablesInfo* pInfo) { int32_t tDecodeSVariablesInfo(SDecoder *pDecoder, SVariablesInfo *pInfo) {
if (tDecodeCStrTo(pDecoder, pInfo->name) < 0) return -1; if (tDecodeCStrTo(pDecoder, pInfo->name) < 0) return -1;
if (tDecodeCStrTo(pDecoder, pInfo->value) < 0) return -1; if (tDecodeCStrTo(pDecoder, pInfo->value) < 0) return -1;
return 0; return 0;
} }
int32_t tSerializeSShowVariablesRsp(void *buf, int32_t bufLen, SShowVariablesRsp *pRsp) {
int32_t tSerializeSShowVariablesRsp(void* buf, int32_t bufLen, SShowVariablesRsp* pRsp) {
SEncoder encoder = {0}; SEncoder encoder = {0};
tEncoderInit(&encoder, buf, bufLen); tEncoderInit(&encoder, buf, bufLen);
...@@ -2930,7 +2927,7 @@ int32_t tSerializeSShowVariablesRsp(void* buf, int32_t bufLen, SShowVariablesRsp ...@@ -2930,7 +2927,7 @@ int32_t tSerializeSShowVariablesRsp(void* buf, int32_t bufLen, SShowVariablesRsp
int32_t varNum = taosArrayGetSize(pRsp->variables); int32_t varNum = taosArrayGetSize(pRsp->variables);
if (tEncodeI32(&encoder, varNum) < 0) return -1; if (tEncodeI32(&encoder, varNum) < 0) return -1;
for (int32_t i = 0; i < varNum; ++i) { for (int32_t i = 0; i < varNum; ++i) {
SVariablesInfo* pInfo = taosArrayGet(pRsp->variables, i); SVariablesInfo *pInfo = taosArrayGet(pRsp->variables, i);
if (tEncodeSVariablesInfo(&encoder, pInfo) < 0) return -1; if (tEncodeSVariablesInfo(&encoder, pInfo) < 0) return -1;
} }
tEndEncode(&encoder); tEndEncode(&encoder);
...@@ -2940,7 +2937,7 @@ int32_t tSerializeSShowVariablesRsp(void* buf, int32_t bufLen, SShowVariablesRsp ...@@ -2940,7 +2937,7 @@ int32_t tSerializeSShowVariablesRsp(void* buf, int32_t bufLen, SShowVariablesRsp
return tlen; return tlen;
} }
int32_t tDeserializeSShowVariablesRsp(void* buf, int32_t bufLen, SShowVariablesRsp* pRsp) { int32_t tDeserializeSShowVariablesRsp(void *buf, int32_t bufLen, SShowVariablesRsp *pRsp) {
SDecoder decoder = {0}; SDecoder decoder = {0};
tDecoderInit(&decoder, buf, bufLen); tDecoderInit(&decoder, buf, bufLen);
...@@ -2962,11 +2959,11 @@ int32_t tDeserializeSShowVariablesRsp(void* buf, int32_t bufLen, SShowVariablesR ...@@ -2962,11 +2959,11 @@ int32_t tDeserializeSShowVariablesRsp(void* buf, int32_t bufLen, SShowVariablesR
return 0; return 0;
} }
void tFreeSShowVariablesRsp(SShowVariablesRsp* pRsp) { void tFreeSShowVariablesRsp(SShowVariablesRsp *pRsp) {
if (NULL == pRsp) { if (NULL == pRsp) {
return; return;
} }
taosArrayDestroy(pRsp->variables); taosArrayDestroy(pRsp->variables);
} }
...@@ -5387,3 +5384,35 @@ int32_t tDecodeSTqOffset(SDecoder *pDecoder, STqOffset *pOffset) { ...@@ -5387,3 +5384,35 @@ int32_t tDecodeSTqOffset(SDecoder *pDecoder, STqOffset *pOffset) {
if (tDecodeCStrTo(pDecoder, pOffset->subKey) < 0) return -1; if (tDecodeCStrTo(pDecoder, pOffset->subKey) < 0) return -1;
return 0; return 0;
} }
int32_t tEncodeDeleteRes(SEncoder *pCoder, const SDeleteRes *pRes) {
int32_t nUid = taosArrayGetSize(pRes->uidList);
if (tEncodeU64(pCoder, pRes->suid) < 0) return -1;
if (tEncodeI32v(pCoder, nUid) < 0) return -1;
for (int32_t iUid = 0; iUid < nUid; iUid++) {
if (tEncodeU64(pCoder, *(uint64_t *)taosArrayGet(pRes->uidList, iUid)) < 0) return -1;
}
if (tEncodeI64(pCoder, pRes->skey) < 0) return -1;
if (tEncodeI64(pCoder, pRes->ekey) < 0) return -1;
if (tEncodeI64v(pCoder, pRes->affectedRows) < 0) return -1;
return 0;
}
int32_t tDecodeDeleteRes(SDecoder *pCoder, SDeleteRes *pRes) {
int32_t nUid;
uint64_t uid;
if (tDecodeU64(pCoder, &pRes->suid) < 0) return -1;
if (tDecodeI32v(pCoder, &nUid) < 0) return -1;
for (int32_t iUid = 0; iUid < nUid; iUid++) {
if (tDecodeU64(pCoder, &uid) < 0) return -1;
taosArrayPush(pRes->uidList, &uid);
}
if (tDecodeI64(pCoder, &pRes->skey) < 0) return -1;
if (tDecodeI64(pCoder, &pRes->ekey) < 0) return -1;
if (tDecodeI64v(pCoder, &pRes->affectedRows) < 0) return -1;
return 0;
}
\ No newline at end of file
...@@ -25,8 +25,8 @@ static int32_t vnodeProcessSubmitReq(SVnode *pVnode, int64_t version, void *pReq ...@@ -25,8 +25,8 @@ static int32_t vnodeProcessSubmitReq(SVnode *pVnode, int64_t version, void *pReq
static int32_t vnodeProcessCreateTSmaReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp); static int32_t vnodeProcessCreateTSmaReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp);
static int32_t vnodeProcessAlterConfirmReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp); static int32_t vnodeProcessAlterConfirmReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp);
static int32_t vnodeProcessAlterHasnRangeReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp); static int32_t vnodeProcessAlterHasnRangeReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp);
static int32_t vnodeProcessWriteMsg(SVnode *pVnode, int64_t version, SRpcMsg *pMsg, SRpcMsg *pRsp);
static int32_t vnodeProcessDropTtlTbReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp); static int32_t vnodeProcessDropTtlTbReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp);
static int32_t vnodeProcessDeleteReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp);
int32_t vnodePreprocessReq(SVnode *pVnode, SRpcMsg *pMsg) { int32_t vnodePreprocessReq(SVnode *pVnode, SRpcMsg *pMsg) {
int32_t code = 0; int32_t code = 0;
...@@ -93,11 +93,44 @@ int32_t vnodePreprocessReq(SVnode *pVnode, SRpcMsg *pMsg) { ...@@ -93,11 +93,44 @@ int32_t vnodePreprocessReq(SVnode *pVnode, SRpcMsg *pMsg) {
} }
} break; } break;
case TDMT_VND_DELETE: {
int32_t size;
int32_t ret;
uint8_t *pCont;
SEncoder *pCoder = &(SEncoder){0};
SDeleteRes res = {0};
SReadHandle handle = {
.meta = pVnode->pMeta, .config = &pVnode->config, .vnode = pVnode, .pMsgCb = &pVnode->msgCb};
code = qWorkerProcessDeleteMsg(&handle, pVnode->pQuery, pMsg, &res);
if (code) goto _err;
// malloc and encode
tEncodeSize(tEncodeDeleteRes, &res, size, ret);
pCont = rpcMallocCont(size + sizeof(SMsgHead));
((SMsgHead *)pCont)->contLen = htonl(size + sizeof(SMsgHead));
((SMsgHead *)pCont)->vgId = htonl(TD_VID(pVnode));
tEncoderInit(pCoder, pCont + sizeof(SMsgHead), size);
tEncodeDeleteRes(pCoder, &res);
tEncoderClear(pCoder);
rpcFreeCont(pMsg->pCont);
pMsg->pCont = pCont;
pMsg->contLen = size + sizeof(SMsgHead);
taosArrayDestroy(res.uidList);
} break;
default: default:
break; break;
} }
return code; return code;
_err:
vError("vgId%d, preprocess request failed since %s", TD_VID(pVnode), tstrerror(code));
return code;
} }
int32_t vnodeProcessWriteReq(SVnode *pVnode, SRpcMsg *pMsg, int64_t version, SRpcMsg *pRsp) { int32_t vnodeProcessWriteReq(SVnode *pVnode, SRpcMsg *pMsg, int64_t version, SRpcMsg *pRsp) {
...@@ -146,7 +179,7 @@ int32_t vnodeProcessWriteReq(SVnode *pVnode, SRpcMsg *pMsg, int64_t version, SRp ...@@ -146,7 +179,7 @@ int32_t vnodeProcessWriteReq(SVnode *pVnode, SRpcMsg *pMsg, int64_t version, SRp
if (vnodeProcessSubmitReq(pVnode, version, pMsg->pCont, pMsg->contLen, pRsp) < 0) goto _err; if (vnodeProcessSubmitReq(pVnode, version, pMsg->pCont, pMsg->contLen, pRsp) < 0) goto _err;
break; break;
case TDMT_VND_DELETE: case TDMT_VND_DELETE:
if (vnodeProcessWriteMsg(pVnode, version, pMsg, pRsp) < 0) goto _err; if (vnodeProcessDeleteReq(pVnode, version, pReq, len, pRsp) < 0) goto _err;
break; break;
/* TQ */ /* TQ */
case TDMT_VND_MQ_VG_CHANGE: case TDMT_VND_MQ_VG_CHANGE:
...@@ -279,22 +312,6 @@ int32_t vnodeProcessFetchMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo) { ...@@ -279,22 +312,6 @@ int32_t vnodeProcessFetchMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo) {
} }
} }
int32_t vnodeProcessWriteMsg(SVnode *pVnode, int64_t version, SRpcMsg *pMsg, SRpcMsg *pRsp) {
vTrace("message in write queue is processing");
char *msgstr = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
int32_t msgLen = pMsg->contLen - sizeof(SMsgHead);
SDeleteRes res = {0};
SReadHandle handle = {.meta = pVnode->pMeta, .config = &pVnode->config, .vnode = pVnode, .pMsgCb = &pVnode->msgCb};
switch (pMsg->msgType) {
case TDMT_VND_DELETE:
return qWorkerProcessDeleteMsg(&handle, pVnode->pQuery, pMsg, &res);
default:
vError("unknown msg type:%d in write queue", pMsg->msgType);
return TSDB_CODE_VND_APP_ERROR;
}
}
// TODO: remove the function // TODO: remove the function
void smaHandleRes(void *pVnode, int64_t smaId, const SArray *data) { void smaHandleRes(void *pVnode, int64_t smaId, const SArray *data) {
// TODO // TODO
...@@ -854,3 +871,31 @@ static int32_t vnodeProcessAlterHasnRangeReq(SVnode *pVnode, int64_t version, vo ...@@ -854,3 +871,31 @@ static int32_t vnodeProcessAlterHasnRangeReq(SVnode *pVnode, int64_t version, vo
// 3. reload sync // 3. reload sync
return 0; return 0;
} }
static int32_t vnodeProcessDeleteReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp) {
int32_t code = 0;
SDecoder *pCoder = &(SDecoder){0};
SDeleteRes *pRes = &(SDeleteRes){0};
pRes->uidList = taosArrayInit(0, sizeof(tb_uid_t));
if (pRes->uidList == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _err;
}
tDecoderInit(pCoder, pReq, len);
tDecodeDeleteRes(pCoder, pRes);
for (int32_t iUid = 0; iUid < taosArrayGetSize(pRes->uidList); iUid++) {
code = tsdbDeleteTableData(pVnode->pTsdb, version, pRes->suid, *(uint64_t *)taosArrayGet(pRes->uidList, iUid),
pRes->skey, pRes->ekey);
if (code) goto _err;
}
tDecoderClear(pCoder);
taosArrayDestroy(pRes->uidList);
return code;
_err:
return code;
}
\ No newline at end of file
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册