diff --git a/src/client/src/tscSQLParser.c b/src/client/src/tscSQLParser.c index 4e7a8ed6bd38e0973f40d858802901f61ca448c8..40e70c247ef2d8d4a1cce6db07ac9f48271a5085 100644 --- a/src/client/src/tscSQLParser.c +++ b/src/client/src/tscSQLParser.c @@ -5280,8 +5280,8 @@ int32_t doLocalQueryProcess(SQueryInfo* pQueryInfo, SQuerySQL* pQuerySql) { int32_t tscCheckCreateDbParams(SSqlCmd* pCmd, SCMCreateDbMsg* pCreate) { char msg[512] = {0}; - if (pCreate->commitLog != -1 && (pCreate->commitLog < 0 || pCreate->commitLog > 1)) { - snprintf(msg, tListLen(msg), "invalid db option commitLog: %d, only 0 or 1 allowed", pCreate->commitLog); + if (pCreate->commitLog != -1 && (pCreate->commitLog < 0 || pCreate->commitLog > 2)) { + snprintf(msg, tListLen(msg), "invalid db option commitLog: %d, only 0-2 allowed", pCreate->commitLog); return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg); } diff --git a/src/client/src/tscSubquery.c b/src/client/src/tscSubquery.c index 039cc966f03d0ab7321c15b37690dc33a031eb39..b93ec0b6d381520c8d693f78f72f39685e13d329 100644 --- a/src/client/src/tscSubquery.c +++ b/src/client/src/tscSubquery.c @@ -1512,6 +1512,9 @@ static void multiVnodeInsertMerge(void* param, TAOS_RES* tres, int numOfRows) { tscTrace("%p Async insertion completed, total inserted:%d", pParentObj, pParentObj->res.numOfRows); + tfree(pState); + tfree(pSupporter); + // release data block data pParentCmd->pDataBlocks = tscDestroyBlockArrayList(pParentCmd->pDataBlocks); diff --git a/src/client/src/tscUtil.c b/src/client/src/tscUtil.c index 0294120ccb718633af35de4472ec303f61faca9e..61b7ab4ec589ed2c9d9bad992d1ce6d10c60cbe8 100644 --- a/src/client/src/tscUtil.c +++ b/src/client/src/tscUtil.c @@ -587,7 +587,7 @@ int32_t tscCopyDataBlockToPayload(SSqlObj* pSql, STableDataBlocks* pDataBlock) { assert(pCmd->numOfClause == 1); STableMetaInfo* pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0); - // set the correct metermeta object, the metermeta has been locked in pDataBlocks, so it must be in the cache + // set the correct table meta object, the table meta has been locked in pDataBlocks, so it must be in the cache if (pTableMetaInfo->pTableMeta != pDataBlock->pTableMeta) { strcpy(pTableMetaInfo->name, pDataBlock->tableId); taosCacheRelease(tscCacheHandle, (void**)&(pTableMetaInfo->pTableMeta), false); @@ -599,7 +599,7 @@ int32_t tscCopyDataBlockToPayload(SSqlObj* pSql, STableDataBlocks* pDataBlock) { /* * the submit message consists of : [RPC header|message body|digest] - * the dataBlock only includes the RPC Header buffer and actual submit messsage body, space for digest needs + * the dataBlock only includes the RPC Header buffer and actual submit message body, space for digest needs * additional space. */ int ret = tscAllocPayload(pCmd, pDataBlock->nAllocSize + 100); @@ -1277,7 +1277,7 @@ void tscSqlExprInfoDestroy(SSqlExprInfo* pExprInfo) { void tscSqlExprCopy(SSqlExprInfo* dst, const SSqlExprInfo* src, uint64_t tableuid, bool deepcopy) { - if (src == NULL) { + if (src == NULL || src->numOfExprs == 0) { return; } @@ -1983,22 +1983,24 @@ SSqlObj* createSubqueryObj(SSqlObj* pSql, int16_t tableIndex, void (*fp)(), void return NULL; } - memcpy(&pNew->cmd, pCmd, sizeof(SSqlCmd)); - - pNew->cmd.command = cmd; - pNew->cmd.payload = NULL; - pNew->cmd.allocSize = 0; + SSqlCmd* pnCmd = &pNew->cmd; + memcpy(pnCmd, pCmd, sizeof(SSqlCmd)); + + pnCmd->command = cmd; + pnCmd->payload = NULL; + pnCmd->allocSize = 0; - pNew->cmd.pQueryInfo = NULL; - pNew->cmd.numOfClause = 0; - pNew->cmd.clauseIndex = 0; + pnCmd->pQueryInfo = NULL; + pnCmd->numOfClause = 0; + pnCmd->clauseIndex = 0; + pnCmd->pDataBlocks = NULL; - if (tscAddSubqueryInfo(&pNew->cmd) != TSDB_CODE_SUCCESS) { + if (tscAddSubqueryInfo(pnCmd) != TSDB_CODE_SUCCESS) { tscFreeSqlObj(pNew); return NULL; } - SQueryInfo* pNewQueryInfo = tscGetQueryInfoDetail(&pNew->cmd, 0); + SQueryInfo* pNewQueryInfo = tscGetQueryInfoDetail(pnCmd, 0); SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex); memcpy(pNewQueryInfo, pQueryInfo, sizeof(SQueryInfo)); @@ -2018,7 +2020,7 @@ SSqlObj* createSubqueryObj(SSqlObj* pSql, int16_t tableIndex, void (*fp)(), void memcpy(pNewQueryInfo->defaultVal, pQueryInfo->defaultVal, pQueryInfo->fieldsInfo.numOfOutputCols * sizeof(int64_t)); } - if (tscAllocPayload(&pNew->cmd, TSDB_DEFAULT_PAYLOAD_SIZE) != TSDB_CODE_SUCCESS) { + if (tscAllocPayload(pnCmd, TSDB_DEFAULT_PAYLOAD_SIZE) != TSDB_CODE_SUCCESS) { tscError("%p new subquery failed, tableIndex:%d, vnodeIndex:%d", pSql, tableIndex, pTableMetaInfo->vnodeIndex); tscFreeSqlObj(pNew); return NULL; diff --git a/src/dnode/src/dnodeMgmt.c b/src/dnode/src/dnodeMgmt.c index f92ba36845ab92d972fff25eb1b5676dae4775c5..8e523eaf46701981a5469acfc19135a3fc4013d4 100644 --- a/src/dnode/src/dnodeMgmt.c +++ b/src/dnode/src/dnodeMgmt.c @@ -32,6 +32,7 @@ #include "vnode.h" static int32_t dnodeOpenVnodes(); +static void dnodeCloseVnodes(); static int32_t dnodeProcessCreateVnodeMsg(SRpcMsg *pMsg); static int32_t dnodeProcessDropVnodeMsg(SRpcMsg *pMsg); static int32_t dnodeProcessAlterVnodeMsg(SRpcMsg *pMsg); @@ -64,10 +65,6 @@ int32_t dnodeInitMgmt() { return -1; } - if ( vnodeInitModule() != TSDB_CODE_SUCCESS) { - return -1; - } - int32_t code = dnodeOpenVnodes(); if (code != TSDB_CODE_SUCCESS) { return -1; @@ -88,7 +85,7 @@ void dnodeCleanupMgmt() { tsDnodeTmr = NULL; } - vnodeCleanupModule(); + dnodeCloseVnodes(); } void dnodeMgmt(SRpcMsg *pMsg) { @@ -107,7 +104,7 @@ void dnodeMgmt(SRpcMsg *pMsg) { rpcFreeCont(pMsg->pCont); } -static int32_t dnodeOpenVnodes() { +static int dnodeGetVnodeList(int32_t vnodeList[]) { DIR *dir = opendir(tsVnodeDir); if (dir == NULL) { return TSDB_CODE_NO_WRITE_ACCESS; @@ -122,18 +119,42 @@ static int32_t dnodeOpenVnodes() { int32_t vnode = atoi(de->d_name + 5); if (vnode == 0) continue; - char vnodeDir[TSDB_FILENAME_LEN * 3]; - snprintf(vnodeDir, TSDB_FILENAME_LEN * 3, "%s/%s", tsVnodeDir, de->d_name); - int32_t code = vnodeOpen(vnode, vnodeDir); - if (code == 0) { - numOfVnodes++; - } + vnodeList[numOfVnodes] = vnode; + numOfVnodes++; } } closedir(dir); - dPrint("dnode mgmt is opened, vnodes:%d", numOfVnodes); - return TSDB_CODE_SUCCESS; + return numOfVnodes; +} + +static int32_t dnodeOpenVnodes() { + char vnodeDir[TSDB_FILENAME_LEN * 3]; + int failed = 0; + + int32_t *vnodeList = (int32_t *) malloc(sizeof(int32_t) * 10000); + int numOfVnodes = dnodeGetVnodeList(vnodeList); + + for (int i=0; icfg.vgId = htonl(pCreate->cfg.vgId); pCreate->cfg.maxSessions = htonl(pCreate->cfg.maxSessions); pCreate->cfg.daysPerFile = htonl(pCreate->cfg.daysPerFile); + pCreate->cfg.commitLog = pCreate->cfg.commitLog; return vnodeCreate(pCreate); } diff --git a/src/dnode/src/dnodeRead.c b/src/dnode/src/dnodeRead.c index 1e92b40977e91383b3304e2f0fed1f700b076278..682aee4c0b3f613d920b29a8add684604b1b9347 100644 --- a/src/dnode/src/dnodeRead.c +++ b/src/dnode/src/dnodeRead.c @@ -291,7 +291,7 @@ static void dnodeProcessRetrieveMsg(void *pVnode, SReadMsg *pMsg) { dnodeContinueExecuteQuery(pVnode, pQInfo, pMsg); } else { // no further execution invoked, release the ref to vnode dnodeProcessReadResult(pVnode, pMsg); - vnodeRelease(pVnode); + //vnodeRelease(pVnode); } } diff --git a/src/dnode/src/dnodeWrite.c b/src/dnode/src/dnodeWrite.c index 2348cf62a4c2b295a47b185087f0ed5b16c00596..aee14ed48414aa68590e246f8563806cd1df6a20 100644 --- a/src/dnode/src/dnodeWrite.c +++ b/src/dnode/src/dnodeWrite.c @@ -40,7 +40,7 @@ typedef struct { SRpcMsg rpcMsg; } SWriteMsg; -typedef struct _thread_obj { +typedef struct _wworker_pool { int32_t max; // max number of workers int32_t nextId; // from 0 to max-1, cyclic SWriteWorker *writeWorker; diff --git a/src/inc/vnode.h b/src/inc/vnode.h index 0ca5b971804032978227327d625c17444469be3c..b459c2c5625c0424f4df0094eb2a3bc8e97a6e7e 100644 --- a/src/inc/vnode.h +++ b/src/inc/vnode.h @@ -25,13 +25,10 @@ typedef struct { void *rsp; } SRspRet; -int32_t vnodeInitModule(); -void vnodeCleanupModule(); - int32_t vnodeCreate(SMDCreateVnodeMsg *pVnodeCfg); int32_t vnodeDrop(int32_t vgId); -int32_t vnodeOpen(int32_t vnode, char *rootDir); -int32_t vnodeClose(void *pVnode); +int32_t vnodeOpen(int32_t vgId, char *rootDir); +int32_t vnodeClose(int32_t vgId); void vnodeRelease(void *pVnode); void* vnodeGetVnode(int32_t vgId); diff --git a/src/mnode/src/mgmtDb.c b/src/mnode/src/mgmtDb.c index 6a53b0d00183a18bf64cf89a88cd93896e29808f..7d13451f7ef3164becc2c2887a40039bff998ec4 100644 --- a/src/mnode/src/mgmtDb.c +++ b/src/mnode/src/mgmtDb.c @@ -166,8 +166,8 @@ SDbObj *mgmtGetDbByTableId(char *tableId) { } static int32_t mgmtCheckDBParams(SCMCreateDbMsg *pCreate) { - if (pCreate->commitLog < 0 || pCreate->commitLog > 1) { - mError("invalid db option commitLog: %d, only 0 or 1 allowed", pCreate->commitLog); + if (pCreate->commitLog < 0 || pCreate->commitLog > 2) { + mError("invalid db option commitLog: %d, only 0-2 allowed", pCreate->commitLog); return TSDB_CODE_INVALID_OPTION; } diff --git a/src/mnode/src/mgmtVgroup.c b/src/mnode/src/mgmtVgroup.c index b16b82cb4a009fb56b42864ef101f18dd47e0cb5..87f3872b0aaacba15c49a5e3678d94993518ad8a 100644 --- a/src/mnode/src/mgmtVgroup.c +++ b/src/mnode/src/mgmtVgroup.c @@ -490,6 +490,7 @@ SMDCreateVnodeMsg *mgmtBuildCreateVnodeMsg(SVgObj *pVgroup) { pCfg->daysToKeep2 = htonl(pCfg->daysToKeep2); pCfg->daysToKeep = htonl(pCfg->daysToKeep); pCfg->commitTime = htonl(pCfg->commitTime); + pCfg->commitLog = pCfg->commitLog; pCfg->blocksPerTable = htons(pCfg->blocksPerTable); pCfg->replications = (char) pVgroup->numOfVnodes; pCfg->rowsInFileBlock = htonl(pCfg->rowsInFileBlock); diff --git a/src/rpc/src/rpcMain.c b/src/rpc/src/rpcMain.c index 8dbbb97a1af438049adb40c12b2b6bec466dcb6e..6f86c2dd7cf78d5cf3ff54b6daf9c360cb6d5137 100755 --- a/src/rpc/src/rpcMain.c +++ b/src/rpc/src/rpcMain.c @@ -24,7 +24,6 @@ #include "lz4.h" #include "taoserror.h" #include "tsocket.h" -#include "shash.h" #include "taosmsg.h" #include "rpcUdp.h" #include "rpcCache.h" @@ -263,7 +262,6 @@ void *rpcOpen(SRpcInit *pInit) { } if (pRpc->connType == TAOS_CONN_SERVER) { -// pRpc->hash = taosInitStrHash(pRpc->sessions, sizeof(pRpc), taosHashString); pRpc->hash = taosHashInit(pRpc->sessions, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true); if (pRpc->hash == NULL) { tError("%s failed to init string hash", pRpc->label); @@ -298,7 +296,6 @@ void rpcClose(void *param) { } } -// taosCleanUpStrHash(pRpc->hash); taosHashCleanup(pRpc->hash); taosTmrCleanUp(pRpc->tmrCtrl); taosIdPoolCleanUp(pRpc->idPool); @@ -535,8 +532,7 @@ static void rpcCloseConn(void *thandle) { if ( pRpc->connType == TAOS_CONN_SERVER) { char hashstr[40] = {0}; size_t size = sprintf(hashstr, "%x:%x:%x:%d", pConn->peerIp, pConn->linkUid, pConn->peerId, pConn->connType); -// taosDeleteStrHash(pRpc->hash, hashstr); -// taosHashRemove(pRpc->hash, hashstr, size); + taosHashRemove(pRpc->hash, hashstr, size); rpcFreeMsg(pConn->pRspMsg); // it may have a response msg saved, but not request msg pConn->pRspMsg = NULL; @@ -588,7 +584,6 @@ static SRpcConn *rpcAllocateServerConn(SRpcInfo *pRpc, SRecvInfo *pRecv) { size_t size = sprintf(hashstr, "%x:%x:%x:%d", pRecv->ip, pHead->linkUid, pHead->sourceId, pRecv->connType); // check if it is already allocated -// SRpcConn **ppConn = (SRpcConn **)(taosGetStrHashData(pRpc->hash, hashstr)); SRpcConn **ppConn = (SRpcConn **)(taosHashGet(pRpc->hash, hashstr, size)); if (ppConn) pConn = *ppConn; if (pConn) return pConn; @@ -621,7 +616,6 @@ static SRpcConn *rpcAllocateServerConn(SRpcInfo *pRpc, SRecvInfo *pRecv) { pConn->localPort = (pRpc->localPort + pRpc->index); } -// taosAddStrHash(pRpc->hash, hashstr, (char *)&pConn); taosHashPut(pRpc->hash, hashstr, size, (char *)&pConn, POINTER_BYTES); tTrace("%s %p, rpc connection is allocated, sid:%d id:%s port:%u", @@ -834,13 +828,15 @@ static void rpcProcessBrokenLink(SRpcConn *pConn) { if (pConn->inType) { // if there are pending request, notify the app tTrace("%s %p, connection is gone, notify the app", pRpc->label, pConn); +/* SRpcMsg rpcMsg; rpcMsg.pCont = NULL; rpcMsg.contLen = 0; rpcMsg.handle = pConn; rpcMsg.msgType = pConn->inType; rpcMsg.code = TSDB_CODE_NETWORK_UNAVAIL; - // (*(pRpc->cfp))(&rpcMsg); + (*(pRpc->cfp))(&rpcMsg); +*/ } rpcCloseConn(pConn); @@ -1163,13 +1159,15 @@ static void rpcProcessIdleTimer(void *param, void *tmrId) { if (pConn->inType && pRpc->cfp) { // if there are pending request, notify the app tTrace("%s %p, notify the app, connection is gone", pRpc->label, pConn); +/* SRpcMsg rpcMsg; rpcMsg.pCont = NULL; rpcMsg.contLen = 0; rpcMsg.handle = pConn; rpcMsg.msgType = pConn->inType; rpcMsg.code = TSDB_CODE_NETWORK_UNAVAIL; - // (*(pRpc->cfp))(&rpcMsg); + (*(pRpc->cfp))(&rpcMsg); +*/ } rpcCloseConn(pConn); } else { diff --git a/src/util/src/tglobalcfg.c b/src/util/src/tglobalcfg.c index 0287285f4db64b611ba29cfdfc72506e4bbbf08e..8a0d66068e92e5823788f8104885205e19de6708 100644 --- a/src/util/src/tglobalcfg.c +++ b/src/util/src/tglobalcfg.c @@ -632,7 +632,7 @@ static void doInitGlobalConfig() { tsInitConfigOption(cfg++, "clog", &tsCommitLog, TSDB_CFG_VTYPE_SHORT, TSDB_CFG_CTYPE_B_CONFIG | TSDB_CFG_CTYPE_B_SHOW, - 0, 1, 0, TSDB_CFG_UTYPE_NONE); + 0, 2, 0, TSDB_CFG_UTYPE_NONE); tsInitConfigOption(cfg++, "comp", &tsCompression, TSDB_CFG_VTYPE_SHORT, TSDB_CFG_CTYPE_B_CONFIG | TSDB_CFG_CTYPE_B_SHOW, 0, 2, 0, TSDB_CFG_UTYPE_NONE); diff --git a/src/util/src/tqueue.c b/src/util/src/tqueue.c index 6d75776ac810140bdb54670e4cc723ef1e2b4e3f..2cf94267f8dd29b323dba1cbd8706388066b463d 100644 --- a/src/util/src/tqueue.c +++ b/src/util/src/tqueue.c @@ -92,7 +92,7 @@ void *taosAllocateQitem(int size) { void taosFreeQitem(void *param) { if (param == NULL) return; - //pTrace("item:%p is freed", param); + pTrace("item:%p is freed", param); char *temp = (char *)param; temp -= sizeof(STaosQnode); @@ -117,7 +117,7 @@ int taosWriteQitem(taos_queue param, int type, void *item) { queue->numOfItems++; if (queue->qset) atomic_add_fetch_32(&queue->qset->numOfItems, 1); - //pTrace("item:%p is put into queue, type:%d items:%d", item, type, queue->numOfItems); + pTrace("item:%p is put into queue, type:%d items:%d", item, type, queue->numOfItems); pthread_mutex_unlock(&queue->mutex); @@ -197,7 +197,7 @@ int taosGetQitem(taos_qall param, int *type, void **pitem) { *pitem = pNode->item; *type = pNode->type; num = 1; - // pTrace("item:%p is fetched, type:%d", *pitem, *type); + pTrace("item:%p is fetched, type:%d", *pitem, *type); } return num; diff --git a/src/vnode/main/src/vnodeMain.c b/src/vnode/main/src/vnodeMain.c index be41b6b7c199d5f73aa39a3f688a304fab7381f5..1be9bbb64be2807c3c44d3ca83d789b82f563bb7 100644 --- a/src/vnode/main/src/vnodeMain.c +++ b/src/vnode/main/src/vnodeMain.c @@ -33,27 +33,22 @@ static void *tsDnodeVnodesHash; static void vnodeCleanUp(SVnodeObj *pVnode); static void vnodeBuildVloadMsg(char *pNode, void * param); -int32_t vnodeInitModule() { +static int tsOpennedVnodes; +static pthread_once_t vnodeModuleInit = PTHREAD_ONCE_INIT; + +static void vnodeInit() { vnodeInitWriteFp(); tsDnodeVnodesHash = taosInitIntHash(TSDB_MAX_VNODES, sizeof(SVnodeObj), taosHashInt); if (tsDnodeVnodesHash == NULL) { dError("failed to init vnode list"); - return -1; } - - return 0; -} - -typedef void (*CleanupFp)(char *); -void vnodeCleanupModule() { - taosCleanUpIntHashWithFp(tsDnodeVnodesHash, (CleanupFp)vnodeClose); - taosCleanUpIntHash(tsDnodeVnodesHash); } int32_t vnodeCreate(SMDCreateVnodeMsg *pVnodeCfg) { int32_t code; + pthread_once(&vnodeModuleInit, vnodeInit); SVnodeObj *pTemp = (SVnodeObj *) taosGetIntHashData(tsDnodeVnodesHash, pVnodeCfg->cfg.vgId); @@ -93,7 +88,7 @@ int32_t vnodeCreate(SMDCreateVnodeMsg *pVnodeCfg) { return code; } - dPrint("vgId:%d, vnode is created", pVnodeCfg->cfg.vgId); + dPrint("vgId:%d, vnode is created, clog:%d", pVnodeCfg->cfg.vgId, pVnodeCfg->cfg.commitLog); code = vnodeOpen(pVnodeCfg->cfg.vgId, rootDir); return code; @@ -116,6 +111,7 @@ int32_t vnodeDrop(int32_t vgId) { int32_t vnodeOpen(int32_t vnode, char *rootDir) { char temp[TSDB_FILENAME_LEN]; + pthread_once(&vnodeModuleInit, vnodeInit); SVnodeObj vnodeObj = {0}; vnodeObj.vgId = vnode; @@ -147,11 +143,14 @@ int32_t vnodeOpen(int32_t vnode, char *rootDir) { pVnode->status = VN_STATUS_READY; dTrace("pVnode:%p vgId:%d, vnode is opened in %s", pVnode, pVnode->vgId, rootDir); + tsOpennedVnodes++; return TSDB_CODE_SUCCESS; } -int32_t vnodeClose(void *param) { - SVnodeObj *pVnode = (SVnodeObj *)param; +int32_t vnodeClose(int32_t vgId) { + + SVnodeObj *pVnode = (SVnodeObj *) taosGetIntHashData(tsDnodeVnodesHash, vgId); + if (pVnode == NULL) return 0; dTrace("pVnode:%p vgId:%d, vnode will be closed", pVnode, pVnode->vgId); pVnode->status = VN_STATUS_CLOSING; @@ -165,7 +164,10 @@ void vnodeRelease(void *pVnodeRaw) { int32_t refCount = atomic_sub_fetch_32(&pVnode->refCount, 1); - if (refCount > 0) return; + if (refCount > 0) { + dTrace("pVnode:%p vgId:%d, release vnode, refCount:%d", pVnode, pVnode->vgId, refCount); + return; + } // remove read queue dnodeFreeRqueue(pVnode->rqueue); @@ -180,6 +182,13 @@ void vnodeRelease(void *pVnodeRaw) { } dTrace("pVnode:%p vgId:%d, vnode is released", pVnode, pVnode->vgId); + + tsOpennedVnodes--; + if (tsOpennedVnodes <= 0) { + taosCleanUpIntHash(tsDnodeVnodesHash); + vnodeModuleInit = PTHREAD_ONCE_INIT; + tsDnodeVnodesHash = NULL; + } } void *vnodeGetVnode(int32_t vgId) { @@ -232,10 +241,7 @@ static void vnodeBuildVloadMsg(char *pNode, void * param) { } static void vnodeCleanUp(SVnodeObj *pVnode) { - if (pVnode->status == VN_STATUS_DELETING) { - // fix deadlock occured while close system - taosDeleteIntHash(tsDnodeVnodesHash, pVnode->vgId); - } + taosDeleteIntHash(tsDnodeVnodesHash, pVnode->vgId); //syncStop(pVnode->sync); tsdbCloseRepo(pVnode->tsdb); diff --git a/src/vnode/main/src/vnodeWrite.c b/src/vnode/main/src/vnodeWrite.c index 2c31f2243bbe773c6c3857617200a823d0c42a09..c6699bd62c5ef8d0b797b37da22e79307bf0301f 100644 --- a/src/vnode/main/src/vnodeWrite.c +++ b/src/vnode/main/src/vnodeWrite.c @@ -255,7 +255,8 @@ int vnodeWriteToQueue(void *param, SWalHead *pHead, int type) { SWalHead *pWal = (SWalHead *)taosAllocateQitem(size); memcpy(pWal, pHead, size); - taosWriteQitem(pVnode->wqueue, type, pHead); + atomic_add_fetch_32(&pVnode->refCount, 1); + taosWriteQitem(pVnode->wqueue, type, pWal); return 0; } diff --git a/src/vnode/wal/src/walMain.c b/src/vnode/wal/src/walMain.c index e192e91e8a156a4d34a38101afbeb96c4adf9e03..9708b0d9dc31429412aff292567175983cd20f32 100644 --- a/src/vnode/wal/src/walMain.c +++ b/src/vnode/wal/src/walMain.c @@ -287,6 +287,8 @@ static int walRestoreWalFile(char *name, void *pVnode, int (*writeFp)(void *, SW (*writeFp)(pVnode, pHead, TAOS_QTYPE_WAL); } + free(buffer); + return code; }