diff --git a/Jenkinsfile2 b/Jenkinsfile2 index a2043797c09b871e174c3c05d80a689238b85919..441cf896267bed8cd67637c1ea328e884bf0012f 100644 --- a/Jenkinsfile2 +++ b/Jenkinsfile2 @@ -88,12 +88,6 @@ def pre_test(){ cmake .. > /dev/null make -j4> /dev/null ''' - sh''' - cd ${WKPY} - git reset --hard - git pull - pip3 install . - ''' return 1 } @@ -103,7 +97,6 @@ pipeline { environment{ WK = '/var/lib/jenkins/workspace/TDinternal' WKC= '/var/lib/jenkins/workspace/TDengine' - WKPY= '/var/lib/jenkins/workspace/taos-connector-python' } stages { stage('pre_build'){ @@ -124,11 +117,6 @@ pipeline { ./test-all.sh b1fq ''' sh''' - export LD_LIBRARY_PATH=${WKC}/debug/build/lib - cd ${WKC}/tests/system-test - ./fulltest.sh - ''' - sh''' cd ${WKC}/debug ctest ''' diff --git a/include/common/tmsg.h b/include/common/tmsg.h index 71dfcf0987a7906719b5f73f4b39403fa9f87e0b..36198837036b4a01b3abdec17d38d1ce08c57530 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -326,6 +326,13 @@ int32_t tDecodeSEpSet(SCoder* pDecoder, SEpSet* pEp); int32_t taosEncodeSEpSet(void** buf, const SEpSet* pEp); void* taosDecodeSEpSet(const void* buf, SEpSet* pEp); +typedef struct { + SEpSet epSet; +} SMEpSet; + +int32_t tSerializeSMEpSet(void* buf, int32_t bufLen, SMEpSet* pReq); +int32_t tDeserializeSMEpSet(void* buf, int32_t buflen, SMEpSet* pReq); + typedef struct { int8_t connType; int32_t pid; @@ -984,7 +991,6 @@ int32_t tDeserializeSShowRsp(void* buf, int32_t bufLen, SShowRsp* pRsp); void tFreeSShowRsp(SShowRsp* pRsp); typedef struct { - int32_t type; char db[TSDB_DB_FNAME_LEN]; char tb[TSDB_TABLE_NAME_LEN]; int64_t showId; @@ -2753,6 +2759,7 @@ static FORCE_INLINE void* tDecodeSMqCMGetSubEpRsp(void* buf, SMqCMGetSubEpRsp* p } return buf; } + #pragma pack(pop) #ifdef __cplusplus diff --git a/include/libs/function/function.h b/include/libs/function/function.h index eafe64c2949979a419a491e5baeebbf3e00ea834..68fc3be617d9292f5a2f618af6eb4e52b2ec7aba 100644 --- a/include/libs/function/function.h +++ b/include/libs/function/function.h @@ -166,6 +166,7 @@ typedef struct SInputColumnInfoData { SColumnInfoData *pPTS; // primary timestamp column SColumnInfoData **pData; SColumnDataAgg **pColumnDataAgg; + uint64_t uid; // table uid } SInputColumnInfoData; // sql function runtime context @@ -191,7 +192,7 @@ typedef struct SqlFunctionCtx { int16_t functionId; // function id char * pOutput; // final result output buffer, point to sdata->data int32_t numOfParams; - SVariant param[4]; // input parameter, e.g., top(k, 20), the number of results for top query is kept in param + SFunctParam *param; // input parameter, e.g., top(k, 20), the number of results for top query is kept in param int64_t *ptsList; // corresponding timestamp array list SColumnInfoData *pTsOutput; // corresponding output buffer for timestamp of each result, e.g., top/bottom*/ int32_t offset; diff --git a/include/libs/transport/trpc.h b/include/libs/transport/trpc.h index 3e2f5967840805cde8b9a37d4e437900965f42e3..ab26cfc1555b6f50a064fb8ab41a91bad62d7b88 100644 --- a/include/libs/transport/trpc.h +++ b/include/libs/transport/trpc.h @@ -54,12 +54,13 @@ typedef struct { uint16_t clientPort; SRpcMsg rpcMsg; int32_t rspLen; - void *pRsp; - void *pNode; + void * pRsp; + void * pNode; } SNodeMsg; typedef void (*RpcCfp)(void *parent, SRpcMsg *, SEpSet *); typedef int (*RpcAfp)(void *parent, char *tableId, char *spi, char *encrypt, char *secret, char *ckey); +typedef int (*RpcRfp)(void *parent, SRpcMsg *, SEpSet *); typedef struct SRpcInit { uint16_t localPort; // local port @@ -80,22 +81,25 @@ typedef struct SRpcInit { RpcCfp cfp; // call back to retrieve the client auth info, for server app only - RpcAfp afp;; + RpcAfp afp; + + // user defined retry func + RpcRfp rfp; void *parent; } SRpcInit; typedef struct { - void *val; + void *val; int32_t (*clone)(void *src, void **dst); - void (*freeFunc)(const void *arg); + void (*freeFunc)(const void *arg); } SRpcCtxVal; typedef struct { - int32_t msgType; - void *val; + int32_t msgType; + void * val; int32_t (*clone)(void *src, void **dst); - void (*freeFunc)(const void *arg); + void (*freeFunc)(const void *arg); } SRpcBrokenlinkVal; typedef struct { diff --git a/include/os/osEnv.h b/include/os/osEnv.h index 14d50858b77d23836091eed1508e755a12cd74fe..a3f92a0b2914dca8afe5e63e9c121694039d52ac 100644 --- a/include/os/osEnv.h +++ b/include/os/osEnv.h @@ -34,6 +34,7 @@ extern int64_t tsOpenMax; extern int64_t tsStreamMax; extern float tsNumOfCores; extern int64_t tsTotalMemoryKB; +extern char* tsProcPath; extern char configDir[]; extern char tsDataDir[]; diff --git a/include/util/talgo.h b/include/util/talgo.h index ebddce62a84fce6d936e62fa8b925a1216154a66..3ce26526087bfe393234aad696601839280fc479 100644 --- a/include/util/talgo.h +++ b/include/util/talgo.h @@ -82,7 +82,7 @@ void *taosbsearch(const void *key, const void *base, int64_t nmemb, int64_t size * @return */ void taosheapadjust(void *base, int32_t size, int32_t start, int32_t end, const void *parcompar, - __ext_compar_fn_t compar, const void *parswap, __ext_swap_fn_t swap, bool maxroot); + __ext_compar_fn_t compar, char* buf, bool maxroot); /** * sort heap to make sure it is a max/min root heap @@ -98,7 +98,7 @@ void taosheapadjust(void *base, int32_t size, int32_t start, int32_t end, const * @return */ void taosheapsort(void *base, int32_t size, int32_t len, const void *parcompar, __ext_compar_fn_t compar, - const void *parswap, __ext_swap_fn_t swap, bool maxroot); + bool maxroot); #ifdef __cplusplus } diff --git a/source/client/test/clientTests.cpp b/source/client/test/clientTests.cpp index 53b0fec8ff7fdb7f8288ecdd47e0751c3d7cde70..8e8c62cd946562d26ef5d3062683fc2f9c5bdc10 100644 --- a/source/client/test/clientTests.cpp +++ b/source/client/test/clientTests.cpp @@ -662,7 +662,7 @@ TEST(testCase, agg_query_tables) { TAOS_RES* pRes = taos_query(pConn, "use abc1"); taos_free_result(pRes); - pRes = taos_query(pConn, "select count(*) from tu"); + pRes = taos_query(pConn, "select now() from m1"); if (taos_errno(pRes) != 0) { printf("failed to select from table, reason:%s\n", taos_errstr(pRes)); taos_free_result(pRes); diff --git a/source/common/src/tmsg.c b/source/common/src/tmsg.c index a7c1620a879da9213185a563c9b27a1ff72e5194..28ac3c224bce5d8f1aa1e91ef0b632ebb436df15 100644 --- a/source/common/src/tmsg.c +++ b/source/common/src/tmsg.c @@ -892,6 +892,27 @@ void tFreeSMAltertbReq(SMAltertbReq *pReq) { taosArrayDestroy(pReq->pFields); pReq->pFields = NULL; } +int32_t tSerializeSMEpSet(void *buf, int32_t bufLen, SMEpSet *pReq) { + SCoder encoder = {0}; + tCoderInit(&encoder, TD_LITTLE_ENDIAN, buf, bufLen, TD_ENCODER); + if (tStartEncode(&encoder) < 0) return -1; + if (tEncodeSEpSet(&encoder, &pReq->epSet) < 0) return -1; + + tEndEncode(&encoder); + int32_t tlen = encoder.pos; + tCoderClear(&encoder); + return tlen; +} +int32_t tDeserializeSMEpSet(void *buf, int32_t bufLen, SMEpSet *pReq) { + SCoder decoder = {0}; + tCoderInit(&decoder, TD_LITTLE_ENDIAN, buf, bufLen, TD_DECODER); + if (tStartDecode(&decoder) < 0) return -1; + if (tDecodeSEpSet(&decoder, &pReq->epSet) < 0) return -1; + + tEndDecode(&decoder); + tCoderClear(&decoder); + return 0; +} int32_t tSerializeSMCreateSmaReq(void *buf, int32_t bufLen, SMCreateSmaReq *pReq) { SCoder encoder = {0}; @@ -2489,7 +2510,6 @@ int32_t tSerializeSRetrieveTableReq(void *buf, int32_t bufLen, SRetrieveTableReq if (tStartEncode(&encoder) < 0) return -1; if (tEncodeI64(&encoder, pReq->showId) < 0) return -1; - if (tEncodeI32(&encoder, pReq->type) < 0) return -1; if (tEncodeCStr(&encoder, pReq->db) < 0) return -1; if (tEncodeCStr(&encoder, pReq->tb) < 0) return -1; tEndEncode(&encoder); @@ -2505,7 +2525,6 @@ int32_t tDeserializeSRetrieveTableReq(void *buf, int32_t bufLen, SRetrieveTableR if (tStartDecode(&decoder) < 0) return -1; if (tDecodeI64(&decoder, &pReq->showId) < 0) return -1; - if (tDecodeI32(&decoder, &pReq->type) < 0) return -1; if (tDecodeCStrTo(&decoder, pReq->db) < 0) return -1; if (tDecodeCStrTo(&decoder, pReq->tb) < 0) return -1; tEndDecode(&decoder); diff --git a/source/dnode/mgmt/implement/src/dmHandle.c b/source/dnode/mgmt/implement/src/dmHandle.c index dd221f84041c8b85eb55661a755333ca4b19183a..376f589acdb64e3abd716aa21dae843b28eedeb8 100644 --- a/source/dnode/mgmt/implement/src/dmHandle.c +++ b/source/dnode/mgmt/implement/src/dmHandle.c @@ -216,6 +216,126 @@ static void dmStopMgmt(SMgmtWrapper *pWrapper) { dmStopStatusThread(pWrapper->pDnode); } +static int32_t dmSpawnUdfd(SDnode *pDnode); + +void dmUdfdExit(uv_process_t *process, int64_t exitStatus, int termSignal) { + dInfo("udfd process exited with status %" PRId64 ", signal %d", exitStatus, termSignal); + uv_close((uv_handle_t*)process, NULL); + SDnode *pDnode = process->data; + SUdfdData *pData = &pDnode->udfdData; + if (atomic_load_8(&pData->stopping) != 0) { + dDebug("udfd process exit due to stopping"); + } else { + uv_close((uv_handle_t*)&pData->ctrlPipe, NULL); + dmSpawnUdfd(pDnode); + } +} + +static int32_t dmSpawnUdfd(SDnode *pDnode) { + dInfo("dnode start spawning udfd"); + uv_process_options_t options = {0}; + + char path[PATH_MAX] = {0}; + if (tsProcPath == NULL) { + path[0] = '.'; + } else { + strncpy(path, tsProcPath, strlen(tsProcPath)); + taosDirName(path); + } + strcat(path, "/udfd"); + char* argsUdfd[] = {path, "-c", configDir, NULL}; + options.args = argsUdfd; + options.file = path; + + options.exit_cb = dmUdfdExit; + SUdfdData *pData = &pDnode->udfdData; + uv_pipe_init(&pData->loop, &pData->ctrlPipe, 1); + + uv_stdio_container_t child_stdio[3]; + child_stdio[0].flags = UV_CREATE_PIPE | UV_READABLE_PIPE; + child_stdio[0].data.stream = (uv_stream_t*) &pData->ctrlPipe; + child_stdio[1].flags = UV_IGNORE; + child_stdio[2].flags = UV_INHERIT_FD; + child_stdio[2].data.fd = 2; + options.stdio_count = 3; + options.stdio = child_stdio; + + char dnodeIdEnvItem[32] = {0}; + char thrdPoolSizeEnvItem[32] = {0}; + snprintf(dnodeIdEnvItem, 32, "%s=%d", "DNODE_ID", pDnode->data.dnodeId); + float numCpuCores = 4; + taosGetCpuCores(&numCpuCores); + snprintf(thrdPoolSizeEnvItem,32, "%s=%d", "UV_THREADPOOL_SIZE", (int)numCpuCores*2); + char* envUdfd[] = {dnodeIdEnvItem, thrdPoolSizeEnvItem, NULL}; + options.env = envUdfd; + + int err = uv_spawn(&pData->loop, &pData->process, &options); + pData->process.data = (void*)pDnode; + + if (err != 0) { + dError("can not spawn udfd. path: %s, error: %s", path, uv_strerror(err)); + } + return err; +} + +static void dmUdfdCloseWalkCb(uv_handle_t* handle, void* arg) { + if (!uv_is_closing(handle)) { + uv_close(handle, NULL); + } +} + +void dmWatchUdfd(void *args) { + SDnode *pDnode = args; + SUdfdData *pData = &pDnode->udfdData; + uv_loop_init(&pData->loop); + int32_t err = dmSpawnUdfd(pDnode); + atomic_store_32(&pData->spawnErr, err); + uv_barrier_wait(&pData->barrier); + uv_run(&pData->loop, UV_RUN_DEFAULT); + err = uv_loop_close(&pData->loop); + while (err == UV_EBUSY) { + uv_walk(&pData->loop, dmUdfdCloseWalkCb, NULL); + uv_run(&pData->loop, UV_RUN_DEFAULT); + err = uv_loop_close(&pData->loop); + } + return; +} + +int32_t dmStartUdfd(SDnode *pDnode) { + SUdfdData *pData = &pDnode->udfdData; + if (pData->startCalled) { + dInfo("dnode-mgmt start udfd already called"); + return 0; + } + uv_barrier_init(&pData->barrier, 2); + pData->stopping = 0; + uv_thread_create(&pData->thread, dmWatchUdfd, pDnode); + uv_barrier_wait(&pData->barrier); + pData->startCalled = true; + pData->needCleanUp = true; + return pData->spawnErr; +} + +int32_t dmStopUdfd(SDnode *pDnode) { + dInfo("dnode-mgmt to stop udfd. need cleanup: %d, spawn err: %d", + pDnode->udfdData.needCleanUp, pDnode->udfdData.spawnErr); + SUdfdData *pData = &pDnode->udfdData; + if (!pData->needCleanUp) { + return 0; + } + atomic_store_8(&pData->stopping, 1); + + uv_barrier_destroy(&pData->barrier); + if (pData->spawnErr == 0) { + uv_process_kill(&pData->process, SIGINT); + } + uv_stop(&pData->loop); + uv_thread_join(&pData->thread); + + atomic_store_8(&pData->stopping, 0); + return 0; +} + static int32_t dmInitMgmt(SMgmtWrapper *pWrapper) { dInfo("dnode-mgmt start to init"); SDnode *pDnode = pWrapper->pDnode; @@ -247,6 +367,10 @@ static int32_t dmInitMgmt(SMgmtWrapper *pWrapper) { } dmReportStartup(pDnode, "dnode-transport", "initialized"); + if (dmStartUdfd(pDnode) != 0) { + dError("failed to start udfd"); + } + dInfo("dnode-mgmt is initialized"); return 0; } @@ -254,6 +378,7 @@ static int32_t dmInitMgmt(SMgmtWrapper *pWrapper) { static void dmCleanupMgmt(SMgmtWrapper *pWrapper) { dInfo("dnode-mgmt start to clean up"); SDnode *pDnode = pWrapper->pDnode; + dmStopUdfd(pDnode); dmStopWorker(pDnode); taosWLockLatch(&pDnode->data.latch); diff --git a/source/dnode/mgmt/interface/inc/dmDef.h b/source/dnode/mgmt/interface/inc/dmDef.h index e6537dcf73a2dbccd8f554c543aed201e935ea02..4f4a2ed3499f1c135626a702370bdb9536a2f9d8 100644 --- a/source/dnode/mgmt/interface/inc/dmDef.h +++ b/source/dnode/mgmt/interface/inc/dmDef.h @@ -16,6 +16,7 @@ #ifndef _TD_DM_DEF_H_ #define _TD_DM_DEF_H_ +#include "uv.h" #include "dmLog.h" #include "cJSON.h" @@ -142,6 +143,18 @@ typedef struct { char desc[TSDB_STEP_DESC_LEN]; } SStartupInfo; +typedef struct SUdfdData { + bool startCalled; + bool needCleanUp; + uv_loop_t loop; + uv_thread_t thread; + uv_barrier_t barrier; + uv_process_t process; + int spawnErr; + int8_t stopping; + uv_pipe_t ctrlPipe; +} SUdfdData; + typedef struct SDnode { EDndProcType ptype; EDndNodeType ntype; @@ -150,6 +163,7 @@ typedef struct SDnode { SStartupInfo startup; SDnodeTrans trans; SDnodeData data; + SUdfdData udfdData; TdThreadMutex mutex; SMgmtWrapper wrappers[NODE_END]; } SDnode; diff --git a/source/dnode/mgmt/test/sut/src/sut.cpp b/source/dnode/mgmt/test/sut/src/sut.cpp index 9b4c4798653858ad22beb7ed637da3b6352674f4..cc32f047afcd1baf9e3d0857fe68248187a29e0e 100644 --- a/source/dnode/mgmt/test/sut/src/sut.cpp +++ b/source/dnode/mgmt/test/sut/src/sut.cpp @@ -89,7 +89,6 @@ int32_t Testbase::SendShowReq(int8_t showType, const char *tb, const char* db) { } SRetrieveTableReq retrieveReq = {0}; - retrieveReq.type = showType; strcpy(retrieveReq.db, db); strcpy(retrieveReq.tb, tb); diff --git a/source/dnode/mnode/impl/inc/mndDef.h b/source/dnode/mnode/impl/inc/mndDef.h index 984e9f917fafcad8c802d8f0211fe243f99d0f69..59a00f0eaf3c012ea62ce1259ab916ddd5929351 100644 --- a/source/dnode/mnode/impl/inc/mndDef.h +++ b/source/dnode/mnode/impl/inc/mndDef.h @@ -391,7 +391,6 @@ typedef struct { int16_t numOfColumns; int32_t rowSize; int32_t numOfRows; - int32_t payloadLen; void* pIter; SMnode* pMnode; STableMetaRsp* pMeta; diff --git a/source/dnode/mnode/impl/src/mndShow.c b/source/dnode/mnode/impl/src/mndShow.c index 1a14c94640f5dd8b690f4d4d3506f6baaee16f49..f81179201c405c122f41715dfab3e690670c5afc 100644 --- a/source/dnode/mnode/impl/src/mndShow.c +++ b/source/dnode/mnode/impl/src/mndShow.c @@ -18,7 +18,7 @@ #define SHOW_STEP_SIZE 100 -static SShowObj *mndCreateShowObj(SMnode *pMnode, SShowReq *pReq); +static SShowObj *mndCreateShowObj(SMnode *pMnode, SRetrieveTableReq *pReq); static void mndFreeShowObj(SShowObj *pShow); static SShowObj *mndAcquireShowObj(SMnode *pMnode, int64_t showId); static void mndReleaseShowObj(SShowObj *pShow, bool forceRemove); @@ -47,18 +47,80 @@ void mndCleanupShow(SMnode *pMnode) { } } -static SShowObj *mndCreateShowObj(SMnode *pMnode, SShowReq *pReq) { +static int32_t convertToRetrieveType(char* name, int32_t len) { + int32_t type = -1; + + if (strncasecmp(name, TSDB_INS_TABLE_DNODES, len) == 0) { + type = TSDB_MGMT_TABLE_DNODE; + } else if (strncasecmp(name, TSDB_INS_TABLE_MNODES, len) == 0) { + type = TSDB_MGMT_TABLE_MNODE; + } else if (strncasecmp(name, TSDB_INS_TABLE_MODULES, len) == 0) { + type = TSDB_MGMT_TABLE_MODULE; + } else if (strncasecmp(name, TSDB_INS_TABLE_QNODES, len) == 0) { + type = TSDB_MGMT_TABLE_QNODE; + } else if (strncasecmp(name, TSDB_INS_TABLE_BNODES, len) == 0) { + type = TSDB_MGMT_TABLE_BNODE; + } else if (strncasecmp(name, TSDB_INS_TABLE_SNODES, len) == 0) { + type = TSDB_MGMT_TABLE_SNODE; + } else if (strncasecmp(name, TSDB_INS_TABLE_CLUSTER, len) == 0) { + type = TSDB_MGMT_TABLE_CLUSTER; + } else if (strncasecmp(name, TSDB_INS_TABLE_USER_DATABASES, len) == 0) { + type = TSDB_MGMT_TABLE_DB; + } else if (strncasecmp(name, TSDB_INS_TABLE_USER_FUNCTIONS, len) == 0) { + type = TSDB_MGMT_TABLE_FUNC; + } else if (strncasecmp(name, TSDB_INS_TABLE_USER_INDEXES, len) == 0) { + // type = TSDB_MGMT_TABLE_INDEX; + } else if (strncasecmp(name, TSDB_INS_TABLE_USER_STABLES, len) == 0) { + type = TSDB_MGMT_TABLE_STB; + } else if (strncasecmp(name, TSDB_INS_TABLE_USER_STREAMS, len) == 0) { + type = TSDB_MGMT_TABLE_STREAMS; + } else if (strncasecmp(name, TSDB_INS_TABLE_USER_TABLES, len) == 0) { + type = TSDB_MGMT_TABLE_TABLE; + } else if (strncasecmp(name, TSDB_INS_TABLE_USER_TABLE_DISTRIBUTED, len) == 0) { + // type = TSDB_MGMT_TABLE_DIST; + } else if (strncasecmp(name, TSDB_INS_TABLE_USER_USERS, len) == 0) { + type = TSDB_MGMT_TABLE_USER; + } else if (strncasecmp(name, TSDB_INS_TABLE_LICENCES, len) == 0) { + type = TSDB_MGMT_TABLE_GRANTS; + } else if (strncasecmp(name, TSDB_INS_TABLE_VGROUPS, len) == 0) { + type = TSDB_MGMT_TABLE_VGROUP; + } else if (strncasecmp(name, TSDB_INS_TABLE_TOPICS, len) == 0) { + type = TSDB_MGMT_TABLE_TOPICS; + } else if (strncasecmp(name, TSDB_INS_TABLE_CONSUMERS, len) == 0) { + type = TSDB_MGMT_TABLE_CONSUMERS; + } else if (strncasecmp(name, TSDB_INS_TABLE_SUBSCRIBES, len) == 0) { + type = TSDB_MGMT_TABLE_SUBSCRIBES; + } else if (strncasecmp(name, TSDB_INS_TABLE_TRANS, len) == 0) { + type = TSDB_MGMT_TABLE_TRANS; + } else if (strncasecmp(name, TSDB_INS_TABLE_SMAS, len) == 0) { + type = TSDB_MGMT_TABLE_SMAS; + } else if (strncasecmp(name, TSDB_INS_TABLE_CONFIGS, len) == 0) { + type = TSDB_MGMT_TABLE_CONFIGS; + } else if (strncasecmp(name, TSDB_INS_TABLE_CONNS, len) == 0) { + type = TSDB_MGMT_TABLE_CONNS; + } else if (strncasecmp(name, TSDB_INS_TABLE_QUERIES, len) == 0) { + type = TSDB_MGMT_TABLE_QUERIES; + } else if (strncasecmp(name, TSDB_INS_TABLE_VNODES, len) == 0) { + type = TSDB_MGMT_TABLE_VNODES; + } else { +// ASSERT(0); + } + + return type; +} + +static SShowObj *mndCreateShowObj(SMnode *pMnode, SRetrieveTableReq *pReq) { SShowMgmt *pMgmt = &pMnode->showMgmt; int64_t showId = atomic_add_fetch_64(&pMgmt->showId, 1); if (showId == 0) atomic_add_fetch_64(&pMgmt->showId, 1); - int32_t size = sizeof(SShowObj) + pReq->payloadLen; + int32_t size = sizeof(SShowObj); + SShowObj showObj = {0}; - showObj.id = showId; + showObj.id = showId; showObj.pMnode = pMnode; - showObj.type = pReq->type; - showObj.payloadLen = pReq->payloadLen; + showObj.type = convertToRetrieveType(pReq->tb, tListLen(pReq->tb)); memcpy(showObj.db, pReq->db, TSDB_DB_FNAME_LEN); int32_t keepTime = tsShellActivityTimer * 6 * 1000; @@ -127,10 +189,6 @@ static int32_t mndProcessRetrieveSysTableReq(SNodeMsg *pReq) { } if (retrieveReq.showId == 0) { - SShowReq req = {0}; - req.type = retrieveReq.type; - strncpy(req.db, retrieveReq.db, tListLen(req.db)); - STableMetaRsp *pMeta = (STableMetaRsp *)taosHashGet(pMnode->infosMeta, retrieveReq.tb, strlen(retrieveReq.tb) + 1); if (pMeta == NULL) { terrno = TSDB_CODE_MND_INVALID_INFOS_TBL; @@ -138,7 +196,7 @@ static int32_t mndProcessRetrieveSysTableReq(SNodeMsg *pReq) { return -1; } - pShow = mndCreateShowObj(pMnode, &req); + pShow = mndCreateShowObj(pMnode, &retrieveReq); if (pShow == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; mError("failed to process show-meta req since %s", terrstr()); diff --git a/source/dnode/mnode/impl/test/show/show.cpp b/source/dnode/mnode/impl/test/show/show.cpp index 2b0eaa0becba0696acd0e516b8a904be4abe73a0..5c431f65d3a5eb5663eb3a92005a4635e6f7f384 100644 --- a/source/dnode/mnode/impl/test/show/show.cpp +++ b/source/dnode/mnode/impl/test/show/show.cpp @@ -36,7 +36,7 @@ TEST_F(MndTestShow, 01_ShowMsg_InvalidMsgMax) { SRpcMsg* pRsp = test.SendReq(TDMT_MND_SYSTABLE_RETRIEVE, pReq, contLen); ASSERT_NE(pRsp, nullptr); - ASSERT_EQ(pRsp->code, TSDB_CODE_INVALID_MSG); + ASSERT_NE(pRsp->code, 0); } TEST_F(MndTestShow, 02_ShowMsg_InvalidMsgStart) { @@ -50,7 +50,7 @@ TEST_F(MndTestShow, 02_ShowMsg_InvalidMsgStart) { SRpcMsg* pRsp = test.SendReq(TDMT_MND_SYSTABLE_RETRIEVE, pReq, contLen); ASSERT_NE(pRsp, nullptr); - ASSERT_EQ(pRsp->code, TSDB_CODE_INVALID_MSG); + ASSERT_NE(pRsp->code, 0); } TEST_F(MndTestShow, 03_ShowMsg_Conn) { diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index 9e48e12cb5187072a99cfe20f1b7f17c7cbc02d9..5adfb7caca4a3f633ac8b3858581a4e9760fe1bf 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -382,7 +382,7 @@ typedef struct SSysTableScanInfo { void* pCur; // cursor for iterate the local table meta store. SArray* scanCols; // SArray scan column id list - int32_t type; // show type, TODO remove it +// int32_t type; // show type, TODO remove it SName name; SSDataBlock* pRes; int32_t capacity; @@ -479,7 +479,7 @@ typedef struct { typedef struct SGroupbyOperatorInfo { SOptrBasicInfo binfo; - SArray* pGroupCols; + SArray* pGroupCols; // group by columns, SArray SArray* pGroupColVals; // current group column values, SArray SNode* pCondition; bool isInit; // denote if current val is initialized or not @@ -600,7 +600,6 @@ typedef struct SJoinOperatorInfo { int32_t rightPos; SColumnInfo rightCol; SNode *pOnCondition; -// SRspResultInfo resultInfo; } SJoinOperatorInfo; int32_t operatorDummyOpenFn(SOperatorInfo* pOperator); diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index 7791a345ed7f34626c6313d56cfc7a160dcba28c..7a7cb68424ff4b74a5596b2dfbdc83cd00bf4493 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -1087,6 +1087,7 @@ static int32_t doSetInputDataBlock(SOperatorInfo* pOperator, SqlFunctionCtx* pCt pCtx[i].currentStage = MAIN_SCAN; SInputColumnInfoData* pInput = &pCtx[i].input; + pInput->uid = pBlock->info.uid; SExprInfo* pOneExpr = &pOperator->pExpr[i]; for (int32_t j = 0; j < pOneExpr->base.numOfParams; ++j) { @@ -1101,7 +1102,9 @@ static int32_t doSetInputDataBlock(SOperatorInfo* pOperator, SqlFunctionCtx* pCt pInput->pPTS = taosArrayGet(pBlock->pDataBlock, 0); // todo set the correct timestamp column ASSERT(pInput->pData[j] != NULL); } else if (pFuncParam->type == FUNC_PARAM_TYPE_VALUE) { - if (createDummyCol) { + // todo avoid case: top(k, 12), 12 is the value parameter. + // sum(11), 11 is also the value parameter. + if (createDummyCol && pOneExpr->base.numOfParams == 1) { code = doCreateConstantValColumnInfo(pInput, pFuncParam, pFuncParam->param.nType, j, pBlock->info.rows); if (code != TSDB_CODE_SUCCESS) { return code; @@ -1876,67 +1879,58 @@ SqlFunctionCtx* createSqlFunctionCtx(SExprInfo* pExprInfo, int32_t numOfOutput, } } pCtx->resDataInfo.interBufSize = env.calcMemSize; - } else if (pExpr->pExpr->nodeType == QUERY_NODE_COLUMN || pExpr->pExpr->nodeType == QUERY_NODE_OPERATOR || pExpr->pExpr->nodeType == QUERY_NODE_VALUE) { - pCtx->resDataInfo.interBufSize = pFunct->resSchema.bytes; // for simple column, the intermediate buffer needs to hold one element. + } else if (pExpr->pExpr->nodeType == QUERY_NODE_COLUMN || pExpr->pExpr->nodeType == QUERY_NODE_OPERATOR || + pExpr->pExpr->nodeType == QUERY_NODE_VALUE) { + // for simple column, the intermediate buffer needs to hold one element. + pCtx->resDataInfo.interBufSize = pFunct->resSchema.bytes; } pCtx->input.numOfInputCols = pFunct->numOfParams; pCtx->input.pData = taosMemoryCalloc(pFunct->numOfParams, POINTER_BYTES); pCtx->input.pColumnDataAgg = taosMemoryCalloc(pFunct->numOfParams, POINTER_BYTES); - pCtx->pTsOutput = NULL; + pCtx->pTsOutput = NULL; pCtx->resDataInfo.bytes = pFunct->resSchema.bytes; - pCtx->resDataInfo.type = pFunct->resSchema.type; - pCtx->order = TSDB_ORDER_ASC; - pCtx->start.key = INT64_MIN; - pCtx->end.key = INT64_MIN; -#if 0 - for (int32_t j = 0; j < pCtx->numOfParams; ++j) { -// int16_t type = pFunct->param[j].nType; -// int16_t bytes = pFunct->param[j].nLen; - -// if (type == TSDB_DATA_TYPE_BINARY || type == TSDB_DATA_TYPE_NCHAR) { -// taosVariantCreateFromBinary(&pCtx->param[j], pFunct->param[j].pz, bytes, type); -// } else { -// taosVariantCreateFromBinary(&pCtx->param[j], (char *)&pFunct->param[j].i, bytes, type); -// } - } - - // set the order information for top/bottom query - int32_t functionId = pCtx->functionId; - if (functionId == FUNCTION_TOP || functionId == FUNCTION_BOTTOM || functionId == FUNCTION_DIFF) { - int32_t f = getExprFunctionId(&pExpr[0]); - assert(f == FUNCTION_TS || f == FUNCTION_TS_DUMMY); - -// pCtx->param[2].i = pQueryAttr->order.order; - pCtx->param[2].nType = TSDB_DATA_TYPE_BIGINT; - pCtx->param[3].i = functionId; - pCtx->param[3].nType = TSDB_DATA_TYPE_BIGINT; - -// pCtx->param[1].i = pQueryAttr->order.col.info.colId; - } else if (functionId == FUNCTION_INTERP) { -// pCtx->param[2].i = (int8_t)pQueryAttr->fillType; -// if (pQueryAttr->fillVal != NULL) { -// if (isNull((const char *)&pQueryAttr->fillVal[i], pCtx->inputType)) { -// pCtx->param[1].nType = TSDB_DATA_TYPE_NULL; -// } else { // todo refactor, taosVariantCreateFromBinary should handle the NULL value -// if (pCtx->inputType != TSDB_DATA_TYPE_BINARY && pCtx->inputType != TSDB_DATA_TYPE_NCHAR) { -// taosVariantCreateFromBinary(&pCtx->param[1], (char *)&pQueryAttr->fillVal[i], pCtx->inputBytes, pCtx->inputType); -// } -// } + pCtx->resDataInfo.type = pFunct->resSchema.type; + pCtx->order = TSDB_ORDER_ASC; + pCtx->start.key = INT64_MIN; + pCtx->end.key = INT64_MIN; + pCtx->numOfParams = pExpr->base.numOfParams; + + pCtx->param = pFunct->pParam; +// for (int32_t j = 0; j < pCtx->numOfParams; ++j) { +// // set the order information for top/bottom query +// int32_t functionId = pCtx->functionId; +// if (functionId == FUNCTION_TOP || functionId == FUNCTION_BOTTOM || functionId == FUNCTION_DIFF) { +// int32_t f = getExprFunctionId(&pExpr[0]); +// assert(f == FUNCTION_TS || f == FUNCTION_TS_DUMMY); +// +// // pCtx->param[2].i = pQueryAttr->order.order; +// // pCtx->param[2].nType = TSDB_DATA_TYPE_BIGINT; +// // pCtx->param[3].i = functionId; +// // pCtx->param[3].nType = TSDB_DATA_TYPE_BIGINT; +// +// // pCtx->param[1].i = pQueryAttr->order.col.info.colId; +// } else if (functionId == FUNCTION_INTERP) { +// // pCtx->param[2].i = (int8_t)pQueryAttr->fillType; +// // if (pQueryAttr->fillVal != NULL) { +// // if (isNull((const char *)&pQueryAttr->fillVal[i], pCtx->inputType)) { +// // pCtx->param[1].nType = TSDB_DATA_TYPE_NULL; +// // } else { // todo refactor, taosVariantCreateFromBinary should handle the NULL value +// // if (pCtx->inputType != TSDB_DATA_TYPE_BINARY && pCtx->inputType != TSDB_DATA_TYPE_NCHAR) { +// // taosVariantCreateFromBinary(&pCtx->param[1], (char *)&pQueryAttr->fillVal[i], pCtx->inputBytes, pCtx->inputType); +// // } +// // } +// // } +// } else if (functionId == FUNCTION_TWA) { +// // pCtx->param[1].i = pQueryAttr->window.skey; +// // pCtx->param[1].nType = TSDB_DATA_TYPE_BIGINT; +// // pCtx->param[2].i = pQueryAttr->window.ekey; +// // pCtx->param[2].nType = TSDB_DATA_TYPE_BIGINT; +// } else if (functionId == FUNCTION_ARITHM) { +// // pCtx->param[1].pz = (char*) getScalarFuncSupport(pRuntimeEnv->scalarSup, i); // } - } else if (functionId == FUNCTION_TS_COMP) { -// pCtx->param[0].i = pQueryAttr->vgId; //TODO this should be the parameter from client - pCtx->param[0].nType = TSDB_DATA_TYPE_BIGINT; - } else if (functionId == FUNCTION_TWA) { -// pCtx->param[1].i = pQueryAttr->window.skey; - pCtx->param[1].nType = TSDB_DATA_TYPE_BIGINT; -// pCtx->param[2].i = pQueryAttr->window.ekey; - pCtx->param[2].nType = TSDB_DATA_TYPE_BIGINT; - } else if (functionId == FUNCTION_ARITHM) { -// pCtx->param[1].pz = (char*) getScalarFuncSupport(pRuntimeEnv->scalarSup, i); - } -#endif +// } } for (int32_t i = 1; i < numOfOutput; ++i) { @@ -1955,7 +1949,7 @@ static void* destroySqlFunctionCtx(SqlFunctionCtx* pCtx, int32_t numOfOutput) { for (int32_t i = 0; i < numOfOutput; ++i) { for (int32_t j = 0; j < pCtx[i].numOfParams; ++j) { - taosVariantDestroy(&pCtx[i].param[j]); + taosVariantDestroy(&pCtx[i].param[j].param); } taosVariantDestroy(&pCtx[i].tag); @@ -6487,9 +6481,6 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo int32_t numOfCols = 0; tsdbReaderT pDataReader = doCreateDataReader(pTableScanNode, pHandle, pTableGroupInfo, (uint64_t)queryId, taskId); - if (pDataReader == NULL) { - return NULL; - } SArray* pColList = extractColMatchInfo(pScanPhyNode->pScanCols, pScanPhyNode->node.pOutputDataBlockDesc, &numOfCols); SSDataBlock* pResBlock = createResDataBlock(pScanPhyNode->node.pOutputDataBlockDesc); diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index b054bbfcb6eb77dd250510b82843e998c8bdb765..ce3de02c9fb4685f628374ee53c5f0baf096b9b5 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -319,7 +319,7 @@ static SSDataBlock* doTableScan(SOperatorInfo* pOperator, bool* newgroup) { SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; // The read handle is not initialized yet, since no qualified tables exists - if (pTableScanInfo->dataReader == NULL) { + if (pTableScanInfo->dataReader == NULL || pOperator->status == OP_EXEC_DONE) { return NULL; } @@ -375,10 +375,9 @@ static SSDataBlock* doTableScan(SOperatorInfo* pOperator, bool* newgroup) { return p; } -SOperatorInfo* createTableScanOperatorInfo(void* pTsdbReadHandle, int32_t order, int32_t numOfOutput, - int32_t dataLoadFlag, int32_t repeatTime, int32_t reverseTime, - SArray* pColMatchInfo, SSDataBlock* pResBlock, SNode* pCondition, - SInterval* pInterval, double sampleRatio, SExecTaskInfo* pTaskInfo) { +SOperatorInfo* createTableScanOperatorInfo(void* pDataReader, int32_t order, int32_t numOfOutput, int32_t dataLoadFlag, + int32_t repeatTime, int32_t reverseTime, SArray* pColMatchInfo, SSDataBlock* pResBlock, + SNode* pCondition, SInterval* pInterval, double sampleRatio, SExecTaskInfo* pTaskInfo) { assert(repeatTime > 0); STableScanInfo* pInfo = taosMemoryCalloc(1, sizeof(STableScanInfo)); @@ -391,19 +390,19 @@ SOperatorInfo* createTableScanOperatorInfo(void* pTsdbReadHandle, int32_t order, return NULL; } - pInfo->interval = *pInterval; - pInfo->sampleRatio = sampleRatio; - pInfo->dataBlockLoadFlag = dataLoadFlag; - pInfo->pResBlock = pResBlock; - pInfo->pFilterNode = pCondition; - pInfo->dataReader = pTsdbReadHandle; - pInfo->times = repeatTime; - pInfo->reverseTimes = reverseTime; - pInfo->order = order; - pInfo->current = 0; - pInfo->scanFlag = MAIN_SCAN; - pInfo->pColMatchInfo = pColMatchInfo; - pOperator->name = "TableScanOperator"; + pInfo->interval = *pInterval; + pInfo->sampleRatio = sampleRatio; + pInfo->dataBlockLoadFlag= dataLoadFlag; + pInfo->pResBlock = pResBlock; + pInfo->pFilterNode = pCondition; + pInfo->dataReader = pDataReader; + pInfo->times = repeatTime; + pInfo->reverseTimes = reverseTime; + pInfo->order = order; + pInfo->current = 0; + pInfo->scanFlag = MAIN_SCAN; + pInfo->pColMatchInfo = pColMatchInfo; + pOperator->name = "TableScanOperator"; pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN; pOperator->blockingOptr = false; pOperator->status = OP_NOT_OPENED; @@ -677,7 +676,8 @@ static void destroySysScanOperator(void* param, int32_t numOfOutput) { tsem_destroy(&pInfo->ready); blockDataDestroy(pInfo->pRes); - if (pInfo->type == TSDB_MGMT_TABLE_TABLE) { + const char* name = tNameGetTableName(&pInfo->name); + if (strncasecmp(name, TSDB_INS_TABLE_USER_TABLES, TSDB_TABLE_FNAME_LEN) == 0) { metaCloseTbCursor(pInfo->pCur); } } @@ -812,7 +812,8 @@ static SSDataBlock* doSysTableScan(SOperatorInfo* pOperator, bool* newgroup) { SSysTableScanInfo* pInfo = pOperator->info; // retrieve local table list info from vnode - if (pInfo->type == TSDB_MGMT_TABLE_TABLE) { + const char* name = tNameGetTableName(&pInfo->name); + if (strncasecmp(name, TSDB_INS_TABLE_USER_TABLES, TSDB_TABLE_FNAME_LEN) == 0) { if (pInfo->pCur == NULL) { pInfo->pCur = metaOpenTbCursor(pInfo->readHandle); } @@ -864,8 +865,6 @@ static SSDataBlock* doSysTableScan(SOperatorInfo* pOperator, bool* newgroup) { while (1) { int64_t startTs = taosGetTimestampUs(); - - pInfo->req.type = pInfo->type; strncpy(pInfo->req.tb, tNameGetTableName(&pInfo->name), tListLen(pInfo->req.tb)); if (pInfo->showRewrite) { @@ -947,68 +946,9 @@ SOperatorInfo* createSysTableScanOperatorInfo(void* pSysTableReadHandle, SSDataB pInfo->pCondition = pCondition; pInfo->scanCols = colList; - // TODO remove it - int32_t tableType = 0; - const char* name = tNameGetTableName(pName); - if (strncasecmp(name, TSDB_INS_TABLE_DNODES, tListLen(pName->tname)) == 0) { - tableType = TSDB_MGMT_TABLE_DNODE; - } else if (strncasecmp(name, TSDB_INS_TABLE_MNODES, tListLen(pName->tname)) == 0) { - tableType = TSDB_MGMT_TABLE_MNODE; - } else if (strncasecmp(name, TSDB_INS_TABLE_MODULES, tListLen(pName->tname)) == 0) { - tableType = TSDB_MGMT_TABLE_MODULE; - } else if (strncasecmp(name, TSDB_INS_TABLE_QNODES, tListLen(pName->tname)) == 0) { - tableType = TSDB_MGMT_TABLE_QNODE; - } else if (strncasecmp(name, TSDB_INS_TABLE_BNODES, tListLen(pName->tname)) == 0) { - tableType = TSDB_MGMT_TABLE_BNODE; - } else if (strncasecmp(name, TSDB_INS_TABLE_SNODES, tListLen(pName->tname)) == 0) { - tableType = TSDB_MGMT_TABLE_SNODE; - } else if (strncasecmp(name, TSDB_INS_TABLE_CLUSTER, tListLen(pName->tname)) == 0) { - tableType = TSDB_MGMT_TABLE_CLUSTER; - } else if (strncasecmp(name, TSDB_INS_TABLE_USER_DATABASES, tListLen(pName->tname)) == 0) { - tableType = TSDB_MGMT_TABLE_DB; - } else if (strncasecmp(name, TSDB_INS_TABLE_USER_FUNCTIONS, tListLen(pName->tname)) == 0) { - tableType = TSDB_MGMT_TABLE_FUNC; - } else if (strncasecmp(name, TSDB_INS_TABLE_USER_INDEXES, tListLen(pName->tname)) == 0) { - // tableType = TSDB_MGMT_TABLE_INDEX; - } else if (strncasecmp(name, TSDB_INS_TABLE_USER_STABLES, tListLen(pName->tname)) == 0) { - tableType = TSDB_MGMT_TABLE_STB; - } else if (strncasecmp(name, TSDB_INS_TABLE_USER_STREAMS, tListLen(pName->tname)) == 0) { - tableType = TSDB_MGMT_TABLE_STREAMS; - } else if (strncasecmp(name, TSDB_INS_TABLE_USER_TABLES, tListLen(pName->tname)) == 0) { - tableType = TSDB_MGMT_TABLE_TABLE; - } else if (strncasecmp(name, TSDB_INS_TABLE_USER_TABLE_DISTRIBUTED, tListLen(pName->tname)) == 0) { - // tableType = TSDB_MGMT_TABLE_DIST; - } else if (strncasecmp(name, TSDB_INS_TABLE_USER_USERS, tListLen(pName->tname)) == 0) { - tableType = TSDB_MGMT_TABLE_USER; - } else if (strncasecmp(name, TSDB_INS_TABLE_LICENCES, tListLen(pName->tname)) == 0) { - tableType = TSDB_MGMT_TABLE_GRANTS; - } else if (strncasecmp(name, TSDB_INS_TABLE_VGROUPS, tListLen(pName->tname)) == 0) { - tableType = TSDB_MGMT_TABLE_VGROUP; - } else if (strncasecmp(name, TSDB_INS_TABLE_TOPICS, tListLen(pName->tname)) == 0) { - tableType = TSDB_MGMT_TABLE_TOPICS; - } else if (strncasecmp(name, TSDB_INS_TABLE_CONSUMERS, tListLen(pName->tname)) == 0) { - tableType = TSDB_MGMT_TABLE_CONSUMERS; - } else if (strncasecmp(name, TSDB_INS_TABLE_SUBSCRIBES, tListLen(pName->tname)) == 0) { - tableType = TSDB_MGMT_TABLE_SUBSCRIBES; - } else if (strncasecmp(name, TSDB_INS_TABLE_TRANS, tListLen(pName->tname)) == 0) { - tableType = TSDB_MGMT_TABLE_TRANS; - } else if (strncasecmp(name, TSDB_INS_TABLE_SMAS, tListLen(pName->tname)) == 0) { - tableType = TSDB_MGMT_TABLE_SMAS; - } else if (strncasecmp(name, TSDB_INS_TABLE_CONFIGS, tListLen(pName->tname)) == 0) { - tableType = TSDB_MGMT_TABLE_CONFIGS; - } else if (strncasecmp(name, TSDB_INS_TABLE_CONNS, tListLen(pName->tname)) == 0) { - tableType = TSDB_MGMT_TABLE_CONNS; - } else if (strncasecmp(name, TSDB_INS_TABLE_QUERIES, tListLen(pName->tname)) == 0) { - tableType = TSDB_MGMT_TABLE_QUERIES; - } else if (strncasecmp(name, TSDB_INS_TABLE_VNODES, tListLen(pName->tname)) == 0) { - tableType = TSDB_MGMT_TABLE_VNODES; - } else { - ASSERT(0); - } - tNameAssign(&pInfo->name, pName); - pInfo->type = tableType; - if (pInfo->type == TSDB_MGMT_TABLE_TABLE) { + const char* name = tNameGetTableName(&pInfo->name); + if (strncasecmp(name, TSDB_INS_TABLE_USER_TABLES, TSDB_TABLE_FNAME_LEN) == 0) { pInfo->readHandle = pSysTableReadHandle; blockDataEnsureCapacity(pInfo->pRes, pInfo->capacity); } else { diff --git a/source/libs/function/inc/builtinsimpl.h b/source/libs/function/inc/builtinsimpl.h index 11c89f15685b2e361b35171e2881dbe292357534..8473138e4af4fd5f075a2bb7a7ff75087b75d90b 100644 --- a/source/libs/function/inc/builtinsimpl.h +++ b/source/libs/function/inc/builtinsimpl.h @@ -58,6 +58,9 @@ bool getFirstLastFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv); int32_t firstFunction(SqlFunctionCtx *pCtx); int32_t lastFunction(SqlFunctionCtx *pCtx); +bool getTopBotFuncEnv(SFunctionNode* UNUSED_PARAM(pFunc), SFuncExecEnv* pEnv); +int32_t topFunction(SqlFunctionCtx *pCtx); + #ifdef __cplusplus } #endif diff --git a/source/libs/function/inc/tudf.h b/source/libs/function/inc/tudf.h index c51b6e12642e74a8c31f809c9f85c9403576643d..b5c839c811314333f6de390124be05ad02637ae9 100644 --- a/source/libs/function/inc/tudf.h +++ b/source/libs/function/inc/tudf.h @@ -27,41 +27,31 @@ extern "C" { #endif #define UDF_LISTEN_PIPE_NAME_LEN 32 -#define UDF_LISTEN_PIPE_NAME_PREFIX "udf.sock." +#define UDF_LISTEN_PIPE_NAME_PREFIX "udfd.sock." //====================================================================================== //begin API to taosd and qworker enum { UDFC_CODE_STOPPING = -1, - UDFC_CODE_RESTARTING = -2, UDFC_CODE_PIPE_READ_ERR = -3, }; -/*TODO: no api for dnode startudfd/stopudfd*/ -/** - * start udfd dameon service - */ -int32_t startUdfd(int32_t dnodeId); - -/** - * stop udfd dameon service - */ -int32_t stopUdfd(int32_t dnodeId); +typedef void *UdfcHandle; +typedef void *UdfcFuncHandle; /** * create udfd proxy, called once in process that call setupUdf/callUdfxxx/teardownUdf * @return error code */ -int32_t createUdfdProxy(int32_t dnodeId); +int32_t udfcOpen(int32_t dnodeId, UdfcHandle* proxyHandle); /** * destroy udfd proxy * @return error code */ -int32_t destroyUdfdProxy(int32_t dnodeId); +int32_t udfcClose(UdfcHandle proxyhandle); -typedef void *UdfHandle; /** * setup udf @@ -69,7 +59,7 @@ typedef void *UdfHandle; * @param handle, out * @return error code */ -int32_t setupUdf(char udfName[], SEpSet *epSet, UdfHandle *handle); +int32_t setupUdf(UdfcHandle proxyHandle, char udfName[], SEpSet *epSet, UdfcFuncHandle *handle); typedef struct SUdfColumnMeta { int16_t type; @@ -116,26 +106,26 @@ typedef struct SUdfInterBuf { } SUdfInterBuf; // output: interBuf -int32_t callUdfAggInit(UdfHandle handle, SUdfInterBuf *interBuf); +int32_t callUdfAggInit(UdfcFuncHandle handle, SUdfInterBuf *interBuf); // input: block, state // output: newState -int32_t callUdfAggProcess(UdfHandle handle, SSDataBlock *block, SUdfInterBuf *state, SUdfInterBuf *newState); +int32_t callUdfAggProcess(UdfcFuncHandle handle, SSDataBlock *block, SUdfInterBuf *state, SUdfInterBuf *newState); // input: interBuf // output: resultData -int32_t callUdfAggFinalize(UdfHandle handle, SUdfInterBuf *interBuf, SUdfInterBuf *resultData); +int32_t callUdfAggFinalize(UdfcFuncHandle handle, SUdfInterBuf *interBuf, SUdfInterBuf *resultData); // input: interbuf1, interbuf2 // output: resultBuf -int32_t callUdfAggMerge(UdfHandle handle, SUdfInterBuf *interBuf1, SUdfInterBuf *interBuf2, SUdfInterBuf *resultBuf); +int32_t callUdfAggMerge(UdfcFuncHandle handle, SUdfInterBuf *interBuf1, SUdfInterBuf *interBuf2, SUdfInterBuf *resultBuf); // input: block // output: resultData -int32_t callUdfScalaProcess(UdfHandle handle, SSDataBlock *block, SSDataBlock *resultData); +int32_t callUdfScalaProcess(UdfcFuncHandle handle, SSDataBlock *block, SSDataBlock *resultData); /** * tearn down udf * @param handle * @return */ -int32_t teardownUdf(UdfHandle handle); +int32_t teardownUdf(UdfcFuncHandle handle); // end API to taosd and qworker //============================================================================================================================= diff --git a/source/libs/function/inc/udfc.h b/source/libs/function/inc/udfc.h index fed2818cedd353e7241fe8a9a6842ff0d50f37b3..1cf8c6768633a514b21c1dad01690c2abedf5b05 100644 --- a/source/libs/function/inc/udfc.h +++ b/source/libs/function/inc/udfc.h @@ -30,20 +30,20 @@ typedef struct SUdfInfo { char *path; } SUdfInfo; -typedef void *UdfHandle; +typedef void *UdfcFuncHandle; int32_t createUdfdProxy(); int32_t destroyUdfdProxy(); -//int32_t setupUdf(SUdfInfo *udf, int32_t numOfUdfs, UdfHandle *handles); +//int32_t setupUdf(SUdfInfo *udf, int32_t numOfUdfs, UdfcFuncHandle *handles); -int32_t setupUdf(SUdfInfo* udf, UdfHandle* handle); +int32_t setupUdf(SUdfInfo* udf, UdfcFuncHandle* handle); -int32_t callUdf(UdfHandle handle, int8_t step, char *state, int32_t stateSize, SSDataBlock input, char **newstate, +int32_t callUdf(UdfcFuncHandle handle, int8_t step, char *state, int32_t stateSize, SSDataBlock input, char **newstate, int32_t *newStateSize, SSDataBlock *output); -int32_t teardownUdf(UdfHandle handle); +int32_t teardownUdf(UdfcFuncHandle handle); typedef struct SUdfSetupRequest { char udfName[16]; // diff --git a/source/libs/function/src/builtins.c b/source/libs/function/src/builtins.c index ce7384cbbe34e9c93b8fc35689c04915cc0bbf68..efd8de84c40b233f8a3a18f144b5112ec5993a2b 100644 --- a/source/libs/function/src/builtins.c +++ b/source/libs/function/src/builtins.c @@ -193,8 +193,15 @@ static int32_t translateApercentile(SFunctionNode* pFunc, char* pErrBuf, int32_t return TSDB_CODE_SUCCESS; } +static int32_t translateTbnameColumn(SFunctionNode* pFunc, char* pErrBuf, int32_t len) { + // pseudo column do not need to check parameters + pFunc->node.resType = (SDataType){.bytes = TSDB_TABLE_FNAME_LEN - 1 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR}; + return TSDB_CODE_SUCCESS; +} + static int32_t translateTop(SFunctionNode* pFunc, char* pErrBuf, int32_t len) { - // todo + SDataType* pType = &((SExprNode*)nodesListGetNode(pFunc->pParameterList, 0))->resType; + pFunc->node.resType = (SDataType){.bytes = pType->bytes, .type = pType->type}; return TSDB_CODE_SUCCESS; } @@ -497,9 +504,9 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { .type = FUNCTION_TYPE_TOP, .classification = FUNC_MGT_AGG_FUNC, .translateFunc = translateTop, - .getEnvFunc = getMinmaxFuncEnv, - .initFunc = maxFunctionSetup, - .processFunc = maxFunction, + .getEnvFunc = getTopBotFuncEnv, + .initFunc = functionSetup, + .processFunc = topFunction, .finalizeFunc = functionFinalize }, { @@ -876,7 +883,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { .name = "tbname", .type = FUNCTION_TYPE_TBNAME, .classification = FUNC_MGT_PSEUDO_COLUMN_FUNC, - .translateFunc = NULL, + .translateFunc = translateTbnameColumn, .getEnvFunc = NULL, .initFunc = NULL, .sprocessFunc = NULL, @@ -914,7 +921,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { }, { .name = "_wendts", - .type = FUNCTION_TYPE_QENDTS, + .type = FUNCTION_TYPE_WENDTS, .classification = FUNC_MGT_PSEUDO_COLUMN_FUNC | FUNC_MGT_WINDOW_PC_FUNC, .translateFunc = translateTimePseudoColumn, .getEnvFunc = getTimePseudoFuncEnv, diff --git a/source/libs/function/src/builtinsimpl.c b/source/libs/function/src/builtinsimpl.c index 3d796515a0ec937b65885d211619130604adf7b1..09427265889f921b7c023738dde788b9669495a4 100644 --- a/source/libs/function/src/builtinsimpl.c +++ b/source/libs/function/src/builtinsimpl.c @@ -14,10 +14,11 @@ */ #include "builtinsimpl.h" -#include "tpercentile.h" +#include #include "querynodes.h" #include "taggfunction.h" #include "tdatablock.h" +#include "tpercentile.h" #define SET_VAL(_info, numOfElem, res) \ do { \ @@ -472,17 +473,6 @@ int32_t maxFunction(SqlFunctionCtx *pCtx) { return TSDB_CODE_SUCCESS; } -typedef struct STopBotRes { - int32_t num; -} STopBotRes; - -bool getTopBotFuncEnv(SFunctionNode* pFunc, SFuncExecEnv* pEnv) { - SColumnNode* pColNode = (SColumnNode*) nodesListGetNode(pFunc->pParameterList, 0); - int32_t bytes = pColNode->node.resType.bytes; - SValueNode* pkNode = (SValueNode*) nodesListGetNode(pFunc->pParameterList, 1); - return true; -} - typedef struct SStddevRes { double result; int64_t count; @@ -523,7 +513,7 @@ int32_t stddevFunction(SqlFunctionCtx* pCtx) { switch (type) { case TSDB_DATA_TYPE_TINYINT: { int8_t* plist = (int8_t*)pCol->pData; - for (int32_t i = start; i < numOfRows + pInput->startRowIndex; ++i) { + for (int32_t i = start; i < numOfRows + start; ++i) { if (pCol->hasNull && colDataIsNull_f(pCol->nullbitmap, i)) { continue; } @@ -749,9 +739,9 @@ int32_t percentileFunction(SqlFunctionCtx *pCtx) { return TSDB_CODE_SUCCESS; } -// TODO set the correct parameter. void percentileFinalize(SqlFunctionCtx* pCtx) { - double v = 50;//pCtx->param[0].nType == TSDB_DATA_TYPE_INT ? pCtx->param[0].i64 : pCtx->param[0].dKey; + SVariant* pVal = &pCtx->param[1].param; + double v = pVal->nType == TSDB_DATA_TYPE_INT ? pVal->i : pVal->d; SResultRowEntryInfo *pResInfo = GET_RES_INFO(pCtx); SPercentileInfo* ppInfo = (SPercentileInfo *) GET_ROWCELL_INTERBUF(pResInfo); @@ -1173,3 +1163,130 @@ int32_t diffFunction(SqlFunctionCtx *pCtx) { } } +typedef struct STopBotResItem { + SVariant v; + uint64_t uid; // it is a table uid, used to extract tag data during building of the final result for the tag data + struct { + int32_t pageId; + int32_t offset; + } tuplePos; // tuple data of this chosen row +} STopBotResItem; + +typedef struct STopBotRes { + int32_t num; + STopBotResItem *pItems; +} STopBotRes; + +bool getTopBotFuncEnv(SFunctionNode* pFunc, SFuncExecEnv* pEnv) { + SColumnNode* pColNode = (SColumnNode*) nodesListGetNode(pFunc->pParameterList, 0); + int32_t bytes = pColNode->node.resType.bytes; + SValueNode* pkNode = (SValueNode*) nodesListGetNode(pFunc->pParameterList, 1); + + pEnv->calcMemSize = sizeof(STopBotRes) + pkNode->datum.i * bytes; + return true; +} + +static STopBotRes *getTopBotOutputInfo(SqlFunctionCtx *pCtx) { + SResultRowEntryInfo *pResInfo = GET_RES_INFO(pCtx); + STopBotRes* pRes = GET_ROWCELL_INTERBUF(pResInfo); + pRes->pItems = (STopBotResItem*)((char*) pRes + sizeof(STopBotRes)); + + return pRes; +} + +static void doAddIntoResult(STopBotRes *pRes, int32_t maxSize, void *pData, uint16_t type, uint64_t uid); + +int32_t topFunction(SqlFunctionCtx *pCtx) { + int32_t numOfElems = 0; + + STopBotRes *pRes = getTopBotOutputInfo(pCtx); + assert(pRes->num >= 0); + +// if ((void *)pRes->res[0] != (void *)((char *)pRes + sizeof(STopBotRes) + POINTER_BYTES * pCtx->param[0].i)) { +// buildTopBotStruct(pRes, pCtx); +// } + + SInputColumnInfoData* pInput = &pCtx->input; + SColumnInfoData* pCol = pInput->pData[0]; + + int32_t type = pInput->pData[0]->info.type; + + int32_t start = pInput->startRowIndex; + int32_t numOfRows = pInput->numOfRows; + + for (int32_t i = start; i < numOfRows + start; ++i) { + if (pCol->hasNull && colDataIsNull_f(pCol->nullbitmap, i)) { + continue; + } + numOfElems++; + + char* data = colDataGetData(pCol, i); + doAddIntoResult(pRes, pCtx->param[1].param.i, data, type, pInput->uid); + } + + // treat the result as only one result + SET_VAL(GET_RES_INFO(pCtx), numOfElems, 1); + return TSDB_CODE_SUCCESS; +} + +static int32_t topBotResComparFn(const void *p1, const void *p2, const void *param) { + uint16_t type = *(uint16_t *) param; + + STopBotResItem *val1 = (STopBotResItem *) p1; + STopBotResItem *val2 = (STopBotResItem *) p2; + + if (IS_SIGNED_NUMERIC_TYPE(type)) { + if (val1->v.i == val2->v.i) { + return 0; + } + + return (val1->v.i > val2->v.i) ? 1 : -1; + } else if (IS_UNSIGNED_NUMERIC_TYPE(type)) { + if (val1->v.u == val2->v.u) { + return 0; + } + + return (val1->v.u > val2->v.u) ? 1 : -1; + } + + if (val1->v.d == val2->v.d) { + return 0; + } + + return (val1->v.d > val2->v.d) ? 1 : -1; +} + +void doAddIntoResult(STopBotRes *pRes, int32_t maxSize, void *pData, uint16_t type, uint64_t uid) { + SVariant val = {0}; + taosVariantCreateFromBinary(&val, pData, tDataTypes[type].bytes, type); + + STopBotResItem *pItems = pRes->pItems; + assert(pItems != NULL); + + // not full yet + if (pRes->num < maxSize) { + STopBotResItem* pItem = &pItems[pRes->num]; + pItem->v = val; + pItem->uid = uid; + pItem->tuplePos.pageId = -1; // todo set the corresponding tuple data in the disk-based buffer + + pRes->num++; + taosheapsort((void *) pItem, sizeof(STopBotResItem), pRes->num, (const void *) &type, topBotResComparFn, false); + } else { // replace the minimum value in the result + if ((IS_SIGNED_NUMERIC_TYPE(type) && val.i > pItems[0].v.i) || + (IS_UNSIGNED_NUMERIC_TYPE(type) && val.u > pItems[0].v.u) || + (IS_FLOAT_TYPE(type) && val.d > pItems[0].v.d)) { + STopBotResItem* pItem = &pItems[pRes->num]; + pItem->v = val; + pItem->uid = uid; + pItem->tuplePos.pageId = -1; // todo set the corresponding tuple data in the disk-based buffer + + taosheapadjust((void *) pItem, sizeof(STopBotResItem), 0, pRes->num - 1, (const void *) &type, topBotResComparFn, NULL, false); + } + } +} + +void topBotFinalize(SqlFunctionCtx* pCtx) { + functionFinalize(pCtx); + +} \ No newline at end of file diff --git a/source/libs/function/src/taggfunction.c b/source/libs/function/src/taggfunction.c index 56ee9bc9ae4177584de37cf86abb9240c66ffa56..c26342bfa878fb7fbb092c5e913964a4c96b8615 100644 --- a/source/libs/function/src/taggfunction.c +++ b/source/libs/function/src/taggfunction.c @@ -765,9 +765,9 @@ static int32_t firstFuncRequired(SqlFunctionCtx *pCtx, STimeWindow* w, int32_t c } static int32_t lastFuncRequired(SqlFunctionCtx *pCtx, STimeWindow* w, int32_t colId) { - if (pCtx->order != pCtx->param[0].i) { - return BLK_DATA_NOT_LOAD; - } +// if (pCtx->order != pCtx->param[0].param.i) { +// return BLK_DATA_NOT_LOAD; +// } if (GET_RES_INFO(pCtx) == NULL || GET_RES_INFO(pCtx)->numOfRes <= 0) { return BLK_DATA_DATA_LOAD; @@ -797,9 +797,9 @@ static int32_t firstDistFuncRequired(SqlFunctionCtx *pCtx, STimeWindow* w, int32 } static int32_t lastDistFuncRequired(SqlFunctionCtx *pCtx, STimeWindow* w, int32_t colId) { - if (pCtx->order != pCtx->param[0].i) { - return BLK_DATA_NOT_LOAD; - } +// if (pCtx->order != pCtx->param[0].param.i) { +// return BLK_DATA_NOT_LOAD; +// } // not initialized yet, it is the first block, load it. if (pCtx->pOutput == NULL) { @@ -1261,128 +1261,6 @@ int32_t tsCompare(const void* p1, const void* p2) { } } -static void stddev_dst_function(SqlFunctionCtx *pCtx) { - SStddevdstInfo *pStd = GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx)); - - // the second stage to calculate standard deviation - double *retVal = &pStd->res; - - // all data are null, no need to proceed - SArray* resList = (SArray*) pCtx->param[0].pz; - if (resList == NULL) { - return; - } - - // find the correct group average results according to the tag value - int32_t len = (int32_t) taosArrayGetSize(resList); - assert(len > 0); - - double avg = 0; - if (len == 1) { - SResPair* p = taosArrayGet(resList, 0); - avg = p->avg; - } else { // todo opt performance by using iterator since the timestamp lsit is matched with the output result - SResPair* p = bsearch(&pCtx->startTs, resList->pData, len, sizeof(SResPair), tsCompare); - if (p == NULL) { - return; - } - - avg = p->avg; - } - - void *pData = GET_INPUT_DATA_LIST(pCtx); - int32_t num = 0; - - switch (pCtx->inputType) { - case TSDB_DATA_TYPE_INT: { - for (int32_t i = 0; i < pCtx->size; ++i) { - if (pCtx->hasNull && isNull((const char*) (&((int32_t *)pData)[i]), pCtx->inputType)) { - continue; - } - num += 1; - *retVal += TPOW2(((int32_t *)pData)[i] - avg); - } - break; - } - case TSDB_DATA_TYPE_FLOAT: { - LOOP_STDDEV_IMPL(float, *retVal, pData, pCtx, avg, pCtx->inputType, num); - break; - } - case TSDB_DATA_TYPE_DOUBLE: { - LOOP_STDDEV_IMPL(double, *retVal, pData, pCtx, avg, pCtx->inputType, num); - break; - } - case TSDB_DATA_TYPE_TINYINT: { - LOOP_STDDEV_IMPL(int8_t, *retVal, pData, pCtx, avg, pCtx->inputType, num); - break; - } - case TSDB_DATA_TYPE_UTINYINT: { - LOOP_STDDEV_IMPL(int8_t, *retVal, pData, pCtx, avg, pCtx->inputType, num); - break; - } - case TSDB_DATA_TYPE_SMALLINT: { - LOOP_STDDEV_IMPL(int16_t, *retVal, pData, pCtx, avg, pCtx->inputType, num); - break; - } - case TSDB_DATA_TYPE_USMALLINT: { - LOOP_STDDEV_IMPL(uint16_t, *retVal, pData, pCtx, avg, pCtx->inputType, num); - break; - } - case TSDB_DATA_TYPE_UINT: { - LOOP_STDDEV_IMPL(uint32_t, *retVal, pData, pCtx, avg, pCtx->inputType, num); - break; - } - case TSDB_DATA_TYPE_BIGINT: { - LOOP_STDDEV_IMPL(int64_t, *retVal, pData, pCtx, avg, pCtx->inputType, num); - break; - } - case TSDB_DATA_TYPE_UBIGINT: { - LOOP_STDDEV_IMPL(uint64_t, *retVal, pData, pCtx, avg, pCtx->inputType, num); - break; - } - default: - assert(0); -// qError("stddev function not support data type:%d", pCtx->inputType); - } - - pStd->num += num; - SET_VAL(pCtx, num, 1); - - // copy to the final output buffer for super table - memcpy(pCtx->pOutput, GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx)), sizeof(SAvgInfo)); -} - -static void stddev_dst_merge(SqlFunctionCtx *pCtx) { - SResultRowEntryInfo *pResInfo = GET_RES_INFO(pCtx); - SStddevdstInfo* pRes = GET_ROWCELL_INTERBUF(pResInfo); - - char *input = GET_INPUT_DATA_LIST(pCtx); - - for (int32_t i = 0; i < pCtx->size; ++i, input += pCtx->inputBytes) { - SStddevdstInfo *pInput = (SStddevdstInfo *)input; - if (pInput->num == 0) { // current input is null - continue; - } - - pRes->num += pInput->num; - pRes->res += pInput->res; - } -} - -static void stddev_dst_finalizer(SqlFunctionCtx *pCtx) { - SStddevdstInfo *pStd = GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx)); - - if (pStd->num <= 0) { - setNull(pCtx->pOutput, pCtx->resDataInfo.type, pCtx->resDataInfo.bytes); - } else { - double *retValue = (double *)pCtx->pOutput; - SET_DOUBLE_VAL(retValue, sqrt(pStd->res / pStd->num)); - SET_VAL(pCtx, 1, 1); - } - - doFinalizer(pCtx); -} - ////////////////////////////////////////////////////////////////////////////////////// static bool first_last_function_setup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResInfo) { if (!function_setup(pCtx, pResInfo)) { @@ -1390,8 +1268,8 @@ static bool first_last_function_setup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* } // used to keep the timestamp for comparison - pCtx->param[1].nType = 0; - pCtx->param[1].i = 0; +// pCtx->param[1].param.nType = 0; +// pCtx->param[1].param.i = 0; return true; } @@ -1487,13 +1365,13 @@ static void first_dist_func_merge(SqlFunctionCtx *pCtx) { } // The param[1] is used to keep the initial value of max ts value - if (pCtx->param[1].nType != pCtx->resDataInfo.type || pCtx->param[1].i > pInput->ts) { - memcpy(pCtx->pOutput, pData, pCtx->resDataInfo.bytes); - pCtx->param[1].i = pInput->ts; - pCtx->param[1].nType = pCtx->resDataInfo.type; - -// DO_UPDATE_TAG_COLUMNS(pCtx, pInput->ts); - } +// if (pCtx->param[1].param.nType != pCtx->resDataInfo.type || pCtx->param[1].param.i > pInput->ts) { +// memcpy(pCtx->pOutput, pData, pCtx->resDataInfo.bytes); +// pCtx->param[1].param.i = pInput->ts; +// pCtx->param[1].param.nType = pCtx->resDataInfo.type; +// +//// DO_UPDATE_TAG_COLUMNS(pCtx, pInput->ts); +// } SET_VAL(pCtx, 1, 1); // GET_RES_INFO(pCtx)->hasResult = DATA_SET_FLAG; @@ -1508,9 +1386,9 @@ static void first_dist_func_merge(SqlFunctionCtx *pCtx) { * least one data in this block that is not null.(TODO opt for this case) */ static void last_function(SqlFunctionCtx *pCtx) { - if (pCtx->order != pCtx->param[0].i) { - return; - } +// if (pCtx->order != pCtx->param[0].param.i) { +// return; +// } SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx); @@ -1582,9 +1460,9 @@ static void last_dist_function(SqlFunctionCtx *pCtx) { * 1. for scan data is not the required order * 2. for data blocks that are not loaded, no need to check data */ - if (pCtx->order != pCtx->param[0].i) { - return; - } +// if (pCtx->order != pCtx->param[0].param.i) { +// return; +// } int32_t notNullElems = 0; for (int32_t i = pCtx->size - 1; i >= 0; --i) { @@ -1624,10 +1502,10 @@ static void last_dist_func_merge(SqlFunctionCtx *pCtx) { * param[1] used to keep the corresponding timestamp to decide if current result is * the true last result */ - if (pCtx->param[1].nType != pCtx->resDataInfo.type || pCtx->param[1].i < pInput->ts) { + if (pCtx->param[1].param.nType != pCtx->resDataInfo.type || pCtx->param[1].param.i < pInput->ts) { memcpy(pCtx->pOutput, pData, pCtx->resDataInfo.bytes); - pCtx->param[1].i = pInput->ts; - pCtx->param[1].nType = pCtx->resDataInfo.type; + pCtx->param[1].param.i = pInput->ts; + pCtx->param[1].param.nType = pCtx->resDataInfo.type; // DO_UPDATE_TAG_COLUMNS(pCtx, pInput->ts); } @@ -1955,11 +1833,11 @@ static STopBotInfo *getTopBotOutputInfo(SqlFunctionCtx *pCtx) { static void buildTopBotStruct(STopBotInfo *pTopBotInfo, SqlFunctionCtx *pCtx) { char *tmp = (char *)pTopBotInfo + sizeof(STopBotInfo); pTopBotInfo->res = (tValuePair**) tmp; - tmp += POINTER_BYTES * pCtx->param[0].i; +// tmp += POINTER_BYTES * pCtx->param[0].param.i; // size_t size = sizeof(tValuePair) + pCtx->tagInfo.tagsLen; -// for (int32_t i = 0; i < pCtx->param[0].i; ++i) { +// for (int32_t i = 0; i < pCtx->param[0].param.i; ++i) { // pTopBotInfo->res[i] = (tValuePair*) tmp; // pTopBotInfo->res[i]->pTags = tmp + sizeof(tValuePair); // tmp += size; @@ -1975,13 +1853,13 @@ bool topbot_datablock_filter(SqlFunctionCtx *pCtx, const char *minval, const cha STopBotInfo *pTopBotInfo = getTopBotOutputInfo(pCtx); // required number of results are not reached, continue load data block - if (pTopBotInfo->num < pCtx->param[0].i) { - return true; - } +// if (pTopBotInfo->num < pCtx->param[0].param.i) { +// return true; +// } - if ((void *)pTopBotInfo->res[0] != (void *)((char *)pTopBotInfo + sizeof(STopBotInfo) + POINTER_BYTES * pCtx->param[0].i)) { - buildTopBotStruct(pTopBotInfo, pCtx); - } +// if ((void *)pTopBotInfo->res[0] != (void *)((char *)pTopBotInfo + sizeof(STopBotInfo) + POINTER_BYTES * pCtx->param[0].param.i)) { +// buildTopBotStruct(pTopBotInfo, pCtx); +// } tValuePair **pRes = (tValuePair**) pTopBotInfo->res; @@ -2038,9 +1916,9 @@ static void top_function(SqlFunctionCtx *pCtx) { STopBotInfo *pRes = getTopBotOutputInfo(pCtx); assert(pRes->num >= 0); - if ((void *)pRes->res[0] != (void *)((char *)pRes + sizeof(STopBotInfo) + POINTER_BYTES * pCtx->param[0].i)) { - buildTopBotStruct(pRes, pCtx); - } +// if ((void *)pRes->res[0] != (void *)((char *)pRes + sizeof(STopBotInfo) + POINTER_BYTES * pCtx->param[0].param.i)) { +// buildTopBotStruct(pRes, pCtx); +// } for (int32_t i = 0; i < pCtx->size; ++i) { char *data = GET_INPUT_DATA(pCtx, i); @@ -2052,7 +1930,7 @@ static void top_function(SqlFunctionCtx *pCtx) { // NOTE: Set the default timestamp if it is missing [todo refactor] TSKEY ts = (pCtx->ptsList != NULL)? GET_TS_DATA(pCtx, i):0; -// do_top_function_add(pRes, (int32_t)pCtx->param[0].i, data, ts, pCtx->inputType, &pCtx->tagInfo, NULL, 0); +// do_top_function_add(pRes, (int32_t)pCtx->param[0].param.i, data, ts, pCtx->inputType, &pCtx->tagInfo, NULL, 0); } if (!pCtx->hasNull) { @@ -2079,7 +1957,7 @@ static void top_func_merge(SqlFunctionCtx *pCtx) { // the intermediate result is binary, we only use the output data type for (int32_t i = 0; i < pInput->num; ++i) { int16_t type = (pCtx->resDataInfo.type == TSDB_DATA_TYPE_FLOAT)? TSDB_DATA_TYPE_DOUBLE:pCtx->resDataInfo.type; -// do_top_function_add(pOutput, (int32_t)pCtx->param[0].i, &pInput->res[i]->v.i, pInput->res[i]->timestamp, +// do_top_function_add(pOutput, (int32_t)pCtx->param[0].param.i, &pInput->res[i]->v.i, pInput->res[i]->timestamp, // type, &pCtx->tagInfo, pInput->res[i]->pTags, pCtx->currentStage); } @@ -2096,9 +1974,9 @@ static void bottom_function(SqlFunctionCtx *pCtx) { STopBotInfo *pRes = getTopBotOutputInfo(pCtx); - if ((void *)pRes->res[0] != (void *)((char *)pRes + sizeof(STopBotInfo) + POINTER_BYTES * pCtx->param[0].i)) { - buildTopBotStruct(pRes, pCtx); - } +// if ((void *)pRes->res[0] != (void *)((char *)pRes + sizeof(STopBotInfo) + POINTER_BYTES * pCtx->param[0].param.i)) { +// buildTopBotStruct(pRes, pCtx); +// } for (int32_t i = 0; i < pCtx->size; ++i) { char *data = GET_INPUT_DATA(pCtx, i); @@ -2109,7 +1987,7 @@ static void bottom_function(SqlFunctionCtx *pCtx) { notNullElems++; // NOTE: Set the default timestamp if it is missing [todo refactor] TSKEY ts = (pCtx->ptsList != NULL)? GET_TS_DATA(pCtx, i):0; -// do_bottom_function_add(pRes, (int32_t)pCtx->param[0].i, data, ts, pCtx->inputType, &pCtx->tagInfo, NULL, 0); +// do_bottom_function_add(pRes, (int32_t)pCtx->param[0].param.i, data, ts, pCtx->inputType, &pCtx->tagInfo, NULL, 0); } if (!pCtx->hasNull) { @@ -2136,7 +2014,7 @@ static void bottom_func_merge(SqlFunctionCtx *pCtx) { // the intermediate result is binary, we only use the output data type for (int32_t i = 0; i < pInput->num; ++i) { int16_t type = (pCtx->resDataInfo.type == TSDB_DATA_TYPE_FLOAT) ? TSDB_DATA_TYPE_DOUBLE : pCtx->resDataInfo.type; -// do_bottom_function_add(pOutput, (int32_t)pCtx->param[0].i, &pInput->res[i]->v.i, pInput->res[i]->timestamp, type, +// do_bottom_function_add(pOutput, (int32_t)pCtx->param[0].param.i, &pInput->res[i]->v.i, pInput->res[i]->timestamp, type, // &pCtx->tagInfo, pInput->res[i]->pTags, pCtx->currentStage); } @@ -2162,11 +2040,11 @@ static void top_bottom_func_finalizer(SqlFunctionCtx *pCtx) { tValuePair **tvp = pRes->res; // user specify the order of output by sort the result according to timestamp - if (pCtx->param[1].i == PRIMARYKEY_TIMESTAMP_COL_ID) { - __compar_fn_t comparator = (pCtx->param[2].i == TSDB_ORDER_ASC) ? resAscComparFn : resDescComparFn; + if (pCtx->param[1].param.i == PRIMARYKEY_TIMESTAMP_COL_ID) { + __compar_fn_t comparator = (pCtx->param[2].param.i == TSDB_ORDER_ASC) ? resAscComparFn : resDescComparFn; qsort(tvp, (size_t)pResInfo->numOfRes, POINTER_BYTES, comparator); - } else /*if (pCtx->param[1].i > PRIMARYKEY_TIMESTAMP_COL_ID)*/ { - __compar_fn_t comparator = (pCtx->param[2].i == TSDB_ORDER_ASC) ? resDataAscComparFn : resDataDescComparFn; + } else /*if (pCtx->param[1].param.i > PRIMARYKEY_TIMESTAMP_COL_ID)*/ { + __compar_fn_t comparator = (pCtx->param[2].param.i == TSDB_ORDER_ASC) ? resDataAscComparFn : resDataDescComparFn; qsort(tvp, (size_t)pResInfo->numOfRes, POINTER_BYTES, comparator); } @@ -2277,7 +2155,7 @@ static void percentile_function(SqlFunctionCtx *pCtx) { } static void percentile_finalizer(SqlFunctionCtx *pCtx) { - double v = pCtx->param[0].nType == TSDB_DATA_TYPE_INT ? pCtx->param[0].i : pCtx->param[0].d; +// double v = pCtx->param[0].param.nType == TSDB_DATA_TYPE_INT ? pCtx->param[0].param.i : pCtx->param[0].param.d; SResultRowEntryInfo *pResInfo = GET_RES_INFO(pCtx); SPercentileInfo* ppInfo = (SPercentileInfo *) GET_ROWCELL_INTERBUF(pResInfo); @@ -2287,7 +2165,7 @@ static void percentile_finalizer(SqlFunctionCtx *pCtx) { assert(ppInfo->numOfElems == 0); setNull(pCtx->pOutput, pCtx->resDataInfo.type, pCtx->resDataInfo.bytes); } else { - SET_DOUBLE_VAL((double *)pCtx->pOutput, getPercentile(pMemBucket, v)); +// SET_DOUBLE_VAL((double *)pCtx->pOutput, getPercentile(pMemBucket, v)); } tMemBucketDestroy(pMemBucket); @@ -2389,7 +2267,7 @@ static void apercentile_func_merge(SqlFunctionCtx *pCtx) { } static void apercentile_finalizer(SqlFunctionCtx *pCtx) { - double v = (pCtx->param[0].nType == TSDB_DATA_TYPE_INT) ? pCtx->param[0].i : pCtx->param[0].d; + double v = (pCtx->param[0].param.nType == TSDB_DATA_TYPE_INT) ? pCtx->param[0].param.i : pCtx->param[0].param.d; SResultRowEntryInfo * pResInfo = GET_RES_INFO(pCtx); SAPercentileInfo *pOutput = GET_ROWCELL_INTERBUF(pResInfo); @@ -2432,7 +2310,7 @@ static bool leastsquares_function_setup(SqlFunctionCtx *pCtx, SResultRowEntryInf SLeastsquaresInfo *pInfo = GET_ROWCELL_INTERBUF(pResInfo); // 2*3 matrix - pInfo->startVal = pCtx->param[0].d; +// pInfo->startVal = pCtx->param[0].param.d; return true; } @@ -2478,54 +2356,54 @@ static void leastsquares_function(SqlFunctionCtx *pCtx) { param[0][2] += x * p[i]; param[1][2] += p[i]; - x += pCtx->param[1].d; + x += pCtx->param[1].param.d; numOfElem++; } break; } case TSDB_DATA_TYPE_BIGINT: { int64_t *p = pData; - LEASTSQR_CAL_LOOP(pCtx, param, x, p, pCtx->inputType, numOfElem, pCtx->param[1].d); + LEASTSQR_CAL_LOOP(pCtx, param, x, p, pCtx->inputType, numOfElem, pCtx->param[1].param.d); break; } case TSDB_DATA_TYPE_DOUBLE: { double *p = pData; - LEASTSQR_CAL_LOOP(pCtx, param, x, p, pCtx->inputType, numOfElem, pCtx->param[1].d); + LEASTSQR_CAL_LOOP(pCtx, param, x, p, pCtx->inputType, numOfElem, pCtx->param[1].param.d); break; } case TSDB_DATA_TYPE_FLOAT: { float *p = pData; - LEASTSQR_CAL_LOOP(pCtx, param, x, p, pCtx->inputType, numOfElem, pCtx->param[1].d); + LEASTSQR_CAL_LOOP(pCtx, param, x, p, pCtx->inputType, numOfElem, pCtx->param[1].param.d); break; }; case TSDB_DATA_TYPE_SMALLINT: { int16_t *p = pData; - LEASTSQR_CAL_LOOP(pCtx, param, x, p, pCtx->inputType, numOfElem, pCtx->param[1].d); + LEASTSQR_CAL_LOOP(pCtx, param, x, p, pCtx->inputType, numOfElem, pCtx->param[1].param.d); break; } case TSDB_DATA_TYPE_TINYINT: { int8_t *p = pData; - LEASTSQR_CAL_LOOP(pCtx, param, x, p, pCtx->inputType, numOfElem, pCtx->param[1].d); + LEASTSQR_CAL_LOOP(pCtx, param, x, p, pCtx->inputType, numOfElem, pCtx->param[1].param.d); break; } case TSDB_DATA_TYPE_UTINYINT: { uint8_t *p = pData; - LEASTSQR_CAL_LOOP(pCtx, param, x, p, pCtx->inputType, numOfElem, pCtx->param[1].d); + LEASTSQR_CAL_LOOP(pCtx, param, x, p, pCtx->inputType, numOfElem, pCtx->param[1].param.d); break; } case TSDB_DATA_TYPE_USMALLINT: { uint16_t *p = pData; - LEASTSQR_CAL_LOOP(pCtx, param, x, p, pCtx->inputType, numOfElem, pCtx->param[1].d); + LEASTSQR_CAL_LOOP(pCtx, param, x, p, pCtx->inputType, numOfElem, pCtx->param[1].param.d); break; } case TSDB_DATA_TYPE_UINT: { uint32_t *p = pData; - LEASTSQR_CAL_LOOP(pCtx, param, x, p, pCtx->inputType, numOfElem, pCtx->param[1].d); + LEASTSQR_CAL_LOOP(pCtx, param, x, p, pCtx->inputType, numOfElem, pCtx->param[1].param.d); break; } case TSDB_DATA_TYPE_UBIGINT: { uint64_t *p = pData; - LEASTSQR_CAL_LOOP(pCtx, param, x, p, pCtx->inputType, numOfElem, pCtx->param[1].d); + LEASTSQR_CAL_LOOP(pCtx, param, x, p, pCtx->inputType, numOfElem, pCtx->param[1].param.d); break; } } @@ -2584,16 +2462,16 @@ static void col_project_function(SqlFunctionCtx *pCtx) { } // only one row is required. - if (pCtx->param[0].i == 1) { - SET_VAL(pCtx, pCtx->size, 1); - } else { - INC_INIT_VAL(pCtx, pCtx->size); - } +// if (pCtx->param[0].param.i == 1) { +// SET_VAL(pCtx, pCtx->size, 1); +// } else { +// INC_INIT_VAL(pCtx, pCtx->size); +// } char *pData = GET_INPUT_DATA_LIST(pCtx); if (pCtx->order == TSDB_ORDER_ASC) { - int32_t numOfRows = (pCtx->param[0].i == 1)? 1:pCtx->size; - memcpy(pCtx->pOutput, pData, (size_t) numOfRows * pCtx->inputBytes); +// int32_t numOfRows = (pCtx->param[0].param.i == 1)? 1:pCtx->size; +// memcpy(pCtx->pOutput, pData, (size_t) numOfRows * pCtx->inputBytes); } else { for(int32_t i = 0; i < pCtx->size; ++i) { memcpy(pCtx->pOutput + (pCtx->size - 1 - i) * pCtx->inputBytes, pData + i * pCtx->inputBytes, @@ -2658,7 +2536,7 @@ static bool diff_function_setup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResI } // diff function require the value is set to -1 - pCtx->param[1].nType = INITIAL_VALUE_NOT_ASSIGNED; + pCtx->param[1].param.nType = INITIAL_VALUE_NOT_ASSIGNED; return false; } @@ -2670,9 +2548,9 @@ static bool deriv_function_setup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pRes // diff function require the value is set to -1 SDerivInfo* pDerivInfo = GET_ROWCELL_INTERBUF(pResultInfo); - pDerivInfo->ignoreNegative = pCtx->param[1].i; +// pDerivInfo->ignoreNegative = pCtx->param[1].param.i; pDerivInfo->prevTs = -1; - pDerivInfo->tsWindow = pCtx->param[0].i; +// pDerivInfo->tsWindow = pCtx->param[0].param.i; pDerivInfo->valueSet = false; return false; } @@ -2861,12 +2739,12 @@ static void deriv_function(SqlFunctionCtx *pCtx) { #define DIFF_IMPL(ctx, d, type) \ do { \ - if ((ctx)->param[1].nType == INITIAL_VALUE_NOT_ASSIGNED) { \ - (ctx)->param[1].nType = (ctx)->inputType; \ - *(type *)&(ctx)->param[1].i = *(type *)(d); \ + if ((ctx)->param[1].param.nType == INITIAL_VALUE_NOT_ASSIGNED) { \ + (ctx)->param[1].param.nType = (ctx)->inputType; \ + *(type *)&(ctx)->param[1].param.i = *(type *)(d); \ } else { \ - *(type *)(ctx)->pOutput = *(type *)(d) - (*(type *)(&(ctx)->param[1].i)); \ - *(type *)(&(ctx)->param[1].i) = *(type *)(d); \ + *(type *)(ctx)->pOutput = *(type *)(d) - (*(type *)(&(ctx)->param[1].param.i)); \ + *(type *)(&(ctx)->param[1].param.i) = *(type *)(d); \ *(int64_t *)(ctx)->pTsOutput = GET_TS_DATA(ctx, index); \ } \ } while (0); @@ -2874,7 +2752,7 @@ static void deriv_function(SqlFunctionCtx *pCtx) { // TODO difference in date column static void diff_function(SqlFunctionCtx *pCtx) { void *data = GET_INPUT_DATA_LIST(pCtx); - bool isFirstBlock = (pCtx->param[1].nType == INITIAL_VALUE_NOT_ASSIGNED); + bool isFirstBlock = (pCtx->param[1].param.nType == INITIAL_VALUE_NOT_ASSIGNED); int32_t notNullElems = 0; @@ -2894,15 +2772,15 @@ static void diff_function(SqlFunctionCtx *pCtx) { continue; } - if (pCtx->param[1].nType != INITIAL_VALUE_NOT_ASSIGNED) { // initial value is not set yet - *pOutput = (int32_t)(pData[i] - pCtx->param[1].i); // direct previous may be null + if (pCtx->param[1].param.nType != INITIAL_VALUE_NOT_ASSIGNED) { // initial value is not set yet + *pOutput = (int32_t)(pData[i] - pCtx->param[1].param.i); // direct previous may be null *pTimestamp = (tsList != NULL)? tsList[i]:0; pOutput += 1; pTimestamp += 1; } - pCtx->param[1].i = pData[i]; - pCtx->param[1].nType = pCtx->inputType; + pCtx->param[1].param.i = pData[i]; + pCtx->param[1].param.nType = pCtx->inputType; notNullElems++; } break; @@ -2916,15 +2794,15 @@ static void diff_function(SqlFunctionCtx *pCtx) { continue; } - if (pCtx->param[1].nType != INITIAL_VALUE_NOT_ASSIGNED) { // initial value is not set yet - *pOutput = pData[i] - pCtx->param[1].i; // direct previous may be null + if (pCtx->param[1].param.nType != INITIAL_VALUE_NOT_ASSIGNED) { // initial value is not set yet + *pOutput = pData[i] - pCtx->param[1].param.i; // direct previous may be null *pTimestamp = (tsList != NULL)? tsList[i]:0; pOutput += 1; pTimestamp += 1; } - pCtx->param[1].i = pData[i]; - pCtx->param[1].nType = pCtx->inputType; + pCtx->param[1].param.i = pData[i]; + pCtx->param[1].param.nType = pCtx->inputType; notNullElems++; } break; @@ -2938,15 +2816,15 @@ static void diff_function(SqlFunctionCtx *pCtx) { continue; } - if (pCtx->param[1].nType != INITIAL_VALUE_NOT_ASSIGNED) { // initial value is not set yet - SET_DOUBLE_VAL(pOutput, pData[i] - pCtx->param[1].d); // direct previous may be null + if (pCtx->param[1].param.nType != INITIAL_VALUE_NOT_ASSIGNED) { // initial value is not set yet + SET_DOUBLE_VAL(pOutput, pData[i] - pCtx->param[1].param.d); // direct previous may be null *pTimestamp = (tsList != NULL)? tsList[i]:0; pOutput += 1; pTimestamp += 1; } - pCtx->param[1].d = pData[i]; - pCtx->param[1].nType = pCtx->inputType; + pCtx->param[1].param.d = pData[i]; + pCtx->param[1].param.nType = pCtx->inputType; notNullElems++; } break; @@ -2960,15 +2838,15 @@ static void diff_function(SqlFunctionCtx *pCtx) { continue; } - if (pCtx->param[1].nType != INITIAL_VALUE_NOT_ASSIGNED) { // initial value is not set yet - *pOutput = (float)(pData[i] - pCtx->param[1].d); // direct previous may be null + if (pCtx->param[1].param.nType != INITIAL_VALUE_NOT_ASSIGNED) { // initial value is not set yet + *pOutput = (float)(pData[i] - pCtx->param[1].param.d); // direct previous may be null *pTimestamp = (tsList != NULL)? tsList[i]:0; pOutput += 1; pTimestamp += 1; } - pCtx->param[1].d = pData[i]; - pCtx->param[1].nType = pCtx->inputType; + pCtx->param[1].param.d = pData[i]; + pCtx->param[1].param.nType = pCtx->inputType; notNullElems++; } break; @@ -2982,15 +2860,15 @@ static void diff_function(SqlFunctionCtx *pCtx) { continue; } - if (pCtx->param[1].nType != INITIAL_VALUE_NOT_ASSIGNED) { // initial value is not set yet - *pOutput = (int16_t)(pData[i] - pCtx->param[1].i); // direct previous may be null + if (pCtx->param[1].param.nType != INITIAL_VALUE_NOT_ASSIGNED) { // initial value is not set yet + *pOutput = (int16_t)(pData[i] - pCtx->param[1].param.i); // direct previous may be null *pTimestamp = (tsList != NULL)? tsList[i]:0; pOutput += 1; pTimestamp += 1; } - pCtx->param[1].i = pData[i]; - pCtx->param[1].nType = pCtx->inputType; + pCtx->param[1].param.i = pData[i]; + pCtx->param[1].param.nType = pCtx->inputType; notNullElems++; } break; @@ -3005,15 +2883,15 @@ static void diff_function(SqlFunctionCtx *pCtx) { continue; } - if (pCtx->param[1].nType != INITIAL_VALUE_NOT_ASSIGNED) { // initial value is not set yet - *pOutput = (int8_t)(pData[i] - pCtx->param[1].i); // direct previous may be null + if (pCtx->param[1].param.nType != INITIAL_VALUE_NOT_ASSIGNED) { // initial value is not set yet + *pOutput = (int8_t)(pData[i] - pCtx->param[1].param.i); // direct previous may be null *pTimestamp = (tsList != NULL)? tsList[i]:0; pOutput += 1; pTimestamp += 1; } - pCtx->param[1].i = pData[i]; - pCtx->param[1].nType = pCtx->inputType; + pCtx->param[1].param.i = pData[i]; + pCtx->param[1].param.nType = pCtx->inputType; notNullElems++; } break; @@ -3024,7 +2902,7 @@ static void diff_function(SqlFunctionCtx *pCtx) { } // initial value is not set yet - if (pCtx->param[1].nType == INITIAL_VALUE_NOT_ASSIGNED || notNullElems <= 0) { + if (pCtx->param[1].param.nType == INITIAL_VALUE_NOT_ASSIGNED || notNullElems <= 0) { /* * 1. current block and blocks before are full of null * 2. current block may be null value @@ -3091,8 +2969,8 @@ static bool spread_function_setup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pRe // this is the server-side setup function in client-side, the secondary merge do not need this procedure if (pCtx->currentStage == MERGE_STAGE) { - pCtx->param[0].d = DBL_MAX; - pCtx->param[3].d = -DBL_MAX; +// pCtx->param[0].param.d = DBL_MAX; +// pCtx->param[3].param.d = -DBL_MAX; } else { pInfo->min = DBL_MAX; pInfo->max = -DBL_MAX; @@ -3192,13 +3070,13 @@ void spread_func_merge(SqlFunctionCtx *pCtx) { return; } - if (pCtx->param[0].d > pData->min) { - pCtx->param[0].d = pData->min; - } +// if (pCtx->param[0].param.d > pData->min) { +// pCtx->param[0].param.d = pData->min; +// } - if (pCtx->param[3].d < pData->max) { - pCtx->param[3].d = pData->max; - } +// if (pCtx->param[3].param.d < pData->max) { +// pCtx->param[3].param.d = pData->max; +// } // GET_RES_INFO(pCtx)->hasResult = DATA_SET_FLAG; } @@ -3218,7 +3096,7 @@ void spread_function_finalizer(SqlFunctionCtx *pCtx) { // return; // } - SET_DOUBLE_VAL((double *)pCtx->pOutput, pCtx->param[3].d - pCtx->param[0].d); +// SET_DOUBLE_VAL((double *)pCtx->pOutput, pCtx->param[3].param.d - pCtx->param[0].param.d); } else { assert(IS_NUMERIC_TYPE(pCtx->inputType) || (pCtx->inputType == TSDB_DATA_TYPE_TIMESTAMP)); @@ -3571,7 +3449,7 @@ void twa_function_finalizer(SqlFunctionCtx *pCtx) { */ static void interp_function_impl(SqlFunctionCtx *pCtx) { - int32_t type = (int32_t) pCtx->param[2].i; + int32_t type = (int32_t) pCtx->param[2].param.i; if (type == TSDB_FILL_NONE) { return; } @@ -3583,7 +3461,7 @@ static void interp_function_impl(SqlFunctionCtx *pCtx) { } else if (type == TSDB_FILL_NULL) { setNull(pCtx->pOutput, pCtx->resDataInfo.type, pCtx->resDataInfo.bytes); } else if (type == TSDB_FILL_SET_VALUE) { - taosVariantDump(&pCtx->param[1], pCtx->pOutput, pCtx->inputType, true); +// taosVariantDump(&pCtx->param[1], pCtx->pOutput, pCtx->inputType, true); } else { if (pCtx->start.key != INT64_MIN && ((ascQuery && pCtx->start.key <= pCtx->startTs && pCtx->end.key >= pCtx->startTs) || ((!ascQuery) && pCtx->start.key >= pCtx->startTs && pCtx->end.key <= pCtx->startTs))) { if (type == TSDB_FILL_PREV) { @@ -3755,11 +3633,11 @@ static void ts_comp_function(SqlFunctionCtx *pCtx) { // primary ts must be existed, so no need to check its existance if (pCtx->order == TSDB_ORDER_ASC) { - tsBufAppend(pTSbuf, (int32_t)pCtx->param[0].i, &pCtx->tag, input, pCtx->size * TSDB_KEYSIZE); +// tsBufAppend(pTSbuf, (int32_t)pCtx->param[0].param.i, &pCtx->tag, input, pCtx->size * TSDB_KEYSIZE); } else { for (int32_t i = pCtx->size - 1; i >= 0; --i) { char *d = GET_INPUT_DATA(pCtx, i); - tsBufAppend(pTSbuf, (int32_t)pCtx->param[0].i, &pCtx->tag, d, (int32_t)TSDB_KEYSIZE); +// tsBufAppend(pTSbuf, (int32_t)pCtx->param[0].param.i, &pCtx->tag, d, (int32_t)TSDB_KEYSIZE); } } @@ -3911,7 +3789,7 @@ static void rate_finalizer(SqlFunctionCtx *pCtx) { return; } - SET_DOUBLE_VAL((double*) pCtx->pOutput, do_calc_rate(pRateInfo, (double) TSDB_TICK_PER_SECOND(pCtx->param[0].i))); +// SET_DOUBLE_VAL((double*) pCtx->pOutput, do_calc_rate(pRateInfo, (double) TSDB_TICK_PER_SECOND(pCtx->param[0].param.i))); // cannot set the numOfIteratedElems again since it is set during previous iteration pResInfo->numOfRes = 1; @@ -4008,7 +3886,7 @@ static void blockInfo_func(SqlFunctionCtx* pCtx) { int32_t len = *(int32_t*) pCtx->pInput; blockDistInfoFromBinary((char*)pCtx->pInput + sizeof(int32_t), len, pDist); - pDist->rowSize = (uint16_t)pCtx->param[0].i; +// pDist->rowSize = (uint16_t)pCtx->param[0].param.i; memcpy(pCtx->pOutput, pCtx->pInput, sizeof(int32_t) + len); @@ -4160,7 +4038,7 @@ void blockinfo_func_finalizer(SqlFunctionCtx* pCtx) { SResultRowEntryInfo *pResInfo = GET_RES_INFO(pCtx); STableBlockDistInfo* pDist = (STableBlockDistInfo*) GET_ROWCELL_INTERBUF(pResInfo); - pDist->rowSize = (uint16_t)pCtx->param[0].i; +// pDist->rowSize = (uint16_t)pCtx->param[0].param.i; generateBlockDistResult(pDist, pCtx->pOutput); if (pDist->dataBlockInfos != NULL) { @@ -4557,9 +4435,9 @@ SAggFunctionInfo aggFunc[35] = {{ FUNCTION_AVG, FUNCSTATE_SO | FUNCSTATE_STABLE, function_setup, - stddev_dst_function, - stddev_dst_finalizer, - stddev_dst_merge, + NULL, + NULL, + NULL, dataBlockRequired, }, { diff --git a/source/libs/function/src/tudf.c b/source/libs/function/src/tudf.c index ad74daddc67cd6aca2b780c961456b2b1d266480..e31a860e85fb8a8ec6c58e605853eb0183f27e2d 100644 --- a/source/libs/function/src/tudf.c +++ b/source/libs/function/src/tudf.c @@ -14,19 +14,15 @@ */ #include "uv.h" #include "os.h" -#include "tlog.h" #include "tudf.h" #include "tudfInt.h" #include "tarray.h" #include "tdatablock.h" -//TODO: when startup, set thread poll size. add it to cfg -//TODO: test for udfd restart -//TODO: udfd restart when exist or aborts -//TODO: deal with uv task that has been started and then udfd core dumped //TODO: network error processing. //TODO: add unit test //TODO: include all global variable under context struct + /* Copyright (c) 2013, Ben Noordhuis * The QUEUE is copied from queue.h under libuv * */ @@ -125,12 +121,35 @@ enum { UV_TASK_DISCONNECT = 2 }; +int64_t gUdfTaskSeqNum = 0; +typedef struct SUdfdProxy { + int32_t dnodeId; + uv_barrier_t gUdfInitBarrier; + + uv_loop_t gUdfdLoop; + uv_thread_t gUdfLoopThread; + uv_async_t gUdfLoopTaskAync; + + uv_async_t gUdfLoopStopAsync; + + uv_mutex_t gUdfTaskQueueMutex; + int8_t gUdfcState; + QUEUE gUdfTaskQueue; + QUEUE gUvProcTaskQueue; + // int8_t gUdfcState = UDFC_STATE_INITAL; + // QUEUE gUdfTaskQueue = {0}; + // QUEUE gUvProcTaskQueue = {0}; +} SUdfdProxy; + + typedef struct SUdfUvSession { + SUdfdProxy *udfc; int64_t severHandle; uv_pipe_t *udfSvcPipe; } SUdfUvSession; typedef struct SClientUvTaskNode { + SUdfdProxy *udfc; int8_t type; int errCode; @@ -169,7 +188,6 @@ typedef struct SClientUdfTask { } _teardown; }; - } SClientUdfTask; typedef struct SClientConnBuf { @@ -185,34 +203,13 @@ typedef struct SClientUvConn { SClientConnBuf readBuf; } SClientUvConn; -uv_process_t gUdfdProcess; - -uv_barrier_t gUdfInitBarrier; - -uv_loop_t gUdfdLoop; -uv_thread_t gUdfLoopThread; -uv_async_t gUdfLoopTaskAync; - -uv_async_t gUdfLoopStopAsync; - -uv_mutex_t gUdfTaskQueueMutex; -int64_t gUdfTaskSeqNum = 0; - enum { UDFC_STATE_INITAL = 0, // initial state - UDFC_STATE_STARTNG, // starting after createUdfdProxy + UDFC_STATE_STARTNG, // starting after udfcOpen UDFC_STATE_READY, // started and begin to receive quests - UDFC_STATE_RESTARTING, // udfd abnormal exit. cleaning up and restart. - UDFC_STATE_STOPPING, // stopping after destroyUdfdProxy + UDFC_STATE_STOPPING, // stopping after udfcClose UDFC_STATUS_FINAL, // stopped }; -int8_t gUdfcState = UDFC_STATE_INITAL; - -//double circular linked list - -QUEUE gUdfTaskQueue = {0}; - -QUEUE gUvProcTaskQueue = {0}; int32_t encodeUdfSetupRequest(void **buf, const SUdfSetupRequest *setup) { int32_t len = 0; @@ -777,13 +774,14 @@ void onUdfClientConnect(uv_connect_t *connect, int status) { int32_t createUdfcUvTask(SClientUdfTask *task, int8_t uvTaskType, SClientUvTaskNode **pUvTask) { SClientUvTaskNode *uvTask = taosMemoryCalloc(1, sizeof(SClientUvTaskNode)); uvTask->type = uvTaskType; + uvTask->udfc = task->session->udfc; if (uvTaskType == UV_TASK_CONNECT) { } else if (uvTaskType == UV_TASK_REQ_RSP) { uvTask->pipe = task->session->udfSvcPipe; SUdfRequest request; request.type = task->type; - request.seqNum = gUdfTaskSeqNum++; + request.seqNum = atomic_fetch_add_64(&gUdfTaskSeqNum, 1); if (task->type == UDF_TASK_SETUP) { request.setup = task->_setup.req; @@ -815,11 +813,11 @@ int32_t createUdfcUvTask(SClientUdfTask *task, int8_t uvTaskType, SClientUvTaskN int32_t queueUvUdfTask(SClientUvTaskNode *uvTask) { debugPrint("%s, %d", "queue uv task", uvTask->type); - - uv_mutex_lock(&gUdfTaskQueueMutex); - QUEUE_INSERT_TAIL(&gUdfTaskQueue, &uvTask->recvTaskQueue); - uv_mutex_unlock(&gUdfTaskQueueMutex); - uv_async_send(&gUdfLoopTaskAync); + SUdfdProxy *udfc = uvTask->udfc; + uv_mutex_lock(&udfc->gUdfTaskQueueMutex); + QUEUE_INSERT_TAIL(&udfc->gUdfTaskQueue, &uvTask->recvTaskQueue); + uv_mutex_unlock(&udfc->gUdfTaskQueueMutex); + uv_async_send(&udfc->gUdfLoopTaskAync); uv_sem_wait(&uvTask->taskSem); uv_sem_destroy(&uvTask->taskSem); @@ -832,7 +830,7 @@ int32_t startUvUdfTask(SClientUvTaskNode *uvTask) { switch (uvTask->type) { case UV_TASK_CONNECT: { uv_pipe_t *pipe = taosMemoryMalloc(sizeof(uv_pipe_t)); - uv_pipe_init(&gUdfdLoop, pipe, 0); + uv_pipe_init(&uvTask->udfc->gUdfdLoop, pipe, 0); uvTask->pipe = pipe; SClientUvConn *conn = taosMemoryMalloc(sizeof(SClientUvConn)); @@ -873,142 +871,98 @@ int32_t startUvUdfTask(SClientUvTaskNode *uvTask) { } void udfClientAsyncCb(uv_async_t *async) { + SUdfdProxy *udfc = async->data; QUEUE wq; - uv_mutex_lock(&gUdfTaskQueueMutex); - QUEUE_MOVE(&gUdfTaskQueue, &wq); - uv_mutex_unlock(&gUdfTaskQueueMutex); + uv_mutex_lock(&udfc->gUdfTaskQueueMutex); + QUEUE_MOVE(&udfc->gUdfTaskQueue, &wq); + uv_mutex_unlock(&udfc->gUdfTaskQueueMutex); while (!QUEUE_EMPTY(&wq)) { QUEUE* h = QUEUE_HEAD(&wq); QUEUE_REMOVE(h); SClientUvTaskNode *task = QUEUE_DATA(h, SClientUvTaskNode, recvTaskQueue); startUvUdfTask(task); - QUEUE_INSERT_TAIL(&gUvProcTaskQueue, &task->procTaskQueue); + QUEUE_INSERT_TAIL(&udfc->gUvProcTaskQueue, &task->procTaskQueue); } } -void cleanUpUvTasks() { +void cleanUpUvTasks(SUdfdProxy *udfc) { QUEUE wq; - uv_mutex_lock(&gUdfTaskQueueMutex); - QUEUE_MOVE(&gUdfTaskQueue, &wq); - uv_mutex_unlock(&gUdfTaskQueueMutex); + uv_mutex_lock(&udfc->gUdfTaskQueueMutex); + QUEUE_MOVE(&udfc->gUdfTaskQueue, &wq); + uv_mutex_unlock(&udfc->gUdfTaskQueueMutex); while (!QUEUE_EMPTY(&wq)) { QUEUE* h = QUEUE_HEAD(&wq); QUEUE_REMOVE(h); SClientUvTaskNode *task = QUEUE_DATA(h, SClientUvTaskNode, recvTaskQueue); - if (gUdfcState == UDFC_STATE_STOPPING) { + if (udfc->gUdfcState == UDFC_STATE_STOPPING) { task->errCode = UDFC_CODE_STOPPING; - } else if (gUdfcState == UDFC_STATE_RESTARTING) { - task->errCode = UDFC_CODE_RESTARTING; } uv_sem_post(&task->taskSem); } // TODO: deal with tasks that are waiting result. - while (!QUEUE_EMPTY(&gUvProcTaskQueue)) { - QUEUE* h = QUEUE_HEAD(&gUvProcTaskQueue); + while (!QUEUE_EMPTY(&udfc->gUvProcTaskQueue)) { + QUEUE* h = QUEUE_HEAD(&udfc->gUvProcTaskQueue); QUEUE_REMOVE(h); SClientUvTaskNode *task = QUEUE_DATA(h, SClientUvTaskNode, procTaskQueue); - if (gUdfcState == UDFC_STATE_STOPPING) { + if (udfc->gUdfcState == UDFC_STATE_STOPPING) { task->errCode = UDFC_CODE_STOPPING; - } else if (gUdfcState == UDFC_STATE_RESTARTING) { - task->errCode = UDFC_CODE_RESTARTING; } uv_sem_post(&task->taskSem); } } void udfStopAsyncCb(uv_async_t *async) { - cleanUpUvTasks(); - if (gUdfcState == UDFC_STATE_STOPPING) { - uv_stop(&gUdfdLoop); + SUdfdProxy *udfc = async->data; + cleanUpUvTasks(udfc); + if (udfc->gUdfcState == UDFC_STATE_STOPPING) { + uv_stop(&udfc->gUdfdLoop); } } -int32_t udfcSpawnUdfd(); - -void onUdfdExit(uv_process_t *req, int64_t exit_status, int term_signal) { - //TODO: pipe close will be first received - debugPrint("Process exited with status %" PRId64 ", signal %d", exit_status, term_signal); - uv_close((uv_handle_t *) req, NULL); - //TODO: restart the udfd process - if (gUdfcState == UDFC_STATE_STOPPING) { - if (term_signal != SIGINT) { - //TODO: log error - } - } - if (gUdfcState == UDFC_STATE_READY) { - gUdfcState = UDFC_STATE_RESTARTING; - //TODO: asynchronous without blocking. how to do it - //cleanUpUvTasks(); - udfcSpawnUdfd(); - } -} - -int32_t udfcSpawnUdfd() { - //TODO: path - uv_process_options_t options = {0}; - static char path[256] = {0}; - size_t cwdSize; - uv_cwd(path, &cwdSize); - strcat(path, "/udfd"); - char* args[2] = {path, NULL}; - options.args = args; - options.file = path; - options.exit_cb = onUdfdExit; - options.stdio_count = 3; - uv_stdio_container_t child_stdio[3]; - child_stdio[0].flags = UV_IGNORE; - child_stdio[1].flags = UV_INHERIT_FD; - child_stdio[1].data.fd = 1; - child_stdio[2].flags = UV_INHERIT_FD; - child_stdio[2].data.fd = 2; - options.stdio = child_stdio; - //TODO spawn error - int err = uv_spawn(&gUdfdLoop, &gUdfdProcess, &options); - if (err != 0) { - debugPrint("can not spawn udfd. path: %s, error: %s", path, uv_strerror(err)); - } - return err; -} - void constructUdfService(void *argsThread) { - uv_loop_init(&gUdfdLoop); - - uv_async_init(&gUdfdLoop, &gUdfLoopTaskAync, udfClientAsyncCb); - uv_async_init(&gUdfdLoop, &gUdfLoopStopAsync, udfStopAsyncCb); - uv_mutex_init(&gUdfTaskQueueMutex); - QUEUE_INIT(&gUdfTaskQueue); - QUEUE_INIT(&gUvProcTaskQueue); - uv_barrier_wait(&gUdfInitBarrier); + SUdfdProxy *udfc = (SUdfdProxy*)argsThread; + uv_loop_init(&udfc->gUdfdLoop); + + uv_async_init(&udfc->gUdfdLoop, &udfc->gUdfLoopTaskAync, udfClientAsyncCb); + udfc->gUdfLoopTaskAync.data = udfc; + uv_async_init(&udfc->gUdfdLoop, &udfc->gUdfLoopStopAsync, udfStopAsyncCb); + udfc->gUdfLoopStopAsync.data = udfc; + uv_mutex_init(&udfc->gUdfTaskQueueMutex); + QUEUE_INIT(&udfc->gUdfTaskQueue); + QUEUE_INIT(&udfc->gUvProcTaskQueue); + uv_barrier_wait(&udfc->gUdfInitBarrier); //TODO return value of uv_run - uv_run(&gUdfdLoop, UV_RUN_DEFAULT); - uv_loop_close(&gUdfdLoop); + uv_run(&udfc->gUdfdLoop, UV_RUN_DEFAULT); + uv_loop_close(&udfc->gUdfdLoop); } - -int32_t createUdfdProxy(int32_t dnodeId) { - gUdfcState = UDFC_STATE_STARTNG; - uv_barrier_init(&gUdfInitBarrier, 2); - uv_thread_create(&gUdfLoopThread, constructUdfService, 0); - uv_barrier_wait(&gUdfInitBarrier); gUdfcState = UDFC_STATE_READY; +int32_t udfcOpen(int32_t dnodeId, UdfcHandle *udfc) { + SUdfdProxy *proxy = taosMemoryCalloc(1, sizeof(SUdfdProxy)); + proxy->dnodeId = dnodeId; + proxy->gUdfcState = UDFC_STATE_STARTNG; + uv_barrier_init(&proxy->gUdfInitBarrier, 2); + uv_thread_create(&proxy->gUdfLoopThread, constructUdfService, proxy); + uv_barrier_wait(&proxy->gUdfInitBarrier); + proxy->gUdfcState = UDFC_STATE_READY; + *udfc = proxy; return 0; } -int32_t destroyUdfdProxy(int32_t dnodeId) { - gUdfcState = UDFC_STATE_STOPPING; - uv_barrier_destroy(&gUdfInitBarrier); -// if (gUdfcState == UDFC_STATE_STOPPING) { -// uv_process_kill(&gUdfdProcess, SIGINT); -// } - uv_async_send(&gUdfLoopStopAsync); - uv_thread_join(&gUdfLoopThread); - uv_mutex_destroy(&gUdfTaskQueueMutex); - gUdfcState = UDFC_STATUS_FINAL; +int32_t udfcClose(UdfcHandle udfcHandle) { + SUdfdProxy *udfc = udfcHandle; + udfc->gUdfcState = UDFC_STATE_STOPPING; + uv_async_send(&udfc->gUdfLoopStopAsync); + uv_thread_join(&udfc->gUdfLoopThread); + uv_mutex_destroy(&udfc->gUdfTaskQueueMutex); + uv_barrier_destroy(&udfc->gUdfInitBarrier); + udfc->gUdfcState = UDFC_STATUS_FINAL; + taosMemoryFree(udfc); return 0; } @@ -1026,11 +980,12 @@ int32_t udfcRunUvTask(SClientUdfTask *task, int8_t uvTaskType) { return task->errCode; } -int32_t setupUdf(char udfName[], SEpSet *epSet, UdfHandle *handle) { +int32_t setupUdf(UdfcHandle udfc, char udfName[], SEpSet *epSet, UdfcFuncHandle *funcHandle) { debugPrint("%s", "client setup udf"); SClientUdfTask *task = taosMemoryMalloc(sizeof(SClientUdfTask)); task->errCode = 0; task->session = taosMemoryMalloc(sizeof(SUdfUvSession)); + task->session->udfc = udfc; task->type = UDF_TASK_SETUP; SUdfSetupRequest *req = &task->_setup.req; @@ -1046,13 +1001,13 @@ int32_t setupUdf(char udfName[], SEpSet *epSet, UdfHandle *handle) { SUdfSetupResponse *rsp = &task->_setup.rsp; task->session->severHandle = rsp->udfHandle; - *handle = task->session; + *funcHandle = task->session; int32_t err = task->errCode; taosMemoryFree(task); return err; } -int32_t callUdf(UdfHandle handle, int8_t callType, SSDataBlock *input, SUdfInterBuf *state, SUdfInterBuf *state2, +int32_t callUdf(UdfcFuncHandle handle, int8_t callType, SSDataBlock *input, SUdfInterBuf *state, SUdfInterBuf *state2, SSDataBlock* output, SUdfInterBuf *newState) { debugPrint("%s", "client call udf"); @@ -1121,7 +1076,7 @@ int32_t callUdf(UdfHandle handle, int8_t callType, SSDataBlock *input, SUdfInter } //TODO: translate these calls to callUdf -int32_t callUdfAggInit(UdfHandle handle, SUdfInterBuf *interBuf) { +int32_t callUdfAggInit(UdfcFuncHandle handle, SUdfInterBuf *interBuf) { int8_t callType = TSDB_UDF_CALL_AGG_INIT; int32_t err = callUdf(handle, callType, NULL, NULL, NULL, NULL, interBuf); @@ -1131,7 +1086,7 @@ int32_t callUdfAggInit(UdfHandle handle, SUdfInterBuf *interBuf) { // input: block, state // output: interbuf, -int32_t callUdfAggProcess(UdfHandle handle, SSDataBlock *block, SUdfInterBuf *state, SUdfInterBuf *newState) { +int32_t callUdfAggProcess(UdfcFuncHandle handle, SSDataBlock *block, SUdfInterBuf *state, SUdfInterBuf *newState) { int8_t callType = TSDB_UDF_CALL_AGG_PROC; int32_t err = callUdf(handle, callType, block, state, NULL, NULL, newState); return err; @@ -1139,7 +1094,7 @@ int32_t callUdfAggProcess(UdfHandle handle, SSDataBlock *block, SUdfInterBuf *st // input: interbuf1, interbuf2 // output: resultBuf -int32_t callUdfAggMerge(UdfHandle handle, SUdfInterBuf *interBuf1, SUdfInterBuf *interBuf2, SUdfInterBuf *resultBuf) { +int32_t callUdfAggMerge(UdfcFuncHandle handle, SUdfInterBuf *interBuf1, SUdfInterBuf *interBuf2, SUdfInterBuf *resultBuf) { int8_t callType = TSDB_UDF_CALL_AGG_MERGE; int32_t err = callUdf(handle, callType, NULL, interBuf1, interBuf2, NULL, resultBuf); return err; @@ -1147,7 +1102,7 @@ int32_t callUdfAggMerge(UdfHandle handle, SUdfInterBuf *interBuf1, SUdfInterBuf // input: interBuf // output: resultData -int32_t callUdfAggFinalize(UdfHandle handle, SUdfInterBuf *interBuf, SUdfInterBuf *resultData) { +int32_t callUdfAggFinalize(UdfcFuncHandle handle, SUdfInterBuf *interBuf, SUdfInterBuf *resultData) { int8_t callType = TSDB_UDF_CALL_AGG_PROC; int32_t err = callUdf(handle, callType, NULL, interBuf, NULL, NULL, resultData); return err; @@ -1155,13 +1110,13 @@ int32_t callUdfAggFinalize(UdfHandle handle, SUdfInterBuf *interBuf, SUdfInterBu // input: block // output: resultData -int32_t callUdfScalaProcess(UdfHandle handle, SSDataBlock *block, SSDataBlock *resultData) { +int32_t callUdfScalaProcess(UdfcFuncHandle handle, SSDataBlock *block, SSDataBlock *resultData) { int8_t callType = TSDB_UDF_CALL_SCALA_PROC; int32_t err = callUdf(handle, callType, block, NULL, NULL, resultData, NULL); return err; } -int32_t teardownUdf(UdfHandle handle) { +int32_t teardownUdf(UdfcFuncHandle handle) { debugPrint("%s", "client teardown udf"); SClientUdfTask *task = taosMemoryMalloc(sizeof(SClientUdfTask)); diff --git a/source/libs/function/src/udfd.c b/source/libs/function/src/udfd.c index 71434c695f49db7339218d40163bd23c97e2ffbf..d6e7a436660d4f4fc5d8dc33fd5de6d7c7eaa9c6 100644 --- a/source/libs/function/src/udfd.c +++ b/source/libs/function/src/udfd.c @@ -27,7 +27,10 @@ typedef struct SUdfdContext { uv_loop_t *loop; + uv_pipe_t ctrlPipe; + uv_signal_t intrSignal; char listenPipeName[UDF_LISTEN_PIPE_NAME_LEN]; + uv_pipe_t listeningPipe; void *clientRpc; uv_mutex_t udfsMutex; @@ -76,9 +79,9 @@ typedef struct SUdf { // TODO: low priority: change name onxxx to xxxCb, and udfc or udfd as prefix // TODO: add private udf structure. -typedef struct SUdfHandle { +typedef struct SUdfcFuncHandle { SUdf *udf; -} SUdfHandle; +} SUdfcFuncHandle; int32_t udfdLoadUdf(char* udfName, SUdf* udf) { strcpy(udf->name, udfName); @@ -143,7 +146,7 @@ void udfdProcessRequest(uv_work_t *req) { } uv_mutex_unlock(&udf->lock); } - SUdfHandle *handle = taosMemoryMalloc(sizeof(SUdfHandle)); + SUdfcFuncHandle *handle = taosMemoryMalloc(sizeof(SUdfcFuncHandle)); handle->udf = udf; // TODO: allocate private structure and call init function and set it to handle SUdfResponse rsp; @@ -166,7 +169,7 @@ void udfdProcessRequest(uv_work_t *req) { case UDF_TASK_CALL: { SUdfCallRequest *call = &request.call; fnDebug("%"PRId64 "call request. call type %d, handle: %"PRIx64, request.seqNum, call->callType, call->udfHandle); - SUdfHandle *handle = (SUdfHandle *)(call->udfHandle); + SUdfcFuncHandle *handle = (SUdfcFuncHandle *)(call->udfHandle); SUdf *udf = handle->udf; SUdfDataBlock input = {0}; @@ -204,7 +207,7 @@ void udfdProcessRequest(uv_work_t *req) { case UDF_TASK_TEARDOWN: { SUdfTeardownRequest *teardown = &request.teardown; fnInfo("teardown. %"PRId64"handle:%"PRIx64, request.seqNum, teardown->udfHandle) - SUdfHandle *handle = (SUdfHandle *)(teardown->udfHandle); + SUdfcFuncHandle *handle = (SUdfcFuncHandle *)(teardown->udfHandle); SUdf *udf = handle->udf; bool unloadUdf = false; uv_mutex_lock(&global.udfsMutex); @@ -380,10 +383,12 @@ void udfdOnNewConnection(uv_stream_t *server, int status) { } } -void removeListeningPipe(int sig) { +void udfdIntrSignalHandler(uv_signal_t *handle, int signum) { + fnInfo("udfd signal received: %d\n", signum); uv_fs_t req; - uv_fs_unlink(global.loop, &req, "udf.sock", NULL); - exit(0); + uv_fs_unlink(global.loop, &req, global.listenPipeName, NULL); + uv_signal_stop(handle); + uv_stop(global.loop); } void udfdProcessRpcRsp(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet) { return; } @@ -492,37 +497,67 @@ static int32_t udfdInitLog() { return taosCreateLog(logName, 1, configDir, NULL, NULL, NULL, 0); } +void udfdCtrlAllocBufCb(uv_handle_t *handle, size_t suggested_size, uv_buf_t *buf) { + buf->base = taosMemoryMalloc(suggested_size); + buf->len = suggested_size; +} + +void udfdCtrlReadCb(uv_stream_t *q, ssize_t nread, const uv_buf_t *buf) { + if (nread < 0) { + fnError("udfd ctrl pipe read error. %s", uv_err_name(nread)); + uv_close((uv_handle_t*)q, NULL); + uv_stop(global.loop); + return; + } + fnError("udfd ctrl pipe read %zu bytes", nread); + taosMemoryFree(buf->base); +} + +static int32_t removeListeningPipe() { + uv_fs_t req; + int err = uv_fs_unlink(global.loop, &req, global.listenPipeName, NULL); + uv_fs_req_cleanup(&req); + return err; +} + static int32_t udfdUvInit() { uv_loop_t* loop = taosMemoryMalloc(sizeof(uv_loop_t)); if (loop) { uv_loop_init(loop); } global.loop = loop; + + uv_pipe_init(global.loop, &global.ctrlPipe, 1); + uv_pipe_open(&global.ctrlPipe, 0); + uv_read_start((uv_stream_t*)&global.ctrlPipe, udfdCtrlAllocBufCb, udfdCtrlReadCb); + char dnodeId[8] = {0}; size_t dnodeIdSize; - uv_os_getenv("DNODE_ID", dnodeId, &dnodeIdSize); + int32_t err = uv_os_getenv("DNODE_ID", dnodeId, &dnodeIdSize); + if (err != 0) { + dnodeId[0] = '1'; + } char listenPipeName[32] = {0}; snprintf(listenPipeName, sizeof(listenPipeName), "%s%s", UDF_LISTEN_PIPE_NAME_PREFIX, dnodeId); strcpy(global.listenPipeName, listenPipeName); - uv_fs_t req; - uv_fs_unlink(global.loop, &req, global.listenPipeName, NULL); + removeListeningPipe(); - uv_pipe_t server; - uv_pipe_init(global.loop, &server, 0); + uv_pipe_init(global.loop, &global.listeningPipe, 0); - signal(SIGINT, removeListeningPipe); + uv_signal_init(global.loop, &global.intrSignal); + uv_signal_start(&global.intrSignal, udfdIntrSignalHandler, SIGINT); int r; fnInfo("bind to pipe %s", global.listenPipeName); - if ((r = uv_pipe_bind(&server, listenPipeName))) { + if ((r = uv_pipe_bind(&global.listeningPipe, listenPipeName))) { fnError("Bind error %s", uv_err_name(r)); - removeListeningPipe(0); + removeListeningPipe(); return -1; } - if ((r = uv_listen((uv_stream_t *)&server, 128, udfdOnNewConnection))) { + if ((r = uv_listen((uv_stream_t *)&global.listeningPipe, 128, udfdOnNewConnection))) { fnError("Listen error %s", uv_err_name(r)); - removeListeningPipe(0); + removeListeningPipe(); return -2; } return 0; diff --git a/source/libs/function/test/runUdf.c b/source/libs/function/test/runUdf.c index 41c7f65e0bbf507fd595d94763232aa721dca288..fb9c3c678af8dd793aea5da5f99f70c482cadedf 100644 --- a/source/libs/function/test/runUdf.c +++ b/source/libs/function/test/runUdf.c @@ -8,7 +8,8 @@ #include "tdatablock.h" int main(int argc, char *argv[]) { - createUdfdProxy(1); + UdfcHandle udfc; + udfcOpen(1, &udfc); uv_sleep(1000); char path[256] = {0}; size_t cwdSize = 256; @@ -20,9 +21,9 @@ int main(int argc, char *argv[]) { fprintf(stdout, "current working directory:%s\n", path); strcat(path, "/libudf1.so"); - UdfHandle handle; + UdfcFuncHandle handle; SEpSet epSet; - setupUdf("udf1", &epSet, &handle); + setupUdf(udfc, "udf1", &epSet, &handle); SSDataBlock block = {0}; SSDataBlock* pBlock = █ @@ -53,5 +54,5 @@ int main(int argc, char *argv[]) { } teardownUdf(handle); - destroyUdfdProxy(1); + udfcClose(udfc); } diff --git a/source/libs/transport/inc/transportInt.h b/source/libs/transport/inc/transportInt.h index ad50948a021ac663bcc012ff6cbf08f86a354e4e..eaca9b0fc765ddee699db235733d9508dd45dc36 100644 --- a/source/libs/transport/inc/transportInt.h +++ b/source/libs/transport/inc/transportInt.h @@ -63,13 +63,14 @@ typedef struct { void (*cfp)(void* parent, SRpcMsg*, SEpSet*); int (*afp)(void* parent, char* user, char* spi, char* encrypt, char* secret, char* ckey); + int (*retry)(void* parent, SRpcMsg*, SEpSet*); - int32_t refCount; - void* parent; - void* idPool; // handle to ID pool - void* tmrCtrl; // handle to timer - SHashObj* hash; // handle returned by hash utility - void* tcphandle; // returned handle from TCP initialization + int32_t refCount; + void* parent; + void* idPool; // handle to ID pool + void* tmrCtrl; // handle to timer + SHashObj* hash; // handle returned by hash utility + void* tcphandle; // returned handle from TCP initialization TdThreadMutex mutex; } SRpcInfo; diff --git a/source/libs/transport/src/trans.c b/source/libs/transport/src/trans.c index ebb90338cd42303243f44be81bceef25f1160b63..fa517d6d61cca34395dd5648a4146a446b9e2e64 100644 --- a/source/libs/transport/src/trans.c +++ b/source/libs/transport/src/trans.c @@ -39,6 +39,7 @@ void* rpcOpen(const SRpcInit* pInit) { // register callback handle pRpc->cfp = pInit->cfp; pRpc->afp = pInit->afp; + pRpc->retry = pInit->rfp; if (pInit->connType == TAOS_CONN_SERVER) { pRpc->numOfThreads = pInit->numOfThreads > TSDB_MAX_RPC_THREADS ? TSDB_MAX_RPC_THREADS : pInit->numOfThreads; @@ -100,11 +101,10 @@ void rpcSendRedirectRsp(void* thandle, const SEpSet* pEpSet) { SRpcMsg rpcMsg; memset(&rpcMsg, 0, sizeof(rpcMsg)); - rpcMsg.contLen = sizeof(SEpSet); - rpcMsg.pCont = rpcMallocCont(rpcMsg.contLen); - if (rpcMsg.pCont == NULL) return; - - memcpy(rpcMsg.pCont, pEpSet, sizeof(SEpSet)); + SMEpSet msg = {.epSet = *pEpSet}; + int32_t len = tSerializeSMEpSet(NULL, 0, &msg); + rpcMsg.pCont = rpcMallocCont(len); + tSerializeSMEpSet(rpcMsg.pCont, len, &msg); rpcMsg.code = TSDB_CODE_RPC_REDIRECT; rpcMsg.handle = thandle; diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index b81310b90bb7fea32d77cdc208747b4e57fddee6..b43b8a1e0c30cabd1eb50e302c604b75ba5436c3 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -31,12 +31,8 @@ typedef struct SCliConn { int hThrdIdx; STransCtx ctx; - bool broken; // link broken or not - ConnStatus status; // - int release; // 1: release - // spi configure - char spi; - char secured; + bool broken; // link broken or not + ConnStatus status; // char* ip; uint32_t port; @@ -44,7 +40,6 @@ typedef struct SCliConn { // debug and log info struct sockaddr_in addr; struct sockaddr_in locaddr; - } SCliConn; typedef struct SCliMsg { @@ -102,6 +97,8 @@ static void cliSendCb(uv_write_t* req, int status); static void cliConnCb(uv_connect_t* req, int status); static void cliAsyncCb(uv_async_t* handle); +static void cliAppCb(SCliConn* pConn, STransMsg* pMsg); + static SCliConn* cliCreateConn(SCliThrdObj* thrd); static void cliDestroyConn(SCliConn* pConn, bool clear /*clear tcp handle or not*/); static void cliDestroy(uv_handle_t* handle); @@ -303,8 +300,6 @@ void cliHandleResp(SCliConn* conn) { TMSG_INFO(pHead->msgType), taosInetNtoa(conn->addr.sin_addr), ntohs(conn->addr.sin_port), taosInetNtoa(conn->locaddr.sin_addr), ntohs(conn->locaddr.sin_port), transMsg.contLen); - conn->secured = pHead->secured; - if (pCtx == NULL && CONN_NO_PERSIST_BY_APP(conn)) { tTrace("except, server continue send while cli ignore it"); // transUnrefCliHandle(conn); @@ -318,7 +313,8 @@ void cliHandleResp(SCliConn* conn) { if (pCtx == NULL || pCtx->pSem == NULL) { tTrace("%s cli conn %p handle resp", pTransInst->label, conn); - (pTransInst->cfp)(pTransInst->parent, &transMsg, NULL); + cliAppCb(conn, &transMsg); + //(pTransInst->cfp)(pTransInst->parent, &transMsg, NULL); } else { tTrace("%s cli conn(sync) %p handle resp", pTransInst->label, conn); memcpy((char*)pCtx->pRsp, (char*)&transMsg, sizeof(transMsg)); @@ -384,7 +380,8 @@ void cliHandleExcept(SCliConn* pConn) { once = true; continue; } - (pTransInst->cfp)(pTransInst->parent, &transMsg, NULL); + cliAppCb(pConn, &transMsg); + //(pTransInst->cfp)(pTransInst->parent, &transMsg, NULL); } else { tTrace("%s cli conn(sync) %p handle except", pTransInst->label, pConn); memcpy((char*)(pCtx->pRsp), (char*)(&transMsg), sizeof(transMsg)); @@ -884,6 +881,18 @@ int cliRBChoseIdx(STrans* pTransInst) { } return index % pTransInst->numOfThreads; } +void cliAppCb(SCliConn* pConn, STransMsg* transMsg) { + SCliThrdObj* pThrd = pConn->hostThrd; + STrans* pTransInst = pThrd->pTransInst; + + if (transMsg->code == TSDB_CODE_RPC_REDIRECT && pTransInst->retry != NULL) { + SMEpSet emsg = {0}; + tDeserializeSMEpSet(transMsg->pCont, transMsg->contLen, &emsg); + pTransInst->retry(pTransInst, transMsg, &(emsg.epSet)); + } else { + pTransInst->cfp(pTransInst->parent, transMsg, NULL); + } +} void transCloseClient(void* arg) { SCliObj* cli = arg; diff --git a/source/libs/transport/src/transSrv.c b/source/libs/transport/src/transSrv.c index ec66f3e8df46a2ca0d8416650433011062c37147..59a30051ef1fb961305836c6771de3d193b2068f 100644 --- a/source/libs/transport/src/transSrv.c +++ b/source/libs/transport/src/transSrv.c @@ -30,7 +30,6 @@ typedef struct SSrvConn { uv_timer_t pTimer; queue queue; - int ref; int persist; // persist connection or not SConnBuffer readBuf; // read buf, int inType; @@ -692,8 +691,6 @@ static void uvDestroyConn(uv_handle_t* handle) { if (thrd->quit && QUEUE_IS_EMPTY(&thrd->conn)) { tTrace("work thread quit"); uv_walk(thrd->loop, uvWalkCb, NULL); - // uv_loop_close(thrd->loop); - // uv_stop(thrd->loop); } } @@ -756,8 +753,6 @@ void uvHandleQuit(SSrvMsg* msg, SWorkThrdObj* thrd) { thrd->quit = true; if (QUEUE_IS_EMPTY(&thrd->conn)) { uv_walk(thrd->loop, uvWalkCb, NULL); - // uv_loop_close(thrd->loop); - // uv_stop(thrd->loop); } else { destroyAllConn(thrd); } @@ -851,10 +846,8 @@ void transRefSrvHandle(void* handle) { if (handle == NULL) { return; } - SSrvConn* conn = handle; - int ref = T_REF_INC((SSrvConn*)handle); - UNUSED(ref); + tDebug("server conn %p ref count: %d", handle, ref); } void transUnrefSrvHandle(void* handle) { diff --git a/source/os/src/osEnv.c b/source/os/src/osEnv.c index 22884298efbcf7015a6426a2fa2827afa87914b1..e24ac41f20b2db48356ac52ca2c1585322fc65aa 100644 --- a/source/os/src/osEnv.c +++ b/source/os/src/osEnv.c @@ -37,6 +37,7 @@ int64_t tsOpenMax = 0; int64_t tsStreamMax = 0; float tsNumOfCores = 0; int64_t tsTotalMemoryKB = 0; +char* tsProcPath = NULL; void osDefaultInit() { taosSeedRand(taosSafeRand()); diff --git a/source/os/src/osProc.c b/source/os/src/osProc.c index b6de638ac2e4c9fe851d7c73515a9d509ef2ae95..d569582256c5e156e6f21cd0419205705d96475c 100644 --- a/source/os/src/osProc.c +++ b/source/os/src/osProc.c @@ -17,7 +17,7 @@ #define _DEFAULT_SOURCE #include "os.h" -static char *tsProcPath = NULL; +char *tsProcPath = NULL; int32_t taosNewProc(char **args) { int32_t pid = fork(); diff --git a/source/util/src/talgo.c b/source/util/src/talgo.c index 48a0f327c4111d89dfd99437fc7c763d2d65b101..8675670cfe1e61da83e79e4e3a0ea665cec597ca 100644 --- a/source/util/src/talgo.c +++ b/source/util/src/talgo.c @@ -229,22 +229,21 @@ void *taosbsearch(const void *key, const void *base, int64_t nmemb, int64_t size } void taosheapadjust(void *base, int32_t size, int32_t start, int32_t end, const void *parcompar, - __ext_compar_fn_t compar, const void *parswap, __ext_swap_fn_t swap, bool maxroot) { + __ext_compar_fn_t compar, char* buf, bool maxroot) { int32_t parent; int32_t child; - char *buf; + + char* tmp = NULL; + if (buf == NULL) { + tmp = taosMemoryMalloc(size); + } else { + tmp = buf; + } if (base && size > 0 && compar) { parent = start; child = 2 * parent + 1; - if (swap == NULL) { - buf = taosMemoryCalloc(1, size); - if (buf == NULL) { - return; - } - } - if (maxroot) { while (child <= end) { if (child + 1 <= end && @@ -256,11 +255,7 @@ void taosheapadjust(void *base, int32_t size, int32_t start, int32_t end, const break; } - if (swap == NULL) { - doswap(elePtrAt(base, size, parent), elePtrAt(base, size, child), size, buf); - } else { - (*swap)(elePtrAt(base, size, parent), elePtrAt(base, size, child), parswap); - } + doswap(elePtrAt(base, size, parent), elePtrAt(base, size, child), size, tmp); parent = child; child = 2 * parent + 1; @@ -276,33 +271,35 @@ void taosheapadjust(void *base, int32_t size, int32_t start, int32_t end, const break; } - if (swap == NULL) { - doswap(elePtrAt(base, size, parent), elePtrAt(base, size, child), size, buf); - } else { - (*swap)(elePtrAt(base, size, parent), elePtrAt(base, size, child), parswap); - } + doswap(elePtrAt(base, size, parent), elePtrAt(base, size, child), size, tmp); parent = child; child = 2 * parent + 1; } } + } - if (swap == NULL) { - taosMemoryFreeClear(buf); - } + if (buf == NULL) { + taosMemoryFree(tmp); } } void taosheapsort(void *base, int32_t size, int32_t len, const void *parcompar, __ext_compar_fn_t compar, - const void *parswap, __ext_swap_fn_t swap, bool maxroot) { + bool maxroot) { int32_t i; + char* buf = taosMemoryCalloc(1, size); + if (buf == NULL) { + return; + } + if (base && size > 0) { for (i = len / 2 - 1; i >= 0; i--) { - taosheapadjust(base, size, i, len - 1, parcompar, compar, parswap, swap, maxroot); + taosheapadjust(base, size, i, len - 1, parcompar, compar, buf, maxroot); } } + taosMemoryFree(buf); /* char *buf = taosMemoryCalloc(1, size); diff --git a/tests/script/test.sh b/tests/script/test.sh index 14dc43beaf2c104165c954a8aa3c96d7252da03c..e4191da0a91a85584b9aefa21547bcef06e6fb92 100755 --- a/tests/script/test.sh +++ b/tests/script/test.sh @@ -131,8 +131,8 @@ if [ -n "$FILE_NAME" ]; then FLAG="-v" fi - echo valgrind --tool=memcheck --leak-check=full --show-reachable=no --track-origins=yes --show-leak-kinds=all -v --workaround-gcc296-bugs=yes --log-file=${LOG_DIR}/valgrind-tsim.log $PROGRAM -c $CFG_DIR -f $FILE_NAME $FLAG - valgrind --tool=memcheck --leak-check=full --show-reachable=no --track-origins=yes --show-leak-kinds=all -v --workaround-gcc296-bugs=yes --log-file=${LOG_DIR}/valgrind-tsim.log $PROGRAM -c $CFG_DIR -f $FILE_NAME $FLAG + echo valgrind --tool=memcheck --leak-check=full --show-reachable=no --track-origins=yes --child-silent-after-fork=yes --show-leak-kinds=all -v --workaround-gcc296-bugs=yes --log-file=${LOG_DIR}/valgrind-tsim.log $PROGRAM -c $CFG_DIR -f $FILE_NAME $FLAG + valgrind --tool=memcheck --leak-check=full --show-reachable=no --track-origins=yes --child-silent-after-fork=yes --show-leak-kinds=all -v --workaround-gcc296-bugs=yes --log-file=${LOG_DIR}/valgrind-tsim.log $PROGRAM -c $CFG_DIR -f $FILE_NAME $FLAG else if [[ $MULTIPROCESS -eq 1 ]];then echo "ExcuteCmd(multiprocess):" $PROGRAM -m -c $CFG_DIR -f $FILE_NAME diff --git a/tests/script/tsim/query/session.sim b/tests/script/tsim/query/session.sim index d5e77c7b28471fe84ddf6fbe8310ba038f1ac358..a69b6249fc3f0389ca2982d11c9afc1435f7ced6 100644 --- a/tests/script/tsim/query/session.sim +++ b/tests/script/tsim/query/session.sim @@ -279,7 +279,8 @@ endi print ================> syntax error check not active ================> reactive sql_error select * from dev_001 session(ts,1w) -sql_error select count(*) from st session(ts,1w) +print disable this temporarily, session can not be directly applied to super table. +#sql_error select count(*) from st session(ts,1w) sql_error select count(*) from dev_001 group by tagtype session(ts,1w) sql_error sql select count(*) from dev_001 session(ts,1n) sql_error sql select count(*) from dev_001 session(ts,1y) diff --git a/tools/shell/src/shellEngine.c b/tools/shell/src/shellEngine.c index ca5bf69795060d84025fb3d90e2b3f118f183846..36d2866fb56c3146543886b02a0307eb5874b83e 100644 --- a/tools/shell/src/shellEngine.c +++ b/tools/shell/src/shellEngine.c @@ -224,27 +224,63 @@ int32_t shellRunCommand(TAOS *con, char *command) { } } - char quote = 0, *cmd = command; + bool esc = false; + char quote = 0, *cmd = command, *p = command; for (char c = *command++; c != 0; c = *command++) { - if (c == '\\' && (*command == '\'' || *command == '"' || *command == '`')) { - command ++; + if (esc) { + switch (c) { + case 'n': + c = '\n'; + break; + case 'r': + c = '\r'; + break; + case 't': + c = '\t'; + break; + case 'G': + *p++ = '\\'; + break; + case '\'': + case '"': + if (quote) { + *p++ = '\\'; + } + break; + } + *p++ = c; + esc = false; continue; } + if (c == '\\') { + if (quote != 0 && (*command == '_' || *command == '\\')) { + // DO nothing + } else { + esc = true; + continue; + } + } + if (quote == c) { quote = 0; - } else if (quote == 0 && (c == '\'' || c == '"' || c == '`')) { + } else if (quote == 0 && (c == '\'' || c == '"')) { quote = c; - } else if (c == ';' && quote == 0) { - c = *command; - *command = 0; + } + + *p++ = c; + if (c == ';' && quote == 0) { + c = *p; + *p = 0; if (shellRunSingleCommand(con, cmd) < 0) { return -1; } - *command = c; - cmd = command; + *p = c; + p = cmd; } } + + *p = 0; return shellRunSingleCommand(con, cmd); } @@ -538,23 +574,19 @@ static void shellPrintNChar(const char *str, int length, int width) { while (pos < length) { TdWchar wc; int bytes = taosMbToWchar(&wc, str + pos, MB_CUR_MAX); - if (bytes <= 0) { + if (bytes == 0) { break; } - if (pos + bytes > length) { + pos += bytes; + if (pos > length) { break; } - int w = 0; + #ifdef WINDOWS - w = bytes; + int w = bytes; #else - if(*(str + pos) == '\t' || *(str + pos) == '\n' || *(str + pos) == '\r'){ - w = bytes; - }else{ - w = taosWcharWidth(wc); - } + int w = taosWcharWidth(wc); #endif - pos += bytes; if (w <= 0) { continue; } @@ -648,7 +680,6 @@ static void printField(const char *val, TAOS_FIELD *field, int width, int32_t le break; case TSDB_DATA_TYPE_BINARY: case TSDB_DATA_TYPE_NCHAR: - case TSDB_DATA_TYPE_JSON: shellPrintNChar(val, length, width); break; case TSDB_DATA_TYPE_TIMESTAMP: @@ -761,8 +792,7 @@ static int calcColWidth(TAOS_FIELD *field, int precision) { return TMAX(field->bytes, width); } - case TSDB_DATA_TYPE_NCHAR: - case TSDB_DATA_TYPE_JSON:{ + case TSDB_DATA_TYPE_NCHAR: { int16_t bytes = field->bytes * TSDB_NCHAR_SIZE; if (bytes > tsMaxBinaryDisplayWidth) { return TMAX(tsMaxBinaryDisplayWidth, width);