提交 3480e211 编写于 作者: C cpwu

Merge branch '3.0' into cpwu/3.0

...@@ -51,7 +51,14 @@ extern int32_t tsCompatibleModel; ...@@ -51,7 +51,14 @@ extern int32_t tsCompatibleModel;
extern bool tsEnableSlaveQuery; extern bool tsEnableSlaveQuery;
extern bool tsPrintAuth; extern bool tsPrintAuth;
extern int64_t tsTickPerDay[3]; extern int64_t tsTickPerDay[3];
// multi-process
extern bool tsMultiProcess; extern bool tsMultiProcess;
extern int32_t tsMnodeShmSize;
extern int32_t tsVnodeShmSize;
extern int32_t tsQnodeShmSize;
extern int32_t tsSnodeShmSize;
extern int32_t tsBnodeShmSize;
// monitor // monitor
extern bool tsEnableMonitor; extern bool tsEnableMonitor;
......
...@@ -110,10 +110,11 @@ typedef struct SFileBlockInfo { ...@@ -110,10 +110,11 @@ typedef struct SFileBlockInfo {
#define FUNCTION_COV 38 #define FUNCTION_COV 38
typedef struct SResultRowEntryInfo { typedef struct SResultRowEntryInfo {
int8_t hasResult; // result generated, not NULL value // int8_t hasResult:6; // result generated, not NULL value
bool initialized; // output buffer has been initialized bool initialized:1; // output buffer has been initialized
bool complete; // query has completed bool complete:1; // query has completed
uint32_t numOfRes; // num of output result in current buffer uint8_t isNullRes:6; // the result is null
uint8_t numOfRes; // num of output result in current buffer
} SResultRowEntryInfo; } SResultRowEntryInfo;
// determine the real data need to calculated the result // determine the real data need to calculated the result
...@@ -156,7 +157,6 @@ typedef struct SResultDataInfo { ...@@ -156,7 +157,6 @@ typedef struct SResultDataInfo {
#define GET_RES_INFO(ctx) ((ctx)->resultInfo) #define GET_RES_INFO(ctx) ((ctx)->resultInfo)
#define GET_ROWCELL_INTERBUF(_c) ((void*) ((char*)(_c) + sizeof(SResultRowEntryInfo))) #define GET_ROWCELL_INTERBUF(_c) ((void*) ((char*)(_c) + sizeof(SResultRowEntryInfo)))
#define DATA_SET_FLAG ',' // to denote the output area has data, not null value
typedef struct SInputColumnInfoData { typedef struct SInputColumnInfoData {
int32_t totalRows; // total rows in current columnar data int32_t totalRows; // total rows in current columnar data
......
...@@ -22,12 +22,12 @@ ...@@ -22,12 +22,12 @@
extern "C" { extern "C" {
#endif #endif
typedef enum { PROC_QUEUE, PROC_REQ, PROC_RSP, PROC_REGIST, PROC_RELEASE } ProcFuncType; typedef enum { PROC_REQ = 1, PROC_RSP, PROC_REGIST, PROC_RELEASE } ProcFuncType;
typedef struct SProcObj SProcObj; typedef struct SProcObj SProcObj;
typedef void *(*ProcMallocFp)(int32_t contLen); typedef void *(*ProcMallocFp)(int32_t contLen);
typedef void *(*ProcFreeFp)(void *pCont); typedef void *(*ProcFreeFp)(void *pCont);
typedef void *(*ProcConsumeFp)(void *parent, void *pHead, int16_t headLen, void *pBody, int32_t bodyLen, typedef void (*ProcConsumeFp)(void *parent, void *pHead, int16_t headLen, void *pBody, int32_t bodyLen,
ProcFuncType ftype); ProcFuncType ftype);
typedef struct { typedef struct {
...@@ -50,6 +50,7 @@ typedef struct { ...@@ -50,6 +50,7 @@ typedef struct {
SProcObj *taosProcInit(const SProcCfg *pCfg); SProcObj *taosProcInit(const SProcCfg *pCfg);
void taosProcCleanup(SProcObj *pProc); void taosProcCleanup(SProcObj *pProc);
int32_t taosProcRun(SProcObj *pProc); int32_t taosProcRun(SProcObj *pProc);
void taosProcStop(SProcObj *pProc);
int32_t taosProcPutToChildQ(SProcObj *pProc, const void *pHead, int16_t headLen, const void *pBody, int32_t bodyLen, int32_t taosProcPutToChildQ(SProcObj *pProc, const void *pHead, int16_t headLen, const void *pBody, int32_t bodyLen,
void *handle, ProcFuncType ftype); void *handle, ProcFuncType ftype);
......
...@@ -45,7 +45,14 @@ float tsRatioOfQueryCores = 1.0f; ...@@ -45,7 +45,14 @@ float tsRatioOfQueryCores = 1.0f;
int32_t tsMaxBinaryDisplayWidth = 30; int32_t tsMaxBinaryDisplayWidth = 30;
bool tsEnableSlaveQuery = 1; bool tsEnableSlaveQuery = 1;
bool tsPrintAuth = 0; bool tsPrintAuth = 0;
bool tsMultiProcess = 0;
// multi process
bool tsMultiProcess = false;
int32_t tsMnodeShmSize = TSDB_MAX_WAL_SIZE * 2;
int32_t tsVnodeShmSize = TSDB_MAX_WAL_SIZE * 10;
int32_t tsQnodeShmSize = TSDB_MAX_WAL_SIZE * 4;
int32_t tsSnodeShmSize = TSDB_MAX_WAL_SIZE * 4;
int32_t tsBnodeShmSize = TSDB_MAX_WAL_SIZE * 4;
// monitor // monitor
bool tsEnableMonitor = 1; bool tsEnableMonitor = 1;
...@@ -347,7 +354,13 @@ static int32_t taosAddServerCfg(SConfig *pCfg) { ...@@ -347,7 +354,13 @@ static int32_t taosAddServerCfg(SConfig *pCfg) {
if (cfgAddBool(pCfg, "printAuth", tsPrintAuth, 0) != 0) return -1; if (cfgAddBool(pCfg, "printAuth", tsPrintAuth, 0) != 0) return -1;
if (cfgAddBool(pCfg, "slaveQuery", tsEnableSlaveQuery, 0) != 0) return -1; if (cfgAddBool(pCfg, "slaveQuery", tsEnableSlaveQuery, 0) != 0) return -1;
if (cfgAddBool(pCfg, "deadLockKillQuery", tsDeadLockKillQuery, 0) != 0) return -1; if (cfgAddBool(pCfg, "deadLockKillQuery", tsDeadLockKillQuery, 0) != 0) return -1;
if (cfgAddBool(pCfg, "multiProcess", tsMultiProcess, 0) != 0) return -1; if (cfgAddBool(pCfg, "multiProcess", tsMultiProcess, 0) != 0) return -1;
if (cfgAddInt32(pCfg, "mnodeShmSize", tsMnodeShmSize, 4096, INT32_MAX, 0) != 0) return -1;
if (cfgAddInt32(pCfg, "vnodeShmSize", tsVnodeShmSize, 4096, INT32_MAX, 0) != 0) return -1;
if (cfgAddInt32(pCfg, "qnodeShmSize", tsQnodeShmSize, 4096, INT32_MAX, 0) != 0) return -1;
if (cfgAddInt32(pCfg, "snodeShmSize", tsSnodeShmSize, 4096, INT32_MAX, 0) != 0) return -1;
// if (cfgAddInt32(pCfg, "bnodeShmSize", tsBnodeShmSize, 4096, INT32_MAX, 0) != 0) return -1;
if (cfgAddBool(pCfg, "monitor", tsEnableMonitor, 0) != 0) return -1; if (cfgAddBool(pCfg, "monitor", tsEnableMonitor, 0) != 0) return -1;
if (cfgAddInt32(pCfg, "monitorInterval", tsMonitorInterval, 1, 360000, 0) != 0) return -1; if (cfgAddInt32(pCfg, "monitorInterval", tsMonitorInterval, 1, 360000, 0) != 0) return -1;
...@@ -466,7 +479,13 @@ static int32_t taosSetServerCfg(SConfig *pCfg) { ...@@ -466,7 +479,13 @@ static int32_t taosSetServerCfg(SConfig *pCfg) {
tsPrintAuth = cfgGetItem(pCfg, "printAuth")->bval; tsPrintAuth = cfgGetItem(pCfg, "printAuth")->bval;
tsEnableSlaveQuery = cfgGetItem(pCfg, "slaveQuery")->bval; tsEnableSlaveQuery = cfgGetItem(pCfg, "slaveQuery")->bval;
tsDeadLockKillQuery = cfgGetItem(pCfg, "deadLockKillQuery")->bval; tsDeadLockKillQuery = cfgGetItem(pCfg, "deadLockKillQuery")->bval;
tsMultiProcess = cfgGetItem(pCfg, "multiProcess")->bval; tsMultiProcess = cfgGetItem(pCfg, "multiProcess")->bval;
tsMnodeShmSize = cfgGetItem(pCfg, "mnodeShmSize")->i32;
tsVnodeShmSize = cfgGetItem(pCfg, "vnodeShmSize")->i32;
tsQnodeShmSize = cfgGetItem(pCfg, "qnodeShmSize")->i32;
tsSnodeShmSize = cfgGetItem(pCfg, "snodeShmSize")->i32;
// tsBnodeShmSize = cfgGetItem(pCfg, "bnodeShmSize")->i32;
tsEnableMonitor = cfgGetItem(pCfg, "monitor")->bval; tsEnableMonitor = cfgGetItem(pCfg, "monitor")->bval;
tsMonitorInterval = cfgGetItem(pCfg, "monitorInterval")->i32; tsMonitorInterval = cfgGetItem(pCfg, "monitorInterval")->i32;
......
...@@ -512,6 +512,16 @@ static FORCE_INLINE int32_t convertToInteger(SVariant *pVariant, int64_t *result ...@@ -512,6 +512,16 @@ static FORCE_INLINE int32_t convertToInteger(SVariant *pVariant, int64_t *result
setNull((char *)result, type, tDataTypes[type].bytes); setNull((char *)result, type, tDataTypes[type].bytes);
return 0; return 0;
} }
if (IS_SIGNED_NUMERIC_TYPE(pVariant->nType) || (pVariant->nType == TSDB_DATA_TYPE_BOOL)) {
*result = pVariant->i;
} else if (IS_UNSIGNED_NUMERIC_TYPE(pVariant->nType)) {
*result = pVariant->u;
} else if (IS_FLOAT_TYPE(pVariant->nType)) {
*result = (int64_t) pVariant->d;
} else {
//TODO: handling var types
}
#if 0 #if 0
errno = 0; errno = 0;
if (IS_SIGNED_NUMERIC_TYPE(pVariant->nType) || (pVariant->nType == TSDB_DATA_TYPE_BOOL)) { if (IS_SIGNED_NUMERIC_TYPE(pVariant->nType) || (pVariant->nType == TSDB_DATA_TYPE_BOOL)) {
...@@ -1038,4 +1048,3 @@ char * taosVariantGet(SVariant *pVar, int32_t type) { ...@@ -1038,4 +1048,3 @@ char * taosVariantGet(SVariant *pVar, int32_t type) {
return NULL; return NULL;
} }
...@@ -118,7 +118,7 @@ int32_t dmProcessConfigReq(SDnodeMgmt *pMgmt, SNodeMsg *pMsg) { ...@@ -118,7 +118,7 @@ int32_t dmProcessConfigReq(SDnodeMgmt *pMgmt, SNodeMsg *pMsg) {
} }
static int32_t dmProcessCreateNodeMsg(SDnode *pDnode, ENodeType ntype, SNodeMsg *pMsg) { static int32_t dmProcessCreateNodeMsg(SDnode *pDnode, EDndType ntype, SNodeMsg *pMsg) {
SMgmtWrapper *pWrapper = dndAcquireWrapper(pDnode, ntype); SMgmtWrapper *pWrapper = dndAcquireWrapper(pDnode, ntype);
if (pWrapper != NULL) { if (pWrapper != NULL) {
dndReleaseWrapper(pWrapper); dndReleaseWrapper(pWrapper);
...@@ -146,7 +146,7 @@ static int32_t dmProcessCreateNodeMsg(SDnode *pDnode, ENodeType ntype, SNodeMsg ...@@ -146,7 +146,7 @@ static int32_t dmProcessCreateNodeMsg(SDnode *pDnode, ENodeType ntype, SNodeMsg
return code; return code;
} }
static int32_t dmProcessDropNodeMsg(SDnode *pDnode, ENodeType ntype, SNodeMsg *pMsg) { static int32_t dmProcessDropNodeMsg(SDnode *pDnode, EDndType ntype, SNodeMsg *pMsg) {
SMgmtWrapper *pWrapper = dndAcquireWrapper(pDnode, ntype); SMgmtWrapper *pWrapper = dndAcquireWrapper(pDnode, ntype);
if (pWrapper == NULL) { if (pWrapper == NULL) {
terrno = TSDB_CODE_NODE_NOT_DEPLOYED; terrno = TSDB_CODE_NODE_NOT_DEPLOYED;
......
...@@ -18,15 +18,15 @@ ...@@ -18,15 +18,15 @@
#include "tconfig.h" #include "tconfig.h"
static struct { static struct {
bool dumpConfig; bool dumpConfig;
bool generateGrant; bool generateGrant;
bool printAuth; bool printAuth;
bool printVersion; bool printVersion;
char envFile[PATH_MAX]; char envFile[PATH_MAX];
char apolloUrl[PATH_MAX]; char apolloUrl[PATH_MAX];
SArray *pArgs; // SConfigPair SArray *pArgs; // SConfigPair
SDnode *pDnode; SDnode *pDnode;
ENodeType ntype; EDndType ntype;
} global = {0}; } global = {0};
static void dndStopDnode(int signum, void *info, void *ctx) { static void dndStopDnode(int signum, void *info, void *ctx) {
......
...@@ -49,7 +49,7 @@ extern "C" { ...@@ -49,7 +49,7 @@ extern "C" {
#define dDebug(...) { if (dDebugFlag & DEBUG_DEBUG) { taosPrintLog("DND ", DEBUG_DEBUG, dDebugFlag, __VA_ARGS__); }} #define dDebug(...) { if (dDebugFlag & DEBUG_DEBUG) { taosPrintLog("DND ", DEBUG_DEBUG, dDebugFlag, __VA_ARGS__); }}
#define dTrace(...) { if (dDebugFlag & DEBUG_TRACE) { taosPrintLog("DND ", DEBUG_TRACE, dDebugFlag, __VA_ARGS__); }} #define dTrace(...) { if (dDebugFlag & DEBUG_TRACE) { taosPrintLog("DND ", DEBUG_TRACE, dDebugFlag, __VA_ARGS__); }}
typedef enum { DNODE, VNODES, QNODE, SNODE, MNODE, BNODE, NODE_MAX } ENodeType; typedef enum { DNODE, VNODES, QNODE, SNODE, MNODE, BNODE, NODE_MAX } EDndType;
typedef enum { DND_STAT_INIT, DND_STAT_RUNNING, DND_STAT_STOPPED } EDndStatus; typedef enum { DND_STAT_INIT, DND_STAT_RUNNING, DND_STAT_STOPPED } EDndStatus;
typedef enum { DND_ENV_INIT, DND_ENV_READY, DND_ENV_CLEANUP } EEnvStatus; typedef enum { DND_ENV_INIT, DND_ENV_READY, DND_ENV_CLEANUP } EEnvStatus;
typedef enum { PROC_SINGLE, PROC_CHILD, PROC_PARENT } EProcType; typedef enum { PROC_SINGLE, PROC_CHILD, PROC_PARENT } EProcType;
...@@ -92,7 +92,7 @@ typedef struct SMgmtWrapper { ...@@ -92,7 +92,7 @@ typedef struct SMgmtWrapper {
char *path; char *path;
int32_t refCount; int32_t refCount;
SRWLatch latch; SRWLatch latch;
ENodeType ntype; EDndType ntype;
bool deployed; bool deployed;
bool required; bool required;
EProcType procType; EProcType procType;
...@@ -126,7 +126,7 @@ typedef struct SDnode { ...@@ -126,7 +126,7 @@ typedef struct SDnode {
int32_t numOfDisks; int32_t numOfDisks;
uint16_t serverPort; uint16_t serverPort;
bool dropped; bool dropped;
ENodeType ntype; EDndType ntype;
EDndStatus status; EDndStatus status;
EDndEvent event; EDndEvent event;
SStartupReq startup; SStartupReq startup;
...@@ -137,8 +137,8 @@ typedef struct SDnode { ...@@ -137,8 +137,8 @@ typedef struct SDnode {
// dndEnv.c // dndEnv.c
const char *dndStatStr(EDndStatus stat); const char *dndStatStr(EDndStatus stat);
const char *dndNodeLogStr(ENodeType ntype); const char *dndNodeLogStr(EDndType ntype);
const char *dndNodeProcStr(ENodeType ntype); const char *dndNodeProcStr(EDndType ntype);
const char *dndEventStr(EDndEvent ev); const char *dndEventStr(EDndEvent ev);
// dndExec.c // dndExec.c
...@@ -156,7 +156,7 @@ int32_t dndWriteShmFile(SDnode *pDnode); ...@@ -156,7 +156,7 @@ int32_t dndWriteShmFile(SDnode *pDnode);
EDndStatus dndGetStatus(SDnode *pDnode); EDndStatus dndGetStatus(SDnode *pDnode);
void dndSetStatus(SDnode *pDnode, EDndStatus stat); void dndSetStatus(SDnode *pDnode, EDndStatus stat);
void dndSetMsgHandle(SMgmtWrapper *pWrapper, tmsg_t msgType, NodeMsgFp nodeMsgFp, int8_t vgId); void dndSetMsgHandle(SMgmtWrapper *pWrapper, tmsg_t msgType, NodeMsgFp nodeMsgFp, int8_t vgId);
SMgmtWrapper *dndAcquireWrapper(SDnode *pDnode, ENodeType nodeType); SMgmtWrapper *dndAcquireWrapper(SDnode *pDnode, EDndType nType);
int32_t dndMarkWrapper(SMgmtWrapper *pWrapper); int32_t dndMarkWrapper(SMgmtWrapper *pWrapper);
void dndReleaseWrapper(SMgmtWrapper *pWrapper); void dndReleaseWrapper(SMgmtWrapper *pWrapper);
void dndHandleEvent(SDnode *pDnode, EDndEvent event); void dndHandleEvent(SDnode *pDnode, EDndEvent event);
......
...@@ -71,7 +71,7 @@ const char *dndStatStr(EDndStatus status) { ...@@ -71,7 +71,7 @@ const char *dndStatStr(EDndStatus status) {
} }
} }
const char *dndNodeLogStr(ENodeType ntype) { const char *dndNodeLogStr(EDndType ntype) {
switch (ntype) { switch (ntype) {
case VNODES: case VNODES:
return "vnode"; return "vnode";
...@@ -88,7 +88,7 @@ const char *dndNodeLogStr(ENodeType ntype) { ...@@ -88,7 +88,7 @@ const char *dndNodeLogStr(ENodeType ntype) {
} }
} }
const char *dndNodeProcStr(ENodeType ntype) { const char *dndNodeProcStr(EDndType ntype) {
switch (ntype) { switch (ntype) {
case VNODES: case VNODES:
return "taosv"; return "taosv";
......
...@@ -66,7 +66,7 @@ void dndCloseNode(SMgmtWrapper *pWrapper) { ...@@ -66,7 +66,7 @@ void dndCloseNode(SMgmtWrapper *pWrapper) {
} }
static int32_t dndNewProc(SMgmtWrapper *pWrapper, ENodeType n) { static int32_t dndNewProc(SMgmtWrapper *pWrapper, EDndType n) {
char tstr[8] = {0}; char tstr[8] = {0};
char *args[6] = {0}; char *args[6] = {0};
snprintf(tstr, sizeof(tstr), "%d", n); snprintf(tstr, sizeof(tstr), "%d", n);
...@@ -89,6 +89,7 @@ static int32_t dndNewProc(SMgmtWrapper *pWrapper, ENodeType n) { ...@@ -89,6 +89,7 @@ static int32_t dndNewProc(SMgmtWrapper *pWrapper, ENodeType n) {
} }
static void dndProcessProcHandle(void *handle) { static void dndProcessProcHandle(void *handle) {
dWarn("handle:%p, the child process dies and send an offline rsp", handle);
SRpcMsg rpcMsg = {.handle = handle, .code = TSDB_CODE_DND_OFFLINE}; SRpcMsg rpcMsg = {.handle = handle, .code = TSDB_CODE_DND_OFFLINE};
rpcSendResponse(&rpcMsg); rpcSendResponse(&rpcMsg);
} }
...@@ -96,7 +97,7 @@ static void dndProcessProcHandle(void *handle) { ...@@ -96,7 +97,7 @@ static void dndProcessProcHandle(void *handle) {
static int32_t dndRunInSingleProcess(SDnode *pDnode) { static int32_t dndRunInSingleProcess(SDnode *pDnode) {
dInfo("dnode run in single process"); dInfo("dnode run in single process");
for (ENodeType n = DNODE; n < NODE_MAX; ++n) { for (EDndType n = DNODE; n < NODE_MAX; ++n) {
SMgmtWrapper *pWrapper = &pDnode->wrappers[n]; SMgmtWrapper *pWrapper = &pDnode->wrappers[n];
pWrapper->required = dndRequireNode(pWrapper); pWrapper->required = dndRequireNode(pWrapper);
if (!pWrapper->required) continue; if (!pWrapper->required) continue;
...@@ -109,7 +110,7 @@ static int32_t dndRunInSingleProcess(SDnode *pDnode) { ...@@ -109,7 +110,7 @@ static int32_t dndRunInSingleProcess(SDnode *pDnode) {
dndSetStatus(pDnode, DND_STAT_RUNNING); dndSetStatus(pDnode, DND_STAT_RUNNING);
for (ENodeType n = 0; n < NODE_MAX; ++n) { for (EDndType n = 0; n < NODE_MAX; ++n) {
SMgmtWrapper *pWrapper = &pDnode->wrappers[n]; SMgmtWrapper *pWrapper = &pDnode->wrappers[n];
if (!pWrapper->required) continue; if (!pWrapper->required) continue;
if (pWrapper->fp.startFp == NULL) continue; if (pWrapper->fp.startFp == NULL) continue;
...@@ -141,12 +142,25 @@ static int32_t dndRunInParentProcess(SDnode *pDnode) { ...@@ -141,12 +142,25 @@ static int32_t dndRunInParentProcess(SDnode *pDnode) {
return -1; return -1;
} }
for (ENodeType n = DNODE + 1; n < NODE_MAX; ++n) { for (EDndType n = DNODE + 1; n < NODE_MAX; ++n) {
SMgmtWrapper *pWrapper = &pDnode->wrappers[n]; SMgmtWrapper *pWrapper = &pDnode->wrappers[n];
pWrapper->required = dndRequireNode(pWrapper); pWrapper->required = dndRequireNode(pWrapper);
if (!pWrapper->required) continue; if (!pWrapper->required) continue;
int32_t shmsize = 1024 * 1024 * 2; // size will be a configuration item int32_t shmsize = tsMnodeShmSize;
if (n == VNODES) {
shmsize = tsVnodeShmSize;
} else if (n == QNODE) {
shmsize = tsQnodeShmSize;
} else if (n == SNODE) {
shmsize = tsSnodeShmSize;
} else if (n == MNODE) {
shmsize = tsMnodeShmSize;
} else if (n == BNODE) {
shmsize = tsBnodeShmSize;
} else {
}
if (taosCreateShm(&pWrapper->shm, n, shmsize) != 0) { if (taosCreateShm(&pWrapper->shm, n, shmsize) != 0) {
terrno = TAOS_SYSTEM_ERROR(terrno); terrno = TAOS_SYSTEM_ERROR(terrno);
dError("node:%s, failed to create shm size:%d since %s", pWrapper->name, shmsize, terrstr()); dError("node:%s, failed to create shm size:%d since %s", pWrapper->name, shmsize, terrstr());
...@@ -169,7 +183,7 @@ static int32_t dndRunInParentProcess(SDnode *pDnode) { ...@@ -169,7 +183,7 @@ static int32_t dndRunInParentProcess(SDnode *pDnode) {
return -1; return -1;
} }
for (ENodeType n = DNODE + 1; n < NODE_MAX; ++n) { for (EDndType n = DNODE + 1; n < NODE_MAX; ++n) {
SMgmtWrapper *pWrapper = &pDnode->wrappers[n]; SMgmtWrapper *pWrapper = &pDnode->wrappers[n];
if (!pWrapper->required) continue; if (!pWrapper->required) continue;
...@@ -202,7 +216,7 @@ static int32_t dndRunInParentProcess(SDnode *pDnode) { ...@@ -202,7 +216,7 @@ static int32_t dndRunInParentProcess(SDnode *pDnode) {
dInfo("dnode is about to stop"); dInfo("dnode is about to stop");
dndSetStatus(pDnode, DND_STAT_STOPPED); dndSetStatus(pDnode, DND_STAT_STOPPED);
for (ENodeType n = DNODE + 1; n < NODE_MAX; ++n) { for (EDndType n = DNODE + 1; n < NODE_MAX; ++n) {
SMgmtWrapper *pWrapper = &pDnode->wrappers[n]; SMgmtWrapper *pWrapper = &pDnode->wrappers[n];
if (!pWrapper->required) continue; if (!pWrapper->required) continue;
if (pDnode->ntype == NODE_MAX) continue; if (pDnode->ntype == NODE_MAX) continue;
...@@ -217,13 +231,13 @@ static int32_t dndRunInParentProcess(SDnode *pDnode) { ...@@ -217,13 +231,13 @@ static int32_t dndRunInParentProcess(SDnode *pDnode) {
} }
break; break;
} else { } else {
for (ENodeType n = DNODE + 1; n < NODE_MAX; ++n) { for (EDndType n = DNODE + 1; n < NODE_MAX; ++n) {
SMgmtWrapper *pWrapper = &pDnode->wrappers[n]; SMgmtWrapper *pWrapper = &pDnode->wrappers[n];
if (!pWrapper->required) continue; if (!pWrapper->required) continue;
if (pDnode->ntype == NODE_MAX) continue; if (pDnode->ntype == NODE_MAX) continue;
if (pWrapper->procId <= 0 || !taosProcExist(pWrapper->procId)) { if (pWrapper->procId <= 0 || !taosProcExist(pWrapper->procId)) {
dInfo("node:%s, process:%d is killed and needs to be restarted", pWrapper->name, pWrapper->procId); dWarn("node:%s, process:%d is killed and needs to be restarted", pWrapper->name, pWrapper->procId);
taosProcCloseHandles(pWrapper->pProc, dndProcessProcHandle); taosProcCloseHandles(pWrapper->pProc, dndProcessProcHandle);
dndNewProc(pWrapper, n); dndNewProc(pWrapper, n);
} }
......
...@@ -164,7 +164,7 @@ int32_t dndReadShmFile(SDnode *pDnode) { ...@@ -164,7 +164,7 @@ int32_t dndReadShmFile(SDnode *pDnode) {
goto _OVER; goto _OVER;
} }
for (ENodeType ntype = DNODE + 1; ntype < NODE_MAX; ++ntype) { for (EDndType ntype = DNODE + 1; ntype < NODE_MAX; ++ntype) {
snprintf(itemName, sizeof(itemName), "%s_shmid", dndNodeProcStr(ntype)); snprintf(itemName, sizeof(itemName), "%s_shmid", dndNodeProcStr(ntype));
cJSON *shmid = cJSON_GetObjectItem(root, itemName); cJSON *shmid = cJSON_GetObjectItem(root, itemName);
if (shmid && shmid->type == cJSON_Number) { if (shmid && shmid->type == cJSON_Number) {
...@@ -180,7 +180,7 @@ int32_t dndReadShmFile(SDnode *pDnode) { ...@@ -180,7 +180,7 @@ int32_t dndReadShmFile(SDnode *pDnode) {
} }
if (!tsMultiProcess || pDnode->ntype == DNODE || pDnode->ntype == NODE_MAX) { if (!tsMultiProcess || pDnode->ntype == DNODE || pDnode->ntype == NODE_MAX) {
for (ENodeType ntype = DNODE; ntype < NODE_MAX; ++ntype) { for (EDndType ntype = DNODE; ntype < NODE_MAX; ++ntype) {
SMgmtWrapper *pWrapper = &pDnode->wrappers[ntype]; SMgmtWrapper *pWrapper = &pDnode->wrappers[ntype];
if (pWrapper->shm.id >= 0) { if (pWrapper->shm.id >= 0) {
dDebug("shmid:%d, is closed, size:%d", pWrapper->shm.id, pWrapper->shm.size); dDebug("shmid:%d, is closed, size:%d", pWrapper->shm.id, pWrapper->shm.size);
...@@ -226,7 +226,7 @@ int32_t dndWriteShmFile(SDnode *pDnode) { ...@@ -226,7 +226,7 @@ int32_t dndWriteShmFile(SDnode *pDnode) {
} }
len += snprintf(content + len, MAXLEN - len, "{\n"); len += snprintf(content + len, MAXLEN - len, "{\n");
for (ENodeType ntype = DNODE + 1; ntype < NODE_MAX; ++ntype) { for (EDndType ntype = DNODE + 1; ntype < NODE_MAX; ++ntype) {
SMgmtWrapper *pWrapper = &pDnode->wrappers[ntype]; SMgmtWrapper *pWrapper = &pDnode->wrappers[ntype];
len += snprintf(content + len, MAXLEN - len, " \"%s_shmid\":%d,\n", dndNodeProcStr(ntype), pWrapper->shm.id); len += snprintf(content + len, MAXLEN - len, " \"%s_shmid\":%d,\n", dndNodeProcStr(ntype), pWrapper->shm.id);
if (ntype == NODE_MAX - 1) { if (ntype == NODE_MAX - 1) {
......
...@@ -46,7 +46,7 @@ static int32_t dndInitVars(SDnode *pDnode, const SDnodeOpt *pOption) { ...@@ -46,7 +46,7 @@ static int32_t dndInitVars(SDnode *pDnode, const SDnodeOpt *pOption) {
} }
static void dndClearVars(SDnode *pDnode) { static void dndClearVars(SDnode *pDnode) {
for (ENodeType n = 0; n < NODE_MAX; ++n) { for (EDndType n = 0; n < NODE_MAX; ++n) {
SMgmtWrapper *pMgmt = &pDnode->wrappers[n]; SMgmtWrapper *pMgmt = &pDnode->wrappers[n];
taosMemoryFreeClear(pMgmt->path); taosMemoryFreeClear(pMgmt->path);
} }
...@@ -89,7 +89,7 @@ SDnode *dndCreate(const SDnodeOpt *pOption) { ...@@ -89,7 +89,7 @@ SDnode *dndCreate(const SDnodeOpt *pOption) {
smSetMgmtFp(&pDnode->wrappers[SNODE]); smSetMgmtFp(&pDnode->wrappers[SNODE]);
bmSetMgmtFp(&pDnode->wrappers[BNODE]); bmSetMgmtFp(&pDnode->wrappers[BNODE]);
for (ENodeType n = 0; n < NODE_MAX; ++n) { for (EDndType n = 0; n < NODE_MAX; ++n) {
SMgmtWrapper *pWrapper = &pDnode->wrappers[n]; SMgmtWrapper *pWrapper = &pDnode->wrappers[n];
snprintf(path, sizeof(path), "%s%s%s", pDnode->dataDir, TD_DIRSEP, pWrapper->name); snprintf(path, sizeof(path), "%s%s%s", pDnode->dataDir, TD_DIRSEP, pWrapper->name);
pWrapper->path = strdup(path); pWrapper->path = strdup(path);
...@@ -106,7 +106,7 @@ SDnode *dndCreate(const SDnodeOpt *pOption) { ...@@ -106,7 +106,7 @@ SDnode *dndCreate(const SDnodeOpt *pOption) {
} }
if (dndInitMsgHandle(pDnode) != 0) { if (dndInitMsgHandle(pDnode) != 0) {
dError("failed to msg handles since %s", terrstr()); dError("failed to init msg handles since %s", terrstr());
goto _OVER; goto _OVER;
} }
...@@ -134,7 +134,7 @@ _OVER: ...@@ -134,7 +134,7 @@ _OVER:
void dndClose(SDnode *pDnode) { void dndClose(SDnode *pDnode) {
if (pDnode == NULL) return; if (pDnode == NULL) return;
for (ENodeType n = 0; n < NODE_MAX; ++n) { for (EDndType n = 0; n < NODE_MAX; ++n) {
SMgmtWrapper *pWrapper = &pDnode->wrappers[n]; SMgmtWrapper *pWrapper = &pDnode->wrappers[n];
dndCloseNode(pWrapper); dndCloseNode(pWrapper);
} }
...@@ -149,7 +149,7 @@ void dndHandleEvent(SDnode *pDnode, EDndEvent event) { ...@@ -149,7 +149,7 @@ void dndHandleEvent(SDnode *pDnode, EDndEvent event) {
} }
} }
SMgmtWrapper *dndAcquireWrapper(SDnode *pDnode, ENodeType ntype) { SMgmtWrapper *dndAcquireWrapper(SDnode *pDnode, EDndType ntype) {
SMgmtWrapper *pWrapper = &pDnode->wrappers[ntype]; SMgmtWrapper *pWrapper = &pDnode->wrappers[ntype];
SMgmtWrapper *pRetWrapper = pWrapper; SMgmtWrapper *pRetWrapper = pWrapper;
......
...@@ -307,7 +307,7 @@ void dndCleanupTrans(SDnode *pDnode) { ...@@ -307,7 +307,7 @@ void dndCleanupTrans(SDnode *pDnode) {
int32_t dndInitMsgHandle(SDnode *pDnode) { int32_t dndInitMsgHandle(SDnode *pDnode) {
STransMgmt *pMgmt = &pDnode->trans; STransMgmt *pMgmt = &pDnode->trans;
for (ENodeType n = 0; n < NODE_MAX; ++n) { for (EDndType n = 0; n < NODE_MAX; ++n) {
SMgmtWrapper *pWrapper = &pDnode->wrappers[n]; SMgmtWrapper *pWrapper = &pDnode->wrappers[n];
for (int32_t msgIndex = 0; msgIndex < TDMT_MAX; ++msgIndex) { for (int32_t msgIndex = 0; msgIndex < TDMT_MAX; ++msgIndex) {
......
...@@ -40,9 +40,7 @@ class Testbase { ...@@ -40,9 +40,7 @@ class Testbase {
void ServerStart(); void ServerStart();
void ClientRestart(); void ClientRestart();
SRpcMsg* SendReq(tmsg_t msgType, void* pCont, int32_t contLen); SRpcMsg* SendReq(tmsg_t msgType, void* pCont, int32_t contLen);
void InitLog(const char* path);
private:
void InitLog(const char* path);
private: private:
TestServer server; TestServer server;
......
...@@ -483,13 +483,14 @@ typedef struct STableIntervalOperatorInfo { ...@@ -483,13 +483,14 @@ typedef struct STableIntervalOperatorInfo {
SOptrBasicInfo binfo; // basic info SOptrBasicInfo binfo; // basic info
SGroupResInfo groupResInfo; // multiple results build supporter SGroupResInfo groupResInfo; // multiple results build supporter
SInterval interval; // interval info SInterval interval; // interval info
int32_t primaryTsIndex; // primary time stamp slot id from result of downstream operator.
STimeWindow win; // query time range STimeWindow win; // query time range
bool timeWindowInterpo; // interpolation needed or not bool timeWindowInterpo; // interpolation needed or not
char **pRow; // previous row/tuple of already processed datablock char **pRow; // previous row/tuple of already processed datablock
SAggSupporter aggSup; // aggregate supporter SAggSupporter aggSup; // aggregate supporter
STableQueryInfo *pCurrent; // current tableQueryInfo struct STableQueryInfo *pCurrent; // current tableQueryInfo struct
int32_t order; // current SSDataBlock scan order int32_t order; // current SSDataBlock scan order
EOPTR_EXEC_MODEL execModel; // operator execution model [batch model|stream model] EOPTR_EXEC_MODEL execModel; // operator execution model [batch model|stream model]
SArray *pUpdatedWindow; // updated time window due to the input data block from the downstream operator. SArray *pUpdatedWindow; // updated time window due to the input data block from the downstream operator.
SColumnInfoData timeWindowData; // query time window info for scalar function execution. SColumnInfoData timeWindowData; // query time window info for scalar function execution.
} STableIntervalOperatorInfo; } STableIntervalOperatorInfo;
...@@ -670,7 +671,7 @@ SOperatorInfo *createSortOperatorInfo(SOperatorInfo* downstream, SSDataBlock* pR ...@@ -670,7 +671,7 @@ SOperatorInfo *createSortOperatorInfo(SOperatorInfo* downstream, SSDataBlock* pR
SOperatorInfo* createSortedMergeOperatorInfo(SOperatorInfo** downstream, int32_t numOfDownstream, SExprInfo* pExprInfo, int32_t num, SArray* pSortInfo, SArray* pGroupInfo, SExecTaskInfo* pTaskInfo); SOperatorInfo* createSortedMergeOperatorInfo(SOperatorInfo** downstream, int32_t numOfDownstream, SExprInfo* pExprInfo, int32_t num, SArray* pSortInfo, SArray* pGroupInfo, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createSysTableScanOperatorInfo(void* pSysTableReadHandle, SSDataBlock* pResBlock, const SName* pName, SOperatorInfo* createSysTableScanOperatorInfo(void* pSysTableReadHandle, SSDataBlock* pResBlock, const SName* pName,
SNode* pCondition, SEpSet epset, SArray* colList, SExecTaskInfo* pTaskInfo, bool showRewrite, int32_t accountId); SNode* pCondition, SEpSet epset, SArray* colList, SExecTaskInfo* pTaskInfo, bool showRewrite, int32_t accountId);
SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResBlock, SInterval* pInterval, SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResBlock, SInterval* pInterval, int32_t primaryTsSlot,
const STableGroupInfo* pTableGroupInfo, SExecTaskInfo* pTaskInfo); const STableGroupInfo* pTableGroupInfo, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createSessionAggOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResBlock, int64_t gap, SExecTaskInfo* pTaskInfo); SOperatorInfo* createSessionAggOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResBlock, int64_t gap, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createGroupOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResultBlock, SOperatorInfo* createGroupOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResultBlock,
......
...@@ -980,7 +980,7 @@ void doApplyFunctions(SqlFunctionCtx* pCtx, STimeWindow* pWin, SColumnInfoData* ...@@ -980,7 +980,7 @@ void doApplyFunctions(SqlFunctionCtx* pCtx, STimeWindow* pWin, SColumnInfoData*
for (int32_t k = 0; k < numOfOutput; ++k) { for (int32_t k = 0; k < numOfOutput; ++k) {
pCtx[k].startTs = pWin->skey; pCtx[k].startTs = pWin->skey;
// keep it temporarialy // keep it temporarily
bool hasAgg = pCtx[k].input.colDataAggIsSet; bool hasAgg = pCtx[k].input.colDataAggIsSet;
int32_t numOfRows = pCtx[k].input.numOfRows; int32_t numOfRows = pCtx[k].input.numOfRows;
int32_t startOffset = pCtx[k].input.startRowIndex; int32_t startOffset = pCtx[k].input.startRowIndex;
...@@ -1012,7 +1012,6 @@ void doApplyFunctions(SqlFunctionCtx* pCtx, STimeWindow* pWin, SColumnInfoData* ...@@ -1012,7 +1012,6 @@ void doApplyFunctions(SqlFunctionCtx* pCtx, STimeWindow* pWin, SColumnInfoData*
SScalarParam tw = {.numOfRows = 5, .columnData = pTimeWindowData}; SScalarParam tw = {.numOfRows = 5, .columnData = pTimeWindowData};
pCtx[k].sfp.process(&tw, 1, &out); pCtx[k].sfp.process(&tw, 1, &out);
pEntryInfo->numOfRes = 1; pEntryInfo->numOfRes = 1;
pEntryInfo->hasResult = ',';
continue; continue;
} }
...@@ -1264,6 +1263,21 @@ static void projectApplyFunctions(SExprInfo* pExpr, SSDataBlock* pResult, SSData ...@@ -1264,6 +1263,21 @@ static void projectApplyFunctions(SExprInfo* pExpr, SSDataBlock* pResult, SSData
colDataAssign(pColInfoData, pCtx[k].input.pData[0], pCtx[k].input.numOfRows); colDataAssign(pColInfoData, pCtx[k].input.pData[0], pCtx[k].input.numOfRows);
pResult->info.rows = pCtx[0].input.numOfRows; pResult->info.rows = pCtx[0].input.numOfRows;
} else if (pExpr[k].pExpr->nodeType == QUERY_NODE_VALUE) {
SVariant *pVal = pExpr->pExpr->pVal;
char *payload;
if (IS_VAR_DATA_TYPE(pVal->nType)) {
payload = taosMemoryCalloc(1, pVal->nLen + VARSTR_HEADER_SIZE);
} else {
payload = taosMemoryCalloc(1, tDataTypes[pVal->nType].bytes);
}
taosVariantDump(pVal, payload, pVal->nType, true);
SColumnInfoData* pColInfoData = taosArrayGet(pResult->pDataBlock, k);
for (int32_t i = 0; i < pSrcBlock->info.rows; ++i) {
colDataAppend(pColInfoData, i, payload, false);
}
taosMemoryFree(payload);
pResult->info.rows = pSrcBlock->info.rows;
} else if (pExpr[k].pExpr->nodeType == QUERY_NODE_OPERATOR) { } else if (pExpr[k].pExpr->nodeType == QUERY_NODE_OPERATOR) {
SArray* pBlockList = taosArrayInit(4, POINTER_BYTES); SArray* pBlockList = taosArrayInit(4, POINTER_BYTES);
taosArrayPush(pBlockList, &pSrcBlock); taosArrayPush(pBlockList, &pSrcBlock);
...@@ -1511,7 +1525,7 @@ static SArray* hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pRe ...@@ -1511,7 +1525,7 @@ static SArray* hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pRe
TSKEY* tsCols = NULL; TSKEY* tsCols = NULL;
if (pSDataBlock->pDataBlock != NULL) { if (pSDataBlock->pDataBlock != NULL) {
SColumnInfoData* pColDataInfo = taosArrayGet(pSDataBlock->pDataBlock, 0); SColumnInfoData* pColDataInfo = taosArrayGet(pSDataBlock->pDataBlock, pInfo->primaryTsIndex);
tsCols = (int64_t*)pColDataInfo->pData; tsCols = (int64_t*)pColDataInfo->pData;
// assert(tsCols[0] == pSDataBlock->info.window.skey && tsCols[pSDataBlock->info.rows - 1] == // assert(tsCols[0] == pSDataBlock->info.window.skey && tsCols[pSDataBlock->info.rows - 1] ==
// pSDataBlock->info.window.ekey); // pSDataBlock->info.window.ekey);
...@@ -3144,11 +3158,6 @@ void finalizeMultiTupleQueryResult(SqlFunctionCtx* pCtx, int32_t numOfOutput, SD ...@@ -3144,11 +3158,6 @@ void finalizeMultiTupleQueryResult(SqlFunctionCtx* pCtx, int32_t numOfOutput, SD
} }
releaseBufPage(pBuf, bufPage); releaseBufPage(pBuf, bufPage);
/*
* set the number of output results for group by normal columns, the number of output rows usually is 1 except
* the top and bottom query
*/
// buf->numOfRows = (uint16_t)getNumOfResult(pCtx, numOfOutput);
} }
} }
...@@ -3285,7 +3294,8 @@ void doFilter(const SNode* pFilterNode, SSDataBlock* pBlock) { ...@@ -3285,7 +3294,8 @@ void doFilter(const SNode* pFilterNode, SSDataBlock* pBlock) {
} }
SFilterInfo* filter = NULL; SFilterInfo* filter = NULL;
int32_t code = filterInitFromNode((SNode*)pFilterNode, &filter, 0);
int32_t code = filterInitFromNode((SNode*)pFilterNode, &filter, 0);
SFilterColumnParam param1 = {.numOfCols = pBlock->info.numOfCols, .pDataBlock = pBlock->pDataBlock}; SFilterColumnParam param1 = {.numOfCols = pBlock->info.numOfCols, .pDataBlock = pBlock->pDataBlock};
code = filterSetDataFromSlotId(filter, &param1); code = filterSetDataFromSlotId(filter, &param1);
...@@ -3296,6 +3306,7 @@ void doFilter(const SNode* pFilterNode, SSDataBlock* pBlock) { ...@@ -3296,6 +3306,7 @@ void doFilter(const SNode* pFilterNode, SSDataBlock* pBlock) {
SSDataBlock* px = createOneDataBlock(pBlock); SSDataBlock* px = createOneDataBlock(pBlock);
blockDataEnsureCapacity(px, pBlock->info.rows); blockDataEnsureCapacity(px, pBlock->info.rows);
// todo extract method
int32_t numOfRow = 0; int32_t numOfRow = 0;
for (int32_t i = 0; i < pBlock->info.numOfCols; ++i) { for (int32_t i = 0; i < pBlock->info.numOfCols; ++i) {
SColumnInfoData* pDst = taosArrayGet(px->pDataBlock, i); SColumnInfoData* pDst = taosArrayGet(px->pDataBlock, i);
...@@ -3307,7 +3318,11 @@ void doFilter(const SNode* pFilterNode, SSDataBlock* pBlock) { ...@@ -3307,7 +3318,11 @@ void doFilter(const SNode* pFilterNode, SSDataBlock* pBlock) {
continue; continue;
} }
colDataAppend(pDst, numOfRow, colDataGetData(pSrc, j), false); if (colDataIsNull_s(pSrc, j)) {
colDataAppendNULL(pDst, numOfRow);
} else {
colDataAppend(pDst, numOfRow, colDataGetData(pSrc, j), false);
}
numOfRow += 1; numOfRow += 1;
} }
...@@ -3525,7 +3540,7 @@ static int32_t doCopyToSDataBlock(SDiskbasedBuf* pBuf, SGroupResInfo* pGroupResI ...@@ -3525,7 +3540,7 @@ static int32_t doCopyToSDataBlock(SDiskbasedBuf* pBuf, SGroupResInfo* pGroupResI
SResultRowEntryInfo* pEntryInfo = getResultCell(pRow, j, rowCellOffset); SResultRowEntryInfo* pEntryInfo = getResultCell(pRow, j, rowCellOffset);
char* in = GET_ROWCELL_INTERBUF(pEntryInfo); char* in = GET_ROWCELL_INTERBUF(pEntryInfo);
colDataAppend(pColInfoData, nrows, in, pEntryInfo->numOfRes == 0); colDataAppend(pColInfoData, nrows, in, pEntryInfo->isNullRes);
} }
releaseBufPage(pBuf, page); releaseBufPage(pBuf, page);
...@@ -6355,7 +6370,7 @@ _error: ...@@ -6355,7 +6370,7 @@ _error:
} }
SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols,
SSDataBlock* pResBlock, SInterval* pInterval, SSDataBlock* pResBlock, SInterval* pInterval, int32_t primaryTsSlot,
const STableGroupInfo* pTableGroupInfo, SExecTaskInfo* pTaskInfo) { const STableGroupInfo* pTableGroupInfo, SExecTaskInfo* pTaskInfo) {
STableIntervalOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(STableIntervalOperatorInfo)); STableIntervalOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(STableIntervalOperatorInfo));
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
...@@ -6371,6 +6386,8 @@ SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo* ...@@ -6371,6 +6386,8 @@ SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo*
pInfo->win.skey = 0; pInfo->win.skey = 0;
pInfo->win.ekey = INT64_MAX; pInfo->win.ekey = INT64_MAX;
pInfo->primaryTsIndex = primaryTsSlot;
int32_t numOfRows = 4096; int32_t numOfRows = 4096;
int32_t code = initAggInfo(&pInfo->binfo, &pInfo->aggSup, pExprInfo, numOfCols, numOfRows, pResBlock, pTaskInfo->id.str); int32_t code = initAggInfo(&pInfo->binfo, &pInfo->aggSup, pExprInfo, numOfCols, numOfRows, pResBlock, pTaskInfo->id.str);
initExecTimeWindowInfo(&pInfo->timeWindowData, &pInfo->win); initExecTimeWindowInfo(&pInfo->timeWindowData, &pInfo->win);
...@@ -6958,6 +6975,16 @@ SExprInfo* createExprInfo(SNodeList* pNodeList, SNodeList* pGroupKeys, int32_t* ...@@ -6958,6 +6975,16 @@ SExprInfo* createExprInfo(SNodeList* pNodeList, SNodeList* pGroupKeys, int32_t*
pExp->base.resSchema = createResSchema(pType->type, pType->bytes, pTargetNode->slotId, pType->scale, pType->precision, pColNode->colName); pExp->base.resSchema = createResSchema(pType->type, pType->bytes, pTargetNode->slotId, pType->scale, pType->precision, pColNode->colName);
pExp->base.pParam[0].pCol = createColumn(pColNode->dataBlockId, pColNode->slotId, pType); pExp->base.pParam[0].pCol = createColumn(pColNode->dataBlockId, pColNode->slotId, pType);
pExp->base.pParam[0].type = FUNC_PARAM_TYPE_COLUMN; pExp->base.pParam[0].type = FUNC_PARAM_TYPE_COLUMN;
} else if (nodeType(pTargetNode->pExpr) == QUERY_NODE_VALUE) {
pExp->pExpr->nodeType = QUERY_NODE_VALUE;
SValueNode* pValueNode = (SValueNode*)pTargetNode->pExpr;
SDataType* pType = &pValueNode->node.resType;
char *pDatum = nodesGetValueFromNode(pValueNode);
if (IS_VAR_DATA_TYPE(pType->type)) {
pDatum = varDataVal(pDatum);
}
pExp->pExpr->pVal = taosMemoryCalloc(1, sizeof(SVariant));
taosVariantCreateFromBinary(pExp->pExpr->pVal, pDatum, pType->bytes, pType->type);
} else if (nodeType(pTargetNode->pExpr) == QUERY_NODE_FUNCTION) { } else if (nodeType(pTargetNode->pExpr) == QUERY_NODE_FUNCTION) {
pExp->pExpr->nodeType = QUERY_NODE_FUNCTION; pExp->pExpr->nodeType = QUERY_NODE_FUNCTION;
SFunctionNode* pFuncNode = (SFunctionNode*)pTargetNode->pExpr; SFunctionNode* pFuncNode = (SFunctionNode*)pTargetNode->pExpr;
...@@ -7137,18 +7164,21 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo ...@@ -7137,18 +7164,21 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
SIntervalPhysiNode* pIntervalPhyNode = (SIntervalPhysiNode*)pPhyNode; SIntervalPhysiNode* pIntervalPhyNode = (SIntervalPhysiNode*)pPhyNode;
// todo: set the correct primary timestamp key column
int32_t num = 0; int32_t num = 0;
SExprInfo* pExprInfo = createExprInfo(pIntervalPhyNode->window.pFuncs, NULL, &num); SExprInfo* pExprInfo = createExprInfo(pIntervalPhyNode->window.pFuncs, NULL, &num);
SSDataBlock* pResBlock = createOutputBuf_rv1(pPhyNode->pOutputDataBlockDesc); SSDataBlock* pResBlock = createOutputBuf_rv1(pPhyNode->pOutputDataBlockDesc);
SInterval interval = {.interval = pIntervalPhyNode->interval, SInterval interval = {
.sliding = pIntervalPhyNode->sliding, .interval = pIntervalPhyNode->interval,
.intervalUnit = pIntervalPhyNode->intervalUnit, .sliding = pIntervalPhyNode->sliding,
.slidingUnit = pIntervalPhyNode->slidingUnit, .intervalUnit = pIntervalPhyNode->intervalUnit,
.offset = pIntervalPhyNode->offset, .slidingUnit = pIntervalPhyNode->slidingUnit,
.precision = TSDB_TIME_PRECISION_MILLI}; .offset = pIntervalPhyNode->offset,
return createIntervalOperatorInfo(op, pExprInfo, num, pResBlock, &interval, pTableGroupInfo, pTaskInfo); .precision = pIntervalPhyNode->precision
};
int32_t primaryTsSlotId = ((SColumnNode*) pIntervalPhyNode->pTspk)->slotId;
return createIntervalOperatorInfo(op, pExprInfo, num, pResBlock, &interval, primaryTsSlotId, pTableGroupInfo, pTaskInfo);
} }
} else if (QUERY_NODE_PHYSICAL_PLAN_SORT == nodeType(pPhyNode)) { } else if (QUERY_NODE_PHYSICAL_PLAN_SORT == nodeType(pPhyNode)) {
size_t size = LIST_LENGTH(pPhyNode->pChildren); size_t size = LIST_LENGTH(pPhyNode->pChildren);
......
...@@ -106,7 +106,7 @@ static bool groupKeyCompare(SGroupbyOperatorInfo* pInfo, SSDataBlock* pBlock, in ...@@ -106,7 +106,7 @@ static bool groupKeyCompare(SGroupbyOperatorInfo* pInfo, SSDataBlock* pBlock, in
return true; return true;
} }
static void keepGroupKeys(SGroupbyOperatorInfo* pInfo, SSDataBlock* pBlock, int32_t rowIndex, int32_t numOfGroupCols) { static void recordNewGroupKeys(SGroupbyOperatorInfo* pInfo, SSDataBlock* pBlock, int32_t rowIndex, int32_t numOfGroupCols) {
SColumnDataAgg* pColAgg = NULL; SColumnDataAgg* pColAgg = NULL;
for (int32_t i = 0; i < numOfGroupCols; ++i) { for (int32_t i = 0; i < numOfGroupCols; ++i) {
...@@ -131,7 +131,7 @@ static void keepGroupKeys(SGroupbyOperatorInfo* pInfo, SSDataBlock* pBlock, int3 ...@@ -131,7 +131,7 @@ static void keepGroupKeys(SGroupbyOperatorInfo* pInfo, SSDataBlock* pBlock, int3
} }
} }
static int32_t generatedHashKey(void* pKey, int32_t* length, SArray* pGroupColVals) { static int32_t buildGroupValKey(void* pKey, int32_t* length, SArray* pGroupColVals) {
ASSERT(pKey != NULL); ASSERT(pKey != NULL);
size_t numOfGroupCols = taosArrayGetSize(pGroupColVals); size_t numOfGroupCols = taosArrayGetSize(pGroupColVals);
...@@ -170,11 +170,12 @@ static void doAssignGroupKeys(SqlFunctionCtx* pCtx, int32_t numOfOutput, int32_t ...@@ -170,11 +170,12 @@ static void doAssignGroupKeys(SqlFunctionCtx* pCtx, int32_t numOfOutput, int32_t
char* dest = GET_ROWCELL_INTERBUF(pEntryInfo); char* dest = GET_ROWCELL_INTERBUF(pEntryInfo);
char* data = colDataGetData(pColInfoData, rowIndex); char* data = colDataGetData(pColInfoData, rowIndex);
// set result exists, todo refactor
memcpy(dest, data, pColInfoData->info.bytes); memcpy(dest, data, pColInfoData->info.bytes);
pEntryInfo->hasResult = DATA_SET_FLAG; } else { // it is a NULL value
pEntryInfo->numOfRes = 1; pEntryInfo->isNullRes = 1;
} }
pEntryInfo->numOfRes = 1;
} }
} }
} }
...@@ -197,7 +198,7 @@ static void doHashGroupbyAgg(SOperatorInfo* pOperator, SSDataBlock* pBlock) { ...@@ -197,7 +198,7 @@ static void doHashGroupbyAgg(SOperatorInfo* pOperator, SSDataBlock* pBlock) {
for (int32_t j = 0; j < pBlock->info.rows; ++j) { for (int32_t j = 0; j < pBlock->info.rows; ++j) {
// Compare with the previous row of this column, and do not set the output buffer again if they are identical. // Compare with the previous row of this column, and do not set the output buffer again if they are identical.
if (!pInfo->isInit) { if (!pInfo->isInit) {
keepGroupKeys(pInfo, pBlock, j, numOfGroupCols); recordNewGroupKeys(pInfo, pBlock, j, numOfGroupCols);
pInfo->isInit = true; pInfo->isInit = true;
num++; num++;
continue; continue;
...@@ -209,7 +210,14 @@ static void doHashGroupbyAgg(SOperatorInfo* pOperator, SSDataBlock* pBlock) { ...@@ -209,7 +210,14 @@ static void doHashGroupbyAgg(SOperatorInfo* pOperator, SSDataBlock* pBlock) {
continue; continue;
} }
/*int32_t ret = */ generatedHashKey(pInfo->keyBuf, &len, pInfo->pGroupColVals); // The first row of a new block does not belongs to the previous existed group
if (!equal && j == 0) {
num++;
recordNewGroupKeys(pInfo, pBlock, j, numOfGroupCols);
continue;
}
/*int32_t ret = */ buildGroupValKey(pInfo->keyBuf, &len, pInfo->pGroupColVals);
int32_t ret = setGroupResultOutputBuf_rv(&(pInfo->binfo), pOperator->numOfOutput, pInfo->keyBuf, TSDB_DATA_TYPE_VARCHAR, len, 0, pInfo->aggSup.pResultBuf, pTaskInfo, &pInfo->aggSup); int32_t ret = setGroupResultOutputBuf_rv(&(pInfo->binfo), pOperator->numOfOutput, pInfo->keyBuf, TSDB_DATA_TYPE_VARCHAR, len, 0, pInfo->aggSup.pResultBuf, pTaskInfo, &pInfo->aggSup);
if (ret != TSDB_CODE_SUCCESS) { // null data, too many state code if (ret != TSDB_CODE_SUCCESS) { // null data, too many state code
longjmp(pTaskInfo->env, TSDB_CODE_QRY_APP_ERROR); longjmp(pTaskInfo->env, TSDB_CODE_QRY_APP_ERROR);
...@@ -220,12 +228,12 @@ static void doHashGroupbyAgg(SOperatorInfo* pOperator, SSDataBlock* pBlock) { ...@@ -220,12 +228,12 @@ static void doHashGroupbyAgg(SOperatorInfo* pOperator, SSDataBlock* pBlock) {
// assign the group keys or user input constant values if required // assign the group keys or user input constant values if required
doAssignGroupKeys(pCtx, pOperator->numOfOutput, pBlock->info.rows, rowIndex); doAssignGroupKeys(pCtx, pOperator->numOfOutput, pBlock->info.rows, rowIndex);
keepGroupKeys(pInfo, pBlock, j, numOfGroupCols); recordNewGroupKeys(pInfo, pBlock, j, numOfGroupCols);
num = 1; num = 1;
} }
if (num > 0) { if (num > 0) {
/*int32_t ret = */ generatedHashKey(pInfo->keyBuf, &len, pInfo->pGroupColVals); /*int32_t ret = */ buildGroupValKey(pInfo->keyBuf, &len, pInfo->pGroupColVals);
int32_t ret = int32_t ret =
setGroupResultOutputBuf_rv(&(pInfo->binfo), pOperator->numOfOutput, pInfo->keyBuf, TSDB_DATA_TYPE_VARCHAR, len, setGroupResultOutputBuf_rv(&(pInfo->binfo), pOperator->numOfOutput, pInfo->keyBuf, TSDB_DATA_TYPE_VARCHAR, len,
0, pInfo->aggSup.pResultBuf, pTaskInfo, &pInfo->aggSup); 0, pInfo->aggSup.pResultBuf, pTaskInfo, &pInfo->aggSup);
...@@ -288,8 +296,7 @@ static SSDataBlock* hashGroupbyAggregate(SOperatorInfo* pOperator, bool* newgrou ...@@ -288,8 +296,7 @@ static SSDataBlock* hashGroupbyAggregate(SOperatorInfo* pOperator, bool* newgrou
initGroupResInfo(&pInfo->groupResInfo, &pInfo->binfo.resultRowInfo); initGroupResInfo(&pInfo->groupResInfo, &pInfo->binfo.resultRowInfo);
while(1) { while(1) {
toSDatablock(&pInfo->groupResInfo, pInfo->aggSup.pResultBuf, pRes, pInfo->binfo.capacity, toSDatablock(&pInfo->groupResInfo, pInfo->aggSup.pResultBuf, pRes, pInfo->binfo.capacity, pInfo->binfo.rowCellInfoOffset);
pInfo->binfo.rowCellInfoOffset);
doFilter(pInfo->pCondition, pRes); doFilter(pInfo->pCondition, pRes);
bool hasRemain = hasRemainDataInCurrentGroup(&pInfo->groupResInfo); bool hasRemain = hasRemainDataInCurrentGroup(&pInfo->groupResInfo);
......
...@@ -25,7 +25,6 @@ ...@@ -25,7 +25,6 @@
break; \ break; \
} \ } \
(_info)->numOfRes = (res); \ (_info)->numOfRes = (res); \
(_info)->hasResult = DATA_SET_FLAG; \
} while (0) } while (0)
typedef struct SSumRes { typedef struct SSumRes {
...@@ -49,11 +48,11 @@ bool functionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultInfo) { ...@@ -49,11 +48,11 @@ bool functionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultInfo) {
return true; return true;
} }
static void doFinalizer(SResultRowEntryInfo* pResInfo) { cleanupResultRowEntry(pResInfo); }
void functionFinalize(SqlFunctionCtx *pCtx) { void functionFinalize(SqlFunctionCtx *pCtx) {
SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx); SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx);
doFinalizer(pResInfo);
cleanupResultRowEntry(pResInfo);
pResInfo->isNullRes = (pResInfo->numOfRes == 0)? 1:0;
} }
bool getCountFuncEnv(SFunctionNode* UNUSED_PARAM(pFunc), SFuncExecEnv* pEnv) { bool getCountFuncEnv(SFunctionNode* UNUSED_PARAM(pFunc), SFuncExecEnv* pEnv) {
...@@ -715,7 +714,6 @@ int32_t percentileFunction(SqlFunctionCtx *pCtx) { ...@@ -715,7 +714,6 @@ int32_t percentileFunction(SqlFunctionCtx *pCtx) {
} }
SET_VAL(pResInfo, notNullElems, 1); SET_VAL(pResInfo, notNullElems, 1);
pResInfo->hasResult = DATA_SET_FLAG;
} }
// TODO set the correct parameter. // TODO set the correct parameter.
...@@ -775,9 +773,7 @@ int32_t firstFunction(SqlFunctionCtx *pCtx) { ...@@ -775,9 +773,7 @@ int32_t firstFunction(SqlFunctionCtx *pCtx) {
// DO_UPDATE_TAG_COLUMNS(pCtx, k); // DO_UPDATE_TAG_COLUMNS(pCtx, k);
// } // }
pResInfo->hasResult = DATA_SET_FLAG;
pResInfo->complete = true; pResInfo->complete = true;
numOfElems++; numOfElems++;
break; break;
} }
...@@ -815,8 +811,6 @@ int32_t lastFunction(SqlFunctionCtx *pCtx) { ...@@ -815,8 +811,6 @@ int32_t lastFunction(SqlFunctionCtx *pCtx) {
// TSKEY ts = pCtx->ptsList ? GET_TS_DATA(pCtx, i) : 0; // TSKEY ts = pCtx->ptsList ? GET_TS_DATA(pCtx, i) : 0;
// DO_UPDATE_TAG_COLUMNS(pCtx, ts); // DO_UPDATE_TAG_COLUMNS(pCtx, ts);
pResInfo->hasResult = DATA_SET_FLAG;
pResInfo->complete = true; // set query completed on this column pResInfo->complete = true; // set query completed on this column
numOfElems++; numOfElems++;
break; break;
...@@ -830,10 +824,8 @@ int32_t lastFunction(SqlFunctionCtx *pCtx) { ...@@ -830,10 +824,8 @@ int32_t lastFunction(SqlFunctionCtx *pCtx) {
char* data = colDataGetData(pInputCol, i); char* data = colDataGetData(pInputCol, i);
TSKEY ts = pCtx->ptsList ? GET_TS_DATA(pCtx, i) : 0; TSKEY ts = pCtx->ptsList ? GET_TS_DATA(pCtx, i) : 0;
if (pResInfo->hasResult != DATA_SET_FLAG || (*(TSKEY*)buf) < ts) { if (pResInfo->numOfRes == 0 || (*(TSKEY*)buf) < ts) {
pResInfo->hasResult = DATA_SET_FLAG;
memcpy(buf, data, pCtx->inputBytes); memcpy(buf, data, pCtx->inputBytes);
*(TSKEY*)buf = ts; *(TSKEY*)buf = ts;
// DO_UPDATE_TAG_COLUMNS(pCtx, ts); // DO_UPDATE_TAG_COLUMNS(pCtx, ts);
} }
......
...@@ -208,7 +208,7 @@ int32_t getNumOfResult(SqlFunctionCtx* pCtx, int32_t num, SSDataBlock* pResBlock ...@@ -208,7 +208,7 @@ int32_t getNumOfResult(SqlFunctionCtx* pCtx, int32_t num, SSDataBlock* pResBlock
SColumnInfoData* pCol = taosArrayGet(pResBlock->pDataBlock, i); SColumnInfoData* pCol = taosArrayGet(pResBlock->pDataBlock, i);
SResultRowEntryInfo *pResInfo = GET_RES_INFO(&pCtx[i]); SResultRowEntryInfo *pResInfo = GET_RES_INFO(&pCtx[i]);
if (!pResInfo->hasResult) { if (pResInfo->numOfRes == 0) {
for(int32_t j = 0; j < pResInfo->numOfRes; ++j) { for(int32_t j = 0; j < pResInfo->numOfRes; ++j) {
colDataAppend(pCol, j, NULL, true); // TODO add set null data api colDataAppend(pCol, j, NULL, true); // TODO add set null data api
} }
......
...@@ -783,11 +783,11 @@ void* nodesGetValueFromNode(SValueNode *pNode) { ...@@ -783,11 +783,11 @@ void* nodesGetValueFromNode(SValueNode *pNode) {
case TSDB_DATA_TYPE_UBIGINT: case TSDB_DATA_TYPE_UBIGINT:
return (void*)&pNode->datum.u; return (void*)&pNode->datum.u;
case TSDB_DATA_TYPE_FLOAT: case TSDB_DATA_TYPE_FLOAT:
case TSDB_DATA_TYPE_DOUBLE: case TSDB_DATA_TYPE_DOUBLE:
return (void*)&pNode->datum.d; return (void*)&pNode->datum.d;
case TSDB_DATA_TYPE_NCHAR: case TSDB_DATA_TYPE_NCHAR:
case TSDB_DATA_TYPE_VARCHAR: case TSDB_DATA_TYPE_VARCHAR:
case TSDB_DATA_TYPE_VARBINARY: case TSDB_DATA_TYPE_VARBINARY:
return (void*)pNode->datum.p; return (void*)pNode->datum.p;
default: default:
break; break;
...@@ -803,7 +803,7 @@ char* nodesGetStrValueFromNode(SValueNode *pNode) { ...@@ -803,7 +803,7 @@ char* nodesGetStrValueFromNode(SValueNode *pNode) {
if (NULL == buf) { if (NULL == buf) {
return NULL; return NULL;
} }
sprintf(buf, "%s", pNode->datum.b ? "true" : "false"); sprintf(buf, "%s", pNode->datum.b ? "true" : "false");
return buf; return buf;
} }
...@@ -816,7 +816,7 @@ char* nodesGetStrValueFromNode(SValueNode *pNode) { ...@@ -816,7 +816,7 @@ char* nodesGetStrValueFromNode(SValueNode *pNode) {
if (NULL == buf) { if (NULL == buf) {
return NULL; return NULL;
} }
sprintf(buf, "%" PRId64, pNode->datum.i); sprintf(buf, "%" PRId64, pNode->datum.i);
return buf; return buf;
} }
...@@ -828,7 +828,7 @@ char* nodesGetStrValueFromNode(SValueNode *pNode) { ...@@ -828,7 +828,7 @@ char* nodesGetStrValueFromNode(SValueNode *pNode) {
if (NULL == buf) { if (NULL == buf) {
return NULL; return NULL;
} }
sprintf(buf, "%" PRIu64, pNode->datum.u); sprintf(buf, "%" PRIu64, pNode->datum.u);
return buf; return buf;
} }
...@@ -838,7 +838,7 @@ char* nodesGetStrValueFromNode(SValueNode *pNode) { ...@@ -838,7 +838,7 @@ char* nodesGetStrValueFromNode(SValueNode *pNode) {
if (NULL == buf) { if (NULL == buf) {
return NULL; return NULL;
} }
sprintf(buf, "%e", pNode->datum.d); sprintf(buf, "%e", pNode->datum.d);
return buf; return buf;
} }
...@@ -850,7 +850,7 @@ char* nodesGetStrValueFromNode(SValueNode *pNode) { ...@@ -850,7 +850,7 @@ char* nodesGetStrValueFromNode(SValueNode *pNode) {
if (NULL == buf) { if (NULL == buf) {
return NULL; return NULL;
} }
snprintf(buf, bufSize, "'%s'", varDataVal(pNode->datum.p)); snprintf(buf, bufSize, "'%s'", varDataVal(pNode->datum.p));
return buf; return buf;
} }
......
...@@ -20,7 +20,13 @@ ...@@ -20,7 +20,13 @@
int32_t taosCreateShm(SShm* pShm, int32_t key, int32_t shmsize) { int32_t taosCreateShm(SShm* pShm, int32_t key, int32_t shmsize) {
pShm->id = -1; pShm->id = -1;
int32_t shmid = shmget(0X95270000 + key, shmsize, IPC_CREAT | 0600); // key_t shkey = IPC_PRIVATE;
// int32_t __shmflag = IPC_CREAT | IPC_EXCL | 0600;
key_t __shkey = 0X95270000 + key;
int32_t __shmflag = IPC_CREAT | 0600;
int32_t shmid = shmget(__shkey, shmsize, __shmflag);
if (shmid < 0) { if (shmid < 0) {
return -1; return -1;
} }
......
...@@ -348,6 +348,7 @@ SConfigItem *cfgGetItem(SConfig *pCfg, const char *name) { ...@@ -348,6 +348,7 @@ SConfigItem *cfgGetItem(SConfig *pCfg, const char *name) {
} }
} }
uError("name:%s, cfg not found", name);
terrno = TSDB_CODE_CFG_NOT_FOUND; terrno = TSDB_CODE_CFG_NOT_FOUND;
return NULL; return NULL;
} }
......
...@@ -68,7 +68,6 @@ TAOS_DEFINE_ERROR(TSDB_CODE_RPC_INVALID_TIME_STAMP, "Client and server's t ...@@ -68,7 +68,6 @@ TAOS_DEFINE_ERROR(TSDB_CODE_RPC_INVALID_TIME_STAMP, "Client and server's t
TAOS_DEFINE_ERROR(TSDB_CODE_APP_NOT_READY, "Database not ready") TAOS_DEFINE_ERROR(TSDB_CODE_APP_NOT_READY, "Database not ready")
TAOS_DEFINE_ERROR(TSDB_CODE_RPC_FQDN_ERROR, "Unable to resolve FQDN") TAOS_DEFINE_ERROR(TSDB_CODE_RPC_FQDN_ERROR, "Unable to resolve FQDN")
TAOS_DEFINE_ERROR(TSDB_CODE_RPC_INVALID_VERSION, "Invalid app version") TAOS_DEFINE_ERROR(TSDB_CODE_RPC_INVALID_VERSION, "Invalid app version")
TAOS_DEFINE_ERROR(TSDB_CODE_COMPRESS_ERROR, "Failed to compress msg")
//common & util //common & util
TAOS_DEFINE_ERROR(TSDB_CODE_OPS_NOT_SUPPORT, "Operation not supported") TAOS_DEFINE_ERROR(TSDB_CODE_OPS_NOT_SUPPORT, "Operation not supported")
...@@ -96,6 +95,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_REF_NOT_EXIST, "Ref is not there") ...@@ -96,6 +95,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_REF_NOT_EXIST, "Ref is not there")
TAOS_DEFINE_ERROR(TSDB_CODE_INVALID_VERSION_NUMBER, "Invalid version number") TAOS_DEFINE_ERROR(TSDB_CODE_INVALID_VERSION_NUMBER, "Invalid version number")
TAOS_DEFINE_ERROR(TSDB_CODE_INVALID_VERSION_STRING, "Invalid version string") TAOS_DEFINE_ERROR(TSDB_CODE_INVALID_VERSION_STRING, "Invalid version string")
TAOS_DEFINE_ERROR(TSDB_CODE_VERSION_NOT_COMPATIBLE, "Version not compatible") TAOS_DEFINE_ERROR(TSDB_CODE_VERSION_NOT_COMPATIBLE, "Version not compatible")
TAOS_DEFINE_ERROR(TSDB_CODE_COMPRESS_ERROR, "Failed to compress msg")
//client //client
TAOS_DEFINE_ERROR(TSDB_CODE_TSC_INVALID_OPERATION, "Invalid operation") TAOS_DEFINE_ERROR(TSDB_CODE_TSC_INVALID_OPERATION, "Invalid operation")
......
...@@ -21,7 +21,6 @@ ...@@ -21,7 +21,6 @@
#include "tlog.h" #include "tlog.h"
#include "tqueue.h" #include "tqueue.h"
#define SHM_DEFAULT_SIZE (20 * 1024 * 1024)
typedef void *(*ProcThreadFp)(void *param); typedef void *(*ProcThreadFp)(void *param);
typedef struct SProcQueue { typedef struct SProcQueue {
...@@ -80,7 +79,7 @@ static int32_t taosProcInitMutex(SProcQueue *pQueue) { ...@@ -80,7 +79,7 @@ static int32_t taosProcInitMutex(SProcQueue *pQueue) {
} }
if (taosThreadMutexInit(&pQueue->mutex, &mattr) != 0) { if (taosThreadMutexInit(&pQueue->mutex, &mattr) != 0) {
taosThreadMutexDestroy(&pQueue->mutex); taosThreadMutexAttrDestroy(&mattr);
terrno = TAOS_SYSTEM_ERROR(errno); terrno = TAOS_SYSTEM_ERROR(errno);
uError("failed to init mutex since %s", terrstr()); uError("failed to init mutex since %s", terrstr());
return -1; return -1;
...@@ -156,6 +155,11 @@ static void taosProcCleanupQueue(SProcQueue *pQueue) { ...@@ -156,6 +155,11 @@ static void taosProcCleanupQueue(SProcQueue *pQueue) {
static int32_t taosProcQueuePush(SProcObj *pProc, SProcQueue *pQueue, const char *pHead, int16_t rawHeadLen, static int32_t taosProcQueuePush(SProcObj *pProc, SProcQueue *pQueue, const char *pHead, int16_t rawHeadLen,
const char *pBody, int32_t rawBodyLen, int64_t handle, ProcFuncType ftype) { const char *pBody, int32_t rawBodyLen, int64_t handle, ProcFuncType ftype) {
if (rawHeadLen == 0 || pHead == NULL) {
terrno = TSDB_CODE_INVALID_PARA;
return -1;
}
const int32_t headLen = CEIL8(rawHeadLen); const int32_t headLen = CEIL8(rawHeadLen);
const int32_t bodyLen = CEIL8(rawBodyLen); const int32_t bodyLen = CEIL8(rawBodyLen);
const int32_t fullLen = headLen + bodyLen + 8; const int32_t fullLen = headLen + bodyLen + 8;
...@@ -177,13 +181,13 @@ static int32_t taosProcQueuePush(SProcObj *pProc, SProcQueue *pQueue, const char ...@@ -177,13 +181,13 @@ static int32_t taosProcQueuePush(SProcObj *pProc, SProcQueue *pQueue, const char
const int32_t pos = pQueue->tail; const int32_t pos = pQueue->tail;
if (pQueue->tail < pQueue->total) { if (pQueue->tail < pQueue->total) {
*(int16_t *)(pQueue->pBuffer + pQueue->tail) = headLen; *(int16_t *)(pQueue->pBuffer + pQueue->tail) = rawHeadLen;
*(int8_t *)(pQueue->pBuffer + pQueue->tail + 2) = (int8_t)ftype; *(int8_t *)(pQueue->pBuffer + pQueue->tail + 2) = (int8_t)ftype;
*(int32_t *)(pQueue->pBuffer + pQueue->tail + 4) = bodyLen; *(int32_t *)(pQueue->pBuffer + pQueue->tail + 4) = rawBodyLen;
} else { } else {
*(int16_t *)(pQueue->pBuffer) = headLen; *(int16_t *)(pQueue->pBuffer) = rawHeadLen;
*(int8_t *)(pQueue->pBuffer + 2) = (int8_t)ftype; *(int8_t *)(pQueue->pBuffer + 2) = (int8_t)ftype;
*(int32_t *)(pQueue->pBuffer + 4) = bodyLen; *(int32_t *)(pQueue->pBuffer + 4) = rawBodyLen;
} }
if (pQueue->tail < pQueue->head) { if (pQueue->tail < pQueue->head) {
...@@ -239,18 +243,20 @@ static int32_t taosProcQueuePop(SProcQueue *pQueue, void **ppHead, int16_t *pHea ...@@ -239,18 +243,20 @@ static int32_t taosProcQueuePop(SProcQueue *pQueue, void **ppHead, int16_t *pHea
return 0; return 0;
} }
int16_t headLen = 0; int16_t rawHeadLen = 0;
int8_t ftype = 0; int8_t ftype = 0;
int32_t bodyLen = 0; int32_t rawBodyLen = 0;
if (pQueue->head < pQueue->total) { if (pQueue->head < pQueue->total) {
headLen = *(int16_t *)(pQueue->pBuffer + pQueue->head); rawHeadLen = *(int16_t *)(pQueue->pBuffer + pQueue->head);
ftype = *(int8_t *)(pQueue->pBuffer + pQueue->head + 2); ftype = *(int8_t *)(pQueue->pBuffer + pQueue->head + 2);
bodyLen = *(int32_t *)(pQueue->pBuffer + pQueue->head + 4); rawBodyLen = *(int32_t *)(pQueue->pBuffer + pQueue->head + 4);
} else { } else {
headLen = *(int16_t *)(pQueue->pBuffer); rawHeadLen = *(int16_t *)(pQueue->pBuffer);
ftype = *(int8_t *)(pQueue->pBuffer + 2); ftype = *(int8_t *)(pQueue->pBuffer + 2);
bodyLen = *(int32_t *)(pQueue->pBuffer + 4); rawBodyLen = *(int32_t *)(pQueue->pBuffer + 4);
} }
int16_t headLen = CEIL8(rawHeadLen);
int32_t bodyLen = CEIL8(rawBodyLen);
void *pHead = (*mallocHeadFp)(headLen); void *pHead = (*mallocHeadFp)(headLen);
void *pBody = (*mallocBodyFp)(bodyLen); void *pBody = (*mallocBodyFp)(bodyLen);
...@@ -301,12 +307,12 @@ static int32_t taosProcQueuePop(SProcQueue *pQueue, void **ppHead, int16_t *pHea ...@@ -301,12 +307,12 @@ static int32_t taosProcQueuePop(SProcQueue *pQueue, void **ppHead, int16_t *pHea
*ppHead = pHead; *ppHead = pHead;
*ppBody = pBody; *ppBody = pBody;
*pHeadLen = headLen; *pHeadLen = rawHeadLen;
*pBodyLen = bodyLen; *pBodyLen = rawBodyLen;
*pFuncType = (ProcFuncType)ftype; *pFuncType = (ProcFuncType)ftype;
uTrace("proc:%s, pop msg at pos:%d ftype:%d remain:%d, head:%d %p body:%d %p", pQueue->name, pos, ftype, uTrace("proc:%s, pop msg at pos:%d ftype:%d remain:%d, head:%d %p body:%d %p", pQueue->name, pos, ftype,
pQueue->items, headLen, pHead, bodyLen, pBody); pQueue->items, rawHeadLen, pHead, rawBodyLen, pBody);
return 1; return 1;
} }
...@@ -383,7 +389,7 @@ static void taosProcThreadLoop(SProcObj *pProc) { ...@@ -383,7 +389,7 @@ static void taosProcThreadLoop(SProcObj *pProc) {
freeBodyFp = pProc->parentFreeBodyFp; freeBodyFp = pProc->parentFreeBodyFp;
} }
uDebug("proc:%s, start to get msg from queue:%p", pProc->name, pQueue); uDebug("proc:%s, start to get msg from queue:%p, thread:%" PRId64, pProc->name, pQueue, pProc->thread);
while (1) { while (1) {
int32_t numOfMsgs = taosProcQueuePop(pQueue, &pHead, &headLen, &pBody, &bodyLen, &ftype, mallocHeadFp, freeHeadFp, int32_t numOfMsgs = taosProcQueuePop(pQueue, &pHead, &headLen, &pBody, &bodyLen, &ftype, mallocHeadFp, freeHeadFp,
...@@ -392,7 +398,7 @@ static void taosProcThreadLoop(SProcObj *pProc) { ...@@ -392,7 +398,7 @@ static void taosProcThreadLoop(SProcObj *pProc) {
uDebug("proc:%s, get no msg from queue:%p and exit the proc thread", pProc->name, pQueue); uDebug("proc:%s, get no msg from queue:%p and exit the proc thread", pProc->name, pQueue);
break; break;
} else if (numOfMsgs < 0) { } else if (numOfMsgs < 0) {
uTrace("proc:%s, get no msg from queue:%p since %s", pProc->name, pQueue, terrstr()); uError("proc:%s, get no msg from queue:%p since %s", pProc->name, pQueue, terrstr());
taosMsleep(1); taosMsleep(1);
continue; continue;
} else { } else {
...@@ -412,11 +418,11 @@ int32_t taosProcRun(SProcObj *pProc) { ...@@ -412,11 +418,11 @@ int32_t taosProcRun(SProcObj *pProc) {
return -1; return -1;
} }
uDebug("proc:%s, start to consume queue:%p, thread:%" PRId64, pProc->name, pProc->pChildQueue, pProc->thread); uDebug("proc:%s, start to consume, thread:%" PRId64, pProc->name, pProc->thread);
return 0; return 0;
} }
static void taosProcStop(SProcObj *pProc) { void taosProcStop(SProcObj *pProc) {
if (!taosCheckPthreadValid(pProc->thread)) return; if (!taosCheckPthreadValid(pProc->thread)) return;
uDebug("proc:%s, start to join thread:%" PRId64, pProc->name, pProc->thread); uDebug("proc:%s, start to join thread:%" PRId64, pProc->name, pProc->thread);
...@@ -428,6 +434,7 @@ static void taosProcStop(SProcObj *pProc) { ...@@ -428,6 +434,7 @@ static void taosProcStop(SProcObj *pProc) {
} }
tsem_post(&pQueue->sem); tsem_post(&pQueue->sem);
taosThreadJoin(pProc->thread, NULL); taosThreadJoin(pProc->thread, NULL);
pProc->thread = 0;
} }
void taosProcCleanup(SProcObj *pProc) { void taosProcCleanup(SProcObj *pProc) {
...@@ -448,6 +455,10 @@ void taosProcCleanup(SProcObj *pProc) { ...@@ -448,6 +455,10 @@ void taosProcCleanup(SProcObj *pProc) {
int32_t taosProcPutToChildQ(SProcObj *pProc, const void *pHead, int16_t headLen, const void *pBody, int32_t bodyLen, int32_t taosProcPutToChildQ(SProcObj *pProc, const void *pHead, int16_t headLen, const void *pBody, int32_t bodyLen,
void *handle, ProcFuncType ftype) { void *handle, ProcFuncType ftype) {
if (ftype != PROC_REQ) {
terrno = TSDB_CODE_INVALID_PARA;
return -1;
}
return taosProcQueuePush(pProc, pProc->pChildQueue, pHead, headLen, pBody, bodyLen, (int64_t)handle, ftype); return taosProcQueuePush(pProc, pProc->pChildQueue, pHead, headLen, pBody, bodyLen, (int64_t)handle, ftype);
} }
...@@ -464,13 +475,18 @@ void taosProcCloseHandles(SProcObj *pProc, void (*HandleFp)(void *handle)) { ...@@ -464,13 +475,18 @@ void taosProcCloseHandles(SProcObj *pProc, void (*HandleFp)(void *handle)) {
while (h != NULL) { while (h != NULL) {
void *handle = *((void **)h); void *handle = *((void **)h);
(*HandleFp)(handle); (*HandleFp)(handle);
h = taosHashIterate(pProc->hash, h);
} }
taosHashClear(pProc->hash);
taosThreadMutexUnlock(&pProc->pChildQueue->mutex); taosThreadMutexUnlock(&pProc->pChildQueue->mutex);
} }
void taosProcPutToParentQ(SProcObj *pProc, const void *pHead, int16_t headLen, const void *pBody, int32_t bodyLen, void taosProcPutToParentQ(SProcObj *pProc, const void *pHead, int16_t headLen, const void *pBody, int32_t bodyLen,
ProcFuncType ftype) { ProcFuncType ftype) {
int32_t retry = 0;
while (taosProcQueuePush(pProc, pProc->pParentQueue, pHead, headLen, pBody, bodyLen, 0, ftype) != 0) { while (taosProcQueuePush(pProc, pProc->pParentQueue, pHead, headLen, pBody, bodyLen, 0, ftype) != 0) {
taosMsleep(1); uWarn("proc:%s, failed to put to queue:%p since %s, retry:%d", pProc->name, pProc->pParentQueue, terrstr(), retry);
retry++;
taosMsleep(retry);
} }
} }
...@@ -46,11 +46,11 @@ add_executable(encodeTest "encodeTest.cpp") ...@@ -46,11 +46,11 @@ add_executable(encodeTest "encodeTest.cpp")
target_link_libraries(encodeTest os util gtest gtest_main) target_link_libraries(encodeTest os util gtest gtest_main)
# queueTest # queueTest
add_executable(queue_test "queueTest.cpp") add_executable(procTest "procTest.cpp")
target_link_libraries(queue_test os util gtest_main) target_link_libraries(procTest os util transport sut gtest_main)
add_test( add_test(
NAME queue_test NAME procTest
COMMAND queue_test COMMAND procTest
) )
# cfgTest # cfgTest
......
/**
* @file queue.cpp
* @author slguan (slguan@taosdata.com)
* @brief UTIL module queue tests
* @version 1.0
* @date 2022-01-27
*
* @copyright Copyright (c) 2022
*
*/
#include <gtest/gtest.h>
#include "tlog.h"
#include "tprocess.h"
#include "tqueue.h"
typedef struct STestMsg {
uint16_t msgType;
void *pCont;
int contLen;
int32_t code;
void *handle; // rpc handle returned to app
void *ahandle; // app handle set by client
int noResp; // has response or not(default 0, 0: resp, 1: no resp);
int persistHandle; // persist handle or not
} STestMsg;
class UtilTesProc : public ::testing::Test {
public:
void SetUp() override {
shm.id = -1;
for (int32_t i = 0; i < 4000; ++i) {
body[i] = i % 26 + 'a';
}
head.pCont = body;
head.code = 1;
head.msgType = 2;
head.noResp = 3;
head.persistHandle = 4;
taosRemoveDir("/tmp/td");
taosMkDir("/tmp/td");
tstrncpy(tsLogDir, "/tmp/td", PATH_MAX);
if (taosInitLog("taosdlog", 1) != 0) {
printf("failed to init log file\n");
}
}
void TearDown() override { taosDropShm(&shm); }
public:
static STestMsg head;
static char body[4000];
static SShm shm;
static void SetUpTestSuite() {}
static void TearDownTestSuite() {}
};
SShm UtilTesProc::shm;
char UtilTesProc::body[4000];
STestMsg UtilTesProc::head;
TEST_F(UtilTesProc, 00_Init_Cleanup) {
ASSERT_EQ(taosCreateShm(&shm, 1234, 1024 * 1024 * 2), 0);
shm.size = 1023;
SProcCfg cfg = {.childConsumeFp = (ProcConsumeFp)NULL,
.childMallocHeadFp = (ProcMallocFp)taosAllocateQitem,
.childFreeHeadFp = (ProcFreeFp)taosFreeQitem,
.childMallocBodyFp = (ProcMallocFp)taosMemoryMalloc,
.childFreeBodyFp = (ProcFreeFp)taosMemoryMalloc,
.parentConsumeFp = (ProcConsumeFp)NULL,
.parentMallocHeadFp = (ProcMallocFp)taosMemoryMalloc,
.parentFreeHeadFp = (ProcFreeFp)taosMemoryFree,
.parentMallocBodyFp = (ProcMallocFp)taosMemoryMalloc,
.parentFreeBodyFp = (ProcFreeFp)taosMemoryMalloc,
.shm = shm,
.parent = &shm,
.name = "1234"};
SProcObj *proc = taosProcInit(&cfg);
ASSERT_EQ(proc, nullptr);
shm.size = 2468;
cfg.shm = shm;
proc = taosProcInit(&cfg);
ASSERT_NE(proc, nullptr);
ASSERT_EQ(taosProcRun(proc), 0);
taosProcCleanup(proc);
taosDropShm(&shm);
}
void ConsumeChild1(void *parent, void *pHead, int16_t headLen, void *pBody, int32_t bodyLen, ProcFuncType ftype) {
STestMsg msg;
memcpy(&msg, pHead, headLen);
char body[2000] = {0};
memcpy(body, pBody, bodyLen);
uDebug("====> parent:%" PRId64 " ftype:%d, headLen:%d bodyLen:%d head:%d:%d:%d:%d body:%s <====", (int64_t)parent,
ftype, headLen, bodyLen, msg.code, msg.msgType, msg.noResp, msg.persistHandle, body);
taosMemoryFree(pBody);
taosFreeQitem(pHead);
}
TEST_F(UtilTesProc, 01_Push_Pop_Child) {
shm.size = 3000;
ASSERT_EQ(taosCreateShm(&shm, 1235, shm.size), 0);
SProcCfg cfg = {.childConsumeFp = (ProcConsumeFp)ConsumeChild1,
.childMallocHeadFp = (ProcMallocFp)taosAllocateQitem,
.childFreeHeadFp = (ProcFreeFp)taosFreeQitem,
.childMallocBodyFp = (ProcMallocFp)taosMemoryMalloc,
.childFreeBodyFp = (ProcFreeFp)taosMemoryFree,
.parentConsumeFp = (ProcConsumeFp)NULL,
.parentMallocHeadFp = (ProcMallocFp)taosMemoryMalloc,
.parentFreeHeadFp = (ProcFreeFp)taosMemoryFree,
.parentMallocBodyFp = (ProcMallocFp)taosMemoryMalloc,
.parentFreeBodyFp = (ProcFreeFp)taosMemoryFree,
.shm = shm,
.parent = (void *)((int64_t)1235),
.name = "1235_c"};
SProcObj *cproc = taosProcInit(&cfg);
ASSERT_NE(cproc, nullptr);
ASSERT_NE(taosProcPutToChildQ(cproc, &head, 0, body, 0, 0, PROC_RSP), 0);
ASSERT_NE(taosProcPutToChildQ(cproc, &head, 0, body, 0, 0, PROC_REGIST), 0);
ASSERT_NE(taosProcPutToChildQ(cproc, &head, 0, body, 0, 0, PROC_RELEASE), 0);
ASSERT_NE(taosProcPutToChildQ(cproc, NULL, 12, body, 0, 0, PROC_REQ), 0);
ASSERT_NE(taosProcPutToChildQ(cproc, &head, 0, body, 0, 0, PROC_REQ), 0);
ASSERT_NE(taosProcPutToChildQ(cproc, &head, shm.size, body, 0, 0, PROC_REQ), 0);
ASSERT_NE(taosProcPutToChildQ(cproc, &head, sizeof(STestMsg), body, shm.size, 0, PROC_REQ), 0);
for (int32_t j = 0; j < 1000; j++) {
int32_t i = 0;
for (i = 0; i < 20; ++i) {
ASSERT_EQ(taosProcPutToChildQ(cproc, &head, sizeof(STestMsg), body, i, 0, PROC_REQ), 0);
}
ASSERT_NE(taosProcPutToChildQ(cproc, &head, sizeof(STestMsg), body, i, 0, PROC_REQ), 0);
cfg.isChild = true;
cfg.name = "1235_p";
SProcObj *pproc = taosProcInit(&cfg);
ASSERT_NE(pproc, nullptr);
taosProcRun(pproc);
taosProcCleanup(pproc);
}
taosProcCleanup(cproc);
taosDropShm(&shm);
}
void ConsumeParent1(void *parent, void *pHead, int16_t headLen, void *pBody, int32_t bodyLen, ProcFuncType ftype) {
STestMsg msg;
memcpy(&msg, pHead, headLen);
char body[2000] = {0};
memcpy(body, pBody, bodyLen);
uDebug("----> parent:%" PRId64 " ftype:%d, headLen:%d bodyLen:%d head:%d:%d:%d:%d body:%s <----", (int64_t)parent,
ftype, headLen, bodyLen, msg.code, msg.msgType, msg.noResp, msg.persistHandle, body);
taosMemoryFree(pBody);
taosMemoryFree(pHead);
}
TEST_F(UtilTesProc, 02_Push_Pop_Parent) {
shm.size = 3000;
ASSERT_EQ(taosCreateShm(&shm, 1236, shm.size), 0);
SProcCfg cfg = {.childConsumeFp = (ProcConsumeFp)NULL,
.childMallocHeadFp = (ProcMallocFp)taosAllocateQitem,
.childFreeHeadFp = (ProcFreeFp)taosFreeQitem,
.childMallocBodyFp = (ProcMallocFp)taosMemoryMalloc,
.childFreeBodyFp = (ProcFreeFp)taosMemoryFree,
.parentConsumeFp = (ProcConsumeFp)ConsumeParent1,
.parentMallocHeadFp = (ProcMallocFp)taosMemoryMalloc,
.parentFreeHeadFp = (ProcFreeFp)taosMemoryFree,
.parentMallocBodyFp = (ProcMallocFp)taosMemoryMalloc,
.parentFreeBodyFp = (ProcFreeFp)taosMemoryFree,
.shm = shm,
.parent = (void *)((int64_t)1236),
.name = "1236_c"};
SProcObj *cproc = taosProcInit(&cfg);
ASSERT_NE(cproc, nullptr);
cfg.name = "1236_p";
cfg.isChild = true;
SProcObj *pproc = taosProcInit(&cfg);
ASSERT_NE(pproc, nullptr);
for (int32_t j = 0; j < 1000; j++) {
int32_t i = 0;
for (i = 0; i < 20; ++i) {
taosProcPutToParentQ(pproc, &head, sizeof(STestMsg), body, i, PROC_REQ);
}
taosProcRun(cproc);
taosProcStop(cproc);
}
taosProcCleanup(pproc);
taosProcCleanup(cproc);
taosDropShm(&shm);
}
void ConsumeChild3(void *parent, void *pHead, int16_t headLen, void *pBody, int32_t bodyLen, ProcFuncType ftype) {
STestMsg msg;
memcpy(&msg, pHead, headLen);
char body[2000] = {0};
memcpy(body, pBody, bodyLen);
uDebug("====> parent:%" PRId64 " ftype:%d, headLen:%d bodyLen:%d handle:%" PRId64 " body:%s <====", (int64_t)parent,
ftype, headLen, bodyLen, (int64_t)msg.handle, body);
taosMemoryFree(pBody);
taosFreeQitem(pHead);
}
void processHandle(void *handle) { uDebug("----> remove handle:%" PRId64 " <----", (int64_t)handle); }
TEST_F(UtilTesProc, 03_Handle) {
// uDebugFlag = 207;
shm.size = 3000;
ASSERT_EQ(taosCreateShm(&shm, 1237, shm.size), 0);
SProcCfg cfg = {.childConsumeFp = (ProcConsumeFp)ConsumeChild3,
.childMallocHeadFp = (ProcMallocFp)taosAllocateQitem,
.childFreeHeadFp = (ProcFreeFp)taosFreeQitem,
.childMallocBodyFp = (ProcMallocFp)taosMemoryMalloc,
.childFreeBodyFp = (ProcFreeFp)taosMemoryFree,
.parentConsumeFp = (ProcConsumeFp)NULL,
.parentMallocHeadFp = (ProcMallocFp)taosMemoryMalloc,
.parentFreeHeadFp = (ProcFreeFp)taosMemoryFree,
.parentMallocBodyFp = (ProcMallocFp)taosMemoryMalloc,
.parentFreeBodyFp = (ProcFreeFp)taosMemoryFree,
.shm = shm,
.parent = (void *)((int64_t)1235),
.name = "1237_p"};
SProcObj *cproc = taosProcInit(&cfg);
ASSERT_NE(cproc, nullptr);
for (int32_t j = 0; j < 1; j++) {
int32_t i = 0;
for (i = 0; i < 20; ++i) {
head.handle = (void *)((int64_t)i);
ASSERT_EQ(taosProcPutToChildQ(cproc, &head, sizeof(STestMsg), body, i, (void *)((int64_t)i), PROC_REQ), 0);
}
cfg.isChild = true;
cfg.name = "child_queue";
SProcObj *pproc = taosProcInit(&cfg);
ASSERT_NE(pproc, nullptr);
taosProcRun(pproc);
taosProcCleanup(pproc);
taosProcRemoveHandle(cproc, (void *)((int64_t)3));
taosProcRemoveHandle(cproc, (void *)((int64_t)5));
taosProcRemoveHandle(cproc, (void *)((int64_t)6));
taosProcCloseHandles(cproc, processHandle);
}
taosProcCleanup(cproc);
taosDropShm(&shm);
}
/**
* @file queue.cpp
* @author slguan (slguan@taosdata.com)
* @brief UTIL module queue tests
* @version 1.0
* @date 2022-01-27
*
* @copyright Copyright (c) 2022
*
*/
#include <gtest/gtest.h>
#include "os.h"
#include "tqueue.h"
#include <sys/shm.h>
#include <sys/wait.h>
class UtilTestQueue : public ::testing::Test {
public:
void SetUp() override {}
void TearDown() override {}
public:
static void SetUpTestSuite() {}
static void TearDownTestSuite() {}
};
...@@ -55,8 +55,8 @@ ...@@ -55,8 +55,8 @@
# --- for multi process mode # --- for multi process mode
./test.sh -f tsim/user/basic1.sim -m # ./test.sh -f tsim/user/basic1.sim -m
./test.sh -f tsim/stable/vnode3.sim -m # ./test.sh -f tsim/stable/vnode3.sim -m
./test.sh -f tsim/tmq/basic.sim -m # ./test.sh -f tsim/tmq/basic.sim -m
#======================b1-end=============== #======================b1-end===============
...@@ -206,7 +206,7 @@ if $data02 != 2678400000 then ...@@ -206,7 +206,7 @@ if $data02 != 2678400000 then
endi endi
sql select _wstartts, count(tbcol), _wduration, _wstartts, count(*) from ct3 interval(1n, 1w) sliding(2w) sql select _wstartts, count(tbcol), _wduration, _wstartts, count(*) from ct3 interval(1n, 1w) sliding(2w)
print ===> select count(tbcol), sum(tbcol), max(tbcol), min(tbcol), count(*) from ct3 interval(1n, 1w) sliding(2w) print ===> select _wstartts, count(tbcol), _wduration, _wstartts, count(*) from ct3 interval(1n, 1w) sliding(2w)
print ===> rows: $rows print ===> rows: $rows
print ===> rows0: $data00 $data01 $data02 $data03 $data04 print ===> rows0: $data00 $data01 $data02 $data03 $data04
print ===> rows1: $data10 $data11 $data12 $data13 $data14 print ===> rows1: $data10 $data11 $data12 $data13 $data14
...@@ -219,6 +219,7 @@ if $data00 != @21-11-30 08:00:00.000@ then ...@@ -219,6 +219,7 @@ if $data00 != @21-11-30 08:00:00.000@ then
return -1 return -1
endi endi
if $data01 != NULL then if $data01 != NULL then
print expect null, actual: $data01
return -1 return -1
endi endi
if $data31 != $data34 then if $data31 != $data34 then
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册