未验证 提交 090df131 编写于 作者: S Shengliang Guan 提交者: GitHub

Merge pull request #4771 from taosdata/feature/wal

[TD-2429]<fix>: data may be lost during load balancing
...@@ -224,19 +224,34 @@ static bool bnCheckVgroupReady(SVgObj *pVgroup, SVnodeGid *pRmVnode) { ...@@ -224,19 +224,34 @@ static bool bnCheckVgroupReady(SVgObj *pVgroup, SVnodeGid *pRmVnode) {
return false; return false;
} }
int32_t rmVnodeVer = 0;
for (int32_t i = 0; i < pVgroup->numOfVnodes; ++i) {
SVnodeGid *pVnode = pVgroup->vnodeGid + i;
if (pVnode == pRmVnode) {
rmVnodeVer = mnodeGetVgidVer(pVnode->vver);
mTrace("vgId:%d, check vgroup status, vindex:%d dnode:%d status:%s role:%s vver:%d is watching", pVgroup->vgId, i,
pVnode->dnodeId, dnodeStatus[pVnode->pDnode->status], syncRole[pVnode->role], rmVnodeVer);
}
}
bool isReady = false; bool isReady = false;
for (int32_t i = 0; i < pVgroup->numOfVnodes; ++i) { for (int32_t i = 0; i < pVgroup->numOfVnodes; ++i) {
SVnodeGid *pVnode = pVgroup->vnodeGid + i; SVnodeGid *pVnode = pVgroup->vnodeGid + i;
if (pVnode == pRmVnode) continue; if (pVnode == pRmVnode) continue;
int32_t vver = mnodeGetVgidVer(pVnode->vver);
mTrace("vgId:%d, check vgroup status, dnode:%d status:%d, vnode role:%s", pVgroup->vgId, pVnode->pDnode->dnodeId, mTrace("vgId:%d, check vgroup status, vindex:%d dnode:%d status:%s role:%s vver:%d, rmvver:%d" , pVgroup->vgId, i,
pVnode->pDnode->status, syncRole[pVnode->role]); pVnode->dnodeId, dnodeStatus[pVnode->pDnode->status], syncRole[pVnode->role], vver, rmVnodeVer);
if (pVnode->pDnode->status == TAOS_DN_STATUS_DROPPING) continue; if (pVnode->pDnode->status == TAOS_DN_STATUS_DROPPING) continue;
if (pVnode->pDnode->status == TAOS_DN_STATUS_OFFLINE) continue; if (pVnode->pDnode->status == TAOS_DN_STATUS_OFFLINE) continue;
if (pVnode->role != TAOS_SYNC_ROLE_SLAVE && pVnode->role != TAOS_SYNC_ROLE_MASTER) continue;
if (pVnode->role == TAOS_SYNC_ROLE_SLAVE || pVnode->role == TAOS_SYNC_ROLE_MASTER) { if (rmVnodeVer == 0 || vver >= rmVnodeVer) {
isReady = true; mInfo("vgId:%d, is ready for vindex:%d in dnode:%d status:%s role:%s vver:%d larger than rmvver:%d", pVgroup->vgId, i,
pVnode->dnodeId, dnodeStatus[pVnode->pDnode->status], syncRole[pVnode->role], vver, rmVnodeVer);
} }
isReady = true;
} }
return isReady; return isReady;
...@@ -256,7 +271,7 @@ static int32_t bnRemoveVnode(SVgObj *pVgroup) { ...@@ -256,7 +271,7 @@ static int32_t bnRemoveVnode(SVgObj *pVgroup) {
mDebug("vgId:%d, is not ready", pVgroup->vgId); mDebug("vgId:%d, is not ready", pVgroup->vgId);
return -1; return -1;
} else { } else {
mDebug("vgId:%d, is ready, discard dnode:%d", pVgroup->vgId, pSelVnode->dnodeId); mInfo("vgId:%d, is ready, discard dnode:%d", pVgroup->vgId, pSelVnode->dnodeId);
bnDiscardVnode(pVgroup, pSelVnode); bnDiscardVnode(pVgroup, pSelVnode);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
......
...@@ -2503,7 +2503,7 @@ bool tscSetSqlOwner(SSqlObj* pSql) { ...@@ -2503,7 +2503,7 @@ bool tscSetSqlOwner(SSqlObj* pSql) {
SSqlRes* pRes = &pSql->res; SSqlRes* pRes = &pSql->res;
// set the sql object owner // set the sql object owner
uint64_t threadId = taosGetPthreadId(); uint64_t threadId = taosGetSelfPthreadId();
if (atomic_val_compare_exchange_64(&pSql->owner, 0, threadId) != 0) { if (atomic_val_compare_exchange_64(&pSql->owner, 0, threadId) != 0) {
pRes->code = TSDB_CODE_QRY_IN_EXEC; pRes->code = TSDB_CODE_QRY_IN_EXEC;
return false; return false;
......
...@@ -143,7 +143,7 @@ static SCreateVnodeMsg* dnodeParseVnodeMsg(SRpcMsg *rpcMsg) { ...@@ -143,7 +143,7 @@ static SCreateVnodeMsg* dnodeParseVnodeMsg(SRpcMsg *rpcMsg) {
pCreate->cfg.fsyncPeriod = htonl(pCreate->cfg.fsyncPeriod); pCreate->cfg.fsyncPeriod = htonl(pCreate->cfg.fsyncPeriod);
pCreate->cfg.commitTime = htonl(pCreate->cfg.commitTime); pCreate->cfg.commitTime = htonl(pCreate->cfg.commitTime);
for (int32_t j = 0; j < pCreate->cfg.replications; ++j) { for (int32_t j = 0; j < pCreate->cfg.vgReplica; ++j) {
pCreate->nodes[j].nodeId = htonl(pCreate->nodes[j].nodeId); pCreate->nodes[j].nodeId = htonl(pCreate->nodes[j].nodeId);
} }
......
...@@ -209,9 +209,11 @@ TAOS_DEFINE_ERROR(TSDB_CODE_VND_APP_ERROR, 0, 0x0509, "Unexpected ...@@ -209,9 +209,11 @@ TAOS_DEFINE_ERROR(TSDB_CODE_VND_APP_ERROR, 0, 0x0509, "Unexpected
TAOS_DEFINE_ERROR(TSDB_CODE_VND_INVALID_VRESION_FILE, 0, 0x050A, "Invalid version file") TAOS_DEFINE_ERROR(TSDB_CODE_VND_INVALID_VRESION_FILE, 0, 0x050A, "Invalid version file")
TAOS_DEFINE_ERROR(TSDB_CODE_VND_IS_FULL, 0, 0x050B, "Database memory is full for commit failed") TAOS_DEFINE_ERROR(TSDB_CODE_VND_IS_FULL, 0, 0x050B, "Database memory is full for commit failed")
TAOS_DEFINE_ERROR(TSDB_CODE_VND_IS_FLOWCTRL, 0, 0x050C, "Database memory is full for waiting commit") TAOS_DEFINE_ERROR(TSDB_CODE_VND_IS_FLOWCTRL, 0, 0x050C, "Database memory is full for waiting commit")
TAOS_DEFINE_ERROR(TSDB_CODE_VND_IS_DROPPING, 0, 0x050D, "Database is dropping")
TAOS_DEFINE_ERROR(TSDB_CODE_VND_IS_BALANCING, 0, 0x050E, "Database is balancing")
TAOS_DEFINE_ERROR(TSDB_CODE_VND_NOT_SYNCED, 0, 0x0511, "Database suspended") TAOS_DEFINE_ERROR(TSDB_CODE_VND_NOT_SYNCED, 0, 0x0511, "Database suspended")
TAOS_DEFINE_ERROR(TSDB_CODE_VND_NO_WRITE_AUTH, 0, 0x0512, "Database write operation denied") TAOS_DEFINE_ERROR(TSDB_CODE_VND_NO_WRITE_AUTH, 0, 0x0512, "Database write operation denied")
TAOS_DEFINE_ERROR(TSDB_CODE_VND_SYNCING, 0, 0x0513, "Database is syncing") TAOS_DEFINE_ERROR(TSDB_CODE_VND_IS_SYNCING, 0, 0x0513, "Database is syncing")
// tsdb // tsdb
TAOS_DEFINE_ERROR(TSDB_CODE_TDB_INVALID_TABLE_ID, 0, 0x0600, "Invalid table ID") TAOS_DEFINE_ERROR(TSDB_CODE_TDB_INVALID_TABLE_ID, 0, 0x0600, "Invalid table ID")
......
...@@ -518,16 +518,17 @@ typedef struct SRetrieveTableRsp { ...@@ -518,16 +518,17 @@ typedef struct SRetrieveTableRsp {
} SRetrieveTableRsp; } SRetrieveTableRsp;
typedef struct { typedef struct {
int32_t vgId; int32_t vgId;
int32_t dbCfgVersion; int32_t dbCfgVersion;
int64_t totalStorage; int64_t totalStorage;
int64_t compStorage; int64_t compStorage;
int64_t pointsWritten; int64_t pointsWritten;
uint8_t status; uint64_t vnodeVersion;
uint8_t role; int32_t vgCfgVersion;
uint8_t replica; uint8_t status;
uint8_t reserved; uint8_t role;
int32_t vgCfgVersion; uint8_t replica;
uint8_t reserved;
} SVnodeLoad; } SVnodeLoad;
typedef struct { typedef struct {
...@@ -663,13 +664,14 @@ typedef struct { ...@@ -663,13 +664,14 @@ typedef struct {
int8_t precision; int8_t precision;
int8_t compression; int8_t compression;
int8_t walLevel; int8_t walLevel;
int8_t replications; int8_t vgReplica;
int8_t wals; int8_t wals;
int8_t quorum; int8_t quorum;
int8_t update; int8_t update;
int8_t cacheLastRow; int8_t cacheLastRow;
int32_t vgCfgVersion; int32_t vgCfgVersion;
int8_t reserved[10]; int8_t dbReplica;
int8_t reserved[9];
} SVnodeCfg; } SVnodeCfg;
typedef struct { typedef struct {
......
...@@ -128,8 +128,8 @@ typedef struct { ...@@ -128,8 +128,8 @@ typedef struct {
typedef struct { typedef struct {
int32_t dnodeId; int32_t dnodeId;
int8_t role; int8_t role;
int8_t reserved[3]; int8_t vver[3]; // To ensure compatibility, 3 bytes are used to store the 64-bit vnode version modulo 1,000,000
SDnodeObj* pDnode; SDnodeObj *pDnode;
} SVnodeGid; } SVnodeGid;
typedef struct SVgObj { typedef struct SVgObj {
......
...@@ -53,6 +53,9 @@ void mnodeSendAlterVgroupMsg(SVgObj *pVgroup); ...@@ -53,6 +53,9 @@ void mnodeSendAlterVgroupMsg(SVgObj *pVgroup);
SRpcEpSet mnodeGetEpSetFromVgroup(SVgObj *pVgroup); SRpcEpSet mnodeGetEpSetFromVgroup(SVgObj *pVgroup);
SRpcEpSet mnodeGetEpSetFromIp(char *ep); SRpcEpSet mnodeGetEpSetFromIp(char *ep);
int32_t mnodeGetVgidVer(int8_t *vver);
void mnodeSetVgidVer(int8_t *cver, uint64_t iver);
#ifdef __cplusplus #ifdef __cplusplus
} }
#endif #endif
......
...@@ -571,6 +571,7 @@ static int32_t mnodeProcessDnodeStatusMsg(SMnodeMsg *pMsg) { ...@@ -571,6 +571,7 @@ static int32_t mnodeProcessDnodeStatusMsg(SMnodeMsg *pMsg) {
pVload->vgId = htonl(pVload->vgId); pVload->vgId = htonl(pVload->vgId);
pVload->dbCfgVersion = htonl(pVload->dbCfgVersion); pVload->dbCfgVersion = htonl(pVload->dbCfgVersion);
pVload->vgCfgVersion = htonl(pVload->vgCfgVersion); pVload->vgCfgVersion = htonl(pVload->vgCfgVersion);
pVload->vnodeVersion = htobe64(pVload->vnodeVersion);
SVgObj *pVgroup = mnodeGetVgroup(pVload->vgId); SVgObj *pVgroup = mnodeGetVgroup(pVload->vgId);
if (pVgroup == NULL) { if (pVgroup == NULL) {
......
...@@ -184,6 +184,7 @@ static int32_t mnodeVgroupActionEncode(SSdbRow *pRow) { ...@@ -184,6 +184,7 @@ static int32_t mnodeVgroupActionEncode(SSdbRow *pRow) {
for (int32_t i = 0; i < TSDB_MAX_REPLICA; ++i) { for (int32_t i = 0; i < TSDB_MAX_REPLICA; ++i) {
pTmpVgroup->vnodeGid[i].pDnode = NULL; pTmpVgroup->vnodeGid[i].pDnode = NULL;
pTmpVgroup->vnodeGid[i].role = 0; pTmpVgroup->vnodeGid[i].role = 0;
memset(pTmpVgroup->vnodeGid[i].vver, 0, sizeof(pTmpVgroup->vnodeGid[i].vver));
} }
pRow->rowSize = tsVgUpdateSize; pRow->rowSize = tsVgUpdateSize;
...@@ -317,9 +318,10 @@ void mnodeUpdateVgroupStatus(SVgObj *pVgroup, SDnodeObj *pDnode, SVnodeLoad *pVl ...@@ -317,9 +318,10 @@ void mnodeUpdateVgroupStatus(SVgObj *pVgroup, SDnodeObj *pDnode, SVnodeLoad *pVl
for (int32_t i = 0; i < pVgroup->numOfVnodes; ++i) { for (int32_t i = 0; i < pVgroup->numOfVnodes; ++i) {
SVnodeGid *pVgid = &pVgroup->vnodeGid[i]; SVnodeGid *pVgid = &pVgroup->vnodeGid[i];
if (pVgid->pDnode == pDnode) { if (pVgid->pDnode == pDnode) {
mTrace("dnode:%d, receive status from dnode, vgId:%d status:%s last:%s", pDnode->dnodeId, pVgroup->vgId, mTrace("vgId:%d, receive vnode status from dnode:%d, status:%s last:%s vver:%" PRIu64, pVgroup->vgId,
syncRole[pVload->role], syncRole[pVgid->role]); pDnode->dnodeId, syncRole[pVload->role], syncRole[pVgid->role], pVload->vnodeVersion);
pVgid->role = pVload->role; pVgid->role = pVload->role;
mnodeSetVgidVer(pVgid->vver, pVload->vnodeVersion);
if (pVload->role == TAOS_SYNC_ROLE_MASTER) { if (pVload->role == TAOS_SYNC_ROLE_MASTER) {
pVgroup->inUse = i; pVgroup->inUse = i;
} }
...@@ -859,11 +861,12 @@ static SCreateVnodeMsg *mnodeBuildVnodeMsg(SVgObj *pVgroup) { ...@@ -859,11 +861,12 @@ static SCreateVnodeMsg *mnodeBuildVnodeMsg(SVgObj *pVgroup) {
pCfg->precision = pDb->cfg.precision; pCfg->precision = pDb->cfg.precision;
pCfg->compression = pDb->cfg.compression; pCfg->compression = pDb->cfg.compression;
pCfg->walLevel = pDb->cfg.walLevel; pCfg->walLevel = pDb->cfg.walLevel;
pCfg->replications = (int8_t) pVgroup->numOfVnodes; pCfg->vgReplica = (int8_t) pVgroup->numOfVnodes;
pCfg->wals = 3; pCfg->wals = 3;
pCfg->quorum = pDb->cfg.quorum; pCfg->quorum = pDb->cfg.quorum;
pCfg->update = pDb->cfg.update; pCfg->update = pDb->cfg.update;
pCfg->cacheLastRow = pDb->cfg.cacheLastRow; pCfg->cacheLastRow = pDb->cfg.cacheLastRow;
pCfg->dbReplica = pDb->cfg.replications;
SVnodeDesc *pNodes = pVnode->nodes; SVnodeDesc *pNodes = pVnode->nodes;
for (int32_t j = 0; j < pVgroup->numOfVnodes; ++j) { for (int32_t j = 0; j < pVgroup->numOfVnodes; ++j) {
...@@ -1179,3 +1182,14 @@ void mnodeSendDropAllDbVgroupsMsg(SDbObj *pDropDb) { ...@@ -1179,3 +1182,14 @@ void mnodeSendDropAllDbVgroupsMsg(SDbObj *pDropDb) {
mInfo("db:%s, all vgroups:%d drop msg is sent to dnode", pDropDb->name, numOfVgroups); mInfo("db:%s, all vgroups:%d drop msg is sent to dnode", pDropDb->name, numOfVgroups);
} }
/*
 * Unpack the 3-byte version field into a single integer.
 *
 * The bytes are treated as three base-100 digits, most significant first:
 * result = cver[0] * 10000 + cver[1] * 100 + cver[2].
 */
int32_t mnodeGetVgidVer(int8_t *cver) {
  int32_t high = cver[0];
  int32_t mid  = cver[1];
  int32_t low  = cver[2];

  // Horner form of high * 10000 + mid * 100 + low
  return (high * 100 + mid) * 100 + low;
}
/*
 * Pack a 64-bit version number into a 3-byte field.
 *
 * Only the low six decimal digits (iver % 1000000) are kept. They are split
 * into three base-100 digits, most significant first, so every stored byte is
 * in the range 0..99 and mnodeGetVgidVer() reverses the packing exactly.
 *
 * cver: output array of at least 3 bytes
 * iver: version to pack
 */
void mnodeSetVgidVer(int8_t *cver, uint64_t iver) {
  int32_t rem = (int32_t)(iver % 1000000);

  cver[0] = (int8_t)(rem / 10000);
  // The middle digit must be reduced modulo 100; taking (iver % 100000) / 100
  // yields 0..999, which does not fit in int8_t and corrupts the round trip.
  cver[1] = (int8_t)((rem / 100) % 100);
  cver[2] = (int8_t)(rem % 100);
}
\ No newline at end of file
...@@ -29,12 +29,13 @@ extern "C" { ...@@ -29,12 +29,13 @@ extern "C" {
#endif #endif
// TAOS_OS_FUNC_SEMPHONE_PTHREAD // TAOS_OS_FUNC_SEMPHONE_PTHREAD
bool taosCheckPthreadValid(pthread_t thread); bool taosCheckPthreadValid(pthread_t thread);
int64_t taosGetPthreadId(); int64_t taosGetSelfPthreadId();
void taosResetPthread(pthread_t *thread); int64_t taosGetPthreadId(pthread_t thread);
bool taosComparePthread(pthread_t first, pthread_t second); void taosResetPthread(pthread_t* thread);
bool taosComparePthread(pthread_t first, pthread_t second);
int32_t taosGetPId(); int32_t taosGetPId();
int32_t taosGetCurrentAPPName(char *name, int32_t* len); int32_t taosGetCurrentAPPName(char* name, int32_t* len);
#ifdef __cplusplus #ifdef __cplusplus
} }
......
...@@ -31,7 +31,8 @@ int tsem_wait(tsem_t* sem) { ...@@ -31,7 +31,8 @@ int tsem_wait(tsem_t* sem) {
#ifndef TAOS_OS_FUNC_SEMPHONE_PTHREAD #ifndef TAOS_OS_FUNC_SEMPHONE_PTHREAD
bool taosCheckPthreadValid(pthread_t thread) { return thread != 0; } bool taosCheckPthreadValid(pthread_t thread) { return thread != 0; }
int64_t taosGetPthreadId() { return (int64_t)pthread_self(); } int64_t taosGetSelfPthreadId() { return (int64_t)pthread_self(); }
int64_t taosGetPthreadId(pthread_t thread) { return (int64_t)thread; }
void taosResetPthread(pthread_t *thread) { *thread = 0; } void taosResetPthread(pthread_t *thread) { *thread = 0; }
bool taosComparePthread(pthread_t first, pthread_t second) { return first == second; } bool taosComparePthread(pthread_t first, pthread_t second) { return first == second; }
int32_t taosGetPId() { return getpid(); } int32_t taosGetPId() { return getpid(); }
......
...@@ -25,14 +25,16 @@ bool taosCheckPthreadValid(pthread_t thread) { return thread.p != NULL; } ...@@ -25,14 +25,16 @@ bool taosCheckPthreadValid(pthread_t thread) { return thread.p != NULL; }
void taosResetPthread(pthread_t *thread) { thread->p = 0; } void taosResetPthread(pthread_t *thread) { thread->p = 0; }
int64_t taosGetPthreadId() { int64_t taosGetPthreadId(pthread_t thread) {
#ifdef PTW32_VERSION #ifdef PTW32_VERSION
return pthread_getw32threadid_np(pthread_self()); return pthread_getw32threadid_np(thread);
#else #else
return (int64_t)pthread_self(); return (int64_t)thread;
#endif #endif
} }
int64_t taosGetSelfPthreadId() { return taosGetPthreadId(pthread_self()); }
bool taosComparePthread(pthread_t first, pthread_t second) { bool taosComparePthread(pthread_t first, pthread_t second) {
return first.p == second.p; return first.p == second.p;
} }
......
...@@ -7250,7 +7250,7 @@ static bool doBuildResCheck(SQInfo* pQInfo) { ...@@ -7250,7 +7250,7 @@ static bool doBuildResCheck(SQInfo* pQInfo) {
// clear qhandle owner, it must be in the secure area. other thread may run ahead before current, after it is // clear qhandle owner, it must be in the secure area. other thread may run ahead before current, after it is
// put into task to be executed. // put into task to be executed.
assert(pQInfo->owner == taosGetPthreadId()); assert(pQInfo->owner == taosGetSelfPthreadId());
pQInfo->owner = 0; pQInfo->owner = 0;
pthread_mutex_unlock(&pQInfo->lock); pthread_mutex_unlock(&pQInfo->lock);
...@@ -7263,7 +7263,7 @@ static bool doBuildResCheck(SQInfo* pQInfo) { ...@@ -7263,7 +7263,7 @@ static bool doBuildResCheck(SQInfo* pQInfo) {
bool qTableQuery(qinfo_t qinfo) { bool qTableQuery(qinfo_t qinfo) {
SQInfo *pQInfo = (SQInfo *)qinfo; SQInfo *pQInfo = (SQInfo *)qinfo;
assert(pQInfo && pQInfo->signature == pQInfo); assert(pQInfo && pQInfo->signature == pQInfo);
int64_t threadId = taosGetPthreadId(); int64_t threadId = taosGetSelfPthreadId();
int64_t curOwner = 0; int64_t curOwner = 0;
if ((curOwner = atomic_val_compare_exchange_64(&pQInfo->owner, 0, threadId)) != 0) { if ((curOwner = atomic_val_compare_exchange_64(&pQInfo->owner, 0, threadId)) != 0) {
......
...@@ -272,7 +272,7 @@ static int rpcHashConn(void *handle, char *fqdn, uint16_t port, int8_t connType) ...@@ -272,7 +272,7 @@ static int rpcHashConn(void *handle, char *fqdn, uint16_t port, int8_t connType)
} }
static void rpcLockCache(int64_t *lockedBy) { static void rpcLockCache(int64_t *lockedBy) {
int64_t tid = taosGetPthreadId(); int64_t tid = taosGetSelfPthreadId();
int i = 0; int i = 0;
while (atomic_val_compare_exchange_64(lockedBy, 0, tid) != 0) { while (atomic_val_compare_exchange_64(lockedBy, 0, tid) != 0) {
if (++i % 100 == 0) { if (++i % 100 == 0) {
...@@ -282,7 +282,7 @@ static void rpcLockCache(int64_t *lockedBy) { ...@@ -282,7 +282,7 @@ static void rpcLockCache(int64_t *lockedBy) {
} }
static void rpcUnlockCache(int64_t *lockedBy) { static void rpcUnlockCache(int64_t *lockedBy) {
int64_t tid = taosGetPthreadId(); int64_t tid = taosGetSelfPthreadId();
if (atomic_val_compare_exchange_64(lockedBy, tid, 0) != tid) { if (atomic_val_compare_exchange_64(lockedBy, tid, 0) != tid) {
assert(false); assert(false);
} }
......
...@@ -1604,7 +1604,7 @@ static int rpcCheckAuthentication(SRpcConn *pConn, char *msg, int msgLen) { ...@@ -1604,7 +1604,7 @@ static int rpcCheckAuthentication(SRpcConn *pConn, char *msg, int msgLen) {
} }
static void rpcLockConn(SRpcConn *pConn) { static void rpcLockConn(SRpcConn *pConn) {
int64_t tid = taosGetPthreadId(); int64_t tid = taosGetSelfPthreadId();
int i = 0; int i = 0;
while (atomic_val_compare_exchange_64(&(pConn->lockedBy), 0, tid) != 0) { while (atomic_val_compare_exchange_64(&(pConn->lockedBy), 0, tid) != 0) {
if (++i % 1000 == 0) { if (++i % 1000 == 0) {
...@@ -1614,7 +1614,7 @@ static void rpcLockConn(SRpcConn *pConn) { ...@@ -1614,7 +1614,7 @@ static void rpcLockConn(SRpcConn *pConn) {
} }
static void rpcUnlockConn(SRpcConn *pConn) { static void rpcUnlockConn(SRpcConn *pConn) {
int64_t tid = taosGetPthreadId(); int64_t tid = taosGetSelfPthreadId();
if (atomic_val_compare_exchange_64(&(pConn->lockedBy), tid, 0) != tid) { if (atomic_val_compare_exchange_64(&(pConn->lockedBy), tid, 0) != tid) {
assert(false); assert(false);
} }
......
...@@ -478,9 +478,7 @@ static void syncAddArbitrator(SSyncNode *pNode) { ...@@ -478,9 +478,7 @@ static void syncAddArbitrator(SSyncNode *pNode) {
static void syncFreeNode(void *param) { static void syncFreeNode(void *param) {
SSyncNode *pNode = param; SSyncNode *pNode = param;
sDebug("vgId:%d, node is freed, refCount:%d", pNode->vgId, pNode->refCount);
int32_t refCount = atomic_sub_fetch_32(&pNode->refCount, 1);
sDebug("vgId:%d, syncnode is freed, refCount:%d", pNode->vgId, refCount);
pthread_mutex_destroy(&pNode->mutex); pthread_mutex_destroy(&pNode->mutex);
tfree(pNode->pRecv); tfree(pNode->pRecv);
...@@ -491,10 +489,10 @@ static void syncFreeNode(void *param) { ...@@ -491,10 +489,10 @@ static void syncFreeNode(void *param) {
SSyncNode *syncAcquireNode(int64_t rid) { SSyncNode *syncAcquireNode(int64_t rid) {
SSyncNode *pNode = taosAcquireRef(tsNodeRefId, rid); SSyncNode *pNode = taosAcquireRef(tsNodeRefId, rid);
if (pNode == NULL) { if (pNode == NULL) {
sDebug("failed to acquire syncnode from refId:%" PRId64, rid); sDebug("failed to acquire node from refId:%" PRId64, rid);
} else { } else {
int32_t refCount = atomic_add_fetch_32(&pNode->refCount, 1); int32_t refCount = atomic_add_fetch_32(&pNode->refCount, 1);
sTrace("vgId:%d, acquire syncnode refId:%" PRId64 ", refCount:%d", pNode->vgId, rid, refCount); sTrace("vgId:%d, acquire node refId:%" PRId64 ", refCount:%d", pNode->vgId, rid, refCount);
} }
return pNode; return pNode;
...@@ -502,16 +500,14 @@ SSyncNode *syncAcquireNode(int64_t rid) { ...@@ -502,16 +500,14 @@ SSyncNode *syncAcquireNode(int64_t rid) {
void syncReleaseNode(SSyncNode *pNode) { void syncReleaseNode(SSyncNode *pNode) {
int32_t refCount = atomic_sub_fetch_32(&pNode->refCount, 1); int32_t refCount = atomic_sub_fetch_32(&pNode->refCount, 1);
sTrace("vgId:%d, dec syncnode refId:%" PRId64 " refCount:%d", pNode->vgId, pNode->rid, refCount); sTrace("vgId:%d, release node refId:%" PRId64 ", refCount:%d", pNode->vgId, pNode->rid, refCount);
taosReleaseRef(tsNodeRefId, pNode->rid); taosReleaseRef(tsNodeRefId, pNode->rid);
} }
static void syncFreePeer(void *param) { static void syncFreePeer(void *param) {
SSyncPeer *pPeer = param; SSyncPeer *pPeer = param;
sDebug("%s, peer is freed, refCount:%d", pPeer->id, pPeer->refCount);
int32_t refCount = atomic_sub_fetch_32(&pPeer->refCount, 1);
sDebug("%s, peer is freed, refCount:%d", pPeer->id, refCount);
syncReleaseNode(pPeer->pSyncNode); syncReleaseNode(pPeer->pSyncNode);
tfree(pPeer); tfree(pPeer);
...@@ -531,7 +527,7 @@ SSyncPeer *syncAcquirePeer(int64_t rid) { ...@@ -531,7 +527,7 @@ SSyncPeer *syncAcquirePeer(int64_t rid) {
void syncReleasePeer(SSyncPeer *pPeer) { void syncReleasePeer(SSyncPeer *pPeer) {
int32_t refCount = atomic_sub_fetch_32(&pPeer->refCount, 1); int32_t refCount = atomic_sub_fetch_32(&pPeer->refCount, 1);
sTrace("%s, dec peer refId:%" PRId64 ", refCount:%d", pPeer->id, pPeer->rid, refCount); sTrace("%s, release peer refId:%" PRId64 ", refCount:%d", pPeer->id, pPeer->rid, refCount);
taosReleaseRef(tsPeerRefId, pPeer->rid); taosReleaseRef(tsPeerRefId, pPeer->rid);
} }
...@@ -879,14 +875,14 @@ static void syncProcessSyncRequest(char *msg, SSyncPeer *pPeer) { ...@@ -879,14 +875,14 @@ static void syncProcessSyncRequest(char *msg, SSyncPeer *pPeer) {
int32_t ret = pthread_create(&thread, &thattr, syncRetrieveData, (void *)pPeer->rid); int32_t ret = pthread_create(&thread, &thattr, syncRetrieveData, (void *)pPeer->rid);
pthread_attr_destroy(&thattr); pthread_attr_destroy(&thattr);
if (ret != 0) { if (ret < 0) {
sError("%s, failed to create sync thread since %s", pPeer->id, strerror(errno)); sError("%s, failed to create sync retrieve thread since %s", pPeer->id, strerror(errno));
syncReleasePeer(pPeer);
} else { } else {
pPeer->sstatus = TAOS_SYNC_STATUS_START; pPeer->sstatus = TAOS_SYNC_STATUS_START;
sDebug("%s, thread is created to retrieve data, set sstatus:%s", pPeer->id, syncStatus[pPeer->sstatus]); sDebug("%s, sync retrieve thread:0x%08" PRIx64 " create successfully, rid:%" PRId64 ", set sstatus:%s", pPeer->id,
taosGetPthreadId(thread), pPeer->rid, syncStatus[pPeer->sstatus]);
} }
syncReleasePeer(pPeer);
} }
static void syncNotStarted(void *param, void *tmrId) { static void syncNotStarted(void *param, void *tmrId) {
...@@ -1154,19 +1150,19 @@ static void syncCreateRestoreDataThread(SSyncPeer *pPeer) { ...@@ -1154,19 +1150,19 @@ static void syncCreateRestoreDataThread(SSyncPeer *pPeer) {
(void)syncAcquirePeer(pPeer->rid); (void)syncAcquirePeer(pPeer->rid);
int32_t ret = pthread_create(&(thread), &thattr, (void *)syncRestoreData, (void *)pPeer->rid); int32_t ret = pthread_create(&thread, &thattr, (void *)syncRestoreData, (void *)pPeer->rid);
pthread_attr_destroy(&thattr); pthread_attr_destroy(&thattr);
if (ret < 0) { if (ret < 0) {
SSyncNode *pNode = pPeer->pSyncNode; SSyncNode *pNode = pPeer->pSyncNode;
nodeSStatus = TAOS_SYNC_STATUS_INIT; nodeSStatus = TAOS_SYNC_STATUS_INIT;
sError("%s, failed to create sync thread, set sstatus:%s", pPeer->id, syncStatus[nodeSStatus]); sError("%s, failed to create sync restore thread, set sstatus:%s", pPeer->id, syncStatus[nodeSStatus]);
taosClose(pPeer->syncFd); taosClose(pPeer->syncFd);
syncReleasePeer(pPeer);
} else { } else {
sInfo("%s, sync connection is up", pPeer->id); sInfo("%s, sync restore thread:0x%08" PRIx64 " create successfully, rid:%" PRId64, pPeer->id,
taosGetPthreadId(thread), pPeer->rid);
} }
syncReleasePeer(pPeer);
} }
static void syncProcessIncommingConnection(int32_t connFd, uint32_t sourceIp) { static void syncProcessIncommingConnection(int32_t connFd, uint32_t sourceIp) {
......
...@@ -353,12 +353,16 @@ static int32_t syncRestoreDataStepByStep(SSyncPeer *pPeer) { ...@@ -353,12 +353,16 @@ static int32_t syncRestoreDataStepByStep(SSyncPeer *pPeer) {
void *syncRestoreData(void *param) { void *syncRestoreData(void *param) {
int64_t rid = (int64_t)param; int64_t rid = (int64_t)param;
SSyncPeer *pPeer = syncAcquirePeer(rid); SSyncPeer *pPeer = syncAcquirePeer(rid);
if (pPeer == NULL) return NULL; if (pPeer == NULL) {
sError("failed to restore data, invalid peer rid:%" PRId64, rid);
return NULL;
}
SSyncNode *pNode = pPeer->pSyncNode; SSyncNode *pNode = pPeer->pSyncNode;
taosBlockSIGPIPE(); taosBlockSIGPIPE();
__sync_fetch_and_add(&tsSyncNum, 1); __sync_fetch_and_add(&tsSyncNum, 1);
sInfo("%s, start to restore data, sstatus:%s", pPeer->id, syncStatus[nodeSStatus]);
(*pNode->notifyRole)(pNode->vgId, TAOS_SYNC_ROLE_SYNCING); (*pNode->notifyRole)(pNode->vgId, TAOS_SYNC_ROLE_SYNCING);
...@@ -380,11 +384,14 @@ void *syncRestoreData(void *param) { ...@@ -380,11 +384,14 @@ void *syncRestoreData(void *param) {
(*pNode->notifyRole)(pNode->vgId, nodeRole); (*pNode->notifyRole)(pNode->vgId, nodeRole);
nodeSStatus = TAOS_SYNC_STATUS_INIT; nodeSStatus = TAOS_SYNC_STATUS_INIT;
sInfo("%s, sync over, set sstatus:%s", pPeer->id, syncStatus[nodeSStatus]); sInfo("%s, restore data over, set sstatus:%s", pPeer->id, syncStatus[nodeSStatus]);
taosClose(pPeer->syncFd); taosClose(pPeer->syncFd);
syncCloseRecvBuffer(pNode); syncCloseRecvBuffer(pNode);
__sync_fetch_and_sub(&tsSyncNum, 1); __sync_fetch_and_sub(&tsSyncNum, 1);
// The ref is obtained in both the create thread and the current thread, so it is released twice
syncReleasePeer(pPeer);
syncReleasePeer(pPeer); syncReleasePeer(pPeer);
return NULL; return NULL;
......
...@@ -194,7 +194,7 @@ static int32_t syncReadOneWalRecord(int32_t sfd, SWalHead *pHead) { ...@@ -194,7 +194,7 @@ static int32_t syncReadOneWalRecord(int32_t sfd, SWalHead *pHead) {
} }
if (ret == 0) { if (ret == 0) {
sTrace("sfd:%d, read to the end of file, ret:%d", sfd, ret); sDebug("sfd:%d, read to the end of file, ret:%d", sfd, ret);
return 0; return 0;
} }
...@@ -253,7 +253,7 @@ static int32_t syncRetrieveLastWal(SSyncPeer *pPeer, char *name, uint64_t fversi ...@@ -253,7 +253,7 @@ static int32_t syncRetrieveLastWal(SSyncPeer *pPeer, char *name, uint64_t fversi
break; break;
} }
sTrace("%s, last wal is forwarded, hver:%" PRIu64, pPeer->id, pHead->version); sDebug("%s, last wal is forwarded, hver:%" PRIu64, pPeer->id, pHead->version);
int32_t wsize = code; int32_t wsize = code;
int32_t ret = taosWriteMsg(pPeer->syncFd, pHead, wsize); int32_t ret = taosWriteMsg(pPeer->syncFd, pHead, wsize);
...@@ -466,10 +466,15 @@ static int32_t syncRetrieveDataStepByStep(SSyncPeer *pPeer) { ...@@ -466,10 +466,15 @@ static int32_t syncRetrieveDataStepByStep(SSyncPeer *pPeer) {
void *syncRetrieveData(void *param) { void *syncRetrieveData(void *param) {
int64_t rid = (int64_t)param; int64_t rid = (int64_t)param;
SSyncPeer *pPeer = syncAcquirePeer(rid); SSyncPeer *pPeer = syncAcquirePeer(rid);
if (pPeer == NULL) return NULL; if (pPeer == NULL) {
sError("failed to retrieve data, invalid peer rid:%" PRId64, rid);
return NULL;
}
SSyncNode *pNode = pPeer->pSyncNode; SSyncNode *pNode = pPeer->pSyncNode;
taosBlockSIGPIPE(); taosBlockSIGPIPE();
sInfo("%s, start to retrieve data, sstatus:%s", pPeer->id, syncStatus[pPeer->sstatus]);
if (pNode->notifyFlowCtrl) (*pNode->notifyFlowCtrl)(pNode->vgId, pPeer->numOfRetrieves); if (pNode->notifyFlowCtrl) (*pNode->notifyFlowCtrl)(pNode->vgId, pPeer->numOfRetrieves);
...@@ -496,7 +501,11 @@ void *syncRetrieveData(void *param) { ...@@ -496,7 +501,11 @@ void *syncRetrieveData(void *param) {
pPeer->fileChanged = 0; pPeer->fileChanged = 0;
taosClose(pPeer->syncFd); taosClose(pPeer->syncFd);
// The ref is obtained in both the create thread and the current thread, so it is released twice
syncReleasePeer(pPeer);
syncReleasePeer(pPeer); syncReleasePeer(pPeer);
sInfo("%s, sync retrieve data over, sstatus:%s", pPeer->id, syncStatus[pPeer->sstatus]);
return NULL; return NULL;
} }
...@@ -364,7 +364,7 @@ void taosPrintLog(const char *flags, int32_t dflag, const char *format, ...) { ...@@ -364,7 +364,7 @@ void taosPrintLog(const char *flags, int32_t dflag, const char *format, ...) {
ptm = localtime_r(&curTime, &Tm); ptm = localtime_r(&curTime, &Tm);
len = sprintf(buffer, "%02d/%02d %02d:%02d:%02d.%06d 0x%08" PRIx64 " ", ptm->tm_mon + 1, ptm->tm_mday, ptm->tm_hour, len = sprintf(buffer, "%02d/%02d %02d:%02d:%02d.%06d 0x%08" PRIx64 " ", ptm->tm_mon + 1, ptm->tm_mday, ptm->tm_hour,
ptm->tm_min, ptm->tm_sec, (int32_t)timeSecs.tv_usec, taosGetPthreadId()); ptm->tm_min, ptm->tm_sec, (int32_t)timeSecs.tv_usec, taosGetSelfPthreadId());
len += sprintf(buffer + len, "%s", flags); len += sprintf(buffer + len, "%s", flags);
va_start(argpointer, format); va_start(argpointer, format);
...@@ -450,7 +450,7 @@ void taosPrintLongString(const char *flags, int32_t dflag, const char *format, . ...@@ -450,7 +450,7 @@ void taosPrintLongString(const char *flags, int32_t dflag, const char *format, .
ptm = localtime_r(&curTime, &Tm); ptm = localtime_r(&curTime, &Tm);
len = sprintf(buffer, "%02d/%02d %02d:%02d:%02d.%06d 0x%08" PRIx64 " ", ptm->tm_mon + 1, ptm->tm_mday, ptm->tm_hour, len = sprintf(buffer, "%02d/%02d %02d:%02d:%02d.%06d 0x%08" PRIx64 " ", ptm->tm_mon + 1, ptm->tm_mday, ptm->tm_hour,
ptm->tm_min, ptm->tm_sec, (int32_t)timeSecs.tv_usec, taosGetPthreadId()); ptm->tm_min, ptm->tm_sec, (int32_t)timeSecs.tv_usec, taosGetSelfPthreadId());
len += sprintf(buffer + len, "%s", flags); len += sprintf(buffer + len, "%s", flags);
va_start(argpointer, format); va_start(argpointer, format);
......
...@@ -249,7 +249,7 @@ void taosNotePrint(SNoteObj *pNote, const char *const format, ...) { ...@@ -249,7 +249,7 @@ void taosNotePrint(SNoteObj *pNote, const char *const format, ...) {
curTime = timeSecs.tv_sec; curTime = timeSecs.tv_sec;
ptm = localtime_r(&curTime, &Tm); ptm = localtime_r(&curTime, &Tm);
len = sprintf(buffer, "%02d/%02d %02d:%02d:%02d.%06d 0x%08" PRIx64 " ", ptm->tm_mon + 1, ptm->tm_mday, ptm->tm_hour, len = sprintf(buffer, "%02d/%02d %02d:%02d:%02d.%06d 0x%08" PRIx64 " ", ptm->tm_mon + 1, ptm->tm_mday, ptm->tm_hour,
ptm->tm_min, ptm->tm_sec, (int32_t)timeSecs.tv_usec, taosGetPthreadId()); ptm->tm_min, ptm->tm_sec, (int32_t)timeSecs.tv_usec, taosGetSelfPthreadId());
va_start(argpointer, format); va_start(argpointer, format);
len += vsnprintf(buffer + len, MAX_NOTE_LINE_SIZE - len, format, argpointer); len += vsnprintf(buffer + len, MAX_NOTE_LINE_SIZE - len, format, argpointer);
va_end(argpointer); va_end(argpointer);
......
...@@ -447,7 +447,7 @@ static int taosDecRefCount(int rsetId, int64_t rid, int remove) { ...@@ -447,7 +447,7 @@ static int taosDecRefCount(int rsetId, int64_t rid, int remove) {
} }
static void taosLockList(int64_t *lockedBy) { static void taosLockList(int64_t *lockedBy) {
int64_t tid = taosGetPthreadId(); int64_t tid = taosGetSelfPthreadId();
int i = 0; int i = 0;
while (atomic_val_compare_exchange_64(lockedBy, 0, tid) != 0) { while (atomic_val_compare_exchange_64(lockedBy, 0, tid) != 0) {
if (++i % 100 == 0) { if (++i % 100 == 0) {
...@@ -457,7 +457,7 @@ static void taosLockList(int64_t *lockedBy) { ...@@ -457,7 +457,7 @@ static void taosLockList(int64_t *lockedBy) {
} }
static void taosUnlockList(int64_t *lockedBy) { static void taosUnlockList(int64_t *lockedBy) {
int64_t tid = taosGetPthreadId(); int64_t tid = taosGetSelfPthreadId();
if (atomic_val_compare_exchange_64(lockedBy, tid, 0) != tid) { if (atomic_val_compare_exchange_64(lockedBy, tid, 0) != tid) {
assert(false); assert(false);
} }
......
...@@ -119,7 +119,7 @@ static void timerDecRef(tmr_obj_t* timer) { ...@@ -119,7 +119,7 @@ static void timerDecRef(tmr_obj_t* timer) {
} }
static void lockTimerList(timer_list_t* list) { static void lockTimerList(timer_list_t* list) {
int64_t tid = taosGetPthreadId(); int64_t tid = taosGetSelfPthreadId();
int i = 0; int i = 0;
while (atomic_val_compare_exchange_64(&(list->lockedBy), 0, tid) != 0) { while (atomic_val_compare_exchange_64(&(list->lockedBy), 0, tid) != 0) {
if (++i % 1000 == 0) { if (++i % 1000 == 0) {
...@@ -129,7 +129,7 @@ static void lockTimerList(timer_list_t* list) { ...@@ -129,7 +129,7 @@ static void lockTimerList(timer_list_t* list) {
} }
static void unlockTimerList(timer_list_t* list) { static void unlockTimerList(timer_list_t* list) {
int64_t tid = taosGetPthreadId(); int64_t tid = taosGetSelfPthreadId();
if (atomic_val_compare_exchange_64(&(list->lockedBy), tid, 0) != tid) { if (atomic_val_compare_exchange_64(&(list->lockedBy), tid, 0) != tid) {
assert(false); assert(false);
tmrError("%" PRId64 " trying to unlock a timer list not locked by current thread.", tid); tmrError("%" PRId64 " trying to unlock a timer list not locked by current thread.", tid);
...@@ -257,7 +257,7 @@ static bool removeFromWheel(tmr_obj_t* timer) { ...@@ -257,7 +257,7 @@ static bool removeFromWheel(tmr_obj_t* timer) {
static void processExpiredTimer(void* handle, void* arg) { static void processExpiredTimer(void* handle, void* arg) {
tmr_obj_t* timer = (tmr_obj_t*)handle; tmr_obj_t* timer = (tmr_obj_t*)handle;
timer->executedBy = taosGetPthreadId(); timer->executedBy = taosGetSelfPthreadId();
uint8_t state = atomic_val_compare_exchange_8(&timer->state, TIMER_STATE_WAITING, TIMER_STATE_EXPIRED); uint8_t state = atomic_val_compare_exchange_8(&timer->state, TIMER_STATE_WAITING, TIMER_STATE_EXPIRED);
if (state == TIMER_STATE_WAITING) { if (state == TIMER_STATE_WAITING) {
const char* fmt = "%s timer[id=%" PRIuPTR ", fp=%p, param=%p] execution start."; const char* fmt = "%s timer[id=%" PRIuPTR ", fp=%p, param=%p] execution start.";
...@@ -406,7 +406,7 @@ static bool doStopTimer(tmr_obj_t* timer, uint8_t state) { ...@@ -406,7 +406,7 @@ static bool doStopTimer(tmr_obj_t* timer, uint8_t state) {
return false; return false;
} }
if (timer->executedBy == taosGetPthreadId()) { if (timer->executedBy == taosGetSelfPthreadId()) {
// taosTmrReset is called in the timer callback, should do nothing in this // taosTmrReset is called in the timer callback, should do nothing in this
// case to avoid dead lock. note taosTmrReset must be the last statement // case to avoid dead lock. note taosTmrReset must be the last statement
// of the callback function, will be a bug otherwise. // of the callback function, will be a bug otherwise.
......
...@@ -45,6 +45,9 @@ typedef struct { ...@@ -45,6 +45,9 @@ typedef struct {
int8_t accessState; int8_t accessState;
int8_t isFull; int8_t isFull;
int8_t isCommiting; int8_t isCommiting;
int8_t dbReplica;
int8_t dropped;
int8_t reserved;
uint64_t version; // current version uint64_t version; // current version
uint64_t cversion; // version while commit start uint64_t cversion; // version while commit start
uint64_t fversion; // version on saved data file uint64_t fversion; // version on saved data file
...@@ -64,7 +67,6 @@ typedef struct { ...@@ -64,7 +67,6 @@ typedef struct {
void * qMgmt; void * qMgmt;
char * rootDir; char * rootDir;
tsem_t sem; tsem_t sem;
int8_t dropped;
char db[TSDB_ACCT_LEN + TSDB_DB_NAME_LEN]; char db[TSDB_ACCT_LEN + TSDB_DB_NAME_LEN];
pthread_mutex_t statusMutex; pthread_mutex_t statusMutex;
} SVnodeObj; } SVnodeObj;
......
...@@ -38,8 +38,9 @@ static void vnodeLoadCfg(SVnodeObj *pVnode, SCreateVnodeMsg* vnodeMsg) { ...@@ -38,8 +38,9 @@ static void vnodeLoadCfg(SVnodeObj *pVnode, SCreateVnodeMsg* vnodeMsg) {
pVnode->walCfg.walLevel = vnodeMsg->cfg.walLevel; pVnode->walCfg.walLevel = vnodeMsg->cfg.walLevel;
pVnode->walCfg.fsyncPeriod = vnodeMsg->cfg.fsyncPeriod; pVnode->walCfg.fsyncPeriod = vnodeMsg->cfg.fsyncPeriod;
pVnode->walCfg.keep = TAOS_WAL_NOT_KEEP; pVnode->walCfg.keep = TAOS_WAL_NOT_KEEP;
pVnode->syncCfg.replica = vnodeMsg->cfg.replications; pVnode->syncCfg.replica = vnodeMsg->cfg.vgReplica;
pVnode->syncCfg.quorum = vnodeMsg->cfg.quorum; pVnode->syncCfg.quorum = vnodeMsg->cfg.quorum;
pVnode->dbReplica = vnodeMsg->cfg.dbReplica;
for (int i = 0; i < pVnode->syncCfg.replica; ++i) { for (int i = 0; i < pVnode->syncCfg.replica; ++i) {
SVnodeDesc *node = &vnodeMsg->nodes[i]; SVnodeDesc *node = &vnodeMsg->nodes[i];
...@@ -203,12 +204,21 @@ int32_t vnodeReadCfg(SVnodeObj *pVnode) { ...@@ -203,12 +204,21 @@ int32_t vnodeReadCfg(SVnodeObj *pVnode) {
} }
vnodeMsg.cfg.wals = (int8_t)wals->valueint; vnodeMsg.cfg.wals = (int8_t)wals->valueint;
cJSON *replica = cJSON_GetObjectItem(root, "replica"); cJSON *vgReplica = cJSON_GetObjectItem(root, "replica");
if (!replica || replica->type != cJSON_Number) { if (!vgReplica || vgReplica->type != cJSON_Number) {
vError("vgId:%d, failed to read %s, replica not found", pVnode->vgId, file); vError("vgId:%d, failed to read %s, replica not found", pVnode->vgId, file);
goto PARSE_VCFG_ERROR; goto PARSE_VCFG_ERROR;
} }
vnodeMsg.cfg.replications = (int8_t)replica->valueint; vnodeMsg.cfg.vgReplica = (int8_t)vgReplica->valueint;
cJSON *dbReplica = cJSON_GetObjectItem(root, "dbReplica");
if (!dbReplica || dbReplica->type != cJSON_Number) {
vError("vgId:%d, failed to read %s, dbReplica not found", pVnode->vgId, file);
vnodeMsg.cfg.dbReplica = vnodeMsg.cfg.vgReplica;
vnodeMsg.cfg.vgCfgVersion = 0;
} else {
vnodeMsg.cfg.dbReplica = (int8_t)dbReplica->valueint;
}
cJSON *quorum = cJSON_GetObjectItem(root, "quorum"); cJSON *quorum = cJSON_GetObjectItem(root, "quorum");
if (!quorum || quorum->type != cJSON_Number) { if (!quorum || quorum->type != cJSON_Number) {
...@@ -220,8 +230,8 @@ int32_t vnodeReadCfg(SVnodeObj *pVnode) { ...@@ -220,8 +230,8 @@ int32_t vnodeReadCfg(SVnodeObj *pVnode) {
cJSON *cacheLastRow = cJSON_GetObjectItem(root, "cacheLastRow"); cJSON *cacheLastRow = cJSON_GetObjectItem(root, "cacheLastRow");
if (!cacheLastRow || cacheLastRow->type != cJSON_Number) { if (!cacheLastRow || cacheLastRow->type != cJSON_Number) {
vError("vgId: %d, failed to read %s, cacheLastRow not found", pVnode->vgId, file); vError("vgId: %d, failed to read %s, cacheLastRow not found", pVnode->vgId, file);
//goto PARSE_VCFG_ERROR;
vnodeMsg.cfg.cacheLastRow = 0; vnodeMsg.cfg.cacheLastRow = 0;
vnodeMsg.cfg.vgCfgVersion = 0;
} else { } else {
vnodeMsg.cfg.cacheLastRow = (int8_t)cacheLastRow->valueint; vnodeMsg.cfg.cacheLastRow = (int8_t)cacheLastRow->valueint;
} }
...@@ -233,7 +243,7 @@ int32_t vnodeReadCfg(SVnodeObj *pVnode) { ...@@ -233,7 +243,7 @@ int32_t vnodeReadCfg(SVnodeObj *pVnode) {
} }
int size = cJSON_GetArraySize(nodeInfos); int size = cJSON_GetArraySize(nodeInfos);
if (size != vnodeMsg.cfg.replications) { if (size != vnodeMsg.cfg.vgReplica) {
vError("vgId:%d, failed to read %s, nodeInfos size not matched", pVnode->vgId, file); vError("vgId:%d, failed to read %s, nodeInfos size not matched", pVnode->vgId, file);
goto PARSE_VCFG_ERROR; goto PARSE_VCFG_ERROR;
} }
...@@ -311,17 +321,18 @@ int32_t vnodeWriteCfg(SCreateVnodeMsg *pMsg) { ...@@ -311,17 +321,18 @@ int32_t vnodeWriteCfg(SCreateVnodeMsg *pMsg) {
len += snprintf(content + len, maxLen - len, " \"compression\": %d,\n", pMsg->cfg.compression); len += snprintf(content + len, maxLen - len, " \"compression\": %d,\n", pMsg->cfg.compression);
len += snprintf(content + len, maxLen - len, " \"walLevel\": %d,\n", pMsg->cfg.walLevel); len += snprintf(content + len, maxLen - len, " \"walLevel\": %d,\n", pMsg->cfg.walLevel);
len += snprintf(content + len, maxLen - len, " \"fsync\": %d,\n", pMsg->cfg.fsyncPeriod); len += snprintf(content + len, maxLen - len, " \"fsync\": %d,\n", pMsg->cfg.fsyncPeriod);
len += snprintf(content + len, maxLen - len, " \"replica\": %d,\n", pMsg->cfg.replications); len += snprintf(content + len, maxLen - len, " \"replica\": %d,\n", pMsg->cfg.vgReplica);
len += snprintf(content + len, maxLen - len, " \"dbReplica\": %d,\n", pMsg->cfg.dbReplica);
len += snprintf(content + len, maxLen - len, " \"wals\": %d,\n", pMsg->cfg.wals); len += snprintf(content + len, maxLen - len, " \"wals\": %d,\n", pMsg->cfg.wals);
len += snprintf(content + len, maxLen - len, " \"quorum\": %d,\n", pMsg->cfg.quorum); len += snprintf(content + len, maxLen - len, " \"quorum\": %d,\n", pMsg->cfg.quorum);
len += snprintf(content + len, maxLen - len, " \"cacheLastRow\": %d,\n", pMsg->cfg.cacheLastRow); len += snprintf(content + len, maxLen - len, " \"cacheLastRow\": %d,\n", pMsg->cfg.cacheLastRow);
len += snprintf(content + len, maxLen - len, " \"nodeInfos\": [{\n"); len += snprintf(content + len, maxLen - len, " \"nodeInfos\": [{\n");
for (int32_t i = 0; i < pMsg->cfg.replications; i++) { for (int32_t i = 0; i < pMsg->cfg.vgReplica; i++) {
SVnodeDesc *node = &pMsg->nodes[i]; SVnodeDesc *node = &pMsg->nodes[i];
dnodeUpdateEp(node->nodeId, node->nodeEp, NULL, NULL); dnodeUpdateEp(node->nodeId, node->nodeEp, NULL, NULL);
len += snprintf(content + len, maxLen - len, " \"nodeId\": %d,\n", node->nodeId); len += snprintf(content + len, maxLen - len, " \"nodeId\": %d,\n", node->nodeId);
len += snprintf(content + len, maxLen - len, " \"nodeEp\": \"%s\"\n", node->nodeEp); len += snprintf(content + len, maxLen - len, " \"nodeEp\": \"%s\"\n", node->nodeEp);
if (i < pMsg->cfg.replications - 1) { if (i < pMsg->cfg.vgReplica - 1) {
len += snprintf(content + len, maxLen - len, " },{\n"); len += snprintf(content + len, maxLen - len, " },{\n");
} else { } else {
len += snprintf(content + len, maxLen - len, " }]\n"); len += snprintf(content + len, maxLen - len, " }]\n");
......
...@@ -142,6 +142,7 @@ static void vnodeBuildVloadMsg(SVnodeObj *pVnode, SStatusMsg *pStatus) { ...@@ -142,6 +142,7 @@ static void vnodeBuildVloadMsg(SVnodeObj *pVnode, SStatusMsg *pStatus) {
pLoad->totalStorage = htobe64(totalStorage); pLoad->totalStorage = htobe64(totalStorage);
pLoad->compStorage = htobe64(compStorage); pLoad->compStorage = htobe64(compStorage);
pLoad->pointsWritten = htobe64(pointsWritten); pLoad->pointsWritten = htobe64(pointsWritten);
pLoad->vnodeVersion = htobe64(pVnode->version);
pLoad->status = pVnode->status; pLoad->status = pVnode->status;
pLoad->role = pVnode->role; pLoad->role = pVnode->role;
pLoad->replica = pVnode->syncCfg.replica; pLoad->replica = pVnode->syncCfg.replica;
......
...@@ -108,6 +108,13 @@ static int32_t vnodeCheckWrite(void *vparam) { ...@@ -108,6 +108,13 @@ static int32_t vnodeCheckWrite(void *vparam) {
return TSDB_CODE_VND_NO_WRITE_AUTH; return TSDB_CODE_VND_NO_WRITE_AUTH;
} }
if (pVnode->dbReplica != pVnode->syncCfg.replica &&
pVnode->syncCfg.nodeInfo[pVnode->syncCfg.replica - 1].nodeId == dnodeGetDnodeId()) {
vDebug("vgId:%d, vnode is balancing and will be dropped, dbReplica:%d vgReplica:%d, refCount:%d pVnode:%p",
pVnode->vgId, pVnode->dbReplica, pVnode->syncCfg.replica, pVnode->refCount, pVnode);
return TSDB_CODE_VND_IS_BALANCING;
}
// tsdb may be in reset state // tsdb may be in reset state
if (pVnode->tsdb == NULL) { if (pVnode->tsdb == NULL) {
vDebug("vgId:%d, tsdb is null, refCount:%d pVnode:%p", pVnode->vgId, pVnode->refCount, pVnode); vDebug("vgId:%d, tsdb is null, refCount:%d pVnode:%p", pVnode->vgId, pVnode->refCount, pVnode);
...@@ -271,7 +278,7 @@ void vnodeFreeFromWQueue(void *vparam, SVWriteMsg *pWrite) { ...@@ -271,7 +278,7 @@ void vnodeFreeFromWQueue(void *vparam, SVWriteMsg *pWrite) {
static void vnodeFlowCtrlMsgToWQueue(void *param, void *tmrId) { static void vnodeFlowCtrlMsgToWQueue(void *param, void *tmrId) {
SVWriteMsg *pWrite = param; SVWriteMsg *pWrite = param;
SVnodeObj * pVnode = pWrite->pVnode; SVnodeObj * pVnode = pWrite->pVnode;
int32_t code = TSDB_CODE_VND_SYNCING; int32_t code = TSDB_CODE_VND_IS_SYNCING;
if (pVnode->flowctrlLevel <= 0) code = TSDB_CODE_VND_IS_FLOWCTRL; if (pVnode->flowctrlLevel <= 0) code = TSDB_CODE_VND_IS_FLOWCTRL;
......
...@@ -288,6 +288,7 @@ cd ../../../debug; make ...@@ -288,6 +288,7 @@ cd ../../../debug; make
./test.sh -f unique/dnode/data1.sim ./test.sh -f unique/dnode/data1.sim
./test.sh -f unique/dnode/m2.sim ./test.sh -f unique/dnode/m2.sim
./test.sh -f unique/dnode/m3.sim ./test.sh -f unique/dnode/m3.sim
./test.sh -f unique/dnode/lossdata.sim
./test.sh -f unique/dnode/offline1.sim ./test.sh -f unique/dnode/offline1.sim
./test.sh -f unique/dnode/offline2.sim ./test.sh -f unique/dnode/offline2.sim
./test.sh -f unique/dnode/offline3.sim ./test.sh -f unique/dnode/offline3.sim
......
# Setup: stop any running dnodes, then deploy five dnodes (dnode1..dnode5)
# with indexes 1..5.
system sh/stop_dnodes.sh
system sh/deploy.sh -n dnode1 -i 1
system sh/deploy.sh -n dnode2 -i 2
system sh/deploy.sh -n dnode3 -i 3
system sh/deploy.sh -n dnode4 -i 4
system sh/deploy.sh -n dnode5 -i 5
# Set balanceInterval to 10 on every deployed dnode.
system sh/cfg.sh -n dnode1 -c balanceInterval -v 10
system sh/cfg.sh -n dnode2 -c balanceInterval -v 10
system sh/cfg.sh -n dnode3 -c balanceInterval -v 10
system sh/cfg.sh -n dnode4 -c balanceInterval -v 10
system sh/cfg.sh -n dnode5 -c balanceInterval -v 10
# Set mnodeEqualVnodeNum to 4 on every deployed dnode.
system sh/cfg.sh -n dnode1 -c mnodeEqualVnodeNum -v 4
system sh/cfg.sh -n dnode2 -c mnodeEqualVnodeNum -v 4
system sh/cfg.sh -n dnode3 -c mnodeEqualVnodeNum -v 4
system sh/cfg.sh -n dnode4 -c mnodeEqualVnodeNum -v 4
system sh/cfg.sh -n dnode5 -c mnodeEqualVnodeNum -v 4
# Set maxTablesPerVnode to 4 on every deployed dnode.
system sh/cfg.sh -n dnode1 -c maxTablesPerVnode -v 4
system sh/cfg.sh -n dnode2 -c maxTablesPerVnode -v 4
system sh/cfg.sh -n dnode3 -c maxTablesPerVnode -v 4
system sh/cfg.sh -n dnode4 -c maxTablesPerVnode -v 4
system sh/cfg.sh -n dnode5 -c maxTablesPerVnode -v 4
# Step 1: start dnode1, connect, register $hostname2 and $hostname3 as dnodes,
# then start dnode2 and dnode3.
print ========== step1
system sh/exec.sh -n dnode1 -s start
sql connect
sql create dnode $hostname2
sql create dnode $hostname3
system sh/exec.sh -n dnode2 -s start
system sh/exec.sh -n dnode3 -s start
# Poll "show dnodes" once per second; fail the test (return -1) once the
# counter $x reaches 10.
$x = 0
step1:
$x = $x + 1
sleep 1000
if $x == 10 then
return -1
endi
sql show dnodes
print dnode1 $data4_1
print dnode2 $data4_2
print dnode3 $data4_3
# Keep polling until $data4_1, $data4_2 and $data4_3 all equal "ready".
if $data4_1 != ready then
goto step1
endi
if $data4_2 != ready then
goto step1
endi
if $data4_3 != ready then
goto step1
endi
# Step 2: create database d1 with replica 2 and a single table d1.t1.
print ========== step2
sql create database d1 replica 2
sql create table d1.t1 (t timestamp, i int)
# Step 2.1: read "show dnodes" once and print the values the script labels
# as openVnodes. dnode4 has not been created yet at this point, so only the
# first three values are checked.
print ========== step2.1
sql show dnodes
print dnode1 openVnodes $data2_1
print dnode2 openVnodes $data2_2
print dnode3 openVnodes $data2_3
print dnode4 openVnodes $data2_4
# Expected values: 0 for dnode1, 1 for dnode2, 1 for dnode3; any other value
# fails the test immediately (no retry here).
if $data2_1 != 0 then
return -1
endi
if $data2_2 != 1 then
return -1
endi
if $data2_3 != 1 then
return -1
endi
# Step 3: register $hostname4 as a dnode and start dnode4.
print ========== step3
sql create dnode $hostname4
system sh/exec.sh -n dnode4 -s start
# Poll "show dnodes" once per second; fail the test (return -1) once the
# counter $x reaches 10.
$x = 0
show3:
$x = $x + 1
sleep 1000
if $x == 10 then
return -1
endi
sql show dnodes
print dnode1 openVnodes $data2_1
print dnode2 openVnodes $data2_2
print dnode3 openVnodes $data2_3
print dnode4 openVnodes $data2_4
# Keep polling until dnode2 and dnode3 each report 1 and dnode4 reports 0.
if $data2_2 != 1 then
goto show3
endi
if $data2_3 != 1 then
goto show3
endi
if $data2_4 != 0 then
goto show3
endi
# Print the first row of "show d1.vgroups" for the test log; nothing is
# asserted on it.
sql show d1.vgroups;
print d1.vgroups $data00 $data01 $data02 $data03 $data04 $data05 $data06 $data07 $data08 $data09
# Step 4: drop dnode3 and, immediately afterwards, insert $rowNum rows into
# d1.t1 one statement at a time (timestamps 1500000000000 + $i, value $i).
print ========== step4
sql drop dnode $hostname3
$i = 0
$rowNum = 10000
while $i < $rowNum
$ts = 1500000000000 + $i
sql insert into d1.t1 values( $ts , $i )
$i = $i + 1
endw
print insert $rowNum finished
# Poll "show dnodes" once per second; fail the test (return -1) once the
# counter $x reaches 40.
$x = 0
show4:
$x = $x + 1
sleep 1000
if $x == 40 then
return -1
endi
sql show dnodes
print dnode1 openVnodes $data2_1
print dnode2 openVnodes $data2_2
print dnode3 openVnodes $data2_3
print dnode4 openVnodes $data2_4
print dnode5 openVnodes $data2_5
# Keep polling until dnode2 reports 1, the dnode3 value reads as null and
# dnode4 reports 1.
# NOTE(review): null presumably means the dnode3 row is gone after the drop
# completed - confirm against the sim runner's handling of missing rows.
if $data2_2 != 1 then
goto show4
endi
if $data2_3 != null then
goto show4
endi
if $data2_4 != 1 then
goto show4
endi
# Stop the dnode3 process (SIGINT) before the data check in the next step.
system sh/exec.sh -n dnode3 -s stop -x SIGINT
# Step 5: the row count of d1.t1 must equal $rowNum, i.e. every row inserted
# in step 4 is still readable; otherwise the test fails.
print ========== step5
sql select count(*) from d1.t1
print select count(*) from d1.t1 ==> $data00
if $data00 != $rowNum then
return -1
endi
# Teardown: send the stop command (SIGINT) to all five deployed dnodes,
# including dnode3 (already stopped above) and dnode5 (never started by this
# script).
system sh/exec.sh -n dnode1 -s stop -x SIGINT
system sh/exec.sh -n dnode2 -s stop -x SIGINT
system sh/exec.sh -n dnode3 -s stop -x SIGINT
system sh/exec.sh -n dnode4 -s stop -x SIGINT
system sh/exec.sh -n dnode5 -s stop -x SIGINT
\ No newline at end of file
...@@ -50,6 +50,24 @@ $d1_first = $rows ...@@ -50,6 +50,24 @@ $d1_first = $rows
sql select * from log.dn2 sql select * from log.dn2
$d2_first = $rows $d2_first = $rows
# Poll "show mnodes" once per second before continuing; fail the test
# (return -1) once the counter $x reaches 20.
$x = 0
show4:
$x = $x + 1
sleep 1000
if $x == 20 then
return -1
endi
sql show mnodes
print dnode1 ==> $data2_1
print dnode2 ==> $data2_2
# Keep polling until $data2_1 is "master" and $data2_2 is "slave".
if $data2_1 != master then
goto show4
endi
if $data2_2 != slave then
goto show4
endi
sleep 3000 sleep 3000
sql select * from log.dn1 sql select * from log.dn1
$d1_second = $rows $d1_second = $rows
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册