提交 ae182cd7 编写于 作者: S Shengliang Guan

shm

上级 6ece0d9a
...@@ -22,11 +22,11 @@ extern "C" { ...@@ -22,11 +22,11 @@ extern "C" {
typedef struct { typedef struct {
int32_t id; int32_t id;
int32_t size; int64_t size;
void* ptr; void* ptr;
} SShm; } SShm;
int32_t taosCreateShm(SShm *pShm, int32_t shmsize) ; int32_t taosCreateShm(SShm *pShm, int64_t shmsize) ;
void taosDropShm(SShm *pShm); void taosDropShm(SShm *pShm);
int32_t taosAttachShm(SShm *pShm); int32_t taosAttachShm(SShm *pShm);
void taosDetachShm(SShm *pShm); void taosDetachShm(SShm *pShm);
......
...@@ -128,7 +128,7 @@ typedef struct SDnode { ...@@ -128,7 +128,7 @@ typedef struct SDnode {
EDndStatus status; EDndStatus status;
EDndEvent event; EDndEvent event;
SStartupReq startup; SStartupReq startup;
TdFilePtr runtimeFile; TdFilePtr lockfile;
STransMgmt trans; STransMgmt trans;
SMgmtWrapper wrappers[NODE_MAX]; SMgmtWrapper wrappers[NODE_MAX];
} SDnode; } SDnode;
......
...@@ -54,9 +54,9 @@ int32_t dndInitMsgHandle(SDnode *pDnode); ...@@ -54,9 +54,9 @@ int32_t dndInitMsgHandle(SDnode *pDnode);
void dndSendRpcRsp(SMgmtWrapper *pWrapper, const SRpcMsg *pRsp); void dndSendRpcRsp(SMgmtWrapper *pWrapper, const SRpcMsg *pRsp);
// dndFile.c // dndFile.c
int32_t dndOpenRuntimeFile(SDnode *pDnode); TdFilePtr dndCheckRunning(const char *dataDir);
int32_t dndWriteRuntimeFile(SDnode *pDnode); int32_t dndReadShmFile(SDnode *pDnode);
void dndCloseRuntimeFile(SDnode *pDnode); int32_t dndWriteShmFile(SDnode *pDnode);
#ifdef __cplusplus #ifdef __cplusplus
} }
......
...@@ -160,7 +160,12 @@ static int32_t dndRunInParentProcess(SDnode *pDnode) { ...@@ -160,7 +160,12 @@ static int32_t dndRunInParentProcess(SDnode *pDnode) {
return -1; return -1;
} }
SProcCfg cfg = {.parentConsumeFp = (ProcConsumeFp)dndConsumeParentQueue, SProcCfg cfg = {.childConsumeFp = (ProcConsumeFp)dndConsumeChildQueue,
.childMallocHeadFp = (ProcMallocFp)taosAllocateQitem,
.childFreeHeadFp = (ProcFreeFp)taosFreeQitem,
.childMallocBodyFp = (ProcMallocFp)rpcMallocCont,
.childFreeBodyFp = (ProcFreeFp)rpcFreeCont,
.parentConsumeFp = (ProcConsumeFp)dndConsumeParentQueue,
.parentdMallocHeadFp = (ProcMallocFp)taosMemoryMalloc, .parentdMallocHeadFp = (ProcMallocFp)taosMemoryMalloc,
.parentFreeHeadFp = (ProcFreeFp)taosMemoryFree, .parentFreeHeadFp = (ProcFreeFp)taosMemoryFree,
.parentMallocBodyFp = (ProcMallocFp)rpcMallocCont, .parentMallocBodyFp = (ProcMallocFp)rpcMallocCont,
...@@ -176,7 +181,7 @@ static int32_t dndRunInParentProcess(SDnode *pDnode) { ...@@ -176,7 +181,7 @@ static int32_t dndRunInParentProcess(SDnode *pDnode) {
} }
} }
if (dndWriteRuntimeFile(pDnode) != 0) { if (dndWriteShmFile(pDnode) != 0) {
dError("failed to write runtime file since %s", terrstr()); dError("failed to write runtime file since %s", terrstr());
return -1; return -1;
} }
...@@ -220,6 +225,11 @@ static int32_t dndRunInChildProcess(SDnode *pDnode) { ...@@ -220,6 +225,11 @@ static int32_t dndRunInChildProcess(SDnode *pDnode) {
.childFreeHeadFp = (ProcFreeFp)taosFreeQitem, .childFreeHeadFp = (ProcFreeFp)taosFreeQitem,
.childMallocBodyFp = (ProcMallocFp)rpcMallocCont, .childMallocBodyFp = (ProcMallocFp)rpcMallocCont,
.childFreeBodyFp = (ProcFreeFp)rpcFreeCont, .childFreeBodyFp = (ProcFreeFp)rpcFreeCont,
.parentConsumeFp = (ProcConsumeFp)dndConsumeParentQueue,
.parentdMallocHeadFp = (ProcMallocFp)taosMemoryMalloc,
.parentFreeHeadFp = (ProcFreeFp)taosMemoryFree,
.parentMallocBodyFp = (ProcMallocFp)rpcMallocCont,
.parentFreeBodyFp = (ProcFreeFp)rpcFreeCont,
.shm = pWrapper->shm, .shm = pWrapper->shm,
.pParent = pWrapper, .pParent = pWrapper,
.name = pWrapper->name}; .name = pWrapper->name};
......
...@@ -117,7 +117,30 @@ _OVER: ...@@ -117,7 +117,30 @@ _OVER:
return code; return code;
} }
int32_t dndOpenRuntimeFile(SDnode *pDnode) { TdFilePtr dndCheckRunning(const char *dataDir) {
char filepath[PATH_MAX] = {0};
snprintf(filepath, sizeof(filepath), "%s%s.running", dataDir, TD_DIRSEP);
TdFilePtr pFile = taosOpenFile(filepath, TD_FILE_CTEATE | TD_FILE_WRITE | TD_FILE_TRUNC);
if (pFile == NULL) {
terrno = TAOS_SYSTEM_ERROR(errno);
dError("failed to lock file:%s since %s", filepath, terrstr());
return NULL;
}
int32_t ret = taosLockFile(pFile);
if (ret != 0) {
terrno = TAOS_SYSTEM_ERROR(errno);
dError("failed to lock file:%s since %s", filepath, terrstr());
taosCloseFile(&pFile);
return NULL;
}
dDebug("file:%s is locked", filepath);
return pFile;
}
int32_t dndReadShmFile(SDnode *pDnode) {
int32_t code = -1; int32_t code = -1;
char itemName[24] = {0}; char itemName[24] = {0};
char content[MAXLEN + 1] = {0}; char content[MAXLEN + 1] = {0};
...@@ -125,17 +148,11 @@ int32_t dndOpenRuntimeFile(SDnode *pDnode) { ...@@ -125,17 +148,11 @@ int32_t dndOpenRuntimeFile(SDnode *pDnode) {
cJSON *root = NULL; cJSON *root = NULL;
TdFilePtr pFile = NULL; TdFilePtr pFile = NULL;
snprintf(file, sizeof(file), "%s%s.running", pDnode->dataDir, TD_DIRSEP); snprintf(file, sizeof(file), "%s%s.shmfile", pDnode->dataDir, TD_DIRSEP);
pFile = taosOpenFile(file, TD_FILE_CTEATE | TD_FILE_WRITE | TD_FILE_TRUNC); pFile = taosOpenFile(file, TD_FILE_READ);
if (pFile == NULL) { if (pFile == NULL) {
terrno = TAOS_SYSTEM_ERROR(errno); dDebug("file %s not exist", file);
dError("failed to open file:%s since %s", file, terrstr()); code = 0;
goto _OVER;
}
if (taosLockFile(pFile) != 0) {
terrno = TAOS_SYSTEM_ERROR(errno);
dError("failed to lock file:%s since %s", file, terrstr());
goto _OVER; goto _OVER;
} }
...@@ -150,14 +167,14 @@ int32_t dndOpenRuntimeFile(SDnode *pDnode) { ...@@ -150,14 +167,14 @@ int32_t dndOpenRuntimeFile(SDnode *pDnode) {
for (ENodeType ntype = DNODE + 1; ntype < NODE_MAX; ++ntype) { for (ENodeType ntype = DNODE + 1; ntype < NODE_MAX; ++ntype) {
snprintf(itemName, sizeof(itemName), "%s_shmid", dndNodeProcStr(ntype)); snprintf(itemName, sizeof(itemName), "%s_shmid", dndNodeProcStr(ntype));
cJSON *shmid = cJSON_GetObjectItem(root, itemName); cJSON *shmid = cJSON_GetObjectItem(root, itemName);
if (shmid && shmid->type == cJSON_Number) { if (shmid && shmid->type == cJSON_String) {
pDnode->wrappers[ntype].shm.id = shmid->valueint; pDnode->wrappers[ntype].shm.id = atoi(shmid->valuestring);
} }
snprintf(itemName, sizeof(itemName), "%s_shmsize", dndNodeProcStr(ntype)); snprintf(itemName, sizeof(itemName), "%s_shmsize", dndNodeProcStr(ntype));
cJSON *shmsize = cJSON_GetObjectItem(root, itemName); cJSON *shmsize = cJSON_GetObjectItem(root, itemName);
if (shmsize && shmsize->type == cJSON_Number) { if (shmsize && shmsize->type == cJSON_String) {
pDnode->wrappers[ntype].shm.size = shmsize->valueint; pDnode->wrappers[ntype].shm.size = atoll(shmsize->valuestring);
} }
} }
} }
...@@ -166,7 +183,7 @@ int32_t dndOpenRuntimeFile(SDnode *pDnode) { ...@@ -166,7 +183,7 @@ int32_t dndOpenRuntimeFile(SDnode *pDnode) {
for (ENodeType ntype = DNODE; ntype < NODE_MAX; ++ntype) { for (ENodeType ntype = DNODE; ntype < NODE_MAX; ++ntype) {
SMgmtWrapper *pWrapper = &pDnode->wrappers[pDnode->ntype]; SMgmtWrapper *pWrapper = &pDnode->wrappers[pDnode->ntype];
if (pWrapper->shm.id > 0) { if (pWrapper->shm.id > 0) {
dDebug("shmid:%d, is closed, size:%d", pWrapper->shm.id, pWrapper->shm.size); dDebug("shmid:%d, is closed, size:%" PRId64, pWrapper->shm.id, pWrapper->shm.size);
taosDropShm(&pWrapper->shm); taosDropShm(&pWrapper->shm);
} }
} }
...@@ -177,7 +194,7 @@ int32_t dndOpenRuntimeFile(SDnode *pDnode) { ...@@ -177,7 +194,7 @@ int32_t dndOpenRuntimeFile(SDnode *pDnode) {
dError("shmid:%d, failed to attach since %s", pWrapper->shm.id, terrstr()); dError("shmid:%d, failed to attach since %s", pWrapper->shm.id, terrstr());
goto _OVER; goto _OVER;
} }
dDebug("shmid:%d, is attached, size:%d", pWrapper->shm.id, pWrapper->shm.size); dDebug("shmid:%d, is attached, size:%" PRId64, pWrapper->shm.id, pWrapper->shm.size);
} }
dDebug("successed to open %s", file); dDebug("successed to open %s", file);
...@@ -185,16 +202,12 @@ int32_t dndOpenRuntimeFile(SDnode *pDnode) { ...@@ -185,16 +202,12 @@ int32_t dndOpenRuntimeFile(SDnode *pDnode) {
_OVER: _OVER:
if (root != NULL) cJSON_Delete(root); if (root != NULL) cJSON_Delete(root);
if (code != 0) { if (pFile != NULL) taosCloseFile(&pFile);
if (pFile != NULL) taosCloseFile(&pFile);
} else {
pDnode->runtimeFile = pFile;
}
return code; return code;
} }
int32_t dndWriteRuntimeFile(SDnode *pDnode) { int32_t dndWriteShmFile(SDnode *pDnode) {
int32_t code = -1; int32_t code = -1;
int32_t len = 0; int32_t len = 0;
char content[MAXLEN + 1] = {0}; char content[MAXLEN + 1] = {0};
...@@ -202,8 +215,8 @@ int32_t dndWriteRuntimeFile(SDnode *pDnode) { ...@@ -202,8 +215,8 @@ int32_t dndWriteRuntimeFile(SDnode *pDnode) {
char realfile[PATH_MAX] = {0}; char realfile[PATH_MAX] = {0};
TdFilePtr pFile = NULL; TdFilePtr pFile = NULL;
snprintf(file, sizeof(file), "%s%s.running.bak", pDnode->dataDir, TD_DIRSEP); snprintf(file, sizeof(file), "%s%s.shmfile.bak", pDnode->dataDir, TD_DIRSEP);
snprintf(realfile, sizeof(realfile), "%s%s.running", pDnode->dataDir, TD_DIRSEP); snprintf(realfile, sizeof(realfile), "%s%s.shmfile", pDnode->dataDir, TD_DIRSEP);
pFile = taosOpenFile(file, TD_FILE_CTEATE | TD_FILE_WRITE | TD_FILE_TRUNC); pFile = taosOpenFile(file, TD_FILE_CTEATE | TD_FILE_WRITE | TD_FILE_TRUNC);
if (pFile == NULL) { if (pFile == NULL) {
...@@ -215,11 +228,13 @@ int32_t dndWriteRuntimeFile(SDnode *pDnode) { ...@@ -215,11 +228,13 @@ int32_t dndWriteRuntimeFile(SDnode *pDnode) {
len += snprintf(content + len, MAXLEN - len, "{\n"); len += snprintf(content + len, MAXLEN - len, "{\n");
for (ENodeType ntype = DNODE + 1; ntype < NODE_MAX; ++ntype) { for (ENodeType ntype = DNODE + 1; ntype < NODE_MAX; ++ntype) {
SMgmtWrapper *pWrapper = &pDnode->wrappers[pDnode->ntype]; SMgmtWrapper *pWrapper = &pDnode->wrappers[pDnode->ntype];
len += snprintf(content + len, MAXLEN - len, " \"%s_shmid\": %d,\n", dndNodeProcStr(ntype), pWrapper->shm.id); len += snprintf(content + len, MAXLEN - len, " \"%s_shmid\": \"%d\",\n", dndNodeProcStr(ntype), pWrapper->shm.id);
if (ntype == NODE_MAX - 1) { if (ntype == NODE_MAX - 1) {
len += snprintf(content + len, MAXLEN - len, " \"%s_shmsize\": %d\n", dndNodeProcStr(ntype), pWrapper->shm.size); len += snprintf(content + len, MAXLEN - len, " \"%s_shmsize\": \"%" PRId64 "\"\n", dndNodeProcStr(ntype),
pWrapper->shm.size);
} else { } else {
len += snprintf(content + len, MAXLEN - len, " \"%s_shmsize\": %d,\n", dndNodeProcStr(ntype), pWrapper->shm.size); len += snprintf(content + len, MAXLEN - len, " \"%s_shmsize\": \"%" PRId64 "\",\n", dndNodeProcStr(ntype),
pWrapper->shm.size);
} }
} }
len += snprintf(content + len, MAXLEN - len, "}\n"); len += snprintf(content + len, MAXLEN - len, "}\n");
...@@ -254,11 +269,3 @@ _OVER: ...@@ -254,11 +269,3 @@ _OVER:
return code; return code;
} }
void dndCloseRuntimeFile(SDnode *pDnode) {
if (pDnode->runtimeFile) {
taosUnLockFile(pDnode->runtimeFile);
taosCloseFile(&pDnode->runtimeFile);
pDnode->runtimeFile = NULL;
}
}
\ No newline at end of file
...@@ -34,6 +34,12 @@ static int32_t dndInitVars(SDnode *pDnode, const SDnodeOpt *pOption) { ...@@ -34,6 +34,12 @@ static int32_t dndInitVars(SDnode *pDnode, const SDnodeOpt *pOption) {
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1; return -1;
} }
pDnode->lockfile = dndCheckRunning(pDnode->dataDir);
if (pDnode->lockfile == NULL) {
return -1;
}
return 0; return 0;
} }
...@@ -42,7 +48,11 @@ static void dndClearVars(SDnode *pDnode) { ...@@ -42,7 +48,11 @@ static void dndClearVars(SDnode *pDnode) {
SMgmtWrapper *pMgmt = &pDnode->wrappers[n]; SMgmtWrapper *pMgmt = &pDnode->wrappers[n];
taosMemoryFreeClear(pMgmt->path); taosMemoryFreeClear(pMgmt->path);
} }
dndCloseRuntimeFile(pDnode); if (pDnode->lockfile != NULL) {
taosUnLockFile(pDnode->lockfile);
taosCloseFile(&pDnode->lockfile);
pDnode->lockfile = NULL;
}
taosMemoryFreeClear(pDnode->localEp); taosMemoryFreeClear(pDnode->localEp);
taosMemoryFreeClear(pDnode->localFqdn); taosMemoryFreeClear(pDnode->localFqdn);
taosMemoryFreeClear(pDnode->firstEp); taosMemoryFreeClear(pDnode->firstEp);
...@@ -96,8 +106,8 @@ SDnode *dndCreate(const SDnodeOpt *pOption) { ...@@ -96,8 +106,8 @@ SDnode *dndCreate(const SDnodeOpt *pOption) {
goto _OVER; goto _OVER;
} }
if (dndOpenRuntimeFile(pDnode) != 0) { if (dndReadShmFile(pDnode) != 0) {
dError("failed to open runtime file since %s", terrstr()); dError("failed to read shm file since %s", terrstr());
goto _OVER; goto _OVER;
} }
......
...@@ -17,8 +17,8 @@ ...@@ -17,8 +17,8 @@
#define _DEFAULT_SOURCE #define _DEFAULT_SOURCE
#include "os.h" #include "os.h"
int32_t taosCreateShm(SShm* pShm, int32_t shmsize) { int32_t taosCreateShm(SShm* pShm, int64_t shmsize) {
int32_t shmid = shmget(IPC_PRIVATE, shmsize, IPC_CREAT | 0600); int32_t shmid = shmget(IPC_PRIVATE, (size_t)shmsize, IPC_CREAT | 0600);
if (shmid < 0) { if (shmid < 0) {
return -1; return -1;
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册