未验证 提交 ea3aae75 编写于 作者: S Shengliang Guan 提交者: GitHub

Merge pull request #4298 from taosdata/feature/wal

Feature/wal
......@@ -24,18 +24,18 @@ extern "C" {
#define TAOS_SYNC_MAX_INDEX 0x7FFFFFFF
typedef enum _TAOS_SYNC_ROLE {
TAOS_SYNC_ROLE_OFFLINE,
TAOS_SYNC_ROLE_UNSYNCED,
TAOS_SYNC_ROLE_SYNCING,
TAOS_SYNC_ROLE_SLAVE,
TAOS_SYNC_ROLE_MASTER,
TAOS_SYNC_ROLE_OFFLINE = 0,
TAOS_SYNC_ROLE_UNSYNCED = 1,
TAOS_SYNC_ROLE_SYNCING = 2,
TAOS_SYNC_ROLE_SLAVE = 3,
TAOS_SYNC_ROLE_MASTER = 4
} ESyncRole;
typedef enum _TAOS_SYNC_STATUS {
TAOS_SYNC_STATUS_INIT,
TAOS_SYNC_STATUS_START,
TAOS_SYNC_STATUS_FILE,
TAOS_SYNC_STATUS_CACHE,
TAOS_SYNC_STATUS_INIT = 0,
TAOS_SYNC_STATUS_START = 1,
TAOS_SYNC_STATUS_FILE = 2,
TAOS_SYNC_STATUS_CACHE = 3
} ESyncStatus;
typedef struct {
......
......@@ -20,7 +20,8 @@
extern "C" {
#endif
struct SMnodeMsg;
#include "mnode.h"
#include "twal.h"
typedef enum {
SDB_TABLE_CLUSTER = 0,
......@@ -36,44 +37,46 @@ typedef enum {
} ESdbTable;
typedef enum {
SDB_KEY_STRING,
SDB_KEY_INT,
SDB_KEY_AUTO,
SDB_KEY_VAR_STRING,
SDB_KEY_STRING = 0,
SDB_KEY_INT = 1,
SDB_KEY_AUTO = 2,
SDB_KEY_VAR_STRING = 3,
} ESdbKey;
typedef enum {
SDB_OPER_GLOBAL,
SDB_OPER_LOCAL
SDB_OPER_GLOBAL = 0,
SDB_OPER_LOCAL = 1
} ESdbOper;
typedef struct SSdbOper {
ESdbOper type;
int32_t rowSize;
int32_t retCode; // for callback in sdb queue
int32_t processedCount; // for sync fwd callback
int32_t (*reqFp)(struct SMnodeMsg *pMsg);
int32_t (*writeCb)(struct SMnodeMsg *pMsg, int32_t code);
void * table;
void * pObj;
void * rowData;
struct SMnodeMsg *pMsg;
} SSdbOper;
typedef struct SSdbRow {
ESdbOper type;
int32_t processedCount; // for sync fwd callback
int32_t code; // for callback in sdb queue
int32_t rowSize;
void * rowData;
void * pObj;
void * pTable;
SMnodeMsg *pMsg;
int32_t (*fpReq)(SMnodeMsg *pMsg);
int32_t (*fpRsp)(SMnodeMsg *pMsg, int32_t code);
char reserveForSync[16];
SWalHead pHead[];
} SSdbRow;
typedef struct {
char *tableName;
int32_t hashSessions;
int32_t maxRowSize;
int32_t refCountPos;
ESdbTable tableId;
char * name;
int32_t hashSessions;
int32_t maxRowSize;
int32_t refCountPos;
ESdbTable id;
ESdbKey keyType;
int32_t (*insertFp)(SSdbOper *pOper);
int32_t (*deleteFp)(SSdbOper *pOper);
int32_t (*updateFp)(SSdbOper *pOper);
int32_t (*encodeFp)(SSdbOper *pOper);
int32_t (*decodeFp)(SSdbOper *pDesc);
int32_t (*destroyFp)(SSdbOper *pDesc);
int32_t (*restoredFp)();
int32_t (*fpInsert)(SSdbRow *pRow);
int32_t (*fpDelete)(SSdbRow *pRow);
int32_t (*fpUpdate)(SSdbRow *pRow);
int32_t (*fpEncode)(SSdbRow *pRow);
int32_t (*fpDecode)(SSdbRow *pRow);
int32_t (*fpDestroy)(SSdbRow *pRow);
int32_t (*fpRestored)();
} SSdbTableDesc;
int32_t sdbInit();
......@@ -84,20 +87,20 @@ bool sdbIsMaster();
bool sdbIsServing();
void sdbUpdateMnodeRoles();
int32_t sdbInsertRow(SSdbOper *pOper);
int32_t sdbDeleteRow(SSdbOper *pOper);
int32_t sdbUpdateRow(SSdbOper *pOper);
int32_t sdbInsertRowImp(SSdbOper *pOper);
int32_t sdbInsertRow(SSdbRow *pRow);
int32_t sdbDeleteRow(SSdbRow *pRow);
int32_t sdbUpdateRow(SSdbRow *pRow);
int32_t sdbInsertRowToQueue(SSdbRow *pRow);
void *sdbGetRow(void *handle, void *key);
void *sdbFetchRow(void *handle, void *pIter, void **ppRow);
void *sdbGetRow(void *pTable, void *key);
void *sdbFetchRow(void *pTable, void *pIter, void **ppRow);
void sdbFreeIter(void *pIter);
void sdbIncRef(void *thandle, void *pRow);
void sdbDecRef(void *thandle, void *pRow);
int64_t sdbGetNumOfRows(void *handle);
int32_t sdbGetId(void *handle);
void sdbIncRef(void *pTable, void *pRow);
void sdbDecRef(void *pTable, void *pRow);
int64_t sdbGetNumOfRows(void *pTable);
int32_t sdbGetId(void *pTable);
uint64_t sdbGetVersion();
bool sdbCheckRowDeleted(void *thandle, void *pRow);
bool sdbCheckRowDeleted(void *pTable, void *pRow);
#ifdef __cplusplus
}
......
......@@ -16,6 +16,7 @@
#define _DEFAULT_SOURCE
#include "os.h"
#include "taoserror.h"
#include "tglobal.h"
#include "dnode.h"
#include "mnodeDef.h"
#include "mnodeInt.h"
......@@ -25,36 +26,34 @@
#include "mnodeUser.h"
#include "mnodeVgroup.h"
#include "tglobal.h"
void * tsAcctSdb = NULL;
static int32_t tsAcctUpdateSize;
static int32_t mnodeCreateRootAcct();
static int32_t mnodeAcctActionDestroy(SSdbOper *pOper) {
SAcctObj *pAcct = pOper->pObj;
static int32_t mnodeAcctActionDestroy(SSdbRow *pRow) {
SAcctObj *pAcct = pRow->pObj;
pthread_mutex_destroy(&pAcct->mutex);
tfree(pOper->pObj);
tfree(pRow->pObj);
return TSDB_CODE_SUCCESS;
}
static int32_t mnodeAcctActionInsert(SSdbOper *pOper) {
SAcctObj *pAcct = pOper->pObj;
static int32_t mnodeAcctActionInsert(SSdbRow *pRow) {
SAcctObj *pAcct = pRow->pObj;
memset(&pAcct->acctInfo, 0, sizeof(SAcctInfo));
pAcct->acctInfo.accessState = TSDB_VN_ALL_ACCCESS;
pthread_mutex_init(&pAcct->mutex, NULL);
return TSDB_CODE_SUCCESS;
}
static int32_t mnodeAcctActionDelete(SSdbOper *pOper) {
SAcctObj *pAcct = pOper->pObj;
static int32_t mnodeAcctActionDelete(SSdbRow *pRow) {
SAcctObj *pAcct = pRow->pObj;
mnodeDropAllUsers(pAcct);
mnodeDropAllDbs(pAcct);
return TSDB_CODE_SUCCESS;
}
static int32_t mnodeAcctActionUpdate(SSdbOper *pOper) {
SAcctObj *pAcct = pOper->pObj;
static int32_t mnodeAcctActionUpdate(SSdbRow *pRow) {
SAcctObj *pAcct = pRow->pObj;
SAcctObj *pSaved = mnodeGetAcct(pAcct->user);
if (pAcct != pSaved) {
memcpy(pSaved, pAcct, tsAcctUpdateSize);
......@@ -64,19 +63,19 @@ static int32_t mnodeAcctActionUpdate(SSdbOper *pOper) {
return TSDB_CODE_SUCCESS;
}
static int32_t mnodeAcctActionEncode(SSdbOper *pOper) {
SAcctObj *pAcct = pOper->pObj;
memcpy(pOper->rowData, pAcct, tsAcctUpdateSize);
pOper->rowSize = tsAcctUpdateSize;
static int32_t mnodeAcctActionEncode(SSdbRow *pRow) {
SAcctObj *pAcct = pRow->pObj;
memcpy(pRow->rowData, pAcct, tsAcctUpdateSize);
pRow->rowSize = tsAcctUpdateSize;
return TSDB_CODE_SUCCESS;
}
static int32_t mnodeAcctActionDecode(SSdbOper *pOper) {
static int32_t mnodeAcctActionDecode(SSdbRow *pRow) {
SAcctObj *pAcct = (SAcctObj *) calloc(1, sizeof(SAcctObj));
if (pAcct == NULL) return TSDB_CODE_MND_OUT_OF_MEMORY;
memcpy(pAcct, pOper->rowData, tsAcctUpdateSize);
pOper->pObj = pAcct;
memcpy(pAcct, pRow->rowData, tsAcctUpdateSize);
pRow->pObj = pAcct;
return TSDB_CODE_SUCCESS;
}
......@@ -99,29 +98,29 @@ int32_t mnodeInitAccts() {
SAcctObj tObj;
tsAcctUpdateSize = (int8_t *)tObj.updateEnd - (int8_t *)&tObj;
SSdbTableDesc tableDesc = {
.tableId = SDB_TABLE_ACCOUNT,
.tableName = "accounts",
SSdbTableDesc desc = {
.id = SDB_TABLE_ACCOUNT,
.name = "accounts",
.hashSessions = TSDB_DEFAULT_ACCOUNTS_HASH_SIZE,
.maxRowSize = tsAcctUpdateSize,
.refCountPos = (int8_t *)(&tObj.refCount) - (int8_t *)&tObj,
.keyType = SDB_KEY_STRING,
.insertFp = mnodeAcctActionInsert,
.deleteFp = mnodeAcctActionDelete,
.updateFp = mnodeAcctActionUpdate,
.encodeFp = mnodeAcctActionEncode,
.decodeFp = mnodeAcctActionDecode,
.destroyFp = mnodeAcctActionDestroy,
.restoredFp = mnodeAcctActionRestored
.fpInsert = mnodeAcctActionInsert,
.fpDelete = mnodeAcctActionDelete,
.fpUpdate = mnodeAcctActionUpdate,
.fpEncode = mnodeAcctActionEncode,
.fpDecode = mnodeAcctActionDecode,
.fpDestroy = mnodeAcctActionDestroy,
.fpRestored = mnodeAcctActionRestored
};
tsAcctSdb = sdbOpenTable(&tableDesc);
tsAcctSdb = sdbOpenTable(&desc);
if (tsAcctSdb == NULL) {
mError("table:%s, failed to create hash", tableDesc.tableName);
mError("table:%s, failed to create hash", desc.name);
return -1;
}
mDebug("table:%s, hash is created", tableDesc.tableName);
mDebug("table:%s, hash is created", desc.name);
return TSDB_CODE_SUCCESS;
}
......@@ -226,13 +225,13 @@ static int32_t mnodeCreateRootAcct() {
pAcct->acctId = sdbGetId(tsAcctSdb);
pAcct->createdTime = taosGetTimestampMs();
SSdbOper oper = {
.type = SDB_OPER_GLOBAL,
.table = tsAcctSdb,
.pObj = pAcct,
SSdbRow row = {
.type = SDB_OPER_GLOBAL,
.pTable = tsAcctSdb,
.pObj = pAcct,
};
return sdbInsertRow(&oper);
return sdbInsertRow(&row);
}
#ifndef _ACCT
......
......@@ -32,36 +32,36 @@ static int32_t mnodeCreateCluster();
static int32_t mnodeGetClusterMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pConn);
static int32_t mnodeRetrieveClusters(SShowObj *pShow, char *data, int32_t rows, void *pConn);
static int32_t mnodeClusterActionDestroy(SSdbOper *pOper) {
tfree(pOper->pObj);
static int32_t mnodeClusterActionDestroy(SSdbRow *pRow) {
tfree(pRow->pObj);
return TSDB_CODE_SUCCESS;
}
static int32_t mnodeClusterActionInsert(SSdbOper *pOper) {
static int32_t mnodeClusterActionInsert(SSdbRow *pRow) {
return TSDB_CODE_SUCCESS;
}
static int32_t mnodeClusterActionDelete(SSdbOper *pOper) {
static int32_t mnodeClusterActionDelete(SSdbRow *pRow) {
return TSDB_CODE_SUCCESS;
}
static int32_t mnodeClusterActionUpdate(SSdbOper *pOper) {
static int32_t mnodeClusterActionUpdate(SSdbRow *pRow) {
return TSDB_CODE_SUCCESS;
}
static int32_t mnodeClusterActionEncode(SSdbOper *pOper) {
SClusterObj *pCluster = pOper->pObj;
memcpy(pOper->rowData, pCluster, tsClusterUpdateSize);
pOper->rowSize = tsClusterUpdateSize;
static int32_t mnodeClusterActionEncode(SSdbRow *pRow) {
SClusterObj *pCluster = pRow->pObj;
memcpy(pRow->rowData, pCluster, tsClusterUpdateSize);
pRow->rowSize = tsClusterUpdateSize;
return TSDB_CODE_SUCCESS;
}
static int32_t mnodeClusterActionDecode(SSdbOper *pOper) {
static int32_t mnodeClusterActionDecode(SSdbRow *pRow) {
SClusterObj *pCluster = (SClusterObj *) calloc(1, sizeof(SClusterObj));
if (pCluster == NULL) return TSDB_CODE_MND_OUT_OF_MEMORY;
memcpy(pCluster, pOper->rowData, tsClusterUpdateSize);
pOper->pObj = pCluster;
memcpy(pCluster, pRow->rowData, tsClusterUpdateSize);
pRow->pObj = pCluster;
return TSDB_CODE_SUCCESS;
}
......@@ -84,32 +84,32 @@ int32_t mnodeInitCluster() {
SClusterObj tObj;
tsClusterUpdateSize = (int8_t *)tObj.updateEnd - (int8_t *)&tObj;
SSdbTableDesc tableDesc = {
.tableId = SDB_TABLE_CLUSTER,
.tableName = "cluster",
SSdbTableDesc desc = {
.id = SDB_TABLE_CLUSTER,
.name = "cluster",
.hashSessions = TSDB_DEFAULT_CLUSTER_HASH_SIZE,
.maxRowSize = tsClusterUpdateSize,
.refCountPos = (int8_t *)(&tObj.refCount) - (int8_t *)&tObj,
.keyType = SDB_KEY_STRING,
.insertFp = mnodeClusterActionInsert,
.deleteFp = mnodeClusterActionDelete,
.updateFp = mnodeClusterActionUpdate,
.encodeFp = mnodeClusterActionEncode,
.decodeFp = mnodeClusterActionDecode,
.destroyFp = mnodeClusterActionDestroy,
.restoredFp = mnodeClusterActionRestored
.fpInsert = mnodeClusterActionInsert,
.fpDelete = mnodeClusterActionDelete,
.fpUpdate = mnodeClusterActionUpdate,
.fpEncode = mnodeClusterActionEncode,
.fpDecode = mnodeClusterActionDecode,
.fpDestroy = mnodeClusterActionDestroy,
.fpRestored = mnodeClusterActionRestored
};
tsClusterSdb = sdbOpenTable(&tableDesc);
tsClusterSdb = sdbOpenTable(&desc);
if (tsClusterSdb == NULL) {
mError("table:%s, failed to create hash", tableDesc.tableName);
mError("table:%s, failed to create hash", desc.name);
return -1;
}
mnodeAddShowMetaHandle(TSDB_MGMT_TABLE_CLUSTER, mnodeGetClusterMeta);
mnodeAddShowRetrieveHandle(TSDB_MGMT_TABLE_CLUSTER, mnodeRetrieveClusters);
mDebug("table:%s, hash is created", tableDesc.tableName);
mDebug("table:%s, hash is created", desc.name);
return TSDB_CODE_SUCCESS;
}
......@@ -145,13 +145,13 @@ static int32_t mnodeCreateCluster() {
mDebug("uid is %s", pCluster->uid);
}
SSdbOper oper = {
.type = SDB_OPER_GLOBAL,
.table = tsClusterSdb,
.pObj = pCluster,
SSdbRow row = {
.type = SDB_OPER_GLOBAL,
.pTable = tsClusterSdb,
.pObj = pCluster,
};
return sdbInsertRow(&oper);
return sdbInsertRow(&row);
}
const char* mnodeGetClusterId() {
......
......@@ -56,8 +56,8 @@ static void mnodeDestroyDb(SDbObj *pDb) {
tfree(pDb);
}
static int32_t mnodeDbActionDestroy(SSdbOper *pOper) {
mnodeDestroyDb(pOper->pObj);
static int32_t mnodeDbActionDestroy(SSdbRow *pRow) {
mnodeDestroyDb(pRow->pObj);
return TSDB_CODE_SUCCESS;
}
......@@ -65,8 +65,8 @@ int64_t mnodeGetDbNum() {
return sdbGetNumOfRows(tsDbSdb);
}
static int32_t mnodeDbActionInsert(SSdbOper *pOper) {
SDbObj *pDb = pOper->pObj;
static int32_t mnodeDbActionInsert(SSdbRow *pRow) {
SDbObj *pDb = pRow->pObj;
SAcctObj *pAcct = mnodeGetAcct(pDb->acct);
pthread_mutex_init(&pDb->mutex, NULL);
......@@ -91,8 +91,8 @@ static int32_t mnodeDbActionInsert(SSdbOper *pOper) {
return TSDB_CODE_SUCCESS;
}
static int32_t mnodeDbActionDelete(SSdbOper *pOper) {
SDbObj *pDb = pOper->pObj;
static int32_t mnodeDbActionDelete(SSdbRow *pRow) {
SDbObj *pDb = pRow->pObj;
SAcctObj *pAcct = mnodeGetAcct(pDb->acct);
mnodeDropAllChildTables(pDb);
......@@ -107,11 +107,11 @@ static int32_t mnodeDbActionDelete(SSdbOper *pOper) {
return TSDB_CODE_SUCCESS;
}
static int32_t mnodeDbActionUpdate(SSdbOper *pOper) {
SDbObj *pNew = pOper->pObj;
static int32_t mnodeDbActionUpdate(SSdbRow *pRow) {
SDbObj *pNew = pRow->pObj;
SDbObj *pDb = mnodeGetDb(pNew->name);
if (pDb != NULL && pNew != pDb) {
memcpy(pDb, pNew, pOper->rowSize);
memcpy(pDb, pNew, pRow->rowSize);
free(pNew->vgList);
free(pNew);
}
......@@ -120,19 +120,19 @@ static int32_t mnodeDbActionUpdate(SSdbOper *pOper) {
return TSDB_CODE_SUCCESS;
}
static int32_t mnodeDbActionEncode(SSdbOper *pOper) {
SDbObj *pDb = pOper->pObj;
memcpy(pOper->rowData, pDb, tsDbUpdateSize);
pOper->rowSize = tsDbUpdateSize;
static int32_t mnodeDbActionEncode(SSdbRow *pRow) {
SDbObj *pDb = pRow->pObj;
memcpy(pRow->rowData, pDb, tsDbUpdateSize);
pRow->rowSize = tsDbUpdateSize;
return TSDB_CODE_SUCCESS;
}
static int32_t mnodeDbActionDecode(SSdbOper *pOper) {
static int32_t mnodeDbActionDecode(SSdbRow *pRow) {
SDbObj *pDb = (SDbObj *) calloc(1, sizeof(SDbObj));
if (pDb == NULL) return TSDB_CODE_MND_OUT_OF_MEMORY;
memcpy(pDb, pOper->rowData, tsDbUpdateSize);
pOper->pObj = pDb;
memcpy(pDb, pRow->rowData, tsDbUpdateSize);
pRow->pObj = pDb;
return TSDB_CODE_SUCCESS;
}
......@@ -144,23 +144,23 @@ int32_t mnodeInitDbs() {
SDbObj tObj;
tsDbUpdateSize = (int8_t *)tObj.updateEnd - (int8_t *)&tObj;
SSdbTableDesc tableDesc = {
.tableId = SDB_TABLE_DB,
.tableName = "dbs",
SSdbTableDesc desc = {
.id = SDB_TABLE_DB,
.name = "dbs",
.hashSessions = TSDB_DEFAULT_DBS_HASH_SIZE,
.maxRowSize = tsDbUpdateSize,
.refCountPos = (int8_t *)(&tObj.refCount) - (int8_t *)&tObj,
.keyType = SDB_KEY_STRING,
.insertFp = mnodeDbActionInsert,
.deleteFp = mnodeDbActionDelete,
.updateFp = mnodeDbActionUpdate,
.encodeFp = mnodeDbActionEncode,
.decodeFp = mnodeDbActionDecode,
.destroyFp = mnodeDbActionDestroy,
.restoredFp = mnodeDbActionRestored
.fpInsert = mnodeDbActionInsert,
.fpDelete = mnodeDbActionDelete,
.fpUpdate = mnodeDbActionUpdate,
.fpEncode = mnodeDbActionEncode,
.fpDecode = mnodeDbActionDecode,
.fpDestroy = mnodeDbActionDestroy,
.fpRestored = mnodeDbActionRestored
};
tsDbSdb = sdbOpenTable(&tableDesc);
tsDbSdb = sdbOpenTable(&desc);
if (tsDbSdb == NULL) {
mError("failed to init db data");
return -1;
......@@ -412,16 +412,16 @@ static int32_t mnodeCreateDb(SAcctObj *pAcct, SCreateDbMsg *pCreate, SMnodeMsg *
pMsg->pDb = pDb;
mnodeIncDbRef(pDb);
SSdbOper oper = {
.type = SDB_OPER_GLOBAL,
.table = tsDbSdb,
.pObj = pDb,
.rowSize = sizeof(SDbObj),
.pMsg = pMsg,
.writeCb = mnodeCreateDbCb
SSdbRow row = {
.type = SDB_OPER_GLOBAL,
.pTable = tsDbSdb,
.pObj = pDb,
.rowSize = sizeof(SDbObj),
.pMsg = pMsg,
.fpRsp = mnodeCreateDbCb
};
code = sdbInsertRow(&oper);
code = sdbInsertRow(&row);
if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_MND_ACTION_IN_PROGRESS) {
mError("db:%s, failed to create, reason:%s", pDb->name, tstrerror(code));
pMsg->pDb = NULL;
......@@ -440,8 +440,8 @@ bool mnodeCheckIsMonitorDB(char *db, char *monitordb) {
}
#if 0
void mnodePrintVgroups(SDbObj *pDb, char *oper) {
mInfo("db:%s, vgroup link from head, oper:%s", pDb->name, oper);
void mnodePrintVgroups(SDbObj *pDb, char *row) {
mInfo("db:%s, vgroup link from head, row:%s", pDb->name, row);
SVgObj *pVgroup = pDb->pHead;
while (pVgroup != NULL) {
mInfo("vgId:%d", pVgroup->vgId);
......@@ -807,13 +807,13 @@ static int32_t mnodeSetDbDropping(SDbObj *pDb) {
if (pDb->status) return TSDB_CODE_SUCCESS;
pDb->status = true;
SSdbOper oper = {
.type = SDB_OPER_GLOBAL,
.table = tsDbSdb,
.pObj = pDb
SSdbRow row = {
.type = SDB_OPER_GLOBAL,
.pTable = tsDbSdb,
.pObj = pDb
};
int32_t code = sdbUpdateRow(&oper);
int32_t code = sdbUpdateRow(&row);
if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_MND_ACTION_IN_PROGRESS) {
mError("db:%s, failed to set dropping state, reason:%s", pDb->name, tstrerror(code));
}
......@@ -1019,15 +1019,15 @@ static int32_t mnodeAlterDb(SDbObj *pDb, SAlterDbMsg *pAlter, void *pMsg) {
if (memcmp(&newCfg, &pDb->cfg, sizeof(SDbCfg)) != 0) {
pDb->cfg = newCfg;
pDb->cfgVersion++;
SSdbOper oper = {
.type = SDB_OPER_GLOBAL,
.table = tsDbSdb,
.pObj = pDb,
.pMsg = pMsg,
.writeCb = mnodeAlterDbCb
SSdbRow row = {
.type = SDB_OPER_GLOBAL,
.pTable = tsDbSdb,
.pObj = pDb,
.pMsg = pMsg,
.fpRsp = mnodeAlterDbCb
};
code = sdbUpdateRow(&oper);
code = sdbUpdateRow(&row);
if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_MND_ACTION_IN_PROGRESS) {
mError("db:%s, failed to alter, reason:%s", pDb->name, tstrerror(code));
}
......@@ -1071,15 +1071,15 @@ static int32_t mnodeDropDb(SMnodeMsg *pMsg) {
SDbObj *pDb = pMsg->pDb;
mInfo("db:%s, drop db from sdb", pDb->name);
SSdbOper oper = {
.type = SDB_OPER_GLOBAL,
.table = tsDbSdb,
.pObj = pDb,
.pMsg = pMsg,
.writeCb = mnodeDropDbCb
SSdbRow row = {
.type = SDB_OPER_GLOBAL,
.pTable = tsDbSdb,
.pObj = pDb,
.pMsg = pMsg,
.fpRsp = mnodeDropDbCb
};
int32_t code = sdbDeleteRow(&oper);
int32_t code = sdbDeleteRow(&row);
if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_MND_ACTION_IN_PROGRESS) {
mError("db:%s, failed to drop, reason:%s", pDb->name, tstrerror(code));
}
......@@ -1134,13 +1134,13 @@ void mnodeDropAllDbs(SAcctObj *pAcct) {
if (pDb->pAcct == pAcct) {
mInfo("db:%s, drop db from sdb for acct:%s is dropped", pDb->name, pAcct->user);
SSdbOper oper = {
.type = SDB_OPER_LOCAL,
.table = tsDbSdb,
.pObj = pDb
SSdbRow row = {
.type = SDB_OPER_LOCAL,
.pTable = tsDbSdb,
.pObj = pDb
};
sdbDeleteRow(&oper);
sdbDeleteRow(&row);
numOfDbs++;
}
mnodeDecDbRef(pDb);
......
......@@ -87,13 +87,13 @@ static char* offlineReason[] = {
"unknown",
};
static int32_t mnodeDnodeActionDestroy(SSdbOper *pOper) {
tfree(pOper->pObj);
static int32_t mnodeDnodeActionDestroy(SSdbRow *pRow) {
tfree(pRow->pObj);
return TSDB_CODE_SUCCESS;
}
static int32_t mnodeDnodeActionInsert(SSdbOper *pOper) {
SDnodeObj *pDnode = pOper->pObj;
static int32_t mnodeDnodeActionInsert(SSdbRow *pRow) {
SDnodeObj *pDnode = pRow->pObj;
if (pDnode->status != TAOS_DN_STATUS_DROPPING) {
pDnode->status = TAOS_DN_STATUS_OFFLINE;
pDnode->lastAccess = tsAccessSquence;
......@@ -107,8 +107,8 @@ static int32_t mnodeDnodeActionInsert(SSdbOper *pOper) {
return TSDB_CODE_SUCCESS;
}
static int32_t mnodeDnodeActionDelete(SSdbOper *pOper) {
SDnodeObj *pDnode = pOper->pObj;
static int32_t mnodeDnodeActionDelete(SSdbRow *pRow) {
SDnodeObj *pDnode = pRow->pObj;
#ifndef _SYNC
mnodeDropAllDnodeVgroups(pDnode);
......@@ -121,11 +121,11 @@ static int32_t mnodeDnodeActionDelete(SSdbOper *pOper) {
return TSDB_CODE_SUCCESS;
}
static int32_t mnodeDnodeActionUpdate(SSdbOper *pOper) {
SDnodeObj *pNew = pOper->pObj;
static int32_t mnodeDnodeActionUpdate(SSdbRow *pRow) {
SDnodeObj *pNew = pRow->pObj;
SDnodeObj *pDnode = mnodeGetDnode(pNew->dnodeId);
if (pDnode != NULL && pNew != pDnode) {
memcpy(pDnode, pNew, pOper->rowSize);
memcpy(pDnode, pNew, pRow->rowSize);
free(pNew);
}
mnodeDecDnodeRef(pDnode);
......@@ -134,19 +134,19 @@ static int32_t mnodeDnodeActionUpdate(SSdbOper *pOper) {
return TSDB_CODE_SUCCESS;
}
static int32_t mnodeDnodeActionEncode(SSdbOper *pOper) {
SDnodeObj *pDnode = pOper->pObj;
memcpy(pOper->rowData, pDnode, tsDnodeUpdateSize);
pOper->rowSize = tsDnodeUpdateSize;
static int32_t mnodeDnodeActionEncode(SSdbRow *pRow) {
SDnodeObj *pDnode = pRow->pObj;
memcpy(pRow->rowData, pDnode, tsDnodeUpdateSize);
pRow->rowSize = tsDnodeUpdateSize;
return TSDB_CODE_SUCCESS;
}
static int32_t mnodeDnodeActionDecode(SSdbOper *pOper) {
static int32_t mnodeDnodeActionDecode(SSdbRow *pRow) {
SDnodeObj *pDnode = (SDnodeObj *) calloc(1, sizeof(SDnodeObj));
if (pDnode == NULL) return TSDB_CODE_MND_OUT_OF_MEMORY;
memcpy(pDnode, pOper->rowData, tsDnodeUpdateSize);
pOper->pObj = pDnode;
memcpy(pDnode, pRow->rowData, tsDnodeUpdateSize);
pRow->pObj = pDnode;
return TSDB_CODE_SUCCESS;
}
......@@ -171,23 +171,23 @@ int32_t mnodeInitDnodes() {
tsDnodeUpdateSize = (int8_t *)tObj.updateEnd - (int8_t *)&tObj;
pthread_mutex_init(&tsDnodeEpsMutex, NULL);
SSdbTableDesc tableDesc = {
.tableId = SDB_TABLE_DNODE,
.tableName = "dnodes",
SSdbTableDesc desc = {
.id = SDB_TABLE_DNODE,
.name = "dnodes",
.hashSessions = TSDB_DEFAULT_DNODES_HASH_SIZE,
.maxRowSize = tsDnodeUpdateSize,
.refCountPos = (int8_t *)(&tObj.refCount) - (int8_t *)&tObj,
.keyType = SDB_KEY_AUTO,
.insertFp = mnodeDnodeActionInsert,
.deleteFp = mnodeDnodeActionDelete,
.updateFp = mnodeDnodeActionUpdate,
.encodeFp = mnodeDnodeActionEncode,
.decodeFp = mnodeDnodeActionDecode,
.destroyFp = mnodeDnodeActionDestroy,
.restoredFp = mnodeDnodeActionRestored
.fpInsert = mnodeDnodeActionInsert,
.fpDelete = mnodeDnodeActionDelete,
.fpUpdate = mnodeDnodeActionUpdate,
.fpEncode = mnodeDnodeActionEncode,
.fpDecode = mnodeDnodeActionDecode,
.fpDestroy = mnodeDnodeActionDestroy,
.fpRestored = mnodeDnodeActionRestored
};
tsDnodeSdb = sdbOpenTable(&tableDesc);
tsDnodeSdb = sdbOpenTable(&desc);
if (tsDnodeSdb == NULL) {
mError("failed to init dnodes data");
return -1;
......@@ -296,13 +296,13 @@ void mnodeDecDnodeRef(SDnodeObj *pDnode) {
}
void mnodeUpdateDnode(SDnodeObj *pDnode) {
SSdbOper oper = {
.type = SDB_OPER_GLOBAL,
.table = tsDnodeSdb,
.pObj = pDnode
SSdbRow row = {
.type = SDB_OPER_GLOBAL,
.pTable = tsDnodeSdb,
.pObj = pDnode
};
int32_t code = sdbUpdateRow(&oper);
int32_t code = sdbUpdateRow(&row);
if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_MND_ACTION_IN_PROGRESS) {
mError("dnodeId:%d, failed update", pDnode->dnodeId);
}
......@@ -644,15 +644,15 @@ static int32_t mnodeCreateDnode(char *ep, SMnodeMsg *pMsg) {
tstrncpy(pDnode->dnodeEp, ep, TSDB_EP_LEN);
taosGetFqdnPortFromEp(ep, pDnode->dnodeFqdn, &pDnode->dnodePort);
SSdbOper oper = {
.type = SDB_OPER_GLOBAL,
.table = tsDnodeSdb,
.pObj = pDnode,
SSdbRow row = {
.type = SDB_OPER_GLOBAL,
.pTable = tsDnodeSdb,
.pObj = pDnode,
.rowSize = sizeof(SDnodeObj),
.pMsg = pMsg
.pMsg = pMsg
};
int32_t code = sdbInsertRow(&oper);
int32_t code = sdbInsertRow(&row);
if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_MND_ACTION_IN_PROGRESS) {
int dnodeId = pDnode->dnodeId;
tfree(pDnode);
......@@ -665,14 +665,14 @@ static int32_t mnodeCreateDnode(char *ep, SMnodeMsg *pMsg) {
}
int32_t mnodeDropDnode(SDnodeObj *pDnode, void *pMsg) {
SSdbOper oper = {
.type = SDB_OPER_GLOBAL,
.table = tsDnodeSdb,
.pObj = pDnode,
.pMsg = pMsg
SSdbRow row = {
.type = SDB_OPER_GLOBAL,
.pTable = tsDnodeSdb,
.pObj = pDnode,
.pMsg = pMsg
};
int32_t code = sdbDeleteRow(&oper);
int32_t code = sdbDeleteRow(&row);
if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_MND_ACTION_IN_PROGRESS) {
mError("dnode:%d, failed to drop from cluster, result:%s", pDnode->dnodeId, tstrerror(code));
} else {
......@@ -1141,7 +1141,7 @@ static int32_t mnodeRetrieveVnodes(SShowObj *pShow, char *data, int32_t rows, vo
cols++;
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
strcpy(pWrite, mnodeGetMnodeRoleStr(pVgid->role));
strcpy(pWrite, syncRole[pVgid->role]);
cols++;
}
}
......
......@@ -58,13 +58,13 @@ static int32_t mnodeRetrieveMnodes(SShowObj *pShow, char *data, int32_t rows, vo
#define mnodeMnodeDestroyLock() pthread_mutex_destroy(&tsMnodeLock)
#endif
static int32_t mnodeMnodeActionDestroy(SSdbOper *pOper) {
tfree(pOper->pObj);
static int32_t mnodeMnodeActionDestroy(SSdbRow *pRow) {
tfree(pRow->pObj);
return TSDB_CODE_SUCCESS;
}
static int32_t mnodeMnodeActionInsert(SSdbOper *pOper) {
SMnodeObj *pMnode = pOper->pObj;
static int32_t mnodeMnodeActionInsert(SSdbRow *pRow) {
SMnodeObj *pMnode = pRow->pObj;
SDnodeObj *pDnode = mnodeGetDnode(pMnode->mnodeId);
if (pDnode == NULL) return TSDB_CODE_MND_DNODE_NOT_EXIST;
......@@ -76,8 +76,8 @@ static int32_t mnodeMnodeActionInsert(SSdbOper *pOper) {
return TSDB_CODE_SUCCESS;
}
static int32_t mnodeMnodeActionDelete(SSdbOper *pOper) {
SMnodeObj *pMnode = pOper->pObj;
static int32_t mnodeMnodeActionDelete(SSdbRow *pRow) {
SMnodeObj *pMnode = pRow->pObj;
SDnodeObj *pDnode = mnodeGetDnode(pMnode->mnodeId);
if (pDnode == NULL) return TSDB_CODE_MND_DNODE_NOT_EXIST;
......@@ -88,30 +88,30 @@ static int32_t mnodeMnodeActionDelete(SSdbOper *pOper) {
return TSDB_CODE_SUCCESS;
}
static int32_t mnodeMnodeActionUpdate(SSdbOper *pOper) {
SMnodeObj *pMnode = pOper->pObj;
static int32_t mnodeMnodeActionUpdate(SSdbRow *pRow) {
SMnodeObj *pMnode = pRow->pObj;
SMnodeObj *pSaved = mnodeGetMnode(pMnode->mnodeId);
if (pMnode != pSaved) {
memcpy(pSaved, pMnode, pOper->rowSize);
memcpy(pSaved, pMnode, pRow->rowSize);
free(pMnode);
}
mnodeDecMnodeRef(pSaved);
return TSDB_CODE_SUCCESS;
}
static int32_t mnodeMnodeActionEncode(SSdbOper *pOper) {
SMnodeObj *pMnode = pOper->pObj;
memcpy(pOper->rowData, pMnode, tsMnodeUpdateSize);
pOper->rowSize = tsMnodeUpdateSize;
static int32_t mnodeMnodeActionEncode(SSdbRow *pRow) {
SMnodeObj *pMnode = pRow->pObj;
memcpy(pRow->rowData, pMnode, tsMnodeUpdateSize);
pRow->rowSize = tsMnodeUpdateSize;
return TSDB_CODE_SUCCESS;
}
static int32_t mnodeMnodeActionDecode(SSdbOper *pOper) {
static int32_t mnodeMnodeActionDecode(SSdbRow *pRow) {
SMnodeObj *pMnode = calloc(1, sizeof(SMnodeObj));
if (pMnode == NULL) return TSDB_CODE_MND_OUT_OF_MEMORY;
memcpy(pMnode, pOper->rowData, tsMnodeUpdateSize);
pOper->pObj = pMnode;
memcpy(pMnode, pRow->rowData, tsMnodeUpdateSize);
pRow->pObj = pMnode;
return TSDB_CODE_SUCCESS;
}
......@@ -137,23 +137,23 @@ int32_t mnodeInitMnodes() {
SMnodeObj tObj;
tsMnodeUpdateSize = (int8_t *)tObj.updateEnd - (int8_t *)&tObj;
SSdbTableDesc tableDesc = {
.tableId = SDB_TABLE_MNODE,
.tableName = "mnodes",
SSdbTableDesc desc = {
.id = SDB_TABLE_MNODE,
.name = "mnodes",
.hashSessions = TSDB_DEFAULT_MNODES_HASH_SIZE,
.maxRowSize = tsMnodeUpdateSize,
.refCountPos = (int8_t *)(&tObj.refCount) - (int8_t *)&tObj,
.keyType = SDB_KEY_INT,
.insertFp = mnodeMnodeActionInsert,
.deleteFp = mnodeMnodeActionDelete,
.updateFp = mnodeMnodeActionUpdate,
.encodeFp = mnodeMnodeActionEncode,
.decodeFp = mnodeMnodeActionDecode,
.destroyFp = mnodeMnodeActionDestroy,
.restoredFp = mnodeMnodeActionRestored
.fpInsert = mnodeMnodeActionInsert,
.fpDelete = mnodeMnodeActionDelete,
.fpUpdate = mnodeMnodeActionUpdate,
.fpEncode = mnodeMnodeActionEncode,
.fpDecode = mnodeMnodeActionDecode,
.fpDestroy = mnodeMnodeActionDestroy,
.fpRestored = mnodeMnodeActionRestored
};
tsMnodeSdb = sdbOpenTable(&tableDesc);
tsMnodeSdb = sdbOpenTable(&desc);
if (tsMnodeSdb == NULL) {
mError("failed to init mnodes data");
return -1;
......@@ -192,10 +192,6 @@ void *mnodeGetNextMnode(void *pIter, SMnodeObj **pMnode) {
return sdbFetchRow(tsMnodeSdb, pIter, (void **)pMnode);
}
char *mnodeGetMnodeRoleStr(int32_t role) {
return syncRole[role];
}
void mnodeUpdateMnodeEpSet() {
mInfo("update mnodes epSet, numOfEps:%d ", mnodeGetMnodesNum());
......@@ -329,11 +325,11 @@ void mnodeCreateMnode(int32_t dnodeId, char *dnodeEp, bool needConfirm) {
pMnode->mnodeId = dnodeId;
pMnode->createdTime = taosGetTimestampMs();
SSdbOper oper = {
.type = SDB_OPER_GLOBAL,
.table = tsMnodeSdb,
.pObj = pMnode,
.writeCb = mnodeCreateMnodeCb
SSdbRow row = {
.type = SDB_OPER_GLOBAL,
.pTable = tsMnodeSdb,
.pObj = pMnode,
.fpRsp = mnodeCreateMnodeCb
};
int32_t code = TSDB_CODE_SUCCESS;
......@@ -346,7 +342,7 @@ void mnodeCreateMnode(int32_t dnodeId, char *dnodeEp, bool needConfirm) {
return;
}
code = sdbInsertRow(&oper);
code = sdbInsertRow(&row);
if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_MND_ACTION_IN_PROGRESS) {
mError("dnode:%d, failed to create mnode, ep:%s reason:%s", dnodeId, dnodeEp, tstrerror(code));
tfree(pMnode);
......@@ -356,8 +352,8 @@ void mnodeCreateMnode(int32_t dnodeId, char *dnodeEp, bool needConfirm) {
void mnodeDropMnodeLocal(int32_t dnodeId) {
SMnodeObj *pMnode = mnodeGetMnode(dnodeId);
if (pMnode != NULL) {
SSdbOper oper = {.type = SDB_OPER_LOCAL, .table = tsMnodeSdb, .pObj = pMnode};
sdbDeleteRow(&oper);
SSdbRow row = {.type = SDB_OPER_LOCAL, .pTable = tsMnodeSdb, .pObj = pMnode};
sdbDeleteRow(&row);
mnodeDecMnodeRef(pMnode);
}
......@@ -371,13 +367,13 @@ int32_t mnodeDropMnode(int32_t dnodeId) {
return TSDB_CODE_MND_DNODE_NOT_EXIST;
}
SSdbOper oper = {
.type = SDB_OPER_GLOBAL,
.table = tsMnodeSdb,
.pObj = pMnode
SSdbRow row = {
.type = SDB_OPER_GLOBAL,
.pTable = tsMnodeSdb,
.pObj = pMnode
};
int32_t code = sdbDeleteRow(&oper);
int32_t code = sdbDeleteRow(&row);
sdbDecRef(tsMnodeSdb, pMnode);
......@@ -469,7 +465,7 @@ static int32_t mnodeRetrieveMnodes(SShowObj *pShow, char *data, int32_t rows, vo
cols++;
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
char* roles = mnodeGetMnodeRoleStr(pMnode->role);
char* roles = syncRole[pMnode->role];
STR_WITH_MAXSIZE_TO_VARSTR(pWrite, roles, pShow->bytes[cols]);
cols++;
......
此差异已折叠。
此差异已折叠。
......@@ -42,13 +42,13 @@ static int32_t mnodeProcessAlterUserMsg(SMnodeMsg *pMsg);
static int32_t mnodeProcessDropUserMsg(SMnodeMsg *pMsg);
static int32_t mnodeProcessAuthMsg(SMnodeMsg *pMsg);
static int32_t mnodeUserActionDestroy(SSdbOper *pOper) {
tfree(pOper->pObj);
static int32_t mnodeUserActionDestroy(SSdbRow *pRow) {
tfree(pRow->pObj);
return TSDB_CODE_SUCCESS;
}
static int32_t mnodeUserActionInsert(SSdbOper *pOper) {
SUserObj *pUser = pOper->pObj;
static int32_t mnodeUserActionInsert(SSdbRow *pRow) {
SUserObj *pUser = pRow->pObj;
SAcctObj *pAcct = mnodeGetAcct(pUser->acct);
if (pAcct != NULL) {
......@@ -62,8 +62,8 @@ static int32_t mnodeUserActionInsert(SSdbOper *pOper) {
return TSDB_CODE_SUCCESS;
}
static int32_t mnodeUserActionDelete(SSdbOper *pOper) {
SUserObj *pUser = pOper->pObj;
static int32_t mnodeUserActionDelete(SSdbRow *pRow) {
SUserObj *pUser = pRow->pObj;
SAcctObj *pAcct = mnodeGetAcct(pUser->acct);
if (pAcct != NULL) {
......@@ -74,8 +74,8 @@ static int32_t mnodeUserActionDelete(SSdbOper *pOper) {
return TSDB_CODE_SUCCESS;
}
static int32_t mnodeUserActionUpdate(SSdbOper *pOper) {
SUserObj *pUser = pOper->pObj;
static int32_t mnodeUserActionUpdate(SSdbRow *pRow) {
SUserObj *pUser = pRow->pObj;
SUserObj *pSaved = mnodeGetUser(pUser->user);
if (pUser != pSaved) {
memcpy(pSaved, pUser, tsUserUpdateSize);
......@@ -85,19 +85,19 @@ static int32_t mnodeUserActionUpdate(SSdbOper *pOper) {
return TSDB_CODE_SUCCESS;
}
static int32_t mnodeUserActionEncode(SSdbOper *pOper) {
SUserObj *pUser = pOper->pObj;
memcpy(pOper->rowData, pUser, tsUserUpdateSize);
pOper->rowSize = tsUserUpdateSize;
static int32_t mnodeUserActionEncode(SSdbRow *pRow) {
SUserObj *pUser = pRow->pObj;
memcpy(pRow->rowData, pUser, tsUserUpdateSize);
pRow->rowSize = tsUserUpdateSize;
return TSDB_CODE_SUCCESS;
}
static int32_t mnodeUserActionDecode(SSdbOper *pOper) {
static int32_t mnodeUserActionDecode(SSdbRow *pRow) {
SUserObj *pUser = (SUserObj *)calloc(1, sizeof(SUserObj));
if (pUser == NULL) return TSDB_CODE_MND_OUT_OF_MEMORY;
memcpy(pUser, pOper->rowData, tsUserUpdateSize);
pOper->pObj = pUser;
memcpy(pUser, pRow->rowData, tsUserUpdateSize);
pRow->pObj = pUser;
return TSDB_CODE_SUCCESS;
}
......@@ -150,25 +150,25 @@ int32_t mnodeInitUsers() {
SUserObj tObj;
tsUserUpdateSize = (int8_t *)tObj.updateEnd - (int8_t *)&tObj;
SSdbTableDesc tableDesc = {
.tableId = SDB_TABLE_USER,
.tableName = "users",
SSdbTableDesc desc = {
.id = SDB_TABLE_USER,
.name = "users",
.hashSessions = TSDB_DEFAULT_USERS_HASH_SIZE,
.maxRowSize = tsUserUpdateSize,
.refCountPos = (int8_t *)(&tObj.refCount) - (int8_t *)&tObj,
.keyType = SDB_KEY_STRING,
.insertFp = mnodeUserActionInsert,
.deleteFp = mnodeUserActionDelete,
.updateFp = mnodeUserActionUpdate,
.encodeFp = mnodeUserActionEncode,
.decodeFp = mnodeUserActionDecode,
.destroyFp = mnodeUserActionDestroy,
.restoredFp = mnodeUserActionRestored
.fpInsert = mnodeUserActionInsert,
.fpDelete = mnodeUserActionDelete,
.fpUpdate = mnodeUserActionUpdate,
.fpEncode = mnodeUserActionEncode,
.fpDecode = mnodeUserActionDecode,
.fpDestroy = mnodeUserActionDestroy,
.fpRestored = mnodeUserActionRestored
};
tsUserSdb = sdbOpenTable(&tableDesc);
tsUserSdb = sdbOpenTable(&desc);
if (tsUserSdb == NULL) {
mError("table:%s, failed to create hash", tableDesc.tableName);
mError("table:%s, failed to create hash", desc.name);
return -1;
}
......@@ -179,7 +179,7 @@ int32_t mnodeInitUsers() {
mnodeAddShowRetrieveHandle(TSDB_MGMT_TABLE_USER, mnodeRetrieveUsers);
mnodeAddPeerMsgHandle(TSDB_MSG_TYPE_DM_AUTH, mnodeProcessAuthMsg);
mDebug("table:%s, hash is created", tableDesc.tableName);
mDebug("table:%s, hash is created", desc.name);
return 0;
}
......@@ -205,14 +205,14 @@ void mnodeDecUserRef(SUserObj *pUser) {
}
static int32_t mnodeUpdateUser(SUserObj *pUser, void *pMsg) {
SSdbOper oper = {
.type = SDB_OPER_GLOBAL,
.table = tsUserSdb,
.pObj = pUser,
.pMsg = pMsg
SSdbRow row = {
.type = SDB_OPER_GLOBAL,
.pTable = tsUserSdb,
.pObj = pUser,
.pMsg = pMsg
};
int32_t code = sdbUpdateRow(&oper);
int32_t code = sdbUpdateRow(&row);
if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_MND_ACTION_IN_PROGRESS) {
mError("user:%s, failed to alter by %s, reason:%s", pUser->user, mnodeGetUserFromMsg(pMsg), tstrerror(code));
} else {
......@@ -259,15 +259,15 @@ int32_t mnodeCreateUser(SAcctObj *pAcct, char *name, char *pass, void *pMsg) {
pUser->superAuth = 1;
}
SSdbOper oper = {
.type = SDB_OPER_GLOBAL,
.table = tsUserSdb,
.pObj = pUser,
.rowSize = sizeof(SUserObj),
.pMsg = pMsg
SSdbRow row = {
.type = SDB_OPER_GLOBAL,
.pTable = tsUserSdb,
.pObj = pUser,
.rowSize = sizeof(SUserObj),
.pMsg = pMsg
};
code = sdbInsertRow(&oper);
code = sdbInsertRow(&row);
if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_MND_ACTION_IN_PROGRESS) {
mError("user:%s, failed to create by %s, reason:%s", pUser->user, mnodeGetUserFromMsg(pMsg), tstrerror(code));
tfree(pUser);
......@@ -279,14 +279,14 @@ int32_t mnodeCreateUser(SAcctObj *pAcct, char *name, char *pass, void *pMsg) {
}
static int32_t mnodeDropUser(SUserObj *pUser, void *pMsg) {
SSdbOper oper = {
.type = SDB_OPER_GLOBAL,
.table = tsUserSdb,
.pObj = pUser,
.pMsg = pMsg
SSdbRow row = {
.type = SDB_OPER_GLOBAL,
.pTable = tsUserSdb,
.pObj = pUser,
.pMsg = pMsg
};
int32_t code = sdbDeleteRow(&oper);
int32_t code = sdbDeleteRow(&row);
if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_MND_ACTION_IN_PROGRESS) {
mError("user:%s, failed to drop by %s, reason:%s", pUser->user, mnodeGetUserFromMsg(pMsg), tstrerror(code));
} else {
......@@ -562,12 +562,12 @@ void mnodeDropAllUsers(SAcctObj *pAcct) {
if (pUser == NULL) break;
if (strncmp(pUser->acct, pAcct->user, acctNameLen) == 0) {
SSdbOper oper = {
.type = SDB_OPER_LOCAL,
.table = tsUserSdb,
.pObj = pUser,
SSdbRow row = {
.type = SDB_OPER_LOCAL,
.pTable = tsUserSdb,
.pObj = pUser,
};
sdbDeleteRow(&oper);
sdbDeleteRow(&row);
numOfUsers++;
}
......
......@@ -72,13 +72,13 @@ static void mnodeDestroyVgroup(SVgObj *pVgroup) {
tfree(pVgroup);
}
static int32_t mnodeVgroupActionDestroy(SSdbOper *pOper) {
mnodeDestroyVgroup(pOper->pObj);
static int32_t mnodeVgroupActionDestroy(SSdbRow *pRow) {
mnodeDestroyVgroup(pRow->pObj);
return TSDB_CODE_SUCCESS;
}
static int32_t mnodeVgroupActionInsert(SSdbOper *pOper) {
SVgObj *pVgroup = pOper->pObj;
static int32_t mnodeVgroupActionInsert(SSdbRow *pRow) {
SVgObj *pVgroup = pRow->pObj;
// refer to db
SDbObj *pDb = mnodeGetDb(pVgroup->dbName);
......@@ -115,8 +115,8 @@ static int32_t mnodeVgroupActionInsert(SSdbOper *pOper) {
return TSDB_CODE_SUCCESS;
}
static int32_t mnodeVgroupActionDelete(SSdbOper *pOper) {
SVgObj *pVgroup = pOper->pObj;
static int32_t mnodeVgroupActionDelete(SSdbRow *pRow) {
SVgObj *pVgroup = pRow->pObj;
if (pVgroup->pDb == NULL) {
mError("vgId:%d, db:%s is not exist while insert into hash", pVgroup->vgId, pVgroup->dbName);
......@@ -137,8 +137,8 @@ static int32_t mnodeVgroupActionDelete(SSdbOper *pOper) {
return TSDB_CODE_SUCCESS;
}
static int32_t mnodeVgroupActionUpdate(SSdbOper *pOper) {
SVgObj *pNew = pOper->pObj;
static int32_t mnodeVgroupActionUpdate(SSdbRow *pRow) {
SVgObj *pNew = pRow->pObj;
SVgObj *pVgroup = mnodeGetVgroup(pNew->vgId);
if (pVgroup != pNew) {
......@@ -176,25 +176,25 @@ static int32_t mnodeVgroupActionUpdate(SSdbOper *pOper) {
return TSDB_CODE_SUCCESS;
}
static int32_t mnodeVgroupActionEncode(SSdbOper *pOper) {
SVgObj *pVgroup = pOper->pObj;
memcpy(pOper->rowData, pVgroup, tsVgUpdateSize);
SVgObj *pTmpVgroup = pOper->rowData;
static int32_t mnodeVgroupActionEncode(SSdbRow *pRow) {
SVgObj *pVgroup = pRow->pObj;
memcpy(pRow->rowData, pVgroup, tsVgUpdateSize);
SVgObj *pTmpVgroup = pRow->rowData;
for (int32_t i = 0; i < TSDB_MAX_REPLICA; ++i) {
pTmpVgroup->vnodeGid[i].pDnode = NULL;
pTmpVgroup->vnodeGid[i].role = 0;
}
pOper->rowSize = tsVgUpdateSize;
pRow->rowSize = tsVgUpdateSize;
return TSDB_CODE_SUCCESS;
}
static int32_t mnodeVgroupActionDecode(SSdbOper *pOper) {
static int32_t mnodeVgroupActionDecode(SSdbRow *pRow) {
SVgObj *pVgroup = (SVgObj *) calloc(1, sizeof(SVgObj));
if (pVgroup == NULL) return TSDB_CODE_MND_OUT_OF_MEMORY;
memcpy(pVgroup, pOper->rowData, tsVgUpdateSize);
pOper->pObj = pVgroup;
memcpy(pVgroup, pRow->rowData, tsVgUpdateSize);
pRow->pObj = pVgroup;
return TSDB_CODE_SUCCESS;
}
......@@ -206,23 +206,23 @@ int32_t mnodeInitVgroups() {
SVgObj tObj;
tsVgUpdateSize = (int8_t *)tObj.updateEnd - (int8_t *)&tObj;
SSdbTableDesc tableDesc = {
.tableId = SDB_TABLE_VGROUP,
.tableName = "vgroups",
SSdbTableDesc desc = {
.id = SDB_TABLE_VGROUP,
.name = "vgroups",
.hashSessions = TSDB_DEFAULT_VGROUPS_HASH_SIZE,
.maxRowSize = tsVgUpdateSize,
.refCountPos = (int8_t *)(&tObj.refCount) - (int8_t *)&tObj,
.keyType = SDB_KEY_AUTO,
.insertFp = mnodeVgroupActionInsert,
.deleteFp = mnodeVgroupActionDelete,
.updateFp = mnodeVgroupActionUpdate,
.encodeFp = mnodeVgroupActionEncode,
.decodeFp = mnodeVgroupActionDecode,
.destroyFp = mnodeVgroupActionDestroy,
.restoredFp = mnodeVgroupActionRestored,
.fpInsert = mnodeVgroupActionInsert,
.fpDelete = mnodeVgroupActionDelete,
.fpUpdate = mnodeVgroupActionUpdate,
.fpEncode = mnodeVgroupActionEncode,
.fpDecode = mnodeVgroupActionDecode,
.fpDestroy = mnodeVgroupActionDestroy,
.fpRestored = mnodeVgroupActionRestored,
};
tsVgroupSdb = sdbOpenTable(&tableDesc);
tsVgroupSdb = sdbOpenTable(&desc);
if (tsVgroupSdb == NULL) {
mError("failed to init vgroups data");
return -1;
......@@ -253,13 +253,13 @@ SVgObj *mnodeGetVgroup(int32_t vgId) {
}
void mnodeUpdateVgroup(SVgObj *pVgroup) {
SSdbOper oper = {
.type = SDB_OPER_GLOBAL,
.table = tsVgroupSdb,
.pObj = pVgroup
SSdbRow row = {
.type = SDB_OPER_GLOBAL,
.pTable = tsVgroupSdb,
.pObj = pVgroup
};
int32_t code = sdbUpdateRow(&oper);
int32_t code = sdbUpdateRow(&row);
if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_MND_ACTION_IN_PROGRESS) {
mError("vgId:%d, failed to update vgroup", pVgroup->vgId);
}
......@@ -519,14 +519,14 @@ static int32_t mnodeCreateVgroupCb(SMnodeMsg *pMsg, int32_t code) {
if (code != TSDB_CODE_SUCCESS) {
mError("app:%p:%p, vgId:%d, failed to create in sdb, reason:%s", pMsg->rpcMsg.ahandle, pMsg, pVgroup->vgId,
tstrerror(code));
SSdbOper desc = {.type = SDB_OPER_GLOBAL, .pObj = pVgroup, .table = tsVgroupSdb};
SSdbRow desc = {.type = SDB_OPER_GLOBAL, .pObj = pVgroup, .pTable = tsVgroupSdb};
sdbDeleteRow(&desc);
return code;
} else {
mInfo("app:%p:%p, vgId:%d, is created in sdb, db:%s replica:%d", pMsg->rpcMsg.ahandle, pMsg, pVgroup->vgId,
pDb->name, pVgroup->numOfVnodes);
pVgroup->status = TAOS_VG_STATUS_READY;
SSdbOper desc = {.type = SDB_OPER_GLOBAL, .pObj = pVgroup, .table = tsVgroupSdb};
SSdbRow desc = {.type = SDB_OPER_GLOBAL, .pObj = pVgroup, .pTable = tsVgroupSdb};
(void)sdbUpdateRow(&desc);
dnodeReprocessMWriteMsg(pMsg);
......@@ -535,7 +535,7 @@ static int32_t mnodeCreateVgroupCb(SMnodeMsg *pMsg, int32_t code) {
// mInfo("app:%p:%p, vgId:%d, is created in sdb, db:%s replica:%d", pMsg->rpcMsg.ahandle, pMsg, pVgroup->vgId,
// pDb->name, pVgroup->numOfVnodes);
// pVgroup->status = TAOS_VG_STATUS_READY;
// SSdbOper desc = {.type = SDB_OPER_GLOBAL, .pObj = pVgroup, .table = tsVgroupSdb};
// SSdbRow desc = {.type = SDB_OPER_GLOBAL, .pObj = pVgroup, .pTable = tsVgroupSdb};
// (void)sdbUpdateRow(&desc);
// dnodeReprocessMWriteMsg(pMsg);
// return TSDB_CODE_MND_ACTION_IN_PROGRESS;
......@@ -571,16 +571,16 @@ int32_t mnodeCreateVgroup(SMnodeMsg *pMsg) {
pMsg->pVgroup = pVgroup;
mnodeIncVgroupRef(pVgroup);
SSdbOper oper = {
.type = SDB_OPER_GLOBAL,
.table = tsVgroupSdb,
.pObj = pVgroup,
.rowSize = sizeof(SVgObj),
.pMsg = pMsg,
.reqFp = mnodeCreateVgroupFp
SSdbRow row = {
.type = SDB_OPER_GLOBAL,
.pTable = tsVgroupSdb,
.pObj = pVgroup,
.rowSize = sizeof(SVgObj),
.pMsg = pMsg,
.fpReq = mnodeCreateVgroupFp
};
code = sdbInsertRow(&oper);
code = sdbInsertRow(&row);
if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_MND_ACTION_IN_PROGRESS) {
pMsg->pVgroup = NULL;
mnodeDestroyVgroup(pVgroup);
......@@ -595,12 +595,12 @@ void mnodeDropVgroup(SVgObj *pVgroup, void *ahandle) {
} else {
mDebug("vgId:%d, replica:%d is deleting from sdb", pVgroup->vgId, pVgroup->numOfVnodes);
mnodeSendDropVgroupMsg(pVgroup, NULL);
SSdbOper oper = {
.type = SDB_OPER_GLOBAL,
.table = tsVgroupSdb,
.pObj = pVgroup
SSdbRow row = {
.type = SDB_OPER_GLOBAL,
.pTable = tsVgroupSdb,
.pObj = pVgroup
};
sdbDeleteRow(&oper);
sdbDeleteRow(&row);
}
}
......@@ -770,7 +770,7 @@ static int32_t mnodeRetrieveVgroups(SShowObj *pShow, char *data, int32_t rows, v
SDnodeObj * pDnode = pVgroup->vnodeGid[i].pDnode;
const char *role = "NULL";
if (pDnode != NULL) {
role = mnodeGetMnodeRoleStr(pVgroup->vnodeGid[i].role);
role = syncRole[pVgroup->vnodeGid[i].role];
}
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
......@@ -957,28 +957,28 @@ static void mnodeProcessCreateVnodeRsp(SRpcMsg *rpcMsg) {
if (mnodeMsg->received != mnodeMsg->expected) return;
if (mnodeMsg->received == mnodeMsg->successed) {
SSdbOper oper = {
SSdbRow row = {
.type = SDB_OPER_GLOBAL,
.table = tsVgroupSdb,
.pTable = tsVgroupSdb,
.pObj = pVgroup,
.rowSize = sizeof(SVgObj),
.pMsg = mnodeMsg,
.writeCb = mnodeCreateVgroupCb
.fpRsp = mnodeCreateVgroupCb
};
int32_t code = sdbInsertRowImp(&oper);
int32_t code = sdbInsertRowToQueue(&row);
if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_MND_ACTION_IN_PROGRESS) {
mnodeMsg->pVgroup = NULL;
mnodeDestroyVgroup(pVgroup);
dnodeSendRpcMWriteRsp(mnodeMsg, code);
}
} else {
SSdbOper oper = {
.type = SDB_OPER_GLOBAL,
.table = tsVgroupSdb,
.pObj = pVgroup
SSdbRow row = {
.type = SDB_OPER_GLOBAL,
.pTable = tsVgroupSdb,
.pObj = pVgroup
};
sdbDeleteRow(&oper);
sdbDeleteRow(&row);
dnodeSendRpcMWriteRsp(mnodeMsg, mnodeMsg->code);
}
}
......@@ -1031,12 +1031,12 @@ static void mnodeProcessDropVnodeRsp(SRpcMsg *rpcMsg) {
if (mnodeMsg->received != mnodeMsg->expected) return;
SSdbOper oper = {
.type = SDB_OPER_GLOBAL,
.table = tsVgroupSdb,
.pObj = pVgroup
SSdbRow row = {
.type = SDB_OPER_GLOBAL,
.pTable = tsVgroupSdb,
.pObj = pVgroup
};
int32_t code = sdbDeleteRow(&oper);
int32_t code = sdbDeleteRow(&row);
if (code != 0) {
code = TSDB_CODE_MND_SDB_ERROR;
}
......@@ -1084,12 +1084,12 @@ void mnodeDropAllDnodeVgroups(SDnodeObj *pDropDnode) {
if (pVgroup->vnodeGid[0].dnodeId == pDropDnode->dnodeId) {
mnodeDropAllChildTablesInVgroups(pVgroup);
SSdbOper oper = {
.type = SDB_OPER_LOCAL,
.table = tsVgroupSdb,
.pObj = pVgroup,
SSdbRow row = {
.type = SDB_OPER_LOCAL,
.pTable = tsVgroupSdb,
.pObj = pVgroup,
};
sdbDeleteRow(&oper);
sdbDeleteRow(&row);
numOfVgroups++;
}
mnodeDecVgroupRef(pVgroup);
......@@ -1135,12 +1135,12 @@ void mnodeDropAllDbVgroups(SDbObj *pDropDb) {
if (pVgroup == NULL) break;
if (pVgroup->pDb == pDropDb) {
SSdbOper oper = {
.type = SDB_OPER_LOCAL,
.table = tsVgroupSdb,
.pObj = pVgroup,
SSdbRow row = {
.type = SDB_OPER_LOCAL,
.pTable = tsVgroupSdb,
.pObj = pVgroup,
};
sdbDeleteRow(&oper);
sdbDeleteRow(&row);
numOfVgroups++;
}
......
......@@ -118,7 +118,7 @@ typedef struct {
typedef struct {
char *module;
bool (*decodeFp)(struct HttpContext *pContext);
bool (*fpDecode)(struct HttpContext *pContext);
} HttpDecodeMethod;
typedef struct {
......
......@@ -21,11 +21,11 @@
#include "httpHandle.h"
bool httpDecodeRequest(HttpContext* pContext) {
if (pContext->decodeMethod->decodeFp == NULL) {
if (pContext->decodeMethod->fpDecode == NULL) {
return false;
}
return (*pContext->decodeMethod->decodeFp)(pContext);
return (*pContext->decodeMethod->fpDecode)(pContext);
}
/**
......
......@@ -27,16 +27,20 @@ extern "C" {
#define sDebug(...) { if (sDebugFlag & DEBUG_DEBUG) { taosPrintLog("SYN ", sDebugFlag, __VA_ARGS__); }}
#define sTrace(...) { if (sDebugFlag & DEBUG_TRACE) { taosPrintLog("SYN ", sDebugFlag, __VA_ARGS__); }}
#define TAOS_SMSG_SYNC_DATA 1
#define TAOS_SMSG_FORWARD 2
#define TAOS_SMSG_FORWARD_RSP 3
#define TAOS_SMSG_SYNC_REQ 4
#define TAOS_SMSG_SYNC_RSP 5
#define TAOS_SMSG_SYNC_MUST 6
#define TAOS_SMSG_STATUS 7
typedef enum {
TAOS_SMSG_SYNC_DATA = 1,
TAOS_SMSG_FORWARD = 2,
TAOS_SMSG_FORWARD_RSP = 3,
TAOS_SMSG_SYNC_REQ = 4,
TAOS_SMSG_SYNC_RSP = 5,
TAOS_SMSG_SYNC_MUST = 6,
TAOS_SMSG_STATUS = 7
} ESyncMsgType;
#define SYNC_MAX_SIZE (TSDB_MAX_WAL_SIZE + sizeof(SWalHead) + sizeof(SSyncHead) + 16)
#define SYNC_RECV_BUFFER_SIZE (5*1024*1024)
#define SYNC_FWD_TIMER 300
#define SYNC_ROLE_TIMER 10000
#define nodeRole pNode->peerInfo[pNode->selfIndex]->role
#define nodeVersion pNode->peerInfo[pNode->selfIndex]->version
......@@ -68,6 +72,9 @@ typedef struct {
typedef struct {
int8_t role;
int8_t ack;
int8_t type;
int8_t reserved[3];
uint16_t tranId;
uint64_t version;
SPeerStatus peersStatus[];
} SPeersStatus;
......@@ -120,12 +127,12 @@ typedef struct SsyncPeer {
int32_t nodeId;
uint32_t ip;
uint16_t port;
int8_t role;
int8_t sstatus; // sync status
char fqdn[TSDB_FQDN_LEN]; // peer ip string
char id[TSDB_EP_LEN + 32]; // peer vgId + end point
int8_t role;
int8_t sstatus; // sync status
uint64_t version;
uint64_t sversion; // track the peer version in retrieve process
uint64_t sversion; // track the peer version in retrieve process
int32_t syncFd;
int32_t peerFd; // forward FD
int32_t numOfRetrieves; // number of retrieves tried
......@@ -135,7 +142,7 @@ typedef struct SsyncPeer {
int32_t notifyFd;
int32_t watchNum;
int32_t *watchFd;
int8_t refCount; // reference count
int32_t refCount; // reference count
struct SSyncNode *pSyncNode;
} SSyncPeer;
......@@ -143,16 +150,16 @@ typedef struct SSyncNode {
char path[TSDB_FILENAME_LEN];
int8_t replica;
int8_t quorum;
int8_t selfIndex;
uint32_t vgId;
int64_t rid;
void *ahandle;
int8_t selfIndex;
SSyncPeer *peerInfo[TAOS_SYNC_MAX_REPLICA+1]; // extra one for arbitrator
SSyncPeer *pMaster;
int8_t refCount;
SRecvBuffer *pRecv;
SSyncFwds *pSyncFwds; // saved forward info if quorum >1
void *pFwdTimer;
void *pRoleTimer;
FGetFileInfo getFileInfo;
FGetWalInfo getWalInfo;
FWriteToCache writeToCache;
......
此差异已折叠。
......@@ -140,6 +140,7 @@ static int32_t syncRestoreWal(SSyncPeer *pPeer) {
if (buffer == NULL) return -1;
SWalHead *pHead = (SWalHead *)buffer;
uint64_t lastVer = 0;
while (1) {
ret = taosReadMsg(pPeer->syncFd, pHead, sizeof(SWalHead));
......@@ -153,7 +154,14 @@ static int32_t syncRestoreWal(SSyncPeer *pPeer) {
ret = taosReadMsg(pPeer->syncFd, pHead->cont, pHead->len);
if (ret < 0) break;
sDebug("%s, restore a record, qtype:wal hver:%" PRIu64, pPeer->id, pHead->version);
sDebug("%s, restore a record, qtype:wal len:%d hver:%" PRIu64, pPeer->id, pHead->len, pHead->version);
if (lastVer == pHead->version) {
sError("%s, failed to restore record, same hver:%" PRIu64 ", wal sync failed" PRIu64, pPeer->id, lastVer);
break;
}
lastVer = pHead->version;
(*pNode->writeToCache)(pNode->ahandle, pHead, TAOS_QTYPE_WAL, NULL);
}
......@@ -214,7 +222,7 @@ int32_t syncSaveIntoBuffer(SSyncPeer *pPeer, SWalHead *pHead) {
memcpy(pRecv->offset, pHead, len);
pRecv->offset += len;
pRecv->forwards++;
sDebug("%s, fwd is saved into queue, ver:%" PRIu64 " fwds:%d", pPeer->id, pHead->version, pRecv->forwards);
sDebug("%s, fwd is saved into queue, hver:%" PRIu64 " fwds:%d", pPeer->id, pHead->version, pRecv->forwards);
} else {
sError("%s, buffer size:%d is too small", pPeer->id, pRecv->bufferSize);
pRecv->code = -1; // set error code
......@@ -291,7 +299,7 @@ static int32_t syncRestoreDataStepByStep(SSyncPeer *pPeer) {
}
void *syncRestoreData(void *param) {
SSyncPeer *pPeer = (SSyncPeer *)param;
SSyncPeer *pPeer = param;
SSyncNode *pNode = pPeer->pSyncNode;
taosBlockSIGPIPE();
......@@ -300,7 +308,8 @@ void *syncRestoreData(void *param) {
(*pNode->notifyRole)(pNode->ahandle, TAOS_SYNC_ROLE_SYNCING);
if (syncOpenRecvBuffer(pNode) < 0) {
sError("%s, failed to allocate recv buffer", pPeer->id);
sError("%s, failed to allocate recv buffer, restart connection", pPeer->id);
syncRestartConnection(pPeer);
} else {
if (syncRestoreDataStepByStep(pPeer) == 0) {
sInfo("%s, it is synced successfully", pPeer->id);
......
......@@ -268,7 +268,7 @@ static int32_t syncRetrieveLastWal(SSyncPeer *pPeer, char *name, uint64_t fversi
break;
}
sDebug("%s, last wal is forwarded, ver:%" PRIu64, pPeer->id, pHead->version);
sDebug("%s, last wal is forwarded, hver:%" PRIu64, pPeer->id, pHead->version);
int32_t ret = taosWriteMsg(pPeer->syncFd, pHead, wsize);
if (ret != wsize) break;
pPeer->sversion = pHead->version;
......@@ -418,7 +418,7 @@ static int32_t syncRetrieveWal(SSyncPeer *pPeer) {
}
if (code == 0) {
sDebug("%s, wal retrieve is finished", pPeer->id);
sInfo("%s, wal retrieve is finished", pPeer->id);
pPeer->sstatus = TAOS_SYNC_STATUS_CACHE;
SWalHead walHead;
memset(&walHead, 0, sizeof(walHead));
......@@ -447,7 +447,7 @@ static int32_t syncRetrieveDataStepByStep(SSyncPeer *pPeer) {
pPeer->sversion = 0;
pPeer->sstatus = TAOS_SYNC_STATUS_FILE;
sDebug("%s, start to retrieve file", pPeer->id);
sInfo("%s, start to retrieve file", pPeer->id);
if (syncRetrieveFile(pPeer) < 0) {
sError("%s, failed to retrieve file", pPeer->id);
return -1;
......@@ -456,7 +456,7 @@ static int32_t syncRetrieveDataStepByStep(SSyncPeer *pPeer) {
// if no files are synced, there must be wal to sync, sversion must be larger than one
if (pPeer->sversion == 0) pPeer->sversion = 1;
sDebug("%s, start to retrieve wal", pPeer->id);
sInfo("%s, start to retrieve wal", pPeer->id);
if (syncRetrieveWal(pPeer) < 0) {
sError("%s, failed to retrieve wal", pPeer->id);
return -1;
......@@ -478,7 +478,7 @@ void *syncRetrieveData(void *param) {
sInfo("%s, sync tcp is setup", pPeer->id);
if (syncRetrieveDataStepByStep(pPeer) == 0) {
sDebug("%s, sync retrieve process is successful", pPeer->id);
sInfo("%s, sync retrieve process is successful", pPeer->id);
} else {
sError("%s, failed to retrieve data, restart connection", pPeer->id);
syncRestartConnection(pPeer);
......
......@@ -150,7 +150,7 @@ void *taosAllocateTcpConn(void *param, void *pPeer, int32_t connFd) {
}
void taosFreeTcpConn(void *param) {
SConnObj * pConn = (SConnObj *)param;
SConnObj * pConn = param;
SThreadObj *pThread = pConn->pThread;
sDebug("%p TCP connection will be closed, fd:%d", pThread, pConn->fd);
......
......@@ -144,9 +144,9 @@ void walFsync(void *handle, bool forceFsync) {
if (pWal == NULL || pWal->fd < 0) return;
if (forceFsync || (pWal->level == TAOS_WAL_FSYNC && pWal->fsyncPeriod == 0)) {
wTrace("vgId:%d, file:%s, do fsync", pWal->vgId, pWal->name);
wTrace("vgId:%d, fileId:%" PRId64 ", do fsync", pWal->vgId, pWal->fileId);
if (fsync(pWal->fd) < 0) {
wError("vgId:%d, file:%s, fsync failed since %s", pWal->vgId, pWal->name, strerror(errno));
wError("vgId:%d, fileId:%" PRId64 ", fsync failed since %s", pWal->vgId, pWal->fileId, strerror(errno));
}
}
}
......
......@@ -111,24 +111,24 @@ echo "serverPort ${NODE}" >> $TAOS_CFG
echo "dataDir $DATA_DIR" >> $TAOS_CFG
echo "logDir $LOG_DIR" >> $TAOS_CFG
echo "debugFlag 0" >> $TAOS_CFG
echo "mDebugFlag 143" >> $TAOS_CFG
echo "sdbDebugFlag 143" >> $TAOS_CFG
echo "dDebugFlag 143" >> $TAOS_CFG
echo "vDebugFlag 143" >> $TAOS_CFG
echo "tsdbDebugFlag 143" >> $TAOS_CFG
echo "cDebugFlag 143" >> $TAOS_CFG
echo "jnidebugFlag 143" >> $TAOS_CFG
echo "odbcdebugFlag 143" >> $TAOS_CFG
echo "httpDebugFlag 143" >> $TAOS_CFG
echo "monitorDebugFlag 143" >> $TAOS_CFG
echo "mqttDebugFlag 143" >> $TAOS_CFG
echo "qdebugFlag 143" >> $TAOS_CFG
echo "rpcDebugFlag 143" >> $TAOS_CFG
echo "mDebugFlag 135" >> $TAOS_CFG
echo "sdbDebugFlag 135" >> $TAOS_CFG
echo "dDebugFlag 135" >> $TAOS_CFG
echo "vDebugFlag 135" >> $TAOS_CFG
echo "tsdbDebugFlag 135" >> $TAOS_CFG
echo "cDebugFlag 135" >> $TAOS_CFG
echo "jnidebugFlag 135" >> $TAOS_CFG
echo "odbcdebugFlag 135" >> $TAOS_CFG
echo "httpDebugFlag 135" >> $TAOS_CFG
echo "monitorDebugFlag 135" >> $TAOS_CFG
echo "mqttDebugFlag 135" >> $TAOS_CFG
echo "qdebugFlag 135" >> $TAOS_CFG
echo "rpcDebugFlag 135" >> $TAOS_CFG
echo "tmrDebugFlag 131" >> $TAOS_CFG
echo "udebugFlag 143" >> $TAOS_CFG
echo "sdebugFlag 143" >> $TAOS_CFG
echo "wdebugFlag 143" >> $TAOS_CFG
echo "cqdebugFlag 143" >> $TAOS_CFG
echo "udebugFlag 135" >> $TAOS_CFG
echo "sdebugFlag 135" >> $TAOS_CFG
echo "wdebugFlag 135" >> $TAOS_CFG
echo "cqdebugFlag 135" >> $TAOS_CFG
echo "monitor 0" >> $TAOS_CFG
echo "monitorInterval 1" >> $TAOS_CFG
echo "http 0" >> $TAOS_CFG
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册