Unverified commit b8688f6f, authored by shenglian-zhou, committed via GitHub

Merge branch 'develop' into szhou/feature/string-case-transform-substr-trim

# TDengine Documentation
TDengine is a highly efficient platform for storing, querying, and analyzing big time-series data, optimized for IoT, Internet of Vehicles, Industrial IoT, and IT operations monitoring. You can use it much like a relational database such as MySQL, but you are advised to read the documentation below carefully before using it, especially [Data Model](/architecture) and [Data Modeling](/model). Besides this document, you are welcome to [download the product white paper](https://www.taosdata.com/downloads/TDengine%20White%20Paper.pdf). For the TDengine 1.6 documentation, please click [here](https://www.taosdata.com/cn/documentation16/).
TDengine is a highly efficient platform for storing, querying, and analyzing big time-series data, optimized for IoT, Internet of Vehicles, Industrial IoT, and IT operations monitoring. You can use it much like a relational database such as MySQL, but you are advised to read the documentation below carefully before using it, especially [Data Model](/architecture) and [Data Modeling](/model). Besides this document, you are welcome to [download the product white paper](https://www.taosdata.com/downloads/TDengine%20White%20Paper.pdf).
## [TDengine Introduction](/evaluation)
......@@ -84,11 +84,10 @@ TDengine是一个高效的存储、查询、分析时序大数据的平台,专
## TDengine Components and Tools
* [taosAdapter User Manual](/tools/adapter)
* [TDinsight User Manual](/tools/insight)
* [taosTools Install Manual](/tools/taos-tools)
* [taosdump User Manual](/tools/taosdump)
* [taosbenchmark User Manual](/tools/taosbenchmark)
* [taosAdapter](/tools/adapter): a RESTful interface adapter service between a TDengine cluster and applications.
* [TDinsight](/tools/insight): a collection of Grafana dashboards for monitoring a TDengine cluster.
* [taosdump](/tools/taosdump): a data backup tool for TDengine. Please install taosTools to use taosdump.
* [taosBenchmark](/tools/taosbenchmark): a stress testing tool for TDengine. Please install taosTools to use taosBenchmark.
## [Connections with Other Tools](/connections)
......
......@@ -178,7 +178,7 @@ taos> select avg(current), max(voltage), min(phase) from test.d10 interval(10s);
## <a class="anchor" id="taosBenchmark"></a> taosBenchmark detailed feature list
The taosBenchmark command itself takes many options for configuring the number of tables, the number of records, and so on; run `taosBenchmark --help` for the full list. You can experiment with different parameters.
For detailed usage of taosBenchmark, please refer to [How to use taosBenchmark to test the performance of TDengine](https://www.taosdata.com/cn/documentation/getting-started/taosBenchmark)
For detailed usage of taosBenchmark, please refer to [How to use taosBenchmark to test the performance of TDengine](https://www.taosdata.com/2021/10/09/3111.html)
## Client
......
......@@ -55,18 +55,17 @@ INSERT INTO test.t1 USING test.weather (ts, temperature) TAGS('beijing') VALUES(
## <a class="anchor" id="version"></a>TAOS-JDBCDriver versions and the supported TDengine and JDK versions
| taos-jdbcdriver version | TDengine version | JDK version |
|--------------------|--------------------| -------- |
| 2.0.36 | 2.4.0 and above | 1.8.x |
| 2.0.35 | 2.3.0 and above | 1.8.x |
| 2.0.33 - 2.0.34 | 2.0.3.0 and above | 1.8.x |
| 2.0.31 - 2.0.32 | 2.1.3.0 and above | 1.8.x |
| 2.0.22 - 2.0.30 | 2.0.18.0 - 2.1.2.x | 1.8.x |
| 2.0.12 - 2.0.21 | 2.0.8.0 - 2.0.17.x | 1.8.x |
| 2.0.4 - 2.0.11 | 2.0.0.0 - 2.0.7.x | 1.8.x |
| 1.0.3 | 1.6.1.x and above | 1.8.x |
| 1.0.2 | 1.6.1.x and above | 1.8.x |
| 1.0.1 | 1.6.1.x and above | 1.8.x |
| taos-jdbcdriver version | TDengine 2.0.x.x version | TDengine 2.2.x.x version | TDengine 2.4.x.x version | JDK version |
|---------------------| ----------------------| ----------------------| ----------------------| -------- |
| 2.0.37 | X | X | 2.4.0.4 | 1.8.x |
| 2.0.36 | X | 2.2.2.11 and above | 2.4.0.0 - 2.4.0.3 | 1.8.x |
| 2.0.35 | X | 2.2.2.11 and above | 2.3.0.0 - 2.4.0.3 | 1.8.x |
| 2.0.33 - 2.0.34 | 2.0.3.0 and above | 2.2.0.0 and above | 2.4.0.0 - 2.4.0.3 | 1.8.x |
| 2.0.31 - 2.0.32 | 2.1.3.0 - 2.1.7.7 | X | X | 1.8.x |
| 2.0.22 - 2.0.30 | 2.0.18.0 - 2.1.2.1 | X | X | 1.8.x |
| 2.0.12 - 2.0.21 | 2.0.8.0 - 2.0.17.4 | X | X | 1.8.x |
| 2.0.4 - 2.0.11 | 2.0.0.0 - 2.0.7.3 | X | X | 1.8.x |
## TDengine DataType and Java DataType
......
# TDengine Documentation
TDengine is a highly efficient platform to store, query, and analyze time-series data. It is specially designed and optimized for IoT, Internet of Vehicles, Industrial IoT, IT Infrastructure and Application Monitoring, etc. It works like a relational database, such as MySQL, but you are strongly encouraged to read through the following documentation before you experience it, especially the Data Modeling sections. In addition to this document, you should also download and read the technology white paper. For the older TDengine version 1.6 documentation, please click [here](https://www.taosdata.com/en/documentation16/).
TDengine is a highly efficient platform to store, query, and analyze time-series data. It is specially designed and optimized for IoT, Internet of Vehicles, Industrial IoT, IT Infrastructure and Application Monitoring, etc. It works like a relational database, such as MySQL, but you are strongly encouraged to read through the following documentation before you experience it, especially the Data Modeling sections. In addition to this document, you should also download and read the technology white paper.
## [TDengine Introduction](/evaluation)
* [TDengine Introduction and Features](/evaluation#intro)
......@@ -82,11 +81,10 @@ TDengine is a highly efficient platform to store, query, and analyze time-series
## [Components and Tools](/cn/documentation/)
* [taosAdapter User Manual](/tools/adapter)
* [TDinsight User Manual](/tools/insight)
* [taosTools Install Manual](/tools/taos-tools)
* [taosdump User Manual](/tools/taosdump)
* [taosbenchmark User Manual](/tools/taosbenchmark)
* [taosAdapter](/tools/adapter): a bridge/adapter between TDengine cluster and applications.
* [TDinsight](/tools/insight): monitoring TDengine cluster with Grafana.
* [taosdump](/tools/taosdump): backup tool for TDengine. Please install `taosTools` package for it.
* [taosBenchmark](tools/taosbenchmark): stress test tool for TDengine. Please install `taosTools` package for it.
## [Connections with Other Tools](/connections)
......
......@@ -183,7 +183,7 @@ taos> select avg(f1), max(f2), min(f3) from test.t10 interval(10s);
## <a class="anchor" id="taosBenchmark"></a> Using taosBenchmark in detail
You can run the `taosBenchmark` command with many options, such as the number of tables, the number of records per table, and so on. To learn more about these options, execute `taosBenchmark --help` and try different combinations.
Please refer to [How to use taosBenchmark to test the performance of TDengine](https://www.taosdata.com/en/documentation/getting-started/taosBenchmark) for details.
Please refer to [How to use taosBenchmark to test the performance of TDengine](https://tdengine.com/2021/10/09/3114.html) for details.
## Client and Alarm Module
......
......@@ -507,8 +507,8 @@ function install_service_on_systemd() {
${csudo}bash -c "echo '[Unit]' >> ${taosd_service_config}"
${csudo}bash -c "echo 'Description=${productName} server service' >> ${taosd_service_config}"
${csudo}bash -c "echo 'After=network-online.target taosadapter.service' >> ${taosd_service_config}"
${csudo}bash -c "echo 'Wants=network-online.target taosadapter.service' >> ${taosd_service_config}"
${csudo}bash -c "echo 'After=network-online.target' >> ${taosd_service_config}"
${csudo}bash -c "echo 'Wants=network-online.target' >> ${taosd_service_config}"
${csudo}bash -c "echo >> ${taosd_service_config}"
${csudo}bash -c "echo '[Service]' >> ${taosd_service_config}"
${csudo}bash -c "echo 'Type=simple' >> ${taosd_service_config}"
......
......@@ -427,8 +427,8 @@ function install_service_on_systemd() {
${csudo}bash -c "echo '[Unit]' >> ${taosd_service_config}"
${csudo}bash -c "echo 'Description=TDengine server service' >> ${taosd_service_config}"
${csudo}bash -c "echo 'After=network-online.target taosadapter.service' >> ${taosd_service_config}"
${csudo}bash -c "echo 'Wants=network-online.target taosadapter.service' >> ${taosd_service_config}"
${csudo}bash -c "echo 'After=network-online.target' >> ${taosd_service_config}"
${csudo}bash -c "echo 'Wants=network-online.target' >> ${taosd_service_config}"
${csudo}bash -c "echo >> ${taosd_service_config}"
${csudo}bash -c "echo '[Service]' >> ${taosd_service_config}"
${csudo}bash -c "echo 'Type=simple' >> ${taosd_service_config}"
......
......@@ -313,7 +313,12 @@ static void tscAsyncResultCallback(SSchedMsg *pMsg) {
}
assert(pSql->res.code != TSDB_CODE_SUCCESS);
tscError("0x%"PRIx64" async result callback, code:%s", pSql->self, tstrerror(pSql->res.code));
if (tsShortcutFlag) {
tscDebug("0x%" PRIx64 " async result callback, code:%s", pSql->self, tstrerror(pSql->res.code));
pSql->res.code = TSDB_CODE_SUCCESS;
} else {
tscError("0x%" PRIx64 " async result callback, code:%s", pSql->self, tstrerror(pSql->res.code));
}
SSqlRes *pRes = &pSql->res;
if (pSql->fp == NULL || pSql->fetchFp == NULL){
......
......@@ -280,7 +280,7 @@ int tscBuildQueryStreamDesc(void *pMsg, STscObj *pObj) {
// } else {
// pQdesc->stableQuery = 0;
// }
pthread_mutex_lock(&pSql->subState.mutex);
if (pSql->pSubs != NULL && pSql->subState.states != NULL) {
for (int32_t i = 0; i < pQdesc->numOfSub; ++i) {
SSqlObj *psub = pSql->pSubs[i];
......@@ -295,6 +295,7 @@ int tscBuildQueryStreamDesc(void *pMsg, STscObj *pObj) {
p += len;
}
}
pthread_mutex_unlock(&pSql->subState.mutex);
}
pQdesc->numOfSub = htonl(pQdesc->numOfSub);
......
......@@ -3458,10 +3458,7 @@ static int16_t doGetColumnIndex(SQueryInfo* pQueryInfo, int32_t index, SStrToken
strncpy(tmpTokenBuf, pToken->z, pToken->n);
pToken->z = tmpTokenBuf;
if (pToken->type == TK_ID) {
pToken->n = stringProcess(pToken->z, pToken->n);
}
pToken->n = stringProcess(pToken->z, pToken->n);
for (int16_t i = 0; i < numOfCols; ++i) {
if (pToken->n != strlen(pSchema[i].name)) {
......@@ -3554,7 +3551,10 @@ int32_t getTableIndexImpl(SStrToken* pTableToken, SQueryInfo* pQueryInfo, SColum
int32_t getTableIndexByName(SStrToken* pToken, SQueryInfo* pQueryInfo, SColumnIndex* pIndex) {
SStrToken tableToken = {0};
extractTableNameFromToken(pToken, &tableToken);
if (pToken->z && (pToken->z[0] != TS_BACKQUOTE_CHAR || pToken->z[pToken->n - 1] != TS_BACKQUOTE_CHAR)) {
extractTableNameFromToken(pToken, &tableToken);
}
if (getTableIndexImpl(&tableToken, pQueryInfo, pIndex) != TSDB_CODE_SUCCESS) {
return TSDB_CODE_TSC_INVALID_OPERATION;
......@@ -6849,7 +6849,7 @@ int32_t setAlterTableInfo(SSqlObj* pSql, struct SSqlInfo* pInfo) {
}
SColumnIndex index = COLUMN_INDEX_INITIALIZER;
SStrToken name = {.z = pItem->pVar.pz, .n = pItem->pVar.nLen, .type = TK_STRING};
SStrToken name = {.z = pItem->pVar.pz, .n = pItem->pVar.nLen};
if (getColumnIndexByName(&name, pQueryInfo, &index, tscGetErrorMsgPayload(pCmd)) != TSDB_CODE_SUCCESS) {
return TSDB_CODE_TSC_INVALID_OPERATION;
......@@ -6864,7 +6864,7 @@ int32_t setAlterTableInfo(SSqlObj* pSql, struct SSqlInfo* pInfo) {
char name1[128] = {0};
strncpy(name1, pItem->pVar.pz, pItem->pVar.nLen);
stringProcess(name1, (int32_t)strlen(name1));
TAOS_FIELD f = tscCreateField(TSDB_DATA_TYPE_INT, name1, tDataTypes[TSDB_DATA_TYPE_INT].bytes);
tscFieldInfoAppend(&pQueryInfo->fieldsInfo, &f);
} else if (pAlterSQL->type == TSDB_ALTER_TABLE_CHANGE_TAG_COLUMN) {
......@@ -6887,12 +6887,12 @@ int32_t setAlterTableInfo(SSqlObj* pSql, struct SSqlInfo* pInfo) {
SColumnIndex srcIndex = COLUMN_INDEX_INITIALIZER;
SColumnIndex destIndex = COLUMN_INDEX_INITIALIZER;
SStrToken srcToken = {.z = pSrcItem->pVar.pz, .n = pSrcItem->pVar.nLen, .type = TK_STRING};
SStrToken srcToken = {.z = pSrcItem->pVar.pz, .n = pSrcItem->pVar.nLen};
if (getColumnIndexByName(&srcToken, pQueryInfo, &srcIndex, tscGetErrorMsgPayload(pCmd)) != TSDB_CODE_SUCCESS) {
return invalidOperationMsg(pMsg, msg17);
}
SStrToken destToken = {.z = pDstItem->pVar.pz, .n = pDstItem->pVar.nLen, .type = TK_STRING};
SStrToken destToken = {.z = pDstItem->pVar.pz, .n = pDstItem->pVar.nLen};
if (getColumnIndexByName(&destToken, pQueryInfo, &destIndex, tscGetErrorMsgPayload(pCmd)) == TSDB_CODE_SUCCESS) {
return invalidOperationMsg(pMsg, msg19);
}
......@@ -6901,6 +6901,7 @@ int32_t setAlterTableInfo(SSqlObj* pSql, struct SSqlInfo* pInfo) {
char name[TSDB_COL_NAME_LEN] = {0};
strncpy(name, pItem->pVar.pz, pItem->pVar.nLen);
stringProcess(name, (int32_t)strlen(name));
TAOS_FIELD f = tscCreateField(TSDB_DATA_TYPE_INT, name, tDataTypes[TSDB_DATA_TYPE_INT].bytes);
tscFieldInfoAppend(&pQueryInfo->fieldsInfo, &f);
......@@ -6908,6 +6909,7 @@ int32_t setAlterTableInfo(SSqlObj* pSql, struct SSqlInfo* pInfo) {
memset(name, 0, tListLen(name));
strncpy(name, pItem->pVar.pz, pItem->pVar.nLen);
stringProcess(name, (int32_t)strlen(name));
f = tscCreateField(TSDB_DATA_TYPE_INT, name, tDataTypes[TSDB_DATA_TYPE_INT].bytes);
tscFieldInfoAppend(&pQueryInfo->fieldsInfo, &f);
} else if (pAlterSQL->type == TSDB_ALTER_TABLE_UPDATE_TAG_VAL) {
......@@ -6920,7 +6922,7 @@ int32_t setAlterTableInfo(SSqlObj* pSql, struct SSqlInfo* pInfo) {
int16_t numOfTags = tscGetNumOfTags(pTableMeta);
SColumnIndex columnIndex = COLUMN_INDEX_INITIALIZER;
SStrToken name = {.type = TK_STRING, .z = item->pVar.pz, .n = item->pVar.nLen};
SStrToken name = {.z = item->pVar.pz, .n = item->pVar.nLen};
if (getColumnIndexByName(&name, pQueryInfo, &columnIndex, tscGetErrorMsgPayload(pCmd)) != TSDB_CODE_SUCCESS) {
return TSDB_CODE_TSC_INVALID_OPERATION;
}
......@@ -7053,14 +7055,7 @@ int32_t setAlterTableInfo(SSqlObj* pSql, struct SSqlInfo* pInfo) {
tVariantListItem* pItem = taosArrayGet(pAlterSQL->varList, 0);
SColumnIndex columnIndex = COLUMN_INDEX_INITIALIZER;
SStrToken name = {.type = TK_STRING, .z = pItem->pVar.pz, .n = pItem->pVar.nLen};
//handle Escape character backstick
bool inEscape = false;
if (name.z[0] == TS_BACKQUOTE_CHAR && name.z[name.n - 1] == TS_BACKQUOTE_CHAR) {
inEscape = true;
name.type = TK_ID;
}
SStrToken name = {.z = pItem->pVar.pz, .n = pItem->pVar.nLen};
if (getColumnIndexByName(&name, pQueryInfo, &columnIndex, tscGetErrorMsgPayload(pCmd)) != TSDB_CODE_SUCCESS) {
return invalidOperationMsg(pMsg, msg17);
......@@ -7072,12 +7067,7 @@ int32_t setAlterTableInfo(SSqlObj* pSql, struct SSqlInfo* pInfo) {
char name1[TSDB_COL_NAME_LEN] = {0};
tstrncpy(name1, pItem->pVar.pz, sizeof(name1));
int32_t nameLen = pItem->pVar.nLen;
if (inEscape) {
memmove(name1, name1 + 1, nameLen);
name1[nameLen - TS_BACKQUOTE_CHAR_SIZE] = '\0';
}
stringProcess(name1, (int32_t)strlen(name1));
TAOS_FIELD f = tscCreateField(TSDB_DATA_TYPE_INT, name1, tDataTypes[TSDB_DATA_TYPE_INT].bytes);
tscFieldInfoAppend(&pQueryInfo->fieldsInfo, &f);
......@@ -7086,21 +7076,13 @@ int32_t setAlterTableInfo(SSqlObj* pSql, struct SSqlInfo* pInfo) {
return invalidOperationMsg(pMsg, msg16);
}
TAOS_FIELD* pItem = taosArrayGet(pAlterSQL->pAddColumns, 0);
if (pItem->type != TSDB_DATA_TYPE_BINARY && pItem->type != TSDB_DATA_TYPE_NCHAR) {
return invalidOperationMsg(pMsg, msg21);
}
SColumnIndex columnIndex = COLUMN_INDEX_INITIALIZER;
SStrToken name = {.type = TK_STRING, .z = pItem->name, .n = (uint32_t)strlen(pItem->name)};
//handle Escape character backstick
bool inEscape = false;
if (name.z[0] == TS_BACKQUOTE_CHAR && name.z[name.n - 1] == TS_BACKQUOTE_CHAR) {
inEscape = true;
name.type = TK_ID;
}
SStrToken name = {.z = pItem->name, .n = (uint32_t)strlen(pItem->name)};
if (getColumnIndexByName(&name, pQueryInfo, &columnIndex, tscGetErrorMsgPayload(pCmd)) != TSDB_CODE_SUCCESS) {
return invalidOperationMsg(pMsg, msg17);
......@@ -7136,12 +7118,7 @@ int32_t setAlterTableInfo(SSqlObj* pSql, struct SSqlInfo* pInfo) {
return invalidOperationMsg(pMsg, msg24);
}
if (inEscape) {
memmove(name.z, name.z + 1, name.n);
name.z[name.n - TS_BACKQUOTE_CHAR_SIZE] = '\0';
name.n -= TS_BACKQUOTE_CHAR_SIZE;
}
stringProcess(name.z, name.n);
TAOS_FIELD f = tscCreateField(pColSchema->type, name.z, pItem->bytes);
tscFieldInfoAppend(&pQueryInfo->fieldsInfo, &f);
}else if (pAlterSQL->type == TSDB_ALTER_TABLE_MODIFY_TAG_COLUMN) {
......@@ -7155,13 +7132,7 @@ int32_t setAlterTableInfo(SSqlObj* pSql, struct SSqlInfo* pInfo) {
}
SColumnIndex columnIndex = COLUMN_INDEX_INITIALIZER;
SStrToken name = {.type = TK_STRING, .z = pItem->name, .n = (uint32_t)strlen(pItem->name)};
//handle Escape character backstick
if (name.z[0] == TS_BACKQUOTE_CHAR && name.z[name.n - 1] == TS_BACKQUOTE_CHAR) {
memmove(name.z, name.z + 1, name.n);
name.z[name.n - TS_BACKQUOTE_CHAR_SIZE] = '\0';
name.n -= TS_BACKQUOTE_CHAR_SIZE;
}
SStrToken name = {.z = pItem->name, .n = (uint32_t)strlen(pItem->name)};
if (getColumnIndexByName(&name, pQueryInfo, &columnIndex, tscGetErrorMsgPayload(pCmd)) != TSDB_CODE_SUCCESS) {
return invalidOperationMsg(pMsg, msg17);
}
......@@ -7202,6 +7173,8 @@ int32_t setAlterTableInfo(SSqlObj* pSql, struct SSqlInfo* pInfo) {
return invalidOperationMsg(pMsg, msg24);
}
stringProcess(name.z, name.n);
TAOS_FIELD f = tscCreateField(pColSchema->type, name.z, pItem->bytes);
tscFieldInfoAppend(&pQueryInfo->fieldsInfo, &f);
}
......@@ -7555,9 +7528,8 @@ int32_t validateLimitNode(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SSqlNode* pSqlN
*/
if (pQueryInfo->limit.limit > 0) {
pQueryInfo->vgroupLimit = pQueryInfo->limit.limit + pQueryInfo->limit.offset;
pQueryInfo->limit.limit = -1;
pQueryInfo->limit.limit += pQueryInfo->limit.offset;
}
pQueryInfo->limit.offset = 0;
}
} else {
......
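The change above stops discarding the per-vnode limit and instead folds the offset into it: each vnode returns at most limit + offset rows, and the merging client still applies the original offset. Below is a minimal sketch of that arithmetic, written under the assumption of that intent; the struct and function names are illustrative and are not the engine's API.

```c
#include <stdint.h>
#include <stdio.h>

/* Illustrative only: fold a client-side LIMIT/OFFSET into the limit pushed
 * down to each vnode, mirroring `limit.limit += limit.offset; limit.offset = 0;`
 * above. Each vnode must return limit+offset rows so the merging client can
 * still skip `offset` rows and keep `limit` rows. */
typedef struct {
  int64_t limit;   // -1 means "no limit"
  int64_t offset;
} SLimitSketch;

static SLimitSketch pushDownLimit(SLimitSketch clientLimit) {
  SLimitSketch vnodeLimit = clientLimit;
  if (clientLimit.limit > 0) {
    vnodeLimit.limit += clientLimit.offset;  // e.g. LIMIT 10 OFFSET 5 -> per-vnode limit 15
  }
  vnodeLimit.offset = 0;                     // offset is applied once, at the merge stage
  return vnodeLimit;
}

int main(void) {
  SLimitSketch client = {.limit = 10, .offset = 5};
  SLimitSketch vnode = pushDownLimit(client);
  printf("per-vnode limit=%lld offset=%lld\n", (long long)vnode.limit, (long long)vnode.offset);
  return 0;
}
```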
......@@ -332,7 +332,12 @@ int tscSendMsgToServer(SSqlObj *pSql) {
.handle = NULL,
.code = 0
};
if ((rpcMsg.msgType == TSDB_MSG_TYPE_SUBMIT) && (tsShortcutFlag & TSDB_SHORTCUT_RB_RPC_SEND_SUBMIT)) {
rpcFreeCont(rpcMsg.pCont);
return TSDB_CODE_FAILED;
}
rpcSendRequest(pObj->pRpcObj->pDnodeConn, &pSql->epSet, &rpcMsg, &pSql->rpcRid);
return TSDB_CODE_SUCCESS;
}
......
......@@ -3167,6 +3167,8 @@ static void tscRetrieveFromDnodeCallBack(void *param, TAOS_RES *tres, int numOfR
static SSqlObj *tscCreateSTableSubquery(SSqlObj *pSql, SRetrieveSupport *trsupport, SSqlObj *prevSqlObj) {
const int32_t table_index = 0;
SSqlCmd * pCmd = &pSql->cmd;
SQueryInfo *pPQueryInfo = tscGetQueryInfo(pCmd); // Parent SQueryInfo
SSqlObj *pNew = createSubqueryObj(pSql, table_index, tscRetrieveDataRes, trsupport, TSDB_SQL_SELECT, prevSqlObj);
if (pNew != NULL) { // the sub query of two-stage super table query
......@@ -3176,8 +3178,14 @@ static SSqlObj *tscCreateSTableSubquery(SSqlObj *pSql, SRetrieveSupport *trsuppo
pQueryInfo->type |= TSDB_QUERY_TYPE_STABLE_SUBQUERY;
// clear the limit/offset info, since it should not be sent to vnode to be executed.
pQueryInfo->limit.limit = -1;
if (pQueryInfo->limit.offset > 0 && pQueryInfo->limit.limit > 0) {
pQueryInfo->limit.limit += pQueryInfo->limit.offset;
}
pQueryInfo->limit.offset = 0;
// if group by is present, the subquery must retrieve all of its data
if(pPQueryInfo->groupbyColumn || pPQueryInfo->groupbyTag) {
pQueryInfo->limit.limit = -1;
}
assert(trsupport->subqueryIndex < pSql->subState.numOfSub);
......
......@@ -4518,6 +4518,7 @@ void tscTryQueryNextClause(SSqlObj* pSql, __async_cb_func_t fp) {
pRes->final = finalBk;
pRes->numOfTotal = num;
pthread_mutex_lock(&pSql->subState.mutex);
for(int32_t i = 0; i < pSql->subState.numOfSub; ++i) {
taos_free_result(pSql->pSubs[i]);
}
......@@ -4525,6 +4526,7 @@ void tscTryQueryNextClause(SSqlObj* pSql, __async_cb_func_t fp) {
tfree(pSql->pSubs);
tfree(pSql->subState.states);
pSql->subState.numOfSub = 0;
pthread_mutex_unlock(&pSql->subState.mutex);
pthread_mutex_destroy(&pSql->subState.mutex);
pSql->fp = fp;
......
......@@ -64,6 +64,7 @@ extern int32_t tsCompressMsgSize;
extern int32_t tsCompressColData;
extern int32_t tsMaxNumOfDistinctResults;
extern char tsTempDir[];
extern int32_t tsShortcutFlag;
// query buffer management
extern int32_t tsQueryBufferSize; // maximum allowed usage buffer size in MB for each data node during query processing
......
......@@ -65,6 +65,7 @@ char tsLocale[TSDB_LOCALE_LEN] = {0};
char tsCharset[TSDB_LOCALE_LEN] = {0}; // default encode string
int8_t tsEnableCoreFile = 0;
int32_t tsMaxBinaryDisplayWidth = 30;
int32_t tsShortcutFlag = 0; // shortcut flag to facilitate debugging
/*
* denote if the server needs to compress response message at the application layer to client, including query rsp,
......@@ -1749,6 +1750,17 @@ static void doInitGlobalConfig(void) {
cfg.unitType = TAOS_CFG_UTYPE_MB;
taosInitConfigOption(cfg);
// shortcut flag to facilitate debugging
cfg.option = "shortcutFlag";
cfg.ptr = &tsShortcutFlag;
cfg.valType = TAOS_CFG_VTYPE_INT32;
cfg.cfgType = TSDB_CFG_CTYPE_B_CONFIG | TSDB_CFG_CTYPE_B_SHOW | TSDB_CFG_CTYPE_B_CLIENT;
cfg.minValue = 0;
cfg.maxValue = (1 << 24);
cfg.ptrLength = 0;
cfg.unitType = TAOS_CFG_UTYPE_NONE;
taosInitConfigOption(cfg);
#ifdef TD_TSZ
// lossy compress
cfg.option = "lossyColumns";
......
{
"name": "node-red-contrib-tdengine",
"version": "0.0.2",
"version": "0.0.3",
"description": "",
"main": "tdengine.js",
"repository": {
......@@ -10,8 +10,7 @@
"author": "kevinpan45@163.com",
"license": "ISC",
"dependencies": {
"axios": "^0.24.0",
"mocha": "^9.1.3"
"axios": "^0.24.0"
},
"node-red": {
"nodes": {
......@@ -23,7 +22,6 @@
"tdengine"
],
"devDependencies": {
"node-red": "^2.1.4",
"node-red-node-test-helper": "^0.2.7"
"node-red": "^2.1.4"
}
}
......@@ -402,6 +402,11 @@ do { \
#define TSDB_DEFAULT_STABLES_HASH_SIZE 100
#define TSDB_DEFAULT_CTABLES_HASH_SIZE 20000
#define TSDB_SHORTCUT_RB_RPC_SEND_SUBMIT 0x01u // RB: return before (global shortcut)
#define TSDB_SHORTCUT_RA_RPC_RECV_SUBMIT 0x02u // RA: return after (global shortcut)
#define TSDB_SHORTCUT_NR_VNODE_WAL_WRITE 0x04u // NR: no return, continue with the following actions (local shortcut)
#define TSDB_SHORTCUT_RB_TSDB_COMMIT 0x08u
#define TSDB_PORT_DNODESHELL 0
#define TSDB_PORT_DNODEDNODE 5
#define TSDB_PORT_SYNC 10
......
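The TSDB_SHORTCUT_* bits above are OR-combined into the `shortcutFlag` config option (global `tsShortcutFlag`) and tested at individual stages of the write path, as the client, RPC, vnode, and tsdb hunks in this commit do. Below is a minimal sketch of that bit-testing pattern; only the define values are taken from the header above, the stage function is hypothetical.

```c
#include <stdbool.h>
#include <stdint.h>
#include <stdio.h>

/* Values copied from the defines above; everything else is illustrative. */
#define TSDB_SHORTCUT_RB_RPC_SEND_SUBMIT 0x01u
#define TSDB_SHORTCUT_RA_RPC_RECV_SUBMIT 0x02u
#define TSDB_SHORTCUT_NR_VNODE_WAL_WRITE 0x04u
#define TSDB_SHORTCUT_RB_TSDB_COMMIT     0x08u

static int32_t tsShortcutFlag = 0;  // normally set through the `shortcutFlag` config option

/* Hypothetical write-path stage: bail out early when its shortcut bit is set,
 * so a tester can isolate where time is spent in the write path. */
static bool submitViaRpc(const char *payload) {
  if (tsShortcutFlag & TSDB_SHORTCUT_RB_RPC_SEND_SUBMIT) {
    return false;  // "return before" shortcut: drop the request before sending it
  }
  printf("sending submit: %s\n", payload);
  return true;
}

int main(void) {
  tsShortcutFlag = TSDB_SHORTCUT_RB_RPC_SEND_SUBMIT | TSDB_SHORTCUT_RB_TSDB_COMMIT;
  submitViaRpc("demo row");
  return 0;
}
```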
......@@ -432,6 +432,16 @@ void* getJsonTagValueElment(void* data, char* key, int32_t keyLen, char* out, in
void getJsonTagValueAll(void* data, void* dst, int16_t bytes);
char* parseTagDatatoJson(void *p);
//
// scan callback
//
// type define
#define READ_TABLE 1
#define READ_QUERY 2
typedef bool (*readover_callback)(void* param, int8_t type, int32_t tid);
void tsdbAddScanCallback(TsdbQueryHandleT* queryHandle, readover_callback callback, void* param);
#ifdef __cplusplus
}
#endif
......
Subproject commit 18916a1719fdfcefe1ed1d4ce0049f36c3ac4796
Subproject commit d6baa48620fcbff857642c4ec10e3c48226ca97c
......@@ -325,6 +325,8 @@ typedef struct SQueryRuntimeEnv {
SHashObj *pTableRetrieveTsMap;
SUdfInfo *pUdfInfo;
bool udfIsCopy;
SHashObj *pTablesRead; // rows already read per child table, keyed by tid
int32_t cntTableReadOver; // number of tables whose reads are complete
} SQueryRuntimeEnv;
enum {
......@@ -721,4 +723,10 @@ void doInvokeUdf(SUdfInfo* pUdfInfo, SQLFunctionCtx *pCtx, int32_t idx, int32_t
int32_t getColumnDataFromId(void *param, int32_t id, void **data);
void qInfoLogSSDataBlock(SSDataBlock* block, char* location);
// accumulate the number of rows read for a table; a no-op when pTablesRead is NULL
void addTableReadRows(SQueryRuntimeEnv* pEnv, int32_t tid, int32_t rows);
// tsdb scan callback: returns true when the given table, or the whole query, has been read to its limit. param is SQueryRuntimeEnv*
bool qReadOverCB(void* param, int8_t type, int32_t tid);
#endif // TDENGINE_QEXECUTOR_H
......@@ -859,7 +859,7 @@ cmd ::= ALTER TABLE ids(X) cpxName(Z) DROP TAG ids(Y). {
X.n += Z.n;
toTSDBType(Y.type);
SArray* A = tVariantListAppendToken(NULL, &Y, -1, true);
SArray* A = tVariantListAppendToken(NULL, &Y, -1, false);
SAlterTableInfo* pAlterTable = tSetAlterTableInfo(&X, NULL, A, TSDB_ALTER_TABLE_DROP_TAG_COLUMN, -1);
setSqlInfo(pInfo, pAlterTable, NULL, TSDB_SQL_ALTER_TABLE);
......@@ -869,10 +869,10 @@ cmd ::= ALTER TABLE ids(X) cpxName(F) CHANGE TAG ids(Y) ids(Z). {
X.n += F.n;
toTSDBType(Y.type);
SArray* A = tVariantListAppendToken(NULL, &Y, -1, true);
SArray* A = tVariantListAppendToken(NULL, &Y, -1, false);
toTSDBType(Z.type);
A = tVariantListAppendToken(A, &Z, -1, true);
A = tVariantListAppendToken(A, &Z, -1, false);
SAlterTableInfo* pAlterTable = tSetAlterTableInfo(&X, NULL, A, TSDB_ALTER_TABLE_CHANGE_TAG_COLUMN, -1);
setSqlInfo(pInfo, pAlterTable, NULL, TSDB_SQL_ALTER_TABLE);
......@@ -882,7 +882,7 @@ cmd ::= ALTER TABLE ids(X) cpxName(F) SET TAG ids(Y) EQ tagitem(Z). {
X.n += F.n;
toTSDBType(Y.type);
SArray* A = tVariantListAppendToken(NULL, &Y, -1, true);
SArray* A = tVariantListAppendToken(NULL, &Y, -1, false);
A = tVariantListAppend(A, &Z, -1);
SAlterTableInfo* pAlterTable = tSetAlterTableInfo(&X, NULL, A, TSDB_ALTER_TABLE_UPDATE_TAG_VAL, -1);
......@@ -906,7 +906,7 @@ cmd ::= ALTER STABLE ids(X) cpxName(F) DROP COLUMN ids(A). {
X.n += F.n;
toTSDBType(A.type);
SArray* K = tVariantListAppendToken(NULL, &A, -1, true);
SArray* K = tVariantListAppendToken(NULL, &A, -1, false);
SAlterTableInfo* pAlterTable = tSetAlterTableInfo(&X, NULL, K, TSDB_ALTER_TABLE_DROP_COLUMN, TSDB_SUPER_TABLE);
setSqlInfo(pInfo, pAlterTable, NULL, TSDB_SQL_ALTER_TABLE);
......@@ -928,7 +928,7 @@ cmd ::= ALTER STABLE ids(X) cpxName(Z) DROP TAG ids(Y). {
X.n += Z.n;
toTSDBType(Y.type);
SArray* A = tVariantListAppendToken(NULL, &Y, -1, true);
SArray* A = tVariantListAppendToken(NULL, &Y, -1, false);
SAlterTableInfo* pAlterTable = tSetAlterTableInfo(&X, NULL, A, TSDB_ALTER_TABLE_DROP_TAG_COLUMN, TSDB_SUPER_TABLE);
setSqlInfo(pInfo, pAlterTable, NULL, TSDB_SQL_ALTER_TABLE);
......@@ -938,10 +938,10 @@ cmd ::= ALTER STABLE ids(X) cpxName(F) CHANGE TAG ids(Y) ids(Z). {
X.n += F.n;
toTSDBType(Y.type);
SArray* A = tVariantListAppendToken(NULL, &Y, -1, true);
SArray* A = tVariantListAppendToken(NULL, &Y, -1, false);
toTSDBType(Z.type);
A = tVariantListAppendToken(A, &Z, -1, true);
A = tVariantListAppendToken(A, &Z, -1, false);
SAlterTableInfo* pAlterTable = tSetAlterTableInfo(&X, NULL, A, TSDB_ALTER_TABLE_CHANGE_TAG_COLUMN, TSDB_SUPER_TABLE);
setSqlInfo(pInfo, pAlterTable, NULL, TSDB_SQL_ALTER_TABLE);
......@@ -951,7 +951,7 @@ cmd ::= ALTER STABLE ids(X) cpxName(F) SET TAG ids(Y) EQ tagitem(Z). {
X.n += F.n;
toTSDBType(Y.type);
SArray* A = tVariantListAppendToken(NULL, &Y, -1, true);
SArray* A = tVariantListAppendToken(NULL, &Y, -1, false);
A = tVariantListAppend(A, &Z, -1);
SAlterTableInfo* pAlterTable = tSetAlterTableInfo(&X, NULL, A, TSDB_ALTER_TABLE_UPDATE_TAG_VAL, TSDB_SUPER_TABLE);
......
......@@ -2057,6 +2057,22 @@ static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int32_t numOf
pRuntimeEnv->prevRow = malloc(POINTER_BYTES * pQueryAttr->numOfCols + pQueryAttr->srcRowSize);
pRuntimeEnv->tagVal = malloc(pQueryAttr->tagLen);
// allocate pTablesRead only for a projection query on a super table with order by and a positive limit
if( pRuntimeEnv->pQueryHandle && // pQueryHandle is NULL for client-side merge (no tsdb query), so that case is excluded here
pQueryAttr->limit.limit > 0 &&
pQueryAttr->limit.offset == 0 && // if an offset is present, skip the limit optimization
pQueryAttr->stableQuery &&
isProjQuery(pQueryAttr) &&
pQueryAttr->order.orderColId != -1 ) {
// the limit can be optimized
pRuntimeEnv->pTablesRead = taosHashInit(numOfTables, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_NO_LOCK);
if (pRuntimeEnv->pTablesRead) // allocation succeeded; register the callback with tsdb
tsdbAddScanCallback(pRuntimeEnv->pQueryHandle, qReadOverCB, pRuntimeEnv);
} else {
pRuntimeEnv->pTablesRead = NULL;
}
pRuntimeEnv->cntTableReadOver = 0;
// NOTE: pTableCheckInfo need to update the query time range and the lastKey info
pRuntimeEnv->pTableRetrieveTsMap = taosHashInit(numOfTables, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_NO_LOCK);
......@@ -5990,6 +6006,9 @@ static SSDataBlock* doProjectOperation(void* param, bool* newgroup) {
STableQueryInfo* pTableQueryInfo = pRuntimeEnv->current;
SSDataBlock* pBlock = pProjectInfo->existDataBlock;
// record table read rows
addTableReadRows(pRuntimeEnv, pBlock->info.tid, pBlock->info.rows);
pProjectInfo->existDataBlock = NULL;
*newgroup = true;
......@@ -6035,6 +6054,9 @@ static SSDataBlock* doProjectOperation(void* param, bool* newgroup) {
doSetOperatorCompleted(pOperator);
break;
}
// record table read rows
addTableReadRows(pRuntimeEnv, pBlock->info.tid, pBlock->info.rows);
// Return the result of the previous group first.
if (*newgroup) {
......@@ -10019,3 +10041,69 @@ void freeQueryAttr(SQueryAttr* pQueryAttr) {
filterFreeInfo(pQueryAttr->pFilters);
}
}
// accumulate the number of rows read for a table; a no-op when pTablesRead is NULL
void addTableReadRows(SQueryRuntimeEnv* pEnv, int32_t tid, int32_t rows) {
SHashObj* pHashObj = pEnv->pTablesRead;
int32_t limit = (int32_t)pEnv->pQueryAttr->limit.limit;
if (pHashObj == NULL) {
return;
}
// read the previous total
int32_t v = 0;
int32_t* pv = (int32_t* )taosHashGet(pHashObj, &tid, sizeof(int32_t));
if (pv && *pv > 0) {
v = *pv;
}
bool over = v >= limit;
// add the new rows and save the accumulated total back
v += rows;
taosHashPut(pHashObj, &tid, sizeof(int32_t), &v, sizeof(int32_t));
// update the count of tables whose reads are complete
if (!over && v >= limit) {
pEnv->cntTableReadOver += 1;
}
}
// tsdb scan callback: returns true when the given table, or the whole query, has been read to its limit. param is SQueryRuntimeEnv*
bool qReadOverCB(void* param, int8_t type, int32_t tid) {
SQueryRuntimeEnv* pEnv = (SQueryRuntimeEnv* )param;
if (pEnv->pTablesRead == NULL) {
return false;
}
// check whether the whole query is over
if (pEnv->cntTableReadOver >= pEnv->pQueryAttr->tableGroupInfo.numOfTables) {
return true;
}
// for a READ_QUERY check there is nothing more to test per table
if (type == READ_QUERY) {
return false;
}
// read tid value
int32_t* pv = (int32_t* )taosHashGet(pEnv->pTablesRead, &tid, sizeof(int32_t));
if (pv == NULL) {
return false;
}
// compare
if (pEnv->pQueryAttr->limit.limit > 0 && *pv >= pEnv->pQueryAttr->limit.limit ) {
return true; // the rows needed from this table have been read
}
return false;
}
// check whether the whole query has been read; returns true if it is over. param is SQueryRuntimeEnv*
bool queryReadOverCB(void* param) {
SQueryRuntimeEnv* pEnv = (SQueryRuntimeEnv* )param;
if (pEnv->cntTableReadOver >= pEnv->pQueryAttr->tableGroupInfo.numOfTables) {
return true;
}
return false;
}
\ No newline at end of file
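Taken together, addTableReadRows and qReadOverCB above implement an early stop for LIMIT projection queries over a super table: the runtime keeps a per-table row counter (pTablesRead), and the tsdb scanner asks the callback whether a table, or the whole query, already has enough rows before loading more blocks. The condensed sketch below shows that interaction with the hash map replaced by a plain array; all names in it are illustrative, not the engine's.

```c
#include <stdbool.h>
#include <stdio.h>

#define MAX_TABLES 4

/* Illustrative stand-in for SQueryRuntimeEnv plus its pTablesRead hash:
 * rowsRead[tid] counts rows returned per table, tablesOver counts tables
 * that have already produced `limit` rows. */
typedef struct {
  int limit;
  int numOfTables;
  int rowsRead[MAX_TABLES];
  int tablesOver;
} DemoEnv;

static void demoAddTableReadRows(DemoEnv *env, int tid, int rows) {
  bool wasOver = env->rowsRead[tid] >= env->limit;
  env->rowsRead[tid] += rows;
  if (!wasOver && env->rowsRead[tid] >= env->limit) {
    env->tablesOver += 1;
  }
}

/* Mirrors qReadOverCB: the scanner calls this before loading a table's next
 * block; returning true means "skip it, enough rows were already read". */
static bool demoReadOver(DemoEnv *env, int tid) {
  if (env->tablesOver >= env->numOfTables) return true;   // whole query done
  return env->rowsRead[tid] >= env->limit;                // this table done
}

int main(void) {
  DemoEnv env = {.limit = 100, .numOfTables = 2};
  demoAddTableReadRows(&env, 0, 60);
  demoAddTableReadRows(&env, 0, 60);   // table 0 now has 120 >= 100 rows
  printf("skip table 0 next block: %d\n", demoReadOver(&env, 0));
  printf("skip table 1 next block: %d\n", demoReadOver(&env, 1));
  return 0;
}
```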
......@@ -3840,7 +3840,7 @@ static YYACTIONTYPE yy_reduce(
yymsp[-4].minor.yy0.n += yymsp[-3].minor.yy0.n;
toTSDBType(yymsp[0].minor.yy0.type);
SArray* A = tVariantListAppendToken(NULL, &yymsp[0].minor.yy0, -1, true);
SArray* A = tVariantListAppendToken(NULL, &yymsp[0].minor.yy0, -1, false);
SAlterTableInfo* pAlterTable = tSetAlterTableInfo(&yymsp[-4].minor.yy0, NULL, A, TSDB_ALTER_TABLE_DROP_TAG_COLUMN, -1);
setSqlInfo(pInfo, pAlterTable, NULL, TSDB_SQL_ALTER_TABLE);
......@@ -3851,10 +3851,10 @@ static YYACTIONTYPE yy_reduce(
yymsp[-5].minor.yy0.n += yymsp[-4].minor.yy0.n;
toTSDBType(yymsp[-1].minor.yy0.type);
SArray* A = tVariantListAppendToken(NULL, &yymsp[-1].minor.yy0, -1, true);
SArray* A = tVariantListAppendToken(NULL, &yymsp[-1].minor.yy0, -1, false);
toTSDBType(yymsp[0].minor.yy0.type);
A = tVariantListAppendToken(A, &yymsp[0].minor.yy0, -1, true);
A = tVariantListAppendToken(A, &yymsp[0].minor.yy0, -1, false);
SAlterTableInfo* pAlterTable = tSetAlterTableInfo(&yymsp[-5].minor.yy0, NULL, A, TSDB_ALTER_TABLE_CHANGE_TAG_COLUMN, -1);
setSqlInfo(pInfo, pAlterTable, NULL, TSDB_SQL_ALTER_TABLE);
......@@ -3865,7 +3865,7 @@ static YYACTIONTYPE yy_reduce(
yymsp[-6].minor.yy0.n += yymsp[-5].minor.yy0.n;
toTSDBType(yymsp[-2].minor.yy0.type);
SArray* A = tVariantListAppendToken(NULL, &yymsp[-2].minor.yy0, -1, true);
SArray* A = tVariantListAppendToken(NULL, &yymsp[-2].minor.yy0, -1, false);
A = tVariantListAppend(A, &yymsp[0].minor.yy162, -1);
SAlterTableInfo* pAlterTable = tSetAlterTableInfo(&yymsp[-6].minor.yy0, NULL, A, TSDB_ALTER_TABLE_UPDATE_TAG_VAL, -1);
......@@ -3891,7 +3891,7 @@ static YYACTIONTYPE yy_reduce(
yymsp[-4].minor.yy0.n += yymsp[-3].minor.yy0.n;
toTSDBType(yymsp[0].minor.yy0.type);
SArray* K = tVariantListAppendToken(NULL, &yymsp[0].minor.yy0, -1, true);
SArray* K = tVariantListAppendToken(NULL, &yymsp[0].minor.yy0, -1, false);
SAlterTableInfo* pAlterTable = tSetAlterTableInfo(&yymsp[-4].minor.yy0, NULL, K, TSDB_ALTER_TABLE_DROP_COLUMN, TSDB_SUPER_TABLE);
setSqlInfo(pInfo, pAlterTable, NULL, TSDB_SQL_ALTER_TABLE);
......@@ -3916,7 +3916,7 @@ static YYACTIONTYPE yy_reduce(
yymsp[-4].minor.yy0.n += yymsp[-3].minor.yy0.n;
toTSDBType(yymsp[0].minor.yy0.type);
SArray* A = tVariantListAppendToken(NULL, &yymsp[0].minor.yy0, -1, true);
SArray* A = tVariantListAppendToken(NULL, &yymsp[0].minor.yy0, -1, false);
SAlterTableInfo* pAlterTable = tSetAlterTableInfo(&yymsp[-4].minor.yy0, NULL, A, TSDB_ALTER_TABLE_DROP_TAG_COLUMN, TSDB_SUPER_TABLE);
setSqlInfo(pInfo, pAlterTable, NULL, TSDB_SQL_ALTER_TABLE);
......@@ -3927,10 +3927,10 @@ static YYACTIONTYPE yy_reduce(
yymsp[-5].minor.yy0.n += yymsp[-4].minor.yy0.n;
toTSDBType(yymsp[-1].minor.yy0.type);
SArray* A = tVariantListAppendToken(NULL, &yymsp[-1].minor.yy0, -1, true);
SArray* A = tVariantListAppendToken(NULL, &yymsp[-1].minor.yy0, -1, false);
toTSDBType(yymsp[0].minor.yy0.type);
A = tVariantListAppendToken(A, &yymsp[0].minor.yy0, -1, true);
A = tVariantListAppendToken(A, &yymsp[0].minor.yy0, -1, false);
SAlterTableInfo* pAlterTable = tSetAlterTableInfo(&yymsp[-5].minor.yy0, NULL, A, TSDB_ALTER_TABLE_CHANGE_TAG_COLUMN, TSDB_SUPER_TABLE);
setSqlInfo(pInfo, pAlterTable, NULL, TSDB_SQL_ALTER_TABLE);
......@@ -3941,7 +3941,7 @@ static YYACTIONTYPE yy_reduce(
yymsp[-6].minor.yy0.n += yymsp[-5].minor.yy0.n;
toTSDBType(yymsp[-2].minor.yy0.type);
SArray* A = tVariantListAppendToken(NULL, &yymsp[-2].minor.yy0, -1, true);
SArray* A = tVariantListAppendToken(NULL, &yymsp[-2].minor.yy0, -1, false);
A = tVariantListAppend(A, &yymsp[0].minor.yy162, -1);
SAlterTableInfo* pAlterTable = tSetAlterTableInfo(&yymsp[-6].minor.yy0, NULL, A, TSDB_ALTER_TABLE_UPDATE_TAG_VAL, TSDB_SUPER_TABLE);
......
......@@ -1165,6 +1165,19 @@ static void rpcProcessIncomingMsg(SRpcConn *pConn, SRpcHead *pHead, SRpcReqConte
rpcMsg.handle = pConn;
rpcAddRef(pRpc); // add the refCount for requests
switch (rpcMsg.msgType) {
case TSDB_MSG_TYPE_SUBMIT:
if (tsShortcutFlag & TSDB_SHORTCUT_RA_RPC_RECV_SUBMIT) {
SRpcMsg rMsg = {.handle = rpcMsg.handle, .pCont = NULL, .contLen = 0};
rpcSendResponse(&rMsg);
rpcFreeCont(rpcMsg.pCont);
return;
}
break;
default:
break;
}
// notify the server app
(*(pRpc->cfp))(&rpcMsg, NULL);
} else {
......
......@@ -186,9 +186,11 @@ static void *taosRecvUdpData(void *param) {
SUdpConn *pConn = param;
struct sockaddr_in sourceAdd;
ssize_t dataLen;
int32_t msgLen;
unsigned int addLen;
uint16_t port;
SRecvInfo recvInfo;
SRpcHead *pHead;
memset(&sourceAdd, 0, sizeof(sourceAdd));
addLen = sizeof(sourceAdd);
......@@ -218,6 +220,13 @@ static void *taosRecvUdpData(void *param) {
continue;
}
pHead = (SRpcHead *)msg;
msgLen = (int32_t)htonl((uint32_t)pHead->msgLen);
if (dataLen < msgLen) {
tError("%s recvfrom failed(%s): dataLen: %ld, msgLen: %d", pConn->label, strerror(errno), (long)dataLen, msgLen);
continue;
}
int32_t size = dataLen + tsRpcOverhead;
char *tmsg = malloc(size);
if (NULL == tmsg) {
......
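The check added above guards against a datagram whose header claims more bytes (msgLen) than were actually received (dataLen), dropping it before any further parsing. Below is a self-contained sketch of the same validation pattern, using an illustrative header struct rather than SRpcHead.

```c
#include <arpa/inet.h>
#include <stdint.h>
#include <stdio.h>
#include <string.h>
#include <sys/types.h>

/* Illustrative header: the length it carries is in network byte order,
 * like SRpcHead.msgLen in the hunk above. */
typedef struct {
  uint32_t msgLen;
} DemoHead;

/* Accept a datagram only if its claimed length fits inside the bytes received. */
static int acceptDatagram(const char *buf, ssize_t dataLen) {
  if ((size_t)dataLen < sizeof(DemoHead)) return 0;
  const DemoHead *head = (const DemoHead *)buf;
  int32_t msgLen = (int32_t)ntohl(head->msgLen);
  if (dataLen < msgLen) {
    fprintf(stderr, "drop packet: dataLen=%ld msgLen=%d\n", (long)dataLen, msgLen);
    return 0;
  }
  return 1;
}

int main(void) {
  char buf[64] = {0};
  DemoHead head = {.msgLen = htonl(1024)};             // claims 1024 bytes
  memcpy(buf, &head, sizeof(head));
  printf("accepted: %d\n", acceptDatagram(buf, 64));   // only 64 received -> dropped
  return 0;
}
```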
......@@ -98,6 +98,11 @@ void *tsdbCommitData(STsdbRepo *pRepo) {
}
tsdbStartCommit(pRepo);
if (tsShortcutFlag & TSDB_SHORTCUT_RB_TSDB_COMMIT) {
tsdbEndCommit(pRepo, terrno);
return NULL;
}
// Commit to update meta file
if (tsdbCommitMeta(pRepo) < 0) {
tsdbError("vgId:%d error occurs while committing META data since %s", REPO_ID(pRepo), tstrerror(terrno));
......
......@@ -39,6 +39,9 @@
.tid = (_checkInfo)->tableId.tid, \
.uid = (_checkInfo)->tableId.uid})
#define IS_END_BLOCK(cur, numOfBlocks, ascTrav) \
((cur->slot == numOfBlocks - 1 && ascTrav) || (cur->slot == 0 && !ascTrav))
// limit offset start optimization for rows read over this value
#define OFFSET_SKIP_THRESHOLD 5000
......@@ -153,6 +156,10 @@ typedef struct STsdbQueryHandle {
SArray *prev; // previous row which is before than time window
SArray *next; // next row which is after the query time window
SIOCostSummary cost;
// callback
readover_callback readover_cb;
void* param;
} STsdbQueryHandle;
typedef struct STableGroupSupporter {
......@@ -182,6 +189,7 @@ static void* doFreeColumnInfoData(SArray* pColumnInfoData);
static void* destroyTableCheckInfo(SArray* pTableCheckInfo);
static bool tsdbGetExternalRow(TsdbQueryHandleT pHandle);
static int32_t tsdbQueryTableList(STable* pTable, SArray* pRes, void* filterInfo);
static STableBlockInfo* moveToNextDataBlockInCurrentFile(STsdbQueryHandle* pQueryHandle);
static void tsdbInitDataBlockLoadInfo(SDataBlockLoadInfo* pBlockLoadInfo) {
pBlockLoadInfo->slot = -1;
......@@ -2560,26 +2568,25 @@ static int32_t createDataBlocksInfo(STsdbQueryHandle* pQueryHandle, int32_t numO
static int32_t getFirstFileDataBlock(STsdbQueryHandle* pQueryHandle, bool* exists);
static int32_t getDataBlockRv(STsdbQueryHandle* pQueryHandle, STableBlockInfo* pNext, bool *exists) {
int32_t step = ASCENDING_TRAVERSE(pQueryHandle->order)? 1 : -1;
SQueryFilePos* cur = &pQueryHandle->cur;
while(1) {
while(pNext) {
int32_t code = loadFileDataBlock(pQueryHandle, pNext->compBlock, pNext->pTableCheckInfo, exists);
// load error or have data, return
if (code != TSDB_CODE_SUCCESS || *exists) {
return code;
}
if ((cur->slot == pQueryHandle->numOfBlocks - 1 && ASCENDING_TRAVERSE(pQueryHandle->order)) ||
(cur->slot == 0 && !ASCENDING_TRAVERSE(pQueryHandle->order))) {
// no data; continue to the next block until data is found
if (IS_END_BLOCK(cur, pQueryHandle->numOfBlocks, ASCENDING_TRAVERSE(pQueryHandle->order))) {
// all data blocks in the current file have been checked already; try the next file if it exists
return getFirstFileDataBlock(pQueryHandle, exists);
} else { // next block of the same file
cur->slot += step;
cur->mixBlock = false;
cur->blockCompleted = false;
pNext = &pQueryHandle->pDataBlockInfo[cur->slot];
pNext = moveToNextDataBlockInCurrentFile(pQueryHandle);
}
}
return TSDB_CODE_SUCCESS; // pNext == NULL no other blocks to move to
}
static int32_t getFirstFileDataBlock(STsdbQueryHandle* pQueryHandle, bool* exists) {
......@@ -2594,6 +2601,15 @@ static int32_t getFirstFileDataBlock(STsdbQueryHandle* pQueryHandle, bool* exist
STsdbCfg* pCfg = &pQueryHandle->pTsdb->config;
STimeWindow win = TSWINDOW_INITIALIZER;
// check whether the scan is already complete for a limit query
if (pQueryHandle->readover_cb && pQueryHandle->readover_cb(pQueryHandle->param, READ_QUERY, -1)) {
// the query has read enough rows; no need to read more
cur->fid = INT32_MIN;
*exists = false;
tsdbInfo("%p LIMIT_READ query is over and stop read. tables=%d qId=0x%"PRIx64, pQueryHandle, numOfTables, pQueryHandle->qId);
return TSDB_CODE_SUCCESS;
}
while (true) {
tsdbRLockFS(REPO_FS(pQueryHandle->pTsdb));
......@@ -2670,20 +2686,52 @@ static int32_t getFirstFileDataBlock(STsdbQueryHandle* pQueryHandle, bool* exist
return getDataBlockRv(pQueryHandle, pBlockInfo, exists);
}
static bool isEndFileDataBlock(SQueryFilePos* cur, int32_t numOfBlocks, bool ascTrav) {
assert(cur != NULL && numOfBlocks > 0);
return (cur->slot == numOfBlocks - 1 && ascTrav) || (cur->slot == 0 && !ascTrav);
}
static void moveToNextDataBlockInCurrentFile(STsdbQueryHandle* pQueryHandle) {
static STableBlockInfo* moveToNextDataBlockInCurrentFile(STsdbQueryHandle* pQueryHandle) {
int32_t step = ASCENDING_TRAVERSE(pQueryHandle->order)? 1 : -1;
SQueryFilePos* cur = &pQueryHandle->cur;
if (IS_END_BLOCK(cur, pQueryHandle->numOfBlocks, ASCENDING_TRAVERSE(pQueryHandle->order))) {
return NULL;
}
assert(cur->slot < pQueryHandle->numOfBlocks && cur->slot >= 0);
cur->slot += step;
cur->mixBlock = false;
cur->blockCompleted = false;
// no callback registered, nothing to check
STableBlockInfo* pBlockInfo = &pQueryHandle->pDataBlockInfo[cur->slot];
if(pQueryHandle->readover_cb == NULL) {
return pBlockInfo;
}
// a callback is registered, check whether this table is already done
int32_t tid = -1;
bool over = false;
do {
// tid changed; refresh the over status for the new tid
if(tid != pBlockInfo->pTableCheckInfo->tableId.tid) {
tid = pBlockInfo->pTableCheckInfo->tableId.tid;
over = pQueryHandle->readover_cb(pQueryHandle->param, READ_TABLE, pBlockInfo->pTableCheckInfo->tableId.tid);
if (!over) // this tid not over
return pBlockInfo;
}
//
// this tid is over; skip all of its remaining blocks in this file
//
// check end
if (IS_END_BLOCK(cur, pQueryHandle->numOfBlocks, ASCENDING_TRAVERSE(pQueryHandle->order)))
return NULL;
// move next
cur->slot += step;
cur->mixBlock = false;
cur->blockCompleted = false;
pBlockInfo = &pQueryHandle->pDataBlockInfo[cur->slot];
} while(1);
return NULL;
}
int32_t tsdbGetFileBlocksDistInfo(TsdbQueryHandleT* queryHandle, STableBlockDist* pTableBlockInfo) {
......@@ -2816,12 +2864,15 @@ static int32_t getDataBlocksInFiles(STsdbQueryHandle* pQueryHandle, bool* exists
// current block is empty, try next block in file
// all data blocks in the current file have been checked already; try the next file if it exists
if (isEndFileDataBlock(cur, pQueryHandle->numOfBlocks, ASCENDING_TRAVERSE(pQueryHandle->order))) {
if (IS_END_BLOCK(cur, pQueryHandle->numOfBlocks, ASCENDING_TRAVERSE(pQueryHandle->order))) {
return getFirstFileDataBlock(pQueryHandle, exists);
} else {
moveToNextDataBlockInCurrentFile(pQueryHandle);
STableBlockInfo* pNext = &pQueryHandle->pDataBlockInfo[cur->slot];
return getDataBlockRv(pQueryHandle, pNext, exists);
// get the next block in the current file; NULL means there are no more blocks in this file
STableBlockInfo* pNext = moveToNextDataBlockInCurrentFile(pQueryHandle);
if (pNext == NULL) // file end
return getFirstFileDataBlock(pQueryHandle, exists);
else
return getDataBlockRv(pQueryHandle, pNext, exists);
}
}
}
......@@ -4599,4 +4650,12 @@ int64_t tsdbSkipOffset(TsdbQueryHandleT queryHandle) {
return pQueryHandle->srows;
}
return 0;
}
// register the read-over callback used during table scans
void tsdbAddScanCallback(TsdbQueryHandleT* queryHandle, readover_callback callback, void* param) {
STsdbQueryHandle* pQueryHandle = (STsdbQueryHandle*)queryHandle;
pQueryHandle->readover_cb = callback;
pQueryHandle->param = param;
return ;
}
\ No newline at end of file
......@@ -20,7 +20,7 @@
extern "C" {
#endif
#define TSDB_CFG_MAX_NUM 131
#define TSDB_CFG_MAX_NUM 132
#define TSDB_CFG_PRINT_LEN 23
#define TSDB_CFG_OPTION_LEN 24
#define TSDB_CFG_VALUE_LEN 41
......
......@@ -103,7 +103,9 @@ int32_t vnodeProcessWrite(void *vparam, void *wparam, int32_t qtype, void *rpara
}
// write into WAL
code = walWrite(pVnode->wal, pHead);
if (!(tsShortcutFlag & TSDB_SHORTCUT_NR_VNODE_WAL_WRITE)) {
code = walWrite(pVnode->wal, pHead);
}
if (code < 0) {
if (syncCode > 0) atomic_sub_fetch_32(&pWrite->processedCount, 1);
vError("vgId:%d, hver:%" PRIu64 " vver:%" PRIu64 " code:0x%x", pVnode->vgId, pHead->version, pVnode->version, code);
......
......@@ -287,4 +287,4 @@ class TDTestCase:
tdCases.addWindows(__file__, TDTestCase())
tdCases.addLinux(__file__, TDTestCase())
\ No newline at end of file
tdCases.addLinux(__file__, TDTestCase())
......@@ -11,6 +11,7 @@
# -*- coding: utf-8 -*-
import os
import time
from util.log import *
from util.cases import *
from util.sql import *
......@@ -32,6 +33,7 @@ class TDTestCase:
cmd = "taosBenchmark -f ./5-taos-tools/taosbenchmark/json/sml_telnet_tcp.json"
tdLog.info("%s" % cmd)
os.system("%s" % cmd)
time.sleep(5)
tdSql.execute("reset query cache")
tdSql.query("select count(tbname) from opentsdb_telnet.stb1")
tdSql.checkData(0, 0, 8)
......
......@@ -14,3 +14,4 @@ python3 ./test.py -f 2-query/upper_func.py
python3 ./test.py -f 2-query/ltrim_func.py
python3 ./test.py -f 2-query/rtrim_func.py
python3 ./test.py -f 2-query/substr_func.py
python3 ./test.py -f 2-query/math_funcs.py
# -*- coding: utf-8 -*-
import random
import string
import subprocess
import sys
from util.log import *
from util.cases import *
from util.sql import *
class TDTestCase:
def init(self, conn, logSql):
tdLog.debug("start to execute %s" % __file__)
tdSql.init(conn.cursor(), logSql)
def run(self):
tdLog.debug("check databases")
tdSql.prepare()
### test normal table
tdSql.execute("create database if not exists db")
tdSql.execute("use db")
tdSql.execute("create stable `sch.job.create` (`ts` TIMESTAMP, `tint` int, `node.value` NCHAR(7)) TAGS (`endpoint` NCHAR(7),`task.type` NCHAR(3))")
tdSql.execute("alter table `sch.job.create` modify tag `task.type` NCHAR(4)")
tdSql.execute("alter table `sch.job.create` change tag `task.type` `chan.type`")
tdSql.execute("alter table `sch.job.create` drop tag `chan.type`")
tdSql.execute("alter table `sch.job.create` add tag `add.type` NCHAR(6)")
tdSql.query("describe `sch.job.create`")
tdSql.checkData(4, 0, "add.type")
tdSql.execute("alter table `sch.job.create` modify column `node.value` NCHAR(8)")
tdSql.execute("alter table `sch.job.create` drop column `node.value`")
tdSql.execute("alter table `sch.job.create` add column `add.value` NCHAR(6)")
tdSql.query("describe `sch.job.create`")
tdSql.checkData(2, 0, "add.value")
tdSql.execute("insert into `tsch.job.create` using `sch.job.create`(`add.type`) TAGS('tag1') values(now, 1, 'here')")
tdSql.execute("alter table `tsch.job.create` set tag `add.type` = 'tag2'")
tdSql.query("select `add.type` from `tsch.job.create`")
tdSql.checkData(0, 0, "tag2")
### test stable
tdSql.execute("create stable `ssch.job.create` (`ts` TIMESTAMP, `tint` int, `node.value` NCHAR(7)) TAGS (`endpoint` NCHAR(7),`task.type` NCHAR(3))")
tdSql.execute("alter stable `ssch.job.create` modify tag `task.type` NCHAR(4)")
tdSql.execute("alter stable `ssch.job.create` change tag `task.type` `chan.type`")
tdSql.execute("alter stable `ssch.job.create` drop tag `chan.type`")
tdSql.execute("alter stable `ssch.job.create` add tag `add.type` NCHAR(6)")
tdSql.query("describe `ssch.job.create`")
tdSql.checkData(4, 0, "add.type")
tdSql.execute("alter stable `ssch.job.create` modify column `node.value` NCHAR(8)")
tdSql.execute("alter stable `ssch.job.create` drop column `node.value`")
tdSql.execute("alter stable `ssch.job.create` add column `add.value` NCHAR(6)")
tdSql.query("describe `ssch.job.create`")
tdSql.checkData(2, 0, "add.value")
tdSql.execute("insert into `tssch.job.create` using `ssch.job.create`(`add.type`) TAGS('tag1') values(now, 1, 'here')")
tdSql.error("alter stable `tssch.job.create` set tag `add.type` = 'tag2'")
def stop(self):
tdSql.close()
tdLog.success("%s successfully executed" % __file__)
tdCases.addWindows(__file__, TDTestCase())
tdCases.addLinux(__file__, TDTestCase())
\ No newline at end of file
......@@ -417,6 +417,7 @@ python3 ./test.py -f insert/flushwhiledrop.py
python3 ./test.py -f alter/alterColMultiTimes.py
python3 ./test.py -f query/queryWildcardLength.py
python3 ./test.py -f query/queryTbnameUpperLower.py
python3 ./test.py -f alter/alterBackQuoteCol.py
python3 ./test.py -f query/query.py
python3 ./test.py -f query/queryDiffColsTagsAndOr.py
......
......@@ -130,10 +130,17 @@ class TDTestCase:
if 'httpDbNameMandatory' not in rj:
tdLog.info('has no httpDbNameMandatory shown')
tdLog.exit(1)
if rj['httpDbNameMandatory'] != '1':
val = None
pname = 'taosadapter' #httpDbNameMandatory doesn't work in taosadapter
cmd = 'ps -ef|grep %s|grep -v "grep"' % pname
p = subprocess.Popen(cmd,shell=True, stdout=subprocess.PIPE)
if p.wait() == 0:
val = p.stdout.read()
if rj['httpDbNameMandatory'] != '1' and pname not in str(val):
tdLog.info('httpDbNameMandatory data:%s == expect:0'%rj['httpDbNameMandatory'])
tdLog.exit(1)
tdLog.info("httpDbNameMandatory by restful query data:%s == expect:1" % (rj['httpDbNameMandatory']))
if pname not in str(val):
tdLog.info("httpDbNameMandatory by restful query data:%s == expect:1" % (rj['httpDbNameMandatory']))
def run(self):
......
......@@ -56,6 +56,13 @@ class TDTestCase:
self.test_case2()
tdLog.debug(" LIMIT test_case2 ............ [OK]")
# insert data
self.insert_data("t2", self.ts, 100*10000, 30000);
self.insert_data("t3", self.ts, 200*10000, 30000);
# test super table
self.test_limit()
tdLog.debug(" LIMIT test super table ............ [OK]")
# stop
def stop(self):
......@@ -186,6 +193,31 @@ class TDTestCase:
tdSql.waitedQuery(sql, 3, WAITS)
tdSql.checkData(0, 1, 1)
# test limit
def test_limit(self):
#
# base test
#
# offset
sql = "select * from st order by ts limit 20"
tdSql.waitedQuery(sql, 20, WAITS)
tdSql.checkData(19, 1, 6)
sql = "select * from st order by ts desc limit 20"
tdSql.waitedQuery(sql, 20, WAITS)
tdSql.checkData(19, 1, 2999980)
sql = "select * from st where ts>='2017-07-14 10:40:10' and ts<'2017-07-22 18:40:10' order by ts limit 16;"
tdSql.waitedQuery(sql, 16, WAITS)
tdSql.checkData(15, 1, 15)
sql = "select * from st where ts>='2017-07-14 10:40:10' and ts<'2017-07-22 18:40:10' order by ts desc limit 16;"
tdSql.waitedQuery(sql, 16, WAITS)
tdSql.checkData(15, 1, 720004)
sql = "select * from st where ts>='2017-07-14 10:40:10' and ts<'2017-07-22 18:40:10' order by ts desc limit 16;"
tdSql.waitedQuery(sql, 16, WAITS)
tdSql.checkData(15, 1, 720004)
sql = "select * from st where ts>='2017-07-14 10:40:10' and ts<'2017-07-22 18:40:10' order by ts desc limit 16 offset 3;"
tdSql.waitedQuery(sql, 16, WAITS)
tdSql.checkData(15, 1, 720003)
#
# add case with filename
......