提交 ce078ae8 编写于 作者: wmmhello's avatar wmmhello

Merge branch '3.0' of https://github.com/taosdata/TDengine into 3.0

...@@ -98,7 +98,7 @@ Provides information about user-created databases. Similar to SHOW DATABASES. ...@@ -98,7 +98,7 @@ Provides information about user-created databases. Similar to SHOW DATABASES.
| 21 | cachesize | INT | Memory per vnode used for caching the newest data. It should be noted that `cachesize` is a TDengine keyword and needs to be escaped with ` when used as a column name. | | 21 | cachesize | INT | Memory per vnode used for caching the newest data. It should be noted that `cachesize` is a TDengine keyword and needs to be escaped with ` when used as a column name. |
| 22 | wal_level | INT | WAL level. It should be noted that `wal_level` is a TDengine keyword and needs to be escaped with ` when used as a column name. | | 22 | wal_level | INT | WAL level. It should be noted that `wal_level` is a TDengine keyword and needs to be escaped with ` when used as a column name. |
| 23 | wal_fsync_period | INT | Interval at which WAL is written to disk. It should be noted that `wal_fsync_period` is a TDengine keyword and needs to be escaped with ` when used as a column name. | | 23 | wal_fsync_period | INT | Interval at which WAL is written to disk. It should be noted that `wal_fsync_period` is a TDengine keyword and needs to be escaped with ` when used as a column name. |
| 24 | wal_retention_period | INT | WAL retention period. It should be noted that `wal_retention_period` is a TDengine keyword and needs to be escaped with ` when used as a column name. | | 24 | wal_retention_period | INT | WAL retention period, in second. It should be noted that `wal_retention_period` is a TDengine keyword and needs to be escaped with ` when used as a column name. |
| 25 | wal_retention_size | INT | Maximum WAL size. It should be noted that `wal_retention_size` is a TDengine keyword and needs to be escaped with ` when used as a column name. | | 25 | wal_retention_size | INT | Maximum WAL size. It should be noted that `wal_retention_size` is a TDengine keyword and needs to be escaped with ` when used as a column name. |
| 26 | stt_trigger | SMALLINT | The threshold for number of files to trigger file merging. It should be noted that `stt_trigger` is a TDengine keyword and needs to be escaped with ` when used as a column name. | | 26 | stt_trigger | SMALLINT | The threshold for number of files to trigger file merging. It should be noted that `stt_trigger` is a TDengine keyword and needs to be escaped with ` when used as a column name. |
| 27 | table_prefix | SMALLINT | The prefix length in the table name that is ignored when distributing table to vnode based on table name. It should be noted that `table_prefix` is a TDengine keyword and needs to be escaped with ` when used as a column name. | | 27 | table_prefix | SMALLINT | The prefix length in the table name that is ignored when distributing table to vnode based on table name. It should be noted that `table_prefix` is a TDengine keyword and needs to be escaped with ` when used as a column name. |
...@@ -297,3 +297,13 @@ Provides dnode configuration information. ...@@ -297,3 +297,13 @@ Provides dnode configuration information.
| 7 | target_table | BINARY(192) | Target table | | 7 | target_table | BINARY(192) | Target table |
| 8 | watermark | BIGINT | Watermark (see stream processing documentation). It should be noted that `watermark` is a TDengine keyword and needs to be escaped with ` when used as a column name. | | 8 | watermark | BIGINT | Watermark (see stream processing documentation). It should be noted that `watermark` is a TDengine keyword and needs to be escaped with ` when used as a column name. |
| 9 | trigger | INT | Method of triggering the result push (see stream processing documentation). It should be noted that `trigger` is a TDengine keyword and needs to be escaped with ` when used as a column name. | | 9 | trigger | INT | Method of triggering the result push (see stream processing documentation). It should be noted that `trigger` is a TDengine keyword and needs to be escaped with ` when used as a column name. |
## INS_USER_PRIVILEGES
| # | **Column** | **Data Type** | **Description** |** |
| --- | :----------: | ------------ | -------------------------------------------|
| 1 | user_name | VARCHAR(24) | Username |
| 2 | privilege | VARCHAR(10) | Privilege description |
| 3 | db_name | VARCHAR(65) | Database name |
| 4 | table_name | VARCHAR(193) | Table name |
| 5 | condition | VARCHAR(49152) | The privilege filter for child tables |
...@@ -79,6 +79,12 @@ Parameter Description: ...@@ -79,6 +79,12 @@ Parameter Description:
- tz: Optional parameter that specifies the timezone of the returned time, following the IANA Time Zone rules, e.g. `America/New_York`. - tz: Optional parameter that specifies the timezone of the returned time, following the IANA Time Zone rules, e.g. `America/New_York`.
- req_id: Optional parameter that specifies the request id for tracing. - req_id: Optional parameter that specifies the request id for tracing.
:::note
URL Encoding. Make sure that parameters are properly encoded. For example, when specifying a timezone you must properly encode special characters. ?tz=Etc/GMT+10 will not work because the <+> plus symbol is recognized as a space in the url. It's best practice to encode all special characters in a parameter. Instead use ?tz=Etc%2FGMT%2B10 for the parameter.
:::
For example, `http://h1.taos.com:6041/rest/sql/test` is a URL to `h1.taos.com:6041` and sets the default database name to `test`. For example, `http://h1.taos.com:6041/rest/sql/test` is a URL to `h1.taos.com:6041` and sets the default database name to `test`.
TDengine supports both Basic authentication and custom authentication mechanisms, and subsequent versions will provide a standard secure digital signature mechanism for authentication. TDengine supports both Basic authentication and custom authentication mechanisms, and subsequent versions will provide a standard secure digital signature mechanism for authentication.
......
...@@ -338,7 +338,7 @@ Remark: ...@@ -338,7 +338,7 @@ Remark:
Equivalent function: sum Equivalent function: sum
```sql ```sql
Select max(value) from (select first(val) value from table_name interval(10s) fill(linear)) interval(10s) Select sum(value) from (select first(val) value from table_name interval(10s) fill(linear)) interval(10s)
``` ```
Note: This function has no interpolation requirements, so it can be directly calculated. Note: This function has no interpolation requirements, so it can be directly calculated.
......
...@@ -98,7 +98,7 @@ TDengine 内置了一个名为 `INFORMATION_SCHEMA` 的数据库,提供对数 ...@@ -98,7 +98,7 @@ TDengine 内置了一个名为 `INFORMATION_SCHEMA` 的数据库,提供对数
| 21 | cachesize | INT | 表示每个 vnode 中用于缓存子表最近数据的内存大小。需要注意,`cachesize` 为 TDengine 关键字,作为列名使用时需要使用 ` 进行转义。 | | 21 | cachesize | INT | 表示每个 vnode 中用于缓存子表最近数据的内存大小。需要注意,`cachesize` 为 TDengine 关键字,作为列名使用时需要使用 ` 进行转义。 |
| 22 | wal_level | INT | WAL 级别。需要注意,`wal_level` 为 TDengine 关键字,作为列名使用时需要使用 ` 进行转义。 | | 22 | wal_level | INT | WAL 级别。需要注意,`wal_level` 为 TDengine 关键字,作为列名使用时需要使用 ` 进行转义。 |
| 23 | wal_fsync_period | INT | 数据落盘周期。需要注意,`wal_fsync_period` 为 TDengine 关键字,作为列名使用时需要使用 ` 进行转义。 | | 23 | wal_fsync_period | INT | 数据落盘周期。需要注意,`wal_fsync_period` 为 TDengine 关键字,作为列名使用时需要使用 ` 进行转义。 |
| 24 | wal_retention_period | INT | WAL 的保存时长。需要注意,`wal_retention_period` 为 TDengine 关键字,作为列名使用时需要使用 ` 进行转义。 | | 24 | wal_retention_period | INT | WAL 的保存时长,单位为秒。需要注意,`wal_retention_period` 为 TDengine 关键字,作为列名使用时需要使用 ` 进行转义。 |
| 25 | wal_retention_size | INT | WAL 的保存上限。需要注意,`wal_retention_size` 为 TDengine 关键字,作为列名使用时需要使用 ` 进行转义。 | | 25 | wal_retention_size | INT | WAL 的保存上限。需要注意,`wal_retention_size` 为 TDengine 关键字,作为列名使用时需要使用 ` 进行转义。 |
| 26 | stt_trigger | SMALLINT | 触发文件合并的落盘文件的个数。需要注意,`stt_trigger` 为 TDengine 关键字,作为列名使用时需要使用 ` 进行转义。 | | 26 | stt_trigger | SMALLINT | 触发文件合并的落盘文件的个数。需要注意,`stt_trigger` 为 TDengine 关键字,作为列名使用时需要使用 ` 进行转义。 |
| 27 | table_prefix | SMALLINT | 内部存储引擎根据表名分配存储该表数据的 VNODE 时要忽略的前缀的长度。需要注意,`table_prefix` 为 TDengine 关键字,作为列名使用时需要使用 ` 进行转义。 | | 27 | table_prefix | SMALLINT | 内部存储引擎根据表名分配存储该表数据的 VNODE 时要忽略的前缀的长度。需要注意,`table_prefix` 为 TDengine 关键字,作为列名使用时需要使用 ` 进行转义。 |
...@@ -298,3 +298,13 @@ TDengine 内置了一个名为 `INFORMATION_SCHEMA` 的数据库,提供对数 ...@@ -298,3 +298,13 @@ TDengine 内置了一个名为 `INFORMATION_SCHEMA` 的数据库,提供对数
| 7 | target_table | BINARY(192) | 流计算写入的目标表 | | 7 | target_table | BINARY(192) | 流计算写入的目标表 |
| 8 | watermark | BIGINT | watermark,详见 SQL 手册流式计算。需要注意,`watermark` 为 TDengine 关键字,作为列名使用时需要使用 ` 进行转义。 | | 8 | watermark | BIGINT | watermark,详见 SQL 手册流式计算。需要注意,`watermark` 为 TDengine 关键字,作为列名使用时需要使用 ` 进行转义。 |
| 9 | trigger | INT | 计算结果推送模式,详见 SQL 手册流式计算。需要注意,`trigger` 为 TDengine 关键字,作为列名使用时需要使用 ` 进行转义。 | | 9 | trigger | INT | 计算结果推送模式,详见 SQL 手册流式计算。需要注意,`trigger` 为 TDengine 关键字,作为列名使用时需要使用 ` 进行转义。 |
## INS_USER_PRIVILEGES
| # | **列名** | **数据类型** | **说明** |
| --- | :----------: | ------------ | -------------------------------------------------------------------------------------------------------------------- |
| 1 | user_name | VARCHAR(24) | 用户名
| 2 | privilege | VARCHAR(10) | 权限描述
| 3 | db_name | VARCHAR(65) | 数据库名称
| 4 | table_name | VARCHAR(193) | 表名称
| 5 | condition | VARCHAR(49152) | 子表权限过滤条件
...@@ -371,7 +371,7 @@ Select min(val) from table_name ...@@ -371,7 +371,7 @@ Select min(val) from table_name
等效函数:sum 等效函数:sum
```sql ```sql
Select max(value) from (select first(val) value from table_name interval(10s) fill(linear)) interval(10s) Select sum(value) from (select first(val) value from table_name interval(10s) fill(linear)) interval(10s)
``` ```
备注:该函数无插值需求,因此可用直接计算。 备注:该函数无插值需求,因此可用直接计算。
......
...@@ -221,13 +221,9 @@ int32_t qStreamSourceScanParamForHistoryScanStep2(qTaskInfo_t tinfo, SVersionRan ...@@ -221,13 +221,9 @@ int32_t qStreamSourceScanParamForHistoryScanStep2(qTaskInfo_t tinfo, SVersionRan
int32_t qStreamRecoverFinish(qTaskInfo_t tinfo); int32_t qStreamRecoverFinish(qTaskInfo_t tinfo);
int32_t qRestoreStreamOperatorOption(qTaskInfo_t tinfo); int32_t qRestoreStreamOperatorOption(qTaskInfo_t tinfo);
bool qStreamRecoverScanFinished(qTaskInfo_t tinfo); bool qStreamRecoverScanFinished(qTaskInfo_t tinfo);
bool qStreamRecoverScanStep1Finished(qTaskInfo_t tinfo); int32_t qStreamInfoResetTimewindowFilter(qTaskInfo_t tinfo);
bool qStreamRecoverScanStep2Finished(qTaskInfo_t tinfo);
int32_t qStreamRecoverSetAllStepFinished(qTaskInfo_t tinfo);
void resetTaskInfo(qTaskInfo_t tinfo); void resetTaskInfo(qTaskInfo_t tinfo);
void qResetStreamInfoTimeWindow(qTaskInfo_t tinfo);
int32_t qStreamOperatorReleaseState(qTaskInfo_t tInfo); int32_t qStreamOperatorReleaseState(qTaskInfo_t tInfo);
int32_t qStreamOperatorReloadState(qTaskInfo_t tInfo); int32_t qStreamOperatorReloadState(qTaskInfo_t tInfo);
......
...@@ -368,6 +368,8 @@ typedef struct SStateStore { ...@@ -368,6 +368,8 @@ typedef struct SStateStore {
bool (*updateInfoIsUpdated)(SUpdateInfo* pInfo, uint64_t tableId, TSKEY ts); bool (*updateInfoIsUpdated)(SUpdateInfo* pInfo, uint64_t tableId, TSKEY ts);
bool (*updateInfoIsTableInserted)(SUpdateInfo* pInfo, int64_t tbUid); bool (*updateInfoIsTableInserted)(SUpdateInfo* pInfo, int64_t tbUid);
void (*updateInfoDestroy)(SUpdateInfo* pInfo); void (*updateInfoDestroy)(SUpdateInfo* pInfo);
void (*windowSBfDelete)(SUpdateInfo *pInfo, uint64_t count);
void (*windowSBfAdd)(SUpdateInfo *pInfo, uint64_t count);
SUpdateInfo* (*updateInfoInitP)(SInterval* pInterval, int64_t watermark); SUpdateInfo* (*updateInfoInitP)(SInterval* pInterval, int64_t watermark);
void (*updateInfoAddCloseWindowSBF)(SUpdateInfo* pInfo); void (*updateInfoAddCloseWindowSBF)(SUpdateInfo* pInfo);
......
...@@ -604,15 +604,10 @@ int32_t streamSendCheckRsp(const SStreamMeta* pMeta, const SStreamTaskCheckReq* ...@@ -604,15 +604,10 @@ int32_t streamSendCheckRsp(const SStreamMeta* pMeta, const SStreamTaskCheckReq*
int32_t streamProcessCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp* pRsp); int32_t streamProcessCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp* pRsp);
int32_t streamLaunchFillHistoryTask(SStreamTask* pTask); int32_t streamLaunchFillHistoryTask(SStreamTask* pTask);
int32_t streamTaskScanHistoryDataComplete(SStreamTask* pTask); int32_t streamTaskScanHistoryDataComplete(SStreamTask* pTask);
int32_t streamStartRecoverTask(SStreamTask* pTask, int8_t igUntreated); int32_t streamStartScanHistoryAsync(SStreamTask* pTask, int8_t igUntreated);
bool streamHistoryTaskSetVerRangeStep2(SStreamTask* pTask, int64_t latestVer); bool streamHistoryTaskSetVerRangeStep2(SStreamTask* pTask, int64_t latestVer);
bool streamTaskRecoverScanStep1Finished(SStreamTask* pTask);
bool streamTaskRecoverScanStep2Finished(SStreamTask* pTask);
int32_t streamTaskRecoverSetAllStepFinished(SStreamTask* pTask);
// common // common
int32_t streamSetParamForScanHistory(SStreamTask* pTask);
int32_t streamRestoreParam(SStreamTask* pTask); int32_t streamRestoreParam(SStreamTask* pTask);
int32_t streamSetStatusNormal(SStreamTask* pTask); int32_t streamSetStatusNormal(SStreamTask* pTask);
const char* streamGetTaskStatusStr(int32_t status); const char* streamGetTaskStatusStr(int32_t status);
...@@ -626,7 +621,6 @@ void streamTaskEnablePause(SStreamTask* pTask); ...@@ -626,7 +621,6 @@ void streamTaskEnablePause(SStreamTask* pTask);
// source level // source level
int32_t streamSetParamForStreamScannerStep1(SStreamTask* pTask, SVersionRange* pVerRange, STimeWindow* pWindow); int32_t streamSetParamForStreamScannerStep1(SStreamTask* pTask, SVersionRange* pVerRange, STimeWindow* pWindow);
int32_t streamSetParamForStreamScannerStep2(SStreamTask* pTask, SVersionRange* pVerRange, STimeWindow* pWindow); int32_t streamSetParamForStreamScannerStep2(SStreamTask* pTask, SVersionRange* pVerRange, STimeWindow* pWindow);
int32_t streamBuildSourceRecover1Req(SStreamTask* pTask, SStreamScanHistoryReq* pReq, int8_t igUntreated);
int32_t streamSourceScanHistoryData(SStreamTask* pTask); int32_t streamSourceScanHistoryData(SStreamTask* pTask);
int32_t streamDispatchScanHistoryFinishMsg(SStreamTask* pTask); int32_t streamDispatchScanHistoryFinishMsg(SStreamTask* pTask);
......
...@@ -53,6 +53,8 @@ void updateInfoAddCloseWindowSBF(SUpdateInfo *pInfo); ...@@ -53,6 +53,8 @@ void updateInfoAddCloseWindowSBF(SUpdateInfo *pInfo);
void updateInfoDestoryColseWinSBF(SUpdateInfo *pInfo); void updateInfoDestoryColseWinSBF(SUpdateInfo *pInfo);
int32_t updateInfoSerialize(void *buf, int32_t bufLen, const SUpdateInfo *pInfo); int32_t updateInfoSerialize(void *buf, int32_t bufLen, const SUpdateInfo *pInfo);
int32_t updateInfoDeserialize(void *buf, int32_t bufLen, SUpdateInfo *pInfo); int32_t updateInfoDeserialize(void *buf, int32_t bufLen, SUpdateInfo *pInfo);
void windowSBfDelete(SUpdateInfo *pInfo, uint64_t count);
void windowSBfAdd(SUpdateInfo *pInfo, uint64_t count);
#ifdef __cplusplus #ifdef __cplusplus
} }
......
...@@ -369,8 +369,13 @@ typedef enum ELogicConditionType { ...@@ -369,8 +369,13 @@ typedef enum ELogicConditionType {
#define TSDB_DB_SCHEMALESS_OFF 0 #define TSDB_DB_SCHEMALESS_OFF 0
#define TSDB_DEFAULT_DB_SCHEMALESS TSDB_DB_SCHEMALESS_OFF #define TSDB_DEFAULT_DB_SCHEMALESS TSDB_DB_SCHEMALESS_OFF
#define TSDB_MIN_STT_TRIGGER 1 #define TSDB_MIN_STT_TRIGGER 1
#define TSDB_MAX_STT_TRIGGER 16 #ifdef TD_ENTERPRISE
#define TSDB_DEFAULT_SST_TRIGGER 2 #define TSDB_MAX_STT_TRIGGER 16
#define TSDB_DEFAULT_SST_TRIGGER 2
#else
#define TSDB_MAX_STT_TRIGGER 1
#define TSDB_DEFAULT_SST_TRIGGER 1
#endif
#define TSDB_MIN_HASH_PREFIX (2 - TSDB_TABLE_NAME_LEN) #define TSDB_MIN_HASH_PREFIX (2 - TSDB_TABLE_NAME_LEN)
#define TSDB_MAX_HASH_PREFIX (TSDB_TABLE_NAME_LEN - 2) #define TSDB_MAX_HASH_PREFIX (TSDB_TABLE_NAME_LEN - 2)
#define TSDB_DEFAULT_HASH_PREFIX 0 #define TSDB_DEFAULT_HASH_PREFIX 0
......
...@@ -42,7 +42,7 @@ ...@@ -42,7 +42,7 @@
static SDnode globalDnode = {0}; static SDnode globalDnode = {0};
static const char *dmOS[10] = {"Ubuntu", "CentOS Linux", "Red Hat", "Debian GNU", "CoreOS", static const char *dmOS[10] = {"Ubuntu", "CentOS Linux", "Red Hat", "Debian GNU", "CoreOS",
"FreeBSD", "openSUSE", "SLES", "Fedora", "MacOS"}; "FreeBSD", "openSUSE", "SLES", "Fedora", "macOS"};
SDnode *dmInstance() { return &globalDnode; } SDnode *dmInstance() { return &globalDnode; }
......
...@@ -78,6 +78,8 @@ void initStateStoreAPI(SStateStore* pStore) { ...@@ -78,6 +78,8 @@ void initStateStoreAPI(SStateStore* pStore) {
pStore->updateInfoIsUpdated = updateInfoIsUpdated; pStore->updateInfoIsUpdated = updateInfoIsUpdated;
pStore->updateInfoIsTableInserted = updateInfoIsTableInserted; pStore->updateInfoIsTableInserted = updateInfoIsTableInserted;
pStore->updateInfoDestroy = updateInfoDestroy; pStore->updateInfoDestroy = updateInfoDestroy;
pStore->windowSBfDelete = windowSBfDelete;
pStore->windowSBfAdd = windowSBfAdd;
pStore->updateInfoInitP = updateInfoInitP; pStore->updateInfoInitP = updateInfoInitP;
pStore->updateInfoAddCloseWindowSBF = updateInfoAddCloseWindowSBF; pStore->updateInfoAddCloseWindowSBF = updateInfoAddCloseWindowSBF;
......
...@@ -1321,7 +1321,7 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) { ...@@ -1321,7 +1321,7 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) {
"s-task:%s scan-history in stream time window completed, no related fill-history task, reset the time " "s-task:%s scan-history in stream time window completed, no related fill-history task, reset the time "
"window:%" PRId64 " - %" PRId64, "window:%" PRId64 " - %" PRId64,
id, pWindow->skey, pWindow->ekey); id, pWindow->skey, pWindow->ekey);
qResetStreamInfoTimeWindow(pTask->exec.pExecutor); qStreamInfoResetTimewindowFilter(pTask->exec.pExecutor);
} else { } else {
// when related fill-history task exists, update the fill-history time window only when the // when related fill-history task exists, update the fill-history time window only when the
// state transfer is completed. // state transfer is completed.
...@@ -1592,9 +1592,8 @@ int32_t tqProcessTaskPauseReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg ...@@ -1592,9 +1592,8 @@ int32_t tqProcessTaskPauseReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg
SStreamMeta* pMeta = pTq->pStreamMeta; SStreamMeta* pMeta = pTq->pStreamMeta;
SStreamTask* pTask = streamMetaAcquireTask(pMeta, pReq->taskId); SStreamTask* pTask = streamMetaAcquireTask(pMeta, pReq->taskId);
if (pTask == NULL) { if (pTask == NULL) {
tqError("vgId:%d failed to acquire task:0x%x, it may have been dropped already", pMeta->vgId, tqError("vgId:%d process pause req, failed to acquire task:0x%x, it may have been dropped already", pMeta->vgId,
pReq->taskId); pReq->taskId);
// since task is in [STOP|DROPPING] state, it is safe to assume the pause is active // since task is in [STOP|DROPPING] state, it is safe to assume the pause is active
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
...@@ -1606,9 +1605,8 @@ int32_t tqProcessTaskPauseReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg ...@@ -1606,9 +1605,8 @@ int32_t tqProcessTaskPauseReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg
if (pTask->historyTaskId.taskId != 0) { if (pTask->historyTaskId.taskId != 0) {
pHistoryTask = streamMetaAcquireTask(pMeta, pTask->historyTaskId.taskId); pHistoryTask = streamMetaAcquireTask(pMeta, pTask->historyTaskId.taskId);
if (pHistoryTask == NULL) { if (pHistoryTask == NULL) {
tqError("vgId:%d failed to acquire fill-history task:0x%x, it may have been dropped already. Pause success", tqError("vgId:%d process pause req, failed to acquire fill-history task:0x%x, it may have been dropped already",
pMeta->vgId, pTask->historyTaskId.taskId); pMeta->vgId, pTask->historyTaskId.taskId);
streamMetaReleaseTask(pMeta, pTask); streamMetaReleaseTask(pMeta, pTask);
// since task is in [STOP|DROPPING] state, it is safe to assume the pause is active // since task is in [STOP|DROPPING] state, it is safe to assume the pause is active
...@@ -1616,14 +1614,12 @@ int32_t tqProcessTaskPauseReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg ...@@ -1616,14 +1614,12 @@ int32_t tqProcessTaskPauseReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg
} }
tqDebug("s-task:%s fill-history task handle paused along with related stream task", pHistoryTask->id.idStr); tqDebug("s-task:%s fill-history task handle paused along with related stream task", pHistoryTask->id.idStr);
streamTaskPause(pHistoryTask);
}
streamMetaReleaseTask(pMeta, pTask); streamTaskPause(pHistoryTask);
if (pHistoryTask != NULL) {
streamMetaReleaseTask(pMeta, pHistoryTask); streamMetaReleaseTask(pMeta, pHistoryTask);
} }
streamMetaReleaseTask(pMeta, pTask);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
...@@ -1652,7 +1648,7 @@ int32_t tqProcessTaskResumeImpl(STQ* pTq, SStreamTask* pTask, int64_t sversion, ...@@ -1652,7 +1648,7 @@ int32_t tqProcessTaskResumeImpl(STQ* pTq, SStreamTask* pTask, int64_t sversion,
} }
if (level == TASK_LEVEL__SOURCE && pTask->info.fillHistory && pTask->status.taskStatus == TASK_STATUS__SCAN_HISTORY) { if (level == TASK_LEVEL__SOURCE && pTask->info.fillHistory && pTask->status.taskStatus == TASK_STATUS__SCAN_HISTORY) {
streamStartRecoverTask(pTask, igUntreated); streamStartScanHistoryAsync(pTask, igUntreated);
} else if (level == TASK_LEVEL__SOURCE && (taosQueueItemSize(pTask->inputQueue->queue) == 0)) { } else if (level == TASK_LEVEL__SOURCE && (taosQueueItemSize(pTask->inputQueue->queue) == 0)) {
tqStartStreamTasks(pTq); tqStartStreamTasks(pTq);
} else { } else {
......
...@@ -1729,45 +1729,41 @@ static int32_t mergeFileBlockAndLastBlock(STsdbReader* pReader, SLastBlockReader ...@@ -1729,45 +1729,41 @@ static int32_t mergeFileBlockAndLastBlock(STsdbReader* pReader, SLastBlockReader
// row in last file block // row in last file block
TSDBROW fRow = tsdbRowFromBlockData(pBlockData, pDumpInfo->rowIndex); TSDBROW fRow = tsdbRowFromBlockData(pBlockData, pDumpInfo->rowIndex);
int64_t tsLast = getCurrentKeyInLastBlock(pLastBlockReader); int64_t ts = getCurrentKeyInLastBlock(pLastBlockReader);
if (ASCENDING_TRAVERSE(pReader->info.order)) { if (ASCENDING_TRAVERSE(pReader->info.order)) {
if (key < tsLast) { if (key < ts) { // imem, mem are all empty, file blocks (data blocks and last block) exist
return mergeRowsInFileBlocks(pBlockData, pBlockScanInfo, key, pReader);
} else if (key > tsLast) {
return doMergeFileBlockAndLastBlock(pLastBlockReader, pReader, pBlockScanInfo, NULL, false);
}
} else {
if (key > tsLast) {
return mergeRowsInFileBlocks(pBlockData, pBlockScanInfo, key, pReader); return mergeRowsInFileBlocks(pBlockData, pBlockScanInfo, key, pReader);
} else if (key < tsLast) { } else if (key == ts) {
return doMergeFileBlockAndLastBlock(pLastBlockReader, pReader, pBlockScanInfo, NULL, false); SRow* pTSRow = NULL;
} int32_t code = tsdbRowMergerAdd(pMerger, &fRow, pReader->info.pSchema);
} if (code != TSDB_CODE_SUCCESS) {
// the following for key == tsLast return code;
SRow* pTSRow = NULL; }
int32_t code = tsdbRowMergerAdd(pMerger, &fRow, pReader->info.pSchema);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader);
TSDBROW* pRow1 = tMergeTreeGetRow(&pLastBlockReader->mergeTree); doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader);
tsdbRowMergerAdd(pMerger, pRow1, NULL);
doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, tsLast, pMerger, &pReader->info.verRange, pReader->idStr); TSDBROW* pRow1 = tMergeTreeGetRow(&pLastBlockReader->mergeTree);
tsdbRowMergerAdd(pMerger, pRow1, NULL);
code = tsdbRowMergerGetRow(pMerger, &pTSRow); doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, ts, pMerger, &pReader->info.verRange, pReader->idStr);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
code = doAppendRowFromTSRow(pReader->resBlockInfo.pResBlock, pReader, pTSRow, pBlockScanInfo); code = tsdbRowMergerGetRow(pMerger, &pTSRow);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
taosMemoryFree(pTSRow); code = doAppendRowFromTSRow(pReader->resBlockInfo.pResBlock, pReader, pTSRow, pBlockScanInfo);
tsdbRowMergerClear(pMerger);
return code;
taosMemoryFree(pTSRow);
tsdbRowMergerClear(pMerger);
return code;
} else { // key > ts
return doMergeFileBlockAndLastBlock(pLastBlockReader, pReader, pBlockScanInfo, NULL, false);
}
} else { // desc order
return doMergeFileBlockAndLastBlock(pLastBlockReader, pReader, pBlockScanInfo, pBlockData, true);
}
} else { // only last block exists } else { // only last block exists
return doMergeFileBlockAndLastBlock(pLastBlockReader, pReader, pBlockScanInfo, NULL, false); return doMergeFileBlockAndLastBlock(pLastBlockReader, pReader, pBlockScanInfo, NULL, false);
} }
...@@ -2194,8 +2190,7 @@ static int32_t buildComposedDataBlockImpl(STsdbReader* pReader, STableBlockScanI ...@@ -2194,8 +2190,7 @@ static int32_t buildComposedDataBlockImpl(STsdbReader* pReader, STableBlockScanI
SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo; SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;
TSDBROW *pRow = NULL, *piRow = NULL; TSDBROW *pRow = NULL, *piRow = NULL;
int64_t key = (pBlockData->nRow > 0 && (!pDumpInfo->allDumped)) ? pBlockData->aTSKEY[pDumpInfo->rowIndex] : int64_t key = (pBlockData->nRow > 0 && (!pDumpInfo->allDumped)) ? pBlockData->aTSKEY[pDumpInfo->rowIndex] : INT64_MIN;
(ASCENDING_TRAVERSE(pReader->info.order) ? INT64_MAX : INT64_MIN);
if (pBlockScanInfo->iter.hasVal) { if (pBlockScanInfo->iter.hasVal) {
pRow = getValidMemRow(&pBlockScanInfo->iter, pBlockScanInfo->delSkyline, pReader); pRow = getValidMemRow(&pBlockScanInfo->iter, pBlockScanInfo->delSkyline, pReader);
} }
...@@ -2569,18 +2564,9 @@ static int32_t doLoadLastBlockSequentially(STsdbReader* pReader) { ...@@ -2569,18 +2564,9 @@ static int32_t doLoadLastBlockSequentially(STsdbReader* pReader) {
// load the last data block of current table // load the last data block of current table
STableBlockScanInfo* pScanInfo = *(STableBlockScanInfo**)pStatus->pTableIter; STableBlockScanInfo* pScanInfo = *(STableBlockScanInfo**)pStatus->pTableIter;
if (pScanInfo == NULL) {
tsdbError("table Iter is null, invalid pScanInfo, try next table %s", pReader->idStr);
bool hasNexTable = moveToNextTable(pUidList, pStatus);
if (!hasNexTable) {
return TSDB_CODE_SUCCESS;
}
continue;
}
if (pReader->pIgnoreTables && taosHashGet(*pReader->pIgnoreTables, &pScanInfo->uid, sizeof(pScanInfo->uid))) { if (pReader->pIgnoreTables && taosHashGet(*pReader->pIgnoreTables, &pScanInfo->uid, sizeof(pScanInfo->uid))) {
// reset the index in last block when handing a new file // reset the index in last block when handing a new file
// doCleanupTableScanInfo(pScanInfo);
bool hasNexTable = moveToNextTable(pUidList, pStatus); bool hasNexTable = moveToNextTable(pUidList, pStatus);
if (!hasNexTable) { if (!hasNexTable) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
...@@ -2589,6 +2575,9 @@ static int32_t doLoadLastBlockSequentially(STsdbReader* pReader) { ...@@ -2589,6 +2575,9 @@ static int32_t doLoadLastBlockSequentially(STsdbReader* pReader) {
continue; continue;
} }
// reset the index in last block when handing a new file
// doCleanupTableScanInfo(pScanInfo);
bool hasDataInLastFile = initLastBlockReader(pLastBlockReader, pScanInfo, pReader); bool hasDataInLastFile = initLastBlockReader(pLastBlockReader, pScanInfo, pReader);
if (!hasDataInLastFile) { if (!hasDataInLastFile) {
bool hasNexTable = moveToNextTable(pUidList, pStatus); bool hasNexTable = moveToNextTable(pUidList, pStatus);
...@@ -2678,32 +2667,16 @@ static int32_t doBuildDataBlock(STsdbReader* pReader) { ...@@ -2678,32 +2667,16 @@ static int32_t doBuildDataBlock(STsdbReader* pReader) {
(ASCENDING_TRAVERSE(pReader->info.order)) ? pBlockInfo->record.firstKey : pBlockInfo->record.lastKey; (ASCENDING_TRAVERSE(pReader->info.order)) ? pBlockInfo->record.firstKey : pBlockInfo->record.lastKey;
code = buildDataBlockFromBuf(pReader, pScanInfo, endKey); code = buildDataBlockFromBuf(pReader, pScanInfo, endKey);
} else { } else {
bool bHasDataInLastBlock = hasDataInLastBlock(pLastBlockReader); if (hasDataInLastBlock(pLastBlockReader) && !ASCENDING_TRAVERSE(pReader->info.order)) {
int64_t tsLast = bHasDataInLastBlock ? getCurrentKeyInLastBlock(pLastBlockReader) : INT64_MIN; // only return the rows in last block
if (!bHasDataInLastBlock || ((ASCENDING_TRAVERSE(pReader->info.order) && pBlockInfo->record.lastKey < tsLast) || int64_t tsLast = getCurrentKeyInLastBlock(pLastBlockReader);
(!ASCENDING_TRAVERSE(pReader->info.order) && pBlockInfo->record.firstKey > tsLast))) { ASSERT(tsLast >= pBlockInfo->record.lastKey);
// whole block is required, return it directly
SDataBlockInfo* pInfo = &pReader->resBlockInfo.pResBlock->info;
pInfo->rows = pBlockInfo->record.numRow;
pInfo->id.uid = pScanInfo->uid;
pInfo->dataLoad = 0;
pInfo->window = (STimeWindow){.skey = pBlockInfo->record.firstKey, .ekey = pBlockInfo->record.lastKey};
setComposedBlockFlag(pReader, false);
setBlockAllDumped(&pStatus->fBlockDumpInfo, pBlockInfo->record.lastKey, pReader->info.order);
// update the last key for the corresponding table
pScanInfo->lastKey = ASCENDING_TRAVERSE(pReader->info.order) ? pInfo->window.ekey : pInfo->window.skey;
tsdbDebug("%p uid:%" PRIu64
" clean file block retrieved from file, global index:%d, "
"table index:%d, rows:%d, brange:%" PRId64 "-%" PRId64 ", %s",
pReader, pScanInfo->uid, pBlockIter->index, pBlockInfo->tbBlockIdx, pBlockInfo->record.numRow,
pBlockInfo->record.firstKey, pBlockInfo->record.lastKey, pReader->idStr);
} else {
SBlockData* pBData = &pReader->status.fileBlockData; SBlockData* pBData = &pReader->status.fileBlockData;
tBlockDataReset(pBData); tBlockDataReset(pBData);
SSDataBlock* pResBlock = pReader->resBlockInfo.pResBlock; SSDataBlock* pResBlock = pReader->resBlockInfo.pResBlock;
tsdbDebug("load data in last block firstly %s", pReader->idStr); tsdbDebug("load data in last block firstly, due to desc scan data, %s", pReader->idStr);
int64_t st = taosGetTimestampUs(); int64_t st = taosGetTimestampUs();
...@@ -2734,8 +2707,23 @@ static int32_t doBuildDataBlock(STsdbReader* pReader) { ...@@ -2734,8 +2707,23 @@ static int32_t doBuildDataBlock(STsdbReader* pReader) {
pReader, pResBlock->info.id.uid, pResBlock->info.window.skey, pResBlock->info.window.ekey, pReader, pResBlock->info.id.uid, pResBlock->info.window.skey, pResBlock->info.window.ekey,
pResBlock->info.rows, el, pReader->idStr); pResBlock->info.rows, el, pReader->idStr);
} }
} } else { // whole block is required, return it directly
SDataBlockInfo* pInfo = &pReader->resBlockInfo.pResBlock->info;
pInfo->rows = pBlockInfo->record.numRow;
pInfo->id.uid = pScanInfo->uid;
pInfo->dataLoad = 0;
pInfo->window = (STimeWindow){.skey = pBlockInfo->record.firstKey, .ekey = pBlockInfo->record.lastKey};
setComposedBlockFlag(pReader, false);
setBlockAllDumped(&pStatus->fBlockDumpInfo, pBlockInfo->record.lastKey, pReader->info.order);
// update the last key for the corresponding table
pScanInfo->lastKey = ASCENDING_TRAVERSE(pReader->info.order) ? pInfo->window.ekey : pInfo->window.skey;
tsdbDebug("%p uid:%" PRIu64
" clean file block retrieved from file, global index:%d, "
"table index:%d, rows:%d, brange:%" PRId64 "-%" PRId64 ", %s",
pReader, pScanInfo->uid, pBlockIter->index, pBlockInfo->tbBlockIdx, pBlockInfo->record.numRow,
pBlockInfo->record.firstKey, pBlockInfo->record.lastKey, pReader->idStr);
}
} }
return (pReader->code != TSDB_CODE_SUCCESS) ? pReader->code : code; return (pReader->code != TSDB_CODE_SUCCESS) ? pReader->code : code;
...@@ -4109,6 +4097,11 @@ int32_t tsdbReaderSuspend2(STsdbReader* pReader) { ...@@ -4109,6 +4097,11 @@ int32_t tsdbReaderSuspend2(STsdbReader* pReader) {
tsdbDataFileReaderClose(&pReader->pFileReader); tsdbDataFileReaderClose(&pReader->pFileReader);
int64_t loadBlocks = 0;
double elapse = 0;
pReader->status.pLDataIterArray = destroySttBlockReader(pReader->status.pLDataIterArray, &loadBlocks, &elapse);
pReader->status.pLDataIterArray = taosArrayInit(4, POINTER_BYTES);
// resetDataBlockScanInfo excluding lastKey // resetDataBlockScanInfo excluding lastKey
STableBlockScanInfo** p = NULL; STableBlockScanInfo** p = NULL;
int32_t iter = 0; int32_t iter = 0;
...@@ -4179,7 +4172,7 @@ int32_t tsdbReaderSuspend2(STsdbReader* pReader) { ...@@ -4179,7 +4172,7 @@ int32_t tsdbReaderSuspend2(STsdbReader* pReader) {
} }
} }
tsdbUntakeReadSnap(pReader, pReader->pReadSnap, false); tsdbUntakeReadSnap2(pReader, pReader->pReadSnap, false);
pReader->pReadSnap = NULL; pReader->pReadSnap = NULL;
pReader->flag = READER_STATUS_SUSPEND; pReader->flag = READER_STATUS_SUSPEND;
......
...@@ -180,6 +180,8 @@ void initStateStoreAPI(SStateStore* pStore) { ...@@ -180,6 +180,8 @@ void initStateStoreAPI(SStateStore* pStore) {
pStore->updateInfoIsUpdated = updateInfoIsUpdated; pStore->updateInfoIsUpdated = updateInfoIsUpdated;
pStore->updateInfoIsTableInserted = updateInfoIsTableInserted; pStore->updateInfoIsTableInserted = updateInfoIsTableInserted;
pStore->updateInfoDestroy = updateInfoDestroy; pStore->updateInfoDestroy = updateInfoDestroy;
pStore->windowSBfDelete = windowSBfDelete;
pStore->windowSBfAdd = windowSBfAdd;
pStore->updateInfoInitP = updateInfoInitP; pStore->updateInfoInitP = updateInfoInitP;
pStore->updateInfoAddCloseWindowSBF = updateInfoAddCloseWindowSBF; pStore->updateInfoAddCloseWindowSBF = updateInfoAddCloseWindowSBF;
......
...@@ -62,8 +62,8 @@ typedef struct { ...@@ -62,8 +62,8 @@ typedef struct {
SSchemaWrapper* schema; SSchemaWrapper* schema;
char tbName[TSDB_TABLE_NAME_LEN]; // this is the current scan table: todo refactor char tbName[TSDB_TABLE_NAME_LEN]; // this is the current scan table: todo refactor
int8_t recoverStep; int8_t recoverStep;
bool recoverStep1Finished; // bool recoverStep1Finished;
bool recoverStep2Finished; // bool recoverStep2Finished;
int8_t recoverScanFinished; int8_t recoverScanFinished;
SQueryTableDataCond tableCond; SQueryTableDataCond tableCond;
SVersionRange fillHistoryVer; SVersionRange fillHistoryVer;
......
...@@ -116,17 +116,6 @@ void resetTaskInfo(qTaskInfo_t tinfo) { ...@@ -116,17 +116,6 @@ void resetTaskInfo(qTaskInfo_t tinfo) {
clearStreamBlock(pTaskInfo->pRoot); clearStreamBlock(pTaskInfo->pRoot);
} }
void qResetStreamInfoTimeWindow(qTaskInfo_t tinfo) {
SExecTaskInfo* pTaskInfo = (SExecTaskInfo*) tinfo;
if (pTaskInfo == NULL) {
return;
}
qDebug("%s set stream fill-history window:%" PRId64"-%"PRId64, GET_TASKID(pTaskInfo), INT64_MIN, INT64_MAX);
pTaskInfo->streamInfo.fillHistoryWindow.skey = INT64_MIN;
pTaskInfo->streamInfo.fillHistoryWindow.ekey = INT64_MAX;
}
static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void* input, size_t numOfBlocks, int32_t type, const char* id) { static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void* input, size_t numOfBlocks, int32_t type, const char* id) {
if (pOperator->operatorType != QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) { if (pOperator->operatorType != QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) {
if (pOperator->numOfDownstream == 0) { if (pOperator->numOfDownstream == 0) {
...@@ -341,7 +330,7 @@ qTaskInfo_t qCreateStreamExecTaskInfo(void* msg, SReadHandle* readers, int32_t v ...@@ -341,7 +330,7 @@ qTaskInfo_t qCreateStreamExecTaskInfo(void* msg, SReadHandle* readers, int32_t v
return NULL; return NULL;
} }
qResetStreamInfoTimeWindow(pTaskInfo); qStreamInfoResetTimewindowFilter(pTaskInfo);
return pTaskInfo; return pTaskInfo;
} }
...@@ -891,8 +880,6 @@ int32_t qStreamSourceScanParamForHistoryScanStep1(qTaskInfo_t tinfo, SVersionRan ...@@ -891,8 +880,6 @@ int32_t qStreamSourceScanParamForHistoryScanStep1(qTaskInfo_t tinfo, SVersionRan
pStreamInfo->fillHistoryVer = *pVerRange; pStreamInfo->fillHistoryVer = *pVerRange;
pStreamInfo->fillHistoryWindow = *pWindow; pStreamInfo->fillHistoryWindow = *pWindow;
pStreamInfo->recoverStep = STREAM_RECOVER_STEP__PREPARE1; pStreamInfo->recoverStep = STREAM_RECOVER_STEP__PREPARE1;
pStreamInfo->recoverStep1Finished = false;
pStreamInfo->recoverStep2Finished = false;
qDebug("%s step 1. set param for stream scanner for scan-history data, verRange:%" PRId64 " - %" PRId64 ", window:%" PRId64 qDebug("%s step 1. set param for stream scanner for scan-history data, verRange:%" PRId64 " - %" PRId64 ", window:%" PRId64
" - %" PRId64, " - %" PRId64,
...@@ -910,8 +897,6 @@ int32_t qStreamSourceScanParamForHistoryScanStep2(qTaskInfo_t tinfo, SVersionRan ...@@ -910,8 +897,6 @@ int32_t qStreamSourceScanParamForHistoryScanStep2(qTaskInfo_t tinfo, SVersionRan
pStreamInfo->fillHistoryVer = *pVerRange; pStreamInfo->fillHistoryVer = *pVerRange;
pStreamInfo->fillHistoryWindow = *pWindow; pStreamInfo->fillHistoryWindow = *pWindow;
pStreamInfo->recoverStep = STREAM_RECOVER_STEP__PREPARE2; pStreamInfo->recoverStep = STREAM_RECOVER_STEP__PREPARE2;
pStreamInfo->recoverStep1Finished = true;
pStreamInfo->recoverStep2Finished = false;
qDebug("%s step 2. set param for stream scanner for scan-history data, verRange:%" PRId64 " - %" PRId64 qDebug("%s step 2. set param for stream scanner for scan-history data, verRange:%" PRId64 " - %" PRId64
", window:%" PRId64 " - %" PRId64, ", window:%" PRId64 " - %" PRId64,
...@@ -1050,23 +1035,15 @@ bool qStreamRecoverScanFinished(qTaskInfo_t tinfo) { ...@@ -1050,23 +1035,15 @@ bool qStreamRecoverScanFinished(qTaskInfo_t tinfo) {
return pTaskInfo->streamInfo.recoverScanFinished; return pTaskInfo->streamInfo.recoverScanFinished;
} }
bool qStreamRecoverScanStep1Finished(qTaskInfo_t tinfo) { int32_t qStreamInfoResetTimewindowFilter(qTaskInfo_t tinfo) {
SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo; SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
return pTaskInfo->streamInfo.recoverStep1Finished; STimeWindow* pWindow = &pTaskInfo->streamInfo.fillHistoryWindow;
}
bool qStreamRecoverScanStep2Finished(qTaskInfo_t tinfo) { qDebug("%s set remove scan-history filter window:%" PRId64 "-%" PRId64 ", new window:%" PRId64 "-%" PRId64,
SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo; GET_TASKID(pTaskInfo), pWindow->skey, pWindow->ekey, INT64_MIN, INT64_MAX);
return pTaskInfo->streamInfo.recoverStep2Finished;
}
int32_t qStreamRecoverSetAllStepFinished(qTaskInfo_t tinfo) {
SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
pTaskInfo->streamInfo.recoverStep1Finished = true;
pTaskInfo->streamInfo.recoverStep2Finished = true;
// reset the time window pWindow->skey = INT64_MIN;
pTaskInfo->streamInfo.fillHistoryWindow.skey = INT64_MIN; pWindow->ekey = INT64_MAX;
return 0; return 0;
} }
......
...@@ -1590,38 +1590,51 @@ static void doBlockDataWindowFilter(SSDataBlock* pBlock, int32_t tsIndex, STimeW ...@@ -1590,38 +1590,51 @@ static void doBlockDataWindowFilter(SSDataBlock* pBlock, int32_t tsIndex, STimeW
} }
// re-build the delete block, ONLY according to the split timestamp // re-build the delete block, ONLY according to the split timestamp
static void rebuildDeleteBlockData(SSDataBlock* pBlock, int64_t skey, const char* id) { static void rebuildDeleteBlockData(SSDataBlock* pBlock, STimeWindow* pWindow, const char* id) {
if (skey == INT64_MIN) {
return;
}
int32_t numOfRows = pBlock->info.rows; int32_t numOfRows = pBlock->info.rows;
bool* p = taosMemoryCalloc(numOfRows, sizeof(bool));
bool* p = taosMemoryCalloc(numOfRows, sizeof(bool)); bool hasUnqualified = false;
bool hasUnqualified = false; int64_t skey = pWindow->skey;
int64_t ekey = pWindow->ekey;
SColumnInfoData* pSrcStartCol = taosArrayGet(pBlock->pDataBlock, START_TS_COLUMN_INDEX); SColumnInfoData* pSrcStartCol = taosArrayGet(pBlock->pDataBlock, START_TS_COLUMN_INDEX);
uint64_t* tsStartCol = (uint64_t*)pSrcStartCol->pData; uint64_t* tsStartCol = (uint64_t*)pSrcStartCol->pData;
SColumnInfoData* pSrcEndCol = taosArrayGet(pBlock->pDataBlock, END_TS_COLUMN_INDEX); SColumnInfoData* pSrcEndCol = taosArrayGet(pBlock->pDataBlock, END_TS_COLUMN_INDEX);
uint64_t* tsEndCol = (uint64_t*)pSrcEndCol->pData; uint64_t* tsEndCol = (uint64_t*)pSrcEndCol->pData;
for (int32_t i = 0; i < numOfRows; i++) { if (pWindow->skey != INT64_MIN) {
if (tsStartCol[i] < skey) { for (int32_t i = 0; i < numOfRows; i++) {
tsStartCol[i] = skey; if (tsStartCol[i] < skey) {
tsStartCol[i] = skey;
}
if (tsEndCol[i] >= skey) {
p[i] = true;
} else { // this row should be removed, since it is not in this query time window, which is [skey, INT64_MAX]
hasUnqualified = true;
}
} }
} else if (pWindow->ekey != INT64_MAX) {
for(int32_t i = 0; i < numOfRows; ++i) {
if (tsEndCol[i] > ekey) {
tsEndCol[i] = ekey;
}
if (tsEndCol[i] >= skey) { if (tsStartCol[i] <= ekey) {
p[i] = true; p[i] = true;
} else { // this row should be removed, since it is not in this query time window, which is [skey, INT64_MAX] } else {
hasUnqualified = true; hasUnqualified = true;
}
} }
} }
if (hasUnqualified) { if (hasUnqualified) {
trimDataBlock(pBlock, pBlock->info.rows, p); trimDataBlock(pBlock, pBlock->info.rows, p);
qDebug("%s re-build delete datablock, start key revised to:%"PRId64", rows:%"PRId64, id, skey, pBlock->info.rows);
} else {
qDebug("%s not update the delete block", id);
} }
qDebug("%s re-build delete datablock, start key revised to:%"PRId64", rows:%"PRId64, id, skey, pBlock->info.rows);
taosMemoryFree(p); taosMemoryFree(p);
} }
...@@ -2030,7 +2043,7 @@ FETCH_NEXT_BLOCK: ...@@ -2030,7 +2043,7 @@ FETCH_NEXT_BLOCK:
} }
setBlockGroupIdByUid(pInfo, pDelBlock); setBlockGroupIdByUid(pInfo, pDelBlock);
rebuildDeleteBlockData(pDelBlock, pStreamInfo->fillHistoryWindow.skey, id); rebuildDeleteBlockData(pDelBlock, &pStreamInfo->fillHistoryWindow, id);
printDataBlock(pDelBlock, "stream scan delete recv filtered"); printDataBlock(pDelBlock, "stream scan delete recv filtered");
if (pDelBlock->info.rows == 0) { if (pDelBlock->info.rows == 0) {
if (pInfo->tqReader) { if (pInfo->tqReader) {
...@@ -2411,7 +2424,9 @@ void streamScanReloadState(SOperatorInfo* pOperator) { ...@@ -2411,7 +2424,9 @@ void streamScanReloadState(SOperatorInfo* pOperator) {
pInfo->stateStore.updateInfoDestroy(pInfo->pUpdateInfo); pInfo->stateStore.updateInfoDestroy(pInfo->pUpdateInfo);
pInfo->pUpdateInfo = pUpInfo; pInfo->pUpdateInfo = pUpInfo;
} else { } else {
pInfo->pUpdateInfo->minTS = TMAX(pInfo->pUpdateInfo->minTS, pUpInfo->minTS); pInfo->stateStore.windowSBfDelete(pInfo->pUpdateInfo, 1);
pInfo->stateStore.windowSBfAdd(pInfo->pUpdateInfo, 1);
ASSERT(pInfo->pUpdateInfo->minTS > pUpInfo->minTS);
pInfo->pUpdateInfo->maxDataVersion = TMAX(pInfo->pUpdateInfo->maxDataVersion, pUpInfo->maxDataVersion); pInfo->pUpdateInfo->maxDataVersion = TMAX(pInfo->pUpdateInfo->maxDataVersion, pUpInfo->maxDataVersion);
SHashObj* curMap = pInfo->pUpdateInfo->pMap; SHashObj* curMap = pInfo->pUpdateInfo->pMap;
void *pIte = taosHashIterate(curMap, NULL); void *pIte = taosHashIterate(curMap, NULL);
......
...@@ -62,6 +62,7 @@ SStreamQueueItem* streamMergeQueueItem(SStreamQueueItem* dst, SStreamQueueItem* ...@@ -62,6 +62,7 @@ SStreamQueueItem* streamMergeQueueItem(SStreamQueueItem* dst, SStreamQueueItem*
int32_t streamAddEndScanHistoryMsg(SStreamTask* pTask, SRpcHandleInfo* pRpcInfo, SStreamScanHistoryFinishReq* pReq); int32_t streamAddEndScanHistoryMsg(SStreamTask* pTask, SRpcHandleInfo* pRpcInfo, SStreamScanHistoryFinishReq* pReq);
int32_t streamNotifyUpstreamContinue(SStreamTask* pTask); int32_t streamNotifyUpstreamContinue(SStreamTask* pTask);
int32_t streamTaskFillHistoryFinished(SStreamTask* pTask);
extern int32_t streamBackendId; extern int32_t streamBackendId;
extern int32_t streamBackendCfWrapperId; extern int32_t streamBackendCfWrapperId;
......
...@@ -163,15 +163,14 @@ static int32_t streamTaskExecImpl(SStreamTask* pTask, SStreamQueueItem* pItem, i ...@@ -163,15 +163,14 @@ static int32_t streamTaskExecImpl(SStreamTask* pTask, SStreamQueueItem* pItem, i
} }
int32_t streamScanExec(SStreamTask* pTask, int32_t batchSz) { int32_t streamScanExec(SStreamTask* pTask, int32_t batchSz) {
int32_t code = 0;
ASSERT(pTask->info.taskLevel == TASK_LEVEL__SOURCE); ASSERT(pTask->info.taskLevel == TASK_LEVEL__SOURCE);
void* exec = pTask->exec.pExecutor; int32_t code = TSDB_CODE_SUCCESS;
void* exec = pTask->exec.pExecutor;
bool finished = false;
qSetStreamOpOpen(exec); qSetStreamOpOpen(exec);
bool finished = false;
while (1) { while (!finished) {
if (streamTaskShouldPause(&pTask->status)) { if (streamTaskShouldPause(&pTask->status)) {
double el = (taosGetTimestampMs() - pTask->tsInfo.step1Start) / 1000.0; double el = (taosGetTimestampMs() - pTask->tsInfo.step1Start) / 1000.0;
qDebug("s-task:%s paused from the scan-history task, elapsed time:%.2fsec", pTask->id.idStr, el); qDebug("s-task:%s paused from the scan-history task, elapsed time:%.2fsec", pTask->id.idStr, el);
...@@ -184,44 +183,30 @@ int32_t streamScanExec(SStreamTask* pTask, int32_t batchSz) { ...@@ -184,44 +183,30 @@ int32_t streamScanExec(SStreamTask* pTask, int32_t batchSz) {
return -1; return -1;
} }
int32_t batchCnt = 0; int32_t numOfBlocks = 0;
while (1) { while (1) {
if (streamTaskShouldStop(&pTask->status)) { if (streamTaskShouldStop(&pTask->status)) {
taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes); taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes);
return 0; return 0;
} }
if (streamTaskShouldPause(&pTask->status)) {
break;
}
SSDataBlock* output = NULL; SSDataBlock* output = NULL;
uint64_t ts = 0; uint64_t ts = 0;
if (qExecTask(exec, &output, &ts) < 0) { if (qExecTask(exec, &output, &ts) < 0) {
continue; continue;
} }
if (output == NULL) { if (output == NULL && qStreamRecoverScanFinished(exec)) {
if (qStreamRecoverScanFinished(exec)) { finished = true;
finished = true;
} else {
qSetStreamOpOpen(exec);
if (streamTaskShouldPause(&pTask->status)) {
SStreamDataBlock* qRes = taosAllocateQitem(sizeof(SStreamDataBlock), DEF_QITEM, 0);
if (qRes == NULL) {
taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes);
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
qRes->type = STREAM_INPUT__DATA_BLOCK;
qRes->blocks = pRes;
code = streamTaskOutputResultBlock(pTask, qRes);
if (code == TSDB_CODE_UTIL_QUEUE_OUT_OF_MEMORY) {
taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes);
taosFreeQitem(qRes);
return code;
}
return 0;
}
}
break; break;
} else {
if (output == NULL) {
ASSERT(0);
}
} }
SSDataBlock block = {0}; SSDataBlock block = {0};
...@@ -229,86 +214,37 @@ int32_t streamScanExec(SStreamTask* pTask, int32_t batchSz) { ...@@ -229,86 +214,37 @@ int32_t streamScanExec(SStreamTask* pTask, int32_t batchSz) {
block.info.childId = pTask->info.selfChildId; block.info.childId = pTask->info.selfChildId;
taosArrayPush(pRes, &block); taosArrayPush(pRes, &block);
batchCnt++; numOfBlocks++;
qDebug("s-task:%s scan exec numOfBlocks:%d, limit:%d", pTask->id.idStr, numOfBlocks, batchSz);
qDebug("s-task:%s scan exec numOfBlocks:%d, limit:%d", pTask->id.idStr, batchCnt, batchSz); if (numOfBlocks >= batchSz) {
if (batchCnt >= batchSz) {
break; break;
} }
} }
if (taosArrayGetSize(pRes) == 0) { if (taosArrayGetSize(pRes) > 0) {
taosArrayDestroy(pRes); SStreamDataBlock* qRes = taosAllocateQitem(sizeof(SStreamDataBlock), DEF_QITEM, 0);
if (qRes == NULL) {
if (finished) { taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes);
qDebug("s-task:%s finish recover exec task ", pTask->id.idStr); terrno = TSDB_CODE_OUT_OF_MEMORY;
break; return -1;
} else {
qDebug("s-task:%s continue recover exec task ", pTask->id.idStr);
continue;
} }
}
SStreamDataBlock* qRes = taosAllocateQitem(sizeof(SStreamDataBlock), DEF_QITEM, 0);
if (qRes == NULL) {
taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes);
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
qRes->type = STREAM_INPUT__DATA_BLOCK; qRes->type = STREAM_INPUT__DATA_BLOCK;
qRes->blocks = pRes; qRes->blocks = pRes;
code = streamTaskOutputResultBlock(pTask, qRes);
if (code == TSDB_CODE_UTIL_QUEUE_OUT_OF_MEMORY) {
taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes);
taosFreeQitem(qRes);
return code;
}
if (finished) {
break;
}
}
return 0;
}
#if 0 code = streamTaskOutputResultBlock(pTask, qRes);
int32_t streamBatchExec(SStreamTask* pTask, int32_t batchLimit) { if (code == TSDB_CODE_UTIL_QUEUE_OUT_OF_MEMORY) {
// fetch all queue item, merge according to batchLimit taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes);
int32_t numOfItems = taosReadAllQitems(pTask->inputQueue1, pTask->inputQall); taosFreeQitem(qRes);
if (numOfItems == 0) { return code;
qDebug("task: %d, stream task exec over, queue empty", pTask->id.taskId); }
return 0;
}
SStreamQueueItem* pMerged = NULL;
SStreamQueueItem* pItem = NULL;
taosGetQitem(pTask->inputQall, (void**)&pItem);
if (pItem == NULL) {
if (pMerged != NULL) {
// process merged item
} else { } else {
return 0; taosArrayDestroy(pRes);
} }
} }
// if drop
if (pItem->type == STREAM_INPUT__DESTROY) {
// set status drop
return -1;
}
if (pTask->info.taskLevel == TASK_LEVEL__SINK) {
ASSERT(((SStreamQueueItem*)pItem)->type == STREAM_INPUT__DATA_BLOCK);
streamTaskOutputResultBlock(pTask, (SStreamDataBlock*)pItem);
}
// exec impl
// output
// try dispatch
return 0; return 0;
} }
#endif
int32_t updateCheckPointInfo(SStreamTask* pTask) { int32_t updateCheckPointInfo(SStreamTask* pTask) {
int64_t ckId = 0; int64_t ckId = 0;
...@@ -356,12 +292,12 @@ static void waitForTaskIdle(SStreamTask* pTask, SStreamTask* pStreamTask) { ...@@ -356,12 +292,12 @@ static void waitForTaskIdle(SStreamTask* pTask, SStreamTask* pStreamTask) {
} }
} }
static int32_t streamTransferStateToStreamTask(SStreamTask* pTask) { static int32_t streamDoTransferStateToStreamTask(SStreamTask* pTask) {
SStreamMeta* pMeta = pTask->pMeta; SStreamMeta* pMeta = pTask->pMeta;
SStreamTask* pStreamTask = streamMetaAcquireTask(pMeta, pTask->streamTaskId.taskId); SStreamTask* pStreamTask = streamMetaAcquireTask(pMeta, pTask->streamTaskId.taskId);
if (pStreamTask == NULL) { if (pStreamTask == NULL) {
// todo: destroy this task here // todo: destroy the fill-history task here
qError("s-task:%s failed to find related stream task:0x%x, it may have been destroyed or closed", pTask->id.idStr, qError("s-task:%s failed to find related stream task:0x%x, it may have been destroyed or closed", pTask->id.idStr,
pTask->streamTaskId.taskId); pTask->streamTaskId.taskId);
return TSDB_CODE_STREAM_TASK_NOT_EXIST; return TSDB_CODE_STREAM_TASK_NOT_EXIST;
...@@ -402,34 +338,36 @@ static int32_t streamTransferStateToStreamTask(SStreamTask* pTask) { ...@@ -402,34 +338,36 @@ static int32_t streamTransferStateToStreamTask(SStreamTask* pTask) {
qDebug("s-task:%s no need to update time window for non-source task", pStreamTask->id.idStr); qDebug("s-task:%s no need to update time window for non-source task", pStreamTask->id.idStr);
} }
// expand the query time window for stream scanner // 1. expand the query time window for stream task of WAL scanner
pTimeWindow->skey = INT64_MIN; pTimeWindow->skey = INT64_MIN;
qResetStreamInfoTimeWindow(pStreamTask->exec.pExecutor); qStreamInfoResetTimewindowFilter(pStreamTask->exec.pExecutor);
// transfer the ownership of executor state // 2. transfer the ownership of executor state
streamTaskReleaseState(pTask); streamTaskReleaseState(pTask);
streamTaskReloadState(pStreamTask); streamTaskReloadState(pStreamTask);
// clear the link between fill-history task and stream task info // 3. clear the link between fill-history task and stream task info
pStreamTask->historyTaskId.taskId = 0; pStreamTask->historyTaskId.taskId = 0;
// 4. resume the state of stream task, after this function, the stream task will run immidately. But it can not be
// pause, since the pause allowed attribute is not set yet.
streamTaskResumeFromHalt(pStreamTask); streamTaskResumeFromHalt(pStreamTask);
qDebug("s-task:%s fill-history task set status to be dropping, save the state into disk", pTask->id.idStr); qDebug("s-task:%s fill-history task set status to be dropping, save the state into disk", pTask->id.idStr);
int32_t taskId = pTask->id.taskId; int32_t taskId = pTask->id.taskId;
// free it and remove it from disk meta-store // 5. free it and remove fill-history task from disk meta-store
streamMetaUnregisterTask(pMeta, taskId); streamMetaUnregisterTask(pMeta, taskId);
// save to disk // 6. save to disk
taosWLockLatch(&pMeta->lock); taosWLockLatch(&pMeta->lock);
streamMetaSaveTask(pMeta, pStreamTask); streamMetaSaveTask(pMeta, pStreamTask);
if (streamMetaCommit(pMeta) < 0) { if (streamMetaCommit(pMeta) < 0) {
// persist to disk // persist to disk
} }
taosWUnLockLatch(&pMeta->lock); taosWUnLockLatch(&pMeta->lock);
// pause allowed // 7. pause allowed.
streamTaskEnablePause(pStreamTask); streamTaskEnablePause(pStreamTask);
streamSchedExec(pStreamTask); streamSchedExec(pStreamTask);
...@@ -437,6 +375,26 @@ static int32_t streamTransferStateToStreamTask(SStreamTask* pTask) { ...@@ -437,6 +375,26 @@ static int32_t streamTransferStateToStreamTask(SStreamTask* pTask) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
static int32_t streamTransferStateToStreamTask(SStreamTask* pTask) {
int32_t code = TSDB_CODE_SUCCESS;
if (!pTask->status.transferState) {
return code;
}
int32_t level = pTask->info.taskLevel;
if (level == TASK_LEVEL__SOURCE) {
streamTaskFillHistoryFinished(pTask);
streamTaskEndScanWAL(pTask);
} else if (level == TASK_LEVEL__AGG) { // do transfer task operator states.
code = streamDoTransferStateToStreamTask(pTask);
if (code != TSDB_CODE_SUCCESS) { // todo handle this
return code;
}
}
return code;
}
static int32_t extractMsgFromInputQ(SStreamTask* pTask, SStreamQueueItem** pInput, int32_t* numOfBlocks, static int32_t extractMsgFromInputQ(SStreamTask* pTask, SStreamQueueItem** pInput, int32_t* numOfBlocks,
const char* id) { const char* id) {
int32_t retryTimes = 0; int32_t retryTimes = 0;
...@@ -590,17 +548,16 @@ int32_t streamTaskEndScanWAL(SStreamTask* pTask) { ...@@ -590,17 +548,16 @@ int32_t streamTaskEndScanWAL(SStreamTask* pTask) {
double el = (taosGetTimestampMs() - pTask->tsInfo.step2Start) / 1000.0; double el = (taosGetTimestampMs() - pTask->tsInfo.step2Start) / 1000.0;
qDebug("s-task:%s scan-history from WAL stage(step 2) ended, elapsed time:%.2fs", id, el); qDebug("s-task:%s scan-history from WAL stage(step 2) ended, elapsed time:%.2fs", id, el);
// 3. notify downstream tasks to transfer executor state after handle all history blocks. // 1. notify all downstream tasks to transfer executor state after handle all history blocks.
pTask->status.transferState = true;
int32_t code = streamDispatchTransferStateMsg(pTask); int32_t code = streamDispatchTransferStateMsg(pTask);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
// todo handle error // todo handle error
} }
// the last execution of fill-history task, in order to transfer task operator states. // 2. do transfer stream task operator states.
code = streamTransferStateToStreamTask(pTask); pTask->status.transferState = true;
if (code != TSDB_CODE_SUCCESS) { // todo handle this code = streamDoTransferStateToStreamTask(pTask);
if (code != TSDB_CODE_SUCCESS) { // todo handle error
return code; return code;
} }
...@@ -624,9 +581,11 @@ int32_t streamTryExec(SStreamTask* pTask) { ...@@ -624,9 +581,11 @@ int32_t streamTryExec(SStreamTask* pTask) {
// todo the task should be commit here // todo the task should be commit here
if (taosQueueEmpty(pTask->inputQueue->queue)) { if (taosQueueEmpty(pTask->inputQueue->queue)) {
// fill-history WAL scan has completed // fill-history WAL scan has completed
if (pTask->info.taskLevel == TASK_LEVEL__SOURCE && pTask->status.transferState == true) { if (pTask->status.transferState) {
streamTaskRecoverSetAllStepFinished(pTask); code = streamTransferStateToStreamTask(pTask);
streamTaskEndScanWAL(pTask); if (code != TSDB_CODE_SUCCESS) {
return code;
}
} else { } else {
atomic_store_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__INACTIVE); atomic_store_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__INACTIVE);
qDebug("s-task:%s exec completed, status:%s, sched-status:%d", id, streamGetTaskStatusStr(pTask->status.taskStatus), qDebug("s-task:%s exec completed, status:%s, sched-status:%d", id, streamGetTaskStatusStr(pTask->status.taskStatus),
......
...@@ -17,23 +17,30 @@ ...@@ -17,23 +17,30 @@
#include "ttimer.h" #include "ttimer.h"
#include "wal.h" #include "wal.h"
static void launchFillHistoryTask(SStreamTask* pTask); typedef struct SStreamTaskRetryInfo {
static void streamTaskSetRangeStreamCalc(SStreamTask* pTask); SStreamMeta* pMeta;
int32_t taskId;
} SStreamTaskRetryInfo;
static void streamTaskSetForReady(SStreamTask* pTask, int32_t numOfReqs) { static int32_t streamSetParamForScanHistory(SStreamTask* pTask);
static void launchFillHistoryTask(SStreamTask* pTask);
static void streamTaskSetRangeStreamCalc(SStreamTask* pTask);
static int32_t initScanHistoryReq(SStreamTask* pTask, SStreamScanHistoryReq* pReq, int8_t igUntreated);
static void streamTaskSetReady(SStreamTask* pTask, int32_t numOfReqs) {
ASSERT(pTask->status.downstreamReady == 0); ASSERT(pTask->status.downstreamReady == 0);
pTask->status.downstreamReady = 1; pTask->status.downstreamReady = 1;
int64_t el = (taosGetTimestampMs() - pTask->tsInfo.init);
int64_t el = (taosGetTimestampMs() - pTask->tsInfo.init);
qDebug("s-task:%s all %d downstream ready, init completed, elapsed time:%dms, task status:%s", qDebug("s-task:%s all %d downstream ready, init completed, elapsed time:%dms, task status:%s",
pTask->id.idStr, numOfReqs, (int32_t) el, streamGetTaskStatusStr(pTask->status.taskStatus)); pTask->id.idStr, numOfReqs, (int32_t) el, streamGetTaskStatusStr(pTask->status.taskStatus));
} }
int32_t streamStartRecoverTask(SStreamTask* pTask, int8_t igUntreated) { int32_t streamStartScanHistoryAsync(SStreamTask* pTask, int8_t igUntreated) {
SStreamScanHistoryReq req; SStreamScanHistoryReq req;
streamBuildSourceRecover1Req(pTask, &req, igUntreated); initScanHistoryReq(pTask, &req, igUntreated);
int32_t len = sizeof(SStreamScanHistoryReq);
int32_t len = sizeof(SStreamScanHistoryReq);
void* serializedReq = rpcMallocCont(len); void* serializedReq = rpcMallocCont(len);
if (serializedReq == NULL) { if (serializedReq == NULL) {
return -1; return -1;
...@@ -65,9 +72,9 @@ static int32_t doLaunchScanHistoryTask(SStreamTask* pTask) { ...@@ -65,9 +72,9 @@ static int32_t doLaunchScanHistoryTask(SStreamTask* pTask) {
if (pTask->info.fillHistory) { if (pTask->info.fillHistory) {
streamSetParamForScanHistory(pTask); streamSetParamForScanHistory(pTask);
} }
streamSetParamForStreamScannerStep1(pTask, pRange, &pTask->dataRange.window);
int32_t code = streamStartRecoverTask(pTask, 0); streamSetParamForStreamScannerStep1(pTask, pRange, &pTask->dataRange.window);
int32_t code = streamStartScanHistoryAsync(pTask, 0);
return code; return code;
} }
...@@ -142,7 +149,7 @@ int32_t streamTaskDoCheckDownstreamTasks(SStreamTask* pTask) { ...@@ -142,7 +149,7 @@ int32_t streamTaskDoCheckDownstreamTasks(SStreamTask* pTask) {
} else { } else {
qDebug("s-task:%s (vgId:%d) set downstream ready, since no downstream", pTask->id.idStr, pTask->info.nodeId); qDebug("s-task:%s (vgId:%d) set downstream ready, since no downstream", pTask->id.idStr, pTask->info.nodeId);
streamTaskSetForReady(pTask, 0); streamTaskSetReady(pTask, 0);
streamTaskSetRangeStreamCalc(pTask); streamTaskSetRangeStreamCalc(pTask);
streamTaskLaunchScanHistory(pTask); streamTaskLaunchScanHistory(pTask);
...@@ -188,7 +195,7 @@ int32_t streamTaskCheckStatus(SStreamTask* pTask) { ...@@ -188,7 +195,7 @@ int32_t streamTaskCheckStatus(SStreamTask* pTask) {
} }
static void doProcessDownstreamReadyRsp(SStreamTask* pTask, int32_t numOfReqs) { static void doProcessDownstreamReadyRsp(SStreamTask* pTask, int32_t numOfReqs) {
streamTaskSetForReady(pTask, numOfReqs); streamTaskSetReady(pTask, numOfReqs);
const char* id = pTask->id.idStr; const char* id = pTask->id.idStr;
int8_t status = pTask->status.taskStatus; int8_t status = pTask->status.taskStatus;
...@@ -319,7 +326,7 @@ int32_t streamSetParamForStreamScannerStep2(SStreamTask* pTask, SVersionRange *p ...@@ -319,7 +326,7 @@ int32_t streamSetParamForStreamScannerStep2(SStreamTask* pTask, SVersionRange *p
return qStreamSourceScanParamForHistoryScanStep2(pTask->exec.pExecutor, pVerRange, pWindow); return qStreamSourceScanParamForHistoryScanStep2(pTask->exec.pExecutor, pVerRange, pWindow);
} }
int32_t streamBuildSourceRecover1Req(SStreamTask* pTask, SStreamScanHistoryReq* pReq, int8_t igUntreated) { int32_t initScanHistoryReq(SStreamTask* pTask, SStreamScanHistoryReq* pReq, int8_t igUntreated) {
pReq->msgHead.vgId = pTask->info.nodeId; pReq->msgHead.vgId = pTask->info.nodeId;
pReq->streamId = pTask->id.streamId; pReq->streamId = pTask->id.streamId;
pReq->taskId = pTask->id.taskId; pReq->taskId = pTask->id.taskId;
...@@ -524,11 +531,6 @@ static void doCheckDownstreamStatus(SStreamTask* pTask, SStreamTask* pHTask) { ...@@ -524,11 +531,6 @@ static void doCheckDownstreamStatus(SStreamTask* pTask, SStreamTask* pHTask) {
streamTaskDoCheckDownstreamTasks(pHTask); streamTaskDoCheckDownstreamTasks(pHTask);
} }
typedef struct SStreamTaskRetryInfo {
SStreamMeta* pMeta;
int32_t taskId;
} SStreamTaskRetryInfo;
static void tryLaunchHistoryTask(void* param, void* tmrId) { static void tryLaunchHistoryTask(void* param, void* tmrId) {
SStreamTaskRetryInfo* pInfo = param; SStreamTaskRetryInfo* pInfo = param;
SStreamMeta* pMeta = pInfo->pMeta; SStreamMeta* pMeta = pInfo->pMeta;
...@@ -638,7 +640,7 @@ int32_t streamTaskScanHistoryDataComplete(SStreamTask* pTask) { ...@@ -638,7 +640,7 @@ int32_t streamTaskScanHistoryDataComplete(SStreamTask* pTask) {
} }
} }
// dispatch recover finish req to all related downstream task // dispatch scan-history finish req to all related downstream task
code = streamDispatchScanHistoryFinishMsg(pTask); code = streamDispatchScanHistoryFinishMsg(pTask);
if (code < 0) { if (code < 0) {
return -1; return -1;
...@@ -647,19 +649,9 @@ int32_t streamTaskScanHistoryDataComplete(SStreamTask* pTask) { ...@@ -647,19 +649,9 @@ int32_t streamTaskScanHistoryDataComplete(SStreamTask* pTask) {
return 0; return 0;
} }
bool streamTaskRecoverScanStep1Finished(SStreamTask* pTask) { int32_t streamTaskFillHistoryFinished(SStreamTask* pTask) {
void* exec = pTask->exec.pExecutor;
return qStreamRecoverScanStep1Finished(exec);
}
bool streamTaskRecoverScanStep2Finished(SStreamTask* pTask) {
void* exec = pTask->exec.pExecutor; void* exec = pTask->exec.pExecutor;
return qStreamRecoverScanStep2Finished(exec); return qStreamInfoResetTimewindowFilter(exec);
}
int32_t streamTaskRecoverSetAllStepFinished(SStreamTask* pTask) {
void* exec = pTask->exec.pExecutor;
return qStreamRecoverSetAllStepFinished(exec);
} }
bool streamHistoryTaskSetVerRangeStep2(SStreamTask* pTask, int64_t latestVer) { bool streamHistoryTaskSetVerRangeStep2(SStreamTask* pTask, int64_t latestVer) {
...@@ -669,7 +661,7 @@ bool streamHistoryTaskSetVerRangeStep2(SStreamTask* pTask, int64_t latestVer) { ...@@ -669,7 +661,7 @@ bool streamHistoryTaskSetVerRangeStep2(SStreamTask* pTask, int64_t latestVer) {
int64_t nextStartVer = pRange->maxVer + 1; int64_t nextStartVer = pRange->maxVer + 1;
if (nextStartVer > latestVer - 1) { if (nextStartVer > latestVer - 1) {
// no input data yet. no need to execute the secondardy scan while stream task halt // no input data yet. no need to execute the secondardy scan while stream task halt
streamTaskRecoverSetAllStepFinished(pTask); streamTaskFillHistoryFinished(pTask);
qDebug( qDebug(
"s-task:%s no need to perform secondary scan-history data(step 2), since no data ingest during step1 scan, " "s-task:%s no need to perform secondary scan-history data(step 2), since no data ingest during step1 scan, "
"related stream task currentVer:%" PRId64, "related stream task currentVer:%" PRId64,
...@@ -684,7 +676,6 @@ bool streamHistoryTaskSetVerRangeStep2(SStreamTask* pTask, int64_t latestVer) { ...@@ -684,7 +676,6 @@ bool streamHistoryTaskSetVerRangeStep2(SStreamTask* pTask, int64_t latestVer) {
} }
} }
int32_t tEncodeStreamTaskCheckReq(SEncoder* pEncoder, const SStreamTaskCheckReq* pReq) { int32_t tEncodeStreamTaskCheckReq(SEncoder* pEncoder, const SStreamTaskCheckReq* pReq) {
if (tStartEncode(pEncoder) < 0) return -1; if (tStartEncode(pEncoder) < 0) return -1;
if (tEncodeI64(pEncoder, pReq->reqId) < 0) return -1; if (tEncodeI64(pEncoder, pReq->reqId) < 0) return -1;
...@@ -857,7 +848,7 @@ void streamTaskPause(SStreamTask* pTask) { ...@@ -857,7 +848,7 @@ void streamTaskPause(SStreamTask* pTask) {
taosMsleep(100); taosMsleep(100);
} }
// todo: use the lock of the task. // todo: use the task lock, stead of meta lock
taosWLockLatch(&pMeta->lock); taosWLockLatch(&pMeta->lock);
status = pTask->status.taskStatus; status = pTask->status.taskStatus;
...@@ -871,6 +862,12 @@ void streamTaskPause(SStreamTask* pTask) { ...@@ -871,6 +862,12 @@ void streamTaskPause(SStreamTask* pTask) {
atomic_store_8(&pTask->status.taskStatus, TASK_STATUS__PAUSE); atomic_store_8(&pTask->status.taskStatus, TASK_STATUS__PAUSE);
taosWUnLockLatch(&pMeta->lock); taosWUnLockLatch(&pMeta->lock);
// in case of fill-history task, stop the tsdb file scan operation.
if (pTask->info.fillHistory == 1) {
void* pExecutor = pTask->exec.pExecutor;
qKillTask(pExecutor, TSDB_CODE_SUCCESS);
}
int64_t el = taosGetTimestampMs() - st; int64_t el = taosGetTimestampMs() - st;
qDebug("vgId:%d s-task:%s set pause flag, prev:%s, elapsed time:%dms", pMeta->vgId, pTask->id.idStr, qDebug("vgId:%d s-task:%s set pause flag, prev:%s, elapsed time:%dms", pMeta->vgId, pTask->id.idStr,
streamGetTaskStatusStr(pTask->status.keepTaskStatus), (int32_t)el); streamGetTaskStatusStr(pTask->status.keepTaskStatus), (int32_t)el);
......
...@@ -33,7 +33,7 @@ ...@@ -33,7 +33,7 @@
static int64_t adjustExpEntries(int64_t entries) { return TMIN(DEFAULT_EXPECTED_ENTRIES, entries); } static int64_t adjustExpEntries(int64_t entries) { return TMIN(DEFAULT_EXPECTED_ENTRIES, entries); }
static void windowSBfAdd(SUpdateInfo *pInfo, uint64_t count) { void windowSBfAdd(SUpdateInfo *pInfo, uint64_t count) {
if (pInfo->numSBFs < count) { if (pInfo->numSBFs < count) {
count = pInfo->numSBFs; count = pInfo->numSBFs;
} }
...@@ -49,7 +49,7 @@ static void clearItemHelper(void *p) { ...@@ -49,7 +49,7 @@ static void clearItemHelper(void *p) {
tScalableBfDestroy(*pBf); tScalableBfDestroy(*pBf);
} }
static void windowSBfDelete(SUpdateInfo *pInfo, uint64_t count) { void windowSBfDelete(SUpdateInfo *pInfo, uint64_t count) {
if (count < pInfo->numSBFs) { if (count < pInfo->numSBFs) {
for (uint64_t i = 0; i < count; ++i) { for (uint64_t i = 0; i < count; ++i) {
SScalableBf *pTsSBFs = taosArrayGetP(pInfo->pTsSBFs, 0); SScalableBf *pTsSBFs = taosArrayGetP(pInfo->pTsSBFs, 0);
......
...@@ -3,4 +3,6 @@ ...@@ -3,4 +3,6 @@
rem echo taskkill /F /IM taosd.exe rem echo taskkill /F /IM taosd.exe
wmic process where "name='taosd.exe'" call terminate > NUL 2>&1 wmic process where "name='taosd.exe'" call terminate > NUL 2>&1
taskkill /F /IM taosd.exe > NUL 2>&1 taskkill /F /IM taosd.exe > NUL 2>&1
\ No newline at end of file
rem echo taskkill /F /IM taosd.exe finished
\ No newline at end of file
...@@ -68,7 +68,7 @@ int32_t shellCheckIntSize() { ...@@ -68,7 +68,7 @@ int32_t shellCheckIntSize() {
return 0; return 0;
} }
void shellPrintVersion() { printf("version: %s\r\n", version); } void shellPrintVersion() { printf("%s\r\n", shell.info.programVersion); }
void shellGenerateAuth() { void shellGenerateAuth() {
char secretEncrypt[TSDB_PASSWORD_LEN + 1] = {0}; char secretEncrypt[TSDB_PASSWORD_LEN + 1] = {0};
......
...@@ -2,7 +2,7 @@ aux_source_directory(src TSIM_SRC) ...@@ -2,7 +2,7 @@ aux_source_directory(src TSIM_SRC)
add_executable(tsim ${TSIM_SRC}) add_executable(tsim ${TSIM_SRC})
target_link_libraries( target_link_libraries(
tsim tsim
PUBLIC taos PUBLIC taos_static
PUBLIC util PUBLIC util
PUBLIC common PUBLIC common
PUBLIC os PUBLIC os
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册