未验证 提交 97d87565 编写于 作者: S Shengliang Guan 提交者: GitHub

Merge pull request #3794 from taosdata/patch/TD-1669

TD-1669
...@@ -957,11 +957,11 @@ static void balanceMonitorDnodeModule() { ...@@ -957,11 +957,11 @@ static void balanceMonitorDnodeModule() {
continue; continue;
} }
mLInfo("dnode:%d, numOfMnodes:%d expect:%d, add mnode in this dnode", pDnode->dnodeId, numOfMnodes, tsNumOfMnodes); mLInfo("dnode:%d, numOfMnodes:%d expect:%d, create mnode in this dnode", pDnode->dnodeId, numOfMnodes, tsNumOfMnodes);
mnodeAddMnode(pDnode->dnodeId); mnodeCreateMnode(pDnode->dnodeId, pDnode->dnodeEp, true);
numOfMnodes = mnodeGetMnodesNum(); // Only create one mnode each time
if (numOfMnodes >= tsNumOfMnodes) return; return;
} }
} }
......
...@@ -20,6 +20,7 @@ ...@@ -20,6 +20,7 @@
#include "tcache.h" #include "tcache.h"
#include "tnote.h" #include "tnote.h"
#include "trpc.h" #include "trpc.h"
#include "ttimer.h"
#include "tscLog.h" #include "tscLog.h"
#include "tscSubquery.h" #include "tscSubquery.h"
#include "tscUtil.h" #include "tscUtil.h"
...@@ -260,6 +261,9 @@ void taos_close(TAOS *taos) { ...@@ -260,6 +261,9 @@ void taos_close(TAOS *taos) {
return; return;
} }
pObj->signature = NULL;
taosTmrStopA(&(pObj->pTimer));
SSqlObj* pHb = pObj->pHb; SSqlObj* pHb = pObj->pHb;
if (pHb != NULL && atomic_val_compare_exchange_ptr(&pObj->pHb, pHb, 0) == pHb) { if (pHb != NULL && atomic_val_compare_exchange_ptr(&pObj->pHb, pHb, 0) == pHb) {
if (pHb->pRpcCtx != NULL) { // wait for rsp from dnode if (pHb->pRpcCtx != NULL) { // wait for rsp from dnode
......
...@@ -33,7 +33,8 @@ typedef struct { ...@@ -33,7 +33,8 @@ typedef struct {
} SMPeerWorker; } SMPeerWorker;
typedef struct { typedef struct {
int32_t num; int32_t curNum;
int32_t maxNum;
SMPeerWorker *peerWorker; SMPeerWorker *peerWorker;
} SMPeerWorkerPool; } SMPeerWorkerPool;
...@@ -46,37 +47,44 @@ static void *dnodeProcessMnodePeerQueue(void *param); ...@@ -46,37 +47,44 @@ static void *dnodeProcessMnodePeerQueue(void *param);
int32_t dnodeInitMnodePeer() { int32_t dnodeInitMnodePeer() {
tsMPeerQset = taosOpenQset(); tsMPeerQset = taosOpenQset();
tsMPeerPool.num = 1; tsMPeerPool.maxNum = 1;
tsMPeerPool.peerWorker = (SMPeerWorker *)calloc(sizeof(SMPeerWorker), tsMPeerPool.num); tsMPeerPool.curNum = 0;
tsMPeerPool.peerWorker = (SMPeerWorker *)calloc(sizeof(SMPeerWorker), tsMPeerPool.maxNum);
if (tsMPeerPool.peerWorker == NULL) return -1; if (tsMPeerPool.peerWorker == NULL) return -1;
for (int32_t i = 0; i < tsMPeerPool.num; ++i) { for (int32_t i = 0; i < tsMPeerPool.maxNum; ++i) {
SMPeerWorker *pWorker = tsMPeerPool.peerWorker + i; SMPeerWorker *pWorker = tsMPeerPool.peerWorker + i;
pWorker->workerId = i; pWorker->workerId = i;
dDebug("dnode mpeer worker:%d is created", i);
} }
dInfo("dnode mpeer is opened"); dDebug("dnode mpeer is opened, workers:%d qset:%p", tsMPeerPool.maxNum, tsMPeerQset);
return 0; return 0;
} }
void dnodeCleanupMnodePeer() { void dnodeCleanupMnodePeer() {
for (int32_t i = 0; i < tsMPeerPool.num; ++i) { for (int32_t i = 0; i < tsMPeerPool.maxNum; ++i) {
SMPeerWorker *pWorker = tsMPeerPool.peerWorker + i; SMPeerWorker *pWorker = tsMPeerPool.peerWorker + i;
if (pWorker->thread) { if (pWorker->thread) {
taosQsetThreadResume(tsMPeerQset); taosQsetThreadResume(tsMPeerQset);
} }
dDebug("dnode mpeer worker:%d is closed", i);
} }
for (int32_t i = 0; i < tsMPeerPool.num; ++i) { for (int32_t i = 0; i < tsMPeerPool.maxNum; ++i) {
SMPeerWorker *pWorker = tsMPeerPool.peerWorker + i; SMPeerWorker *pWorker = tsMPeerPool.peerWorker + i;
dDebug("dnode mpeer worker:%d start to join", i);
if (pWorker->thread) { if (pWorker->thread) {
pthread_join(pWorker->thread, NULL); pthread_join(pWorker->thread, NULL);
} }
dDebug("dnode mpeer worker:%d join success", i);
} }
dDebug("dnode mpeer is closed, qset:%p", tsMPeerQset);
taosCloseQset(tsMPeerQset); taosCloseQset(tsMPeerQset);
tsMPeerQset = NULL;
taosTFree(tsMPeerPool.peerWorker); taosTFree(tsMPeerPool.peerWorker);
dInfo("dnode mpeer is closed");
} }
int32_t dnodeAllocateMnodePqueue() { int32_t dnodeAllocateMnodePqueue() {
...@@ -85,7 +93,7 @@ int32_t dnodeAllocateMnodePqueue() { ...@@ -85,7 +93,7 @@ int32_t dnodeAllocateMnodePqueue() {
taosAddIntoQset(tsMPeerQset, tsMPeerQueue, NULL); taosAddIntoQset(tsMPeerQset, tsMPeerQueue, NULL);
for (int32_t i = 0; i < tsMPeerPool.num; ++i) { for (int32_t i = tsMPeerPool.curNum; i < tsMPeerPool.maxNum; ++i) {
SMPeerWorker *pWorker = tsMPeerPool.peerWorker + i; SMPeerWorker *pWorker = tsMPeerPool.peerWorker + i;
pWorker->workerId = i; pWorker->workerId = i;
...@@ -98,7 +106,9 @@ int32_t dnodeAllocateMnodePqueue() { ...@@ -98,7 +106,9 @@ int32_t dnodeAllocateMnodePqueue() {
} }
pthread_attr_destroy(&thAttr); pthread_attr_destroy(&thAttr);
dDebug("dnode mpeer worker:%d is launched, total:%d", pWorker->workerId, tsMPeerPool.num);
tsMPeerPool.curNum = i + 1;
dDebug("dnode mpeer worker:%d is launched, total:%d", pWorker->workerId, tsMPeerPool.maxNum);
} }
dDebug("dnode mpeer queue:%p is allocated", tsMPeerQueue); dDebug("dnode mpeer queue:%p is allocated", tsMPeerQueue);
...@@ -106,6 +116,7 @@ int32_t dnodeAllocateMnodePqueue() { ...@@ -106,6 +116,7 @@ int32_t dnodeAllocateMnodePqueue() {
} }
void dnodeFreeMnodePqueue() { void dnodeFreeMnodePqueue() {
dDebug("dnode mpeer queue:%p is freed", tsMPeerQueue);
taosCloseQueue(tsMPeerQueue); taosCloseQueue(tsMPeerQueue);
tsMPeerQueue = NULL; tsMPeerQueue = NULL;
} }
...@@ -148,7 +159,7 @@ static void *dnodeProcessMnodePeerQueue(void *param) { ...@@ -148,7 +159,7 @@ static void *dnodeProcessMnodePeerQueue(void *param) {
while (1) { while (1) {
if (taosReadQitemFromQset(tsMPeerQset, &type, (void **)&pPeerMsg, &unUsed) == 0) { if (taosReadQitemFromQset(tsMPeerQset, &type, (void **)&pPeerMsg, &unUsed) == 0) {
dDebug("dnodeProcessMnodePeerQueue: got no message from qset, exiting..."); dDebug("qset:%p, mnode peer got no message from qset, exiting", tsMPeerQset);
break; break;
} }
......
...@@ -33,7 +33,8 @@ typedef struct { ...@@ -33,7 +33,8 @@ typedef struct {
} SMReadWorker; } SMReadWorker;
typedef struct { typedef struct {
int32_t num; int32_t curNum;
int32_t maxNum;
SMReadWorker *readWorker; SMReadWorker *readWorker;
} SMReadWorkerPool; } SMReadWorkerPool;
...@@ -46,40 +47,46 @@ static void *dnodeProcessMnodeReadQueue(void *param); ...@@ -46,40 +47,46 @@ static void *dnodeProcessMnodeReadQueue(void *param);
int32_t dnodeInitMnodeRead() { int32_t dnodeInitMnodeRead() {
tsMReadQset = taosOpenQset(); tsMReadQset = taosOpenQset();
tsMReadPool.num = tsNumOfCores * tsNumOfThreadsPerCore / 2; tsMReadPool.maxNum = tsNumOfCores * tsNumOfThreadsPerCore / 2;
tsMReadPool.num = MAX(2, tsMReadPool.num); tsMReadPool.maxNum = MAX(2, tsMReadPool.maxNum);
tsMReadPool.num = MIN(4, tsMReadPool.num); tsMReadPool.maxNum = MIN(4, tsMReadPool.maxNum);
tsMReadPool.readWorker = (SMReadWorker *)calloc(sizeof(SMReadWorker), tsMReadPool.num); tsMReadPool.curNum = 0;
tsMReadPool.readWorker = (SMReadWorker *)calloc(sizeof(SMReadWorker), tsMReadPool.maxNum);
if (tsMReadPool.readWorker == NULL) return -1; if (tsMReadPool.readWorker == NULL) return -1;
for (int32_t i = 0; i < tsMReadPool.num; ++i) { for (int32_t i = 0; i < tsMReadPool.maxNum; ++i) {
SMReadWorker *pWorker = tsMReadPool.readWorker + i; SMReadWorker *pWorker = tsMReadPool.readWorker + i;
pWorker->workerId = i; pWorker->workerId = i;
dDebug("dnode mread worker:%d is created", i);
} }
dInfo("dnode mread is opened"); dDebug("dnode mread is opened, workers:%d qset:%p", tsMReadPool.maxNum, tsMReadQset);
return 0; return 0;
} }
void dnodeCleanupMnodeRead() { void dnodeCleanupMnodeRead() {
for (int32_t i = 0; i < tsMReadPool.num; ++i) { for (int32_t i = 0; i < tsMReadPool.maxNum; ++i) {
SMReadWorker *pWorker = tsMReadPool.readWorker + i; SMReadWorker *pWorker = tsMReadPool.readWorker + i;
if (pWorker->thread) { if (pWorker->thread) {
taosQsetThreadResume(tsMReadQset); taosQsetThreadResume(tsMReadQset);
} }
dDebug("dnode mread worker:%d is closed", i);
} }
for (int32_t i = 0; i < tsMReadPool.num; ++i) { for (int32_t i = 0; i < tsMReadPool.maxNum; ++i) {
SMReadWorker *pWorker = tsMReadPool.readWorker + i; SMReadWorker *pWorker = tsMReadPool.readWorker + i;
dDebug("dnode mread worker:%d start to join", i);
if (pWorker->thread) { if (pWorker->thread) {
pthread_join(pWorker->thread, NULL); pthread_join(pWorker->thread, NULL);
} }
dDebug("dnode mread worker:%d start to join", i);
} }
dDebug("dnode mread is closed, qset:%p", tsMReadQset);
taosCloseQset(tsMReadQset); taosCloseQset(tsMReadQset);
tsMReadQset = NULL;
free(tsMReadPool.readWorker); free(tsMReadPool.readWorker);
dInfo("dnode mread is closed");
} }
int32_t dnodeAllocateMnodeRqueue() { int32_t dnodeAllocateMnodeRqueue() {
...@@ -88,7 +95,7 @@ int32_t dnodeAllocateMnodeRqueue() { ...@@ -88,7 +95,7 @@ int32_t dnodeAllocateMnodeRqueue() {
taosAddIntoQset(tsMReadQset, tsMReadQueue, NULL); taosAddIntoQset(tsMReadQset, tsMReadQueue, NULL);
for (int32_t i = 0; i < tsMReadPool.num; ++i) { for (int32_t i = tsMReadPool.curNum; i < tsMReadPool.maxNum; ++i) {
SMReadWorker *pWorker = tsMReadPool.readWorker + i; SMReadWorker *pWorker = tsMReadPool.readWorker + i;
pWorker->workerId = i; pWorker->workerId = i;
...@@ -101,7 +108,8 @@ int32_t dnodeAllocateMnodeRqueue() { ...@@ -101,7 +108,8 @@ int32_t dnodeAllocateMnodeRqueue() {
} }
pthread_attr_destroy(&thAttr); pthread_attr_destroy(&thAttr);
dDebug("dnode mread worker:%d is launched, total:%d", pWorker->workerId, tsMReadPool.num); tsMReadPool.curNum = i + 1;
dDebug("dnode mread worker:%d is launched, total:%d", pWorker->workerId, tsMReadPool.maxNum);
} }
dDebug("dnode mread queue:%p is allocated", tsMReadQueue); dDebug("dnode mread queue:%p is allocated", tsMReadQueue);
...@@ -109,6 +117,7 @@ int32_t dnodeAllocateMnodeRqueue() { ...@@ -109,6 +117,7 @@ int32_t dnodeAllocateMnodeRqueue() {
} }
void dnodeFreeMnodeRqueue() { void dnodeFreeMnodeRqueue() {
dDebug("dnode mread queue:%p is freed", tsMReadQueue);
taosCloseQueue(tsMReadQueue); taosCloseQueue(tsMReadQueue);
tsMReadQueue = NULL; tsMReadQueue = NULL;
} }
...@@ -156,7 +165,7 @@ static void *dnodeProcessMnodeReadQueue(void *param) { ...@@ -156,7 +165,7 @@ static void *dnodeProcessMnodeReadQueue(void *param) {
while (1) { while (1) {
if (taosReadQitemFromQset(tsMReadQset, &type, (void **)&pReadMsg, &unUsed) == 0) { if (taosReadQitemFromQset(tsMReadQset, &type, (void **)&pReadMsg, &unUsed) == 0) {
dDebug("dnodeProcessMnodeReadQueue: got no message from qset, exiting..."); dDebug("qset:%p, mnode read got no message from qset, exiting", tsMReadQset);
break; break;
} }
......
...@@ -34,7 +34,8 @@ typedef struct { ...@@ -34,7 +34,8 @@ typedef struct {
} SMWriteWorker; } SMWriteWorker;
typedef struct { typedef struct {
int32_t num; int32_t curNum;
int32_t maxNum;
SMWriteWorker *writeWorker; SMWriteWorker *writeWorker;
} SMWriteWorkerPool; } SMWriteWorkerPool;
...@@ -47,38 +48,45 @@ static void *dnodeProcessMnodeWriteQueue(void *param); ...@@ -47,38 +48,45 @@ static void *dnodeProcessMnodeWriteQueue(void *param);
int32_t dnodeInitMnodeWrite() { int32_t dnodeInitMnodeWrite() {
tsMWriteQset = taosOpenQset(); tsMWriteQset = taosOpenQset();
tsMWritePool.num = 1; tsMWritePool.maxNum = 1;
tsMWritePool.writeWorker = (SMWriteWorker *)calloc(sizeof(SMWriteWorker), tsMWritePool.num); tsMWritePool.curNum = 0;
tsMWritePool.writeWorker = (SMWriteWorker *)calloc(sizeof(SMWriteWorker), tsMWritePool.maxNum);
if (tsMWritePool.writeWorker == NULL) return -1; if (tsMWritePool.writeWorker == NULL) return -1;
for (int32_t i = 0; i < tsMWritePool.num; ++i) { for (int32_t i = 0; i < tsMWritePool.maxNum; ++i) {
SMWriteWorker *pWorker = tsMWritePool.writeWorker + i; SMWriteWorker *pWorker = tsMWritePool.writeWorker + i;
pWorker->workerId = i; pWorker->workerId = i;
dDebug("dnode mwrite worker:%d is created", i);
} }
dInfo("dnode mwrite is opened"); dDebug("dnode mwrite is opened, workers:%d qset:%p", tsMWritePool.maxNum, tsMWriteQset);
return 0; return 0;
} }
void dnodeCleanupMnodeWrite() { void dnodeCleanupMnodeWrite() {
for (int32_t i = 0; i < tsMWritePool.num; ++i) { for (int32_t i = 0; i < tsMWritePool.maxNum; ++i) {
SMWriteWorker *pWorker = tsMWritePool.writeWorker + i; SMWriteWorker *pWorker = tsMWritePool.writeWorker + i;
if (pWorker->thread) { if (pWorker->thread) {
taosQsetThreadResume(tsMWriteQset); taosQsetThreadResume(tsMWriteQset);
} }
dDebug("dnode mwrite worker:%d is closed", i);
} }
for (int32_t i = 0; i < tsMWritePool.num; ++i) { for (int32_t i = 0; i < tsMWritePool.maxNum; ++i) {
SMWriteWorker *pWorker = tsMWritePool.writeWorker + i; SMWriteWorker *pWorker = tsMWritePool.writeWorker + i;
dDebug("dnode mwrite worker:%d start to join", i);
if (pWorker->thread) { if (pWorker->thread) {
pthread_join(pWorker->thread, NULL); pthread_join(pWorker->thread, NULL);
} }
dDebug("dnode mwrite worker:%d join success", i);
} }
dDebug("dnode mwrite is closed, qset:%p", tsMWriteQset);
taosCloseQset(tsMWriteQset); taosCloseQset(tsMWriteQset);
tsMWriteQset = NULL;
taosTFree(tsMWritePool.writeWorker); taosTFree(tsMWritePool.writeWorker);
dInfo("dnode mwrite is closed");
} }
int32_t dnodeAllocateMnodeWqueue() { int32_t dnodeAllocateMnodeWqueue() {
...@@ -87,7 +95,7 @@ int32_t dnodeAllocateMnodeWqueue() { ...@@ -87,7 +95,7 @@ int32_t dnodeAllocateMnodeWqueue() {
taosAddIntoQset(tsMWriteQset, tsMWriteQueue, NULL); taosAddIntoQset(tsMWriteQset, tsMWriteQueue, NULL);
for (int32_t i = 0; i < tsMWritePool.num; ++i) { for (int32_t i = tsMWritePool.curNum; i < tsMWritePool.maxNum; ++i) {
SMWriteWorker *pWorker = tsMWritePool.writeWorker + i; SMWriteWorker *pWorker = tsMWritePool.writeWorker + i;
pWorker->workerId = i; pWorker->workerId = i;
...@@ -100,7 +108,8 @@ int32_t dnodeAllocateMnodeWqueue() { ...@@ -100,7 +108,8 @@ int32_t dnodeAllocateMnodeWqueue() {
} }
pthread_attr_destroy(&thAttr); pthread_attr_destroy(&thAttr);
dDebug("dnode mwrite worker:%d is launched, total:%d", pWorker->workerId, tsMWritePool.num); tsMWritePool.curNum = i + 1;
dDebug("dnode mwrite worker:%d is launched, total:%d", pWorker->workerId, tsMWritePool.maxNum);
} }
dDebug("dnode mwrite queue:%p is allocated", tsMWriteQueue); dDebug("dnode mwrite queue:%p is allocated", tsMWriteQueue);
...@@ -108,6 +117,7 @@ int32_t dnodeAllocateMnodeWqueue() { ...@@ -108,6 +117,7 @@ int32_t dnodeAllocateMnodeWqueue() {
} }
void dnodeFreeMnodeWqueue() { void dnodeFreeMnodeWqueue() {
dDebug("dnode mwrite queue:%p is freed", tsMWriteQueue);
taosCloseQueue(tsMWriteQueue); taosCloseQueue(tsMWriteQueue);
tsMWriteQueue = NULL; tsMWriteQueue = NULL;
} }
...@@ -122,11 +132,15 @@ void dnodeDispatchToMnodeWriteQueue(SRpcMsg *pMsg) { ...@@ -122,11 +132,15 @@ void dnodeDispatchToMnodeWriteQueue(SRpcMsg *pMsg) {
SMnodeMsg *pWrite = (SMnodeMsg *)taosAllocateQitem(sizeof(SMnodeMsg)); SMnodeMsg *pWrite = (SMnodeMsg *)taosAllocateQitem(sizeof(SMnodeMsg));
mnodeCreateMsg(pWrite, pMsg); mnodeCreateMsg(pWrite, pMsg);
dDebug("app:%p:%p, msg:%s is put into mwrite queue", pWrite->rpcMsg.ahandle, pWrite, taosMsg[pWrite->rpcMsg.msgType]); dDebug("app:%p:%p, msg:%s is put into mwrite queue:%p", pWrite->rpcMsg.ahandle, pWrite,
taosMsg[pWrite->rpcMsg.msgType], tsMWriteQueue);
taosWriteQitem(tsMWriteQueue, TAOS_QTYPE_RPC, pWrite); taosWriteQitem(tsMWriteQueue, TAOS_QTYPE_RPC, pWrite);
} }
static void dnodeFreeMnodeWriteMsg(SMnodeMsg *pWrite) { static void dnodeFreeMnodeWriteMsg(SMnodeMsg *pWrite) {
dDebug("app:%p:%p, msg:%s is freed from mwrite queue:%p", pWrite->rpcMsg.ahandle, pWrite,
taosMsg[pWrite->rpcMsg.msgType], tsMWriteQueue);
mnodeCleanupMsg(pWrite); mnodeCleanupMsg(pWrite);
taosFreeQitem(pWrite); taosFreeQitem(pWrite);
} }
...@@ -158,7 +172,7 @@ static void *dnodeProcessMnodeWriteQueue(void *param) { ...@@ -158,7 +172,7 @@ static void *dnodeProcessMnodeWriteQueue(void *param) {
while (1) { while (1) {
if (taosReadQitemFromQset(tsMWriteQset, &type, (void **)&pWrite, &unUsed) == 0) { if (taosReadQitemFromQset(tsMWriteQset, &type, (void **)&pWrite, &unUsed) == 0) {
dDebug("dnodeProcessMnodeWriteQueue: got no message from qset, exiting..."); dDebug("qset:%p, mnode write got no message from qset, exiting", tsMWriteQset);
break; break;
} }
...@@ -182,8 +196,8 @@ void dnodeReprocessMnodeWriteMsg(void *pMsg) { ...@@ -182,8 +196,8 @@ void dnodeReprocessMnodeWriteMsg(void *pMsg) {
dnodeSendRedirectMsg(pMsg, true); dnodeSendRedirectMsg(pMsg, true);
dnodeFreeMnodeWriteMsg(pWrite); dnodeFreeMnodeWriteMsg(pWrite);
} else { } else {
dDebug("app:%p:%p, msg:%s is reput into mwrite queue, retry times:%d", pWrite->rpcMsg.ahandle, pWrite, dDebug("app:%p:%p, msg:%s is reput into mwrite queue:%p, retry times:%d", pWrite->rpcMsg.ahandle, pWrite,
taosMsg[pWrite->rpcMsg.msgType], pWrite->retry); taosMsg[pWrite->rpcMsg.msgType], tsMWriteQueue, pWrite->retry);
taosWriteQitem(tsMWriteQueue, TAOS_QTYPE_RPC, pWrite); taosWriteQitem(tsMWriteQueue, TAOS_QTYPE_RPC, pWrite);
} }
......
...@@ -74,14 +74,16 @@ static int32_t dnodeProcessAlterVnodeMsg(SRpcMsg *pMsg); ...@@ -74,14 +74,16 @@ static int32_t dnodeProcessAlterVnodeMsg(SRpcMsg *pMsg);
static int32_t dnodeProcessDropVnodeMsg(SRpcMsg *pMsg); static int32_t dnodeProcessDropVnodeMsg(SRpcMsg *pMsg);
static int32_t dnodeProcessAlterStreamMsg(SRpcMsg *pMsg); static int32_t dnodeProcessAlterStreamMsg(SRpcMsg *pMsg);
static int32_t dnodeProcessConfigDnodeMsg(SRpcMsg *pMsg); static int32_t dnodeProcessConfigDnodeMsg(SRpcMsg *pMsg);
static int32_t dnodeProcessCreateMnodeMsg(SRpcMsg *pMsg);
static int32_t (*dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_MAX])(SRpcMsg *pMsg); static int32_t (*dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_MAX])(SRpcMsg *pMsg);
int32_t dnodeInitMgmt() { int32_t dnodeInitMgmt() {
dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_MD_CREATE_VNODE] = dnodeProcessCreateVnodeMsg; dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_MD_CREATE_VNODE] = dnodeProcessCreateVnodeMsg;
dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_MD_ALTER_VNODE] = dnodeProcessAlterVnodeMsg; dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_MD_ALTER_VNODE] = dnodeProcessAlterVnodeMsg;
dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_MD_DROP_VNODE] = dnodeProcessDropVnodeMsg; dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_MD_DROP_VNODE] = dnodeProcessDropVnodeMsg;
dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_MD_ALTER_STREAM] = dnodeProcessAlterStreamMsg; dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_MD_ALTER_STREAM] = dnodeProcessAlterStreamMsg;
dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_MD_CONFIG_DNODE] = dnodeProcessConfigDnodeMsg; dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_MD_CONFIG_DNODE] = dnodeProcessConfigDnodeMsg;
dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_MD_CREATE_MNODE] = dnodeProcessCreateMnodeMsg;
dnodeAddClientRspHandle(TSDB_MSG_TYPE_DM_STATUS_RSP, dnodeProcessStatusRsp); dnodeAddClientRspHandle(TSDB_MSG_TYPE_DM_STATUS_RSP, dnodeProcessStatusRsp);
dnodeReadDnodeCfg(); dnodeReadDnodeCfg();
...@@ -226,7 +228,7 @@ static void *dnodeProcessMgmtQueue(void *param) { ...@@ -226,7 +228,7 @@ static void *dnodeProcessMgmtQueue(void *param) {
while (1) { while (1) {
if (taosReadQitemFromQset(tsMgmtQset, &type, (void **) &pMsg, &handle) == 0) { if (taosReadQitemFromQset(tsMgmtQset, &type, (void **) &pMsg, &handle) == 0) {
dDebug("dnode mgmt got no message from qset, exit ..."); dDebug("qset:%p, dnode mgmt got no message from qset, exit", tsMgmtQset);
break; break;
} }
...@@ -451,10 +453,34 @@ static int32_t dnodeProcessAlterStreamMsg(SRpcMsg *pMsg) { ...@@ -451,10 +453,34 @@ static int32_t dnodeProcessAlterStreamMsg(SRpcMsg *pMsg) {
} }
static int32_t dnodeProcessConfigDnodeMsg(SRpcMsg *pMsg) { static int32_t dnodeProcessConfigDnodeMsg(SRpcMsg *pMsg) {
SMDCfgDnodeMsg *pCfg = (SMDCfgDnodeMsg *)pMsg->pCont; SMDCfgDnodeMsg *pCfg = pMsg->pCont;
return taosCfgDynamicOptions(pCfg->config); return taosCfgDynamicOptions(pCfg->config);
} }
static int32_t dnodeProcessCreateMnodeMsg(SRpcMsg *pMsg) {
SMDCreateMnodeMsg *pCfg = pMsg->pCont;
pCfg->dnodeId = htonl(pCfg->dnodeId);
if (pCfg->dnodeId != dnodeGetDnodeId()) {
dError("dnodeId:%d, in create mnode msg is not equal with saved dnodeId:%d", pCfg->dnodeId, dnodeGetDnodeId());
return TSDB_CODE_MND_DNODE_ID_NOT_CONFIGURED;
}
if (strcmp(pCfg->dnodeEp, tsLocalEp) != 0) {
dError("dnodeEp:%s, in create mnode msg is not equal with saved dnodeEp:%s", pCfg->dnodeEp, tsLocalEp);
return TSDB_CODE_MND_DNODE_EP_NOT_CONFIGURED;
}
dDebug("dnodeId:%d, create mnode msg is received from mnodes, numOfMnodes:%d", pCfg->dnodeId, pCfg->mnodes.nodeNum);
for (int i = 0; i < pCfg->mnodes.nodeNum; ++i) {
pCfg->mnodes.nodeInfos[i].nodeId = htonl(pCfg->mnodes.nodeInfos[i].nodeId);
dDebug("mnode index:%d, mnode:%d:%s", i, pCfg->mnodes.nodeInfos[i].nodeId, pCfg->mnodes.nodeInfos[i].nodeEp);
}
dnodeStartMnode(&pCfg->mnodes);
return TSDB_CODE_SUCCESS;
}
void dnodeUpdateMnodeEpSetForPeer(SRpcEpSet *pEpSet) { void dnodeUpdateMnodeEpSetForPeer(SRpcEpSet *pEpSet) {
if (pEpSet->numOfEps <= 0) { if (pEpSet->numOfEps <= 0) {
dError("mnode EP list for peer is changed, but content is invalid, discard it"); dError("mnode EP list for peer is changed, but content is invalid, discard it");
...@@ -465,29 +491,6 @@ void dnodeUpdateMnodeEpSetForPeer(SRpcEpSet *pEpSet) { ...@@ -465,29 +491,6 @@ void dnodeUpdateMnodeEpSetForPeer(SRpcEpSet *pEpSet) {
for (int i = 0; i < pEpSet->numOfEps; ++i) { for (int i = 0; i < pEpSet->numOfEps; ++i) {
pEpSet->port[i] -= TSDB_PORT_DNODEDNODE; pEpSet->port[i] -= TSDB_PORT_DNODEDNODE;
dInfo("mnode index:%d %s:%u", i, pEpSet->fqdn[i], pEpSet->port[i]); dInfo("mnode index:%d %s:%u", i, pEpSet->fqdn[i], pEpSet->port[i]);
if (!mnodeIsRunning()) {
if (strcmp(pEpSet->fqdn[i], tsLocalFqdn) == 0 && pEpSet->port[i] == tsServerPort) {
dInfo("mnode index:%d %s:%u should work as mnode", i, pEpSet->fqdn[i], pEpSet->port[i]);
bool find = false;
for (int i = 0; i < tsDMnodeInfos.nodeNum; ++i) {
if (tsDMnodeInfos.nodeInfos[i].nodeId == dnodeGetDnodeId()) {
dInfo("localEp found in mnode infos");
find = true;
break;
}
}
if (!find) {
dInfo("localEp not found in mnode infos, will set into mnode infos");
tstrncpy(tsDMnodeInfos.nodeInfos[tsDMnodeInfos.nodeNum].nodeEp, tsLocalEp, TSDB_EP_LEN);
tsDMnodeInfos.nodeInfos[tsDMnodeInfos.nodeNum].nodeId = dnodeGetDnodeId();
tsDMnodeInfos.nodeNum++;
}
dnodeStartMnode();
}
}
} }
tsDMnodeEpSet = *pEpSet; tsDMnodeEpSet = *pEpSet;
...@@ -532,7 +535,9 @@ static void dnodeProcessStatusRsp(SRpcMsg *pMsg) { ...@@ -532,7 +535,9 @@ static void dnodeProcessStatusRsp(SRpcMsg *pMsg) {
} }
vnodeSetAccess(pStatusRsp->vgAccess, pCfg->numOfVnodes); vnodeSetAccess(pStatusRsp->vgAccess, pCfg->numOfVnodes);
dnodeProcessModuleStatus(pCfg->moduleStatus);
// will not set mnode in status msg
// dnodeProcessModuleStatus(pCfg->moduleStatus);
dnodeUpdateDnodeCfg(pCfg); dnodeUpdateDnodeCfg(pCfg);
dnodeUpdateMnodeInfos(pMnodes); dnodeUpdateMnodeInfos(pMnodes);
...@@ -576,7 +581,7 @@ static void dnodeUpdateMnodeInfos(SDMMnodeInfos *pMnodes) { ...@@ -576,7 +581,7 @@ static void dnodeUpdateMnodeInfos(SDMMnodeInfos *pMnodes) {
} }
dnodeSaveMnodeInfos(); dnodeSaveMnodeInfos();
sdbUpdateSync(); sdbUpdateAsync();
} }
static bool dnodeReadMnodeInfos() { static bool dnodeReadMnodeInfos() {
......
...@@ -146,7 +146,9 @@ void dnodeProcessModuleStatus(uint32_t moduleStatus) { ...@@ -146,7 +146,9 @@ void dnodeProcessModuleStatus(uint32_t moduleStatus) {
} }
} }
bool dnodeStartMnode() { bool dnodeStartMnode(void *pMnodes) {
SDMMnodeInfos *mnodes = pMnodes;
if (tsModuleStatus & (1 << TSDB_MOD_MNODE)) { if (tsModuleStatus & (1 << TSDB_MOD_MNODE)) {
dDebug("mnode module is already started, module status:%d", tsModuleStatus); dDebug("mnode module is already started, module status:%d", tsModuleStatus);
return false; return false;
...@@ -156,6 +158,7 @@ bool dnodeStartMnode() { ...@@ -156,6 +158,7 @@ bool dnodeStartMnode() {
dInfo("start mnode module, module status:%d, new status:%d", tsModuleStatus, moduleStatus); dInfo("start mnode module, module status:%d, new status:%d", tsModuleStatus, moduleStatus);
dnodeProcessModuleStatus(moduleStatus); dnodeProcessModuleStatus(moduleStatus);
sdbUpdateSync(); sdbUpdateSync(mnodes);
return true; return true;
} }
...@@ -48,6 +48,7 @@ int32_t dnodeInitServer() { ...@@ -48,6 +48,7 @@ int32_t dnodeInitServer() {
dnodeProcessReqMsgFp[TSDB_MSG_TYPE_MD_DROP_VNODE] = dnodeDispatchToMgmtQueue; dnodeProcessReqMsgFp[TSDB_MSG_TYPE_MD_DROP_VNODE] = dnodeDispatchToMgmtQueue;
dnodeProcessReqMsgFp[TSDB_MSG_TYPE_MD_ALTER_STREAM] = dnodeDispatchToMgmtQueue; dnodeProcessReqMsgFp[TSDB_MSG_TYPE_MD_ALTER_STREAM] = dnodeDispatchToMgmtQueue;
dnodeProcessReqMsgFp[TSDB_MSG_TYPE_MD_CONFIG_DNODE] = dnodeDispatchToMgmtQueue; dnodeProcessReqMsgFp[TSDB_MSG_TYPE_MD_CONFIG_DNODE] = dnodeDispatchToMgmtQueue;
dnodeProcessReqMsgFp[TSDB_MSG_TYPE_MD_CREATE_MNODE] = dnodeDispatchToMgmtQueue;
dnodeProcessReqMsgFp[TSDB_MSG_TYPE_DM_CONFIG_TABLE] = dnodeDispatchToMnodePeerQueue; dnodeProcessReqMsgFp[TSDB_MSG_TYPE_DM_CONFIG_TABLE] = dnodeDispatchToMnodePeerQueue;
dnodeProcessReqMsgFp[TSDB_MSG_TYPE_DM_CONFIG_VNODE] = dnodeDispatchToMnodePeerQueue; dnodeProcessReqMsgFp[TSDB_MSG_TYPE_DM_CONFIG_VNODE] = dnodeDispatchToMnodePeerQueue;
...@@ -170,8 +171,12 @@ void dnodeSendMsgToDnode(SRpcEpSet *epSet, SRpcMsg *rpcMsg) { ...@@ -170,8 +171,12 @@ void dnodeSendMsgToDnode(SRpcEpSet *epSet, SRpcMsg *rpcMsg) {
rpcSendRequest(tsDnodeClientRpc, epSet, rpcMsg); rpcSendRequest(tsDnodeClientRpc, epSet, rpcMsg);
} }
void dnodeSendMsgToDnodeRecv(SRpcMsg *rpcMsg, SRpcMsg *rpcRsp) { void dnodeSendMsgToMnodeRecv(SRpcMsg *rpcMsg, SRpcMsg *rpcRsp) {
SRpcEpSet epSet = {0}; SRpcEpSet epSet = {0};
dnodeGetMnodeEpSetForPeer(&epSet); dnodeGetMnodeEpSetForPeer(&epSet);
rpcSendRecv(tsDnodeClientRpc, &epSet, rpcMsg, rpcRsp); rpcSendRecv(tsDnodeClientRpc, &epSet, rpcMsg, rpcRsp);
} }
void dnodeSendMsgToDnodeRecv(SRpcMsg *rpcMsg, SRpcMsg *rpcRsp, SRpcEpSet *epSet) {
rpcSendRecv(tsDnodeClientRpc, epSet, rpcMsg, rpcRsp);
}
\ No newline at end of file
...@@ -156,7 +156,7 @@ static int dnodeRetrieveUserAuthInfo(char *user, char *spi, char *encrypt, char ...@@ -156,7 +156,7 @@ static int dnodeRetrieveUserAuthInfo(char *user, char *spi, char *encrypt, char
dDebug("user:%s, send auth msg to mnodes", user); dDebug("user:%s, send auth msg to mnodes", user);
SRpcMsg rpcRsp = {0}; SRpcMsg rpcRsp = {0};
dnodeSendMsgToDnodeRecv(&rpcMsg, &rpcRsp); dnodeSendMsgToMnodeRecv(&rpcMsg, &rpcRsp);
if (rpcRsp.code != 0) { if (rpcRsp.code != 0) {
dError("user:%s, auth msg received from mnodes, error:%s", user, tstrerror(rpcRsp.code)); dError("user:%s, auth msg received from mnodes, error:%s", user, tstrerror(rpcRsp.code));
...@@ -189,7 +189,7 @@ void *dnodeSendCfgTableToRecv(int32_t vgId, int32_t sid) { ...@@ -189,7 +189,7 @@ void *dnodeSendCfgTableToRecv(int32_t vgId, int32_t sid) {
rpcMsg.msgType = TSDB_MSG_TYPE_DM_CONFIG_TABLE; rpcMsg.msgType = TSDB_MSG_TYPE_DM_CONFIG_TABLE;
SRpcMsg rpcRsp = {0}; SRpcMsg rpcRsp = {0};
dnodeSendMsgToDnodeRecv(&rpcMsg, &rpcRsp); dnodeSendMsgToMnodeRecv(&rpcMsg, &rpcRsp);
terrno = rpcRsp.code; terrno = rpcRsp.code;
if (rpcRsp.code != 0) { if (rpcRsp.code != 0) {
......
...@@ -199,7 +199,7 @@ static void *dnodeProcessReadQueue(void *param) { ...@@ -199,7 +199,7 @@ static void *dnodeProcessReadQueue(void *param) {
while (1) { while (1) {
if (taosReadQitemFromQset(readQset, &type, (void **)&pReadMsg, &pVnode) == 0) { if (taosReadQitemFromQset(readQset, &type, (void **)&pReadMsg, &pVnode) == 0) {
dDebug("dnodeProcessReadQueee: got no message from qset, exiting..."); dDebug("qset:%p dnode read got no message from qset, exiting", readQset);
break; break;
} }
......
...@@ -222,7 +222,7 @@ static void *dnodeProcessWriteQueue(void *param) { ...@@ -222,7 +222,7 @@ static void *dnodeProcessWriteQueue(void *param) {
while (1) { while (1) {
numOfMsgs = taosReadAllQitemsFromQset(pWorker->qset, pWorker->qall, &pVnode); numOfMsgs = taosReadAllQitemsFromQset(pWorker->qset, pWorker->qall, &pVnode);
if (numOfMsgs == 0) { if (numOfMsgs == 0) {
dDebug("dnodeProcessWriteQueee: got no message from qset, exiting..."); dDebug("qset:%p, dnode write got no message from qset, exiting", pWorker->qset);
break; break;
} }
......
...@@ -43,11 +43,12 @@ void dnodeGetMnodeEpSetForPeer(void *epSet); ...@@ -43,11 +43,12 @@ void dnodeGetMnodeEpSetForPeer(void *epSet);
void dnodeGetMnodeEpSetForShell(void *epSet); void dnodeGetMnodeEpSetForShell(void *epSet);
void * dnodeGetMnodeInfos(); void * dnodeGetMnodeInfos();
int32_t dnodeGetDnodeId(); int32_t dnodeGetDnodeId();
bool dnodeStartMnode(); bool dnodeStartMnode(void *pModes);
void dnodeAddClientRspHandle(uint8_t msgType, void (*fp)(SRpcMsg *rpcMsg)); void dnodeAddClientRspHandle(uint8_t msgType, void (*fp)(SRpcMsg *rpcMsg));
void dnodeSendMsgToDnode(SRpcEpSet *epSet, SRpcMsg *rpcMsg); void dnodeSendMsgToDnode(SRpcEpSet *epSet, SRpcMsg *rpcMsg);
void dnodeSendMsgToDnodeRecv(SRpcMsg *rpcMsg, SRpcMsg *rpcRsp); void dnodeSendMsgToMnodeRecv(SRpcMsg *rpcMsg, SRpcMsg *rpcRsp);
void dnodeSendMsgToDnodeRecv(SRpcMsg *rpcMsg, SRpcMsg *rpcRsp, SRpcEpSet *epSet);
void *dnodeSendCfgTableToRecv(int32_t vgId, int32_t sid); void *dnodeSendCfgTableToRecv(int32_t vgId, int32_t sid);
void *dnodeAllocateVnodeWqueue(void *pVnode); void *dnodeAllocateVnodeWqueue(void *pVnode);
......
...@@ -60,7 +60,8 @@ int32_t mnodeInitSystem(); ...@@ -60,7 +60,8 @@ int32_t mnodeInitSystem();
int32_t mnodeStartSystem(); int32_t mnodeStartSystem();
void mnodeCleanupSystem(); void mnodeCleanupSystem();
void mnodeStopSystem(); void mnodeStopSystem();
void sdbUpdateSync(); void sdbUpdateAsync();
void sdbUpdateSync(void *pMnodes);
bool mnodeIsRunning(); bool mnodeIsRunning();
int32_t mnodeProcessRead(SMnodeMsg *pMsg); int32_t mnodeProcessRead(SMnodeMsg *pMsg);
int32_t mnodeProcessWrite(SMnodeMsg *pMsg); int32_t mnodeProcessWrite(SMnodeMsg *pMsg);
......
...@@ -139,6 +139,8 @@ TAOS_DEFINE_ERROR(TSDB_CODE_MND_VGROUP_ALREADY_IN_DNODE, 0, 0x0339, "Vgroup alr ...@@ -139,6 +139,8 @@ TAOS_DEFINE_ERROR(TSDB_CODE_MND_VGROUP_ALREADY_IN_DNODE, 0, 0x0339, "Vgroup alr
TAOS_DEFINE_ERROR(TSDB_CODE_MND_DNODE_NOT_FREE, 0, 0x033A, "Dnode not avaliable") TAOS_DEFINE_ERROR(TSDB_CODE_MND_DNODE_NOT_FREE, 0, 0x033A, "Dnode not avaliable")
TAOS_DEFINE_ERROR(TSDB_CODE_MND_INVALID_CLUSTER_ID, 0, 0x033B, "Cluster id not match") TAOS_DEFINE_ERROR(TSDB_CODE_MND_INVALID_CLUSTER_ID, 0, 0x033B, "Cluster id not match")
TAOS_DEFINE_ERROR(TSDB_CODE_MND_NOT_READY, 0, 0x033C, "Cluster not ready") TAOS_DEFINE_ERROR(TSDB_CODE_MND_NOT_READY, 0, 0x033C, "Cluster not ready")
TAOS_DEFINE_ERROR(TSDB_CODE_MND_DNODE_ID_NOT_CONFIGURED, 0, 0x033D, "Dnode Id not configured")
TAOS_DEFINE_ERROR(TSDB_CODE_MND_DNODE_EP_NOT_CONFIGURED, 0, 0x033E, "Dnode Ep not configured")
TAOS_DEFINE_ERROR(TSDB_CODE_MND_ACCT_ALREADY_EXIST, 0, 0x0340, "Account already exists") TAOS_DEFINE_ERROR(TSDB_CODE_MND_ACCT_ALREADY_EXIST, 0, 0x0340, "Account already exists")
TAOS_DEFINE_ERROR(TSDB_CODE_MND_INVALID_ACCT, 0, 0x0341, "Invalid account") TAOS_DEFINE_ERROR(TSDB_CODE_MND_INVALID_ACCT, 0, 0x0341, "Invalid account")
......
...@@ -59,7 +59,7 @@ TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_MD_DROP_STABLE, "drop-stable" ) ...@@ -59,7 +59,7 @@ TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_MD_DROP_STABLE, "drop-stable" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_MD_ALTER_STREAM, "alter-stream" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_MD_ALTER_STREAM, "alter-stream" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_MD_CONFIG_DNODE, "config-dnode" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_MD_CONFIG_DNODE, "config-dnode" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_MD_ALTER_VNODE, "alter-vnode" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_MD_ALTER_VNODE, "alter-vnode" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY5, "dummy5" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_MD_CREATE_MNODE, "create-mnode" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY6, "dummy6" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY6, "dummy6" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY7, "dummy7" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY7, "dummy7" )
...@@ -719,6 +719,12 @@ typedef struct { ...@@ -719,6 +719,12 @@ typedef struct {
char ep[TSDB_EP_LEN]; // end point, hostname:port char ep[TSDB_EP_LEN]; // end point, hostname:port
} SCMCreateDnodeMsg, SCMDropDnodeMsg; } SCMCreateDnodeMsg, SCMDropDnodeMsg;
typedef struct {
int32_t dnodeId;
char dnodeEp[TSDB_EP_LEN]; // end point, hostname:port
SDMMnodeInfos mnodes;
} SMDCreateMnodeMsg;
typedef struct { typedef struct {
int32_t dnodeId; int32_t dnodeId;
int32_t vgId; int32_t vgId;
......
...@@ -31,7 +31,7 @@ typedef enum { ...@@ -31,7 +31,7 @@ typedef enum {
int32_t mnodeInitMnodes(); int32_t mnodeInitMnodes();
void mnodeCleanupMnodes(); void mnodeCleanupMnodes();
int32_t mnodeAddMnode(int32_t dnodeId); void mnodeCreateMnode(int32_t dnodeId, char *dnodeEp, bool needConfirm);
int32_t mnodeDropMnode(int32_t dnodeId); int32_t mnodeDropMnode(int32_t dnodeId);
void mnodeDropMnodeLocal(int32_t dnodeId); void mnodeDropMnodeLocal(int32_t dnodeId);
......
...@@ -147,7 +147,7 @@ static int32_t mnodeDnodeActionRestored() { ...@@ -147,7 +147,7 @@ static int32_t mnodeDnodeActionRestored() {
mnodeCreateDnode(tsLocalEp, NULL); mnodeCreateDnode(tsLocalEp, NULL);
SDnodeObj *pDnode = mnodeGetDnodeByEp(tsLocalEp); SDnodeObj *pDnode = mnodeGetDnodeByEp(tsLocalEp);
if (pDnode != NULL) { if (pDnode != NULL) {
mnodeAddMnode(pDnode->dnodeId); mnodeCreateMnode(pDnode->dnodeId, pDnode->dnodeEp, false);
mnodeDecDnodeRef(pDnode); mnodeDecDnodeRef(pDnode);
} }
} }
......
...@@ -109,7 +109,7 @@ int32_t mnodeStartSystem() { ...@@ -109,7 +109,7 @@ int32_t mnodeStartSystem() {
mInfo("mnode is initialized successfully"); mInfo("mnode is initialized successfully");
sdbUpdateSync(); sdbUpdateSync(NULL);
return 0; return 0;
} }
......
...@@ -23,6 +23,8 @@ ...@@ -23,6 +23,8 @@
#include "tutil.h" #include "tutil.h"
#include "tsocket.h" #include "tsocket.h"
#include "tdataformat.h" #include "tdataformat.h"
#include "dnode.h"
#include "mnode.h"
#include "mnodeDef.h" #include "mnodeDef.h"
#include "mnodeInt.h" #include "mnodeInt.h"
#include "mnodeMnode.h" #include "mnodeMnode.h"
...@@ -30,6 +32,7 @@ ...@@ -30,6 +32,7 @@
#include "mnodeSdb.h" #include "mnodeSdb.h"
#include "mnodeShow.h" #include "mnodeShow.h"
#include "mnodeUser.h" #include "mnodeUser.h"
#include "mnodeVgroup.h"
static void * tsMnodeSdb = NULL; static void * tsMnodeSdb = NULL;
static int32_t tsMnodeUpdateSize = 0; static int32_t tsMnodeUpdateSize = 0;
...@@ -266,25 +269,87 @@ void mnodeGetMnodeInfos(void *mnodeInfos) { ...@@ -266,25 +269,87 @@ void mnodeGetMnodeInfos(void *mnodeInfos) {
mnodeMnodeUnLock(); mnodeMnodeUnLock();
} }
int32_t mnodeAddMnode(int32_t dnodeId) { static int32_t mnodeSendCreateMnodeMsg(int32_t dnodeId, char *dnodeEp) {
mDebug("dnode:%d, send create mnode msg to dnode %s", dnodeId, dnodeEp);
SMDCreateMnodeMsg *pCreate = rpcMallocCont(sizeof(SMDCreateMnodeMsg));
if (pCreate == NULL) {
return TSDB_CODE_MND_OUT_OF_MEMORY;
} else {
pCreate->dnodeId = htonl(dnodeId);
tstrncpy(pCreate->dnodeEp, dnodeEp, sizeof(pCreate->dnodeEp));
pCreate->mnodes = tsMnodeInfos;
bool found = false;
for (int i = 0; i < pCreate->mnodes.nodeNum; ++i) {
if (pCreate->mnodes.nodeInfos[i].nodeId == htonl(dnodeId)) {
found = true;
}
}
if (!found) {
pCreate->mnodes.nodeInfos[pCreate->mnodes.nodeNum].nodeId = htonl(dnodeId);
tstrncpy(pCreate->mnodes.nodeInfos[pCreate->mnodes.nodeNum].nodeEp, dnodeEp, sizeof(pCreate->dnodeEp));
pCreate->mnodes.nodeNum++;
}
}
SRpcMsg rpcMsg = {0};
rpcMsg.pCont = pCreate;
rpcMsg.contLen = sizeof(SMDCreateMnodeMsg);
rpcMsg.msgType = TSDB_MSG_TYPE_MD_CREATE_MNODE;
SRpcMsg rpcRsp = {0};
SRpcEpSet epSet = mnodeGetEpSetFromIp(pCreate->dnodeEp);
dnodeSendMsgToDnodeRecv(&rpcMsg, &rpcRsp, &epSet);
if (rpcRsp.code != TSDB_CODE_SUCCESS) {
mError("dnode:%d, failed to send create mnode msg, ep:%s reason:%s", dnodeId, dnodeEp, tstrerror(rpcRsp.code));
} else {
mDebug("dnode:%d, create mnode msg is disposed, mnode is created in dnode", dnodeId);
}
rpcFreeCont(rpcRsp.pCont);
return rpcRsp.code;
}
static int32_t mnodeCreateMnodeCb(SMnodeMsg *pMsg, int32_t code) {
if (code != TSDB_CODE_SUCCESS) {
mError("failed to create mnode, reason:%s", tstrerror(code));
} else {
mDebug("mnode is created successfully");
mnodeUpdateMnodeEpSet();
sdbUpdateAsync();
}
return code;
}
void mnodeCreateMnode(int32_t dnodeId, char *dnodeEp, bool needConfirm) {
SMnodeObj *pMnode = calloc(1, sizeof(SMnodeObj)); SMnodeObj *pMnode = calloc(1, sizeof(SMnodeObj));
pMnode->mnodeId = dnodeId; pMnode->mnodeId = dnodeId;
pMnode->createdTime = taosGetTimestampMs(); pMnode->createdTime = taosGetTimestampMs();
SSdbOper oper = { SSdbOper oper = {
.type = SDB_OPER_GLOBAL, .type = SDB_OPER_GLOBAL,
.table = tsMnodeSdb, .table = tsMnodeSdb,
.pObj = pMnode, .pObj = pMnode,
.writeCb = mnodeCreateMnodeCb
}; };
int32_t code = sdbInsertRow(&oper); int32_t code = TSDB_CODE_SUCCESS;
if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_MND_ACTION_IN_PROGRESS) { if (needConfirm) {
taosTFree(pMnode); code = mnodeSendCreateMnodeMsg(dnodeId, dnodeEp);
} }
mnodeUpdateMnodeEpSet(); if (code != TSDB_CODE_SUCCESS) {
taosTFree(pMnode);
return;
}
return code; code = sdbInsertRow(&oper);
if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_MND_ACTION_IN_PROGRESS) {
mError("dnode:%d, failed to create mnode, ep:%s reason:%s", dnodeId, dnodeEp, tstrerror(code));
taosTFree(pMnode);
}
} }
void mnodeDropMnodeLocal(int32_t dnodeId) { void mnodeDropMnodeLocal(int32_t dnodeId) {
...@@ -296,6 +361,7 @@ void mnodeDropMnodeLocal(int32_t dnodeId) { ...@@ -296,6 +361,7 @@ void mnodeDropMnodeLocal(int32_t dnodeId) {
} }
mnodeUpdateMnodeEpSet(); mnodeUpdateMnodeEpSet();
sdbUpdateAsync();
} }
int32_t mnodeDropMnode(int32_t dnodeId) { int32_t mnodeDropMnode(int32_t dnodeId) {
...@@ -315,6 +381,7 @@ int32_t mnodeDropMnode(int32_t dnodeId) { ...@@ -315,6 +381,7 @@ int32_t mnodeDropMnode(int32_t dnodeId) {
sdbDecRef(tsMnodeSdb, pMnode); sdbDecRef(tsMnodeSdb, pMnode);
mnodeUpdateMnodeEpSet(); mnodeUpdateMnodeEpSet();
sdbUpdateAsync();
return code; return code;
} }
......
...@@ -58,10 +58,15 @@ int32_t mnodeProcessPeerReq(SMnodeMsg *pMsg) { ...@@ -58,10 +58,15 @@ int32_t mnodeProcessPeerReq(SMnodeMsg *pMsg) {
rpcRsp->rsp = epSet; rpcRsp->rsp = epSet;
rpcRsp->len = sizeof(SRpcEpSet); rpcRsp->len = sizeof(SRpcEpSet);
mDebug("%p, msg:%s in mpeer queue, will be redireced, numOfEps:%d inUse:%d", pMsg->rpcMsg.ahandle, mDebug("%p, msg:%s in mpeer queue will be redirected, numOfEps:%d inUse:%d", pMsg->rpcMsg.ahandle,
taosMsg[pMsg->rpcMsg.msgType], epSet->numOfEps, epSet->inUse); taosMsg[pMsg->rpcMsg.msgType], epSet->numOfEps, epSet->inUse);
for (int32_t i = 0; i < epSet->numOfEps; ++i) { for (int32_t i = 0; i < epSet->numOfEps; ++i) {
mDebug("mnode index:%d ep:%s:%d", i, epSet->fqdn[i], htons(epSet->port[i])); if (strcmp(epSet->fqdn[i], tsLocalFqdn) == 0 && htons(epSet->port[i]) == tsServerPort + TSDB_PORT_DNODEDNODE) {
epSet->inUse = (i + 1) % epSet->numOfEps;
mDebug("mnode index:%d ep:%s:%u, set inUse to %d", i, epSet->fqdn[i], htons(epSet->port[i]), epSet->inUse);
} else {
mDebug("mnode index:%d ep:%s:%u", i, epSet->fqdn[i], htons(epSet->port[i]));
}
} }
return TSDB_CODE_RPC_REDIRECT; return TSDB_CODE_RPC_REDIRECT;
......
...@@ -51,14 +51,21 @@ int32_t mnodeProcessRead(SMnodeMsg *pMsg) { ...@@ -51,14 +51,21 @@ int32_t mnodeProcessRead(SMnodeMsg *pMsg) {
SMnodeRsp *rpcRsp = &pMsg->rpcRsp; SMnodeRsp *rpcRsp = &pMsg->rpcRsp;
SRpcEpSet *epSet = rpcMallocCont(sizeof(SRpcEpSet)); SRpcEpSet *epSet = rpcMallocCont(sizeof(SRpcEpSet));
mnodeGetMnodeEpSetForShell(epSet); mnodeGetMnodeEpSetForShell(epSet);
rpcRsp->rsp = epSet;
rpcRsp->len = sizeof(SRpcEpSet);
mDebug("%p, msg:%s in mread queue, will be redireced, inUse:%d", pMsg->rpcMsg.ahandle, taosMsg[pMsg->rpcMsg.msgType], epSet->inUse); mDebug("%p, msg:%s in mread queue will be redirected, numOfEps:%d inUse:%d", pMsg->rpcMsg.ahandle,
taosMsg[pMsg->rpcMsg.msgType], epSet->numOfEps, epSet->inUse);
for (int32_t i = 0; i < epSet->numOfEps; ++i) { for (int32_t i = 0; i < epSet->numOfEps; ++i) {
mDebug("mnode index:%d ep:%s:%d", i, epSet->fqdn[i], htons(epSet->port[i])); if (strcmp(epSet->fqdn[i], tsLocalFqdn) == 0 && htons(epSet->port[i]) == tsServerPort) {
epSet->inUse = (i + 1) % epSet->numOfEps;
mDebug("mnode index:%d ep:%s:%u, set inUse to %d", i, epSet->fqdn[i], htons(epSet->port[i]), epSet->inUse);
} else {
mDebug("mnode index:%d ep:%s:%u", i, epSet->fqdn[i], htons(epSet->port[i]));
}
} }
rpcRsp->rsp = epSet;
rpcRsp->len = sizeof(SRpcEpSet);
return TSDB_CODE_RPC_REDIRECT; return TSDB_CODE_RPC_REDIRECT;
} }
......
...@@ -91,6 +91,7 @@ typedef struct { ...@@ -91,6 +91,7 @@ typedef struct {
} SSdbWriteWorkerPool; } SSdbWriteWorkerPool;
extern void * tsMnodeTmr; extern void * tsMnodeTmr;
static void * tsUpdateSyncTmr;
static SSdbObject tsSdbObj = {0}; static SSdbObject tsSdbObj = {0};
static taos_qset tsSdbWriteQset; static taos_qset tsSdbWriteQset;
static taos_qall tsSdbWriteQall; static taos_qall tsSdbWriteQall;
...@@ -297,27 +298,25 @@ static void sdbConfirmForward(void *ahandle, void *param, int32_t code) { ...@@ -297,27 +298,25 @@ static void sdbConfirmForward(void *ahandle, void *param, int32_t code) {
taosFreeQitem(pOper); taosFreeQitem(pOper);
} }
void sdbUpdateSync() { static void sdbUpdateSyncTmrFp(void *param, void *tmrId) { sdbUpdateSync(NULL); }
void sdbUpdateAsync() {
taosTmrReset(sdbUpdateSyncTmrFp, 200, NULL, tsMnodeTmr, &tsUpdateSyncTmr);
}
void sdbUpdateSync(void *pMnodes) {
SDMMnodeInfos *mnodes = pMnodes;
if (!mnodeIsRunning()) { if (!mnodeIsRunning()) {
mDebug("mnode not start yet, update sync info later"); mDebug("mnode not start yet, update sync config later");
return; return;
} }
mDebug("update sync info in sdb"); mDebug("update sync config in sync module, mnodes:%p", pMnodes);
SSyncCfg syncCfg = {0}; SSyncCfg syncCfg = {0};
int32_t index = 0; int32_t index = 0;
SDMMnodeInfos *mnodes = dnodeGetMnodeInfos(); if (mnodes == NULL) {
for (int32_t i = 0; i < mnodes->nodeNum; ++i) {
SDMMnodeInfo *node = &mnodes->nodeInfos[i];
syncCfg.nodeInfo[i].nodeId = node->nodeId;
taosGetFqdnPortFromEp(node->nodeEp, syncCfg.nodeInfo[i].nodeFqdn, &syncCfg.nodeInfo[i].nodePort);
syncCfg.nodeInfo[i].nodePort += TSDB_PORT_SYNC;
index++;
}
if (index == 0) {
void *pIter = NULL; void *pIter = NULL;
while (1) { while (1) {
SMnodeObj *pMnode = NULL; SMnodeObj *pMnode = NULL;
...@@ -337,9 +336,19 @@ void sdbUpdateSync() { ...@@ -337,9 +336,19 @@ void sdbUpdateSync() {
mnodeDecMnodeRef(pMnode); mnodeDecMnodeRef(pMnode);
} }
sdbFreeIter(pIter); sdbFreeIter(pIter);
syncCfg.replica = index;
mDebug("mnodes info not input, use infos in sdb, numOfMnodes:%d", syncCfg.replica);
} else {
for (index = 0; index < mnodes->nodeNum; ++index) {
SDMMnodeInfo *node = &mnodes->nodeInfos[index];
syncCfg.nodeInfo[index].nodeId = node->nodeId;
taosGetFqdnPortFromEp(node->nodeEp, syncCfg.nodeInfo[index].nodeFqdn, &syncCfg.nodeInfo[index].nodePort);
syncCfg.nodeInfo[index].nodePort += TSDB_PORT_SYNC;
}
syncCfg.replica = index;
mDebug("mnodes info input, numOfMnodes:%d", syncCfg.replica);
} }
syncCfg.replica = index;
syncCfg.quorum = (syncCfg.replica == 1) ? 1 : 2; syncCfg.quorum = (syncCfg.replica == 1) ? 1 : 2;
bool hasThisDnode = false; bool hasThisDnode = false;
...@@ -350,8 +359,15 @@ void sdbUpdateSync() { ...@@ -350,8 +359,15 @@ void sdbUpdateSync() {
} }
} }
if (!hasThisDnode) return; if (!hasThisDnode) {
if (memcmp(&syncCfg, &tsSdbObj.cfg, sizeof(SSyncCfg)) == 0) return; sdbDebug("update sync config, this dnode not exist");
return;
}
if (memcmp(&syncCfg, &tsSdbObj.cfg, sizeof(SSyncCfg)) == 0) {
sdbDebug("update sync config, info not changed");
return;
}
sdbInfo("work as mnode, replica:%d", syncCfg.replica); sdbInfo("work as mnode, replica:%d", syncCfg.replica);
for (int32_t i = 0; i < syncCfg.replica; ++i) { for (int32_t i = 0; i < syncCfg.replica; ++i) {
...@@ -1038,7 +1054,7 @@ static void *sdbWorkerFp(void *param) { ...@@ -1038,7 +1054,7 @@ static void *sdbWorkerFp(void *param) {
while (1) { while (1) {
numOfMsgs = taosReadAllQitemsFromQset(tsSdbWriteQset, tsSdbWriteQall, &unUsed); numOfMsgs = taosReadAllQitemsFromQset(tsSdbWriteQset, tsSdbWriteQall, &unUsed);
if (numOfMsgs == 0) { if (numOfMsgs == 0) {
sdbDebug("sdbWorkerFp: got no message from qset, exiting..."); sdbDebug("qset:%p, sdb got no message from qset, exiting", tsSdbWriteQset);
break; break;
} }
......
...@@ -310,7 +310,7 @@ void mnodeUpdateVgroupStatus(SVgObj *pVgroup, SDnodeObj *pDnode, SVnodeLoad *pVl ...@@ -310,7 +310,7 @@ void mnodeUpdateVgroupStatus(SVgObj *pVgroup, SDnodeObj *pDnode, SVnodeLoad *pVl
for (int32_t i = 0; i < pVgroup->numOfVnodes; ++i) { for (int32_t i = 0; i < pVgroup->numOfVnodes; ++i) {
SVnodeGid *pVgid = &pVgroup->vnodeGid[i]; SVnodeGid *pVgid = &pVgroup->vnodeGid[i];
if (pVgid->pDnode == pDnode) { if (pVgid->pDnode == pDnode) {
mTrace("dnode:%d, receive status from dnode, vgId:%d status is %d", pDnode->dnodeId, pVgroup->vgId, pVgid->role); mTrace("dnode:%d, receive status from dnode, vgId:%d status is %d:%s", pDnode->dnodeId, pVgroup->vgId, pVgid->role, syncRole[pVgid->role]);
pVgid->role = pVload->role; pVgid->role = pVload->role;
if (pVload->role == TAOS_SYNC_ROLE_MASTER) { if (pVload->role == TAOS_SYNC_ROLE_MASTER) {
pVgroup->inUse = i; pVgroup->inUse = i;
......
...@@ -54,11 +54,17 @@ int32_t mnodeProcessWrite(SMnodeMsg *pMsg) { ...@@ -54,11 +54,17 @@ int32_t mnodeProcessWrite(SMnodeMsg *pMsg) {
rpcRsp->rsp = epSet; rpcRsp->rsp = epSet;
rpcRsp->len = sizeof(SRpcEpSet); rpcRsp->len = sizeof(SRpcEpSet);
mDebug("app:%p:%p, msg:%s will be redireced inUse:%d", pMsg->rpcMsg.ahandle, pMsg, taosMsg[pMsg->rpcMsg.msgType], mDebug("app:%p:%p, msg:%s in write queue, will be redirected, numOfEps:%d inUse:%d", pMsg->rpcMsg.ahandle, pMsg,
epSet->inUse); taosMsg[pMsg->rpcMsg.msgType], epSet->numOfEps, epSet->inUse);
for (int32_t i = 0; i < epSet->numOfEps; ++i) { for (int32_t i = 0; i < epSet->numOfEps; ++i) {
mDebug("app:%p:%p, mnode index:%d ep:%s:%d", pMsg->rpcMsg.ahandle, pMsg, i, epSet->fqdn[i], if (strcmp(epSet->fqdn[i], tsLocalFqdn) == 0 && htons(epSet->port[i]) == tsServerPort) {
htons(epSet->port[i])); epSet->inUse = (i + 1) % epSet->numOfEps;
mDebug("app:%p:%p, mnode index:%d ep:%s:%d, set inUse to %d", pMsg->rpcMsg.ahandle, pMsg, i, epSet->fqdn[i],
htons(epSet->port[i]), epSet->inUse);
} else {
mDebug("app:%p:%p, mnode index:%d ep:%s:%d", pMsg->rpcMsg.ahandle, pMsg, i, epSet->fqdn[i],
htons(epSet->port[i]));
}
} }
return TSDB_CODE_RPC_REDIRECT; return TSDB_CODE_RPC_REDIRECT;
......
...@@ -67,7 +67,7 @@ static void *httpProcessResultQueue(void *param) { ...@@ -67,7 +67,7 @@ static void *httpProcessResultQueue(void *param) {
while (1) { while (1) {
if (taosReadQitemFromQset(tsHttpQset, &type, (void **)&pMsg, &unUsed) == 0) { if (taosReadQitemFromQset(tsHttpQset, &type, (void **)&pMsg, &unUsed) == 0) {
httpDebug("httpResultQueue: got no message from qset, exiting..."); httpDebug("qset:%p, http queue got no message from qset, exiting", tsHttpQset);
break; break;
} }
......
...@@ -542,10 +542,7 @@ void rpcCancelRequest(void *handle) { ...@@ -542,10 +542,7 @@ void rpcCancelRequest(void *handle) {
if (pContext->pConn) { if (pContext->pConn) {
tDebug("%s, app tries to cancel request", pContext->pConn->info); tDebug("%s, app tries to cancel request", pContext->pConn->info);
pContext->pConn->pReqMsg = NULL;
rpcCloseConn(pContext->pConn); rpcCloseConn(pContext->pConn);
pContext->pConn = NULL;
rpcFreeCont(pContext->pCont);
} }
} }
...@@ -613,8 +610,10 @@ static void rpcReleaseConn(SRpcConn *pConn) { ...@@ -613,8 +610,10 @@ static void rpcReleaseConn(SRpcConn *pConn) {
if (pConn->pReqMsg) rpcFreeCont(pConn->pReqMsg); // do not use rpcFreeMsg if (pConn->pReqMsg) rpcFreeCont(pConn->pReqMsg); // do not use rpcFreeMsg
} else { } else {
// if there is an outgoing message, free it // if there is an outgoing message, free it
if (pConn->outType && pConn->pReqMsg) if (pConn->outType && pConn->pReqMsg) {
if (pConn->pContext) pConn->pContext->pConn = NULL;
rpcFreeMsg(pConn->pReqMsg); rpcFreeMsg(pConn->pReqMsg);
}
} }
// memset could not be used, since lockeBy can not be reset // memset could not be used, since lockeBy can not be reset
...@@ -1121,9 +1120,13 @@ static void rpcProcessIncomingMsg(SRpcConn *pConn, SRpcHead *pHead, SRpcReqConte ...@@ -1121,9 +1120,13 @@ static void rpcProcessIncomingMsg(SRpcConn *pConn, SRpcHead *pHead, SRpcReqConte
SRpcEpSet *pEpSet = (SRpcEpSet*)pHead->content; SRpcEpSet *pEpSet = (SRpcEpSet*)pHead->content;
if (pEpSet->numOfEps > 0) { if (pEpSet->numOfEps > 0) {
memcpy(&pContext->epSet, pHead->content, sizeof(pContext->epSet)); memcpy(&pContext->epSet, pHead->content, sizeof(pContext->epSet));
tDebug("%s, redirect is received, numOfEps:%d", pConn->info, pContext->epSet.numOfEps); tDebug("%s, redirect is received, numOfEps:%d inUse:%d", pConn->info, pContext->epSet.numOfEps,
for (int i=0; i<pContext->epSet.numOfEps; ++i) pContext->epSet.inUse);
for (int i = 0; i < pContext->epSet.numOfEps; ++i) {
pContext->epSet.port[i] = htons(pContext->epSet.port[i]); pContext->epSet.port[i] = htons(pContext->epSet.port[i]);
tDebug("%s, redirect is received, index:%d ep:%s:%u", pConn->info, i, pContext->epSet.fqdn[i],
pContext->epSet.port[i]);
}
} }
rpcSendReqToServer(pRpc, pContext); rpcSendReqToServer(pRpc, pContext);
rpcFreeCont(rpcMsg.pCont); rpcFreeCont(rpcMsg.pCont);
......
...@@ -525,7 +525,7 @@ static void *taosProcessTcpData(void *param) { ...@@ -525,7 +525,7 @@ static void *taosProcessTcpData(void *param) {
while (pThreadObj->pHead) { while (pThreadObj->pHead) {
SFdObj *pFdObj = pThreadObj->pHead; SFdObj *pFdObj = pThreadObj->pHead;
pThreadObj->pHead = pFdObj->next; pThreadObj->pHead = pFdObj->next;
taosFreeFdObj(pFdObj); taosReportBrokenLink(pFdObj);
} }
pthread_mutex_destroy(&(pThreadObj->mutex)); pthread_mutex_destroy(&(pThreadObj->mutex));
......
...@@ -215,6 +215,9 @@ void syncStop(void *param) { ...@@ -215,6 +215,9 @@ void syncStop(void *param) {
pthread_mutex_lock(&(pNode->mutex)); pthread_mutex_lock(&(pNode->mutex));
if (vgIdHash) taosHashRemove(vgIdHash, (const char *)&pNode->vgId, sizeof(int32_t));
if (pNode->pFwdTimer) taosTmrStop(pNode->pFwdTimer);
for (int i = 0; i < pNode->replica; ++i) { for (int i = 0; i < pNode->replica; ++i) {
pPeer = pNode->peerInfo[i]; pPeer = pNode->peerInfo[i];
if (pPeer) syncRemovePeer(pPeer); if (pPeer) syncRemovePeer(pPeer);
...@@ -223,9 +226,6 @@ void syncStop(void *param) { ...@@ -223,9 +226,6 @@ void syncStop(void *param) {
pPeer = pNode->peerInfo[TAOS_SYNC_MAX_REPLICA]; pPeer = pNode->peerInfo[TAOS_SYNC_MAX_REPLICA];
if (pPeer) syncRemovePeer(pPeer); if (pPeer) syncRemovePeer(pPeer);
if (vgIdHash) taosHashRemove(vgIdHash, (const char *)&pNode->vgId, sizeof(int32_t));
if (pNode->pFwdTimer) taosTmrStop(pNode->pFwdTimer);
pthread_mutex_unlock(&(pNode->mutex)); pthread_mutex_unlock(&(pNode->mutex));
syncDecNodeRef(pNode); syncDecNodeRef(pNode);
...@@ -313,6 +313,8 @@ int32_t syncForwardToPeer(void *param, void *data, void *mhandle, int qtype) { ...@@ -313,6 +313,8 @@ int32_t syncForwardToPeer(void *param, void *data, void *mhandle, int qtype) {
// always update version // always update version
nodeVersion = pWalHead->version; nodeVersion = pWalHead->version;
sDebug("replica:%d nodeRole:%d qtype:%d", pNode->replica, nodeRole, qtype);
if (pNode->replica == 1 || nodeRole != TAOS_SYNC_ROLE_MASTER) return 0; if (pNode->replica == 1 || nodeRole != TAOS_SYNC_ROLE_MASTER) return 0;
// only pkt from RPC or CQ can be forwarded // only pkt from RPC or CQ can be forwarded
...@@ -1189,6 +1191,8 @@ static void syncProcessFwdAck(SSyncNode *pNode, SFwdInfo *pFwdInfo, int32_t code ...@@ -1189,6 +1191,8 @@ static void syncProcessFwdAck(SSyncNode *pNode, SFwdInfo *pFwdInfo, int32_t code
static void syncMonitorFwdInfos(void *param, void *tmrId) { static void syncMonitorFwdInfos(void *param, void *tmrId) {
SSyncNode *pNode = param; SSyncNode *pNode = param;
SSyncFwds *pSyncFwds = pNode->pSyncFwds; SSyncFwds *pSyncFwds = pNode->pSyncFwds;
if (pSyncFwds == NULL) return;
uint64_t time = taosGetTimestampMs(); uint64_t time = taosGetTimestampMs();
if (pSyncFwds->fwds > 0) { if (pSyncFwds->fwds > 0) {
......
...@@ -263,6 +263,7 @@ void taosCloseQset(taos_qset param) { ...@@ -263,6 +263,7 @@ void taosCloseQset(taos_qset param) {
// thread to exit. // thread to exit.
void taosQsetThreadResume(taos_qset param) { void taosQsetThreadResume(taos_qset param) {
STaosQset *qset = (STaosQset *)param; STaosQset *qset = (STaosQset *)param;
uDebug("qset:%p, it will exit", qset);
tsem_post(&qset->sem); tsem_post(&qset->sem);
} }
......
...@@ -111,24 +111,24 @@ echo "serverPort ${NODE}" >> $TAOS_CFG ...@@ -111,24 +111,24 @@ echo "serverPort ${NODE}" >> $TAOS_CFG
echo "dataDir $DATA_DIR" >> $TAOS_CFG echo "dataDir $DATA_DIR" >> $TAOS_CFG
echo "logDir $LOG_DIR" >> $TAOS_CFG echo "logDir $LOG_DIR" >> $TAOS_CFG
echo "debugFlag 0" >> $TAOS_CFG echo "debugFlag 0" >> $TAOS_CFG
echo "mDebugFlag 135" >> $TAOS_CFG echo "mDebugFlag 143" >> $TAOS_CFG
echo "sdbDebugFlag 135" >> $TAOS_CFG echo "sdbDebugFlag 143" >> $TAOS_CFG
echo "dDebugFlag 135" >> $TAOS_CFG echo "dDebugFlag 143" >> $TAOS_CFG
echo "vDebugFlag 135" >> $TAOS_CFG echo "vDebugFlag 143" >> $TAOS_CFG
echo "tsdbDebugFlag 135" >> $TAOS_CFG echo "tsdbDebugFlag 143" >> $TAOS_CFG
echo "cDebugFlag 135" >> $TAOS_CFG echo "cDebugFlag 143" >> $TAOS_CFG
echo "jnidebugFlag 135" >> $TAOS_CFG echo "jnidebugFlag 143" >> $TAOS_CFG
echo "odbcdebugFlag 135" >> $TAOS_CFG echo "odbcdebugFlag 143" >> $TAOS_CFG
echo "httpDebugFlag 135" >> $TAOS_CFG echo "httpDebugFlag 143" >> $TAOS_CFG
echo "monitorDebugFlag 135" >> $TAOS_CFG echo "monitorDebugFlag 143" >> $TAOS_CFG
echo "mqttDebugFlag 135" >> $TAOS_CFG echo "mqttDebugFlag 143" >> $TAOS_CFG
echo "qdebugFlag 135" >> $TAOS_CFG echo "qdebugFlag 143" >> $TAOS_CFG
echo "rpcDebugFlag 135" >> $TAOS_CFG echo "rpcDebugFlag 143" >> $TAOS_CFG
echo "tmrDebugFlag 131" >> $TAOS_CFG echo "tmrDebugFlag 131" >> $TAOS_CFG
echo "udebugFlag 135" >> $TAOS_CFG echo "udebugFlag 143" >> $TAOS_CFG
echo "sdebugFlag 135" >> $TAOS_CFG echo "sdebugFlag 143" >> $TAOS_CFG
echo "wdebugFlag 135" >> $TAOS_CFG echo "wdebugFlag 143" >> $TAOS_CFG
echo "cqdebugFlag 135" >> $TAOS_CFG echo "cqdebugFlag 143" >> $TAOS_CFG
echo "monitor 0" >> $TAOS_CFG echo "monitor 0" >> $TAOS_CFG
echo "monitorInterval 1" >> $TAOS_CFG echo "monitorInterval 1" >> $TAOS_CFG
echo "http 0" >> $TAOS_CFG echo "http 0" >> $TAOS_CFG
......
...@@ -109,15 +109,10 @@ echo "dataDir $DATA_DIR" >> $TAOS_CFG ...@@ -109,15 +109,10 @@ echo "dataDir $DATA_DIR" >> $TAOS_CFG
echo "logDir $LOG_DIR" >> $TAOS_CFG echo "logDir $LOG_DIR" >> $TAOS_CFG
echo "scriptDir ${CODE_DIR}/../script" >> $TAOS_CFG echo "scriptDir ${CODE_DIR}/../script" >> $TAOS_CFG
echo "numOfLogLines 100000000" >> $TAOS_CFG echo "numOfLogLines 100000000" >> $TAOS_CFG
echo "dDebugFlag 135" >> $TAOS_CFG echo "rpcDebugFlag 143" >> $TAOS_CFG
echo "mDebugFlag 135" >> $TAOS_CFG
echo "sdbDebugFlag 135" >> $TAOS_CFG
echo "rpcDebugFlag 135" >> $TAOS_CFG
echo "tmrDebugFlag 131" >> $TAOS_CFG echo "tmrDebugFlag 131" >> $TAOS_CFG
echo "cDebugFlag 135" >> $TAOS_CFG echo "cDebugFlag 143" >> $TAOS_CFG
echo "httpDebugFlag 135" >> $TAOS_CFG echo "udebugFlag 143" >> $TAOS_CFG
echo "monitorDebugFlag 135" >> $TAOS_CFG
echo "udebugFlag 135" >> $TAOS_CFG
echo "tablemetakeeptimer 5" >> $TAOS_CFG echo "tablemetakeeptimer 5" >> $TAOS_CFG
echo "wal 0" >> $TAOS_CFG echo "wal 0" >> $TAOS_CFG
echo "asyncLog 0" >> $TAOS_CFG echo "asyncLog 0" >> $TAOS_CFG
......
...@@ -27,7 +27,16 @@ system sh/exec.sh -n dnode2 -s start ...@@ -27,7 +27,16 @@ system sh/exec.sh -n dnode2 -s start
sql create dnode $hostname3 sql create dnode $hostname3
system sh/exec.sh -n dnode3 -s start system sh/exec.sh -n dnode3 -s start
sleep 5000 sleep 3000
$x = 0
show2:
$x = $x + 1
sleep 2000
if $x == 10 then
return -1
endi
sql show mnodes sql show mnodes
$dnode1Role = $data2_1 $dnode1Role = $data2_1
$dnode2Role = $data2_2 $dnode2Role = $data2_2
...@@ -37,6 +46,16 @@ print $dnode1Role ...@@ -37,6 +46,16 @@ print $dnode1Role
print $dnode2Role print $dnode2Role
print $dnode3Role print $dnode3Role
if $dnode1Role != master then
goto show2
endi
if $dnode2Role != slave then
goto show2
endi
if $dnode3Role != slave then
goto show2
endi
print ============================== step3 print ============================== step3
$count = 2 $count = 2
while $count < 102 while $count < 102
......
...@@ -26,11 +26,11 @@ $x = 0 ...@@ -26,11 +26,11 @@ $x = 0
show2: show2:
$x = $x + 1 $x = $x + 1
sleep 2000 sleep 2000
if $x == 10 then if $x == 5 then
return -1 return -1
endi endi
sql show mnodes sql show mnodes -x show2
print dnode1 ==> $data2_1 print dnode1 ==> $data2_1
print dnode2 ==> $data2_2 print dnode2 ==> $data2_2
if $data2_1 != master then if $data2_1 != master then
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册