提交 548684e5 编写于 作者: L Liu Jicong

fix(stream): delete multiple row

上级 60b3d00a
...@@ -1592,14 +1592,14 @@ typedef struct SSubQueryMsg { ...@@ -1592,14 +1592,14 @@ typedef struct SSubQueryMsg {
int8_t explain; int8_t explain;
int8_t needFetch; int8_t needFetch;
uint32_t sqlLen; uint32_t sqlLen;
char *sql; char* sql;
uint32_t msgLen; uint32_t msgLen;
char *msg; char* msg;
} SSubQueryMsg; } SSubQueryMsg;
int32_t tSerializeSSubQueryMsg(void *buf, int32_t bufLen, SSubQueryMsg *pReq); int32_t tSerializeSSubQueryMsg(void* buf, int32_t bufLen, SSubQueryMsg* pReq);
int32_t tDeserializeSSubQueryMsg(void *buf, int32_t bufLen, SSubQueryMsg *pReq); int32_t tDeserializeSSubQueryMsg(void* buf, int32_t bufLen, SSubQueryMsg* pReq);
void tFreeSSubQueryMsg(SSubQueryMsg *pReq); void tFreeSSubQueryMsg(SSubQueryMsg* pReq);
typedef struct { typedef struct {
SMsgHead header; SMsgHead header;
...@@ -1638,9 +1638,8 @@ typedef struct { ...@@ -1638,9 +1638,8 @@ typedef struct {
int32_t execId; int32_t execId;
} SResFetchReq; } SResFetchReq;
int32_t tSerializeSResFetchReq(void *buf, int32_t bufLen, SResFetchReq *pReq); int32_t tSerializeSResFetchReq(void* buf, int32_t bufLen, SResFetchReq* pReq);
int32_t tDeserializeSResFetchReq(void *buf, int32_t bufLen, SResFetchReq *pReq); int32_t tDeserializeSResFetchReq(void* buf, int32_t bufLen, SResFetchReq* pReq);
typedef struct { typedef struct {
SMsgHead header; SMsgHead header;
...@@ -1713,12 +1712,11 @@ typedef struct { ...@@ -1713,12 +1712,11 @@ typedef struct {
int32_t execId; int32_t execId;
} STaskDropReq; } STaskDropReq;
int32_t tSerializeSTaskDropReq(void *buf, int32_t bufLen, STaskDropReq *pReq); int32_t tSerializeSTaskDropReq(void* buf, int32_t bufLen, STaskDropReq* pReq);
int32_t tDeserializeSTaskDropReq(void *buf, int32_t bufLen, STaskDropReq *pReq); int32_t tDeserializeSTaskDropReq(void* buf, int32_t bufLen, STaskDropReq* pReq);
int32_t tSerializeSQueryTableRsp(void *buf, int32_t bufLen, SQueryTableRsp *pRsp);
int32_t tDeserializeSQueryTableRsp(void *buf, int32_t bufLen, SQueryTableRsp *pRsp);
int32_t tSerializeSQueryTableRsp(void* buf, int32_t bufLen, SQueryTableRsp* pRsp);
int32_t tDeserializeSQueryTableRsp(void* buf, int32_t bufLen, SQueryTableRsp* pRsp);
typedef struct { typedef struct {
int32_t code; int32_t code;
...@@ -2923,9 +2921,8 @@ typedef struct { ...@@ -2923,9 +2921,8 @@ typedef struct {
STqOffsetVal reqOffset; STqOffsetVal reqOffset;
} SMqPollReq; } SMqPollReq;
int32_t tSerializeSMqPollReq(void *buf, int32_t bufLen, SMqPollReq *pReq); int32_t tSerializeSMqPollReq(void* buf, int32_t bufLen, SMqPollReq* pReq);
int32_t tDeserializeSMqPollReq(void *buf, int32_t bufLen, SMqPollReq *pReq); int32_t tDeserializeSMqPollReq(void* buf, int32_t bufLen, SMqPollReq* pReq);
typedef struct { typedef struct {
int32_t vgId; int32_t vgId;
...@@ -3138,7 +3135,8 @@ int32_t tDecodeDeleteRes(SDecoder* pCoder, SDeleteRes* pRes); ...@@ -3138,7 +3135,8 @@ int32_t tDecodeDeleteRes(SDecoder* pCoder, SDeleteRes* pRes);
typedef struct { typedef struct {
// int64_t uid; // int64_t uid;
char tbname[TSDB_TABLE_NAME_LEN]; char tbname[TSDB_TABLE_NAME_LEN];
int64_t ts; int64_t startTs;
int64_t endTs;
} SSingleDeleteReq; } SSingleDeleteReq;
int32_t tEncodeSSingleDeleteReq(SEncoder* pCoder, const SSingleDeleteReq* pReq); int32_t tEncodeSSingleDeleteReq(SEncoder* pCoder, const SSingleDeleteReq* pReq);
...@@ -3160,8 +3158,8 @@ typedef struct { ...@@ -3160,8 +3158,8 @@ typedef struct {
} SBatchMsg; } SBatchMsg;
typedef struct { typedef struct {
SMsgHead header; SMsgHead header;
SArray* pMsgs; //SArray<SBatchMsg> SArray* pMsgs; // SArray<SBatchMsg>
} SBatchReq; } SBatchReq;
typedef struct { typedef struct {
...@@ -3173,11 +3171,11 @@ typedef struct { ...@@ -3173,11 +3171,11 @@ typedef struct {
} SBatchRspMsg; } SBatchRspMsg;
typedef struct { typedef struct {
SArray* pRsps; //SArray<SBatchRspMsg> SArray* pRsps; // SArray<SBatchRspMsg>
} SBatchRsp; } SBatchRsp;
int32_t tSerializeSBatchReq(void *buf, int32_t bufLen, SBatchReq *pReq); int32_t tSerializeSBatchReq(void* buf, int32_t bufLen, SBatchReq* pReq);
int32_t tDeserializeSBatchReq(void *buf, int32_t bufLen, SBatchReq *pReq); int32_t tDeserializeSBatchReq(void* buf, int32_t bufLen, SBatchReq* pReq);
static FORCE_INLINE void tFreeSBatchReqMsg(void* msg) { static FORCE_INLINE void tFreeSBatchReqMsg(void* msg) {
if (NULL == msg) { if (NULL == msg) {
return; return;
...@@ -3186,8 +3184,8 @@ static FORCE_INLINE void tFreeSBatchReqMsg(void* msg) { ...@@ -3186,8 +3184,8 @@ static FORCE_INLINE void tFreeSBatchReqMsg(void* msg) {
taosMemoryFree(pMsg->msg); taosMemoryFree(pMsg->msg);
} }
int32_t tSerializeSBatchRsp(void *buf, int32_t bufLen, SBatchRsp *pRsp); int32_t tSerializeSBatchRsp(void* buf, int32_t bufLen, SBatchRsp* pRsp);
int32_t tDeserializeSBatchRsp(void *buf, int32_t bufLen, SBatchRsp *pRsp); int32_t tDeserializeSBatchRsp(void* buf, int32_t bufLen, SBatchRsp* pRsp);
static FORCE_INLINE void tFreeSBatchRspMsg(void* p) { static FORCE_INLINE void tFreeSBatchRspMsg(void* p) {
if (NULL == p) { if (NULL == p) {
...@@ -3198,11 +3196,10 @@ static FORCE_INLINE void tFreeSBatchRspMsg(void* p) { ...@@ -3198,11 +3196,10 @@ static FORCE_INLINE void tFreeSBatchRspMsg(void* p) {
taosMemoryFree(pRsp->msg); taosMemoryFree(pRsp->msg);
} }
int32_t tSerializeSMqAskEpReq(void *buf, int32_t bufLen, SMqAskEpReq *pReq); int32_t tSerializeSMqAskEpReq(void* buf, int32_t bufLen, SMqAskEpReq* pReq);
int32_t tDeserializeSMqAskEpReq(void *buf, int32_t bufLen, SMqAskEpReq *pReq); int32_t tDeserializeSMqAskEpReq(void* buf, int32_t bufLen, SMqAskEpReq* pReq);
int32_t tSerializeSMqHbReq(void *buf, int32_t bufLen, SMqHbReq *pReq); int32_t tSerializeSMqHbReq(void* buf, int32_t bufLen, SMqHbReq* pReq);
int32_t tDeserializeSMqHbReq(void *buf, int32_t bufLen, SMqHbReq *pReq); int32_t tDeserializeSMqHbReq(void* buf, int32_t bufLen, SMqHbReq* pReq);
#pragma pack(pop) #pragma pack(pop)
......
...@@ -280,8 +280,8 @@ enum { ...@@ -280,8 +280,8 @@ enum {
TD_NEW_MSG_SEG(TDMT_VND_STREAM_MSG) TD_NEW_MSG_SEG(TDMT_VND_STREAM_MSG)
TD_DEF_MSG_TYPE(TDMT_VND_STREAM_TRIGGER, "vnode-stream-trigger", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_VND_STREAM_TRIGGER, "vnode-stream-trigger", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_STREAM_RECOVER_STEP1, "vnode-stream-recover1", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_VND_STREAM_RECOVER_NONBLOCKING_STAGE, "vnode-stream-recover1", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_STREAM_RECOVER_STEP2, "vnode-stream-recover2", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_VND_STREAM_RECOVER_BLOCKING_STAGE, "vnode-stream-recover2", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_STREAM_MAX_MSG, "vnd-stream-max", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_VND_STREAM_MAX_MSG, "vnd-stream-max", NULL, NULL)
TD_NEW_MSG_SEG(TDMT_VND_TMQ_MSG) TD_NEW_MSG_SEG(TDMT_VND_TMQ_MSG)
......
...@@ -4496,7 +4496,7 @@ int32_t tDeserializeSBatchReq(void *buf, int32_t bufLen, SBatchReq *pReq) { ...@@ -4496,7 +4496,7 @@ int32_t tDeserializeSBatchReq(void *buf, int32_t bufLen, SBatchReq *pReq) {
if (num <= 0) { if (num <= 0) {
pReq->pMsgs = NULL; pReq->pMsgs = NULL;
tEndDecode(&decoder); tEndDecode(&decoder);
tDecoderClear(&decoder); tDecoderClear(&decoder);
return 0; return 0;
} }
...@@ -4511,7 +4511,7 @@ int32_t tDeserializeSBatchReq(void *buf, int32_t bufLen, SBatchReq *pReq) { ...@@ -4511,7 +4511,7 @@ int32_t tDeserializeSBatchReq(void *buf, int32_t bufLen, SBatchReq *pReq) {
if (tDecodeBinaryAlloc(&decoder, &msg.msg, NULL) < 0) return -1; if (tDecodeBinaryAlloc(&decoder, &msg.msg, NULL) < 0) return -1;
if (NULL == taosArrayPush(pReq->pMsgs, &msg)) return -1; if (NULL == taosArrayPush(pReq->pMsgs, &msg)) return -1;
} }
tEndDecode(&decoder); tEndDecode(&decoder);
tDecoderClear(&decoder); tDecoderClear(&decoder);
...@@ -4553,7 +4553,7 @@ int32_t tDeserializeSBatchRsp(void *buf, int32_t bufLen, SBatchRsp *pRsp) { ...@@ -4553,7 +4553,7 @@ int32_t tDeserializeSBatchRsp(void *buf, int32_t bufLen, SBatchRsp *pRsp) {
if (num <= 0) { if (num <= 0) {
pRsp->pRsps = NULL; pRsp->pRsps = NULL;
tEndDecode(&decoder); tEndDecode(&decoder);
tDecoderClear(&decoder); tDecoderClear(&decoder);
return 0; return 0;
} }
...@@ -4569,14 +4569,13 @@ int32_t tDeserializeSBatchRsp(void *buf, int32_t bufLen, SBatchRsp *pRsp) { ...@@ -4569,14 +4569,13 @@ int32_t tDeserializeSBatchRsp(void *buf, int32_t bufLen, SBatchRsp *pRsp) {
if (tDecodeBinaryAlloc(&decoder, &msg.msg, NULL) < 0) return -1; if (tDecodeBinaryAlloc(&decoder, &msg.msg, NULL) < 0) return -1;
if (NULL == taosArrayPush(pRsp->pRsps, &msg)) return -1; if (NULL == taosArrayPush(pRsp->pRsps, &msg)) return -1;
} }
tEndDecode(&decoder); tEndDecode(&decoder);
tDecoderClear(&decoder); tDecoderClear(&decoder);
return 0; return 0;
} }
int32_t tSerializeSMqAskEpReq(void *buf, int32_t bufLen, SMqAskEpReq *pReq) { int32_t tSerializeSMqAskEpReq(void *buf, int32_t bufLen, SMqAskEpReq *pReq) {
SEncoder encoder = {0}; SEncoder encoder = {0};
tEncoderInit(&encoder, buf, bufLen); tEncoderInit(&encoder, buf, bufLen);
...@@ -4603,7 +4602,7 @@ int32_t tDeserializeSMqAskEpReq(void *buf, int32_t bufLen, SMqAskEpReq *pReq) { ...@@ -4603,7 +4602,7 @@ int32_t tDeserializeSMqAskEpReq(void *buf, int32_t bufLen, SMqAskEpReq *pReq) {
if (tDecodeI64(&decoder, &pReq->consumerId) < 0) return -1; if (tDecodeI64(&decoder, &pReq->consumerId) < 0) return -1;
if (tDecodeI32(&decoder, &pReq->epoch) < 0) return -1; if (tDecodeI32(&decoder, &pReq->epoch) < 0) return -1;
if (tDecodeCStrTo(&decoder, pReq->cgroup) < 0) return -1; if (tDecodeCStrTo(&decoder, pReq->cgroup) < 0) return -1;
tEndDecode(&decoder); tEndDecode(&decoder);
tDecoderClear(&decoder); tDecoderClear(&decoder);
...@@ -4634,7 +4633,7 @@ int32_t tDeserializeSMqHbReq(void *buf, int32_t bufLen, SMqHbReq *pReq) { ...@@ -4634,7 +4633,7 @@ int32_t tDeserializeSMqHbReq(void *buf, int32_t bufLen, SMqHbReq *pReq) {
if (tDecodeI64(&decoder, &pReq->consumerId) < 0) return -1; if (tDecodeI64(&decoder, &pReq->consumerId) < 0) return -1;
if (tDecodeI32(&decoder, &pReq->epoch) < 0) return -1; if (tDecodeI32(&decoder, &pReq->epoch) < 0) return -1;
tEndDecode(&decoder); tEndDecode(&decoder);
tDecoderClear(&decoder); tDecoderClear(&decoder);
...@@ -4664,7 +4663,7 @@ int32_t tSerializeSSubQueryMsg(void *buf, int32_t bufLen, SSubQueryMsg *pReq) { ...@@ -4664,7 +4663,7 @@ int32_t tSerializeSSubQueryMsg(void *buf, int32_t bufLen, SSubQueryMsg *pReq) {
if (tEncodeU32(&encoder, pReq->sqlLen) < 0) return -1; if (tEncodeU32(&encoder, pReq->sqlLen) < 0) return -1;
if (tEncodeCStrWithLen(&encoder, pReq->sql, pReq->sqlLen) < 0) return -1; if (tEncodeCStrWithLen(&encoder, pReq->sql, pReq->sqlLen) < 0) return -1;
if (tEncodeU32(&encoder, pReq->msgLen) < 0) return -1; if (tEncodeU32(&encoder, pReq->msgLen) < 0) return -1;
if (tEncodeBinary(&encoder, (uint8_t*)pReq->msg, pReq->msgLen) < 0) return -1; if (tEncodeBinary(&encoder, (uint8_t *)pReq->msg, pReq->msgLen) < 0) return -1;
tEndEncode(&encoder); tEndEncode(&encoder);
...@@ -4704,8 +4703,8 @@ int32_t tDeserializeSSubQueryMsg(void *buf, int32_t bufLen, SSubQueryMsg *pReq) ...@@ -4704,8 +4703,8 @@ int32_t tDeserializeSSubQueryMsg(void *buf, int32_t bufLen, SSubQueryMsg *pReq)
if (tDecodeU32(&decoder, &pReq->sqlLen) < 0) return -1; if (tDecodeU32(&decoder, &pReq->sqlLen) < 0) return -1;
if (tDecodeCStrAlloc(&decoder, &pReq->sql) < 0) return -1; if (tDecodeCStrAlloc(&decoder, &pReq->sql) < 0) return -1;
if (tDecodeU32(&decoder, &pReq->msgLen) < 0) return -1; if (tDecodeU32(&decoder, &pReq->msgLen) < 0) return -1;
if (tDecodeBinaryAlloc(&decoder, (void**)&pReq->msg, NULL) < 0) return -1; if (tDecodeBinaryAlloc(&decoder, (void **)&pReq->msg, NULL) < 0) return -1;
tEndDecode(&decoder); tEndDecode(&decoder);
tDecoderClear(&decoder); tDecoderClear(&decoder);
...@@ -4721,7 +4720,6 @@ void tFreeSSubQueryMsg(SSubQueryMsg *pReq) { ...@@ -4721,7 +4720,6 @@ void tFreeSSubQueryMsg(SSubQueryMsg *pReq) {
taosMemoryFreeClear(pReq->msg); taosMemoryFreeClear(pReq->msg);
} }
int32_t tSerializeSResFetchReq(void *buf, int32_t bufLen, SResFetchReq *pReq) { int32_t tSerializeSResFetchReq(void *buf, int32_t bufLen, SResFetchReq *pReq) {
int32_t headLen = sizeof(SMsgHead); int32_t headLen = sizeof(SMsgHead);
if (buf != NULL) { if (buf != NULL) {
...@@ -4768,14 +4766,13 @@ int32_t tDeserializeSResFetchReq(void *buf, int32_t bufLen, SResFetchReq *pReq) ...@@ -4768,14 +4766,13 @@ int32_t tDeserializeSResFetchReq(void *buf, int32_t bufLen, SResFetchReq *pReq)
if (tDecodeU64(&decoder, &pReq->queryId) < 0) return -1; if (tDecodeU64(&decoder, &pReq->queryId) < 0) return -1;
if (tDecodeU64(&decoder, &pReq->taskId) < 0) return -1; if (tDecodeU64(&decoder, &pReq->taskId) < 0) return -1;
if (tDecodeI32(&decoder, &pReq->execId) < 0) return -1; if (tDecodeI32(&decoder, &pReq->execId) < 0) return -1;
tEndDecode(&decoder); tEndDecode(&decoder);
tDecoderClear(&decoder); tDecoderClear(&decoder);
return 0; return 0;
} }
int32_t tSerializeSTqOffsetVal(SEncoder *pEncoder, STqOffsetVal *pOffset) { int32_t tSerializeSTqOffsetVal(SEncoder *pEncoder, STqOffsetVal *pOffset) {
if (tEncodeI8(pEncoder, pOffset->type) < 0) return -1; if (tEncodeI8(pEncoder, pOffset->type) < 0) return -1;
if (tEncodeI64(pEncoder, pOffset->uid) < 0) return -1; if (tEncodeI64(pEncoder, pOffset->uid) < 0) return -1;
...@@ -4846,14 +4843,13 @@ int32_t tDeserializeSMqPollReq(void *buf, int32_t bufLen, SMqPollReq *pReq) { ...@@ -4846,14 +4843,13 @@ int32_t tDeserializeSMqPollReq(void *buf, int32_t bufLen, SMqPollReq *pReq) {
if (tDecodeI64(&decoder, &pReq->consumerId) < 0) return -1; if (tDecodeI64(&decoder, &pReq->consumerId) < 0) return -1;
if (tDecodeI64(&decoder, &pReq->timeout) < 0) return -1; if (tDecodeI64(&decoder, &pReq->timeout) < 0) return -1;
if (tDerializeSTqOffsetVal(&decoder, &pReq->reqOffset) < 0) return -1; if (tDerializeSTqOffsetVal(&decoder, &pReq->reqOffset) < 0) return -1;
tEndDecode(&decoder); tEndDecode(&decoder);
tDecoderClear(&decoder); tDecoderClear(&decoder);
return 0; return 0;
} }
int32_t tSerializeSTaskDropReq(void *buf, int32_t bufLen, STaskDropReq *pReq) { int32_t tSerializeSTaskDropReq(void *buf, int32_t bufLen, STaskDropReq *pReq) {
int32_t headLen = sizeof(SMsgHead); int32_t headLen = sizeof(SMsgHead);
if (buf != NULL) { if (buf != NULL) {
...@@ -4902,7 +4898,7 @@ int32_t tDeserializeSTaskDropReq(void *buf, int32_t bufLen, STaskDropReq *pReq) ...@@ -4902,7 +4898,7 @@ int32_t tDeserializeSTaskDropReq(void *buf, int32_t bufLen, STaskDropReq *pReq)
if (tDecodeU64(&decoder, &pReq->taskId) < 0) return -1; if (tDecodeU64(&decoder, &pReq->taskId) < 0) return -1;
if (tDecodeI64(&decoder, &pReq->refId) < 0) return -1; if (tDecodeI64(&decoder, &pReq->refId) < 0) return -1;
if (tDecodeI32(&decoder, &pReq->execId) < 0) return -1; if (tDecodeI32(&decoder, &pReq->execId) < 0) return -1;
tEndDecode(&decoder); tEndDecode(&decoder);
tDecoderClear(&decoder); tDecoderClear(&decoder);
...@@ -4939,14 +4935,13 @@ int32_t tDeserializeSQueryTableRsp(void *buf, int32_t bufLen, SQueryTableRsp *pR ...@@ -4939,14 +4935,13 @@ int32_t tDeserializeSQueryTableRsp(void *buf, int32_t bufLen, SQueryTableRsp *pR
if (tDecodeI32(&decoder, &pRsp->sversion) < 0) return -1; if (tDecodeI32(&decoder, &pRsp->sversion) < 0) return -1;
if (tDecodeI32(&decoder, &pRsp->tversion) < 0) return -1; if (tDecodeI32(&decoder, &pRsp->tversion) < 0) return -1;
if (tDecodeI64(&decoder, &pRsp->affectedRows) < 0) return -1; if (tDecodeI64(&decoder, &pRsp->affectedRows) < 0) return -1;
tEndDecode(&decoder); tEndDecode(&decoder);
tDecoderClear(&decoder); tDecoderClear(&decoder);
return 0; return 0;
} }
int32_t tSerializeSSchedulerHbReq(void *buf, int32_t bufLen, SSchedulerHbReq *pReq) { int32_t tSerializeSSchedulerHbReq(void *buf, int32_t bufLen, SSchedulerHbReq *pReq) {
int32_t headLen = sizeof(SMsgHead); int32_t headLen = sizeof(SMsgHead);
if (buf != NULL) { if (buf != NULL) {
...@@ -6645,13 +6640,15 @@ void tDeleteSTaosxRsp(STaosxRsp *pRsp) { ...@@ -6645,13 +6640,15 @@ void tDeleteSTaosxRsp(STaosxRsp *pRsp) {
int32_t tEncodeSSingleDeleteReq(SEncoder *pEncoder, const SSingleDeleteReq *pReq) { int32_t tEncodeSSingleDeleteReq(SEncoder *pEncoder, const SSingleDeleteReq *pReq) {
if (tEncodeCStr(pEncoder, pReq->tbname) < 0) return -1; if (tEncodeCStr(pEncoder, pReq->tbname) < 0) return -1;
if (tEncodeI64(pEncoder, pReq->ts) < 0) return -1; if (tEncodeI64(pEncoder, pReq->startTs) < 0) return -1;
if (tEncodeI64(pEncoder, pReq->endTs) < 0) return -1;
return 0; return 0;
} }
int32_t tDecodeSSingleDeleteReq(SDecoder *pDecoder, SSingleDeleteReq *pReq) { int32_t tDecodeSSingleDeleteReq(SDecoder *pDecoder, SSingleDeleteReq *pReq) {
if (tDecodeCStrTo(pDecoder, pReq->tbname) < 0) return -1; if (tDecodeCStrTo(pDecoder, pReq->tbname) < 0) return -1;
if (tDecodeI64(pDecoder, &pReq->ts) < 0) return -1; if (tDecodeI64(pDecoder, &pReq->startTs) < 0) return -1;
if (tDecodeI64(pDecoder, &pReq->endTs) < 0) return -1;
return 0; return 0;
} }
......
...@@ -1127,7 +1127,7 @@ int32_t tqProcessTaskRecover1Req(STQ* pTq, SRpcMsg* pMsg) { ...@@ -1127,7 +1127,7 @@ int32_t tqProcessTaskRecover1Req(STQ* pTq, SRpcMsg* pMsg) {
SRpcMsg rpcMsg = { SRpcMsg rpcMsg = {
.code = 0, .code = 0,
.contLen = len, .contLen = len,
.msgType = TDMT_VND_STREAM_RECOVER_STEP2, .msgType = TDMT_VND_STREAM_RECOVER_BLOCKING_STAGE,
.pCont = serializedReq, .pCont = serializedReq,
}; };
......
...@@ -21,14 +21,16 @@ int32_t tqBuildDeleteReq(SVnode* pVnode, const char* stbFullName, const SSDataBl ...@@ -21,14 +21,16 @@ int32_t tqBuildDeleteReq(SVnode* pVnode, const char* stbFullName, const SSDataBl
SBatchDeleteReq* deleteReq) { SBatchDeleteReq* deleteReq) {
ASSERT(pDataBlock->info.type == STREAM_DELETE_RESULT); ASSERT(pDataBlock->info.type == STREAM_DELETE_RESULT);
int32_t totRow = pDataBlock->info.rows; int32_t totRow = pDataBlock->info.rows;
SColumnInfoData* pTsCol = taosArrayGet(pDataBlock->pDataBlock, START_TS_COLUMN_INDEX); SColumnInfoData* pStartTsCol = taosArrayGet(pDataBlock->pDataBlock, START_TS_COLUMN_INDEX);
SColumnInfoData* pEndTsCol = taosArrayGet(pDataBlock->pDataBlock, END_TS_COLUMN_INDEX);
SColumnInfoData* pGidCol = taosArrayGet(pDataBlock->pDataBlock, GROUPID_COLUMN_INDEX); SColumnInfoData* pGidCol = taosArrayGet(pDataBlock->pDataBlock, GROUPID_COLUMN_INDEX);
SColumnInfoData* pTbNameCol = taosArrayGet(pDataBlock->pDataBlock, TABLE_NAME_COLUMN_INDEX); SColumnInfoData* pTbNameCol = taosArrayGet(pDataBlock->pDataBlock, TABLE_NAME_COLUMN_INDEX);
tqDebug("stream delete msg: row %d", totRow); tqDebug("stream delete msg: row %d", totRow);
for (int32_t row = 0; row < totRow; row++) { for (int32_t row = 0; row < totRow; row++) {
int64_t ts = *(int64_t*)colDataGetData(pTsCol, row); int64_t startTs = *(int64_t*)colDataGetData(pStartTsCol, row);
int64_t endTs = *(int64_t*)colDataGetData(pEndTsCol, row);
int64_t groupId = *(int64_t*)colDataGetData(pGidCol, row); int64_t groupId = *(int64_t*)colDataGetData(pGidCol, row);
char* name; char* name;
void* varTbName = NULL; void* varTbName = NULL;
...@@ -42,8 +44,8 @@ int32_t tqBuildDeleteReq(SVnode* pVnode, const char* stbFullName, const SSDataBl ...@@ -42,8 +44,8 @@ int32_t tqBuildDeleteReq(SVnode* pVnode, const char* stbFullName, const SSDataBl
} else { } else {
name = buildCtbNameByGroupId(stbFullName, groupId); name = buildCtbNameByGroupId(stbFullName, groupId);
} }
tqDebug("stream delete msg: vgId:%d, groupId :%" PRId64 ", name: %s, ts:%" PRId64, pVnode->config.vgId, groupId, tqDebug("stream delete msg: vgId:%d, groupId :%" PRId64 ", name: %s, start ts:%" PRId64 "end ts:%" PRId64,
name, ts); pVnode->config.vgId, groupId, name, startTs, endTs);
#if 0 #if 0
SMetaReader mr = {0}; SMetaReader mr = {0};
metaReaderInit(&mr, pVnode->pMeta, 0); metaReaderInit(&mr, pVnode->pMeta, 0);
...@@ -59,7 +61,8 @@ int32_t tqBuildDeleteReq(SVnode* pVnode, const char* stbFullName, const SSDataBl ...@@ -59,7 +61,8 @@ int32_t tqBuildDeleteReq(SVnode* pVnode, const char* stbFullName, const SSDataBl
taosMemoryFree(name); taosMemoryFree(name);
#endif #endif
SSingleDeleteReq req = { SSingleDeleteReq req = {
.ts = ts, .startTs = startTs,
.endTs = endTs,
}; };
strncpy(req.tbname, name, TSDB_TABLE_NAME_LEN); strncpy(req.tbname, name, TSDB_TABLE_NAME_LEN);
taosMemoryFree(name); taosMemoryFree(name);
......
...@@ -263,7 +263,7 @@ int32_t vnodeProcessWriteMsg(SVnode *pVnode, SRpcMsg *pMsg, int64_t version, SRp ...@@ -263,7 +263,7 @@ int32_t vnodeProcessWriteMsg(SVnode *pVnode, SRpcMsg *pMsg, int64_t version, SRp
goto _err; goto _err;
} }
} break; } break;
case TDMT_VND_STREAM_RECOVER_STEP2: { case TDMT_VND_STREAM_RECOVER_BLOCKING_STAGE: {
if (tqProcessTaskRecover2Req(pVnode->pTq, version, pMsg->pCont, pMsg->contLen) < 0) { if (tqProcessTaskRecover2Req(pVnode->pTq, version, pMsg->pCont, pMsg->contLen) < 0) {
goto _err; goto _err;
} }
...@@ -402,7 +402,7 @@ int32_t vnodeProcessFetchMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo) { ...@@ -402,7 +402,7 @@ int32_t vnodeProcessFetchMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo) {
return tqProcessTaskRetrieveReq(pVnode->pTq, pMsg); return tqProcessTaskRetrieveReq(pVnode->pTq, pMsg);
case TDMT_STREAM_RETRIEVE_RSP: case TDMT_STREAM_RETRIEVE_RSP:
return tqProcessTaskRetrieveRsp(pVnode->pTq, pMsg); return tqProcessTaskRetrieveRsp(pVnode->pTq, pMsg);
case TDMT_VND_STREAM_RECOVER_STEP1: case TDMT_VND_STREAM_RECOVER_NONBLOCKING_STAGE:
return tqProcessTaskRecover1Req(pVnode->pTq, pMsg); return tqProcessTaskRecover1Req(pVnode->pTq, pMsg);
case TDMT_STREAM_RECOVER_FINISH: case TDMT_STREAM_RECOVER_FINISH:
return tqProcessTaskRecoverFinishReq(pVnode->pTq, pMsg); return tqProcessTaskRecoverFinishReq(pVnode->pTq, pMsg);
...@@ -1184,11 +1184,11 @@ static int32_t vnodeProcessBatchDeleteReq(SVnode *pVnode, int64_t version, void ...@@ -1184,11 +1184,11 @@ static int32_t vnodeProcessBatchDeleteReq(SVnode *pVnode, int64_t version, void
int64_t uid = mr.me.uid; int64_t uid = mr.me.uid;
int32_t code = tsdbDeleteTableData(pVnode->pTsdb, version, deleteReq.suid, uid, pOneReq->ts, pOneReq->ts); int32_t code = tsdbDeleteTableData(pVnode->pTsdb, version, deleteReq.suid, uid, pOneReq->startTs, pOneReq->endTs);
if (code < 0) { if (code < 0) {
terrno = code; terrno = code;
vError("vgId:%d, delete error since %s, suid:%" PRId64 ", uid:%" PRId64 ", start ts:%" PRId64 ", end ts:%" PRId64, vError("vgId:%d, delete error since %s, suid:%" PRId64 ", uid:%" PRId64 ", start ts:%" PRId64 ", end ts:%" PRId64,
TD_VID(pVnode), terrstr(), deleteReq.suid, uid, pOneReq->ts, pOneReq->ts); TD_VID(pVnode), terrstr(), deleteReq.suid, uid, pOneReq->startTs, pOneReq->endTs);
} }
tDecoderClear(&mr.coder); tDecoderClear(&mr.coder);
......
...@@ -1313,8 +1313,8 @@ static void buildDeleteRange(SOperatorInfo* pOp, TSKEY start, TSKEY end, uint64_ ...@@ -1313,8 +1313,8 @@ static void buildDeleteRange(SOperatorInfo* pOp, TSKEY start, TSKEY end, uint64_
char parTbName[VARSTR_HEADER_SIZE + TSDB_TABLE_NAME_LEN]; char parTbName[VARSTR_HEADER_SIZE + TSDB_TABLE_NAME_LEN];
STR_WITH_MAXSIZE_TO_VARSTR(parTbName, tbname, sizeof(parTbName)); STR_WITH_MAXSIZE_TO_VARSTR(parTbName, tbname, sizeof(parTbName));
colDataAppend(pTableCol, pBlock->info.rows, (const char*)parTbName, false); colDataAppend(pTableCol, pBlock->info.rows, (const char*)parTbName, false);
tdbFree(tbname);
} }
tdbFree(tbname);
pBlock->info.rows++; pBlock->info.rows++;
} }
......
...@@ -3117,7 +3117,7 @@ static void doStreamSessionAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSData ...@@ -3117,7 +3117,7 @@ static void doStreamSessionAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSData
if (!winInfo.pOutputBuf) { if (!winInfo.pOutputBuf) {
T_LONG_JMP(pTaskInfo->env, TSDB_CODE_QRY_OUT_OF_MEMORY); T_LONG_JMP(pTaskInfo->env, TSDB_CODE_QRY_OUT_OF_MEMORY);
} }
code = doOneWindowAggImpl(&pInfo->twAggSup.timeWindowData, &winInfo, &pResult, i, winRows, rows, numOfOutput, code = doOneWindowAggImpl(&pInfo->twAggSup.timeWindowData, &winInfo, &pResult, i, winRows, rows, numOfOutput,
pOperator); pOperator);
if (code != TSDB_CODE_SUCCESS || pResult == NULL) { if (code != TSDB_CODE_SUCCESS || pResult == NULL) {
...@@ -3242,8 +3242,8 @@ void doBuildDeleteDataBlock(SOperatorInfo* pOp, SSHashObj* pStDeleted, SSDataBlo ...@@ -3242,8 +3242,8 @@ void doBuildDeleteDataBlock(SOperatorInfo* pOp, SSHashObj* pStDeleted, SSDataBlo
char parTbName[VARSTR_HEADER_SIZE + TSDB_TABLE_NAME_LEN]; char parTbName[VARSTR_HEADER_SIZE + TSDB_TABLE_NAME_LEN];
STR_WITH_MAXSIZE_TO_VARSTR(parTbName, tbname, sizeof(parTbName)); STR_WITH_MAXSIZE_TO_VARSTR(parTbName, tbname, sizeof(parTbName));
colDataAppend(pTableCol, pBlock->info.rows, (const char*)parTbName, false); colDataAppend(pTableCol, pBlock->info.rows, (const char*)parTbName, false);
tdbFree(tbname);
} }
tdbFree(tbname);
pBlock->info.rows += 1; pBlock->info.rows += 1;
} }
if ((*Ite) == NULL) { if ((*Ite) == NULL) {
......
...@@ -16,7 +16,8 @@ ...@@ -16,7 +16,8 @@
#include "streamInc.h" #include "streamInc.h"
static int32_t streamTaskExecImpl(SStreamTask* pTask, const void* data, SArray* pRes) { static int32_t streamTaskExecImpl(SStreamTask* pTask, const void* data, SArray* pRes) {
void* exec = pTask->exec.executor; int32_t code;
void* exec = pTask->exec.executor;
// set input // set input
const SStreamQueueItem* pItem = (const SStreamQueueItem*)data; const SStreamQueueItem* pItem = (const SStreamQueueItem*)data;
...@@ -49,8 +50,10 @@ static int32_t streamTaskExecImpl(SStreamTask* pTask, const void* data, SArray* ...@@ -49,8 +50,10 @@ static int32_t streamTaskExecImpl(SStreamTask* pTask, const void* data, SArray*
while (1) { while (1) {
SSDataBlock* output = NULL; SSDataBlock* output = NULL;
uint64_t ts = 0; uint64_t ts = 0;
if (qExecTask(exec, &output, &ts) < 0) { if ((code = qExecTask(exec, &output, &ts)) < 0) {
ASSERT(false); /*ASSERT(false);*/
qError("unexpected stream execution, stream %" PRId64 " task: %d, since %s", pTask->streamId, pTask->taskId,
terrstr());
} }
if (output == NULL) { if (output == NULL) {
if (pItem->type == STREAM_INPUT__DATA_RETRIEVE) { if (pItem->type == STREAM_INPUT__DATA_RETRIEVE) {
......
...@@ -36,7 +36,7 @@ int32_t streamTaskLaunchRecover(SStreamTask* pTask, int64_t version) { ...@@ -36,7 +36,7 @@ int32_t streamTaskLaunchRecover(SStreamTask* pTask, int64_t version) {
SRpcMsg rpcMsg = { SRpcMsg rpcMsg = {
.contLen = len, .contLen = len,
.pCont = serializedReq, .pCont = serializedReq,
.msgType = TDMT_VND_STREAM_RECOVER_STEP1, .msgType = TDMT_VND_STREAM_RECOVER_NONBLOCKING_STAGE,
}; };
if (tmsgPutToQueue(pTask->pMsgCb, STREAM_QUEUE, &rpcMsg) < 0) { if (tmsgPutToQueue(pTask->pMsgCb, STREAM_QUEUE, &rpcMsg) < 0) {
......
...@@ -76,7 +76,8 @@ int32_t tsem_wait(tsem_t* sem) { ...@@ -76,7 +76,8 @@ int32_t tsem_wait(tsem_t* sem) {
} }
int32_t tsem_timewait(tsem_t* sem, int64_t milis) { int32_t tsem_timewait(tsem_t* sem, int64_t milis) {
return tsem_wait(sem); return 0;
/*return tsem_wait(sem);*/
#if 0 #if 0
struct timespec ts; struct timespec ts;
timespec_get(&ts); timespec_get(&ts);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册