diff --git a/docs/en/12-taos-sql/22-meta.md b/docs/en/12-taos-sql/22-meta.md index 2b4a7e52505df5780c6f4705736e990cb4c51313..37304633e76b9c3c62106baa06debe54d5b922a0 100644 --- a/docs/en/12-taos-sql/22-meta.md +++ b/docs/en/12-taos-sql/22-meta.md @@ -98,7 +98,7 @@ Provides information about user-created databases. Similar to SHOW DATABASES. | 21 | cachesize | INT | Memory per vnode used for caching the newest data. It should be noted that `cachesize` is a TDengine keyword and needs to be escaped with ` when used as a column name. | | 22 | wal_level | INT | WAL level. It should be noted that `wal_level` is a TDengine keyword and needs to be escaped with ` when used as a column name. | | 23 | wal_fsync_period | INT | Interval at which WAL is written to disk. It should be noted that `wal_fsync_period` is a TDengine keyword and needs to be escaped with ` when used as a column name. | -| 24 | wal_retention_period | INT | WAL retention period. It should be noted that `wal_retention_period` is a TDengine keyword and needs to be escaped with ` when used as a column name. | +| 24 | wal_retention_period | INT | WAL retention period, in second. It should be noted that `wal_retention_period` is a TDengine keyword and needs to be escaped with ` when used as a column name. | | 25 | wal_retention_size | INT | Maximum WAL size. It should be noted that `wal_retention_size` is a TDengine keyword and needs to be escaped with ` when used as a column name. | | 26 | stt_trigger | SMALLINT | The threshold for number of files to trigger file merging. It should be noted that `stt_trigger` is a TDengine keyword and needs to be escaped with ` when used as a column name. | | 27 | table_prefix | SMALLINT | The prefix length in the table name that is ignored when distributing table to vnode based on table name. It should be noted that `table_prefix` is a TDengine keyword and needs to be escaped with ` when used as a column name. | @@ -297,3 +297,13 @@ Provides dnode configuration information. | 7 | target_table | BINARY(192) | Target table | | 8 | watermark | BIGINT | Watermark (see stream processing documentation). It should be noted that `watermark` is a TDengine keyword and needs to be escaped with ` when used as a column name. | | 9 | trigger | INT | Method of triggering the result push (see stream processing documentation). It should be noted that `trigger` is a TDengine keyword and needs to be escaped with ` when used as a column name. | + +## INS_USER_PRIVILEGES + +| # | **Column** | **Data Type** | **Description** |** | +| --- | :----------: | ------------ | -------------------------------------------| +| 1 | user_name | VARCHAR(24) | Username | +| 2 | privilege | VARCHAR(10) | Privilege description | +| 3 | db_name | VARCHAR(65) | Database name | +| 4 | table_name | VARCHAR(193) | Table name | +| 5 | condition | VARCHAR(49152) | The privilege filter for child tables | diff --git a/docs/en/14-reference/02-rest-api/02-rest-api.mdx b/docs/en/14-reference/02-rest-api/02-rest-api.mdx index ea010f42db8b988c597bcd4c9278c0b5d50a3ca7..4da987213cb8c5f21fd7c8defe1f0952b8f65e41 100644 --- a/docs/en/14-reference/02-rest-api/02-rest-api.mdx +++ b/docs/en/14-reference/02-rest-api/02-rest-api.mdx @@ -79,6 +79,12 @@ Parameter Description: - tz: Optional parameter that specifies the timezone of the returned time, following the IANA Time Zone rules, e.g. `America/New_York`. - req_id: Optional parameter that specifies the request id for tracing. +:::note + +URL Encoding. Make sure that parameters are properly encoded. For example, when specifying a timezone you must properly encode special characters. ?tz=Etc/GMT+10 will not work because the <+> plus symbol is recognized as a space in the url. It's best practice to encode all special characters in a parameter. Instead use ?tz=Etc%2FGMT%2B10 for the parameter. + +::: + For example, `http://h1.taos.com:6041/rest/sql/test` is a URL to `h1.taos.com:6041` and sets the default database name to `test`. TDengine supports both Basic authentication and custom authentication mechanisms, and subsequent versions will provide a standard secure digital signature mechanism for authentication. diff --git a/docs/en/25-application/_03-immigrate.md b/docs/en/25-application/_03-immigrate.md index f78042353249a29f7ee634cfc544c6c0914e3251..457a40614e836e8735195b6b7e6c50268cf66662 100644 --- a/docs/en/25-application/_03-immigrate.md +++ b/docs/en/25-application/_03-immigrate.md @@ -338,7 +338,7 @@ Remark: Equivalent function: sum ```sql -Select max(value) from (select first(val) value from table_name interval(10s) fill(linear)) interval(10s) +Select sum(value) from (select first(val) value from table_name interval(10s) fill(linear)) interval(10s) ``` Note: This function has no interpolation requirements, so it can be directly calculated. diff --git a/docs/zh/12-taos-sql/22-meta.md b/docs/zh/12-taos-sql/22-meta.md index 3ab0b684ce4fcb3ca476410a1569b4d7f48a3dd1..35794ec2699eba8111096022e04632853cfc3056 100644 --- a/docs/zh/12-taos-sql/22-meta.md +++ b/docs/zh/12-taos-sql/22-meta.md @@ -98,7 +98,7 @@ TDengine 内置了一个名为 `INFORMATION_SCHEMA` 的数据库,提供对数 | 21 | cachesize | INT | 表示每个 vnode 中用于缓存子表最近数据的内存大小。需要注意,`cachesize` 为 TDengine 关键字,作为列名使用时需要使用 ` 进行转义。 | | 22 | wal_level | INT | WAL 级别。需要注意,`wal_level` 为 TDengine 关键字,作为列名使用时需要使用 ` 进行转义。 | | 23 | wal_fsync_period | INT | 数据落盘周期。需要注意,`wal_fsync_period` 为 TDengine 关键字,作为列名使用时需要使用 ` 进行转义。 | -| 24 | wal_retention_period | INT | WAL 的保存时长。需要注意,`wal_retention_period` 为 TDengine 关键字,作为列名使用时需要使用 ` 进行转义。 | +| 24 | wal_retention_period | INT | WAL 的保存时长,单位为秒。需要注意,`wal_retention_period` 为 TDengine 关键字,作为列名使用时需要使用 ` 进行转义。 | | 25 | wal_retention_size | INT | WAL 的保存上限。需要注意,`wal_retention_size` 为 TDengine 关键字,作为列名使用时需要使用 ` 进行转义。 | | 26 | stt_trigger | SMALLINT | 触发文件合并的落盘文件的个数。需要注意,`stt_trigger` 为 TDengine 关键字,作为列名使用时需要使用 ` 进行转义。 | | 27 | table_prefix | SMALLINT | 内部存储引擎根据表名分配存储该表数据的 VNODE 时要忽略的前缀的长度。需要注意,`table_prefix` 为 TDengine 关键字,作为列名使用时需要使用 ` 进行转义。 | @@ -298,3 +298,13 @@ TDengine 内置了一个名为 `INFORMATION_SCHEMA` 的数据库,提供对数 | 7 | target_table | BINARY(192) | 流计算写入的目标表 | | 8 | watermark | BIGINT | watermark,详见 SQL 手册流式计算。需要注意,`watermark` 为 TDengine 关键字,作为列名使用时需要使用 ` 进行转义。 | | 9 | trigger | INT | 计算结果推送模式,详见 SQL 手册流式计算。需要注意,`trigger` 为 TDengine 关键字,作为列名使用时需要使用 ` 进行转义。 | + +## INS_USER_PRIVILEGES + +| # | **列名** | **数据类型** | **说明** | +| --- | :----------: | ------------ | -------------------------------------------------------------------------------------------------------------------- | +| 1 | user_name | VARCHAR(24) | 用户名 +| 2 | privilege | VARCHAR(10) | 权限描述 +| 3 | db_name | VARCHAR(65) | 数据库名称 +| 4 | table_name | VARCHAR(193) | 表名称 +| 5 | condition | VARCHAR(49152) | 子表权限过滤条件 diff --git a/docs/zh/25-application/03-immigrate.md b/docs/zh/25-application/03-immigrate.md index 75788c0cc7d6e0e84402ba77c4a1aa875d772d8b..389a2b2c5a31f7357fafe9bf4fd178d811acf464 100644 --- a/docs/zh/25-application/03-immigrate.md +++ b/docs/zh/25-application/03-immigrate.md @@ -371,7 +371,7 @@ Select min(val) from table_name 等效函数:sum ```sql -Select max(value) from (select first(val) value from table_name interval(10s) fill(linear)) interval(10s) +Select sum(value) from (select first(val) value from table_name interval(10s) fill(linear)) interval(10s) ``` 备注:该函数无插值需求,因此可用直接计算。 diff --git a/include/libs/executor/executor.h b/include/libs/executor/executor.h index 3bef15f3a7c49b7a89112344b67182b3da9f3696..f90c38f341edccf801d7f7d470228c524a8f794d 100644 --- a/include/libs/executor/executor.h +++ b/include/libs/executor/executor.h @@ -221,13 +221,9 @@ int32_t qStreamSourceScanParamForHistoryScanStep2(qTaskInfo_t tinfo, SVersionRan int32_t qStreamRecoverFinish(qTaskInfo_t tinfo); int32_t qRestoreStreamOperatorOption(qTaskInfo_t tinfo); bool qStreamRecoverScanFinished(qTaskInfo_t tinfo); -bool qStreamRecoverScanStep1Finished(qTaskInfo_t tinfo); -bool qStreamRecoverScanStep2Finished(qTaskInfo_t tinfo); -int32_t qStreamRecoverSetAllStepFinished(qTaskInfo_t tinfo); +int32_t qStreamInfoResetTimewindowFilter(qTaskInfo_t tinfo); void resetTaskInfo(qTaskInfo_t tinfo); -void qResetStreamInfoTimeWindow(qTaskInfo_t tinfo); - int32_t qStreamOperatorReleaseState(qTaskInfo_t tInfo); int32_t qStreamOperatorReloadState(qTaskInfo_t tInfo); diff --git a/include/libs/executor/storageapi.h b/include/libs/executor/storageapi.h index 2fbd7851e89911e61328b30bd88506d73cc486ad..773f373a2d174121a7396bed1f5827e7bb514987 100644 --- a/include/libs/executor/storageapi.h +++ b/include/libs/executor/storageapi.h @@ -368,6 +368,8 @@ typedef struct SStateStore { bool (*updateInfoIsUpdated)(SUpdateInfo* pInfo, uint64_t tableId, TSKEY ts); bool (*updateInfoIsTableInserted)(SUpdateInfo* pInfo, int64_t tbUid); void (*updateInfoDestroy)(SUpdateInfo* pInfo); + void (*windowSBfDelete)(SUpdateInfo *pInfo, uint64_t count); + void (*windowSBfAdd)(SUpdateInfo *pInfo, uint64_t count); SUpdateInfo* (*updateInfoInitP)(SInterval* pInterval, int64_t watermark); void (*updateInfoAddCloseWindowSBF)(SUpdateInfo* pInfo); diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index 066f83fbcbb96b1df73d50982c0ba2702bc2b296..db0509d81d83f4d43a555e64fcafcff9252ea6d8 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -604,15 +604,10 @@ int32_t streamSendCheckRsp(const SStreamMeta* pMeta, const SStreamTaskCheckReq* int32_t streamProcessCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp* pRsp); int32_t streamLaunchFillHistoryTask(SStreamTask* pTask); int32_t streamTaskScanHistoryDataComplete(SStreamTask* pTask); -int32_t streamStartRecoverTask(SStreamTask* pTask, int8_t igUntreated); +int32_t streamStartScanHistoryAsync(SStreamTask* pTask, int8_t igUntreated); bool streamHistoryTaskSetVerRangeStep2(SStreamTask* pTask, int64_t latestVer); -bool streamTaskRecoverScanStep1Finished(SStreamTask* pTask); -bool streamTaskRecoverScanStep2Finished(SStreamTask* pTask); -int32_t streamTaskRecoverSetAllStepFinished(SStreamTask* pTask); - // common -int32_t streamSetParamForScanHistory(SStreamTask* pTask); int32_t streamRestoreParam(SStreamTask* pTask); int32_t streamSetStatusNormal(SStreamTask* pTask); const char* streamGetTaskStatusStr(int32_t status); @@ -626,7 +621,6 @@ void streamTaskEnablePause(SStreamTask* pTask); // source level int32_t streamSetParamForStreamScannerStep1(SStreamTask* pTask, SVersionRange* pVerRange, STimeWindow* pWindow); int32_t streamSetParamForStreamScannerStep2(SStreamTask* pTask, SVersionRange* pVerRange, STimeWindow* pWindow); -int32_t streamBuildSourceRecover1Req(SStreamTask* pTask, SStreamScanHistoryReq* pReq, int8_t igUntreated); int32_t streamSourceScanHistoryData(SStreamTask* pTask); int32_t streamDispatchScanHistoryFinishMsg(SStreamTask* pTask); diff --git a/include/libs/stream/tstreamUpdate.h b/include/libs/stream/tstreamUpdate.h index bd5a3be8de638005a5e85e999d3888702903eb75..7bb1d027c9da6539708a755b52f23b87a10beea8 100644 --- a/include/libs/stream/tstreamUpdate.h +++ b/include/libs/stream/tstreamUpdate.h @@ -53,6 +53,8 @@ void updateInfoAddCloseWindowSBF(SUpdateInfo *pInfo); void updateInfoDestoryColseWinSBF(SUpdateInfo *pInfo); int32_t updateInfoSerialize(void *buf, int32_t bufLen, const SUpdateInfo *pInfo); int32_t updateInfoDeserialize(void *buf, int32_t bufLen, SUpdateInfo *pInfo); +void windowSBfDelete(SUpdateInfo *pInfo, uint64_t count); +void windowSBfAdd(SUpdateInfo *pInfo, uint64_t count); #ifdef __cplusplus } diff --git a/include/util/tdef.h b/include/util/tdef.h index f16bb7993717dff0560e3df5cc1c6d8bf501d76e..e4af88bf10291235e07ae7b18674fa064b054683 100644 --- a/include/util/tdef.h +++ b/include/util/tdef.h @@ -369,8 +369,13 @@ typedef enum ELogicConditionType { #define TSDB_DB_SCHEMALESS_OFF 0 #define TSDB_DEFAULT_DB_SCHEMALESS TSDB_DB_SCHEMALESS_OFF #define TSDB_MIN_STT_TRIGGER 1 -#define TSDB_MAX_STT_TRIGGER 16 -#define TSDB_DEFAULT_SST_TRIGGER 2 +#ifdef TD_ENTERPRISE +#define TSDB_MAX_STT_TRIGGER 16 +#define TSDB_DEFAULT_SST_TRIGGER 2 +#else +#define TSDB_MAX_STT_TRIGGER 1 +#define TSDB_DEFAULT_SST_TRIGGER 1 +#endif #define TSDB_MIN_HASH_PREFIX (2 - TSDB_TABLE_NAME_LEN) #define TSDB_MAX_HASH_PREFIX (TSDB_TABLE_NAME_LEN - 2) #define TSDB_DEFAULT_HASH_PREFIX 0 diff --git a/source/dnode/mgmt/node_mgmt/src/dmEnv.c b/source/dnode/mgmt/node_mgmt/src/dmEnv.c index 848e123448a3733d2c675bcebed82b7b261088db..3f9c5bbeaf8f7481069e252efe19ac84eeb14e41 100644 --- a/source/dnode/mgmt/node_mgmt/src/dmEnv.c +++ b/source/dnode/mgmt/node_mgmt/src/dmEnv.c @@ -42,7 +42,7 @@ static SDnode globalDnode = {0}; static const char *dmOS[10] = {"Ubuntu", "CentOS Linux", "Red Hat", "Debian GNU", "CoreOS", - "FreeBSD", "openSUSE", "SLES", "Fedora", "MacOS"}; + "FreeBSD", "openSUSE", "SLES", "Fedora", "macOS"}; SDnode *dmInstance() { return &globalDnode; } diff --git a/source/dnode/snode/src/snodeInitApi.c b/source/dnode/snode/src/snodeInitApi.c index c046505630251092923189eadbc532e87970e4b6..e737e3fa373ed621ec87c3267ce0cb964ae0c19e 100644 --- a/source/dnode/snode/src/snodeInitApi.c +++ b/source/dnode/snode/src/snodeInitApi.c @@ -78,6 +78,8 @@ void initStateStoreAPI(SStateStore* pStore) { pStore->updateInfoIsUpdated = updateInfoIsUpdated; pStore->updateInfoIsTableInserted = updateInfoIsTableInserted; pStore->updateInfoDestroy = updateInfoDestroy; + pStore->windowSBfDelete = windowSBfDelete; + pStore->windowSBfAdd = windowSBfAdd; pStore->updateInfoInitP = updateInfoInitP; pStore->updateInfoAddCloseWindowSBF = updateInfoAddCloseWindowSBF; diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 60071645c0f3b7009fe503424c7cf77a72d02486..082f0c222deab398105f75688923d85b8fab5cd5 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -1321,7 +1321,7 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) { "s-task:%s scan-history in stream time window completed, no related fill-history task, reset the time " "window:%" PRId64 " - %" PRId64, id, pWindow->skey, pWindow->ekey); - qResetStreamInfoTimeWindow(pTask->exec.pExecutor); + qStreamInfoResetTimewindowFilter(pTask->exec.pExecutor); } else { // when related fill-history task exists, update the fill-history time window only when the // state transfer is completed. @@ -1592,9 +1592,8 @@ int32_t tqProcessTaskPauseReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg SStreamMeta* pMeta = pTq->pStreamMeta; SStreamTask* pTask = streamMetaAcquireTask(pMeta, pReq->taskId); if (pTask == NULL) { - tqError("vgId:%d failed to acquire task:0x%x, it may have been dropped already", pMeta->vgId, + tqError("vgId:%d process pause req, failed to acquire task:0x%x, it may have been dropped already", pMeta->vgId, pReq->taskId); - // since task is in [STOP|DROPPING] state, it is safe to assume the pause is active return TSDB_CODE_SUCCESS; } @@ -1606,9 +1605,8 @@ int32_t tqProcessTaskPauseReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg if (pTask->historyTaskId.taskId != 0) { pHistoryTask = streamMetaAcquireTask(pMeta, pTask->historyTaskId.taskId); if (pHistoryTask == NULL) { - tqError("vgId:%d failed to acquire fill-history task:0x%x, it may have been dropped already. Pause success", + tqError("vgId:%d process pause req, failed to acquire fill-history task:0x%x, it may have been dropped already", pMeta->vgId, pTask->historyTaskId.taskId); - streamMetaReleaseTask(pMeta, pTask); // since task is in [STOP|DROPPING] state, it is safe to assume the pause is active @@ -1616,14 +1614,12 @@ int32_t tqProcessTaskPauseReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg } tqDebug("s-task:%s fill-history task handle paused along with related stream task", pHistoryTask->id.idStr); - streamTaskPause(pHistoryTask); - } - streamMetaReleaseTask(pMeta, pTask); - if (pHistoryTask != NULL) { + streamTaskPause(pHistoryTask); streamMetaReleaseTask(pMeta, pHistoryTask); } + streamMetaReleaseTask(pMeta, pTask); return TSDB_CODE_SUCCESS; } @@ -1652,7 +1648,7 @@ int32_t tqProcessTaskResumeImpl(STQ* pTq, SStreamTask* pTask, int64_t sversion, } if (level == TASK_LEVEL__SOURCE && pTask->info.fillHistory && pTask->status.taskStatus == TASK_STATUS__SCAN_HISTORY) { - streamStartRecoverTask(pTask, igUntreated); + streamStartScanHistoryAsync(pTask, igUntreated); } else if (level == TASK_LEVEL__SOURCE && (taosQueueItemSize(pTask->inputQueue->queue) == 0)) { tqStartStreamTasks(pTq); } else { diff --git a/source/dnode/vnode/src/tsdb/tsdbRead2.c b/source/dnode/vnode/src/tsdb/tsdbRead2.c index d5d8ba130c632b0b50caa607ffcd7084b37cde28..e1756333c536e7158b823d078bffae6e79e76f92 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead2.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead2.c @@ -1729,45 +1729,41 @@ static int32_t mergeFileBlockAndLastBlock(STsdbReader* pReader, SLastBlockReader // row in last file block TSDBROW fRow = tsdbRowFromBlockData(pBlockData, pDumpInfo->rowIndex); - int64_t tsLast = getCurrentKeyInLastBlock(pLastBlockReader); + int64_t ts = getCurrentKeyInLastBlock(pLastBlockReader); + if (ASCENDING_TRAVERSE(pReader->info.order)) { - if (key < tsLast) { - return mergeRowsInFileBlocks(pBlockData, pBlockScanInfo, key, pReader); - } else if (key > tsLast) { - return doMergeFileBlockAndLastBlock(pLastBlockReader, pReader, pBlockScanInfo, NULL, false); - } - } else { - if (key > tsLast) { + if (key < ts) { // imem, mem are all empty, file blocks (data blocks and last block) exist return mergeRowsInFileBlocks(pBlockData, pBlockScanInfo, key, pReader); - } else if (key < tsLast) { - return doMergeFileBlockAndLastBlock(pLastBlockReader, pReader, pBlockScanInfo, NULL, false); - } - } - // the following for key == tsLast - SRow* pTSRow = NULL; - int32_t code = tsdbRowMergerAdd(pMerger, &fRow, pReader->info.pSchema); - if (code != TSDB_CODE_SUCCESS) { - return code; - } - - doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader); + } else if (key == ts) { + SRow* pTSRow = NULL; + int32_t code = tsdbRowMergerAdd(pMerger, &fRow, pReader->info.pSchema); + if (code != TSDB_CODE_SUCCESS) { + return code; + } - TSDBROW* pRow1 = tMergeTreeGetRow(&pLastBlockReader->mergeTree); - tsdbRowMergerAdd(pMerger, pRow1, NULL); + doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader); - doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, tsLast, pMerger, &pReader->info.verRange, pReader->idStr); + TSDBROW* pRow1 = tMergeTreeGetRow(&pLastBlockReader->mergeTree); + tsdbRowMergerAdd(pMerger, pRow1, NULL); - code = tsdbRowMergerGetRow(pMerger, &pTSRow); - if (code != TSDB_CODE_SUCCESS) { - return code; - } + doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, ts, pMerger, &pReader->info.verRange, pReader->idStr); - code = doAppendRowFromTSRow(pReader->resBlockInfo.pResBlock, pReader, pTSRow, pBlockScanInfo); + code = tsdbRowMergerGetRow(pMerger, &pTSRow); + if (code != TSDB_CODE_SUCCESS) { + return code; + } - taosMemoryFree(pTSRow); - tsdbRowMergerClear(pMerger); - return code; + code = doAppendRowFromTSRow(pReader->resBlockInfo.pResBlock, pReader, pTSRow, pBlockScanInfo); + taosMemoryFree(pTSRow); + tsdbRowMergerClear(pMerger); + return code; + } else { // key > ts + return doMergeFileBlockAndLastBlock(pLastBlockReader, pReader, pBlockScanInfo, NULL, false); + } + } else { // desc order + return doMergeFileBlockAndLastBlock(pLastBlockReader, pReader, pBlockScanInfo, pBlockData, true); + } } else { // only last block exists return doMergeFileBlockAndLastBlock(pLastBlockReader, pReader, pBlockScanInfo, NULL, false); } @@ -2194,8 +2190,7 @@ static int32_t buildComposedDataBlockImpl(STsdbReader* pReader, STableBlockScanI SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo; TSDBROW *pRow = NULL, *piRow = NULL; - int64_t key = (pBlockData->nRow > 0 && (!pDumpInfo->allDumped)) ? pBlockData->aTSKEY[pDumpInfo->rowIndex] : - (ASCENDING_TRAVERSE(pReader->info.order) ? INT64_MAX : INT64_MIN); + int64_t key = (pBlockData->nRow > 0 && (!pDumpInfo->allDumped)) ? pBlockData->aTSKEY[pDumpInfo->rowIndex] : INT64_MIN; if (pBlockScanInfo->iter.hasVal) { pRow = getValidMemRow(&pBlockScanInfo->iter, pBlockScanInfo->delSkyline, pReader); } @@ -2569,18 +2564,9 @@ static int32_t doLoadLastBlockSequentially(STsdbReader* pReader) { // load the last data block of current table STableBlockScanInfo* pScanInfo = *(STableBlockScanInfo**)pStatus->pTableIter; - if (pScanInfo == NULL) { - tsdbError("table Iter is null, invalid pScanInfo, try next table %s", pReader->idStr); - bool hasNexTable = moveToNextTable(pUidList, pStatus); - if (!hasNexTable) { - return TSDB_CODE_SUCCESS; - } - - continue; - } - if (pReader->pIgnoreTables && taosHashGet(*pReader->pIgnoreTables, &pScanInfo->uid, sizeof(pScanInfo->uid))) { // reset the index in last block when handing a new file + // doCleanupTableScanInfo(pScanInfo); bool hasNexTable = moveToNextTable(pUidList, pStatus); if (!hasNexTable) { return TSDB_CODE_SUCCESS; @@ -2589,6 +2575,9 @@ static int32_t doLoadLastBlockSequentially(STsdbReader* pReader) { continue; } + // reset the index in last block when handing a new file + // doCleanupTableScanInfo(pScanInfo); + bool hasDataInLastFile = initLastBlockReader(pLastBlockReader, pScanInfo, pReader); if (!hasDataInLastFile) { bool hasNexTable = moveToNextTable(pUidList, pStatus); @@ -2678,32 +2667,16 @@ static int32_t doBuildDataBlock(STsdbReader* pReader) { (ASCENDING_TRAVERSE(pReader->info.order)) ? pBlockInfo->record.firstKey : pBlockInfo->record.lastKey; code = buildDataBlockFromBuf(pReader, pScanInfo, endKey); } else { - bool bHasDataInLastBlock = hasDataInLastBlock(pLastBlockReader); - int64_t tsLast = bHasDataInLastBlock ? getCurrentKeyInLastBlock(pLastBlockReader) : INT64_MIN; - if (!bHasDataInLastBlock || ((ASCENDING_TRAVERSE(pReader->info.order) && pBlockInfo->record.lastKey < tsLast) || - (!ASCENDING_TRAVERSE(pReader->info.order) && pBlockInfo->record.firstKey > tsLast))) { - // whole block is required, return it directly - SDataBlockInfo* pInfo = &pReader->resBlockInfo.pResBlock->info; - pInfo->rows = pBlockInfo->record.numRow; - pInfo->id.uid = pScanInfo->uid; - pInfo->dataLoad = 0; - pInfo->window = (STimeWindow){.skey = pBlockInfo->record.firstKey, .ekey = pBlockInfo->record.lastKey}; - setComposedBlockFlag(pReader, false); - setBlockAllDumped(&pStatus->fBlockDumpInfo, pBlockInfo->record.lastKey, pReader->info.order); + if (hasDataInLastBlock(pLastBlockReader) && !ASCENDING_TRAVERSE(pReader->info.order)) { + // only return the rows in last block + int64_t tsLast = getCurrentKeyInLastBlock(pLastBlockReader); + ASSERT(tsLast >= pBlockInfo->record.lastKey); - // update the last key for the corresponding table - pScanInfo->lastKey = ASCENDING_TRAVERSE(pReader->info.order) ? pInfo->window.ekey : pInfo->window.skey; - tsdbDebug("%p uid:%" PRIu64 - " clean file block retrieved from file, global index:%d, " - "table index:%d, rows:%d, brange:%" PRId64 "-%" PRId64 ", %s", - pReader, pScanInfo->uid, pBlockIter->index, pBlockInfo->tbBlockIdx, pBlockInfo->record.numRow, - pBlockInfo->record.firstKey, pBlockInfo->record.lastKey, pReader->idStr); - } else { SBlockData* pBData = &pReader->status.fileBlockData; tBlockDataReset(pBData); SSDataBlock* pResBlock = pReader->resBlockInfo.pResBlock; - tsdbDebug("load data in last block firstly %s", pReader->idStr); + tsdbDebug("load data in last block firstly, due to desc scan data, %s", pReader->idStr); int64_t st = taosGetTimestampUs(); @@ -2734,8 +2707,23 @@ static int32_t doBuildDataBlock(STsdbReader* pReader) { pReader, pResBlock->info.id.uid, pResBlock->info.window.skey, pResBlock->info.window.ekey, pResBlock->info.rows, el, pReader->idStr); } - } + } else { // whole block is required, return it directly + SDataBlockInfo* pInfo = &pReader->resBlockInfo.pResBlock->info; + pInfo->rows = pBlockInfo->record.numRow; + pInfo->id.uid = pScanInfo->uid; + pInfo->dataLoad = 0; + pInfo->window = (STimeWindow){.skey = pBlockInfo->record.firstKey, .ekey = pBlockInfo->record.lastKey}; + setComposedBlockFlag(pReader, false); + setBlockAllDumped(&pStatus->fBlockDumpInfo, pBlockInfo->record.lastKey, pReader->info.order); + // update the last key for the corresponding table + pScanInfo->lastKey = ASCENDING_TRAVERSE(pReader->info.order) ? pInfo->window.ekey : pInfo->window.skey; + tsdbDebug("%p uid:%" PRIu64 + " clean file block retrieved from file, global index:%d, " + "table index:%d, rows:%d, brange:%" PRId64 "-%" PRId64 ", %s", + pReader, pScanInfo->uid, pBlockIter->index, pBlockInfo->tbBlockIdx, pBlockInfo->record.numRow, + pBlockInfo->record.firstKey, pBlockInfo->record.lastKey, pReader->idStr); + } } return (pReader->code != TSDB_CODE_SUCCESS) ? pReader->code : code; @@ -4109,6 +4097,11 @@ int32_t tsdbReaderSuspend2(STsdbReader* pReader) { tsdbDataFileReaderClose(&pReader->pFileReader); + int64_t loadBlocks = 0; + double elapse = 0; + pReader->status.pLDataIterArray = destroySttBlockReader(pReader->status.pLDataIterArray, &loadBlocks, &elapse); + pReader->status.pLDataIterArray = taosArrayInit(4, POINTER_BYTES); + // resetDataBlockScanInfo excluding lastKey STableBlockScanInfo** p = NULL; int32_t iter = 0; @@ -4179,7 +4172,7 @@ int32_t tsdbReaderSuspend2(STsdbReader* pReader) { } } - tsdbUntakeReadSnap(pReader, pReader->pReadSnap, false); + tsdbUntakeReadSnap2(pReader, pReader->pReadSnap, false); pReader->pReadSnap = NULL; pReader->flag = READER_STATUS_SUSPEND; diff --git a/source/dnode/vnode/src/vnd/vnodeInitApi.c b/source/dnode/vnode/src/vnd/vnodeInitApi.c index 3dfaa28c092a8f8e36d942ed581fa85c7b22deb5..5c8d563d73f2d98567b1118b676eb607c5937bf9 100644 --- a/source/dnode/vnode/src/vnd/vnodeInitApi.c +++ b/source/dnode/vnode/src/vnd/vnodeInitApi.c @@ -180,6 +180,8 @@ void initStateStoreAPI(SStateStore* pStore) { pStore->updateInfoIsUpdated = updateInfoIsUpdated; pStore->updateInfoIsTableInserted = updateInfoIsTableInserted; pStore->updateInfoDestroy = updateInfoDestroy; + pStore->windowSBfDelete = windowSBfDelete; + pStore->windowSBfAdd = windowSBfAdd; pStore->updateInfoInitP = updateInfoInitP; pStore->updateInfoAddCloseWindowSBF = updateInfoAddCloseWindowSBF; diff --git a/source/libs/executor/inc/querytask.h b/source/libs/executor/inc/querytask.h index cdf37bcc6b5a9cd2a06f0398cd17675e2ce62531..7241b015a09321db59af5f212efae85af56959ca 100644 --- a/source/libs/executor/inc/querytask.h +++ b/source/libs/executor/inc/querytask.h @@ -62,8 +62,8 @@ typedef struct { SSchemaWrapper* schema; char tbName[TSDB_TABLE_NAME_LEN]; // this is the current scan table: todo refactor int8_t recoverStep; - bool recoverStep1Finished; - bool recoverStep2Finished; +// bool recoverStep1Finished; +// bool recoverStep2Finished; int8_t recoverScanFinished; SQueryTableDataCond tableCond; SVersionRange fillHistoryVer; diff --git a/source/libs/executor/src/executor.c b/source/libs/executor/src/executor.c index 5a99c1ea9a2fd0f7634daba482cd52d3811a4a11..e4ddf9ca6c65e629031f69e505456d0f88c1de47 100644 --- a/source/libs/executor/src/executor.c +++ b/source/libs/executor/src/executor.c @@ -116,17 +116,6 @@ void resetTaskInfo(qTaskInfo_t tinfo) { clearStreamBlock(pTaskInfo->pRoot); } -void qResetStreamInfoTimeWindow(qTaskInfo_t tinfo) { - SExecTaskInfo* pTaskInfo = (SExecTaskInfo*) tinfo; - if (pTaskInfo == NULL) { - return; - } - - qDebug("%s set stream fill-history window:%" PRId64"-%"PRId64, GET_TASKID(pTaskInfo), INT64_MIN, INT64_MAX); - pTaskInfo->streamInfo.fillHistoryWindow.skey = INT64_MIN; - pTaskInfo->streamInfo.fillHistoryWindow.ekey = INT64_MAX; -} - static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void* input, size_t numOfBlocks, int32_t type, const char* id) { if (pOperator->operatorType != QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) { if (pOperator->numOfDownstream == 0) { @@ -341,7 +330,7 @@ qTaskInfo_t qCreateStreamExecTaskInfo(void* msg, SReadHandle* readers, int32_t v return NULL; } - qResetStreamInfoTimeWindow(pTaskInfo); + qStreamInfoResetTimewindowFilter(pTaskInfo); return pTaskInfo; } @@ -891,8 +880,6 @@ int32_t qStreamSourceScanParamForHistoryScanStep1(qTaskInfo_t tinfo, SVersionRan pStreamInfo->fillHistoryVer = *pVerRange; pStreamInfo->fillHistoryWindow = *pWindow; pStreamInfo->recoverStep = STREAM_RECOVER_STEP__PREPARE1; - pStreamInfo->recoverStep1Finished = false; - pStreamInfo->recoverStep2Finished = false; qDebug("%s step 1. set param for stream scanner for scan-history data, verRange:%" PRId64 " - %" PRId64 ", window:%" PRId64 " - %" PRId64, @@ -910,8 +897,6 @@ int32_t qStreamSourceScanParamForHistoryScanStep2(qTaskInfo_t tinfo, SVersionRan pStreamInfo->fillHistoryVer = *pVerRange; pStreamInfo->fillHistoryWindow = *pWindow; pStreamInfo->recoverStep = STREAM_RECOVER_STEP__PREPARE2; - pStreamInfo->recoverStep1Finished = true; - pStreamInfo->recoverStep2Finished = false; qDebug("%s step 2. set param for stream scanner for scan-history data, verRange:%" PRId64 " - %" PRId64 ", window:%" PRId64 " - %" PRId64, @@ -1050,23 +1035,15 @@ bool qStreamRecoverScanFinished(qTaskInfo_t tinfo) { return pTaskInfo->streamInfo.recoverScanFinished; } -bool qStreamRecoverScanStep1Finished(qTaskInfo_t tinfo) { +int32_t qStreamInfoResetTimewindowFilter(qTaskInfo_t tinfo) { SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo; - return pTaskInfo->streamInfo.recoverStep1Finished; -} + STimeWindow* pWindow = &pTaskInfo->streamInfo.fillHistoryWindow; -bool qStreamRecoverScanStep2Finished(qTaskInfo_t tinfo) { - SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo; - return pTaskInfo->streamInfo.recoverStep2Finished; -} - -int32_t qStreamRecoverSetAllStepFinished(qTaskInfo_t tinfo) { - SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo; - pTaskInfo->streamInfo.recoverStep1Finished = true; - pTaskInfo->streamInfo.recoverStep2Finished = true; + qDebug("%s set remove scan-history filter window:%" PRId64 "-%" PRId64 ", new window:%" PRId64 "-%" PRId64, + GET_TASKID(pTaskInfo), pWindow->skey, pWindow->ekey, INT64_MIN, INT64_MAX); - // reset the time window - pTaskInfo->streamInfo.fillHistoryWindow.skey = INT64_MIN; + pWindow->skey = INT64_MIN; + pWindow->ekey = INT64_MAX; return 0; } diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 7434db61db28dc6adc2ecaeb88ce9bbc719fb79a..da4bd1e23cfcbb8fe90afc24137d7aa03922098a 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -1590,38 +1590,51 @@ static void doBlockDataWindowFilter(SSDataBlock* pBlock, int32_t tsIndex, STimeW } // re-build the delete block, ONLY according to the split timestamp -static void rebuildDeleteBlockData(SSDataBlock* pBlock, int64_t skey, const char* id) { - if (skey == INT64_MIN) { - return; - } - +static void rebuildDeleteBlockData(SSDataBlock* pBlock, STimeWindow* pWindow, const char* id) { int32_t numOfRows = pBlock->info.rows; - - bool* p = taosMemoryCalloc(numOfRows, sizeof(bool)); - bool hasUnqualified = false; + bool* p = taosMemoryCalloc(numOfRows, sizeof(bool)); + bool hasUnqualified = false; + int64_t skey = pWindow->skey; + int64_t ekey = pWindow->ekey; SColumnInfoData* pSrcStartCol = taosArrayGet(pBlock->pDataBlock, START_TS_COLUMN_INDEX); uint64_t* tsStartCol = (uint64_t*)pSrcStartCol->pData; SColumnInfoData* pSrcEndCol = taosArrayGet(pBlock->pDataBlock, END_TS_COLUMN_INDEX); uint64_t* tsEndCol = (uint64_t*)pSrcEndCol->pData; - for (int32_t i = 0; i < numOfRows; i++) { - if (tsStartCol[i] < skey) { - tsStartCol[i] = skey; + if (pWindow->skey != INT64_MIN) { + for (int32_t i = 0; i < numOfRows; i++) { + if (tsStartCol[i] < skey) { + tsStartCol[i] = skey; + } + + if (tsEndCol[i] >= skey) { + p[i] = true; + } else { // this row should be removed, since it is not in this query time window, which is [skey, INT64_MAX] + hasUnqualified = true; + } } + } else if (pWindow->ekey != INT64_MAX) { + for(int32_t i = 0; i < numOfRows; ++i) { + if (tsEndCol[i] > ekey) { + tsEndCol[i] = ekey; + } - if (tsEndCol[i] >= skey) { - p[i] = true; - } else { // this row should be removed, since it is not in this query time window, which is [skey, INT64_MAX] - hasUnqualified = true; + if (tsStartCol[i] <= ekey) { + p[i] = true; + } else { + hasUnqualified = true; + } } } if (hasUnqualified) { trimDataBlock(pBlock, pBlock->info.rows, p); + qDebug("%s re-build delete datablock, start key revised to:%"PRId64", rows:%"PRId64, id, skey, pBlock->info.rows); + } else { + qDebug("%s not update the delete block", id); } - qDebug("%s re-build delete datablock, start key revised to:%"PRId64", rows:%"PRId64, id, skey, pBlock->info.rows); taosMemoryFree(p); } @@ -2030,7 +2043,7 @@ FETCH_NEXT_BLOCK: } setBlockGroupIdByUid(pInfo, pDelBlock); - rebuildDeleteBlockData(pDelBlock, pStreamInfo->fillHistoryWindow.skey, id); + rebuildDeleteBlockData(pDelBlock, &pStreamInfo->fillHistoryWindow, id); printDataBlock(pDelBlock, "stream scan delete recv filtered"); if (pDelBlock->info.rows == 0) { if (pInfo->tqReader) { @@ -2411,7 +2424,9 @@ void streamScanReloadState(SOperatorInfo* pOperator) { pInfo->stateStore.updateInfoDestroy(pInfo->pUpdateInfo); pInfo->pUpdateInfo = pUpInfo; } else { - pInfo->pUpdateInfo->minTS = TMAX(pInfo->pUpdateInfo->minTS, pUpInfo->minTS); + pInfo->stateStore.windowSBfDelete(pInfo->pUpdateInfo, 1); + pInfo->stateStore.windowSBfAdd(pInfo->pUpdateInfo, 1); + ASSERT(pInfo->pUpdateInfo->minTS > pUpInfo->minTS); pInfo->pUpdateInfo->maxDataVersion = TMAX(pInfo->pUpdateInfo->maxDataVersion, pUpInfo->maxDataVersion); SHashObj* curMap = pInfo->pUpdateInfo->pMap; void *pIte = taosHashIterate(curMap, NULL); diff --git a/source/libs/stream/inc/streamInt.h b/source/libs/stream/inc/streamInt.h index add893c8c7dfa7d04c8bef8dfbce4df204357d23..32d6dc65d93ab1ddf1a6e383645652b70110f351 100644 --- a/source/libs/stream/inc/streamInt.h +++ b/source/libs/stream/inc/streamInt.h @@ -62,6 +62,7 @@ SStreamQueueItem* streamMergeQueueItem(SStreamQueueItem* dst, SStreamQueueItem* int32_t streamAddEndScanHistoryMsg(SStreamTask* pTask, SRpcHandleInfo* pRpcInfo, SStreamScanHistoryFinishReq* pReq); int32_t streamNotifyUpstreamContinue(SStreamTask* pTask); +int32_t streamTaskFillHistoryFinished(SStreamTask* pTask); extern int32_t streamBackendId; extern int32_t streamBackendCfWrapperId; diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c index 4ef7d6084d9a0da77c6e58254fb2c1d3e00e6eb6..d1dff0f2e730a6329cd32e4bf8906f6ac1192dc8 100644 --- a/source/libs/stream/src/streamExec.c +++ b/source/libs/stream/src/streamExec.c @@ -163,15 +163,14 @@ static int32_t streamTaskExecImpl(SStreamTask* pTask, SStreamQueueItem* pItem, i } int32_t streamScanExec(SStreamTask* pTask, int32_t batchSz) { - int32_t code = 0; - ASSERT(pTask->info.taskLevel == TASK_LEVEL__SOURCE); - void* exec = pTask->exec.pExecutor; + int32_t code = TSDB_CODE_SUCCESS; + void* exec = pTask->exec.pExecutor; + bool finished = false; qSetStreamOpOpen(exec); - bool finished = false; - while (1) { + while (!finished) { if (streamTaskShouldPause(&pTask->status)) { double el = (taosGetTimestampMs() - pTask->tsInfo.step1Start) / 1000.0; qDebug("s-task:%s paused from the scan-history task, elapsed time:%.2fsec", pTask->id.idStr, el); @@ -184,44 +183,30 @@ int32_t streamScanExec(SStreamTask* pTask, int32_t batchSz) { return -1; } - int32_t batchCnt = 0; + int32_t numOfBlocks = 0; while (1) { if (streamTaskShouldStop(&pTask->status)) { taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes); return 0; } + if (streamTaskShouldPause(&pTask->status)) { + break; + } + SSDataBlock* output = NULL; uint64_t ts = 0; if (qExecTask(exec, &output, &ts) < 0) { continue; } - if (output == NULL) { - if (qStreamRecoverScanFinished(exec)) { - finished = true; - } else { - qSetStreamOpOpen(exec); - if (streamTaskShouldPause(&pTask->status)) { - SStreamDataBlock* qRes = taosAllocateQitem(sizeof(SStreamDataBlock), DEF_QITEM, 0); - if (qRes == NULL) { - taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes); - terrno = TSDB_CODE_OUT_OF_MEMORY; - return -1; - } - - qRes->type = STREAM_INPUT__DATA_BLOCK; - qRes->blocks = pRes; - code = streamTaskOutputResultBlock(pTask, qRes); - if (code == TSDB_CODE_UTIL_QUEUE_OUT_OF_MEMORY) { - taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes); - taosFreeQitem(qRes); - return code; - } - return 0; - } - } + if (output == NULL && qStreamRecoverScanFinished(exec)) { + finished = true; break; + } else { + if (output == NULL) { + ASSERT(0); + } } SSDataBlock block = {0}; @@ -229,86 +214,37 @@ int32_t streamScanExec(SStreamTask* pTask, int32_t batchSz) { block.info.childId = pTask->info.selfChildId; taosArrayPush(pRes, &block); - batchCnt++; - - qDebug("s-task:%s scan exec numOfBlocks:%d, limit:%d", pTask->id.idStr, batchCnt, batchSz); - if (batchCnt >= batchSz) { + numOfBlocks++; + qDebug("s-task:%s scan exec numOfBlocks:%d, limit:%d", pTask->id.idStr, numOfBlocks, batchSz); + if (numOfBlocks >= batchSz) { break; } } - if (taosArrayGetSize(pRes) == 0) { - taosArrayDestroy(pRes); - - if (finished) { - qDebug("s-task:%s finish recover exec task ", pTask->id.idStr); - break; - } else { - qDebug("s-task:%s continue recover exec task ", pTask->id.idStr); - continue; + if (taosArrayGetSize(pRes) > 0) { + SStreamDataBlock* qRes = taosAllocateQitem(sizeof(SStreamDataBlock), DEF_QITEM, 0); + if (qRes == NULL) { + taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes); + terrno = TSDB_CODE_OUT_OF_MEMORY; + return -1; } - } - - SStreamDataBlock* qRes = taosAllocateQitem(sizeof(SStreamDataBlock), DEF_QITEM, 0); - if (qRes == NULL) { - taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes); - terrno = TSDB_CODE_OUT_OF_MEMORY; - return -1; - } - qRes->type = STREAM_INPUT__DATA_BLOCK; - qRes->blocks = pRes; - code = streamTaskOutputResultBlock(pTask, qRes); - if (code == TSDB_CODE_UTIL_QUEUE_OUT_OF_MEMORY) { - taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes); - taosFreeQitem(qRes); - return code; - } - - if (finished) { - break; - } - } - return 0; -} + qRes->type = STREAM_INPUT__DATA_BLOCK; + qRes->blocks = pRes; -#if 0 -int32_t streamBatchExec(SStreamTask* pTask, int32_t batchLimit) { - // fetch all queue item, merge according to batchLimit - int32_t numOfItems = taosReadAllQitems(pTask->inputQueue1, pTask->inputQall); - if (numOfItems == 0) { - qDebug("task: %d, stream task exec over, queue empty", pTask->id.taskId); - return 0; - } - SStreamQueueItem* pMerged = NULL; - SStreamQueueItem* pItem = NULL; - taosGetQitem(pTask->inputQall, (void**)&pItem); - if (pItem == NULL) { - if (pMerged != NULL) { - // process merged item + code = streamTaskOutputResultBlock(pTask, qRes); + if (code == TSDB_CODE_UTIL_QUEUE_OUT_OF_MEMORY) { + taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes); + taosFreeQitem(qRes); + return code; + } } else { - return 0; + taosArrayDestroy(pRes); } } - // if drop - if (pItem->type == STREAM_INPUT__DESTROY) { - // set status drop - return -1; - } - - if (pTask->info.taskLevel == TASK_LEVEL__SINK) { - ASSERT(((SStreamQueueItem*)pItem)->type == STREAM_INPUT__DATA_BLOCK); - streamTaskOutputResultBlock(pTask, (SStreamDataBlock*)pItem); - } - - // exec impl - - // output - // try dispatch return 0; } -#endif int32_t updateCheckPointInfo(SStreamTask* pTask) { int64_t ckId = 0; @@ -356,12 +292,12 @@ static void waitForTaskIdle(SStreamTask* pTask, SStreamTask* pStreamTask) { } } -static int32_t streamTransferStateToStreamTask(SStreamTask* pTask) { +static int32_t streamDoTransferStateToStreamTask(SStreamTask* pTask) { SStreamMeta* pMeta = pTask->pMeta; SStreamTask* pStreamTask = streamMetaAcquireTask(pMeta, pTask->streamTaskId.taskId); if (pStreamTask == NULL) { - // todo: destroy this task here + // todo: destroy the fill-history task here qError("s-task:%s failed to find related stream task:0x%x, it may have been destroyed or closed", pTask->id.idStr, pTask->streamTaskId.taskId); return TSDB_CODE_STREAM_TASK_NOT_EXIST; @@ -402,34 +338,36 @@ static int32_t streamTransferStateToStreamTask(SStreamTask* pTask) { qDebug("s-task:%s no need to update time window for non-source task", pStreamTask->id.idStr); } - // expand the query time window for stream scanner + // 1. expand the query time window for stream task of WAL scanner pTimeWindow->skey = INT64_MIN; - qResetStreamInfoTimeWindow(pStreamTask->exec.pExecutor); + qStreamInfoResetTimewindowFilter(pStreamTask->exec.pExecutor); - // transfer the ownership of executor state + // 2. transfer the ownership of executor state streamTaskReleaseState(pTask); streamTaskReloadState(pStreamTask); - // clear the link between fill-history task and stream task info + // 3. clear the link between fill-history task and stream task info pStreamTask->historyTaskId.taskId = 0; + + // 4. resume the state of stream task, after this function, the stream task will run immidately. But it can not be + // pause, since the pause allowed attribute is not set yet. streamTaskResumeFromHalt(pStreamTask); qDebug("s-task:%s fill-history task set status to be dropping, save the state into disk", pTask->id.idStr); int32_t taskId = pTask->id.taskId; - // free it and remove it from disk meta-store + // 5. free it and remove fill-history task from disk meta-store streamMetaUnregisterTask(pMeta, taskId); - // save to disk + // 6. save to disk taosWLockLatch(&pMeta->lock); - streamMetaSaveTask(pMeta, pStreamTask); if (streamMetaCommit(pMeta) < 0) { // persist to disk } taosWUnLockLatch(&pMeta->lock); - // pause allowed + // 7. pause allowed. streamTaskEnablePause(pStreamTask); streamSchedExec(pStreamTask); @@ -437,6 +375,26 @@ static int32_t streamTransferStateToStreamTask(SStreamTask* pTask) { return TSDB_CODE_SUCCESS; } +static int32_t streamTransferStateToStreamTask(SStreamTask* pTask) { + int32_t code = TSDB_CODE_SUCCESS; + if (!pTask->status.transferState) { + return code; + } + + int32_t level = pTask->info.taskLevel; + if (level == TASK_LEVEL__SOURCE) { + streamTaskFillHistoryFinished(pTask); + streamTaskEndScanWAL(pTask); + } else if (level == TASK_LEVEL__AGG) { // do transfer task operator states. + code = streamDoTransferStateToStreamTask(pTask); + if (code != TSDB_CODE_SUCCESS) { // todo handle this + return code; + } + } + + return code; +} + static int32_t extractMsgFromInputQ(SStreamTask* pTask, SStreamQueueItem** pInput, int32_t* numOfBlocks, const char* id) { int32_t retryTimes = 0; @@ -590,17 +548,16 @@ int32_t streamTaskEndScanWAL(SStreamTask* pTask) { double el = (taosGetTimestampMs() - pTask->tsInfo.step2Start) / 1000.0; qDebug("s-task:%s scan-history from WAL stage(step 2) ended, elapsed time:%.2fs", id, el); - // 3. notify downstream tasks to transfer executor state after handle all history blocks. - pTask->status.transferState = true; - + // 1. notify all downstream tasks to transfer executor state after handle all history blocks. int32_t code = streamDispatchTransferStateMsg(pTask); if (code != TSDB_CODE_SUCCESS) { // todo handle error } - // the last execution of fill-history task, in order to transfer task operator states. - code = streamTransferStateToStreamTask(pTask); - if (code != TSDB_CODE_SUCCESS) { // todo handle this + // 2. do transfer stream task operator states. + pTask->status.transferState = true; + code = streamDoTransferStateToStreamTask(pTask); + if (code != TSDB_CODE_SUCCESS) { // todo handle error return code; } @@ -624,9 +581,11 @@ int32_t streamTryExec(SStreamTask* pTask) { // todo the task should be commit here if (taosQueueEmpty(pTask->inputQueue->queue)) { // fill-history WAL scan has completed - if (pTask->info.taskLevel == TASK_LEVEL__SOURCE && pTask->status.transferState == true) { - streamTaskRecoverSetAllStepFinished(pTask); - streamTaskEndScanWAL(pTask); + if (pTask->status.transferState) { + code = streamTransferStateToStreamTask(pTask); + if (code != TSDB_CODE_SUCCESS) { + return code; + } } else { atomic_store_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__INACTIVE); qDebug("s-task:%s exec completed, status:%s, sched-status:%d", id, streamGetTaskStatusStr(pTask->status.taskStatus), diff --git a/source/libs/stream/src/streamRecover.c b/source/libs/stream/src/streamRecover.c index df45ff2759e512b799b2f8b1203e47d3823573f5..bd2d67e14ae3121ef2f4bb1e63d22c83da6e5cc1 100644 --- a/source/libs/stream/src/streamRecover.c +++ b/source/libs/stream/src/streamRecover.c @@ -17,23 +17,30 @@ #include "ttimer.h" #include "wal.h" -static void launchFillHistoryTask(SStreamTask* pTask); -static void streamTaskSetRangeStreamCalc(SStreamTask* pTask); +typedef struct SStreamTaskRetryInfo { + SStreamMeta* pMeta; + int32_t taskId; +} SStreamTaskRetryInfo; -static void streamTaskSetForReady(SStreamTask* pTask, int32_t numOfReqs) { +static int32_t streamSetParamForScanHistory(SStreamTask* pTask); +static void launchFillHistoryTask(SStreamTask* pTask); +static void streamTaskSetRangeStreamCalc(SStreamTask* pTask); +static int32_t initScanHistoryReq(SStreamTask* pTask, SStreamScanHistoryReq* pReq, int8_t igUntreated); + +static void streamTaskSetReady(SStreamTask* pTask, int32_t numOfReqs) { ASSERT(pTask->status.downstreamReady == 0); pTask->status.downstreamReady = 1; - int64_t el = (taosGetTimestampMs() - pTask->tsInfo.init); + int64_t el = (taosGetTimestampMs() - pTask->tsInfo.init); qDebug("s-task:%s all %d downstream ready, init completed, elapsed time:%dms, task status:%s", pTask->id.idStr, numOfReqs, (int32_t) el, streamGetTaskStatusStr(pTask->status.taskStatus)); } -int32_t streamStartRecoverTask(SStreamTask* pTask, int8_t igUntreated) { +int32_t streamStartScanHistoryAsync(SStreamTask* pTask, int8_t igUntreated) { SStreamScanHistoryReq req; - streamBuildSourceRecover1Req(pTask, &req, igUntreated); - int32_t len = sizeof(SStreamScanHistoryReq); + initScanHistoryReq(pTask, &req, igUntreated); + int32_t len = sizeof(SStreamScanHistoryReq); void* serializedReq = rpcMallocCont(len); if (serializedReq == NULL) { return -1; @@ -65,9 +72,9 @@ static int32_t doLaunchScanHistoryTask(SStreamTask* pTask) { if (pTask->info.fillHistory) { streamSetParamForScanHistory(pTask); } - streamSetParamForStreamScannerStep1(pTask, pRange, &pTask->dataRange.window); - int32_t code = streamStartRecoverTask(pTask, 0); + streamSetParamForStreamScannerStep1(pTask, pRange, &pTask->dataRange.window); + int32_t code = streamStartScanHistoryAsync(pTask, 0); return code; } @@ -142,7 +149,7 @@ int32_t streamTaskDoCheckDownstreamTasks(SStreamTask* pTask) { } else { qDebug("s-task:%s (vgId:%d) set downstream ready, since no downstream", pTask->id.idStr, pTask->info.nodeId); - streamTaskSetForReady(pTask, 0); + streamTaskSetReady(pTask, 0); streamTaskSetRangeStreamCalc(pTask); streamTaskLaunchScanHistory(pTask); @@ -188,7 +195,7 @@ int32_t streamTaskCheckStatus(SStreamTask* pTask) { } static void doProcessDownstreamReadyRsp(SStreamTask* pTask, int32_t numOfReqs) { - streamTaskSetForReady(pTask, numOfReqs); + streamTaskSetReady(pTask, numOfReqs); const char* id = pTask->id.idStr; int8_t status = pTask->status.taskStatus; @@ -319,7 +326,7 @@ int32_t streamSetParamForStreamScannerStep2(SStreamTask* pTask, SVersionRange *p return qStreamSourceScanParamForHistoryScanStep2(pTask->exec.pExecutor, pVerRange, pWindow); } -int32_t streamBuildSourceRecover1Req(SStreamTask* pTask, SStreamScanHistoryReq* pReq, int8_t igUntreated) { +int32_t initScanHistoryReq(SStreamTask* pTask, SStreamScanHistoryReq* pReq, int8_t igUntreated) { pReq->msgHead.vgId = pTask->info.nodeId; pReq->streamId = pTask->id.streamId; pReq->taskId = pTask->id.taskId; @@ -524,11 +531,6 @@ static void doCheckDownstreamStatus(SStreamTask* pTask, SStreamTask* pHTask) { streamTaskDoCheckDownstreamTasks(pHTask); } -typedef struct SStreamTaskRetryInfo { - SStreamMeta* pMeta; - int32_t taskId; -} SStreamTaskRetryInfo; - static void tryLaunchHistoryTask(void* param, void* tmrId) { SStreamTaskRetryInfo* pInfo = param; SStreamMeta* pMeta = pInfo->pMeta; @@ -638,7 +640,7 @@ int32_t streamTaskScanHistoryDataComplete(SStreamTask* pTask) { } } - // dispatch recover finish req to all related downstream task + // dispatch scan-history finish req to all related downstream task code = streamDispatchScanHistoryFinishMsg(pTask); if (code < 0) { return -1; @@ -647,19 +649,9 @@ int32_t streamTaskScanHistoryDataComplete(SStreamTask* pTask) { return 0; } -bool streamTaskRecoverScanStep1Finished(SStreamTask* pTask) { - void* exec = pTask->exec.pExecutor; - return qStreamRecoverScanStep1Finished(exec); -} - -bool streamTaskRecoverScanStep2Finished(SStreamTask* pTask) { +int32_t streamTaskFillHistoryFinished(SStreamTask* pTask) { void* exec = pTask->exec.pExecutor; - return qStreamRecoverScanStep2Finished(exec); -} - -int32_t streamTaskRecoverSetAllStepFinished(SStreamTask* pTask) { - void* exec = pTask->exec.pExecutor; - return qStreamRecoverSetAllStepFinished(exec); + return qStreamInfoResetTimewindowFilter(exec); } bool streamHistoryTaskSetVerRangeStep2(SStreamTask* pTask, int64_t latestVer) { @@ -669,7 +661,7 @@ bool streamHistoryTaskSetVerRangeStep2(SStreamTask* pTask, int64_t latestVer) { int64_t nextStartVer = pRange->maxVer + 1; if (nextStartVer > latestVer - 1) { // no input data yet. no need to execute the secondardy scan while stream task halt - streamTaskRecoverSetAllStepFinished(pTask); + streamTaskFillHistoryFinished(pTask); qDebug( "s-task:%s no need to perform secondary scan-history data(step 2), since no data ingest during step1 scan, " "related stream task currentVer:%" PRId64, @@ -684,7 +676,6 @@ bool streamHistoryTaskSetVerRangeStep2(SStreamTask* pTask, int64_t latestVer) { } } - int32_t tEncodeStreamTaskCheckReq(SEncoder* pEncoder, const SStreamTaskCheckReq* pReq) { if (tStartEncode(pEncoder) < 0) return -1; if (tEncodeI64(pEncoder, pReq->reqId) < 0) return -1; @@ -857,7 +848,7 @@ void streamTaskPause(SStreamTask* pTask) { taosMsleep(100); } - // todo: use the lock of the task. + // todo: use the task lock, stead of meta lock taosWLockLatch(&pMeta->lock); status = pTask->status.taskStatus; @@ -871,6 +862,12 @@ void streamTaskPause(SStreamTask* pTask) { atomic_store_8(&pTask->status.taskStatus, TASK_STATUS__PAUSE); taosWUnLockLatch(&pMeta->lock); + // in case of fill-history task, stop the tsdb file scan operation. + if (pTask->info.fillHistory == 1) { + void* pExecutor = pTask->exec.pExecutor; + qKillTask(pExecutor, TSDB_CODE_SUCCESS); + } + int64_t el = taosGetTimestampMs() - st; qDebug("vgId:%d s-task:%s set pause flag, prev:%s, elapsed time:%dms", pMeta->vgId, pTask->id.idStr, streamGetTaskStatusStr(pTask->status.keepTaskStatus), (int32_t)el); diff --git a/source/libs/stream/src/streamUpdate.c b/source/libs/stream/src/streamUpdate.c index 85be120dbd562f2ce6526b391c937a362396b569..7a8de91d7735fb7b43a8fb65e747ff5aa7737723 100644 --- a/source/libs/stream/src/streamUpdate.c +++ b/source/libs/stream/src/streamUpdate.c @@ -33,7 +33,7 @@ static int64_t adjustExpEntries(int64_t entries) { return TMIN(DEFAULT_EXPECTED_ENTRIES, entries); } -static void windowSBfAdd(SUpdateInfo *pInfo, uint64_t count) { +void windowSBfAdd(SUpdateInfo *pInfo, uint64_t count) { if (pInfo->numSBFs < count) { count = pInfo->numSBFs; } @@ -49,7 +49,7 @@ static void clearItemHelper(void *p) { tScalableBfDestroy(*pBf); } -static void windowSBfDelete(SUpdateInfo *pInfo, uint64_t count) { +void windowSBfDelete(SUpdateInfo *pInfo, uint64_t count) { if (count < pInfo->numSBFs) { for (uint64_t i = 0; i < count; ++i) { SScalableBf *pTsSBFs = taosArrayGetP(pInfo->pTsSBFs, 0); diff --git a/tests/script/sh/stop_dnodes.bat b/tests/script/sh/stop_dnodes.bat index ab7af2ca92023745b8b712cb78e9b168a6c00598..65aee26ed4f575ce446ba02055713f69f6082191 100644 --- a/tests/script/sh/stop_dnodes.bat +++ b/tests/script/sh/stop_dnodes.bat @@ -3,4 +3,6 @@ rem echo taskkill /F /IM taosd.exe wmic process where "name='taosd.exe'" call terminate > NUL 2>&1 -taskkill /F /IM taosd.exe > NUL 2>&1 \ No newline at end of file +taskkill /F /IM taosd.exe > NUL 2>&1 + +rem echo taskkill /F /IM taosd.exe finished \ No newline at end of file diff --git a/tools/shell/src/shellUtil.c b/tools/shell/src/shellUtil.c index e15b49efcc35da2682d003243c0a19eb278acbc7..93451c85a9a34545a6aa86e3777d92d462f8849d 100644 --- a/tools/shell/src/shellUtil.c +++ b/tools/shell/src/shellUtil.c @@ -68,7 +68,7 @@ int32_t shellCheckIntSize() { return 0; } -void shellPrintVersion() { printf("version: %s\r\n", version); } +void shellPrintVersion() { printf("%s\r\n", shell.info.programVersion); } void shellGenerateAuth() { char secretEncrypt[TSDB_PASSWORD_LEN + 1] = {0}; diff --git a/utils/tsim/CMakeLists.txt b/utils/tsim/CMakeLists.txt index 81737809d900a8931be71b4b7c605c9f18627d32..c2cf7ac3c5380c4e116e96753fa78f486d918566 100644 --- a/utils/tsim/CMakeLists.txt +++ b/utils/tsim/CMakeLists.txt @@ -2,7 +2,7 @@ aux_source_directory(src TSIM_SRC) add_executable(tsim ${TSIM_SRC}) target_link_libraries( tsim - PUBLIC taos + PUBLIC taos_static PUBLIC util PUBLIC common PUBLIC os