提交 911d89c0 编写于 作者: D dapan1121

Merge branch 'develop' into feature/TD-3188

......@@ -42,12 +42,12 @@ def pre_test(){
killall -9 taosd ||echo "no taosd running"
killall -9 gdb || echo "no gdb running"
cd ${WKC}
git checkout develop
git reset --hard HEAD~10 >/dev/null
git checkout develop
git pull >/dev/null
git fetch origin +refs/pull/${CHANGE_ID}/merge
git checkout -qf FETCH_HEAD
find ${WKC}/tests/pytest -name \'*\'.sql -exec rm -rf {} \\;
git clean -dfx
cd ${WK}
git reset --hard HEAD~10
git checkout develop
......@@ -55,7 +55,7 @@ def pre_test(){
cd ${WK}
export TZ=Asia/Harbin
date
rm -rf ${WK}/debug
git clean -dfx
mkdir debug
cd debug
cmake .. > /dev/null
......
......@@ -32,7 +32,7 @@ ELSEIF (TD_WINDOWS)
#INSTALL(TARGETS taos RUNTIME DESTINATION driver)
#INSTALL(TARGETS shell RUNTIME DESTINATION .)
IF (TD_MVN_INSTALLED)
INSTALL(FILES ${LIBRARY_OUTPUT_PATH}/taos-jdbcdriver-2.0.24-dist.jar DESTINATION connector/jdbc)
INSTALL(FILES ${LIBRARY_OUTPUT_PATH}/taos-jdbcdriver-2.0.25-dist.jar DESTINATION connector/jdbc)
ENDIF ()
ELSEIF (TD_DARWIN)
SET(TD_MAKE_INSTALL_SH "${TD_COMMUNITY_DIR}/packaging/tools/make_install.sh")
......
......@@ -101,7 +101,7 @@ $ taos -h 192.168.0.1 -s "use db; show tables;"
### 运行SQL命令脚本
TDengine终端可以通过`source`命令来运行SQL命令脚本.
TDengine 终端可以通过 `source` 命令来运行 SQL 命令脚本.
```mysql
taos> source <filename>;
......@@ -109,10 +109,10 @@ taos> source <filename>;
### Shell小技巧
- 可以使用上下光标键查看已经历史输入的命
- 修改用户密码。在shell中使用alter user命
- 可以使用上下光标键查看历史输入的指
- 修改用户密码。在 shell 中使用 alter user 指
- ctrl+c 中止正在进行中的查询
- 执行`RESET QUERY CACHE`清空本地缓存的表的schema
- 执行 `RESET QUERY CACHE` 清空本地缓存的表 schema
## <a class="anchor" id="demo"></a>TDengine 极速体验
......@@ -179,19 +179,20 @@ taos> select avg(f1), max(f2), min(f3) from test.t10 interval(10s);
### TDengine服务器支持的平台列表
| | **CentOS 6/7/8** | **Ubuntu 16/18/20** | **Other Linux** | **统信 UOS** | **银河/中标麒麟** | **凝思 V60/V80** |
| -------------- | --------------------- | ------------------------ | --------------- | --------------- | ------------------------- | --------------------- |
| X64 | ● | ● | | ○ | ● | ● |
| 树莓派 ARM32 | | ● | ● | | | |
| 龙芯 MIPS64 | | | ● | | | |
| 鲲鹏 ARM64 | | ○ | ○ | | ● | |
| 申威 Alpha64 | | | ○ | ● | | |
| 飞腾 ARM64 | | ○ 优麒麟 | | | | |
| 海光 X64 | ● | ● | ● | ○ | ● | ● |
| 瑞芯微 ARM64/32 | | | ○ | | | |
| 全志 ARM64/32 | | | ○ | | | |
| 炬力 ARM64/32 | | | ○ | | | |
| TI ARM32 | | | ○ | | | |
| | **CentOS 6/7/8** | **Ubuntu 16/18/20** | **Other Linux** | **统信 UOS** | **银河/中标麒麟** | **凝思 V60/V80** | **华为 EulerOS** |
| -------------- | --------------------- | ------------------------ | --------------- | --------------- | ------------------------- | --------------------- | --------------------- |
| X64 | ● | ● | | ○ | ● | ● | ● |
| 树莓派 ARM32 | | ● | ● | | | | |
| 龙芯 MIPS64 | | | ● | | | | |
| 鲲鹏 ARM64 | | ○ | ○ | | ● | | |
| 申威 Alpha64 | | | ○ | ● | | | |
| 飞腾 ARM64 | | ○ 优麒麟 | | | | | |
| 海光 X64 | ● | ● | ● | ○ | ● | ● | |
| 瑞芯微 ARM64/32 | | | ○ | | | | |
| 全志 ARM64/32 | | | ○ | | | | |
| 炬力 ARM64/32 | | | ○ | | | | |
| TI ARM32 | | | ○ | | | | |
| 华为云 ARM64 | | | | | | | ● |
注: ● 表示经过官方测试验证, ○ 表示非官方测试验证。
......
......@@ -285,7 +285,7 @@ JDBC连接器可能报错的错误码包括3种:JDBC driver本身的报错(
* https://github.com/taosdata/TDengine/blob/develop/src/connector/jdbc/src/main/java/com/taosdata/jdbc/TSDBErrorNumbers.java
* https://github.com/taosdata/TDengine/blob/develop/src/inc/taoserror.h
### 订阅
### <a class="anchor" id="subscribe"></a>订阅
#### 创建
......
......@@ -120,17 +120,17 @@ taosd -C
不同应用场景的数据往往具有不同的数据特征,比如保留天数、副本数、采集频次、记录大小、采集点的数量、压缩等都可完全不同。为获得在存储上的最高效率,TDengine提供如下存储相关的系统配置参数:
- days:一个数据文件存储数据的时间跨度,单位为天,默认值:10。
- keep:数据库中数据保留的天数,单位为天,默认值:3650。
- keep:数据库中数据保留的天数,单位为天,默认值:3650。(可通过 alter database 修改)
- minRows:文件块中记录的最小条数,单位为条,默认值:100。
- maxRows:文件块中记录的最大条数,单位为条,默认值:4096。
- comp:文件压缩标志位,0:关闭;1:一阶段压缩;2:两阶段压缩。默认值:2。
- comp:文件压缩标志位,0:关闭;1:一阶段压缩;2:两阶段压缩。默认值:2。(可通过 alter database 修改)
- walLevel:WAL级别。1:写wal,但不执行fsync;2:写wal, 而且执行fsync。默认值:1。
- fsync:当wal设置为2时,执行fsync的周期。设置为0,表示每次写入,立即执行fsync。单位为毫秒,默认值:3000。
- cache:内存块的大小,单位为兆字节(MB),默认值:16。
- blocks:每个VNODE(TSDB)中有多少cache大小的内存块。因此一个VNODE的用的内存大小粗略为(cache * blocks)。单位为块,默认值:4。
- replica:副本个数,取值范围:1-3。单位为个,默认值:1
- precision:时间戳精度标识,ms表示毫秒,us表示微秒。默认值:ms
- cacheLast:是否在内存中缓存子表 last_row,0:关闭;1:开启。默认值:0。(从 2.0.11 版本开始支持此参数)
- blocks:每个VNODE(TSDB)中有多少cache大小的内存块。因此一个VNODE的用的内存大小粗略为(cache * blocks)。单位为块,默认值:4。(可通过 alter database 修改)
- replica:副本个数,取值范围:1-3。单位为个,默认值:1。(可通过 alter database 修改)
- precision:时间戳精度标识,ms表示毫秒,us表示微秒。默认值:ms
- cacheLast:是否在内存中缓存子表 last_row,0:关闭;1:开启。默认值:0。(可通过 alter database 修改)(从 2.0.11 版本开始支持此参数)
对于一个应用场景,可能有多种数据特征的数据并存,最佳的设计是将具有相同数据特征的表放在一个库里,这样一个应用有多个库,而每个库可以配置不同的存储参数,从而保证系统有最优的性能。TDengine允许应用在创建库时指定上述存储参数,如果指定,该参数就将覆盖对应的系统配置参数。举例,有下述SQL:
......
......@@ -29,21 +29,21 @@ taos> DESCRIBE meters;
## <a class="anchor" id="data-type"></a>支持的数据类型
使用TDengine,最重要的是时间戳。创建并插入记录、查询历史记录的时候,均需要指定时间戳。时间戳有如下规则:
使用 TDengine,最重要的是时间戳。创建并插入记录、查询历史记录的时候,均需要指定时间戳。时间戳有如下规则:
- 时间格式为```YYYY-MM-DD HH:mm:ss.MS```, 默认时间分辨率为毫秒。比如:```2017-08-12 18:25:58.128```
- 内部函数now是服务器的当前时间
- 插入记录时,如果时间戳为now,插入数据时使用服务器当前时间
- Epoch Time: 时间戳也可以是一个长整数,表示从1970-01-01 08:00:00.000开始的毫秒数
- 时间可以加减,比如 now-2h,表明查询时刻向前推2个小时(最近2小时)。 数字后面的时间单位可以是 a(毫秒)、s(秒)、 m(分)、h(小时)、d(天)、w(周)。 比如select * from t1 where ts > now-2w and ts <= now-1w, 表示查询两周前整整一周的数据。 在指定降频操作(down sampling)的时间窗口(interval)时,时间单位还可以使用 n(自然月) 和 y(自然年)。
- 时间格式为 ```YYYY-MM-DD HH:mm:ss.MS```默认时间分辨率为毫秒。比如:```2017-08-12 18:25:58.128```
- 内部函数 now 是客户端的当前时间
- 插入记录时,如果时间戳为 now,插入数据时使用提交这条记录的客户端的当前时间
- Epoch Time:时间戳也可以是一个长整数,表示从 1970-01-01 08:00:00.000 开始的毫秒数
- 时间可以加减,比如 now-2h,表明查询时刻向前推 2 个小时(最近 2 小时)。数字后面的时间单位可以是 u(微秒)、a(毫秒)、s(秒)、m(分)、h(小时)、d(天)、w(周)。 比如 `select * from t1 where ts > now-2w and ts <= now-1w`,表示查询两周前整整一周的数据。在指定降频操作(down sampling)的时间窗口(interval)时,时间单位还可以使用 n(自然月) 和 y(自然年)。
TDengine缺省的时间戳是毫秒精度,但通过修改配置参数enableMicrosecond就可支持微秒。
TDengine 缺省的时间戳是毫秒精度,但通过修改配置参数 enableMicrosecond 就可以支持微秒。
在TDengine中,普通表的数据模型中可使用以下10种数据类型。
在TDengine中,普通表的数据模型中可使用以下 10 种数据类型。
| | 类型 | Bytes | 说明 |
| ---- | :-------: | ------ | ------------------------------------------------------------ |
| 1 | TIMESTAMP | 8 | 时间戳。缺省精度毫秒,可支持微秒。从格林威治时间 1970-01-01 00:00:00.000 (UTC/GMT) 开始,计时不能早于该时间。 |
| 1 | TIMESTAMP | 8 | 时间戳。缺省精度毫秒,可支持微秒。从格林威治时间 1970-01-01 00:00:00.000 (UTC/GMT) 开始,计时不能早于该时间。(从 2.0.18 版本开始,已经去除了这一时间范围限制) |
| 2 | INT | 4 | 整型,范围 [-2^31+1, 2^31-1], -2^31 用作 NULL |
| 3 | BIGINT | 8 | 长整型,范围 [-2^63+1, 2^63-1], -2^63 用于 NULL |
| 4 | FLOAT | 4 | 浮点型,有效位数 6-7,范围 [-3.4E38, 3.4E38] |
......
......@@ -156,3 +156,13 @@ ALTER LOCAL RESETLOG;
```
其含义是,清空本机所有由客户端生成的日志文件。
## <a class="anchor" id="timezone"></a>18. 时间戳的时区信息是怎样处理的?
TDengine 中时间戳的时区总是由客户端进行处理,而与服务端无关。具体来说,客户端会对 SQL 语句中的时间戳进行时区转换,转为 UTC 时区(即 Unix 时间戳——Unix Timestamp)再交由服务端进行写入和查询;在读取数据时,服务端也是采用 UTC 时区提供原始数据,客户端收到后再根据本地设置,把时间戳转换为本地系统所要求的时区进行显示。
客户端在处理时间戳字符串时,会采取如下逻辑:
1. 在未做特殊设置的情况下,客户端默认使用所在操作系统的时区设置。
2. 如果在 taos.cfg 中设置了 timezone 参数,则客户端会以这个配置文件中的设置为准。
3. 如果在 C/C++/Java/Python 等各种编程语言的 Connector Driver 中,在建立数据库连接时显式指定了 timezone,那么会以这个指定的时区设置为准。例如 Java Connector 的 JDBC URL 中就有 timezone 参数。
4. 在书写 SQL 语句时,也可以直接使用 Unix 时间戳(例如 `1554984068000`)或带有时区的时间戳字符串,也即以 RFC 3339 格式(例如 `2013-04-12T15:52:01.123+08:00`)或 ISO-8601 格式(例如 `2013-04-12T15:52:01.123+0800`)来书写时间戳,此时这些时间戳的取值将不再受其他时区设置的影响。
......@@ -120,7 +120,7 @@ function clean_service_on_systemd() {
if [ "$verMode" == "cluster" ]; then
nginx_service_config="${service_config_dir}/${nginx_service_name}.service"
if [ -d ${bin_dir}/web ]; then
if [ -d ${install_nginxd_dir} ]; then
if systemctl is-active --quiet ${nginx_service_name}; then
echo "Nginx for TDengine is running, stopping it..."
${csudo} systemctl stop ${nginx_service_name} &> /dev/null || echo &> /dev/null
......
......@@ -36,19 +36,6 @@ extern "C" {
#define UTIL_TABLE_IS_NORMAL_TABLE(metaInfo)\
(!(UTIL_TABLE_IS_SUPER_TABLE(metaInfo) || UTIL_TABLE_IS_CHILD_TABLE(metaInfo)))
typedef struct SParsedColElem {
int16_t colIndex;
uint16_t offset;
} SParsedColElem;
typedef struct SParsedDataColInfo {
int16_t numOfCols;
int16_t numOfAssignedCols;
SParsedColElem elems[TSDB_MAX_COLUMNS];
bool hasVal[TSDB_MAX_COLUMNS];
} SParsedDataColInfo;
#pragma pack(push,1)
// this struct is transfered as binary, padding two bytes to avoid
// an 'uid' whose low bytes is 0xff being recoginized as NULL,
......@@ -118,6 +105,8 @@ int32_t tscCreateDataBlock(size_t initialSize, int32_t rowSize, int32_t startOff
void tscDestroyDataBlock(STableDataBlocks* pDataBlock, bool removeMeta);
void tscSortRemoveDataBlockDupRows(STableDataBlocks* dataBuf);
void tscDestroyBoundColumnInfo(SParsedDataColInfo* pColInfo);
SParamInfo* tscAddParamToDataBlock(STableDataBlocks* pDataBlock, char type, uint8_t timePrec, int16_t bytes,
uint32_t offset);
......@@ -141,6 +130,8 @@ bool tscIsPointInterpQuery(SQueryInfo* pQueryInfo);
bool tscIsTWAQuery(SQueryInfo* pQueryInfo);
bool tscIsSecondStageQuery(SQueryInfo* pQueryInfo);
bool tscGroupbyColumn(SQueryInfo* pQueryInfo);
bool tscIsTopbotQuery(SQueryInfo* pQueryInfo);
int32_t tscGetTopbotQueryParam(SQueryInfo* pQueryInfo);
bool tscNonOrderedProjectionQueryOnSTable(SQueryInfo *pQueryInfo, int32_t tableIndex);
bool tscOrderedProjectionQueryOnSTable(SQueryInfo* pQueryInfo, int32_t tableIndex);
......
......@@ -175,6 +175,19 @@ typedef struct SParamInfo {
uint32_t offset;
} SParamInfo;
typedef struct SBoundColumn {
bool hasVal; // denote if current column has bound or not
int32_t offset; // all column offset value
} SBoundColumn;
typedef struct SParsedDataColInfo {
int16_t numOfCols;
int16_t numOfBound;
int32_t *boundedColumns;
SBoundColumn *cols;
} SParsedDataColInfo;
typedef struct STableDataBlocks {
SName tableName;
int8_t tsSource; // where does the UNIX timestamp come from, server or client
......@@ -189,6 +202,8 @@ typedef struct STableDataBlocks {
STableMeta *pTableMeta; // the tableMeta of current table, the table meta will be used during submit, keep a ref to avoid to be removed from cache
char *pData;
SParsedDataColInfo boundColumnInfo;
// for parameter ('?') binding
uint32_t numOfAllocedParams;
uint32_t numOfParams;
......@@ -426,6 +441,7 @@ void tscRestoreFuncForSTableQuery(SQueryInfo *pQueryInfo);
int32_t tscCreateResPointerInfo(SSqlRes *pRes, SQueryInfo *pQueryInfo);
void tscSetResRawPtr(SSqlRes* pRes, SQueryInfo* pQueryInfo);
void destroyTableNameList(SSqlCmd* pCmd);
void tscResetSqlCmd(SSqlCmd *pCmd, bool removeMeta);
......@@ -440,6 +456,8 @@ void tscFreeSqlResult(SSqlObj *pSql);
* @param pObj
*/
void tscFreeSqlObj(SSqlObj *pSql);
void tscFreeSubobj(SSqlObj* pSql);
void tscFreeRegisteredSqlObj(void *pSql);
void tscCloseTscObj(void *pObj);
......@@ -461,6 +479,7 @@ char* tscGetSqlStr(SSqlObj* pSql);
bool tscIsQueryWithLimit(SSqlObj* pSql);
bool tscHasReachLimitation(SQueryInfo *pQueryInfo, SSqlRes *pRes);
void tscSetBoundColumnInfo(SParsedDataColInfo *pColInfo, SSchema *pSchema, int32_t numOfCols);
char *tscGetErrorMsgPayload(SSqlCmd *pCmd);
......
......@@ -505,10 +505,7 @@ void tscTableMetaCallBack(void *param, TAOS_RES *res, int code) {
return;
_error:
if (code != TSDB_CODE_SUCCESS) {
pSql->res.code = code;
pRes->code = code;
tscAsyncResultOnError(pSql);
}
taosReleaseRef(tscObjRef, pSql->self);
}
......@@ -338,11 +338,20 @@ void tscCreateLocalMerger(tExtMemBuffer **pMemBuffer, int32_t numOfBuffer, tOrde
pReducer->resColModel->capacity = pReducer->nResultBufSize;
pReducer->finalModel = pFFModel;
int32_t expandFactor = 1;
if (finalmodel->rowSize > 0) {
bool topBotQuery = tscIsTopbotQuery(pQueryInfo);
if (topBotQuery) {
expandFactor = tscGetTopbotQueryParam(pQueryInfo);
pReducer->resColModel->capacity /= (finalmodel->rowSize * expandFactor);
pReducer->resColModel->capacity *= expandFactor;
} else {
pReducer->resColModel->capacity /= finalmodel->rowSize;
}
}
assert(finalmodel->rowSize > 0 && finalmodel->rowSize <= pReducer->rowSize);
pReducer->pFinalRes = calloc(1, pReducer->rowSize * pReducer->resColModel->capacity);
if (pReducer->pTempBuffer == NULL || pReducer->discardData == NULL || pReducer->pResultBuf == NULL ||
......@@ -1149,9 +1158,10 @@ static void fillMultiRowsOfTagsVal(SQueryInfo *pQueryInfo, int32_t numOfRes, SLo
memset(buf, 0, (size_t)maxBufSize);
memcpy(buf, pCtx->pOutput, (size_t)pCtx->outputBytes);
char* next = pCtx->pOutput;
for (int32_t i = 0; i < inc; ++i) {
pCtx->pOutput += pCtx->outputBytes;
memcpy(pCtx->pOutput, buf, (size_t)pCtx->outputBytes);
next += pCtx->outputBytes;
memcpy(next, buf, (size_t)pCtx->outputBytes);
}
}
......@@ -1439,6 +1449,11 @@ int32_t tscDoLocalMerge(SSqlObj *pSql) {
SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
tFilePage *tmpBuffer = pLocalMerge->pTempBuffer;
int32_t remain = 1;
if (tscIsTopbotQuery(pQueryInfo)) {
remain = tscGetTopbotQueryParam(pQueryInfo);
}
if (doHandleLastRemainData(pSql)) {
return TSDB_CODE_SUCCESS;
}
......@@ -1527,7 +1542,7 @@ int32_t tscDoLocalMerge(SSqlObj *pSql) {
* if the previous group does NOT generate any result (pResBuf->num == 0),
* continue to process results instead of return results.
*/
if ((!sameGroup && pResBuf->num > 0) || (pResBuf->num == pLocalMerge->resColModel->capacity)) {
if ((!sameGroup && pResBuf->num > 0) || (pResBuf->num + remain >= pLocalMerge->resColModel->capacity)) {
// does not belong to the same group
bool notSkipped = genFinalResults(pSql, pLocalMerge, !sameGroup);
......
此差异已折叠。
......@@ -3567,7 +3567,8 @@ static int32_t extractColumnFilterInfo(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SC
}
if (pSchema->type == TSDB_DATA_TYPE_BOOL) {
if (pExpr->tokenId != TK_EQ && pExpr->tokenId != TK_NE) {
int32_t t = pExpr->tokenId;
if (t != TK_EQ && t != TK_NE && t != TK_NOTNULL && t != TK_ISNULL) {
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg3);
}
}
......@@ -3767,7 +3768,8 @@ static int32_t validateSQLExpr(SSqlCmd* pCmd, tSqlExpr* pExpr, SQueryInfo* pQuer
}
pList->ids[pList->num++] = index;
} else if (pExpr->tokenId == TK_FLOAT && (isnan(pExpr->value.dKey) || isinf(pExpr->value.dKey))) {
} else if ((pExpr->tokenId == TK_FLOAT && (isnan(pExpr->value.dKey) || isinf(pExpr->value.dKey))) ||
pExpr->tokenId == TK_NULL) {
return TSDB_CODE_TSC_INVALID_SQL;
} else if (pExpr->type == SQL_NODE_SQLFUNCTION) {
if (*type == NON_ARITHMEIC_EXPR) {
......@@ -3999,6 +4001,39 @@ static int32_t setExprToCond(tSqlExpr** parent, tSqlExpr* pExpr, const char* msg
return TSDB_CODE_SUCCESS;
}
static int32_t validateNullExpr(tSqlExpr* pExpr, char* msgBuf) {
const char* msg = "only support is [not] null";
tSqlExpr* pRight = pExpr->pRight;
if (pRight->tokenId == TK_NULL && (!(pExpr->tokenId == TK_ISNULL || pExpr->tokenId == TK_NOTNULL))) {
return invalidSqlErrMsg(msgBuf, msg);
}
return TSDB_CODE_SUCCESS;
}
// check for like expression
static int32_t validateLikeExpr(tSqlExpr* pExpr, STableMeta* pTableMeta, int32_t index, char* msgBuf) {
const char* msg1 = "wildcard string should be less than 20 characters";
const char* msg2 = "illegal column name";
tSqlExpr* pLeft = pExpr->pLeft;
tSqlExpr* pRight = pExpr->pRight;
if (pExpr->tokenId == TK_LIKE) {
if (pRight->value.nLen > TSDB_PATTERN_STRING_MAX_LEN) {
return invalidSqlErrMsg(msgBuf, msg1);
}
SSchema* pSchema = tscGetTableSchema(pTableMeta);
if ((!isTablenameToken(&pLeft->colInfo)) && !IS_VAR_DATA_TYPE(pSchema[index].type)) {
return invalidSqlErrMsg(msgBuf, msg2);
}
}
return TSDB_CODE_SUCCESS;
}
static int32_t handleExprInQueryCond(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, tSqlExpr** pExpr, SCondExpr* pCondExpr,
int32_t* type, int32_t parentOptr) {
const char* msg1 = "table query cannot use tags filter";
......@@ -4008,7 +4043,6 @@ static int32_t handleExprInQueryCond(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, tSql
const char* msg5 = "not support ordinary column join";
const char* msg6 = "only one query condition on tbname allowed";
const char* msg7 = "only in/like allowed in filter table name";
const char* msg8 = "wildcard string should be less than 20 characters";
tSqlExpr* pLeft = (*pExpr)->pLeft;
tSqlExpr* pRight = (*pExpr)->pRight;
......@@ -4025,6 +4059,18 @@ static int32_t handleExprInQueryCond(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, tSql
STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, index.tableIndex);
STableMeta* pTableMeta = pTableMetaInfo->pTableMeta;
// validate the null expression
int32_t code = validateNullExpr(*pExpr, tscGetErrorMsgPayload(pCmd));
if (code != TSDB_CODE_SUCCESS) {
return code;
}
// validate the like expression
code = validateLikeExpr(*pExpr, pTableMeta, index.columnIndex, tscGetErrorMsgPayload(pCmd));
if (code != TSDB_CODE_SUCCESS) {
return code;
}
if (index.columnIndex == PRIMARYKEY_TIMESTAMP_COL_INDEX) { // query on time range
if (!validateJoinExprNode(pCmd, pQueryInfo, *pExpr, &index)) {
return TSDB_CODE_TSC_INVALID_SQL;
......@@ -4046,7 +4092,6 @@ static int32_t handleExprInQueryCond(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, tSql
int16_t leftIdx = index.tableIndex;
SColumnIndex index = COLUMN_INDEX_INITIALIZER;
if (getColumnIndexByName(pCmd, &pRight->colInfo, pQueryInfo, &index) != TSDB_CODE_SUCCESS) {
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg2);
}
......@@ -4093,20 +4138,6 @@ static int32_t handleExprInQueryCond(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, tSql
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg1);
}
// check for like expression
if ((*pExpr)->tokenId == TK_LIKE) {
if (pRight->value.nLen > TSDB_PATTERN_STRING_MAX_LEN) {
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg8);
}
SSchema* pSchema = tscGetTableSchema(pTableMetaInfo->pTableMeta);
if ((!isTablenameToken(&pLeft->colInfo)) && pSchema[index.columnIndex].type != TSDB_DATA_TYPE_BINARY &&
pSchema[index.columnIndex].type != TSDB_DATA_TYPE_NCHAR) {
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg2);
}
}
// in case of in operator, keep it in a seprate attribute
if (index.columnIndex == TSDB_TBNAME_COLUMN_INDEX) {
if (!validTableNameOptr(*pExpr)) {
......
......@@ -2826,7 +2826,6 @@ void tscInitMsgsFp() {
tscProcessMsgRsp[TSDB_SQL_SHOW_CREATE_TABLE] = tscProcessShowCreateRsp;
tscProcessMsgRsp[TSDB_SQL_SHOW_CREATE_DATABASE] = tscProcessShowCreateRsp;
tscKeepConn[TSDB_SQL_SHOW] = 1;
tscKeepConn[TSDB_SQL_RETRIEVE] = 1;
tscKeepConn[TSDB_SQL_SELECT] = 1;
......
......@@ -299,6 +299,7 @@ static void tscProcessStreamRetrieveResult(void *param, TAOS_RES *res, int numOf
tfree(pTableMetaInfo->pTableMeta);
tscFreeSqlResult(pSql);
tscFreeSubobj(pSql);
tfree(pSql->pSubs);
pSql->subState.numOfSub = 0;
pTableMetaInfo->vgroupList = tscVgroupInfoClear(pTableMetaInfo->vgroupList);
......
......@@ -271,6 +271,41 @@ bool tscIsTWAQuery(SQueryInfo* pQueryInfo) {
return false;
}
bool tscIsTopbotQuery(SQueryInfo* pQueryInfo) {
size_t numOfExprs = tscSqlExprNumOfExprs(pQueryInfo);
for (int32_t i = 0; i < numOfExprs; ++i) {
SSqlExpr* pExpr = tscSqlExprGet(pQueryInfo, i);
if (pExpr == NULL) {
continue;
}
int32_t functionId = pExpr->functionId;
if (functionId == TSDB_FUNC_TOP || functionId == TSDB_FUNC_BOTTOM) {
return true;
}
}
return false;
}
int32_t tscGetTopbotQueryParam(SQueryInfo* pQueryInfo) {
size_t numOfExprs = tscSqlExprNumOfExprs(pQueryInfo);
for (int32_t i = 0; i < numOfExprs; ++i) {
SSqlExpr* pExpr = tscSqlExprGet(pQueryInfo, i);
if (pExpr == NULL) {
continue;
}
int32_t functionId = pExpr->functionId;
if (functionId == TSDB_FUNC_TOP || functionId == TSDB_FUNC_BOTTOM) {
return (int32_t) pExpr->param[0].i64;
}
}
return 0;
}
void tscClearInterpInfo(SQueryInfo* pQueryInfo) {
if (!tscIsPointInterpQuery(pQueryInfo)) {
return;
......@@ -309,7 +344,7 @@ void tscSetResRawPtr(SSqlRes* pRes, SQueryInfo* pQueryInfo) {
int32_t offset = 0;
for (int32_t i = 0; i < pRes->numOfCols; ++i) {
for (int32_t i = 0; i < pQueryInfo->fieldsInfo.numOfOutput; ++i) {
SInternalField* pInfo = (SInternalField*)TARRAY_GET_ELEM(pQueryInfo->fieldsInfo.internalField, i);
pRes->urow[i] = pRes->data + offset * pRes->numOfRows;
......@@ -415,6 +450,20 @@ void tscFreeQueryInfo(SSqlCmd* pCmd, bool removeMeta) {
tfree(pCmd->pQueryInfo);
}
void destroyTableNameList(SSqlCmd* pCmd) {
if (pCmd->numOfTables == 0) {
assert(pCmd->pTableNameList == NULL);
return;
}
for(int32_t i = 0; i < pCmd->numOfTables; ++i) {
tfree(pCmd->pTableNameList[i]);
}
pCmd->numOfTables = 0;
tfree(pCmd->pTableNameList);
}
void tscResetSqlCmd(SSqlCmd* pCmd, bool removeMeta) {
pCmd->command = 0;
pCmd->numOfCols = 0;
......@@ -424,14 +473,7 @@ void tscResetSqlCmd(SSqlCmd* pCmd, bool removeMeta) {
pCmd->parseFinished = 0;
pCmd->autoCreated = 0;
for(int32_t i = 0; i < pCmd->numOfTables; ++i) {
if (pCmd->pTableNameList && pCmd->pTableNameList[i]) {
tfree(pCmd->pTableNameList[i]);
}
}
pCmd->numOfTables = 0;
tfree(pCmd->pTableNameList);
destroyTableNameList(pCmd);
pCmd->pTableBlockHashList = tscDestroyBlockHashTable(pCmd->pTableBlockHashList, removeMeta);
pCmd->pDataBlocks = tscDestroyBlockArrayList(pCmd->pDataBlocks);
......@@ -448,7 +490,7 @@ void tscFreeSqlResult(SSqlObj* pSql) {
memset(&pSql->res, 0, sizeof(SSqlRes));
}
static void tscFreeSubobj(SSqlObj* pSql) {
void tscFreeSubobj(SSqlObj* pSql) {
if (pSql->subState.numOfSub == 0) {
return;
}
......@@ -549,6 +591,11 @@ void tscFreeSqlObj(SSqlObj* pSql) {
free(pSql);
}
void tscDestroyBoundColumnInfo(SParsedDataColInfo* pColInfo) {
tfree(pColInfo->boundedColumns);
tfree(pColInfo->cols);
}
void tscDestroyDataBlock(STableDataBlocks* pDataBlock, bool removeMeta) {
if (pDataBlock == NULL) {
return;
......@@ -569,6 +616,7 @@ void tscDestroyDataBlock(STableDataBlocks* pDataBlock, bool removeMeta) {
taosHashRemove(tscTableMetaInfo, name, strnlen(name, TSDB_TABLE_FNAME_LEN));
}
tscDestroyBoundColumnInfo(&pDataBlock->boundColumnInfo);
tfree(pDataBlock);
}
......@@ -699,7 +747,7 @@ int32_t tscCopyDataBlockToPayload(SSqlObj* pSql, STableDataBlocks* pDataBlock) {
* @param dataBlocks
* @return
*/
int32_t tscCreateDataBlock(size_t initialSize, int32_t rowSize, int32_t startOffset, SName* name,
int32_t tscCreateDataBlock(size_t defaultSize, int32_t rowSize, int32_t startOffset, SName* name,
STableMeta* pTableMeta, STableDataBlocks** dataBlocks) {
STableDataBlocks* dataBuf = (STableDataBlocks*)calloc(1, sizeof(STableDataBlocks));
if (dataBuf == NULL) {
......@@ -707,10 +755,12 @@ int32_t tscCreateDataBlock(size_t initialSize, int32_t rowSize, int32_t startOff
return TSDB_CODE_TSC_OUT_OF_MEMORY;
}
dataBuf->nAllocSize = (uint32_t)initialSize;
dataBuf->headerSize = startOffset; // the header size will always be the startOffset value, reserved for the subumit block header
dataBuf->nAllocSize = (uint32_t)defaultSize;
dataBuf->headerSize = startOffset;
// the header size will always be the startOffset value, reserved for the subumit block header
if (dataBuf->nAllocSize <= dataBuf->headerSize) {
dataBuf->nAllocSize = dataBuf->headerSize*2;
dataBuf->nAllocSize = dataBuf->headerSize * 2;
}
dataBuf->pData = calloc(1, dataBuf->nAllocSize);
......@@ -720,25 +770,31 @@ int32_t tscCreateDataBlock(size_t initialSize, int32_t rowSize, int32_t startOff
return TSDB_CODE_TSC_OUT_OF_MEMORY;
}
//Here we keep the tableMeta to avoid it to be remove by other threads.
dataBuf->pTableMeta = tscTableMetaDup(pTableMeta);
SParsedDataColInfo* pColInfo = &dataBuf->boundColumnInfo;
SSchema* pSchema = tscGetTableSchema(dataBuf->pTableMeta);
tscSetBoundColumnInfo(pColInfo, pSchema, dataBuf->pTableMeta->tableInfo.numOfColumns);
dataBuf->ordered = true;
dataBuf->prevTS = INT64_MIN;
dataBuf->rowSize = rowSize;
dataBuf->size = startOffset;
dataBuf->tsSource = -1;
dataBuf->vgId = dataBuf->pTableMeta->vgId;
tNameAssign(&dataBuf->tableName, name);
//Here we keep the tableMeta to avoid it to be remove by other threads.
dataBuf->pTableMeta = tscTableMetaDup(pTableMeta);
assert(initialSize > 0 && pTableMeta != NULL && dataBuf->pTableMeta != NULL);
assert(defaultSize > 0 && pTableMeta != NULL && dataBuf->pTableMeta != NULL);
*dataBlocks = dataBuf;
return TSDB_CODE_SUCCESS;
}
int32_t tscGetDataBlockFromList(SHashObj* pHashList, int64_t id, int32_t size, int32_t startOffset, int32_t rowSize,
SName* name, STableMeta* pTableMeta, STableDataBlocks** dataBlocks, SArray* pBlockList) {
SName* name, STableMeta* pTableMeta, STableDataBlocks** dataBlocks,
SArray* pBlockList) {
*dataBlocks = NULL;
STableDataBlocks** t1 = (STableDataBlocks**)taosHashGet(pHashList, (const char*)&id, sizeof(id));
if (t1 != NULL) {
......@@ -847,6 +903,8 @@ static void extractTableNameList(SSqlCmd* pCmd, bool freeBlockMap) {
int32_t i = 0;
while(p1) {
STableDataBlocks* pBlocks = *p1;
tfree(pCmd->pTableNameList[i]);
pCmd->pTableNameList[i++] = tNameDup(&pBlocks->tableName);
p1 = taosHashIterate(pCmd->pTableBlockHashList, p1);
}
......@@ -963,7 +1021,7 @@ bool tscIsInsertData(char* sqlstr) {
int32_t index = 0;
do {
SStrToken t0 = tStrGetToken(sqlstr, &index, false, 0, NULL);
SStrToken t0 = tStrGetToken(sqlstr, &index, false);
if (t0.type != TK_LP) {
return t0.type == TK_INSERT || t0.type == TK_IMPORT;
}
......
......@@ -8,7 +8,7 @@ IF (TD_MVN_INSTALLED)
ADD_CUSTOM_COMMAND(OUTPUT ${JDBC_CMD_NAME}
POST_BUILD
COMMAND mvn -Dmaven.test.skip=true install -f ${CMAKE_CURRENT_SOURCE_DIR}/pom.xml
COMMAND ${CMAKE_COMMAND} -E copy ${CMAKE_CURRENT_SOURCE_DIR}/target/taos-jdbcdriver-2.0.24-dist.jar ${LIBRARY_OUTPUT_PATH}
COMMAND ${CMAKE_COMMAND} -E copy ${CMAKE_CURRENT_SOURCE_DIR}/target/taos-jdbcdriver-2.0.25-dist.jar ${LIBRARY_OUTPUT_PATH}
COMMAND mvn -Dmaven.test.skip=true clean -f ${CMAKE_CURRENT_SOURCE_DIR}/pom.xml
COMMENT "build jdbc driver")
ADD_CUSTOM_TARGET(${JDBC_TARGET_NAME} ALL WORKING_DIRECTORY ${EXECUTABLE_OUTPUT_PATH} DEPENDS ${JDBC_CMD_NAME})
......
......@@ -5,7 +5,7 @@
<groupId>com.taosdata.jdbc</groupId>
<artifactId>taos-jdbcdriver</artifactId>
<version>2.0.24</version>
<version>2.0.25</version>
<packaging>jar</packaging>
<name>JDBCDriver</name>
......
......@@ -3,7 +3,7 @@
<modelVersion>4.0.0</modelVersion>
<groupId>com.taosdata.jdbc</groupId>
<artifactId>taos-jdbcdriver</artifactId>
<version>2.0.24</version>
<version>2.0.25</version>
<packaging>jar</packaging>
<name>JDBCDriver</name>
<url>https://github.com/taosdata/TDengine/tree/master/src/connector/jdbc</url>
......@@ -62,6 +62,14 @@
</dependencies>
<build>
<resources>
<resource>
<directory>src/main/resources</directory>
<excludes>
<exclude>**/*.md</exclude>
</excludes>
</resource>
</resources>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
......
......@@ -15,7 +15,7 @@ public abstract class AbstractParameterMetaData extends WrapperImpl implements P
@Override
public int getParameterCount() throws SQLException {
return parameters.length;
return parameters == null ? 0 : parameters.length;
}
@Override
......
......@@ -29,45 +29,25 @@ public class TSDBJNIConnector {
private static volatile Boolean isInitialized = false;
private TaosInfo taosInfo = TaosInfo.getInstance();
// Connection pointer used in C
private long taos = TSDBConstants.JNI_NULL_POINTER;
// result set status in current connection
private boolean isResultsetClosed = true;
private int affectedRows = -1;
static {
System.loadLibrary("taos");
System.out.println("java.library.path:" + System.getProperty("java.library.path"));
}
/**
* Connection pointer used in C
*/
private long taos = TSDBConstants.JNI_NULL_POINTER;
/**
* Result set pointer for the current connection
*/
// private long taosResultSetPointer = TSDBConstants.JNI_NULL_POINTER;
/**
* result set status in current connection
*/
private boolean isResultsetClosed = true;
private int affectedRows = -1;
/**
* Whether the connection is closed
*/
public boolean isClosed() {
return this.taos == TSDBConstants.JNI_NULL_POINTER;
}
/**
* Returns the status of last result set in current connection
*/
public boolean isResultsetClosed() {
return this.isResultsetClosed;
}
/**
* Initialize static variables in JNI to optimize performance
*/
public static void init(String configDir, String locale, String charset, String timezone) throws SQLWarning {
synchronized (isInitialized) {
if (!isInitialized) {
......@@ -93,11 +73,6 @@ public class TSDBJNIConnector {
public static native String getTsCharset();
/**
* Get connection pointer
*
* @throws SQLException
*/
public boolean connect(String host, int port, String dbName, String user, String password) throws SQLException {
if (this.taos != TSDBConstants.JNI_NULL_POINTER) {
// this.closeConnectionImp(this.taos);
......@@ -185,13 +160,6 @@ public class TSDBJNIConnector {
private native String getErrMsgImp(long pSql);
/**
* Get resultset pointer
* Each connection should have a single open result set at a time
*/
// public long getResultSet() {
// return taosResultSetPointer;
// }
private native long getResultSetImp(long connection, long pSql);
public boolean isUpdateQuery(long pSql) {
......@@ -231,6 +199,7 @@ public class TSDBJNIConnector {
// }
// return resCode;
// }
private native int freeResultSetImp(long connection, long result);
/**
......@@ -323,8 +292,7 @@ public class TSDBJNIConnector {
* Validate if a <I>create table</I> sql statement is correct without actually creating that table
*/
public boolean validateCreateTableSql(String sql) {
long connection = taos;
int res = validateCreateTableSqlImp(connection, sql.getBytes());
int res = validateCreateTableSqlImp(taos, sql.getBytes());
return res != 0 ? false : true;
}
......
......@@ -84,9 +84,9 @@ public class RestfulResultSet extends AbstractResultSet implements ResultSet {
case TSDBConstants.TSDB_DATA_TYPE_TIMESTAMP:
return new Timestamp(row.getDate(colIndex).getTime());
case TSDBConstants.TSDB_DATA_TYPE_BINARY:
return row.getString(colIndex).getBytes();
return row.getString(colIndex) == null ? null : row.getString(colIndex).getBytes();
case TSDBConstants.TSDB_DATA_TYPE_NCHAR:
return row.getString(colIndex);
return row.getString(colIndex) == null ? null : row.getString(colIndex);
default:
return row.get(colIndex);
}
......
package com.taosdata.jdbc.utils;
public class OSUtils {
private static final String OS = System.getProperty("os.name").toLowerCase();
public static boolean isWindows() {
return OS.indexOf("win") >= 0;
}
public static boolean isMac() {
return OS.indexOf("mac") >= 0;
}
public static boolean isLinux() {
return OS.indexOf("nux") >= 0;
}
}
......@@ -17,7 +17,6 @@ public class DriverAutoloadTest {
@Test
public void testRestful() throws SQLException {
// Class.forName("com.taosdata.jdbc.rs.RestfulDriver");
final String url = "jdbc:TAOS-RS://" + host + ":6041/?user=root&password=taosdata";
Connection conn = DriverManager.getConnection(url, properties);
Assert.assertNotNull(conn);
......
package com.taosdata.jdbc.cases;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import java.sql.*;
public class NullValueInResultSetForJdbcRestfulTest {
private static final String host = "127.0.0.1";
Connection conn;
@Test
public void test() {
try (Statement stmt = conn.createStatement()) {
ResultSet rs = stmt.executeQuery("select * from weather");
ResultSetMetaData meta = rs.getMetaData();
while (rs.next()) {
for (int i = 1; i <= meta.getColumnCount(); i++) {
Object value = rs.getObject(i);
System.out.print(meta.getColumnLabel(i) + ": " + value + "\t");
}
System.out.println();
}
} catch (SQLException e) {
e.printStackTrace();
}
}
@Before
public void before() throws SQLException {
final String url = "jdbc:TAOS-RS://" + host + ":6041/?user=root&password=taosdata";
conn = DriverManager.getConnection(url);
try (Statement stmt = conn.createStatement()) {
stmt.execute("drop database if exists test_null");
stmt.execute("create database if not exists test_null");
stmt.execute("use test_null");
stmt.execute("create table weather(ts timestamp, f1 int, f2 bigint, f3 float, f4 double, f5 smallint, f6 tinyint, f7 bool, f8 binary(64), f9 nchar(64))");
stmt.executeUpdate("insert into weather(ts, f1) values(now+1s, 1)");
stmt.executeUpdate("insert into weather(ts, f2) values(now+2s, 2)");
stmt.executeUpdate("insert into weather(ts, f3) values(now+3s, 3.0)");
stmt.executeUpdate("insert into weather(ts, f4) values(now+4s, 4.0)");
stmt.executeUpdate("insert into weather(ts, f5) values(now+5s, 5)");
stmt.executeUpdate("insert into weather(ts, f6) values(now+6s, 6)");
stmt.executeUpdate("insert into weather(ts, f7) values(now+7s, true)");
stmt.executeUpdate("insert into weather(ts, f8) values(now+8s, 'hello')");
stmt.executeUpdate("insert into weather(ts, f9) values(now+9s, '涛思数据')");
} catch (SQLException e) {
e.printStackTrace();
}
}
@After
public void after() {
try {
if (conn != null)
conn.close();
} catch (SQLException e) {
e.printStackTrace();
}
}
}
package com.taosdata.jdbc.utils;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
public class OSUtilsTest {
private String OS;
@Test
public void inWindows() {
Assert.assertEquals(OS.indexOf("win") >= 0, OSUtils.isWindows());
}
@Test
public void isMac() {
Assert.assertEquals(OS.indexOf("mac") >= 0, OSUtils.isMac());
}
@Test
public void isLinux() {
Assert.assertEquals(OS.indexOf("nux") >= 0, OSUtils.isLinux());
}
@Before
public void before() {
OS = System.getProperty("os.name").toLowerCase();
}
}
\ No newline at end of file
......@@ -294,7 +294,7 @@ void cqStop(void *handle) {
pthread_mutex_unlock(&pContext->mutex);
}
void *cqCreate(void *handle, uint64_t uid, int32_t sid, const char* dstTable, char *sqlStr, STSchema *pSchema) {
void *cqCreate(void *handle, uint64_t uid, int32_t sid, const char* dstTable, char *sqlStr, STSchema *pSchema, int start) {
if (tsEnableStream == 0) {
return NULL;
}
......@@ -326,7 +326,11 @@ void *cqCreate(void *handle, uint64_t uid, int32_t sid, const char* dstTable, ch
pObj->rid = taosAddRef(cqObjRef, pObj);
if(start && pContext->master) {
cqCreateStream(pContext, pObj);
} else {
pObj->pContext = pContext;
}
rid = pObj->rid;
......
......@@ -70,7 +70,7 @@ int main(int argc, char *argv[]) {
tdDestroyTSchemaBuilder(&schemaBuilder);
for (int sid =1; sid<10; ++sid) {
cqCreate(pCq, sid, sid, NULL, "select avg(speed) from demo.t1 sliding(1s) interval(5s)", pSchema);
cqCreate(pCq, sid, sid, NULL, "select avg(speed) from demo.t1 sliding(1s) interval(5s)", pSchema, 1);
}
tdFreeSchema(pSchema);
......
......@@ -71,13 +71,16 @@ static SStep tsDnodeSteps[] = {
{"dnode-vread", dnodeInitVRead, dnodeCleanupVRead},
{"dnode-vwrite", dnodeInitVWrite, dnodeCleanupVWrite},
{"dnode-vmgmt", dnodeInitVMgmt, dnodeCleanupVMgmt},
{"dnode-mread", dnodeInitMRead, dnodeCleanupMRead},
{"dnode-mwrite", dnodeInitMWrite, dnodeCleanupMWrite},
{"dnode-mpeer", dnodeInitMPeer, dnodeCleanupMPeer},
{"dnode-mread", dnodeInitMRead, NULL},
{"dnode-mwrite", dnodeInitMWrite, NULL},
{"dnode-mpeer", dnodeInitMPeer, NULL},
{"dnode-client", dnodeInitClient, dnodeCleanupClient},
{"dnode-server", dnodeInitServer, dnodeCleanupServer},
{"dnode-vnodes", dnodeInitVnodes, dnodeCleanupVnodes},
{"dnode-modules", dnodeInitModules, dnodeCleanupModules},
{"dnode-mread", NULL, dnodeCleanupMRead},
{"dnode-mwrite", NULL, dnodeCleanupMWrite},
{"dnode-mpeer", NULL, dnodeCleanupMPeer},
{"dnode-shell", dnodeInitShell, dnodeCleanupShell},
{"dnode-statustmr", dnodeInitStatusTimer,dnodeCleanupStatusTimer},
{"dnode-telemetry", dnodeInitTelemetry, dnodeCleanupTelemetry},
......@@ -236,6 +239,20 @@ static int32_t dnodeInitStorage() {
return -1;
}
TDIR *tdir = tfsOpendir("vnode_bak/.staging");
bool stagingNotEmpty = tfsReaddir(tdir) != NULL;
tfsClosedir(tdir);
if (stagingNotEmpty) {
dError("vnode_bak/.staging dir not empty, fix it first.");
return -1;
}
if (tfsMkdir("vnode_bak/.staging") < 0) {
dError("failed to create vnode_bak/.staging dir since %s", tstrerror(terrno));
return -1;
}
dnodeCheckDataDirOpenned(tsDnodeDir);
dInfo("dnode storage is initialized at %s", tsDnodeDir);
......
......@@ -205,7 +205,7 @@ static void *dnodeProcessVWriteQueue(void *wparam) {
pWrite->rpcMsg.ahandle, taosMsg[pWrite->pHead.msgType], qtypeStr[qtype], pWrite->pHead.version);
pWrite->code = vnodeProcessWrite(pVnode, &pWrite->pHead, qtype, pWrite);
if (pWrite->code <= 0) pWrite->processedCount = 1;
if (pWrite->code <= 0) atomic_add_fetch_32(&pWrite->processedCount, 1);
if (pWrite->code > 0) pWrite->code = 0;
if (pWrite->code == 0 && pWrite->pHead.msgType != TSDB_MSG_TYPE_SUBMIT) forceFsync = true;
......@@ -222,7 +222,7 @@ static void *dnodeProcessVWriteQueue(void *wparam) {
dnodeSendRpcVWriteRsp(pVnode, pWrite, pWrite->code);
} else {
if (qtype == TAOS_QTYPE_FWD) {
vnodeConfirmForward(pVnode, pWrite->pHead.version, 0, pWrite->pHead.msgType != TSDB_MSG_TYPE_SUBMIT);
vnodeConfirmForward(pVnode, pWrite->pHead.version, pWrite->code, pWrite->pHead.msgType != TSDB_MSG_TYPE_SUBMIT);
}
if (pWrite->rspRet.rsp) {
rpcFreeCont(pWrite->rspRet.rsp);
......
......@@ -262,7 +262,7 @@ do { \
#define TSDB_MIN_TABLES 4
#define TSDB_MAX_TABLES 10000000
#define TSDB_DEFAULT_TABLES 1000000
#define TSDB_TABLES_STEP 100
#define TSDB_TABLES_STEP 1000
#define TSDB_MIN_DAYS_PER_FILE 1
#define TSDB_MAX_DAYS_PER_FILE 3650
......
......@@ -42,7 +42,7 @@ void cqStart(void *handle);
void cqStop(void *handle);
// cqCreate is called by TSDB to start an instance of CQ
void *cqCreate(void *handle, uint64_t uid, int32_t sid, const char* dstTable, char *sqlStr, STSchema *pSchema);
void *cqCreate(void *handle, uint64_t uid, int32_t sid, const char* dstTable, char *sqlStr, STSchema *pSchema, int start);
// cqDrop is called by TSDB to stop an instance of CQ, handle is the return value of cqCreate
void cqDrop(void *handle);
......
......@@ -51,7 +51,7 @@ typedef struct {
void *cqH;
int (*notifyStatus)(void *, int status, int eno);
int (*eventCallBack)(void *);
void *(*cqCreateFunc)(void *handle, uint64_t uid, int32_t sid, const char *dstTable, char *sqlStr, STSchema *pSchema);
void *(*cqCreateFunc)(void *handle, uint64_t uid, int32_t sid, const char *dstTable, char *sqlStr, STSchema *pSchema, int start);
void (*cqDropFunc)(void *handle);
} STsdbAppH;
......
......@@ -79,9 +79,6 @@ typedef void (*FStopSyncFile)(int32_t vgId, uint64_t fversion);
// get file version
typedef int32_t (*FGetVersion)(int32_t vgId, uint64_t *fver, uint64_t *vver);
// reset version
typedef int32_t (*FResetVersion)(int32_t vgId, uint64_t fver);
typedef int32_t (*FSendFile)(void *tsdb, SOCKET socketFd);
typedef int32_t (*FRecvFile)(void *tsdb, SOCKET socketFd);
......@@ -99,7 +96,6 @@ typedef struct {
FStartSyncFile startSyncFileFp;
FStopSyncFile stopSyncFileFp;
FGetVersion getVersionFp;
FResetVersion resetVersionFp;
FSendFile sendFileFp;
FRecvFile recvFileFp;
} SSyncInfo;
......
......@@ -221,6 +221,7 @@
#define TK_SPACE 300
#define TK_COMMENT 301
#define TK_ILLEGAL 302
......
此差异已折叠。
......@@ -39,6 +39,22 @@ typedef struct {
int8_t type;
} SOColInfo;
#define debugPrint(fmt, ...) \
do { if (g_args.debug_print || g_args.verbose_print) \
fprintf(stderr, "DEBG: "fmt, __VA_ARGS__); } while(0)
#define verbosePrint(fmt, ...) \
do { if (g_args.verbose_print) \
fprintf(stderr, "VERB: "fmt, __VA_ARGS__); } while(0)
#define performancePrint(fmt, ...) \
do { if (g_args.performance_print) \
fprintf(stderr, "VERB: "fmt, __VA_ARGS__); } while(0)
#define errorPrint(fmt, ...) \
do { fprintf(stderr, "ERROR: "fmt, __VA_ARGS__); } while(0)
// -------------------------- SHOW DATABASE INTERFACE-----------------------
enum _show_db_index {
TSDB_SHOW_DB_NAME_INDEX,
......@@ -199,16 +215,18 @@ static struct argp_option options[] = {
{"schemaonly", 's', 0, 0, "Only dump schema.", 3},
{"with-property", 'M', 0, 0, "Dump schema with properties.", 3},
{"start-time", 'S', "START_TIME", 0, "Start time to dump.", 3},
{"end-time", 'E', "END_TIME", 0, "End time to dump.", 3},
{"end-time", 'E', "END_TIME", 0, "End time to dump. Epoch or ISO8601/RFC3339 format is acceptable. For example: 2017-10-01T18:00:00+0800", 3},
{"data-batch", 'N', "DATA_BATCH", 0, "Number of data point per insert statement. Default is 1.", 3},
{"max-sql-len", 'L', "SQL_LEN", 0, "Max length of one sql. Default is 65480.", 3},
{"table-batch", 't', "TABLE_BATCH", 0, "Number of table dumpout into one output file. Default is 1.", 3},
{"thread_num", 'T', "THREAD_NUM", 0, "Number of thread for dump in file. Default is 5.", 3},
{"allow-sys", 'a', 0, 0, "Allow to dump sys database", 3},
{"debug", 'g', 0, 0, "Print debug info.", 1},
{"verbose", 'v', 0, 0, "Print verbose debug info.", 1},
{0}};
/* Used by main to communicate with parse_opt. */
struct arguments {
typedef struct arguments {
// connection option
char *host;
char *user;
......@@ -240,7 +258,10 @@ struct arguments {
char **arg_list;
int arg_list_len;
bool isDumpIn;
};
bool debug_print;
bool verbose_print;
bool performance_print;
} SArguments;
/* Parse a single option. */
static error_t parse_opt(int key, char *arg, struct argp_state *state) {
......@@ -286,6 +307,9 @@ static error_t parse_opt(int key, char *arg, struct argp_state *state) {
tstrncpy(arguments->outpath, full_path.we_wordv[0], TSDB_FILENAME_LEN);
wordfree(&full_path);
break;
case 'g':
arguments->debug_print = true;
break;
case 'i':
arguments->isDumpIn = true;
if (wordexp(arg, &full_path, 0) != 0) {
......@@ -387,7 +411,7 @@ int taosCheckParam(struct arguments *arguments);
void taosFreeDbInfos();
static void taosStartDumpOutWorkThreads(void* taosCon, struct arguments* args, int32_t numOfThread, char *dbName);
struct arguments tsArguments = {
struct arguments g_args = {
// connection option
NULL,
"root",
......@@ -421,7 +445,10 @@ struct arguments tsArguments = {
0,
NULL,
0,
false
false,
false, // debug_print
false, // verbose_print
false // performance_print
};
static int queryDbImpl(TAOS *taos, char *command) {
......@@ -453,13 +480,44 @@ static int queryDbImpl(TAOS *taos, char *command) {
return 0;
}
static void parse_args(int argc, char *argv[], SArguments *arguments) {
for (int i = 1; i < argc; i++) {
if (strcmp(argv[i], "-E") == 0) {
if (argv[i+1]) {
char *tmp = argv[++i];
int64_t tmpEpoch;
if (strchr(tmp, ':') && strchr(tmp, '-')) {
if (TSDB_CODE_SUCCESS != taosParseTime(
tmp, &tmpEpoch, strlen(tmp), TSDB_TIME_PRECISION_MILLI, 0)) {
fprintf(stderr, "Input end time error!\n");
return;
}
} else {
tmpEpoch = atoll(tmp);
}
sprintf(argv[i], "%"PRId64"", tmpEpoch);
debugPrint("%s() LN%d, tmp is: %s, argv[%d]: %s\n",
__func__, __LINE__, tmp, i, argv[i]);
} else {
fprintf(stderr, "Input end time error!\n");
return;
}
} else if (strcmp(argv[i], "-g") == 0) {
arguments->debug_print = true;
}
}
}
int main(int argc, char *argv[]) {
/* Parse our arguments; every option seen by parse_opt will be
reflected in arguments. */
argp_parse(&argp, argc, argv, 0, 0, &tsArguments);
parse_args(argc, argv, &g_args);
argp_parse(&argp, argc, argv, 0, 0, &g_args);
if (tsArguments.abort) {
if (g_args.abort) {
#ifndef _ALPINE
error(10, 0, "ABORTED");
#else
......@@ -469,81 +527,82 @@ int main(int argc, char *argv[]) {
printf("====== arguments config ======\n");
{
printf("host: %s\n", tsArguments.host);
printf("user: %s\n", tsArguments.user);
printf("password: %s\n", tsArguments.password);
printf("port: %u\n", tsArguments.port);
printf("cversion: %s\n", tsArguments.cversion);
printf("mysqlFlag: %d\n", tsArguments.mysqlFlag);
printf("outpath: %s\n", tsArguments.outpath);
printf("inpath: %s\n", tsArguments.inpath);
printf("resultFile: %s\n", tsArguments.resultFile);
printf("encode: %s\n", tsArguments.encode);
printf("all_databases: %d\n", tsArguments.all_databases);
printf("databases: %d\n", tsArguments.databases);
printf("schemaonly: %d\n", tsArguments.schemaonly);
printf("with_property: %d\n", tsArguments.with_property);
printf("start_time: %" PRId64 "\n", tsArguments.start_time);
printf("end_time: %" PRId64 "\n", tsArguments.end_time);
printf("data_batch: %d\n", tsArguments.data_batch);
printf("max_sql_len: %d\n", tsArguments.max_sql_len);
printf("table_batch: %d\n", tsArguments.table_batch);
printf("thread_num: %d\n", tsArguments.thread_num);
printf("allow_sys: %d\n", tsArguments.allow_sys);
printf("abort: %d\n", tsArguments.abort);
printf("isDumpIn: %d\n", tsArguments.isDumpIn);
printf("arg_list_len: %d\n", tsArguments.arg_list_len);
for (int32_t i = 0; i < tsArguments.arg_list_len; i++) {
printf("arg_list[%d]: %s\n", i, tsArguments.arg_list[i]);
printf("host: %s\n", g_args.host);
printf("user: %s\n", g_args.user);
printf("password: %s\n", g_args.password);
printf("port: %u\n", g_args.port);
printf("cversion: %s\n", g_args.cversion);
printf("mysqlFlag: %d\n", g_args.mysqlFlag);
printf("outpath: %s\n", g_args.outpath);
printf("inpath: %s\n", g_args.inpath);
printf("resultFile: %s\n", g_args.resultFile);
printf("encode: %s\n", g_args.encode);
printf("all_databases: %d\n", g_args.all_databases);
printf("databases: %d\n", g_args.databases);
printf("schemaonly: %d\n", g_args.schemaonly);
printf("with_property: %d\n", g_args.with_property);
printf("start_time: %" PRId64 "\n", g_args.start_time);
printf("end_time: %" PRId64 "\n", g_args.end_time);
printf("data_batch: %d\n", g_args.data_batch);
printf("max_sql_len: %d\n", g_args.max_sql_len);
printf("table_batch: %d\n", g_args.table_batch);
printf("thread_num: %d\n", g_args.thread_num);
printf("allow_sys: %d\n", g_args.allow_sys);
printf("abort: %d\n", g_args.abort);
printf("isDumpIn: %d\n", g_args.isDumpIn);
printf("arg_list_len: %d\n", g_args.arg_list_len);
printf("debug_print: %d\n", g_args.debug_print);
for (int32_t i = 0; i < g_args.arg_list_len; i++) {
printf("arg_list[%d]: %s\n", i, g_args.arg_list[i]);
}
}
printf("==============================\n");
if (tsArguments.cversion[0] != 0){
tstrncpy(version, tsArguments.cversion, 11);
if (g_args.cversion[0] != 0){
tstrncpy(version, g_args.cversion, 11);
}
if (taosCheckParam(&tsArguments) < 0) {
if (taosCheckParam(&g_args) < 0) {
exit(EXIT_FAILURE);
}
g_fpOfResult = fopen(tsArguments.resultFile, "a");
g_fpOfResult = fopen(g_args.resultFile, "a");
if (NULL == g_fpOfResult) {
fprintf(stderr, "Failed to open %s for save result\n", tsArguments.resultFile);
fprintf(stderr, "Failed to open %s for save result\n", g_args.resultFile);
return 1;
};
fprintf(g_fpOfResult, "#############################################################################\n");
fprintf(g_fpOfResult, "============================== arguments config =============================\n");
{
fprintf(g_fpOfResult, "host: %s\n", tsArguments.host);
fprintf(g_fpOfResult, "user: %s\n", tsArguments.user);
fprintf(g_fpOfResult, "password: %s\n", tsArguments.password);
fprintf(g_fpOfResult, "port: %u\n", tsArguments.port);
fprintf(g_fpOfResult, "cversion: %s\n", tsArguments.cversion);
fprintf(g_fpOfResult, "mysqlFlag: %d\n", tsArguments.mysqlFlag);
fprintf(g_fpOfResult, "outpath: %s\n", tsArguments.outpath);
fprintf(g_fpOfResult, "inpath: %s\n", tsArguments.inpath);
fprintf(g_fpOfResult, "resultFile: %s\n", tsArguments.resultFile);
fprintf(g_fpOfResult, "encode: %s\n", tsArguments.encode);
fprintf(g_fpOfResult, "all_databases: %d\n", tsArguments.all_databases);
fprintf(g_fpOfResult, "databases: %d\n", tsArguments.databases);
fprintf(g_fpOfResult, "schemaonly: %d\n", tsArguments.schemaonly);
fprintf(g_fpOfResult, "with_property: %d\n", tsArguments.with_property);
fprintf(g_fpOfResult, "start_time: %" PRId64 "\n", tsArguments.start_time);
fprintf(g_fpOfResult, "end_time: %" PRId64 "\n", tsArguments.end_time);
fprintf(g_fpOfResult, "data_batch: %d\n", tsArguments.data_batch);
fprintf(g_fpOfResult, "max_sql_len: %d\n", tsArguments.max_sql_len);
fprintf(g_fpOfResult, "table_batch: %d\n", tsArguments.table_batch);
fprintf(g_fpOfResult, "thread_num: %d\n", tsArguments.thread_num);
fprintf(g_fpOfResult, "allow_sys: %d\n", tsArguments.allow_sys);
fprintf(g_fpOfResult, "abort: %d\n", tsArguments.abort);
fprintf(g_fpOfResult, "isDumpIn: %d\n", tsArguments.isDumpIn);
fprintf(g_fpOfResult, "arg_list_len: %d\n", tsArguments.arg_list_len);
for (int32_t i = 0; i < tsArguments.arg_list_len; i++) {
fprintf(g_fpOfResult, "arg_list[%d]: %s\n", i, tsArguments.arg_list[i]);
fprintf(g_fpOfResult, "host: %s\n", g_args.host);
fprintf(g_fpOfResult, "user: %s\n", g_args.user);
fprintf(g_fpOfResult, "password: %s\n", g_args.password);
fprintf(g_fpOfResult, "port: %u\n", g_args.port);
fprintf(g_fpOfResult, "cversion: %s\n", g_args.cversion);
fprintf(g_fpOfResult, "mysqlFlag: %d\n", g_args.mysqlFlag);
fprintf(g_fpOfResult, "outpath: %s\n", g_args.outpath);
fprintf(g_fpOfResult, "inpath: %s\n", g_args.inpath);
fprintf(g_fpOfResult, "resultFile: %s\n", g_args.resultFile);
fprintf(g_fpOfResult, "encode: %s\n", g_args.encode);
fprintf(g_fpOfResult, "all_databases: %d\n", g_args.all_databases);
fprintf(g_fpOfResult, "databases: %d\n", g_args.databases);
fprintf(g_fpOfResult, "schemaonly: %d\n", g_args.schemaonly);
fprintf(g_fpOfResult, "with_property: %d\n", g_args.with_property);
fprintf(g_fpOfResult, "start_time: %" PRId64 "\n", g_args.start_time);
fprintf(g_fpOfResult, "end_time: %" PRId64 "\n", g_args.end_time);
fprintf(g_fpOfResult, "data_batch: %d\n", g_args.data_batch);
fprintf(g_fpOfResult, "max_sql_len: %d\n", g_args.max_sql_len);
fprintf(g_fpOfResult, "table_batch: %d\n", g_args.table_batch);
fprintf(g_fpOfResult, "thread_num: %d\n", g_args.thread_num);
fprintf(g_fpOfResult, "allow_sys: %d\n", g_args.allow_sys);
fprintf(g_fpOfResult, "abort: %d\n", g_args.abort);
fprintf(g_fpOfResult, "isDumpIn: %d\n", g_args.isDumpIn);
fprintf(g_fpOfResult, "arg_list_len: %d\n", g_args.arg_list_len);
for (int32_t i = 0; i < g_args.arg_list_len; i++) {
fprintf(g_fpOfResult, "arg_list[%d]: %s\n", i, g_args.arg_list[i]);
}
}
......@@ -552,11 +611,11 @@ int main(int argc, char *argv[]) {
time_t tTime = time(NULL);
struct tm tm = *localtime(&tTime);
if (tsArguments.isDumpIn) {
if (g_args.isDumpIn) {
fprintf(g_fpOfResult, "============================== DUMP IN ============================== \n");
fprintf(g_fpOfResult, "# DumpIn start time: %d-%02d-%02d %02d:%02d:%02d\n", tm.tm_year + 1900, tm.tm_mon + 1,
tm.tm_mday, tm.tm_hour, tm.tm_min, tm.tm_sec);
if (taosDumpIn(&tsArguments) < 0) {
if (taosDumpIn(&g_args) < 0) {
fprintf(g_fpOfResult, "\n");
fclose(g_fpOfResult);
return -1;
......@@ -565,7 +624,7 @@ int main(int argc, char *argv[]) {
fprintf(g_fpOfResult, "============================== DUMP OUT ============================== \n");
fprintf(g_fpOfResult, "# DumpOut start time: %d-%02d-%02d %02d:%02d:%02d\n", tm.tm_year + 1900, tm.tm_mon + 1,
tm.tm_mday, tm.tm_hour, tm.tm_min, tm.tm_sec);
if (taosDumpOut(&tsArguments) < 0) {
if (taosDumpOut(&g_args) < 0) {
fprintf(g_fpOfResult, "\n");
fclose(g_fpOfResult);
return -1;
......@@ -1236,8 +1295,8 @@ void* taosDumpOutWorkThreadFp(void *arg)
FILE *fp = NULL;
memset(tmpBuf, 0, TSDB_FILENAME_LEN + 128);
if (tsArguments.outpath[0] != 0) {
sprintf(tmpBuf, "%s/%s.tables.%d.sql", tsArguments.outpath, pThread->dbName, pThread->threadIndex);
if (g_args.outpath[0] != 0) {
sprintf(tmpBuf, "%s/%s.tables.%d.sql", g_args.outpath, pThread->dbName, pThread->threadIndex);
} else {
sprintf(tmpBuf, "%s.tables.%d.sql", pThread->dbName, pThread->threadIndex);
}
......@@ -1270,7 +1329,7 @@ void* taosDumpOutWorkThreadFp(void *arg)
ssize_t readLen = read(fd, &tableRecord, sizeof(STableRecord));
if (readLen <= 0) break;
int ret = taosDumpTable(tableRecord.name, tableRecord.metric, &tsArguments, fp, pThread->taosCon, pThread->dbName);
int ret = taosDumpTable(tableRecord.name, tableRecord.metric, &g_args, fp, pThread->taosCon, pThread->dbName);
if (ret >= 0) {
// TODO: sum table count and table rows by self
pThread->tablesOfDumpOut++;
......@@ -1282,13 +1341,13 @@ void* taosDumpOutWorkThreadFp(void *arg)
}
tablesInOneFile++;
if (tablesInOneFile >= tsArguments.table_batch) {
if (tablesInOneFile >= g_args.table_batch) {
fclose(fp);
tablesInOneFile = 0;
memset(tmpBuf, 0, TSDB_FILENAME_LEN + 128);
if (tsArguments.outpath[0] != 0) {
sprintf(tmpBuf, "%s/%s.tables.%d-%d.sql", tsArguments.outpath, pThread->dbName, pThread->threadIndex, fileNameIndex);
if (g_args.outpath[0] != 0) {
sprintf(tmpBuf, "%s/%s.tables.%d-%d.sql", g_args.outpath, pThread->dbName, pThread->threadIndex, fileNameIndex);
} else {
sprintf(tmpBuf, "%s.tables.%d-%d.sql", pThread->dbName, pThread->threadIndex, fileNameIndex);
}
......@@ -1491,14 +1550,14 @@ int taosDumpDb(SDbInfo *dbInfo, struct arguments *arguments, FILE *fp, TAOS *tao
taos_free_result(res);
lseek(fd, 0, SEEK_SET);
int maxThreads = tsArguments.thread_num;
int maxThreads = g_args.thread_num;
int tableOfPerFile ;
if (numOfTable <= tsArguments.thread_num) {
if (numOfTable <= g_args.thread_num) {
tableOfPerFile = 1;
maxThreads = numOfTable;
} else {
tableOfPerFile = numOfTable / tsArguments.thread_num;
if (0 != numOfTable % tsArguments.thread_num) {
tableOfPerFile = numOfTable / g_args.thread_num;
if (0 != numOfTable % g_args.thread_num) {
tableOfPerFile += 1;
}
}
......@@ -2214,7 +2273,7 @@ void* taosDumpInWorkThreadFp(void *arg)
continue;
}
fprintf(stderr, "Success Open input file: %s\n", SQLFileName);
taosDumpInOneFile(pThread->taosCon, fp, tsfCharset, tsArguments.encode, SQLFileName);
taosDumpInOneFile(pThread->taosCon, fp, tsfCharset, g_args.encode, SQLFileName);
}
}
......
......@@ -628,6 +628,11 @@ static int32_t mnodeProcessDnodeStatusMsg(SMnodeMsg *pMsg) {
bnNotify();
}
if (!tsEnableBalance) {
int32_t numOfMnodes = mnodeGetMnodesNum();
if (numOfMnodes < tsNumOfMnodes) bnNotify();
}
if (openVnodes != pDnode->openVnodes) {
mnodeCheckUnCreatedVgroup(pDnode, pStatus->load, openVnodes);
}
......@@ -722,6 +727,10 @@ int32_t mnodeDropDnode(SDnodeObj *pDnode, void *pMsg) {
static int32_t mnodeDropDnodeByEp(char *ep, SMnodeMsg *pMsg) {
SDnodeObj *pDnode = mnodeGetDnodeByEp(ep);
if (pDnode == NULL) {
if (strspn(ep, "0123456789 ;") != strlen(ep)) {
return TSDB_CODE_MND_DNODE_NOT_EXIST;
}
int32_t dnodeId = (int32_t)strtol(ep, NULL, 10);
pDnode = mnodeGetDnode(dnodeId);
if (pDnode == NULL) {
......
......@@ -381,6 +381,8 @@ static bool mnodeAllOnline() {
void *pIter = NULL;
bool allOnline = true;
sdbUpdateMnodeRoles();
while (1) {
SMnodeObj *pMnode = NULL;
pIter = mnodeGetNextMnode(pIter, &pMnode);
......
......@@ -78,6 +78,7 @@ extern "C" {
#include <error.h>
#include <math.h>
#include <dlfcn.h>
#include <poll.h>
#define TAOS_OS_FUNC_LZ4
#define BUILDIN_CLZL(val) __builtin_clzll(val)
......
......@@ -79,6 +79,7 @@ extern "C" {
#include <linux/sysctl.h>
#include <math.h>
#include <dlfcn.h>
#include <poll.h>
#ifdef __cplusplus
}
......
......@@ -78,6 +78,7 @@ extern "C" {
#include <error.h>
#include <math.h>
#include <dlfcn.h>
#include <poll.h>
#define TAOS_OS_FUNC_LZ4
#define BUILDIN_CLZL(val) __builtin_clzll(val)
......
......@@ -75,6 +75,7 @@ extern "C" {
#include <fcntl.h>
#include <sys/utsname.h>
#include <sys/resource.h>
#include <poll.h>
#ifndef _ALPINE
#include <error.h>
#endif
......
......@@ -68,6 +68,8 @@ void taosSetMaskSIGPIPE();
// TAOS_OS_FUNC_SOCKET_SETSOCKETOPT
int32_t taosSetSockOpt(SOCKET socketfd, int32_t level, int32_t optname, void *optval, int32_t optlen);
int32_t taosGetSockOpt(SOCKET socketfd, int32_t level, int32_t optname, void *optval, int32_t* optlen);
// TAOS_OS_FUNC_SOCKET_INET
uint32_t taosInetAddr(char *ipAddr);
const char *taosInetNtoa(struct in_addr ipInt);
......
......@@ -71,6 +71,9 @@ int32_t taosSetSockOpt(SOCKET socketfd, int32_t level, int32_t optname, void *op
return setsockopt(socketfd, level, optname, optval, (socklen_t)optlen);
}
int32_t taosGetSockOpt(SOCKET socketfd, int32_t level, int32_t optname, void *optval, int32_t* optlen) {
return getsockopt(socketfd, level, optname, optval, (socklen_t *)optlen);
}
#endif
#ifndef TAOS_OS_FUNC_SOCKET_INET
......
......@@ -87,7 +87,8 @@ typedef struct SResultRow {
bool closed; // this result status: closed or opened
uint32_t numOfRows; // number of rows of current time window
SResultRowCellInfo* pCellInfo; // For each result column, there is a resultInfo
union {STimeWindow win; char* key;}; // start key of current result row
STimeWindow win;
char* key; // start key of current result row
} SResultRow;
typedef struct SGroupResInfo {
......
......@@ -679,6 +679,7 @@ expr(A) ::= STRING(X). { A = tSqlExprCreateIdValue(&X, TK_STRING);}
expr(A) ::= NOW(X). { A = tSqlExprCreateIdValue(&X, TK_NOW); }
expr(A) ::= VARIABLE(X). { A = tSqlExprCreateIdValue(&X, TK_VARIABLE);}
expr(A) ::= BOOL(X). { A = tSqlExprCreateIdValue(&X, TK_BOOL);}
expr(A) ::= NULL(X). { A = tSqlExprCreateIdValue(&X, TK_NULL);}
// ordinary functions: min(x), max(x), top(k, 20)
expr(A) ::= ID(X) LP exprlist(Y) RP(E). { A = tSqlExprCreateFunction(Y, &X, &E, X.type); }
......
......@@ -194,6 +194,7 @@ static bool isPointInterpoQuery(SQuery *pQuery);
static void setResultBufSize(SQuery* pQuery, SRspResultInfo* pResultInfo);
static void setCtxTagForJoin(SQueryRuntimeEnv* pRuntimeEnv, SQLFunctionCtx* pCtx, SExprInfo* pExprInfo, void* pTable);
static void setParamForStableStddev(SQueryRuntimeEnv* pRuntimeEnv, SQLFunctionCtx* pCtx, int32_t numOfOutput, SExprInfo* pExpr);
static void setParamForStableStddevByColData(SQueryRuntimeEnv* pRuntimeEnv, SQLFunctionCtx* pCtx, int32_t numOfOutput, SExprInfo* pExpr, char* val, int16_t bytes);
static void doSetTableGroupOutputBuf(SQueryRuntimeEnv* pRuntimeEnv, SResultRowInfo* pResultRowInfo,
SQLFunctionCtx* pCtx, int32_t* rowCellInfoOffset, int32_t numOfOutput,
int32_t groupIndex);
......@@ -1363,6 +1364,7 @@ static void doHashGroupbyAgg(SOperatorInfo* pOperator, SGroupbyOperatorInfo *pIn
SColumnInfoData* pColInfoData = taosArrayGet(pSDataBlock->pDataBlock, pInfo->colIndex);
int16_t bytes = pColInfoData->info.bytes;
int16_t type = pColInfoData->info.type;
SQuery *pQuery = pRuntimeEnv->pQuery;
if (type == TSDB_DATA_TYPE_FLOAT || type == TSDB_DATA_TYPE_DOUBLE) {
qError("QInfo:%"PRIu64" group by not supported on double/float columns, abort", GET_QID(pRuntimeEnv));
......@@ -1383,6 +1385,10 @@ static void doHashGroupbyAgg(SOperatorInfo* pOperator, SGroupbyOperatorInfo *pIn
memcpy(pInfo->prevData, val, bytes);
if (pQuery->stableQuery && pQuery->stabledev && (pRuntimeEnv->prevResult != NULL)) {
setParamForStableStddevByColData(pRuntimeEnv, pInfo->binfo.pCtx, pOperator->numOfOutput, pOperator->pExpr, val, bytes);
}
int32_t ret =
setGroupResultOutputBuf(pRuntimeEnv, pInfo, pOperator->numOfOutput, val, type, bytes, item->groupIndex);
if (ret != TSDB_CODE_SUCCESS) { // null data, too many state code
......@@ -1912,14 +1918,15 @@ static void teardownQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv) {
taosHashCleanup(pRuntimeEnv->pResultRowHashTable);
pRuntimeEnv->pResultRowHashTable = NULL;
pRuntimeEnv->pool = destroyResultRowPool(pRuntimeEnv->pool);
taosArrayDestroyEx(pRuntimeEnv->prevResult, freeInterResult);
pRuntimeEnv->prevResult = NULL;
taosHashCleanup(pRuntimeEnv->pTableRetrieveTsMap);
pRuntimeEnv->pTableRetrieveTsMap = NULL;
destroyOperatorInfo(pRuntimeEnv->proot);
pRuntimeEnv->pool = destroyResultRowPool(pRuntimeEnv->pool);
taosArrayDestroyEx(pRuntimeEnv->prevResult, freeInterResult);
pRuntimeEnv->prevResult = NULL;
}
static bool needBuildResAfterQueryComplete(SQInfo* pQInfo) {
......@@ -2677,6 +2684,21 @@ int32_t loadDataBlockOnDemand(SQueryRuntimeEnv* pRuntimeEnv, STableScanInfo* pTa
tsdbRetrieveDataBlockStatisInfo(pTableScanInfo->pQueryHandle, &pBlock->pBlockStatis);
if (pQuery->topBotQuery && pBlock->pBlockStatis != NULL) {
{ // set previous window
if (QUERY_IS_INTERVAL_QUERY(pQuery)) {
SResultRow* pResult = NULL;
bool masterScan = IS_MASTER_SCAN(pRuntimeEnv);
TSKEY k = ascQuery? pBlock->info.window.skey : pBlock->info.window.ekey;
STimeWindow win = getActiveTimeWindow(pTableScanInfo->pResultRowInfo, k, pQuery);
if (setWindowOutputBufByKey(pRuntimeEnv, pTableScanInfo->pResultRowInfo, &win, masterScan, &pResult, groupId,
pTableScanInfo->pCtx, pTableScanInfo->numOfOutput,
pTableScanInfo->rowCellInfoOffset) != TSDB_CODE_SUCCESS) {
longjmp(pRuntimeEnv->env, TSDB_CODE_QRY_OUT_OF_MEMORY);
}
}
}
bool load = false;
for (int32_t i = 0; i < pQuery->numOfOutput; ++i) {
int32_t functionId = pTableScanInfo->pCtx[i].functionId;
......@@ -3487,6 +3509,42 @@ void setParamForStableStddev(SQueryRuntimeEnv* pRuntimeEnv, SQLFunctionCtx* pCtx
}
void setParamForStableStddevByColData(SQueryRuntimeEnv* pRuntimeEnv, SQLFunctionCtx* pCtx, int32_t numOfOutput, SExprInfo* pExpr, char* val, int16_t bytes) {
SQuery* pQuery = pRuntimeEnv->pQuery;
int32_t numOfExprs = pQuery->numOfOutput;
for(int32_t i = 0; i < numOfExprs; ++i) {
SExprInfo* pExprInfo = &(pExpr[i]);
if (pExprInfo->base.functionId != TSDB_FUNC_STDDEV_DST) {
continue;
}
SSqlFuncMsg* pFuncMsg = &pExprInfo->base;
pCtx[i].param[0].arr = NULL;
pCtx[i].param[0].nType = TSDB_DATA_TYPE_INT; // avoid freeing the memory by setting the type to be int
// TODO use hash to speedup this loop
int32_t numOfGroup = (int32_t)taosArrayGetSize(pRuntimeEnv->prevResult);
for (int32_t j = 0; j < numOfGroup; ++j) {
SInterResult* p = taosArrayGet(pRuntimeEnv->prevResult, j);
if (bytes == 0 || memcmp(p->tags, val, bytes) == 0) {
int32_t numOfCols = (int32_t)taosArrayGetSize(p->pResult);
for (int32_t k = 0; k < numOfCols; ++k) {
SStddevInterResult* pres = taosArrayGet(p->pResult, k);
if (pres->colId == pFuncMsg->colInfo.colId) {
pCtx[i].param[0].arr = pres->pResult;
break;
}
}
}
}
}
}
/*
* There are two cases to handle:
*
......@@ -6672,6 +6730,9 @@ void freeQInfo(SQInfo *pQInfo) {
SQueryRuntimeEnv* pRuntimeEnv = &pQInfo->runtimeEnv;
releaseQueryBuf(pRuntimeEnv->tableqinfoGroupInfo.numOfTables);
doDestroyTableQueryInfo(&pRuntimeEnv->tableqinfoGroupInfo);
teardownQueryRuntimeEnv(&pQInfo->runtimeEnv);
SQuery *pQuery = pQInfo->runtimeEnv.pQuery;
......@@ -6707,7 +6768,6 @@ void freeQInfo(SQInfo *pQInfo) {
}
}
doDestroyTableQueryInfo(&pRuntimeEnv->tableqinfoGroupInfo);
tfree(pQInfo->pBuf);
tfree(pQInfo->sql);
......
......@@ -127,7 +127,12 @@ tSqlExpr *tSqlExprCreateIdValue(SStrToken *pToken, int32_t optrType) {
pSqlExpr->token = *pToken;
}
if (optrType == TK_INTEGER || optrType == TK_STRING || optrType == TK_FLOAT || optrType == TK_BOOL) {
if (optrType == TK_NULL) {
pToken->type = TSDB_DATA_TYPE_NULL;
tVariantCreate(&pSqlExpr->value, pToken);
pSqlExpr->tokenId = optrType;
pSqlExpr->type = SQL_NODE_VALUE;
} else if (optrType == TK_INTEGER || optrType == TK_STRING || optrType == TK_FLOAT || optrType == TK_BOOL) {
toTSDBType(pToken->type);
tVariantCreate(&pSqlExpr->value, pToken);
......@@ -356,7 +361,11 @@ void tSqlExprCompact(tSqlExpr** pExpr) {
bool tSqlExprIsLeaf(tSqlExpr* pExpr) {
return (pExpr->pRight == NULL && pExpr->pLeft == NULL) &&
(pExpr->tokenId == 0 || pExpr->tokenId == TK_ID || (pExpr->tokenId >= TK_BOOL && pExpr->tokenId <= TK_NCHAR) || pExpr->tokenId == TK_SET);
(pExpr->tokenId == 0 ||
(pExpr->tokenId == TK_ID) ||
(pExpr->tokenId >= TK_BOOL && pExpr->tokenId <= TK_NCHAR) ||
(pExpr->tokenId == TK_NULL) ||
(pExpr->tokenId == TK_SET));
}
bool tSqlExprIsParentOfLeaf(tSqlExpr* pExpr) {
......
......@@ -564,7 +564,7 @@ uint32_t tSQLGetToken(char* z, uint32_t* tokenId) {
return 0;
}
SStrToken tStrGetToken(char* str, int32_t* i, bool isPrevOptr, uint32_t numOfIgnoreToken, uint32_t* ignoreTokenTypes) {
SStrToken tStrGetToken(char* str, int32_t* i, bool isPrevOptr) {
SStrToken t0 = {0};
// here we reach the end of sql string, null-terminated string
......@@ -589,7 +589,10 @@ SStrToken tStrGetToken(char* str, int32_t* i, bool isPrevOptr, uint32_t numOfIgn
}
t0.n = tSQLGetToken(&str[*i], &t0.type);
break;
// not support user specfied ignored symbol list
#if 0
bool ignore = false;
for (uint32_t k = 0; k < numOfIgnoreToken; k++) {
if (t0.type == ignoreTokenTypes[k]) {
......@@ -601,6 +604,7 @@ SStrToken tStrGetToken(char* str, int32_t* i, bool isPrevOptr, uint32_t numOfIgn
if (!ignore) {
break;
}
#endif
}
if (t0.type == TK_SEMI) {
......
......@@ -66,8 +66,8 @@ void cleanupResultRowInfo(SResultRowInfo *pResultRowInfo) {
return;
}
if (pResultRowInfo->type == TSDB_DATA_TYPE_BINARY || pResultRowInfo->type == TSDB_DATA_TYPE_NCHAR) {
for(int32_t i = 0; i < pResultRowInfo->size; ++i) {
if (pResultRowInfo->pResult[i]) {
tfree(pResultRowInfo->pResult[i]->key);
}
}
......@@ -153,11 +153,8 @@ void clearResultRow(SQueryRuntimeEnv *pRuntimeEnv, SResultRow *pResultRow, int16
pResultRow->offset = -1;
pResultRow->closed = false;
if (type == TSDB_DATA_TYPE_BINARY || type == TSDB_DATA_TYPE_NCHAR) {
tfree(pResultRow->key);
} else {
pResultRow->win = TSWINDOW_INITIALIZER;
}
}
// TODO refactor: use macro
......
......@@ -375,6 +375,7 @@ int32_t qKillQuery(qinfo_t qinfo) {
return TSDB_CODE_QRY_INVALID_QHANDLE;
}
qDebug("QInfo:%"PRIu64" query killed", pQInfo->qId);
setQueryKilled(pQInfo);
// Wait for the query executing thread being stopped/
......
此差异已折叠。
......@@ -10,7 +10,7 @@ namespace {
// simple test
void simpleTest() {
SDiskbasedResultBuf* pResultBuf = NULL;
int32_t ret = createDiskbasedResultBuffer(&pResultBuf, 1024, 4096, NULL);
int32_t ret = createDiskbasedResultBuffer(&pResultBuf, 1024, 4096, 1);
int32_t pageId = 0;
int32_t groupId = 0;
......@@ -52,7 +52,7 @@ void simpleTest() {
void writeDownTest() {
SDiskbasedResultBuf* pResultBuf = NULL;
int32_t ret = createDiskbasedResultBuffer(&pResultBuf, 1024, 4*1024, NULL);
int32_t ret = createDiskbasedResultBuffer(&pResultBuf, 1024, 4*1024, 1);
int32_t pageId = 0;
int32_t writePageId = 0;
......@@ -99,7 +99,7 @@ void writeDownTest() {
void recyclePageTest() {
SDiskbasedResultBuf* pResultBuf = NULL;
int32_t ret = createDiskbasedResultBuffer(&pResultBuf, 1024, 4*1024, NULL);
int32_t ret = createDiskbasedResultBuffer(&pResultBuf, 1024, 4*1024, 1);
int32_t pageId = 0;
int32_t writePageId = 0;
......
......@@ -35,7 +35,7 @@ extern "C" {
#define SYNC_MAX_SIZE (TSDB_MAX_WAL_SIZE + sizeof(SWalHead) + sizeof(SSyncHead) + 16)
#define SYNC_RECV_BUFFER_SIZE (5*1024*1024)
#define SYNC_MAX_FWDS 512
#define SYNC_MAX_FWDS 1024
#define SYNC_FWD_TIMER 300
#define SYNC_ROLE_TIMER 15000 // ms
#define SYNC_CHECK_INTERVAL 1000 // ms
......@@ -117,7 +117,6 @@ typedef struct SSyncNode {
FStartSyncFile startSyncFileFp;
FStopSyncFile stopSyncFileFp;
FGetVersion getVersionFp;
FResetVersion resetVersionFp;
FSendFile sendFileFp;
FRecvFile recvFileFp;
pthread_mutex_t mutex;
......
......@@ -182,7 +182,6 @@ int64_t syncStart(const SSyncInfo *pInfo) {
pNode->startSyncFileFp = pInfo->startSyncFileFp;
pNode->stopSyncFileFp = pInfo->stopSyncFileFp;
pNode->getVersionFp = pInfo->getVersionFp;
pNode->resetVersionFp = pInfo->resetVersionFp;
pNode->sendFileFp = pInfo->sendFileFp;
pNode->recvFileFp = pInfo->recvFileFp;
......@@ -1373,7 +1372,7 @@ static void syncMonitorNodeRole(void *param, void *tmrId) {
if (/*pPeer->role > TAOS_SYNC_ROLE_UNSYNCED && */ nodeRole > TAOS_SYNC_ROLE_UNSYNCED) continue;
if (/*pPeer->sstatus > TAOS_SYNC_STATUS_INIT || */ nodeSStatus > TAOS_SYNC_STATUS_INIT) continue;
sDebug("%s, check roles since self:%s sstatus:%s, peer:%s sstatus:%s", pPeer->id, syncRole[pPeer->role],
sDebug("%s, check roles since peer:%s sstatus:%s, self:%s sstatus:%s", pPeer->id, syncRole[pPeer->role],
syncStatus[pPeer->sstatus], syncRole[nodeRole], syncStatus[nodeSStatus]);
syncSendPeersStatusMsgToPeer(pPeer, 1, SYNC_STATUS_CHECK_ROLE, syncGenTranId());
break;
......@@ -1460,7 +1459,12 @@ static int32_t syncForwardToPeerImpl(SSyncNode *pNode, void *data, void *mhandle
if ((pNode->quorum > 1 || force) && code == 0) {
code = syncSaveFwdInfo(pNode, pWalHead->version, mhandle);
if (code >= 0) code = 1;
if (code >= 0) {
code = 1;
} else {
pthread_mutex_unlock(&pNode->mutex);
return code;
}
}
int32_t retLen = taosWriteMsg(pPeer->peerFd, pSyncHead, fwdLen);
......
......@@ -238,7 +238,6 @@ static int32_t syncRestoreDataStepByStep(SSyncPeer *pPeer) {
(*pNode->stopSyncFileFp)(pNode->vgId, fversion);
nodeVersion = fversion;
if (pNode->resetVersionFp) (*pNode->resetVersionFp)(pNode->vgId, fversion);
sInfo("%s, start to restore wal, fver:%" PRIu64, pPeer->id, nodeVersion);
uint64_t wver = 0;
......
......@@ -17,7 +17,7 @@
#define TSDB_MAX_SUBBLOCKS 8
static FORCE_INLINE int TSDB_KEY_FID(TSKEY key, int32_t days, int8_t precision) {
if (key < 0) {
return (int)((key + 1) / tsMsPerDay[precision] / days + 1);
return (int)((key + 1) / tsMsPerDay[precision] / days - 1);
} else {
return (int)((key / tsMsPerDay[precision] / days));
}
......
......@@ -33,6 +33,7 @@ static int tsdbScanDataDir(STsdbRepo *pRepo);
static bool tsdbIsTFileInFS(STsdbFS *pfs, const TFILE *pf);
static int tsdbRestoreCurrent(STsdbRepo *pRepo);
static int tsdbComparTFILE(const void *arg1, const void *arg2);
static void tsdbScanAndTryFixDFilesHeader(STsdbRepo *pRepo);
// ================== CURRENT file header info
static int tsdbEncodeFSHeader(void **buf, SFSHeader *pHeader) {
......@@ -246,6 +247,8 @@ int tsdbOpenFS(STsdbRepo *pRepo) {
tsdbError("vgId:%d failed to open FS since %s", REPO_ID(pRepo), tstrerror(terrno));
return -1;
}
tsdbScanAndTryFixDFilesHeader(pRepo);
} else {
if (tsdbRestoreCurrent(pRepo) < 0) {
tsdbError("vgId:%d failed to restore current file since %s", REPO_ID(pRepo), tstrerror(terrno));
......@@ -1202,3 +1205,40 @@ static int tsdbComparTFILE(const void *arg1, const void *arg2) {
}
}
}
static void tsdbScanAndTryFixDFilesHeader(STsdbRepo *pRepo) {
STsdbFS * pfs = REPO_FS(pRepo);
SFSStatus *pStatus = pfs->cstatus;
SDFInfo info;
for (size_t i = 0; i < taosArrayGetSize(pStatus->df); i++) {
SDFileSet fset;
tsdbInitDFileSetEx(&fset, (SDFileSet *)taosArrayGet(pStatus->df, i));
tsdbDebug("vgId:%d scan DFileSet %d header", REPO_ID(pRepo), fset.fid);
if (tsdbOpenDFileSet(&fset, O_RDWR) < 0) {
tsdbError("vgId:%d failed to open DFileSet %d since %s, continue", REPO_ID(pRepo), fset.fid, tstrerror(terrno));
continue;
}
for (TSDB_FILE_T ftype = 0; ftype < TSDB_FILE_MAX; ftype++) {
SDFile *pDFile = TSDB_DFILE_IN_SET(&fset, ftype);
if ((tsdbLoadDFileHeader(pDFile, &info) < 0) || pDFile->info.size != info.size ||
pDFile->info.magic != info.magic) {
if (tsdbUpdateDFileHeader(pDFile) < 0) {
tsdbError("vgId:%d failed to update DFile header of %s since %s, continue", REPO_ID(pRepo),
TSDB_FILE_FULL_NAME(pDFile), tstrerror(terrno));
} else {
tsdbInfo("vgId:%d DFile header of %s is updated", REPO_ID(pRepo), TSDB_FILE_FULL_NAME(pDFile));
TSDB_FILE_FSYNC(pDFile);
}
} else {
tsdbDebug("vgId:%d DFile header of %s is correct", REPO_ID(pRepo), TSDB_FILE_FULL_NAME(pDFile));
}
}
tsdbCloseDFileSet(&fset);
}
}
\ No newline at end of file
......@@ -567,6 +567,7 @@ void tsdbInitDFileSet(SDFileSet *pSet, SDiskID did, int vid, int fid, uint32_t v
}
void tsdbInitDFileSetEx(SDFileSet *pSet, SDFileSet *pOSet) {
pSet->fid = pOSet->fid;
for (TSDB_FILE_T ftype = 0; ftype < TSDB_FILE_MAX; ftype++) {
tsdbInitDFileEx(TSDB_DFILE_IN_SET(pSet, ftype), TSDB_DFILE_IN_SET(pOSet, ftype));
}
......
......@@ -526,7 +526,7 @@ static void tsdbStartStream(STsdbRepo *pRepo) {
STable *pTable = pMeta->tables[i];
if (pTable && pTable->type == TSDB_STREAM_TABLE) {
pTable->cqhandle = (*pRepo->appH.cqCreateFunc)(pRepo->appH.cqH, TABLE_UID(pTable), TABLE_TID(pTable), TABLE_NAME(pTable)->data, pTable->sql,
tsdbGetTableSchemaImpl(pTable, false, false, -1));
tsdbGetTableSchemaImpl(pTable, false, false, -1), 0);
}
}
}
......
......@@ -840,7 +840,7 @@ static int tsdbAddTableToMeta(STsdbRepo *pRepo, STable *pTable, bool addIdx, boo
if (lock && tsdbUnlockRepoMeta(pRepo) < 0) return -1;
if (TABLE_TYPE(pTable) == TSDB_STREAM_TABLE && addIdx) {
pTable->cqhandle = (*pRepo->appH.cqCreateFunc)(pRepo->appH.cqH, TABLE_UID(pTable), TABLE_TID(pTable), TABLE_NAME(pTable)->data, pTable->sql,
tsdbGetTableSchemaImpl(pTable, false, false, -1));
tsdbGetTableSchemaImpl(pTable, false, false, -1), 1);
}
tsdbDebug("vgId:%d table %s tid %d uid %" PRIu64 " is added to meta", REPO_ID(pRepo), TABLE_CHAR_NAME(pTable),
......
......@@ -917,7 +917,9 @@ static int32_t loadBlockInfo(STsdbQueryHandle * pQueryHandle, int32_t index, int
pCheckInfo->compSize = compIndex->len;
}
tsdbLoadBlockInfo(&(pQueryHandle->rhelper), (void*)(pCheckInfo->pCompInfo));
if (tsdbLoadBlockInfo(&(pQueryHandle->rhelper), (void*)(pCheckInfo->pCompInfo)) < 0) {
return terrno;
}
SBlockInfo* pCompInfo = pCheckInfo->pCompInfo;
TSKEY s = TSKEY_INITIAL_VAL, e = TSKEY_INITIAL_VAL;
......@@ -2832,7 +2834,9 @@ int32_t tsdbRetrieveDataBlockStatisInfo(TsdbQueryHandleT* pQueryHandle, SDataSta
}
int64_t stime = taosGetTimestampUs();
tsdbLoadBlockStatis(&pHandle->rhelper, pBlockInfo->compBlock);
if (tsdbLoadBlockStatis(&pHandle->rhelper, pBlockInfo->compBlock) < 0) {
return terrno;
}
int16_t* colIds = pHandle->defaultLoadColumn->pData;
......
......@@ -194,8 +194,8 @@ static int32_t tsdbSyncRecvMeta(SSyncH *pSynch) {
return 0;
}
if (pLMFile == NULL || memcmp(&(pSynch->pmf->info), &(pLMFile->info), sizeof(SMFInfo)) != 0 ||
TSDB_FILE_IS_BAD(pLMFile)) {
if (pLMFile == NULL || pSynch->pmf->info.size != pLMFile->info.size ||
pSynch->pmf->info.magic != pLMFile->info.magic || TSDB_FILE_IS_BAD(pLMFile)) {
// Local has no meta file or has a different meta file, need to copy from remote
pSynch->mfChanged = true;
......@@ -536,7 +536,7 @@ static bool tsdbIsTowFSetSame(SDFileSet *pSet1, SDFileSet *pSet2) {
SDFile *pDFile1 = TSDB_DFILE_IN_SET(pSet1, ftype);
SDFile *pDFile2 = TSDB_DFILE_IN_SET(pSet2, ftype);
if (memcmp((void *)(TSDB_FILE_INFO(pDFile1)), (void *)(TSDB_FILE_INFO(pDFile2)), sizeof(SDFInfo)) != 0) {
if (pDFile1->info.size != pDFile2->info.size || pDFile1->info.magic != pDFile2->info.magic) {
return false;
}
}
......
......@@ -51,11 +51,9 @@ uint32_t tSQLGetToken(char *z, uint32_t *tokenType);
* @param str
* @param i
* @param isPrevOptr
* @param numOfIgnoreToken
* @param ignoreTokenTypes
* @return
*/
SStrToken tStrGetToken(char *str, int32_t *i, bool isPrevOptr, uint32_t numOfIgnoreToken, uint32_t *ignoreTokenTypes);
SStrToken tStrGetToken(char *str, int32_t *i, bool isPrevOptr);
/**
* check if it is a keyword or not
......
......@@ -22,6 +22,8 @@
#define SIGPIPE EPIPE
#endif
#define TCP_CONN_TIMEOUT 3000 // conn timeout
int32_t taosGetFqdn(char *fqdn) {
char hostname[1024];
hostname[1023] = '\0';
......@@ -346,10 +348,47 @@ SOCKET taosOpenTcpClientSocket(uint32_t destIp, uint16_t destPort, uint32_t clie
serverAddr.sin_addr.s_addr = destIp;
serverAddr.sin_port = (uint16_t)htons((uint16_t)destPort);
#ifdef _TD_LINUX
taosSetNonblocking(sockFd, 1);
ret = connect(sockFd, (struct sockaddr *)&serverAddr, sizeof(serverAddr));
if (ret == -1) {
if (errno == EHOSTUNREACH) {
uError("failed to connect socket, ip:0x%x, port:%hu(%s)", destIp, destPort, strerror(errno));
taosCloseSocket(sockFd);
return -1;
} else if (errno == EINPROGRESS || errno == EAGAIN || errno == EWOULDBLOCK) {
struct pollfd wfd[1];
wfd[0].fd = sockFd;
wfd[0].events = POLLOUT;
int res = poll(wfd, 1, TCP_CONN_TIMEOUT);
if (res == -1 || res == 0) {
uError("failed to connect socket, ip:0x%x, port:%hu(poll error/conn timeout)", destIp, destPort);
taosCloseSocket(sockFd); //
return -1;
}
int optVal = -1, optLen = sizeof(int);
if ((0 != taosGetSockOpt(sockFd, SOL_SOCKET, SO_ERROR, &optVal, &optLen)) || (optVal != 0)) {
uError("failed to connect socket, ip:0x%x, port:%hu(connect host error)", destIp, destPort);
taosCloseSocket(sockFd); //
return -1;
}
ret = 0;
} else { // Other error
uError("failed to connect socket, ip:0x%x, port:%hu(target host cannot be reached)", destIp, destPort);
taosCloseSocket(sockFd); //
return -1;
}
}
taosSetNonblocking(sockFd, 0);
#else
ret = connect(sockFd, (struct sockaddr *)&serverAddr, sizeof(serverAddr));
#endif
if (ret != 0) {
// uError("failed to connect socket, ip:0x%x, port:%hu(%s)", destIp, destPort, strerror(errno));
uError("failed to connect socket, ip:0x%x, port:%hu(%s)", destIp, destPort, strerror(errno));
taosCloseSocket(sockFd);
sockFd = -1;
} else {
......
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef TDENGINE_VNODE_BACKUP_H
#define TDENGINE_VNODE_BACKUP_H
#ifdef __cplusplus
extern "C" {
#endif
#include "vnodeInt.h"
int32_t vnodeInitBackup();
void vnodeCleanupBackup();
int32_t vnodeBackup(int32_t vgId);
#ifdef __cplusplus
}
#endif
#endif
......@@ -30,7 +30,6 @@ void vnodeStopSyncFile(int32_t vgId, uint64_t fversion);
void vnodeConfirmForard(int32_t vgId, void *wparam, int32_t code);
int32_t vnodeWriteToCache(int32_t vgId, void *wparam, int32_t qtype, void *rparam);
int32_t vnodeGetVersion(int32_t vgId, uint64_t *fver, uint64_t *wver);
int32_t vnodeResetVersion(int32_t vgId, uint64_t fver);
void vnodeConfirmForward(void *pVnode, uint64_t version, int32_t code, bool force);
......
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#define _DEFAULT_SOURCE
#include "os.h"
#include "taoserror.h"
#include "taosmsg.h"
#include "tutil.h"
#include "tqueue.h"
#include "tglobal.h"
#include "tfs.h"
#include "vnodeBackup.h"
#include "vnodeMain.h"
typedef struct {
int32_t vgId;
} SVBackupMsg;
typedef struct {
pthread_t thread;
int32_t workerId;
} SVBackupWorker;
typedef struct {
int32_t num;
SVBackupWorker *worker;
} SVBackupWorkerPool;
static SVBackupWorkerPool tsVBackupPool;
static taos_qset tsVBackupQset;
static taos_queue tsVBackupQueue;
static void vnodeProcessBackupMsg(SVBackupMsg *pMsg) {
int32_t vgId = pMsg->vgId;
char newDir[TSDB_FILENAME_LEN] = {0};
char stagingDir[TSDB_FILENAME_LEN] = {0};
sprintf(newDir, "%s/vnode%d", "vnode_bak", vgId);
sprintf(stagingDir, "%s/.staging/vnode%d", "vnode_bak", vgId);
if (tsEnableVnodeBak) {
tfsRmdir(newDir);
tfsRename(stagingDir, newDir);
} else {
vInfo("vgId:%d, vnode backup not enabled", vgId);
tfsRmdir(stagingDir);
}
}
static void *vnodeBackupFunc(void *param) {
while (1) {
SVBackupMsg *pMsg = NULL;
if (taosReadQitemFromQset(tsVBackupQset, NULL, (void **)&pMsg, NULL) == 0) {
vDebug("qset:%p, vbackup got no message from qset, exiting", tsVBackupQset);
break;
}
vTrace("vgId:%d, will be processed in vbackup queue", pMsg->vgId);
vnodeProcessBackupMsg(pMsg);
vTrace("vgId:%d, disposed in vbackup worker", pMsg->vgId);
taosFreeQitem(pMsg);
}
return NULL;
}
static int32_t vnodeStartBackup() {
tsVBackupQueue = taosOpenQueue();
if (tsVBackupQueue == NULL) return TSDB_CODE_DND_OUT_OF_MEMORY;
taosAddIntoQset(tsVBackupQset, tsVBackupQueue, NULL);
for (int32_t i = 0; i < tsVBackupPool.num; ++i) {
SVBackupWorker *pWorker = tsVBackupPool.worker + i;
pWorker->workerId = i;
pthread_attr_t thAttr;
pthread_attr_init(&thAttr);
pthread_attr_setdetachstate(&thAttr, PTHREAD_CREATE_JOINABLE);
if (pthread_create(&pWorker->thread, &thAttr, vnodeBackupFunc, pWorker) != 0) {
vError("failed to create thread to process vbackup queue, reason:%s", strerror(errno));
}
pthread_attr_destroy(&thAttr);
vDebug("vbackup:%d is launched, total:%d", pWorker->workerId, tsVBackupPool.num);
}
vDebug("vbackup queue:%p is allocated", tsVBackupQueue);
return TSDB_CODE_SUCCESS;
}
static int32_t vnodeWriteIntoBackupWorker(int32_t vgId) {
SVBackupMsg *pMsg = taosAllocateQitem(sizeof(SVBackupMsg));
if (pMsg == NULL) return TSDB_CODE_VND_OUT_OF_MEMORY;
pMsg->vgId = vgId;
int32_t code = taosWriteQitem(tsVBackupQueue, TAOS_QTYPE_RPC, pMsg);
if (code == 0) code = TSDB_CODE_DND_ACTION_IN_PROGRESS;
return code;
}
int32_t vnodeBackup(int32_t vgId) {
vTrace("vgId:%d, will backup", vgId);
return vnodeWriteIntoBackupWorker(vgId);
}
int32_t vnodeInitBackup() {
tsVBackupQset = taosOpenQset();
tsVBackupPool.num = 1;
tsVBackupPool.worker = calloc(sizeof(SVBackupWorker), tsVBackupPool.num);
if (tsVBackupPool.worker == NULL) return -1;
for (int32_t i = 0; i < tsVBackupPool.num; ++i) {
SVBackupWorker *pWorker = tsVBackupPool.worker + i;
pWorker->workerId = i;
vDebug("vbackup:%d is created", i);
}
vDebug("vbackup is initialized, num:%d qset:%p", tsVBackupPool.num, tsVBackupQset);
return vnodeStartBackup();
}
void vnodeCleanupBackup() {
for (int32_t i = 0; i < tsVBackupPool.num; ++i) {
SVBackupWorker *pWorker = tsVBackupPool.worker + i;
if (taosCheckPthreadValid(pWorker->thread)) {
taosQsetThreadResume(tsVBackupQset);
}
vDebug("vbackup:%d is closed", i);
}
for (int32_t i = 0; i < tsVBackupPool.num; ++i) {
SVBackupWorker *pWorker = tsVBackupPool.worker + i;
vDebug("vbackup:%d start to join", i);
if (taosCheckPthreadValid(pWorker->thread)) {
pthread_join(pWorker->thread, NULL);
}
vDebug("vbackup:%d join success", i);
}
vDebug("vbackup is closed, qset:%p", tsVBackupQset);
taosCloseQset(tsVBackupQset);
tsVBackupQset = NULL;
tfree(tsVBackupPool.worker);
vDebug("vbackup queue:%p is freed", tsVBackupQueue);
taosCloseQueue(tsVBackupQueue);
tsVBackupQueue = NULL;
}
......@@ -27,6 +27,7 @@
#include "vnodeVersion.h"
#include "vnodeMgmt.h"
#include "vnodeWorker.h"
#include "vnodeBackup.h"
#include "vnodeMain.h"
static int32_t vnodeProcessTsdbStatus(void *arg, int32_t status, int32_t eno);
......@@ -364,7 +365,6 @@ int32_t vnodeOpen(int32_t vgId) {
syncInfo.startSyncFileFp = vnodeStartSyncFile;
syncInfo.stopSyncFileFp = vnodeStopSyncFile;
syncInfo.getVersionFp = vnodeGetVersion;
syncInfo.resetVersionFp = vnodeResetVersion;
syncInfo.sendFileFp = tsdbSyncSend;
syncInfo.recvFileFp = tsdbSyncRecv;
syncInfo.pTsdb = pVnode->tsdb;
......@@ -448,18 +448,14 @@ void vnodeDestroy(SVnodeObj *pVnode) {
if (pVnode->dropped) {
char rootDir[TSDB_FILENAME_LEN] = {0};
char newDir[TSDB_FILENAME_LEN] = {0};
char stagingDir[TSDB_FILENAME_LEN] = {0};
sprintf(rootDir, "%s/vnode%d", "vnode", vgId);
sprintf(newDir, "%s/vnode%d", "vnode_bak", vgId);
sprintf(stagingDir, "%s/.staging/vnode%d", "vnode_bak", vgId);
if (0 == tsEnableVnodeBak) {
vInfo("vgId:%d, vnode backup not enabled", pVnode->vgId);
} else {
tfsRmdir(newDir);
tfsRename(rootDir, newDir);
}
tfsRename(rootDir, stagingDir);
vnodeBackup(vgId);
tfsRmdir(rootDir);
dnodeSendStatusMsgToMnode();
}
......
......@@ -17,6 +17,7 @@
#include "os.h"
#include "dnode.h"
#include "vnodeStatus.h"
#include "vnodeBackup.h"
#include "vnodeWorker.h"
#include "vnodeRead.h"
#include "vnodeWrite.h"
......@@ -29,6 +30,7 @@ static void vnodeCleanupHash(void);
static void vnodeIncRef(void *ptNode);
static SStep tsVnodeSteps[] = {
{"vnode-backup", vnodeInitBackup, vnodeCleanupBackup},
{"vnode-worker", vnodeInitMWorker, vnodeCleanupMWorker},
{"vnode-write", vnodeInitWrite, vnodeCleanupWrite},
{"vnode-read", vnodeInitRead, vnodeCleanupRead},
......
......@@ -364,7 +364,7 @@ static int32_t vnodeProcessFetchMsg(SVnodeObj *pVnode, SVReadMsg *pRead) {
// register the qhandle to connect to quit query immediate if connection is broken
if (vnodeNotifyCurrentQhandle(pRead->rpcHandle, pRetrieve->qId, *handle, pVnode->vgId) != TSDB_CODE_SUCCESS) {
vError("vgId:%d, QInfo:%"PRIu64 "-%p, retrieve discarded since link is broken, %p", pVnode->vgId, pRetrieve->qhandle, *handle, pRead->rpcHandle);
vError("vgId:%d, QInfo:%"PRIu64 "-%p, retrieve discarded since link is broken, conn:%p", pVnode->vgId, pRetrieve->qhandle, *handle, pRead->rpcHandle);
code = TSDB_CODE_RPC_NETWORK_UNAVAIL;
qKillQuery(*handle);
qReleaseQInfo(pVnode->qMgmt, (void **)&handle, true);
......@@ -409,7 +409,7 @@ static int32_t vnodeProcessFetchMsg(SVnodeObj *pVnode, SVReadMsg *pRead) {
// client is broken, the query needs to be killed immediately.
int32_t vnodeNotifyCurrentQhandle(void *handle, uint64_t qId, void *qhandle, int32_t vgId) {
SRetrieveTableMsg *pMsg = rpcMallocCont(sizeof(SRetrieveTableMsg));
pMsg->qhandle = htobe64((uint64_t)qhandle);
pMsg->qId = htobe64(qId);
pMsg->header.vgId = htonl(vgId);
pMsg->header.contLen = htonl(sizeof(SRetrieveTableMsg));
......
......@@ -107,8 +107,9 @@ void vnodeStopSyncFile(int32_t vgId, uint64_t fversion) {
pVnode->fversion = fversion;
pVnode->version = fversion;
vnodeSaveVersion(pVnode);
walResetVersion(pVnode->wal, fversion);
vDebug("vgId:%d, datafile is synced, fver:%" PRIu64 " vver:%" PRIu64, vgId, fversion, fversion);
vInfo("vgId:%d, datafile is synced, fver:%" PRIu64 " vver:%" PRIu64, vgId, fversion, fversion);
vnodeSetReadyStatus(pVnode);
vnodeRelease(pVnode);
......@@ -158,22 +159,6 @@ int32_t vnodeGetVersion(int32_t vgId, uint64_t *fver, uint64_t *wver) {
return code;
}
int32_t vnodeResetVersion(int32_t vgId, uint64_t fver) {
SVnodeObj *pVnode = vnodeAcquire(vgId);
if (pVnode == NULL) {
vError("vgId:%d, vnode not found while reset version", vgId);
return -1;
}
pVnode->fversion = fver;
pVnode->version = fver;
walResetVersion(pVnode->wal, fver);
vInfo("vgId:%d, version reset to %" PRIu64, vgId, fver);
vnodeRelease(pVnode);
return 0;
}
void vnodeConfirmForward(void *vparam, uint64_t version, int32_t code, bool force) {
SVnodeObj *pVnode = vparam;
syncConfirmForward(pVnode->sync, version, code, force);
......
......@@ -91,12 +91,17 @@ int32_t vnodeProcessWrite(void *vparam, void *wparam, int32_t qtype, void *rpara
int32_t syncCode = 0;
bool force = (pWrite == NULL ? false : pWrite->pHead.msgType != TSDB_MSG_TYPE_SUBMIT);
syncCode = syncForwardToPeer(pVnode->sync, pHead, pWrite, qtype, force);
if (syncCode < 0) return syncCode;
if (syncCode < 0) {
pHead->version = 0;
return syncCode;
}
// write into WAL
code = walWrite(pVnode->wal, pHead);
if (code < 0) {
if (syncCode > 0) atomic_sub_fetch_32(&pWrite->processedCount, 1);
vError("vgId:%d, hver:%" PRIu64 " vver:%" PRIu64 " code:0x%x", pVnode->vgId, pHead->version, pVnode->version, code);
pHead->version = 0;
return code;
}
......@@ -104,7 +109,10 @@ int32_t vnodeProcessWrite(void *vparam, void *wparam, int32_t qtype, void *rpara
// write data locally
code = (*vnodeProcessWriteMsgFp[pHead->msgType])(pVnode, pHead->cont, pRspRet);
if (code < 0) return code;
if (code < 0) {
if (syncCode > 0) atomic_sub_fetch_32(&pWrite->processedCount, 1);
return code;
}
return syncCode;
}
......
......@@ -68,12 +68,12 @@ public class JDBCDemo {
}
private void insert() {
final String sql = "insert into test.weather (ts, temperature, humidity) values(now, 20.5, 34)";
final String sql = "insert into " + dbName + "." + tbName + " (ts, temperature, humidity) values(now, 20.5, 34)";
exuete(sql);
}
private void select() {
final String sql = "select * from test.weather";
final String sql = "select * from "+ dbName + "." + tbName;
executeQuery(sql);
}
......
......@@ -16,24 +16,26 @@ package main
import (
"database/sql"
"flag"
"fmt"
_ "github.com/taosdata/driver-go/taosSql"
"math/rand"
"os"
"sync"
"runtime"
"strconv"
"sync"
"time"
"flag"
"math/rand"
_ "github.com/taosdata/driver-go/taosSql"
//"golang.org/x/sys/unix"
)
const (
maxLocationSize = 32
maxSqlBufSize = 65480
//maxSqlBufSize = 65480
)
var locations = [maxLocationSize]string {
var locations = [maxLocationSize]string{
"Beijing", "Shanghai", "Guangzhou", "Shenzhen",
"HangZhou", "Tianjin", "Wuhan", "Changsha",
"Nanjing", "Xian"}
......@@ -62,7 +64,7 @@ var taosDriverName = "taosSql"
var url string
func init() {
flag.StringVar(&configPara.hostName, "h", "127.0.0.1","The host to connect to TDengine server.")
flag.StringVar(&configPara.hostName, "h", "127.0.0.1", "The host to connect to TDengine server.")
flag.IntVar(&configPara.serverPort, "p", 6030, "The TCP/IP port number to use for the connection to TDengine server.")
flag.StringVar(&configPara.user, "u", "root", "The TDengine user name to use when connecting to the server.")
flag.StringVar(&configPara.password, "P", "taosdata", "The password to use when connecting to the server.")
......@@ -80,7 +82,7 @@ func init() {
configPara.supTblName = "meters"
startTs, err := time.ParseInLocation("2006-01-02 15:04:05", configPara.startTimestamp, time.Local)
if err==nil {
if err == nil {
configPara.startTs = startTs.UnixNano() / 1e6
}
}
......@@ -104,7 +106,7 @@ func printAllArgs() {
func main() {
printAllArgs()
fmt.Printf("Please press enter key to continue....\n")
fmt.Scanln()
_, _ = fmt.Scanln()
url = "root:taosdata@/tcp(" + configPara.hostName + ":" + strconv.Itoa(configPara.serverPort) + ")/"
//url = fmt.Sprintf("%s:%s@/tcp(%s:%d)/%s?interpolateParams=true", configPara.user, configPara.password, configPara.hostName, configPara.serverPort, configPara.dbName)
......@@ -138,7 +140,7 @@ func main() {
func createDatabase(dbName string, supTblName string) {
db, err := sql.Open(taosDriverName, url)
if err != nil {
fmt.Println("Open database error: %s\n", err)
fmt.Printf("Open database error: %s\n", err)
os.Exit(1)
}
defer db.Close()
......@@ -165,27 +167,27 @@ func createDatabase(dbName string, supTblName string) {
checkErr(err, sqlStr)
}
func multiThreadCreateTable(threads int, ntables int, dbName string, tablePrefix string) {
func multiThreadCreateTable(threads int, nTables int, dbName string, tablePrefix string) {
st := time.Now().UnixNano()
if (threads < 1) {
threads = 1;
if threads < 1 {
threads = 1
}
a := ntables / threads;
if (a < 1) {
threads = ntables;
a = 1;
a := nTables / threads
if a < 1 {
threads = nTables
a = 1
}
b := ntables % threads;
b := nTables % threads
last := 0;
last := 0
endTblId := 0
wg := sync.WaitGroup{}
for i := 0; i < threads; i++ {
startTblId := last
if (i < b ) {
if i < b {
endTblId = last + a
} else {
endTblId = last + a - 1
......@@ -206,7 +208,7 @@ func createTable(dbName string, childTblPrefix string, startTblId int, endTblId
db, err := sql.Open(taosDriverName, url)
if err != nil {
fmt.Println("Open database error: %s\n", err)
fmt.Printf("Open database error: %s\n", err)
os.Exit(1)
}
defer db.Close()
......@@ -228,20 +230,21 @@ func generateRowData(ts int64) string {
values := "( " + strconv.FormatInt(ts, 10) + ", " + strconv.FormatFloat(float64(current), 'f', 6, 64) + ", " + strconv.Itoa(voltage) + ", " + strconv.FormatFloat(float64(phase), 'f', 6, 64) + " ) "
return values
}
func insertData(dbName string, childTblPrefix string, startTblId int, endTblId int, wg *sync.WaitGroup) {
//fmt.Printf("subThread[%d]: insert data to table from %d to %d \n", unix.Gettid(), startTblId, endTblId)
// windows.GetCurrentThreadId()
db, err := sql.Open(taosDriverName, url)
if err != nil {
fmt.Println("Open database error: %s\n", err)
fmt.Printf("Open database error: %s\n", err)
os.Exit(1)
}
defer db.Close()
tmpTs := configPara.startTs;
tmpTs := configPara.startTs
//rand.New(rand.NewSource(time.Now().UnixNano()))
for tID := startTblId; tID <= endTblId; tID++{
for tID := startTblId; tID <= endTblId; tID++ {
totalNum := 0
for {
sqlStr := "insert into " + dbName + "." + childTblPrefix + strconv.Itoa(tID) + " values "
......@@ -254,7 +257,7 @@ func insertData(dbName string, childTblPrefix string, startTblId int, endTblId i
sqlStr = fmt.Sprintf("%s %s", sqlStr, valuesOfRow)
if (currRowNum >= configPara.numOfRecordsPerReq || totalNum >= configPara.numOfRecordsPerTable) {
if currRowNum >= configPara.numOfRecordsPerReq || totalNum >= configPara.numOfRecordsPerTable {
break
}
}
......@@ -265,12 +268,12 @@ func insertData(dbName string, childTblPrefix string, startTblId int, endTblId i
count, err := res.RowsAffected()
checkErr(err, "rows affected")
if (count != int64(currRowNum)) {
if count != int64(currRowNum) {
fmt.Printf("insert data, expect affected:%d, actual:%d\n", currRowNum, count)
os.Exit(1)
}
if (totalNum >= configPara.numOfRecordsPerTable) {
if totalNum >= configPara.numOfRecordsPerTable {
break
}
}
......@@ -279,44 +282,46 @@ func insertData(dbName string, childTblPrefix string, startTblId int, endTblId i
wg.Done()
runtime.Goexit()
}
func multiThreadInsertData(threads int, ntables int, dbName string, tablePrefix string) {
func multiThreadInsertData(threads int, nTables int, dbName string, tablePrefix string) {
st := time.Now().UnixNano()
if (threads < 1) {
threads = 1;
if threads < 1 {
threads = 1
}
a := ntables / threads;
if (a < 1) {
threads = ntables;
a = 1;
a := nTables / threads
if a < 1 {
threads = nTables
a = 1
}
b := ntables % threads;
b := nTables % threads
last := 0;
last := 0
endTblId := 0
wg := sync.WaitGroup{}
for i := 0; i < threads; i++ {
startTblId := last
if (i < b ) {
if i < b {
endTblId = last + a
} else {
endTblId = last + a - 1
}
last = endTblId + 1
wg.Add(1)
go insertData(dbName, tablePrefix, startTblId , endTblId, &wg)
go insertData(dbName, tablePrefix, startTblId, endTblId, &wg)
}
wg.Wait()
et := time.Now().UnixNano()
fmt.Printf("insert data spent duration: %6.6fs\n", (float32(et-st))/1e9)
}
func selectTest(dbName string, tbPrefix string, supTblName string){
func selectTest(dbName string, tbPrefix string, supTblName string) {
db, err := sql.Open(taosDriverName, url)
if err != nil {
fmt.Println("Open database error: %s\n", err)
fmt.Printf("Open database error: %s\n", err)
os.Exit(1)
}
defer db.Close()
......@@ -352,7 +357,7 @@ func selectTest(dbName string, tbPrefix string, supTblName string){
}
// select sql 2
sqlStr = "select avg(voltage), min(voltage), max(voltage) from " + dbName + "." + tbPrefix + strconv.Itoa( rand.Int() % configPara.numOftables)
sqlStr = "select avg(voltage), min(voltage), max(voltage) from " + dbName + "." + tbPrefix + strconv.Itoa(rand.Int()%configPara.numOftables)
rows, err = db.Query(sqlStr)
checkErr(err, sqlStr)
......
......@@ -50,7 +50,7 @@ function buildTDengine {
echo "repo up-to-date"
else
echo "repo need to pull"
git pull > /dev/null
git pull > /dev/null 2>&1
LOCAL_COMMIT=`git rev-parse --short @`
cd debug
......@@ -72,26 +72,13 @@ function runQueryPerfTest {
python3 insert/insertFromCSVPerformance.py -c $LOCAL_COMMIT | tee -a $PERFORMANCE_TEST_REPORT
yes | taosdemo -c /etc/taosperf/ -d taosdemo_insert_test -x > taosdemoperf.txt
CREATETABLETIME=`grep 'Spent' taosdemoperf.txt | awk 'NR==1{print $2}'`
INSERTRECORDSTIME=`grep 'Spent' taosdemoperf.txt | awk 'NR==2{print $2}'`
REQUESTSPERSECOND=`grep 'Spent' taosdemoperf.txt | awk 'NR==2{print $13}'`
delay=`grep 'delay' taosdemoperf.txt | awk '{print $4}'`
AVGDELAY=`echo ${delay:0:${#delay}-3}`
delay=`grep 'delay' taosdemoperf.txt | awk '{print $6}'`
MAXDELAY=`echo ${delay:0:${#delay}-3}`
delay=`grep 'delay' taosdemoperf.txt | awk '{print $8}'`
MINDELAY=`echo ${delay:0:${#delay}-2}`
python3 tools/taosdemoPerformance.py -c $LOCAL_COMMIT -t $CREATETABLETIME -i $INSERTRECORDSTIME -r $REQUESTSPERSECOND -avg $AVGDELAY -max $MAXDELAY -min $MINDELAY | tee -a $PERFORMANCE_TEST_REPORT
[ -f taosdemoperf.txt ] && rm taosdemoperf.txt
python3 tools/taosdemoPerformance.py -c $LOCAL_COMMIT | tee -a $PERFORMANCE_TEST_REPORT
}
function sendReport {
echo "send report"
receiver="pxiao@taosdata.com"
receiver="develop@taosdata.com"
mimebody="MIME-Version: 1.0\nContent-Type: text/html; charset=utf-8\n"
cd $TDENGINE_DIR
......
......@@ -139,6 +139,18 @@ python3 ./test.py -f import_merge/importInsertThenImport.py
python3 ./test.py -f import_merge/importCSV.py
#======================p1-end===============
#======================p2-start===============
# tools
python3 test.py -f tools/taosdumpTest.py
python3 test.py -f tools/taosdemoTest.py
python3 test.py -f tools/taosdemoTestWithoutMetric.py
python3 test.py -f tools/taosdemoTestWithJson.py
python3 test.py -f tools/taosdemoTestLimitOffset.py
python3 test.py -f tools/taosdemoTestTblAlt.py
python3 test.py -f tools/taosdemoTestSampleData.py
python3 test.py -f tools/taosdemoTestInterlace.py
python3 test.py -f tools/taosdemoTestQuery.py
# update
python3 ./test.py -f update/allow_update.py
python3 ./test.py -f update/allow_update-0.py
......@@ -166,6 +178,7 @@ python3 ./test.py -f stable/query_after_reset.py
# perfbenchmark
python3 ./test.py -f perfbenchmark/bug3433.py
#python3 ./test.py -f perfbenchmark/bug3589.py
#query
......@@ -200,6 +213,8 @@ python3 ./test.py -f query/bug2119.py
python3 ./test.py -f query/isNullTest.py
python3 ./test.py -f query/queryWithTaosdKilled.py
python3 ./test.py -f query/floatCompare.py
python3 ./test.py -f query/query1970YearsAf.py
python3 ./test.py -f query/bug3351.py
python3 ./test.py -f query/bug3375.py
......@@ -246,18 +261,6 @@ python3 test.py -f subscribe/supertable.py
#======================p3-end===============
#======================p4-start===============
# tools
python3 test.py -f tools/taosdumpTest.py
python3 test.py -f tools/taosdemoTest.py
python3 test.py -f tools/taosdemoTestWithoutMetric.py
python3 test.py -f tools/taosdemoTestWithJson.py
python3 test.py -f tools/taosdemoTestLimitOffset.py
python3 test.py -f tools/taosdemoTest2.py
python3 test.py -f tools/taosdemoTestSampleData.py
python3 test.py -f tools/taosdemoTestInterlace.py
python3 test.py -f tools/taosdemoTestQuery.py
python3 ./test.py -f update/merge_commit_data-0.py
# wal
python3 ./test.py -f wal/addOldWalTest.py
......
此差异已折叠。
此差异已折叠。
......@@ -15,7 +15,7 @@
"databases": [{
"dbinfo": {
"name": "db",
"drop": "yes",
"drop": "no",
"replica": 1,
"days": 10,
"cache": 16,
......@@ -33,7 +33,7 @@
},
"super_tables": [{
"name": "stb",
"child_table_exists":"no",
"child_table_exists":"yes",
"childtable_count": 100,
"childtable_prefix": "stb_",
"auto_create_table": "no",
......
......@@ -15,7 +15,7 @@
"databases": [{
"dbinfo": {
"name": "db",
"drop": "yes",
"drop": "no",
"replica": 1,
"days": 10,
"cache": 16,
......@@ -33,7 +33,7 @@
},
"super_tables": [{
"name": "stb",
"child_table_exists":"no",
"child_table_exists":"yes",
"childtable_count": 100,
"childtable_prefix": "stb_",
"auto_create_table": "no",
......
#!/bin/bash
Cur_Dir=$(pwd)
echo $Cur_Dir
echo "'2020-1-1 1:1:1','abc','device',123,'9876', 'abc', 'net', 'mno', 'province', 'city', 'al'" >> ~/data.sql
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册