From de939d589f181e18ff6ad5059fe9e84204fce4e2 Mon Sep 17 00:00:00 2001 From: Liu Jicong Date: Fri, 29 Apr 2022 20:11:58 +0800 Subject: [PATCH] fix: memory error --- example/src/tmq.c | 2 -- source/dnode/mnode/impl/inc/mndDef.h | 1 + source/dnode/mnode/impl/inc/mndTopic.h | 2 ++ source/dnode/mnode/impl/src/mndDef.c | 2 +- source/dnode/mnode/impl/src/mndStb.c | 10 +----- source/dnode/mnode/impl/src/mndSubscribe.c | 36 ++++++++++++++++++- source/dnode/mnode/impl/src/mndTopic.c | 36 +++++++++++++++++++ source/dnode/vnode/src/tq/tq.c | 42 ++++++++++++++++++++++ source/dnode/vnode/src/vnd/vnodeSvr.c | 41 --------------------- source/libs/stream/src/tstream.c | 1 + 10 files changed, 119 insertions(+), 54 deletions(-) diff --git a/example/src/tmq.c b/example/src/tmq.c index abd4f78610..976d658fa6 100644 --- a/example/src/tmq.c +++ b/example/src/tmq.c @@ -14,9 +14,7 @@ */ #include -#include #include -#include #include #include #include "taos.h" diff --git a/source/dnode/mnode/impl/inc/mndDef.h b/source/dnode/mnode/impl/inc/mndDef.h index 118fbf1a03..a2c1d54cab 100644 --- a/source/dnode/mnode/impl/inc/mndDef.h +++ b/source/dnode/mnode/impl/inc/mndDef.h @@ -517,6 +517,7 @@ void* tDecodeSMqConsumerEp(const void* buf, SMqConsumerEp* pEp); typedef struct { char key[TSDB_SUBSCRIBE_KEY_LEN]; SRWLatch lock; + int64_t dbUid; int32_t vgNum; int8_t subType; int8_t withTbName; diff --git a/source/dnode/mnode/impl/inc/mndTopic.h b/source/dnode/mnode/impl/inc/mndTopic.h index fd82c60d37..e3174a90a2 100644 --- a/source/dnode/mnode/impl/inc/mndTopic.h +++ b/source/dnode/mnode/impl/inc/mndTopic.h @@ -31,6 +31,8 @@ void mndReleaseTopic(SMnode *pMnode, SMqTopicObj *pTopic); SSdbRaw *mndTopicActionEncode(SMqTopicObj *pTopic); SSdbRow *mndTopicActionDecode(SSdbRaw *pRaw); +int32_t mndDropTopicByDB(SMnode *pMnode, STrans *pTrans, SDbObj *pDb); + #ifdef __cplusplus } #endif diff --git a/source/dnode/mnode/impl/src/mndDef.c b/source/dnode/mnode/impl/src/mndDef.c index 03ac74aa62..6ff295ec7d 100644 --- a/source/dnode/mnode/impl/src/mndDef.c +++ b/source/dnode/mnode/impl/src/mndDef.c @@ -241,7 +241,7 @@ int32_t tEncodeSMqConsumerEp(void **buf, const SMqConsumerEp *pConsumerEp) { void *tDecodeSMqConsumerEp(const void *buf, SMqConsumerEp *pConsumerEp) { buf = taosDecodeFixedI64(buf, &pConsumerEp->consumerId); - buf = taosDecodeArray(buf, &pConsumerEp->vgs, (FDecode)tDecodeSMqVgEp, sizeof(SMqSubVgEp)); + buf = taosDecodeArray(buf, &pConsumerEp->vgs, (FDecode)tDecodeSMqVgEp, sizeof(SMqVgEp)); #if 0 int32_t sz; buf = taosDecodeFixedI32(buf, &sz); diff --git a/source/dnode/mnode/impl/src/mndStb.c b/source/dnode/mnode/impl/src/mndStb.c index ec8dfa4d47..f0a93b0040 100644 --- a/source/dnode/mnode/impl/src/mndStb.c +++ b/source/dnode/mnode/impl/src/mndStb.c @@ -731,7 +731,6 @@ _OVER: static int32_t mndProcessMCreateStbReq(SNodeMsg *pReq) { SMnode *pMnode = pReq->pNode; int32_t code = -1; - SStbObj *pTopicStb = NULL; SStbObj *pStb = NULL; SDbObj *pDb = NULL; SUserObj *pUser = NULL; @@ -762,12 +761,6 @@ static int32_t mndProcessMCreateStbReq(SNodeMsg *pReq) { goto _OVER; } - pTopicStb = mndAcquireStb(pMnode, createReq.name); - if (pTopicStb != NULL) { - terrno = TSDB_CODE_MND_NAME_CONFLICT_WITH_TOPIC; - goto _OVER; - } - pDb = mndAcquireDbByStb(pMnode, createReq.name); if (pDb == NULL) { terrno = TSDB_CODE_MND_DB_NOT_SELECTED; @@ -785,7 +778,7 @@ static int32_t mndProcessMCreateStbReq(SNodeMsg *pReq) { int32_t numOfStbs = -1; mndGetNumOfStbs(pMnode, pDb->name, &numOfStbs); - if (pDb->cfg.numOfStables == 1 && numOfStbs != 0 ) { + if (pDb->cfg.numOfStables == 1 && numOfStbs != 0) { terrno = TSDB_CODE_MND_SINGLE_STB_MODE_DB; goto _OVER; } @@ -799,7 +792,6 @@ _OVER: } mndReleaseStb(pMnode, pStb); - mndReleaseStb(pMnode, pTopicStb); mndReleaseDb(pMnode, pDb); mndReleaseUser(pMnode, pUser); tFreeSMCreateStbReq(&createReq); diff --git a/source/dnode/mnode/impl/src/mndSubscribe.c b/source/dnode/mnode/impl/src/mndSubscribe.c index 13ee26b6cd..bcec73ed30 100644 --- a/source/dnode/mnode/impl/src/mndSubscribe.c +++ b/source/dnode/mnode/impl/src/mndSubscribe.c @@ -80,6 +80,7 @@ static SMqSubscribeObj *mndCreateSub(SMnode *pMnode, const SMqTopicObj *pTopic, terrno = TSDB_CODE_OUT_OF_MEMORY; return NULL; } + pSub->dbUid = pTopic->dbUid; pSub->subType = pTopic->subType; pSub->withTbName = pTopic->withTbName; pSub->withSchema = pTopic->withSchema; @@ -593,7 +594,7 @@ static SSdbRow *mndSubActionDecode(SSdbRaw *pRaw) { int32_t dataPos = 0; int32_t tlen; SDB_GET_INT32(pRaw, dataPos, &tlen, SUB_DECODE_OVER); - buf = taosMemoryMalloc(tlen + 1); + buf = taosMemoryMalloc(tlen); if (buf == NULL) goto SUB_DECODE_OVER; SDB_GET_BINARY(pRaw, dataPos, buf, tlen, SUB_DECODE_OVER); SDB_GET_RESERVE(pRaw, dataPos, MND_SUBSCRIBE_RESERVE_SIZE, SUB_DECODE_OVER); @@ -679,3 +680,36 @@ static int32_t mndProcessSubscribeInternalRsp(SNodeMsg *pRsp) { mndTransProcessRsp(pRsp); return 0; } + +static int32_t mndSetDropSubCommitLogs(SMnode *pMnode, STrans *pTrans, SMqSubscribeObj *pSub) { + SSdbRaw *pCommitRaw = mndSubActionEncode(pSub); + if (pCommitRaw == NULL) return -1; + if (mndTransAppendCommitlog(pTrans, pCommitRaw) != 0) return -1; + if (sdbSetRawStatus(pCommitRaw, SDB_STATUS_DROPPED) != 0) return -1; + return 0; +} + +int32_t mndDropSubByDB(SMnode *pMnode, STrans *pTrans, SDbObj *pDb) { + int32_t code = -1; + SSdb *pSdb = pMnode->pSdb; + + void *pIter = NULL; + SMqSubscribeObj *pSub = NULL; + while (1) { + pIter = sdbFetch(pSdb, SDB_SUBSCRIBE, pIter, (void **)&pSub); + if (pIter == NULL) break; + + if (pSub->dbUid != pDb->uid) { + sdbRelease(pSdb, pSub); + continue; + } + + if (mndSetDropSubCommitLogs(pMnode, pTrans, pSub) < 0) { + goto END; + } + } + + code = 0; +END: + return code; +} diff --git a/source/dnode/mnode/impl/src/mndTopic.c b/source/dnode/mnode/impl/src/mndTopic.c index 731ee69105..0a8d1cee4a 100644 --- a/source/dnode/mnode/impl/src/mndTopic.c +++ b/source/dnode/mnode/impl/src/mndTopic.c @@ -38,6 +38,8 @@ static int32_t mndProcessDropTopicInRsp(SNodeMsg *pRsp); static int32_t mndRetrieveTopic(SNodeMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows); static void mndCancelGetNextTopic(SMnode *pMnode, void *pIter); +static int32_t mndSetDropTopicCommitLogs(SMnode *pMnode, STrans *pTrans, SMqTopicObj *pTopic); + int32_t mndInitTopic(SMnode *pMnode) { SSdbTable table = {.sdbType = SDB_TOPIC, .keyType = SDB_KEY_BINARY, @@ -553,7 +555,41 @@ static int32_t mndRetrieveTopic(SNodeMsg *pReq, SShowObj *pShow, SSDataBlock *pB return numOfRows; } +static int32_t mndSetDropTopicCommitLogs(SMnode *pMnode, STrans *pTrans, SMqTopicObj *pTopic) { + SSdbRaw *pCommitRaw = mndTopicActionEncode(pTopic); + if (pCommitRaw == NULL) return -1; + if (mndTransAppendCommitlog(pTrans, pCommitRaw) != 0) return -1; + if (sdbSetRawStatus(pCommitRaw, SDB_STATUS_DROPPED) != 0) return -1; + + return 0; +} + static void mndCancelGetNextTopic(SMnode *pMnode, void *pIter) { SSdb *pSdb = pMnode->pSdb; sdbCancelFetch(pSdb, pIter); } + +int32_t mndDropTopicByDB(SMnode *pMnode, STrans *pTrans, SDbObj *pDb) { + int32_t code = -1; + SSdb *pSdb = pMnode->pSdb; + + void *pIter = NULL; + SMqTopicObj *pTopic = NULL; + while (1) { + pIter = sdbFetch(pSdb, SDB_TOPIC, pIter, (void **)&pTopic); + if (pIter == NULL) break; + + if (pTopic->dbUid != pDb->uid) { + sdbRelease(pSdb, pTopic); + continue; + } + + if (mndSetDropTopicCommitLogs(pMnode, pTrans, pTopic) < 0) { + goto END; + } + } + + code = 0; +END: + return code; +} diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index a39d959b36..411d5c451c 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -58,6 +58,48 @@ void tqClose(STQ* pTq) { // TODO } +static void tdSRowDemo() { +#define DEMO_N_COLS 3 + + int16_t schemaVersion = 0; + int32_t numOfCols = DEMO_N_COLS; // ts + int + SRowBuilder rb = {0}; + + SSchema schema[DEMO_N_COLS] = { + {.type = TSDB_DATA_TYPE_TIMESTAMP, .colId = 1, .name = "ts", .bytes = 8, .flags = SCHEMA_SMA_ON}, + {.type = TSDB_DATA_TYPE_INT, .colId = 2, .name = "c1", .bytes = 4, .flags = SCHEMA_SMA_ON}, + {.type = TSDB_DATA_TYPE_INT, .colId = 3, .name = "c2", .bytes = 4, .flags = SCHEMA_SMA_ON}}; + + SSchema* pSchema = schema; + STSchema* pTSChema = tdGetSTSChemaFromSSChema(&pSchema, numOfCols); + + tdSRowInit(&rb, schemaVersion); + tdSRowSetTpInfo(&rb, numOfCols, pTSChema->flen); + int32_t maxLen = TD_ROW_MAX_BYTES_FROM_SCHEMA(pTSChema); + void* row = taosMemoryCalloc(1, maxLen); // make sure the buffer is enough + + // set row buf + tdSRowResetBuf(&rb, row); + + for (int32_t idx = 0; idx < pTSChema->numOfCols; ++idx) { + STColumn* pColumn = pTSChema->columns + idx; + if (idx == 0) { + int64_t tsKey = 1651234567; + tdAppendColValToRow(&rb, pColumn->colId, pColumn->type, TD_VTYPE_NORM, &tsKey, true, pColumn->offset, idx); + } else if (idx == 1) { + int32_t val1 = 10; + tdAppendColValToRow(&rb, pColumn->colId, pColumn->type, TD_VTYPE_NORM, &val1, true, pColumn->offset, idx); + } else { + tdAppendColValToRow(&rb, pColumn->colId, pColumn->type, TD_VTYPE_NONE, NULL, true, pColumn->offset, idx); + } + } + + // print + tdSRowPrint(row, pTSChema, __func__); + + taosMemoryFree(pTSChema); +} + int32_t tqPushMsgNew(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_t ver) { if (msgType != TDMT_VND_SUBMIT) return 0; void* pIter = NULL; diff --git a/source/dnode/vnode/src/vnd/vnodeSvr.c b/source/dnode/vnode/src/vnd/vnodeSvr.c index ed8a978226..4eeba06027 100644 --- a/source/dnode/vnode/src/vnd/vnodeSvr.c +++ b/source/dnode/vnode/src/vnd/vnodeSvr.c @@ -45,47 +45,6 @@ int vnodePreprocessWriteReqs(SVnode *pVnode, SArray *pMsgs, int64_t *version) { #endif return 0; } -static void tdSRowDemo() { -#define DEMO_N_COLS 3 - - int16_t schemaVersion = 0; - int32_t numOfCols = DEMO_N_COLS; // ts + int - SRowBuilder rb = {0}; - - SSchema schema[DEMO_N_COLS] = { - {.type = TSDB_DATA_TYPE_TIMESTAMP, .colId = 1, .name = "ts", .bytes = 8, .flags = SCHEMA_SMA_ON}, - {.type = TSDB_DATA_TYPE_INT, .colId = 2, .name = "c1", .bytes = 4, .flags = SCHEMA_SMA_ON}, - {.type = TSDB_DATA_TYPE_INT, .colId = 3, .name = "c2", .bytes = 4, .flags = SCHEMA_SMA_ON}}; - - SSchema *pSchema = schema; - STSchema *pTSChema = tdGetSTSChemaFromSSChema(&pSchema, numOfCols); - - tdSRowInit(&rb, schemaVersion); - tdSRowSetTpInfo(&rb, numOfCols, pTSChema->flen); - int32_t maxLen = TD_ROW_MAX_BYTES_FROM_SCHEMA(pTSChema); - void *row = taosMemoryCalloc(1, maxLen); // make sure the buffer is enough - - // set row buf - tdSRowResetBuf(&rb, row); - - for (int32_t idx = 0; idx < pTSChema->numOfCols; ++idx) { - STColumn *pColumn = pTSChema->columns + idx; - if (idx == 0) { - int64_t tsKey = 1651234567; - tdAppendColValToRow(&rb, pColumn->colId, pColumn->type, TD_VTYPE_NORM, &tsKey, true, pColumn->offset, idx); - } else if (idx == 1) { - int32_t val1 = 10; - tdAppendColValToRow(&rb, pColumn->colId, pColumn->type, TD_VTYPE_NORM, &val1, true, pColumn->offset, idx); - } else { - tdAppendColValToRow(&rb, pColumn->colId, pColumn->type, TD_VTYPE_NONE, NULL, true, pColumn->offset, idx); - } - } - - // print - tdSRowPrint(row, pTSChema, __func__); - - taosMemoryFree(pTSChema); -} int vnodeProcessWriteReq(SVnode *pVnode, SRpcMsg *pMsg, int64_t version, SRpcMsg *pRsp) { void *ptr = NULL; diff --git a/source/libs/stream/src/tstream.c b/source/libs/stream/src/tstream.c index b06c774ed3..cf3cd162aa 100644 --- a/source/libs/stream/src/tstream.c +++ b/source/libs/stream/src/tstream.c @@ -28,6 +28,7 @@ static int32_t streamBuildDispatchMsg(SStreamTask* pTask, SArray* data, SRpcMsg* if (buf == NULL) { return -1; } + if (pTask->dispatchType == TASK_DISPATCH__INPLACE) { ((SMsgHead*)buf)->vgId = 0; req.taskId = pTask->inplaceDispatcher.taskId; -- GitLab