diff --git a/include/libs/catalog/catalog.h b/include/libs/catalog/catalog.h index b88afcb39df49c518b41968eee74638ca2143ae1..30d1bd0a518415d98b7358337c94834aa70716a2 100644 --- a/include/libs/catalog/catalog.h +++ b/include/libs/catalog/catalog.h @@ -51,7 +51,7 @@ typedef struct SMetaData { SArray *pTableMeta; // STableMeta array SArray *pVgroupInfo; // SVgroupInfo list SArray *pUdfList; // udf info list - SArray *pEpSetList; // qnode epset list, SArray + SArray *pQnodeList; // qnode list, SArray } SMetaData; typedef struct SCatalogCfg { diff --git a/source/common/src/tmsg.c b/source/common/src/tmsg.c index 6b3a13c884e9914b9a143ed277aa9a2f6cea941e..90c5c03fa8f1e24321bbb542765885376226735c 100644 --- a/source/common/src/tmsg.c +++ b/source/common/src/tmsg.c @@ -2084,10 +2084,15 @@ int32_t tDeserializeSQnodeListRsp(void *buf, int32_t bufLen, SQnodeListRsp *pRsp if (tStartDecode(&decoder) < 0) return -1; int32_t num = 0; if (tDecodeI32(&decoder, &num) < 0) return -1; - pRsp->addrsList = taosArrayInit(num, sizeof(SQueryNodeAddr)); - if (NULL == pRsp->addrsList) return -1; + if (NULL == pRsp->addrsList) { + pRsp->addrsList = taosArrayInit(num, sizeof(SQueryNodeAddr)); + if (NULL == pRsp->addrsList) return -1; + } + for (int32_t i = 0; i < num; ++i) { - if (tDecodeSQueryNodeAddr(&decoder, TARRAY_GET_ELEM(pRsp->addrsList, i)) < 0) return -1; + SQueryNodeAddr addr = {0}; + if (tDecodeSQueryNodeAddr(&decoder, &addr) < 0) return -1; + taosArrayPush(pRsp->addrsList, &addr); } tEndDecode(&decoder); diff --git a/source/dnode/mgmt/interface/src/dmInt.c b/source/dnode/mgmt/interface/src/dmInt.c index 491eff5e17b99f95d1d06544dec9775e59ba7130..2d15a7a008c01f0035b26eb2396b4cc3e3efa506 100644 --- a/source/dnode/mgmt/interface/src/dmInt.c +++ b/source/dnode/mgmt/interface/src/dmInt.c @@ -173,9 +173,13 @@ static void dmGetServerStatus(SDnode *pDnode, SServerStatusRsp *pStatus) { void dmProcessNettestReq(SDnode *pDnode, SRpcMsg *pRpc) { dDebug("net test req is received"); - SRpcMsg rsp = {.handle = pRpc->handle, .ahandle = pRpc->ahandle, .code = 0}; + SRpcMsg rsp = {.handle = pRpc->handle, .refId = pRpc->refId, .ahandle = pRpc->ahandle, .code = 0}; rsp.pCont = rpcMallocCont(pRpc->contLen); - rsp.contLen = pRpc->contLen; + if (rsp.pCont == NULL) { + rsp.code = TSDB_CODE_OUT_OF_MEMORY; + } else { + rsp.contLen = pRpc->contLen; + } rpcSendResponse(&rsp); } diff --git a/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c b/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c index b434cf102e130737fa4493eb204613cb8fee602b..afe57e3d8ffd78f7d9684cbd7e52a264a1343668 100644 --- a/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c +++ b/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c @@ -177,6 +177,7 @@ void mmInitMsgHandle(SMgmtWrapper *pWrapper) { dmSetMsgHandle(pWrapper, TDMT_MND_DROP_MNODE, mmProcessWriteMsg, DEFAULT_HANDLE); dmSetMsgHandle(pWrapper, TDMT_MND_CREATE_QNODE, mmProcessWriteMsg, DEFAULT_HANDLE); dmSetMsgHandle(pWrapper, TDMT_MND_DROP_QNODE, mmProcessWriteMsg, DEFAULT_HANDLE); + dmSetMsgHandle(pWrapper, TDMT_MND_QNODE_LIST, mmProcessReadMsg, DEFAULT_HANDLE); dmSetMsgHandle(pWrapper, TDMT_MND_CREATE_SNODE, mmProcessWriteMsg, DEFAULT_HANDLE); dmSetMsgHandle(pWrapper, TDMT_MND_DROP_SNODE, mmProcessWriteMsg, DEFAULT_HANDLE); dmSetMsgHandle(pWrapper, TDMT_MND_CREATE_BNODE, mmProcessWriteMsg, DEFAULT_HANDLE); diff --git a/source/dnode/mnode/impl/src/mndQnode.c b/source/dnode/mnode/impl/src/mndQnode.c index 1c03ca30f44ec969f57e834cce08d0c44e5d2920..cec5933ba32b2e2f3fbcfb8a5ab48a4f8109ad18 100644 --- a/source/dnode/mnode/impl/src/mndQnode.c +++ b/source/dnode/mnode/impl/src/mndQnode.c @@ -451,8 +451,9 @@ static int32_t mndProcessQnodeListReq(SNodeMsg *pReq) { goto _OVER; } + void *pIter = NULL; while (1) { - void *pIter = sdbFetch(pSdb, SDB_QNODE, NULL, (void **)&pObj); + pIter = sdbFetch(pSdb, SDB_QNODE, pIter, (void **)&pObj); if (pIter == NULL) break; SQueryNodeAddr nodeAddr = {0}; @@ -472,7 +473,7 @@ static int32_t mndProcessQnodeListReq(SNodeMsg *pReq) { } int32_t rspLen = tSerializeSQnodeListRsp(NULL, 0, &qlistRsp); - void *pRsp = taosMemoryMalloc(rspLen); + void *pRsp = rpcMallocCont(rspLen); if (pRsp == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; goto _OVER; diff --git a/source/dnode/qnode/src/qnode.c b/source/dnode/qnode/src/qnode.c index 7b299f1f3ce3fca46e2205f3a8c0226096ada826..907fddaec2529385856f09add338fa91159e7c69 100644 --- a/source/dnode/qnode/src/qnode.c +++ b/source/dnode/qnode/src/qnode.c @@ -17,6 +17,7 @@ #include "qndInt.h" #include "query.h" #include "qworker.h" +//#include "tudf.h" SQnode *qndOpen(const SQnodeOpt *pOption) { SQnode *pQnode = taosMemoryCalloc(1, sizeof(SQnode)); @@ -25,6 +26,8 @@ SQnode *qndOpen(const SQnodeOpt *pOption) { return NULL; } + //udfcOpen(); + if (qWorkerInit(NODE_TYPE_QNODE, pQnode->qndId, NULL, (void **)&pQnode->pQuery, &pOption->msgCb)) { taosMemoryFreeClear(pQnode); return NULL; @@ -37,13 +40,15 @@ SQnode *qndOpen(const SQnodeOpt *pOption) { void qndClose(SQnode *pQnode) { qWorkerDestroy((void **)&pQnode->pQuery); + //udfcClose(); + taosMemoryFree(pQnode); } int32_t qndGetLoad(SQnode *pQnode, SQnodeLoad *pLoad) { return 0; } int32_t qndProcessQueryMsg(SQnode *pQnode, SRpcMsg *pMsg) { - qTrace("message in query queue is processing"); + qTrace("message in qnode query queue is processing"); SReadHandle handle = {0}; switch (pMsg->msgType) { diff --git a/source/dnode/vnode/src/vnd/vnodeSvr.c b/source/dnode/vnode/src/vnd/vnodeSvr.c index aa90a04478ab37d5f377903d36c7b99d43411f29..e3c504c93d9195eec1111965fc65c05119d3f853 100644 --- a/source/dnode/vnode/src/vnd/vnodeSvr.c +++ b/source/dnode/vnode/src/vnd/vnodeSvr.c @@ -135,7 +135,7 @@ int vnodeProcessWriteReq(SVnode *pVnode, SRpcMsg *pMsg, int64_t version, SRpcMsg } int vnodeProcessQueryMsg(SVnode *pVnode, SRpcMsg *pMsg) { - vTrace("message in query queue is processing"); + vTrace("message in vnode query queue is processing"); SReadHandle handle = {.reader = pVnode->pTsdb, .meta = pVnode->pMeta, .config = &pVnode->config}; switch (pMsg->msgType) { diff --git a/source/libs/catalog/src/catalog.c b/source/libs/catalog/src/catalog.c index 410527c9e6ac48c52e5926abfde59e321557eb7e..272c370ca3943c65d6163e9d1a169d19c9b80a82 100644 --- a/source/libs/catalog/src/catalog.c +++ b/source/libs/catalog/src/catalog.c @@ -494,7 +494,7 @@ _return: return TSDB_CODE_SUCCESS; } -int32_t ctgGetQnodeListFromMnode(SCatalog* pCtg, void *pRpc, const SEpSet* pMgmtEps, SArray **out) { +int32_t ctgGetQnodeListFromMnode(SCatalog* pCtg, void *pRpc, const SEpSet* pMgmtEps, SArray *out) { char *msg = NULL; int32_t msgLen = 0; @@ -526,7 +526,7 @@ int32_t ctgGetQnodeListFromMnode(SCatalog* pCtg, void *pRpc, const SEpSet* pMgmt CTG_ERR_RET(code); } - ctgDebug("Got qnode list from mnode, listNum:%d", (int32_t)taosArrayGetSize(*out)); + ctgDebug("Got qnode list from mnode, listNum:%d", (int32_t)taosArrayGetSize(out)); return TSDB_CODE_SUCCESS; } @@ -2778,7 +2778,8 @@ int32_t catalogGetAllMeta(SCatalog* pCtg, void *pTrans, const SEpSet* pMgmtEps, } if (pReq->qNodeRequired) { - CTG_ERR_JRET(ctgGetQnodeListFromMnode(pCtg, pTrans, pMgmtEps, &pRsp->pEpSetList)); + pRsp->pQnodeList = taosArrayInit(10, sizeof(SQueryNodeAddr)); + CTG_ERR_JRET(ctgGetQnodeListFromMnode(pCtg, pTrans, pMgmtEps, pRsp->pQnodeList)); } CTG_API_LEAVE(TSDB_CODE_SUCCESS); @@ -2807,7 +2808,7 @@ int32_t catalogGetQnodeList(SCatalog* pCtg, void *pRpc, const SEpSet* pMgmtEps, CTG_API_LEAVE(TSDB_CODE_CTG_INVALID_INPUT); } - CTG_ERR_JRET(ctgGetQnodeListFromMnode(pCtg, pRpc, pMgmtEps, &pQnodeList)); + CTG_ERR_JRET(ctgGetQnodeListFromMnode(pCtg, pRpc, pMgmtEps, pQnodeList)); _return: diff --git a/source/libs/qcom/src/querymsg.c b/source/libs/qcom/src/querymsg.c index de805d2c772f43be55e328684d58e81086100b83..fd7c831399901a5c8d9922e011213e4267d748cd 100644 --- a/source/libs/qcom/src/querymsg.c +++ b/source/libs/qcom/src/querymsg.c @@ -342,28 +342,20 @@ PROCESS_META_OVER: int32_t queryProcessQnodeListRsp(void *output, char *msg, int32_t msgSize) { SQnodeListRsp out = {0}; - int32_t code = -1; + int32_t code = 0; if (NULL == output || NULL == msg || msgSize <= 0) { code = TSDB_CODE_TSC_INVALID_INPUT; - goto PROCESS_QLIST_OVER; + return code; } + out.addrsList = (SArray *)output; if (tDeserializeSQnodeListRsp(msg, msgSize, &out) != 0) { qError("invalid qnode list rsp msg, msgSize:%d", msgSize); code = TSDB_CODE_INVALID_MSG; - goto PROCESS_QLIST_OVER; + return code; } -PROCESS_QLIST_OVER: - - if (code != 0) { - tFreeSQnodeListRsp(&out); - out.addrsList = NULL; - } - - *(SArray **)output = out.addrsList; - return code; } diff --git a/source/libs/scalar/inc/sclInt.h b/source/libs/scalar/inc/sclInt.h index 3d59ffd93d7ac7e59ac1aa9e81023f3d2cb30636..9142cc41c41f8e8e491549fbbf5c990c07264fd4 100644 --- a/source/libs/scalar/inc/sclInt.h +++ b/source/libs/scalar/inc/sclInt.h @@ -32,6 +32,9 @@ typedef struct SScalarCtx { #define SCL_DATA_TYPE_DUMMY_HASH 9000 #define SCL_DEFAULT_OP_NUM 10 +#define SCL_IS_CONST_NODE(_node) ((NULL == (_node)) || (QUERY_NODE_VALUE == (_node)->type) || (QUERY_NODE_NODE_LIST == (_node)->type)) +#define SCL_IS_CONST_CALC(_ctx) (NULL == (_ctx)->pBlockList) + #define sclFatal(...) qFatal(__VA_ARGS__) #define sclError(...) qError(__VA_ARGS__) #define sclWarn(...) qWarn(__VA_ARGS__) diff --git a/source/libs/scalar/src/scalar.c b/source/libs/scalar/src/scalar.c index 145afbe984a8f19e30871cd91bd62f9c5520cf32..820a4894b5ed756bcacfe030211b24b35e86b11e 100644 --- a/source/libs/scalar/src/scalar.c +++ b/source/libs/scalar/src/scalar.c @@ -244,23 +244,53 @@ int32_t sclInitParam(SNode* node, SScalarParam *param, SScalarCtx *ctx, int32_t return TSDB_CODE_SUCCESS; } -int32_t sclInitParamList(SScalarParam **pParams, SNodeList* pParamList, SScalarCtx *ctx, int32_t *rowNum) { +int32_t sclInitParamList(SScalarParam **pParams, SNodeList* pParamList, SScalarCtx *ctx, int32_t *paramNum, int32_t *rowNum) { int32_t code = 0; - SScalarParam *paramList = taosMemoryCalloc(pParamList->length, sizeof(SScalarParam)); + if (NULL == pParamList) { + if (ctx->pBlockList) { + SSDataBlock *pBlock = taosArrayGet(ctx->pBlockList, 0); + *rowNum = pBlock->info.rows; + } else { + *rowNum = 1; + } + + *paramNum = 1; + } else { + *paramNum = pParamList->length; + } + + SScalarParam *paramList = taosMemoryCalloc(*paramNum, sizeof(SScalarParam)); if (NULL == paramList) { - sclError("calloc %d failed", (int32_t)(pParamList->length * sizeof(SScalarParam))); + sclError("calloc %d failed", (int32_t)((*paramNum) * sizeof(SScalarParam))); SCL_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY); } - SListCell *cell = pParamList->pHead; - for (int32_t i = 0; i < pParamList->length; ++i) { - if (NULL == cell || NULL == cell->pNode) { - sclError("invalid cell, cell:%p, pNode:%p", cell, cell->pNode); - SCL_ERR_JRET(TSDB_CODE_QRY_INVALID_INPUT); + if (pParamList) { + SNode *tnode = NULL; + int32_t i = 0; + if (SCL_IS_CONST_CALC(ctx)) { + WHERE_EACH (tnode, pParamList) { + if (!SCL_IS_CONST_NODE(tnode)) { + WHERE_NEXT; + } else { + SCL_ERR_JRET(sclInitParam(tnode, ¶mList[i], ctx, rowNum)); + ERASE_NODE(pParamList); + } + + ++i; + } + } else { + FOREACH(tnode, pParamList) { + SCL_ERR_JRET(sclInitParam(tnode, ¶mList[i], ctx, rowNum)); + ++i; + } } + } else { + paramList[0].numOfRows = *rowNum; + } - SCL_ERR_JRET(sclInitParam(cell->pNode, ¶mList[i], ctx, rowNum)); - cell = cell->pNext; + if (0 == *rowNum) { + taosMemoryFreeClear(paramList); } *pParams = paramList; @@ -299,37 +329,45 @@ _return: } int32_t sclExecFunction(SFunctionNode *node, SScalarCtx *ctx, SScalarParam *output) { - if (NULL == node->pParameterList || node->pParameterList->length <= 0) { - sclError("invalid function parameter list, list:%p, paramNum:%d", node->pParameterList, node->pParameterList ? node->pParameterList->length : 0); - SCL_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT); - } - - SScalarFuncExecFuncs ffpSet = {0}; - int32_t code = fmGetScalarFuncExecFuncs(node->funcId, &ffpSet); - if (code) { - sclError("fmGetFuncExecFuncs failed, funcId:%d, code:%s", node->funcId, tstrerror(code)); - SCL_ERR_RET(code); - } - SScalarParam *params = NULL; int32_t rowNum = 0; - SCL_ERR_RET(sclInitParamList(¶ms, node->pParameterList, ctx, &rowNum)); - - output->columnData = createColumnInfoData(&node->node.resType, rowNum); - if (output->columnData == NULL) { - sclError("calloc %d failed", (int32_t)(rowNum * output->columnData->info.bytes)); - SCL_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY); - } + int32_t paramNum = 0; + int32_t code = 0; + SCL_ERR_RET(sclInitParamList(¶ms, node->pParameterList, ctx, ¶mNum, &rowNum)); - code = (*ffpSet.process)(params, node->pParameterList->length, output); - if (code) { - sclError("scalar function exec failed, funcId:%d, code:%s", node->funcId, tstrerror(code)); + if (fmIsUserDefinedFunc(node->funcId)) { +#if 0 + UdfcFuncHandle udfHandle = NULL; + + SCL_ERR_JRET(setupUdf(node->functionName, &udfHandle)); + code = callUdfScalarFunc(udfHandle, params, paramNum, output); + teardownUdf(udfHandle); SCL_ERR_JRET(code); +#endif + } else { + SScalarFuncExecFuncs ffpSet = {0}; + code = fmGetScalarFuncExecFuncs(node->funcId, &ffpSet); + if (code) { + sclError("fmGetFuncExecFuncs failed, funcId:%d, code:%s", node->funcId, tstrerror(code)); + SCL_ERR_JRET(code); + } + + output->columnData = createColumnInfoData(&node->node.resType, rowNum); + if (output->columnData == NULL) { + sclError("calloc %d failed", (int32_t)(rowNum * output->columnData->info.bytes)); + SCL_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY); + } + + code = (*ffpSet.process)(params, paramNum, output); + if (code) { + sclError("scalar function exec failed, funcId:%d, code:%s", node->funcId, tstrerror(code)); + SCL_ERR_JRET(code); + } } _return: - for (int32_t i = 0; i < node->pParameterList->length; ++i) { + for (int32_t i = 0; i < paramNum; ++i) { // sclFreeParamNoData(params + i); } @@ -355,8 +393,13 @@ int32_t sclExecLogic(SLogicConditionNode *node, SScalarCtx *ctx, SScalarParam *o SScalarParam *params = NULL; int32_t rowNum = 0; + int32_t paramNum = 0; int32_t code = 0; - SCL_ERR_RET(sclInitParamList(¶ms, node->pParameterList, ctx, &rowNum)); + SCL_ERR_RET(sclInitParamList(¶ms, node->pParameterList, ctx, ¶mNum, &rowNum)); + if (NULL == params) { + output->numOfRows = 0; + return TSDB_CODE_SUCCESS; + } int32_t type = node->node.resType.type; output->numOfRows = rowNum; @@ -369,25 +412,41 @@ int32_t sclExecLogic(SLogicConditionNode *node, SScalarCtx *ctx, SScalarParam *o } bool value = false; + bool complete = true; for (int32_t i = 0; i < rowNum; ++i) { - for (int32_t m = 0; m < node->pParameterList->length; ++m) { + complete = true; + for (int32_t m = 0; m < paramNum; ++m) { + if (NULL == params[m].columnData) { + complete = false; + continue; + } char* p = colDataGetData(params[m].columnData, i); GET_TYPED_DATA(value, bool, params[m].columnData->info.type, p); if (LOGIC_COND_TYPE_AND == node->condType && (false == value)) { + complete = true; break; } else if (LOGIC_COND_TYPE_OR == node->condType && value) { + complete = true; break; } else if (LOGIC_COND_TYPE_NOT == node->condType) { value = !value; } } - colDataAppend(output->columnData, i, (char*) &value, false); + if (complete) { + colDataAppend(output->columnData, i, (char*) &value, false); + } + } + + if (SCL_IS_CONST_CALC(ctx) && (false == complete)) { + sclFreeParam(output); + output->numOfRows = 0; } _return: - for (int32_t i = 0; i < node->pParameterList->length; ++i) { + + for (int32_t i = 0; i < paramNum; ++i) { // sclFreeParamNoData(params + i); } @@ -426,6 +485,17 @@ _return: EDealRes sclRewriteFunction(SNode** pNode, SScalarCtx *ctx) { SFunctionNode *node = (SFunctionNode *)*pNode; + SNode* tnode = NULL; + if (fmIsUserDefinedFunc(node->funcId)) { + return DEAL_RES_CONTINUE; + } + + FOREACH(tnode, node->pParameterList) { + if (!SCL_IS_CONST_NODE(tnode)) { + return DEAL_RES_CONTINUE; + } + } + SScalarParam output = {0}; ctx->code = sclExecFunction(node, ctx, &output); @@ -470,6 +540,10 @@ EDealRes sclRewriteLogic(SNode** pNode, SScalarCtx *ctx) { return DEAL_RES_ERROR; } + if (0 == output.numOfRows) { + return DEAL_RES_CONTINUE; + } + SValueNode *res = (SValueNode *)nodesMakeNode(QUERY_NODE_VALUE); if (NULL == res) { sclError("make value node failed"); @@ -498,6 +572,14 @@ EDealRes sclRewriteLogic(SNode** pNode, SScalarCtx *ctx) { EDealRes sclRewriteOperator(SNode** pNode, SScalarCtx *ctx) { SOperatorNode *node = (SOperatorNode *)*pNode; + if (!SCL_IS_CONST_NODE(node->pLeft)) { + return DEAL_RES_CONTINUE; + } + + if (!SCL_IS_CONST_NODE(node->pRight)) { + return DEAL_RES_CONTINUE; + } + SScalarParam output = {.columnData = taosMemoryCalloc(1, sizeof(SColumnInfoData))}; ctx->code = sclExecOperator(node, ctx, &output); if (ctx->code) { @@ -530,7 +612,7 @@ EDealRes sclRewriteOperator(SNode** pNode, SScalarCtx *ctx) { } EDealRes sclConstantsRewriter(SNode** pNode, void* pContext) { - if (QUERY_NODE_VALUE == nodeType(*pNode) || QUERY_NODE_NODE_LIST == nodeType(*pNode)) { + if (QUERY_NODE_VALUE == nodeType(*pNode) || QUERY_NODE_COLUMN == nodeType(*pNode) || QUERY_NODE_NODE_LIST == nodeType(*pNode)) { return DEAL_RES_CONTINUE; } diff --git a/source/libs/scalar/test/scalar/scalarTests.cpp b/source/libs/scalar/test/scalar/scalarTests.cpp index 04ab26d8a499dee44f8e7537ddca2279753725a9..ed52e48f6d8186666163d7ff2ea714640dc14143 100644 --- a/source/libs/scalar/test/scalar/scalarTests.cpp +++ b/source/libs/scalar/test/scalar/scalarTests.cpp @@ -137,6 +137,11 @@ void scltMakeColumnNode(SNode **pNode, SSDataBlock **block, int32_t dataType, in rnode->node.resType.bytes = dataBytes; rnode->dataBlockId = 0; + if (NULL == block) { + *pNode = (SNode *)rnode; + return; + } + if (NULL == *block) { SSDataBlock *res = (SSDataBlock *)taosMemoryCalloc(1, sizeof(SSDataBlock)); res->info.numOfCols = 3; @@ -889,6 +894,8 @@ TEST(constantTest, int_greater_int_is_true2) { } TEST(constantTest, greater_and_lower) { + scltInitLogFile(); + SNode *pval1 = NULL, *pval2 = NULL, *opNode1 = NULL, *opNode2 = NULL, *logicNode = NULL, *res = NULL; bool eRes[5] = {false, false, true, true, true}; int64_t v1 = 333, v2 = 222, v3 = -10, v4 = 20; @@ -913,6 +920,115 @@ TEST(constantTest, greater_and_lower) { nodesDestroyNode(res); } +TEST(constantTest, column_and_value1) { + scltInitLogFile(); + + SNode *pval1 = NULL, *pval2 = NULL, *opNode1 = NULL, *opNode2 = NULL, *logicNode = NULL, *res = NULL; + bool eRes[5] = {false, false, true, true, true}; + int64_t v1 = 333, v2 = 222, v3 = -10, v4 = 20; + SNode *list[2] = {0}; + scltMakeValueNode(&pval1, TSDB_DATA_TYPE_BIGINT, &v1); + scltMakeValueNode(&pval2, TSDB_DATA_TYPE_BIGINT, &v2); + scltMakeOpNode(&opNode1, OP_TYPE_GREATER_THAN, TSDB_DATA_TYPE_BOOL, pval1, pval2); + scltMakeValueNode(&pval1, TSDB_DATA_TYPE_BIGINT, &v3); + scltMakeColumnNode(&pval2, NULL, TSDB_DATA_TYPE_BIGINT, sizeof(int64_t), 0, NULL); + scltMakeOpNode(&opNode2, OP_TYPE_LOWER_THAN, TSDB_DATA_TYPE_BOOL, pval1, pval2); + list[0] = opNode1; + list[1] = opNode2; + scltMakeLogicNode(&logicNode, LOGIC_COND_TYPE_AND, list, 2); + + int32_t code = scalarCalculateConstants(logicNode, &res); + ASSERT_EQ(code, 0); + ASSERT_TRUE(res); + ASSERT_EQ(nodeType(res), QUERY_NODE_LOGIC_CONDITION); + SLogicConditionNode *v = (SLogicConditionNode *)res; + ASSERT_EQ(v->condType, LOGIC_COND_TYPE_AND); + ASSERT_EQ(v->pParameterList->length, 1); + nodesDestroyNode(res); +} + +TEST(constantTest, column_and_value2) { + scltInitLogFile(); + + SNode *pval1 = NULL, *pval2 = NULL, *opNode1 = NULL, *opNode2 = NULL, *logicNode = NULL, *res = NULL; + bool eRes[5] = {false, false, true, true, true}; + int64_t v1 = 333, v2 = 222, v3 = -10, v4 = 20; + SNode *list[2] = {0}; + scltMakeValueNode(&pval1, TSDB_DATA_TYPE_BIGINT, &v1); + scltMakeValueNode(&pval2, TSDB_DATA_TYPE_BIGINT, &v2); + scltMakeOpNode(&opNode1, OP_TYPE_LOWER_THAN, TSDB_DATA_TYPE_BOOL, pval1, pval2); + scltMakeValueNode(&pval1, TSDB_DATA_TYPE_BIGINT, &v3); + scltMakeColumnNode(&pval2, NULL, TSDB_DATA_TYPE_BIGINT, sizeof(int64_t), 0, NULL); + scltMakeOpNode(&opNode2, OP_TYPE_LOWER_THAN, TSDB_DATA_TYPE_BOOL, pval1, pval2); + list[0] = opNode1; + list[1] = opNode2; + scltMakeLogicNode(&logicNode, LOGIC_COND_TYPE_AND, list, 2); + + int32_t code = scalarCalculateConstants(logicNode, &res); + ASSERT_EQ(code, 0); + ASSERT_TRUE(res); + ASSERT_EQ(nodeType(res), QUERY_NODE_VALUE); + SValueNode *v = (SValueNode *)res; + ASSERT_EQ(v->node.resType.type, TSDB_DATA_TYPE_BOOL); + ASSERT_EQ(v->datum.b, false); + nodesDestroyNode(res); +} + +TEST(constantTest, column_and_value3) { + scltInitLogFile(); + + SNode *pval1 = NULL, *pval2 = NULL, *opNode1 = NULL, *opNode2 = NULL, *logicNode = NULL, *res = NULL; + bool eRes[5] = {false, false, true, true, true}; + int64_t v1 = 333, v2 = 222, v3 = -10, v4 = 20; + SNode *list[2] = {0}; + scltMakeValueNode(&pval1, TSDB_DATA_TYPE_BIGINT, &v1); + scltMakeValueNode(&pval2, TSDB_DATA_TYPE_BIGINT, &v2); + scltMakeOpNode(&opNode1, OP_TYPE_GREATER_THAN, TSDB_DATA_TYPE_BOOL, pval1, pval2); + scltMakeValueNode(&pval1, TSDB_DATA_TYPE_BIGINT, &v3); + scltMakeColumnNode(&pval2, NULL, TSDB_DATA_TYPE_BIGINT, sizeof(int64_t), 0, NULL); + scltMakeOpNode(&opNode2, OP_TYPE_LOWER_THAN, TSDB_DATA_TYPE_BOOL, pval1, pval2); + list[0] = opNode1; + list[1] = opNode2; + scltMakeLogicNode(&logicNode, LOGIC_COND_TYPE_OR, list, 2); + + int32_t code = scalarCalculateConstants(logicNode, &res); + ASSERT_EQ(code, 0); + ASSERT_TRUE(res); + ASSERT_EQ(nodeType(res), QUERY_NODE_VALUE); + SValueNode *v = (SValueNode *)res; + ASSERT_EQ(v->node.resType.type, TSDB_DATA_TYPE_BOOL); + ASSERT_EQ(v->datum.b, true); + nodesDestroyNode(res); +} + +TEST(constantTest, column_and_value4) { + scltInitLogFile(); + + SNode *pval1 = NULL, *pval2 = NULL, *opNode1 = NULL, *opNode2 = NULL, *logicNode = NULL, *res = NULL; + bool eRes[5] = {false, false, true, true, true}; + int64_t v1 = 333, v2 = 222, v3 = -10, v4 = 20; + SNode *list[2] = {0}; + scltMakeValueNode(&pval1, TSDB_DATA_TYPE_BIGINT, &v1); + scltMakeValueNode(&pval2, TSDB_DATA_TYPE_BIGINT, &v2); + scltMakeOpNode(&opNode1, OP_TYPE_LOWER_THAN, TSDB_DATA_TYPE_BOOL, pval1, pval2); + scltMakeValueNode(&pval1, TSDB_DATA_TYPE_BIGINT, &v3); + scltMakeColumnNode(&pval2, NULL, TSDB_DATA_TYPE_BIGINT, sizeof(int64_t), 0, NULL); + scltMakeOpNode(&opNode2, OP_TYPE_LOWER_THAN, TSDB_DATA_TYPE_BOOL, pval1, pval2); + list[0] = opNode1; + list[1] = opNode2; + scltMakeLogicNode(&logicNode, LOGIC_COND_TYPE_OR, list, 2); + + int32_t code = scalarCalculateConstants(logicNode, &res); + ASSERT_EQ(code, 0); + ASSERT_TRUE(res); + ASSERT_EQ(nodeType(res), QUERY_NODE_LOGIC_CONDITION); + SLogicConditionNode *v = (SLogicConditionNode *)res; + ASSERT_EQ(v->condType, LOGIC_COND_TYPE_OR); + ASSERT_EQ(v->pParameterList->length, 1); + nodesDestroyNode(res); +} + + void makeJsonArrow(SSDataBlock **src, SNode **opNode, void *json, char *key){ char keyVar[32] = {0}; memcpy(varDataVal(keyVar), key, strlen(key)); diff --git a/tools/shell/src/shellNettest.c b/tools/shell/src/shellNettest.c index 307049dd385f4ac8fcfcca8d9caedd5d05c8e8f5..c8ec31c48bb29c56215c1bba4090eb8f4cddc7b4 100644 --- a/tools/shell/src/shellNettest.c +++ b/tools/shell/src/shellNettest.c @@ -67,7 +67,7 @@ static void shellWorkAsClient() { printf("request is sent, size:%d\n", rpcMsg.contLen); rpcSendRecv(clientRpc, &epSet, &rpcMsg, &rpcRsp); - if (rpcRsp.code == 0 &&rpcRsp.contLen == rpcMsg.contLen) { + if (rpcRsp.code == 0 && rpcRsp.contLen == rpcMsg.contLen) { printf("response is received, size:%d\n", rpcMsg.contLen); if (rpcRsp.code == 0) totalSucc++; } else { @@ -97,8 +97,12 @@ static void shellProcessMsg(void *p, SRpcMsg *pRpc, SEpSet *pEpSet) { printf("request is received, size:%d\n", pRpc->contLen); fflush(stdout); SRpcMsg rsp = {.handle = pRpc->handle, .refId = pRpc->refId, .ahandle = pRpc->ahandle, .code = 0}; - rsp.pCont = rpcMallocCont(shell.args.pktLen); - rsp.contLen = shell.args.pktLen; + rsp.pCont = rpcMallocCont(pRpc->contLen); + if (rsp.pCont == NULL) { + rsp.code = TSDB_CODE_OUT_OF_MEMORY; + } else { + rsp.contLen = pRpc->contLen; + } rpcSendResponse(&rsp); }