提交 8058a0f1 编写于 作者: S Shengliang Guan

xshm

上级 afe99b1f
...@@ -19,7 +19,6 @@ ...@@ -19,7 +19,6 @@
#include "os.h" #include "os.h"
#include "cJSON.h" #include "cJSON.h"
#include "monitor.h"
#include "tcache.h" #include "tcache.h"
#include "tcrc32c.h" #include "tcrc32c.h"
#include "tdatablock.h" #include "tdatablock.h"
...@@ -36,8 +35,7 @@ ...@@ -36,8 +35,7 @@
#include "tworker.h" #include "tworker.h"
#include "dnode.h" #include "dnode.h"
#include "tfs.h" #include "monitor.h"
#include "wal.h"
#ifdef __cplusplus #ifdef __cplusplus
extern "C" { extern "C" {
...@@ -53,7 +51,6 @@ extern "C" { ...@@ -53,7 +51,6 @@ extern "C" {
typedef enum { DNODE, VNODES, QNODE, SNODE, MNODE, BNODE, NODE_MAX } ENodeType; typedef enum { DNODE, VNODES, QNODE, SNODE, MNODE, BNODE, NODE_MAX } ENodeType;
typedef enum { DND_STAT_INIT, DND_STAT_RUNNING, DND_STAT_STOPPED } EDndStatus; typedef enum { DND_STAT_INIT, DND_STAT_RUNNING, DND_STAT_STOPPED } EDndStatus;
typedef enum { DND_ENV_INIT, DND_ENV_READY, DND_ENV_CLEANUP } EEnvStatus; typedef enum { DND_ENV_INIT, DND_ENV_READY, DND_ENV_CLEANUP } EEnvStatus;
typedef enum { DND_WORKER_SINGLE, DND_WORKER_MULTI } EWorkerType;
typedef enum { PROC_SINGLE, PROC_CHILD, PROC_PARENT } EProcType; typedef enum { PROC_SINGLE, PROC_CHILD, PROC_PARENT } EProcType;
typedef struct SMgmtFp SMgmtFp; typedef struct SMgmtFp SMgmtFp;
...@@ -127,7 +124,6 @@ typedef struct SDnode { ...@@ -127,7 +124,6 @@ typedef struct SDnode {
bool dropped; bool dropped;
EDndStatus status; EDndStatus status;
EDndEvent event; EDndEvent event;
EProcType procType;
SStartupReq startup; SStartupReq startup;
TdFilePtr pLockFile; TdFilePtr pLockFile;
STransMgmt trans; STransMgmt trans;
...@@ -138,7 +134,7 @@ EDndStatus dndGetStatus(SDnode *pDnode); ...@@ -138,7 +134,7 @@ EDndStatus dndGetStatus(SDnode *pDnode);
void dndSetStatus(SDnode *pDnode, EDndStatus stat); void dndSetStatus(SDnode *pDnode, EDndStatus stat);
SMgmtWrapper *dndAcquireWrapper(SDnode *pDnode, ENodeType nodeType); SMgmtWrapper *dndAcquireWrapper(SDnode *pDnode, ENodeType nodeType);
void dndSetMsgHandle(SMgmtWrapper *pWrapper, int32_t msgType, NodeMsgFp nodeMsgFp, int32_t vgId); void dndSetMsgHandle(SMgmtWrapper *pWrapper, int32_t msgType, NodeMsgFp nodeMsgFp, int32_t vgId);
void dndReportStartup(SDnode *pDnode, char *pName, char *pDesc); void dndReportStartup(SDnode *pDnode, const char *pName, const char *pDesc);
void dndSendMonitorReport(SDnode *pDnode); void dndSendMonitorReport(SDnode *pDnode);
int32_t dndSendReqToMnode(SMgmtWrapper *pWrapper, SRpcMsg *pMsg); int32_t dndSendReqToMnode(SMgmtWrapper *pWrapper, SRpcMsg *pMsg);
...@@ -146,7 +142,6 @@ int32_t dndSendReqToDnode(SMgmtWrapper *pWrapper, SEpSet *pEpSet, SRpcMsg *pMsg) ...@@ -146,7 +142,6 @@ int32_t dndSendReqToDnode(SMgmtWrapper *pWrapper, SEpSet *pEpSet, SRpcMsg *pMsg)
void dndSendRsp(SMgmtWrapper *pWrapper, SRpcMsg *pRsp); void dndSendRsp(SMgmtWrapper *pWrapper, SRpcMsg *pRsp);
int32_t dndProcessNodeMsg(SDnode *pDnode, SNodeMsg *pMsg); int32_t dndProcessNodeMsg(SDnode *pDnode, SNodeMsg *pMsg);
int32_t dndReadFile(SMgmtWrapper *pWrapper, bool *pDeployed); int32_t dndReadFile(SMgmtWrapper *pWrapper, bool *pDeployed);
int32_t dndWriteFile(SMgmtWrapper *pWrapper, bool deployed); int32_t dndWriteFile(SMgmtWrapper *pWrapper, bool deployed);
......
...@@ -34,7 +34,7 @@ int32_t dndInit(); ...@@ -34,7 +34,7 @@ int32_t dndInit();
void dndCleanup(); void dndCleanup();
const char *dndStatStr(EDndStatus stat); const char *dndStatStr(EDndStatus stat);
void dndGetStartup(SDnode *pDnode, SStartupReq *pStartup); void dndGetStartup(SDnode *pDnode, SStartupReq *pStartup);
TdFilePtr dndCheckRunning(char *dataDir); TdFilePtr dndCheckRunning(const char *dataDir);
void dndProcessStartupReq(SDnode *pDnode, SRpcMsg *pMsg); void dndProcessStartupReq(SDnode *pDnode, SRpcMsg *pMsg);
// dndMsg.c // dndMsg.c
......
...@@ -15,11 +15,12 @@ ...@@ -15,11 +15,12 @@
#define _DEFAULT_SOURCE #define _DEFAULT_SOURCE
#include "dndInt.h" #include "dndInt.h"
#include "wal.h"
static int8_t once = DND_ENV_INIT; static int8_t once = DND_ENV_INIT;
int32_t dndInit() { int32_t dndInit() {
dDebug("start to init dnode env"); dInfo("start to init dnode env");
if (atomic_val_compare_exchange_8(&once, DND_ENV_INIT, DND_ENV_READY) != DND_ENV_INIT) { if (atomic_val_compare_exchange_8(&once, DND_ENV_INIT, DND_ENV_READY) != DND_ENV_INIT) {
terrno = TSDB_CODE_REPEAT_INIT; terrno = TSDB_CODE_REPEAT_INIT;
dError("failed to init dnode env since %s", terrstr()); dError("failed to init dnode env since %s", terrstr());
...@@ -52,7 +53,7 @@ int32_t dndInit() { ...@@ -52,7 +53,7 @@ int32_t dndInit() {
} }
void dndCleanup() { void dndCleanup() {
dDebug("start to cleanup dnode env"); dInfo("start to cleanup dnode env");
if (atomic_val_compare_exchange_8(&once, DND_ENV_READY, DND_ENV_CLEANUP) != DND_ENV_READY) { if (atomic_val_compare_exchange_8(&once, DND_ENV_READY, DND_ENV_CLEANUP) != DND_ENV_READY) {
dError("dnode env is already cleaned up"); dError("dnode env is already cleaned up");
return; return;
...@@ -92,7 +93,7 @@ const char *dndStatStr(EDndStatus status) { ...@@ -92,7 +93,7 @@ const char *dndStatStr(EDndStatus status) {
} }
} }
void dndReportStartup(SDnode *pDnode, char *pName, char *pDesc) { void dndReportStartup(SDnode *pDnode, const char *pName, const char *pDesc) {
SStartupReq *pStartup = &pDnode->startup; SStartupReq *pStartup = &pDnode->startup;
tstrncpy(pStartup->name, pName, TSDB_STEP_NAME_LEN); tstrncpy(pStartup->name, pName, TSDB_STEP_NAME_LEN);
tstrncpy(pStartup->desc, pDesc, TSDB_STEP_DESC_LEN); tstrncpy(pStartup->desc, pDesc, TSDB_STEP_DESC_LEN);
...@@ -104,21 +105,21 @@ void dndGetStartup(SDnode *pDnode, SStartupReq *pStartup) { ...@@ -104,21 +105,21 @@ void dndGetStartup(SDnode *pDnode, SStartupReq *pStartup) {
pStartup->finished = (dndGetStatus(pDnode) == DND_STAT_RUNNING); pStartup->finished = (dndGetStatus(pDnode) == DND_STAT_RUNNING);
} }
TdFilePtr dndCheckRunning(char *dataDir) { TdFilePtr dndCheckRunning(const char *dataDir) {
char filepath[PATH_MAX] = {0}; char filepath[PATH_MAX] = {0};
snprintf(filepath, sizeof(filepath), "%s/.running", dataDir); snprintf(filepath, sizeof(filepath), "%s/.running", dataDir);
TdFilePtr pFile = taosOpenFile(filepath, TD_FILE_CTEATE | TD_FILE_WRITE | TD_FILE_TRUNC); TdFilePtr pFile = taosOpenFile(filepath, TD_FILE_CTEATE | TD_FILE_WRITE | TD_FILE_TRUNC);
if (pFile == NULL) { if (pFile == NULL) {
terrno = TAOS_SYSTEM_ERROR(errno); terrno = TAOS_SYSTEM_ERROR(errno);
dError("failed to lock file:%s since %s, quit", filepath, terrstr()); dError("failed to lock file:%s since %s", filepath, terrstr());
return NULL; return NULL;
} }
int32_t ret = taosLockFile(pFile); int32_t ret = taosLockFile(pFile);
if (ret != 0) { if (ret != 0) {
terrno = TAOS_SYSTEM_ERROR(errno); terrno = TAOS_SYSTEM_ERROR(errno);
dError("failed to lock file:%s since %s, quit", filepath, terrstr()); dError("failed to lock file:%s since %s", filepath, terrstr());
taosCloseFile(&pFile); taosCloseFile(&pFile);
return NULL; return NULL;
} }
...@@ -129,12 +130,10 @@ TdFilePtr dndCheckRunning(char *dataDir) { ...@@ -129,12 +130,10 @@ TdFilePtr dndCheckRunning(char *dataDir) {
void dndProcessStartupReq(SDnode *pDnode, SRpcMsg *pReq) { void dndProcessStartupReq(SDnode *pDnode, SRpcMsg *pReq) {
dDebug("startup req is received"); dDebug("startup req is received");
SStartupReq *pStartup = rpcMallocCont(sizeof(SStartupReq)); SStartupReq *pStartup = rpcMallocCont(sizeof(SStartupReq));
dndGetStartup(pDnode, pStartup); dndGetStartup(pDnode, pStartup);
dDebug("startup req is sent, step:%s desc:%s finished:%d", pStartup->name, pStartup->desc, pStartup->finished); dDebug("startup req is sent, step:%s desc:%s finished:%d", pStartup->name, pStartup->desc, pStartup->finished);
SRpcMsg rpcRsp = {.handle = pReq->handle, .pCont = pStartup, .contLen = sizeof(SStartupReq)}; SRpcMsg rpcRsp = {.handle = pReq->handle, .pCont = pStartup, .contLen = sizeof(SStartupReq)};
rpcSendResponse(&rpcRsp); rpcSendResponse(&rpcRsp);
} }
...@@ -58,7 +58,7 @@ static void dndClearMemory(SDnode *pDnode) { ...@@ -58,7 +58,7 @@ static void dndClearMemory(SDnode *pDnode) {
SDnode *dndCreate(const SDnodeOpt *pOption) { SDnode *dndCreate(const SDnodeOpt *pOption) {
dInfo("start to create dnode object"); dInfo("start to create dnode object");
int32_t code = -1; int32_t code = -1;
char path[PATH_MAX]; char path[PATH_MAX] = {0};
SDnode *pDnode = NULL; SDnode *pDnode = NULL;
pDnode = calloc(1, sizeof(SDnode)); pDnode = calloc(1, sizeof(SDnode));
......
...@@ -271,7 +271,7 @@ int32_t dndInitMsgHandle(SDnode *pDnode) { ...@@ -271,7 +271,7 @@ int32_t dndInitMsgHandle(SDnode *pDnode) {
int32_t vgId = pWrapper->msgVgIds[msgIndex]; int32_t vgId = pWrapper->msgVgIds[msgIndex];
if (msgFp == NULL) continue; if (msgFp == NULL) continue;
dTrace("msg:%s will be processed by %s, vgId:%d", tMsgInfo[msgIndex], pWrapper->name, vgId); // dTrace("msg:%s will be processed by %s, vgId:%d", tMsgInfo[msgIndex], pWrapper->name, vgId);
SMsgHandle *pHandle = &pMgmt->msgHandles[msgIndex]; SMsgHandle *pHandle = &pMgmt->msgHandles[msgIndex];
if (vgId == QND_VGID) { if (vgId == QND_VGID) {
......
...@@ -15,6 +15,7 @@ ...@@ -15,6 +15,7 @@
#define _DEFAULT_SOURCE #define _DEFAULT_SOURCE
#include "mmInt.h" #include "mmInt.h"
#include "wal.h"
static bool mmDeployRequired(SDnode *pDnode) { static bool mmDeployRequired(SDnode *pDnode) {
if (pDnode->dnodeId > 0) return false; if (pDnode->dnodeId > 0) return false;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册