未验证 提交 17a3b4cc 编写于 作者: S Shengliang Guan 提交者: GitHub

Merge pull request #11244 from taosdata/feature/shm

shm
...@@ -24,11 +24,10 @@ extern "C" { ...@@ -24,11 +24,10 @@ extern "C" {
typedef enum { PROC_QUEUE, PROC_REQ, PROC_RSP, PROC_REGIST, PROC_RELEASE } ProcFuncType; typedef enum { PROC_QUEUE, PROC_REQ, PROC_RSP, PROC_REGIST, PROC_RELEASE } ProcFuncType;
typedef struct SProcQueue SProcQueue; typedef struct SProcObj SProcObj;
typedef struct SProcObj SProcObj;
typedef void *(*ProcMallocFp)(int32_t contLen); typedef void *(*ProcMallocFp)(int32_t contLen);
typedef void *(*ProcFreeFp)(void *pCont); typedef void *(*ProcFreeFp)(void *pCont);
typedef void *(*ProcConsumeFp)(void *pParent, void *pHead, int16_t headLen, void *pBody, int32_t bodyLen, typedef void *(*ProcConsumeFp)(void *parent, void *pHead, int16_t headLen, void *pBody, int32_t bodyLen,
ProcFuncType ftype); ProcFuncType ftype);
typedef struct { typedef struct {
...@@ -43,7 +42,7 @@ typedef struct { ...@@ -43,7 +42,7 @@ typedef struct {
ProcMallocFp parentMallocBodyFp; ProcMallocFp parentMallocBodyFp;
ProcFreeFp parentFreeBodyFp; ProcFreeFp parentFreeBodyFp;
SShm shm; SShm shm;
void *pParent; void *parent;
const char *name; const char *name;
bool isChild; bool isChild;
} SProcCfg; } SProcCfg;
...@@ -51,10 +50,13 @@ typedef struct { ...@@ -51,10 +50,13 @@ typedef struct {
SProcObj *taosProcInit(const SProcCfg *pCfg); SProcObj *taosProcInit(const SProcCfg *pCfg);
void taosProcCleanup(SProcObj *pProc); void taosProcCleanup(SProcObj *pProc);
int32_t taosProcRun(SProcObj *pProc); int32_t taosProcRun(SProcObj *pProc);
int32_t taosProcPutToChildQ(SProcObj *pProc, const void *pHead, int16_t headLen, const void *pBody, int32_t bodyLen,
ProcFuncType ftype); int32_t taosProcPutToChildQ(SProcObj *pProc, const void *pHead, int16_t headLen, const void *pBody, int32_t bodyLen,
void taosProcPutToParentQ(SProcObj *pProc, const void *pHead, int16_t headLen, const void *pBody, int32_t bodyLen, void *handle, ProcFuncType ftype);
ProcFuncType ftype); void taosProcRemoveHandle(SProcObj *pProc, void *handle);
void taosProcCloseHandles(SProcObj *pProc, void (*HandleFp)(void *handle));
void taosProcPutToParentQ(SProcObj *pProc, const void *pHead, int16_t headLen, const void *pBody, int32_t bodyLen,
ProcFuncType ftype);
#ifdef __cplusplus #ifdef __cplusplus
} }
......
...@@ -88,6 +88,10 @@ static int32_t dndNewProc(SMgmtWrapper *pWrapper, ENodeType n) { ...@@ -88,6 +88,10 @@ static int32_t dndNewProc(SMgmtWrapper *pWrapper, ENodeType n) {
return 0; return 0;
} }
static void dndProcessProcHandle(void *handle) {
SRpcMsg rpcMsg = {.handle = handle, .code = TSDB_CODE_DND_OFFLINE};
rpcSendResponse(&rpcMsg);
}
static int32_t dndRunInSingleProcess(SDnode *pDnode) { static int32_t dndRunInSingleProcess(SDnode *pDnode) {
dInfo("dnode run in single process"); dInfo("dnode run in single process");
...@@ -220,6 +224,7 @@ static int32_t dndRunInParentProcess(SDnode *pDnode) { ...@@ -220,6 +224,7 @@ static int32_t dndRunInParentProcess(SDnode *pDnode) {
if (pWrapper->procId <= 0 || !taosProcExist(pWrapper->procId)) { if (pWrapper->procId <= 0 || !taosProcExist(pWrapper->procId)) {
dInfo("node:%s, process:%d is killed and needs to be restarted", pWrapper->name, pWrapper->procId); dInfo("node:%s, process:%d is killed and needs to be restarted", pWrapper->name, pWrapper->procId);
taosProcCloseHandles(pWrapper->pProc, dndProcessProcHandle);
dndNewProc(pWrapper, n); dndNewProc(pWrapper, n);
} }
} }
......
...@@ -12,7 +12,7 @@ ...@@ -12,7 +12,7 @@
* You should have received a copy of the GNU Affero General Public License * You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>. * along with this program. If not, see <http://www.gnu.org/licenses/>.
*/ */
#define _DEFAULT_SOURCE #define _DEFAULT_SOURCE
#include "dndInt.h" #include "dndInt.h"
...@@ -68,7 +68,8 @@ static void dndProcessRpcMsg(SMgmtWrapper *pWrapper, SRpcMsg *pRpc, SEpSet *pEpS ...@@ -68,7 +68,8 @@ static void dndProcessRpcMsg(SMgmtWrapper *pWrapper, SRpcMsg *pRpc, SEpSet *pEpS
code = (*msgFp)(pWrapper, pMsg); code = (*msgFp)(pWrapper, pMsg);
} else if (pWrapper->procType == PROC_PARENT) { } else if (pWrapper->procType == PROC_PARENT) {
dTrace("msg:%p, is created and put into child queue, handle:%p user:%s", pMsg, pRpc->handle, pMsg->user); dTrace("msg:%p, is created and put into child queue, handle:%p user:%s", pMsg, pRpc->handle, pMsg->user);
code = taosProcPutToChildQ(pWrapper->pProc, pMsg, sizeof(SNodeMsg), pRpc->pCont, pRpc->contLen, PROC_REQ); code = taosProcPutToChildQ(pWrapper->pProc, pMsg, sizeof(SNodeMsg), pRpc->pCont, pRpc->contLen, pRpc->handle,
PROC_REQ);
} else { } else {
dTrace("msg:%p, should not processed in child process, handle:%p user:%s", pMsg, pRpc->handle, pMsg->user); dTrace("msg:%p, should not processed in child process, handle:%p user:%s", pMsg, pRpc->handle, pMsg->user);
ASSERT(1); ASSERT(1);
...@@ -454,6 +455,7 @@ static void dndConsumeParentQueue(SMgmtWrapper *pWrapper, SRpcMsg *pMsg, int16_t ...@@ -454,6 +455,7 @@ static void dndConsumeParentQueue(SMgmtWrapper *pWrapper, SRpcMsg *pMsg, int16_t
rpcRegisterBrokenLinkArg(pMsg); rpcRegisterBrokenLinkArg(pMsg);
break; break;
case PROC_RELEASE: case PROC_RELEASE:
taosProcRemoveHandle(pWrapper->pProc, pMsg->handle);
rpcReleaseHandle(pMsg->handle, (int8_t)pMsg->code); rpcReleaseHandle(pMsg->handle, (int8_t)pMsg->code);
rpcFreeCont(pCont); rpcFreeCont(pCont);
break; break;
...@@ -461,6 +463,7 @@ static void dndConsumeParentQueue(SMgmtWrapper *pWrapper, SRpcMsg *pMsg, int16_t ...@@ -461,6 +463,7 @@ static void dndConsumeParentQueue(SMgmtWrapper *pWrapper, SRpcMsg *pMsg, int16_t
dndSendRpcReq(&pWrapper->pDnode->trans, (SEpSet *)((char *)pMsg + sizeof(SRpcMsg)), pMsg); dndSendRpcReq(&pWrapper->pDnode->trans, (SEpSet *)((char *)pMsg + sizeof(SRpcMsg)), pMsg);
break; break;
case PROC_RSP: case PROC_RSP:
taosProcRemoveHandle(pWrapper->pProc, pMsg->handle);
dndSendRpcRsp(pWrapper, pMsg); dndSendRpcRsp(pWrapper, pMsg);
break; break;
default: default:
...@@ -481,7 +484,7 @@ SProcCfg dndGenProcCfg(SMgmtWrapper *pWrapper) { ...@@ -481,7 +484,7 @@ SProcCfg dndGenProcCfg(SMgmtWrapper *pWrapper) {
.parentMallocBodyFp = (ProcMallocFp)rpcMallocCont, .parentMallocBodyFp = (ProcMallocFp)rpcMallocCont,
.parentFreeBodyFp = (ProcFreeFp)rpcFreeCont, .parentFreeBodyFp = (ProcFreeFp)rpcFreeCont,
.shm = pWrapper->shm, .shm = pWrapper->shm,
.pParent = pWrapper, .parent = pWrapper,
.name = pWrapper->name}; .name = pWrapper->name};
return cfg; return cfg;
} }
\ No newline at end of file
...@@ -15,7 +15,9 @@ ...@@ -15,7 +15,9 @@
#define _DEFAULT_SOURCE #define _DEFAULT_SOURCE
#include "tprocess.h" #include "tprocess.h"
#include "taos.h"
#include "taoserror.h" #include "taoserror.h"
#include "thash.h"
#include "tlog.h" #include "tlog.h"
#include "tqueue.h" #include "tqueue.h"
...@@ -48,8 +50,9 @@ typedef struct SProcObj { ...@@ -48,8 +50,9 @@ typedef struct SProcObj {
ProcFreeFp parentFreeHeadFp; ProcFreeFp parentFreeHeadFp;
ProcMallocFp parentMallocBodyFp; ProcMallocFp parentMallocBodyFp;
ProcFreeFp parentFreeBodyFp; ProcFreeFp parentFreeBodyFp;
void *pParent; void *parent;
const char *name; const char *name;
SHashObj *hash;
int32_t pid; int32_t pid;
bool isChild; bool isChild;
bool stopFlag; bool stopFlag;
...@@ -151,8 +154,8 @@ static void taosProcCleanupQueue(SProcQueue *pQueue) { ...@@ -151,8 +154,8 @@ static void taosProcCleanupQueue(SProcQueue *pQueue) {
#endif #endif
} }
static int32_t taosProcQueuePush(SProcQueue *pQueue, const char *pHead, int16_t rawHeadLen, const char *pBody, static int32_t taosProcQueuePush(SProcObj *pProc, SProcQueue *pQueue, const char *pHead, int16_t rawHeadLen,
int32_t rawBodyLen, ProcFuncType ftype) { const char *pBody, int32_t rawBodyLen, int64_t handle, ProcFuncType ftype) {
const int32_t headLen = CEIL8(rawHeadLen); const int32_t headLen = CEIL8(rawHeadLen);
const int32_t bodyLen = CEIL8(rawBodyLen); const int32_t bodyLen = CEIL8(rawBodyLen);
const int32_t fullLen = headLen + bodyLen + 8; const int32_t fullLen = headLen + bodyLen + 8;
...@@ -164,6 +167,14 @@ static int32_t taosProcQueuePush(SProcQueue *pQueue, const char *pHead, int16_t ...@@ -164,6 +167,14 @@ static int32_t taosProcQueuePush(SProcQueue *pQueue, const char *pHead, int16_t
return -1; return -1;
} }
if (handle != 0 && ftype == PROC_REQ) {
if (taosHashPut(pProc->hash, &handle, sizeof(int64_t), &handle, sizeof(int64_t)) != 0) {
taosThreadMutexUnlock(&pQueue->mutex);
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
}
const int32_t pos = pQueue->tail; const int32_t pos = pQueue->tail;
if (pQueue->tail < pQueue->total) { if (pQueue->tail < pQueue->total) {
*(int16_t *)(pQueue->pBuffer + pQueue->tail) = headLen; *(int16_t *)(pQueue->pBuffer + pQueue->tail) = headLen;
...@@ -317,6 +328,7 @@ SProcObj *taosProcInit(const SProcCfg *pCfg) { ...@@ -317,6 +328,7 @@ SProcObj *taosProcInit(const SProcCfg *pCfg) {
pProc->name = pCfg->name; pProc->name = pCfg->name;
pProc->pChildQueue = taosProcInitQueue(pCfg->name, pCfg->isChild, (char *)pCfg->shm.ptr + cstart, csize); pProc->pChildQueue = taosProcInitQueue(pCfg->name, pCfg->isChild, (char *)pCfg->shm.ptr + cstart, csize);
pProc->pParentQueue = taosProcInitQueue(pCfg->name, pCfg->isChild, (char *)pCfg->shm.ptr + pstart, psize); pProc->pParentQueue = taosProcInitQueue(pCfg->name, pCfg->isChild, (char *)pCfg->shm.ptr + pstart, psize);
pProc->hash = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK);
if (pProc->pChildQueue == NULL || pProc->pParentQueue == NULL) { if (pProc->pChildQueue == NULL || pProc->pParentQueue == NULL) {
taosProcCleanupQueue(pProc->pChildQueue); taosProcCleanupQueue(pProc->pChildQueue);
taosMemoryFree(pProc); taosMemoryFree(pProc);
...@@ -324,7 +336,7 @@ SProcObj *taosProcInit(const SProcCfg *pCfg) { ...@@ -324,7 +336,7 @@ SProcObj *taosProcInit(const SProcCfg *pCfg) {
} }
pProc->name = pCfg->name; pProc->name = pCfg->name;
pProc->pParent = pCfg->pParent; pProc->parent = pCfg->parent;
pProc->childMallocHeadFp = pCfg->childMallocHeadFp; pProc->childMallocHeadFp = pCfg->childMallocHeadFp;
pProc->childFreeHeadFp = pCfg->childFreeHeadFp; pProc->childFreeHeadFp = pCfg->childFreeHeadFp;
pProc->childMallocBodyFp = pCfg->childMallocBodyFp; pProc->childMallocBodyFp = pCfg->childMallocBodyFp;
...@@ -384,7 +396,7 @@ static void taosProcThreadLoop(SProcObj *pProc) { ...@@ -384,7 +396,7 @@ static void taosProcThreadLoop(SProcObj *pProc) {
taosMsleep(1); taosMsleep(1);
continue; continue;
} else { } else {
(*consumeFp)(pProc->pParent, pHead, headLen, pBody, bodyLen, ftype); (*consumeFp)(pProc->parent, pHead, headLen, pBody, bodyLen, ftype);
} }
} }
} }
...@@ -424,19 +436,41 @@ void taosProcCleanup(SProcObj *pProc) { ...@@ -424,19 +436,41 @@ void taosProcCleanup(SProcObj *pProc) {
taosProcStop(pProc); taosProcStop(pProc);
taosProcCleanupQueue(pProc->pChildQueue); taosProcCleanupQueue(pProc->pChildQueue);
taosProcCleanupQueue(pProc->pParentQueue); taosProcCleanupQueue(pProc->pParentQueue);
if (pProc->hash != NULL) {
taosHashCleanup(pProc->hash);
pProc->hash = NULL;
}
uDebug("proc:%s, is cleaned up", pProc->name); uDebug("proc:%s, is cleaned up", pProc->name);
taosMemoryFree(pProc); taosMemoryFree(pProc);
} }
} }
int32_t taosProcPutToChildQ(SProcObj *pProc, const void *pHead, int16_t headLen, const void *pBody, int32_t bodyLen, int32_t taosProcPutToChildQ(SProcObj *pProc, const void *pHead, int16_t headLen, const void *pBody, int32_t bodyLen,
ProcFuncType ftype) { void *handle, ProcFuncType ftype) {
return taosProcQueuePush(pProc->pChildQueue, pHead, headLen, pBody, bodyLen, ftype); return taosProcQueuePush(pProc, pProc->pChildQueue, pHead, headLen, pBody, bodyLen, (int64_t)handle, ftype);
}
void taosProcRemoveHandle(SProcObj *pProc, void *handle) {
int64_t h = (int64_t)handle;
taosThreadMutexLock(&pProc->pChildQueue->mutex);
taosHashRemove(pProc->hash, &h, sizeof(int64_t));
taosThreadMutexUnlock(&pProc->pChildQueue->mutex);
}
void taosProcCloseHandles(SProcObj *pProc, void (*HandleFp)(void *handle)) {
taosThreadMutexLock(&pProc->pChildQueue->mutex);
void *h = taosHashIterate(pProc->hash, NULL);
while (h != NULL) {
void *handle = *((void **)h);
(*HandleFp)(handle);
}
taosThreadMutexUnlock(&pProc->pChildQueue->mutex);
} }
void taosProcPutToParentQ(SProcObj *pProc, const void *pHead, int16_t headLen, const void *pBody, int32_t bodyLen, void taosProcPutToParentQ(SProcObj *pProc, const void *pHead, int16_t headLen, const void *pBody, int32_t bodyLen,
ProcFuncType ftype) { ProcFuncType ftype) {
while (taosProcQueuePush(pProc->pParentQueue, pHead, headLen, pBody, bodyLen, ftype) != 0) { while (taosProcQueuePush(pProc, pProc->pParentQueue, pHead, headLen, pBody, bodyLen, 0, ftype) != 0) {
taosMsleep(1); taosMsleep(1);
} }
} }
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册