From 548684e5ac06b7e6e0ec9a5046953baa5ac53334 Mon Sep 17 00:00:00 2001 From: Liu Jicong Date: Tue, 29 Nov 2022 23:39:58 +0800 Subject: [PATCH] fix(stream): delete multiple row --- include/common/tmsg.h | 55 +++++++++---------- include/common/tmsgdef.h | 4 +- source/common/src/tmsg.c | 37 ++++++------- source/dnode/vnode/src/tq/tq.c | 2 +- source/dnode/vnode/src/tq/tqSink.c | 13 +++-- source/dnode/vnode/src/vnd/vnodeSvr.c | 8 +-- source/libs/executor/src/tfill.c | 2 +- source/libs/executor/src/timewindowoperator.c | 4 +- source/libs/stream/src/streamExec.c | 9 ++- source/libs/stream/src/streamRecover.c | 2 +- source/os/src/osSemaphore.c | 3 +- 11 files changed, 70 insertions(+), 69 deletions(-) diff --git a/include/common/tmsg.h b/include/common/tmsg.h index 86967bb579..80590a25c0 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -1592,14 +1592,14 @@ typedef struct SSubQueryMsg { int8_t explain; int8_t needFetch; uint32_t sqlLen; - char *sql; + char* sql; uint32_t msgLen; - char *msg; + char* msg; } SSubQueryMsg; -int32_t tSerializeSSubQueryMsg(void *buf, int32_t bufLen, SSubQueryMsg *pReq); -int32_t tDeserializeSSubQueryMsg(void *buf, int32_t bufLen, SSubQueryMsg *pReq); -void tFreeSSubQueryMsg(SSubQueryMsg *pReq); +int32_t tSerializeSSubQueryMsg(void* buf, int32_t bufLen, SSubQueryMsg* pReq); +int32_t tDeserializeSSubQueryMsg(void* buf, int32_t bufLen, SSubQueryMsg* pReq); +void tFreeSSubQueryMsg(SSubQueryMsg* pReq); typedef struct { SMsgHead header; @@ -1638,9 +1638,8 @@ typedef struct { int32_t execId; } SResFetchReq; -int32_t tSerializeSResFetchReq(void *buf, int32_t bufLen, SResFetchReq *pReq); -int32_t tDeserializeSResFetchReq(void *buf, int32_t bufLen, SResFetchReq *pReq); - +int32_t tSerializeSResFetchReq(void* buf, int32_t bufLen, SResFetchReq* pReq); +int32_t tDeserializeSResFetchReq(void* buf, int32_t bufLen, SResFetchReq* pReq); typedef struct { SMsgHead header; @@ -1713,12 +1712,11 @@ typedef struct { int32_t execId; } STaskDropReq; -int32_t tSerializeSTaskDropReq(void *buf, int32_t bufLen, STaskDropReq *pReq); -int32_t tDeserializeSTaskDropReq(void *buf, int32_t bufLen, STaskDropReq *pReq); - -int32_t tSerializeSQueryTableRsp(void *buf, int32_t bufLen, SQueryTableRsp *pRsp); -int32_t tDeserializeSQueryTableRsp(void *buf, int32_t bufLen, SQueryTableRsp *pRsp); +int32_t tSerializeSTaskDropReq(void* buf, int32_t bufLen, STaskDropReq* pReq); +int32_t tDeserializeSTaskDropReq(void* buf, int32_t bufLen, STaskDropReq* pReq); +int32_t tSerializeSQueryTableRsp(void* buf, int32_t bufLen, SQueryTableRsp* pRsp); +int32_t tDeserializeSQueryTableRsp(void* buf, int32_t bufLen, SQueryTableRsp* pRsp); typedef struct { int32_t code; @@ -2923,9 +2921,8 @@ typedef struct { STqOffsetVal reqOffset; } SMqPollReq; -int32_t tSerializeSMqPollReq(void *buf, int32_t bufLen, SMqPollReq *pReq); -int32_t tDeserializeSMqPollReq(void *buf, int32_t bufLen, SMqPollReq *pReq); - +int32_t tSerializeSMqPollReq(void* buf, int32_t bufLen, SMqPollReq* pReq); +int32_t tDeserializeSMqPollReq(void* buf, int32_t bufLen, SMqPollReq* pReq); typedef struct { int32_t vgId; @@ -3138,7 +3135,8 @@ int32_t tDecodeDeleteRes(SDecoder* pCoder, SDeleteRes* pRes); typedef struct { // int64_t uid; char tbname[TSDB_TABLE_NAME_LEN]; - int64_t ts; + int64_t startTs; + int64_t endTs; } SSingleDeleteReq; int32_t tEncodeSSingleDeleteReq(SEncoder* pCoder, const SSingleDeleteReq* pReq); @@ -3160,8 +3158,8 @@ typedef struct { } SBatchMsg; typedef struct { - SMsgHead header; - SArray* pMsgs; //SArray + SMsgHead header; + SArray* pMsgs; // SArray } SBatchReq; typedef struct { @@ -3173,11 +3171,11 @@ typedef struct { } SBatchRspMsg; typedef struct { - SArray* pRsps; //SArray + SArray* pRsps; // SArray } SBatchRsp; -int32_t tSerializeSBatchReq(void *buf, int32_t bufLen, SBatchReq *pReq); -int32_t tDeserializeSBatchReq(void *buf, int32_t bufLen, SBatchReq *pReq); +int32_t tSerializeSBatchReq(void* buf, int32_t bufLen, SBatchReq* pReq); +int32_t tDeserializeSBatchReq(void* buf, int32_t bufLen, SBatchReq* pReq); static FORCE_INLINE void tFreeSBatchReqMsg(void* msg) { if (NULL == msg) { return; @@ -3186,8 +3184,8 @@ static FORCE_INLINE void tFreeSBatchReqMsg(void* msg) { taosMemoryFree(pMsg->msg); } -int32_t tSerializeSBatchRsp(void *buf, int32_t bufLen, SBatchRsp *pRsp); -int32_t tDeserializeSBatchRsp(void *buf, int32_t bufLen, SBatchRsp *pRsp); +int32_t tSerializeSBatchRsp(void* buf, int32_t bufLen, SBatchRsp* pRsp); +int32_t tDeserializeSBatchRsp(void* buf, int32_t bufLen, SBatchRsp* pRsp); static FORCE_INLINE void tFreeSBatchRspMsg(void* p) { if (NULL == p) { @@ -3198,11 +3196,10 @@ static FORCE_INLINE void tFreeSBatchRspMsg(void* p) { taosMemoryFree(pRsp->msg); } -int32_t tSerializeSMqAskEpReq(void *buf, int32_t bufLen, SMqAskEpReq *pReq); -int32_t tDeserializeSMqAskEpReq(void *buf, int32_t bufLen, SMqAskEpReq *pReq); -int32_t tSerializeSMqHbReq(void *buf, int32_t bufLen, SMqHbReq *pReq); -int32_t tDeserializeSMqHbReq(void *buf, int32_t bufLen, SMqHbReq *pReq); - +int32_t tSerializeSMqAskEpReq(void* buf, int32_t bufLen, SMqAskEpReq* pReq); +int32_t tDeserializeSMqAskEpReq(void* buf, int32_t bufLen, SMqAskEpReq* pReq); +int32_t tSerializeSMqHbReq(void* buf, int32_t bufLen, SMqHbReq* pReq); +int32_t tDeserializeSMqHbReq(void* buf, int32_t bufLen, SMqHbReq* pReq); #pragma pack(pop) diff --git a/include/common/tmsgdef.h b/include/common/tmsgdef.h index 7ca552f179..e80766d249 100644 --- a/include/common/tmsgdef.h +++ b/include/common/tmsgdef.h @@ -280,8 +280,8 @@ enum { TD_NEW_MSG_SEG(TDMT_VND_STREAM_MSG) TD_DEF_MSG_TYPE(TDMT_VND_STREAM_TRIGGER, "vnode-stream-trigger", NULL, NULL) - TD_DEF_MSG_TYPE(TDMT_VND_STREAM_RECOVER_STEP1, "vnode-stream-recover1", NULL, NULL) - TD_DEF_MSG_TYPE(TDMT_VND_STREAM_RECOVER_STEP2, "vnode-stream-recover2", NULL, NULL) + TD_DEF_MSG_TYPE(TDMT_VND_STREAM_RECOVER_NONBLOCKING_STAGE, "vnode-stream-recover1", NULL, NULL) + TD_DEF_MSG_TYPE(TDMT_VND_STREAM_RECOVER_BLOCKING_STAGE, "vnode-stream-recover2", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_VND_STREAM_MAX_MSG, "vnd-stream-max", NULL, NULL) TD_NEW_MSG_SEG(TDMT_VND_TMQ_MSG) diff --git a/source/common/src/tmsg.c b/source/common/src/tmsg.c index cd97ceaae1..c7e98415d1 100644 --- a/source/common/src/tmsg.c +++ b/source/common/src/tmsg.c @@ -4496,7 +4496,7 @@ int32_t tDeserializeSBatchReq(void *buf, int32_t bufLen, SBatchReq *pReq) { if (num <= 0) { pReq->pMsgs = NULL; tEndDecode(&decoder); - + tDecoderClear(&decoder); return 0; } @@ -4511,7 +4511,7 @@ int32_t tDeserializeSBatchReq(void *buf, int32_t bufLen, SBatchReq *pReq) { if (tDecodeBinaryAlloc(&decoder, &msg.msg, NULL) < 0) return -1; if (NULL == taosArrayPush(pReq->pMsgs, &msg)) return -1; } - + tEndDecode(&decoder); tDecoderClear(&decoder); @@ -4553,7 +4553,7 @@ int32_t tDeserializeSBatchRsp(void *buf, int32_t bufLen, SBatchRsp *pRsp) { if (num <= 0) { pRsp->pRsps = NULL; tEndDecode(&decoder); - + tDecoderClear(&decoder); return 0; } @@ -4569,14 +4569,13 @@ int32_t tDeserializeSBatchRsp(void *buf, int32_t bufLen, SBatchRsp *pRsp) { if (tDecodeBinaryAlloc(&decoder, &msg.msg, NULL) < 0) return -1; if (NULL == taosArrayPush(pRsp->pRsps, &msg)) return -1; } - + tEndDecode(&decoder); tDecoderClear(&decoder); return 0; } - int32_t tSerializeSMqAskEpReq(void *buf, int32_t bufLen, SMqAskEpReq *pReq) { SEncoder encoder = {0}; tEncoderInit(&encoder, buf, bufLen); @@ -4603,7 +4602,7 @@ int32_t tDeserializeSMqAskEpReq(void *buf, int32_t bufLen, SMqAskEpReq *pReq) { if (tDecodeI64(&decoder, &pReq->consumerId) < 0) return -1; if (tDecodeI32(&decoder, &pReq->epoch) < 0) return -1; if (tDecodeCStrTo(&decoder, pReq->cgroup) < 0) return -1; - + tEndDecode(&decoder); tDecoderClear(&decoder); @@ -4634,7 +4633,7 @@ int32_t tDeserializeSMqHbReq(void *buf, int32_t bufLen, SMqHbReq *pReq) { if (tDecodeI64(&decoder, &pReq->consumerId) < 0) return -1; if (tDecodeI32(&decoder, &pReq->epoch) < 0) return -1; - + tEndDecode(&decoder); tDecoderClear(&decoder); @@ -4664,7 +4663,7 @@ int32_t tSerializeSSubQueryMsg(void *buf, int32_t bufLen, SSubQueryMsg *pReq) { if (tEncodeU32(&encoder, pReq->sqlLen) < 0) return -1; if (tEncodeCStrWithLen(&encoder, pReq->sql, pReq->sqlLen) < 0) return -1; if (tEncodeU32(&encoder, pReq->msgLen) < 0) return -1; - if (tEncodeBinary(&encoder, (uint8_t*)pReq->msg, pReq->msgLen) < 0) return -1; + if (tEncodeBinary(&encoder, (uint8_t *)pReq->msg, pReq->msgLen) < 0) return -1; tEndEncode(&encoder); @@ -4704,8 +4703,8 @@ int32_t tDeserializeSSubQueryMsg(void *buf, int32_t bufLen, SSubQueryMsg *pReq) if (tDecodeU32(&decoder, &pReq->sqlLen) < 0) return -1; if (tDecodeCStrAlloc(&decoder, &pReq->sql) < 0) return -1; if (tDecodeU32(&decoder, &pReq->msgLen) < 0) return -1; - if (tDecodeBinaryAlloc(&decoder, (void**)&pReq->msg, NULL) < 0) return -1; - + if (tDecodeBinaryAlloc(&decoder, (void **)&pReq->msg, NULL) < 0) return -1; + tEndDecode(&decoder); tDecoderClear(&decoder); @@ -4721,7 +4720,6 @@ void tFreeSSubQueryMsg(SSubQueryMsg *pReq) { taosMemoryFreeClear(pReq->msg); } - int32_t tSerializeSResFetchReq(void *buf, int32_t bufLen, SResFetchReq *pReq) { int32_t headLen = sizeof(SMsgHead); if (buf != NULL) { @@ -4768,14 +4766,13 @@ int32_t tDeserializeSResFetchReq(void *buf, int32_t bufLen, SResFetchReq *pReq) if (tDecodeU64(&decoder, &pReq->queryId) < 0) return -1; if (tDecodeU64(&decoder, &pReq->taskId) < 0) return -1; if (tDecodeI32(&decoder, &pReq->execId) < 0) return -1; - + tEndDecode(&decoder); tDecoderClear(&decoder); return 0; } - int32_t tSerializeSTqOffsetVal(SEncoder *pEncoder, STqOffsetVal *pOffset) { if (tEncodeI8(pEncoder, pOffset->type) < 0) return -1; if (tEncodeI64(pEncoder, pOffset->uid) < 0) return -1; @@ -4846,14 +4843,13 @@ int32_t tDeserializeSMqPollReq(void *buf, int32_t bufLen, SMqPollReq *pReq) { if (tDecodeI64(&decoder, &pReq->consumerId) < 0) return -1; if (tDecodeI64(&decoder, &pReq->timeout) < 0) return -1; if (tDerializeSTqOffsetVal(&decoder, &pReq->reqOffset) < 0) return -1; - + tEndDecode(&decoder); tDecoderClear(&decoder); return 0; } - int32_t tSerializeSTaskDropReq(void *buf, int32_t bufLen, STaskDropReq *pReq) { int32_t headLen = sizeof(SMsgHead); if (buf != NULL) { @@ -4902,7 +4898,7 @@ int32_t tDeserializeSTaskDropReq(void *buf, int32_t bufLen, STaskDropReq *pReq) if (tDecodeU64(&decoder, &pReq->taskId) < 0) return -1; if (tDecodeI64(&decoder, &pReq->refId) < 0) return -1; if (tDecodeI32(&decoder, &pReq->execId) < 0) return -1; - + tEndDecode(&decoder); tDecoderClear(&decoder); @@ -4939,14 +4935,13 @@ int32_t tDeserializeSQueryTableRsp(void *buf, int32_t bufLen, SQueryTableRsp *pR if (tDecodeI32(&decoder, &pRsp->sversion) < 0) return -1; if (tDecodeI32(&decoder, &pRsp->tversion) < 0) return -1; if (tDecodeI64(&decoder, &pRsp->affectedRows) < 0) return -1; - + tEndDecode(&decoder); tDecoderClear(&decoder); return 0; } - int32_t tSerializeSSchedulerHbReq(void *buf, int32_t bufLen, SSchedulerHbReq *pReq) { int32_t headLen = sizeof(SMsgHead); if (buf != NULL) { @@ -6645,13 +6640,15 @@ void tDeleteSTaosxRsp(STaosxRsp *pRsp) { int32_t tEncodeSSingleDeleteReq(SEncoder *pEncoder, const SSingleDeleteReq *pReq) { if (tEncodeCStr(pEncoder, pReq->tbname) < 0) return -1; - if (tEncodeI64(pEncoder, pReq->ts) < 0) return -1; + if (tEncodeI64(pEncoder, pReq->startTs) < 0) return -1; + if (tEncodeI64(pEncoder, pReq->endTs) < 0) return -1; return 0; } int32_t tDecodeSSingleDeleteReq(SDecoder *pDecoder, SSingleDeleteReq *pReq) { if (tDecodeCStrTo(pDecoder, pReq->tbname) < 0) return -1; - if (tDecodeI64(pDecoder, &pReq->ts) < 0) return -1; + if (tDecodeI64(pDecoder, &pReq->startTs) < 0) return -1; + if (tDecodeI64(pDecoder, &pReq->endTs) < 0) return -1; return 0; } diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 61f027039d..3d9ebec4c9 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -1127,7 +1127,7 @@ int32_t tqProcessTaskRecover1Req(STQ* pTq, SRpcMsg* pMsg) { SRpcMsg rpcMsg = { .code = 0, .contLen = len, - .msgType = TDMT_VND_STREAM_RECOVER_STEP2, + .msgType = TDMT_VND_STREAM_RECOVER_BLOCKING_STAGE, .pCont = serializedReq, }; diff --git a/source/dnode/vnode/src/tq/tqSink.c b/source/dnode/vnode/src/tq/tqSink.c index a4bfb6c876..5907be576a 100644 --- a/source/dnode/vnode/src/tq/tqSink.c +++ b/source/dnode/vnode/src/tq/tqSink.c @@ -21,14 +21,16 @@ int32_t tqBuildDeleteReq(SVnode* pVnode, const char* stbFullName, const SSDataBl SBatchDeleteReq* deleteReq) { ASSERT(pDataBlock->info.type == STREAM_DELETE_RESULT); int32_t totRow = pDataBlock->info.rows; - SColumnInfoData* pTsCol = taosArrayGet(pDataBlock->pDataBlock, START_TS_COLUMN_INDEX); + SColumnInfoData* pStartTsCol = taosArrayGet(pDataBlock->pDataBlock, START_TS_COLUMN_INDEX); + SColumnInfoData* pEndTsCol = taosArrayGet(pDataBlock->pDataBlock, END_TS_COLUMN_INDEX); SColumnInfoData* pGidCol = taosArrayGet(pDataBlock->pDataBlock, GROUPID_COLUMN_INDEX); SColumnInfoData* pTbNameCol = taosArrayGet(pDataBlock->pDataBlock, TABLE_NAME_COLUMN_INDEX); tqDebug("stream delete msg: row %d", totRow); for (int32_t row = 0; row < totRow; row++) { - int64_t ts = *(int64_t*)colDataGetData(pTsCol, row); + int64_t startTs = *(int64_t*)colDataGetData(pStartTsCol, row); + int64_t endTs = *(int64_t*)colDataGetData(pEndTsCol, row); int64_t groupId = *(int64_t*)colDataGetData(pGidCol, row); char* name; void* varTbName = NULL; @@ -42,8 +44,8 @@ int32_t tqBuildDeleteReq(SVnode* pVnode, const char* stbFullName, const SSDataBl } else { name = buildCtbNameByGroupId(stbFullName, groupId); } - tqDebug("stream delete msg: vgId:%d, groupId :%" PRId64 ", name: %s, ts:%" PRId64, pVnode->config.vgId, groupId, - name, ts); + tqDebug("stream delete msg: vgId:%d, groupId :%" PRId64 ", name: %s, start ts:%" PRId64 "end ts:%" PRId64, + pVnode->config.vgId, groupId, name, startTs, endTs); #if 0 SMetaReader mr = {0}; metaReaderInit(&mr, pVnode->pMeta, 0); @@ -59,7 +61,8 @@ int32_t tqBuildDeleteReq(SVnode* pVnode, const char* stbFullName, const SSDataBl taosMemoryFree(name); #endif SSingleDeleteReq req = { - .ts = ts, + .startTs = startTs, + .endTs = endTs, }; strncpy(req.tbname, name, TSDB_TABLE_NAME_LEN); taosMemoryFree(name); diff --git a/source/dnode/vnode/src/vnd/vnodeSvr.c b/source/dnode/vnode/src/vnd/vnodeSvr.c index c75d1ffded..2d87bb3b07 100644 --- a/source/dnode/vnode/src/vnd/vnodeSvr.c +++ b/source/dnode/vnode/src/vnd/vnodeSvr.c @@ -263,7 +263,7 @@ int32_t vnodeProcessWriteMsg(SVnode *pVnode, SRpcMsg *pMsg, int64_t version, SRp goto _err; } } break; - case TDMT_VND_STREAM_RECOVER_STEP2: { + case TDMT_VND_STREAM_RECOVER_BLOCKING_STAGE: { if (tqProcessTaskRecover2Req(pVnode->pTq, version, pMsg->pCont, pMsg->contLen) < 0) { goto _err; } @@ -402,7 +402,7 @@ int32_t vnodeProcessFetchMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo) { return tqProcessTaskRetrieveReq(pVnode->pTq, pMsg); case TDMT_STREAM_RETRIEVE_RSP: return tqProcessTaskRetrieveRsp(pVnode->pTq, pMsg); - case TDMT_VND_STREAM_RECOVER_STEP1: + case TDMT_VND_STREAM_RECOVER_NONBLOCKING_STAGE: return tqProcessTaskRecover1Req(pVnode->pTq, pMsg); case TDMT_STREAM_RECOVER_FINISH: return tqProcessTaskRecoverFinishReq(pVnode->pTq, pMsg); @@ -1184,11 +1184,11 @@ static int32_t vnodeProcessBatchDeleteReq(SVnode *pVnode, int64_t version, void int64_t uid = mr.me.uid; - int32_t code = tsdbDeleteTableData(pVnode->pTsdb, version, deleteReq.suid, uid, pOneReq->ts, pOneReq->ts); + int32_t code = tsdbDeleteTableData(pVnode->pTsdb, version, deleteReq.suid, uid, pOneReq->startTs, pOneReq->endTs); if (code < 0) { terrno = code; vError("vgId:%d, delete error since %s, suid:%" PRId64 ", uid:%" PRId64 ", start ts:%" PRId64 ", end ts:%" PRId64, - TD_VID(pVnode), terrstr(), deleteReq.suid, uid, pOneReq->ts, pOneReq->ts); + TD_VID(pVnode), terrstr(), deleteReq.suid, uid, pOneReq->startTs, pOneReq->endTs); } tDecoderClear(&mr.coder); diff --git a/source/libs/executor/src/tfill.c b/source/libs/executor/src/tfill.c index 68228f2689..474a067f9e 100644 --- a/source/libs/executor/src/tfill.c +++ b/source/libs/executor/src/tfill.c @@ -1313,8 +1313,8 @@ static void buildDeleteRange(SOperatorInfo* pOp, TSKEY start, TSKEY end, uint64_ char parTbName[VARSTR_HEADER_SIZE + TSDB_TABLE_NAME_LEN]; STR_WITH_MAXSIZE_TO_VARSTR(parTbName, tbname, sizeof(parTbName)); colDataAppend(pTableCol, pBlock->info.rows, (const char*)parTbName, false); + tdbFree(tbname); } - tdbFree(tbname); pBlock->info.rows++; } diff --git a/source/libs/executor/src/timewindowoperator.c b/source/libs/executor/src/timewindowoperator.c index a6a477a9e3..d598fd09ca 100644 --- a/source/libs/executor/src/timewindowoperator.c +++ b/source/libs/executor/src/timewindowoperator.c @@ -3117,7 +3117,7 @@ static void doStreamSessionAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSData if (!winInfo.pOutputBuf) { T_LONG_JMP(pTaskInfo->env, TSDB_CODE_QRY_OUT_OF_MEMORY); } - + code = doOneWindowAggImpl(&pInfo->twAggSup.timeWindowData, &winInfo, &pResult, i, winRows, rows, numOfOutput, pOperator); if (code != TSDB_CODE_SUCCESS || pResult == NULL) { @@ -3242,8 +3242,8 @@ void doBuildDeleteDataBlock(SOperatorInfo* pOp, SSHashObj* pStDeleted, SSDataBlo char parTbName[VARSTR_HEADER_SIZE + TSDB_TABLE_NAME_LEN]; STR_WITH_MAXSIZE_TO_VARSTR(parTbName, tbname, sizeof(parTbName)); colDataAppend(pTableCol, pBlock->info.rows, (const char*)parTbName, false); + tdbFree(tbname); } - tdbFree(tbname); pBlock->info.rows += 1; } if ((*Ite) == NULL) { diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c index 009f7eec9a..6a83a9a4da 100644 --- a/source/libs/stream/src/streamExec.c +++ b/source/libs/stream/src/streamExec.c @@ -16,7 +16,8 @@ #include "streamInc.h" static int32_t streamTaskExecImpl(SStreamTask* pTask, const void* data, SArray* pRes) { - void* exec = pTask->exec.executor; + int32_t code; + void* exec = pTask->exec.executor; // set input const SStreamQueueItem* pItem = (const SStreamQueueItem*)data; @@ -49,8 +50,10 @@ static int32_t streamTaskExecImpl(SStreamTask* pTask, const void* data, SArray* while (1) { SSDataBlock* output = NULL; uint64_t ts = 0; - if (qExecTask(exec, &output, &ts) < 0) { - ASSERT(false); + if ((code = qExecTask(exec, &output, &ts)) < 0) { + /*ASSERT(false);*/ + qError("unexpected stream execution, stream %" PRId64 " task: %d, since %s", pTask->streamId, pTask->taskId, + terrstr()); } if (output == NULL) { if (pItem->type == STREAM_INPUT__DATA_RETRIEVE) { diff --git a/source/libs/stream/src/streamRecover.c b/source/libs/stream/src/streamRecover.c index 2a2784afea..7eee95a580 100644 --- a/source/libs/stream/src/streamRecover.c +++ b/source/libs/stream/src/streamRecover.c @@ -36,7 +36,7 @@ int32_t streamTaskLaunchRecover(SStreamTask* pTask, int64_t version) { SRpcMsg rpcMsg = { .contLen = len, .pCont = serializedReq, - .msgType = TDMT_VND_STREAM_RECOVER_STEP1, + .msgType = TDMT_VND_STREAM_RECOVER_NONBLOCKING_STAGE, }; if (tmsgPutToQueue(pTask->pMsgCb, STREAM_QUEUE, &rpcMsg) < 0) { diff --git a/source/os/src/osSemaphore.c b/source/os/src/osSemaphore.c index 53d8dad226..bfce8b3151 100644 --- a/source/os/src/osSemaphore.c +++ b/source/os/src/osSemaphore.c @@ -76,7 +76,8 @@ int32_t tsem_wait(tsem_t* sem) { } int32_t tsem_timewait(tsem_t* sem, int64_t milis) { - return tsem_wait(sem); + return 0; + /*return tsem_wait(sem);*/ #if 0 struct timespec ts; timespec_get(&ts); -- GitLab