未验证 提交 a35e5fad 编写于 作者: S Shengliang Guan 提交者: GitHub

Merge pull request #10998 from taosdata/feature/shm

adjust logs
...@@ -51,6 +51,7 @@ void taosProcCleanup(SProcObj *pProc); ...@@ -51,6 +51,7 @@ void taosProcCleanup(SProcObj *pProc);
int32_t taosProcRun(SProcObj *pProc); int32_t taosProcRun(SProcObj *pProc);
void taosProcStop(SProcObj *pProc); void taosProcStop(SProcObj *pProc);
bool taosProcIsChild(SProcObj *pProc); bool taosProcIsChild(SProcObj *pProc);
int32_t taosProcChildId(SProcObj *pProc);
int32_t taosProcPutToChildQueue(SProcObj *pProc, void *pHead, int32_t headLen, void *pBody, int32_t bodyLen); int32_t taosProcPutToChildQueue(SProcObj *pProc, void *pHead, int32_t headLen, void *pBody, int32_t bodyLen);
int32_t taosProcPutToParentQueue(SProcObj *pProc, void *pHead, int32_t headLen, void *pBody, int32_t bodyLen); int32_t taosProcPutToParentQueue(SProcObj *pProc, void *pHead, int32_t headLen, void *pBody, int32_t bodyLen);
......
...@@ -56,7 +56,6 @@ void dndCleanupServer(SDnode *pDnode); ...@@ -56,7 +56,6 @@ void dndCleanupServer(SDnode *pDnode);
int32_t dndInitClient(SDnode *pDnode); int32_t dndInitClient(SDnode *pDnode);
void dndCleanupClient(SDnode *pDnode); void dndCleanupClient(SDnode *pDnode);
int32_t dndInitMsgHandle(SDnode *pDnode); int32_t dndInitMsgHandle(SDnode *pDnode);
void dndSendRpcRsp(SMgmtWrapper *pWrapper, SRpcMsg *pRsp);
#ifdef __cplusplus #ifdef __cplusplus
} }
......
...@@ -20,7 +20,7 @@ static void dndResetLog(SMgmtWrapper *pMgmt) { ...@@ -20,7 +20,7 @@ static void dndResetLog(SMgmtWrapper *pMgmt) {
char logname[24] = {0}; char logname[24] = {0};
snprintf(logname, sizeof(logname), "%slog", pMgmt->name); snprintf(logname, sizeof(logname), "%slog", pMgmt->name);
dInfo("node:%s, reset log to %s", pMgmt->name, logname); dInfo("node:%s, reset log to %s in child process", pMgmt->name, logname);
taosCloseLog(); taosCloseLog();
taosInitLog(logname, 1); taosInitLog(logname, 1);
} }
...@@ -51,6 +51,7 @@ int32_t dndOpenNode(SMgmtWrapper *pWrapper) { ...@@ -51,6 +51,7 @@ int32_t dndOpenNode(SMgmtWrapper *pWrapper) {
void dndCloseNode(SMgmtWrapper *pWrapper) { void dndCloseNode(SMgmtWrapper *pWrapper) {
dDebug("node:%s, start to close", pWrapper->name); dDebug("node:%s, start to close", pWrapper->name);
pWrapper->required = false;
taosWLockLatch(&pWrapper->latch); taosWLockLatch(&pWrapper->latch);
if (pWrapper->deployed) { if (pWrapper->deployed) {
(*pWrapper->fp.closeFp)(pWrapper); (*pWrapper->fp.closeFp)(pWrapper);
...@@ -138,7 +139,7 @@ static void dndConsumeChildQueue(SMgmtWrapper *pWrapper, SNodeMsg *pMsg, int32_t ...@@ -138,7 +139,7 @@ static void dndConsumeChildQueue(SMgmtWrapper *pWrapper, SNodeMsg *pMsg, int32_t
static void dndConsumeParentQueue(SMgmtWrapper *pWrapper, SRpcMsg *pRsp, int32_t msgLen, void *pCont, int32_t contLen) { static void dndConsumeParentQueue(SMgmtWrapper *pWrapper, SRpcMsg *pRsp, int32_t msgLen, void *pCont, int32_t contLen) {
dTrace("msg:%p, get from parent queue", pRsp); dTrace("msg:%p, get from parent queue", pRsp);
pRsp->pCont = pCont; pRsp->pCont = pCont;
dndSendRpcRsp(pWrapper, pRsp); dndSendRsp(pWrapper, pRsp);
taosMemoryFree(pRsp); taosMemoryFree(pRsp);
} }
...@@ -178,7 +179,6 @@ static int32_t dndRunInMultiProcess(SDnode *pDnode) { ...@@ -178,7 +179,6 @@ static int32_t dndRunInMultiProcess(SDnode *pDnode) {
.parentFreeHeadFp = (ProcFreeFp)taosMemoryFree, .parentFreeHeadFp = (ProcFreeFp)taosMemoryFree,
.parentMallocBodyFp = (ProcMallocFp)rpcMallocCont, .parentMallocBodyFp = (ProcMallocFp)rpcMallocCont,
.parentFreeBodyFp = (ProcFreeFp)rpcFreeCont, .parentFreeBodyFp = (ProcFreeFp)rpcFreeCont,
.testFlag = 0,
.pParent = pWrapper, .pParent = pWrapper,
.name = pWrapper->name}; .name = pWrapper->name};
SProcObj *pProc = taosProcInit(&cfg); SProcObj *pProc = taosProcInit(&cfg);
...@@ -200,7 +200,7 @@ static int32_t dndRunInMultiProcess(SDnode *pDnode) { ...@@ -200,7 +200,7 @@ static int32_t dndRunInMultiProcess(SDnode *pDnode) {
dInfo("node:%s, will be initialized in child process", pWrapper->name); dInfo("node:%s, will be initialized in child process", pWrapper->name);
dndOpenNode(pWrapper); dndOpenNode(pWrapper);
} else { } else {
dInfo("node:%s, will not start in parent process", pWrapper->name); dInfo("node:%s, will not start in parent process, child pid:%d", pWrapper->name, taosProcChildId(pProc));
pWrapper->procType = PROC_PARENT; pWrapper->procType = PROC_PARENT;
} }
...@@ -210,16 +210,20 @@ static int32_t dndRunInMultiProcess(SDnode *pDnode) { ...@@ -210,16 +210,20 @@ static int32_t dndRunInMultiProcess(SDnode *pDnode) {
} }
} }
#if 0 dndSetStatus(pDnode, DND_STAT_RUNNING);
SMgmtWrapper *pWrapper = dndAcquireWrapper(pDnode, DNODE);
if (pWrapper->procType == PROC_PARENT && dmStart(pWrapper->pMgmt) != 0) { for (ENodeType n = 0; n < NODE_MAX; ++n) {
dndReleaseWrapper(pWrapper); SMgmtWrapper *pWrapper = &pDnode->wrappers[n];
dError("failed to start dnode worker since %s", terrstr()); if (!pWrapper->required) continue;
return -1; if (pWrapper->fp.startFp == NULL) continue;
if (pWrapper->procType == PROC_PARENT && n != DNODE) continue;
if (pWrapper->procType == PROC_CHILD && n == DNODE) continue;
if ((*pWrapper->fp.startFp)(pWrapper) != 0) {
dError("node:%s, failed to start since %s", pWrapper->name, terrstr());
return -1;
}
} }
dndReleaseWrapper(pWrapper);
#endif
return 0; return 0;
} }
......
...@@ -16,14 +16,16 @@ ...@@ -16,14 +16,16 @@
#define _DEFAULT_SOURCE #define _DEFAULT_SOURCE
#include "dndInt.h" #include "dndInt.h"
#define MAXLEN 1024
int32_t dndReadFile(SMgmtWrapper *pWrapper, bool *pDeployed) { int32_t dndReadFile(SMgmtWrapper *pWrapper, bool *pDeployed) {
int32_t code = TSDB_CODE_NODE_PARSE_FILE_ERROR; int32_t code = TSDB_CODE_NODE_PARSE_FILE_ERROR;
int32_t len = 0; int32_t len = 0;
int32_t maxLen = 1024; const int32_t maxLen = MAXLEN;
char *content = taosMemoryCalloc(1, maxLen + 1); char content[MAXLEN + 1] = {0};
cJSON *root = NULL; cJSON *root = NULL;
char file[PATH_MAX]; char file[PATH_MAX];
TdFilePtr pFile = NULL; TdFilePtr pFile = NULL;
snprintf(file, sizeof(file), "%s%s%s.json", pWrapper->path, TD_DIRSEP, pWrapper->name); snprintf(file, sizeof(file), "%s%s%s.json", pWrapper->path, TD_DIRSEP, pWrapper->name);
pFile = taosOpenFile(file, TD_FILE_READ); pFile = taosOpenFile(file, TD_FILE_READ);
...@@ -57,7 +59,6 @@ int32_t dndReadFile(SMgmtWrapper *pWrapper, bool *pDeployed) { ...@@ -57,7 +59,6 @@ int32_t dndReadFile(SMgmtWrapper *pWrapper, bool *pDeployed) {
dDebug("succcessed to read file %s, deployed:%d", file, *pDeployed); dDebug("succcessed to read file %s, deployed:%d", file, *pDeployed);
_OVER: _OVER:
if (content != NULL) taosMemoryFree(content);
if (root != NULL) cJSON_Delete(root); if (root != NULL) cJSON_Delete(root);
if (pFile != NULL) taosCloseFile(&pFile); if (pFile != NULL) taosCloseFile(&pFile);
...@@ -66,7 +67,7 @@ _OVER: ...@@ -66,7 +67,7 @@ _OVER:
} }
int32_t dndWriteFile(SMgmtWrapper *pWrapper, bool deployed) { int32_t dndWriteFile(SMgmtWrapper *pWrapper, bool deployed) {
char file[PATH_MAX]; char file[PATH_MAX] = {0};
snprintf(file, sizeof(file), "%s%s%s.json", pWrapper->path, TD_DIRSEP, pWrapper->name); snprintf(file, sizeof(file), "%s%s%s.json", pWrapper->path, TD_DIRSEP, pWrapper->name);
TdFilePtr pFile = taosOpenFile(file, TD_FILE_CTEATE | TD_FILE_WRITE | TD_FILE_TRUNC); TdFilePtr pFile = taosOpenFile(file, TD_FILE_CTEATE | TD_FILE_WRITE | TD_FILE_TRUNC);
...@@ -76,9 +77,9 @@ int32_t dndWriteFile(SMgmtWrapper *pWrapper, bool deployed) { ...@@ -76,9 +77,9 @@ int32_t dndWriteFile(SMgmtWrapper *pWrapper, bool deployed) {
return -1; return -1;
} }
int32_t len = 0; int32_t len = 0;
int32_t maxLen = 1024; const int32_t maxLen = MAXLEN;
char *content = taosMemoryCalloc(1, maxLen + 1); char content[MAXLEN + 1] = {0};
len += snprintf(content + len, maxLen - len, "{\n"); len += snprintf(content + len, maxLen - len, "{\n");
len += snprintf(content + len, maxLen - len, " \"deployed\": %d\n", deployed); len += snprintf(content + len, maxLen - len, " \"deployed\": %d\n", deployed);
...@@ -87,9 +88,8 @@ int32_t dndWriteFile(SMgmtWrapper *pWrapper, bool deployed) { ...@@ -87,9 +88,8 @@ int32_t dndWriteFile(SMgmtWrapper *pWrapper, bool deployed) {
taosWriteFile(pFile, content, len); taosWriteFile(pFile, content, len);
taosFsyncFile(pFile); taosFsyncFile(pFile);
taosCloseFile(&pFile); taosCloseFile(&pFile);
taosMemoryFree(content);
char realfile[PATH_MAX]; char realfile[PATH_MAX] = {0};
snprintf(realfile, sizeof(realfile), "%s%s%s.json", pWrapper->path, TD_DIRSEP, pWrapper->name); snprintf(realfile, sizeof(realfile), "%s%s%s.json", pWrapper->path, TD_DIRSEP, pWrapper->name);
if (taosRenameFile(file, realfile) != 0) { if (taosRenameFile(file, realfile) != 0) {
......
...@@ -43,36 +43,40 @@ static inline int32_t dndBuildMsg(SNodeMsg *pMsg, SRpcMsg *pRpc) { ...@@ -43,36 +43,40 @@ static inline int32_t dndBuildMsg(SNodeMsg *pMsg, SRpcMsg *pRpc) {
memcpy(pMsg->user, connInfo.user, TSDB_USER_LEN); memcpy(pMsg->user, connInfo.user, TSDB_USER_LEN);
memcpy(&pMsg->rpcMsg, pRpc, sizeof(SRpcMsg)); memcpy(&pMsg->rpcMsg, pRpc, sizeof(SRpcMsg));
return 0; return 0;
} }
void dndProcessRpcMsg(SMgmtWrapper *pWrapper, SRpcMsg *pRpc, SEpSet *pEpSet) { void dndProcessRpcMsg(SMgmtWrapper *pWrapper, SRpcMsg *pRpc, SEpSet *pEpSet) {
if (pEpSet && pEpSet->numOfEps > 0 && pRpc->msgType == TDMT_MND_STATUS_RSP) {
dndUpdateMnodeEpSet(pWrapper->pDnode, pEpSet);
}
int32_t code = -1; int32_t code = -1;
SNodeMsg *pMsg = NULL; SNodeMsg *pMsg = NULL;
NodeMsgFp msgFp = NULL; NodeMsgFp msgFp = NULL;
if (pEpSet && pEpSet->numOfEps > 0 && pRpc->msgType == TDMT_MND_STATUS_RSP) {
dndUpdateMnodeEpSet(pWrapper->pDnode, pEpSet);
}
if (dndMarkWrapper(pWrapper) != 0) goto _OVER; if (dndMarkWrapper(pWrapper) != 0) goto _OVER;
if ((msgFp = dndGetMsgFp(pWrapper, pRpc)) == NULL) goto _OVER; if ((msgFp = dndGetMsgFp(pWrapper, pRpc)) == NULL) goto _OVER;
if ((pMsg = taosAllocateQitem(sizeof(SNodeMsg))) == NULL) goto _OVER; if ((pMsg = taosAllocateQitem(sizeof(SNodeMsg))) == NULL) goto _OVER;
if (dndBuildMsg(pMsg, pRpc) != 0) goto _OVER; if (dndBuildMsg(pMsg, pRpc) != 0) goto _OVER;
dTrace("msg:%p, is created, handle:%p app:%p user:%s", pMsg, pRpc->handle, pRpc->ahandle, pMsg->user);
if (pWrapper->procType == PROC_SINGLE) { if (pWrapper->procType == PROC_SINGLE) {
dTrace("msg:%p, is created, handle:%p app:%p user:%s", pMsg, pRpc->handle, pRpc->ahandle, pMsg->user);
code = (*msgFp)(pWrapper->pMgmt, pMsg); code = (*msgFp)(pWrapper->pMgmt, pMsg);
} else if (pWrapper->procType == PROC_PARENT) { } else if (pWrapper->procType == PROC_PARENT) {
dTrace("msg:%p, is created and will put into child queue, handle:%p app:%p user:%s", pMsg, pRpc->handle,
pRpc->ahandle, pMsg->user);
code = taosProcPutToChildQueue(pWrapper->pProc, pMsg, sizeof(SNodeMsg), pRpc->pCont, pRpc->contLen); code = taosProcPutToChildQueue(pWrapper->pProc, pMsg, sizeof(SNodeMsg), pRpc->pCont, pRpc->contLen);
} else { } else {
dTrace("msg:%p, should not processed in child process, handle:%p app:%p user:%s", pMsg, pRpc->handle, pRpc->ahandle,
pMsg->user);
ASSERT(1);
} }
_OVER: _OVER:
if (code == 0) { if (code == 0) {
if (pWrapper->procType == PROC_PARENT) { if (pWrapper->procType == PROC_PARENT) {
dTrace("msg:%p, is freed", pMsg); dTrace("msg:%p, is freed in parent process", pMsg);
taosFreeQitem(pMsg); taosFreeQitem(pMsg);
rpcFreeCont(pRpc->pCont); rpcFreeCont(pRpc->pCont);
} }
......
...@@ -175,7 +175,7 @@ int32_t dndMarkWrapper(SMgmtWrapper *pWrapper) { ...@@ -175,7 +175,7 @@ int32_t dndMarkWrapper(SMgmtWrapper *pWrapper) {
int32_t code = 0; int32_t code = 0;
taosRLockLatch(&pWrapper->latch); taosRLockLatch(&pWrapper->latch);
if (pWrapper->deployed) { if (pWrapper->deployed || (pWrapper->procType == PROC_PARENT && pWrapper->required)) {
int32_t refCount = atomic_add_fetch_32(&pWrapper->refCount, 1); int32_t refCount = atomic_add_fetch_32(&pWrapper->refCount, 1);
dTrace("node:%s, is marked, refCount:%d", pWrapper->name, refCount); dTrace("node:%s, is marked, refCount:%d", pWrapper->name, refCount);
} else { } else {
......
...@@ -348,7 +348,7 @@ int32_t dndSendReqToMnode(SMgmtWrapper *pWrapper, SRpcMsg *pReq) { ...@@ -348,7 +348,7 @@ int32_t dndSendReqToMnode(SMgmtWrapper *pWrapper, SRpcMsg *pReq) {
} }
} }
void dndSendRpcRsp(SMgmtWrapper *pWrapper, SRpcMsg *pRsp) { static void dndSendRpcRsp(SMgmtWrapper *pWrapper, SRpcMsg *pRsp) {
if (pRsp->code == TSDB_CODE_APP_NOT_READY) { if (pRsp->code == TSDB_CODE_APP_NOT_READY) {
SMgmtWrapper *pDnodeWrapper = dndAcquireWrapper(pWrapper->pDnode, DNODE); SMgmtWrapper *pDnodeWrapper = dndAcquireWrapper(pWrapper->pDnode, DNODE);
if (pDnodeWrapper != NULL) { if (pDnodeWrapper != NULL) {
......
...@@ -21,7 +21,7 @@ ...@@ -21,7 +21,7 @@
#include "mndTrans.h" #include "mndTrans.h"
#include "tbase64.h" #include "tbase64.h"
#define TSDB_USER_VER_NUMBER 1 #define TSDB_USER_VER_NUMBER 1
#define TSDB_USER_RESERVE_SIZE 64 #define TSDB_USER_RESERVE_SIZE 64
static int32_t mndCreateDefaultUsers(SMnode *pMnode); static int32_t mndCreateDefaultUsers(SMnode *pMnode);
...@@ -270,7 +270,7 @@ static int32_t mndCreateUser(SMnode *pMnode, char *acct, SCreateUserReq *pCreate ...@@ -270,7 +270,7 @@ static int32_t mndCreateUser(SMnode *pMnode, char *acct, SCreateUserReq *pCreate
userObj.updateTime = userObj.createdTime; userObj.updateTime = userObj.createdTime;
userObj.superUser = pCreate->superUser; userObj.superUser = pCreate->superUser;
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK,TRN_TYPE_CREATE_USER, &pReq->rpcMsg); STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_TYPE_CREATE_USER, &pReq->rpcMsg);
if (pTrans == NULL) { if (pTrans == NULL) {
mError("user:%s, failed to create since %s", pCreate->user, terrstr()); mError("user:%s, failed to create since %s", pCreate->user, terrstr());
return -1; return -1;
...@@ -350,7 +350,7 @@ CREATE_USER_OVER: ...@@ -350,7 +350,7 @@ CREATE_USER_OVER:
} }
static int32_t mndUpdateUser(SMnode *pMnode, SUserObj *pOld, SUserObj *pNew, SNodeMsg *pReq) { static int32_t mndUpdateUser(SMnode *pMnode, SUserObj *pOld, SUserObj *pNew, SNodeMsg *pReq) {
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_TYPE_ALTER_USER,&pReq->rpcMsg); STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_TYPE_ALTER_USER, &pReq->rpcMsg);
if (pTrans == NULL) { if (pTrans == NULL) {
mError("user:%s, failed to update since %s", pOld->user, terrstr()); mError("user:%s, failed to update since %s", pOld->user, terrstr());
return -1; return -1;
...@@ -511,7 +511,7 @@ ALTER_USER_OVER: ...@@ -511,7 +511,7 @@ ALTER_USER_OVER:
} }
static int32_t mndDropUser(SMnode *pMnode, SNodeMsg *pReq, SUserObj *pUser) { static int32_t mndDropUser(SMnode *pMnode, SNodeMsg *pReq, SUserObj *pUser) {
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK,TRN_TYPE_DROP_USER, &pReq->rpcMsg); STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_TYPE_DROP_USER, &pReq->rpcMsg);
if (pTrans == NULL) { if (pTrans == NULL) {
mError("user:%s, failed to drop since %s", pUser->user, terrstr()); mError("user:%s, failed to drop since %s", pUser->user, terrstr());
return -1; return -1;
......
...@@ -56,7 +56,6 @@ typedef struct SProcObj { ...@@ -56,7 +56,6 @@ typedef struct SProcObj {
int32_t pid; int32_t pid;
bool isChild; bool isChild;
bool stopFlag; bool stopFlag;
bool testFlag;
} SProcObj; } SProcObj;
static int32_t taosProcInitMutex(TdThreadMutex **ppMutex, int32_t *pShmid) { static int32_t taosProcInitMutex(TdThreadMutex **ppMutex, int32_t *pShmid) {
...@@ -77,7 +76,7 @@ static int32_t taosProcInitMutex(TdThreadMutex **ppMutex, int32_t *pShmid) { ...@@ -77,7 +76,7 @@ static int32_t taosProcInitMutex(TdThreadMutex **ppMutex, int32_t *pShmid) {
goto _OVER; goto _OVER;
} }
shmid = shmget(IPC_PRIVATE, sizeof(TdThreadMutex), 0600); shmid = shmget(IPC_PRIVATE, sizeof(TdThreadMutex), IPC_CREAT | 0600);
if (shmid <= 0) { if (shmid <= 0) {
terrno = TAOS_SYSTEM_ERROR(errno); terrno = TAOS_SYSTEM_ERROR(errno);
uError("failed to init mutex while shmget since %s", terrstr()); uError("failed to init mutex while shmget since %s", terrstr());
...@@ -101,8 +100,13 @@ static int32_t taosProcInitMutex(TdThreadMutex **ppMutex, int32_t *pShmid) { ...@@ -101,8 +100,13 @@ static int32_t taosProcInitMutex(TdThreadMutex **ppMutex, int32_t *pShmid) {
_OVER: _OVER:
if (code != 0) { if (code != 0) {
taosThreadMutexDestroy(pMutex); if (pMutex != NULL) {
shmctl(shmid, IPC_RMID, NULL); taosThreadMutexDestroy(pMutex);
shmdt(pMutex);
}
if (shmid >= 0) {
shmctl(shmid, IPC_RMID, NULL);
}
} else { } else {
*ppMutex = pMutex; *ppMutex = pMutex;
*pShmid = shmid; *pShmid = shmid;
...@@ -112,12 +116,12 @@ _OVER: ...@@ -112,12 +116,12 @@ _OVER:
return code; return code;
} }
static void taosProcDestroyMutex(TdThreadMutex *pMutex, int32_t *pShmid) { static void taosProcDestroyMutex(TdThreadMutex *pMutex, int32_t shmid) {
if (pMutex != NULL) { if (pMutex != NULL) {
taosThreadMutexDestroy(pMutex); taosThreadMutexDestroy(pMutex);
} }
if (*pShmid > 0) { if (shmid >= 0) {
shmctl(*pShmid, IPC_RMID, NULL); shmctl(shmid, IPC_RMID, NULL);
} }
} }
...@@ -141,13 +145,14 @@ static int32_t taosProcInitBuffer(void **ppBuffer, int32_t size) { ...@@ -141,13 +145,14 @@ static int32_t taosProcInitBuffer(void **ppBuffer, int32_t size) {
return shmid; return shmid;
} }
static void taosProcDestroyBuffer(void *pBuffer, int32_t *pShmid) { static void taosProcDestroyBuffer(void *pBuffer, int32_t shmid) {
if (*pShmid > 0) { if (shmid > 0) {
shmctl(*pShmid, IPC_RMID, NULL); shmdt(pBuffer);
shmctl(shmid, IPC_RMID, NULL);
} }
} }
static SProcQueue *taosProcQueueInit(int32_t size) { static SProcQueue *taosProcInitQueue(int32_t size) {
if (size <= 0) size = SHM_DEFAULT_SIZE; if (size <= 0) size = SHM_DEFAULT_SIZE;
int32_t bufSize = CEIL8(size); int32_t bufSize = CEIL8(size);
...@@ -155,29 +160,28 @@ static SProcQueue *taosProcQueueInit(int32_t size) { ...@@ -155,29 +160,28 @@ static SProcQueue *taosProcQueueInit(int32_t size) {
SProcQueue *pQueue = NULL; SProcQueue *pQueue = NULL;
int32_t shmId = taosProcInitBuffer((void **)&pQueue, bufSize + headSize); int32_t shmId = taosProcInitBuffer((void **)&pQueue, bufSize + headSize);
if (shmId <= 0) { if (shmId < 0) {
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
return NULL; return NULL;
} }
pQueue->bufferShmid = shmId; pQueue->bufferShmid = shmId;
if (taosProcInitMutex(&pQueue->mutex, &pQueue->mutexShmid) != 0) { if (taosProcInitMutex(&pQueue->mutex, &pQueue->mutexShmid) != 0) {
taosMemoryFree(pQueue); taosProcDestroyBuffer(pQueue, pQueue->bufferShmid);
return NULL; return NULL;
} }
if (tsem_init(&pQueue->sem, 1, 0) != 0) { if (tsem_init(&pQueue->sem, 1, 0) != 0) {
taosProcDestroyMutex(pQueue->mutex, &pQueue->mutexShmid); taosProcDestroyMutex(pQueue->mutex, pQueue->mutexShmid);
taosMemoryFree(pQueue); taosProcDestroyBuffer(pQueue, pQueue->bufferShmid);
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
return NULL; return NULL;
} }
if (taosProcInitMutex(&pQueue->mutex, &pQueue->mutexShmid) != 0) { if (taosProcInitMutex(&pQueue->mutex, &pQueue->mutexShmid) != 0) {
taosProcDestroyMutex(pQueue->mutex, &pQueue->mutexShmid);
tsem_destroy(&pQueue->sem); tsem_destroy(&pQueue->sem);
taosMemoryFree(pQueue); taosProcDestroyMutex(pQueue->mutex, pQueue->mutexShmid);
taosProcDestroyBuffer(pQueue, pQueue->bufferShmid);
return NULL; return NULL;
} }
...@@ -190,12 +194,12 @@ static SProcQueue *taosProcQueueInit(int32_t size) { ...@@ -190,12 +194,12 @@ static SProcQueue *taosProcQueueInit(int32_t size) {
return pQueue; return pQueue;
} }
static void taosProcQueueCleanup(SProcQueue *pQueue) { static void taosProcCleanupQueue(SProcQueue *pQueue) {
if (pQueue != NULL) { if (pQueue != NULL) {
uDebug("proc:%s, queue:%p clean up", pQueue->name, pQueue); uDebug("proc:%s, queue:%p clean up", pQueue->name, pQueue);
taosProcDestroyMutex(pQueue->mutex, &pQueue->mutexShmid);
tsem_destroy(&pQueue->sem); tsem_destroy(&pQueue->sem);
taosMemoryFree(pQueue); taosProcDestroyMutex(pQueue->mutex, pQueue->mutexShmid);
taosProcDestroyBuffer(pQueue, pQueue->bufferShmid);
} }
} }
...@@ -204,6 +208,11 @@ static int32_t taosProcQueuePush(SProcQueue *pQueue, char *pHead, int32_t rawHea ...@@ -204,6 +208,11 @@ static int32_t taosProcQueuePush(SProcQueue *pQueue, char *pHead, int32_t rawHea
const int32_t bodyLen = CEIL8(rawBodyLen); const int32_t bodyLen = CEIL8(rawBodyLen);
const int32_t fullLen = headLen + bodyLen + 8; const int32_t fullLen = headLen + bodyLen + 8;
if (headLen <= 0 || bodyLen <= 0) {
terrno = TSDB_CODE_INVALID_PARA;
return -1;
}
taosThreadMutexLock(pQueue->mutex); taosThreadMutexLock(pQueue->mutex);
if (fullLen > pQueue->avail) { if (fullLen > pQueue->avail) {
taosThreadMutexUnlock(pQueue->mutex); taosThreadMutexUnlock(pQueue->mutex);
...@@ -255,7 +264,7 @@ static int32_t taosProcQueuePush(SProcQueue *pQueue, char *pHead, int32_t rawHea ...@@ -255,7 +264,7 @@ static int32_t taosProcQueuePush(SProcQueue *pQueue, char *pHead, int32_t rawHea
taosThreadMutexUnlock(pQueue->mutex); taosThreadMutexUnlock(pQueue->mutex);
tsem_post(&pQueue->sem); tsem_post(&pQueue->sem);
uTrace("proc:%s, push msg:%p:%d cont:%p:%d to queue:%p", pQueue->name, pHead, rawHeadLen, pBody, rawBodyLen, pQueue); uTrace("proc:%s, push msg:%p:%d cont:%p:%d to queue:%p", pQueue->name, pHead, headLen, pBody, bodyLen, pQueue);
return 0; return 0;
} }
...@@ -344,12 +353,10 @@ SProcObj *taosProcInit(const SProcCfg *pCfg) { ...@@ -344,12 +353,10 @@ SProcObj *taosProcInit(const SProcCfg *pCfg) {
} }
pProc->name = pCfg->name; pProc->name = pCfg->name;
pProc->testFlag = pCfg->testFlag; pProc->pChildQueue = taosProcInitQueue(pCfg->childQueueSize);
pProc->pParentQueue = taosProcInitQueue(pCfg->parentQueueSize);
pProc->pChildQueue = taosProcQueueInit(pCfg->childQueueSize);
pProc->pParentQueue = taosProcQueueInit(pCfg->parentQueueSize);
if (pProc->pChildQueue == NULL || pProc->pParentQueue == NULL) { if (pProc->pChildQueue == NULL || pProc->pParentQueue == NULL) {
taosProcQueueCleanup(pProc->pChildQueue); taosProcCleanupQueue(pProc->pChildQueue);
taosMemoryFree(pProc); taosMemoryFree(pProc);
return NULL; return NULL;
} }
...@@ -369,17 +376,15 @@ SProcObj *taosProcInit(const SProcCfg *pCfg) { ...@@ -369,17 +376,15 @@ SProcObj *taosProcInit(const SProcCfg *pCfg) {
pProc->pParentQueue->freeBodyFp = pCfg->parentFreeBodyFp; pProc->pParentQueue->freeBodyFp = pCfg->parentFreeBodyFp;
pProc->pParentQueue->consumeFp = pCfg->parentConsumeFp; pProc->pParentQueue->consumeFp = pCfg->parentConsumeFp;
uDebug("proc:%s, initialized, child queue:%p parent queue:%p", pProc->name, pProc->pChildQueue, pProc->pParentQueue); uDebug("proc:%s, is initialized, child queue:%p parent queue:%p", pProc->name, pProc->pChildQueue, pProc->pParentQueue);
if (!pProc->testFlag) { pProc->pid = fork();
pProc->pid = fork(); if (pProc->pid == 0) {
if (pProc->pid == 0) { pProc->isChild = 1;
pProc->isChild = 1; prctl(PR_SET_NAME, pProc->name, NULL, NULL, NULL);
uInfo("this is child process, pid:%d", pProc->pid); } else {
} else { pProc->isChild = 0;
pProc->isChild = 0; uInfo("this is parent process, child pid:%d", pProc->pid);
uInfo("this is parent process, pid:%d", pProc->pid);
}
} }
return pProc; return pProc;
...@@ -398,7 +403,7 @@ static void taosProcThreadLoop(SProcQueue *pQueue) { ...@@ -398,7 +403,7 @@ static void taosProcThreadLoop(SProcQueue *pQueue) {
if (code < 0) { if (code < 0) {
uDebug("proc:%s, get no message from queue:%p and exiting", pQueue->name, pQueue); uDebug("proc:%s, get no message from queue:%p and exiting", pQueue->name, pQueue);
break; break;
} else if (code < 0) { } else if (code == 0) {
uTrace("proc:%s, get no message from queue:%p since %s", pQueue->name, pQueue, terrstr()); uTrace("proc:%s, get no message from queue:%p since %s", pQueue->name, pQueue, terrstr());
taosMsleep(1); taosMsleep(1);
continue; continue;
...@@ -413,16 +418,14 @@ int32_t taosProcRun(SProcObj *pProc) { ...@@ -413,16 +418,14 @@ int32_t taosProcRun(SProcObj *pProc) {
taosThreadAttrInit(&thAttr); taosThreadAttrInit(&thAttr);
taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE); taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE);
if (pProc->isChild || pProc->testFlag) { if (pProc->isChild) {
if (taosThreadCreate(&pProc->childThread, &thAttr, (ProcThreadFp)taosProcThreadLoop, pProc->pChildQueue) != 0) { if (taosThreadCreate(&pProc->childThread, &thAttr, (ProcThreadFp)taosProcThreadLoop, pProc->pChildQueue) != 0) {
terrno = TAOS_SYSTEM_ERROR(errno); terrno = TAOS_SYSTEM_ERROR(errno);
uError("failed to create thread since %s", terrstr()); uError("failed to create thread since %s", terrstr());
return -1; return -1;
} }
uDebug("proc:%s, child start to consume queue:%p", pProc->name, pProc->pChildQueue); uDebug("proc:%s, child start to consume queue:%p", pProc->name, pProc->pChildQueue);
} } else {
if (!pProc->isChild || pProc->testFlag) {
if (taosThreadCreate(&pProc->parentThread, &thAttr, (ProcThreadFp)taosProcThreadLoop, pProc->pParentQueue) != 0) { if (taosThreadCreate(&pProc->parentThread, &thAttr, (ProcThreadFp)taosProcThreadLoop, pProc->pParentQueue) != 0) {
terrno = TAOS_SYSTEM_ERROR(errno); terrno = TAOS_SYSTEM_ERROR(errno);
uError("failed to create thread since %s", terrstr()); uError("failed to create thread since %s", terrstr());
...@@ -441,12 +444,14 @@ void taosProcStop(SProcObj *pProc) { ...@@ -441,12 +444,14 @@ void taosProcStop(SProcObj *pProc) {
bool taosProcIsChild(SProcObj *pProc) { return pProc->isChild; } bool taosProcIsChild(SProcObj *pProc) { return pProc->isChild; }
int32_t taosProcChildId(SProcObj *pProc) { return pProc->pid; }
void taosProcCleanup(SProcObj *pProc) { void taosProcCleanup(SProcObj *pProc) {
if (pProc != NULL) { if (pProc != NULL) {
uDebug("proc:%s, clean up", pProc->name); uDebug("proc:%s, clean up", pProc->name);
taosProcStop(pProc); taosProcStop(pProc);
taosProcQueueCleanup(pProc->pChildQueue); taosProcCleanupQueue(pProc->pChildQueue);
taosProcQueueCleanup(pProc->pParentQueue); taosProcCleanupQueue(pProc->pParentQueue);
taosMemoryFree(pProc); taosMemoryFree(pProc);
} }
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册