提交 f12449b1 编写于 作者: H Hongze Cheng

Merge branch '3.0' of https://github.com/taosdata/TDengine into feat/stream_compression

# TDengine driver connector for Lua # TDengine driver connector for Lua
It's a Lua implementation for [TDengine](https://github.com/taosdata/TDengine), an open-sourced big data platform designed and optimized for the Internet of Things (IoT), Connected Cars, Industrial IoT, and IT Infrastructure and Application Monitoring. You may need to install Lua5.3 . It's a Lua implementation for [TDengine](https://github.com/taosdata/TDengine), an open-sourced big data platform designed and optimized for the Internet of Things (IoT), Connected Cars, Industrial IoT, and IT Infrastructure and Application Monitoring. You may need to install Lua5.3 .
As TDengine is built with lua-enable with default configure, the built-in lua lib conflicts with external lua lib. The following commands require TDengine built with lua-disable.
To disable built-in lua:
```
mkdir debug && cd debug
cmake .. -DBUILD_LUA=false && cmake --build .
```
## Lua Dependencies ## Lua Dependencies
- Lua: - Lua:
......
...@@ -29,7 +29,7 @@ static int l_connect(lua_State *L){ ...@@ -29,7 +29,7 @@ static int l_connect(lua_State *L){
luaL_checktype(L, 1, LUA_TTABLE); luaL_checktype(L, 1, LUA_TTABLE);
lua_getfield(L, 1,"host"); lua_getfield(L, 1,"host");
if (lua_isstring(L,-1)){ if (lua_isstring(L, -1)){
host = lua_tostring(L, -1); host = lua_tostring(L, -1);
// printf("host = %s\n", host); // printf("host = %s\n", host);
} }
...@@ -58,8 +58,9 @@ static int l_connect(lua_State *L){ ...@@ -58,8 +58,9 @@ static int l_connect(lua_State *L){
//printf("password = %s\n", password); //printf("password = %s\n", password);
} }
lua_settop(L,0); lua_settop(L, 0);
taos_init();
lua_newtable(L); lua_newtable(L);
int table_index = lua_gettop(L); int table_index = lua_gettop(L);
...@@ -125,7 +126,7 @@ static int l_query(lua_State *L){ ...@@ -125,7 +126,7 @@ static int l_query(lua_State *L){
//printf("row index:%d\n",rows); //printf("row index:%d\n",rows);
rows++; rows++;
lua_pushnumber(L,rows); lua_pushnumber(L, rows);
lua_newtable(L); lua_newtable(L);
for (int i = 0; i < num_fields; ++i) { for (int i = 0; i < num_fields; ++i) {
...@@ -136,15 +137,19 @@ static int l_query(lua_State *L){ ...@@ -136,15 +137,19 @@ static int l_query(lua_State *L){
lua_pushstring(L,fields[i].name); lua_pushstring(L,fields[i].name);
int32_t* length = taos_fetch_lengths(result); int32_t* length = taos_fetch_lengths(result);
switch (fields[i].type) { switch (fields[i].type) {
case TSDB_DATA_TYPE_UTINYINT:
case TSDB_DATA_TYPE_TINYINT: case TSDB_DATA_TYPE_TINYINT:
lua_pushinteger(L,*((char *)row[i])); lua_pushinteger(L,*((char *)row[i]));
break; break;
case TSDB_DATA_TYPE_USMALLINT:
case TSDB_DATA_TYPE_SMALLINT: case TSDB_DATA_TYPE_SMALLINT:
lua_pushinteger(L,*((short *)row[i])); lua_pushinteger(L,*((short *)row[i]));
break; break;
case TSDB_DATA_TYPE_UINT:
case TSDB_DATA_TYPE_INT: case TSDB_DATA_TYPE_INT:
lua_pushinteger(L,*((int *)row[i])); lua_pushinteger(L,*((int *)row[i]));
break; break;
case TSDB_DATA_TYPE_UBIGINT:
case TSDB_DATA_TYPE_BIGINT: case TSDB_DATA_TYPE_BIGINT:
lua_pushinteger(L,*((int64_t *)row[i])); lua_pushinteger(L,*((int64_t *)row[i]));
break; break;
...@@ -154,6 +159,7 @@ static int l_query(lua_State *L){ ...@@ -154,6 +159,7 @@ static int l_query(lua_State *L){
case TSDB_DATA_TYPE_DOUBLE: case TSDB_DATA_TYPE_DOUBLE:
lua_pushnumber(L,*((double *)row[i])); lua_pushnumber(L,*((double *)row[i]));
break; break;
case TSDB_DATA_TYPE_JSON:
case TSDB_DATA_TYPE_BINARY: case TSDB_DATA_TYPE_BINARY:
case TSDB_DATA_TYPE_NCHAR: case TSDB_DATA_TYPE_NCHAR:
//printf("type:%d, max len:%d, current len:%d\n",fields[i].type, fields[i].bytes, length[i]); //printf("type:%d, max len:%d, current len:%d\n",fields[i].type, fields[i].bytes, length[i]);
...@@ -197,7 +203,7 @@ void async_query_callback(void *param, TAOS_RES *result, int code){ ...@@ -197,7 +203,7 @@ void async_query_callback(void *param, TAOS_RES *result, int code){
printf("failed, reason:%s\n", taos_errstr(result)); printf("failed, reason:%s\n", taos_errstr(result));
lua_pushinteger(L, -1); lua_pushinteger(L, -1);
lua_setfield(L, table_index, "code"); lua_setfield(L, table_index, "code");
lua_pushstring(L,"something is wrong");// taos_errstr(taos)); lua_pushstring(L, taos_errstr(result));
lua_setfield(L, table_index, "error"); lua_setfield(L, table_index, "error");
}else{ }else{
//printf("success to async query.\n"); //printf("success to async query.\n");
...@@ -214,9 +220,9 @@ void async_query_callback(void *param, TAOS_RES *result, int code){ ...@@ -214,9 +220,9 @@ void async_query_callback(void *param, TAOS_RES *result, int code){
static int l_async_query(lua_State *L){ static int l_async_query(lua_State *L){
int r = luaL_ref(L, LUA_REGISTRYINDEX); int r = luaL_ref(L, LUA_REGISTRYINDEX);
TAOS * taos = (TAOS*)lua_topointer(L,1); TAOS * taos = (TAOS*)lua_topointer(L, 1);
const char * sqlstr = lua_tostring(L,2); const char * sqlstr = lua_tostring(L, 2);
// int stime = luaL_checknumber(L,3); // int stime = luaL_checknumber(L, 3);
lua_newtable(L); lua_newtable(L);
int table_index = lua_gettop(L); int table_index = lua_gettop(L);
...@@ -224,7 +230,7 @@ static int l_async_query(lua_State *L){ ...@@ -224,7 +230,7 @@ static int l_async_query(lua_State *L){
struct async_query_callback_param *p = malloc(sizeof(struct async_query_callback_param)); struct async_query_callback_param *p = malloc(sizeof(struct async_query_callback_param));
p->state = L; p->state = L;
p->callback=r; p->callback=r;
// printf("r:%d, L:%d\n",r,L); // printf("r:%d, L:%d\n", r, L);
taos_query_a(taos,sqlstr,async_query_callback,p); taos_query_a(taos,sqlstr,async_query_callback,p);
lua_pushnumber(L, 0); lua_pushnumber(L, 0);
...@@ -267,7 +273,7 @@ static const struct luaL_Reg lib[] = { ...@@ -267,7 +273,7 @@ static const struct luaL_Reg lib[] = {
extern int luaopen_luaconnector51(lua_State* L) extern int luaopen_luaconnector51(lua_State* L)
{ {
// luaL_register(L, "luaconnector51", lib); // luaL_register(L, "luaconnector51", lib);
lua_newtable (L); lua_newtable(L);
luaL_setfuncs(L,lib,0); luaL_setfuncs(L,lib,0);
return 1; return 1;
} }
...@@ -29,7 +29,7 @@ static int l_connect(lua_State *L){ ...@@ -29,7 +29,7 @@ static int l_connect(lua_State *L){
luaL_checktype(L, 1, LUA_TTABLE); luaL_checktype(L, 1, LUA_TTABLE);
lua_getfield(L, 1,"host"); lua_getfield(L, 1,"host");
if (lua_isstring(L,-1)){ if (lua_isstring(L, -1)){
host = lua_tostring(L, -1); host = lua_tostring(L, -1);
// printf("host = %s\n", host); // printf("host = %s\n", host);
} }
...@@ -58,7 +58,7 @@ static int l_connect(lua_State *L){ ...@@ -58,7 +58,7 @@ static int l_connect(lua_State *L){
//printf("password = %s\n", password); //printf("password = %s\n", password);
} }
lua_settop(L,0); lua_settop(L, 0);
taos_init(); taos_init();
...@@ -126,7 +126,7 @@ static int l_query(lua_State *L){ ...@@ -126,7 +126,7 @@ static int l_query(lua_State *L){
//printf("row index:%d\n",rows); //printf("row index:%d\n",rows);
rows++; rows++;
lua_pushnumber(L,rows); lua_pushnumber(L, rows);
lua_newtable(L); lua_newtable(L);
for (int i = 0; i < num_fields; ++i) { for (int i = 0; i < num_fields; ++i) {
...@@ -203,7 +203,7 @@ void async_query_callback(void *param, TAOS_RES *result, int code){ ...@@ -203,7 +203,7 @@ void async_query_callback(void *param, TAOS_RES *result, int code){
printf("failed, reason:%s\n", taos_errstr(result)); printf("failed, reason:%s\n", taos_errstr(result));
lua_pushinteger(L, -1); lua_pushinteger(L, -1);
lua_setfield(L, table_index, "code"); lua_setfield(L, table_index, "code");
lua_pushstring(L,"something is wrong");// taos_errstr(taos)); lua_pushstring(L, taos_errstr(result));
lua_setfield(L, table_index, "error"); lua_setfield(L, table_index, "error");
}else{ }else{
//printf("success to async query.\n"); //printf("success to async query.\n");
...@@ -220,9 +220,9 @@ void async_query_callback(void *param, TAOS_RES *result, int code){ ...@@ -220,9 +220,9 @@ void async_query_callback(void *param, TAOS_RES *result, int code){
static int l_async_query(lua_State *L){ static int l_async_query(lua_State *L){
int r = luaL_ref(L, LUA_REGISTRYINDEX); int r = luaL_ref(L, LUA_REGISTRYINDEX);
TAOS * taos = (TAOS*)lua_topointer(L,1); TAOS * taos = (TAOS*)lua_topointer(L, 1);
const char * sqlstr = lua_tostring(L,2); const char * sqlstr = lua_tostring(L, 2);
// int stime = luaL_checknumber(L,3); // int stime = luaL_checknumber(L, 3);
lua_newtable(L); lua_newtable(L);
int table_index = lua_gettop(L); int table_index = lua_gettop(L);
...@@ -230,7 +230,7 @@ static int l_async_query(lua_State *L){ ...@@ -230,7 +230,7 @@ static int l_async_query(lua_State *L){
struct async_query_callback_param *p = malloc(sizeof(struct async_query_callback_param)); struct async_query_callback_param *p = malloc(sizeof(struct async_query_callback_param));
p->state = L; p->state = L;
p->callback=r; p->callback=r;
// printf("r:%d, L:%d\n",r,L); // printf("r:%d, L:%d\n", r, L);
taos_query_a(taos,sqlstr,async_query_callback,p); taos_query_a(taos,sqlstr,async_query_callback,p);
lua_pushnumber(L, 0); lua_pushnumber(L, 0);
......
...@@ -176,8 +176,14 @@ end ...@@ -176,8 +176,14 @@ end
driver.query_a(conn,"INSERT INTO therm1 VALUES ('2019-09-01 00:00:00.005', 100),('2019-09-01 00:00:00.006', 101),('2019-09-01 00:00:00.007', 102)", async_query_callback) driver.query_a(conn,"INSERT INTO therm1 VALUES ('2019-09-01 00:00:00.005', 100),('2019-09-01 00:00:00.006', 101),('2019-09-01 00:00:00.007', 102)", async_query_callback)
res = driver.query(conn, "create stream stream_avg_degree into avg_degree as select avg(degree) from thermometer interval(5s) sliding(1s)") res = driver.query(conn, "create stream stream_avg_degree into avg_degree as select avg(degree) from thermometer interval(5s) sliding(1s)")
if res.code ~=0 then
print("create stream--- failed:"..res.error)
return
else
print("create stream--- pass")
end
print("From now on we start continous insert in an definite loop, pls wait for about 10 seconds and check stream table for result.") print("From now on we start continous insertion in an definite loop, please wait for about 10 seconds and check stream table avg_degree for result.")
local loop_index = 0 local loop_index = 0
while loop_index < 10 do while loop_index < 10 do
local t = os.time()*1000 local t = os.time()*1000
...@@ -193,5 +199,5 @@ while loop_index < 10 do ...@@ -193,5 +199,5 @@ while loop_index < 10 do
os.execute("sleep " .. 1) os.execute("sleep " .. 1)
loop_index = loop_index + 1 loop_index = loop_index + 1
end end
driver.query(conn,"DROP STREAM IF EXISTS avg_therm_s") driver.query(conn,"DROP STREAM IF EXISTS stream_avg_degree")
driver.close(conn) driver.close(conn)
...@@ -85,6 +85,8 @@ _err: ...@@ -85,6 +85,8 @@ _err:
int tsdbClose(STsdb **pTsdb) { int tsdbClose(STsdb **pTsdb) {
if (*pTsdb) { if (*pTsdb) {
taosThreadRwlockDestroy(&(*pTsdb)->rwLock); taosThreadRwlockDestroy(&(*pTsdb)->rwLock);
tsdbMemTableDestroy((*pTsdb)->mem);
(*pTsdb)->mem = NULL;
tsdbFSClose(*pTsdb); tsdbFSClose(*pTsdb);
tsdbCloseCache(*pTsdb); tsdbCloseCache(*pTsdb);
taosMemoryFreeClear(*pTsdb); taosMemoryFreeClear(*pTsdb);
......
...@@ -53,6 +53,10 @@ int vnodeCloseBufPool(SVnode *pVnode) { ...@@ -53,6 +53,10 @@ int vnodeCloseBufPool(SVnode *pVnode) {
vnodeBufPoolDestroy(pPool); vnodeBufPoolDestroy(pPool);
} }
if (pVnode->inUse) {
vnodeBufPoolDestroy(pVnode->inUse);
pVnode->inUse = NULL;
}
vDebug("vgId:%d, vnode buffer pool is closed", TD_VID(pVnode)); vDebug("vgId:%d, vnode buffer pool is closed", TD_VID(pVnode));
return 0; return 0;
...@@ -177,4 +181,4 @@ void vnodeBufPoolUnRef(SVBufPool *pPool) { ...@@ -177,4 +181,4 @@ void vnodeBufPoolUnRef(SVBufPool *pPool) {
taosThreadMutexUnlock(&pVnode->mutex); taosThreadMutexUnlock(&pVnode->mutex);
} }
} }
\ No newline at end of file
...@@ -161,7 +161,6 @@ SVnode *vnodeOpen(const char *path, STfs *pTfs, SMsgCb msgCb) { ...@@ -161,7 +161,6 @@ SVnode *vnodeOpen(const char *path, STfs *pTfs, SMsgCb msgCb) {
// open sync // open sync
if (vnodeSyncOpen(pVnode, dir)) { if (vnodeSyncOpen(pVnode, dir)) {
vError("vgId:%d, failed to open sync since %s", TD_VID(pVnode), tstrerror(terrno)); vError("vgId:%d, failed to open sync since %s", TD_VID(pVnode), tstrerror(terrno));
terrno = TSDB_CODE_OUT_OF_MEMORY;
goto _err; goto _err;
} }
...@@ -174,6 +173,7 @@ _err: ...@@ -174,6 +173,7 @@ _err:
if (pVnode->pTsdb) tsdbClose(&pVnode->pTsdb); if (pVnode->pTsdb) tsdbClose(&pVnode->pTsdb);
if (pVnode->pSma) smaClose(pVnode->pSma); if (pVnode->pSma) smaClose(pVnode->pSma);
if (pVnode->pMeta) metaClose(pVnode->pMeta); if (pVnode->pMeta) metaClose(pVnode->pMeta);
if (pVnode->pPool) vnodeCloseBufPool(pVnode);
tsem_destroy(&(pVnode->canCommit)); tsem_destroy(&(pVnode->canCommit));
taosMemoryFree(pVnode); taosMemoryFree(pVnode);
......
...@@ -32,7 +32,7 @@ uint64_t syncUtilAddr2U64(const char* host, uint16_t port); ...@@ -32,7 +32,7 @@ uint64_t syncUtilAddr2U64(const char* host, uint16_t port);
void syncUtilU642Addr(uint64_t u64, char* host, size_t len, uint16_t* port); void syncUtilU642Addr(uint64_t u64, char* host, size_t len, uint16_t* port);
void syncUtilnodeInfo2EpSet(const SNodeInfo* pNodeInfo, SEpSet* pEpSet); void syncUtilnodeInfo2EpSet(const SNodeInfo* pNodeInfo, SEpSet* pEpSet);
void syncUtilraftId2EpSet(const SRaftId* raftId, SEpSet* pEpSet); void syncUtilraftId2EpSet(const SRaftId* raftId, SEpSet* pEpSet);
void syncUtilnodeInfo2raftId(const SNodeInfo* pNodeInfo, SyncGroupId vgId, SRaftId* raftId); bool syncUtilnodeInfo2raftId(const SNodeInfo* pNodeInfo, SyncGroupId vgId, SRaftId* raftId);
bool syncUtilSameId(const SRaftId* pId1, const SRaftId* pId2); bool syncUtilSameId(const SRaftId* pId1, const SRaftId* pId2);
bool syncUtilEmptyId(const SRaftId* pId); bool syncUtilEmptyId(const SRaftId* pId);
......
...@@ -20,7 +20,10 @@ ...@@ -20,7 +20,10 @@
SSyncIndexMgr *syncIndexMgrCreate(SSyncNode *pSyncNode) { SSyncIndexMgr *syncIndexMgrCreate(SSyncNode *pSyncNode) {
SSyncIndexMgr *pSyncIndexMgr = taosMemoryMalloc(sizeof(SSyncIndexMgr)); SSyncIndexMgr *pSyncIndexMgr = taosMemoryMalloc(sizeof(SSyncIndexMgr));
ASSERT(pSyncIndexMgr != NULL); if (pSyncIndexMgr == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return NULL;
}
memset(pSyncIndexMgr, 0, sizeof(SSyncIndexMgr)); memset(pSyncIndexMgr, 0, sizeof(SSyncIndexMgr));
pSyncIndexMgr->replicas = &(pSyncNode->replicasId); pSyncIndexMgr->replicas = &(pSyncNode->replicasId);
...@@ -248,4 +251,4 @@ SyncTerm syncIndexMgrGetTerm(SSyncIndexMgr *pSyncIndexMgr, const SRaftId *pRaftI ...@@ -248,4 +251,4 @@ SyncTerm syncIndexMgrGetTerm(SSyncIndexMgr *pSyncIndexMgr, const SRaftId *pRaftI
} }
ASSERT(0); ASSERT(0);
return -1; return -1;
} }
\ No newline at end of file
...@@ -51,15 +51,17 @@ static int32_t syncNodeAppendNoop(SSyncNode* ths); ...@@ -51,15 +51,17 @@ static int32_t syncNodeAppendNoop(SSyncNode* ths);
int32_t syncNodeOnPingCb(SSyncNode* ths, SyncPing* pMsg); int32_t syncNodeOnPingCb(SSyncNode* ths, SyncPing* pMsg);
int32_t syncNodeOnPingReplyCb(SSyncNode* ths, SyncPingReply* pMsg); int32_t syncNodeOnPingReplyCb(SSyncNode* ths, SyncPingReply* pMsg);
// life cycle
static void syncFreeNode(void* param);
// --------------------------------- // ---------------------------------
static void syncNodeFreeCb(void *param) {
syncNodeClose(param);
param = NULL;
}
int32_t syncInit() { int32_t syncInit() {
int32_t ret = 0; int32_t ret = 0;
if (!syncEnvIsStart()) { if (!syncEnvIsStart()) {
tsNodeRefId = taosOpenRef(200, syncFreeNode); tsNodeRefId = taosOpenRef(200, syncNodeFreeCb);
if (tsNodeRefId < 0) { if (tsNodeRefId < 0) {
sError("failed to init node ref"); sError("failed to init node ref");
syncCleanUp(); syncCleanUp();
...@@ -86,11 +88,15 @@ void syncCleanUp() { ...@@ -86,11 +88,15 @@ void syncCleanUp() {
int64_t syncOpen(const SSyncInfo* pSyncInfo) { int64_t syncOpen(const SSyncInfo* pSyncInfo) {
SSyncNode* pSyncNode = syncNodeOpen(pSyncInfo); SSyncNode* pSyncNode = syncNodeOpen(pSyncInfo);
ASSERT(pSyncNode != NULL); if (pSyncNode == NULL) {
sError("failed to open sync node. vgId:%d", pSyncInfo->vgId);
return -1;
}
pSyncNode->rid = taosAddRef(tsNodeRefId, pSyncNode); pSyncNode->rid = taosAddRef(tsNodeRefId, pSyncNode);
if (pSyncNode->rid < 0) { if (pSyncNode->rid < 0) {
syncFreeNode(pSyncNode); syncNodeClose(pSyncNode);
pSyncNode = NULL;
return -1; return -1;
} }
...@@ -136,11 +142,9 @@ void syncStartStandBy(int64_t rid) { ...@@ -136,11 +142,9 @@ void syncStartStandBy(int64_t rid) {
void syncStop(int64_t rid) { void syncStop(int64_t rid) {
SSyncNode* pSyncNode = (SSyncNode*)taosAcquireRef(tsNodeRefId, rid); SSyncNode* pSyncNode = (SSyncNode*)taosAcquireRef(tsNodeRefId, rid);
if (pSyncNode == NULL) return; if (pSyncNode == NULL) return;
int32_t vgId = pSyncNode->vgId; int32_t vgId = pSyncNode->vgId;
syncNodeClose(pSyncNode);
taosReleaseRef(tsNodeRefId, pSyncNode->rid); taosReleaseRef(tsNodeRefId, pSyncNode->rid);
taosRemoveRef(tsNodeRefId, rid); taosRemoveRef(tsNodeRefId, rid);
sDebug("vgId:%d, sync rid:%" PRId64 " is removed from rsetId:%" PRId64, vgId, rid, tsNodeRefId); sDebug("vgId:%d, sync rid:%" PRId64 " is removed from rsetId:%" PRId64, vgId, rid, tsNodeRefId);
} }
...@@ -210,7 +214,7 @@ int32_t syncReconfigBuild(int64_t rid, const SSyncCfg* pNewCfg, SRpcMsg* pRpcMsg ...@@ -210,7 +214,7 @@ int32_t syncReconfigBuild(int64_t rid, const SSyncCfg* pNewCfg, SRpcMsg* pRpcMsg
if (!syncNodeCheckNewConfig(pSyncNode, pNewCfg)) { if (!syncNodeCheckNewConfig(pSyncNode, pNewCfg)) {
taosReleaseRef(tsNodeRefId, pSyncNode->rid); taosReleaseRef(tsNodeRefId, pSyncNode->rid);
terrno = TSDB_CODE_SYN_NEW_CONFIG_ERROR; terrno = TSDB_CODE_SYN_NEW_CONFIG_ERROR;
sError("syncNodeCheckNewConfig error"); sError("invalid new config. vgId:%d", pSyncNode->vgId);
return -1; return -1;
} }
...@@ -237,7 +241,7 @@ int32_t syncReconfig(int64_t rid, const SSyncCfg* pNewCfg) { ...@@ -237,7 +241,7 @@ int32_t syncReconfig(int64_t rid, const SSyncCfg* pNewCfg) {
if (!syncNodeCheckNewConfig(pSyncNode, pNewCfg)) { if (!syncNodeCheckNewConfig(pSyncNode, pNewCfg)) {
taosReleaseRef(tsNodeRefId, pSyncNode->rid); taosReleaseRef(tsNodeRefId, pSyncNode->rid);
terrno = TSDB_CODE_SYN_NEW_CONFIG_ERROR; terrno = TSDB_CODE_SYN_NEW_CONFIG_ERROR;
sError("syncNodeCheckNewConfig error"); sError("invalid new config. vgId:%d", pSyncNode->vgId);
return -1; return -1;
} }
...@@ -941,16 +945,18 @@ _END: ...@@ -941,16 +945,18 @@ _END:
SSyncNode* syncNodeOpen(const SSyncInfo* pOldSyncInfo) { SSyncNode* syncNodeOpen(const SSyncInfo* pOldSyncInfo) {
SSyncInfo* pSyncInfo = (SSyncInfo*)pOldSyncInfo; SSyncInfo* pSyncInfo = (SSyncInfo*)pOldSyncInfo;
SSyncNode* pSyncNode = (SSyncNode*)taosMemoryMalloc(sizeof(SSyncNode)); SSyncNode* pSyncNode = (SSyncNode*)taosMemoryCalloc(1, sizeof(SSyncNode));
ASSERT(pSyncNode != NULL); if (pSyncNode == NULL) {
memset(pSyncNode, 0, sizeof(SSyncNode)); terrno = TSDB_CODE_OUT_OF_MEMORY;
goto _error;
}
int32_t ret = 0; int32_t ret = 0;
if (!taosDirExist((char*)(pSyncInfo->path))) { if (!taosDirExist((char*)(pSyncInfo->path))) {
if (taosMkDir(pSyncInfo->path) != 0) { if (taosMkDir(pSyncInfo->path) != 0) {
terrno = TAOS_SYSTEM_ERROR(errno); terrno = TAOS_SYSTEM_ERROR(errno);
sError("failed to create dir:%s since %s", pSyncInfo->path, terrstr()); sError("failed to create dir:%s since %s", pSyncInfo->path, terrstr());
return NULL; goto _error;
} }
} }
...@@ -963,15 +969,21 @@ SSyncNode* syncNodeOpen(const SSyncInfo* pOldSyncInfo) { ...@@ -963,15 +969,21 @@ SSyncNode* syncNodeOpen(const SSyncInfo* pOldSyncInfo) {
meta.lastConfigIndex = SYNC_INDEX_INVALID; meta.lastConfigIndex = SYNC_INDEX_INVALID;
meta.batchSize = pSyncInfo->batchSize; meta.batchSize = pSyncInfo->batchSize;
ret = raftCfgCreateFile((SSyncCfg*)&(pSyncInfo->syncCfg), meta, pSyncNode->configPath); ret = raftCfgCreateFile((SSyncCfg*)&(pSyncInfo->syncCfg), meta, pSyncNode->configPath);
ASSERT(ret == 0); if (ret != 0) {
sError("failed to create raft cfg file. configPath: %s", pSyncNode->configPath);
goto _error;
}
} else { } else {
// update syncCfg by raft_config.json // update syncCfg by raft_config.json
pSyncNode->pRaftCfg = raftCfgOpen(pSyncNode->configPath); pSyncNode->pRaftCfg = raftCfgOpen(pSyncNode->configPath);
ASSERT(pSyncNode->pRaftCfg != NULL); if (pSyncNode->pRaftCfg == NULL) {
sError("failed to open raft cfg file. path:%s", pSyncNode->configPath);
goto _error;
}
pSyncInfo->syncCfg = pSyncNode->pRaftCfg->cfg; pSyncInfo->syncCfg = pSyncNode->pRaftCfg->cfg;
raftCfgClose(pSyncNode->pRaftCfg); raftCfgClose(pSyncNode->pRaftCfg);
pSyncNode->pRaftCfg = NULL;
} }
// init by SSyncInfo // init by SSyncInfo
...@@ -988,11 +1000,17 @@ SSyncNode* syncNodeOpen(const SSyncInfo* pOldSyncInfo) { ...@@ -988,11 +1000,17 @@ SSyncNode* syncNodeOpen(const SSyncInfo* pOldSyncInfo) {
// init raft config // init raft config
pSyncNode->pRaftCfg = raftCfgOpen(pSyncNode->configPath); pSyncNode->pRaftCfg = raftCfgOpen(pSyncNode->configPath);
ASSERT(pSyncNode->pRaftCfg != NULL); if (pSyncNode->pRaftCfg == NULL) {
sError("failed to open raft cfg file. path:%s", pSyncNode->configPath);
goto _error;
}
// init internal // init internal
pSyncNode->myNodeInfo = pSyncNode->pRaftCfg->cfg.nodeInfo[pSyncNode->pRaftCfg->cfg.myIndex]; pSyncNode->myNodeInfo = pSyncNode->pRaftCfg->cfg.nodeInfo[pSyncNode->pRaftCfg->cfg.myIndex];
syncUtilnodeInfo2raftId(&pSyncNode->myNodeInfo, pSyncNode->vgId, &pSyncNode->myRaftId); if (!syncUtilnodeInfo2raftId(&pSyncNode->myNodeInfo, pSyncNode->vgId, &pSyncNode->myRaftId)) {
sError("failed to determine my raft member id. vgId:%d", pSyncNode->vgId);
goto _error;
}
// init peersNum, peers, peersId // init peersNum, peers, peersId
pSyncNode->peersNum = pSyncNode->pRaftCfg->cfg.replicaNum - 1; pSyncNode->peersNum = pSyncNode->pRaftCfg->cfg.replicaNum - 1;
...@@ -1004,17 +1022,24 @@ SSyncNode* syncNodeOpen(const SSyncInfo* pOldSyncInfo) { ...@@ -1004,17 +1022,24 @@ SSyncNode* syncNodeOpen(const SSyncInfo* pOldSyncInfo) {
} }
} }
for (int i = 0; i < pSyncNode->peersNum; ++i) { for (int i = 0; i < pSyncNode->peersNum; ++i) {
syncUtilnodeInfo2raftId(&pSyncNode->peersNodeInfo[i], pSyncNode->vgId, &pSyncNode->peersId[i]); if (!syncUtilnodeInfo2raftId(&pSyncNode->peersNodeInfo[i], pSyncNode->vgId, &pSyncNode->peersId[i])) {
sError("failed to determine raft member id. vgId:%d, peer:%d", pSyncNode->vgId, i);
goto _error;
}
} }
// init replicaNum, replicasId // init replicaNum, replicasId
pSyncNode->replicaNum = pSyncNode->pRaftCfg->cfg.replicaNum; pSyncNode->replicaNum = pSyncNode->pRaftCfg->cfg.replicaNum;
for (int i = 0; i < pSyncNode->pRaftCfg->cfg.replicaNum; ++i) { for (int i = 0; i < pSyncNode->pRaftCfg->cfg.replicaNum; ++i) {
syncUtilnodeInfo2raftId(&pSyncNode->pRaftCfg->cfg.nodeInfo[i], pSyncNode->vgId, &pSyncNode->replicasId[i]); if(!syncUtilnodeInfo2raftId(&pSyncNode->pRaftCfg->cfg.nodeInfo[i], pSyncNode->vgId, &pSyncNode->replicasId[i])) {
sError("failed to determine raft member id. vgId:%d, replica:%d", pSyncNode->vgId, i);
goto _error;
}
} }
// init raft algorithm // init raft algorithm
pSyncNode->pFsm = pSyncInfo->pFsm; pSyncNode->pFsm = pSyncInfo->pFsm;
pSyncInfo->pFsm = NULL;
pSyncNode->quorum = syncUtilQuorum(pSyncNode->pRaftCfg->cfg.replicaNum); pSyncNode->quorum = syncUtilQuorum(pSyncNode->pRaftCfg->cfg.replicaNum);
pSyncNode->leaderCache = EMPTY_RAFT_ID; pSyncNode->leaderCache = EMPTY_RAFT_ID;
...@@ -1047,29 +1072,50 @@ SSyncNode* syncNodeOpen(const SSyncInfo* pOldSyncInfo) { ...@@ -1047,29 +1072,50 @@ SSyncNode* syncNodeOpen(const SSyncInfo* pOldSyncInfo) {
// init TLA+ server vars // init TLA+ server vars
pSyncNode->state = TAOS_SYNC_STATE_FOLLOWER; pSyncNode->state = TAOS_SYNC_STATE_FOLLOWER;
pSyncNode->pRaftStore = raftStoreOpen(pSyncNode->raftStorePath); pSyncNode->pRaftStore = raftStoreOpen(pSyncNode->raftStorePath);
ASSERT(pSyncNode->pRaftStore != NULL); if (pSyncNode->pRaftStore == NULL) {
sError("failed to open raft store. path: %s", pSyncNode->raftStorePath);
goto _error;
}
// init TLA+ candidate vars // init TLA+ candidate vars
pSyncNode->pVotesGranted = voteGrantedCreate(pSyncNode); pSyncNode->pVotesGranted = voteGrantedCreate(pSyncNode);
ASSERT(pSyncNode->pVotesGranted != NULL); if (pSyncNode->pVotesGranted == NULL) {
sError("failed to create VotesGranted. vgId:%d", pSyncNode->vgId);
goto _error;
}
pSyncNode->pVotesRespond = votesRespondCreate(pSyncNode); pSyncNode->pVotesRespond = votesRespondCreate(pSyncNode);
ASSERT(pSyncNode->pVotesRespond != NULL); if (pSyncNode->pVotesRespond == NULL) {
sError("failed to create VotesRespond. vgId:%d", pSyncNode->vgId);
goto _error;
}
// init TLA+ leader vars // init TLA+ leader vars
pSyncNode->pNextIndex = syncIndexMgrCreate(pSyncNode); pSyncNode->pNextIndex = syncIndexMgrCreate(pSyncNode);
ASSERT(pSyncNode->pNextIndex != NULL); if (pSyncNode->pNextIndex == NULL) {
sError("failed to create SyncIndexMgr. vgId:%d", pSyncNode->vgId);
goto _error;
}
pSyncNode->pMatchIndex = syncIndexMgrCreate(pSyncNode); pSyncNode->pMatchIndex = syncIndexMgrCreate(pSyncNode);
ASSERT(pSyncNode->pMatchIndex != NULL); if (pSyncNode->pMatchIndex == NULL) {
sError("failed to create SyncIndexMgr. vgId:%d", pSyncNode->vgId);
goto _error;
}
// init TLA+ log vars // init TLA+ log vars
pSyncNode->pLogStore = logStoreCreate(pSyncNode); pSyncNode->pLogStore = logStoreCreate(pSyncNode);
ASSERT(pSyncNode->pLogStore != NULL); if (pSyncNode->pLogStore == NULL) {
sError("failed to create SyncLogStore. vgId:%d", pSyncNode->vgId);
goto _error;
}
SyncIndex commitIndex = SYNC_INDEX_INVALID; SyncIndex commitIndex = SYNC_INDEX_INVALID;
if (pSyncNode->pFsm != NULL && pSyncNode->pFsm->FpGetSnapshotInfo != NULL) { if (pSyncNode->pFsm != NULL && pSyncNode->pFsm->FpGetSnapshotInfo != NULL) {
SSnapshot snapshot = {0}; SSnapshot snapshot = {0};
int32_t code = pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snapshot); int32_t code = pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snapshot);
ASSERT(code == 0); if (code != 0) {
sError("failed to get snapshot info. vgId:%d, code:%d", pSyncNode->vgId, code);
goto _error;
}
if (snapshot.lastApplyIndex > commitIndex) { if (snapshot.lastApplyIndex > commitIndex) {
commitIndex = snapshot.lastApplyIndex; commitIndex = snapshot.lastApplyIndex;
syncNodeEventLog(pSyncNode, "reset commit index by snapshot"); syncNodeEventLog(pSyncNode, "reset commit index by snapshot");
...@@ -1132,7 +1178,10 @@ SSyncNode* syncNodeOpen(const SSyncInfo* pOldSyncInfo) { ...@@ -1132,7 +1178,10 @@ SSyncNode* syncNodeOpen(const SSyncInfo* pOldSyncInfo) {
// tools // tools
pSyncNode->pSyncRespMgr = syncRespMgrCreate(pSyncNode, SYNC_RESP_TTL_MS); pSyncNode->pSyncRespMgr = syncRespMgrCreate(pSyncNode, SYNC_RESP_TTL_MS);
ASSERT(pSyncNode->pSyncRespMgr != NULL); if (pSyncNode->pSyncRespMgr == NULL) {
sError("failed to create SyncRespMgr. vgId:%d", pSyncNode->vgId);
goto _error;
}
// restore state // restore state
pSyncNode->restoreFinish = false; pSyncNode->restoreFinish = false;
...@@ -1162,6 +1211,15 @@ SSyncNode* syncNodeOpen(const SSyncInfo* pOldSyncInfo) { ...@@ -1162,6 +1211,15 @@ SSyncNode* syncNodeOpen(const SSyncInfo* pOldSyncInfo) {
syncNodeEventLog(pSyncNode, "sync open"); syncNodeEventLog(pSyncNode, "sync open");
return pSyncNode; return pSyncNode;
_error:
if (pSyncInfo->pFsm) {
taosMemoryFree(pSyncInfo->pFsm);
pSyncInfo->pFsm = NULL;
}
syncNodeClose(pSyncNode);
pSyncNode = NULL;
return NULL;
} }
void syncNodeMaybeUpdateCommitBySnapshot(SSyncNode* pSyncNode) { void syncNodeMaybeUpdateCommitBySnapshot(SSyncNode* pSyncNode) {
...@@ -1214,20 +1272,28 @@ void syncNodeStartStandBy(SSyncNode* pSyncNode) { ...@@ -1214,20 +1272,28 @@ void syncNodeStartStandBy(SSyncNode* pSyncNode) {
void syncNodeClose(SSyncNode* pSyncNode) { void syncNodeClose(SSyncNode* pSyncNode) {
syncNodeEventLog(pSyncNode, "sync close"); syncNodeEventLog(pSyncNode, "sync close");
if (pSyncNode == NULL) {
return;
}
int32_t ret; int32_t ret;
ASSERT(pSyncNode != NULL);
ret = raftStoreClose(pSyncNode->pRaftStore); ret = raftStoreClose(pSyncNode->pRaftStore);
ASSERT(ret == 0); ASSERT(ret == 0);
syncRespMgrDestroy(pSyncNode->pSyncRespMgr); syncRespMgrDestroy(pSyncNode->pSyncRespMgr);
pSyncNode->pSyncRespMgr = NULL;
voteGrantedDestroy(pSyncNode->pVotesGranted); voteGrantedDestroy(pSyncNode->pVotesGranted);
pSyncNode->pVotesGranted = NULL;
votesRespondDestory(pSyncNode->pVotesRespond); votesRespondDestory(pSyncNode->pVotesRespond);
pSyncNode->pVotesRespond = NULL;
syncIndexMgrDestroy(pSyncNode->pNextIndex); syncIndexMgrDestroy(pSyncNode->pNextIndex);
pSyncNode->pNextIndex = NULL;
syncIndexMgrDestroy(pSyncNode->pMatchIndex); syncIndexMgrDestroy(pSyncNode->pMatchIndex);
pSyncNode->pMatchIndex = NULL;
logStoreDestory(pSyncNode->pLogStore); logStoreDestory(pSyncNode->pLogStore);
pSyncNode->pLogStore = NULL;
raftCfgClose(pSyncNode->pRaftCfg); raftCfgClose(pSyncNode->pRaftCfg);
pSyncNode->pRaftCfg = NULL;
syncNodeStopPingTimer(pSyncNode); syncNodeStopPingTimer(pSyncNode);
syncNodeStopElectTimer(pSyncNode); syncNodeStopElectTimer(pSyncNode);
...@@ -1249,8 +1315,7 @@ void syncNodeClose(SSyncNode* pSyncNode) { ...@@ -1249,8 +1315,7 @@ void syncNodeClose(SSyncNode* pSyncNode) {
pSyncNode->pNewNodeReceiver = NULL; pSyncNode->pNewNodeReceiver = NULL;
} }
// free memory in syncFreeNode taosMemoryFree(pSyncNode);
// taosMemoryFree(pSyncNode);
} }
// option // option
...@@ -2534,7 +2599,7 @@ static void syncNodeEqHeartbeatTimer(void* param, void* tmrId) { ...@@ -2534,7 +2599,7 @@ static void syncNodeEqHeartbeatTimer(void* param, void* tmrId) {
return; return;
} }
} else { } else {
sError("syncNodeEqHeartbeatTimer FpEqMsg is NULL"); sError("vgId:%d, enqueue msg cb ptr (i.e. FpEqMsg) not set.", pSyncNode->vgId);
} }
syncTimeoutDestroy(pSyncMsg); syncTimeoutDestroy(pSyncMsg);
...@@ -2774,14 +2839,6 @@ int32_t syncNodeOnClientRequestBatchCb(SSyncNode* ths, SyncClientRequestBatch* p ...@@ -2774,14 +2839,6 @@ int32_t syncNodeOnClientRequestBatchCb(SSyncNode* ths, SyncClientRequestBatch* p
return 0; return 0;
} }
static void syncFreeNode(void* param) {
SSyncNode* pNode = param;
// inner object already free
// syncNodePrint2((char*)"==syncFreeNode==", pNode);
taosMemoryFree(pNode);
}
const char* syncStr(ESyncState state) { const char* syncStr(ESyncState state) {
switch (state) { switch (state) {
case TAOS_SYNC_STATE_FOLLOWER: case TAOS_SYNC_STATE_FOLLOWER:
......
...@@ -364,8 +364,6 @@ int32_t raftCfgCreateFile(SSyncCfg *pCfg, SRaftCfgMeta meta, const char *path) { ...@@ -364,8 +364,6 @@ int32_t raftCfgCreateFile(SSyncCfg *pCfg, SRaftCfgMeta meta, const char *path) {
int32_t sysErr = errno; int32_t sysErr = errno;
const char *sysErrStr = strerror(errno); const char *sysErrStr = strerror(errno);
sError("create raft cfg file error, err:%d %X, msg:%s, syserr:%d, sysmsg:%s", err, err, errStr, sysErr, sysErrStr); sError("create raft cfg file error, err:%d %X, msg:%s, syserr:%d, sysmsg:%s", err, err, errStr, sysErr, sysErrStr);
ASSERT(0);
return -1; return -1;
} }
......
...@@ -28,7 +28,7 @@ SRaftStore *raftStoreOpen(const char *path) { ...@@ -28,7 +28,7 @@ SRaftStore *raftStoreOpen(const char *path) {
SRaftStore *pRaftStore = taosMemoryMalloc(sizeof(SRaftStore)); SRaftStore *pRaftStore = taosMemoryMalloc(sizeof(SRaftStore));
if (pRaftStore == NULL) { if (pRaftStore == NULL) {
sError("raftStoreOpen malloc error"); terrno = TSDB_CODE_OUT_OF_MEMORY;
return NULL; return NULL;
} }
memset(pRaftStore, 0, sizeof(*pRaftStore)); memset(pRaftStore, 0, sizeof(*pRaftStore));
...@@ -72,7 +72,9 @@ static int32_t raftStoreInit(SRaftStore *pRaftStore) { ...@@ -72,7 +72,9 @@ static int32_t raftStoreInit(SRaftStore *pRaftStore) {
} }
int32_t raftStoreClose(SRaftStore *pRaftStore) { int32_t raftStoreClose(SRaftStore *pRaftStore) {
ASSERT(pRaftStore != NULL); if (pRaftStore == NULL) {
return 0;
}
taosCloseFile(&pRaftStore->pFile); taosCloseFile(&pRaftStore->pFile);
taosMemoryFree(pRaftStore); taosMemoryFree(pRaftStore);
......
...@@ -19,6 +19,10 @@ ...@@ -19,6 +19,10 @@
SSyncRespMgr *syncRespMgrCreate(void *data, int64_t ttl) { SSyncRespMgr *syncRespMgrCreate(void *data, int64_t ttl) {
SSyncRespMgr *pObj = (SSyncRespMgr *)taosMemoryMalloc(sizeof(SSyncRespMgr)); SSyncRespMgr *pObj = (SSyncRespMgr *)taosMemoryMalloc(sizeof(SSyncRespMgr));
if (pObj == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return NULL;
}
memset(pObj, 0, sizeof(SSyncRespMgr)); memset(pObj, 0, sizeof(SSyncRespMgr));
pObj->pRespHash = pObj->pRespHash =
......
...@@ -35,7 +35,10 @@ SSyncSnapshotSender *snapshotSenderCreate(SSyncNode *pSyncNode, int32_t replicaI ...@@ -35,7 +35,10 @@ SSyncSnapshotSender *snapshotSenderCreate(SSyncNode *pSyncNode, int32_t replicaI
SSyncSnapshotSender *pSender = NULL; SSyncSnapshotSender *pSender = NULL;
if (condition) { if (condition) {
pSender = taosMemoryMalloc(sizeof(SSyncSnapshotSender)); pSender = taosMemoryMalloc(sizeof(SSyncSnapshotSender));
ASSERT(pSender != NULL); if (pSender == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return NULL;
}
memset(pSender, 0, sizeof(*pSender)); memset(pSender, 0, sizeof(*pSender));
pSender->start = false; pSender->start = false;
......
...@@ -26,7 +26,8 @@ uint64_t syncUtilAddr2U64(const char* host, uint16_t port) { ...@@ -26,7 +26,8 @@ uint64_t syncUtilAddr2U64(const char* host, uint16_t port) {
uint32_t hostU32 = taosGetIpv4FromFqdn(host); uint32_t hostU32 = taosGetIpv4FromFqdn(host);
if (hostU32 == (uint32_t)-1) { if (hostU32 == (uint32_t)-1) {
sError("Get IP address error"); sError("failed to resolve ipv4 addr. host:%s", host);
terrno = TSDB_CODE_TSC_INVALID_FQDN;
return -1; return -1;
} }
...@@ -84,13 +85,18 @@ void syncUtilraftId2EpSet(const SRaftId* raftId, SEpSet* pEpSet) { ...@@ -84,13 +85,18 @@ void syncUtilraftId2EpSet(const SRaftId* raftId, SEpSet* pEpSet) {
addEpIntoEpSet(pEpSet, host, port); addEpIntoEpSet(pEpSet, host, port);
} }
void syncUtilnodeInfo2raftId(const SNodeInfo* pNodeInfo, SyncGroupId vgId, SRaftId* raftId) { bool syncUtilnodeInfo2raftId(const SNodeInfo* pNodeInfo, SyncGroupId vgId, SRaftId* raftId) {
uint32_t ipv4 = taosGetIpv4FromFqdn(pNodeInfo->nodeFqdn); uint32_t ipv4 = taosGetIpv4FromFqdn(pNodeInfo->nodeFqdn);
ASSERT(ipv4 != 0xFFFFFFFF); if (ipv4 == 0xFFFFFFFF || ipv4 == 1) {
sError("failed to resolve ipv4 addr. fqdn: %s", pNodeInfo->nodeFqdn);
terrno = TSDB_CODE_TSC_INVALID_FQDN;
return false;
}
char ipbuf[128] = {0}; char ipbuf[128] = {0};
tinet_ntoa(ipbuf, ipv4); tinet_ntoa(ipbuf, ipv4);
raftId->addr = syncUtilAddr2U64(ipbuf, pNodeInfo->nodePort); raftId->addr = syncUtilAddr2U64(ipbuf, pNodeInfo->nodePort);
raftId->vgId = vgId; raftId->vgId = vgId;
return true;
} }
bool syncUtilSameId(const SRaftId* pId1, const SRaftId* pId2) { bool syncUtilSameId(const SRaftId* pId1, const SRaftId* pId2) {
...@@ -310,4 +316,4 @@ void syncUtilJson2Line(char* jsonStr) { ...@@ -310,4 +316,4 @@ void syncUtilJson2Line(char* jsonStr) {
q++; q++;
} }
} }
} }
\ No newline at end of file
...@@ -24,7 +24,10 @@ static void voteGrantedClearVotes(SVotesGranted *pVotesGranted) { ...@@ -24,7 +24,10 @@ static void voteGrantedClearVotes(SVotesGranted *pVotesGranted) {
SVotesGranted *voteGrantedCreate(SSyncNode *pSyncNode) { SVotesGranted *voteGrantedCreate(SSyncNode *pSyncNode) {
SVotesGranted *pVotesGranted = taosMemoryMalloc(sizeof(SVotesGranted)); SVotesGranted *pVotesGranted = taosMemoryMalloc(sizeof(SVotesGranted));
ASSERT(pVotesGranted != NULL); if (pVotesGranted == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return NULL;
}
memset(pVotesGranted, 0, sizeof(SVotesGranted)); memset(pVotesGranted, 0, sizeof(SVotesGranted));
pVotesGranted->replicas = &(pSyncNode->replicasId); pVotesGranted->replicas = &(pSyncNode->replicasId);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册