From 4402bcef947af1cf513468c4503b63b81d4137a7 Mon Sep 17 00:00:00 2001 From: Liu Jicong Date: Fri, 16 Dec 2022 08:53:33 +0800 Subject: [PATCH] enh(stream): forbid source db replica > 1 --- include/util/taoserror.h | 1 + source/client/src/clientTmq.c | 6 ++---- source/dnode/mnode/impl/src/mndStream.c | 13 +++++++++++-- source/dnode/vnode/src/tq/tq.c | 2 +- source/util/src/terror.c | 1 + 5 files changed, 16 insertions(+), 7 deletions(-) diff --git a/include/util/taoserror.h b/include/util/taoserror.h index e352cfb569..52221bdd44 100644 --- a/include/util/taoserror.h +++ b/include/util/taoserror.h @@ -342,6 +342,7 @@ int32_t* taosGetErrno(); #define TSDB_CODE_MND_INVALID_STREAM_OPTION TAOS_DEF_ERROR_CODE(0, 0x03F2) #define TSDB_CODE_MND_STREAM_MUST_BE_DELETED TAOS_DEF_ERROR_CODE(0, 0x03F3) #define TSDB_CODE_MND_STREAM_TASK_DROPPED TAOS_DEF_ERROR_CODE(0, 0x03F4) +#define TSDB_CODE_MND_MULTI_REPLICA_SOURCE_DB TAOS_DEF_ERROR_CODE(0, 0x03F5) // mnode-sma #define TSDB_CODE_MND_SMA_ALREADY_EXIST TAOS_DEF_ERROR_CODE(0, 0x0480) diff --git a/source/client/src/clientTmq.c b/source/client/src/clientTmq.c index 4352ec69d3..d94a049722 100644 --- a/source/client/src/clientTmq.c +++ b/source/client/src/clientTmq.c @@ -1035,7 +1035,7 @@ int32_t tmq_subscribe(tmq_t* tmq, const tmq_list_t* topic_list) { SCMSubscribeReq req = {0}; int32_t code = -1; - tscDebug("call tmq subscribe, consumer: %" PRId64 ", topic num %d", tmq->consumerId, sz); + tscDebug("tmq subscribe, consumer: %" PRId64 ", topic num %d", tmq->consumerId, sz); req.consumerId = tmq->consumerId; tstrncpy(req.clientId, tmq->clientId, 256); @@ -1043,7 +1043,7 @@ int32_t tmq_subscribe(tmq_t* tmq, const tmq_list_t* topic_list) { req.topicNames = taosArrayInit(sz, sizeof(void*)); if (req.topicNames == NULL) goto FAIL; - tscDebug("call tmq subscribe, consumer: %" PRId64 ", topic num %d", tmq->consumerId, sz); + tscDebug("tmq subscribe, consumer: %" PRId64 ", topic num %d", tmq->consumerId, sz); for (int32_t i = 0; i < sz; i++) { char* topic = taosArrayGetP(container, i); @@ -1570,7 +1570,6 @@ SMqTaosxRspObj* tmqBuildTaosxRspFromWrapper(SMqPollRspWrapper* pWrapper) { } int32_t tmqPollImpl(tmq_t* tmq, int64_t timeout) { - /*tscDebug("call poll");*/ for (int i = 0; i < taosArrayGetSize(tmq->clientTopics); i++) { SMqClientTopic* pTopic = taosArrayGet(tmq->clientTopics, i); for (int j = 0; j < taosArrayGetSize(pTopic->vgs); j++) { @@ -1794,7 +1793,6 @@ void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) { } TAOS_RES* tmq_consumer_poll(tmq_t* tmq, int64_t timeout) { - /*tscDebug("call poll1");*/ void* rspObj; int64_t startTime = taosGetTimestampMs(); diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index 7ee688d220..6ada70e1b2 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -164,7 +164,8 @@ SSdbRow *mndStreamActionDecode(SSdbRaw *pRaw) { STREAM_DECODE_OVER: taosMemoryFreeClear(buf); if (terrno != TSDB_CODE_SUCCESS) { - mError("stream:%s, failed to decode from raw:%p since %s", pStream == NULL ? "null" : pStream->name, pRaw, terrstr()); + mError("stream:%s, failed to decode from raw:%p since %s", pStream == NULL ? "null" : pStream->name, pRaw, + terrstr()); taosMemoryFreeClear(pRow); return NULL; } @@ -624,6 +625,15 @@ static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq) { goto _OVER; } + pDb = mndAcquireDb(pMnode, streamObj.sourceDb); + if (pDb->cfg.replications != 1) { + mError("stream source db must have only 1 replica, but %s has %d", pDb->name, pDb->cfg.replications); + terrno = TSDB_CODE_MND_MULTI_REPLICA_SOURCE_DB; + mndReleaseDb(pMnode, pDb); + pDb = NULL; + goto _OVER; + } + STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_DB_INSIDE, pReq, "create-stream"); if (pTrans == NULL) { mError("stream:%s, failed to create since %s", createStreamReq.name, terrstr()); @@ -680,7 +690,6 @@ _OVER: } mndReleaseStream(pMnode, pStream); - mndReleaseDb(pMnode, pDb); tFreeSCMCreateStreamReq(&createStreamReq); tFreeStreamObj(&streamObj); diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 389c8013f9..356cbd6332 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -738,7 +738,7 @@ int32_t tqProcessDeleteSubReq(STQ* pTq, int64_t version, char* msg, int32_t msgL code = tqOffsetDelete(pTq->pOffsetStore, pReq->subKey); if (code != 0) { - tqError("cannot process tq delete req %s, since no such offset", pReq->subKey); + tqError("cannot process tq delete req %s, since no such offset in cache", pReq->subKey); } if (tqMetaDeleteHandle(pTq, pReq->subKey) < 0) { diff --git a/source/util/src/terror.c b/source/util/src/terror.c index 656d775ea4..4b9dde5059 100644 --- a/source/util/src/terror.c +++ b/source/util/src/terror.c @@ -281,6 +281,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_MND_STREAM_ALREADY_EXIST, "Stream already exists TAOS_DEFINE_ERROR(TSDB_CODE_MND_STREAM_NOT_EXIST, "Stream not exist") TAOS_DEFINE_ERROR(TSDB_CODE_MND_INVALID_STREAM_OPTION, "Invalid stream option") TAOS_DEFINE_ERROR(TSDB_CODE_MND_STREAM_MUST_BE_DELETED, "Stream must be dropped first") +TAOS_DEFINE_ERROR(TSDB_CODE_MND_MULTI_REPLICA_SOURCE_DB, "Stream temporarily does not support source db having replica > 1") // mnode-sma TAOS_DEFINE_ERROR(TSDB_CODE_MND_SMA_ALREADY_EXIST, "SMA already exists") -- GitLab