diff --git a/documentation/webdocs/markdowndocs/Super Table-ch.md b/documentation/webdocs/markdowndocs/Super Table-ch.md index e75a8d46c38e3501b2c17b4a05b68c0e8fa4a707..524fb51b17566955670966e2c51e7cb38e5df67f 100644 --- a/documentation/webdocs/markdowndocs/Super Table-ch.md +++ b/documentation/webdocs/markdowndocs/Super Table-ch.md @@ -72,7 +72,7 @@ STable从属于库,一个STable只属于一个库,但一个库可以有一 DROP TABLE ``` - Note: 删除STable不会级联删除通过STable创建的表;相反删除STable时要求通过该STable创建的表都已经被删除。 + Note: 删除STable时,所有通过该STable创建的表都将被删除。 - 查看属于某STable并满足查询条件的表 diff --git a/documentation/webdocs/markdowndocs/Super Table.md b/documentation/webdocs/markdowndocs/Super Table.md index 609dd11bd278da3398330fa33f857fac65ffb3d5..efc95c5f79216b2a887f26a565b4e5e123768c6b 100644 --- a/documentation/webdocs/markdowndocs/Super Table.md +++ b/documentation/webdocs/markdowndocs/Super Table.md @@ -142,7 +142,7 @@ It lists the STable's schema and tags DROP TABLE ``` -To delete a STable, all the tables created via this STable shall be deleted first, otherwise, it will fail. +To delete a STable, all the tables created via this STable will be deleted first. ### List the Associated Tables of a STable diff --git a/documentation/webdocs/markdowndocs/advanced features-ch.md b/documentation/webdocs/markdowndocs/advanced features-ch.md index 4d01eaf364cfe17b4cb4658dc8596fbb52a65ae2..fc229e71e6c76325938f794ac9fccb542778d2ae 100644 --- a/documentation/webdocs/markdowndocs/advanced features-ch.md +++ b/documentation/webdocs/markdowndocs/advanced features-ch.md @@ -76,7 +76,7 @@ TDengine通过查询函数向用户提供毫秒级的数据获取能力。直接 TDengine分配固定大小的内存空间作为缓存空间,缓存空间可根据应用的需求和硬件资源配置。通过适当的设置缓存空间,TDengine可以提供极高性能的写入和查询的支持。TDengine中每个虚拟节点(virtual node)创建时分配独立的缓存池。每个虚拟节点管理自己的缓存池,不同虚拟节点间不共享缓存池。每个虚拟节点内部所属的全部表共享该虚拟节点的缓存池。 -TDengine将内存池按块划分进行管理,数据在内存块里按照列式存储。一个vnode的内存池是在vnode创建时按块分配好的,而且每个内存块按照先进先出的原则进行管理。一张表所需要的内存块是从vnode的内存池中进行分配的,块的大小由系统配置参数cache决定。每张表最大内存块的数目由配置参数tblocks决定,每张表平均的内存块的个数由配置参数ablocks决定。因此对于一个vnode, 总的内存大小为: cache*ablocks*tables。内存块参数cache不宜过小,一个cache block需要能存储至少几十条以上记录,才会有效率。参数ablocks最小为2,保证每张表平均至少能分配两个内存块。 +TDengine将内存池按块划分进行管理,数据在内存块里按照列式存储。一个vnode的内存池是在vnode创建时按块分配好的,而且每个内存块按照先进先出的原则进行管理。一张表所需要的内存块是从vnode的内存池中进行分配的,块的大小由系统配置参数cache决定。每张表最大内存块的数目由配置参数tblocks决定,每张表平均的内存块的个数由配置参数ablocks决定。因此对于一个vnode, 总的内存大小为: `cache * ablocks * tables`。内存块参数cache不宜过小,一个cache block需要能存储至少几十条以上记录,才会有效率。参数ablocks最小为2,保证每张表平均至少能分配两个内存块。 你可以通过函数last_row快速获取一张表或一张超级表的最后一条记录,这样很便于在大屏显示各设备的实时状态或采集值。例如: diff --git a/minidevops/README.MD b/minidevops/README.MD index f9ec4a8f190071d98d4b17de5b0bf3671bd38d6a..9937ad04ad91a4aacdf41f75294052af5024e4cb 100644 --- a/minidevops/README.MD +++ b/minidevops/README.MD @@ -7,8 +7,10 @@ - grafana/grafana Grafana的镜像,一个广泛应用的开源可视化监控软件 - telegraf:latest 一个广泛应用的开源数据采集程序 - prom/prometheus:latest 一个广泛应用的k8s领域的开源数据采集程序 + ## 说明 本文中的图片链接在Github上显示不出来,建议将MD文件下载后用vscode或其他md文件浏览工具进行查看 + ## 前提条件 1. 一台linux服务器或运行linux操作系统的虚拟机或者运行MacOS的计算机 2. 安装了Docker软件。Docker软件的安装方法请参考linux下安装Docker @@ -216,4 +218,8 @@ use telegraf; 使用telegraf这个数据库。然后执行show tables,describe table等命令详细查询下telegraf这个库里保存了些什么数据。 具体TDengine的查询语句可以参考[TDengine官方文档](https://www.taosdata.com/cn/documentation/taos-sql/) ## 接入多个监控对象 +<<<<<<< HEAD +就像前面原理介绍的,这个miniDevops的小系统,已经提供了一个时序数据库和可视化系统,对于多台机器的监控,只需要将每台机器的telegraf或prometheus配置按上面所述修改,就可以完成监控数据采集和可视化呈现了。 +======= 就像前面原理介绍的,这个miniDevops的小系统,已经提供了一个时序数据库和可视化系统,对于多台机器的监控,只需要将每台机器的telegraf或prometheus配置按上面所述修改,就可以完成监控数据采集和可视化呈现了。 +>>>>>>> 740f82af58c4ecc2deecfa36fb1de4ef5ee55efc diff --git a/packaging/docker/Dockerfile b/packaging/docker/Dockerfile index b01f375db0bc49e07ddb6b9d5e3c1692455857a7..22c1bc190235d1b649469cbf8add9b64e0703d03 100644 --- a/packaging/docker/Dockerfile +++ b/packaging/docker/Dockerfile @@ -4,7 +4,7 @@ WORKDIR /root COPY tdengine.tar.gz /root/ RUN tar -zxf tdengine.tar.gz -WORKDIR /root/tdengine/ +WORKDIR /root/TDengine-server/ RUN sh install.sh diff --git a/packaging/docker/dockerbuild.sh b/packaging/docker/dockerbuild.sh index 280c27d7aa10248a73874518c30639b83203590b..aeea6a2a95aae576bc0af8f36b2bff3b79f17f04 100755 --- a/packaging/docker/dockerbuild.sh +++ b/packaging/docker/dockerbuild.sh @@ -1,12 +1,6 @@ #!/bin/bash set -x $1 -tar -zxf $1 -DIR=`echo $1|awk -F . '{print($1"."$2"."$3"."$4)}'` -mv $DIR tdengine -tar -czf tdengine.tar.gz tdengine -TMP=`echo $1|awk -F . '{print($2"."$3"."$4)}'` -TAG="1."$TMP -docker build --rm -f "Dockerfile" -t tdengine/tdengine:$TAG "." +docker build --rm -f "Dockerfile" -t tdengine/tdengine:$1 "." docker login -u tdengine -p ******** #replace the docker registry username and password -docker push tdengine/tdengine:$TAG \ No newline at end of file +docker push tdengine/tdengine:$1 \ No newline at end of file diff --git a/src/client/src/tscFunctionImpl.c b/src/client/src/tscFunctionImpl.c index ed7d4dfc602fde29538183d8f691a24f5d63ec56..837a0ce0054b8fd7cc92a7b164f667cb6841a276 100644 --- a/src/client/src/tscFunctionImpl.c +++ b/src/client/src/tscFunctionImpl.c @@ -138,6 +138,19 @@ typedef struct STSCompInfo { STSBuf *pTSBuf; } STSCompInfo; +typedef struct SRateInfo { + int64_t CorrectionValue; + int64_t firstValue; + TSKEY firstKey; + int64_t lastValue; + TSKEY lastKey; + int8_t hasResult; // flag to denote has value + bool isIRate; // true for IRate functions, false for Rate functions + int64_t num; // for sum/avg + double sum; // for sum/avg +} SRateInfo; + + int32_t getResultDataInfo(int32_t dataType, int32_t dataBytes, int32_t functionId, int32_t param, int16_t *type, int16_t *bytes, int16_t *intermediateResBytes, int16_t extLength, bool isSuperTable) { if (!isValidDataType(dataType, dataBytes)) { @@ -192,7 +205,12 @@ int32_t getResultDataInfo(int32_t dataType, int32_t dataBytes, int32_t functionI *type = TSDB_DATA_TYPE_BINARY; *bytes = sizeof(SAvgInfo); *intermediateResBytes = *bytes; + return TSDB_CODE_SUCCESS; + } else if (functionId >= TSDB_FUNC_RATE && functionId <= TSDB_FUNC_AVG_IRATE) { + *type = TSDB_DATA_TYPE_DOUBLE; + *bytes = sizeof(SRateInfo); + *intermediateResBytes = sizeof(SRateInfo); return TSDB_CODE_SUCCESS; } else if (functionId == TSDB_FUNC_TOP || functionId == TSDB_FUNC_BOTTOM) { *type = TSDB_DATA_TYPE_BINARY; @@ -253,6 +271,10 @@ int32_t getResultDataInfo(int32_t dataType, int32_t dataBytes, int32_t functionI *type = TSDB_DATA_TYPE_DOUBLE; *bytes = sizeof(double); *intermediateResBytes = sizeof(SAvgInfo); + } else if (functionId >= TSDB_FUNC_RATE && functionId <= TSDB_FUNC_AVG_IRATE) { + *type = TSDB_DATA_TYPE_DOUBLE; + *bytes = sizeof(double); + *intermediateResBytes = sizeof(SRateInfo); } else if (functionId == TSDB_FUNC_STDDEV) { *type = TSDB_DATA_TYPE_DOUBLE; *bytes = sizeof(double); @@ -4348,6 +4370,462 @@ static void ts_comp_finalize(SQLFunctionCtx *pCtx) { doFinalizer(pCtx); } +////////////////////////////////////////////////////////////////////////////////////////////// +// RATE functions + +static double do_calc_rate(const SRateInfo* pRateInfo) { + if ((INT64_MIN == pRateInfo->lastKey) || (INT64_MIN == pRateInfo->firstKey) || (pRateInfo->firstKey >= pRateInfo->lastKey)) { + return 0; + } + + int64_t diff = 0; + + if (pRateInfo->isIRate) { + diff = pRateInfo->lastValue; + if (diff >= pRateInfo->firstValue) { + diff -= pRateInfo->firstValue; + } + } else { + diff = pRateInfo->CorrectionValue + pRateInfo->lastValue - pRateInfo->firstValue; + if (diff <= 0) { + return 0; + } + } + + int64_t duration = pRateInfo->lastKey - pRateInfo->firstKey; + duration = (duration + 500) / 1000; + + double resultVal = ((double)diff) / duration; + + pTrace("do_calc_rate() isIRate:%d firstKey:%" PRId64 " lastKey:%" PRId64 " firstValue:%" PRId64 " lastValue:%" PRId64 " CorrectionValue:%" PRId64 " resultVal:%f", + pRateInfo->isIRate, pRateInfo->firstKey, pRateInfo->lastKey, pRateInfo->firstValue, pRateInfo->lastValue, pRateInfo->CorrectionValue, resultVal); + + return resultVal; +} + + +static bool rate_function_setup(SQLFunctionCtx *pCtx) { + if (!function_setup(pCtx)) { + return false; + } + + SResultInfo *pResInfo = GET_RES_INFO(pCtx); //->aOutputBuf + pCtx->outputBytes; + SRateInfo * pInfo = pResInfo->interResultBuf; + + pInfo->CorrectionValue = 0; + pInfo->firstKey = INT64_MIN; + pInfo->lastKey = INT64_MIN; + pInfo->firstValue = INT64_MIN; + pInfo->lastValue = INT64_MIN; + pInfo->num = 0; + pInfo->sum = 0; + + pInfo->hasResult = 0; + pInfo->isIRate = ((pCtx->functionId == TSDB_FUNC_IRATE) || (pCtx->functionId == TSDB_FUNC_SUM_IRATE) || (pCtx->functionId == TSDB_FUNC_AVG_IRATE)); + return true; +} + + +static void rate_function(SQLFunctionCtx *pCtx) { + + assert(IS_DATA_BLOCK_LOADED(pCtx->blockStatus)); + + int32_t notNullElems = 0; + SResultInfo *pResInfo = GET_RES_INFO(pCtx); + SRateInfo *pRateInfo = (SRateInfo *)pResInfo->interResultBuf; + TSKEY *primaryKey = pCtx->ptsList; + + pTrace("%p rate_function() size:%d, hasNull:%d", pCtx, pCtx->size, pCtx->hasNull); + + for (int32_t i = 0; i < pCtx->size; ++i) { + char *pData = GET_INPUT_CHAR_INDEX(pCtx, i); + if (pCtx->hasNull && isNull(pData, pCtx->inputType)) { + pTrace("%p rate_function() index of null data:%d", pCtx, i); + continue; + } + + notNullElems++; + + int64_t v = 0; + switch (pCtx->inputType) { + case TSDB_DATA_TYPE_TINYINT: + v = (int64_t)GET_INT8_VAL(pData); + break; + case TSDB_DATA_TYPE_SMALLINT: + v = (int64_t)GET_INT16_VAL(pData); + break; + case TSDB_DATA_TYPE_INT: + v = (int64_t)GET_INT32_VAL(pData); + break; + case TSDB_DATA_TYPE_BIGINT: + v = (int64_t)GET_INT64_VAL(pData); + break; + default: + assert(0); + } + + if ((INT64_MIN == pRateInfo->firstValue) || (INT64_MIN == pRateInfo->firstKey)) { + pRateInfo->firstValue = v; + pRateInfo->firstKey = primaryKey[i]; + + pTrace("firstValue:%" PRId64 " firstKey:%" PRId64, pRateInfo->firstValue, pRateInfo->firstKey); + } + + if (INT64_MIN == pRateInfo->lastValue) { + pRateInfo->lastValue = v; + } else if (v < pRateInfo->lastValue) { + pRateInfo->CorrectionValue += pRateInfo->lastValue; + pTrace("CorrectionValue:%" PRId64, pRateInfo->CorrectionValue); + } + + pRateInfo->lastValue = v; + pRateInfo->lastKey = primaryKey[i]; + pTrace("lastValue:%" PRId64 " lastKey:%" PRId64, pRateInfo->lastValue, pRateInfo->lastKey); + } + + if (!pCtx->hasNull) { + assert(pCtx->size == notNullElems); + } + + SET_VAL(pCtx, notNullElems, 1); + + if (notNullElems > 0) { + pRateInfo->hasResult = DATA_SET_FLAG; + pResInfo->hasResult = DATA_SET_FLAG; + } + + // keep the data into the final output buffer for super table query since this execution may be the last one + if (pResInfo->superTableQ) { + memcpy(pCtx->aOutputBuf, pResInfo->interResultBuf, sizeof(SRateInfo)); + } +} + +static void rate_function_f(SQLFunctionCtx *pCtx, int32_t index) { + void *pData = GET_INPUT_CHAR_INDEX(pCtx, index); + if (pCtx->hasNull && isNull(pData, pCtx->inputType)) { + return; + } + + // NOTE: keep the intermediate result into the interResultBuf + SResultInfo *pResInfo = GET_RES_INFO(pCtx); + SRateInfo *pRateInfo = (SRateInfo *)pResInfo->interResultBuf; + TSKEY *primaryKey = pCtx->ptsList; + + int64_t v = 0; + switch (pCtx->inputType) { + case TSDB_DATA_TYPE_TINYINT: + v = (int64_t)GET_INT8_VAL(pData); + break; + case TSDB_DATA_TYPE_SMALLINT: + v = (int64_t)GET_INT16_VAL(pData); + break; + case TSDB_DATA_TYPE_INT: + v = (int64_t)GET_INT32_VAL(pData); + break; + case TSDB_DATA_TYPE_BIGINT: + v = (int64_t)GET_INT64_VAL(pData); + break; + default: + assert(0); + } + + if ((INT64_MIN == pRateInfo->firstValue) || (INT64_MIN == pRateInfo->firstKey)) { + pRateInfo->firstValue = v; + pRateInfo->firstKey = primaryKey[index]; + } + + if (INT64_MIN == pRateInfo->lastValue) { + pRateInfo->lastValue = v; + } else if (v < pRateInfo->lastValue) { + pRateInfo->CorrectionValue += pRateInfo->lastValue; + } + + pRateInfo->lastValue = v; + pRateInfo->lastKey = primaryKey[index]; + + pTrace("====%p rate_function_f() index:%d lastValue:%" PRId64 " lastKey:%" PRId64 " CorrectionValue:%" PRId64, pCtx, index, pRateInfo->lastValue, pRateInfo->lastKey, pRateInfo->CorrectionValue); + + SET_VAL(pCtx, 1, 1); + + // set has result flag + pRateInfo->hasResult = DATA_SET_FLAG; + pResInfo->hasResult = DATA_SET_FLAG; + + // keep the data into the final output buffer for super table query since this execution may be the last one + if (pResInfo->superTableQ) { + memcpy(pCtx->aOutputBuf, pResInfo->interResultBuf, sizeof(SRateInfo)); + } +} + + + +static void rate_func_merge(SQLFunctionCtx *pCtx) { + SResultInfo *pResInfo = GET_RES_INFO(pCtx); + assert(pResInfo->superTableQ); + + pTrace("rate_func_merge() size:%d", pCtx->size); + + //SRateInfo *pRateInfo = (SRateInfo *)pResInfo->interResultBuf; + SRateInfo *pBuf = (SRateInfo *)pCtx->aOutputBuf; + char *indicator = pCtx->aInputElemBuf; + + assert(1 == pCtx->size); + + int32_t numOfNotNull = 0; + for (int32_t i = 0; i < pCtx->size; ++i, indicator += sizeof(SRateInfo)) { + SRateInfo *pInput = (SRateInfo *)indicator; + if (DATA_SET_FLAG != pInput->hasResult) { + continue; + } + + numOfNotNull++; + memcpy(pBuf, pInput, sizeof(SRateInfo)); + pTrace("%p rate_func_merge() isIRate:%d firstKey:%" PRId64 " lastKey:%" PRId64 " firstValue:%" PRId64 " lastValue:%" PRId64 " CorrectionValue:%" PRId64, + pCtx, pInput->isIRate, pInput->firstKey, pInput->lastKey, pInput->firstValue, pInput->lastValue, pInput->CorrectionValue); + } + + SET_VAL(pCtx, numOfNotNull, 1); + + if (numOfNotNull > 0) { + pBuf->hasResult = DATA_SET_FLAG; + } + + return; +} + + + +static void rate_func_copy(SQLFunctionCtx *pCtx) { + assert(pCtx->inputType == TSDB_DATA_TYPE_BINARY); + + SResultInfo *pResInfo = GET_RES_INFO(pCtx); + memcpy(pResInfo->interResultBuf, pCtx->aInputElemBuf, (size_t)pCtx->inputBytes); + pResInfo->hasResult = ((SRateInfo*)pCtx->aInputElemBuf)->hasResult; + + SRateInfo* pRateInfo = (SRateInfo*)pCtx->aInputElemBuf; + pTrace("%p rate_func_second_merge() firstKey:%" PRId64 " lastKey:%" PRId64 " firstValue:%" PRId64 " lastValue:%" PRId64 " CorrectionValue:%" PRId64 " hasResult:%d", + pCtx, pRateInfo->firstKey, pRateInfo->lastKey, pRateInfo->firstValue, pRateInfo->lastValue, pRateInfo->CorrectionValue, pRateInfo->hasResult); +} + + + +static void rate_finalizer(SQLFunctionCtx *pCtx) { + SResultInfo *pResInfo = GET_RES_INFO(pCtx); + SRateInfo *pRateInfo = (SRateInfo *)pResInfo->interResultBuf; + + pTrace("%p isIRate:%d firstKey:%" PRId64 " lastKey:%" PRId64 " firstValue:%" PRId64 " lastValue:%" PRId64 " CorrectionValue:%" PRId64 " hasResult:%d", + pCtx, pRateInfo->isIRate, pRateInfo->firstKey, pRateInfo->lastKey, pRateInfo->firstValue, pRateInfo->lastValue, pRateInfo->CorrectionValue, pRateInfo->hasResult); + + if (pRateInfo->hasResult != DATA_SET_FLAG) { + setNull(pCtx->aOutputBuf, TSDB_DATA_TYPE_DOUBLE, sizeof(double)); + return; + } + + *(double*)pCtx->aOutputBuf = do_calc_rate(pRateInfo); + + pTrace("rate_finalizer() output result:%f", *(double *)pCtx->aOutputBuf); + + // cannot set the numOfIteratedElems again since it is set during previous iteration + pResInfo->numOfRes = 1; + pResInfo->hasResult = DATA_SET_FLAG; + + doFinalizer(pCtx); +} + + +static void irate_function(SQLFunctionCtx *pCtx) { + + assert(IS_DATA_BLOCK_LOADED(pCtx->blockStatus)); + + int32_t notNullElems = 0; + SResultInfo *pResInfo = GET_RES_INFO(pCtx); + SRateInfo *pRateInfo = (SRateInfo *)pResInfo->interResultBuf; + TSKEY *primaryKey = pCtx->ptsList; + + pTrace("%p irate_function() size:%d, hasNull:%d", pCtx, pCtx->size, pCtx->hasNull); + + if (pCtx->size < 1) { + return; + } + + for (int32_t i = pCtx->size - 1; i >= 0; --i) { + char *pData = GET_INPUT_CHAR_INDEX(pCtx, i); + if (pCtx->hasNull && isNull(pData, pCtx->inputType)) { + pTrace("%p irate_function() index of null data:%d", pCtx, i); + continue; + } + + notNullElems++; + + int64_t v = 0; + switch (pCtx->inputType) { + case TSDB_DATA_TYPE_TINYINT: + v = (int64_t)GET_INT8_VAL(pData); + break; + case TSDB_DATA_TYPE_SMALLINT: + v = (int64_t)GET_INT16_VAL(pData); + break; + case TSDB_DATA_TYPE_INT: + v = (int64_t)GET_INT32_VAL(pData); + break; + case TSDB_DATA_TYPE_BIGINT: + v = (int64_t)GET_INT64_VAL(pData); + break; + default: + assert(0); + } + + // TODO: calc once if only call this function once ???? + if ((INT64_MIN == pRateInfo->lastKey) || (INT64_MIN == pRateInfo->lastValue)) { + pRateInfo->lastValue = v; + pRateInfo->lastKey = primaryKey[i]; + + pTrace("%p irate_function() lastValue:%" PRId64 " lastKey:%" PRId64, pCtx, pRateInfo->lastValue, pRateInfo->lastKey); + continue; + } + + if ((INT64_MIN == pRateInfo->firstKey) || (INT64_MIN == pRateInfo->firstValue)){ + pRateInfo->firstValue = v; + pRateInfo->firstKey = primaryKey[i]; + + pTrace("%p irate_function() firstValue:%" PRId64 " firstKey:%" PRId64, pCtx, pRateInfo->firstValue, pRateInfo->firstKey); + break; + } + } + + SET_VAL(pCtx, notNullElems, 1); + + if (notNullElems > 0) { + pRateInfo->hasResult = DATA_SET_FLAG; + pResInfo->hasResult = DATA_SET_FLAG; + } + + // keep the data into the final output buffer for super table query since this execution may be the last one + if (pResInfo->superTableQ) { + memcpy(pCtx->aOutputBuf, pResInfo->interResultBuf, sizeof(SRateInfo)); + } +} + +static void irate_function_f(SQLFunctionCtx *pCtx, int32_t index) { + void *pData = GET_INPUT_CHAR_INDEX(pCtx, index); + if (pCtx->hasNull && isNull(pData, pCtx->inputType)) { + return; + } + + // NOTE: keep the intermediate result into the interResultBuf + SResultInfo *pResInfo = GET_RES_INFO(pCtx); + SRateInfo *pRateInfo = (SRateInfo *)pResInfo->interResultBuf; + TSKEY *primaryKey = pCtx->ptsList; + + int64_t v = 0; + switch (pCtx->inputType) { + case TSDB_DATA_TYPE_TINYINT: + v = (int64_t)GET_INT8_VAL(pData); + break; + case TSDB_DATA_TYPE_SMALLINT: + v = (int64_t)GET_INT16_VAL(pData); + break; + case TSDB_DATA_TYPE_INT: + v = (int64_t)GET_INT32_VAL(pData); + break; + case TSDB_DATA_TYPE_BIGINT: + v = (int64_t)GET_INT64_VAL(pData); + break; + default: + assert(0); + } + + pRateInfo->firstKey = pRateInfo->lastKey; + pRateInfo->firstValue = pRateInfo->lastValue; + + pRateInfo->lastValue = v; + pRateInfo->lastKey = primaryKey[index]; + + pTrace("====%p irate_function_f() index:%d lastValue:%" PRId64 " lastKey:%" PRId64 " firstValue:%" PRId64 " firstKey:%" PRId64, pCtx, index, pRateInfo->lastValue, pRateInfo->lastKey, pRateInfo->firstValue , pRateInfo->firstKey); + + SET_VAL(pCtx, 1, 1); + + // set has result flag + pRateInfo->hasResult = DATA_SET_FLAG; + pResInfo->hasResult = DATA_SET_FLAG; + + // keep the data into the final output buffer for super table query since this execution may be the last one + if (pResInfo->superTableQ) { + memcpy(pCtx->aOutputBuf, pResInfo->interResultBuf, sizeof(SRateInfo)); + } +} + +static void do_sumrate_merge(SQLFunctionCtx *pCtx) { + SResultInfo *pResInfo = GET_RES_INFO(pCtx); + assert(pResInfo->superTableQ); + + SRateInfo *pRateInfo = (SRateInfo *)pResInfo->interResultBuf; + char * input = GET_INPUT_CHAR(pCtx); + + for (int32_t i = 0; i < pCtx->size; ++i, input += pCtx->inputBytes) { + SRateInfo *pInput = (SRateInfo *)input; + + pTrace("%p do_sumrate_merge() hasResult:%d input num:%" PRId64 " input sum:%f total num:%" PRId64 " total sum:%f", pCtx, pInput->hasResult, pInput->num, pInput->sum, pRateInfo->num, pRateInfo->sum); + + if (pInput->hasResult != DATA_SET_FLAG) { + continue; + } else if (pInput->num == 0) { + pRateInfo->sum += do_calc_rate(pInput); + pRateInfo->num++; + } else { + pRateInfo->sum += pInput->sum; + pRateInfo->num += pInput->num; + } + pRateInfo->hasResult = DATA_SET_FLAG; + } + + // if the data set hasResult is not set, the result is null + if (DATA_SET_FLAG == pRateInfo->hasResult) { + pResInfo->hasResult = DATA_SET_FLAG; + SET_VAL(pCtx, pRateInfo->num, 1); + memcpy(pCtx->aOutputBuf, pResInfo->interResultBuf, sizeof(SRateInfo)); + } +} + +static void sumrate_func_merge(SQLFunctionCtx *pCtx) { + pTrace("%p sumrate_func_merge() process ...", pCtx); + do_sumrate_merge(pCtx); +} + +static void sumrate_func_second_merge(SQLFunctionCtx *pCtx) { + pTrace("%p sumrate_func_second_merge() process ...", pCtx); + do_sumrate_merge(pCtx); +} + +static void sumrate_finalizer(SQLFunctionCtx *pCtx) { + SResultInfo *pResInfo = GET_RES_INFO(pCtx); + SRateInfo *pRateInfo = (SRateInfo *)pResInfo->interResultBuf; + + pTrace("%p sumrate_finalizer() superTableQ:%d num:%" PRId64 " sum:%f hasResult:%d", pCtx, pResInfo->superTableQ, pRateInfo->num, pRateInfo->sum, pRateInfo->hasResult); + + if (pRateInfo->hasResult != DATA_SET_FLAG) { + setNull(pCtx->aOutputBuf, TSDB_DATA_TYPE_DOUBLE, sizeof(double)); + return; + } + + if (pRateInfo->num == 0) { + // from meter + *(double*)pCtx->aOutputBuf = do_calc_rate(pRateInfo); + } else if (pCtx->functionId == TSDB_FUNC_SUM_RATE || pCtx->functionId == TSDB_FUNC_SUM_IRATE) { + *(double*)pCtx->aOutputBuf = pRateInfo->sum; + } else { + *(double*)pCtx->aOutputBuf = pRateInfo->sum / pRateInfo->num; + } + + pResInfo->numOfRes = 1; + pResInfo->hasResult = DATA_SET_FLAG; + doFinalizer(pCtx); +} + + +///////////////////////////////////////////////////////////////////////////////////////////// + + /* * function compatible list. * tag and ts are not involved in the compatibility check @@ -4359,23 +4837,18 @@ static void ts_comp_finalize(SQLFunctionCtx *pCtx) { * e.g., count/sum/avg/min/max/stddev/percentile/apercentile/first/last... * */ -int32_t funcCompatDefList[28] = { - /* - * count, sum, avg, min, max, stddev, percentile, apercentile, first, last - */ - 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, - - /* - * last_row, top, bottom, spread, twa, leastsqr, ts, ts_dummy, tag_dummy, ts_z, tag - */ - 4, -1, -1, 1, 1, 1, 1, 1, 1, -1, 1, - - /* - * colprj, tagprj, arithmetic, diff, first_dist, last_dist, interp - */ - 1, 1, 1, -1, 1, 1, 5}; +int32_t funcCompatDefList[] = { + // count, sum, avg, min, max, stddev, percentile, apercentile, first, last + 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, + // last_row, top, bottom, spread, twa, leastsqr, ts, ts_dummy, tag_dummy, ts_z + 4, -1, -1, 1, 1, 1, 1, 1, 1, -1, + // tag, colprj, tagprj, arithmetic, diff, first_dist, last_dist, interp rate irate + 1, 1, 1, 1, -1, 1, 1, 5, 1, 1, + // sum_rate, sum_irate, avg_rate, avg_irate + 1, 1, 1, 1, +}; -SQLAggFuncElem aAggs[28] = {{ +SQLAggFuncElem aAggs[] = {{ // 0, count function does not invoke the finalize function "count", TSDB_FUNC_COUNT, @@ -4798,4 +5271,94 @@ SQLAggFuncElem aAggs[28] = {{ noop1, copy_function, no_data_info, + }, + { + // 28 + "rate", + TSDB_FUNC_RATE, + TSDB_FUNC_RATE, + TSDB_BASE_FUNC_SO | TSDB_FUNCSTATE_NEED_TS, + rate_function_setup, + rate_function, + rate_function_f, + no_next_step, + rate_finalizer, + rate_func_merge, + rate_func_copy, + data_req_load_info, + }, + { + // 29 + "irate", + TSDB_FUNC_IRATE, + TSDB_FUNC_IRATE, + TSDB_BASE_FUNC_SO | TSDB_FUNCSTATE_NEED_TS, + rate_function_setup, + irate_function, + irate_function_f, + no_next_step, + rate_finalizer, + rate_func_merge, + rate_func_copy, + data_req_load_info, + }, + { + // 30 + "sum_rate", + TSDB_FUNC_SUM_RATE, + TSDB_FUNC_SUM_RATE, + TSDB_BASE_FUNC_SO | TSDB_FUNCSTATE_NEED_TS, + rate_function_setup, + rate_function, + rate_function_f, + no_next_step, + sumrate_finalizer, + sumrate_func_merge, + sumrate_func_second_merge, + data_req_load_info, + }, + { + // 31 + "sum_irate", + TSDB_FUNC_SUM_IRATE, + TSDB_FUNC_SUM_IRATE, + TSDB_BASE_FUNC_SO | TSDB_FUNCSTATE_NEED_TS, + rate_function_setup, + irate_function, + irate_function_f, + no_next_step, + sumrate_finalizer, + sumrate_func_merge, + sumrate_func_second_merge, + data_req_load_info, + }, + { + // 32 + "avg_rate", + TSDB_FUNC_AVG_RATE, + TSDB_FUNC_AVG_RATE, + TSDB_BASE_FUNC_SO | TSDB_FUNCSTATE_NEED_TS, + rate_function_setup, + rate_function, + rate_function_f, + no_next_step, + sumrate_finalizer, + sumrate_func_merge, + sumrate_func_second_merge, + data_req_load_info, + }, + { + // 33 + "avg_irate", + TSDB_FUNC_AVG_IRATE, + TSDB_FUNC_AVG_IRATE, + TSDB_BASE_FUNC_SO | TSDB_FUNCSTATE_NEED_TS, + rate_function_setup, + irate_function, + irate_function_f, + no_next_step, + sumrate_finalizer, + sumrate_func_merge, + sumrate_func_second_merge, + data_req_load_info, }}; diff --git a/src/client/src/tscSQLParser.c b/src/client/src/tscSQLParser.c index 55f2c90fd0f5fd12a7fbc143f5c31ed8a7a1dd51..60c597b38528535a13d6c30142bc212affceb50c 100644 --- a/src/client/src/tscSQLParser.c +++ b/src/client/src/tscSQLParser.c @@ -1122,7 +1122,7 @@ int32_t parseSelectClause(SSqlCmd* pCmd, int32_t clauseIndex, tSQLExprList* pSel if (addProjectionExprAndResultField(pQueryInfo, pItem) != TSDB_CODE_SUCCESS) { return TSDB_CODE_INVALID_SQL; } - } else if (pItem->pNode->nSQLOptr >= TK_COUNT && pItem->pNode->nSQLOptr <= TK_LAST_ROW) { + } else if (pItem->pNode->nSQLOptr >= TK_COUNT && pItem->pNode->nSQLOptr <= TK_AVG_IRATE) { // sql function in selection clause, append sql function info in pSqlCmd structure sequentially if (addExprAndResultField(pQueryInfo, outputIndex, pItem) != TSDB_CODE_SUCCESS) { return TSDB_CODE_INVALID_SQL; @@ -1504,6 +1504,12 @@ int32_t addExprAndResultField(SQueryInfo* pQueryInfo, int32_t colIdx, tSQLExprIt } case TK_SUM: case TK_AVG: + case TK_RATE: + case TK_IRATE: + case TK_SUM_RATE: + case TK_SUM_IRATE: + case TK_AVG_RATE: + case TK_AVG_IRATE: case TK_TWA: case TK_MIN: case TK_MAX: @@ -1956,6 +1962,24 @@ int32_t changeFunctionID(int32_t optr, int16_t* functionId) { case TK_AVG: *functionId = TSDB_FUNC_AVG; break; + case TK_RATE: + *functionId = TSDB_FUNC_RATE; + break; + case TK_IRATE: + *functionId = TSDB_FUNC_IRATE; + break; + case TK_SUM_RATE: + *functionId = TSDB_FUNC_SUM_RATE; + break; + case TK_SUM_IRATE: + *functionId = TSDB_FUNC_SUM_IRATE; + break; + case TK_AVG_RATE: + *functionId = TSDB_FUNC_AVG_RATE; + break; + case TK_AVG_IRATE: + *functionId = TSDB_FUNC_AVG_IRATE; + break; case TK_MIN: *functionId = TSDB_FUNC_MIN; break; @@ -2149,7 +2173,8 @@ int32_t tscTansformSQLFunctionForSTableQuery(SQueryInfo* pQueryInfo) { int16_t functionId = aAggs[pExpr->functionId].stableFuncId; if ((functionId >= TSDB_FUNC_SUM && functionId <= TSDB_FUNC_TWA) || - (functionId >= TSDB_FUNC_FIRST_DST && functionId <= TSDB_FUNC_LAST_DST)) { + (functionId >= TSDB_FUNC_FIRST_DST && functionId <= TSDB_FUNC_LAST_DST) || + (functionId >= TSDB_FUNC_RATE && functionId <= TSDB_FUNC_AVG_IRATE)) { if (getResultDataInfo(pField->type, pField->bytes, functionId, pExpr->param[0].i64Key, &type, &bytes, &intermediateBytes, 0, true) != TSDB_CODE_SUCCESS) { return TSDB_CODE_INVALID_SQL; @@ -2912,7 +2937,7 @@ static int32_t validateSQLExpr(tSQLExpr* pExpr, SQueryInfo* pQueryInfo, SColumnL pList->ids[pList->num++] = index; } else if (pExpr->nSQLOptr == TK_FLOAT && (isnan(pExpr->val.dKey) || isinf(pExpr->val.dKey))) { return TSDB_CODE_INVALID_SQL; - } else if (pExpr->nSQLOptr >= TK_MIN && pExpr->nSQLOptr <= TK_LAST_ROW) { + } else if (pExpr->nSQLOptr >= TK_MIN && pExpr->nSQLOptr <= TK_AVG_IRATE) { return TSDB_CODE_INVALID_SQL; } @@ -2966,8 +2991,8 @@ static bool isValidExpr(tSQLExpr* pLeft, tSQLExpr* pRight, int32_t optr) { * * However, columnA < 4+12 is valid */ - if ((pLeft->nSQLOptr >= TK_COUNT && pLeft->nSQLOptr <= TK_LAST_ROW) || - (pRight->nSQLOptr >= TK_COUNT && pRight->nSQLOptr <= TK_LAST_ROW) || + if ((pLeft->nSQLOptr >= TK_COUNT && pLeft->nSQLOptr <= TK_AVG_IRATE) || + (pRight->nSQLOptr >= TK_COUNT && pRight->nSQLOptr <= TK_AVG_IRATE) || (pLeft->nSQLOptr >= TK_BOOL && pLeft->nSQLOptr <= TK_BINARY && pRight->nSQLOptr >= TK_BOOL && pRight->nSQLOptr <= TK_BINARY)) { return false; diff --git a/src/inc/taosmsg.h b/src/inc/taosmsg.h index af60cc893a233779e14a1eef947199200da3833e..5af7befd873eee67874874ca33c8f85911a6283f 100644 --- a/src/inc/taosmsg.h +++ b/src/inc/taosmsg.h @@ -684,10 +684,15 @@ typedef struct { } SSecIe; typedef struct { - uint32_t ip; - uint32_t vnode; + int32_t dnode; //the ID of dnode + int32_t vnode; //the index of vnode } SVPeerDesc; +typedef struct { + int32_t numOfVPeers; + SVPeerDesc vpeerDesc[]; +} SVpeerDescArray; + typedef struct { int32_t vnode; SVnodeCfg cfg; diff --git a/src/mnode/inc/mgmtStable.h b/src/mnode/inc/mgmtStable.h new file mode 100644 index 0000000000000000000000000000000000000000..161fd816fe8f9db4c3cbd4d7d875029677f243d0 --- /dev/null +++ b/src/mnode/inc/mgmtStable.h @@ -0,0 +1,35 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#ifndef TBASE_MNODE_STABLE_H +#define TBASE_MNODE_STABLE_H + +#ifdef __cplusplus +extern "C" { +#endif + +#include +#include +#include +#include "mnode.h" + +int32_t mgmtInitSTable(); + + +#ifdef __cplusplus +} +#endif + +#endif diff --git a/src/mnode/src/mgmtStable.c b/src/mnode/src/mgmtStable.c new file mode 100644 index 0000000000000000000000000000000000000000..ba278f5457a65282325dd985a30e19c84cd0d6d2 --- /dev/null +++ b/src/mnode/src/mgmtStable.c @@ -0,0 +1,37 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#define _DEFAULT_SOURCE +#include "os.h" + +#include "mnode.h" +#include "mgmtAcct.h" +#include "mgmtGrant.h" +#include "mgmtUtil.h" +#include "mgmtDb.h" +#include "mgmtDnodeInt.h" +#include "mgmtVgroup.h" +#include "mgmtSupertableQuery.h" +#include "mgmtTable.h" +#include "taosmsg.h" +#include "tast.h" +#include "textbuffer.h" +#include "tschemautil.h" +#include "tscompression.h" +#include "tskiplist.h" +#include "tsqlfunction.h" +#include "ttime.h" +#include "tstatus.h" + diff --git a/src/mnode/src/mgmtTable.c b/src/mnode/src/mgmtTable.c index e7a4aebb9cee00835e8a2f1754415b032a54c4dc..27a5ffd4f962aaef0f6278ebc127db037d2872e3 100644 --- a/src/mnode/src/mgmtTable.c +++ b/src/mnode/src/mgmtTable.c @@ -97,7 +97,7 @@ int32_t mgmtMeterDropColumnByName(STabObj *pTable, const char *name); static int dropMeterImp(SDbObj *pDb, STabObj * pTable, SAcctObj *pAcct); static void dropAllMetersOfMetric(SDbObj *pDb, STabObj * pMetric, SAcctObj *pAcct); -void mgmtMeterActionInit() { +static void mgmtMeterActionInit() { mgmtMeterActionFp[SDB_TYPE_INSERT] = mgmtMeterActionInsert; mgmtMeterActionFp[SDB_TYPE_DELETE] = mgmtMeterActionDelete; mgmtMeterActionFp[SDB_TYPE_UPDATE] = mgmtMeterActionUpdate; diff --git a/src/rpc/src/trpc.c b/src/rpc/src/trpc.c index 2bd800cad7a9fee92b45e2d7e7f56d87bb40afa0..379925a6bf419ca883c26fbc4b332859784a5a22 100755 --- a/src/rpc/src/trpc.c +++ b/src/rpc/src/trpc.c @@ -239,8 +239,8 @@ char *taosBuildReqHeader(void *param, char type, char *msg) { pHeader->spi = 0; pHeader->tcp = 0; pHeader->encrypt = 0; - pHeader->tranId = atomic_add_fetch_32(&pConn->tranId, 1); - if (pHeader->tranId == 0) pHeader->tranId = atomic_add_fetch_32(&pConn->tranId, 1); + pHeader->tranId = atomic_add_fetch_16(&pConn->tranId, 1); + if (pHeader->tranId == 0) pHeader->tranId = atomic_add_fetch_16(&pConn->tranId, 1); pHeader->sourceId = pConn->ownId; pHeader->destId = pConn->peerId; diff --git a/src/util/CMakeLists.txt b/src/util/CMakeLists.txt index f12cb7830dc0faf6f1854f63b31bf18828a080ea..d39878cfcbbc08722f57fe5ec6cdd5e4aff422a2 100644 --- a/src/util/CMakeLists.txt +++ b/src/util/CMakeLists.txt @@ -56,6 +56,7 @@ ELSEIF (TD_WINDOWS_64) LIST(APPEND SRC ./src/tmempool.c) LIST(APPEND SRC ./src/tmodule.c) LIST(APPEND SRC ./src/tnote.c) + LIST(APPEND SRC ./src/tpercentile.c) LIST(APPEND SRC ./src/tsched.c) LIST(APPEND SRC ./src/tskiplist.c) LIST(APPEND SRC ./src/tsocket.c) @@ -91,6 +92,7 @@ ELSEIF(TD_DARWIN_64) LIST(APPEND SRC ./src/tmempool.c) LIST(APPEND SRC ./src/tmodule.c) LIST(APPEND SRC ./src/tnote.c) + LIST(APPEND SRC ./src/tpercentile.c) LIST(APPEND SRC ./src/tsched.c) LIST(APPEND SRC ./src/tskiplist.c) LIST(APPEND SRC ./src/tsocket.c) diff --git a/src/util/inc/tsqlfunction.h b/src/util/inc/tsqlfunction.h index 93f50cf4f3862e7d2747c10399858c3be0562072..2caecb6309d11065653666e786218ca87e5bcce9 100644 --- a/src/util/inc/tsqlfunction.h +++ b/src/util/inc/tsqlfunction.h @@ -60,6 +60,13 @@ extern "C" { #define TSDB_FUNC_LAST_DST 26 #define TSDB_FUNC_INTERP 27 +#define TSDB_FUNC_RATE 28 +#define TSDB_FUNC_IRATE 29 +#define TSDB_FUNC_SUM_RATE 30 +#define TSDB_FUNC_SUM_IRATE 31 +#define TSDB_FUNC_AVG_RATE 32 +#define TSDB_FUNC_AVG_IRATE 33 + #define TSDB_FUNCSTATE_SO 0x1U // single output #define TSDB_FUNCSTATE_MO 0x2U // dynamic number of output, not multinumber of output e.g., TOP/BOTTOM #define TSDB_FUNCSTATE_STREAM 0x4U // function avail for stream @@ -287,10 +294,10 @@ typedef struct STwaInfo { } STwaInfo; /* global sql function array */ -extern struct SQLAggFuncElem aAggs[28]; +extern struct SQLAggFuncElem aAggs[]; /* compatible check array list */ -extern int32_t funcCompatDefList[28]; +extern int32_t funcCompatDefList[]; void getStatistics(char *priData, char *data, int32_t size, int32_t numOfRow, int32_t type, int64_t *min, int64_t *max, int64_t *sum, int16_t *minIndex, int16_t *maxIndex, int32_t *numOfNull); diff --git a/src/util/src/ttokenizer.c b/src/util/src/ttokenizer.c index d4f3bd6879dae6b9e8573a9230f39fe3405b5927..7cbb4552b410536176f5e69d9ee9336af197d94f 100644 --- a/src/util/src/ttokenizer.c +++ b/src/util/src/ttokenizer.c @@ -231,6 +231,7 @@ static SKeyword keywordTable[] = { {"RATE", TK_RATE}, {"IRATE", TK_IRATE}, {"SUM_RATE", TK_SUM_RATE}, + {"SUM_IRATE", TK_SUM_IRATE}, {"AVG_RATE", TK_AVG_RATE}, {"AVG_IRATE", TK_AVG_IRATE}, }; diff --git a/src/vnode/common/inc/vnodePeer.h b/src/vnode/common/inc/vnodePeer.h new file mode 100644 index 0000000000000000000000000000000000000000..becf9bae394fb44da3c8c5f58a97983293a98e1f --- /dev/null +++ b/src/vnode/common/inc/vnodePeer.h @@ -0,0 +1,56 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#ifndef TDENGINE_VNODE_PEER_H +#define TDENGINE_VNODEPEER_H + +#ifdef __cplusplus +extern "C" { +#endif + +#include +#include +#include "tsdb.h" + +/* + * Initialize the resources + */ +int32_t vnodeInitPeer(int numOfThreads); + +/* + * Free the resources + */ +void vnodeCleanUpPeers(); + +/* + * Start a vnode synchronization process + */ +int32_t vnodeOpenPeer(int32_t vnode); + +/* + * Update the peerinfo of vnode + */ +int32_t vnodeConfigPeer(SVpeerDescArray msg); + +/* + * Close a vnode synchronization process + */ +void vnodeCleanUpPeer(int32_t vnode); + +#ifdef __cplusplus +} +#endif + +#endif // TDENGINE_VNODEPEER_H diff --git a/src/vnode/detail/src/vnodePeer.spec.c b/src/vnode/common/src/vnodePeer.c similarity index 100% rename from src/vnode/detail/src/vnodePeer.spec.c rename to src/vnode/common/src/vnodePeer.c diff --git a/src/vnode/detail/inc/vnodePeer.h b/src/vnode/detail/inc/vnodePeer.h deleted file mode 100644 index 4f17e66a70d8ffc0727021a17359e7f58161cff4..0000000000000000000000000000000000000000 --- a/src/vnode/detail/inc/vnodePeer.h +++ /dev/null @@ -1,108 +0,0 @@ -/* - * Copyright (c) 2019 TAOS Data, Inc. - * - * This program is free software: you can use, redistribute, and/or modify - * it under the terms of the GNU Affero General Public License, version 3 - * or later ("AGPL"), as published by the Free Software Foundation. - * - * This program is distributed in the hope that it will be useful, but WITHOUT - * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or - * FITNESS FOR A PARTICULAR PURPOSE. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see . - */ - -#ifndef TDENGINE_VNODEPEER_H -#define TDENGINE_VNODEPEER_H - -#include "os.h" - -#ifdef __cplusplus -extern "C" { -#endif - -#define TSDB_VMSG_SYNC_DATA 1 -#define TSDB_VMSG_FORWARD 2 -#define TSDB_VMSG_SYNC_REQ 3 -#define TSDB_VMSG_SYNC_RSP 4 -#define TSDB_VMSG_SYNC_MUST 5 -#define TSDB_VMSG_STATUS 6 - -#pragma pack(push, 1) - -typedef struct { - char type; - char version; - short sourceVid; - short destVid; -} SFirstPkt; - -typedef struct { - uint64_t lastCreate; - uint64_t lastRemove; - uint32_t fileId; - uint64_t fmagic[]; -} SSyncMsg; - -typedef struct { - char status; - uint64_t version; -} SPeerState; - -typedef struct { - char status : 6; - char ack : 2; - char commitInProcess; - int32_t fileId; // ID for corrupted file, 0 means no corrupted file - uint64_t version; - SPeerState peerStates[]; -} SPeerStatus; - -#pragma pack(pop) - -typedef struct _thread_obj { - pthread_t thread; - int threadId; - int pollFd; - int numOfFds; -} SThreadObj; - -typedef struct { - int numOfThreads; - SThreadObj **pThread; - pthread_t thread; - int threadId; -} SThreadPool; - -typedef struct _vnodePeer { - void * signature; - int ownId; // own vnode ID - uint32_t ip; - char ipstr[20]; // ip string - int vid; - int status; - int syncStatus; - int32_t fileId; // 0 means no corrupted file - uint64_t version; - int commitInProcess; - int syncFd; - int peerFd; // forward FD - void * hbTimer; - void * syncTimer; - SThreadObj *pThread; -} SVnodePeer; - -typedef struct { - SVnodePeer *pVPeer; - uint64_t lastCreate; - uint64_t lastRemove; - uint32_t fileId; - uint64_t fmagic[]; -} SSyncCmd; - -#ifdef __cplusplus -} -#endif - -#endif // TDENGINE_VNODEPEER_H diff --git a/src/vnode/detail/inc/vnodeQueryImpl.h b/src/vnode/detail/inc/vnodeQueryImpl.h index 40b65aa16375a81bb4a46f61967b2576d5af6189..cce66786fd993b1b300a5a2abbe27e5f4eff37de 100644 --- a/src/vnode/detail/inc/vnodeQueryImpl.h +++ b/src/vnode/detail/inc/vnodeQueryImpl.h @@ -126,6 +126,7 @@ static FORCE_INLINE SMeterObj* getMeterObj(void* hashHandle, int32_t sid) { bool isQueryKilled(SQuery* pQuery); bool isFixedOutputQuery(SQuery* pQuery); bool isPointInterpoQuery(SQuery* pQuery); +bool isSumAvgRateQuery(SQuery *pQuery); bool isTopBottomQuery(SQuery* pQuery); bool isFirstLastRowQuery(SQuery* pQuery); bool isTSCompQuery(SQuery* pQuery); diff --git a/src/vnode/detail/src/vnodeQueryImpl.c b/src/vnode/detail/src/vnodeQueryImpl.c index 109ecb3625916bbc54cb5d5c39776cffe12bb021..104e4b859e3009fd3741957b32583162a3f32ceb 100644 --- a/src/vnode/detail/src/vnodeQueryImpl.c +++ b/src/vnode/detail/src/vnodeQueryImpl.c @@ -2438,8 +2438,9 @@ void setExecParams(SQuery *pQuery, SQLFunctionCtx *pCtx, int64_t startQueryTimes // store the first&last timestamp into the intermediate buffer [1], the true // value may be null but timestamp will never be null pCtx->ptsList = (int64_t *)(primaryColumnData + startOffset * TSDB_KEYSIZE); - } else if (functionId == TSDB_FUNC_TOP || functionId == TSDB_FUNC_BOTTOM || functionId == TSDB_FUNC_TWA || - functionId == TSDB_FUNC_DIFF) { + } else if (functionId == TSDB_FUNC_TOP || functionId == TSDB_FUNC_BOTTOM || + functionId == TSDB_FUNC_TWA || functionId == TSDB_FUNC_DIFF || + (functionId >= TSDB_FUNC_RATE && functionId <= TSDB_FUNC_AVG_IRATE)) { /* * leastsquares function needs two columns of input, currently, the x value of linear equation is set to * timestamp column, and the y-value is the column specified in pQuery->pSelectExpr[i].colIdxInBuffer @@ -2723,6 +2724,22 @@ bool isPointInterpoQuery(SQuery *pQuery) { } // TODO REFACTOR:MERGE WITH CLIENT-SIDE FUNCTION +bool isSumAvgRateQuery(SQuery *pQuery) { + for (int32_t i = 0; i < pQuery->numOfOutputCols; ++i) { + int32_t functionId = pQuery->pSelectExpr[i].pBase.functionId; + if (functionId == TSDB_FUNC_TS) { + continue; + } + + if (functionId == TSDB_FUNC_SUM_RATE || functionId == TSDB_FUNC_SUM_IRATE || + functionId == TSDB_FUNC_AVG_RATE || functionId == TSDB_FUNC_AVG_IRATE) { + return true; + } + } + + return false; +} + bool isTopBottomQuery(SQuery *pQuery) { for (int32_t i = 0; i < pQuery->numOfOutputCols; ++i) { int32_t functionId = pQuery->pSelectExpr[i].pBase.functionId; @@ -4584,7 +4601,7 @@ int32_t vnodeMultiMeterQueryPrepare(SQInfo *pQInfo, SQuery *pQuery, void *param) initSlidingWindowInfo(&pRuntimeEnv->swindowResInfo, 4096, type, pQuery->rowSize, pSupporter->pResult); } - if (pQuery->nAggTimeInterval != 0) { + if (pQuery->nAggTimeInterval != 0 || isSumAvgRateQuery(pQuery)) { // one page for each table at least ret = createResultBuf(&pRuntimeEnv->pResultBuf, pSupporter->numOfMeters, pQuery->rowSize); if (ret != TSDB_CODE_SUCCESS) { diff --git a/src/vnode/detail/src/vnodeQueryProcess.c b/src/vnode/detail/src/vnodeQueryProcess.c index 1b04806f7c4e68f8a6c3f98df82f5941414f700f..c243a78e837cbc0f1ad60d83a7786da3aae54d3a 100644 --- a/src/vnode/detail/src/vnodeQueryProcess.c +++ b/src/vnode/detail/src/vnodeQueryProcess.c @@ -398,7 +398,7 @@ static void queryOnMultiDataFiles(SQInfo *pQInfo, SMeterDataInfo *pMeterDataInfo restoreIntervalQueryRange(pRuntimeEnv, pMeterQueryInfo); - if (pQuery->nAggTimeInterval == 0) { // normal query + if (pQuery->nAggTimeInterval == 0 && !isSumAvgRateQuery(pQuery)) { // normal query if ((pQuery->lastKey > pQuery->ekey && QUERY_IS_ASC_QUERY(pQuery)) || (pQuery->lastKey < pQuery->ekey && !QUERY_IS_ASC_QUERY(pQuery))) { qTrace( @@ -964,7 +964,7 @@ static void vnodeMultiMeterQueryProcessor(SQInfo *pQInfo) { return; } - if (pQuery->nAggTimeInterval > 0) { + if (pQuery->nAggTimeInterval > 0 || isSumAvgRateQuery(pQuery)) { assert(pSupporter->subgroupIdx == 0 && pSupporter->numOfGroupResultPages == 0); if (mergeMetersResultToOneGroups(pSupporter) == TSDB_CODE_SUCCESS) {