diff --git a/include/common/tmsg.h b/include/common/tmsg.h index 544af9b6eed46cde117815a66e0f99e1e60c0322..71be1a501406243bc50fad2cd32f34e6af78f349 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -70,7 +70,7 @@ typedef uint16_t tmsg_t; #define TSDB_IE_TYPE_DNODE_EXT 6 #define TSDB_IE_TYPE_DNODE_STATE 7 -enum { CONN_TYPE__QUERY = 1, CONN_TYPE__TMQ, CONN_TYPE__MAX }; +enum { CONN_TYPE__QUERY = 1, CONN_TYPE__TMQ, CONN_TYPE__UDFD, CONN_TYPE__MAX }; enum { HEARTBEAT_KEY_USER_AUTHINFO = 1, diff --git a/source/libs/function/inc/builtinsimpl.h b/source/libs/function/inc/builtinsimpl.h index 807234a1b15a7a5ed6040112e12eec6e7a6503bc..c25d74911c916ed26c1a04241909975ab8ab793a 100644 --- a/source/libs/function/inc/builtinsimpl.h +++ b/source/libs/function/inc/builtinsimpl.h @@ -105,7 +105,12 @@ int32_t mavgFunction(SqlFunctionCtx* pCtx); bool getSampleFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv); bool sampleFunctionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultInfo); int32_t sampleFunction(SqlFunctionCtx* pCtx); -int32_t sampleFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock); +//int32_t sampleFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock); + +bool getTailFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv); +bool tailFunctionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultInfo); +int32_t tailFunction(SqlFunctionCtx* pCtx); +int32_t tailFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock); bool getSelectivityFuncEnv(SFunctionNode* pFunc, SFuncExecEnv* pEnv); diff --git a/source/libs/function/src/builtins.c b/source/libs/function/src/builtins.c index fc930083128c8bfe79f69e21f3c6737547ea67f1..e41e3c7c39a2a18e262e892cf659fd3258361ace 100644 --- a/source/libs/function/src/builtins.c +++ b/source/libs/function/src/builtins.c @@ -386,6 +386,35 @@ static int32_t translateSample(SFunctionNode* pFunc, char* pErrBuf, int32_t len) return TSDB_CODE_SUCCESS; } +static int32_t translateTail(SFunctionNode* pFunc, char* pErrBuf, int32_t len) { + int32_t numOfParams = LIST_LENGTH(pFunc->pParameterList); + if (2 != numOfParams && 3 != numOfParams) { + return invaildFuncParaNumErrMsg(pErrBuf, len, pFunc->functionName); + } + + SNode* pPara = nodesListGetNode(pFunc->pParameterList, 0); + if (QUERY_NODE_COLUMN != nodeType(pPara)) { + return buildFuncErrMsg(pErrBuf, len, TSDB_CODE_FUNC_FUNTION_ERROR, + "The input parameter of TAIL function can only be column"); + } + + for (int32_t i = 1; i < numOfParams; ++i) { + uint8_t paraType = ((SExprNode*)nodesListGetNode(pFunc->pParameterList, i))->resType.type; + if (!IS_INTEGER_TYPE(paraType)) { + return invaildFuncParaTypeErrMsg(pErrBuf, len, pFunc->functionName); + } + } + + SExprNode* pCol = (SExprNode*)nodesListGetNode(pFunc->pParameterList, 0); + uint8_t colType = pCol->resType.type; + if (IS_VAR_DATA_TYPE(colType)) { + pFunc->node.resType = (SDataType){.bytes = pCol->resType.bytes, .type = colType}; + } else { + pFunc->node.resType = (SDataType){.bytes = tDataTypes[colType].bytes, .type = colType}; + } + return TSDB_CODE_SUCCESS; +} + static int32_t translateLastRow(SFunctionNode* pFunc, char* pErrBuf, int32_t len) { // todo return TSDB_CODE_SUCCESS; @@ -850,6 +879,16 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { .processFunc = sampleFunction, .finalizeFunc = NULL }, + { + .name = "tail", + .type = FUNCTION_TYPE_TAIL, + .classification = FUNC_MGT_NONSTANDARD_SQL_FUNC | FUNC_MGT_TIMELINE_FUNC, + .translateFunc = translateTail, + .getEnvFunc = getTailFuncEnv, + .initFunc = tailFunctionSetup, + .processFunc = tailFunction, + .finalizeFunc = tailFinalize + }, { .name = "abs", .type = FUNCTION_TYPE_ABS, diff --git a/source/libs/function/src/builtinsimpl.c b/source/libs/function/src/builtinsimpl.c index af86eb4e90d2fab886b2c9c7f49d5df64d6176ca..479a11ca4a2d38321c93c1bba7941f1874a76073 100644 --- a/source/libs/function/src/builtinsimpl.c +++ b/source/libs/function/src/builtinsimpl.c @@ -18,12 +18,15 @@ #include "function.h" #include "querynodes.h" #include "taggfunction.h" +#include "tcompare.h" #include "tdatablock.h" #include "tpercentile.h" #define HISTOGRAM_MAX_BINS_NUM 1000 #define MAVG_MAX_POINTS_NUM 1000 #define SAMPLE_MAX_POINTS_NUM 1000 +#define TAIL_MAX_POINTS_NUM 100 +#define TAIL_MAX_OFFSET 100 typedef struct SSumRes { union { @@ -161,6 +164,21 @@ typedef struct SSampleInfo { int64_t *timestamp; } SSampleInfo; +typedef struct STailItem { + int64_t timestamp; + bool isNull; + char data[]; +} STailItem; + +typedef struct STailInfo { + int32_t numOfPoints; + int32_t numAdded; + int32_t offset; + uint8_t colType; + int16_t colBytes; + STailItem **pItems; +} STailInfo; + #define SET_VAL(_info, numOfElem, res) \ do { \ if ((numOfElem) <= 0) { \ @@ -3107,7 +3125,7 @@ int32_t sampleFunction(SqlFunctionCtx* pCtx) { int32_t startOffset = pCtx->offset; for (int32_t i = pInput->startRowIndex; i < pInput->numOfRows + pInput->startRowIndex; i += 1) { - if (colDataIsNull_f(pInputCol->nullbitmap, i)) { + if (colDataIsNull_s(pInputCol, i)) { //colDataAppendNULL(pOutput, i); continue; } @@ -3141,3 +3159,132 @@ int32_t sampleFunction(SqlFunctionCtx* pCtx) { // // return pResInfo->numOfRes; //} + + +bool getTailFuncEnv(SFunctionNode* pFunc, SFuncExecEnv* pEnv) { + SColumnNode* pCol = (SColumnNode*)nodesListGetNode(pFunc->pParameterList, 0); + SValueNode* pVal = (SValueNode*)nodesListGetNode(pFunc->pParameterList, 1); + int32_t numOfPoints = pVal->datum.i; + pEnv->calcMemSize = sizeof(STailInfo) + numOfPoints * (POINTER_BYTES + sizeof(STailItem) + pCol->node.resType.bytes); + return true; +} + +bool tailFunctionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo *pResultInfo) { + if (!functionSetup(pCtx, pResultInfo)) { + return false; + } + + STailInfo *pInfo = GET_ROWCELL_INTERBUF(pResultInfo); + pInfo->numAdded = 0; + pInfo->numOfPoints = pCtx->param[1].param.i; + if (pCtx->numOfParams == 4) { + pInfo->offset = pCtx->param[2].param.i; + } else { + pInfo->offset = 0; + } + pInfo->colType = pCtx->resDataInfo.type; + pInfo->colBytes = pCtx->resDataInfo.bytes; + if ((pInfo->numOfPoints < 1 || pInfo->numOfPoints > TAIL_MAX_POINTS_NUM) || + (pInfo->numOfPoints < 0 || pInfo->numOfPoints > TAIL_MAX_OFFSET)) { + return false; + } + + pInfo->pItems = (STailItem **)((char *)pInfo + sizeof(STailInfo)); + char *pItem = (char *)pInfo->pItems + pInfo->numOfPoints * POINTER_BYTES; + + size_t unitSize = sizeof(STailItem) + pInfo->colBytes; + for (int32_t i = 0; i < pInfo->numOfPoints; ++i) { + pInfo->pItems[i] = (STailItem *)(pItem + i * unitSize); + pInfo->pItems[i]->isNull = false; + } + + return true; +} + +static void tailAssignResult(STailItem* pItem, char *data, int32_t colBytes, TSKEY ts, bool isNull) { + pItem->timestamp = ts; + if (isNull) { + pItem->isNull = true; + } else { + memcpy(pItem->data, data, colBytes); + } +} + +static int32_t tailCompFn(const void *p1, const void *p2, const void *param) { + STailItem *d1 = *(STailItem **)p1; + STailItem *d2 = *(STailItem **)p2; + return compareInt64Val(&d1->timestamp, &d2->timestamp); +} + +static void doTailAdd(STailInfo* pInfo, char *data, TSKEY ts, bool isNull) { + STailItem **pList = pInfo->pItems; + if (pInfo->numAdded < pInfo->numOfPoints) { + tailAssignResult(pList[pInfo->numAdded], data, pInfo->colBytes, ts, isNull); + taosheapsort((void *)pList, sizeof(STailItem **), pInfo->numAdded + 1, NULL, tailCompFn, 0); + pInfo->numAdded++; + } else if (pList[0]->timestamp < ts) { + tailAssignResult(pList[0], data, pInfo->colBytes, ts, isNull); + taosheapadjust((void *)pList, sizeof(STailItem **), 0, pInfo->numOfPoints - 1, NULL, tailCompFn, NULL, 0); + } +} + +int32_t tailFunction(SqlFunctionCtx* pCtx) { + SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx); + STailInfo* pInfo = GET_ROWCELL_INTERBUF(pResInfo); + + SInputColumnInfoData* pInput = &pCtx->input; + TSKEY* tsList = (int64_t*)pInput->pPTS->pData; + + SColumnInfoData* pInputCol = pInput->pData[0]; + SColumnInfoData* pTsOutput = pCtx->pTsOutput; + SColumnInfoData* pOutput = (SColumnInfoData*)pCtx->pOutput; + + int32_t startOffset = pCtx->offset; + if (pInfo->offset >= pInput->numOfRows) { + return 0; + } else { + pInfo->numOfPoints = MIN(pInfo->numOfPoints, pInput->numOfRows - pInfo->offset); + } + for (int32_t i = pInput->startRowIndex; i < pInput->numOfRows + pInput->startRowIndex - pInfo->offset; i += 1) { + + char* data = colDataGetData(pInputCol, i); + doTailAdd(pInfo, data, tsList[i], colDataIsNull_s(pInputCol, i)); + } + + taosqsort(pInfo->pItems, pInfo->numOfPoints, POINTER_BYTES, NULL, tailCompFn); + + for (int32_t i = 0; i < pInfo->numOfPoints; ++i) { + int32_t pos = startOffset + i; + STailItem *pItem = pInfo->pItems[i]; + if (pItem->isNull) { + colDataAppendNULL(pOutput, pos); + } else { + colDataAppend(pOutput, pos, pItem->data, false); + } + } + + return pInfo->numOfPoints; +} + +int32_t tailFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) { + SResultRowEntryInfo* pEntryInfo = GET_RES_INFO(pCtx); + STailInfo* pInfo = GET_ROWCELL_INTERBUF(pEntryInfo); + pEntryInfo->complete = true; + + int32_t type = pCtx->input.pData[0]->info.type; + int32_t slotId = pCtx->pExpr->base.resSchema.slotId; + + SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, slotId); + + // todo assign the tag value and the corresponding row data + int32_t currentRow = pBlock->info.rows; + for (int32_t i = 0; i < pEntryInfo->numOfRes; ++i) { + STailItem *pItem = pInfo->pItems[i]; + colDataAppend(pCol, currentRow, pItem->data, false); + + //setSelectivityValue(pCtx, pBlock, &pInfo->pItems[i].tuplePos, currentRow); + currentRow += 1; + } + + return pEntryInfo->numOfRes; +} diff --git a/source/libs/function/src/tudf.c b/source/libs/function/src/tudf.c index 4841e052679ff697bcdf806632ac560aeb5134f8..388ec28b763c123518bb05cec29f6b8312ed9aa5 100644 --- a/source/libs/function/src/tudf.c +++ b/source/libs/function/src/tudf.c @@ -312,6 +312,7 @@ typedef struct SUdfcFuncStub { char udfName[TSDB_FUNC_NAME_LEN]; UdfcFuncHandle handle; int32_t refCount; + int64_t lastRefTime; } SUdfcFuncStub; typedef struct SUdfcProxy { @@ -1446,6 +1447,7 @@ int32_t accquireUdfFuncHandle(char* udfName, UdfcFuncHandle* pHandle) { uv_mutex_unlock(&gUdfdProxy.udfStubsMutex); *pHandle = foundStub->handle; ++foundStub->refCount; + foundStub->lastRefTime = taosGetTimestampUs(); return 0; } *pHandle = NULL; @@ -1455,6 +1457,7 @@ int32_t accquireUdfFuncHandle(char* udfName, UdfcFuncHandle* pHandle) { strcpy(stub.udfName, udfName); stub.handle = *pHandle; ++stub.refCount; + stub.lastRefTime = taosGetTimestampUs(); taosArrayPush(gUdfdProxy.udfStubs, &stub); taosArraySort(gUdfdProxy.udfStubs, compareUdfcFuncSub); } else { @@ -1662,7 +1665,8 @@ int32_t cleanUpUdfs() { fnInfo("tear down udf. udf name: %s, handle: %p", stub->udfName, stub->handle); doTeardownUdf(stub->handle); } else { - fnInfo("udf still in use. udf name: %s, ref count: %d, handle: %p", stub->udfName, stub->refCount, stub->handle); + fnInfo("udf still in use. udf name: %s, ref count: %d, last ref time: %"PRId64", handle: %p", + stub->udfName, stub->refCount, stub->lastRefTime, stub->handle); taosArrayPush(udfStubs, stub); } ++i; diff --git a/source/libs/function/src/udfd.c b/source/libs/function/src/udfd.c index 34681dc6cd18c6e85523a61084b5ec931bf326af..706bf28be0a7c8f514b68df35bc5ef36bdfd7afd 100644 --- a/source/libs/function/src/udfd.c +++ b/source/libs/function/src/udfd.c @@ -485,7 +485,146 @@ void udfdIntrSignalHandler(uv_signal_t *handle, int signum) { uv_stop(global.loop); } -void udfdProcessRpcRsp(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet) { return; } +typedef enum EUdfdRpcReqRspType { + UDFD_RPC_MNODE_CONNECT = 0, + UDFD_RPC_RETRIVE_FUNC, +} EUdfdRpcReqRspType; + +typedef struct SUdfdRpcSendRecvInfo { + EUdfdRpcReqRspType rpcType; + int32_t code; + void* param; + uv_sem_t resultSem; +} SUdfdRpcSendRecvInfo; + + +void udfdProcessRpcRsp(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet) { + SUdfdRpcSendRecvInfo *msgInfo = (SUdfdRpcSendRecvInfo *)pMsg->ahandle; + ASSERT(pMsg->ahandle != NULL); + + if (pEpSet) { + if (!isEpsetEqual(&global.mgmtEp.epSet, pEpSet)) { + updateEpSet_s(&global.mgmtEp, pEpSet); + } + } + + if (pMsg->code != TSDB_CODE_SUCCESS) { + fnError("udfd rpc error. code: %s", tstrerror(pMsg->code)); + msgInfo->code = pMsg->code; + goto _return; + } + + if (msgInfo->rpcType == UDFD_RPC_MNODE_CONNECT) { + SConnectRsp connectRsp = {0}; + tDeserializeSConnectRsp(pMsg->pCont, pMsg->contLen, &connectRsp); + if (connectRsp.epSet.numOfEps == 0) { + msgInfo->code = TSDB_CODE_MND_APP_ERROR; + goto _return; + } + + if (connectRsp.dnodeNum > 1 && !isEpsetEqual(&global.mgmtEp.epSet, &connectRsp.epSet)) { + updateEpSet_s(&global.mgmtEp, &connectRsp.epSet); + } + msgInfo->code = 0; + } else if (msgInfo->rpcType == UDFD_RPC_RETRIVE_FUNC) { + SRetrieveFuncRsp retrieveRsp = {0}; + tDeserializeSRetrieveFuncRsp(pMsg->pCont, pMsg->contLen, &retrieveRsp); + + SFuncInfo *pFuncInfo = (SFuncInfo *)taosArrayGet(retrieveRsp.pFuncInfos, 0); + SUdf* udf = msgInfo->param; + udf->funcType = pFuncInfo->funcType; + udf->scriptType = pFuncInfo->scriptType; + udf->outputType = pFuncInfo->funcType; + udf->outputLen = pFuncInfo->outputLen; + udf->bufSize = pFuncInfo->bufSize; + + char path[PATH_MAX] = {0}; + snprintf(path, sizeof(path), "%s/lib%s.so", "/tmp", pFuncInfo->name); + TdFilePtr file = taosOpenFile(path, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_READ | TD_FILE_TRUNC | TD_FILE_AUTO_DEL); + // TODO check for failure of flush to disk + taosWriteFile(file, pFuncInfo->pCode, pFuncInfo->codeSize); + taosCloseFile(&file); + strncpy(udf->path, path, strlen(path)); + taosArrayDestroy(retrieveRsp.pFuncInfos); + msgInfo->code = 0; + } + +_return: + rpcFreeCont(pMsg->pCont); + uv_sem_post(&msgInfo->resultSem); + return; +} + +int32_t udfdConnectToMNode() { + SConnectReq connReq = {0}; + connReq.connType = CONN_TYPE__UDFD; + tstrncpy(connReq.app, "udfd",sizeof(connReq.app)); + tstrncpy(connReq.user, TSDB_DEFAULT_USER, sizeof(connReq.user)); + char pass[TSDB_PASSWORD_LEN + 1] = {0}; + taosEncryptPass_c((uint8_t *)(TSDB_DEFAULT_PASS), strlen(TSDB_DEFAULT_PASS), pass); + tstrncpy(connReq.passwd, pass, sizeof(connReq.passwd)); + connReq.pid = htonl(taosGetPId()); + connReq.startTime = htobe64(taosGetTimestampMs()); + + int32_t contLen = tSerializeSConnectReq(NULL, 0, &connReq); + void* pReq = rpcMallocCont(contLen); + tSerializeSConnectReq(pReq, contLen, &connReq); + + SUdfdRpcSendRecvInfo *msgInfo = taosMemoryCalloc(1, sizeof(SUdfdRpcSendRecvInfo)); + msgInfo->rpcType = UDFD_RPC_MNODE_CONNECT; + uv_sem_init(&msgInfo->resultSem, 0); + + SRpcMsg rpcMsg = {0}; + rpcMsg.msgType = TDMT_MND_CONNECT; + rpcMsg.pCont = pReq; + rpcMsg.contLen = contLen; + rpcMsg.ahandle = msgInfo; + rpcSendRequest(global.clientRpc, &global.mgmtEp.epSet, &rpcMsg, NULL); + + uv_sem_wait(&msgInfo->resultSem); + int32_t code = msgInfo->code; + uv_sem_destroy(&msgInfo->resultSem); + taosMemoryFree(msgInfo); + return code; +} + +int32_t udfdFillUdfInfoFromMNode(void *clientRpc, char *udfName, SUdf *udf) { + SRetrieveFuncReq retrieveReq = {0}; + retrieveReq.numOfFuncs = 1; + retrieveReq.pFuncNames = taosArrayInit(1, TSDB_FUNC_NAME_LEN); + taosArrayPush(retrieveReq.pFuncNames, udfName); + + int32_t contLen = tSerializeSRetrieveFuncReq(NULL, 0, &retrieveReq); + void *pReq = rpcMallocCont(contLen); + tSerializeSRetrieveFuncReq(pReq, contLen, &retrieveReq); + taosArrayDestroy(retrieveReq.pFuncNames); + + SUdfdRpcSendRecvInfo* msgInfo = taosMemoryCalloc(1, sizeof(SUdfdRpcSendRecvInfo)); + msgInfo->rpcType = UDFD_RPC_RETRIVE_FUNC; + msgInfo->param = udf; + uv_sem_init(&msgInfo->resultSem, 0); + + SRpcMsg rpcMsg = {0}; + rpcMsg.pCont = pReq; + rpcMsg.contLen = contLen; + rpcMsg.msgType = TDMT_MND_RETRIEVE_FUNC; + rpcMsg.ahandle = msgInfo; + rpcSendRequest(clientRpc, &global.mgmtEp.epSet, &rpcMsg, NULL); + + uv_sem_wait(&msgInfo->resultSem); + uv_sem_destroy(&msgInfo->resultSem); + int32_t code = msgInfo->code; + taosMemoryFree(msgInfo); + return code; +} + +static bool udfdRpcRfp(int32_t code) { + if (code == TSDB_CODE_RPC_REDIRECT) { + return true; + } else { + return false; + } +} int initEpSetFromCfg(const char* firstEp, const char* secondEp, SCorEpSet* pEpSet) { pEpSet->version = 0; @@ -528,69 +667,30 @@ int initEpSetFromCfg(const char* firstEp, const char* secondEp, SCorEpSet* pEpSe return 0; } -int32_t udfdFillUdfInfoFromMNode(void *clientRpc, char *udfName, SUdf *udf) { - SRetrieveFuncReq retrieveReq = {0}; - retrieveReq.numOfFuncs = 1; - retrieveReq.pFuncNames = taosArrayInit(1, TSDB_FUNC_NAME_LEN); - taosArrayPush(retrieveReq.pFuncNames, udfName); - - int32_t contLen = tSerializeSRetrieveFuncReq(NULL, 0, &retrieveReq); - void *pReq = rpcMallocCont(contLen); - tSerializeSRetrieveFuncReq(pReq, contLen, &retrieveReq); - taosArrayDestroy(retrieveReq.pFuncNames); - - SRpcMsg rpcMsg = {0}; - rpcMsg.pCont = pReq; - rpcMsg.contLen = contLen; - rpcMsg.msgType = TDMT_MND_RETRIEVE_FUNC; - - SRpcMsg rpcRsp = {0}; - rpcSendRecv(clientRpc, &global.mgmtEp.epSet, &rpcMsg, &rpcRsp); - SRetrieveFuncRsp retrieveRsp = {0}; - tDeserializeSRetrieveFuncRsp(rpcRsp.pCont, rpcRsp.contLen, &retrieveRsp); - - SFuncInfo *pFuncInfo = (SFuncInfo *)taosArrayGet(retrieveRsp.pFuncInfos, 0); - - udf->funcType = pFuncInfo->funcType; - udf->scriptType = pFuncInfo->scriptType; - udf->outputType = pFuncInfo->funcType; - udf->outputLen = pFuncInfo->outputLen; - udf->bufSize = pFuncInfo->bufSize; - - char path[PATH_MAX] = {0}; - snprintf(path, sizeof(path), "%s/lib%s.so", "/tmp", udfName); - TdFilePtr file = taosOpenFile(path, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_READ | TD_FILE_TRUNC | TD_FILE_AUTO_DEL); - // TODO check for failure of flush to disk - taosWriteFile(file, pFuncInfo->pCode, pFuncInfo->codeSize); - taosCloseFile(&file); - strncpy(udf->path, path, strlen(path)); - taosArrayDestroy(retrieveRsp.pFuncInfos); - - rpcFreeCont(rpcRsp.pCont); - return 0; -} int32_t udfdOpenClientRpc() { - char *pass = "taosdata"; - char *user = "root"; - char secretEncrypt[TSDB_PASSWORD_LEN + 1] = {0}; - taosEncryptPass_c((uint8_t *)pass, strlen(pass), secretEncrypt); SRpcInit rpcInit = {0}; - rpcInit.label = (char *)"UDFD"; + rpcInit.label = "UDFD"; rpcInit.numOfThreads = 1; - rpcInit.cfp = udfdProcessRpcRsp; + rpcInit.cfp = (RpcCfp)udfdProcessRpcRsp; rpcInit.sessions = 1024; rpcInit.connType = TAOS_CONN_CLIENT; - rpcInit.idleTime = 30 * 1000; + rpcInit.idleTime = tsShellActivityTimer * 1000; + rpcInit.user = TSDB_DEFAULT_USER; + rpcInit.ckey = "key"; + rpcInit.spi = 1; rpcInit.parent = &global; + rpcInit.rfp = udfdRpcRfp; - rpcInit.user = (char *)user; - rpcInit.ckey = (char *)"key"; - rpcInit.secret = (char *)secretEncrypt; - rpcInit.spi = 1; + char pass[TSDB_PASSWORD_LEN + 1] = {0}; + taosEncryptPass_c((uint8_t *)(TSDB_DEFAULT_PASS), strlen(TSDB_DEFAULT_PASS), pass); + rpcInit.secret = pass; global.clientRpc = rpcOpen(&rpcInit); - + if (global.clientRpc == NULL) { + fnError("failed to init dnode rpc client"); + return -1; + } return 0; } @@ -700,12 +800,6 @@ static int32_t udfdRun() { global.udfsHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK); uv_mutex_init(&global.udfsMutex); - // TOOD: client rpc to fetch udf function info from mnode - if (udfdOpenClientRpc() != 0) { - fnError("open rpc connection to mnode failure"); - return -1; - } - if (udfdUvInit() != 0) { fnError("uv init failure"); return -2; @@ -717,7 +811,6 @@ static int32_t udfdRun() { int codeClose = uv_loop_close(global.loop); fnDebug("uv loop close. result: %s", uv_err_name(codeClose)); removeListeningPipe(); - udfdCloseClientRpc(); uv_mutex_destroy(&global.udfsMutex); taosHashCleanup(global.udfsHash); return 0; @@ -746,9 +839,22 @@ int main(int argc, char *argv[]) { if (taosInitCfg(configDir, NULL, NULL, NULL, NULL, 0) != 0) { fnError("failed to start since read config error"); - return -1; + return -2; } initEpSetFromCfg(tsFirst, tsSecond, &global.mgmtEp); - return udfdRun(); + if (udfdOpenClientRpc() != 0) { + fnError("open rpc connection to mnode failure"); + return -3; + } + + if (udfdConnectToMNode() != 0) { + fnError("failed to start since can not connect to mnode"); + return -4; + } + + udfdRun(); + + udfdCloseClientRpc(); + } diff --git a/source/libs/index/inc/indexComm.h b/source/libs/index/inc/indexComm.h index 043404f48f65c71924c596cd2f2bf15ccc643644..3066fd1c2c57481cc80a6b19a7dc2de1a9b4d6cc 100644 --- a/source/libs/index/inc/indexComm.h +++ b/source/libs/index/inc/indexComm.h @@ -33,11 +33,17 @@ typedef enum { MATCH, CONTINUE, BREAK } TExeCond; typedef TExeCond (*_cache_range_compare)(void* a, void* b, int8_t type); -TExeCond tDoCommpare(__compar_fn_t func, int8_t comType, void* a, void* b); +TExeCond tCompare(__compar_fn_t func, int8_t cmpType, void* a, void* b, int8_t dType); +TExeCond tDoCompare(__compar_fn_t func, int8_t cmpType, void* a, void* b); _cache_range_compare indexGetCompare(RangeType ty); int32_t indexConvertData(void* src, int8_t type, void** dst); +int32_t indexConvertDataToStr(void* src, int8_t type, void** dst); + +int32_t indexGetDataByteLen(int8_t type); + +char* indexInt2str(int64_t val, char* dst, int radix); #ifdef __cplusplus } diff --git a/source/libs/index/src/index.c b/source/libs/index/src/index.c index 46f2f7a93b6b9462e851a305cd9ae1f4b1f6c426..162d64c41c472e88aa019b4b5def7727850c636d 100644 --- a/source/libs/index/src/index.c +++ b/source/libs/index/src/index.c @@ -109,17 +109,15 @@ int indexOpen(SIndexOpts* opts, const char* path, SIndex** index) { taosThreadMutexInit(&sIdx->mtx, NULL); sIdx->refId = indexAddRef(sIdx); - taosAcquireRef(indexRefMgt, sIdx->refId); + indexAcquireRef(sIdx->refId); *index = sIdx; - return 0; END: if (sIdx != NULL) { indexClose(sIdx); } - *index = NULL; return -1; } @@ -273,7 +271,7 @@ SIndexTerm* indexTermCreate(int64_t suid, SIndexOperOnColumn oper, uint8_t colTy tm->nColName = nColName; char* buf = NULL; - int32_t len = indexConvertData((void*)colVal, INDEX_TYPE_GET_TYPE(colType), (void**)&buf); + int32_t len = indexConvertDataToStr((void*)colVal, INDEX_TYPE_GET_TYPE(colType), (void**)&buf); assert(len != -1); tm->colVal = buf; diff --git a/source/libs/index/src/indexCache.c b/source/libs/index/src/indexCache.c index d4231619ecfcc4ab3af728eda036e1e3e1a263a6..1d7a4a541982ca5a61f66ef9999106306907acc2 100644 --- a/source/libs/index/src/indexCache.c +++ b/source/libs/index/src/indexCache.c @@ -282,8 +282,10 @@ static int32_t cacheSearchCompareFunc_JSON(void* cache, SIndexTerm* term, SIdxTe if (0 != strncmp(c->colVal, pCt->colVal, skip)) { break; } + char* p = taosMemoryCalloc(1, strlen(c->colVal) + 1); + memcpy(p, c->colVal, strlen(c->colVal)); - TExeCond cond = cmpFn(c->colVal + skip, term->colVal, dType); + TExeCond cond = cmpFn(p + skip, term->colVal, dType); if (cond == MATCH) { if (c->operaType == ADD_VALUE) { INDEX_MERGE_ADD_DEL(tr->deled, tr->added, c->uid) @@ -297,6 +299,7 @@ static int32_t cacheSearchCompareFunc_JSON(void* cache, SIndexTerm* term, SIdxTe } else if (cond == BREAK) { break; } + taosMemoryFree(p); } taosMemoryFree(pCt); @@ -463,7 +466,6 @@ int indexCacheSchedToMerge(IndexCache* pCache) { // schedMsg.thandle = taosMemoryCalloc(1, sizeof(int64_t)); // memcpy((char*)(schedMsg.thandle), (char*)&(pCache->index->refId), sizeof(int64_t)); schedMsg.msg = NULL; - indexAcquireRef(pCache->index->refId); taosScheduleTask(indexQhandle, &schedMsg); diff --git a/source/libs/index/src/indexComm.c b/source/libs/index/src/indexComm.c index ac26ed1fab73fed4b961c1c394f9cfc72562a968..eea30bfb038198ba82fecc2f4635d4582d28ea66 100644 --- a/source/libs/index/src/indexComm.c +++ b/source/libs/index/src/indexComm.c @@ -19,10 +19,38 @@ #include "tcoding.h" #include "tcompare.h" #include "tdataformat.h" +#include "ttypes.h" char JSON_COLUMN[] = "JSON"; char JSON_VALUE_DELIM = '&'; +char* indexInt2str(int64_t val, char* dst, int radix) { + char buffer[65]; + char* p; + int64_t new_val; + uint64_t uval = (uint64_t)val; + + if (radix < 0) { + if (val < 0) { + *dst++ = '-'; + uval = (uint64_t)0 - uval; /* Avoid integer overflow in (-val) for LLONG_MIN (BUG#31799). */ + } + } + p = &buffer[sizeof(buffer) - 1]; + *p = '\0'; + new_val = (int64_t)(uval / 10); + *--p = '0' + (char)(uval - (uint64_t)new_val * 10); + val = new_val; + + while (val != 0) { + new_val = val / 10; + *--p = '0' + (char)(val - new_val * 10); + val = new_val; + } + while ((*dst++ = *p++) != 0) + ; + return dst - 1; +} static __compar_fn_t indexGetCompar(int8_t type) { if (type == TSDB_DATA_TYPE_BINARY || type == TSDB_DATA_TYPE_NCHAR) { return (__compar_fn_t)strcmp; @@ -31,25 +59,49 @@ static __compar_fn_t indexGetCompar(int8_t type) { } static TExeCond tCompareLessThan(void* a, void* b, int8_t type) { __compar_fn_t func = indexGetCompar(type); - return tDoCommpare(func, QUERY_LESS_THAN, a, b); + return tCompare(func, QUERY_LESS_THAN, a, b, type); } static TExeCond tCompareLessEqual(void* a, void* b, int8_t type) { __compar_fn_t func = indexGetCompar(type); - return tDoCommpare(func, QUERY_LESS_EQUAL, a, b); + return tCompare(func, QUERY_LESS_EQUAL, a, b, type); } static TExeCond tCompareGreaterThan(void* a, void* b, int8_t type) { __compar_fn_t func = indexGetCompar(type); - return tDoCommpare(func, QUERY_GREATER_THAN, a, b); + return tCompare(func, QUERY_GREATER_THAN, a, b, type); } static TExeCond tCompareGreaterEqual(void* a, void* b, int8_t type) { __compar_fn_t func = indexGetCompar(type); - return tDoCommpare(func, QUERY_GREATER_EQUAL, a, b); + return tCompare(func, QUERY_GREATER_EQUAL, a, b, type); } - -TExeCond tDoCommpare(__compar_fn_t func, int8_t comType, void* a, void* b) { +TExeCond tCompare(__compar_fn_t func, int8_t cmptype, void* a, void* b, int8_t dtype) { + if (dtype == TSDB_DATA_TYPE_BINARY || dtype == TSDB_DATA_TYPE_NCHAR) { + return tDoCompare(func, cmptype, a, b); + } +#if 1 + int8_t bytes = tDataTypes[dtype].bytes; + if (bytes == 1) { + int8_t va = taosStr2int64(a); + int8_t vb = taosStr2int64(b); + return tDoCompare(func, cmptype, &va, &vb); + } else if (bytes == 2) { + int16_t va = taosStr2int64(a); + int16_t vb = taosStr2int64(b); + return tDoCompare(func, cmptype, &va, &vb); + } else if (bytes == 4) { + int32_t va = taosStr2int64(a); + int32_t vb = taosStr2int64(b); + return tDoCompare(func, cmptype, &va, &vb); + } else { + int64_t va = taosStr2int64(a); + int64_t vb = taosStr2int64(b); + return tDoCompare(func, cmptype, &va, &vb); + } +#endif +} +TExeCond tDoCompare(__compar_fn_t func, int8_t comparType, void* a, void* b) { // optime later int32_t ret = func(a, b); - switch (comType) { + switch (comparType) { case QUERY_LESS_THAN: { if (ret < 0) return MATCH; } break; @@ -174,9 +226,9 @@ int32_t indexConvertData(void* src, int8_t type, void** dst) { tlen = taosEncodeFixedU32(dst, *(uint32_t*)src); break; case TSDB_DATA_TYPE_BIGINT: - tlen = taosEncodeFixedI64(NULL, *(uint32_t*)src); + tlen = taosEncodeFixedI64(NULL, *(int64_t*)src); *dst = taosMemoryCalloc(1, tlen + 1); - tlen = taosEncodeFixedI64(dst, *(uint32_t*)src); + tlen = taosEncodeFixedI64(dst, *(int64_t*)src); break; case TSDB_DATA_TYPE_DOUBLE: tlen = taosEncodeBinary(NULL, src, sizeof(double)); @@ -184,9 +236,9 @@ int32_t indexConvertData(void* src, int8_t type, void** dst) { tlen = taosEncodeBinary(dst, src, sizeof(double)); break; case TSDB_DATA_TYPE_UBIGINT: - tlen = taosEncodeFixedU64(NULL, *(uint32_t*)src); + tlen = taosEncodeFixedU64(NULL, *(uint64_t*)src); *dst = taosMemoryCalloc(1, tlen + 1); - tlen = taosEncodeFixedU64(dst, *(uint32_t*)src); + tlen = taosEncodeFixedU64(dst, *(uint64_t*)src); break; case TSDB_DATA_TYPE_NCHAR: { tlen = taosEncodeBinary(NULL, varDataVal(src), varDataLen(src)); @@ -215,14 +267,94 @@ int32_t indexConvertData(void* src, int8_t type, void** dst) { break; } *dst = *dst - tlen; - if (type != TSDB_DATA_TYPE_BINARY && type != TSDB_DATA_TYPE_NCHAR && type != TSDB_DATA_TYPE_VARBINARY && - type == TSDB_DATA_TYPE_VARCHAR) { - uint8_t* p = *dst; - for (int i = 0; i < tlen; i++) { - if (p[i] == 0) { - p[i] = (uint8_t)'0'; - } + // indexMayFillNumbericData(*dst, tlen); + return tlen; +} +int32_t indexConvertDataToStr(void* src, int8_t type, void** dst) { + int tlen = tDataTypes[type].bytes; + + switch (type) { + case TSDB_DATA_TYPE_TIMESTAMP: + *dst = taosMemoryCalloc(1, sizeof(int64_t) + 1); + indexInt2str(*(int64_t*)src, *dst, -1); + break; + case TSDB_DATA_TYPE_BOOL: + case TSDB_DATA_TYPE_UTINYINT: + // tlen = taosEncodeFixedU8(NULL, *(uint8_t*)src); + //*dst = taosMemoryCalloc(1, tlen + 1); + // tlen = taosEncodeFixedU8(dst, *(uint8_t*)src); + *dst = taosMemoryCalloc(1, sizeof(int64_t) + 1); + indexInt2str(*(uint8_t*)src, *dst, 1); + break; + case TSDB_DATA_TYPE_TINYINT: + *dst = taosMemoryCalloc(1, sizeof(int64_t) + 1); + indexInt2str(*(int8_t*)src, *dst, 1); + break; + case TSDB_DATA_TYPE_SMALLINT: + *dst = taosMemoryCalloc(1, sizeof(int64_t) + 1); + indexInt2str(*(int16_t*)src, *dst, -1); + break; + case TSDB_DATA_TYPE_USMALLINT: + *dst = taosMemoryCalloc(1, sizeof(int64_t) + 1); + indexInt2str(*(uint16_t*)src, *dst, -1); + break; + case TSDB_DATA_TYPE_INT: + *dst = taosMemoryCalloc(1, sizeof(int64_t) + 1); + indexInt2str(*(int32_t*)src, *dst, -1); + break; + case TSDB_DATA_TYPE_FLOAT: + tlen = taosEncodeBinary(NULL, src, sizeof(float)); + *dst = taosMemoryCalloc(1, tlen + 1); + tlen = taosEncodeBinary(dst, src, sizeof(float)); + *dst = *dst - tlen; + break; + case TSDB_DATA_TYPE_UINT: + *dst = taosMemoryCalloc(1, sizeof(int64_t) + 1); + indexInt2str(*(uint32_t*)src, *dst, 1); + break; + case TSDB_DATA_TYPE_BIGINT: + *dst = taosMemoryCalloc(1, sizeof(int64_t) + 1); + indexInt2str(*(int64_t*)src, *dst, 1); + break; + case TSDB_DATA_TYPE_DOUBLE: + tlen = taosEncodeBinary(NULL, src, sizeof(double)); + *dst = taosMemoryCalloc(1, tlen + 1); + tlen = taosEncodeBinary(dst, src, sizeof(double)); + *dst = *dst - tlen; + break; + case TSDB_DATA_TYPE_UBIGINT: + assert(0); + *dst = taosMemoryCalloc(1, sizeof(int64_t) + 1); + indexInt2str(*(uint64_t*)src, *dst, 1); + break; + case TSDB_DATA_TYPE_NCHAR: { + tlen = taosEncodeBinary(NULL, varDataVal(src), varDataLen(src)); + *dst = taosMemoryCalloc(1, tlen + 1); + tlen = taosEncodeBinary(dst, varDataVal(src), varDataLen(src)); + *dst = *dst - tlen; + + break; } + case TSDB_DATA_TYPE_VARCHAR: { // TSDB_DATA_TYPE_BINARY +#if 1 + tlen = taosEncodeBinary(NULL, src, strlen(src)); + *dst = taosMemoryCalloc(1, tlen + 1); + tlen = taosEncodeBinary(dst, src, strlen(src)); + *dst = *dst - tlen; + break; +#endif + } + case TSDB_DATA_TYPE_VARBINARY: +#if 1 + tlen = taosEncodeBinary(NULL, src, strlen(src)); + *dst = taosMemoryCalloc(1, tlen + 1); + tlen = taosEncodeBinary(dst, src, strlen(src)); + *dst = *dst - tlen; + break; +#endif + default: + TASSERT(0); + break; } return tlen; } diff --git a/source/libs/index/src/indexFst.c b/source/libs/index/src/indexFst.c index bc3ecea7a5d4bc4f369ea53cf6ab168b878496ae..e2975fb7bcc9784b34391215e91bd264ea82efd6 100644 --- a/source/libs/index/src/indexFst.c +++ b/source/libs/index/src/indexFst.c @@ -1324,7 +1324,7 @@ StreamWithStateResult* streamWithStateNextWith(StreamWithState* sws, StreamCallb if (FST_NODE_ADDR(p->node) != fstGetRootAddr(sws->fst)) { taosArrayPop(sws->inp); } - streamStateDestroy(p); + // streamStateDestroy(p); continue; } FstTransition trn; diff --git a/source/libs/index/src/indexTfile.c b/source/libs/index/src/indexTfile.c index b787da117da253b07d7dde98e1e667f3cca3695f..6c599867443ef566de7a7aaf39956c167a4e5664 100644 --- a/source/libs/index/src/indexTfile.c +++ b/source/libs/index/src/indexTfile.c @@ -410,8 +410,9 @@ static int32_t tfSearchTerm_JSON(void* reader, SIndexTerm* tem, SIdxTempResult* ret = tfileReaderLoadTableIds((TFileReader*)reader, offset, tr->total); cost = taosGetTimestampUs() - et; - indexInfo("index: %" PRIu64 ", col: %s, colVal: %s, load all table info, time cost: %" PRIu64 "us", tem->suid, - tem->colName, tem->colVal, cost); + indexInfo("index: %" PRIu64 ", col: %s, colVal: %s, load all table info, offset: %" PRIu64 + ", size: %d, time cost: %" PRIu64 "us", + tem->suid, tem->colName, tem->colVal, offset, (int)taosArrayGetSize(tr->total), cost); } fstSliceDestroy(&key); return 0; @@ -941,7 +942,7 @@ static int tfileReaderLoadTableIds(TFileReader* reader, int32_t offset, SArray* // TODO(yihao): opt later WriterCtx* ctx = reader->ctx; // add block cache - char block[1024] = {0}; + char block[4096] = {0}; int32_t nread = ctx->readFrom(ctx, block, sizeof(block), offset); assert(nread >= sizeof(uint32_t)); diff --git a/source/libs/index/test/jsonUT.cc b/source/libs/index/test/jsonUT.cc index 3de7cb66f20980deef0257ab54310d51b81beee1..ff349b9b24b620c638f57787586c53a496ec444f 100644 --- a/source/libs/index/test/jsonUT.cc +++ b/source/libs/index/test/jsonUT.cc @@ -56,6 +56,29 @@ class JsonEnv : public ::testing::Test { SIndexJson* index; }; +static void WriteData(SIndexJson* index, const std::string& colName, int8_t dtype, void* data, int dlen, int tableId, + int8_t operType = ADD_VALUE) { + SIndexTerm* term = + indexTermCreate(1, (SIndexOperOnColumn)operType, dtype, colName.c_str(), colName.size(), (const char*)data, dlen); + SIndexMultiTerm* terms = indexMultiTermCreate(); + indexMultiTermAdd(terms, term); + tIndexJsonPut(index, terms, (int64_t)tableId); + + indexMultiTermDestroy(terms); +} +static void Search(SIndexJson* index, const std::string& colNam, int8_t dtype, void* data, int dlen, int8_t filterType, + SArray** result) { + std::string colName(colNam); + + SIndexMultiTermQuery* mq = indexMultiTermQueryCreate(MUST); + SIndexTerm* q = indexTermCreate(1, ADD_VALUE, dtype, colName.c_str(), colName.size(), (const char*)data, dlen); + + SArray* res = taosArrayInit(1, sizeof(uint64_t)); + indexMultiTermQueryAdd(mq, q, (EIndexQueryType)filterType); + tIndexJsonSearch(index, mq, res); + indexMultiTermQueryDestroy(mq); + *result = res; +} TEST_F(JsonEnv, testWrite) { { std::string colName("test"); @@ -204,9 +227,10 @@ TEST_F(JsonEnv, testWriteMillonData) { TEST_F(JsonEnv, testWriteJsonNumberData) { { std::string colName("test"); - std::string colVal("10"); + // std::string colVal("10"); + int val = 10; SIndexTerm* term = indexTermCreate(1, ADD_VALUE, TSDB_DATA_TYPE_INT, colName.c_str(), colName.size(), - colVal.c_str(), colVal.size()); + (const char*)&val, sizeof(val)); SIndexMultiTerm* terms = indexMultiTermCreate(); indexMultiTermAdd(terms, term); @@ -217,9 +241,9 @@ TEST_F(JsonEnv, testWriteJsonNumberData) { } { std::string colName("test2"); - std::string colVal("20"); + int val = 20; SIndexTerm* term = indexTermCreate(1, ADD_VALUE, TSDB_DATA_TYPE_INT, colName.c_str(), colName.size(), - colVal.c_str(), colVal.size()); + (const char*)&val, sizeof(val)); SIndexMultiTerm* terms = indexMultiTermCreate(); indexMultiTermAdd(terms, term); @@ -229,10 +253,10 @@ TEST_F(JsonEnv, testWriteJsonNumberData) { indexMultiTermDestroy(terms); } { - std::string colName("test2"); - std::string colVal("15"); + std::string colName("test"); + int val = 15; SIndexTerm* term = indexTermCreate(1, ADD_VALUE, TSDB_DATA_TYPE_INT, colName.c_str(), colName.size(), - colVal.c_str(), colVal.size()); + (const char*)&val, sizeof(val)); SIndexMultiTerm* terms = indexMultiTermCreate(); indexMultiTermAdd(terms, term); @@ -243,9 +267,9 @@ TEST_F(JsonEnv, testWriteJsonNumberData) { } { std::string colName("test2"); - std::string colVal("15"); + const char* val = "test"; SIndexTerm* term = indexTermCreate(1, ADD_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(), - colVal.c_str(), colVal.size()); + (const char*)val, strlen(val)); SIndexMultiTerm* terms = indexMultiTermCreate(); indexMultiTermAdd(terms, term); @@ -255,12 +279,11 @@ TEST_F(JsonEnv, testWriteJsonNumberData) { indexMultiTermDestroy(terms); } { - std::string colName("test"); - std::string colVal("10"); - + std::string colName("test"); + int val = 15; SIndexMultiTermQuery* mq = indexMultiTermQueryCreate(MUST); - SIndexTerm* q = indexTermCreate(1, ADD_VALUE, TSDB_DATA_TYPE_INT, colName.c_str(), colName.size(), colVal.c_str(), - colVal.size()); + SIndexTerm* q = indexTermCreate(1, ADD_VALUE, TSDB_DATA_TYPE_INT, colName.c_str(), colName.size(), + (const char*)&val, sizeof(val)); SArray* result = taosArrayInit(1, sizeof(uint64_t)); indexMultiTermQueryAdd(mq, q, QUERY_TERM); @@ -270,11 +293,11 @@ TEST_F(JsonEnv, testWriteJsonNumberData) { } { std::string colName("test"); - std::string colVal("10"); + int val = 15; SIndexMultiTermQuery* mq = indexMultiTermQueryCreate(MUST); - SIndexTerm* q = indexTermCreate(1, ADD_VALUE, TSDB_DATA_TYPE_INT, colName.c_str(), colName.size(), colVal.c_str(), - colVal.size()); + SIndexTerm* q = indexTermCreate(1, ADD_VALUE, TSDB_DATA_TYPE_INT, colName.c_str(), colName.size(), + (const char*)&val, sizeof(val)); SArray* result = taosArrayInit(1, sizeof(uint64_t)); indexMultiTermQueryAdd(mq, q, QUERY_GREATER_THAN); @@ -284,11 +307,12 @@ TEST_F(JsonEnv, testWriteJsonNumberData) { } { std::string colName("test"); - std::string colVal("10"); + int val = 10; + ; SIndexMultiTermQuery* mq = indexMultiTermQueryCreate(MUST); - SIndexTerm* q = indexTermCreate(1, ADD_VALUE, TSDB_DATA_TYPE_INT, colName.c_str(), colName.size(), colVal.c_str(), - colVal.size()); + SIndexTerm* q = indexTermCreate(1, ADD_VALUE, TSDB_DATA_TYPE_INT, colName.c_str(), colName.size(), + (const char*)&val, sizeof(int)); SArray* result = taosArrayInit(1, sizeof(uint64_t)); indexMultiTermQueryAdd(mq, q, QUERY_GREATER_EQUAL); @@ -298,11 +322,12 @@ TEST_F(JsonEnv, testWriteJsonNumberData) { } { std::string colName("test"); - std::string colVal("10"); + int val = 10; + // std::string colVal("10"); SIndexMultiTermQuery* mq = indexMultiTermQueryCreate(MUST); - SIndexTerm* q = indexTermCreate(1, ADD_VALUE, TSDB_DATA_TYPE_INT, colName.c_str(), colName.size(), colVal.c_str(), - colVal.size()); + SIndexTerm* q = indexTermCreate(1, ADD_VALUE, TSDB_DATA_TYPE_INT, colName.c_str(), colName.size(), + (const char*)&val, sizeof(val)); SArray* result = taosArrayInit(1, sizeof(uint64_t)); indexMultiTermQueryAdd(mq, q, QUERY_LESS_THAN); @@ -312,11 +337,12 @@ TEST_F(JsonEnv, testWriteJsonNumberData) { } { std::string colName("test"); - std::string colVal("10"); + int val = 10; + // std::string colVal("10"); SIndexMultiTermQuery* mq = indexMultiTermQueryCreate(MUST); - SIndexTerm* q = indexTermCreate(1, ADD_VALUE, TSDB_DATA_TYPE_INT, colName.c_str(), colName.size(), colVal.c_str(), - colVal.size()); + SIndexTerm* q = indexTermCreate(1, ADD_VALUE, TSDB_DATA_TYPE_INT, colName.c_str(), colName.size(), + (const char*)&val, sizeof(val)); SArray* result = taosArrayInit(1, sizeof(uint64_t)); indexMultiTermQueryAdd(mq, q, QUERY_LESS_EQUAL); @@ -326,12 +352,12 @@ TEST_F(JsonEnv, testWriteJsonNumberData) { } } -TEST_F(JsonEnv, testWriteJsonTfileAndCache) { +TEST_F(JsonEnv, testWriteJsonTfileAndCache_INT) { { std::string colName("test1"); - std::string colVal("10"); + int val = 10; SIndexTerm* term = indexTermCreate(1, ADD_VALUE, TSDB_DATA_TYPE_INT, colName.c_str(), colName.size(), - colVal.c_str(), colVal.size()); + (const char*)&val, sizeof(val)); SIndexMultiTerm* terms = indexMultiTermCreate(); indexMultiTermAdd(terms, term); @@ -355,11 +381,11 @@ TEST_F(JsonEnv, testWriteJsonTfileAndCache) { } { std::string colName("test1"); - std::string colVal("10"); + int val = 10; SIndexMultiTermQuery* mq = indexMultiTermQueryCreate(MUST); - SIndexTerm* q = indexTermCreate(1, ADD_VALUE, TSDB_DATA_TYPE_INT, colName.c_str(), colName.size(), colVal.c_str(), - colVal.size()); + SIndexTerm* q = indexTermCreate(1, ADD_VALUE, TSDB_DATA_TYPE_INT, colName.c_str(), colName.size(), + (const char*)&val, sizeof(val)); SArray* result = taosArrayInit(1, sizeof(uint64_t)); indexMultiTermQueryAdd(mq, q, QUERY_TERM); @@ -369,11 +395,11 @@ TEST_F(JsonEnv, testWriteJsonTfileAndCache) { } { std::string colName("test1"); - std::string colVal("10"); + int val = 10; SIndexMultiTermQuery* mq = indexMultiTermQueryCreate(MUST); - SIndexTerm* q = indexTermCreate(1, ADD_VALUE, TSDB_DATA_TYPE_INT, colName.c_str(), colName.size(), colVal.c_str(), - colVal.size()); + SIndexTerm* q = indexTermCreate(1, ADD_VALUE, TSDB_DATA_TYPE_INT, colName.c_str(), colName.size(), + (const char*)&val, sizeof(int)); SArray* result = taosArrayInit(1, sizeof(uint64_t)); indexMultiTermQueryAdd(mq, q, QUERY_GREATER_THAN); @@ -383,11 +409,12 @@ TEST_F(JsonEnv, testWriteJsonTfileAndCache) { } { std::string colName("test1"); - std::string colVal("10"); + // std::string colVal("10"); + int val = 10; SIndexMultiTermQuery* mq = indexMultiTermQueryCreate(MUST); - SIndexTerm* q = indexTermCreate(1, ADD_VALUE, TSDB_DATA_TYPE_INT, colName.c_str(), colName.size(), colVal.c_str(), - colVal.size()); + SIndexTerm* q = indexTermCreate(1, ADD_VALUE, TSDB_DATA_TYPE_INT, colName.c_str(), colName.size(), + (const char*)&val, sizeof(val)); SArray* result = taosArrayInit(1, sizeof(uint64_t)); indexMultiTermQueryAdd(mq, q, QUERY_GREATER_EQUAL); @@ -397,11 +424,11 @@ TEST_F(JsonEnv, testWriteJsonTfileAndCache) { } { std::string colName("test1"); - std::string colVal("10"); + int val = 10; SIndexMultiTermQuery* mq = indexMultiTermQueryCreate(MUST); - SIndexTerm* q = indexTermCreate(1, ADD_VALUE, TSDB_DATA_TYPE_INT, colName.c_str(), colName.size(), colVal.c_str(), - colVal.size()); + SIndexTerm* q = indexTermCreate(1, ADD_VALUE, TSDB_DATA_TYPE_INT, colName.c_str(), colName.size(), + (const char*)&val, sizeof(val)); SArray* result = taosArrayInit(1, sizeof(uint64_t)); indexMultiTermQueryAdd(mq, q, QUERY_GREATER_THAN); @@ -411,11 +438,11 @@ TEST_F(JsonEnv, testWriteJsonTfileAndCache) { } { std::string colName("test1"); - std::string colVal("10"); + int val = 10; SIndexMultiTermQuery* mq = indexMultiTermQueryCreate(MUST); - SIndexTerm* q = indexTermCreate(1, ADD_VALUE, TSDB_DATA_TYPE_INT, colName.c_str(), colName.size(), colVal.c_str(), - colVal.size()); + SIndexTerm* q = indexTermCreate(1, ADD_VALUE, TSDB_DATA_TYPE_INT, colName.c_str(), colName.size(), + (const char*)&val, sizeof(val)); SArray* result = taosArrayInit(1, sizeof(uint64_t)); indexMultiTermQueryAdd(mq, q, QUERY_LESS_EQUAL); @@ -425,9 +452,10 @@ TEST_F(JsonEnv, testWriteJsonTfileAndCache) { } { std::string colName("other_column"); - std::string colVal("100"); + int val = 100; + SIndexTerm* term = indexTermCreate(1, ADD_VALUE, TSDB_DATA_TYPE_INT, colName.c_str(), colName.size(), - colVal.c_str(), colVal.size()); + (const char*)&val, sizeof(val)); SIndexMultiTerm* terms = indexMultiTermCreate(); indexMultiTermAdd(terms, term); @@ -438,11 +466,12 @@ TEST_F(JsonEnv, testWriteJsonTfileAndCache) { } { std::string colName("test1"); - std::string colVal("10"); + int val = 10; + // std::string colVal("10"); SIndexMultiTermQuery* mq = indexMultiTermQueryCreate(MUST); - SIndexTerm* q = indexTermCreate(1, ADD_VALUE, TSDB_DATA_TYPE_INT, colName.c_str(), colName.size(), colVal.c_str(), - colVal.size()); + SIndexTerm* q = indexTermCreate(1, ADD_VALUE, TSDB_DATA_TYPE_INT, colName.c_str(), colName.size(), + (const char*)&val, sizeof(val)); SArray* result = taosArrayInit(1, sizeof(uint64_t)); indexMultiTermQueryAdd(mq, q, QUERY_LESS_THAN); @@ -450,4 +479,102 @@ TEST_F(JsonEnv, testWriteJsonTfileAndCache) { EXPECT_EQ(0, taosArrayGetSize(result)); indexMultiTermQueryDestroy(mq); } + { + std::string colName("test1"); + int val = 15; + SIndexTerm* term = indexTermCreate(1, ADD_VALUE, TSDB_DATA_TYPE_INT, colName.c_str(), colName.size(), + (const char*)&val, sizeof(val)); + + SIndexMultiTerm* terms = indexMultiTermCreate(); + indexMultiTermAdd(terms, term); + for (size_t i = 0; i < 1000; i++) { + tIndexJsonPut(index, terms, i + 1000); + } + indexMultiTermDestroy(terms); + } + { + std::string colName("test1"); + int val = 8; + // std::string colVal("10"); + + SIndexMultiTermQuery* mq = indexMultiTermQueryCreate(MUST); + SIndexTerm* q = indexTermCreate(1, ADD_VALUE, TSDB_DATA_TYPE_INT, colName.c_str(), colName.size(), + (const char*)&val, sizeof(val)); + + SArray* result = taosArrayInit(1, sizeof(uint64_t)); + indexMultiTermQueryAdd(mq, q, QUERY_GREATER_EQUAL); + tIndexJsonSearch(index, mq, result); + EXPECT_EQ(2000, taosArrayGetSize(result)); + indexMultiTermQueryDestroy(mq); + } +} +TEST_F(JsonEnv, testWriteJsonTfileAndCache_INT2) { + { + int val = 10; + std::string colName("test1"); + for (int i = 0; i < 10000; i++) { + val += 1; + WriteData(index, colName, TSDB_DATA_TYPE_INT, &val, sizeof(val), i); + } + } + { + int val = 10; + std::string colName("test2xxx"); + std::string colVal("xxxxxxxxxxxxxxx"); + for (int i = 0; i < 100000; i++) { + val += 1; + WriteData(index, colName, TSDB_DATA_TYPE_BINARY, (void*)(colVal.c_str()), colVal.size(), i); + } + } + { + SArray* res = NULL; + std::string colName("test1"); + int val = 9; + Search(index, colName, TSDB_DATA_TYPE_INT, &val, sizeof(val), QUERY_GREATER_EQUAL, &res); + EXPECT_EQ(10000, taosArrayGetSize(res)); + } + { + SArray* res = NULL; + std::string colName("test2xxx"); + std::string colVal("xxxxxxxxxxxxxxx"); + Search(index, colName, TSDB_DATA_TYPE_BINARY, (void*)(colVal.c_str()), colVal.size(), QUERY_TERM, &res); + EXPECT_EQ(100000, taosArrayGetSize(res)); + } +} +TEST_F(JsonEnv, testWriteJsonTfileAndCache_FLOAT) { + { + float val = 10.0; + std::string colName("test1"); + for (int i = 0; i < 1000; i++) { + WriteData(index, colName, TSDB_DATA_TYPE_FLOAT, &val, sizeof(val), i); + } + } + { + float val = 2.0; + std::string colName("test1"); + for (int i = 0; i < 1000; i++) { + WriteData(index, colName, TSDB_DATA_TYPE_FLOAT, &val, sizeof(val), i); + } + } + { + SArray* res = NULL; + std::string colName("test1"); + float val = 1.9; + Search(index, colName, TSDB_DATA_TYPE_FLOAT, &val, sizeof(val), QUERY_GREATER_EQUAL, &res); + EXPECT_EQ(2000, taosArrayGetSize(res)); + } + { + SArray* res = NULL; + std::string colName("test1"); + float val = 2.1; + Search(index, colName, TSDB_DATA_TYPE_FLOAT, &val, sizeof(val), QUERY_GREATER_EQUAL, &res); + EXPECT_EQ(1000, taosArrayGetSize(res)); + } + { + std::string colName("test1"); + SArray* res = NULL; + float val = 2.1; + Search(index, colName, TSDB_DATA_TYPE_FLOAT, &val, sizeof(val), QUERY_GREATER_EQUAL, &res); + EXPECT_EQ(1000, taosArrayGetSize(res)); + } } diff --git a/source/libs/index/test/utilUT.cc b/source/libs/index/test/utilUT.cc index 30f1a089f7cb200713d32a4d1efc0d10dbfd402e..18a2b457c41c2cd66f20a01f3690d0af4fe69d3d 100644 --- a/source/libs/index/test/utilUT.cc +++ b/source/libs/index/test/utilUT.cc @@ -6,12 +6,14 @@ #include #include "index.h" #include "indexCache.h" +#include "indexComm.h" #include "indexFst.h" #include "indexFstCountingWriter.h" #include "indexFstUtil.h" #include "indexInt.h" #include "indexTfile.h" #include "indexUtil.h" +#include "tcoding.h" #include "tglobal.h" #include "tskiplist.h" #include "tutil.h" @@ -305,3 +307,17 @@ TEST_F(UtilEnv, 01Except) { ASSERT_EQ(*(uint64_t *)taosArrayGet(total, 0), 1); ASSERT_EQ(*(uint64_t *)taosArrayGet(total, 1), 100); } +TEST_F(UtilEnv, testFill) { + for (int i = 0; i < 10000000; i++) { + int64_t val = i; + char buf[65] = {0}; + indexInt2str(val, buf, 1); + EXPECT_EQ(val, taosStr2int64(buf)); + } + for (int i = 0; i < 10000000; i++) { + int64_t val = 0 - i; + char buf[65] = {0}; + indexInt2str(val, buf, -1); + EXPECT_EQ(val, taosStr2int64(buf)); + } +} diff --git a/tests/script/tsim/query/udf.sim b/tests/script/tsim/query/udf.sim index b02ca79ed46451a6dcaf6c45ef81598715f749fa..24ddcc1b75c70865f334f553a6b0f1ee176d62ca 100644 --- a/tests/script/tsim/query/udf.sim +++ b/tests/script/tsim/query/udf.sim @@ -7,7 +7,7 @@ system sh/cfg.sh -n dnode1 -c udf -v 1 print ========= start dnode1 as LEADER system sh/exec.sh -n dnode1 -s start -sleep 2000 +sleep 1000 sql connect print ======== step1 udf diff --git a/tests/script/tsim/sync/threeReplica1VgElect.sim b/tests/script/tsim/sync/threeReplica1VgElect.sim index 7f8c8339cbb44de39cb0da9dca12c19159209d3f..1496d7c778b475895479eb3661fae7ad86a121d3 100644 --- a/tests/script/tsim/sync/threeReplica1VgElect.sim +++ b/tests/script/tsim/sync/threeReplica1VgElect.sim @@ -220,7 +220,6 @@ if $data[0][4] == LEADER then print ---- vgroup $data[0][0] leader switch to dnode $data[0][3] elif $data[0][6] == LEADER then print ---- vgroup $data[0][0] leader switch to dnode $data[0][5] -endi elif $data[0][8] == LEADER then print ---- vgroup $data[0][0] leader switch to dnode $data[0][7] else @@ -342,7 +341,6 @@ elif $data[0][6] == LEADER then goto check_vg_ready_3 endi print ---- vgroup $data[0][0] leader locating dnode $data[0][7] -endi elif $data[0][8] == LEADER then if $data[0][4] == LEADER then goto check_vg_ready_3 diff --git a/tests/script/tsim/sync/threeReplica1VgElectWihtInsert.sim b/tests/script/tsim/sync/threeReplica1VgElectWihtInsert.sim index 1e12e8565feedc200151b64d37bc6663b4e09b53..2dd9b4ed80f3ea4e3c2d4b6f7264d84f68c2d731 100644 --- a/tests/script/tsim/sync/threeReplica1VgElectWihtInsert.sim +++ b/tests/script/tsim/sync/threeReplica1VgElectWihtInsert.sim @@ -420,7 +420,6 @@ elif $data[0][6] == LEADER then goto check_vg_ready_3 endi print ---- vgroup $data[0][0] leader locating dnode $data[0][7] -endi elif $data[0][8] == LEADER then if $data[0][4] == LEADER then goto check_vg_ready_3 diff --git a/tests/system-test/7-tmq/subscribeDb.py b/tests/system-test/7-tmq/subscribeDb.py index 8f5a7937149010e8862637e9396a285f1ccedc66..ec4c9e9d9b0ebae8ae6f5aa474bc29c24f43260b 100644 --- a/tests/system-test/7-tmq/subscribeDb.py +++ b/tests/system-test/7-tmq/subscribeDb.py @@ -617,7 +617,7 @@ class TDTestCase: tdLog.info("step 1: create database, stb, ctb and insert data") # create and start thread parameterDict = {'cfg': '', \ - 'dbName': 'db60', \ + 'dbName': 'db70', \ 'vgroups': 4, \ 'stbName': 'stb', \ 'ctbNum': 10, \ @@ -634,7 +634,7 @@ class TDTestCase: prepareEnvThread.start() parameterDict2 = {'cfg': '', \ - 'dbName': 'db61', \ + 'dbName': 'db71', \ 'vgroups': 4, \ 'stbName': 'stb2', \ 'ctbNum': 10, \ @@ -708,9 +708,9 @@ class TDTestCase: cfgPath = buildPath + "/../sim/psim/cfg" tdLog.info("cfgPath: %s" % cfgPath) - self.tmqCase1(cfgPath, buildPath) - self.tmqCase2(cfgPath, buildPath) - self.tmqCase3(cfgPath, buildPath) + #self.tmqCase1(cfgPath, buildPath) + #self.tmqCase2(cfgPath, buildPath) + #self.tmqCase3(cfgPath, buildPath) self.tmqCase4(cfgPath, buildPath) self.tmqCase5(cfgPath, buildPath) self.tmqCase6(cfgPath, buildPath) diff --git a/tests/system-test/fulltest.sh b/tests/system-test/fulltest.sh index 93556186e6d8aad68cd1974ac754f03af1ea5518..1cb123a52f83edae3d60eae961e08cf4c505014f 100755 --- a/tests/system-test/fulltest.sh +++ b/tests/system-test/fulltest.sh @@ -57,3 +57,5 @@ python3 ./test.py -f 2-query/query_cols_tags_and_or.py python3 ./test.py -f 2-query/nestedQuery.py python3 ./test.py -f 7-tmq/basic5.py +python3 ./test.py -f 7-tmq/subscribeDb.py +