diff --git a/source/client/src/clientTmq.c b/source/client/src/clientTmq.c index 922cab23c89f1f0fce7b1e24eec6fdaf58255215..73f7c2bcc4115ba784ae98b2b74e46a74f8cb263 100644 --- a/source/client/src/clientTmq.c +++ b/source/client/src/clientTmq.c @@ -966,14 +966,14 @@ int32_t tmq_subscribe(tmq_t* tmq, const tmq_list_t* topic_list) { SCMSubscribeReq req = {0}; int32_t code = -1; + tscDebug("call tmq subscribe, consumer: %ld, topic num %d", tmq->consumerId, sz); + req.consumerId = tmq->consumerId; tstrncpy(req.clientId, tmq->clientId, 256); tstrncpy(req.cgroup, tmq->groupId, TSDB_CGROUP_LEN); req.topicNames = taosArrayInit(sz, sizeof(void*)); if (req.topicNames == NULL) goto FAIL; - tscDebug("call tmq subscribe, consumer: %ld, topic num %d", tmq->consumerId, sz); - for (int32_t i = 0; i < sz; i++) { char* topic = taosArrayGetP(container, i); diff --git a/source/dnode/mnode/impl/src/mndScheduler.c b/source/dnode/mnode/impl/src/mndScheduler.c index 511a84290dd2a311f59cfc9467ff2619d92013cb..1c8ec0a302a03bd133531bccd86ee107c593ff07 100644 --- a/source/dnode/mnode/impl/src/mndScheduler.c +++ b/source/dnode/mnode/impl/src/mndScheduler.c @@ -416,7 +416,7 @@ int32_t mndScheduleStream(SMnode* pMnode, SStreamObj* pStream) { } SStreamTask* pTask = tNewSStreamTask(pStream->uid); - if (pInnerTask == NULL) { + if (pTask == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; sdbRelease(pSdb, pVgroup); qDestroyQueryPlan(pPlan); diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index e1578b37f1583547ed8d2738b9ff59b13505e6fb..aaa57d49edaa30a423239722890b373fb9a9e26e 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -1010,16 +1010,21 @@ int32_t tqProcessDelReq(STQ* pTq, void* pReq, int32_t len, int64_t ver) { if (streamTaskInput(pTask, (SStreamQueueItem*)pRefBlock) < 0) { qError("stream task input del failed, task id %d", pTask->taskId); + + taosFreeQitem(pRefBlock); continue; } + if (streamSchedExec(pTask) < 0) { qError("stream task launch failed, task id %d", pTask->taskId); continue; } + } else { streamTaskInputFail(pTask); } } + int32_t ref = atomic_sub_fetch_32(pRef, 1); ASSERT(ref >= 0); if (ref == 0) { diff --git a/source/libs/wal/src/walWrite.c b/source/libs/wal/src/walWrite.c index 1e5c63fa8d1ba35ea160e93f7a3b8bf785c0aaf0..91fa49fce0777b4f382e58878bc6c20f8f233beb 100644 --- a/source/libs/wal/src/walWrite.c +++ b/source/libs/wal/src/walWrite.c @@ -45,6 +45,7 @@ int32_t walRestoreFromSnapshot(SWal *pWal, int64_t ver) { if (taosRemoveFile(fnameStr) < 0) { terrno = TAOS_SYSTEM_ERROR(errno); wError("vgId:%d restore from snapshot, cannot remove file %s since %s", pWal->cfg.vgId, fnameStr, terrstr()); + taosThreadMutexUnlock(&pWal->mutex); return -1; } wInfo("vgId:%d restore from snapshot, remove file %s", pWal->cfg.vgId, fnameStr); @@ -53,6 +54,7 @@ int32_t walRestoreFromSnapshot(SWal *pWal, int64_t ver) { if (taosRemoveFile(fnameStr) < 0) { terrno = TAOS_SYSTEM_ERROR(errno); wError("vgId:%d cannot remove file %s since %s", pWal->cfg.vgId, fnameStr, terrstr()); + taosThreadMutexUnlock(&pWal->mutex); return -1; } wInfo("vgId:%d restore from snapshot, remove file %s", pWal->cfg.vgId, fnameStr);