提交 f1ee1abd 编写于 作者: D dapan1121

delete data

上级 e6c15da7
...@@ -933,6 +933,7 @@ typedef struct { ...@@ -933,6 +933,7 @@ typedef struct {
int64_t numOfProcessedFetch; int64_t numOfProcessedFetch;
int64_t numOfProcessedDrop; int64_t numOfProcessedDrop;
int64_t numOfProcessedHb; int64_t numOfProcessedHb;
int64_t numOfProcessedDelete;
int64_t cacheDataSize; int64_t cacheDataSize;
int64_t numOfQueryInQueue; int64_t numOfQueryInQueue;
int64_t numOfFetchInQueue; int64_t numOfFetchInQueue;
...@@ -2689,20 +2690,20 @@ int32_t tEncodeSVSubmitReq(SEncoder* pCoder, const SVSubmitReq* pReq); ...@@ -2689,20 +2690,20 @@ int32_t tEncodeSVSubmitReq(SEncoder* pCoder, const SVSubmitReq* pReq);
int32_t tDecodeSVSubmitReq(SDecoder* pCoder, SVSubmitReq* pReq); int32_t tDecodeSVSubmitReq(SDecoder* pCoder, SVSubmitReq* pReq);
typedef struct { typedef struct {
int64_t delUid; SMsgHead header;
int64_t tbUid; // super/child/normal table uint64_t sId;
int8_t type; // table type uint64_t queryId;
int16_t nWnds; uint64_t taskId;
char* tbFullName; uint32_t sqlLen;
char* subPlan; uint32_t phyLen;
STimeWindow wnds[]; char* sql;
char* msg;
} SVDeleteReq; } SVDeleteReq;
int32_t tEncodeSVDeleteReq(SEncoder* pCoder, const SVDeleteReq* pReq); int32_t tSerializeSVDeleteReq(void *buf, int32_t bufLen, SVDeleteReq *pReq);
int32_t tDecodeSVDeleteReq(SDecoder* pCoder, SVDeleteReq* pReq); int32_t tDeserializeSVDeleteReq(void *buf, int32_t bufLen, SVDeleteReq *pReq);
typedef struct { typedef struct {
int32_t code;
int64_t affectedRows; int64_t affectedRows;
} SVDeleteRsp; } SVDeleteRsp;
......
...@@ -47,6 +47,7 @@ typedef struct { ...@@ -47,6 +47,7 @@ typedef struct {
uint64_t fetchProcessed; uint64_t fetchProcessed;
uint64_t dropProcessed; uint64_t dropProcessed;
uint64_t hbProcessed; uint64_t hbProcessed;
uint64_t deleteProcessed;
uint64_t numOfQueryInQueue; uint64_t numOfQueryInQueue;
uint64_t numOfFetchInQueue; uint64_t numOfFetchInQueue;
......
...@@ -933,6 +933,7 @@ int32_t tSerializeSStatusReq(void *buf, int32_t bufLen, SStatusReq *pReq) { ...@@ -933,6 +933,7 @@ int32_t tSerializeSStatusReq(void *buf, int32_t bufLen, SStatusReq *pReq) {
if (tEncodeI64(&encoder, pReq->qload.numOfProcessedFetch) < 0) return -1; if (tEncodeI64(&encoder, pReq->qload.numOfProcessedFetch) < 0) return -1;
if (tEncodeI64(&encoder, pReq->qload.numOfProcessedDrop) < 0) return -1; if (tEncodeI64(&encoder, pReq->qload.numOfProcessedDrop) < 0) return -1;
if (tEncodeI64(&encoder, pReq->qload.numOfProcessedHb) < 0) return -1; if (tEncodeI64(&encoder, pReq->qload.numOfProcessedHb) < 0) return -1;
if (tEncodeI64(&encoder, pReq->qload.numOfProcessedDelete) < 0) return -1;
if (tEncodeI64(&encoder, pReq->qload.cacheDataSize) < 0) return -1; if (tEncodeI64(&encoder, pReq->qload.cacheDataSize) < 0) return -1;
if (tEncodeI64(&encoder, pReq->qload.numOfQueryInQueue) < 0) return -1; if (tEncodeI64(&encoder, pReq->qload.numOfQueryInQueue) < 0) return -1;
if (tEncodeI64(&encoder, pReq->qload.numOfFetchInQueue) < 0) return -1; if (tEncodeI64(&encoder, pReq->qload.numOfFetchInQueue) < 0) return -1;
...@@ -1002,6 +1003,7 @@ int32_t tDeserializeSStatusReq(void *buf, int32_t bufLen, SStatusReq *pReq) { ...@@ -1002,6 +1003,7 @@ int32_t tDeserializeSStatusReq(void *buf, int32_t bufLen, SStatusReq *pReq) {
if (tDecodeI64(&decoder, &pReq->qload.numOfProcessedFetch) < 0) return -1; if (tDecodeI64(&decoder, &pReq->qload.numOfProcessedFetch) < 0) return -1;
if (tDecodeI64(&decoder, &pReq->qload.numOfProcessedDrop) < 0) return -1; if (tDecodeI64(&decoder, &pReq->qload.numOfProcessedDrop) < 0) return -1;
if (tDecodeI64(&decoder, &pReq->qload.numOfProcessedHb) < 0) return -1; if (tDecodeI64(&decoder, &pReq->qload.numOfProcessedHb) < 0) return -1;
if (tDecodeI64(&decoder, &pReq->qload.numOfProcessedDelete) < 0) return -1;
if (tDecodeI64(&decoder, &pReq->qload.cacheDataSize) < 0) return -1; if (tDecodeI64(&decoder, &pReq->qload.cacheDataSize) < 0) return -1;
if (tDecodeI64(&decoder, &pReq->qload.numOfQueryInQueue) < 0) return -1; if (tDecodeI64(&decoder, &pReq->qload.numOfQueryInQueue) < 0) return -1;
if (tDecodeI64(&decoder, &pReq->qload.numOfFetchInQueue) < 0) return -1; if (tDecodeI64(&decoder, &pReq->qload.numOfFetchInQueue) < 0) return -1;
...@@ -3814,39 +3816,64 @@ int32_t tDecodeSVGetTsmaExpWndsRsp(SDecoder *pCoder, SVGetTsmaExpWndsRsp *pReq) ...@@ -3814,39 +3816,64 @@ int32_t tDecodeSVGetTsmaExpWndsRsp(SDecoder *pCoder, SVGetTsmaExpWndsRsp *pReq)
return 0; return 0;
} }
int32_t tEncodeSVDeleteReq(SEncoder* pCoder, const SVDeleteReq* pReq) { int32_t tSerializeSVDeleteReq(void *buf, int32_t bufLen, SVDeleteReq *pReq) {
if (tStartEncode(pCoder) < 0) return -1; int32_t headLen = sizeof(SMsgHead);
if (buf != NULL) {
buf = (char *)buf + headLen;
bufLen -= headLen;
}
if (tEncodeI64(pCoder, pReq->delUid) < 0) return -1; SEncoder encoder = {0};
if (tEncodeI64(pCoder, pReq->tbUid) < 0) return -1; tEncoderInit(&encoder, buf, bufLen);
if (tEncodeI8(pCoder, pReq->type) < 0) return -1;
if (tEncodeI16v(pCoder, pReq->nWnds) < 0) return -1; if (tStartEncode(&encoder) < 0) return -1;
if (tEncodeCStr(pCoder, pReq->tbFullName) < 0) return -1; if (tEncodeU64(&encoder, pReq->sId) < 0) return -1;
if (tEncodeCStr(pCoder, pReq->subPlan) < 0) return -1; if (tEncodeU64(&encoder, pReq->queryId) < 0) return -1;
for (int16_t i = 0; i < pReq->nWnds; ++i) { if (tEncodeU64(&encoder, pReq->taskId) < 0) return -1;
if (tEncodeI64(pCoder, pReq->wnds[i].skey) < 0) return -1; if (tEncodeU32(&encoder, pReq->sqlLen) < 0) return -1;
if (tEncodeI64(pCoder, pReq->wnds[i].ekey) < 0) return -1; if (tEncodeU32(&encoder, pReq->phyLen) < 0) return -1;
if (tEncodeCStr(&encoder, pReq->sql) < 0) return -1;
if (tEncodeCStr(&encoder, pReq->msg) < 0) return -1;
tEndEncode(&encoder);
int32_t tlen = encoder.pos;
tEncoderClear(&encoder);
if (buf != NULL) {
SMsgHead *pHead = (SMsgHead *)((char *)buf - headLen);
pHead->vgId = htonl(pReq->header.vgId);
pHead->contLen = htonl(tlen + headLen);
} }
tEndEncode(pCoder); return tlen + headLen;
return 0;
} }
int32_t tDecodeSVDeleteReq(SDecoder* pCoder, SVDeleteReq* pReq) { int32_t tDeserializeSVDeleteReq(void *buf, int32_t bufLen, SVDeleteReq *pReq) {
if (tStartDecode(pCoder) < 0) return -1; int32_t headLen = sizeof(SMsgHead);
if (tDecodeI64(pCoder, &pReq->delUid) < 0) return -1; SMsgHead *pHead = buf;
if (tDecodeI64(pCoder, &pReq->tbUid) < 0) return -1; pHead->vgId = pReq->header.vgId;
if (tDecodeI8(pCoder, &pReq->type) < 0) return -1; pHead->contLen = pReq->header.contLen;
if (tDecodeI16v(pCoder, &pReq->nWnds) < 0) return -1;
if (tDecodeCStr(pCoder, &pReq->tbFullName) < 0) return -1;
if (tDecodeCStr(pCoder, &pReq->subPlan) < 0) return -1;
for (int16_t i = 0; i < pReq->nWnds; ++i) {
if (tDecodeI64(pCoder, &pReq->wnds[i].skey) < 0) return -1;
if (tDecodeI64(pCoder, &pReq->wnds[i].ekey) < 0) return -1;
}
tEndDecode(pCoder); SDecoder decoder = {0};
tDecoderInit(&decoder, (char *)buf + headLen, bufLen - headLen);
if (tStartDecode(&decoder) < 0) return -1;
if (tDecodeU64(&decoder, &pReq->sId) < 0) return -1;
if (tDecodeU64(&decoder, &pReq->queryId) < 0) return -1;
if (tDecodeU64(&decoder, &pReq->taskId) < 0) return -1;
if (tDecodeU32(&decoder, &pReq->sqlLen) < 0) return -1;
if (tDecodeU32(&decoder, &pReq->phyLen) < 0) return -1;
pReq->sql = taosMemoryCalloc(1, pReq->sqlLen + 1);
if (NULL == pReq->sql) return -1;
pReq->msg = taosMemoryCalloc(1, pReq->phyLen + 1);
if (NULL == pReq->msg) return -1;
if (tDecodeCStrTo(&decoder, pReq->sql) < 0) return -1;
if (tDecodeCStrTo(&decoder, pReq->msg) < 0) return -1;
tEndDecode(&decoder);
tDecoderClear(&decoder);
return 0; return 0;
} }
......
...@@ -358,6 +358,7 @@ SArray *vmGetMsgHandles() { ...@@ -358,6 +358,7 @@ SArray *vmGetMsgHandles() {
if (dmSetMgmtHandle(pArray, TDMT_VND_TASK_RUN, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_TASK_RUN, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_TASK_DISPATCH, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_TASK_DISPATCH, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_TASK_RECOVER, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_TASK_RECOVER, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_DELETE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_REPLICA, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_REPLICA, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_CONFIG, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_CONFIG, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
......
...@@ -59,6 +59,7 @@ int32_t qndGetLoad(SQnode *pQnode, SQnodeLoad *pLoad) { ...@@ -59,6 +59,7 @@ int32_t qndGetLoad(SQnode *pQnode, SQnodeLoad *pLoad) {
pLoad->numOfProcessedFetch = stat.fetchProcessed; pLoad->numOfProcessedFetch = stat.fetchProcessed;
pLoad->numOfProcessedDrop = stat.dropProcessed; pLoad->numOfProcessedDrop = stat.dropProcessed;
pLoad->numOfProcessedHb = stat.hbProcessed; pLoad->numOfProcessedHb = stat.hbProcessed;
pLoad->numOfProcessedDelete = stat.deleteProcessed;
return 0; return 0;
} }
......
...@@ -23,6 +23,7 @@ static int vnodeProcessAlterTbReq(SVnode *pVnode, int64_t version, void *pReq, i ...@@ -23,6 +23,7 @@ static int vnodeProcessAlterTbReq(SVnode *pVnode, int64_t version, void *pReq, i
static int vnodeProcessDropTbReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp); static int vnodeProcessDropTbReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp);
static int vnodeProcessSubmitReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp); static int vnodeProcessSubmitReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp);
static int vnodeProcessCreateTSmaReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp); static int vnodeProcessCreateTSmaReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp);
static int vnodeProcessWriteMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg *pRsp);
int32_t vnodePreprocessReq(SVnode *pVnode, SRpcMsg *pMsg) { int32_t vnodePreprocessReq(SVnode *pVnode, SRpcMsg *pMsg) {
int32_t code = 0; int32_t code = 0;
...@@ -141,6 +142,9 @@ int vnodeProcessWriteReq(SVnode *pVnode, SRpcMsg *pMsg, int64_t version, SRpcMsg ...@@ -141,6 +142,9 @@ int vnodeProcessWriteReq(SVnode *pVnode, SRpcMsg *pMsg, int64_t version, SRpcMsg
case TDMT_VND_SUBMIT: case TDMT_VND_SUBMIT:
if (vnodeProcessSubmitReq(pVnode, version, pMsg->pCont, pMsg->contLen, pRsp) < 0) goto _err; if (vnodeProcessSubmitReq(pVnode, version, pMsg->pCont, pMsg->contLen, pRsp) < 0) goto _err;
break; break;
case TDMT_VND_DELETE:
if (vnodeProcessFetchMsg(pVnode, pMsg, pRsp) < 0) goto _err;
break;
/* TQ */ /* TQ */
case TDMT_VND_MQ_VG_CHANGE: case TDMT_VND_MQ_VG_CHANGE:
if (tqProcessVgChangeReq(pVnode->pTq, POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)), if (tqProcessVgChangeReq(pVnode->pTq, POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)),
...@@ -252,6 +256,19 @@ int vnodeProcessFetchMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo) { ...@@ -252,6 +256,19 @@ int vnodeProcessFetchMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo) {
} }
} }
int vnodeProcessWriteMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg *pRsp) {
vTrace("message in write queue is processing");
char *msgstr = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
int32_t msgLen = pMsg->contLen - sizeof(SMsgHead);
switch (pMsg->msgType) {
case TDMT_VND_DELETE:
return qWorkerProcessDeleteMsg(pVnode, pVnode->pQuery, pMsg, pRsp);
default:
vError("unknown msg type:%d in write queue", pMsg->msgType);
return TSDB_CODE_VND_APP_ERROR;
}
}
// TODO: remove the function // TODO: remove the function
void smaHandleRes(void *pVnode, int64_t smaId, const SArray *data) { void smaHandleRes(void *pVnode, int64_t smaId, const SArray *data) {
// TODO // TODO
......
...@@ -569,6 +569,7 @@ int32_t tSerializeSQnodeLoad(void *buf, int32_t bufLen, SQnodeLoad *pInfo) { ...@@ -569,6 +569,7 @@ int32_t tSerializeSQnodeLoad(void *buf, int32_t bufLen, SQnodeLoad *pInfo) {
if (tEncodeI64(&encoder, pInfo->numOfProcessedFetch) < 0) return -1; if (tEncodeI64(&encoder, pInfo->numOfProcessedFetch) < 0) return -1;
if (tEncodeI64(&encoder, pInfo->numOfProcessedDrop) < 0) return -1; if (tEncodeI64(&encoder, pInfo->numOfProcessedDrop) < 0) return -1;
if (tEncodeI64(&encoder, pInfo->numOfProcessedHb) < 0) return -1; if (tEncodeI64(&encoder, pInfo->numOfProcessedHb) < 0) return -1;
if (tEncodeI64(&encoder, pInfo->numOfProcessedDelete) < 0) return -1;
if (tEncodeI64(&encoder, pInfo->cacheDataSize) < 0) return -1; if (tEncodeI64(&encoder, pInfo->cacheDataSize) < 0) return -1;
if (tEncodeI64(&encoder, pInfo->numOfQueryInQueue) < 0) return -1; if (tEncodeI64(&encoder, pInfo->numOfQueryInQueue) < 0) return -1;
if (tEncodeI64(&encoder, pInfo->numOfFetchInQueue) < 0) return -1; if (tEncodeI64(&encoder, pInfo->numOfFetchInQueue) < 0) return -1;
...@@ -591,6 +592,7 @@ int32_t tDeserializeSQnodeLoad(void *buf, int32_t bufLen, SQnodeLoad *pInfo) { ...@@ -591,6 +592,7 @@ int32_t tDeserializeSQnodeLoad(void *buf, int32_t bufLen, SQnodeLoad *pInfo) {
if (tDecodeI64(&decoder, &pInfo->numOfProcessedFetch) < 0) return -1; if (tDecodeI64(&decoder, &pInfo->numOfProcessedFetch) < 0) return -1;
if (tDecodeI64(&decoder, &pInfo->numOfProcessedDrop) < 0) return -1; if (tDecodeI64(&decoder, &pInfo->numOfProcessedDrop) < 0) return -1;
if (tDecodeI64(&decoder, &pInfo->numOfProcessedHb) < 0) return -1; if (tDecodeI64(&decoder, &pInfo->numOfProcessedHb) < 0) return -1;
if (tDecodeI64(&decoder, &pInfo->numOfProcessedDelete) < 0) return -1;
if (tDecodeI64(&decoder, &pInfo->cacheDataSize) < 0) return -1; if (tDecodeI64(&decoder, &pInfo->cacheDataSize) < 0) return -1;
if (tDecodeI64(&decoder, &pInfo->numOfQueryInQueue) < 0) return -1; if (tDecodeI64(&decoder, &pInfo->numOfQueryInQueue) < 0) return -1;
if (tDecodeI64(&decoder, &pInfo->numOfFetchInQueue) < 0) return -1; if (tDecodeI64(&decoder, &pInfo->numOfFetchInQueue) < 0) return -1;
......
...@@ -160,6 +160,7 @@ typedef struct SQWMsgStat { ...@@ -160,6 +160,7 @@ typedef struct SQWMsgStat {
uint64_t cancelProcessed; uint64_t cancelProcessed;
uint64_t dropProcessed; uint64_t dropProcessed;
uint64_t hbProcessed; uint64_t hbProcessed;
uint64_t deleteProcessed;
} SQWMsgStat; } SQWMsgStat;
typedef struct SQWRTStat { typedef struct SQWRTStat {
...@@ -357,6 +358,7 @@ int32_t qwUpdateTimeInQueue(SQWorker *mgmt, int64_t ts, EQueueType type); ...@@ -357,6 +358,7 @@ int32_t qwUpdateTimeInQueue(SQWorker *mgmt, int64_t ts, EQueueType type);
int64_t qwGetTimeInQueue(SQWorker *mgmt, EQueueType type); int64_t qwGetTimeInQueue(SQWorker *mgmt, EQueueType type);
void qwClearExpiredSch(SArray* pExpiredSch); void qwClearExpiredSch(SArray* pExpiredSch);
int32_t qwAcquireScheduler(SQWorker *mgmt, uint64_t sId, int32_t rwType, SQWSchStatus **sch); int32_t qwAcquireScheduler(SQWorker *mgmt, uint64_t sId, int32_t rwType, SQWSchStatus **sch);
void qwFreeTaskCtx(QW_FPARAMS_DEF, SQWTaskCtx *ctx);
void qwDbgDumpMgmtInfo(SQWorker *mgmt); void qwDbgDumpMgmtInfo(SQWorker *mgmt);
int32_t qwDbgValidateStatus(QW_FPARAMS_DEF, int8_t oriStatus, int8_t newStatus, bool *ignore); int32_t qwDbgValidateStatus(QW_FPARAMS_DEF, int8_t oriStatus, int8_t newStatus, bool *ignore);
......
...@@ -300,13 +300,6 @@ int32_t qWorkerProcessQueryMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, int ...@@ -300,13 +300,6 @@ int32_t qWorkerProcessQueryMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, int
QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT); QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
} }
msg->sId = msg->sId;
msg->queryId = msg->queryId;
msg->taskId = msg->taskId;
msg->refId = msg->refId;
msg->phyLen = msg->phyLen;
msg->sqlLen = msg->sqlLen;
uint64_t sId = msg->sId; uint64_t sId = msg->sId;
uint64_t qId = msg->queryId; uint64_t qId = msg->queryId;
uint64_t tId = msg->taskId; uint64_t tId = msg->taskId;
...@@ -523,3 +516,37 @@ int32_t qWorkerProcessHbMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, int64_ ...@@ -523,3 +516,37 @@ int32_t qWorkerProcessHbMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, int64_
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t qWorkerProcessDeleteMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, SRpcMsg *pRsp) {
if (NULL == node || NULL == qWorkerMgmt || NULL == pMsg || NULL == pRsp) {
QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
}
int32_t code = 0;
SVDeleteReq req = {0};
SQWorker * mgmt = (SQWorker *)qWorkerMgmt;
QW_STAT_INC(mgmt->stat.msgStat.deleteProcessed, 1);
tDeserializeSVDeleteReq(pMsg->pCont, pMsg->contLen, &req);
uint64_t sId = req.sId;
uint64_t qId = req.queryId;
uint64_t tId = req.taskId;
int64_t rId = 0;
SQWMsg qwMsg = {.node = node, .msg = req.msg, .msgLen = req.phyLen, .connInfo = pMsg->info};
QW_SCH_TASK_DLOG("processDelete start, node:%p, handle:%p, sql:%s", node, pMsg->info.handle, req.sql);
taosMemoryFreeClear(req.sql);
QW_ERR_JRET(qwProcessDelete(QW_FPARAMS(), &qwMsg, pRsp));
QW_SCH_TASK_DLOG("processDelete end, node:%p", node);
_return:
QW_RET(code);
}
...@@ -290,8 +290,9 @@ int32_t qwKillTaskHandle(QW_FPARAMS_DEF, SQWTaskCtx *ctx) { ...@@ -290,8 +290,9 @@ int32_t qwKillTaskHandle(QW_FPARAMS_DEF, SQWTaskCtx *ctx) {
QW_RET(code); QW_RET(code);
} }
void qwFreeTask(QW_FPARAMS_DEF, SQWTaskCtx *ctx) { void qwFreeTaskCtx(QW_FPARAMS_DEF, SQWTaskCtx *ctx) {
tmsgReleaseHandle(&ctx->ctrlConnInfo, TAOS_CONN_SERVER); tmsgReleaseHandle(&ctx->ctrlConnInfo, TAOS_CONN_SERVER);
ctx->ctrlConnInfo.handle = NULL; ctx->ctrlConnInfo.handle = NULL;
ctx->ctrlConnInfo.refId = -1; ctx->ctrlConnInfo.refId = -1;
...@@ -333,7 +334,7 @@ int32_t qwDropTaskCtx(QW_FPARAMS_DEF) { ...@@ -333,7 +334,7 @@ int32_t qwDropTaskCtx(QW_FPARAMS_DEF) {
QW_ERR_RET(TSDB_CODE_QRY_TASK_CTX_NOT_EXIST); QW_ERR_RET(TSDB_CODE_QRY_TASK_CTX_NOT_EXIST);
} }
qwFreeTask(QW_FPARAMS(), &octx); qwFreeTaskCtx(QW_FPARAMS(), &octx);
QW_TASK_DLOG_E("task ctx dropped"); QW_TASK_DLOG_E("task ctx dropped");
......
...@@ -183,7 +183,7 @@ int32_t qwGenerateSchHbRsp(SQWorker *mgmt, SQWSchStatus *sch, SQWHbInfo *hbInfo) ...@@ -183,7 +183,7 @@ int32_t qwGenerateSchHbRsp(SQWorker *mgmt, SQWSchStatus *sch, SQWHbInfo *hbInfo)
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t qwGetResFromSink(QW_FPARAMS_DEF, SQWTaskCtx *ctx, int32_t *dataLen, void **rspMsg, SOutputData *pOutput) { int32_t qwGetQueryResFromSink(QW_FPARAMS_DEF, SQWTaskCtx *ctx, int32_t *dataLen, void **rspMsg, SOutputData *pOutput) {
int32_t len = 0; int32_t len = 0;
SRetrieveTableRsp *rsp = NULL; SRetrieveTableRsp *rsp = NULL;
bool queryEnd = false; bool queryEnd = false;
...@@ -242,6 +242,49 @@ int32_t qwGetResFromSink(QW_FPARAMS_DEF, SQWTaskCtx *ctx, int32_t *dataLen, void ...@@ -242,6 +242,49 @@ int32_t qwGetResFromSink(QW_FPARAMS_DEF, SQWTaskCtx *ctx, int32_t *dataLen, void
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t qwGetDeleteResFromSink(QW_FPARAMS_DEF, SQWTaskCtx *ctx, int32_t *dataLen, void **rspMsg) {
int32_t len = 0;
SVDeleteRsp rsp = {0};
bool queryEnd = false;
int32_t code = 0;
SOutputData output = {0};
dsGetDataLength(ctx->sinkHandle, &len, &queryEnd);
if (len <= 0 || len != sizeof(SVDeleteRsp)) {
QW_TASK_ELOG("invalid length from dsGetDataLength, length:%d", len);
QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
}
output.pData = taosMemoryCalloc(1, len);
if (NULL == output.pData) {
QW_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
}
code = dsGetDataBlock(ctx->sinkHandle, &output);
if (code) {
QW_TASK_ELOG("dsGetDataBlock failed, code:%x - %s", code, tstrerror(code));
taosMemoryFree(output.pData);
QW_ERR_RET(code);
}
rsp.affectedRows = *(int64_t*)output.pData;
int32_t len;
int32_t ret = 0;
SEncoder coder = {0};
tEncodeSize(tEncodeSVDeleteRsp, &rsp, len, ret);
void *msg = taosMemoryCalloc(1, len);
tEncoderInit(&coder, msg, len);
tEncodeSVDeleteRsp(&coder, &rsp);
tEncoderClear(&coder);
*rspMsg = msg;
*dataLen = len;
return TSDB_CODE_SUCCESS;
}
int32_t qwHandlePrePhaseEvents(QW_FPARAMS_DEF, int8_t phase, SQWPhaseInput *input, SQWPhaseOutput *output) { int32_t qwHandlePrePhaseEvents(QW_FPARAMS_DEF, int8_t phase, SQWPhaseInput *input, SQWPhaseOutput *output) {
int32_t code = 0; int32_t code = 0;
...@@ -547,7 +590,7 @@ int32_t qwProcessCQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg) { ...@@ -547,7 +590,7 @@ int32_t qwProcessCQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg) {
if (QW_IS_EVENT_RECEIVED(ctx, QW_EVENT_FETCH)) { if (QW_IS_EVENT_RECEIVED(ctx, QW_EVENT_FETCH)) {
SOutputData sOutput = {0}; SOutputData sOutput = {0};
QW_ERR_JRET(qwGetResFromSink(QW_FPARAMS(), ctx, &dataLen, &rsp, &sOutput)); QW_ERR_JRET(qwGetQueryResFromSink(QW_FPARAMS(), ctx, &dataLen, &rsp, &sOutput));
if ((!sOutput.queryEnd) && (DS_BUF_LOW == sOutput.bufStatus || DS_BUF_EMPTY == sOutput.bufStatus)) { if ((!sOutput.queryEnd) && (DS_BUF_LOW == sOutput.bufStatus || DS_BUF_EMPTY == sOutput.bufStatus)) {
QW_TASK_DLOG("task not end and buf is %s, need to continue query", qwBufStatusStr(sOutput.bufStatus)); QW_TASK_DLOG("task not end and buf is %s, need to continue query", qwBufStatusStr(sOutput.bufStatus));
...@@ -620,7 +663,7 @@ int32_t qwProcessFetch(QW_FPARAMS_DEF, SQWMsg *qwMsg) { ...@@ -620,7 +663,7 @@ int32_t qwProcessFetch(QW_FPARAMS_DEF, SQWMsg *qwMsg) {
QW_ERR_JRET(qwGetTaskCtx(QW_FPARAMS(), &ctx)); QW_ERR_JRET(qwGetTaskCtx(QW_FPARAMS(), &ctx));
SOutputData sOutput = {0}; SOutputData sOutput = {0};
QW_ERR_JRET(qwGetResFromSink(QW_FPARAMS(), ctx, &dataLen, &rsp, &sOutput)); QW_ERR_JRET(qwGetQueryResFromSink(QW_FPARAMS(), ctx, &dataLen, &rsp, &sOutput));
if (NULL == rsp) { if (NULL == rsp) {
ctx->dataConnInfo = qwMsg->connInfo; ctx->dataConnInfo = qwMsg->connInfo;
...@@ -875,6 +918,47 @@ _return: ...@@ -875,6 +918,47 @@ _return:
qwRelease(refId); qwRelease(refId);
} }
int32_t qwProcessDelete(QW_FPARAMS_DEF, SQWMsg *qwMsg, SRpcMsg *pRsp) {
int32_t code = 0;
SSubplan *plan = NULL;
qTaskInfo_t pTaskInfo = NULL;
DataSinkHandle sinkHandle = NULL;
SQWTaskCtx ctx = {0};
code = qStringToSubplan(qwMsg->msg, &plan);
if (TSDB_CODE_SUCCESS != code) {
code = TSDB_CODE_INVALID_MSG;
QW_TASK_ELOG("task physical plan to subplan failed, code:%x - %s", code, tstrerror(code));
QW_ERR_JRET(code);
}
ctx->plan = plan;
code = qCreateExecTask(qwMsg->node, mgmt->nodeId, tId, plan, &pTaskInfo, &sinkHandle, OPTR_EXEC_MODEL_BATCH);
if (code) {
QW_TASK_ELOG("qCreateExecTask failed, code:%x - %s", code, tstrerror(code));
QW_ERR_JRET(code);
}
if (NULL == sinkHandle || NULL == pTaskInfo) {
QW_TASK_ELOG("create task result error, taskHandle:%p, sinkHandle:%p", pTaskInfo, sinkHandle);
QW_ERR_JRET(TSDB_CODE_QRY_APP_ERROR);
}
ctx->taskHandle = pTaskInfo;
ctx->sinkHandle = sinkHandle;
QW_ERR_JRET(qwExecTask(QW_FPARAMS(), ctx, NULL));
QW_ERR_JRET(qwGetDeleteResFromSink(QW_FPARAMS(), &ctx, &pRsp->contLen, &pRsp->pCont));
_return:
qwFreeTaskCtx(QW_FPARAMS(), &ctx);
QW_RET(TSDB_CODE_SUCCESS);
}
int32_t qWorkerInit(int8_t nodeType, int32_t nodeId, SQWorkerCfg *cfg, void **qWorkerMgmt, const SMsgCb *pMsgCb) { int32_t qWorkerInit(int8_t nodeType, int32_t nodeId, SQWorkerCfg *cfg, void **qWorkerMgmt, const SMsgCb *pMsgCb) {
if (NULL == qWorkerMgmt || pMsgCb->mgmt == NULL) { if (NULL == qWorkerMgmt || pMsgCb->mgmt == NULL) {
...@@ -1007,6 +1091,7 @@ int32_t qWorkerGetStat(SReadHandle *handle, void *qWorkerMgmt, SQWorkerStat *pSt ...@@ -1007,6 +1091,7 @@ int32_t qWorkerGetStat(SReadHandle *handle, void *qWorkerMgmt, SQWorkerStat *pSt
pStat->fetchProcessed = QW_STAT_GET(mgmt->stat.msgStat.fetchProcessed); pStat->fetchProcessed = QW_STAT_GET(mgmt->stat.msgStat.fetchProcessed);
pStat->dropProcessed = QW_STAT_GET(mgmt->stat.msgStat.dropProcessed); pStat->dropProcessed = QW_STAT_GET(mgmt->stat.msgStat.dropProcessed);
pStat->hbProcessed = QW_STAT_GET(mgmt->stat.msgStat.hbProcessed); pStat->hbProcessed = QW_STAT_GET(mgmt->stat.msgStat.hbProcessed);
pStat->deleteProcessed = QW_STAT_GET(mgmt->stat.msgStat.deleteProcessed);
pStat->numOfQueryInQueue = handle->pMsgCb->qsizeFp(handle->pMsgCb->mgmt, mgmt->nodeId, QUERY_QUEUE); pStat->numOfQueryInQueue = handle->pMsgCb->qsizeFp(handle->pMsgCb->mgmt, mgmt->nodeId, QUERY_QUEUE);
pStat->numOfFetchInQueue = handle->pMsgCb->qsizeFp(handle->pMsgCb->mgmt, mgmt->nodeId, FETCH_QUEUE); pStat->numOfFetchInQueue = handle->pMsgCb->qsizeFp(handle->pMsgCb->mgmt, mgmt->nodeId, FETCH_QUEUE);
......
...@@ -227,6 +227,25 @@ int32_t schHandleResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t msgType, ch ...@@ -227,6 +227,25 @@ int32_t schHandleResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t msgType, ch
break; break;
} }
case TDMT_VND_DELETE_RSP: {
SCH_ERR_JRET(rspCode);
if (msg) {
SDecoder coder = {0};
SVDeleteRsp rsp = {0};
tDecoderInit(&coder, msg, msgSize);
tDecodeSVDeleteRsp(&coder, &rsp);
atomic_add_fetch_32(&pJob->resNumOfRows, rsp->affectedRows);
SCH_TASK_DLOG("delete succeed, affectedRows:%d", rsp->affectedRows);
}
taosMemoryFreeClear(msg);
SCH_ERR_RET(schProcessOnTaskSuccess(pJob, pTask));
break;
}
case TDMT_VND_QUERY_RSP: { case TDMT_VND_QUERY_RSP: {
SQueryTableRsp *rsp = (SQueryTableRsp *)msg; SQueryTableRsp *rsp = (SQueryTableRsp *)msg;
...@@ -982,6 +1001,27 @@ int32_t schBuildAndSendMsg(SSchJob *pJob, SSchTask *pTask, SQueryNodeAddr *addr, ...@@ -982,6 +1001,27 @@ int32_t schBuildAndSendMsg(SSchJob *pJob, SSchTask *pTask, SQueryNodeAddr *addr,
break; break;
} }
case TDMT_VND_DELETE: {
SVDeleteReq req = {0};
req.sId = schMgmt.sId;
req.queryId = pJob->queryId;
req.taskId = pTask->taskId;
req.phyLen = pTask->msgLen;
req.sqlLen = strlen(pJob->sql);
req.sql = pJob->sql;
req.msg = pTask->msg;
int32_t len = tSerializeSVDeleteReq(NULL, 0, &req);
msg = taosMemoryCalloc(1, len);
if (NULL == msg) {
SCH_TASK_ELOG("calloc %d failed", len);
SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
}
tSerializeSVDeleteReq(msg, len, &req);
SVDeleteReq *pMsg = msg;
pMsg->header.vgId = htonl(addr->nodeId);
break;
}
case TDMT_VND_QUERY: { case TDMT_VND_QUERY: {
SCH_ERR_RET(schMakeQueryRpcCtx(pJob, pTask, &rpcCtx)); SCH_ERR_RET(schMakeQueryRpcCtx(pJob, pTask, &rpcCtx));
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册