未验证 提交 1d437eaa 编写于 作者: S Shengliang Guan 提交者: GitHub

Merge pull request #11266 from taosdata/feature/shm

shm
......@@ -51,7 +51,14 @@ extern int32_t tsCompatibleModel;
extern bool tsEnableSlaveQuery;
extern bool tsPrintAuth;
extern int64_t tsTickPerDay[3];
// multi-process
extern bool tsMultiProcess;
extern int32_t tsMnodeShmSize;
extern int32_t tsVnodeShmSize;
extern int32_t tsQnodeShmSize;
extern int32_t tsSnodeShmSize;
extern int32_t tsBnodeShmSize;
// monitor
extern bool tsEnableMonitor;
......
......@@ -22,12 +22,12 @@
extern "C" {
#endif
typedef enum { PROC_QUEUE, PROC_REQ, PROC_RSP, PROC_REGIST, PROC_RELEASE } ProcFuncType;
typedef enum { PROC_REQ = 1, PROC_RSP, PROC_REGIST, PROC_RELEASE } ProcFuncType;
typedef struct SProcObj SProcObj;
typedef void *(*ProcMallocFp)(int32_t contLen);
typedef void *(*ProcFreeFp)(void *pCont);
typedef void *(*ProcConsumeFp)(void *parent, void *pHead, int16_t headLen, void *pBody, int32_t bodyLen,
typedef void (*ProcConsumeFp)(void *parent, void *pHead, int16_t headLen, void *pBody, int32_t bodyLen,
ProcFuncType ftype);
typedef struct {
......@@ -50,6 +50,7 @@ typedef struct {
SProcObj *taosProcInit(const SProcCfg *pCfg);
void taosProcCleanup(SProcObj *pProc);
int32_t taosProcRun(SProcObj *pProc);
void taosProcStop(SProcObj *pProc);
int32_t taosProcPutToChildQ(SProcObj *pProc, const void *pHead, int16_t headLen, const void *pBody, int32_t bodyLen,
void *handle, ProcFuncType ftype);
......
......@@ -45,7 +45,14 @@ float tsRatioOfQueryCores = 1.0f;
int32_t tsMaxBinaryDisplayWidth = 30;
bool tsEnableSlaveQuery = 1;
bool tsPrintAuth = 0;
bool tsMultiProcess = 0;
// multi process
bool tsMultiProcess = false;
int32_t tsMnodeShmSize = TSDB_MAX_WAL_SIZE * 2;
int32_t tsVnodeShmSize = TSDB_MAX_WAL_SIZE * 10;
int32_t tsQnodeShmSize = TSDB_MAX_WAL_SIZE * 4;
int32_t tsSnodeShmSize = TSDB_MAX_WAL_SIZE * 4;
int32_t tsBnodeShmSize = TSDB_MAX_WAL_SIZE * 4;
// monitor
bool tsEnableMonitor = 1;
......@@ -347,7 +354,13 @@ static int32_t taosAddServerCfg(SConfig *pCfg) {
if (cfgAddBool(pCfg, "printAuth", tsPrintAuth, 0) != 0) return -1;
if (cfgAddBool(pCfg, "slaveQuery", tsEnableSlaveQuery, 0) != 0) return -1;
if (cfgAddBool(pCfg, "deadLockKillQuery", tsDeadLockKillQuery, 0) != 0) return -1;
if (cfgAddBool(pCfg, "multiProcess", tsMultiProcess, 0) != 0) return -1;
if (cfgAddInt32(pCfg, "mnodeShmSize", tsMnodeShmSize, 4096, INT32_MAX, 0) != 0) return -1;
if (cfgAddInt32(pCfg, "vnodeShmSize", tsVnodeShmSize, 4096, INT32_MAX, 0) != 0) return -1;
if (cfgAddInt32(pCfg, "qnodeShmSize", tsQnodeShmSize, 4096, INT32_MAX, 0) != 0) return -1;
if (cfgAddInt32(pCfg, "snodeShmSize", tsSnodeShmSize, 4096, INT32_MAX, 0) != 0) return -1;
// if (cfgAddInt32(pCfg, "bnodeShmSize", tsBnodeShmSize, 4096, INT32_MAX, 0) != 0) return -1;
if (cfgAddBool(pCfg, "monitor", tsEnableMonitor, 0) != 0) return -1;
if (cfgAddInt32(pCfg, "monitorInterval", tsMonitorInterval, 1, 360000, 0) != 0) return -1;
......@@ -466,7 +479,13 @@ static int32_t taosSetServerCfg(SConfig *pCfg) {
tsPrintAuth = cfgGetItem(pCfg, "printAuth")->bval;
tsEnableSlaveQuery = cfgGetItem(pCfg, "slaveQuery")->bval;
tsDeadLockKillQuery = cfgGetItem(pCfg, "deadLockKillQuery")->bval;
tsMultiProcess = cfgGetItem(pCfg, "multiProcess")->bval;
tsMnodeShmSize = cfgGetItem(pCfg, "mnodeShmSize")->i32;
tsVnodeShmSize = cfgGetItem(pCfg, "vnodeShmSize")->i32;
tsQnodeShmSize = cfgGetItem(pCfg, "qnodeShmSize")->i32;
tsSnodeShmSize = cfgGetItem(pCfg, "snodeShmSize")->i32;
// tsBnodeShmSize = cfgGetItem(pCfg, "bnodeShmSize")->i32;
tsEnableMonitor = cfgGetItem(pCfg, "monitor")->bval;
tsMonitorInterval = cfgGetItem(pCfg, "monitorInterval")->i32;
......
......@@ -118,7 +118,7 @@ int32_t dmProcessConfigReq(SDnodeMgmt *pMgmt, SNodeMsg *pMsg) {
}
static int32_t dmProcessCreateNodeMsg(SDnode *pDnode, ENodeType ntype, SNodeMsg *pMsg) {
static int32_t dmProcessCreateNodeMsg(SDnode *pDnode, EDndType ntype, SNodeMsg *pMsg) {
SMgmtWrapper *pWrapper = dndAcquireWrapper(pDnode, ntype);
if (pWrapper != NULL) {
dndReleaseWrapper(pWrapper);
......@@ -146,7 +146,7 @@ static int32_t dmProcessCreateNodeMsg(SDnode *pDnode, ENodeType ntype, SNodeMsg
return code;
}
static int32_t dmProcessDropNodeMsg(SDnode *pDnode, ENodeType ntype, SNodeMsg *pMsg) {
static int32_t dmProcessDropNodeMsg(SDnode *pDnode, EDndType ntype, SNodeMsg *pMsg) {
SMgmtWrapper *pWrapper = dndAcquireWrapper(pDnode, ntype);
if (pWrapper == NULL) {
terrno = TSDB_CODE_NODE_NOT_DEPLOYED;
......
......@@ -18,15 +18,15 @@
#include "tconfig.h"
static struct {
bool dumpConfig;
bool generateGrant;
bool printAuth;
bool printVersion;
char envFile[PATH_MAX];
char apolloUrl[PATH_MAX];
SArray *pArgs; // SConfigPair
SDnode *pDnode;
ENodeType ntype;
bool dumpConfig;
bool generateGrant;
bool printAuth;
bool printVersion;
char envFile[PATH_MAX];
char apolloUrl[PATH_MAX];
SArray *pArgs; // SConfigPair
SDnode *pDnode;
EDndType ntype;
} global = {0};
static void dndStopDnode(int signum, void *info, void *ctx) {
......
......@@ -49,7 +49,7 @@ extern "C" {
#define dDebug(...) { if (dDebugFlag & DEBUG_DEBUG) { taosPrintLog("DND ", DEBUG_DEBUG, dDebugFlag, __VA_ARGS__); }}
#define dTrace(...) { if (dDebugFlag & DEBUG_TRACE) { taosPrintLog("DND ", DEBUG_TRACE, dDebugFlag, __VA_ARGS__); }}
typedef enum { DNODE, VNODES, QNODE, SNODE, MNODE, BNODE, NODE_MAX } ENodeType;
typedef enum { DNODE, VNODES, QNODE, SNODE, MNODE, BNODE, NODE_MAX } EDndType;
typedef enum { DND_STAT_INIT, DND_STAT_RUNNING, DND_STAT_STOPPED } EDndStatus;
typedef enum { DND_ENV_INIT, DND_ENV_READY, DND_ENV_CLEANUP } EEnvStatus;
typedef enum { PROC_SINGLE, PROC_CHILD, PROC_PARENT } EProcType;
......@@ -92,7 +92,7 @@ typedef struct SMgmtWrapper {
char *path;
int32_t refCount;
SRWLatch latch;
ENodeType ntype;
EDndType ntype;
bool deployed;
bool required;
EProcType procType;
......@@ -126,7 +126,7 @@ typedef struct SDnode {
int32_t numOfDisks;
uint16_t serverPort;
bool dropped;
ENodeType ntype;
EDndType ntype;
EDndStatus status;
EDndEvent event;
SStartupReq startup;
......@@ -137,8 +137,8 @@ typedef struct SDnode {
// dndEnv.c
const char *dndStatStr(EDndStatus stat);
const char *dndNodeLogStr(ENodeType ntype);
const char *dndNodeProcStr(ENodeType ntype);
const char *dndNodeLogStr(EDndType ntype);
const char *dndNodeProcStr(EDndType ntype);
const char *dndEventStr(EDndEvent ev);
// dndExec.c
......@@ -156,7 +156,7 @@ int32_t dndWriteShmFile(SDnode *pDnode);
EDndStatus dndGetStatus(SDnode *pDnode);
void dndSetStatus(SDnode *pDnode, EDndStatus stat);
void dndSetMsgHandle(SMgmtWrapper *pWrapper, tmsg_t msgType, NodeMsgFp nodeMsgFp, int8_t vgId);
SMgmtWrapper *dndAcquireWrapper(SDnode *pDnode, ENodeType nodeType);
SMgmtWrapper *dndAcquireWrapper(SDnode *pDnode, EDndType nType);
int32_t dndMarkWrapper(SMgmtWrapper *pWrapper);
void dndReleaseWrapper(SMgmtWrapper *pWrapper);
void dndHandleEvent(SDnode *pDnode, EDndEvent event);
......
......@@ -71,7 +71,7 @@ const char *dndStatStr(EDndStatus status) {
}
}
const char *dndNodeLogStr(ENodeType ntype) {
const char *dndNodeLogStr(EDndType ntype) {
switch (ntype) {
case VNODES:
return "vnode";
......@@ -88,7 +88,7 @@ const char *dndNodeLogStr(ENodeType ntype) {
}
}
const char *dndNodeProcStr(ENodeType ntype) {
const char *dndNodeProcStr(EDndType ntype) {
switch (ntype) {
case VNODES:
return "taosv";
......
......@@ -66,7 +66,7 @@ void dndCloseNode(SMgmtWrapper *pWrapper) {
}
static int32_t dndNewProc(SMgmtWrapper *pWrapper, ENodeType n) {
static int32_t dndNewProc(SMgmtWrapper *pWrapper, EDndType n) {
char tstr[8] = {0};
char *args[6] = {0};
snprintf(tstr, sizeof(tstr), "%d", n);
......@@ -89,6 +89,7 @@ static int32_t dndNewProc(SMgmtWrapper *pWrapper, ENodeType n) {
}
static void dndProcessProcHandle(void *handle) {
dWarn("handle:%p, the child process dies and send an offline rsp", handle);
SRpcMsg rpcMsg = {.handle = handle, .code = TSDB_CODE_DND_OFFLINE};
rpcSendResponse(&rpcMsg);
}
......@@ -96,7 +97,7 @@ static void dndProcessProcHandle(void *handle) {
static int32_t dndRunInSingleProcess(SDnode *pDnode) {
dInfo("dnode run in single process");
for (ENodeType n = DNODE; n < NODE_MAX; ++n) {
for (EDndType n = DNODE; n < NODE_MAX; ++n) {
SMgmtWrapper *pWrapper = &pDnode->wrappers[n];
pWrapper->required = dndRequireNode(pWrapper);
if (!pWrapper->required) continue;
......@@ -109,7 +110,7 @@ static int32_t dndRunInSingleProcess(SDnode *pDnode) {
dndSetStatus(pDnode, DND_STAT_RUNNING);
for (ENodeType n = 0; n < NODE_MAX; ++n) {
for (EDndType n = 0; n < NODE_MAX; ++n) {
SMgmtWrapper *pWrapper = &pDnode->wrappers[n];
if (!pWrapper->required) continue;
if (pWrapper->fp.startFp == NULL) continue;
......@@ -141,12 +142,25 @@ static int32_t dndRunInParentProcess(SDnode *pDnode) {
return -1;
}
for (ENodeType n = DNODE + 1; n < NODE_MAX; ++n) {
for (EDndType n = DNODE + 1; n < NODE_MAX; ++n) {
SMgmtWrapper *pWrapper = &pDnode->wrappers[n];
pWrapper->required = dndRequireNode(pWrapper);
if (!pWrapper->required) continue;
int32_t shmsize = 1024 * 1024 * 2; // size will be a configuration item
int32_t shmsize = tsMnodeShmSize;
if (n == VNODES) {
shmsize = tsVnodeShmSize;
} else if (n == QNODE) {
shmsize = tsQnodeShmSize;
} else if (n == SNODE) {
shmsize = tsSnodeShmSize;
} else if (n == MNODE) {
shmsize = tsMnodeShmSize;
} else if (n == BNODE) {
shmsize = tsBnodeShmSize;
} else {
}
if (taosCreateShm(&pWrapper->shm, n, shmsize) != 0) {
terrno = TAOS_SYSTEM_ERROR(terrno);
dError("node:%s, failed to create shm size:%d since %s", pWrapper->name, shmsize, terrstr());
......@@ -169,7 +183,7 @@ static int32_t dndRunInParentProcess(SDnode *pDnode) {
return -1;
}
for (ENodeType n = DNODE + 1; n < NODE_MAX; ++n) {
for (EDndType n = DNODE + 1; n < NODE_MAX; ++n) {
SMgmtWrapper *pWrapper = &pDnode->wrappers[n];
if (!pWrapper->required) continue;
......@@ -202,7 +216,7 @@ static int32_t dndRunInParentProcess(SDnode *pDnode) {
dInfo("dnode is about to stop");
dndSetStatus(pDnode, DND_STAT_STOPPED);
for (ENodeType n = DNODE + 1; n < NODE_MAX; ++n) {
for (EDndType n = DNODE + 1; n < NODE_MAX; ++n) {
SMgmtWrapper *pWrapper = &pDnode->wrappers[n];
if (!pWrapper->required) continue;
if (pDnode->ntype == NODE_MAX) continue;
......@@ -217,13 +231,13 @@ static int32_t dndRunInParentProcess(SDnode *pDnode) {
}
break;
} else {
for (ENodeType n = DNODE + 1; n < NODE_MAX; ++n) {
for (EDndType n = DNODE + 1; n < NODE_MAX; ++n) {
SMgmtWrapper *pWrapper = &pDnode->wrappers[n];
if (!pWrapper->required) continue;
if (pDnode->ntype == NODE_MAX) continue;
if (pWrapper->procId <= 0 || !taosProcExist(pWrapper->procId)) {
dInfo("node:%s, process:%d is killed and needs to be restarted", pWrapper->name, pWrapper->procId);
dWarn("node:%s, process:%d is killed and needs to be restarted", pWrapper->name, pWrapper->procId);
taosProcCloseHandles(pWrapper->pProc, dndProcessProcHandle);
dndNewProc(pWrapper, n);
}
......
......@@ -164,7 +164,7 @@ int32_t dndReadShmFile(SDnode *pDnode) {
goto _OVER;
}
for (ENodeType ntype = DNODE + 1; ntype < NODE_MAX; ++ntype) {
for (EDndType ntype = DNODE + 1; ntype < NODE_MAX; ++ntype) {
snprintf(itemName, sizeof(itemName), "%s_shmid", dndNodeProcStr(ntype));
cJSON *shmid = cJSON_GetObjectItem(root, itemName);
if (shmid && shmid->type == cJSON_Number) {
......@@ -180,7 +180,7 @@ int32_t dndReadShmFile(SDnode *pDnode) {
}
if (!tsMultiProcess || pDnode->ntype == DNODE || pDnode->ntype == NODE_MAX) {
for (ENodeType ntype = DNODE; ntype < NODE_MAX; ++ntype) {
for (EDndType ntype = DNODE; ntype < NODE_MAX; ++ntype) {
SMgmtWrapper *pWrapper = &pDnode->wrappers[ntype];
if (pWrapper->shm.id >= 0) {
dDebug("shmid:%d, is closed, size:%d", pWrapper->shm.id, pWrapper->shm.size);
......@@ -226,7 +226,7 @@ int32_t dndWriteShmFile(SDnode *pDnode) {
}
len += snprintf(content + len, MAXLEN - len, "{\n");
for (ENodeType ntype = DNODE + 1; ntype < NODE_MAX; ++ntype) {
for (EDndType ntype = DNODE + 1; ntype < NODE_MAX; ++ntype) {
SMgmtWrapper *pWrapper = &pDnode->wrappers[ntype];
len += snprintf(content + len, MAXLEN - len, " \"%s_shmid\":%d,\n", dndNodeProcStr(ntype), pWrapper->shm.id);
if (ntype == NODE_MAX - 1) {
......
......@@ -46,7 +46,7 @@ static int32_t dndInitVars(SDnode *pDnode, const SDnodeOpt *pOption) {
}
static void dndClearVars(SDnode *pDnode) {
for (ENodeType n = 0; n < NODE_MAX; ++n) {
for (EDndType n = 0; n < NODE_MAX; ++n) {
SMgmtWrapper *pMgmt = &pDnode->wrappers[n];
taosMemoryFreeClear(pMgmt->path);
}
......@@ -89,7 +89,7 @@ SDnode *dndCreate(const SDnodeOpt *pOption) {
smSetMgmtFp(&pDnode->wrappers[SNODE]);
bmSetMgmtFp(&pDnode->wrappers[BNODE]);
for (ENodeType n = 0; n < NODE_MAX; ++n) {
for (EDndType n = 0; n < NODE_MAX; ++n) {
SMgmtWrapper *pWrapper = &pDnode->wrappers[n];
snprintf(path, sizeof(path), "%s%s%s", pDnode->dataDir, TD_DIRSEP, pWrapper->name);
pWrapper->path = strdup(path);
......@@ -106,7 +106,7 @@ SDnode *dndCreate(const SDnodeOpt *pOption) {
}
if (dndInitMsgHandle(pDnode) != 0) {
dError("failed to msg handles since %s", terrstr());
dError("failed to init msg handles since %s", terrstr());
goto _OVER;
}
......@@ -134,7 +134,7 @@ _OVER:
void dndClose(SDnode *pDnode) {
if (pDnode == NULL) return;
for (ENodeType n = 0; n < NODE_MAX; ++n) {
for (EDndType n = 0; n < NODE_MAX; ++n) {
SMgmtWrapper *pWrapper = &pDnode->wrappers[n];
dndCloseNode(pWrapper);
}
......@@ -149,7 +149,7 @@ void dndHandleEvent(SDnode *pDnode, EDndEvent event) {
}
}
SMgmtWrapper *dndAcquireWrapper(SDnode *pDnode, ENodeType ntype) {
SMgmtWrapper *dndAcquireWrapper(SDnode *pDnode, EDndType ntype) {
SMgmtWrapper *pWrapper = &pDnode->wrappers[ntype];
SMgmtWrapper *pRetWrapper = pWrapper;
......
......@@ -307,7 +307,7 @@ void dndCleanupTrans(SDnode *pDnode) {
int32_t dndInitMsgHandle(SDnode *pDnode) {
STransMgmt *pMgmt = &pDnode->trans;
for (ENodeType n = 0; n < NODE_MAX; ++n) {
for (EDndType n = 0; n < NODE_MAX; ++n) {
SMgmtWrapper *pWrapper = &pDnode->wrappers[n];
for (int32_t msgIndex = 0; msgIndex < TDMT_MAX; ++msgIndex) {
......
......@@ -40,9 +40,7 @@ class Testbase {
void ServerStart();
void ClientRestart();
SRpcMsg* SendReq(tmsg_t msgType, void* pCont, int32_t contLen);
private:
void InitLog(const char* path);
void InitLog(const char* path);
private:
TestServer server;
......
......@@ -20,7 +20,13 @@
int32_t taosCreateShm(SShm* pShm, int32_t key, int32_t shmsize) {
pShm->id = -1;
int32_t shmid = shmget(0X95270000 + key, shmsize, IPC_CREAT | 0600);
// key_t shkey = IPC_PRIVATE;
// int32_t __shmflag = IPC_CREAT | IPC_EXCL | 0600;
key_t __shkey = 0X95270000 + key;
int32_t __shmflag = IPC_CREAT | 0600;
int32_t shmid = shmget(__shkey, shmsize, __shmflag);
if (shmid < 0) {
return -1;
}
......
......@@ -348,6 +348,7 @@ SConfigItem *cfgGetItem(SConfig *pCfg, const char *name) {
}
}
uError("name:%s, cfg not found", name);
terrno = TSDB_CODE_CFG_NOT_FOUND;
return NULL;
}
......
......@@ -68,7 +68,6 @@ TAOS_DEFINE_ERROR(TSDB_CODE_RPC_INVALID_TIME_STAMP, "Client and server's t
TAOS_DEFINE_ERROR(TSDB_CODE_APP_NOT_READY, "Database not ready")
TAOS_DEFINE_ERROR(TSDB_CODE_RPC_FQDN_ERROR, "Unable to resolve FQDN")
TAOS_DEFINE_ERROR(TSDB_CODE_RPC_INVALID_VERSION, "Invalid app version")
TAOS_DEFINE_ERROR(TSDB_CODE_COMPRESS_ERROR, "Failed to compress msg")
//common & util
TAOS_DEFINE_ERROR(TSDB_CODE_OPS_NOT_SUPPORT, "Operation not supported")
......@@ -96,6 +95,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_REF_NOT_EXIST, "Ref is not there")
TAOS_DEFINE_ERROR(TSDB_CODE_INVALID_VERSION_NUMBER, "Invalid version number")
TAOS_DEFINE_ERROR(TSDB_CODE_INVALID_VERSION_STRING, "Invalid version string")
TAOS_DEFINE_ERROR(TSDB_CODE_VERSION_NOT_COMPATIBLE, "Version not compatible")
TAOS_DEFINE_ERROR(TSDB_CODE_COMPRESS_ERROR, "Failed to compress msg")
//client
TAOS_DEFINE_ERROR(TSDB_CODE_TSC_INVALID_OPERATION, "Invalid operation")
......
......@@ -21,7 +21,6 @@
#include "tlog.h"
#include "tqueue.h"
#define SHM_DEFAULT_SIZE (20 * 1024 * 1024)
typedef void *(*ProcThreadFp)(void *param);
typedef struct SProcQueue {
......@@ -80,7 +79,7 @@ static int32_t taosProcInitMutex(SProcQueue *pQueue) {
}
if (taosThreadMutexInit(&pQueue->mutex, &mattr) != 0) {
taosThreadMutexDestroy(&pQueue->mutex);
taosThreadMutexAttrDestroy(&mattr);
terrno = TAOS_SYSTEM_ERROR(errno);
uError("failed to init mutex since %s", terrstr());
return -1;
......@@ -156,6 +155,11 @@ static void taosProcCleanupQueue(SProcQueue *pQueue) {
static int32_t taosProcQueuePush(SProcObj *pProc, SProcQueue *pQueue, const char *pHead, int16_t rawHeadLen,
const char *pBody, int32_t rawBodyLen, int64_t handle, ProcFuncType ftype) {
if (rawHeadLen == 0 || pHead == NULL) {
terrno = TSDB_CODE_INVALID_PARA;
return -1;
}
const int32_t headLen = CEIL8(rawHeadLen);
const int32_t bodyLen = CEIL8(rawBodyLen);
const int32_t fullLen = headLen + bodyLen + 8;
......@@ -177,13 +181,13 @@ static int32_t taosProcQueuePush(SProcObj *pProc, SProcQueue *pQueue, const char
const int32_t pos = pQueue->tail;
if (pQueue->tail < pQueue->total) {
*(int16_t *)(pQueue->pBuffer + pQueue->tail) = headLen;
*(int16_t *)(pQueue->pBuffer + pQueue->tail) = rawHeadLen;
*(int8_t *)(pQueue->pBuffer + pQueue->tail + 2) = (int8_t)ftype;
*(int32_t *)(pQueue->pBuffer + pQueue->tail + 4) = bodyLen;
*(int32_t *)(pQueue->pBuffer + pQueue->tail + 4) = rawBodyLen;
} else {
*(int16_t *)(pQueue->pBuffer) = headLen;
*(int16_t *)(pQueue->pBuffer) = rawHeadLen;
*(int8_t *)(pQueue->pBuffer + 2) = (int8_t)ftype;
*(int32_t *)(pQueue->pBuffer + 4) = bodyLen;
*(int32_t *)(pQueue->pBuffer + 4) = rawBodyLen;
}
if (pQueue->tail < pQueue->head) {
......@@ -239,18 +243,20 @@ static int32_t taosProcQueuePop(SProcQueue *pQueue, void **ppHead, int16_t *pHea
return 0;
}
int16_t headLen = 0;
int16_t rawHeadLen = 0;
int8_t ftype = 0;
int32_t bodyLen = 0;
int32_t rawBodyLen = 0;
if (pQueue->head < pQueue->total) {
headLen = *(int16_t *)(pQueue->pBuffer + pQueue->head);
rawHeadLen = *(int16_t *)(pQueue->pBuffer + pQueue->head);
ftype = *(int8_t *)(pQueue->pBuffer + pQueue->head + 2);
bodyLen = *(int32_t *)(pQueue->pBuffer + pQueue->head + 4);
rawBodyLen = *(int32_t *)(pQueue->pBuffer + pQueue->head + 4);
} else {
headLen = *(int16_t *)(pQueue->pBuffer);
rawHeadLen = *(int16_t *)(pQueue->pBuffer);
ftype = *(int8_t *)(pQueue->pBuffer + 2);
bodyLen = *(int32_t *)(pQueue->pBuffer + 4);
rawBodyLen = *(int32_t *)(pQueue->pBuffer + 4);
}
int16_t headLen = CEIL8(rawHeadLen);
int32_t bodyLen = CEIL8(rawBodyLen);
void *pHead = (*mallocHeadFp)(headLen);
void *pBody = (*mallocBodyFp)(bodyLen);
......@@ -301,12 +307,12 @@ static int32_t taosProcQueuePop(SProcQueue *pQueue, void **ppHead, int16_t *pHea
*ppHead = pHead;
*ppBody = pBody;
*pHeadLen = headLen;
*pBodyLen = bodyLen;
*pHeadLen = rawHeadLen;
*pBodyLen = rawBodyLen;
*pFuncType = (ProcFuncType)ftype;
uTrace("proc:%s, pop msg at pos:%d ftype:%d remain:%d, head:%d %p body:%d %p", pQueue->name, pos, ftype,
pQueue->items, headLen, pHead, bodyLen, pBody);
pQueue->items, rawHeadLen, pHead, rawBodyLen, pBody);
return 1;
}
......@@ -383,7 +389,7 @@ static void taosProcThreadLoop(SProcObj *pProc) {
freeBodyFp = pProc->parentFreeBodyFp;
}
uDebug("proc:%s, start to get msg from queue:%p", pProc->name, pQueue);
uDebug("proc:%s, start to get msg from queue:%p, thread:%" PRId64, pProc->name, pQueue, pProc->thread);
while (1) {
int32_t numOfMsgs = taosProcQueuePop(pQueue, &pHead, &headLen, &pBody, &bodyLen, &ftype, mallocHeadFp, freeHeadFp,
......@@ -392,7 +398,7 @@ static void taosProcThreadLoop(SProcObj *pProc) {
uDebug("proc:%s, get no msg from queue:%p and exit the proc thread", pProc->name, pQueue);
break;
} else if (numOfMsgs < 0) {
uTrace("proc:%s, get no msg from queue:%p since %s", pProc->name, pQueue, terrstr());
uError("proc:%s, get no msg from queue:%p since %s", pProc->name, pQueue, terrstr());
taosMsleep(1);
continue;
} else {
......@@ -412,11 +418,11 @@ int32_t taosProcRun(SProcObj *pProc) {
return -1;
}
uDebug("proc:%s, start to consume queue:%p, thread:%" PRId64, pProc->name, pProc->pChildQueue, pProc->thread);
uDebug("proc:%s, start to consume, thread:%" PRId64, pProc->name, pProc->thread);
return 0;
}
static void taosProcStop(SProcObj *pProc) {
void taosProcStop(SProcObj *pProc) {
if (!taosCheckPthreadValid(pProc->thread)) return;
uDebug("proc:%s, start to join thread:%" PRId64, pProc->name, pProc->thread);
......@@ -428,6 +434,7 @@ static void taosProcStop(SProcObj *pProc) {
}
tsem_post(&pQueue->sem);
taosThreadJoin(pProc->thread, NULL);
pProc->thread = 0;
}
void taosProcCleanup(SProcObj *pProc) {
......@@ -448,6 +455,10 @@ void taosProcCleanup(SProcObj *pProc) {
int32_t taosProcPutToChildQ(SProcObj *pProc, const void *pHead, int16_t headLen, const void *pBody, int32_t bodyLen,
void *handle, ProcFuncType ftype) {
if (ftype != PROC_REQ) {
terrno = TSDB_CODE_INVALID_PARA;
return -1;
}
return taosProcQueuePush(pProc, pProc->pChildQueue, pHead, headLen, pBody, bodyLen, (int64_t)handle, ftype);
}
......@@ -464,13 +475,18 @@ void taosProcCloseHandles(SProcObj *pProc, void (*HandleFp)(void *handle)) {
while (h != NULL) {
void *handle = *((void **)h);
(*HandleFp)(handle);
h = taosHashIterate(pProc->hash, h);
}
taosHashClear(pProc->hash);
taosThreadMutexUnlock(&pProc->pChildQueue->mutex);
}
void taosProcPutToParentQ(SProcObj *pProc, const void *pHead, int16_t headLen, const void *pBody, int32_t bodyLen,
ProcFuncType ftype) {
int32_t retry = 0;
while (taosProcQueuePush(pProc, pProc->pParentQueue, pHead, headLen, pBody, bodyLen, 0, ftype) != 0) {
taosMsleep(1);
uWarn("proc:%s, failed to put to queue:%p since %s, retry:%d", pProc->name, pProc->pParentQueue, terrstr(), retry);
retry++;
taosMsleep(retry);
}
}
......@@ -46,11 +46,11 @@ add_executable(encodeTest "encodeTest.cpp")
target_link_libraries(encodeTest os util gtest gtest_main)
# queueTest
add_executable(queue_test "queueTest.cpp")
target_link_libraries(queue_test os util gtest_main)
add_executable(procTest "procTest.cpp")
target_link_libraries(procTest os util transport sut gtest_main)
add_test(
NAME queue_test
COMMAND queue_test
NAME procTest
COMMAND procTest
)
# cfgTest
......
/**
* @file queue.cpp
* @author slguan (slguan@taosdata.com)
* @brief UTIL module queue tests
* @version 1.0
* @date 2022-01-27
*
* @copyright Copyright (c) 2022
*
*/
#include <gtest/gtest.h>
#include "tlog.h"
#include "tprocess.h"
#include "tqueue.h"
typedef struct STestMsg {
uint16_t msgType;
void *pCont;
int contLen;
int32_t code;
void *handle; // rpc handle returned to app
void *ahandle; // app handle set by client
int noResp; // has response or not(default 0, 0: resp, 1: no resp);
int persistHandle; // persist handle or not
} STestMsg;
class UtilTesProc : public ::testing::Test {
public:
void SetUp() override {
shm.id = -1;
for (int32_t i = 0; i < 4000; ++i) {
body[i] = i % 26 + 'a';
}
head.pCont = body;
head.code = 1;
head.msgType = 2;
head.noResp = 3;
head.persistHandle = 4;
taosRemoveDir("/tmp/td");
taosMkDir("/tmp/td");
tstrncpy(tsLogDir, "/tmp/td", PATH_MAX);
if (taosInitLog("taosdlog", 1) != 0) {
printf("failed to init log file\n");
}
}
void TearDown() override { taosDropShm(&shm); }
public:
static STestMsg head;
static char body[4000];
static SShm shm;
static void SetUpTestSuite() {}
static void TearDownTestSuite() {}
};
SShm UtilTesProc::shm;
char UtilTesProc::body[4000];
STestMsg UtilTesProc::head;
TEST_F(UtilTesProc, 00_Init_Cleanup) {
ASSERT_EQ(taosCreateShm(&shm, 1234, 1024 * 1024 * 2), 0);
shm.size = 1023;
SProcCfg cfg = {.childConsumeFp = (ProcConsumeFp)NULL,
.childMallocHeadFp = (ProcMallocFp)taosAllocateQitem,
.childFreeHeadFp = (ProcFreeFp)taosFreeQitem,
.childMallocBodyFp = (ProcMallocFp)taosMemoryMalloc,
.childFreeBodyFp = (ProcFreeFp)taosMemoryMalloc,
.parentConsumeFp = (ProcConsumeFp)NULL,
.parentMallocHeadFp = (ProcMallocFp)taosMemoryMalloc,
.parentFreeHeadFp = (ProcFreeFp)taosMemoryFree,
.parentMallocBodyFp = (ProcMallocFp)taosMemoryMalloc,
.parentFreeBodyFp = (ProcFreeFp)taosMemoryMalloc,
.shm = shm,
.parent = &shm,
.name = "1234"};
SProcObj *proc = taosProcInit(&cfg);
ASSERT_EQ(proc, nullptr);
shm.size = 2468;
cfg.shm = shm;
proc = taosProcInit(&cfg);
ASSERT_NE(proc, nullptr);
ASSERT_EQ(taosProcRun(proc), 0);
taosProcCleanup(proc);
taosDropShm(&shm);
}
void ConsumeChild1(void *parent, void *pHead, int16_t headLen, void *pBody, int32_t bodyLen, ProcFuncType ftype) {
STestMsg msg;
memcpy(&msg, pHead, headLen);
char body[2000] = {0};
memcpy(body, pBody, bodyLen);
uDebug("====> parent:%" PRId64 " ftype:%d, headLen:%d bodyLen:%d head:%d:%d:%d:%d body:%s <====", (int64_t)parent,
ftype, headLen, bodyLen, msg.code, msg.msgType, msg.noResp, msg.persistHandle, body);
taosMemoryFree(pBody);
taosFreeQitem(pHead);
}
TEST_F(UtilTesProc, 01_Push_Pop_Child) {
shm.size = 3000;
ASSERT_EQ(taosCreateShm(&shm, 1235, shm.size), 0);
SProcCfg cfg = {.childConsumeFp = (ProcConsumeFp)ConsumeChild1,
.childMallocHeadFp = (ProcMallocFp)taosAllocateQitem,
.childFreeHeadFp = (ProcFreeFp)taosFreeQitem,
.childMallocBodyFp = (ProcMallocFp)taosMemoryMalloc,
.childFreeBodyFp = (ProcFreeFp)taosMemoryFree,
.parentConsumeFp = (ProcConsumeFp)NULL,
.parentMallocHeadFp = (ProcMallocFp)taosMemoryMalloc,
.parentFreeHeadFp = (ProcFreeFp)taosMemoryFree,
.parentMallocBodyFp = (ProcMallocFp)taosMemoryMalloc,
.parentFreeBodyFp = (ProcFreeFp)taosMemoryFree,
.shm = shm,
.parent = (void *)((int64_t)1235),
.name = "1235_c"};
SProcObj *cproc = taosProcInit(&cfg);
ASSERT_NE(cproc, nullptr);
ASSERT_NE(taosProcPutToChildQ(cproc, &head, 0, body, 0, 0, PROC_RSP), 0);
ASSERT_NE(taosProcPutToChildQ(cproc, &head, 0, body, 0, 0, PROC_REGIST), 0);
ASSERT_NE(taosProcPutToChildQ(cproc, &head, 0, body, 0, 0, PROC_RELEASE), 0);
ASSERT_NE(taosProcPutToChildQ(cproc, NULL, 12, body, 0, 0, PROC_REQ), 0);
ASSERT_NE(taosProcPutToChildQ(cproc, &head, 0, body, 0, 0, PROC_REQ), 0);
ASSERT_NE(taosProcPutToChildQ(cproc, &head, shm.size, body, 0, 0, PROC_REQ), 0);
ASSERT_NE(taosProcPutToChildQ(cproc, &head, sizeof(STestMsg), body, shm.size, 0, PROC_REQ), 0);
for (int32_t j = 0; j < 1000; j++) {
int32_t i = 0;
for (i = 0; i < 20; ++i) {
ASSERT_EQ(taosProcPutToChildQ(cproc, &head, sizeof(STestMsg), body, i, 0, PROC_REQ), 0);
}
ASSERT_NE(taosProcPutToChildQ(cproc, &head, sizeof(STestMsg), body, i, 0, PROC_REQ), 0);
cfg.isChild = true;
cfg.name = "1235_p";
SProcObj *pproc = taosProcInit(&cfg);
ASSERT_NE(pproc, nullptr);
taosProcRun(pproc);
taosProcCleanup(pproc);
}
taosProcCleanup(cproc);
taosDropShm(&shm);
}
void ConsumeParent1(void *parent, void *pHead, int16_t headLen, void *pBody, int32_t bodyLen, ProcFuncType ftype) {
STestMsg msg;
memcpy(&msg, pHead, headLen);
char body[2000] = {0};
memcpy(body, pBody, bodyLen);
uDebug("----> parent:%" PRId64 " ftype:%d, headLen:%d bodyLen:%d head:%d:%d:%d:%d body:%s <----", (int64_t)parent,
ftype, headLen, bodyLen, msg.code, msg.msgType, msg.noResp, msg.persistHandle, body);
taosMemoryFree(pBody);
taosMemoryFree(pHead);
}
TEST_F(UtilTesProc, 02_Push_Pop_Parent) {
shm.size = 3000;
ASSERT_EQ(taosCreateShm(&shm, 1236, shm.size), 0);
SProcCfg cfg = {.childConsumeFp = (ProcConsumeFp)NULL,
.childMallocHeadFp = (ProcMallocFp)taosAllocateQitem,
.childFreeHeadFp = (ProcFreeFp)taosFreeQitem,
.childMallocBodyFp = (ProcMallocFp)taosMemoryMalloc,
.childFreeBodyFp = (ProcFreeFp)taosMemoryFree,
.parentConsumeFp = (ProcConsumeFp)ConsumeParent1,
.parentMallocHeadFp = (ProcMallocFp)taosMemoryMalloc,
.parentFreeHeadFp = (ProcFreeFp)taosMemoryFree,
.parentMallocBodyFp = (ProcMallocFp)taosMemoryMalloc,
.parentFreeBodyFp = (ProcFreeFp)taosMemoryFree,
.shm = shm,
.parent = (void *)((int64_t)1236),
.name = "1236_c"};
SProcObj *cproc = taosProcInit(&cfg);
ASSERT_NE(cproc, nullptr);
cfg.name = "1236_p";
cfg.isChild = true;
SProcObj *pproc = taosProcInit(&cfg);
ASSERT_NE(pproc, nullptr);
for (int32_t j = 0; j < 1000; j++) {
int32_t i = 0;
for (i = 0; i < 20; ++i) {
taosProcPutToParentQ(pproc, &head, sizeof(STestMsg), body, i, PROC_REQ);
}
taosProcRun(cproc);
taosProcStop(cproc);
}
taosProcCleanup(pproc);
taosProcCleanup(cproc);
taosDropShm(&shm);
}
void ConsumeChild3(void *parent, void *pHead, int16_t headLen, void *pBody, int32_t bodyLen, ProcFuncType ftype) {
STestMsg msg;
memcpy(&msg, pHead, headLen);
char body[2000] = {0};
memcpy(body, pBody, bodyLen);
uDebug("====> parent:%" PRId64 " ftype:%d, headLen:%d bodyLen:%d handle:%" PRId64 " body:%s <====", (int64_t)parent,
ftype, headLen, bodyLen, (int64_t)msg.handle, body);
taosMemoryFree(pBody);
taosFreeQitem(pHead);
}
void processHandle(void *handle) { uDebug("----> remove handle:%" PRId64 " <----", (int64_t)handle); }
TEST_F(UtilTesProc, 03_Handle) {
// uDebugFlag = 207;
shm.size = 3000;
ASSERT_EQ(taosCreateShm(&shm, 1237, shm.size), 0);
SProcCfg cfg = {.childConsumeFp = (ProcConsumeFp)ConsumeChild3,
.childMallocHeadFp = (ProcMallocFp)taosAllocateQitem,
.childFreeHeadFp = (ProcFreeFp)taosFreeQitem,
.childMallocBodyFp = (ProcMallocFp)taosMemoryMalloc,
.childFreeBodyFp = (ProcFreeFp)taosMemoryFree,
.parentConsumeFp = (ProcConsumeFp)NULL,
.parentMallocHeadFp = (ProcMallocFp)taosMemoryMalloc,
.parentFreeHeadFp = (ProcFreeFp)taosMemoryFree,
.parentMallocBodyFp = (ProcMallocFp)taosMemoryMalloc,
.parentFreeBodyFp = (ProcFreeFp)taosMemoryFree,
.shm = shm,
.parent = (void *)((int64_t)1235),
.name = "1237_p"};
SProcObj *cproc = taosProcInit(&cfg);
ASSERT_NE(cproc, nullptr);
for (int32_t j = 0; j < 1; j++) {
int32_t i = 0;
for (i = 0; i < 20; ++i) {
head.handle = (void *)((int64_t)i);
ASSERT_EQ(taosProcPutToChildQ(cproc, &head, sizeof(STestMsg), body, i, (void *)((int64_t)i), PROC_REQ), 0);
}
cfg.isChild = true;
cfg.name = "child_queue";
SProcObj *pproc = taosProcInit(&cfg);
ASSERT_NE(pproc, nullptr);
taosProcRun(pproc);
taosProcCleanup(pproc);
taosProcRemoveHandle(cproc, (void *)((int64_t)3));
taosProcRemoveHandle(cproc, (void *)((int64_t)5));
taosProcRemoveHandle(cproc, (void *)((int64_t)6));
taosProcCloseHandles(cproc, processHandle);
}
taosProcCleanup(cproc);
taosDropShm(&shm);
}
/**
* @file queue.cpp
* @author slguan (slguan@taosdata.com)
* @brief UTIL module queue tests
* @version 1.0
* @date 2022-01-27
*
* @copyright Copyright (c) 2022
*
*/
#include <gtest/gtest.h>
#include "os.h"
#include "tqueue.h"
#include <sys/shm.h>
#include <sys/wait.h>
class UtilTestQueue : public ::testing::Test {
public:
void SetUp() override {}
void TearDown() override {}
public:
static void SetUpTestSuite() {}
static void TearDownTestSuite() {}
};
......@@ -55,8 +55,8 @@
# --- for multi process mode
./test.sh -f tsim/user/basic1.sim -m
./test.sh -f tsim/stable/vnode3.sim -m
./test.sh -f tsim/tmq/basic.sim -m
# ./test.sh -f tsim/user/basic1.sim -m
# ./test.sh -f tsim/stable/vnode3.sim -m
# ./test.sh -f tsim/tmq/basic.sim -m
#======================b1-end===============
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册