提交 d0d044e2 编写于 作者: haoranc's avatar haoranc

Merge branch 'develop' of github.com:taosdata/TDengine into dev/chr

...@@ -715,7 +715,7 @@ Query OK, 1 row(s) in set (0.001091s) ...@@ -715,7 +715,7 @@ Query OK, 1 row(s) in set (0.001091s)
2. 同时进行多个字段的范围过滤,需要使用关键词 AND 来连接不同的查询条件,暂不支持 OR 连接的不同列之间的查询过滤条件。 2. 同时进行多个字段的范围过滤,需要使用关键词 AND 来连接不同的查询条件,暂不支持 OR 连接的不同列之间的查询过滤条件。
3. 针对单一字段的过滤,如果是时间过滤条件,则一条语句中只支持设定一个;但针对其他的(普通)列或标签列,则可以使用 `OR` 关键字进行组合条件的查询过滤。例如: `((value > 20 AND value < 30) OR (value < 12))`。 3. 针对单一字段的过滤,如果是时间过滤条件,则一条语句中只支持设定一个;但针对其他的(普通)列或标签列,则可以使用 `OR` 关键字进行组合条件的查询过滤。例如: `((value > 20 AND value < 30) OR (value < 12))`。
4. 从 2.0.17.0 版本开始,条件过滤开始支持 BETWEEN AND 语法,例如 `WHERE col2 BETWEEN 1.5 AND 3.25` 表示查询条件为“1.5 ≤ col2 ≤ 3.25”。 4. 从 2.0.17.0 版本开始,条件过滤开始支持 BETWEEN AND 语法,例如 `WHERE col2 BETWEEN 1.5 AND 3.25` 表示查询条件为“1.5 ≤ col2 ≤ 3.25”。
5. 从 2.1.4.0 版本开始,条件过滤开始支持 IN 算子,例如 `WHERE city IN ('Beijing', 'Shanghai')`。说明:BOOL 类型写作 `{true, false}` 或 `{0, 1}` 均可,但不能写作 0、1 之外的整数;FLOAT 和 DOUBLE 类型会受到浮点数精度影响,集合内的值在精度范围内认为和数据行的值完全相等才能匹配成功。<!-- REPLACE_OPEN_TO_ENTERPRISE__IN_OPERATOR_AND_UNSIGNED_INTEGER --> 5. 从 2.1.4.0 版本开始,条件过滤开始支持 IN 算子,例如 `WHERE city IN ('Beijing', 'Shanghai')`。说明:BOOL 类型写作 `{true, false}` 或 `{0, 1}` 均可,但不能写作 0、1 之外的整数;FLOAT 和 DOUBLE 类型会受到浮点数精度影响,集合内的值在精度范围内认为和数据行的值完全相等才能匹配成功;TIMESTAMP 类型支持非主键的列。<!-- REPLACE_OPEN_TO_ENTERPRISE__IN_OPERATOR_AND_UNSIGNED_INTEGER -->
<!-- <!--
<a class="anchor" id="having"></a> <a class="anchor" id="having"></a>
...@@ -1338,6 +1338,7 @@ SELECT function_list FROM stb_name ...@@ -1338,6 +1338,7 @@ SELECT function_list FROM stb_name
- 查询过滤、聚合等操作按照每个切分窗口为独立的单位执行。聚合查询目前支持三种窗口的划分方式: - 查询过滤、聚合等操作按照每个切分窗口为独立的单位执行。聚合查询目前支持三种窗口的划分方式:
1. 时间窗口:聚合时间段的窗口宽度由关键词 INTERVAL 指定,最短时间间隔 10 毫秒(10a);并且支持偏移 offset(偏移必须小于间隔),也即时间窗口划分与“UTC 时刻 0”相比的偏移量。SLIDING 语句用于指定聚合时间段的前向增量,也即每次窗口向前滑动的时长。当 SLIDING 与 INTERVAL 取值相等的时候,滑动窗口即为翻转窗口。 1. 时间窗口:聚合时间段的窗口宽度由关键词 INTERVAL 指定,最短时间间隔 10 毫秒(10a);并且支持偏移 offset(偏移必须小于间隔),也即时间窗口划分与“UTC 时刻 0”相比的偏移量。SLIDING 语句用于指定聚合时间段的前向增量,也即每次窗口向前滑动的时长。当 SLIDING 与 INTERVAL 取值相等的时候,滑动窗口即为翻转窗口。
* 从 2.1.5.0 版本开始,INTERVAL 语句允许的最短时间间隔调整为 1 微秒(1u),当然如果所查询的 DATABASE 的时间精度设置为毫秒级,那么允许的最短时间间隔为 1 毫秒(1a)。 * 从 2.1.5.0 版本开始,INTERVAL 语句允许的最短时间间隔调整为 1 微秒(1u),当然如果所查询的 DATABASE 的时间精度设置为毫秒级,那么允许的最短时间间隔为 1 毫秒(1a)。
* **注意:**用到 INTERVAL 语句时,除非极特殊的情况,都要求把客户端和服务端的 taos.cfg 配置文件中的 timezone 参数配置为相同的取值,以避免时间处理函数频繁进行跨时区转换而导致的严重性能影响。
2. 状态窗口:使用整数或布尔值来标识产生记录时设备的状态量,产生的记录如果具有相同的状态量取值则归属于同一个状态窗口,数值改变后该窗口关闭。状态量所对应的列作为 STATE_WINDOW 语句的参数来指定。 2. 状态窗口:使用整数或布尔值来标识产生记录时设备的状态量,产生的记录如果具有相同的状态量取值则归属于同一个状态窗口,数值改变后该窗口关闭。状态量所对应的列作为 STATE_WINDOW 语句的参数来指定。
3. 会话窗口:时间戳所在的列由 SESSION 语句的 ts_col 参数指定,会话窗口根据相邻两条记录的时间戳差值来确定是否属于同一个会话——如果时间戳差异在 tol_val 以内,则认为记录仍属于同一个窗口;如果时间变化超过 tol_val,则自动开启下一个窗口。 3. 会话窗口:时间戳所在的列由 SESSION 语句的 ts_col 参数指定,会话窗口根据相邻两条记录的时间戳差值来确定是否属于同一个会话——如果时间戳差异在 tol_val 以内,则认为记录仍属于同一个窗口;如果时间变化超过 tol_val,则自动开启下一个窗口。
- WHERE 语句可以指定查询的起止时间和其他过滤条件。 - WHERE 语句可以指定查询的起止时间和其他过滤条件。
......
...@@ -1132,7 +1132,7 @@ TDengine supports aggregations over data, they are listed below: ...@@ -1132,7 +1132,7 @@ TDengine supports aggregations over data, they are listed below:
``` ```
Function: Return the difference between the max value and the min value of a column in statistics /STable. Function: Return the difference between the max value and the min value of a column in statistics /STable.
Return Data Type: Same as applicable fields. Return Data Type: Double.
Applicable Fields: All types except binary, nchar, bool. Applicable Fields: All types except binary, nchar, bool.
......
...@@ -59,7 +59,7 @@ pkg_name=${install_dir}-${osType}-${cpuType} ...@@ -59,7 +59,7 @@ pkg_name=${install_dir}-${osType}-${cpuType}
# exit 1 # exit 1
# fi # fi
if [ "$verType" == "beta" ]; then if [[ "$verType" == "beta" ]] || [[ "$verType" == "preRelease" ]]; then
pkg_name=${install_dir}-${verType}-${osType}-${cpuType} pkg_name=${install_dir}-${verType}-${osType}-${cpuType}
elif [ "$verType" == "stable" ]; then elif [ "$verType" == "stable" ]; then
pkg_name=${pkg_name} pkg_name=${pkg_name}
......
...@@ -182,7 +182,7 @@ pkg_name=${install_dir}-${osType}-${cpuType} ...@@ -182,7 +182,7 @@ pkg_name=${install_dir}-${osType}-${cpuType}
# exit 1 # exit 1
# fi # fi
if [ "$verType" == "beta" ]; then if [[ "$verType" == "beta" ]] || [[ "$verType" == "preRelease" ]]; then
pkg_name=${install_dir}-${verType}-${osType}-${cpuType} pkg_name=${install_dir}-${verType}-${osType}-${cpuType}
elif [ "$verType" == "stable" ]; then elif [ "$verType" == "stable" ]; then
pkg_name=${pkg_name} pkg_name=${pkg_name}
......
...@@ -215,7 +215,7 @@ pkg_name=${install_dir}-${osType}-${cpuType} ...@@ -215,7 +215,7 @@ pkg_name=${install_dir}-${osType}-${cpuType}
# exit 1 # exit 1
# fi # fi
if [ "$verType" == "beta" ]; then if [[ "$verType" == "beta" ]] || [[ "$verType" == "preRelease" ]]; then
pkg_name=${install_dir}-${verType}-${osType}-${cpuType} pkg_name=${install_dir}-${verType}-${osType}-${cpuType}
elif [ "$verType" == "stable" ]; then elif [ "$verType" == "stable" ]; then
pkg_name=${pkg_name} pkg_name=${pkg_name}
......
...@@ -273,7 +273,7 @@ void tscVgroupTableCopy(SVgroupTableInfo* info, SVgroupTableInfo* pInfo); ...@@ -273,7 +273,7 @@ void tscVgroupTableCopy(SVgroupTableInfo* info, SVgroupTableInfo* pInfo);
int tscGetSTableVgroupInfo(SSqlObj* pSql, SQueryInfo* pQueryInfo); int tscGetSTableVgroupInfo(SSqlObj* pSql, SQueryInfo* pQueryInfo);
int tscGetTableMeta(SSqlObj* pSql, STableMetaInfo* pTableMetaInfo); int tscGetTableMeta(SSqlObj* pSql, STableMetaInfo* pTableMetaInfo);
int tscGetTableMetaEx(SSqlObj* pSql, STableMetaInfo* pTableMetaInfo, bool createIfNotExists); int tscGetTableMetaEx(SSqlObj *pSql, STableMetaInfo *pTableMetaInfo, bool createIfNotExists, bool onlyLocal);
int32_t tscGetUdfFromNode(SSqlObj *pSql, SQueryInfo* pQueryInfo); int32_t tscGetUdfFromNode(SSqlObj *pSql, SQueryInfo* pQueryInfo);
void tscResetForNextRetrieve(SSqlRes* pRes); void tscResetForNextRetrieve(SSqlRes* pRes);
......
...@@ -1482,7 +1482,7 @@ static int32_t tscCheckIfCreateTable(char **sqlstr, SSqlObj *pSql, char** boundC ...@@ -1482,7 +1482,7 @@ static int32_t tscCheckIfCreateTable(char **sqlstr, SSqlObj *pSql, char** boundC
return TSDB_CODE_TSC_SQL_SYNTAX_ERROR; return TSDB_CODE_TSC_SQL_SYNTAX_ERROR;
} }
code = tscGetTableMetaEx(pSql, pTableMetaInfo, true); code = tscGetTableMetaEx(pSql, pTableMetaInfo, true, false);
if (TSDB_CODE_TSC_ACTION_IN_PROGRESS == code) { if (TSDB_CODE_TSC_ACTION_IN_PROGRESS == code) {
return code; return code;
} }
...@@ -1493,7 +1493,7 @@ static int32_t tscCheckIfCreateTable(char **sqlstr, SSqlObj *pSql, char** boundC ...@@ -1493,7 +1493,7 @@ static int32_t tscCheckIfCreateTable(char **sqlstr, SSqlObj *pSql, char** boundC
} }
sql = sToken.z; sql = sToken.z;
code = tscGetTableMetaEx(pSql, pTableMetaInfo, false); code = tscGetTableMetaEx(pSql, pTableMetaInfo, false, false);
if (pInsertParam->sql == NULL) { if (pInsertParam->sql == NULL) {
assert(code == TSDB_CODE_TSC_ACTION_IN_PROGRESS); assert(code == TSDB_CODE_TSC_ACTION_IN_PROGRESS);
} }
......
...@@ -474,7 +474,7 @@ int32_t loadTableMeta(TAOS* taos, char* tableName, SSmlSTableSchema* schema) { ...@@ -474,7 +474,7 @@ int32_t loadTableMeta(TAOS* taos, char* tableName, SSmlSTableSchema* schema) {
return code; return code;
} }
static int32_t reconcileDBSchemas(TAOS* taos, SArray* stableSchemas) { static int32_t modifyDBSchemas(TAOS* taos, SArray* stableSchemas) {
int32_t code = 0; int32_t code = 0;
size_t numStable = taosArrayGetSize(stableSchemas); size_t numStable = taosArrayGetSize(stableSchemas);
for (int i = 0; i < numStable; ++i) { for (int i = 0; i < numStable; ++i) {
...@@ -570,6 +570,40 @@ static int32_t getSmlMd5ChildTableName(TAOS_SML_DATA_POINT* point, char* tableNa ...@@ -570,6 +570,40 @@ static int32_t getSmlMd5ChildTableName(TAOS_SML_DATA_POINT* point, char* tableNa
return 0; return 0;
} }
static int32_t changeChildTableTagValue(TAOS* taos, const char* cTableName, const char* tagName, TAOS_BIND* bind) {
char sql[512];
sprintf(sql, "alter table %s set tag %s=?", cTableName, tagName);
int32_t code;
TAOS_STMT* stmt = taos_stmt_init(taos);
code = taos_stmt_prepare(stmt, sql, (unsigned long)strlen(sql));
if (code != 0) {
tscError("%s", taos_stmt_errstr(stmt));
return code;
}
code = taos_stmt_bind_param(stmt, bind);
if (code != 0) {
tscError("%s", taos_stmt_errstr(stmt));
return code;
}
code = taos_stmt_execute(stmt);
if (code != 0) {
tscError("%s", taos_stmt_errstr(stmt));
return code;
}
code = taos_stmt_close(stmt);
if (code != 0) {
tscError("%s", taos_stmt_errstr(stmt));
return code;
}
return code;
}
static int32_t creatChildTableIfNotExists(TAOS* taos, const char* cTableName, const char* sTableName, SArray* tagsSchema, SArray* tagsBind) { static int32_t creatChildTableIfNotExists(TAOS* taos, const char* cTableName, const char* sTableName, SArray* tagsSchema, SArray* tagsBind) {
size_t numTags = taosArrayGetSize(tagsSchema); size_t numTags = taosArrayGetSize(tagsSchema);
char* sql = malloc(tsMaxSQLStringLen+1); char* sql = malloc(tsMaxSQLStringLen+1);
...@@ -657,7 +691,6 @@ static int32_t insertChildTableBatch(TAOS* taos, char* cTableName, SArray* cols ...@@ -657,7 +691,6 @@ static int32_t insertChildTableBatch(TAOS* taos, char* cTableName, SArray* cols
} }
do { do {
code = taos_stmt_set_tbname(stmt, cTableName); code = taos_stmt_set_tbname(stmt, cTableName);
if (code != 0) { if (code != 0) {
tscError("%s", taos_stmt_errstr(stmt)); tscError("%s", taos_stmt_errstr(stmt));
...@@ -742,97 +775,200 @@ static int32_t arrangePointsByChildTableName(TAOS_SML_DATA_POINT* points, int nu ...@@ -742,97 +775,200 @@ static int32_t arrangePointsByChildTableName(TAOS_SML_DATA_POINT* points, int nu
return 0; return 0;
} }
static int32_t insertPoints(TAOS* taos, TAOS_SML_DATA_POINT* points, int32_t numPoints, SArray* stableSchemas) { static int32_t applyChildTableTags(TAOS* taos, char* cTableName, char* sTableName,
int32_t code = TSDB_CODE_SUCCESS; SSmlSTableSchema* sTableSchema, SArray* cTablePoints) {
size_t numTags = taosArrayGetSize(sTableSchema->tags);
size_t rows = taosArrayGetSize(cTablePoints);
SHashObj* cname2points = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), TAOS_SML_KV* tagKVs[TSDB_MAX_TAGS] = {0};
true, false); for (int i= 0; i < rows; ++i) {
arrangePointsByChildTableName(points, numPoints, cname2points, stableSchemas); TAOS_SML_DATA_POINT * pDataPoint = taosArrayGetP(cTablePoints, i);
for (int j = 0; j < pDataPoint->tagNum; ++j) {
TAOS_SML_KV* kv = pDataPoint->tags + j;
tagKVs[kv->fieldSchemaIdx] = kv;
}
}
int32_t notNullTagsIndices[TSDB_MAX_TAGS] = {0};
int32_t numNotNullTags = 0;
for (int32_t i = 0; i < numTags; ++i) {
if (tagKVs[i] != NULL) {
notNullTagsIndices[numNotNullTags] = i;
++numNotNullTags;
}
}
SArray* tagBinds = taosArrayInit(numTags, sizeof(TAOS_BIND));
taosArraySetSize(tagBinds, numTags);
int isNullColBind = TSDB_TRUE; int isNullColBind = TSDB_TRUE;
SArray** pCTablePoints = taosHashIterate(cname2points, NULL); for (int j = 0; j < numTags; ++j) {
while (pCTablePoints) { TAOS_BIND* bind = taosArrayGet(tagBinds, j);
SArray* cTablePoints = *pCTablePoints; bind->is_null = &isNullColBind;
}
for (int j = 0; j < numTags; ++j) {
if (tagKVs[j] == NULL) continue;
TAOS_SML_KV* kv = tagKVs[j];
TAOS_BIND* bind = taosArrayGet(tagBinds, kv->fieldSchemaIdx);
bind->buffer_type = kv->type;
bind->length = malloc(sizeof(uintptr_t*));
*bind->length = kv->length;
bind->buffer = kv->value;
bind->is_null = NULL;
}
// select tag1,tag2,... from stable where tbname in (ctable)
char* sql = malloc(tsMaxSQLStringLen+1);
int freeBytes = tsMaxSQLStringLen + 1;
snprintf(sql, freeBytes, "select tbname, ");
for (int i = 0; i < numNotNullTags ; ++i) {
snprintf(sql + strlen(sql), freeBytes-strlen(sql), "%s,", tagKVs[notNullTagsIndices[i]]->key);
}
snprintf(sql + strlen(sql) - 1, freeBytes - strlen(sql) + 1,
" from %s where tbname in (\'%s\')", sTableName, cTableName);
sql[strlen(sql)] = '\0';
TAOS_RES* result = taos_query(taos, sql);
free(sql);
int32_t code = taos_errno(result);
if (code != 0) {
tscError("get child table %s tags failed. error string %s", cTableName, taos_errstr(result));
goto cleanup;
}
// check tag value and set tag values if different
TAOS_ROW row = taos_fetch_row(result);
if (row != NULL) {
int numFields = taos_field_count(result);
TAOS_FIELD* fields = taos_fetch_fields(result);
int* lengths = taos_fetch_lengths(result);
for (int i = 1; i < numFields; ++i) {
uint8_t dbType = fields[i].type;
int32_t length = lengths[i];
char* val = row[i];
TAOS_SML_KV* tagKV = tagKVs[notNullTagsIndices[i-1]];
if (tagKV->type != dbType) {
tscError("child table %s tag %s type mismatch. point type : %d, db type : %d",
cTableName, tagKV->key, tagKV->type, dbType);
return TSDB_CODE_TSC_INVALID_VALUE;
}
assert(tagKV->value);
TAOS_SML_DATA_POINT * point = taosArrayGetP(cTablePoints, 0); if (val == NULL || length != tagKV->length || memcmp(tagKV->value, val, length) != 0) {
SSmlSTableSchema* sTableSchema = taosArrayGet(stableSchemas, point->schemaIdx); TAOS_BIND* bind = taosArrayGet(tagBinds, tagKV->fieldSchemaIdx);
size_t numTags = taosArrayGetSize(sTableSchema->tags); code = changeChildTableTagValue(taos, cTableName, tagKV->key, bind);
size_t numCols = taosArrayGetSize(sTableSchema->fields); if (code != 0) {
tscError("change child table tag failed. table name %s, tag %s", cTableName, tagKV->key);
goto cleanup;
}
}
}
tscDebug("successfully applied point tags. child table: %s", cTableName);
} else {
code = creatChildTableIfNotExists(taos, cTableName, sTableName, sTableSchema->tags, tagBinds);
if (code != 0) {
goto cleanup;
}
}
cleanup:
taos_free_result(result);
for (int i = 0; i < taosArrayGetSize(tagBinds); ++i) {
TAOS_BIND* bind = taosArrayGet(tagBinds, i);
free(bind->length);
}
taosArrayDestroy(tagBinds);
return code;
}
static int32_t applyChildTableFields(TAOS* taos, SSmlSTableSchema* sTableSchema, char* cTableName, SArray* cTablePoints) {
int32_t code = TSDB_CODE_SUCCESS;
SArray* tagBinds = taosArrayInit(numTags, sizeof(TAOS_BIND)); size_t numCols = taosArrayGetSize(sTableSchema->fields);
taosArraySetSize(tagBinds, numTags); size_t rows = taosArrayGetSize(cTablePoints);
for (int j = 0; j < numTags; ++j) { SArray* rowsBind = taosArrayInit(rows, POINTER_BYTES);
TAOS_BIND* bind = taosArrayGet(tagBinds, j);
for (int i = 0; i < rows; ++i) {
TAOS_SML_DATA_POINT* point = taosArrayGetP(cTablePoints, i);
TAOS_BIND* colBinds = calloc(numCols, sizeof(TAOS_BIND));
if (colBinds == NULL) {
tscError("taos_sml_insert insert points, failed to allocated memory for TAOS_BIND, "
"num of rows: %zu, num of cols: %zu", rows, numCols);
return TSDB_CODE_TSC_OUT_OF_MEMORY;
}
int isNullColBind = TSDB_TRUE;
for (int j = 0; j < numCols; ++j) {
TAOS_BIND* bind = colBinds + j;
bind->is_null = &isNullColBind; bind->is_null = &isNullColBind;
} }
for (int j = 0; j < point->tagNum; ++j) { for (int j = 0; j < point->fieldNum; ++j) {
TAOS_SML_KV* kv = point->tags + j; TAOS_SML_KV* kv = point->fields + j;
TAOS_BIND* bind = taosArrayGet(tagBinds, kv->fieldSchemaIdx); TAOS_BIND* bind = colBinds + kv->fieldSchemaIdx;
bind->buffer_type = kv->type; bind->buffer_type = kv->type;
bind->length = malloc(sizeof(uintptr_t*)); bind->length = malloc(sizeof(uintptr_t*));
*bind->length = kv->length; *bind->length = kv->length;
bind->buffer = kv->value; bind->buffer = kv->value;
bind->is_null = NULL; bind->is_null = NULL;
} }
taosArrayPush(rowsBind, &colBinds);
}
size_t rows = taosArrayGetSize(cTablePoints); code = insertChildTableBatch(taos, cTableName, sTableSchema->fields, rowsBind);
SArray* rowsBind = taosArrayInit(rows, POINTER_BYTES); if (code != 0) {
tscError("insert into child table %s failed. error %s", cTableName, tstrerror(code));
for (int i = 0; i < rows; ++i) { }
point = taosArrayGetP(cTablePoints, i);
TAOS_BIND* colBinds = calloc(numCols, sizeof(TAOS_BIND)); for (int i = 0; i < rows; ++i) {
if (colBinds == NULL) { TAOS_BIND* colBinds = taosArrayGetP(rowsBind, i);
tscError("taos_sml_insert insert points, failed to allocated memory for TAOS_BIND, " for (int j = 0; j < numCols; ++j) {
"num of rows: %zu, num of cols: %zu", rows, numCols); TAOS_BIND* bind = colBinds + j;
} free(bind->length);
for (int j = 0; j < numCols; ++j) {
TAOS_BIND* bind = colBinds + j;
bind->is_null = &isNullColBind;
}
for (int j = 0; j < point->fieldNum; ++j) {
TAOS_SML_KV* kv = point->fields + j;
TAOS_BIND* bind = colBinds + kv->fieldSchemaIdx;
bind->buffer_type = kv->type;
bind->length = malloc(sizeof(uintptr_t*));
*bind->length = kv->length;
bind->buffer = kv->value;
bind->is_null = NULL;
}
taosArrayPush(rowsBind, &colBinds);
} }
free(colBinds);
}
taosArrayDestroy(rowsBind);
return code;
}
code = creatChildTableIfNotExists(taos, point->childTableName, point->stableName, sTableSchema->tags, tagBinds); static int32_t applyDataPoints(TAOS* taos, TAOS_SML_DATA_POINT* points, int32_t numPoints, SArray* stableSchemas) {
if (code == 0) { int32_t code = TSDB_CODE_SUCCESS;
code = insertChildTableBatch(taos, point->childTableName, sTableSchema->fields, rowsBind);
if (code != 0) {
tscError("insert into child table %s failed. error %s", point->childTableName, tstrerror(code));
}
} else {
tscError("Create Child Table %s failed, error %s", point->childTableName, tstrerror(code));
}
for (int i = 0; i < taosArrayGetSize(tagBinds); ++i) { SHashObj* cname2points = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, false);
TAOS_BIND* bind = taosArrayGet(tagBinds, i); arrangePointsByChildTableName(points, numPoints, cname2points, stableSchemas);
free(bind->length);
} SArray** pCTablePoints = taosHashIterate(cname2points, NULL);
taosArrayDestroy(tagBinds); while (pCTablePoints) {
for (int i = 0; i < rows; ++i) { SArray* cTablePoints = *pCTablePoints;
TAOS_BIND* colBinds = taosArrayGetP(rowsBind, i);
for (int j = 0; j < numCols; ++j) { TAOS_SML_DATA_POINT* point = taosArrayGetP(cTablePoints, 0);
TAOS_BIND* bind = colBinds + j; SSmlSTableSchema* sTableSchema = taosArrayGet(stableSchemas, point->schemaIdx);
free(bind->length); code = applyChildTableTags(taos, point->childTableName, point->stableName, sTableSchema, cTablePoints);
} if (code != 0) {
free(colBinds); tscError("apply child table tags failed. child table %s, error %s", point->childTableName, tstrerror(code));
goto cleanup;
} }
taosArrayDestroy(rowsBind); code = applyChildTableFields(taos, sTableSchema, point->childTableName, cTablePoints);
taosArrayDestroy(cTablePoints);
if (code != 0) { if (code != 0) {
break; tscError("Apply child table fields failed. child table %s, error %s", point->childTableName, tstrerror(code));
goto cleanup;
} }
tscDebug("successfully applied data points of child table %s", point->childTableName);
pCTablePoints = taosHashIterate(cname2points, pCTablePoints); pCTablePoints = taosHashIterate(cname2points, pCTablePoints);
} }
cleanup:
pCTablePoints = taosHashIterate(cname2points, NULL);
while (pCTablePoints) {
SArray* pPoints = *pCTablePoints;
taosArrayDestroy(pPoints);
pCTablePoints = taosHashIterate(cname2points, pCTablePoints);
}
taosHashCleanup(cname2points); taosHashCleanup(cname2points);
return code; return code;
} }
...@@ -849,15 +985,15 @@ int taos_sml_insert(TAOS* taos, TAOS_SML_DATA_POINT* points, int numPoint) { ...@@ -849,15 +985,15 @@ int taos_sml_insert(TAOS* taos, TAOS_SML_DATA_POINT* points, int numPoint) {
goto clean_up; goto clean_up;
} }
code = reconcileDBSchemas(taos, stableSchemas); code = modifyDBSchemas(taos, stableSchemas);
if (code != 0) { if (code != 0) {
tscError("error change db schema : %s", tstrerror(code)); tscError("error change db schema : %s", tstrerror(code));
goto clean_up; goto clean_up;
} }
code = insertPoints(taos, points, numPoint, stableSchemas); code = applyDataPoints(taos, points, numPoint, stableSchemas);
if (code != 0) { if (code != 0) {
tscError("error insert points : %s", tstrerror(code)); tscError("error apply data points : %s", tstrerror(code));
} }
clean_up: clean_up:
...@@ -1823,6 +1959,8 @@ int taos_insert_lines(TAOS* taos, char* lines[], int numLines) { ...@@ -1823,6 +1959,8 @@ int taos_insert_lines(TAOS* taos, char* lines[], int numLines) {
cleanup: cleanup:
tscDebug("taos_insert_lines finish inserting %d lines. code: %d", numLines, code); tscDebug("taos_insert_lines finish inserting %d lines. code: %d", numLines, code);
points = TARRAY_GET_START(lpPoints);
numPoints = taosArrayGetSize(lpPoints);
for (int i=0; i<numPoints; ++i) { for (int i=0; i<numPoints; ++i) {
destroySmlDataPoint(points+i); destroySmlDataPoint(points+i);
} }
......
...@@ -1640,7 +1640,7 @@ int taos_stmt_set_tbname_tags(TAOS_STMT* stmt, const char* name, TAOS_BIND* tags ...@@ -1640,7 +1640,7 @@ int taos_stmt_set_tbname_tags(TAOS_STMT* stmt, const char* name, TAOS_BIND* tags
memcpy(&pTableMetaInfo->name, &fullname, sizeof(fullname)); memcpy(&pTableMetaInfo->name, &fullname, sizeof(fullname));
code = tscGetTableMeta(pSql, pTableMetaInfo); code = tscGetTableMetaEx(pSql, pTableMetaInfo, false, true);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
STMT_RET(code); STMT_RET(code);
} }
......
...@@ -6020,7 +6020,7 @@ int32_t setAlterTableInfo(SSqlObj* pSql, struct SSqlInfo* pInfo) { ...@@ -6020,7 +6020,7 @@ int32_t setAlterTableInfo(SSqlObj* pSql, struct SSqlInfo* pInfo) {
int16_t i; int16_t i;
uint32_t nLen = 0; uint32_t nLen = 0;
for (i = 0; i < numOfColumns; ++i) { for (i = 0; i < numOfColumns; ++i) {
nLen += pSchema[i].colId != columnIndex.columnIndex ? pSchema[i].bytes : pItem->bytes; nLen += (i != columnIndex.columnIndex) ? pSchema[i].bytes : pItem->bytes;
} }
if (nLen >= TSDB_MAX_BYTES_PER_ROW) { if (nLen >= TSDB_MAX_BYTES_PER_ROW) {
return invalidOperationMsg(pMsg, msg24); return invalidOperationMsg(pMsg, msg24);
...@@ -6066,14 +6066,14 @@ int32_t setAlterTableInfo(SSqlObj* pSql, struct SSqlInfo* pInfo) { ...@@ -6066,14 +6066,14 @@ int32_t setAlterTableInfo(SSqlObj* pSql, struct SSqlInfo* pInfo) {
return invalidOperationMsg(pMsg, msg22); return invalidOperationMsg(pMsg, msg22);
} }
SSchema* pSchema = (SSchema*) pTableMetaInfo->pTableMeta->schema; SSchema* pSchema = tscGetTableTagSchema(pTableMetaInfo->pTableMeta);
int16_t numOfColumns = pTableMetaInfo->pTableMeta->tableInfo.numOfColumns; int16_t numOfTags = tscGetNumOfTags(pTableMetaInfo->pTableMeta);
int16_t i; int16_t i;
uint32_t nLen = 0; uint32_t nLen = 0;
for (i = 0; i < numOfColumns; ++i) { for (i = 0; i < numOfTags; ++i) {
nLen += pSchema[i].colId != columnIndex.columnIndex ? pSchema[i].bytes : pItem->bytes; nLen += (i != columnIndex.columnIndex) ? pSchema[i].bytes : pItem->bytes;
} }
if (nLen >= TSDB_MAX_BYTES_PER_ROW) { if (nLen >= TSDB_MAX_TAGS_LEN) {
return invalidOperationMsg(pMsg, msg24); return invalidOperationMsg(pMsg, msg24);
} }
......
...@@ -2776,7 +2776,7 @@ int32_t getMultiTableMetaFromMnode(SSqlObj *pSql, SArray* pNameList, SArray* pVg ...@@ -2776,7 +2776,7 @@ int32_t getMultiTableMetaFromMnode(SSqlObj *pSql, SArray* pNameList, SArray* pVg
return code; return code;
} }
int32_t tscGetTableMetaImpl(SSqlObj* pSql, STableMetaInfo *pTableMetaInfo, bool autocreate) { int32_t tscGetTableMetaImpl(SSqlObj* pSql, STableMetaInfo *pTableMetaInfo, bool autocreate, bool onlyLocal) {
assert(tIsValidName(&pTableMetaInfo->name)); assert(tIsValidName(&pTableMetaInfo->name));
uint32_t size = tscGetTableMetaMaxSize(); uint32_t size = tscGetTableMetaMaxSize();
...@@ -2822,15 +2822,20 @@ int32_t tscGetTableMetaImpl(SSqlObj* pSql, STableMetaInfo *pTableMetaInfo, bool ...@@ -2822,15 +2822,20 @@ int32_t tscGetTableMetaImpl(SSqlObj* pSql, STableMetaInfo *pTableMetaInfo, bool
} }
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
if (onlyLocal) {
return TSDB_CODE_TSC_NO_META_CACHED;
}
return getTableMetaFromMnode(pSql, pTableMetaInfo, autocreate); return getTableMetaFromMnode(pSql, pTableMetaInfo, autocreate);
} }
int32_t tscGetTableMeta(SSqlObj *pSql, STableMetaInfo *pTableMetaInfo) { int32_t tscGetTableMeta(SSqlObj *pSql, STableMetaInfo *pTableMetaInfo) {
return tscGetTableMetaImpl(pSql, pTableMetaInfo, false); return tscGetTableMetaImpl(pSql, pTableMetaInfo, false, false);
} }
int tscGetTableMetaEx(SSqlObj *pSql, STableMetaInfo *pTableMetaInfo, bool createIfNotExists) { int tscGetTableMetaEx(SSqlObj *pSql, STableMetaInfo *pTableMetaInfo, bool createIfNotExists, bool onlyLocal) {
return tscGetTableMetaImpl(pSql, pTableMetaInfo, createIfNotExists); return tscGetTableMetaImpl(pSql, pTableMetaInfo, createIfNotExists, onlyLocal);
} }
int32_t tscGetUdfFromNode(SSqlObj *pSql, SQueryInfo* pQueryInfo) { int32_t tscGetUdfFromNode(SSqlObj *pSql, SQueryInfo* pQueryInfo) {
......
...@@ -38,7 +38,11 @@ const int32_t TYPE_BYTES[15] = { ...@@ -38,7 +38,11 @@ const int32_t TYPE_BYTES[15] = {
#define DO_STATICS(__sum, __min, __max, __minIndex, __maxIndex, _list, _index) \ #define DO_STATICS(__sum, __min, __max, __minIndex, __maxIndex, _list, _index) \
do { \ do { \
(__sum) += (_list)[(_index)]; \ if (_list[(_index)] >= (INT64_MAX - (__sum))) { \
__sum = INT64_MAX; \
} else { \
(__sum) += (_list)[(_index)]; \
} \
if ((__min) > (_list)[(_index)]) { \ if ((__min) > (_list)[(_index)]) { \
(__min) = (_list)[(_index)]; \ (__min) = (_list)[(_index)]; \
(__minIndex) = (_index); \ (__minIndex) = (_index); \
......
Subproject commit 3530c6df097134a410bacec6b3cd013ef38a61aa Subproject commit 4a4d79099b076b8ff12d5b4fdbcba54049a6866d
...@@ -40,8 +40,9 @@ ...@@ -40,8 +40,9 @@
#include "dnodeShell.h" #include "dnodeShell.h"
#include "dnodeTelemetry.h" #include "dnodeTelemetry.h"
#include "module.h" #include "module.h"
#include "qScript.h"
#include "mnode.h" #include "mnode.h"
#include "qScript.h"
#include "tcache.h"
#include "tscompression.h" #include "tscompression.h"
#if !defined(_MODULE) || !defined(_TD_LINUX) #if !defined(_MODULE) || !defined(_TD_LINUX)
...@@ -208,6 +209,7 @@ void dnodeCleanUpSystem() { ...@@ -208,6 +209,7 @@ void dnodeCleanUpSystem() {
dnodeCleanupComponents(); dnodeCleanupComponents();
taos_cleanup(); taos_cleanup();
taosCloseLog(); taosCloseLog();
taosStopCacheRefreshWorker();
} }
} }
...@@ -320,12 +322,12 @@ static int32_t dnodeInitStorage() { ...@@ -320,12 +322,12 @@ static int32_t dnodeInitStorage() {
static void dnodeCleanupStorage() { static void dnodeCleanupStorage() {
// storage destroy // storage destroy
tfsDestroy(); tfsDestroy();
#ifdef TD_TSZ #ifdef TD_TSZ
// compress destroy // compress destroy
tsCompressExit(); tsCompressExit();
#endif #endif
} }
bool dnodeIsFirstDeploy() { bool dnodeIsFirstDeploy() {
......
...@@ -102,6 +102,7 @@ int32_t* taosGetErrno(); ...@@ -102,6 +102,7 @@ int32_t* taosGetErrno();
#define TSDB_CODE_TSC_EXCEED_SQL_LIMIT TAOS_DEF_ERROR_CODE(0, 0x0219) //"SQL statement too long check maxSQLLength config") #define TSDB_CODE_TSC_EXCEED_SQL_LIMIT TAOS_DEF_ERROR_CODE(0, 0x0219) //"SQL statement too long check maxSQLLength config")
#define TSDB_CODE_TSC_FILE_EMPTY TAOS_DEF_ERROR_CODE(0, 0x021A) //"File is empty") #define TSDB_CODE_TSC_FILE_EMPTY TAOS_DEF_ERROR_CODE(0, 0x021A) //"File is empty")
#define TSDB_CODE_TSC_LINE_SYNTAX_ERROR TAOS_DEF_ERROR_CODE(0, 0x021B) //"Syntax error in Line") #define TSDB_CODE_TSC_LINE_SYNTAX_ERROR TAOS_DEF_ERROR_CODE(0, 0x021B) //"Syntax error in Line")
#define TSDB_CODE_TSC_NO_META_CACHED TAOS_DEF_ERROR_CODE(0, 0x021C) //"No table meta cached")
// mnode // mnode
#define TSDB_CODE_MND_MSG_NOT_PROCESSED TAOS_DEF_ERROR_CODE(0, 0x0300) //"Message not processed") #define TSDB_CODE_MND_MSG_NOT_PROCESSED TAOS_DEF_ERROR_CODE(0, 0x0300) //"Message not processed")
......
...@@ -743,8 +743,8 @@ static void printHelp() { ...@@ -743,8 +743,8 @@ static void printHelp() {
"The number of threads. Default is 10."); "The number of threads. Default is 10.");
printf("%s%s%s%s\n", indent, "-i", indent, printf("%s%s%s%s\n", indent, "-i", indent,
"The sleep time (ms) between insertion. Default is 0."); "The sleep time (ms) between insertion. Default is 0.");
printf("%s%s%s%s%d\n", indent, "-S", indent, printf("%s%s%s%s%d.\n", indent, "-S", indent,
"The timestamp step between insertion. Default is %d.", "The timestamp step between insertion. Default is ",
DEFAULT_TIMESTAMP_STEP); DEFAULT_TIMESTAMP_STEP);
printf("%s%s%s%s\n", indent, "-r", indent, printf("%s%s%s%s\n", indent, "-r", indent,
"The number of records per request. Default is 30000."); "The number of records per request. Default is 30000.");
...@@ -5940,14 +5940,15 @@ static int32_t prepareStbStmt( ...@@ -5940,14 +5940,15 @@ static int32_t prepareStbStmt(
if (-1 == prepareStbStmtBind( if (-1 == prepareStbStmtBind(
tagsArray, stbInfo, tagRand, -1, -1, false /* is tag */)) { tagsArray, stbInfo, tagRand, -1, -1, false /* is tag */)) {
free(tagsArray); tmfree(tagsValBuf);
tmfree(tagsArray);
return -1; return -1;
} }
ret = taos_stmt_set_tbname_tags(stmt, tableName, (TAOS_BIND *)tagsArray); ret = taos_stmt_set_tbname_tags(stmt, tableName, (TAOS_BIND *)tagsArray);
tmfree(tagsValBuf); tmfree(tagsValBuf);
tmfree((char *)tagsArray); tmfree(tagsArray);
} else { } else {
ret = taos_stmt_set_tbname(stmt, tableName); ret = taos_stmt_set_tbname(stmt, tableName);
} }
......
...@@ -453,6 +453,8 @@ static error_t parse_opt(int key, char *arg, struct argp_state *state) { ...@@ -453,6 +453,8 @@ static error_t parse_opt(int key, char *arg, struct argp_state *state) {
case 'E': case 'E':
g_args.end_time = atol(arg); g_args.end_time = atol(arg);
break; break;
case 'C':
break;
case 'B': case 'B':
g_args.data_batch = atoi(arg); g_args.data_batch = atoi(arg);
if (g_args.data_batch > MAX_RECORDS_PER_REQ) { if (g_args.data_batch > MAX_RECORDS_PER_REQ) {
......
...@@ -712,9 +712,8 @@ void tSetColumnType(TAOS_FIELD *pField, SStrToken *type) { ...@@ -712,9 +712,8 @@ void tSetColumnType(TAOS_FIELD *pField, SStrToken *type) {
} else { } else {
int32_t bytes = -(int32_t)(type->type); int32_t bytes = -(int32_t)(type->type);
if (bytes > (TSDB_MAX_NCHAR_LEN - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE) { if (bytes > (TSDB_MAX_NCHAR_LEN - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE) {
// we have to postpone reporting the error because it cannot be done here // overflowed. set bytes to -1 so that error can be reported
// as pField->bytes is int16_t, use 'TSDB_MAX_NCHAR_LEN + 1' to avoid overflow bytes = -1;
bytes = TSDB_MAX_NCHAR_LEN + 1;
} else { } else {
bytes = bytes * TSDB_NCHAR_SIZE + VARSTR_HEADER_SIZE; bytes = bytes * TSDB_NCHAR_SIZE + VARSTR_HEADER_SIZE;
} }
...@@ -727,8 +726,8 @@ void tSetColumnType(TAOS_FIELD *pField, SStrToken *type) { ...@@ -727,8 +726,8 @@ void tSetColumnType(TAOS_FIELD *pField, SStrToken *type) {
} else { } else {
int32_t bytes = -(int32_t)(type->type); int32_t bytes = -(int32_t)(type->type);
if (bytes > TSDB_MAX_BINARY_LEN - VARSTR_HEADER_SIZE) { if (bytes > TSDB_MAX_BINARY_LEN - VARSTR_HEADER_SIZE) {
// refer comment for NCHAR above // overflowed. set bytes to -1 so that error can be reported
bytes = TSDB_MAX_BINARY_LEN + 1; bytes = -1;
} else { } else {
bytes += VARSTR_HEADER_SIZE; bytes += VARSTR_HEADER_SIZE;
} }
......
...@@ -83,6 +83,7 @@ typedef struct { ...@@ -83,6 +83,7 @@ typedef struct {
uint8_t deleting; // set the deleting flag to stop refreshing ASAP. uint8_t deleting; // set the deleting flag to stop refreshing ASAP.
pthread_t refreshWorker; pthread_t refreshWorker;
bool extendLifespan; // auto extend life span when one item is accessed. bool extendLifespan; // auto extend life span when one item is accessed.
int64_t checkTick; // tick used to record the check times of the refresh threads
#if defined(LINUX) #if defined(LINUX)
pthread_rwlock_t lock; pthread_rwlock_t lock;
#else #else
...@@ -177,6 +178,11 @@ void taosCacheCleanup(SCacheObj *pCacheObj); ...@@ -177,6 +178,11 @@ void taosCacheCleanup(SCacheObj *pCacheObj);
*/ */
void taosCacheRefresh(SCacheObj *pCacheObj, __cache_free_fn_t fp); void taosCacheRefresh(SCacheObj *pCacheObj, __cache_free_fn_t fp);
/**
* stop background refresh worker thread
*/
void taosStopCacheRefreshWorker();
#ifdef __cplusplus #ifdef __cplusplus
} }
#endif #endif
......
...@@ -54,6 +54,45 @@ static FORCE_INLINE void __cache_lock_destroy(SCacheObj *pCacheObj) { ...@@ -54,6 +54,45 @@ static FORCE_INLINE void __cache_lock_destroy(SCacheObj *pCacheObj) {
#endif #endif
} }
/**
* do cleanup the taos cache
* @param pCacheObj
*/
static void doCleanupDataCache(SCacheObj *pCacheObj);
/**
* refresh cache to remove data in both hash list and trash, if any nodes' refcount == 0, every pCacheObj->refreshTime
* @param handle Cache object handle
*/
static void* taosCacheTimedRefresh(void *handle);
static pthread_t cacheRefreshWorker = {0};
static pthread_once_t cacheThreadInit = PTHREAD_ONCE_INIT;
static pthread_mutex_t guard = PTHREAD_MUTEX_INITIALIZER;
static SArray* pCacheArrayList = NULL;
static bool stopRefreshWorker = false;
static void doInitRefreshThread(void) {
pCacheArrayList = taosArrayInit(4, POINTER_BYTES);
pthread_attr_t thattr;
pthread_attr_init(&thattr);
pthread_attr_setdetachstate(&thattr, PTHREAD_CREATE_JOINABLE);
pthread_create(&cacheRefreshWorker, &thattr, taosCacheTimedRefresh, NULL);
pthread_attr_destroy(&thattr);
}
pthread_t doRegisterCacheObj(SCacheObj* pCacheObj) {
pthread_once(&cacheThreadInit, doInitRefreshThread);
pthread_mutex_lock(&guard);
taosArrayPush(pCacheArrayList, &pCacheObj);
pthread_mutex_unlock(&guard);
return cacheRefreshWorker;
}
/** /**
* @param key key of object for hash, usually a null-terminated string * @param key key of object for hash, usually a null-terminated string
* @param keyLen length of key * @param keyLen length of key
...@@ -142,19 +181,9 @@ static FORCE_INLINE void doDestroyTrashcanElem(SCacheObj* pCacheObj, STrashElem ...@@ -142,19 +181,9 @@ static FORCE_INLINE void doDestroyTrashcanElem(SCacheObj* pCacheObj, STrashElem
free(pElem); free(pElem);
} }
/**
* do cleanup the taos cache
* @param pCacheObj
*/
static void doCleanupDataCache(SCacheObj *pCacheObj);
/**
* refresh cache to remove data in both hash list and trash, if any nodes' refcount == 0, every pCacheObj->refreshTime
* @param handle Cache object handle
*/
static void* taosCacheTimedRefresh(void *handle);
SCacheObj *taosCacheInit(int32_t keyType, int64_t refreshTimeInSeconds, bool extendLifespan, __cache_free_fn_t fn, const char* cacheName) { SCacheObj *taosCacheInit(int32_t keyType, int64_t refreshTimeInSeconds, bool extendLifespan, __cache_free_fn_t fn, const char* cacheName) {
const int32_t SLEEP_DURATION = 500; //500 ms
if (refreshTimeInSeconds <= 0) { if (refreshTimeInSeconds <= 0) {
return NULL; return NULL;
} }
...@@ -174,9 +203,10 @@ SCacheObj *taosCacheInit(int32_t keyType, int64_t refreshTimeInSeconds, bool ext ...@@ -174,9 +203,10 @@ SCacheObj *taosCacheInit(int32_t keyType, int64_t refreshTimeInSeconds, bool ext
} }
// set free cache node callback function // set free cache node callback function
pCacheObj->freeFp = fn; pCacheObj->freeFp = fn;
pCacheObj->refreshTime = refreshTimeInSeconds * 1000; pCacheObj->refreshTime = refreshTimeInSeconds * 1000;
pCacheObj->extendLifespan = extendLifespan; pCacheObj->checkTick = pCacheObj->refreshTime / SLEEP_DURATION;
pCacheObj->extendLifespan = extendLifespan; // the TTL after the last access
if (__cache_lock_init(pCacheObj) != 0) { if (__cache_lock_init(pCacheObj) != 0) {
taosHashCleanup(pCacheObj->pHashTable); taosHashCleanup(pCacheObj->pHashTable);
...@@ -186,13 +216,7 @@ SCacheObj *taosCacheInit(int32_t keyType, int64_t refreshTimeInSeconds, bool ext ...@@ -186,13 +216,7 @@ SCacheObj *taosCacheInit(int32_t keyType, int64_t refreshTimeInSeconds, bool ext
return NULL; return NULL;
} }
pthread_attr_t thattr; doRegisterCacheObj(pCacheObj);
pthread_attr_init(&thattr);
pthread_attr_setdetachstate(&thattr, PTHREAD_CREATE_JOINABLE);
pthread_create(&pCacheObj->refreshWorker, &thattr, taosCacheTimedRefresh, pCacheObj);
pthread_attr_destroy(&thattr);
return pCacheObj; return pCacheObj;
} }
...@@ -364,7 +388,7 @@ void taosCacheRelease(SCacheObj *pCacheObj, void **data, bool _remove) { ...@@ -364,7 +388,7 @@ void taosCacheRelease(SCacheObj *pCacheObj, void **data, bool _remove) {
if (pCacheObj->extendLifespan && (!inTrashcan) && (!_remove)) { if (pCacheObj->extendLifespan && (!inTrashcan) && (!_remove)) {
atomic_store_64(&pNode->expireTime, pNode->lifespan + taosGetTimestampMs()); atomic_store_64(&pNode->expireTime, pNode->lifespan + taosGetTimestampMs());
uDebug("cache:%s data:%p extend expire time: %"PRId64, pCacheObj->name, pNode->data, pNode->expireTime); uDebug("cache:%s, data:%p extend expire time: %"PRId64, pCacheObj->name, pNode->data, pNode->expireTime);
} }
if (_remove) { if (_remove) {
...@@ -510,8 +534,10 @@ void taosCacheCleanup(SCacheObj *pCacheObj) { ...@@ -510,8 +534,10 @@ void taosCacheCleanup(SCacheObj *pCacheObj) {
} }
pCacheObj->deleting = 1; pCacheObj->deleting = 1;
if (taosCheckPthreadValid(pCacheObj->refreshWorker)) {
pthread_join(pCacheObj->refreshWorker, NULL); // wait for the refresh thread quit before destroying the cache object.
while(atomic_load_8(&pCacheObj->deleting) != 0) {
taosMsleep(50);
} }
uInfo("cache:%s will be cleaned up", pCacheObj->name); uInfo("cache:%s will be cleaned up", pCacheObj->name);
...@@ -650,50 +676,79 @@ static void doCacheRefresh(SCacheObj* pCacheObj, int64_t time, __cache_free_fn_t ...@@ -650,50 +676,79 @@ static void doCacheRefresh(SCacheObj* pCacheObj, int64_t time, __cache_free_fn_t
} }
void* taosCacheTimedRefresh(void *handle) { void* taosCacheTimedRefresh(void *handle) {
SCacheObj* pCacheObj = handle; assert(pCacheArrayList != NULL);
if (pCacheObj == NULL) { uDebug("cache refresh thread starts");
uDebug("object is destroyed. no refresh retry");
return NULL;
}
setThreadName("cacheTimedRefre"); setThreadName("cacheTimedRefre");
const int32_t SLEEP_DURATION = 500; //500 ms const int32_t SLEEP_DURATION = 500; //500 ms
int64_t totalTick = pCacheObj->refreshTime / SLEEP_DURATION;
int64_t count = 0; int64_t count = 0;
while(1) {
taosMsleep(500);
// check if current cache object will be deleted every 500ms. while(1) {
if (pCacheObj->deleting) { taosMsleep(SLEEP_DURATION);
uDebug("%s refresh threads quit", pCacheObj->name); if (stopRefreshWorker) {
break; goto _end;
} }
if (++count < totalTick) { pthread_mutex_lock(&guard);
continue; size_t size = taosArrayGetSize(pCacheArrayList);
} pthread_mutex_unlock(&guard);
// reset the count value count += 1;
count = 0;
size_t elemInHash = taosHashGetSize(pCacheObj->pHashTable);
if (elemInHash + pCacheObj->numOfElemsInTrash == 0) {
continue;
}
uDebug("%s refresh thread timed scan", pCacheObj->name); for(int32_t i = 0; i < size; ++i) {
pCacheObj->statistics.refreshCount++; pthread_mutex_lock(&guard);
SCacheObj* pCacheObj = taosArrayGetP(pCacheArrayList, i);
// refresh data in hash table if (pCacheObj == NULL) {
if (elemInHash > 0) { uError("object is destroyed. ignore and try next");
int64_t now = taosGetTimestampMs(); pthread_mutex_unlock(&guard);
doCacheRefresh(pCacheObj, now, NULL); continue;
} }
// check if current cache object will be deleted every 500ms.
if (pCacheObj->deleting) {
taosArrayRemove(pCacheArrayList, i);
size = taosArrayGetSize(pCacheArrayList);
uDebug("%s is destroying, remove it from refresh list, remain cache obj:%"PRIzu, pCacheObj->name, size);
pCacheObj->deleting = 0; //reset the deleting flag to enable pCacheObj to continue releasing resources.
pthread_mutex_unlock(&guard);
continue;
}
pthread_mutex_unlock(&guard);
if ((count % pCacheObj->checkTick) != 0) {
continue;
}
taosTrashcanEmpty(pCacheObj, false); size_t elemInHash = taosHashGetSize(pCacheObj->pHashTable);
if (elemInHash + pCacheObj->numOfElemsInTrash == 0) {
continue;
}
uDebug("%s refresh thread scan", pCacheObj->name);
pCacheObj->statistics.refreshCount++;
// refresh data in hash table
if (elemInHash > 0) {
int64_t now = taosGetTimestampMs();
doCacheRefresh(pCacheObj, now, NULL);
}
taosTrashcanEmpty(pCacheObj, false);
}
} }
_end:
taosArrayDestroy(pCacheArrayList);
pCacheArrayList = NULL;
pthread_mutex_destroy(&guard);
uDebug("cache refresh thread quits");
return NULL; return NULL;
} }
...@@ -705,3 +760,7 @@ void taosCacheRefresh(SCacheObj *pCacheObj, __cache_free_fn_t fp) { ...@@ -705,3 +760,7 @@ void taosCacheRefresh(SCacheObj *pCacheObj, __cache_free_fn_t fp) {
int64_t now = taosGetTimestampMs(); int64_t now = taosGetTimestampMs();
doCacheRefresh(pCacheObj, now, fp); doCacheRefresh(pCacheObj, now, fp);
} }
void taosStopCacheRefreshWorker() {
stopRefreshWorker = false;
}
\ No newline at end of file
...@@ -110,6 +110,8 @@ TAOS_DEFINE_ERROR(TSDB_CODE_TSC_DB_NOT_SELECTED, "Database not specifie ...@@ -110,6 +110,8 @@ TAOS_DEFINE_ERROR(TSDB_CODE_TSC_DB_NOT_SELECTED, "Database not specifie
TAOS_DEFINE_ERROR(TSDB_CODE_TSC_INVALID_TABLE_NAME, "Table does not exist") TAOS_DEFINE_ERROR(TSDB_CODE_TSC_INVALID_TABLE_NAME, "Table does not exist")
TAOS_DEFINE_ERROR(TSDB_CODE_TSC_EXCEED_SQL_LIMIT, "SQL statement too long, check maxSQLLength config") TAOS_DEFINE_ERROR(TSDB_CODE_TSC_EXCEED_SQL_LIMIT, "SQL statement too long, check maxSQLLength config")
TAOS_DEFINE_ERROR(TSDB_CODE_TSC_FILE_EMPTY, "File is empty") TAOS_DEFINE_ERROR(TSDB_CODE_TSC_FILE_EMPTY, "File is empty")
TAOS_DEFINE_ERROR(TSDB_CODE_TSC_LINE_SYNTAX_ERROR, "Syntax error in Line")
TAOS_DEFINE_ERROR(TSDB_CODE_TSC_NO_META_CACHED, "No table meta cached")
// mnode // mnode
TAOS_DEFINE_ERROR(TSDB_CODE_MND_MSG_NOT_PROCESSED, "Message not processed") TAOS_DEFINE_ERROR(TSDB_CODE_MND_MSG_NOT_PROCESSED, "Message not processed")
......
...@@ -954,7 +954,7 @@ int32_t verify_schema_less(TAOS* taos) { ...@@ -954,7 +954,7 @@ int32_t verify_schema_less(TAOS* taos) {
result = taos_query(taos, "drop database if exists test;"); result = taos_query(taos, "drop database if exists test;");
taos_free_result(result); taos_free_result(result);
usleep(100000); usleep(100000);
result = taos_query(taos, "create database test precision 'us';"); result = taos_query(taos, "create database test precision 'us' update 1;");
taos_free_result(result); taos_free_result(result);
usleep(100000); usleep(100000);
...@@ -963,6 +963,8 @@ int32_t verify_schema_less(TAOS* taos) { ...@@ -963,6 +963,8 @@ int32_t verify_schema_less(TAOS* taos) {
taos_free_result(result); taos_free_result(result);
usleep(100000); usleep(100000);
int code = 0;
char* lines[] = { char* lines[] = {
"st,t1=3i64,t2=4f64,t3=\"t3\" c1=3i64,c3=L\"passit\",c2=false,c4=4f64 1626006833639000000ns", "st,t1=3i64,t2=4f64,t3=\"t3\" c1=3i64,c3=L\"passit\",c2=false,c4=4f64 1626006833639000000ns",
"st,t1=4i64,t3=\"t4\",t2=5f64,t4=5f64 c1=3i64,c3=L\"passitagin\",c2=true,c4=5f64,c5=5f64 1626006833640000000ns", "st,t1=4i64,t3=\"t4\",t2=5f64,t4=5f64 c1=3i64,c3=L\"passitagin\",c2=true,c4=5f64,c5=5f64 1626006833640000000ns",
...@@ -975,8 +977,8 @@ int32_t verify_schema_less(TAOS* taos) { ...@@ -975,8 +977,8 @@ int32_t verify_schema_less(TAOS* taos) {
"stf,t1=4i64,t3=\"t4\",t2=5f64,t4=5f64 c1=3i64,c3=L\"passitagin_stf\",c2=false,c5=5f64,c6=7u64 1626006933641000000ns" "stf,t1=4i64,t3=\"t4\",t2=5f64,t4=5f64 c1=3i64,c3=L\"passitagin_stf\",c2=false,c5=5f64,c6=7u64 1626006933641000000ns"
}; };
int code = 0;
code = taos_insert_lines(taos, lines , sizeof(lines)/sizeof(char*)); code = taos_insert_lines(taos, lines , sizeof(lines)/sizeof(char*));
char* lines2[] = { char* lines2[] = {
"stg,t1=3i64,t2=4f64,t3=\"t3\" c1=3i64,c3=L\"passit\",c2=false,c4=4f64 1626006833639000000ns", "stg,t1=3i64,t2=4f64,t3=\"t3\" c1=3i64,c3=L\"passit\",c2=false,c4=4f64 1626006833639000000ns",
"stg,t1=4i64,t3=\"t4\",t2=5f64,t4=5f64 c1=3i64,c3=L\"passitagin\",c2=true,c4=5f64,c5=5f64 1626006833640000000ns" "stg,t1=4i64,t3=\"t4\",t2=5f64,t4=5f64 c1=3i64,c3=L\"passitagin\",c2=true,c4=5f64,c5=5f64 1626006833640000000ns"
...@@ -989,7 +991,27 @@ int32_t verify_schema_less(TAOS* taos) { ...@@ -989,7 +991,27 @@ int32_t verify_schema_less(TAOS* taos) {
"sth,t1=4i64,t2=5f64,t4=5f64 c1=3i64,c3=L\"passitagin_stf\",c2=false,c5=5f64,c6=7u64 1626006933654ms" "sth,t1=4i64,t2=5f64,t4=5f64 c1=3i64,c3=L\"passitagin_stf\",c2=false,c5=5f64,c6=7u64 1626006933654ms"
}; };
code = taos_insert_lines(taos, lines3, 2); code = taos_insert_lines(taos, lines3, 2);
return code;
char* lines4[] = {
"st123456,t1=3i64,t2=4f64,t3=\"t3\" c1=3i64,c3=L\"passit\",c2=false,c4=4f64 1626006833639000000ns",
"dgtyqodr,t2=5f64,t3=L\"ste\" c1=tRue,c2=4i64,c3=\"iam\" 1626056811823316532ns"
};
code = taos_insert_lines(taos, lines4, 2);
char* lines5[] = {
"zqlbgs,id=\"zqlbgs_39302_21680\",t0=f,t1=127i8,t2=32767i16,t3=2147483647i32,t4=9223372036854775807i64,t5=11.12345f32,t6=22.123456789f64,t7=\"binaryTagValue\",t8=L\"ncharTagValue\" c0=f,c1=127i8,c2=32767i16,c3=2147483647i32,c4=9223372036854775807i64,c5=11.12345f32,c6=22.123456789f64,c7=\"binaryColValue\",c8=L\"ncharColValue\",c9=7u64 1626006833639000000ns",
"zqlbgs,t9=f,id=\"zqlbgs_39302_21680\",t0=f,t1=127i8,t11=127i8,t2=32767i16,t3=2147483647i32,t4=9223372036854775807i64,t5=11.12345f32,t6=22.123456789f64,t7=\"binaryTagValue\",t8=L\"ncharTagValue\",t10=L\"ncharTagValue\" c10=f,c0=f,c1=127i8,c12=127i8,c2=32767i16,c3=2147483647i32,c4=9223372036854775807i64,c5=11.12345f32,c6=22.123456789f64,c7=\"binaryColValue\",c8=L\"ncharColValue\",c9=7u64,c11=L\"ncharColValue\" 1626006833639000000ns"
};
code = taos_insert_lines(taos, &lines5[0], 1);
code = taos_insert_lines(taos, &lines5[1], 1);
char* lines6[] = {
"st123456,t1=3i64,t2=4f64,t3=\"t3\" c1=3i64,c3=L\"passit\",c2=false,c4=4f64 1626006833639000000ns",
"dgtyqodr,t2=5f64,t3=L\"ste\" c1=tRue,c2=4i64,c3=\"iam\" 1626056811823316532ns"
};
code = taos_insert_lines(taos, lines6, 2);
return (code);
} }
int main(int argc, char *argv[]) { int main(int argc, char *argv[]) {
......
...@@ -9,8 +9,8 @@ ...@@ -9,8 +9,8 @@
#include <unistd.h> #include <unistd.h>
int numSuperTables = 8; int numSuperTables = 8;
int numChildTables = 1024; int numChildTables = 4;
int numRowsPerChildTable = 128; int numRowsPerChildTable = 2048;
void shuffle(char**lines, size_t n) void shuffle(char**lines, size_t n)
{ {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册