diff --git a/examples/lua/README.md b/examples/lua/README.md index 32d6a4cace9bd0bf66238ff32af1d3ecf0285046..5abf0c1aab9bf7f53daf2b95827169e3aaa23fc8 100644 --- a/examples/lua/README.md +++ b/examples/lua/README.md @@ -1,6 +1,12 @@ # TDengine driver connector for Lua It's a Lua implementation for [TDengine](https://github.com/taosdata/TDengine), an open-sourced big data platform designed and optimized for the Internet of Things (IoT), Connected Cars, Industrial IoT, and IT Infrastructure and Application Monitoring. You may need to install Lua5.3 . +As TDengine is built with lua-enable with default configure, the built-in lua lib conflicts with external lua lib. The following commands require TDengine built with lua-disable. +To disable built-in lua: +``` +mkdir debug && cd debug +cmake .. -DBUILD_LUA=false && cmake --build . +``` ## Lua Dependencies - Lua: diff --git a/examples/lua/lua51/lua_connector51.c b/examples/lua/lua51/lua_connector51.c index 4c702b2aaeac163f73bc0b2503449dd68a25150d..8a9051dd0cb3da0fd82e752132d499a5935be758 100644 --- a/examples/lua/lua51/lua_connector51.c +++ b/examples/lua/lua51/lua_connector51.c @@ -29,7 +29,7 @@ static int l_connect(lua_State *L){ luaL_checktype(L, 1, LUA_TTABLE); lua_getfield(L, 1,"host"); - if (lua_isstring(L,-1)){ + if (lua_isstring(L, -1)){ host = lua_tostring(L, -1); // printf("host = %s\n", host); } @@ -58,8 +58,9 @@ static int l_connect(lua_State *L){ //printf("password = %s\n", password); } - lua_settop(L,0); + lua_settop(L, 0); + taos_init(); lua_newtable(L); int table_index = lua_gettop(L); @@ -125,7 +126,7 @@ static int l_query(lua_State *L){ //printf("row index:%d\n",rows); rows++; - lua_pushnumber(L,rows); + lua_pushnumber(L, rows); lua_newtable(L); for (int i = 0; i < num_fields; ++i) { @@ -136,15 +137,19 @@ static int l_query(lua_State *L){ lua_pushstring(L,fields[i].name); int32_t* length = taos_fetch_lengths(result); switch (fields[i].type) { + case TSDB_DATA_TYPE_UTINYINT: case TSDB_DATA_TYPE_TINYINT: lua_pushinteger(L,*((char *)row[i])); break; + case TSDB_DATA_TYPE_USMALLINT: case TSDB_DATA_TYPE_SMALLINT: lua_pushinteger(L,*((short *)row[i])); break; + case TSDB_DATA_TYPE_UINT: case TSDB_DATA_TYPE_INT: lua_pushinteger(L,*((int *)row[i])); break; + case TSDB_DATA_TYPE_UBIGINT: case TSDB_DATA_TYPE_BIGINT: lua_pushinteger(L,*((int64_t *)row[i])); break; @@ -154,6 +159,7 @@ static int l_query(lua_State *L){ case TSDB_DATA_TYPE_DOUBLE: lua_pushnumber(L,*((double *)row[i])); break; + case TSDB_DATA_TYPE_JSON: case TSDB_DATA_TYPE_BINARY: case TSDB_DATA_TYPE_NCHAR: //printf("type:%d, max len:%d, current len:%d\n",fields[i].type, fields[i].bytes, length[i]); @@ -197,7 +203,7 @@ void async_query_callback(void *param, TAOS_RES *result, int code){ printf("failed, reason:%s\n", taos_errstr(result)); lua_pushinteger(L, -1); lua_setfield(L, table_index, "code"); - lua_pushstring(L,"something is wrong");// taos_errstr(taos)); + lua_pushstring(L, taos_errstr(result)); lua_setfield(L, table_index, "error"); }else{ //printf("success to async query.\n"); @@ -214,9 +220,9 @@ void async_query_callback(void *param, TAOS_RES *result, int code){ static int l_async_query(lua_State *L){ int r = luaL_ref(L, LUA_REGISTRYINDEX); - TAOS * taos = (TAOS*)lua_topointer(L,1); - const char * sqlstr = lua_tostring(L,2); - // int stime = luaL_checknumber(L,3); + TAOS * taos = (TAOS*)lua_topointer(L, 1); + const char * sqlstr = lua_tostring(L, 2); + // int stime = luaL_checknumber(L, 3); lua_newtable(L); int table_index = lua_gettop(L); @@ -224,7 +230,7 @@ static int l_async_query(lua_State *L){ struct async_query_callback_param *p = malloc(sizeof(struct async_query_callback_param)); p->state = L; p->callback=r; - // printf("r:%d, L:%d\n",r,L); + // printf("r:%d, L:%d\n", r, L); taos_query_a(taos,sqlstr,async_query_callback,p); lua_pushnumber(L, 0); @@ -267,7 +273,7 @@ static const struct luaL_Reg lib[] = { extern int luaopen_luaconnector51(lua_State* L) { // luaL_register(L, "luaconnector51", lib); - lua_newtable (L); + lua_newtable(L); luaL_setfuncs(L,lib,0); return 1; } diff --git a/examples/lua/lua_connector.c b/examples/lua/lua_connector.c index ce13ab3829dec17e4df97ab94f358bd128e80cf1..c3d8bcb99548c787953cd27bcbb90b6c63ee5a4a 100644 --- a/examples/lua/lua_connector.c +++ b/examples/lua/lua_connector.c @@ -29,7 +29,7 @@ static int l_connect(lua_State *L){ luaL_checktype(L, 1, LUA_TTABLE); lua_getfield(L, 1,"host"); - if (lua_isstring(L,-1)){ + if (lua_isstring(L, -1)){ host = lua_tostring(L, -1); // printf("host = %s\n", host); } @@ -58,7 +58,7 @@ static int l_connect(lua_State *L){ //printf("password = %s\n", password); } - lua_settop(L,0); + lua_settop(L, 0); taos_init(); @@ -126,7 +126,7 @@ static int l_query(lua_State *L){ //printf("row index:%d\n",rows); rows++; - lua_pushnumber(L,rows); + lua_pushnumber(L, rows); lua_newtable(L); for (int i = 0; i < num_fields; ++i) { @@ -203,7 +203,7 @@ void async_query_callback(void *param, TAOS_RES *result, int code){ printf("failed, reason:%s\n", taos_errstr(result)); lua_pushinteger(L, -1); lua_setfield(L, table_index, "code"); - lua_pushstring(L,"something is wrong");// taos_errstr(taos)); + lua_pushstring(L, taos_errstr(result)); lua_setfield(L, table_index, "error"); }else{ //printf("success to async query.\n"); @@ -220,9 +220,9 @@ void async_query_callback(void *param, TAOS_RES *result, int code){ static int l_async_query(lua_State *L){ int r = luaL_ref(L, LUA_REGISTRYINDEX); - TAOS * taos = (TAOS*)lua_topointer(L,1); - const char * sqlstr = lua_tostring(L,2); - // int stime = luaL_checknumber(L,3); + TAOS * taos = (TAOS*)lua_topointer(L, 1); + const char * sqlstr = lua_tostring(L, 2); + // int stime = luaL_checknumber(L, 3); lua_newtable(L); int table_index = lua_gettop(L); @@ -230,7 +230,7 @@ static int l_async_query(lua_State *L){ struct async_query_callback_param *p = malloc(sizeof(struct async_query_callback_param)); p->state = L; p->callback=r; - // printf("r:%d, L:%d\n",r,L); + // printf("r:%d, L:%d\n", r, L); taos_query_a(taos,sqlstr,async_query_callback,p); lua_pushnumber(L, 0); diff --git a/examples/lua/test.lua b/examples/lua/test.lua index 3d725cc6a368a4d263729a35612ae0461c86b5ab..94415982e7ceaa75498acc8b8bfa523d953ec401 100644 --- a/examples/lua/test.lua +++ b/examples/lua/test.lua @@ -176,8 +176,14 @@ end driver.query_a(conn,"INSERT INTO therm1 VALUES ('2019-09-01 00:00:00.005', 100),('2019-09-01 00:00:00.006', 101),('2019-09-01 00:00:00.007', 102)", async_query_callback) res = driver.query(conn, "create stream stream_avg_degree into avg_degree as select avg(degree) from thermometer interval(5s) sliding(1s)") +if res.code ~=0 then + print("create stream--- failed:"..res.error) + return +else + print("create stream--- pass") +end -print("From now on we start continous insert in an definite loop, pls wait for about 10 seconds and check stream table for result.") +print("From now on we start continous insertion in an definite loop, please wait for about 10 seconds and check stream table avg_degree for result.") local loop_index = 0 while loop_index < 10 do local t = os.time()*1000 @@ -193,5 +199,5 @@ while loop_index < 10 do os.execute("sleep " .. 1) loop_index = loop_index + 1 end -driver.query(conn,"DROP STREAM IF EXISTS avg_therm_s") +driver.query(conn,"DROP STREAM IF EXISTS stream_avg_degree") driver.close(conn) diff --git a/source/dnode/vnode/src/tsdb/tsdbOpen.c b/source/dnode/vnode/src/tsdb/tsdbOpen.c index ec760e3c57c277ed3183bb73729f7a655daa0304..fcbcff924879995639f1a07a2f267506ca72a4af 100644 --- a/source/dnode/vnode/src/tsdb/tsdbOpen.c +++ b/source/dnode/vnode/src/tsdb/tsdbOpen.c @@ -85,6 +85,8 @@ _err: int tsdbClose(STsdb **pTsdb) { if (*pTsdb) { taosThreadRwlockDestroy(&(*pTsdb)->rwLock); + tsdbMemTableDestroy((*pTsdb)->mem); + (*pTsdb)->mem = NULL; tsdbFSClose(*pTsdb); tsdbCloseCache(*pTsdb); taosMemoryFreeClear(*pTsdb); diff --git a/source/dnode/vnode/src/vnd/vnodeBufPool.c b/source/dnode/vnode/src/vnd/vnodeBufPool.c index 5a22114ab42206f0f63a4c41a3d4c53c438ff68b..6e02425b55a39463d25638a2190de96f3459d657 100644 --- a/source/dnode/vnode/src/vnd/vnodeBufPool.c +++ b/source/dnode/vnode/src/vnd/vnodeBufPool.c @@ -53,6 +53,10 @@ int vnodeCloseBufPool(SVnode *pVnode) { vnodeBufPoolDestroy(pPool); } + if (pVnode->inUse) { + vnodeBufPoolDestroy(pVnode->inUse); + pVnode->inUse = NULL; + } vDebug("vgId:%d, vnode buffer pool is closed", TD_VID(pVnode)); return 0; @@ -177,4 +181,4 @@ void vnodeBufPoolUnRef(SVBufPool *pPool) { taosThreadMutexUnlock(&pVnode->mutex); } -} \ No newline at end of file +} diff --git a/source/dnode/vnode/src/vnd/vnodeOpen.c b/source/dnode/vnode/src/vnd/vnodeOpen.c index 4ccfea40510a6148ceec57893629c693d9d7a1cd..b5307cecf29d21bf68773c41ed318341e083d814 100644 --- a/source/dnode/vnode/src/vnd/vnodeOpen.c +++ b/source/dnode/vnode/src/vnd/vnodeOpen.c @@ -161,7 +161,6 @@ SVnode *vnodeOpen(const char *path, STfs *pTfs, SMsgCb msgCb) { // open sync if (vnodeSyncOpen(pVnode, dir)) { vError("vgId:%d, failed to open sync since %s", TD_VID(pVnode), tstrerror(terrno)); - terrno = TSDB_CODE_OUT_OF_MEMORY; goto _err; } @@ -174,6 +173,7 @@ _err: if (pVnode->pTsdb) tsdbClose(&pVnode->pTsdb); if (pVnode->pSma) smaClose(pVnode->pSma); if (pVnode->pMeta) metaClose(pVnode->pMeta); + if (pVnode->pPool) vnodeCloseBufPool(pVnode); tsem_destroy(&(pVnode->canCommit)); taosMemoryFree(pVnode); diff --git a/source/libs/sync/inc/syncUtil.h b/source/libs/sync/inc/syncUtil.h index 7ecff7ae97bf04da20c1817c413a44a0dd32e8c8..96e22720e8b2b0fb4defc5d5b46e8d765fc54efb 100644 --- a/source/libs/sync/inc/syncUtil.h +++ b/source/libs/sync/inc/syncUtil.h @@ -32,7 +32,7 @@ uint64_t syncUtilAddr2U64(const char* host, uint16_t port); void syncUtilU642Addr(uint64_t u64, char* host, size_t len, uint16_t* port); void syncUtilnodeInfo2EpSet(const SNodeInfo* pNodeInfo, SEpSet* pEpSet); void syncUtilraftId2EpSet(const SRaftId* raftId, SEpSet* pEpSet); -void syncUtilnodeInfo2raftId(const SNodeInfo* pNodeInfo, SyncGroupId vgId, SRaftId* raftId); +bool syncUtilnodeInfo2raftId(const SNodeInfo* pNodeInfo, SyncGroupId vgId, SRaftId* raftId); bool syncUtilSameId(const SRaftId* pId1, const SRaftId* pId2); bool syncUtilEmptyId(const SRaftId* pId); diff --git a/source/libs/sync/src/syncIndexMgr.c b/source/libs/sync/src/syncIndexMgr.c index 3bda9bcd51a1fe41fbeb09a1e4a39c3a53f1cd74..28b5313ac514ef98f4295cd547b947447d9c09bc 100644 --- a/source/libs/sync/src/syncIndexMgr.c +++ b/source/libs/sync/src/syncIndexMgr.c @@ -20,7 +20,10 @@ SSyncIndexMgr *syncIndexMgrCreate(SSyncNode *pSyncNode) { SSyncIndexMgr *pSyncIndexMgr = taosMemoryMalloc(sizeof(SSyncIndexMgr)); - ASSERT(pSyncIndexMgr != NULL); + if (pSyncIndexMgr == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return NULL; + } memset(pSyncIndexMgr, 0, sizeof(SSyncIndexMgr)); pSyncIndexMgr->replicas = &(pSyncNode->replicasId); @@ -248,4 +251,4 @@ SyncTerm syncIndexMgrGetTerm(SSyncIndexMgr *pSyncIndexMgr, const SRaftId *pRaftI } ASSERT(0); return -1; -} \ No newline at end of file +} diff --git a/source/libs/sync/src/syncMain.c b/source/libs/sync/src/syncMain.c index 17157fbd2327ce126b4e63e509c7f0935539860e..3d79c6c46477f17f59e35a259fcea21329bcda3d 100644 --- a/source/libs/sync/src/syncMain.c +++ b/source/libs/sync/src/syncMain.c @@ -51,15 +51,17 @@ static int32_t syncNodeAppendNoop(SSyncNode* ths); int32_t syncNodeOnPingCb(SSyncNode* ths, SyncPing* pMsg); int32_t syncNodeOnPingReplyCb(SSyncNode* ths, SyncPingReply* pMsg); -// life cycle -static void syncFreeNode(void* param); // --------------------------------- +static void syncNodeFreeCb(void *param) { + syncNodeClose(param); + param = NULL; +} int32_t syncInit() { int32_t ret = 0; if (!syncEnvIsStart()) { - tsNodeRefId = taosOpenRef(200, syncFreeNode); + tsNodeRefId = taosOpenRef(200, syncNodeFreeCb); if (tsNodeRefId < 0) { sError("failed to init node ref"); syncCleanUp(); @@ -86,11 +88,15 @@ void syncCleanUp() { int64_t syncOpen(const SSyncInfo* pSyncInfo) { SSyncNode* pSyncNode = syncNodeOpen(pSyncInfo); - ASSERT(pSyncNode != NULL); + if (pSyncNode == NULL) { + sError("failed to open sync node. vgId:%d", pSyncInfo->vgId); + return -1; + } pSyncNode->rid = taosAddRef(tsNodeRefId, pSyncNode); if (pSyncNode->rid < 0) { - syncFreeNode(pSyncNode); + syncNodeClose(pSyncNode); + pSyncNode = NULL; return -1; } @@ -136,11 +142,9 @@ void syncStartStandBy(int64_t rid) { void syncStop(int64_t rid) { SSyncNode* pSyncNode = (SSyncNode*)taosAcquireRef(tsNodeRefId, rid); if (pSyncNode == NULL) return; - int32_t vgId = pSyncNode->vgId; - syncNodeClose(pSyncNode); - taosReleaseRef(tsNodeRefId, pSyncNode->rid); + taosRemoveRef(tsNodeRefId, rid); sDebug("vgId:%d, sync rid:%" PRId64 " is removed from rsetId:%" PRId64, vgId, rid, tsNodeRefId); } @@ -210,7 +214,7 @@ int32_t syncReconfigBuild(int64_t rid, const SSyncCfg* pNewCfg, SRpcMsg* pRpcMsg if (!syncNodeCheckNewConfig(pSyncNode, pNewCfg)) { taosReleaseRef(tsNodeRefId, pSyncNode->rid); terrno = TSDB_CODE_SYN_NEW_CONFIG_ERROR; - sError("syncNodeCheckNewConfig error"); + sError("invalid new config. vgId:%d", pSyncNode->vgId); return -1; } @@ -237,7 +241,7 @@ int32_t syncReconfig(int64_t rid, const SSyncCfg* pNewCfg) { if (!syncNodeCheckNewConfig(pSyncNode, pNewCfg)) { taosReleaseRef(tsNodeRefId, pSyncNode->rid); terrno = TSDB_CODE_SYN_NEW_CONFIG_ERROR; - sError("syncNodeCheckNewConfig error"); + sError("invalid new config. vgId:%d", pSyncNode->vgId); return -1; } @@ -941,16 +945,18 @@ _END: SSyncNode* syncNodeOpen(const SSyncInfo* pOldSyncInfo) { SSyncInfo* pSyncInfo = (SSyncInfo*)pOldSyncInfo; - SSyncNode* pSyncNode = (SSyncNode*)taosMemoryMalloc(sizeof(SSyncNode)); - ASSERT(pSyncNode != NULL); - memset(pSyncNode, 0, sizeof(SSyncNode)); + SSyncNode* pSyncNode = (SSyncNode*)taosMemoryCalloc(1, sizeof(SSyncNode)); + if (pSyncNode == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + goto _error; + } int32_t ret = 0; if (!taosDirExist((char*)(pSyncInfo->path))) { if (taosMkDir(pSyncInfo->path) != 0) { terrno = TAOS_SYSTEM_ERROR(errno); sError("failed to create dir:%s since %s", pSyncInfo->path, terrstr()); - return NULL; + goto _error; } } @@ -963,15 +969,21 @@ SSyncNode* syncNodeOpen(const SSyncInfo* pOldSyncInfo) { meta.lastConfigIndex = SYNC_INDEX_INVALID; meta.batchSize = pSyncInfo->batchSize; ret = raftCfgCreateFile((SSyncCfg*)&(pSyncInfo->syncCfg), meta, pSyncNode->configPath); - ASSERT(ret == 0); - + if (ret != 0) { + sError("failed to create raft cfg file. configPath: %s", pSyncNode->configPath); + goto _error; + } } else { // update syncCfg by raft_config.json pSyncNode->pRaftCfg = raftCfgOpen(pSyncNode->configPath); - ASSERT(pSyncNode->pRaftCfg != NULL); + if (pSyncNode->pRaftCfg == NULL) { + sError("failed to open raft cfg file. path:%s", pSyncNode->configPath); + goto _error; + } pSyncInfo->syncCfg = pSyncNode->pRaftCfg->cfg; raftCfgClose(pSyncNode->pRaftCfg); + pSyncNode->pRaftCfg = NULL; } // init by SSyncInfo @@ -988,11 +1000,17 @@ SSyncNode* syncNodeOpen(const SSyncInfo* pOldSyncInfo) { // init raft config pSyncNode->pRaftCfg = raftCfgOpen(pSyncNode->configPath); - ASSERT(pSyncNode->pRaftCfg != NULL); + if (pSyncNode->pRaftCfg == NULL) { + sError("failed to open raft cfg file. path:%s", pSyncNode->configPath); + goto _error; + } // init internal pSyncNode->myNodeInfo = pSyncNode->pRaftCfg->cfg.nodeInfo[pSyncNode->pRaftCfg->cfg.myIndex]; - syncUtilnodeInfo2raftId(&pSyncNode->myNodeInfo, pSyncNode->vgId, &pSyncNode->myRaftId); + if (!syncUtilnodeInfo2raftId(&pSyncNode->myNodeInfo, pSyncNode->vgId, &pSyncNode->myRaftId)) { + sError("failed to determine my raft member id. vgId:%d", pSyncNode->vgId); + goto _error; + } // init peersNum, peers, peersId pSyncNode->peersNum = pSyncNode->pRaftCfg->cfg.replicaNum - 1; @@ -1004,17 +1022,24 @@ SSyncNode* syncNodeOpen(const SSyncInfo* pOldSyncInfo) { } } for (int i = 0; i < pSyncNode->peersNum; ++i) { - syncUtilnodeInfo2raftId(&pSyncNode->peersNodeInfo[i], pSyncNode->vgId, &pSyncNode->peersId[i]); + if (!syncUtilnodeInfo2raftId(&pSyncNode->peersNodeInfo[i], pSyncNode->vgId, &pSyncNode->peersId[i])) { + sError("failed to determine raft member id. vgId:%d, peer:%d", pSyncNode->vgId, i); + goto _error; + } } // init replicaNum, replicasId pSyncNode->replicaNum = pSyncNode->pRaftCfg->cfg.replicaNum; for (int i = 0; i < pSyncNode->pRaftCfg->cfg.replicaNum; ++i) { - syncUtilnodeInfo2raftId(&pSyncNode->pRaftCfg->cfg.nodeInfo[i], pSyncNode->vgId, &pSyncNode->replicasId[i]); + if(!syncUtilnodeInfo2raftId(&pSyncNode->pRaftCfg->cfg.nodeInfo[i], pSyncNode->vgId, &pSyncNode->replicasId[i])) { + sError("failed to determine raft member id. vgId:%d, replica:%d", pSyncNode->vgId, i); + goto _error; + } } // init raft algorithm pSyncNode->pFsm = pSyncInfo->pFsm; + pSyncInfo->pFsm = NULL; pSyncNode->quorum = syncUtilQuorum(pSyncNode->pRaftCfg->cfg.replicaNum); pSyncNode->leaderCache = EMPTY_RAFT_ID; @@ -1047,29 +1072,50 @@ SSyncNode* syncNodeOpen(const SSyncInfo* pOldSyncInfo) { // init TLA+ server vars pSyncNode->state = TAOS_SYNC_STATE_FOLLOWER; pSyncNode->pRaftStore = raftStoreOpen(pSyncNode->raftStorePath); - ASSERT(pSyncNode->pRaftStore != NULL); + if (pSyncNode->pRaftStore == NULL) { + sError("failed to open raft store. path: %s", pSyncNode->raftStorePath); + goto _error; + } // init TLA+ candidate vars pSyncNode->pVotesGranted = voteGrantedCreate(pSyncNode); - ASSERT(pSyncNode->pVotesGranted != NULL); + if (pSyncNode->pVotesGranted == NULL) { + sError("failed to create VotesGranted. vgId:%d", pSyncNode->vgId); + goto _error; + } pSyncNode->pVotesRespond = votesRespondCreate(pSyncNode); - ASSERT(pSyncNode->pVotesRespond != NULL); + if (pSyncNode->pVotesRespond == NULL) { + sError("failed to create VotesRespond. vgId:%d", pSyncNode->vgId); + goto _error; + } // init TLA+ leader vars pSyncNode->pNextIndex = syncIndexMgrCreate(pSyncNode); - ASSERT(pSyncNode->pNextIndex != NULL); + if (pSyncNode->pNextIndex == NULL) { + sError("failed to create SyncIndexMgr. vgId:%d", pSyncNode->vgId); + goto _error; + } pSyncNode->pMatchIndex = syncIndexMgrCreate(pSyncNode); - ASSERT(pSyncNode->pMatchIndex != NULL); + if (pSyncNode->pMatchIndex == NULL) { + sError("failed to create SyncIndexMgr. vgId:%d", pSyncNode->vgId); + goto _error; + } // init TLA+ log vars pSyncNode->pLogStore = logStoreCreate(pSyncNode); - ASSERT(pSyncNode->pLogStore != NULL); + if (pSyncNode->pLogStore == NULL) { + sError("failed to create SyncLogStore. vgId:%d", pSyncNode->vgId); + goto _error; + } SyncIndex commitIndex = SYNC_INDEX_INVALID; if (pSyncNode->pFsm != NULL && pSyncNode->pFsm->FpGetSnapshotInfo != NULL) { SSnapshot snapshot = {0}; int32_t code = pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snapshot); - ASSERT(code == 0); + if (code != 0) { + sError("failed to get snapshot info. vgId:%d, code:%d", pSyncNode->vgId, code); + goto _error; + } if (snapshot.lastApplyIndex > commitIndex) { commitIndex = snapshot.lastApplyIndex; syncNodeEventLog(pSyncNode, "reset commit index by snapshot"); @@ -1132,7 +1178,10 @@ SSyncNode* syncNodeOpen(const SSyncInfo* pOldSyncInfo) { // tools pSyncNode->pSyncRespMgr = syncRespMgrCreate(pSyncNode, SYNC_RESP_TTL_MS); - ASSERT(pSyncNode->pSyncRespMgr != NULL); + if (pSyncNode->pSyncRespMgr == NULL) { + sError("failed to create SyncRespMgr. vgId:%d", pSyncNode->vgId); + goto _error; + } // restore state pSyncNode->restoreFinish = false; @@ -1162,6 +1211,15 @@ SSyncNode* syncNodeOpen(const SSyncInfo* pOldSyncInfo) { syncNodeEventLog(pSyncNode, "sync open"); return pSyncNode; + +_error: + if (pSyncInfo->pFsm) { + taosMemoryFree(pSyncInfo->pFsm); + pSyncInfo->pFsm = NULL; + } + syncNodeClose(pSyncNode); + pSyncNode = NULL; + return NULL; } void syncNodeMaybeUpdateCommitBySnapshot(SSyncNode* pSyncNode) { @@ -1214,20 +1272,28 @@ void syncNodeStartStandBy(SSyncNode* pSyncNode) { void syncNodeClose(SSyncNode* pSyncNode) { syncNodeEventLog(pSyncNode, "sync close"); - + if (pSyncNode == NULL) { + return; + } int32_t ret; - ASSERT(pSyncNode != NULL); ret = raftStoreClose(pSyncNode->pRaftStore); ASSERT(ret == 0); syncRespMgrDestroy(pSyncNode->pSyncRespMgr); + pSyncNode->pSyncRespMgr = NULL; voteGrantedDestroy(pSyncNode->pVotesGranted); + pSyncNode->pVotesGranted = NULL; votesRespondDestory(pSyncNode->pVotesRespond); + pSyncNode->pVotesRespond = NULL; syncIndexMgrDestroy(pSyncNode->pNextIndex); + pSyncNode->pNextIndex = NULL; syncIndexMgrDestroy(pSyncNode->pMatchIndex); + pSyncNode->pMatchIndex = NULL; logStoreDestory(pSyncNode->pLogStore); + pSyncNode->pLogStore = NULL; raftCfgClose(pSyncNode->pRaftCfg); + pSyncNode->pRaftCfg = NULL; syncNodeStopPingTimer(pSyncNode); syncNodeStopElectTimer(pSyncNode); @@ -1249,8 +1315,7 @@ void syncNodeClose(SSyncNode* pSyncNode) { pSyncNode->pNewNodeReceiver = NULL; } - // free memory in syncFreeNode - // taosMemoryFree(pSyncNode); + taosMemoryFree(pSyncNode); } // option @@ -2534,7 +2599,7 @@ static void syncNodeEqHeartbeatTimer(void* param, void* tmrId) { return; } } else { - sError("syncNodeEqHeartbeatTimer FpEqMsg is NULL"); + sError("vgId:%d, enqueue msg cb ptr (i.e. FpEqMsg) not set.", pSyncNode->vgId); } syncTimeoutDestroy(pSyncMsg); @@ -2774,14 +2839,6 @@ int32_t syncNodeOnClientRequestBatchCb(SSyncNode* ths, SyncClientRequestBatch* p return 0; } -static void syncFreeNode(void* param) { - SSyncNode* pNode = param; - // inner object already free - // syncNodePrint2((char*)"==syncFreeNode==", pNode); - - taosMemoryFree(pNode); -} - const char* syncStr(ESyncState state) { switch (state) { case TAOS_SYNC_STATE_FOLLOWER: diff --git a/source/libs/sync/src/syncRaftCfg.c b/source/libs/sync/src/syncRaftCfg.c index ab404d1b9af744b51b508cd1f870482c79756ea1..57126d0871da0ab80cf718260377fa8754581d65 100644 --- a/source/libs/sync/src/syncRaftCfg.c +++ b/source/libs/sync/src/syncRaftCfg.c @@ -364,8 +364,6 @@ int32_t raftCfgCreateFile(SSyncCfg *pCfg, SRaftCfgMeta meta, const char *path) { int32_t sysErr = errno; const char *sysErrStr = strerror(errno); sError("create raft cfg file error, err:%d %X, msg:%s, syserr:%d, sysmsg:%s", err, err, errStr, sysErr, sysErrStr); - ASSERT(0); - return -1; } diff --git a/source/libs/sync/src/syncRaftStore.c b/source/libs/sync/src/syncRaftStore.c index 29f78b582f1f1ddab18dba54b3b495bf3aa8af98..22b47a2c45e14216b1554c9e606f2d66b1d07b5e 100644 --- a/source/libs/sync/src/syncRaftStore.c +++ b/source/libs/sync/src/syncRaftStore.c @@ -28,7 +28,7 @@ SRaftStore *raftStoreOpen(const char *path) { SRaftStore *pRaftStore = taosMemoryMalloc(sizeof(SRaftStore)); if (pRaftStore == NULL) { - sError("raftStoreOpen malloc error"); + terrno = TSDB_CODE_OUT_OF_MEMORY; return NULL; } memset(pRaftStore, 0, sizeof(*pRaftStore)); @@ -72,7 +72,9 @@ static int32_t raftStoreInit(SRaftStore *pRaftStore) { } int32_t raftStoreClose(SRaftStore *pRaftStore) { - ASSERT(pRaftStore != NULL); + if (pRaftStore == NULL) { + return 0; + } taosCloseFile(&pRaftStore->pFile); taosMemoryFree(pRaftStore); diff --git a/source/libs/sync/src/syncRespMgr.c b/source/libs/sync/src/syncRespMgr.c index d7ed864180335eb5a07ea58298c574fb71265fe2..103c2254768a3b5bfc6d5c8090828f999b2e8c9e 100644 --- a/source/libs/sync/src/syncRespMgr.c +++ b/source/libs/sync/src/syncRespMgr.c @@ -19,6 +19,10 @@ SSyncRespMgr *syncRespMgrCreate(void *data, int64_t ttl) { SSyncRespMgr *pObj = (SSyncRespMgr *)taosMemoryMalloc(sizeof(SSyncRespMgr)); + if (pObj == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return NULL; + } memset(pObj, 0, sizeof(SSyncRespMgr)); pObj->pRespHash = diff --git a/source/libs/sync/src/syncSnapshot.c b/source/libs/sync/src/syncSnapshot.c index 0be3392a9a52b69e29cbcedcb910cdbc0f9a6234..68d81813ac760a37b78bfdc12aee59624b296c48 100644 --- a/source/libs/sync/src/syncSnapshot.c +++ b/source/libs/sync/src/syncSnapshot.c @@ -35,7 +35,10 @@ SSyncSnapshotSender *snapshotSenderCreate(SSyncNode *pSyncNode, int32_t replicaI SSyncSnapshotSender *pSender = NULL; if (condition) { pSender = taosMemoryMalloc(sizeof(SSyncSnapshotSender)); - ASSERT(pSender != NULL); + if (pSender == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return NULL; + } memset(pSender, 0, sizeof(*pSender)); pSender->start = false; diff --git a/source/libs/sync/src/syncUtil.c b/source/libs/sync/src/syncUtil.c index 325073f3660172a1067d0bf92e11a8ae9f88fa01..6f234631dae8620df80fe5aba0366a5cb8ad62c7 100644 --- a/source/libs/sync/src/syncUtil.c +++ b/source/libs/sync/src/syncUtil.c @@ -26,7 +26,8 @@ uint64_t syncUtilAddr2U64(const char* host, uint16_t port) { uint32_t hostU32 = taosGetIpv4FromFqdn(host); if (hostU32 == (uint32_t)-1) { - sError("Get IP address error"); + sError("failed to resolve ipv4 addr. host:%s", host); + terrno = TSDB_CODE_TSC_INVALID_FQDN; return -1; } @@ -84,13 +85,18 @@ void syncUtilraftId2EpSet(const SRaftId* raftId, SEpSet* pEpSet) { addEpIntoEpSet(pEpSet, host, port); } -void syncUtilnodeInfo2raftId(const SNodeInfo* pNodeInfo, SyncGroupId vgId, SRaftId* raftId) { +bool syncUtilnodeInfo2raftId(const SNodeInfo* pNodeInfo, SyncGroupId vgId, SRaftId* raftId) { uint32_t ipv4 = taosGetIpv4FromFqdn(pNodeInfo->nodeFqdn); - ASSERT(ipv4 != 0xFFFFFFFF); + if (ipv4 == 0xFFFFFFFF || ipv4 == 1) { + sError("failed to resolve ipv4 addr. fqdn: %s", pNodeInfo->nodeFqdn); + terrno = TSDB_CODE_TSC_INVALID_FQDN; + return false; + } char ipbuf[128] = {0}; tinet_ntoa(ipbuf, ipv4); raftId->addr = syncUtilAddr2U64(ipbuf, pNodeInfo->nodePort); raftId->vgId = vgId; + return true; } bool syncUtilSameId(const SRaftId* pId1, const SRaftId* pId2) { @@ -310,4 +316,4 @@ void syncUtilJson2Line(char* jsonStr) { q++; } } -} \ No newline at end of file +} diff --git a/source/libs/sync/src/syncVoteMgr.c b/source/libs/sync/src/syncVoteMgr.c index 1d46d71a05b66d30fba02ec342014a56f388e73e..641bb32d2d56a0bf745c6cb4c5fb43ccaaaa6f75 100644 --- a/source/libs/sync/src/syncVoteMgr.c +++ b/source/libs/sync/src/syncVoteMgr.c @@ -24,7 +24,10 @@ static void voteGrantedClearVotes(SVotesGranted *pVotesGranted) { SVotesGranted *voteGrantedCreate(SSyncNode *pSyncNode) { SVotesGranted *pVotesGranted = taosMemoryMalloc(sizeof(SVotesGranted)); - ASSERT(pVotesGranted != NULL); + if (pVotesGranted == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return NULL; + } memset(pVotesGranted, 0, sizeof(SVotesGranted)); pVotesGranted->replicas = &(pSyncNode->replicasId);