diff --git a/Jenkinsfile2 b/Jenkinsfile2 index a2043797c09b871e174c3c05d80a689238b85919..441cf896267bed8cd67637c1ea328e884bf0012f 100644 --- a/Jenkinsfile2 +++ b/Jenkinsfile2 @@ -88,12 +88,6 @@ def pre_test(){ cmake .. > /dev/null make -j4> /dev/null ''' - sh''' - cd ${WKPY} - git reset --hard - git pull - pip3 install . - ''' return 1 } @@ -103,7 +97,6 @@ pipeline { environment{ WK = '/var/lib/jenkins/workspace/TDinternal' WKC= '/var/lib/jenkins/workspace/TDengine' - WKPY= '/var/lib/jenkins/workspace/taos-connector-python' } stages { stage('pre_build'){ @@ -124,11 +117,6 @@ pipeline { ./test-all.sh b1fq ''' sh''' - export LD_LIBRARY_PATH=${WKC}/debug/build/lib - cd ${WKC}/tests/system-test - ./fulltest.sh - ''' - sh''' cd ${WKC}/debug ctest ''' diff --git a/include/common/tmsg.h b/include/common/tmsg.h index 38308a12936f4787a636d077595bdd45e6206267..807a44a532e3e4d9263a2b74fca0d60fa87ecb93 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -984,7 +984,6 @@ int32_t tDeserializeSShowRsp(void* buf, int32_t bufLen, SShowRsp* pRsp); void tFreeSShowRsp(SShowRsp* pRsp); typedef struct { - int32_t type; char db[TSDB_DB_FNAME_LEN]; char tb[TSDB_TABLE_NAME_LEN]; int64_t showId; diff --git a/include/libs/function/function.h b/include/libs/function/function.h index eafe64c2949979a419a491e5baeebbf3e00ea834..68fc3be617d9292f5a2f618af6eb4e52b2ec7aba 100644 --- a/include/libs/function/function.h +++ b/include/libs/function/function.h @@ -166,6 +166,7 @@ typedef struct SInputColumnInfoData { SColumnInfoData *pPTS; // primary timestamp column SColumnInfoData **pData; SColumnDataAgg **pColumnDataAgg; + uint64_t uid; // table uid } SInputColumnInfoData; // sql function runtime context @@ -191,7 +192,7 @@ typedef struct SqlFunctionCtx { int16_t functionId; // function id char * pOutput; // final result output buffer, point to sdata->data int32_t numOfParams; - SVariant param[4]; // input parameter, e.g., top(k, 20), the number of results for top query is kept in param + SFunctParam *param; // input parameter, e.g., top(k, 20), the number of results for top query is kept in param int64_t *ptsList; // corresponding timestamp array list SColumnInfoData *pTsOutput; // corresponding output buffer for timestamp of each result, e.g., top/bottom*/ int32_t offset; diff --git a/include/util/talgo.h b/include/util/talgo.h index ebddce62a84fce6d936e62fa8b925a1216154a66..3ce26526087bfe393234aad696601839280fc479 100644 --- a/include/util/talgo.h +++ b/include/util/talgo.h @@ -82,7 +82,7 @@ void *taosbsearch(const void *key, const void *base, int64_t nmemb, int64_t size * @return */ void taosheapadjust(void *base, int32_t size, int32_t start, int32_t end, const void *parcompar, - __ext_compar_fn_t compar, const void *parswap, __ext_swap_fn_t swap, bool maxroot); + __ext_compar_fn_t compar, char* buf, bool maxroot); /** * sort heap to make sure it is a max/min root heap @@ -98,7 +98,7 @@ void taosheapadjust(void *base, int32_t size, int32_t start, int32_t end, const * @return */ void taosheapsort(void *base, int32_t size, int32_t len, const void *parcompar, __ext_compar_fn_t compar, - const void *parswap, __ext_swap_fn_t swap, bool maxroot); + bool maxroot); #ifdef __cplusplus } diff --git a/source/client/test/clientTests.cpp b/source/client/test/clientTests.cpp index 53b0fec8ff7fdb7f8288ecdd47e0751c3d7cde70..8e8c62cd946562d26ef5d3062683fc2f9c5bdc10 100644 --- a/source/client/test/clientTests.cpp +++ b/source/client/test/clientTests.cpp @@ -662,7 +662,7 @@ TEST(testCase, agg_query_tables) { TAOS_RES* pRes = taos_query(pConn, "use abc1"); taos_free_result(pRes); - pRes = taos_query(pConn, "select count(*) from tu"); + pRes = taos_query(pConn, "select now() from m1"); if (taos_errno(pRes) != 0) { printf("failed to select from table, reason:%s\n", taos_errstr(pRes)); taos_free_result(pRes); diff --git a/source/common/src/tmsg.c b/source/common/src/tmsg.c index 3840fa219b576a6cc15d3be0629bce8cac649605..c6bef560a9d82016a1439d04afac265ed52b66fe 100644 --- a/source/common/src/tmsg.c +++ b/source/common/src/tmsg.c @@ -2441,7 +2441,6 @@ int32_t tSerializeSRetrieveTableReq(void *buf, int32_t bufLen, SRetrieveTableReq if (tStartEncode(&encoder) < 0) return -1; if (tEncodeI64(&encoder, pReq->showId) < 0) return -1; - if (tEncodeI32(&encoder, pReq->type) < 0) return -1; if (tEncodeCStr(&encoder, pReq->db) < 0) return -1; if (tEncodeCStr(&encoder, pReq->tb) < 0) return -1; tEndEncode(&encoder); @@ -2457,7 +2456,6 @@ int32_t tDeserializeSRetrieveTableReq(void *buf, int32_t bufLen, SRetrieveTableR if (tStartDecode(&decoder) < 0) return -1; if (tDecodeI64(&decoder, &pReq->showId) < 0) return -1; - if (tDecodeI32(&decoder, &pReq->type) < 0) return -1; if (tDecodeCStrTo(&decoder, pReq->db) < 0) return -1; if (tDecodeCStrTo(&decoder, pReq->tb) < 0) return -1; tEndDecode(&decoder); diff --git a/source/dnode/mgmt/test/sut/src/sut.cpp b/source/dnode/mgmt/test/sut/src/sut.cpp index 9b4c4798653858ad22beb7ed637da3b6352674f4..cc32f047afcd1baf9e3d0857fe68248187a29e0e 100644 --- a/source/dnode/mgmt/test/sut/src/sut.cpp +++ b/source/dnode/mgmt/test/sut/src/sut.cpp @@ -89,7 +89,6 @@ int32_t Testbase::SendShowReq(int8_t showType, const char *tb, const char* db) { } SRetrieveTableReq retrieveReq = {0}; - retrieveReq.type = showType; strcpy(retrieveReq.db, db); strcpy(retrieveReq.tb, tb); diff --git a/source/dnode/mnode/impl/inc/mndDef.h b/source/dnode/mnode/impl/inc/mndDef.h index 984e9f917fafcad8c802d8f0211fe243f99d0f69..59a00f0eaf3c012ea62ce1259ab916ddd5929351 100644 --- a/source/dnode/mnode/impl/inc/mndDef.h +++ b/source/dnode/mnode/impl/inc/mndDef.h @@ -391,7 +391,6 @@ typedef struct { int16_t numOfColumns; int32_t rowSize; int32_t numOfRows; - int32_t payloadLen; void* pIter; SMnode* pMnode; STableMetaRsp* pMeta; diff --git a/source/dnode/mnode/impl/src/mndShow.c b/source/dnode/mnode/impl/src/mndShow.c index 1a14c94640f5dd8b690f4d4d3506f6baaee16f49..f81179201c405c122f41715dfab3e690670c5afc 100644 --- a/source/dnode/mnode/impl/src/mndShow.c +++ b/source/dnode/mnode/impl/src/mndShow.c @@ -18,7 +18,7 @@ #define SHOW_STEP_SIZE 100 -static SShowObj *mndCreateShowObj(SMnode *pMnode, SShowReq *pReq); +static SShowObj *mndCreateShowObj(SMnode *pMnode, SRetrieveTableReq *pReq); static void mndFreeShowObj(SShowObj *pShow); static SShowObj *mndAcquireShowObj(SMnode *pMnode, int64_t showId); static void mndReleaseShowObj(SShowObj *pShow, bool forceRemove); @@ -47,18 +47,80 @@ void mndCleanupShow(SMnode *pMnode) { } } -static SShowObj *mndCreateShowObj(SMnode *pMnode, SShowReq *pReq) { +static int32_t convertToRetrieveType(char* name, int32_t len) { + int32_t type = -1; + + if (strncasecmp(name, TSDB_INS_TABLE_DNODES, len) == 0) { + type = TSDB_MGMT_TABLE_DNODE; + } else if (strncasecmp(name, TSDB_INS_TABLE_MNODES, len) == 0) { + type = TSDB_MGMT_TABLE_MNODE; + } else if (strncasecmp(name, TSDB_INS_TABLE_MODULES, len) == 0) { + type = TSDB_MGMT_TABLE_MODULE; + } else if (strncasecmp(name, TSDB_INS_TABLE_QNODES, len) == 0) { + type = TSDB_MGMT_TABLE_QNODE; + } else if (strncasecmp(name, TSDB_INS_TABLE_BNODES, len) == 0) { + type = TSDB_MGMT_TABLE_BNODE; + } else if (strncasecmp(name, TSDB_INS_TABLE_SNODES, len) == 0) { + type = TSDB_MGMT_TABLE_SNODE; + } else if (strncasecmp(name, TSDB_INS_TABLE_CLUSTER, len) == 0) { + type = TSDB_MGMT_TABLE_CLUSTER; + } else if (strncasecmp(name, TSDB_INS_TABLE_USER_DATABASES, len) == 0) { + type = TSDB_MGMT_TABLE_DB; + } else if (strncasecmp(name, TSDB_INS_TABLE_USER_FUNCTIONS, len) == 0) { + type = TSDB_MGMT_TABLE_FUNC; + } else if (strncasecmp(name, TSDB_INS_TABLE_USER_INDEXES, len) == 0) { + // type = TSDB_MGMT_TABLE_INDEX; + } else if (strncasecmp(name, TSDB_INS_TABLE_USER_STABLES, len) == 0) { + type = TSDB_MGMT_TABLE_STB; + } else if (strncasecmp(name, TSDB_INS_TABLE_USER_STREAMS, len) == 0) { + type = TSDB_MGMT_TABLE_STREAMS; + } else if (strncasecmp(name, TSDB_INS_TABLE_USER_TABLES, len) == 0) { + type = TSDB_MGMT_TABLE_TABLE; + } else if (strncasecmp(name, TSDB_INS_TABLE_USER_TABLE_DISTRIBUTED, len) == 0) { + // type = TSDB_MGMT_TABLE_DIST; + } else if (strncasecmp(name, TSDB_INS_TABLE_USER_USERS, len) == 0) { + type = TSDB_MGMT_TABLE_USER; + } else if (strncasecmp(name, TSDB_INS_TABLE_LICENCES, len) == 0) { + type = TSDB_MGMT_TABLE_GRANTS; + } else if (strncasecmp(name, TSDB_INS_TABLE_VGROUPS, len) == 0) { + type = TSDB_MGMT_TABLE_VGROUP; + } else if (strncasecmp(name, TSDB_INS_TABLE_TOPICS, len) == 0) { + type = TSDB_MGMT_TABLE_TOPICS; + } else if (strncasecmp(name, TSDB_INS_TABLE_CONSUMERS, len) == 0) { + type = TSDB_MGMT_TABLE_CONSUMERS; + } else if (strncasecmp(name, TSDB_INS_TABLE_SUBSCRIBES, len) == 0) { + type = TSDB_MGMT_TABLE_SUBSCRIBES; + } else if (strncasecmp(name, TSDB_INS_TABLE_TRANS, len) == 0) { + type = TSDB_MGMT_TABLE_TRANS; + } else if (strncasecmp(name, TSDB_INS_TABLE_SMAS, len) == 0) { + type = TSDB_MGMT_TABLE_SMAS; + } else if (strncasecmp(name, TSDB_INS_TABLE_CONFIGS, len) == 0) { + type = TSDB_MGMT_TABLE_CONFIGS; + } else if (strncasecmp(name, TSDB_INS_TABLE_CONNS, len) == 0) { + type = TSDB_MGMT_TABLE_CONNS; + } else if (strncasecmp(name, TSDB_INS_TABLE_QUERIES, len) == 0) { + type = TSDB_MGMT_TABLE_QUERIES; + } else if (strncasecmp(name, TSDB_INS_TABLE_VNODES, len) == 0) { + type = TSDB_MGMT_TABLE_VNODES; + } else { +// ASSERT(0); + } + + return type; +} + +static SShowObj *mndCreateShowObj(SMnode *pMnode, SRetrieveTableReq *pReq) { SShowMgmt *pMgmt = &pMnode->showMgmt; int64_t showId = atomic_add_fetch_64(&pMgmt->showId, 1); if (showId == 0) atomic_add_fetch_64(&pMgmt->showId, 1); - int32_t size = sizeof(SShowObj) + pReq->payloadLen; + int32_t size = sizeof(SShowObj); + SShowObj showObj = {0}; - showObj.id = showId; + showObj.id = showId; showObj.pMnode = pMnode; - showObj.type = pReq->type; - showObj.payloadLen = pReq->payloadLen; + showObj.type = convertToRetrieveType(pReq->tb, tListLen(pReq->tb)); memcpy(showObj.db, pReq->db, TSDB_DB_FNAME_LEN); int32_t keepTime = tsShellActivityTimer * 6 * 1000; @@ -127,10 +189,6 @@ static int32_t mndProcessRetrieveSysTableReq(SNodeMsg *pReq) { } if (retrieveReq.showId == 0) { - SShowReq req = {0}; - req.type = retrieveReq.type; - strncpy(req.db, retrieveReq.db, tListLen(req.db)); - STableMetaRsp *pMeta = (STableMetaRsp *)taosHashGet(pMnode->infosMeta, retrieveReq.tb, strlen(retrieveReq.tb) + 1); if (pMeta == NULL) { terrno = TSDB_CODE_MND_INVALID_INFOS_TBL; @@ -138,7 +196,7 @@ static int32_t mndProcessRetrieveSysTableReq(SNodeMsg *pReq) { return -1; } - pShow = mndCreateShowObj(pMnode, &req); + pShow = mndCreateShowObj(pMnode, &retrieveReq); if (pShow == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; mError("failed to process show-meta req since %s", terrstr()); diff --git a/source/dnode/mnode/impl/test/show/show.cpp b/source/dnode/mnode/impl/test/show/show.cpp index 2b0eaa0becba0696acd0e516b8a904be4abe73a0..5c431f65d3a5eb5663eb3a92005a4635e6f7f384 100644 --- a/source/dnode/mnode/impl/test/show/show.cpp +++ b/source/dnode/mnode/impl/test/show/show.cpp @@ -36,7 +36,7 @@ TEST_F(MndTestShow, 01_ShowMsg_InvalidMsgMax) { SRpcMsg* pRsp = test.SendReq(TDMT_MND_SYSTABLE_RETRIEVE, pReq, contLen); ASSERT_NE(pRsp, nullptr); - ASSERT_EQ(pRsp->code, TSDB_CODE_INVALID_MSG); + ASSERT_NE(pRsp->code, 0); } TEST_F(MndTestShow, 02_ShowMsg_InvalidMsgStart) { @@ -50,7 +50,7 @@ TEST_F(MndTestShow, 02_ShowMsg_InvalidMsgStart) { SRpcMsg* pRsp = test.SendReq(TDMT_MND_SYSTABLE_RETRIEVE, pReq, contLen); ASSERT_NE(pRsp, nullptr); - ASSERT_EQ(pRsp->code, TSDB_CODE_INVALID_MSG); + ASSERT_NE(pRsp->code, 0); } TEST_F(MndTestShow, 03_ShowMsg_Conn) { diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index 9e48e12cb5187072a99cfe20f1b7f17c7cbc02d9..5adfb7caca4a3f633ac8b3858581a4e9760fe1bf 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -382,7 +382,7 @@ typedef struct SSysTableScanInfo { void* pCur; // cursor for iterate the local table meta store. SArray* scanCols; // SArray scan column id list - int32_t type; // show type, TODO remove it +// int32_t type; // show type, TODO remove it SName name; SSDataBlock* pRes; int32_t capacity; @@ -479,7 +479,7 @@ typedef struct { typedef struct SGroupbyOperatorInfo { SOptrBasicInfo binfo; - SArray* pGroupCols; + SArray* pGroupCols; // group by columns, SArray SArray* pGroupColVals; // current group column values, SArray SNode* pCondition; bool isInit; // denote if current val is initialized or not @@ -600,7 +600,6 @@ typedef struct SJoinOperatorInfo { int32_t rightPos; SColumnInfo rightCol; SNode *pOnCondition; -// SRspResultInfo resultInfo; } SJoinOperatorInfo; int32_t operatorDummyOpenFn(SOperatorInfo* pOperator); diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index 7791a345ed7f34626c6313d56cfc7a160dcba28c..7a7cb68424ff4b74a5596b2dfbdc83cd00bf4493 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -1087,6 +1087,7 @@ static int32_t doSetInputDataBlock(SOperatorInfo* pOperator, SqlFunctionCtx* pCt pCtx[i].currentStage = MAIN_SCAN; SInputColumnInfoData* pInput = &pCtx[i].input; + pInput->uid = pBlock->info.uid; SExprInfo* pOneExpr = &pOperator->pExpr[i]; for (int32_t j = 0; j < pOneExpr->base.numOfParams; ++j) { @@ -1101,7 +1102,9 @@ static int32_t doSetInputDataBlock(SOperatorInfo* pOperator, SqlFunctionCtx* pCt pInput->pPTS = taosArrayGet(pBlock->pDataBlock, 0); // todo set the correct timestamp column ASSERT(pInput->pData[j] != NULL); } else if (pFuncParam->type == FUNC_PARAM_TYPE_VALUE) { - if (createDummyCol) { + // todo avoid case: top(k, 12), 12 is the value parameter. + // sum(11), 11 is also the value parameter. + if (createDummyCol && pOneExpr->base.numOfParams == 1) { code = doCreateConstantValColumnInfo(pInput, pFuncParam, pFuncParam->param.nType, j, pBlock->info.rows); if (code != TSDB_CODE_SUCCESS) { return code; @@ -1876,67 +1879,58 @@ SqlFunctionCtx* createSqlFunctionCtx(SExprInfo* pExprInfo, int32_t numOfOutput, } } pCtx->resDataInfo.interBufSize = env.calcMemSize; - } else if (pExpr->pExpr->nodeType == QUERY_NODE_COLUMN || pExpr->pExpr->nodeType == QUERY_NODE_OPERATOR || pExpr->pExpr->nodeType == QUERY_NODE_VALUE) { - pCtx->resDataInfo.interBufSize = pFunct->resSchema.bytes; // for simple column, the intermediate buffer needs to hold one element. + } else if (pExpr->pExpr->nodeType == QUERY_NODE_COLUMN || pExpr->pExpr->nodeType == QUERY_NODE_OPERATOR || + pExpr->pExpr->nodeType == QUERY_NODE_VALUE) { + // for simple column, the intermediate buffer needs to hold one element. + pCtx->resDataInfo.interBufSize = pFunct->resSchema.bytes; } pCtx->input.numOfInputCols = pFunct->numOfParams; pCtx->input.pData = taosMemoryCalloc(pFunct->numOfParams, POINTER_BYTES); pCtx->input.pColumnDataAgg = taosMemoryCalloc(pFunct->numOfParams, POINTER_BYTES); - pCtx->pTsOutput = NULL; + pCtx->pTsOutput = NULL; pCtx->resDataInfo.bytes = pFunct->resSchema.bytes; - pCtx->resDataInfo.type = pFunct->resSchema.type; - pCtx->order = TSDB_ORDER_ASC; - pCtx->start.key = INT64_MIN; - pCtx->end.key = INT64_MIN; -#if 0 - for (int32_t j = 0; j < pCtx->numOfParams; ++j) { -// int16_t type = pFunct->param[j].nType; -// int16_t bytes = pFunct->param[j].nLen; - -// if (type == TSDB_DATA_TYPE_BINARY || type == TSDB_DATA_TYPE_NCHAR) { -// taosVariantCreateFromBinary(&pCtx->param[j], pFunct->param[j].pz, bytes, type); -// } else { -// taosVariantCreateFromBinary(&pCtx->param[j], (char *)&pFunct->param[j].i, bytes, type); -// } - } - - // set the order information for top/bottom query - int32_t functionId = pCtx->functionId; - if (functionId == FUNCTION_TOP || functionId == FUNCTION_BOTTOM || functionId == FUNCTION_DIFF) { - int32_t f = getExprFunctionId(&pExpr[0]); - assert(f == FUNCTION_TS || f == FUNCTION_TS_DUMMY); - -// pCtx->param[2].i = pQueryAttr->order.order; - pCtx->param[2].nType = TSDB_DATA_TYPE_BIGINT; - pCtx->param[3].i = functionId; - pCtx->param[3].nType = TSDB_DATA_TYPE_BIGINT; - -// pCtx->param[1].i = pQueryAttr->order.col.info.colId; - } else if (functionId == FUNCTION_INTERP) { -// pCtx->param[2].i = (int8_t)pQueryAttr->fillType; -// if (pQueryAttr->fillVal != NULL) { -// if (isNull((const char *)&pQueryAttr->fillVal[i], pCtx->inputType)) { -// pCtx->param[1].nType = TSDB_DATA_TYPE_NULL; -// } else { // todo refactor, taosVariantCreateFromBinary should handle the NULL value -// if (pCtx->inputType != TSDB_DATA_TYPE_BINARY && pCtx->inputType != TSDB_DATA_TYPE_NCHAR) { -// taosVariantCreateFromBinary(&pCtx->param[1], (char *)&pQueryAttr->fillVal[i], pCtx->inputBytes, pCtx->inputType); -// } -// } + pCtx->resDataInfo.type = pFunct->resSchema.type; + pCtx->order = TSDB_ORDER_ASC; + pCtx->start.key = INT64_MIN; + pCtx->end.key = INT64_MIN; + pCtx->numOfParams = pExpr->base.numOfParams; + + pCtx->param = pFunct->pParam; +// for (int32_t j = 0; j < pCtx->numOfParams; ++j) { +// // set the order information for top/bottom query +// int32_t functionId = pCtx->functionId; +// if (functionId == FUNCTION_TOP || functionId == FUNCTION_BOTTOM || functionId == FUNCTION_DIFF) { +// int32_t f = getExprFunctionId(&pExpr[0]); +// assert(f == FUNCTION_TS || f == FUNCTION_TS_DUMMY); +// +// // pCtx->param[2].i = pQueryAttr->order.order; +// // pCtx->param[2].nType = TSDB_DATA_TYPE_BIGINT; +// // pCtx->param[3].i = functionId; +// // pCtx->param[3].nType = TSDB_DATA_TYPE_BIGINT; +// +// // pCtx->param[1].i = pQueryAttr->order.col.info.colId; +// } else if (functionId == FUNCTION_INTERP) { +// // pCtx->param[2].i = (int8_t)pQueryAttr->fillType; +// // if (pQueryAttr->fillVal != NULL) { +// // if (isNull((const char *)&pQueryAttr->fillVal[i], pCtx->inputType)) { +// // pCtx->param[1].nType = TSDB_DATA_TYPE_NULL; +// // } else { // todo refactor, taosVariantCreateFromBinary should handle the NULL value +// // if (pCtx->inputType != TSDB_DATA_TYPE_BINARY && pCtx->inputType != TSDB_DATA_TYPE_NCHAR) { +// // taosVariantCreateFromBinary(&pCtx->param[1], (char *)&pQueryAttr->fillVal[i], pCtx->inputBytes, pCtx->inputType); +// // } +// // } +// // } +// } else if (functionId == FUNCTION_TWA) { +// // pCtx->param[1].i = pQueryAttr->window.skey; +// // pCtx->param[1].nType = TSDB_DATA_TYPE_BIGINT; +// // pCtx->param[2].i = pQueryAttr->window.ekey; +// // pCtx->param[2].nType = TSDB_DATA_TYPE_BIGINT; +// } else if (functionId == FUNCTION_ARITHM) { +// // pCtx->param[1].pz = (char*) getScalarFuncSupport(pRuntimeEnv->scalarSup, i); // } - } else if (functionId == FUNCTION_TS_COMP) { -// pCtx->param[0].i = pQueryAttr->vgId; //TODO this should be the parameter from client - pCtx->param[0].nType = TSDB_DATA_TYPE_BIGINT; - } else if (functionId == FUNCTION_TWA) { -// pCtx->param[1].i = pQueryAttr->window.skey; - pCtx->param[1].nType = TSDB_DATA_TYPE_BIGINT; -// pCtx->param[2].i = pQueryAttr->window.ekey; - pCtx->param[2].nType = TSDB_DATA_TYPE_BIGINT; - } else if (functionId == FUNCTION_ARITHM) { -// pCtx->param[1].pz = (char*) getScalarFuncSupport(pRuntimeEnv->scalarSup, i); - } -#endif +// } } for (int32_t i = 1; i < numOfOutput; ++i) { @@ -1955,7 +1949,7 @@ static void* destroySqlFunctionCtx(SqlFunctionCtx* pCtx, int32_t numOfOutput) { for (int32_t i = 0; i < numOfOutput; ++i) { for (int32_t j = 0; j < pCtx[i].numOfParams; ++j) { - taosVariantDestroy(&pCtx[i].param[j]); + taosVariantDestroy(&pCtx[i].param[j].param); } taosVariantDestroy(&pCtx[i].tag); @@ -6487,9 +6481,6 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo int32_t numOfCols = 0; tsdbReaderT pDataReader = doCreateDataReader(pTableScanNode, pHandle, pTableGroupInfo, (uint64_t)queryId, taskId); - if (pDataReader == NULL) { - return NULL; - } SArray* pColList = extractColMatchInfo(pScanPhyNode->pScanCols, pScanPhyNode->node.pOutputDataBlockDesc, &numOfCols); SSDataBlock* pResBlock = createResDataBlock(pScanPhyNode->node.pOutputDataBlockDesc); diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index b054bbfcb6eb77dd250510b82843e998c8bdb765..ce3de02c9fb4685f628374ee53c5f0baf096b9b5 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -319,7 +319,7 @@ static SSDataBlock* doTableScan(SOperatorInfo* pOperator, bool* newgroup) { SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; // The read handle is not initialized yet, since no qualified tables exists - if (pTableScanInfo->dataReader == NULL) { + if (pTableScanInfo->dataReader == NULL || pOperator->status == OP_EXEC_DONE) { return NULL; } @@ -375,10 +375,9 @@ static SSDataBlock* doTableScan(SOperatorInfo* pOperator, bool* newgroup) { return p; } -SOperatorInfo* createTableScanOperatorInfo(void* pTsdbReadHandle, int32_t order, int32_t numOfOutput, - int32_t dataLoadFlag, int32_t repeatTime, int32_t reverseTime, - SArray* pColMatchInfo, SSDataBlock* pResBlock, SNode* pCondition, - SInterval* pInterval, double sampleRatio, SExecTaskInfo* pTaskInfo) { +SOperatorInfo* createTableScanOperatorInfo(void* pDataReader, int32_t order, int32_t numOfOutput, int32_t dataLoadFlag, + int32_t repeatTime, int32_t reverseTime, SArray* pColMatchInfo, SSDataBlock* pResBlock, + SNode* pCondition, SInterval* pInterval, double sampleRatio, SExecTaskInfo* pTaskInfo) { assert(repeatTime > 0); STableScanInfo* pInfo = taosMemoryCalloc(1, sizeof(STableScanInfo)); @@ -391,19 +390,19 @@ SOperatorInfo* createTableScanOperatorInfo(void* pTsdbReadHandle, int32_t order, return NULL; } - pInfo->interval = *pInterval; - pInfo->sampleRatio = sampleRatio; - pInfo->dataBlockLoadFlag = dataLoadFlag; - pInfo->pResBlock = pResBlock; - pInfo->pFilterNode = pCondition; - pInfo->dataReader = pTsdbReadHandle; - pInfo->times = repeatTime; - pInfo->reverseTimes = reverseTime; - pInfo->order = order; - pInfo->current = 0; - pInfo->scanFlag = MAIN_SCAN; - pInfo->pColMatchInfo = pColMatchInfo; - pOperator->name = "TableScanOperator"; + pInfo->interval = *pInterval; + pInfo->sampleRatio = sampleRatio; + pInfo->dataBlockLoadFlag= dataLoadFlag; + pInfo->pResBlock = pResBlock; + pInfo->pFilterNode = pCondition; + pInfo->dataReader = pDataReader; + pInfo->times = repeatTime; + pInfo->reverseTimes = reverseTime; + pInfo->order = order; + pInfo->current = 0; + pInfo->scanFlag = MAIN_SCAN; + pInfo->pColMatchInfo = pColMatchInfo; + pOperator->name = "TableScanOperator"; pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN; pOperator->blockingOptr = false; pOperator->status = OP_NOT_OPENED; @@ -677,7 +676,8 @@ static void destroySysScanOperator(void* param, int32_t numOfOutput) { tsem_destroy(&pInfo->ready); blockDataDestroy(pInfo->pRes); - if (pInfo->type == TSDB_MGMT_TABLE_TABLE) { + const char* name = tNameGetTableName(&pInfo->name); + if (strncasecmp(name, TSDB_INS_TABLE_USER_TABLES, TSDB_TABLE_FNAME_LEN) == 0) { metaCloseTbCursor(pInfo->pCur); } } @@ -812,7 +812,8 @@ static SSDataBlock* doSysTableScan(SOperatorInfo* pOperator, bool* newgroup) { SSysTableScanInfo* pInfo = pOperator->info; // retrieve local table list info from vnode - if (pInfo->type == TSDB_MGMT_TABLE_TABLE) { + const char* name = tNameGetTableName(&pInfo->name); + if (strncasecmp(name, TSDB_INS_TABLE_USER_TABLES, TSDB_TABLE_FNAME_LEN) == 0) { if (pInfo->pCur == NULL) { pInfo->pCur = metaOpenTbCursor(pInfo->readHandle); } @@ -864,8 +865,6 @@ static SSDataBlock* doSysTableScan(SOperatorInfo* pOperator, bool* newgroup) { while (1) { int64_t startTs = taosGetTimestampUs(); - - pInfo->req.type = pInfo->type; strncpy(pInfo->req.tb, tNameGetTableName(&pInfo->name), tListLen(pInfo->req.tb)); if (pInfo->showRewrite) { @@ -947,68 +946,9 @@ SOperatorInfo* createSysTableScanOperatorInfo(void* pSysTableReadHandle, SSDataB pInfo->pCondition = pCondition; pInfo->scanCols = colList; - // TODO remove it - int32_t tableType = 0; - const char* name = tNameGetTableName(pName); - if (strncasecmp(name, TSDB_INS_TABLE_DNODES, tListLen(pName->tname)) == 0) { - tableType = TSDB_MGMT_TABLE_DNODE; - } else if (strncasecmp(name, TSDB_INS_TABLE_MNODES, tListLen(pName->tname)) == 0) { - tableType = TSDB_MGMT_TABLE_MNODE; - } else if (strncasecmp(name, TSDB_INS_TABLE_MODULES, tListLen(pName->tname)) == 0) { - tableType = TSDB_MGMT_TABLE_MODULE; - } else if (strncasecmp(name, TSDB_INS_TABLE_QNODES, tListLen(pName->tname)) == 0) { - tableType = TSDB_MGMT_TABLE_QNODE; - } else if (strncasecmp(name, TSDB_INS_TABLE_BNODES, tListLen(pName->tname)) == 0) { - tableType = TSDB_MGMT_TABLE_BNODE; - } else if (strncasecmp(name, TSDB_INS_TABLE_SNODES, tListLen(pName->tname)) == 0) { - tableType = TSDB_MGMT_TABLE_SNODE; - } else if (strncasecmp(name, TSDB_INS_TABLE_CLUSTER, tListLen(pName->tname)) == 0) { - tableType = TSDB_MGMT_TABLE_CLUSTER; - } else if (strncasecmp(name, TSDB_INS_TABLE_USER_DATABASES, tListLen(pName->tname)) == 0) { - tableType = TSDB_MGMT_TABLE_DB; - } else if (strncasecmp(name, TSDB_INS_TABLE_USER_FUNCTIONS, tListLen(pName->tname)) == 0) { - tableType = TSDB_MGMT_TABLE_FUNC; - } else if (strncasecmp(name, TSDB_INS_TABLE_USER_INDEXES, tListLen(pName->tname)) == 0) { - // tableType = TSDB_MGMT_TABLE_INDEX; - } else if (strncasecmp(name, TSDB_INS_TABLE_USER_STABLES, tListLen(pName->tname)) == 0) { - tableType = TSDB_MGMT_TABLE_STB; - } else if (strncasecmp(name, TSDB_INS_TABLE_USER_STREAMS, tListLen(pName->tname)) == 0) { - tableType = TSDB_MGMT_TABLE_STREAMS; - } else if (strncasecmp(name, TSDB_INS_TABLE_USER_TABLES, tListLen(pName->tname)) == 0) { - tableType = TSDB_MGMT_TABLE_TABLE; - } else if (strncasecmp(name, TSDB_INS_TABLE_USER_TABLE_DISTRIBUTED, tListLen(pName->tname)) == 0) { - // tableType = TSDB_MGMT_TABLE_DIST; - } else if (strncasecmp(name, TSDB_INS_TABLE_USER_USERS, tListLen(pName->tname)) == 0) { - tableType = TSDB_MGMT_TABLE_USER; - } else if (strncasecmp(name, TSDB_INS_TABLE_LICENCES, tListLen(pName->tname)) == 0) { - tableType = TSDB_MGMT_TABLE_GRANTS; - } else if (strncasecmp(name, TSDB_INS_TABLE_VGROUPS, tListLen(pName->tname)) == 0) { - tableType = TSDB_MGMT_TABLE_VGROUP; - } else if (strncasecmp(name, TSDB_INS_TABLE_TOPICS, tListLen(pName->tname)) == 0) { - tableType = TSDB_MGMT_TABLE_TOPICS; - } else if (strncasecmp(name, TSDB_INS_TABLE_CONSUMERS, tListLen(pName->tname)) == 0) { - tableType = TSDB_MGMT_TABLE_CONSUMERS; - } else if (strncasecmp(name, TSDB_INS_TABLE_SUBSCRIBES, tListLen(pName->tname)) == 0) { - tableType = TSDB_MGMT_TABLE_SUBSCRIBES; - } else if (strncasecmp(name, TSDB_INS_TABLE_TRANS, tListLen(pName->tname)) == 0) { - tableType = TSDB_MGMT_TABLE_TRANS; - } else if (strncasecmp(name, TSDB_INS_TABLE_SMAS, tListLen(pName->tname)) == 0) { - tableType = TSDB_MGMT_TABLE_SMAS; - } else if (strncasecmp(name, TSDB_INS_TABLE_CONFIGS, tListLen(pName->tname)) == 0) { - tableType = TSDB_MGMT_TABLE_CONFIGS; - } else if (strncasecmp(name, TSDB_INS_TABLE_CONNS, tListLen(pName->tname)) == 0) { - tableType = TSDB_MGMT_TABLE_CONNS; - } else if (strncasecmp(name, TSDB_INS_TABLE_QUERIES, tListLen(pName->tname)) == 0) { - tableType = TSDB_MGMT_TABLE_QUERIES; - } else if (strncasecmp(name, TSDB_INS_TABLE_VNODES, tListLen(pName->tname)) == 0) { - tableType = TSDB_MGMT_TABLE_VNODES; - } else { - ASSERT(0); - } - tNameAssign(&pInfo->name, pName); - pInfo->type = tableType; - if (pInfo->type == TSDB_MGMT_TABLE_TABLE) { + const char* name = tNameGetTableName(&pInfo->name); + if (strncasecmp(name, TSDB_INS_TABLE_USER_TABLES, TSDB_TABLE_FNAME_LEN) == 0) { pInfo->readHandle = pSysTableReadHandle; blockDataEnsureCapacity(pInfo->pRes, pInfo->capacity); } else { diff --git a/source/libs/function/inc/builtinsimpl.h b/source/libs/function/inc/builtinsimpl.h index 11c89f15685b2e361b35171e2881dbe292357534..8473138e4af4fd5f075a2bb7a7ff75087b75d90b 100644 --- a/source/libs/function/inc/builtinsimpl.h +++ b/source/libs/function/inc/builtinsimpl.h @@ -58,6 +58,9 @@ bool getFirstLastFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv); int32_t firstFunction(SqlFunctionCtx *pCtx); int32_t lastFunction(SqlFunctionCtx *pCtx); +bool getTopBotFuncEnv(SFunctionNode* UNUSED_PARAM(pFunc), SFuncExecEnv* pEnv); +int32_t topFunction(SqlFunctionCtx *pCtx); + #ifdef __cplusplus } #endif diff --git a/source/libs/function/src/builtins.c b/source/libs/function/src/builtins.c index ce7384cbbe34e9c93b8fc35689c04915cc0bbf68..efd8de84c40b233f8a3a18f144b5112ec5993a2b 100644 --- a/source/libs/function/src/builtins.c +++ b/source/libs/function/src/builtins.c @@ -193,8 +193,15 @@ static int32_t translateApercentile(SFunctionNode* pFunc, char* pErrBuf, int32_t return TSDB_CODE_SUCCESS; } +static int32_t translateTbnameColumn(SFunctionNode* pFunc, char* pErrBuf, int32_t len) { + // pseudo column do not need to check parameters + pFunc->node.resType = (SDataType){.bytes = TSDB_TABLE_FNAME_LEN - 1 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR}; + return TSDB_CODE_SUCCESS; +} + static int32_t translateTop(SFunctionNode* pFunc, char* pErrBuf, int32_t len) { - // todo + SDataType* pType = &((SExprNode*)nodesListGetNode(pFunc->pParameterList, 0))->resType; + pFunc->node.resType = (SDataType){.bytes = pType->bytes, .type = pType->type}; return TSDB_CODE_SUCCESS; } @@ -497,9 +504,9 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { .type = FUNCTION_TYPE_TOP, .classification = FUNC_MGT_AGG_FUNC, .translateFunc = translateTop, - .getEnvFunc = getMinmaxFuncEnv, - .initFunc = maxFunctionSetup, - .processFunc = maxFunction, + .getEnvFunc = getTopBotFuncEnv, + .initFunc = functionSetup, + .processFunc = topFunction, .finalizeFunc = functionFinalize }, { @@ -876,7 +883,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { .name = "tbname", .type = FUNCTION_TYPE_TBNAME, .classification = FUNC_MGT_PSEUDO_COLUMN_FUNC, - .translateFunc = NULL, + .translateFunc = translateTbnameColumn, .getEnvFunc = NULL, .initFunc = NULL, .sprocessFunc = NULL, @@ -914,7 +921,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { }, { .name = "_wendts", - .type = FUNCTION_TYPE_QENDTS, + .type = FUNCTION_TYPE_WENDTS, .classification = FUNC_MGT_PSEUDO_COLUMN_FUNC | FUNC_MGT_WINDOW_PC_FUNC, .translateFunc = translateTimePseudoColumn, .getEnvFunc = getTimePseudoFuncEnv, diff --git a/source/libs/function/src/builtinsimpl.c b/source/libs/function/src/builtinsimpl.c index 3d796515a0ec937b65885d211619130604adf7b1..09427265889f921b7c023738dde788b9669495a4 100644 --- a/source/libs/function/src/builtinsimpl.c +++ b/source/libs/function/src/builtinsimpl.c @@ -14,10 +14,11 @@ */ #include "builtinsimpl.h" -#include "tpercentile.h" +#include #include "querynodes.h" #include "taggfunction.h" #include "tdatablock.h" +#include "tpercentile.h" #define SET_VAL(_info, numOfElem, res) \ do { \ @@ -472,17 +473,6 @@ int32_t maxFunction(SqlFunctionCtx *pCtx) { return TSDB_CODE_SUCCESS; } -typedef struct STopBotRes { - int32_t num; -} STopBotRes; - -bool getTopBotFuncEnv(SFunctionNode* pFunc, SFuncExecEnv* pEnv) { - SColumnNode* pColNode = (SColumnNode*) nodesListGetNode(pFunc->pParameterList, 0); - int32_t bytes = pColNode->node.resType.bytes; - SValueNode* pkNode = (SValueNode*) nodesListGetNode(pFunc->pParameterList, 1); - return true; -} - typedef struct SStddevRes { double result; int64_t count; @@ -523,7 +513,7 @@ int32_t stddevFunction(SqlFunctionCtx* pCtx) { switch (type) { case TSDB_DATA_TYPE_TINYINT: { int8_t* plist = (int8_t*)pCol->pData; - for (int32_t i = start; i < numOfRows + pInput->startRowIndex; ++i) { + for (int32_t i = start; i < numOfRows + start; ++i) { if (pCol->hasNull && colDataIsNull_f(pCol->nullbitmap, i)) { continue; } @@ -749,9 +739,9 @@ int32_t percentileFunction(SqlFunctionCtx *pCtx) { return TSDB_CODE_SUCCESS; } -// TODO set the correct parameter. void percentileFinalize(SqlFunctionCtx* pCtx) { - double v = 50;//pCtx->param[0].nType == TSDB_DATA_TYPE_INT ? pCtx->param[0].i64 : pCtx->param[0].dKey; + SVariant* pVal = &pCtx->param[1].param; + double v = pVal->nType == TSDB_DATA_TYPE_INT ? pVal->i : pVal->d; SResultRowEntryInfo *pResInfo = GET_RES_INFO(pCtx); SPercentileInfo* ppInfo = (SPercentileInfo *) GET_ROWCELL_INTERBUF(pResInfo); @@ -1173,3 +1163,130 @@ int32_t diffFunction(SqlFunctionCtx *pCtx) { } } +typedef struct STopBotResItem { + SVariant v; + uint64_t uid; // it is a table uid, used to extract tag data during building of the final result for the tag data + struct { + int32_t pageId; + int32_t offset; + } tuplePos; // tuple data of this chosen row +} STopBotResItem; + +typedef struct STopBotRes { + int32_t num; + STopBotResItem *pItems; +} STopBotRes; + +bool getTopBotFuncEnv(SFunctionNode* pFunc, SFuncExecEnv* pEnv) { + SColumnNode* pColNode = (SColumnNode*) nodesListGetNode(pFunc->pParameterList, 0); + int32_t bytes = pColNode->node.resType.bytes; + SValueNode* pkNode = (SValueNode*) nodesListGetNode(pFunc->pParameterList, 1); + + pEnv->calcMemSize = sizeof(STopBotRes) + pkNode->datum.i * bytes; + return true; +} + +static STopBotRes *getTopBotOutputInfo(SqlFunctionCtx *pCtx) { + SResultRowEntryInfo *pResInfo = GET_RES_INFO(pCtx); + STopBotRes* pRes = GET_ROWCELL_INTERBUF(pResInfo); + pRes->pItems = (STopBotResItem*)((char*) pRes + sizeof(STopBotRes)); + + return pRes; +} + +static void doAddIntoResult(STopBotRes *pRes, int32_t maxSize, void *pData, uint16_t type, uint64_t uid); + +int32_t topFunction(SqlFunctionCtx *pCtx) { + int32_t numOfElems = 0; + + STopBotRes *pRes = getTopBotOutputInfo(pCtx); + assert(pRes->num >= 0); + +// if ((void *)pRes->res[0] != (void *)((char *)pRes + sizeof(STopBotRes) + POINTER_BYTES * pCtx->param[0].i)) { +// buildTopBotStruct(pRes, pCtx); +// } + + SInputColumnInfoData* pInput = &pCtx->input; + SColumnInfoData* pCol = pInput->pData[0]; + + int32_t type = pInput->pData[0]->info.type; + + int32_t start = pInput->startRowIndex; + int32_t numOfRows = pInput->numOfRows; + + for (int32_t i = start; i < numOfRows + start; ++i) { + if (pCol->hasNull && colDataIsNull_f(pCol->nullbitmap, i)) { + continue; + } + numOfElems++; + + char* data = colDataGetData(pCol, i); + doAddIntoResult(pRes, pCtx->param[1].param.i, data, type, pInput->uid); + } + + // treat the result as only one result + SET_VAL(GET_RES_INFO(pCtx), numOfElems, 1); + return TSDB_CODE_SUCCESS; +} + +static int32_t topBotResComparFn(const void *p1, const void *p2, const void *param) { + uint16_t type = *(uint16_t *) param; + + STopBotResItem *val1 = (STopBotResItem *) p1; + STopBotResItem *val2 = (STopBotResItem *) p2; + + if (IS_SIGNED_NUMERIC_TYPE(type)) { + if (val1->v.i == val2->v.i) { + return 0; + } + + return (val1->v.i > val2->v.i) ? 1 : -1; + } else if (IS_UNSIGNED_NUMERIC_TYPE(type)) { + if (val1->v.u == val2->v.u) { + return 0; + } + + return (val1->v.u > val2->v.u) ? 1 : -1; + } + + if (val1->v.d == val2->v.d) { + return 0; + } + + return (val1->v.d > val2->v.d) ? 1 : -1; +} + +void doAddIntoResult(STopBotRes *pRes, int32_t maxSize, void *pData, uint16_t type, uint64_t uid) { + SVariant val = {0}; + taosVariantCreateFromBinary(&val, pData, tDataTypes[type].bytes, type); + + STopBotResItem *pItems = pRes->pItems; + assert(pItems != NULL); + + // not full yet + if (pRes->num < maxSize) { + STopBotResItem* pItem = &pItems[pRes->num]; + pItem->v = val; + pItem->uid = uid; + pItem->tuplePos.pageId = -1; // todo set the corresponding tuple data in the disk-based buffer + + pRes->num++; + taosheapsort((void *) pItem, sizeof(STopBotResItem), pRes->num, (const void *) &type, topBotResComparFn, false); + } else { // replace the minimum value in the result + if ((IS_SIGNED_NUMERIC_TYPE(type) && val.i > pItems[0].v.i) || + (IS_UNSIGNED_NUMERIC_TYPE(type) && val.u > pItems[0].v.u) || + (IS_FLOAT_TYPE(type) && val.d > pItems[0].v.d)) { + STopBotResItem* pItem = &pItems[pRes->num]; + pItem->v = val; + pItem->uid = uid; + pItem->tuplePos.pageId = -1; // todo set the corresponding tuple data in the disk-based buffer + + taosheapadjust((void *) pItem, sizeof(STopBotResItem), 0, pRes->num - 1, (const void *) &type, topBotResComparFn, NULL, false); + } + } +} + +void topBotFinalize(SqlFunctionCtx* pCtx) { + functionFinalize(pCtx); + +} \ No newline at end of file diff --git a/source/libs/function/src/taggfunction.c b/source/libs/function/src/taggfunction.c index 56ee9bc9ae4177584de37cf86abb9240c66ffa56..c26342bfa878fb7fbb092c5e913964a4c96b8615 100644 --- a/source/libs/function/src/taggfunction.c +++ b/source/libs/function/src/taggfunction.c @@ -765,9 +765,9 @@ static int32_t firstFuncRequired(SqlFunctionCtx *pCtx, STimeWindow* w, int32_t c } static int32_t lastFuncRequired(SqlFunctionCtx *pCtx, STimeWindow* w, int32_t colId) { - if (pCtx->order != pCtx->param[0].i) { - return BLK_DATA_NOT_LOAD; - } +// if (pCtx->order != pCtx->param[0].param.i) { +// return BLK_DATA_NOT_LOAD; +// } if (GET_RES_INFO(pCtx) == NULL || GET_RES_INFO(pCtx)->numOfRes <= 0) { return BLK_DATA_DATA_LOAD; @@ -797,9 +797,9 @@ static int32_t firstDistFuncRequired(SqlFunctionCtx *pCtx, STimeWindow* w, int32 } static int32_t lastDistFuncRequired(SqlFunctionCtx *pCtx, STimeWindow* w, int32_t colId) { - if (pCtx->order != pCtx->param[0].i) { - return BLK_DATA_NOT_LOAD; - } +// if (pCtx->order != pCtx->param[0].param.i) { +// return BLK_DATA_NOT_LOAD; +// } // not initialized yet, it is the first block, load it. if (pCtx->pOutput == NULL) { @@ -1261,128 +1261,6 @@ int32_t tsCompare(const void* p1, const void* p2) { } } -static void stddev_dst_function(SqlFunctionCtx *pCtx) { - SStddevdstInfo *pStd = GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx)); - - // the second stage to calculate standard deviation - double *retVal = &pStd->res; - - // all data are null, no need to proceed - SArray* resList = (SArray*) pCtx->param[0].pz; - if (resList == NULL) { - return; - } - - // find the correct group average results according to the tag value - int32_t len = (int32_t) taosArrayGetSize(resList); - assert(len > 0); - - double avg = 0; - if (len == 1) { - SResPair* p = taosArrayGet(resList, 0); - avg = p->avg; - } else { // todo opt performance by using iterator since the timestamp lsit is matched with the output result - SResPair* p = bsearch(&pCtx->startTs, resList->pData, len, sizeof(SResPair), tsCompare); - if (p == NULL) { - return; - } - - avg = p->avg; - } - - void *pData = GET_INPUT_DATA_LIST(pCtx); - int32_t num = 0; - - switch (pCtx->inputType) { - case TSDB_DATA_TYPE_INT: { - for (int32_t i = 0; i < pCtx->size; ++i) { - if (pCtx->hasNull && isNull((const char*) (&((int32_t *)pData)[i]), pCtx->inputType)) { - continue; - } - num += 1; - *retVal += TPOW2(((int32_t *)pData)[i] - avg); - } - break; - } - case TSDB_DATA_TYPE_FLOAT: { - LOOP_STDDEV_IMPL(float, *retVal, pData, pCtx, avg, pCtx->inputType, num); - break; - } - case TSDB_DATA_TYPE_DOUBLE: { - LOOP_STDDEV_IMPL(double, *retVal, pData, pCtx, avg, pCtx->inputType, num); - break; - } - case TSDB_DATA_TYPE_TINYINT: { - LOOP_STDDEV_IMPL(int8_t, *retVal, pData, pCtx, avg, pCtx->inputType, num); - break; - } - case TSDB_DATA_TYPE_UTINYINT: { - LOOP_STDDEV_IMPL(int8_t, *retVal, pData, pCtx, avg, pCtx->inputType, num); - break; - } - case TSDB_DATA_TYPE_SMALLINT: { - LOOP_STDDEV_IMPL(int16_t, *retVal, pData, pCtx, avg, pCtx->inputType, num); - break; - } - case TSDB_DATA_TYPE_USMALLINT: { - LOOP_STDDEV_IMPL(uint16_t, *retVal, pData, pCtx, avg, pCtx->inputType, num); - break; - } - case TSDB_DATA_TYPE_UINT: { - LOOP_STDDEV_IMPL(uint32_t, *retVal, pData, pCtx, avg, pCtx->inputType, num); - break; - } - case TSDB_DATA_TYPE_BIGINT: { - LOOP_STDDEV_IMPL(int64_t, *retVal, pData, pCtx, avg, pCtx->inputType, num); - break; - } - case TSDB_DATA_TYPE_UBIGINT: { - LOOP_STDDEV_IMPL(uint64_t, *retVal, pData, pCtx, avg, pCtx->inputType, num); - break; - } - default: - assert(0); -// qError("stddev function not support data type:%d", pCtx->inputType); - } - - pStd->num += num; - SET_VAL(pCtx, num, 1); - - // copy to the final output buffer for super table - memcpy(pCtx->pOutput, GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx)), sizeof(SAvgInfo)); -} - -static void stddev_dst_merge(SqlFunctionCtx *pCtx) { - SResultRowEntryInfo *pResInfo = GET_RES_INFO(pCtx); - SStddevdstInfo* pRes = GET_ROWCELL_INTERBUF(pResInfo); - - char *input = GET_INPUT_DATA_LIST(pCtx); - - for (int32_t i = 0; i < pCtx->size; ++i, input += pCtx->inputBytes) { - SStddevdstInfo *pInput = (SStddevdstInfo *)input; - if (pInput->num == 0) { // current input is null - continue; - } - - pRes->num += pInput->num; - pRes->res += pInput->res; - } -} - -static void stddev_dst_finalizer(SqlFunctionCtx *pCtx) { - SStddevdstInfo *pStd = GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx)); - - if (pStd->num <= 0) { - setNull(pCtx->pOutput, pCtx->resDataInfo.type, pCtx->resDataInfo.bytes); - } else { - double *retValue = (double *)pCtx->pOutput; - SET_DOUBLE_VAL(retValue, sqrt(pStd->res / pStd->num)); - SET_VAL(pCtx, 1, 1); - } - - doFinalizer(pCtx); -} - ////////////////////////////////////////////////////////////////////////////////////// static bool first_last_function_setup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResInfo) { if (!function_setup(pCtx, pResInfo)) { @@ -1390,8 +1268,8 @@ static bool first_last_function_setup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* } // used to keep the timestamp for comparison - pCtx->param[1].nType = 0; - pCtx->param[1].i = 0; +// pCtx->param[1].param.nType = 0; +// pCtx->param[1].param.i = 0; return true; } @@ -1487,13 +1365,13 @@ static void first_dist_func_merge(SqlFunctionCtx *pCtx) { } // The param[1] is used to keep the initial value of max ts value - if (pCtx->param[1].nType != pCtx->resDataInfo.type || pCtx->param[1].i > pInput->ts) { - memcpy(pCtx->pOutput, pData, pCtx->resDataInfo.bytes); - pCtx->param[1].i = pInput->ts; - pCtx->param[1].nType = pCtx->resDataInfo.type; - -// DO_UPDATE_TAG_COLUMNS(pCtx, pInput->ts); - } +// if (pCtx->param[1].param.nType != pCtx->resDataInfo.type || pCtx->param[1].param.i > pInput->ts) { +// memcpy(pCtx->pOutput, pData, pCtx->resDataInfo.bytes); +// pCtx->param[1].param.i = pInput->ts; +// pCtx->param[1].param.nType = pCtx->resDataInfo.type; +// +//// DO_UPDATE_TAG_COLUMNS(pCtx, pInput->ts); +// } SET_VAL(pCtx, 1, 1); // GET_RES_INFO(pCtx)->hasResult = DATA_SET_FLAG; @@ -1508,9 +1386,9 @@ static void first_dist_func_merge(SqlFunctionCtx *pCtx) { * least one data in this block that is not null.(TODO opt for this case) */ static void last_function(SqlFunctionCtx *pCtx) { - if (pCtx->order != pCtx->param[0].i) { - return; - } +// if (pCtx->order != pCtx->param[0].param.i) { +// return; +// } SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx); @@ -1582,9 +1460,9 @@ static void last_dist_function(SqlFunctionCtx *pCtx) { * 1. for scan data is not the required order * 2. for data blocks that are not loaded, no need to check data */ - if (pCtx->order != pCtx->param[0].i) { - return; - } +// if (pCtx->order != pCtx->param[0].param.i) { +// return; +// } int32_t notNullElems = 0; for (int32_t i = pCtx->size - 1; i >= 0; --i) { @@ -1624,10 +1502,10 @@ static void last_dist_func_merge(SqlFunctionCtx *pCtx) { * param[1] used to keep the corresponding timestamp to decide if current result is * the true last result */ - if (pCtx->param[1].nType != pCtx->resDataInfo.type || pCtx->param[1].i < pInput->ts) { + if (pCtx->param[1].param.nType != pCtx->resDataInfo.type || pCtx->param[1].param.i < pInput->ts) { memcpy(pCtx->pOutput, pData, pCtx->resDataInfo.bytes); - pCtx->param[1].i = pInput->ts; - pCtx->param[1].nType = pCtx->resDataInfo.type; + pCtx->param[1].param.i = pInput->ts; + pCtx->param[1].param.nType = pCtx->resDataInfo.type; // DO_UPDATE_TAG_COLUMNS(pCtx, pInput->ts); } @@ -1955,11 +1833,11 @@ static STopBotInfo *getTopBotOutputInfo(SqlFunctionCtx *pCtx) { static void buildTopBotStruct(STopBotInfo *pTopBotInfo, SqlFunctionCtx *pCtx) { char *tmp = (char *)pTopBotInfo + sizeof(STopBotInfo); pTopBotInfo->res = (tValuePair**) tmp; - tmp += POINTER_BYTES * pCtx->param[0].i; +// tmp += POINTER_BYTES * pCtx->param[0].param.i; // size_t size = sizeof(tValuePair) + pCtx->tagInfo.tagsLen; -// for (int32_t i = 0; i < pCtx->param[0].i; ++i) { +// for (int32_t i = 0; i < pCtx->param[0].param.i; ++i) { // pTopBotInfo->res[i] = (tValuePair*) tmp; // pTopBotInfo->res[i]->pTags = tmp + sizeof(tValuePair); // tmp += size; @@ -1975,13 +1853,13 @@ bool topbot_datablock_filter(SqlFunctionCtx *pCtx, const char *minval, const cha STopBotInfo *pTopBotInfo = getTopBotOutputInfo(pCtx); // required number of results are not reached, continue load data block - if (pTopBotInfo->num < pCtx->param[0].i) { - return true; - } +// if (pTopBotInfo->num < pCtx->param[0].param.i) { +// return true; +// } - if ((void *)pTopBotInfo->res[0] != (void *)((char *)pTopBotInfo + sizeof(STopBotInfo) + POINTER_BYTES * pCtx->param[0].i)) { - buildTopBotStruct(pTopBotInfo, pCtx); - } +// if ((void *)pTopBotInfo->res[0] != (void *)((char *)pTopBotInfo + sizeof(STopBotInfo) + POINTER_BYTES * pCtx->param[0].param.i)) { +// buildTopBotStruct(pTopBotInfo, pCtx); +// } tValuePair **pRes = (tValuePair**) pTopBotInfo->res; @@ -2038,9 +1916,9 @@ static void top_function(SqlFunctionCtx *pCtx) { STopBotInfo *pRes = getTopBotOutputInfo(pCtx); assert(pRes->num >= 0); - if ((void *)pRes->res[0] != (void *)((char *)pRes + sizeof(STopBotInfo) + POINTER_BYTES * pCtx->param[0].i)) { - buildTopBotStruct(pRes, pCtx); - } +// if ((void *)pRes->res[0] != (void *)((char *)pRes + sizeof(STopBotInfo) + POINTER_BYTES * pCtx->param[0].param.i)) { +// buildTopBotStruct(pRes, pCtx); +// } for (int32_t i = 0; i < pCtx->size; ++i) { char *data = GET_INPUT_DATA(pCtx, i); @@ -2052,7 +1930,7 @@ static void top_function(SqlFunctionCtx *pCtx) { // NOTE: Set the default timestamp if it is missing [todo refactor] TSKEY ts = (pCtx->ptsList != NULL)? GET_TS_DATA(pCtx, i):0; -// do_top_function_add(pRes, (int32_t)pCtx->param[0].i, data, ts, pCtx->inputType, &pCtx->tagInfo, NULL, 0); +// do_top_function_add(pRes, (int32_t)pCtx->param[0].param.i, data, ts, pCtx->inputType, &pCtx->tagInfo, NULL, 0); } if (!pCtx->hasNull) { @@ -2079,7 +1957,7 @@ static void top_func_merge(SqlFunctionCtx *pCtx) { // the intermediate result is binary, we only use the output data type for (int32_t i = 0; i < pInput->num; ++i) { int16_t type = (pCtx->resDataInfo.type == TSDB_DATA_TYPE_FLOAT)? TSDB_DATA_TYPE_DOUBLE:pCtx->resDataInfo.type; -// do_top_function_add(pOutput, (int32_t)pCtx->param[0].i, &pInput->res[i]->v.i, pInput->res[i]->timestamp, +// do_top_function_add(pOutput, (int32_t)pCtx->param[0].param.i, &pInput->res[i]->v.i, pInput->res[i]->timestamp, // type, &pCtx->tagInfo, pInput->res[i]->pTags, pCtx->currentStage); } @@ -2096,9 +1974,9 @@ static void bottom_function(SqlFunctionCtx *pCtx) { STopBotInfo *pRes = getTopBotOutputInfo(pCtx); - if ((void *)pRes->res[0] != (void *)((char *)pRes + sizeof(STopBotInfo) + POINTER_BYTES * pCtx->param[0].i)) { - buildTopBotStruct(pRes, pCtx); - } +// if ((void *)pRes->res[0] != (void *)((char *)pRes + sizeof(STopBotInfo) + POINTER_BYTES * pCtx->param[0].param.i)) { +// buildTopBotStruct(pRes, pCtx); +// } for (int32_t i = 0; i < pCtx->size; ++i) { char *data = GET_INPUT_DATA(pCtx, i); @@ -2109,7 +1987,7 @@ static void bottom_function(SqlFunctionCtx *pCtx) { notNullElems++; // NOTE: Set the default timestamp if it is missing [todo refactor] TSKEY ts = (pCtx->ptsList != NULL)? GET_TS_DATA(pCtx, i):0; -// do_bottom_function_add(pRes, (int32_t)pCtx->param[0].i, data, ts, pCtx->inputType, &pCtx->tagInfo, NULL, 0); +// do_bottom_function_add(pRes, (int32_t)pCtx->param[0].param.i, data, ts, pCtx->inputType, &pCtx->tagInfo, NULL, 0); } if (!pCtx->hasNull) { @@ -2136,7 +2014,7 @@ static void bottom_func_merge(SqlFunctionCtx *pCtx) { // the intermediate result is binary, we only use the output data type for (int32_t i = 0; i < pInput->num; ++i) { int16_t type = (pCtx->resDataInfo.type == TSDB_DATA_TYPE_FLOAT) ? TSDB_DATA_TYPE_DOUBLE : pCtx->resDataInfo.type; -// do_bottom_function_add(pOutput, (int32_t)pCtx->param[0].i, &pInput->res[i]->v.i, pInput->res[i]->timestamp, type, +// do_bottom_function_add(pOutput, (int32_t)pCtx->param[0].param.i, &pInput->res[i]->v.i, pInput->res[i]->timestamp, type, // &pCtx->tagInfo, pInput->res[i]->pTags, pCtx->currentStage); } @@ -2162,11 +2040,11 @@ static void top_bottom_func_finalizer(SqlFunctionCtx *pCtx) { tValuePair **tvp = pRes->res; // user specify the order of output by sort the result according to timestamp - if (pCtx->param[1].i == PRIMARYKEY_TIMESTAMP_COL_ID) { - __compar_fn_t comparator = (pCtx->param[2].i == TSDB_ORDER_ASC) ? resAscComparFn : resDescComparFn; + if (pCtx->param[1].param.i == PRIMARYKEY_TIMESTAMP_COL_ID) { + __compar_fn_t comparator = (pCtx->param[2].param.i == TSDB_ORDER_ASC) ? resAscComparFn : resDescComparFn; qsort(tvp, (size_t)pResInfo->numOfRes, POINTER_BYTES, comparator); - } else /*if (pCtx->param[1].i > PRIMARYKEY_TIMESTAMP_COL_ID)*/ { - __compar_fn_t comparator = (pCtx->param[2].i == TSDB_ORDER_ASC) ? resDataAscComparFn : resDataDescComparFn; + } else /*if (pCtx->param[1].param.i > PRIMARYKEY_TIMESTAMP_COL_ID)*/ { + __compar_fn_t comparator = (pCtx->param[2].param.i == TSDB_ORDER_ASC) ? resDataAscComparFn : resDataDescComparFn; qsort(tvp, (size_t)pResInfo->numOfRes, POINTER_BYTES, comparator); } @@ -2277,7 +2155,7 @@ static void percentile_function(SqlFunctionCtx *pCtx) { } static void percentile_finalizer(SqlFunctionCtx *pCtx) { - double v = pCtx->param[0].nType == TSDB_DATA_TYPE_INT ? pCtx->param[0].i : pCtx->param[0].d; +// double v = pCtx->param[0].param.nType == TSDB_DATA_TYPE_INT ? pCtx->param[0].param.i : pCtx->param[0].param.d; SResultRowEntryInfo *pResInfo = GET_RES_INFO(pCtx); SPercentileInfo* ppInfo = (SPercentileInfo *) GET_ROWCELL_INTERBUF(pResInfo); @@ -2287,7 +2165,7 @@ static void percentile_finalizer(SqlFunctionCtx *pCtx) { assert(ppInfo->numOfElems == 0); setNull(pCtx->pOutput, pCtx->resDataInfo.type, pCtx->resDataInfo.bytes); } else { - SET_DOUBLE_VAL((double *)pCtx->pOutput, getPercentile(pMemBucket, v)); +// SET_DOUBLE_VAL((double *)pCtx->pOutput, getPercentile(pMemBucket, v)); } tMemBucketDestroy(pMemBucket); @@ -2389,7 +2267,7 @@ static void apercentile_func_merge(SqlFunctionCtx *pCtx) { } static void apercentile_finalizer(SqlFunctionCtx *pCtx) { - double v = (pCtx->param[0].nType == TSDB_DATA_TYPE_INT) ? pCtx->param[0].i : pCtx->param[0].d; + double v = (pCtx->param[0].param.nType == TSDB_DATA_TYPE_INT) ? pCtx->param[0].param.i : pCtx->param[0].param.d; SResultRowEntryInfo * pResInfo = GET_RES_INFO(pCtx); SAPercentileInfo *pOutput = GET_ROWCELL_INTERBUF(pResInfo); @@ -2432,7 +2310,7 @@ static bool leastsquares_function_setup(SqlFunctionCtx *pCtx, SResultRowEntryInf SLeastsquaresInfo *pInfo = GET_ROWCELL_INTERBUF(pResInfo); // 2*3 matrix - pInfo->startVal = pCtx->param[0].d; +// pInfo->startVal = pCtx->param[0].param.d; return true; } @@ -2478,54 +2356,54 @@ static void leastsquares_function(SqlFunctionCtx *pCtx) { param[0][2] += x * p[i]; param[1][2] += p[i]; - x += pCtx->param[1].d; + x += pCtx->param[1].param.d; numOfElem++; } break; } case TSDB_DATA_TYPE_BIGINT: { int64_t *p = pData; - LEASTSQR_CAL_LOOP(pCtx, param, x, p, pCtx->inputType, numOfElem, pCtx->param[1].d); + LEASTSQR_CAL_LOOP(pCtx, param, x, p, pCtx->inputType, numOfElem, pCtx->param[1].param.d); break; } case TSDB_DATA_TYPE_DOUBLE: { double *p = pData; - LEASTSQR_CAL_LOOP(pCtx, param, x, p, pCtx->inputType, numOfElem, pCtx->param[1].d); + LEASTSQR_CAL_LOOP(pCtx, param, x, p, pCtx->inputType, numOfElem, pCtx->param[1].param.d); break; } case TSDB_DATA_TYPE_FLOAT: { float *p = pData; - LEASTSQR_CAL_LOOP(pCtx, param, x, p, pCtx->inputType, numOfElem, pCtx->param[1].d); + LEASTSQR_CAL_LOOP(pCtx, param, x, p, pCtx->inputType, numOfElem, pCtx->param[1].param.d); break; }; case TSDB_DATA_TYPE_SMALLINT: { int16_t *p = pData; - LEASTSQR_CAL_LOOP(pCtx, param, x, p, pCtx->inputType, numOfElem, pCtx->param[1].d); + LEASTSQR_CAL_LOOP(pCtx, param, x, p, pCtx->inputType, numOfElem, pCtx->param[1].param.d); break; } case TSDB_DATA_TYPE_TINYINT: { int8_t *p = pData; - LEASTSQR_CAL_LOOP(pCtx, param, x, p, pCtx->inputType, numOfElem, pCtx->param[1].d); + LEASTSQR_CAL_LOOP(pCtx, param, x, p, pCtx->inputType, numOfElem, pCtx->param[1].param.d); break; } case TSDB_DATA_TYPE_UTINYINT: { uint8_t *p = pData; - LEASTSQR_CAL_LOOP(pCtx, param, x, p, pCtx->inputType, numOfElem, pCtx->param[1].d); + LEASTSQR_CAL_LOOP(pCtx, param, x, p, pCtx->inputType, numOfElem, pCtx->param[1].param.d); break; } case TSDB_DATA_TYPE_USMALLINT: { uint16_t *p = pData; - LEASTSQR_CAL_LOOP(pCtx, param, x, p, pCtx->inputType, numOfElem, pCtx->param[1].d); + LEASTSQR_CAL_LOOP(pCtx, param, x, p, pCtx->inputType, numOfElem, pCtx->param[1].param.d); break; } case TSDB_DATA_TYPE_UINT: { uint32_t *p = pData; - LEASTSQR_CAL_LOOP(pCtx, param, x, p, pCtx->inputType, numOfElem, pCtx->param[1].d); + LEASTSQR_CAL_LOOP(pCtx, param, x, p, pCtx->inputType, numOfElem, pCtx->param[1].param.d); break; } case TSDB_DATA_TYPE_UBIGINT: { uint64_t *p = pData; - LEASTSQR_CAL_LOOP(pCtx, param, x, p, pCtx->inputType, numOfElem, pCtx->param[1].d); + LEASTSQR_CAL_LOOP(pCtx, param, x, p, pCtx->inputType, numOfElem, pCtx->param[1].param.d); break; } } @@ -2584,16 +2462,16 @@ static void col_project_function(SqlFunctionCtx *pCtx) { } // only one row is required. - if (pCtx->param[0].i == 1) { - SET_VAL(pCtx, pCtx->size, 1); - } else { - INC_INIT_VAL(pCtx, pCtx->size); - } +// if (pCtx->param[0].param.i == 1) { +// SET_VAL(pCtx, pCtx->size, 1); +// } else { +// INC_INIT_VAL(pCtx, pCtx->size); +// } char *pData = GET_INPUT_DATA_LIST(pCtx); if (pCtx->order == TSDB_ORDER_ASC) { - int32_t numOfRows = (pCtx->param[0].i == 1)? 1:pCtx->size; - memcpy(pCtx->pOutput, pData, (size_t) numOfRows * pCtx->inputBytes); +// int32_t numOfRows = (pCtx->param[0].param.i == 1)? 1:pCtx->size; +// memcpy(pCtx->pOutput, pData, (size_t) numOfRows * pCtx->inputBytes); } else { for(int32_t i = 0; i < pCtx->size; ++i) { memcpy(pCtx->pOutput + (pCtx->size - 1 - i) * pCtx->inputBytes, pData + i * pCtx->inputBytes, @@ -2658,7 +2536,7 @@ static bool diff_function_setup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResI } // diff function require the value is set to -1 - pCtx->param[1].nType = INITIAL_VALUE_NOT_ASSIGNED; + pCtx->param[1].param.nType = INITIAL_VALUE_NOT_ASSIGNED; return false; } @@ -2670,9 +2548,9 @@ static bool deriv_function_setup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pRes // diff function require the value is set to -1 SDerivInfo* pDerivInfo = GET_ROWCELL_INTERBUF(pResultInfo); - pDerivInfo->ignoreNegative = pCtx->param[1].i; +// pDerivInfo->ignoreNegative = pCtx->param[1].param.i; pDerivInfo->prevTs = -1; - pDerivInfo->tsWindow = pCtx->param[0].i; +// pDerivInfo->tsWindow = pCtx->param[0].param.i; pDerivInfo->valueSet = false; return false; } @@ -2861,12 +2739,12 @@ static void deriv_function(SqlFunctionCtx *pCtx) { #define DIFF_IMPL(ctx, d, type) \ do { \ - if ((ctx)->param[1].nType == INITIAL_VALUE_NOT_ASSIGNED) { \ - (ctx)->param[1].nType = (ctx)->inputType; \ - *(type *)&(ctx)->param[1].i = *(type *)(d); \ + if ((ctx)->param[1].param.nType == INITIAL_VALUE_NOT_ASSIGNED) { \ + (ctx)->param[1].param.nType = (ctx)->inputType; \ + *(type *)&(ctx)->param[1].param.i = *(type *)(d); \ } else { \ - *(type *)(ctx)->pOutput = *(type *)(d) - (*(type *)(&(ctx)->param[1].i)); \ - *(type *)(&(ctx)->param[1].i) = *(type *)(d); \ + *(type *)(ctx)->pOutput = *(type *)(d) - (*(type *)(&(ctx)->param[1].param.i)); \ + *(type *)(&(ctx)->param[1].param.i) = *(type *)(d); \ *(int64_t *)(ctx)->pTsOutput = GET_TS_DATA(ctx, index); \ } \ } while (0); @@ -2874,7 +2752,7 @@ static void deriv_function(SqlFunctionCtx *pCtx) { // TODO difference in date column static void diff_function(SqlFunctionCtx *pCtx) { void *data = GET_INPUT_DATA_LIST(pCtx); - bool isFirstBlock = (pCtx->param[1].nType == INITIAL_VALUE_NOT_ASSIGNED); + bool isFirstBlock = (pCtx->param[1].param.nType == INITIAL_VALUE_NOT_ASSIGNED); int32_t notNullElems = 0; @@ -2894,15 +2772,15 @@ static void diff_function(SqlFunctionCtx *pCtx) { continue; } - if (pCtx->param[1].nType != INITIAL_VALUE_NOT_ASSIGNED) { // initial value is not set yet - *pOutput = (int32_t)(pData[i] - pCtx->param[1].i); // direct previous may be null + if (pCtx->param[1].param.nType != INITIAL_VALUE_NOT_ASSIGNED) { // initial value is not set yet + *pOutput = (int32_t)(pData[i] - pCtx->param[1].param.i); // direct previous may be null *pTimestamp = (tsList != NULL)? tsList[i]:0; pOutput += 1; pTimestamp += 1; } - pCtx->param[1].i = pData[i]; - pCtx->param[1].nType = pCtx->inputType; + pCtx->param[1].param.i = pData[i]; + pCtx->param[1].param.nType = pCtx->inputType; notNullElems++; } break; @@ -2916,15 +2794,15 @@ static void diff_function(SqlFunctionCtx *pCtx) { continue; } - if (pCtx->param[1].nType != INITIAL_VALUE_NOT_ASSIGNED) { // initial value is not set yet - *pOutput = pData[i] - pCtx->param[1].i; // direct previous may be null + if (pCtx->param[1].param.nType != INITIAL_VALUE_NOT_ASSIGNED) { // initial value is not set yet + *pOutput = pData[i] - pCtx->param[1].param.i; // direct previous may be null *pTimestamp = (tsList != NULL)? tsList[i]:0; pOutput += 1; pTimestamp += 1; } - pCtx->param[1].i = pData[i]; - pCtx->param[1].nType = pCtx->inputType; + pCtx->param[1].param.i = pData[i]; + pCtx->param[1].param.nType = pCtx->inputType; notNullElems++; } break; @@ -2938,15 +2816,15 @@ static void diff_function(SqlFunctionCtx *pCtx) { continue; } - if (pCtx->param[1].nType != INITIAL_VALUE_NOT_ASSIGNED) { // initial value is not set yet - SET_DOUBLE_VAL(pOutput, pData[i] - pCtx->param[1].d); // direct previous may be null + if (pCtx->param[1].param.nType != INITIAL_VALUE_NOT_ASSIGNED) { // initial value is not set yet + SET_DOUBLE_VAL(pOutput, pData[i] - pCtx->param[1].param.d); // direct previous may be null *pTimestamp = (tsList != NULL)? tsList[i]:0; pOutput += 1; pTimestamp += 1; } - pCtx->param[1].d = pData[i]; - pCtx->param[1].nType = pCtx->inputType; + pCtx->param[1].param.d = pData[i]; + pCtx->param[1].param.nType = pCtx->inputType; notNullElems++; } break; @@ -2960,15 +2838,15 @@ static void diff_function(SqlFunctionCtx *pCtx) { continue; } - if (pCtx->param[1].nType != INITIAL_VALUE_NOT_ASSIGNED) { // initial value is not set yet - *pOutput = (float)(pData[i] - pCtx->param[1].d); // direct previous may be null + if (pCtx->param[1].param.nType != INITIAL_VALUE_NOT_ASSIGNED) { // initial value is not set yet + *pOutput = (float)(pData[i] - pCtx->param[1].param.d); // direct previous may be null *pTimestamp = (tsList != NULL)? tsList[i]:0; pOutput += 1; pTimestamp += 1; } - pCtx->param[1].d = pData[i]; - pCtx->param[1].nType = pCtx->inputType; + pCtx->param[1].param.d = pData[i]; + pCtx->param[1].param.nType = pCtx->inputType; notNullElems++; } break; @@ -2982,15 +2860,15 @@ static void diff_function(SqlFunctionCtx *pCtx) { continue; } - if (pCtx->param[1].nType != INITIAL_VALUE_NOT_ASSIGNED) { // initial value is not set yet - *pOutput = (int16_t)(pData[i] - pCtx->param[1].i); // direct previous may be null + if (pCtx->param[1].param.nType != INITIAL_VALUE_NOT_ASSIGNED) { // initial value is not set yet + *pOutput = (int16_t)(pData[i] - pCtx->param[1].param.i); // direct previous may be null *pTimestamp = (tsList != NULL)? tsList[i]:0; pOutput += 1; pTimestamp += 1; } - pCtx->param[1].i = pData[i]; - pCtx->param[1].nType = pCtx->inputType; + pCtx->param[1].param.i = pData[i]; + pCtx->param[1].param.nType = pCtx->inputType; notNullElems++; } break; @@ -3005,15 +2883,15 @@ static void diff_function(SqlFunctionCtx *pCtx) { continue; } - if (pCtx->param[1].nType != INITIAL_VALUE_NOT_ASSIGNED) { // initial value is not set yet - *pOutput = (int8_t)(pData[i] - pCtx->param[1].i); // direct previous may be null + if (pCtx->param[1].param.nType != INITIAL_VALUE_NOT_ASSIGNED) { // initial value is not set yet + *pOutput = (int8_t)(pData[i] - pCtx->param[1].param.i); // direct previous may be null *pTimestamp = (tsList != NULL)? tsList[i]:0; pOutput += 1; pTimestamp += 1; } - pCtx->param[1].i = pData[i]; - pCtx->param[1].nType = pCtx->inputType; + pCtx->param[1].param.i = pData[i]; + pCtx->param[1].param.nType = pCtx->inputType; notNullElems++; } break; @@ -3024,7 +2902,7 @@ static void diff_function(SqlFunctionCtx *pCtx) { } // initial value is not set yet - if (pCtx->param[1].nType == INITIAL_VALUE_NOT_ASSIGNED || notNullElems <= 0) { + if (pCtx->param[1].param.nType == INITIAL_VALUE_NOT_ASSIGNED || notNullElems <= 0) { /* * 1. current block and blocks before are full of null * 2. current block may be null value @@ -3091,8 +2969,8 @@ static bool spread_function_setup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pRe // this is the server-side setup function in client-side, the secondary merge do not need this procedure if (pCtx->currentStage == MERGE_STAGE) { - pCtx->param[0].d = DBL_MAX; - pCtx->param[3].d = -DBL_MAX; +// pCtx->param[0].param.d = DBL_MAX; +// pCtx->param[3].param.d = -DBL_MAX; } else { pInfo->min = DBL_MAX; pInfo->max = -DBL_MAX; @@ -3192,13 +3070,13 @@ void spread_func_merge(SqlFunctionCtx *pCtx) { return; } - if (pCtx->param[0].d > pData->min) { - pCtx->param[0].d = pData->min; - } +// if (pCtx->param[0].param.d > pData->min) { +// pCtx->param[0].param.d = pData->min; +// } - if (pCtx->param[3].d < pData->max) { - pCtx->param[3].d = pData->max; - } +// if (pCtx->param[3].param.d < pData->max) { +// pCtx->param[3].param.d = pData->max; +// } // GET_RES_INFO(pCtx)->hasResult = DATA_SET_FLAG; } @@ -3218,7 +3096,7 @@ void spread_function_finalizer(SqlFunctionCtx *pCtx) { // return; // } - SET_DOUBLE_VAL((double *)pCtx->pOutput, pCtx->param[3].d - pCtx->param[0].d); +// SET_DOUBLE_VAL((double *)pCtx->pOutput, pCtx->param[3].param.d - pCtx->param[0].param.d); } else { assert(IS_NUMERIC_TYPE(pCtx->inputType) || (pCtx->inputType == TSDB_DATA_TYPE_TIMESTAMP)); @@ -3571,7 +3449,7 @@ void twa_function_finalizer(SqlFunctionCtx *pCtx) { */ static void interp_function_impl(SqlFunctionCtx *pCtx) { - int32_t type = (int32_t) pCtx->param[2].i; + int32_t type = (int32_t) pCtx->param[2].param.i; if (type == TSDB_FILL_NONE) { return; } @@ -3583,7 +3461,7 @@ static void interp_function_impl(SqlFunctionCtx *pCtx) { } else if (type == TSDB_FILL_NULL) { setNull(pCtx->pOutput, pCtx->resDataInfo.type, pCtx->resDataInfo.bytes); } else if (type == TSDB_FILL_SET_VALUE) { - taosVariantDump(&pCtx->param[1], pCtx->pOutput, pCtx->inputType, true); +// taosVariantDump(&pCtx->param[1], pCtx->pOutput, pCtx->inputType, true); } else { if (pCtx->start.key != INT64_MIN && ((ascQuery && pCtx->start.key <= pCtx->startTs && pCtx->end.key >= pCtx->startTs) || ((!ascQuery) && pCtx->start.key >= pCtx->startTs && pCtx->end.key <= pCtx->startTs))) { if (type == TSDB_FILL_PREV) { @@ -3755,11 +3633,11 @@ static void ts_comp_function(SqlFunctionCtx *pCtx) { // primary ts must be existed, so no need to check its existance if (pCtx->order == TSDB_ORDER_ASC) { - tsBufAppend(pTSbuf, (int32_t)pCtx->param[0].i, &pCtx->tag, input, pCtx->size * TSDB_KEYSIZE); +// tsBufAppend(pTSbuf, (int32_t)pCtx->param[0].param.i, &pCtx->tag, input, pCtx->size * TSDB_KEYSIZE); } else { for (int32_t i = pCtx->size - 1; i >= 0; --i) { char *d = GET_INPUT_DATA(pCtx, i); - tsBufAppend(pTSbuf, (int32_t)pCtx->param[0].i, &pCtx->tag, d, (int32_t)TSDB_KEYSIZE); +// tsBufAppend(pTSbuf, (int32_t)pCtx->param[0].param.i, &pCtx->tag, d, (int32_t)TSDB_KEYSIZE); } } @@ -3911,7 +3789,7 @@ static void rate_finalizer(SqlFunctionCtx *pCtx) { return; } - SET_DOUBLE_VAL((double*) pCtx->pOutput, do_calc_rate(pRateInfo, (double) TSDB_TICK_PER_SECOND(pCtx->param[0].i))); +// SET_DOUBLE_VAL((double*) pCtx->pOutput, do_calc_rate(pRateInfo, (double) TSDB_TICK_PER_SECOND(pCtx->param[0].param.i))); // cannot set the numOfIteratedElems again since it is set during previous iteration pResInfo->numOfRes = 1; @@ -4008,7 +3886,7 @@ static void blockInfo_func(SqlFunctionCtx* pCtx) { int32_t len = *(int32_t*) pCtx->pInput; blockDistInfoFromBinary((char*)pCtx->pInput + sizeof(int32_t), len, pDist); - pDist->rowSize = (uint16_t)pCtx->param[0].i; +// pDist->rowSize = (uint16_t)pCtx->param[0].param.i; memcpy(pCtx->pOutput, pCtx->pInput, sizeof(int32_t) + len); @@ -4160,7 +4038,7 @@ void blockinfo_func_finalizer(SqlFunctionCtx* pCtx) { SResultRowEntryInfo *pResInfo = GET_RES_INFO(pCtx); STableBlockDistInfo* pDist = (STableBlockDistInfo*) GET_ROWCELL_INTERBUF(pResInfo); - pDist->rowSize = (uint16_t)pCtx->param[0].i; +// pDist->rowSize = (uint16_t)pCtx->param[0].param.i; generateBlockDistResult(pDist, pCtx->pOutput); if (pDist->dataBlockInfos != NULL) { @@ -4557,9 +4435,9 @@ SAggFunctionInfo aggFunc[35] = {{ FUNCTION_AVG, FUNCSTATE_SO | FUNCSTATE_STABLE, function_setup, - stddev_dst_function, - stddev_dst_finalizer, - stddev_dst_merge, + NULL, + NULL, + NULL, dataBlockRequired, }, { diff --git a/source/util/src/talgo.c b/source/util/src/talgo.c index 48a0f327c4111d89dfd99437fc7c763d2d65b101..8675670cfe1e61da83e79e4e3a0ea665cec597ca 100644 --- a/source/util/src/talgo.c +++ b/source/util/src/talgo.c @@ -229,22 +229,21 @@ void *taosbsearch(const void *key, const void *base, int64_t nmemb, int64_t size } void taosheapadjust(void *base, int32_t size, int32_t start, int32_t end, const void *parcompar, - __ext_compar_fn_t compar, const void *parswap, __ext_swap_fn_t swap, bool maxroot) { + __ext_compar_fn_t compar, char* buf, bool maxroot) { int32_t parent; int32_t child; - char *buf; + + char* tmp = NULL; + if (buf == NULL) { + tmp = taosMemoryMalloc(size); + } else { + tmp = buf; + } if (base && size > 0 && compar) { parent = start; child = 2 * parent + 1; - if (swap == NULL) { - buf = taosMemoryCalloc(1, size); - if (buf == NULL) { - return; - } - } - if (maxroot) { while (child <= end) { if (child + 1 <= end && @@ -256,11 +255,7 @@ void taosheapadjust(void *base, int32_t size, int32_t start, int32_t end, const break; } - if (swap == NULL) { - doswap(elePtrAt(base, size, parent), elePtrAt(base, size, child), size, buf); - } else { - (*swap)(elePtrAt(base, size, parent), elePtrAt(base, size, child), parswap); - } + doswap(elePtrAt(base, size, parent), elePtrAt(base, size, child), size, tmp); parent = child; child = 2 * parent + 1; @@ -276,33 +271,35 @@ void taosheapadjust(void *base, int32_t size, int32_t start, int32_t end, const break; } - if (swap == NULL) { - doswap(elePtrAt(base, size, parent), elePtrAt(base, size, child), size, buf); - } else { - (*swap)(elePtrAt(base, size, parent), elePtrAt(base, size, child), parswap); - } + doswap(elePtrAt(base, size, parent), elePtrAt(base, size, child), size, tmp); parent = child; child = 2 * parent + 1; } } + } - if (swap == NULL) { - taosMemoryFreeClear(buf); - } + if (buf == NULL) { + taosMemoryFree(tmp); } } void taosheapsort(void *base, int32_t size, int32_t len, const void *parcompar, __ext_compar_fn_t compar, - const void *parswap, __ext_swap_fn_t swap, bool maxroot) { + bool maxroot) { int32_t i; + char* buf = taosMemoryCalloc(1, size); + if (buf == NULL) { + return; + } + if (base && size > 0) { for (i = len / 2 - 1; i >= 0; i--) { - taosheapadjust(base, size, i, len - 1, parcompar, compar, parswap, swap, maxroot); + taosheapadjust(base, size, i, len - 1, parcompar, compar, buf, maxroot); } } + taosMemoryFree(buf); /* char *buf = taosMemoryCalloc(1, size); diff --git a/tests/script/tsim/query/session.sim b/tests/script/tsim/query/session.sim index d5e77c7b28471fe84ddf6fbe8310ba038f1ac358..a69b6249fc3f0389ca2982d11c9afc1435f7ced6 100644 --- a/tests/script/tsim/query/session.sim +++ b/tests/script/tsim/query/session.sim @@ -279,7 +279,8 @@ endi print ================> syntax error check not active ================> reactive sql_error select * from dev_001 session(ts,1w) -sql_error select count(*) from st session(ts,1w) +print disable this temporarily, session can not be directly applied to super table. +#sql_error select count(*) from st session(ts,1w) sql_error select count(*) from dev_001 group by tagtype session(ts,1w) sql_error sql select count(*) from dev_001 session(ts,1n) sql_error sql select count(*) from dev_001 session(ts,1y) diff --git a/tools/shell/src/shellEngine.c b/tools/shell/src/shellEngine.c index ca5bf69795060d84025fb3d90e2b3f118f183846..36d2866fb56c3146543886b02a0307eb5874b83e 100644 --- a/tools/shell/src/shellEngine.c +++ b/tools/shell/src/shellEngine.c @@ -224,27 +224,63 @@ int32_t shellRunCommand(TAOS *con, char *command) { } } - char quote = 0, *cmd = command; + bool esc = false; + char quote = 0, *cmd = command, *p = command; for (char c = *command++; c != 0; c = *command++) { - if (c == '\\' && (*command == '\'' || *command == '"' || *command == '`')) { - command ++; + if (esc) { + switch (c) { + case 'n': + c = '\n'; + break; + case 'r': + c = '\r'; + break; + case 't': + c = '\t'; + break; + case 'G': + *p++ = '\\'; + break; + case '\'': + case '"': + if (quote) { + *p++ = '\\'; + } + break; + } + *p++ = c; + esc = false; continue; } + if (c == '\\') { + if (quote != 0 && (*command == '_' || *command == '\\')) { + // DO nothing + } else { + esc = true; + continue; + } + } + if (quote == c) { quote = 0; - } else if (quote == 0 && (c == '\'' || c == '"' || c == '`')) { + } else if (quote == 0 && (c == '\'' || c == '"')) { quote = c; - } else if (c == ';' && quote == 0) { - c = *command; - *command = 0; + } + + *p++ = c; + if (c == ';' && quote == 0) { + c = *p; + *p = 0; if (shellRunSingleCommand(con, cmd) < 0) { return -1; } - *command = c; - cmd = command; + *p = c; + p = cmd; } } + + *p = 0; return shellRunSingleCommand(con, cmd); } @@ -538,23 +574,19 @@ static void shellPrintNChar(const char *str, int length, int width) { while (pos < length) { TdWchar wc; int bytes = taosMbToWchar(&wc, str + pos, MB_CUR_MAX); - if (bytes <= 0) { + if (bytes == 0) { break; } - if (pos + bytes > length) { + pos += bytes; + if (pos > length) { break; } - int w = 0; + #ifdef WINDOWS - w = bytes; + int w = bytes; #else - if(*(str + pos) == '\t' || *(str + pos) == '\n' || *(str + pos) == '\r'){ - w = bytes; - }else{ - w = taosWcharWidth(wc); - } + int w = taosWcharWidth(wc); #endif - pos += bytes; if (w <= 0) { continue; } @@ -648,7 +680,6 @@ static void printField(const char *val, TAOS_FIELD *field, int width, int32_t le break; case TSDB_DATA_TYPE_BINARY: case TSDB_DATA_TYPE_NCHAR: - case TSDB_DATA_TYPE_JSON: shellPrintNChar(val, length, width); break; case TSDB_DATA_TYPE_TIMESTAMP: @@ -761,8 +792,7 @@ static int calcColWidth(TAOS_FIELD *field, int precision) { return TMAX(field->bytes, width); } - case TSDB_DATA_TYPE_NCHAR: - case TSDB_DATA_TYPE_JSON:{ + case TSDB_DATA_TYPE_NCHAR: { int16_t bytes = field->bytes * TSDB_NCHAR_SIZE; if (bytes > tsMaxBinaryDisplayWidth) { return TMAX(tsMaxBinaryDisplayWidth, width);