提交 d41ce958 编写于 作者: L Liu Jicong

Merge branch 'fix/TD-19325' into feature/tq

......@@ -22,5 +22,4 @@ An example is as follows.
username = "root"
password = "taosdata"
data_format = "influx"
influx_max_line_bytes = 250
```
......@@ -60,7 +60,6 @@ For the configuration method, add the following text to `/etc/telegraf/telegraf.
username = "<TDengine's username>"
password = "<TDengine's password>"
data_format = "influx"
influx_max_line_bytes = 250
```
Then restart telegraf:
......
......@@ -22,6 +22,5 @@
username = "root"
password = "taosdata"
data_format = "influx"
influx_max_line_bytes = 250
```
......@@ -61,7 +61,6 @@ IT 运维监测数据通常都是对时间特性比较敏感的数据,例如
username = "<TDengine's username>"
password = "<TDengine's password>"
data_format = "influx"
influx_max_line_bytes = 250
```
然后重启 Telegraf:
......
# TDengine driver connector for Lua
It's a Lua implementation for [TDengine](https://github.com/taosdata/TDengine), an open-sourced big data platform designed and optimized for the Internet of Things (IoT), Connected Cars, Industrial IoT, and IT Infrastructure and Application Monitoring. You may need to install Lua5.3 .
As TDengine is built with lua-enable with default configure, the built-in lua lib conflicts with external lua lib. The following commands require TDengine built with lua-disable.
To disable built-in lua:
```
mkdir debug && cd debug
cmake .. -DBUILD_LUA=false && cmake --build .
```
## Lua Dependencies
- Lua:
......
......@@ -29,7 +29,7 @@ static int l_connect(lua_State *L){
luaL_checktype(L, 1, LUA_TTABLE);
lua_getfield(L, 1,"host");
if (lua_isstring(L,-1)){
if (lua_isstring(L, -1)){
host = lua_tostring(L, -1);
// printf("host = %s\n", host);
}
......@@ -58,8 +58,9 @@ static int l_connect(lua_State *L){
//printf("password = %s\n", password);
}
lua_settop(L,0);
lua_settop(L, 0);
taos_init();
lua_newtable(L);
int table_index = lua_gettop(L);
......@@ -125,7 +126,7 @@ static int l_query(lua_State *L){
//printf("row index:%d\n",rows);
rows++;
lua_pushnumber(L,rows);
lua_pushnumber(L, rows);
lua_newtable(L);
for (int i = 0; i < num_fields; ++i) {
......@@ -136,15 +137,19 @@ static int l_query(lua_State *L){
lua_pushstring(L,fields[i].name);
int32_t* length = taos_fetch_lengths(result);
switch (fields[i].type) {
case TSDB_DATA_TYPE_UTINYINT:
case TSDB_DATA_TYPE_TINYINT:
lua_pushinteger(L,*((char *)row[i]));
break;
case TSDB_DATA_TYPE_USMALLINT:
case TSDB_DATA_TYPE_SMALLINT:
lua_pushinteger(L,*((short *)row[i]));
break;
case TSDB_DATA_TYPE_UINT:
case TSDB_DATA_TYPE_INT:
lua_pushinteger(L,*((int *)row[i]));
break;
case TSDB_DATA_TYPE_UBIGINT:
case TSDB_DATA_TYPE_BIGINT:
lua_pushinteger(L,*((int64_t *)row[i]));
break;
......@@ -154,6 +159,7 @@ static int l_query(lua_State *L){
case TSDB_DATA_TYPE_DOUBLE:
lua_pushnumber(L,*((double *)row[i]));
break;
case TSDB_DATA_TYPE_JSON:
case TSDB_DATA_TYPE_BINARY:
case TSDB_DATA_TYPE_NCHAR:
//printf("type:%d, max len:%d, current len:%d\n",fields[i].type, fields[i].bytes, length[i]);
......@@ -197,7 +203,7 @@ void async_query_callback(void *param, TAOS_RES *result, int code){
printf("failed, reason:%s\n", taos_errstr(result));
lua_pushinteger(L, -1);
lua_setfield(L, table_index, "code");
lua_pushstring(L,"something is wrong");// taos_errstr(taos));
lua_pushstring(L, taos_errstr(result));
lua_setfield(L, table_index, "error");
}else{
//printf("success to async query.\n");
......@@ -214,9 +220,9 @@ void async_query_callback(void *param, TAOS_RES *result, int code){
static int l_async_query(lua_State *L){
int r = luaL_ref(L, LUA_REGISTRYINDEX);
TAOS * taos = (TAOS*)lua_topointer(L,1);
const char * sqlstr = lua_tostring(L,2);
// int stime = luaL_checknumber(L,3);
TAOS * taos = (TAOS*)lua_topointer(L, 1);
const char * sqlstr = lua_tostring(L, 2);
// int stime = luaL_checknumber(L, 3);
lua_newtable(L);
int table_index = lua_gettop(L);
......@@ -224,7 +230,7 @@ static int l_async_query(lua_State *L){
struct async_query_callback_param *p = malloc(sizeof(struct async_query_callback_param));
p->state = L;
p->callback=r;
// printf("r:%d, L:%d\n",r,L);
// printf("r:%d, L:%d\n", r, L);
taos_query_a(taos,sqlstr,async_query_callback,p);
lua_pushnumber(L, 0);
......@@ -267,7 +273,7 @@ static const struct luaL_Reg lib[] = {
extern int luaopen_luaconnector51(lua_State* L)
{
// luaL_register(L, "luaconnector51", lib);
lua_newtable (L);
lua_newtable(L);
luaL_setfuncs(L,lib,0);
return 1;
}
......@@ -29,7 +29,7 @@ static int l_connect(lua_State *L){
luaL_checktype(L, 1, LUA_TTABLE);
lua_getfield(L, 1,"host");
if (lua_isstring(L,-1)){
if (lua_isstring(L, -1)){
host = lua_tostring(L, -1);
// printf("host = %s\n", host);
}
......@@ -58,7 +58,7 @@ static int l_connect(lua_State *L){
//printf("password = %s\n", password);
}
lua_settop(L,0);
lua_settop(L, 0);
taos_init();
......@@ -126,7 +126,7 @@ static int l_query(lua_State *L){
//printf("row index:%d\n",rows);
rows++;
lua_pushnumber(L,rows);
lua_pushnumber(L, rows);
lua_newtable(L);
for (int i = 0; i < num_fields; ++i) {
......@@ -203,7 +203,7 @@ void async_query_callback(void *param, TAOS_RES *result, int code){
printf("failed, reason:%s\n", taos_errstr(result));
lua_pushinteger(L, -1);
lua_setfield(L, table_index, "code");
lua_pushstring(L,"something is wrong");// taos_errstr(taos));
lua_pushstring(L, taos_errstr(result));
lua_setfield(L, table_index, "error");
}else{
//printf("success to async query.\n");
......@@ -220,9 +220,9 @@ void async_query_callback(void *param, TAOS_RES *result, int code){
static int l_async_query(lua_State *L){
int r = luaL_ref(L, LUA_REGISTRYINDEX);
TAOS * taos = (TAOS*)lua_topointer(L,1);
const char * sqlstr = lua_tostring(L,2);
// int stime = luaL_checknumber(L,3);
TAOS * taos = (TAOS*)lua_topointer(L, 1);
const char * sqlstr = lua_tostring(L, 2);
// int stime = luaL_checknumber(L, 3);
lua_newtable(L);
int table_index = lua_gettop(L);
......@@ -230,7 +230,7 @@ static int l_async_query(lua_State *L){
struct async_query_callback_param *p = malloc(sizeof(struct async_query_callback_param));
p->state = L;
p->callback=r;
// printf("r:%d, L:%d\n",r,L);
// printf("r:%d, L:%d\n", r, L);
taos_query_a(taos,sqlstr,async_query_callback,p);
lua_pushnumber(L, 0);
......
......@@ -176,8 +176,14 @@ end
driver.query_a(conn,"INSERT INTO therm1 VALUES ('2019-09-01 00:00:00.005', 100),('2019-09-01 00:00:00.006', 101),('2019-09-01 00:00:00.007', 102)", async_query_callback)
res = driver.query(conn, "create stream stream_avg_degree into avg_degree as select avg(degree) from thermometer interval(5s) sliding(1s)")
if res.code ~=0 then
print("create stream--- failed:"..res.error)
return
else
print("create stream--- pass")
end
print("From now on we start continous insert in an definite loop, pls wait for about 10 seconds and check stream table for result.")
print("From now on we start continous insertion in an definite loop, please wait for about 10 seconds and check stream table avg_degree for result.")
local loop_index = 0
while loop_index < 10 do
local t = os.time()*1000
......@@ -193,5 +199,5 @@ while loop_index < 10 do
os.execute("sleep " .. 1)
loop_index = loop_index + 1
end
driver.query(conn,"DROP STREAM IF EXISTS avg_therm_s")
driver.query(conn,"DROP STREAM IF EXISTS stream_avg_degree")
driver.close(conn)
......@@ -85,6 +85,8 @@ _err:
int tsdbClose(STsdb **pTsdb) {
if (*pTsdb) {
taosThreadRwlockDestroy(&(*pTsdb)->rwLock);
tsdbMemTableDestroy((*pTsdb)->mem);
(*pTsdb)->mem = NULL;
tsdbFSClose(*pTsdb);
tsdbCloseCache(*pTsdb);
taosMemoryFreeClear(*pTsdb);
......
......@@ -486,7 +486,7 @@ static int32_t tsdbReaderCreate(SVnode* pVnode, SQueryTableDataCond* pCond, STsd
pReader->pTsdb = getTsdbByRetentions(pVnode, pCond->twindows.skey, pVnode->config.tsdbCfg.retentions, idstr, &level);
pReader->suid = pCond->suid;
pReader->order = pCond->order;
pReader->capacity = 4096;
pReader->capacity = capacity;
pReader->idStr = (idstr != NULL) ? strdup(idstr) : NULL;
pReader->verRange = getQueryVerRange(pVnode, pCond, level);
pReader->type = pCond->type;
......@@ -841,14 +841,18 @@ static int32_t copyBlockDataToSDataBlock(STsdbReader* pReader, STableBlockScanIn
bool asc = ASCENDING_TRAVERSE(pReader->order);
int32_t step = asc ? 1 : -1;
if (asc && pReader->window.skey <= pBlock->minKey.ts) {
pDumpInfo->rowIndex = 0;
} else if (!asc && pReader->window.ekey >= pBlock->maxKey.ts) {
pDumpInfo->rowIndex = pBlock->nRow - 1;
} else {
int32_t pos = asc ? pBlock->nRow - 1 : 0;
int32_t order = (pReader->order == TSDB_ORDER_ASC) ? TSDB_ORDER_DESC : TSDB_ORDER_ASC;
pDumpInfo->rowIndex = doBinarySearchKey(pBlockData->aTSKEY, pBlock->nRow, pos, pReader->window.skey, order);
if ((pDumpInfo->rowIndex == 0 && asc) || (pDumpInfo->rowIndex == pBlock->nRow - 1 && (!asc))) {
if (asc && pReader->window.skey <= pBlock->minKey.ts) {
//pDumpInfo->rowIndex = 0;
} else
if (!asc && pReader->window.ekey >= pBlock->maxKey.ts) {
//pDumpInfo->rowIndex = pBlock->nRow - 1;
} else {
int32_t pos = asc ? pBlock->nRow - 1 : 0;
int32_t order = (pReader->order == TSDB_ORDER_ASC) ? TSDB_ORDER_DESC : TSDB_ORDER_ASC;
pDumpInfo->rowIndex = doBinarySearchKey(pBlockData->aTSKEY, pBlock->nRow, pos, pReader->window.skey, order);
}
}
// time window check
......@@ -932,8 +936,8 @@ static int32_t copyBlockDataToSDataBlock(STsdbReader* pReader, STableBlockScanIn
pDumpInfo->rowIndex += step * remain;
if (pDumpInfo->rowIndex >= 0 && pDumpInfo->rowIndex < pBlock->nRow) {
int64_t ts = pBlockData->aTSKEY[pDumpInfo->rowIndex];
setBlockAllDumped(pDumpInfo, ts, pReader->order);
// int64_t ts = pBlockData->aTSKEY[pDumpInfo->rowIndex];
// setBlockAllDumped(pDumpInfo, ts, pReader->order);
} else {
int64_t k = asc ? pBlock->maxKey.ts : pBlock->minKey.ts;
setBlockAllDumped(pDumpInfo, k, pReader->order);
......
......@@ -53,6 +53,10 @@ int vnodeCloseBufPool(SVnode *pVnode) {
vnodeBufPoolDestroy(pPool);
}
if (pVnode->inUse) {
vnodeBufPoolDestroy(pVnode->inUse);
pVnode->inUse = NULL;
}
vDebug("vgId:%d, vnode buffer pool is closed", TD_VID(pVnode));
return 0;
......@@ -177,4 +181,4 @@ void vnodeBufPoolUnRef(SVBufPool *pPool) {
taosThreadMutexUnlock(&pVnode->mutex);
}
}
\ No newline at end of file
}
......@@ -161,7 +161,6 @@ SVnode *vnodeOpen(const char *path, STfs *pTfs, SMsgCb msgCb) {
// open sync
if (vnodeSyncOpen(pVnode, dir)) {
vError("vgId:%d, failed to open sync since %s", TD_VID(pVnode), tstrerror(terrno));
terrno = TSDB_CODE_OUT_OF_MEMORY;
goto _err;
}
......@@ -174,6 +173,7 @@ _err:
if (pVnode->pTsdb) tsdbClose(&pVnode->pTsdb);
if (pVnode->pSma) smaClose(pVnode->pSma);
if (pVnode->pMeta) metaClose(pVnode->pMeta);
if (pVnode->pPool) vnodeCloseBufPool(pVnode);
tsem_destroy(&(pVnode->canCommit));
taosMemoryFree(pVnode);
......
......@@ -240,7 +240,7 @@ void vnodeProposeWriteMsg(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs)
isWeak, isBlock, msg, numOfMsgs, arrayPos, pMsg->info.handle);
if (!pVnode->restored) {
vGError("vgId:%d, msg:%p failed to process since not leader", vgId, pMsg);
vGError("vgId:%d, msg:%p failed to process since restore not finished", vgId, pMsg);
terrno = TSDB_CODE_APP_NOT_READY;
vnodeHandleProposeError(pVnode, pMsg, TSDB_CODE_APP_NOT_READY);
rpcFreeCont(pMsg->pCont);
......@@ -797,6 +797,12 @@ bool vnodeIsLeader(SVnode *pVnode) {
}
bool vnodeIsReadyForRead(SVnode *pVnode) {
if (!pVnode->restored) {
vDebug("vgId:%d, vnode restore not finished", pVnode->config.vgId);
terrno = TSDB_CODE_APP_NOT_READY;
return false;
}
if (syncIsReady(pVnode->sync)) {
return true;
}
......
......@@ -1656,6 +1656,8 @@ int32_t doSetupUdf(char udfName[], UdfcFuncHandle *funcHandle) {
int32_t errCode = udfcRunUdfUvTask(task, UV_TASK_CONNECT);
if (errCode != 0) {
fnError("failed to connect to pipe. udfName: %s, pipe: %s", udfName, (&gUdfdProxy)->udfdPipeName);
taosMemoryFree(task->session);
taosMemoryFree(task);
return TSDB_CODE_UDF_PIPE_CONNECT_ERR;
}
......
......@@ -960,7 +960,7 @@ int32_t udfdInitResidentFuncs() {
char* token;
while ((token = strtok_r(pSave, ",", &pSave)) != NULL) {
char func[TSDB_FUNC_NAME_LEN] = {0};
strncpy(func, token, strlen(token));
strncpy(func, token, sizeof(func));
taosArrayPush(global.residentFuncs, func);
}
......
......@@ -32,7 +32,7 @@ uint64_t syncUtilAddr2U64(const char* host, uint16_t port);
void syncUtilU642Addr(uint64_t u64, char* host, size_t len, uint16_t* port);
void syncUtilnodeInfo2EpSet(const SNodeInfo* pNodeInfo, SEpSet* pEpSet);
void syncUtilraftId2EpSet(const SRaftId* raftId, SEpSet* pEpSet);
void syncUtilnodeInfo2raftId(const SNodeInfo* pNodeInfo, SyncGroupId vgId, SRaftId* raftId);
bool syncUtilnodeInfo2raftId(const SNodeInfo* pNodeInfo, SyncGroupId vgId, SRaftId* raftId);
bool syncUtilSameId(const SRaftId* pId1, const SRaftId* pId2);
bool syncUtilEmptyId(const SRaftId* pId);
......
......@@ -20,7 +20,10 @@
SSyncIndexMgr *syncIndexMgrCreate(SSyncNode *pSyncNode) {
SSyncIndexMgr *pSyncIndexMgr = taosMemoryMalloc(sizeof(SSyncIndexMgr));
ASSERT(pSyncIndexMgr != NULL);
if (pSyncIndexMgr == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return NULL;
}
memset(pSyncIndexMgr, 0, sizeof(SSyncIndexMgr));
pSyncIndexMgr->replicas = &(pSyncNode->replicasId);
......@@ -248,4 +251,4 @@ SyncTerm syncIndexMgrGetTerm(SSyncIndexMgr *pSyncIndexMgr, const SRaftId *pRaftI
}
ASSERT(0);
return -1;
}
\ No newline at end of file
}
......@@ -51,15 +51,17 @@ static int32_t syncNodeAppendNoop(SSyncNode* ths);
int32_t syncNodeOnPingCb(SSyncNode* ths, SyncPing* pMsg);
int32_t syncNodeOnPingReplyCb(SSyncNode* ths, SyncPingReply* pMsg);
// life cycle
static void syncFreeNode(void* param);
// ---------------------------------
static void syncNodeFreeCb(void *param) {
syncNodeClose(param);
param = NULL;
}
int32_t syncInit() {
int32_t ret = 0;
if (!syncEnvIsStart()) {
tsNodeRefId = taosOpenRef(200, syncFreeNode);
tsNodeRefId = taosOpenRef(200, syncNodeFreeCb);
if (tsNodeRefId < 0) {
sError("failed to init node ref");
syncCleanUp();
......@@ -86,11 +88,15 @@ void syncCleanUp() {
int64_t syncOpen(const SSyncInfo* pSyncInfo) {
SSyncNode* pSyncNode = syncNodeOpen(pSyncInfo);
ASSERT(pSyncNode != NULL);
if (pSyncNode == NULL) {
sError("failed to open sync node. vgId:%d", pSyncInfo->vgId);
return -1;
}
pSyncNode->rid = taosAddRef(tsNodeRefId, pSyncNode);
if (pSyncNode->rid < 0) {
syncFreeNode(pSyncNode);
syncNodeClose(pSyncNode);
pSyncNode = NULL;
return -1;
}
......@@ -136,11 +142,9 @@ void syncStartStandBy(int64_t rid) {
void syncStop(int64_t rid) {
SSyncNode* pSyncNode = (SSyncNode*)taosAcquireRef(tsNodeRefId, rid);
if (pSyncNode == NULL) return;
int32_t vgId = pSyncNode->vgId;
syncNodeClose(pSyncNode);
taosReleaseRef(tsNodeRefId, pSyncNode->rid);
taosRemoveRef(tsNodeRefId, rid);
sDebug("vgId:%d, sync rid:%" PRId64 " is removed from rsetId:%" PRId64, vgId, rid, tsNodeRefId);
}
......@@ -210,7 +214,7 @@ int32_t syncReconfigBuild(int64_t rid, const SSyncCfg* pNewCfg, SRpcMsg* pRpcMsg
if (!syncNodeCheckNewConfig(pSyncNode, pNewCfg)) {
taosReleaseRef(tsNodeRefId, pSyncNode->rid);
terrno = TSDB_CODE_SYN_NEW_CONFIG_ERROR;
sError("syncNodeCheckNewConfig error");
sError("invalid new config. vgId:%d", pSyncNode->vgId);
return -1;
}
......@@ -237,7 +241,7 @@ int32_t syncReconfig(int64_t rid, const SSyncCfg* pNewCfg) {
if (!syncNodeCheckNewConfig(pSyncNode, pNewCfg)) {
taosReleaseRef(tsNodeRefId, pSyncNode->rid);
terrno = TSDB_CODE_SYN_NEW_CONFIG_ERROR;
sError("syncNodeCheckNewConfig error");
sError("invalid new config. vgId:%d", pSyncNode->vgId);
return -1;
}
......@@ -941,16 +945,18 @@ _END:
SSyncNode* syncNodeOpen(const SSyncInfo* pOldSyncInfo) {
SSyncInfo* pSyncInfo = (SSyncInfo*)pOldSyncInfo;
SSyncNode* pSyncNode = (SSyncNode*)taosMemoryMalloc(sizeof(SSyncNode));
ASSERT(pSyncNode != NULL);
memset(pSyncNode, 0, sizeof(SSyncNode));
SSyncNode* pSyncNode = (SSyncNode*)taosMemoryCalloc(1, sizeof(SSyncNode));
if (pSyncNode == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
goto _error;
}
int32_t ret = 0;
if (!taosDirExist((char*)(pSyncInfo->path))) {
if (taosMkDir(pSyncInfo->path) != 0) {
terrno = TAOS_SYSTEM_ERROR(errno);
sError("failed to create dir:%s since %s", pSyncInfo->path, terrstr());
return NULL;
goto _error;
}
}
......@@ -963,15 +969,21 @@ SSyncNode* syncNodeOpen(const SSyncInfo* pOldSyncInfo) {
meta.lastConfigIndex = SYNC_INDEX_INVALID;
meta.batchSize = pSyncInfo->batchSize;
ret = raftCfgCreateFile((SSyncCfg*)&(pSyncInfo->syncCfg), meta, pSyncNode->configPath);
ASSERT(ret == 0);
if (ret != 0) {
sError("failed to create raft cfg file. configPath: %s", pSyncNode->configPath);
goto _error;
}
} else {
// update syncCfg by raft_config.json
pSyncNode->pRaftCfg = raftCfgOpen(pSyncNode->configPath);
ASSERT(pSyncNode->pRaftCfg != NULL);
if (pSyncNode->pRaftCfg == NULL) {
sError("failed to open raft cfg file. path:%s", pSyncNode->configPath);
goto _error;
}
pSyncInfo->syncCfg = pSyncNode->pRaftCfg->cfg;
raftCfgClose(pSyncNode->pRaftCfg);
pSyncNode->pRaftCfg = NULL;
}
// init by SSyncInfo
......@@ -988,11 +1000,17 @@ SSyncNode* syncNodeOpen(const SSyncInfo* pOldSyncInfo) {
// init raft config
pSyncNode->pRaftCfg = raftCfgOpen(pSyncNode->configPath);
ASSERT(pSyncNode->pRaftCfg != NULL);
if (pSyncNode->pRaftCfg == NULL) {
sError("failed to open raft cfg file. path:%s", pSyncNode->configPath);
goto _error;
}
// init internal
pSyncNode->myNodeInfo = pSyncNode->pRaftCfg->cfg.nodeInfo[pSyncNode->pRaftCfg->cfg.myIndex];
syncUtilnodeInfo2raftId(&pSyncNode->myNodeInfo, pSyncNode->vgId, &pSyncNode->myRaftId);
if (!syncUtilnodeInfo2raftId(&pSyncNode->myNodeInfo, pSyncNode->vgId, &pSyncNode->myRaftId)) {
sError("failed to determine my raft member id. vgId:%d", pSyncNode->vgId);
goto _error;
}
// init peersNum, peers, peersId
pSyncNode->peersNum = pSyncNode->pRaftCfg->cfg.replicaNum - 1;
......@@ -1004,17 +1022,24 @@ SSyncNode* syncNodeOpen(const SSyncInfo* pOldSyncInfo) {
}
}
for (int i = 0; i < pSyncNode->peersNum; ++i) {
syncUtilnodeInfo2raftId(&pSyncNode->peersNodeInfo[i], pSyncNode->vgId, &pSyncNode->peersId[i]);
if (!syncUtilnodeInfo2raftId(&pSyncNode->peersNodeInfo[i], pSyncNode->vgId, &pSyncNode->peersId[i])) {
sError("failed to determine raft member id. vgId:%d, peer:%d", pSyncNode->vgId, i);
goto _error;
}
}
// init replicaNum, replicasId
pSyncNode->replicaNum = pSyncNode->pRaftCfg->cfg.replicaNum;
for (int i = 0; i < pSyncNode->pRaftCfg->cfg.replicaNum; ++i) {
syncUtilnodeInfo2raftId(&pSyncNode->pRaftCfg->cfg.nodeInfo[i], pSyncNode->vgId, &pSyncNode->replicasId[i]);
if(!syncUtilnodeInfo2raftId(&pSyncNode->pRaftCfg->cfg.nodeInfo[i], pSyncNode->vgId, &pSyncNode->replicasId[i])) {
sError("failed to determine raft member id. vgId:%d, replica:%d", pSyncNode->vgId, i);
goto _error;
}
}
// init raft algorithm
pSyncNode->pFsm = pSyncInfo->pFsm;
pSyncInfo->pFsm = NULL;
pSyncNode->quorum = syncUtilQuorum(pSyncNode->pRaftCfg->cfg.replicaNum);
pSyncNode->leaderCache = EMPTY_RAFT_ID;
......@@ -1047,29 +1072,50 @@ SSyncNode* syncNodeOpen(const SSyncInfo* pOldSyncInfo) {
// init TLA+ server vars
pSyncNode->state = TAOS_SYNC_STATE_FOLLOWER;
pSyncNode->pRaftStore = raftStoreOpen(pSyncNode->raftStorePath);
ASSERT(pSyncNode->pRaftStore != NULL);
if (pSyncNode->pRaftStore == NULL) {
sError("failed to open raft store. path: %s", pSyncNode->raftStorePath);
goto _error;
}
// init TLA+ candidate vars
pSyncNode->pVotesGranted = voteGrantedCreate(pSyncNode);
ASSERT(pSyncNode->pVotesGranted != NULL);
if (pSyncNode->pVotesGranted == NULL) {
sError("failed to create VotesGranted. vgId:%d", pSyncNode->vgId);
goto _error;
}
pSyncNode->pVotesRespond = votesRespondCreate(pSyncNode);
ASSERT(pSyncNode->pVotesRespond != NULL);
if (pSyncNode->pVotesRespond == NULL) {
sError("failed to create VotesRespond. vgId:%d", pSyncNode->vgId);
goto _error;
}
// init TLA+ leader vars
pSyncNode->pNextIndex = syncIndexMgrCreate(pSyncNode);
ASSERT(pSyncNode->pNextIndex != NULL);
if (pSyncNode->pNextIndex == NULL) {
sError("failed to create SyncIndexMgr. vgId:%d", pSyncNode->vgId);
goto _error;
}
pSyncNode->pMatchIndex = syncIndexMgrCreate(pSyncNode);
ASSERT(pSyncNode->pMatchIndex != NULL);
if (pSyncNode->pMatchIndex == NULL) {
sError("failed to create SyncIndexMgr. vgId:%d", pSyncNode->vgId);
goto _error;
}
// init TLA+ log vars
pSyncNode->pLogStore = logStoreCreate(pSyncNode);
ASSERT(pSyncNode->pLogStore != NULL);
if (pSyncNode->pLogStore == NULL) {
sError("failed to create SyncLogStore. vgId:%d", pSyncNode->vgId);
goto _error;
}
SyncIndex commitIndex = SYNC_INDEX_INVALID;
if (pSyncNode->pFsm != NULL && pSyncNode->pFsm->FpGetSnapshotInfo != NULL) {
SSnapshot snapshot = {0};
int32_t code = pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snapshot);
ASSERT(code == 0);
if (code != 0) {
sError("failed to get snapshot info. vgId:%d, code:%d", pSyncNode->vgId, code);
goto _error;
}
if (snapshot.lastApplyIndex > commitIndex) {
commitIndex = snapshot.lastApplyIndex;
syncNodeEventLog(pSyncNode, "reset commit index by snapshot");
......@@ -1132,7 +1178,10 @@ SSyncNode* syncNodeOpen(const SSyncInfo* pOldSyncInfo) {
// tools
pSyncNode->pSyncRespMgr = syncRespMgrCreate(pSyncNode, SYNC_RESP_TTL_MS);
ASSERT(pSyncNode->pSyncRespMgr != NULL);
if (pSyncNode->pSyncRespMgr == NULL) {
sError("failed to create SyncRespMgr. vgId:%d", pSyncNode->vgId);
goto _error;
}
// restore state
pSyncNode->restoreFinish = false;
......@@ -1162,6 +1211,15 @@ SSyncNode* syncNodeOpen(const SSyncInfo* pOldSyncInfo) {
syncNodeEventLog(pSyncNode, "sync open");
return pSyncNode;
_error:
if (pSyncInfo->pFsm) {
taosMemoryFree(pSyncInfo->pFsm);
pSyncInfo->pFsm = NULL;
}
syncNodeClose(pSyncNode);
pSyncNode = NULL;
return NULL;
}
void syncNodeMaybeUpdateCommitBySnapshot(SSyncNode* pSyncNode) {
......@@ -1214,20 +1272,28 @@ void syncNodeStartStandBy(SSyncNode* pSyncNode) {
void syncNodeClose(SSyncNode* pSyncNode) {
syncNodeEventLog(pSyncNode, "sync close");
if (pSyncNode == NULL) {
return;
}
int32_t ret;
ASSERT(pSyncNode != NULL);
ret = raftStoreClose(pSyncNode->pRaftStore);
ASSERT(ret == 0);
syncRespMgrDestroy(pSyncNode->pSyncRespMgr);
pSyncNode->pSyncRespMgr = NULL;
voteGrantedDestroy(pSyncNode->pVotesGranted);
pSyncNode->pVotesGranted = NULL;
votesRespondDestory(pSyncNode->pVotesRespond);
pSyncNode->pVotesRespond = NULL;
syncIndexMgrDestroy(pSyncNode->pNextIndex);
pSyncNode->pNextIndex = NULL;
syncIndexMgrDestroy(pSyncNode->pMatchIndex);
pSyncNode->pMatchIndex = NULL;
logStoreDestory(pSyncNode->pLogStore);
pSyncNode->pLogStore = NULL;
raftCfgClose(pSyncNode->pRaftCfg);
pSyncNode->pRaftCfg = NULL;
syncNodeStopPingTimer(pSyncNode);
syncNodeStopElectTimer(pSyncNode);
......@@ -1249,8 +1315,7 @@ void syncNodeClose(SSyncNode* pSyncNode) {
pSyncNode->pNewNodeReceiver = NULL;
}
// free memory in syncFreeNode
// taosMemoryFree(pSyncNode);
taosMemoryFree(pSyncNode);
}
// option
......@@ -2534,7 +2599,7 @@ static void syncNodeEqHeartbeatTimer(void* param, void* tmrId) {
return;
}
} else {
sError("syncNodeEqHeartbeatTimer FpEqMsg is NULL");
sError("vgId:%d, enqueue msg cb ptr (i.e. FpEqMsg) not set.", pSyncNode->vgId);
}
syncTimeoutDestroy(pSyncMsg);
......@@ -2774,14 +2839,6 @@ int32_t syncNodeOnClientRequestBatchCb(SSyncNode* ths, SyncClientRequestBatch* p
return 0;
}
static void syncFreeNode(void* param) {
SSyncNode* pNode = param;
// inner object already free
// syncNodePrint2((char*)"==syncFreeNode==", pNode);
taosMemoryFree(pNode);
}
const char* syncStr(ESyncState state) {
switch (state) {
case TAOS_SYNC_STATE_FOLLOWER:
......
......@@ -364,8 +364,6 @@ int32_t raftCfgCreateFile(SSyncCfg *pCfg, SRaftCfgMeta meta, const char *path) {
int32_t sysErr = errno;
const char *sysErrStr = strerror(errno);
sError("create raft cfg file error, err:%d %X, msg:%s, syserr:%d, sysmsg:%s", err, err, errStr, sysErr, sysErrStr);
ASSERT(0);
return -1;
}
......
......@@ -28,7 +28,7 @@ SRaftStore *raftStoreOpen(const char *path) {
SRaftStore *pRaftStore = taosMemoryMalloc(sizeof(SRaftStore));
if (pRaftStore == NULL) {
sError("raftStoreOpen malloc error");
terrno = TSDB_CODE_OUT_OF_MEMORY;
return NULL;
}
memset(pRaftStore, 0, sizeof(*pRaftStore));
......@@ -72,7 +72,9 @@ static int32_t raftStoreInit(SRaftStore *pRaftStore) {
}
int32_t raftStoreClose(SRaftStore *pRaftStore) {
ASSERT(pRaftStore != NULL);
if (pRaftStore == NULL) {
return 0;
}
taosCloseFile(&pRaftStore->pFile);
taosMemoryFree(pRaftStore);
......
......@@ -19,6 +19,10 @@
SSyncRespMgr *syncRespMgrCreate(void *data, int64_t ttl) {
SSyncRespMgr *pObj = (SSyncRespMgr *)taosMemoryMalloc(sizeof(SSyncRespMgr));
if (pObj == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return NULL;
}
memset(pObj, 0, sizeof(SSyncRespMgr));
pObj->pRespHash =
......
......@@ -35,7 +35,10 @@ SSyncSnapshotSender *snapshotSenderCreate(SSyncNode *pSyncNode, int32_t replicaI
SSyncSnapshotSender *pSender = NULL;
if (condition) {
pSender = taosMemoryMalloc(sizeof(SSyncSnapshotSender));
ASSERT(pSender != NULL);
if (pSender == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return NULL;
}
memset(pSender, 0, sizeof(*pSender));
pSender->start = false;
......
......@@ -26,7 +26,8 @@ uint64_t syncUtilAddr2U64(const char* host, uint16_t port) {
uint32_t hostU32 = taosGetIpv4FromFqdn(host);
if (hostU32 == (uint32_t)-1) {
sError("Get IP address error");
sError("failed to resolve ipv4 addr. host:%s", host);
terrno = TSDB_CODE_TSC_INVALID_FQDN;
return -1;
}
......@@ -84,13 +85,18 @@ void syncUtilraftId2EpSet(const SRaftId* raftId, SEpSet* pEpSet) {
addEpIntoEpSet(pEpSet, host, port);
}
void syncUtilnodeInfo2raftId(const SNodeInfo* pNodeInfo, SyncGroupId vgId, SRaftId* raftId) {
bool syncUtilnodeInfo2raftId(const SNodeInfo* pNodeInfo, SyncGroupId vgId, SRaftId* raftId) {
uint32_t ipv4 = taosGetIpv4FromFqdn(pNodeInfo->nodeFqdn);
ASSERT(ipv4 != 0xFFFFFFFF);
if (ipv4 == 0xFFFFFFFF || ipv4 == 1) {
sError("failed to resolve ipv4 addr. fqdn: %s", pNodeInfo->nodeFqdn);
terrno = TSDB_CODE_TSC_INVALID_FQDN;
return false;
}
char ipbuf[128] = {0};
tinet_ntoa(ipbuf, ipv4);
raftId->addr = syncUtilAddr2U64(ipbuf, pNodeInfo->nodePort);
raftId->vgId = vgId;
return true;
}
bool syncUtilSameId(const SRaftId* pId1, const SRaftId* pId2) {
......@@ -310,4 +316,4 @@ void syncUtilJson2Line(char* jsonStr) {
q++;
}
}
}
\ No newline at end of file
}
......@@ -24,7 +24,10 @@ static void voteGrantedClearVotes(SVotesGranted *pVotesGranted) {
SVotesGranted *voteGrantedCreate(SSyncNode *pSyncNode) {
SVotesGranted *pVotesGranted = taosMemoryMalloc(sizeof(SVotesGranted));
ASSERT(pVotesGranted != NULL);
if (pVotesGranted == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return NULL;
}
memset(pVotesGranted, 0, sizeof(SVotesGranted));
pVotesGranted->replicas = &(pSyncNode->replicasId);
......
......@@ -879,7 +879,7 @@ int sml_16960_Test() {
"{"
"\"timestamp\":"
""
"{ \"value\": 1349020800000, \"type\": \"ms\" }"
"{ \"value\": 1664418955000, \"type\": \"ms\" }"
","
"\"value\":"
""
......@@ -916,7 +916,7 @@ int sml_16960_Test() {
"{"
"\"timestamp\":"
""
"{ \"value\": 1349020800001, \"type\": \"ms\" }"
"{ \"value\": 1664418955001, \"type\": \"ms\" }"
","
"\"value\":"
""
......@@ -953,7 +953,7 @@ int sml_16960_Test() {
"{"
"\"timestamp\":"
""
"{ \"value\": 1349020800002, \"type\": \"ms\" }"
"{ \"value\": 1664418955002, \"type\": \"ms\" }"
","
"\"value\":"
""
......@@ -990,7 +990,7 @@ int sml_16960_Test() {
"{"
"\"timestamp\":"
""
"{ \"value\": 1349020800003, \"type\": \"ms\" }"
"{ \"value\": 1664418955003, \"type\": \"ms\" }"
","
"\"value\":"
""
......@@ -1027,7 +1027,7 @@ int sml_16960_Test() {
"{"
"\"timestamp\":"
""
"{ \"value\": 1349020800004, \"type\": \"ms\" }"
"{ \"value\": 1664418955004, \"type\": \"ms\" }"
","
"\"value\":"
""
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册