未验证 提交 64c06c95 编写于 作者: S Shengliang Guan 提交者: GitHub

Merge pull request #5683 from taosdata/develop

Merge from develop into master
...@@ -4,7 +4,7 @@ PROJECT(TDengine) ...@@ -4,7 +4,7 @@ PROJECT(TDengine)
IF (DEFINED VERNUMBER) IF (DEFINED VERNUMBER)
SET(TD_VER_NUMBER ${VERNUMBER}) SET(TD_VER_NUMBER ${VERNUMBER})
ELSE () ELSE ()
SET(TD_VER_NUMBER "2.0.18.0") SET(TD_VER_NUMBER "2.0.19.0")
ENDIF () ENDIF ()
IF (DEFINED VERCOMPATIBLE) IF (DEFINED VERCOMPATIBLE)
......
...@@ -101,7 +101,7 @@ $ taos -h 192.168.0.1 -s "use db; show tables;" ...@@ -101,7 +101,7 @@ $ taos -h 192.168.0.1 -s "use db; show tables;"
### 运行SQL命令脚本 ### 运行SQL命令脚本
TDengine终端可以通过`source`命令来运行SQL命令脚本. TDengine 终端可以通过 `source` 命令来运行 SQL 命令脚本.
```mysql ```mysql
taos> source <filename>; taos> source <filename>;
...@@ -109,10 +109,10 @@ taos> source <filename>; ...@@ -109,10 +109,10 @@ taos> source <filename>;
### Shell小技巧 ### Shell小技巧
- 可以使用上下光标键查看已经历史输入的命 - 可以使用上下光标键查看历史输入的指
- 修改用户密码。在shell中使用alter user命 - 修改用户密码。在 shell 中使用 alter user 指
- ctrl+c 中止正在进行中的查询 - ctrl+c 中止正在进行中的查询
- 执行`RESET QUERY CACHE`清空本地缓存的表的schema - 执行 `RESET QUERY CACHE` 清空本地缓存的表 schema
## <a class="anchor" id="demo"></a>TDengine 极速体验 ## <a class="anchor" id="demo"></a>TDengine 极速体验
...@@ -212,7 +212,7 @@ taos> select avg(f1), max(f2), min(f3) from test.t10 interval(10s); ...@@ -212,7 +212,7 @@ taos> select avg(f1), max(f2), min(f3) from test.t10 interval(10s);
| **Python** | ● | ● | ● | ○ | ● | ● | ● | -- | ● | | **Python** | ● | ● | ● | ○ | ● | ● | ● | -- | ● |
| **Go** | ● | ● | ● | ○ | ● | ● | ○ | -- | -- | | **Go** | ● | ● | ● | ○ | ● | ● | ○ | -- | -- |
| **NodeJs** | ● | ● | ○ | ○ | ● | ● | ○ | -- | -- | | **NodeJs** | ● | ● | ○ | ○ | ● | ● | ○ | -- | -- |
| **C#** | ○ | ● | ● | ○ | ○ | ○ | ○ | -- | -- | | **C#** | ● | ● | ○ | ○ | ○ | ○ | ○ | -- | -- |
| **RESTful** | ● | ● | ● | ● | ● | ● | ● | ● | ● | | **RESTful** | ● | ● | ● | ● | ● | ● | ● | ● | ● |
注: ● 表示经过官方测试验证, ○ 表示非官方测试验证。 注: ● 表示经过官方测试验证, ○ 表示非官方测试验证。
......
...@@ -14,7 +14,7 @@ TDengine提供了丰富的应用程序开发接口,其中包括C/C++、Java、 ...@@ -14,7 +14,7 @@ TDengine提供了丰富的应用程序开发接口,其中包括C/C++、Java、
| **Python** | ● | ● | ● | ○ | ● | ● | ○ | -- | ○ | | **Python** | ● | ● | ● | ○ | ● | ● | ○ | -- | ○ |
| **Go** | ● | ● | ● | ○ | ● | ● | ○ | -- | -- | | **Go** | ● | ● | ● | ○ | ● | ● | ○ | -- | -- |
| **NodeJs** | ● | ● | ○ | ○ | ● | ● | ○ | -- | -- | | **NodeJs** | ● | ● | ○ | ○ | ● | ● | ○ | -- | -- |
| **C#** | ○ | ● | ● | ○ | ○ | ○ | ○ | -- | -- | | **C#** | ● | ● | ○ | ○ | ○ | ○ | ○ | -- | -- |
| **RESTful** | ● | ● | ● | ● | ● | ● | ○ | ○ | ○ | | **RESTful** | ● | ● | ● | ● | ● | ● | ○ | ○ | ○ |
其中 ● 表示经过官方测试验证, ○ 表示非官方测试验证。 其中 ● 表示经过官方测试验证, ○ 表示非官方测试验证。
...@@ -23,7 +23,7 @@ TDengine提供了丰富的应用程序开发接口,其中包括C/C++、Java、 ...@@ -23,7 +23,7 @@ TDengine提供了丰富的应用程序开发接口,其中包括C/C++、Java、
* 在没有安装TDengine服务端软件的系统中使用连接器(除RESTful外)访问 TDengine 数据库,需要安装相应版本的客户端安装包来使应用驱动(Linux系统中文件名为libtaos.so,Windows系统中为taos.dll)被安装在系统中,否则会产生无法找到相应库文件的错误。 * 在没有安装TDengine服务端软件的系统中使用连接器(除RESTful外)访问 TDengine 数据库,需要安装相应版本的客户端安装包来使应用驱动(Linux系统中文件名为libtaos.so,Windows系统中为taos.dll)被安装在系统中,否则会产生无法找到相应库文件的错误。
* 所有执行 SQL 语句的 API,例如 C/C++ Connector 中的 `tao_query``taos_query_a``taos_subscribe` 等,以及其它语言中与它们对应的API,每次都只能执行一条 SQL 语句,如果实际参数中包含了多条语句,它们的行为是未定义的。 * 所有执行 SQL 语句的 API,例如 C/C++ Connector 中的 `tao_query``taos_query_a``taos_subscribe` 等,以及其它语言中与它们对应的API,每次都只能执行一条 SQL 语句,如果实际参数中包含了多条语句,它们的行为是未定义的。
* 升级到TDengine到2.0.8.0版本的用户,必须更新JDBC连接TDengine必须升级taos-jdbcdriver到2.0.12及以上。 * 升级到TDengine到2.0.8.0版本的用户,必须更新JDBC连接TDengine必须升级taos-jdbcdriver到2.0.12及以上。详细的版本依赖关系请参见 [taos-jdbcdriver 文档](https://www.taosdata.com/cn/documentation/connector/java#version)
* 无论选用何种编程语言的连接器,2.0 及以上版本的 TDengine 推荐数据库应用的每个线程都建立一个独立的连接,或基于线程建立连接池,以避免连接内的“USE statement”状态量在线程之间相互干扰(但连接的查询和写入操作都是线程安全的)。 * 无论选用何种编程语言的连接器,2.0 及以上版本的 TDengine 推荐数据库应用的每个线程都建立一个独立的连接,或基于线程建立连接池,以避免连接内的“USE statement”状态量在线程之间相互干扰(但连接的查询和写入操作都是线程安全的)。
## <a class="anchor" id="driver"></a>安装连接器驱动步骤 ## <a class="anchor" id="driver"></a>安装连接器驱动步骤
......
name: tdengine name: tdengine
base: core18 base: core18
version: '2.0.18.0' version: '2.0.19.0'
icon: snap/gui/t-dengine.svg icon: snap/gui/t-dengine.svg
summary: an open-source big data platform designed and optimized for IoT. summary: an open-source big data platform designed and optimized for IoT.
description: | description: |
...@@ -72,7 +72,7 @@ parts: ...@@ -72,7 +72,7 @@ parts:
- usr/bin/taosd - usr/bin/taosd
- usr/bin/taos - usr/bin/taos
- usr/bin/taosdemo - usr/bin/taosdemo
- usr/lib/libtaos.so.2.0.18.0 - usr/lib/libtaos.so.2.0.19.0
- usr/lib/libtaos.so.1 - usr/lib/libtaos.so.1
- usr/lib/libtaos.so - usr/lib/libtaos.so
......
...@@ -36,19 +36,6 @@ extern "C" { ...@@ -36,19 +36,6 @@ extern "C" {
#define UTIL_TABLE_IS_NORMAL_TABLE(metaInfo)\ #define UTIL_TABLE_IS_NORMAL_TABLE(metaInfo)\
(!(UTIL_TABLE_IS_SUPER_TABLE(metaInfo) || UTIL_TABLE_IS_CHILD_TABLE(metaInfo))) (!(UTIL_TABLE_IS_SUPER_TABLE(metaInfo) || UTIL_TABLE_IS_CHILD_TABLE(metaInfo)))
typedef struct SParsedColElem {
int16_t colIndex;
uint16_t offset;
} SParsedColElem;
typedef struct SParsedDataColInfo {
int16_t numOfCols;
int16_t numOfAssignedCols;
SParsedColElem elems[TSDB_MAX_COLUMNS];
bool hasVal[TSDB_MAX_COLUMNS];
} SParsedDataColInfo;
#pragma pack(push,1) #pragma pack(push,1)
// this struct is transfered as binary, padding two bytes to avoid // this struct is transfered as binary, padding two bytes to avoid
// an 'uid' whose low bytes is 0xff being recoginized as NULL, // an 'uid' whose low bytes is 0xff being recoginized as NULL,
...@@ -118,6 +105,8 @@ int32_t tscCreateDataBlock(size_t initialSize, int32_t rowSize, int32_t startOff ...@@ -118,6 +105,8 @@ int32_t tscCreateDataBlock(size_t initialSize, int32_t rowSize, int32_t startOff
void tscDestroyDataBlock(STableDataBlocks* pDataBlock, bool removeMeta); void tscDestroyDataBlock(STableDataBlocks* pDataBlock, bool removeMeta);
void tscSortRemoveDataBlockDupRows(STableDataBlocks* dataBuf); void tscSortRemoveDataBlockDupRows(STableDataBlocks* dataBuf);
void tscDestroyBoundColumnInfo(SParsedDataColInfo* pColInfo);
SParamInfo* tscAddParamToDataBlock(STableDataBlocks* pDataBlock, char type, uint8_t timePrec, int16_t bytes, SParamInfo* tscAddParamToDataBlock(STableDataBlocks* pDataBlock, char type, uint8_t timePrec, int16_t bytes,
uint32_t offset); uint32_t offset);
...@@ -140,6 +129,8 @@ bool tscIsPointInterpQuery(SQueryInfo* pQueryInfo); ...@@ -140,6 +129,8 @@ bool tscIsPointInterpQuery(SQueryInfo* pQueryInfo);
bool tscIsTWAQuery(SQueryInfo* pQueryInfo); bool tscIsTWAQuery(SQueryInfo* pQueryInfo);
bool tscIsSecondStageQuery(SQueryInfo* pQueryInfo); bool tscIsSecondStageQuery(SQueryInfo* pQueryInfo);
bool tscGroupbyColumn(SQueryInfo* pQueryInfo); bool tscGroupbyColumn(SQueryInfo* pQueryInfo);
bool tscIsTopbotQuery(SQueryInfo* pQueryInfo);
int32_t tscGetTopbotQueryParam(SQueryInfo* pQueryInfo);
bool tscNonOrderedProjectionQueryOnSTable(SQueryInfo *pQueryInfo, int32_t tableIndex); bool tscNonOrderedProjectionQueryOnSTable(SQueryInfo *pQueryInfo, int32_t tableIndex);
bool tscOrderedProjectionQueryOnSTable(SQueryInfo* pQueryInfo, int32_t tableIndex); bool tscOrderedProjectionQueryOnSTable(SQueryInfo* pQueryInfo, int32_t tableIndex);
......
...@@ -175,6 +175,19 @@ typedef struct SParamInfo { ...@@ -175,6 +175,19 @@ typedef struct SParamInfo {
uint32_t offset; uint32_t offset;
} SParamInfo; } SParamInfo;
typedef struct SBoundColumn {
bool hasVal; // denote if current column has bound or not
int32_t offset; // all column offset value
} SBoundColumn;
typedef struct SParsedDataColInfo {
int16_t numOfCols;
int16_t numOfBound;
int32_t *boundedColumns;
SBoundColumn *cols;
} SParsedDataColInfo;
typedef struct STableDataBlocks { typedef struct STableDataBlocks {
SName tableName; SName tableName;
int8_t tsSource; // where does the UNIX timestamp come from, server or client int8_t tsSource; // where does the UNIX timestamp come from, server or client
...@@ -189,6 +202,8 @@ typedef struct STableDataBlocks { ...@@ -189,6 +202,8 @@ typedef struct STableDataBlocks {
STableMeta *pTableMeta; // the tableMeta of current table, the table meta will be used during submit, keep a ref to avoid to be removed from cache STableMeta *pTableMeta; // the tableMeta of current table, the table meta will be used during submit, keep a ref to avoid to be removed from cache
char *pData; char *pData;
SParsedDataColInfo boundColumnInfo;
// for parameter ('?') binding // for parameter ('?') binding
uint32_t numOfAllocedParams; uint32_t numOfAllocedParams;
uint32_t numOfParams; uint32_t numOfParams;
...@@ -425,6 +440,7 @@ void tscRestoreFuncForSTableQuery(SQueryInfo *pQueryInfo); ...@@ -425,6 +440,7 @@ void tscRestoreFuncForSTableQuery(SQueryInfo *pQueryInfo);
int32_t tscCreateResPointerInfo(SSqlRes *pRes, SQueryInfo *pQueryInfo); int32_t tscCreateResPointerInfo(SSqlRes *pRes, SQueryInfo *pQueryInfo);
void tscSetResRawPtr(SSqlRes* pRes, SQueryInfo* pQueryInfo); void tscSetResRawPtr(SSqlRes* pRes, SQueryInfo* pQueryInfo);
void destroyTableNameList(SSqlCmd* pCmd);
void tscResetSqlCmd(SSqlCmd *pCmd, bool removeMeta); void tscResetSqlCmd(SSqlCmd *pCmd, bool removeMeta);
...@@ -462,6 +478,7 @@ char* tscGetSqlStr(SSqlObj* pSql); ...@@ -462,6 +478,7 @@ char* tscGetSqlStr(SSqlObj* pSql);
bool tscIsQueryWithLimit(SSqlObj* pSql); bool tscIsQueryWithLimit(SSqlObj* pSql);
bool tscHasReachLimitation(SQueryInfo *pQueryInfo, SSqlRes *pRes); bool tscHasReachLimitation(SQueryInfo *pQueryInfo, SSqlRes *pRes);
void tscSetBoundColumnInfo(SParsedDataColInfo *pColInfo, SSchema *pSchema, int32_t numOfCols);
char *tscGetErrorMsgPayload(SSqlCmd *pCmd); char *tscGetErrorMsgPayload(SSqlCmd *pCmd);
......
...@@ -338,11 +338,20 @@ void tscCreateLocalMerger(tExtMemBuffer **pMemBuffer, int32_t numOfBuffer, tOrde ...@@ -338,11 +338,20 @@ void tscCreateLocalMerger(tExtMemBuffer **pMemBuffer, int32_t numOfBuffer, tOrde
pReducer->resColModel->capacity = pReducer->nResultBufSize; pReducer->resColModel->capacity = pReducer->nResultBufSize;
pReducer->finalModel = pFFModel; pReducer->finalModel = pFFModel;
int32_t expandFactor = 1;
if (finalmodel->rowSize > 0) { if (finalmodel->rowSize > 0) {
pReducer->resColModel->capacity /= finalmodel->rowSize; bool topBotQuery = tscIsTopbotQuery(pQueryInfo);
if (topBotQuery) {
expandFactor = tscGetTopbotQueryParam(pQueryInfo);
pReducer->resColModel->capacity /= (finalmodel->rowSize * expandFactor);
pReducer->resColModel->capacity *= expandFactor;
} else {
pReducer->resColModel->capacity /= finalmodel->rowSize;
}
} }
assert(finalmodel->rowSize > 0 && finalmodel->rowSize <= pReducer->rowSize); assert(finalmodel->rowSize > 0 && finalmodel->rowSize <= pReducer->rowSize);
pReducer->pFinalRes = calloc(1, pReducer->rowSize * pReducer->resColModel->capacity); pReducer->pFinalRes = calloc(1, pReducer->rowSize * pReducer->resColModel->capacity);
if (pReducer->pTempBuffer == NULL || pReducer->discardData == NULL || pReducer->pResultBuf == NULL || if (pReducer->pTempBuffer == NULL || pReducer->discardData == NULL || pReducer->pResultBuf == NULL ||
...@@ -1150,9 +1159,10 @@ static void fillMultiRowsOfTagsVal(SQueryInfo *pQueryInfo, int32_t numOfRes, SLo ...@@ -1150,9 +1159,10 @@ static void fillMultiRowsOfTagsVal(SQueryInfo *pQueryInfo, int32_t numOfRes, SLo
memset(buf, 0, (size_t)maxBufSize); memset(buf, 0, (size_t)maxBufSize);
memcpy(buf, pCtx->pOutput, (size_t)pCtx->outputBytes); memcpy(buf, pCtx->pOutput, (size_t)pCtx->outputBytes);
char* next = pCtx->pOutput;
for (int32_t i = 0; i < inc; ++i) { for (int32_t i = 0; i < inc; ++i) {
pCtx->pOutput += pCtx->outputBytes; next += pCtx->outputBytes;
memcpy(pCtx->pOutput, buf, (size_t)pCtx->outputBytes); memcpy(next, buf, (size_t)pCtx->outputBytes);
} }
} }
...@@ -1440,6 +1450,11 @@ int32_t tscDoLocalMerge(SSqlObj *pSql) { ...@@ -1440,6 +1450,11 @@ int32_t tscDoLocalMerge(SSqlObj *pSql) {
SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex); SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
tFilePage *tmpBuffer = pLocalMerge->pTempBuffer; tFilePage *tmpBuffer = pLocalMerge->pTempBuffer;
int32_t remain = 1;
if (tscIsTopbotQuery(pQueryInfo)) {
remain = tscGetTopbotQueryParam(pQueryInfo);
}
if (doHandleLastRemainData(pSql)) { if (doHandleLastRemainData(pSql)) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
...@@ -1528,7 +1543,7 @@ int32_t tscDoLocalMerge(SSqlObj *pSql) { ...@@ -1528,7 +1543,7 @@ int32_t tscDoLocalMerge(SSqlObj *pSql) {
* if the previous group does NOT generate any result (pResBuf->num == 0), * if the previous group does NOT generate any result (pResBuf->num == 0),
* continue to process results instead of return results. * continue to process results instead of return results.
*/ */
if ((!sameGroup && pResBuf->num > 0) || (pResBuf->num == pLocalMerge->resColModel->capacity)) { if ((!sameGroup && pResBuf->num > 0) || (pResBuf->num + remain >= pLocalMerge->resColModel->capacity)) {
// does not belong to the same group // does not belong to the same group
bool notSkipped = genFinalResults(pSql, pLocalMerge, !sameGroup); bool notSkipped = genFinalResults(pSql, pLocalMerge, !sameGroup);
......
此差异已折叠。
...@@ -271,6 +271,41 @@ bool tscIsTWAQuery(SQueryInfo* pQueryInfo) { ...@@ -271,6 +271,41 @@ bool tscIsTWAQuery(SQueryInfo* pQueryInfo) {
return false; return false;
} }
bool tscIsTopbotQuery(SQueryInfo* pQueryInfo) {
size_t numOfExprs = tscSqlExprNumOfExprs(pQueryInfo);
for (int32_t i = 0; i < numOfExprs; ++i) {
SSqlExpr* pExpr = tscSqlExprGet(pQueryInfo, i);
if (pExpr == NULL) {
continue;
}
int32_t functionId = pExpr->functionId;
if (functionId == TSDB_FUNC_TOP || functionId == TSDB_FUNC_BOTTOM) {
return true;
}
}
return false;
}
int32_t tscGetTopbotQueryParam(SQueryInfo* pQueryInfo) {
size_t numOfExprs = tscSqlExprNumOfExprs(pQueryInfo);
for (int32_t i = 0; i < numOfExprs; ++i) {
SSqlExpr* pExpr = tscSqlExprGet(pQueryInfo, i);
if (pExpr == NULL) {
continue;
}
int32_t functionId = pExpr->functionId;
if (functionId == TSDB_FUNC_TOP || functionId == TSDB_FUNC_BOTTOM) {
return (int32_t) pExpr->param[0].i64;
}
}
return 0;
}
void tscClearInterpInfo(SQueryInfo* pQueryInfo) { void tscClearInterpInfo(SQueryInfo* pQueryInfo) {
if (!tscIsPointInterpQuery(pQueryInfo)) { if (!tscIsPointInterpQuery(pQueryInfo)) {
return; return;
...@@ -415,6 +450,20 @@ void tscFreeQueryInfo(SSqlCmd* pCmd, bool removeMeta) { ...@@ -415,6 +450,20 @@ void tscFreeQueryInfo(SSqlCmd* pCmd, bool removeMeta) {
tfree(pCmd->pQueryInfo); tfree(pCmd->pQueryInfo);
} }
void destroyTableNameList(SSqlCmd* pCmd) {
if (pCmd->numOfTables == 0) {
assert(pCmd->pTableNameList == NULL);
return;
}
for(int32_t i = 0; i < pCmd->numOfTables; ++i) {
tfree(pCmd->pTableNameList[i]);
}
pCmd->numOfTables = 0;
tfree(pCmd->pTableNameList);
}
void tscResetSqlCmd(SSqlCmd* pCmd, bool removeMeta) { void tscResetSqlCmd(SSqlCmd* pCmd, bool removeMeta) {
pCmd->command = 0; pCmd->command = 0;
pCmd->numOfCols = 0; pCmd->numOfCols = 0;
...@@ -424,14 +473,7 @@ void tscResetSqlCmd(SSqlCmd* pCmd, bool removeMeta) { ...@@ -424,14 +473,7 @@ void tscResetSqlCmd(SSqlCmd* pCmd, bool removeMeta) {
pCmd->parseFinished = 0; pCmd->parseFinished = 0;
pCmd->autoCreated = 0; pCmd->autoCreated = 0;
for(int32_t i = 0; i < pCmd->numOfTables; ++i) { destroyTableNameList(pCmd);
if (pCmd->pTableNameList && pCmd->pTableNameList[i]) {
tfree(pCmd->pTableNameList[i]);
}
}
pCmd->numOfTables = 0;
tfree(pCmd->pTableNameList);
pCmd->pTableBlockHashList = tscDestroyBlockHashTable(pCmd->pTableBlockHashList, removeMeta); pCmd->pTableBlockHashList = tscDestroyBlockHashTable(pCmd->pTableBlockHashList, removeMeta);
pCmd->pDataBlocks = tscDestroyBlockArrayList(pCmd->pDataBlocks); pCmd->pDataBlocks = tscDestroyBlockArrayList(pCmd->pDataBlocks);
...@@ -548,6 +590,11 @@ void tscFreeSqlObj(SSqlObj* pSql) { ...@@ -548,6 +590,11 @@ void tscFreeSqlObj(SSqlObj* pSql) {
free(pSql); free(pSql);
} }
void tscDestroyBoundColumnInfo(SParsedDataColInfo* pColInfo) {
tfree(pColInfo->boundedColumns);
tfree(pColInfo->cols);
}
void tscDestroyDataBlock(STableDataBlocks* pDataBlock, bool removeMeta) { void tscDestroyDataBlock(STableDataBlocks* pDataBlock, bool removeMeta) {
if (pDataBlock == NULL) { if (pDataBlock == NULL) {
return; return;
...@@ -568,6 +615,7 @@ void tscDestroyDataBlock(STableDataBlocks* pDataBlock, bool removeMeta) { ...@@ -568,6 +615,7 @@ void tscDestroyDataBlock(STableDataBlocks* pDataBlock, bool removeMeta) {
taosHashRemove(tscTableMetaInfo, name, strnlen(name, TSDB_TABLE_FNAME_LEN)); taosHashRemove(tscTableMetaInfo, name, strnlen(name, TSDB_TABLE_FNAME_LEN));
} }
tscDestroyBoundColumnInfo(&pDataBlock->boundColumnInfo);
tfree(pDataBlock); tfree(pDataBlock);
} }
...@@ -678,7 +726,7 @@ int32_t tscCopyDataBlockToPayload(SSqlObj* pSql, STableDataBlocks* pDataBlock) { ...@@ -678,7 +726,7 @@ int32_t tscCopyDataBlockToPayload(SSqlObj* pSql, STableDataBlocks* pDataBlock) {
* @param dataBlocks * @param dataBlocks
* @return * @return
*/ */
int32_t tscCreateDataBlock(size_t initialSize, int32_t rowSize, int32_t startOffset, SName* name, int32_t tscCreateDataBlock(size_t defaultSize, int32_t rowSize, int32_t startOffset, SName* name,
STableMeta* pTableMeta, STableDataBlocks** dataBlocks) { STableMeta* pTableMeta, STableDataBlocks** dataBlocks) {
STableDataBlocks* dataBuf = (STableDataBlocks*)calloc(1, sizeof(STableDataBlocks)); STableDataBlocks* dataBuf = (STableDataBlocks*)calloc(1, sizeof(STableDataBlocks));
if (dataBuf == NULL) { if (dataBuf == NULL) {
...@@ -686,10 +734,12 @@ int32_t tscCreateDataBlock(size_t initialSize, int32_t rowSize, int32_t startOff ...@@ -686,10 +734,12 @@ int32_t tscCreateDataBlock(size_t initialSize, int32_t rowSize, int32_t startOff
return TSDB_CODE_TSC_OUT_OF_MEMORY; return TSDB_CODE_TSC_OUT_OF_MEMORY;
} }
dataBuf->nAllocSize = (uint32_t)initialSize; dataBuf->nAllocSize = (uint32_t)defaultSize;
dataBuf->headerSize = startOffset; // the header size will always be the startOffset value, reserved for the subumit block header dataBuf->headerSize = startOffset;
// the header size will always be the startOffset value, reserved for the subumit block header
if (dataBuf->nAllocSize <= dataBuf->headerSize) { if (dataBuf->nAllocSize <= dataBuf->headerSize) {
dataBuf->nAllocSize = dataBuf->headerSize*2; dataBuf->nAllocSize = dataBuf->headerSize * 2;
} }
dataBuf->pData = calloc(1, dataBuf->nAllocSize); dataBuf->pData = calloc(1, dataBuf->nAllocSize);
...@@ -699,25 +749,31 @@ int32_t tscCreateDataBlock(size_t initialSize, int32_t rowSize, int32_t startOff ...@@ -699,25 +749,31 @@ int32_t tscCreateDataBlock(size_t initialSize, int32_t rowSize, int32_t startOff
return TSDB_CODE_TSC_OUT_OF_MEMORY; return TSDB_CODE_TSC_OUT_OF_MEMORY;
} }
dataBuf->ordered = true; //Here we keep the tableMeta to avoid it to be remove by other threads.
dataBuf->prevTS = INT64_MIN; dataBuf->pTableMeta = tscTableMetaDup(pTableMeta);
SParsedDataColInfo* pColInfo = &dataBuf->boundColumnInfo;
SSchema* pSchema = tscGetTableSchema(dataBuf->pTableMeta);
tscSetBoundColumnInfo(pColInfo, pSchema, dataBuf->pTableMeta->tableInfo.numOfColumns);
dataBuf->rowSize = rowSize; dataBuf->ordered = true;
dataBuf->size = startOffset; dataBuf->prevTS = INT64_MIN;
dataBuf->rowSize = rowSize;
dataBuf->size = startOffset;
dataBuf->tsSource = -1; dataBuf->tsSource = -1;
dataBuf->vgId = dataBuf->pTableMeta->vgId;
tNameAssign(&dataBuf->tableName, name); tNameAssign(&dataBuf->tableName, name);
//Here we keep the tableMeta to avoid it to be remove by other threads. assert(defaultSize > 0 && pTableMeta != NULL && dataBuf->pTableMeta != NULL);
dataBuf->pTableMeta = tscTableMetaDup(pTableMeta);
assert(initialSize > 0 && pTableMeta != NULL && dataBuf->pTableMeta != NULL);
*dataBlocks = dataBuf; *dataBlocks = dataBuf;
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t tscGetDataBlockFromList(SHashObj* pHashList, int64_t id, int32_t size, int32_t startOffset, int32_t rowSize, int32_t tscGetDataBlockFromList(SHashObj* pHashList, int64_t id, int32_t size, int32_t startOffset, int32_t rowSize,
SName* name, STableMeta* pTableMeta, STableDataBlocks** dataBlocks, SArray* pBlockList) { SName* name, STableMeta* pTableMeta, STableDataBlocks** dataBlocks,
SArray* pBlockList) {
*dataBlocks = NULL; *dataBlocks = NULL;
STableDataBlocks** t1 = (STableDataBlocks**)taosHashGet(pHashList, (const char*)&id, sizeof(id)); STableDataBlocks** t1 = (STableDataBlocks**)taosHashGet(pHashList, (const char*)&id, sizeof(id));
if (t1 != NULL) { if (t1 != NULL) {
...@@ -826,6 +882,8 @@ static void extractTableNameList(SSqlCmd* pCmd, bool freeBlockMap) { ...@@ -826,6 +882,8 @@ static void extractTableNameList(SSqlCmd* pCmd, bool freeBlockMap) {
int32_t i = 0; int32_t i = 0;
while(p1) { while(p1) {
STableDataBlocks* pBlocks = *p1; STableDataBlocks* pBlocks = *p1;
tfree(pCmd->pTableNameList[i]);
pCmd->pTableNameList[i++] = tNameDup(&pBlocks->tableName); pCmd->pTableNameList[i++] = tNameDup(&pBlocks->tableName);
p1 = taosHashIterate(pCmd->pTableBlockHashList, p1); p1 = taosHashIterate(pCmd->pTableBlockHashList, p1);
} }
...@@ -942,7 +1000,7 @@ bool tscIsInsertData(char* sqlstr) { ...@@ -942,7 +1000,7 @@ bool tscIsInsertData(char* sqlstr) {
int32_t index = 0; int32_t index = 0;
do { do {
SStrToken t0 = tStrGetToken(sqlstr, &index, false, 0, NULL); SStrToken t0 = tStrGetToken(sqlstr, &index, false);
if (t0.type != TK_LP) { if (t0.type != TK_LP) {
return t0.type == TK_INSERT || t0.type == TK_IMPORT; return t0.type == TK_INSERT || t0.type == TK_IMPORT;
} }
......
...@@ -294,7 +294,7 @@ void cqStop(void *handle) { ...@@ -294,7 +294,7 @@ void cqStop(void *handle) {
pthread_mutex_unlock(&pContext->mutex); pthread_mutex_unlock(&pContext->mutex);
} }
void *cqCreate(void *handle, uint64_t uid, int32_t sid, const char* dstTable, char *sqlStr, STSchema *pSchema) { void *cqCreate(void *handle, uint64_t uid, int32_t sid, const char* dstTable, char *sqlStr, STSchema *pSchema, int start) {
if (tsEnableStream == 0) { if (tsEnableStream == 0) {
return NULL; return NULL;
} }
...@@ -326,7 +326,11 @@ void *cqCreate(void *handle, uint64_t uid, int32_t sid, const char* dstTable, ch ...@@ -326,7 +326,11 @@ void *cqCreate(void *handle, uint64_t uid, int32_t sid, const char* dstTable, ch
pObj->rid = taosAddRef(cqObjRef, pObj); pObj->rid = taosAddRef(cqObjRef, pObj);
cqCreateStream(pContext, pObj); if(start && pContext->master) {
cqCreateStream(pContext, pObj);
} else {
pObj->pContext = pContext;
}
rid = pObj->rid; rid = pObj->rid;
......
...@@ -70,7 +70,7 @@ int main(int argc, char *argv[]) { ...@@ -70,7 +70,7 @@ int main(int argc, char *argv[]) {
tdDestroyTSchemaBuilder(&schemaBuilder); tdDestroyTSchemaBuilder(&schemaBuilder);
for (int sid =1; sid<10; ++sid) { for (int sid =1; sid<10; ++sid) {
cqCreate(pCq, sid, sid, NULL, "select avg(speed) from demo.t1 sliding(1s) interval(5s)", pSchema); cqCreate(pCq, sid, sid, NULL, "select avg(speed) from demo.t1 sliding(1s) interval(5s)", pSchema, 1);
} }
tdFreeSchema(pSchema); tdFreeSchema(pSchema);
......
...@@ -222,7 +222,7 @@ static void *dnodeProcessVWriteQueue(void *wparam) { ...@@ -222,7 +222,7 @@ static void *dnodeProcessVWriteQueue(void *wparam) {
dnodeSendRpcVWriteRsp(pVnode, pWrite, pWrite->code); dnodeSendRpcVWriteRsp(pVnode, pWrite, pWrite->code);
} else { } else {
if (qtype == TAOS_QTYPE_FWD) { if (qtype == TAOS_QTYPE_FWD) {
vnodeConfirmForward(pVnode, pWrite->pHead.version, 0, pWrite->pHead.msgType != TSDB_MSG_TYPE_SUBMIT); vnodeConfirmForward(pVnode, pWrite->pHead.version, pWrite->code, pWrite->pHead.msgType != TSDB_MSG_TYPE_SUBMIT);
} }
if (pWrite->rspRet.rsp) { if (pWrite->rspRet.rsp) {
rpcFreeCont(pWrite->rspRet.rsp); rpcFreeCont(pWrite->rspRet.rsp);
......
...@@ -42,7 +42,7 @@ void cqStart(void *handle); ...@@ -42,7 +42,7 @@ void cqStart(void *handle);
void cqStop(void *handle); void cqStop(void *handle);
// cqCreate is called by TSDB to start an instance of CQ // cqCreate is called by TSDB to start an instance of CQ
void *cqCreate(void *handle, uint64_t uid, int32_t sid, const char* dstTable, char *sqlStr, STSchema *pSchema); void *cqCreate(void *handle, uint64_t uid, int32_t sid, const char* dstTable, char *sqlStr, STSchema *pSchema, int start);
// cqDrop is called by TSDB to stop an instance of CQ, handle is the return value of cqCreate // cqDrop is called by TSDB to stop an instance of CQ, handle is the return value of cqCreate
void cqDrop(void *handle); void cqDrop(void *handle);
......
...@@ -51,7 +51,7 @@ typedef struct { ...@@ -51,7 +51,7 @@ typedef struct {
void *cqH; void *cqH;
int (*notifyStatus)(void *, int status, int eno); int (*notifyStatus)(void *, int status, int eno);
int (*eventCallBack)(void *); int (*eventCallBack)(void *);
void *(*cqCreateFunc)(void *handle, uint64_t uid, int32_t sid, const char *dstTable, char *sqlStr, STSchema *pSchema); void *(*cqCreateFunc)(void *handle, uint64_t uid, int32_t sid, const char *dstTable, char *sqlStr, STSchema *pSchema, int start);
void (*cqDropFunc)(void *handle); void (*cqDropFunc)(void *handle);
} STsdbAppH; } STsdbAppH;
......
此差异已折叠。
此差异已折叠。
...@@ -628,6 +628,11 @@ static int32_t mnodeProcessDnodeStatusMsg(SMnodeMsg *pMsg) { ...@@ -628,6 +628,11 @@ static int32_t mnodeProcessDnodeStatusMsg(SMnodeMsg *pMsg) {
bnNotify(); bnNotify();
} }
if (!tsEnableBalance) {
int32_t numOfMnodes = mnodeGetMnodesNum();
if (numOfMnodes < tsNumOfMnodes) bnNotify();
}
if (openVnodes != pDnode->openVnodes) { if (openVnodes != pDnode->openVnodes) {
mnodeCheckUnCreatedVgroup(pDnode, pStatus->load, openVnodes); mnodeCheckUnCreatedVgroup(pDnode, pStatus->load, openVnodes);
} }
......
...@@ -381,6 +381,8 @@ static bool mnodeAllOnline() { ...@@ -381,6 +381,8 @@ static bool mnodeAllOnline() {
void *pIter = NULL; void *pIter = NULL;
bool allOnline = true; bool allOnline = true;
sdbUpdateMnodeRoles();
while (1) { while (1) {
SMnodeObj *pMnode = NULL; SMnodeObj *pMnode = NULL;
pIter = mnodeGetNextMnode(pIter, &pMnode); pIter = mnodeGetNextMnode(pIter, &pMnode);
......
...@@ -86,7 +86,8 @@ typedef struct SResultRow { ...@@ -86,7 +86,8 @@ typedef struct SResultRow {
bool closed; // this result status: closed or opened bool closed; // this result status: closed or opened
uint32_t numOfRows; // number of rows of current time window uint32_t numOfRows; // number of rows of current time window
SResultRowCellInfo* pCellInfo; // For each result column, there is a resultInfo SResultRowCellInfo* pCellInfo; // For each result column, there is a resultInfo
union {STimeWindow win; char* key;}; // start key of current result row STimeWindow win;
char* key; // start key of current result row
} SResultRow; } SResultRow;
typedef struct SGroupResInfo { typedef struct SGroupResInfo {
......
...@@ -1876,14 +1876,15 @@ static void teardownQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv) { ...@@ -1876,14 +1876,15 @@ static void teardownQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv) {
taosHashCleanup(pRuntimeEnv->pResultRowHashTable); taosHashCleanup(pRuntimeEnv->pResultRowHashTable);
pRuntimeEnv->pResultRowHashTable = NULL; pRuntimeEnv->pResultRowHashTable = NULL;
pRuntimeEnv->pool = destroyResultRowPool(pRuntimeEnv->pool);
taosArrayDestroyEx(pRuntimeEnv->prevResult, freeInterResult);
pRuntimeEnv->prevResult = NULL;
taosHashCleanup(pRuntimeEnv->pTableRetrieveTsMap); taosHashCleanup(pRuntimeEnv->pTableRetrieveTsMap);
pRuntimeEnv->pTableRetrieveTsMap = NULL; pRuntimeEnv->pTableRetrieveTsMap = NULL;
destroyOperatorInfo(pRuntimeEnv->proot); destroyOperatorInfo(pRuntimeEnv->proot);
pRuntimeEnv->pool = destroyResultRowPool(pRuntimeEnv->pool);
taosArrayDestroyEx(pRuntimeEnv->prevResult, freeInterResult);
pRuntimeEnv->prevResult = NULL;
} }
static bool needBuildResAfterQueryComplete(SQInfo* pQInfo) { static bool needBuildResAfterQueryComplete(SQInfo* pQInfo) {
...@@ -2630,6 +2631,21 @@ int32_t loadDataBlockOnDemand(SQueryRuntimeEnv* pRuntimeEnv, STableScanInfo* pTa ...@@ -2630,6 +2631,21 @@ int32_t loadDataBlockOnDemand(SQueryRuntimeEnv* pRuntimeEnv, STableScanInfo* pTa
tsdbRetrieveDataBlockStatisInfo(pTableScanInfo->pQueryHandle, &pBlock->pBlockStatis); tsdbRetrieveDataBlockStatisInfo(pTableScanInfo->pQueryHandle, &pBlock->pBlockStatis);
if (pQuery->topBotQuery && pBlock->pBlockStatis != NULL) { if (pQuery->topBotQuery && pBlock->pBlockStatis != NULL) {
{ // set previous window
if (QUERY_IS_INTERVAL_QUERY(pQuery)) {
SResultRow* pResult = NULL;
bool masterScan = IS_MASTER_SCAN(pRuntimeEnv);
TSKEY k = ascQuery? pBlock->info.window.skey : pBlock->info.window.ekey;
STimeWindow win = getActiveTimeWindow(pTableScanInfo->pResultRowInfo, k, pQuery);
if (setWindowOutputBufByKey(pRuntimeEnv, pTableScanInfo->pResultRowInfo, &win, masterScan, &pResult, groupId,
pTableScanInfo->pCtx, pTableScanInfo->numOfOutput,
pTableScanInfo->rowCellInfoOffset) != TSDB_CODE_SUCCESS) {
longjmp(pRuntimeEnv->env, TSDB_CODE_QRY_OUT_OF_MEMORY);
}
}
}
bool load = false; bool load = false;
for (int32_t i = 0; i < pQuery->numOfOutput; ++i) { for (int32_t i = 0; i < pQuery->numOfOutput; ++i) {
int32_t functionId = pTableScanInfo->pCtx[i].functionId; int32_t functionId = pTableScanInfo->pCtx[i].functionId;
...@@ -6463,6 +6479,9 @@ void freeQInfo(SQInfo *pQInfo) { ...@@ -6463,6 +6479,9 @@ void freeQInfo(SQInfo *pQInfo) {
SQueryRuntimeEnv* pRuntimeEnv = &pQInfo->runtimeEnv; SQueryRuntimeEnv* pRuntimeEnv = &pQInfo->runtimeEnv;
releaseQueryBuf(pRuntimeEnv->tableqinfoGroupInfo.numOfTables); releaseQueryBuf(pRuntimeEnv->tableqinfoGroupInfo.numOfTables);
doDestroyTableQueryInfo(&pRuntimeEnv->tableqinfoGroupInfo);
teardownQueryRuntimeEnv(&pQInfo->runtimeEnv); teardownQueryRuntimeEnv(&pQInfo->runtimeEnv);
SQuery *pQuery = pQInfo->runtimeEnv.pQuery; SQuery *pQuery = pQInfo->runtimeEnv.pQuery;
...@@ -6498,7 +6517,6 @@ void freeQInfo(SQInfo *pQInfo) { ...@@ -6498,7 +6517,6 @@ void freeQInfo(SQInfo *pQInfo) {
} }
} }
doDestroyTableQueryInfo(&pRuntimeEnv->tableqinfoGroupInfo);
tfree(pQInfo->pBuf); tfree(pQInfo->pBuf);
tfree(pQInfo->sql); tfree(pQInfo->sql);
......
...@@ -560,7 +560,7 @@ uint32_t tSQLGetToken(char* z, uint32_t* tokenId) { ...@@ -560,7 +560,7 @@ uint32_t tSQLGetToken(char* z, uint32_t* tokenId) {
return 0; return 0;
} }
SStrToken tStrGetToken(char* str, int32_t* i, bool isPrevOptr, uint32_t numOfIgnoreToken, uint32_t* ignoreTokenTypes) { SStrToken tStrGetToken(char* str, int32_t* i, bool isPrevOptr) {
SStrToken t0 = {0}; SStrToken t0 = {0};
// here we reach the end of sql string, null-terminated string // here we reach the end of sql string, null-terminated string
...@@ -585,7 +585,10 @@ SStrToken tStrGetToken(char* str, int32_t* i, bool isPrevOptr, uint32_t numOfIgn ...@@ -585,7 +585,10 @@ SStrToken tStrGetToken(char* str, int32_t* i, bool isPrevOptr, uint32_t numOfIgn
} }
t0.n = tSQLGetToken(&str[*i], &t0.type); t0.n = tSQLGetToken(&str[*i], &t0.type);
break;
// not support user specfied ignored symbol list
#if 0
bool ignore = false; bool ignore = false;
for (uint32_t k = 0; k < numOfIgnoreToken; k++) { for (uint32_t k = 0; k < numOfIgnoreToken; k++) {
if (t0.type == ignoreTokenTypes[k]) { if (t0.type == ignoreTokenTypes[k]) {
...@@ -597,6 +600,7 @@ SStrToken tStrGetToken(char* str, int32_t* i, bool isPrevOptr, uint32_t numOfIgn ...@@ -597,6 +600,7 @@ SStrToken tStrGetToken(char* str, int32_t* i, bool isPrevOptr, uint32_t numOfIgn
if (!ignore) { if (!ignore) {
break; break;
} }
#endif
} }
if (t0.type == TK_SEMI) { if (t0.type == TK_SEMI) {
......
...@@ -66,8 +66,8 @@ void cleanupResultRowInfo(SResultRowInfo *pResultRowInfo) { ...@@ -66,8 +66,8 @@ void cleanupResultRowInfo(SResultRowInfo *pResultRowInfo) {
return; return;
} }
if (pResultRowInfo->type == TSDB_DATA_TYPE_BINARY || pResultRowInfo->type == TSDB_DATA_TYPE_NCHAR) { for(int32_t i = 0; i < pResultRowInfo->size; ++i) {
for(int32_t i = 0; i < pResultRowInfo->size; ++i) { if (pResultRowInfo->pResult[i]) {
tfree(pResultRowInfo->pResult[i]->key); tfree(pResultRowInfo->pResult[i]->key);
} }
} }
...@@ -153,11 +153,8 @@ void clearResultRow(SQueryRuntimeEnv *pRuntimeEnv, SResultRow *pResultRow, int16 ...@@ -153,11 +153,8 @@ void clearResultRow(SQueryRuntimeEnv *pRuntimeEnv, SResultRow *pResultRow, int16
pResultRow->offset = -1; pResultRow->offset = -1;
pResultRow->closed = false; pResultRow->closed = false;
if (type == TSDB_DATA_TYPE_BINARY || type == TSDB_DATA_TYPE_NCHAR) { tfree(pResultRow->key);
tfree(pResultRow->key); pResultRow->win = TSWINDOW_INITIALIZER;
} else {
pResultRow->win = TSWINDOW_INITIALIZER;
}
} }
// TODO refactor: use macro // TODO refactor: use macro
......
...@@ -10,7 +10,7 @@ namespace { ...@@ -10,7 +10,7 @@ namespace {
// simple test // simple test
void simpleTest() { void simpleTest() {
SDiskbasedResultBuf* pResultBuf = NULL; SDiskbasedResultBuf* pResultBuf = NULL;
int32_t ret = createDiskbasedResultBuffer(&pResultBuf, 1024, 4096, NULL); int32_t ret = createDiskbasedResultBuffer(&pResultBuf, 1024, 4096, 1);
int32_t pageId = 0; int32_t pageId = 0;
int32_t groupId = 0; int32_t groupId = 0;
...@@ -52,7 +52,7 @@ void simpleTest() { ...@@ -52,7 +52,7 @@ void simpleTest() {
void writeDownTest() { void writeDownTest() {
SDiskbasedResultBuf* pResultBuf = NULL; SDiskbasedResultBuf* pResultBuf = NULL;
int32_t ret = createDiskbasedResultBuffer(&pResultBuf, 1024, 4*1024, NULL); int32_t ret = createDiskbasedResultBuffer(&pResultBuf, 1024, 4*1024, 1);
int32_t pageId = 0; int32_t pageId = 0;
int32_t writePageId = 0; int32_t writePageId = 0;
...@@ -99,7 +99,7 @@ void writeDownTest() { ...@@ -99,7 +99,7 @@ void writeDownTest() {
void recyclePageTest() { void recyclePageTest() {
SDiskbasedResultBuf* pResultBuf = NULL; SDiskbasedResultBuf* pResultBuf = NULL;
int32_t ret = createDiskbasedResultBuffer(&pResultBuf, 1024, 4*1024, NULL); int32_t ret = createDiskbasedResultBuffer(&pResultBuf, 1024, 4*1024, 1);
int32_t pageId = 0; int32_t pageId = 0;
int32_t writePageId = 0; int32_t writePageId = 0;
......
...@@ -35,7 +35,7 @@ extern "C" { ...@@ -35,7 +35,7 @@ extern "C" {
#define SYNC_MAX_SIZE (TSDB_MAX_WAL_SIZE + sizeof(SWalHead) + sizeof(SSyncHead) + 16) #define SYNC_MAX_SIZE (TSDB_MAX_WAL_SIZE + sizeof(SWalHead) + sizeof(SSyncHead) + 16)
#define SYNC_RECV_BUFFER_SIZE (5*1024*1024) #define SYNC_RECV_BUFFER_SIZE (5*1024*1024)
#define SYNC_MAX_FWDS 512 #define SYNC_MAX_FWDS 1024
#define SYNC_FWD_TIMER 300 #define SYNC_FWD_TIMER 300
#define SYNC_ROLE_TIMER 15000 // ms #define SYNC_ROLE_TIMER 15000 // ms
#define SYNC_CHECK_INTERVAL 1000 // ms #define SYNC_CHECK_INTERVAL 1000 // ms
......
...@@ -1372,7 +1372,7 @@ static void syncMonitorNodeRole(void *param, void *tmrId) { ...@@ -1372,7 +1372,7 @@ static void syncMonitorNodeRole(void *param, void *tmrId) {
if (/*pPeer->role > TAOS_SYNC_ROLE_UNSYNCED && */ nodeRole > TAOS_SYNC_ROLE_UNSYNCED) continue; if (/*pPeer->role > TAOS_SYNC_ROLE_UNSYNCED && */ nodeRole > TAOS_SYNC_ROLE_UNSYNCED) continue;
if (/*pPeer->sstatus > TAOS_SYNC_STATUS_INIT || */ nodeSStatus > TAOS_SYNC_STATUS_INIT) continue; if (/*pPeer->sstatus > TAOS_SYNC_STATUS_INIT || */ nodeSStatus > TAOS_SYNC_STATUS_INIT) continue;
sDebug("%s, check roles since self:%s sstatus:%s, peer:%s sstatus:%s", pPeer->id, syncRole[pPeer->role], sDebug("%s, check roles since peer:%s sstatus:%s, self:%s sstatus:%s", pPeer->id, syncRole[pPeer->role],
syncStatus[pPeer->sstatus], syncRole[nodeRole], syncStatus[nodeSStatus]); syncStatus[pPeer->sstatus], syncRole[nodeRole], syncStatus[nodeSStatus]);
syncSendPeersStatusMsgToPeer(pPeer, 1, SYNC_STATUS_CHECK_ROLE, syncGenTranId()); syncSendPeersStatusMsgToPeer(pPeer, 1, SYNC_STATUS_CHECK_ROLE, syncGenTranId());
break; break;
...@@ -1459,7 +1459,12 @@ static int32_t syncForwardToPeerImpl(SSyncNode *pNode, void *data, void *mhandle ...@@ -1459,7 +1459,12 @@ static int32_t syncForwardToPeerImpl(SSyncNode *pNode, void *data, void *mhandle
if ((pNode->quorum > 1 || force) && code == 0) { if ((pNode->quorum > 1 || force) && code == 0) {
code = syncSaveFwdInfo(pNode, pWalHead->version, mhandle); code = syncSaveFwdInfo(pNode, pWalHead->version, mhandle);
if (code >= 0) code = 1; if (code >= 0) {
code = 1;
} else {
pthread_mutex_unlock(&pNode->mutex);
return code;
}
} }
int32_t retLen = taosWriteMsg(pPeer->peerFd, pSyncHead, fwdLen); int32_t retLen = taosWriteMsg(pPeer->peerFd, pSyncHead, fwdLen);
......
...@@ -526,7 +526,7 @@ static void tsdbStartStream(STsdbRepo *pRepo) { ...@@ -526,7 +526,7 @@ static void tsdbStartStream(STsdbRepo *pRepo) {
STable *pTable = pMeta->tables[i]; STable *pTable = pMeta->tables[i];
if (pTable && pTable->type == TSDB_STREAM_TABLE) { if (pTable && pTable->type == TSDB_STREAM_TABLE) {
pTable->cqhandle = (*pRepo->appH.cqCreateFunc)(pRepo->appH.cqH, TABLE_UID(pTable), TABLE_TID(pTable), TABLE_NAME(pTable)->data, pTable->sql, pTable->cqhandle = (*pRepo->appH.cqCreateFunc)(pRepo->appH.cqH, TABLE_UID(pTable), TABLE_TID(pTable), TABLE_NAME(pTable)->data, pTable->sql,
tsdbGetTableSchemaImpl(pTable, false, false, -1)); tsdbGetTableSchemaImpl(pTable, false, false, -1), 0);
} }
} }
} }
...@@ -619,4 +619,4 @@ int tsdbRestoreInfo(STsdbRepo *pRepo) { ...@@ -619,4 +619,4 @@ int tsdbRestoreInfo(STsdbRepo *pRepo) {
tsdbDestroyReadH(&readh); tsdbDestroyReadH(&readh);
return 0; return 0;
} }
\ No newline at end of file
...@@ -840,7 +840,7 @@ static int tsdbAddTableToMeta(STsdbRepo *pRepo, STable *pTable, bool addIdx, boo ...@@ -840,7 +840,7 @@ static int tsdbAddTableToMeta(STsdbRepo *pRepo, STable *pTable, bool addIdx, boo
if (lock && tsdbUnlockRepoMeta(pRepo) < 0) return -1; if (lock && tsdbUnlockRepoMeta(pRepo) < 0) return -1;
if (TABLE_TYPE(pTable) == TSDB_STREAM_TABLE && addIdx) { if (TABLE_TYPE(pTable) == TSDB_STREAM_TABLE && addIdx) {
pTable->cqhandle = (*pRepo->appH.cqCreateFunc)(pRepo->appH.cqH, TABLE_UID(pTable), TABLE_TID(pTable), TABLE_NAME(pTable)->data, pTable->sql, pTable->cqhandle = (*pRepo->appH.cqCreateFunc)(pRepo->appH.cqH, TABLE_UID(pTable), TABLE_TID(pTable), TABLE_NAME(pTable)->data, pTable->sql,
tsdbGetTableSchemaImpl(pTable, false, false, -1)); tsdbGetTableSchemaImpl(pTable, false, false, -1), 1);
} }
tsdbDebug("vgId:%d table %s tid %d uid %" PRIu64 " is added to meta", REPO_ID(pRepo), TABLE_CHAR_NAME(pTable), tsdbDebug("vgId:%d table %s tid %d uid %" PRIu64 " is added to meta", REPO_ID(pRepo), TABLE_CHAR_NAME(pTable),
...@@ -1322,4 +1322,4 @@ static int tsdbCheckTableTagVal(SKVRow *pKVRow, STSchema *pSchema) { ...@@ -1322,4 +1322,4 @@ static int tsdbCheckTableTagVal(SKVRow *pKVRow, STSchema *pSchema) {
} }
return 0; return 0;
} }
\ No newline at end of file
...@@ -51,11 +51,9 @@ uint32_t tSQLGetToken(char *z, uint32_t *tokenType); ...@@ -51,11 +51,9 @@ uint32_t tSQLGetToken(char *z, uint32_t *tokenType);
* @param str * @param str
* @param i * @param i
* @param isPrevOptr * @param isPrevOptr
* @param numOfIgnoreToken
* @param ignoreTokenTypes
* @return * @return
*/ */
SStrToken tStrGetToken(char *str, int32_t *i, bool isPrevOptr, uint32_t numOfIgnoreToken, uint32_t *ignoreTokenTypes); SStrToken tStrGetToken(char *str, int32_t *i, bool isPrevOptr);
/** /**
* check if it is a keyword or not * check if it is a keyword or not
......
...@@ -91,13 +91,17 @@ int32_t vnodeProcessWrite(void *vparam, void *wparam, int32_t qtype, void *rpara ...@@ -91,13 +91,17 @@ int32_t vnodeProcessWrite(void *vparam, void *wparam, int32_t qtype, void *rpara
int32_t syncCode = 0; int32_t syncCode = 0;
bool force = (pWrite == NULL ? false : pWrite->pHead.msgType != TSDB_MSG_TYPE_SUBMIT); bool force = (pWrite == NULL ? false : pWrite->pHead.msgType != TSDB_MSG_TYPE_SUBMIT);
syncCode = syncForwardToPeer(pVnode->sync, pHead, pWrite, qtype, force); syncCode = syncForwardToPeer(pVnode->sync, pHead, pWrite, qtype, force);
if (syncCode < 0) return syncCode; if (syncCode < 0) {
pHead->version = 0;
return syncCode;
}
// write into WAL // write into WAL
code = walWrite(pVnode->wal, pHead); code = walWrite(pVnode->wal, pHead);
if (code < 0) { if (code < 0) {
if (syncCode > 0) atomic_sub_fetch_32(&pWrite->processedCount, 1); if (syncCode > 0) atomic_sub_fetch_32(&pWrite->processedCount, 1);
vError("vgId:%d, hver:%" PRIu64 " vver:%" PRIu64 " code:0x%x", pVnode->vgId, pHead->version, pVnode->version, code); vError("vgId:%d, hver:%" PRIu64 " vver:%" PRIu64 " code:0x%x", pVnode->vgId, pHead->version, pVnode->version, code);
pHead->version = 0;
return code; return code;
} }
......
...@@ -178,7 +178,7 @@ python3 ./test.py -f stable/query_after_reset.py ...@@ -178,7 +178,7 @@ python3 ./test.py -f stable/query_after_reset.py
# perfbenchmark # perfbenchmark
python3 ./test.py -f perfbenchmark/bug3433.py python3 ./test.py -f perfbenchmark/bug3433.py
python3 ./test.py -f perfbenchmark/bug3589.py #python3 ./test.py -f perfbenchmark/bug3589.py
#query #query
......
...@@ -11,13 +11,13 @@ ...@@ -11,13 +11,13 @@
# -*- coding: utf-8 -*- # -*- coding: utf-8 -*-
import sys
import taos import taos
from util.log import tdLog from util.log import tdLog
from util.cases import tdCases from util.cases import tdCases
from util.sql import tdSql from util.sql import tdSql
from util.dnodes import tdDnodes from util.dnodes import tdDnodes
from multiprocessing import Process from multiprocessing import Process
import subprocess
class TDTestCase: class TDTestCase:
def init(self, conn, logSql): def init(self, conn, logSql):
...@@ -40,27 +40,22 @@ class TDTestCase: ...@@ -40,27 +40,22 @@ class TDTestCase:
print("alter table done") print("alter table done")
def deleteTableAndRecreate(self): def deleteTableAndRecreate(self):
self.host = "127.0.0.1"
self.user = "root"
self.password = "taosdata"
self.config = tdDnodes.getSimCfgPath() self.config = tdDnodes.getSimCfgPath()
self.conn = taos.connect(host = self.host, user = self.user, password = self.password, config = self.config) sqlCmds = "use test; drop table stb;"
self.cursor = self.conn.cursor() sqlCmds += "create table if not exists stb (ts timestamp, col1 int) tags(areaid int, city nchar(20));"
self.cursor.execute("use test")
print("drop table stb")
self.cursor.execute("drop table stb")
print("create table stb")
self.cursor.execute("create table if not exists stb (ts timestamp, col1 int) tags(areaid int, city nchar(20))")
print("insert data")
for i in range(self.tables): for i in range(self.tables):
city = "beijing" if i % 2 == 0 else "shanghai" city = "beijing" if i % 2 == 0 else "shanghai"
self.cursor.execute("create table tb%d using stb tags(%d, '%s')" % (i, i, city)) sqlCmds += "create table tb%d using stb tags(%d, '%s');" % (i, i, city)
for j in range(self.rows): for j in range(5):
self.cursor.execute("insert into tb%d values(%d, %d)" % (i, self.ts + j, j * 100000)) sqlCmds += "insert into tb%d values(%d, %d);" % (i, self.ts + j, j * 100000)
command = ["taos", "-c", self.config, "-s", sqlCmds]
print("drop stb, recreate stb and insert data ")
result = subprocess.run(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, encoding="utf-8")
if result.returncode == 0:
print("success:", result)
else:
print("error:", result)
def run(self): def run(self):
tdSql.prepare() tdSql.prepare()
...@@ -100,19 +95,17 @@ class TDTestCase: ...@@ -100,19 +95,17 @@ class TDTestCase:
tdSql.query("select count(*) from stb") tdSql.query("select count(*) from stb")
tdSql.checkData(0, 0, 10000) tdSql.checkData(0, 0, 10000)
tdSql.query("select count(*) from tb1") tdSql.query("select count(*) from tb0")
tdSql.checkData(0, 0, 1000) tdSql.checkData(0, 0, 1000)
p = Process(target=self.deleteTableAndRecreate, args=()) # drop stable in subprocess
p.start() self.deleteTableAndRecreate()
p.join()
p.terminate()
tdSql.query("select count(*) from stb") tdSql.query("select count(*) from stb")
tdSql.checkData(0, 0, 10000) tdSql.checkData(0, 0, 5 * self.tables)
tdSql.query("select count(*) from tb1") tdSql.query("select count(*) from tb0")
tdSql.checkData(0, 0, 1000) tdSql.checkData(0, 0, 5)
def stop(self): def stop(self):
tdSql.close() tdSql.close()
......
{
"filetype": "insert",
"cfgdir": "/etc/taos",
"host": "127.0.0.1",
"port": 6030,
"user": "root",
"password": "taosdata",
"thread_count": 4,
"thread_count_create_tbl": 4,
"result_file": "./insert_res.txt",
"confirm_parameter_prompt": "no",
"insert_interval": 0,
"num_of_records_per_req": 100,
"max_sql_len": 1024000,
"databases": [{
"dbinfo": {
"name": "db",
"drop": "yes",
"replica": 1,
"days": 10,
"cache": 16,
"blocks": 8,
"precision": "ms",
"keep": 365,
"minRows": 100,
"maxRows": 4096,
"comp":2,
"walLevel":1,
"cachelast":0,
"quorum":1,
"fsync":3000,
"update": 0
},
"super_tables": [{
"name": "stb",
"child_table_exists":"no",
"childtable_count": 100,
"childtable_prefix": "stb_",
"auto_create_table": "no",
"data_source": "rand",
"insert_mode": "taosc",
"insert_rows": 0,
"multi_thread_write_one_tbl": "no",
"number_of_tbl_in_one_sql": 0,
"max_sql_len": 1024000,
"disorder_ratio": 0,
"disorder_range": 1000,
"timestamp_step": 1,
"start_timestamp": "2020-10-01 00:00:00.000",
"sample_format": "csv",
"sample_file": "./sample.csv",
"tags_file": "",
"columns": [{"type": "INT"}, {"type": "DOUBLE", "count":10}, {"type": "BINARY", "len": 16, "count":3}, {"type": "BINARY", "len": 32, "count":6}],
"tags": [{"type": "TINYINT", "count":2}, {"type": "BINARY", "len": 16, "count":5}]
}]
}]
}
{
"filetype": "insert",
"cfgdir": "/etc/taos",
"host": "127.0.0.1",
"port": 6030,
"user": "root",
"password": "taosdata",
"thread_count": 4,
"thread_count_create_tbl": 4,
"result_file": "./insert_res.txt",
"confirm_parameter_prompt": "no",
"insert_interval": 0,
"num_of_records_per_req": 100,
"max_sql_len": 1024000,
"databases": [{
"dbinfo": {
"name": "db",
"drop": "no",
"replica": 1,
"days": 10,
"cache": 16,
"blocks": 8,
"precision": "ms",
"keep": 365,
"minRows": 100,
"maxRows": 4096,
"comp":2,
"walLevel":1,
"cachelast":0,
"quorum":1,
"fsync":3000,
"update": 0
},
"super_tables": [{
"name": "stb",
"child_table_exists":"yes",
"childtable_count": 100,
"childtable_prefix": "stb_",
"auto_create_table": "no",
"data_source": "rand",
"insert_mode": "taosc",
"insert_rows": 1000,
"childtable_limit": 33,
"childtable_offset": 33,
"multi_thread_write_one_tbl": "no",
"number_of_tbl_in_one_sql": 0,
"max_sql_len": 1024000,
"disorder_ratio": 0,
"disorder_range": 1000,
"timestamp_step": 1,
"start_timestamp": "2020-10-01 00:00:00.000",
"sample_format": "csv",
"sample_file": "./sample.csv",
"tags_file": "",
"columns": [{"type": "INT"}, {"type": "DOUBLE", "count":10}, {"type": "BINARY", "len": 16, "count":3}, {"type": "BINARY", "len": 32, "count":6}],
"tags": [{"type": "TINYINT", "count":2}, {"type": "BINARY", "len": 16, "count":5}]
}]
}]
}
...@@ -15,7 +15,7 @@ ...@@ -15,7 +15,7 @@
"databases": [{ "databases": [{
"dbinfo": { "dbinfo": {
"name": "db", "name": "db",
"drop": "yes", "drop": "no",
"replica": 1, "replica": 1,
"days": 10, "days": 10,
"cache": 16, "cache": 16,
...@@ -33,7 +33,7 @@ ...@@ -33,7 +33,7 @@
}, },
"super_tables": [{ "super_tables": [{
"name": "stb", "name": "stb",
"child_table_exists":"no", "child_table_exists":"yes",
"childtable_count": 100, "childtable_count": 100,
"childtable_prefix": "stb_", "childtable_prefix": "stb_",
"auto_create_table": "no", "auto_create_table": "no",
......
...@@ -15,7 +15,7 @@ ...@@ -15,7 +15,7 @@
"databases": [{ "databases": [{
"dbinfo": { "dbinfo": {
"name": "db", "name": "db",
"drop": "yes", "drop": "no",
"replica": 1, "replica": 1,
"days": 10, "days": 10,
"cache": 16, "cache": 16,
...@@ -33,7 +33,7 @@ ...@@ -33,7 +33,7 @@
}, },
"super_tables": [{ "super_tables": [{
"name": "stb", "name": "stb",
"child_table_exists":"no", "child_table_exists":"yes",
"childtable_count": 100, "childtable_count": 100,
"childtable_prefix": "stb_", "childtable_prefix": "stb_",
"auto_create_table": "no", "auto_create_table": "no",
......
...@@ -16,8 +16,6 @@ ...@@ -16,8 +16,6 @@
"name": "stb", "name": "stb",
"child_table_exists":"no", "child_table_exists":"no",
"childtable_count": 20, "childtable_count": 20,
"childtable_limit": 10,
"childtable_offset": 0,
"childtable_prefix": "t_", "childtable_prefix": "t_",
"auto_create_table": "no", "auto_create_table": "no",
"data_source": "sample", "data_source": "sample",
......
...@@ -51,7 +51,8 @@ class TDTestCase: ...@@ -51,7 +51,8 @@ class TDTestCase:
else: else:
tdLog.info("taosd found in %s" % buildPath) tdLog.info("taosd found in %s" % buildPath)
binPath = buildPath+ "/build/bin/" binPath = buildPath+ "/build/bin/"
os.system("%staosdemo -f tools/insert-tblimit-tboffset.json" % binPath) os.system("%staosdemo -f tools/insert-tblimit-tboffset-createdb.json" % binPath)
os.system("%staosdemo -f tools/insert-tblimit-tboffset-insertrec.json" % binPath)
tdSql.execute("use db") tdSql.execute("use db")
tdSql.query("select count(tbname) from db.stb") tdSql.query("select count(tbname) from db.stb")
...@@ -59,6 +60,7 @@ class TDTestCase: ...@@ -59,6 +60,7 @@ class TDTestCase:
tdSql.query("select count(*) from db.stb") tdSql.query("select count(*) from db.stb")
tdSql.checkData(0, 0, 33000) tdSql.checkData(0, 0, 33000)
os.system("%staosdemo -f tools/insert-tblimit-tboffset-createdb.json" % binPath)
os.system("%staosdemo -f tools/insert-tblimit-tboffset0.json" % binPath) os.system("%staosdemo -f tools/insert-tblimit-tboffset0.json" % binPath)
tdSql.execute("reset query cache") tdSql.execute("reset query cache")
...@@ -68,6 +70,7 @@ class TDTestCase: ...@@ -68,6 +70,7 @@ class TDTestCase:
tdSql.query("select count(*) from db.stb") tdSql.query("select count(*) from db.stb")
tdSql.checkData(0, 0, 20000) tdSql.checkData(0, 0, 20000)
os.system("%staosdemo -f tools/insert-tblimit-tboffset-createdb.json" % binPath)
os.system("%staosdemo -f tools/insert-tblimit1-tboffset.json" % binPath) os.system("%staosdemo -f tools/insert-tblimit1-tboffset.json" % binPath)
tdSql.execute("reset query cache") tdSql.execute("reset query cache")
......
...@@ -57,7 +57,7 @@ class TDTestCase: ...@@ -57,7 +57,7 @@ class TDTestCase:
tdSql.query("select count(tbname) from db.stb") tdSql.query("select count(tbname) from db.stb")
tdSql.checkData(0, 0, 20) tdSql.checkData(0, 0, 20)
tdSql.query("select count(*) from db.stb") tdSql.query("select count(*) from db.stb")
tdSql.checkData(0, 0, 200) tdSql.checkData(0, 0, 400)
def stop(self): def stop(self):
tdSql.close() tdSql.close()
......
...@@ -204,7 +204,13 @@ if $data03 != NULL then ...@@ -204,7 +204,13 @@ if $data03 != NULL then
return -1 return -1
endi endi
sql reset query cache print ============================>TD-3366 TD-3486
sql insert into td_3366(ts, c3, c1) using mt(t1) tags(911) values('2018-1-1 11:11:11', 'new1', 12);
sql insert into td_3486(ts, c3, c1) using mt(t1) tags(-12) values('2018-1-1 11:11:11', 'new1', 12);
sql insert into ttxu(ts, c3, c1) using mt(t1) tags('-121') values('2018-1-1 11:11:11', 'new1', 12);
sql insert into tb(ts, c1, c3) using mt(t1) tags(123) values('2018-11-01 16:29:58.000', 2, 'port')
sql insert into tb values ('2018-11-01 16:29:58.000', 2, 'import', 3) sql insert into tb values ('2018-11-01 16:29:58.000', 2, 'import', 3)
sql import into tb values ('2018-11-01 16:29:58.000', 2, 'import', 3) sql import into tb values ('2018-11-01 16:29:58.000', 2, 'import', 3)
sql import into tb values ('2018-11-01 16:39:58.000', 2, 'import', 3) sql import into tb values ('2018-11-01 16:39:58.000', 2, 'import', 3)
...@@ -212,6 +218,7 @@ sql select * from tb order by ts desc ...@@ -212,6 +218,7 @@ sql select * from tb order by ts desc
if $rows != 4 then if $rows != 4 then
return -1 return -1
endi endi
if $data03 != 3 then if $data03 != 3 then
return -1 return -1
endi endi
...@@ -233,10 +240,10 @@ sql_error alter table mt add column c1 int ...@@ -233,10 +240,10 @@ sql_error alter table mt add column c1 int
# drop non-existing columns # drop non-existing columns
sql_error alter table mt drop column c9 sql_error alter table mt drop column c9
sql drop database $db #sql drop database $db
sql show databases #sql show databases
if $rows != 0 then #if $rows != 0 then
return -1 # return -1
endi #endi
system sh/exec.sh -n dnode1 -s stop -x SIGINT system sh/exec.sh -n dnode1 -s stop -x SIGINT
#!/bin/bash
Cur_Dir=$(pwd)
echo $Cur_Dir
echo "'2020-1-1 1:1:1','abc','device',123,'9876', 'abc', 'net', 'mno', 'province', 'city', 'al'" >> ~/data.sql
...@@ -8,32 +8,28 @@ sql connect ...@@ -8,32 +8,28 @@ sql connect
sleep 500 sleep 500
sql drop database if exists indb sql drop database if exists indb
sql create database if not exists indb sql create database if not exists indb
sql use indb sql use indb
$inFileName = '~/data.csv' $inFileName = '~/data.csv'
$numOfRows = 10000 $numOfRows = 10000
#system sh/gendata.sh $inFileName $numOfRows # input file invalid system general/parser/gendata.sh
system sh/gendata.sh ~/data.csv $numOfRows
sql create table tbx (ts TIMESTAMP, collect_area NCHAR(12), device_id BINARY(16), imsi BINARY(16), imei BINARY(16), mdn BINARY(10), net_type BINARY(4), mno NCHAR(4), province NCHAR(10), city NCHAR(16), alarm BINARY(2)) sql create table tbx (ts TIMESTAMP, collect_area NCHAR(12), device_id BINARY(16), imsi BINARY(16), imei BINARY(16), mdn BINARY(10), net_type BINARY(4), mno NCHAR(4), province NCHAR(10), city NCHAR(16), alarm BINARY(2))
print ====== create tables success, starting import data print ====== create tables success, starting import data
sql import into tbx file $inFileName sql import into tbx file '~/data.sql'
sql select count(*) from tbx sql select count(*) from tbx
if $rows != 1 then if $rows != 1 then
return -1 return -1
endi endi
if $data00 != $numOfRows then
print "expect: $numOfRows, act: $data00"
return -1
endi
#if $data00 != $numOfRows then
# print "expect: $numOfRows, act: $data00"
# return -1
#endi
#system rm -f $inFileName # invalid shell system rm -f ~/data.sql
system rm -f ~/data.csv
system sh/exec.sh -n dnode1 -s stop -x SIGINT system sh/exec.sh -n dnode1 -s stop -x SIGINT
\ No newline at end of file
...@@ -159,6 +159,15 @@ if $data03 != @abc15@ then ...@@ -159,6 +159,15 @@ if $data03 != @abc15@ then
return -1 return -1
endi endi
sql select top(c6, 3) from select_tags_mt0 interval(10a)
sql select top(c3,10) from select_tags_mt0 interval(10a) group by tbname
sql select top(c6, 3) from select_tags_mt0 interval(10a) group by tbname;
sql select top(c6, 10) from select_tags_mt0 interval(10a);
if $rows != 12800 then
return -1
endi
sql select top(c1, 100), tbname, t1, t2 from select_tags_mt0; sql select top(c1, 100), tbname, t1, t2 from select_tags_mt0;
if $rows != 100 then if $rows != 100 then
return -1 return -1
......
...@@ -149,6 +149,7 @@ extern int32_t simScriptSucced; ...@@ -149,6 +149,7 @@ extern int32_t simScriptSucced;
extern int32_t simDebugFlag; extern int32_t simDebugFlag;
extern char tsScriptDir[]; extern char tsScriptDir[];
extern bool simAsyncQuery; extern bool simAsyncQuery;
extern bool abortExecution;
SScript *simParseScript(char *fileName); SScript *simParseScript(char *fileName);
SScript *simProcessCallOver(SScript *script); SScript *simProcessCallOver(SScript *script);
......
...@@ -645,8 +645,12 @@ bool simCreateRestFulConnect(SScript *script, char *user, char *pass) { ...@@ -645,8 +645,12 @@ bool simCreateRestFulConnect(SScript *script, char *user, char *pass) {
bool simCreateNativeConnect(SScript *script, char *user, char *pass) { bool simCreateNativeConnect(SScript *script, char *user, char *pass) {
simCloseTaosdConnect(script); simCloseTaosdConnect(script);
void *taos = NULL; void *taos = NULL;
taosMsleep(2000);
for (int32_t attempt = 0; attempt < 10; ++attempt) { for (int32_t attempt = 0; attempt < 10; ++attempt) {
if (abortExecution) {
script->killed = true;
return false;
}
taos = taos_connect(NULL, user, pass, NULL, tsDnodeShellPort); taos = taos_connect(NULL, user, pass, NULL, tsDnodeShellPort);
if (taos == NULL) { if (taos == NULL) {
simDebug("script:%s, user:%s connect taosd failed:%s, attempt:%d", script->fileName, user, taos_errstr(NULL), simDebug("script:%s, user:%s connect taosd failed:%s, attempt:%d", script->fileName, user, taos_errstr(NULL),
...@@ -697,6 +701,11 @@ bool simExecuteNativeSqlCommand(SScript *script, char *rest, bool isSlow) { ...@@ -697,6 +701,11 @@ bool simExecuteNativeSqlCommand(SScript *script, char *rest, bool isSlow) {
TAOS_RES *pSql = NULL; TAOS_RES *pSql = NULL;
for (int32_t attempt = 0; attempt < 10; ++attempt) { for (int32_t attempt = 0; attempt < 10; ++attempt) {
if (abortExecution) {
script->killed = true;
return false;
}
simLogSql(rest, false); simLogSql(rest, false);
pSql = taos_query(script->taos, rest); pSql = taos_query(script->taos, rest);
ret = taos_errno(pSql); ret = taos_errno(pSql);
......
...@@ -21,10 +21,13 @@ ...@@ -21,10 +21,13 @@
bool simAsyncQuery = false; bool simAsyncQuery = false;
bool simExecSuccess = false; bool simExecSuccess = false;
bool abortExecution = false;
void simHandleSignal(int32_t signo, void *sigInfo, void *context) { void simHandleSignal(int32_t signo, void *sigInfo, void *context) {
simSystemCleanUp(); simSystemCleanUp();
exit(1); abortExecution = true;
// runningScript->killed = true;
// exit(1);
} }
int32_t main(int32_t argc, char *argv[]) { int32_t main(int32_t argc, char *argv[]) {
...@@ -60,6 +63,11 @@ int32_t main(int32_t argc, char *argv[]) { ...@@ -60,6 +63,11 @@ int32_t main(int32_t argc, char *argv[]) {
return -1; return -1;
} }
if (abortExecution) {
simError("execute abort");
return -1;
}
simScriptList[++simScriptPos] = script; simScriptList[++simScriptPos] = script;
simExecuteScript(script); simExecuteScript(script);
......
...@@ -159,9 +159,17 @@ void *simExecuteScript(void *inputScript) { ...@@ -159,9 +159,17 @@ void *simExecuteScript(void *inputScript) {
script = simScriptList[simScriptPos]; script = simScriptList[simScriptPos];
} }
if (abortExecution) {
script->killed = true;
}
if (script->killed || script->linePos >= script->numOfLines) { if (script->killed || script->linePos >= script->numOfLines) {
printf("killed ---------------------->\n");
script = simProcessCallOver(script); script = simProcessCallOver(script);
if (script == NULL) break; if (script == NULL) {
printf("abort now!\n");
break;
}
} else { } else {
SCmdLine *line = &script->lines[script->linePos]; SCmdLine *line = &script->lines[script->linePos];
char * option = script->optionBuffer + line->optionOffset; char * option = script->optionBuffer + line->optionOffset;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册