提交 3296aecf 编写于 作者: H Haojun Liao

Merge branch '3.0' into fix/3_liaohj

......@@ -670,6 +670,15 @@ The charset that takes effect is UTF-8.
| Value Range | 0: not consistent; 1: consistent. |
| Default | 0 |
### smlTsDefaultName
| Attribute | Description |
| -------- | -------------------------------------------------------- |
| Applicable | Client only |
| Meaning | The name of the time column for schemaless automatic table creation is set through this configuration |
| Type | String |
| Default Value | _ts |
## Compress Parameters
### compressMsgSize
......
......@@ -34,7 +34,27 @@ In the schemaless writing data line protocol, each data item in the field_set ne
- If there are English double quotes on both sides, it indicates the BINARY(32) type. For example, `"abc"`.
- If there are double quotes on both sides and an L prefix, it means NCHAR(32) type. For example, `L"error message"`.
- Spaces, equal signs (=), commas (,), and double quotes (") need to be escaped with a backslash (\\) in front. (All refer to the ASCII character)
- Spaces, equals sign (=), comma (,), double quote ("), and backslash (\\) need to be escaped with a backslash (\\) in front. (All refer to the ASCII character). The rules are as follows:
| **Serial number** | **Element** | **Escape characters** |
| -------- | ----------- | ----------------------------- |
| 1 | Measurement | Comma, Space |
| 2 | Tag key | Comma, Equals Sign, Space |
| 3 | Tag value | Comma, Equals Sign, Space |
| 4 | Field key | Comma, Equals Sign, Space |
| 5 | Field value | Double quote, Backslash |
With two contiguous backslashes, the first is interpreted as an escape character. Examples of backslash escape rules are as follows:
| **Serial number** | **Backslashes** | **Interpreted as** |
| -------- | ----------- | ----------------------------- |
| 1 | \ | \ |
| 2 | \\\\ | \ |
| 3 | \\\\\\ | \\\\ |
| 4 | \\\\\\\\ | \\\\ |
| 5 | \\\\\\\\\\ | \\\\\\ |
| 6 | \\\\\\\\\\\\ | \\\\\\ |
- Numeric types will be distinguished from data types by the suffix.
| **Serial number** | **Postfix** | **Mapping type** | **Size (bytes)** |
......@@ -88,6 +108,8 @@ You can configure smlChildTableName in taos.cfg to specify table names, for exam
8. It is assumed that the order of field_set in a supertable is consistent, meaning that the first record contains all fields and subsequent records store fields in the same order. If the order is not consistent, set smlDataFormat in taos.cfg to false. Otherwise, data will be written out of order and a database error will occur.
Note: TDengine 3.0.3.0 and later automatically detect whether order is consistent. This parameter is no longer used.
9. Due to the fact that SQL table names do not support period (.), schemaless has also processed period (.). If there is a period (.) in the table name automatically created by schemaless, it will be automatically replaced with an underscore (\_). If you manually specify a sub table name, if there is a dot (.) in the sub table name, it will also be converted to an underscore (\_)
10. Taos.cfg adds the configuration of smlTsDefaultName (with a string value), which only works on the client side. After configuration, the time column name of the schemaless automatic table creation can be set through this configuration. If not configured, defaults to _ts.
:::tip
All processing logic of schemaless will still follow TDengine's underlying restrictions on data structures, such as the total length of each row of data cannot exceed 48 KB(64 KB since version 3.0.5.0) and the total length of a tag value cannot exceed 16 KB. See [TDengine SQL Boundary Limits](/taos-sql/limit) for specific constraints in this area.
......
......@@ -685,7 +685,16 @@ charset 的有效值是 UTF-8。
| 适用范围 | 仅客户端适用 |
| 含义 | schemaless 列数据是否顺序一致,从3.0.3.0开始,该配置废弃 |
| 值域 | 0:不一致;1: 一致 |
| 缺省值 | 0 |
| 缺省值 | 0
### smlTsDefaultName
| 属性 | 说明 |
| -------- | -------------------------------------------------------- |
| 适用范围 | 仅客户端适用 |
| 含义 | schemaless自动建表的时间列名字通过该配置设置 |
| 类型 | 字符串 |
| 缺省值 | _ts |
## 其他
......
......@@ -35,12 +35,32 @@ tag_set 中的所有的数据自动转化为 nchar 数据类型,并不需要
- 如果两边有英文双引号,表示 BINARY(32) 类型。例如 `"abc"`
- 如果两边有英文双引号而且带有 L 前缀,表示 NCHAR(32) 类型。例如 `L"报错信息"`
- 对空格、等号(=)、逗号(,)、双引号("),前面需要使用反斜杠(\)进行转义。(都指的是英文半角符号)
- 对空格、等号(=)、逗号(,)、双引号(")、反斜杠(\),前面需要使用反斜杠(\)进行转义。(都指的是英文半角符号)。具体转义规则如下:
| **序号** | **域** | **需转义字符** |
| -------- | ----------- | ----------------------------- |
| 1 | 超级表名 | 逗号,空格 |
| 2 | 标签名 | 逗号,等号,空格 |
| 3 | 标签值 | 逗号,等号,空格 |
| 4 | 列名 | 逗号,等号,空格 |
| 5 | 列值 | 双引号,反斜杠 |
两个连续的反斜杠,第一个作为转义符,只有一个反斜杠则无需转义. 反斜杠转义规则举例如下:
| **序号** | **反斜杠** | **转义为** |
| -------- | ----------- | ----------------------------- |
| 1 | \ | \ |
| 2 | \\\\ | \ |
| 3 | \\\\\\ | \\\\ |
| 4 | \\\\\\\\ | \\\\ |
| 5 | \\\\\\\\\\ | \\\\\\ |
| 6 | \\\\\\\\\\\\ | \\\\\\ |
- 数值类型将通过后缀来区分数据类型:
| **序号** | **后缀** | **映射类型** | **大小(字节)** |
| **序号** | **后缀** | **映射类型** | **大小(字节)** |
| -------- | ----------- | ----------------------------- | -------------- |
| 1 | 无或 f64 | double | 8 |
| 1 | 无或 f64 | double | 8 |
| 2 | f32 | float | 4 |
| 3 | i8/u8 | TinyInt/UTinyInt | 1 |
| 4 | i16/u16 | SmallInt/USmallInt | 2 |
......@@ -84,7 +104,9 @@ st,t1=3,t2=4,t3=t3 c1=3i64,c3="passit",c2=false,c4=4f64 1626006833639000000
6. 对 BINARY 或 NCHAR 列,如果数据行中所提供值的长度超出了列类型的限制,自动增加该列允许存储的字符长度上限(只增不减),以保证数据的完整保存。
7. 整个处理过程中遇到的错误会中断写入过程,并返回错误代码。
8. 为了提高写入的效率,默认假设同一个超级表中 field_set 的顺序是一样的(第一条数据包含所有的 field,后面的数据按照这个顺序),如果顺序不一样,需要配置参数 smlDataFormat 为 false,否则,数据写入按照相同顺序写入,库中数据会异常,从3.0.3.0开始,自动检测顺序是否一致,该配置废弃。
9. 由于sql建表表名不支持点号(.),所以schemaless也对点号(.)做了处理,如果schemaless自动建表的表名如果有点号(.),会自动替换为下划线(\_)。如果手动指定子表名的话,子表名里有点号(.),同样转化为下划线(\_)。
10. taos.cfg 增加 smlTsDefaultName 配置(值为字符串),只在client端起作用,配置后,schemaless自动建表的时间列名字可以通过该配置设置。不配置的话,默认为 _ts
:::tip
无模式所有的处理逻辑,仍会遵循 TDengine 对数据结构的底层限制,例如每行数据的总长度不能超过
48KB(从 3.0.5.0 版本开始为 64KB),标签值的总长度不超过16KB。这方面的具体限制约束请参见 [TDengine SQL 边界限制](/taos-sql/limit)
......
......@@ -4057,13 +4057,20 @@ bool isEqualStateKey(SStateWindowInfo* pWin, char* pKeyData) {
bool compareStateKey(void* data, void* key) {
if (!data || !key) {
return false;
return true;
}
SStateKeys* stateKey = (SStateKeys*)key;
stateKey->pData = (char*)key + sizeof(SStateKeys);
return compareVal(data, stateKey);
}
bool compareWinStateKey(SStateKeys* left, SStateKeys* right) {
if (!left || !right) {
return false;
}
return compareVal(left->pData, right);
}
void setStateOutputBuf(SStreamAggSupporter* pAggSup, TSKEY ts, uint64_t groupId, char* pKeyData,
SStateWindowInfo* pCurWin, SStateWindowInfo* pNextWin) {
int32_t size = pAggSup->resultRowSize;
......@@ -4086,10 +4093,14 @@ void setStateOutputBuf(SStreamAggSupporter* pAggSup, TSKEY ts, uint64_t groupId,
pCurWin->winInfo.pOutputBuf = taosMemoryCalloc(1, size);
pCurWin->pStateKey =
(SStateKeys*)((char*)pCurWin->winInfo.pOutputBuf + (pAggSup->resultRowSize - pAggSup->stateKeySize));
pCurWin->pStateKey->bytes = pAggSup->stateKeySize - sizeof(SStateKeys);
pCurWin->pStateKey->type = pAggSup->stateKeyType;
pCurWin->pStateKey->pData = (char*)pCurWin->pStateKey + sizeof(SStateKeys);
pCurWin->pStateKey->isNull = false;
pCurWin->pStateKey->bytes = pAggSup->stateKeySize - sizeof(SStateKeys);
pCurWin->pStateKey->type = pAggSup->stateKeyType;
pCurWin->pStateKey->pData = (char*)pCurWin->pStateKey + sizeof(SStateKeys);
pCurWin->pStateKey->isNull = false;
pCurWin->winInfo.sessionWin.groupId = groupId;
pCurWin->winInfo.sessionWin.win.skey = ts;
pCurWin->winInfo.sessionWin.win.ekey = ts;
qDebug("===stream===reset state win key. skey:%" PRId64 ", endkey:%" PRId64, pCurWin->winInfo.sessionWin.win.skey, pCurWin->winInfo.sessionWin.win.ekey);
}
if (code == TSDB_CODE_SUCCESS) {
......@@ -4241,6 +4252,7 @@ static SSDataBlock* doStreamStateAgg(SOperatorInfo* pOperator) {
SExprSupp* pSup = &pOperator->exprSupp;
SStreamStateAggOperatorInfo* pInfo = pOperator->info;
SOptrBasicInfo* pBInfo = &pInfo->binfo;
qDebug("===stream=== stream state agg");
if (pOperator->status == OP_RES_TO_RETURN) {
doBuildDeleteDataBlock(pOperator, pInfo->pSeDeleted, pInfo->pDelRes, &pInfo->pDelIterator);
if (pInfo->pDelRes->info.rows > 0) {
......@@ -4340,6 +4352,7 @@ static SSDataBlock* doStreamStateAgg(SOperatorInfo* pOperator) {
void streamStateReleaseState(SOperatorInfo* pOperator) {
SStreamStateAggOperatorInfo* pInfo = pOperator->info;
int32_t resSize = taosArrayGetSize(pInfo->historyWins) * sizeof(SSessionKey);
qDebug("===stream=== relase state. save result count:%d", (int32_t)taosArrayGetSize(pInfo->historyWins));
pInfo->streamAggSup.stateStore.streamStateSaveInfo(pInfo->streamAggSup.pState, STREAM_STATE_OP_STATE_NAME, strlen(STREAM_STATE_OP_STATE_NAME), pInfo->historyWins->pData, resSize);
SOperatorInfo* downstream = pOperator->pDownstream[0];
if (downstream->fpSet.releaseStreamStateFn) {
......@@ -4365,6 +4378,7 @@ static void compactStateWindow(SOperatorInfo* pOperator, SResultWindowInfo* pCur
compactFunctions(pSup->pCtx, pAggSup->pDummyCtx, numOfOutput, pTaskInfo, &pInfo->twAggSup.timeWindowData);
tSimpleHashRemove(pStUpdated, &pNextWin->sessionWin, sizeof(SSessionKey));
if (pNextWin->isOutput && pStDeleted) {
qDebug("===stream=== save delete window info %" PRId64 ", %" PRIu64, pNextWin->sessionWin.win.skey, pNextWin->sessionWin.groupId);
saveDeleteRes(pStDeleted, pNextWin->sessionWin);
}
removeSessionResult(pStUpdated, pAggSup->pResultRows, pNextWin->sessionWin);
......@@ -4383,19 +4397,28 @@ void streamStateReloadState(SOperatorInfo* pOperator) {
int32_t code = pAggSup->stateStore.streamStateGetInfo(pAggSup->pState, STREAM_STATE_OP_STATE_NAME,
strlen(STREAM_STATE_OP_STATE_NAME), &pBuf, &size);
int32_t num = size / sizeof(SSessionKey);
qDebug("===stream=== reload state. get result count:%d", num);
SSessionKey* pSeKeyBuf = (SSessionKey*) pBuf;
ASSERT(size == num * sizeof(SSessionKey));
if (!pInfo->pSeUpdated && num > 0) {
_hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY);
pInfo->pSeUpdated = tSimpleHashInit(64, hashFn);
}
if (!pInfo->pSeDeleted && num > 0) {
_hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY);
pInfo->pSeDeleted = tSimpleHashInit(64, hashFn);
}
for (int32_t i = 0; i < num; i++) {
SStateWindowInfo curInfo = {0};
SStateWindowInfo nextInfo = {0};
SStateWindowInfo dummy = {0};
qDebug("===stream=== reload state. try process result %" PRId64 ", %" PRIu64 ", index:%d", pSeKeyBuf[i].win.skey, pSeKeyBuf[i].groupId, i);
setStateOutputBuf(pAggSup, pSeKeyBuf[i].win.skey, pSeKeyBuf[i].groupId, NULL, &curInfo, &nextInfo);
if (compareStateKey(curInfo.pStateKey,nextInfo.pStateKey)) {
compactStateWindow(pOperator, &curInfo.winInfo, &nextInfo.winInfo, pInfo->pSeUpdated, pInfo->pSeUpdated);
bool cpRes = compareWinStateKey(curInfo.pStateKey,nextInfo.pStateKey);
qDebug("===stream=== reload state. next window info %" PRId64 ", %" PRIu64 ", compare:%d", nextInfo.winInfo.sessionWin.win.skey, nextInfo.winInfo.sessionWin.groupId, cpRes);
if (cpRes) {
compactStateWindow(pOperator, &curInfo.winInfo, &nextInfo.winInfo, pInfo->pSeUpdated, pInfo->pSeDeleted);
qDebug("===stream=== reload state. save result %" PRId64 ", %" PRIu64, curInfo.winInfo.sessionWin.win.skey, curInfo.winInfo.sessionWin.groupId);
if (pInfo->twAggSup.calTrigger == STREAM_TRIGGER_AT_ONCE) {
saveResult(curInfo.winInfo, pInfo->pSeUpdated);
} else if (pInfo->twAggSup.calTrigger == STREAM_TRIGGER_WINDOW_CLOSE) {
......
......@@ -581,6 +581,7 @@ int32_t streamTryExec(SStreamTask* pTask) {
if (code != TSDB_CODE_SUCCESS) {
return code;
}
streamSchedExec(pTask);
} else {
atomic_store_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__INACTIVE);
qDebug("s-task:%s exec completed, status:%s, sched-status:%d", id, streamGetTaskStatusStr(pTask->status.taskStatus),
......
......@@ -729,6 +729,7 @@ void streamStateFreeVal(void* val) {
int32_t streamStateSessionPut(SStreamState* pState, const SSessionKey* key, const void* value, int32_t vLen) {
#ifdef USE_ROCKSDB
qDebug("===stream===save skey:%" PRId64 ", ekey:%" PRId64 ", groupId:%" PRIu64, key->win.skey,key->win.ekey, key->groupId);
return streamStateSessionPut_rocksdb(pState, key, value, vLen);
#else
SStateSessionKey sKey = {.key = *key, .opNum = pState->number};
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册