未验证 提交 1fbc186c 编写于 作者: S Shengliang Guan 提交者: GitHub

Merge pull request #9065 from taosdata/feature/dnode3

Feature/dnode3
......@@ -112,6 +112,7 @@ typedef struct SDnode {
EStat stat;
SDnodeOpt opt;
SDnodeDir dir;
FileFd lockFd;
SDnodeMgmt dmgmt;
SMnodeMgmt mmgmt;
SVnodesMgmt vmgmt;
......
......@@ -454,7 +454,6 @@ static void dndProcessConfigDnodeReq(SDnode *pDnode, SRpcMsg *pMsg) {
int32_t code = TSDB_CODE_OPS_NOT_SUPPORT;
SRpcMsg rspMsg = {.handle = pMsg->handle, .pCont = NULL, .contLen = 0, .code = code};
rpcSendResponse(&rspMsg);
rpcFreeCont(pMsg->pCont);
}
static void dndProcessStartupReq(SDnode *pDnode, SRpcMsg *pMsg) {
......@@ -467,7 +466,6 @@ static void dndProcessStartupReq(SDnode *pDnode, SRpcMsg *pMsg) {
SRpcMsg rpcRsp = {.handle = pMsg->handle, .pCont = pStartup, .contLen = sizeof(SStartupMsg)};
rpcSendResponse(&rpcRsp);
rpcFreeCont(pMsg->pCont);
}
static void *dnodeThreadRoutine(void *param) {
......@@ -567,8 +565,10 @@ void dndProcessDnodeReq(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet) {
dError("RPC %p, dnode req:%s not processed", pMsg->handle, taosMsg[pMsg->msgType]);
SRpcMsg rspMsg = {.handle = pMsg->handle, .code = TSDB_CODE_MSG_NOT_PROCESSED};
rpcSendResponse(&rspMsg);
rpcFreeCont(pMsg->pCont);
}
rpcFreeCont(pMsg->pCont);
pMsg->pCont = NULL;
}
void dndProcessDnodeRsp(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet) {
......@@ -585,5 +585,7 @@ void dndProcessDnodeRsp(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet) {
default:
dError("RPC %p, dnode rsp:%s not processed", pMsg->handle, taosMsg[pMsg->msgType]);
}
rpcFreeCont(pMsg->pCont);
pMsg->pCont = NULL;
}
......@@ -55,7 +55,7 @@ void dndGetStartup(SDnode *pDnode, SStartupMsg *pStartup) {
pStartup->finished = (dndGetStat(pDnode) == DND_STAT_RUNNING);
}
static int32_t dndCheckRunning(char *dataDir) {
static FileFd dndCheckRunning(char *dataDir) {
char filepath[PATH_MAX] = {0};
snprintf(filepath, sizeof(filepath), "%s/.running", dataDir);
......@@ -74,11 +74,12 @@ static int32_t dndCheckRunning(char *dataDir) {
return -1;
}
return 0;
return fd;
}
static int32_t dndInitEnv(SDnode *pDnode, SDnodeOpt *pOption) {
if (dndCheckRunning(pOption->dataDir) != 0) {
pDnode->lockFd = dndCheckRunning(pOption->dataDir);
if (pDnode->lockFd < 0) {
return -1;
}
......@@ -133,6 +134,12 @@ static void dndCleanupEnv(SDnode *pDnode) {
tfree(pDnode->dir.dnode);
}
if (pDnode->lockFd >= 0) {
taosUnLockFile(pDnode->lockFd);
taosCloseFile(pDnode->lockFd);
pDnode->lockFd = 0;
}
taosStopCacheRefreshWorker();
}
......@@ -202,6 +209,8 @@ SDnode *dndInit(SDnodeOpt *pOption) {
}
void dndCleanup(SDnode *pDnode) {
if (pDnode == NULL) return;
if (dndGetStat(pDnode) == DND_STAT_STOPPED) {
dError("dnode is shutting down");
return;
......
......@@ -33,7 +33,7 @@ class DndTestAcct : public ::testing::Test {
}
static void TearDownTestSuite() {
dropServer(pServer);
stopServer(pServer);
dropClient(pClient);
}
......
......@@ -33,7 +33,7 @@ class DndTestCluster : public ::testing::Test {
}
static void TearDownTestSuite() {
dropServer(pServer);
stopServer(pServer);
dropClient(pClient);
}
......
......@@ -38,12 +38,18 @@ class DndTestDnode : public ::testing::Test {
}
static void TearDownTestSuite() {
dropServer(pServer1);
dropServer(pServer2);
dropServer(pServer3);
dropServer(pServer4);
dropServer(pServer5);
stopServer(pServer1);
stopServer(pServer2);
stopServer(pServer3);
stopServer(pServer4);
stopServer(pServer5);
dropClient(pClient);
pServer1 = NULL;
pServer2 = NULL;
pServer3 = NULL;
pServer4 = NULL;
pServer5 = NULL;
pClient = NULL;
}
static SServer* pServer1;
......@@ -107,7 +113,7 @@ class DndTestDnode : public ::testing::Test {
EXPECT_STREQ(pSchema->name, name);
}
void SendThenCheckShowRetrieveMsg(int32_t rows, int32_t completed) {
void SendThenCheckShowRetrieveMsg(int32_t rows) {
SRetrieveTableMsg* pRetrieve = (SRetrieveTableMsg*)rpcMallocCont(sizeof(SRetrieveTableMsg));
pRetrieve->showId = htonl(showId);
pRetrieve->free = 0;
......@@ -133,7 +139,7 @@ class DndTestDnode : public ::testing::Test {
EXPECT_EQ(pRetrieveRsp->numOfRows, rows);
EXPECT_EQ(pRetrieveRsp->offset, 0);
EXPECT_EQ(pRetrieveRsp->useconds, 0);
EXPECT_EQ(pRetrieveRsp->completed, completed);
// EXPECT_EQ(pRetrieveRsp->completed, completed);
EXPECT_EQ(pRetrieveRsp->precision, TSDB_TIME_PRECISION_MILLI);
EXPECT_EQ(pRetrieveRsp->compressed, 0);
EXPECT_EQ(pRetrieveRsp->reserved, 0);
......@@ -192,7 +198,7 @@ TEST_F(DndTestDnode, ShowDnode) {
CheckSchema(5, TSDB_DATA_TYPE_TIMESTAMP, 8, "create time");
CheckSchema(6, TSDB_DATA_TYPE_BINARY, 24 + VARSTR_HEADER_SIZE, "offline reason");
SendThenCheckShowRetrieveMsg(1, 1);
SendThenCheckShowRetrieveMsg(1);
CheckInt16(1);
CheckBinary("localhost:9521", TSDB_EP_LEN);
CheckInt16(0);
......@@ -234,7 +240,7 @@ TEST_F(DndTestDnode, CreateDnode_01) {
taosMsleep(1300);
SendTheCheckShowMetaMsg(TSDB_MGMT_TABLE_DNODE, "show dnodes", 7);
SendThenCheckShowRetrieveMsg(2, 1);
SendThenCheckShowRetrieveMsg(2);
CheckInt16(1);
CheckInt16(2);
CheckBinary("localhost:9521", TSDB_EP_LEN);
......@@ -267,7 +273,7 @@ TEST_F(DndTestDnode, DropDnode_01) {
taosMsleep(1300);
SendTheCheckShowMetaMsg(TSDB_MGMT_TABLE_DNODE, "show dnodes", 7);
SendThenCheckShowRetrieveMsg(1, 0);
SendThenCheckShowRetrieveMsg(1);
CheckInt16(1);
CheckBinary("localhost:9521", TSDB_EP_LEN);
CheckInt16(0);
......@@ -325,7 +331,7 @@ TEST_F(DndTestDnode, CreateDnode_02) {
taosMsleep(1300);
SendTheCheckShowMetaMsg(TSDB_MGMT_TABLE_DNODE, "show dnodes", 7);
SendThenCheckShowRetrieveMsg(4, 0);
SendThenCheckShowRetrieveMsg(4);
CheckInt16(1);
CheckInt16(3);
CheckInt16(4);
......@@ -355,3 +361,58 @@ TEST_F(DndTestDnode, CreateDnode_02) {
CheckBinary("", 24);
CheckBinary("", 24);
}
TEST_F(DndTestDnode, RestartDnode_01) {
uInfo("===> stop all server");
stopServer(pServer1);
stopServer(pServer2);
stopServer(pServer3);
stopServer(pServer4);
stopServer(pServer5);
pServer1 = NULL;
pServer2 = NULL;
pServer3 = NULL;
pServer4 = NULL;
pServer5 = NULL;
taosMsleep(3000); // wait tcp port cleanedup
uInfo("===> start all server");
const char* fqdn = "localhost";
const char* firstEp = "localhost:9521";
pServer1 = startServer("/tmp/dndTestDnode1", fqdn, 9521, firstEp);
uInfo("===> all server is running");
// taosMsleep(1300);
// SendTheCheckShowMetaMsg(TSDB_MGMT_TABLE_DNODE, "show dnodes", 7);
// SendThenCheckShowRetrieveMsg(4);
// CheckInt16(1);
// CheckInt16(3);
// CheckInt16(4);
// CheckInt16(5);
// CheckBinary("localhost:9521", TSDB_EP_LEN);
// CheckBinary("localhost:9523", TSDB_EP_LEN);
// CheckBinary("localhost:9524", TSDB_EP_LEN);
// CheckBinary("localhost:9525", TSDB_EP_LEN);
// CheckInt16(0);
// CheckInt16(0);
// CheckInt16(0);
// CheckInt16(0);
// CheckInt16(1);
// CheckInt16(1);
// CheckInt16(1);
// CheckInt16(1);
// CheckBinary("ready", 10);
// CheckBinary("ready", 10);
// CheckBinary("ready", 10);
// CheckBinary("ready", 10);
// CheckTimestamp();
// CheckTimestamp();
// CheckTimestamp();
// CheckTimestamp();
// CheckBinary("", 24);
// CheckBinary("", 24);
// CheckBinary("", 24);
// CheckBinary("", 24);
}
......@@ -33,7 +33,7 @@ class DndTestProfile : public ::testing::Test {
}
static void TearDownTestSuite() {
dropServer(pServer);
stopServer(pServer);
dropClient(pClient);
}
......
......@@ -33,7 +33,7 @@ class DndTestShow : public ::testing::Test {
}
static void TearDownTestSuite() {
dropServer(pServer);
stopServer(pServer);
dropClient(pClient);
}
......
......@@ -26,7 +26,7 @@ void initLog(const char* path) {
httpDebugFlag = 0;
mqttDebugFlag = 0;
monDebugFlag = 0;
uDebugFlag = 0;
uDebugFlag = 143;
rpcDebugFlag = 0;
odbcDebugFlag = 0;
qDebugFlag = 0;
......@@ -34,7 +34,9 @@ void initLog(const char* path) {
sDebugFlag = 0;
tsdbDebugFlag = 0;
cqDebugFlag = 0;
tscEmbeddedInUtil = 1;
taosRemoveDir(path);
taosMkDir(path);
char temp[PATH_MAX];
......@@ -70,8 +72,7 @@ void initOption(SDnodeOpt* pOption, const char* path, const char* fqdn, uint16_t
snprintf(pOption->firstEp, TSDB_EP_LEN, "%s", firstEp);
}
SServer* createServer(const char* path, const char* fqdn, uint16_t port, const char* firstEp) {
taosRemoveDir(path);
SServer* startServer(const char* path, const char* fqdn, uint16_t port, const char* firstEp) {
taosMkDir(path);
SDnodeOpt option = {0};
......@@ -90,11 +91,21 @@ SServer* createServer(const char* path, const char* fqdn, uint16_t port, const c
return pServer;
}
void dropServer(SServer* pServer) {
SServer* createServer(const char* path, const char* fqdn, uint16_t port, const char* firstEp) {
taosRemoveDir(path);
return startServer(path, fqdn, port, firstEp);
}
void stopServer(SServer* pServer) {
if (pServer == NULL) return;
if (pServer->threadId != NULL) {
taosDestoryThread(pServer->threadId);
}
if (pServer->pDnode != NULL) {
dndCleanup(pServer->pDnode);
pServer->pDnode = NULL;
}
}
void processClientRsp(void* parent, SRpcMsg* pMsg, SEpSet* pEpSet) {
......
......@@ -41,7 +41,8 @@ typedef struct {
void initLog(const char* path);
SServer* createServer(const char* path, const char* fqdn, uint16_t port, const char* firstEp);
void dropServer(SServer* pServer);
SServer* startServer(const char* path, const char* fqdn, uint16_t port, const char* firstEp);
void stopServer(SServer* pServer);
SClient* createClient(const char* user, const char* pass, const char* fqdn, uint16_t port);
void dropClient(SClient* pClient);
void sendMsg(SClient* pClient, SRpcMsg* pMsg);
......@@ -34,7 +34,7 @@ class DndTestUser : public ::testing::Test {
}
static void TearDownTestSuite() {
dropServer(pServer);
stopServer(pServer);
dropClient(pClient);
}
......
......@@ -258,8 +258,9 @@ static int32_t walCreateThread() {
static void walStopThread() {
atomic_store_8(&tsWal.stop, 1);
if (taosCheckPthreadValid(tsWal.thread)) {
if (tsWal.thread != NULL && taosCheckPthreadValid(tsWal.thread)) {
pthread_join(tsWal.thread, NULL);
tsWal.thread = NULL;
}
wDebug("wal thread is stopped");
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册