提交 8cf5dbe4 编写于 作者: J jiajingbin

Merge branch 'main' of https://github.com/taosdata/TDengine into main

...@@ -178,6 +178,7 @@ The following list shows all reserved keywords: ...@@ -178,6 +178,7 @@ The following list shows all reserved keywords:
- MATCH - MATCH
- MAX_DELAY - MAX_DELAY
- MAX_SPEED
- MAXROWS - MAXROWS
- MERGE - MERGE
- META - META
......
...@@ -82,7 +82,7 @@ INSERT INTO d1001 (ts, current, phase) VALUES ('2021-07-13 14:06:33.196', 10.27, ...@@ -82,7 +82,7 @@ INSERT INTO d1001 (ts, current, phase) VALUES ('2021-07-13 14:06:33.196', 10.27,
```sql ```sql
INSERT INTO d1001 VALUES ('2021-07-13 14:06:34.630', 10.2, 219, 0.32) ('2021-07-13 14:06:35.779', 10.15, 217, 0.33) INSERT INTO d1001 VALUES ('2021-07-13 14:06:34.630', 10.2, 219, 0.32) ('2021-07-13 14:06:35.779', 10.15, 217, 0.33)
d1002 (ts, current, phase) VALUES ('2021-07-13 14:06:34.255', 10.27, 0.31; d1002 (ts, current, phase) VALUES ('2021-07-13 14:06:34.255', 10.27, 0.31);
``` ```
## 插入记录时自动建表 ## 插入记录时自动建表
......
...@@ -178,6 +178,7 @@ description: TDengine 保留关键字的详细列表 ...@@ -178,6 +178,7 @@ description: TDengine 保留关键字的详细列表
- MATCH - MATCH
- MAX_DELAY - MAX_DELAY
- MAX_SPEED
- MAXROWS - MAXROWS
- MERGE - MERGE
- META - META
......
...@@ -95,31 +95,12 @@ taos -C ...@@ -95,31 +95,12 @@ taos -C
### maxShellConns ### maxShellConns
| 属性 | 说明 | | 属性 | 说明 |
| --------| ----------------------- | | -------- | ----------------------- |
| 适用范围 | 仅服务端适用 | | 适用范围 | 仅服务端适用 |
| 含义 | 一个 dnode 容许的连接数 | | 含义 | 一个 dnode 容许的连接数 |
| 取值范围 | 10-50000000 | | 取值范围 | 10-50000000 |
| 缺省值 | 5000 | | 缺省值 | 5000 |
### numOfRpcSessions
| 属性 | 说明 |
| --------| ---------------------- |
| 适用范围 | 客户端和服务端都适用 |
| 含义 | 一个客户端能创建的最大连接数|
| 取值范围 | 100-100000 |
| 缺省值 | 10000 |
### timeToGetAvailableConn
| 属性 | 说明 |
| -------- | --------------------|
| 适用范围 | 客户端和服务端都适用 |
| 含义 |获得可用连接的最长等待时间|
| 取值范围 | 10-50000000(单位为毫秒)|
| 缺省值 | 500000 |
### numOfRpcSessions ### numOfRpcSessions
| 属性 | 说明 | | 属性 | 说明 |
...@@ -127,7 +108,7 @@ taos -C ...@@ -127,7 +108,7 @@ taos -C
| 适用范围 | 客户端和服务端都适用 | | 适用范围 | 客户端和服务端都适用 |
| 含义 | 一个客户端能创建的最大连接数 | | 含义 | 一个客户端能创建的最大连接数 |
| 取值范围 | 100-100000 | | 取值范围 | 100-100000 |
| 缺省值 | 10000 | | 缺省值 | 30000 |
### timeToGetAvailableConn ### timeToGetAvailableConn
...@@ -393,7 +374,7 @@ charset 的有效值是 UTF-8。 ...@@ -393,7 +374,7 @@ charset 的有效值是 UTF-8。
### metaCacheMaxSize ### metaCacheMaxSize
| 属性 | 说明 | | 属性 | 说明 |
| -------- | ---------------------------------------------- | | -------- | ------------------------------------ |
| 适用范围 | 仅客户端适用 | | 适用范围 | 仅客户端适用 |
| 含义 | 指定单个客户端元数据缓存大小的最大值 | | 含义 | 指定单个客户端元数据缓存大小的最大值 |
| 单位 | MB | | 单位 | MB |
...@@ -480,7 +461,7 @@ charset 的有效值是 UTF-8。 ...@@ -480,7 +461,7 @@ charset 的有效值是 UTF-8。
### slowLogScope ### slowLogScope
| 属性 | 说明 | | 属性 | 说明 |
| -------- | --------------------------------------------------------------| | -------- | ---------------------------------------------------------- |
| 适用范围 | 仅客户端适用 | | 适用范围 | 仅客户端适用 |
| 含义 | 指定启动记录哪些类型的慢查询 | | 含义 | 指定启动记录哪些类型的慢查询 |
| 可选值 | ALL, QUERY, INSERT, OTHERS, NONE | | 可选值 | ALL, QUERY, INSERT, OTHERS, NONE |
...@@ -685,12 +666,12 @@ charset 的有效值是 UTF-8。 ...@@ -685,12 +666,12 @@ charset 的有效值是 UTF-8。
| 适用范围 | 仅客户端适用 | | 适用范围 | 仅客户端适用 |
| 含义 | schemaless 列数据是否顺序一致,从3.0.3.0开始,该配置废弃 | | 含义 | schemaless 列数据是否顺序一致,从3.0.3.0开始,该配置废弃 |
| 值域 | 0:不一致;1: 一致 | | 值域 | 0:不一致;1: 一致 |
| 缺省值 | 0 | 缺省值 | 0 |
### smlTsDefaultName ### smlTsDefaultName
| 属性 | 说明 | | 属性 | 说明 |
| -------- | -------------------------------------------------------- | | -------- | -------------------------------------------- |
| 适用范围 | 仅客户端适用 | | 适用范围 | 仅客户端适用 |
| 含义 | schemaless自动建表的时间列名字通过该配置设置 | | 含义 | schemaless自动建表的时间列名字通过该配置设置 |
| 类型 | 字符串 | | 类型 | 字符串 |
...@@ -729,7 +710,7 @@ charset 的有效值是 UTF-8。 ...@@ -729,7 +710,7 @@ charset 的有效值是 UTF-8。
### ttlChangeOnWrite ### ttlChangeOnWrite
| 属性 | 说明 | | 属性 | 说明 |
| -------- | ------------------ | | -------- | ------------------------------------ |
| 适用范围 | 仅服务端适用 | | 适用范围 | 仅服务端适用 |
| 含义 | ttl 到期时间是否伴随表的修改操作改变 | | 含义 | ttl 到期时间是否伴随表的修改操作改变 |
| 取值范围 | 0: 不改变;1:改变 | | 取值范围 | 0: 不改变;1:改变 |
...@@ -738,7 +719,7 @@ charset 的有效值是 UTF-8。 ...@@ -738,7 +719,7 @@ charset 的有效值是 UTF-8。
### keepTimeOffset ### keepTimeOffset
| 属性 | 说明 | | 属性 | 说明 |
| -------- | ------------------ | | -------- | -------------- |
| 适用范围 | 仅服务端适用 | | 适用范围 | 仅服务端适用 |
| 含义 | 迁移操作的延时 | | 含义 | 迁移操作的延时 |
| 单位 | 小时 | | 单位 | 小时 |
...@@ -748,10 +729,10 @@ charset 的有效值是 UTF-8。 ...@@ -748,10 +729,10 @@ charset 的有效值是 UTF-8。
### tmqMaxTopicNum ### tmqMaxTopicNum
| 属性 | 说明 | | 属性 | 说明 |
| -------- | ------------------ | | -------- | --------------------------- |
| 适用范围 | 仅服务端适用 | | 适用范围 | 仅服务端适用 |
| 含义 | 订阅最多可建立的 topic 数量 | | 含义 | 订阅最多可建立的 topic 数量 |
| 取值范围 | 1-10000| | 取值范围 | 1-10000 |
| 缺省值 | 20 | | 缺省值 | 20 |
## 压缩参数 ## 压缩参数
......
...@@ -2767,6 +2767,7 @@ typedef struct { ...@@ -2767,6 +2767,7 @@ typedef struct {
typedef struct { typedef struct {
SMsgHead head; SMsgHead head;
int64_t leftForVer; int64_t leftForVer;
int64_t streamId;
int32_t taskId; int32_t taskId;
} SVDropStreamTaskReq; } SVDropStreamTaskReq;
...@@ -2958,6 +2959,7 @@ int32_t tDecodeMqVgOffset(SDecoder* pDecoder, SMqVgOffset* pOffset); ...@@ -2958,6 +2959,7 @@ int32_t tDecodeMqVgOffset(SDecoder* pDecoder, SMqVgOffset* pOffset);
typedef struct { typedef struct {
SMsgHead head; SMsgHead head;
int64_t streamId;
int32_t taskId; int32_t taskId;
} SVPauseStreamTaskReq; } SVPauseStreamTaskReq;
...@@ -2976,6 +2978,7 @@ int32_t tDeserializeSMPauseStreamReq(void* buf, int32_t bufLen, SMPauseStreamReq ...@@ -2976,6 +2978,7 @@ int32_t tDeserializeSMPauseStreamReq(void* buf, int32_t bufLen, SMPauseStreamReq
typedef struct { typedef struct {
SMsgHead head; SMsgHead head;
int32_t taskId; int32_t taskId;
int64_t streamId;
int8_t igUntreated; int8_t igUntreated;
} SVResumeStreamTaskReq; } SVResumeStreamTaskReq;
......
...@@ -74,7 +74,7 @@ typedef enum { ...@@ -74,7 +74,7 @@ typedef enum {
* @param vgId * @param vgId
* @return * @return
*/ */
qTaskInfo_t qCreateStreamExecTaskInfo(void* msg, SReadHandle* readers, int32_t vgId); qTaskInfo_t qCreateStreamExecTaskInfo(void* msg, SReadHandle* readers, int32_t vgId, int32_t taskId);
/** /**
* Create the exec task for queue mode * Create the exec task for queue mode
...@@ -95,8 +95,6 @@ int32_t qGetTableList(int64_t suid, void* pVnode, void* node, SArray **tableList ...@@ -95,8 +95,6 @@ int32_t qGetTableList(int64_t suid, void* pVnode, void* node, SArray **tableList
*/ */
void qSetTaskId(qTaskInfo_t tinfo, uint64_t taskId, uint64_t queryId); void qSetTaskId(qTaskInfo_t tinfo, uint64_t taskId, uint64_t queryId);
//void qSetTaskCode(qTaskInfo_t tinfo, int32_t code);
int32_t qSetStreamOpOpen(qTaskInfo_t tinfo); int32_t qSetStreamOpOpen(qTaskInfo_t tinfo);
// todo refactor // todo refactor
......
...@@ -644,9 +644,9 @@ void streamMetaClose(SStreamMeta* streamMeta); ...@@ -644,9 +644,9 @@ void streamMetaClose(SStreamMeta* streamMeta);
int32_t streamMetaSaveTask(SStreamMeta* pMeta, SStreamTask* pTask); int32_t streamMetaSaveTask(SStreamMeta* pMeta, SStreamTask* pTask);
int32_t streamMetaRemoveTask(SStreamMeta* pMeta, int32_t taskId); int32_t streamMetaRemoveTask(SStreamMeta* pMeta, int32_t taskId);
int32_t streamMetaRegisterTask(SStreamMeta* pMeta, int64_t ver, SStreamTask* pTask, bool* pAdded); int32_t streamMetaRegisterTask(SStreamMeta* pMeta, int64_t ver, SStreamTask* pTask, bool* pAdded);
int32_t streamMetaUnregisterTask(SStreamMeta* pMeta, int32_t taskId); int32_t streamMetaUnregisterTask(SStreamMeta* pMeta, int64_t streamId, int32_t taskId);
int32_t streamMetaGetNumOfTasks(SStreamMeta* pMeta); // todo remove it int32_t streamMetaGetNumOfTasks(SStreamMeta* pMeta); // todo remove it
SStreamTask* streamMetaAcquireTask(SStreamMeta* pMeta, int32_t taskId); SStreamTask* streamMetaAcquireTask(SStreamMeta* pMeta, int64_t streamId, int32_t taskId);
void streamMetaReleaseTask(SStreamMeta* pMeta, SStreamTask* pTask); void streamMetaReleaseTask(SStreamMeta* pMeta, SStreamTask* pTask);
int32_t streamMetaBegin(SStreamMeta* pMeta); int32_t streamMetaBegin(SStreamMeta* pMeta);
......
...@@ -89,7 +89,7 @@ typedef struct SRpcInit { ...@@ -89,7 +89,7 @@ typedef struct SRpcInit {
int32_t retryMinInterval; // retry init interval int32_t retryMinInterval; // retry init interval
int32_t retryStepFactor; // retry interval factor int32_t retryStepFactor; // retry interval factor
int32_t retryMaxInterval; // retry max interval int32_t retryMaxInterval; // retry max interval
int64_t retryMaxTimouet; int64_t retryMaxTimeout;
int32_t failFastThreshold; int32_t failFastThreshold;
int32_t failFastInterval; int32_t failFastInterval;
......
...@@ -123,8 +123,8 @@ function clean_bin() { ...@@ -123,8 +123,8 @@ function clean_bin() {
${csudo}rm -f ${bin_link_dir}/set_core || : ${csudo}rm -f ${bin_link_dir}/set_core || :
${csudo}rm -f ${bin_link_dir}/TDinsight.sh || : ${csudo}rm -f ${bin_link_dir}/TDinsight.sh || :
${csudo}rm -f ${bin_link_dir}/${keeperName2} || : ${csudo}rm -f ${bin_link_dir}/${keeperName2} || :
${csudo}rm -f ${bin_link_dir}/${xName2} || : # ${csudo}rm -f ${bin_link_dir}/${xName2} || :
${csudo}rm -f ${bin_link_dir}/${explorerName2} || : # ${csudo}rm -f ${bin_link_dir}/${explorerName2} || :
if [ "$verMode" == "cluster" ] && [ "$clientName" != "$clientName2" ]; then if [ "$verMode" == "cluster" ] && [ "$clientName" != "$clientName2" ]; then
${csudo}rm -f ${bin_link_dir}/${clientName2} || : ${csudo}rm -f ${bin_link_dir}/${clientName2} || :
...@@ -194,26 +194,26 @@ function clean_service_on_systemd() { ...@@ -194,26 +194,26 @@ function clean_service_on_systemd() {
fi fi
${csudo}systemctl disable ${tarbitrator_service_name} &>/dev/null || echo &>/dev/null ${csudo}systemctl disable ${tarbitrator_service_name} &>/dev/null || echo &>/dev/null
x_service_config="${service_config_dir}/${xName2}.service" # x_service_config="${service_config_dir}/${xName2}.service"
if [ -e "$x_service_config" ]; then # if [ -e "$x_service_config" ]; then
if systemctl is-active --quiet ${xName2}; then # if systemctl is-active --quiet ${xName2}; then
echo "${productName2} ${xName2} is running, stopping it..." # echo "${productName2} ${xName2} is running, stopping it..."
${csudo}systemctl stop ${xName2} &>/dev/null || echo &>/dev/null # ${csudo}systemctl stop ${xName2} &>/dev/null || echo &>/dev/null
fi # fi
${csudo}systemctl disable ${xName2} &>/dev/null || echo &>/dev/null # ${csudo}systemctl disable ${xName2} &>/dev/null || echo &>/dev/null
${csudo}rm -f ${x_service_config} # ${csudo}rm -f ${x_service_config}
fi # fi
explorer_service_config="${service_config_dir}/${explorerName2}.service" # explorer_service_config="${service_config_dir}/${explorerName2}.service"
if [ -e "$explorer_service_config" ]; then # if [ -e "$explorer_service_config" ]; then
if systemctl is-active --quiet ${explorerName2}; then # if systemctl is-active --quiet ${explorerName2}; then
echo "${productName2} ${explorerName2} is running, stopping it..." # echo "${productName2} ${explorerName2} is running, stopping it..."
${csudo}systemctl stop ${explorerName2} &>/dev/null || echo &>/dev/null # ${csudo}systemctl stop ${explorerName2} &>/dev/null || echo &>/dev/null
fi # fi
${csudo}systemctl disable ${explorerName2} &>/dev/null || echo &>/dev/null # ${csudo}systemctl disable ${explorerName2} &>/dev/null || echo &>/dev/null
${csudo}rm -f ${explorer_service_config} # ${csudo}rm -f ${explorer_service_config}
${csudo}rm -f /etc/${clientName2}/explorer.toml # ${csudo}rm -f /etc/${clientName2}/explorer.toml
fi # fi
} }
function clean_service_on_sysvinit() { function clean_service_on_sysvinit() {
......
...@@ -169,7 +169,7 @@ void *openTransporter(const char *user, const char *auth, int32_t numOfThread) { ...@@ -169,7 +169,7 @@ void *openTransporter(const char *user, const char *auth, int32_t numOfThread) {
rpcInit.retryMinInterval = tsRedirectPeriod; rpcInit.retryMinInterval = tsRedirectPeriod;
rpcInit.retryStepFactor = tsRedirectFactor; rpcInit.retryStepFactor = tsRedirectFactor;
rpcInit.retryMaxInterval = tsRedirectMaxPeriod; rpcInit.retryMaxInterval = tsRedirectMaxPeriod;
rpcInit.retryMaxTimouet = tsMaxRetryWaitTime; rpcInit.retryMaxTimeout = tsMaxRetryWaitTime;
int32_t connLimitNum = tsNumOfRpcSessions / (tsNumOfRpcThreads * 3); int32_t connLimitNum = tsNumOfRpcSessions / (tsNumOfRpcThreads * 3);
connLimitNum = TMAX(connLimitNum, 10); connLimitNum = TMAX(connLimitNum, 10);
......
...@@ -47,7 +47,7 @@ bool tsPrintAuth = false; ...@@ -47,7 +47,7 @@ bool tsPrintAuth = false;
// queue & threads // queue & threads
int32_t tsNumOfRpcThreads = 1; int32_t tsNumOfRpcThreads = 1;
int32_t tsNumOfRpcSessions = 10000; int32_t tsNumOfRpcSessions = 30000;
int32_t tsTimeToGetAvailableConn = 500000; int32_t tsTimeToGetAvailableConn = 500000;
int32_t tsKeepAliveIdle = 60; int32_t tsKeepAliveIdle = 60;
...@@ -1281,9 +1281,9 @@ int32_t taosApplyLocalCfg(SConfig *pCfg, char *name) { ...@@ -1281,9 +1281,9 @@ int32_t taosApplyLocalCfg(SConfig *pCfg, char *name) {
// tsSmlDataFormat = cfgGetItem(pCfg, "smlDataFormat")->bval; // tsSmlDataFormat = cfgGetItem(pCfg, "smlDataFormat")->bval;
// } else if (strcasecmp("smlBatchSize", name) == 0) { // } else if (strcasecmp("smlBatchSize", name) == 0) {
// tsSmlBatchSize = cfgGetItem(pCfg, "smlBatchSize")->i32; // tsSmlBatchSize = cfgGetItem(pCfg, "smlBatchSize")->i32;
} else if(strcasecmp("smlTsDefaultName", name) == 0) { } else if (strcasecmp("smlTsDefaultName", name) == 0) {
tstrncpy(tsSmlTsDefaultName, cfgGetItem(pCfg, "smlTsDefaultName")->str, TSDB_COL_NAME_LEN); tstrncpy(tsSmlTsDefaultName, cfgGetItem(pCfg, "smlTsDefaultName")->str, TSDB_COL_NAME_LEN);
} else if(strcasecmp("smlDot2Underline", name) == 0) { } else if (strcasecmp("smlDot2Underline", name) == 0) {
tsSmlDot2Underline = cfgGetItem(pCfg, "smlDot2Underline")->bval; tsSmlDot2Underline = cfgGetItem(pCfg, "smlDot2Underline")->bval;
} else if (strcasecmp("shellActivityTimer", name) == 0) { } else if (strcasecmp("shellActivityTimer", name) == 0) {
tsShellActivityTimer = cfgGetItem(pCfg, "shellActivityTimer")->i32; tsShellActivityTimer = cfgGetItem(pCfg, "shellActivityTimer")->i32;
......
...@@ -299,7 +299,7 @@ int32_t dmInitClient(SDnode *pDnode) { ...@@ -299,7 +299,7 @@ int32_t dmInitClient(SDnode *pDnode) {
rpcInit.retryMinInterval = tsRedirectPeriod; rpcInit.retryMinInterval = tsRedirectPeriod;
rpcInit.retryStepFactor = tsRedirectFactor; rpcInit.retryStepFactor = tsRedirectFactor;
rpcInit.retryMaxInterval = tsRedirectMaxPeriod; rpcInit.retryMaxInterval = tsRedirectMaxPeriod;
rpcInit.retryMaxTimouet = tsMaxRetryWaitTime; rpcInit.retryMaxTimeout = tsMaxRetryWaitTime;
rpcInit.failFastInterval = 5000; // interval threshold(ms) rpcInit.failFastInterval = 5000; // interval threshold(ms)
rpcInit.failFastThreshold = 3; // failed threshold rpcInit.failFastThreshold = 3; // failed threshold
......
...@@ -232,7 +232,8 @@ int32_t mndAddShuffleSinkTasksToStream(SMnode* pMnode, SArray* pTaskList, SStrea ...@@ -232,7 +232,8 @@ int32_t mndAddShuffleSinkTasksToStream(SMnode* pMnode, SArray* pTaskList, SStrea
int32_t mndAddSinkTaskToStream(SStreamObj* pStream, SArray* pTaskList, SMnode* pMnode, int32_t vgId, SVgObj* pVgroup, int32_t mndAddSinkTaskToStream(SStreamObj* pStream, SArray* pTaskList, SMnode* pMnode, int32_t vgId, SVgObj* pVgroup,
int32_t fillHistory) { int32_t fillHistory) {
SStreamTask* pTask = tNewStreamTask(pStream->uid, TASK_LEVEL__SINK, fillHistory, 0, pTaskList); int64_t uid = (fillHistory == 0)? pStream->uid:pStream->hTaskUid;
SStreamTask* pTask = tNewStreamTask(uid, TASK_LEVEL__SINK, fillHistory, 0, pTaskList);
if (pTask == NULL) { if (pTask == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1; return -1;
...@@ -335,8 +336,8 @@ static void setHTasksId(SArray* pTaskList, const SArray* pHTaskList) { ...@@ -335,8 +336,8 @@ static void setHTasksId(SArray* pTaskList, const SArray* pHTaskList) {
(*pHTask)->streamTaskId.taskId = (*pStreamTask)->id.taskId; (*pHTask)->streamTaskId.taskId = (*pStreamTask)->id.taskId;
(*pHTask)->streamTaskId.streamId = (*pStreamTask)->id.streamId; (*pHTask)->streamTaskId.streamId = (*pStreamTask)->id.streamId;
mDebug("s-task:0x%x related history task:0x%x, level:%d", (*pStreamTask)->id.taskId, (*pHTask)->id.taskId, mDebug("s-task:0x%" PRIx64 "-0x%x related history task:0x%" PRIx64 "-0x%x, level:%d", (*pStreamTask)->id.streamId,
(*pHTask)->info.taskLevel); (*pStreamTask)->id.taskId, (*pHTask)->id.streamId, (*pHTask)->id.taskId, (*pHTask)->info.taskLevel);
} }
} }
......
...@@ -649,6 +649,8 @@ static int32_t mndPersistTaskDropReq(STrans *pTrans, SStreamTask *pTask) { ...@@ -649,6 +649,8 @@ static int32_t mndPersistTaskDropReq(STrans *pTrans, SStreamTask *pTask) {
pReq->head.vgId = htonl(pTask->info.nodeId); pReq->head.vgId = htonl(pTask->info.nodeId);
pReq->taskId = pTask->id.taskId; pReq->taskId = pTask->id.taskId;
pReq->streamId = pTask->id.streamId;
STransAction action = {0}; STransAction action = {0};
memcpy(&action.epSet, &pTask->info.epSet, sizeof(SEpSet)); memcpy(&action.epSet, &pTask->info.epSet, sizeof(SEpSet));
action.pCont = pReq; action.pCont = pReq;
...@@ -1361,6 +1363,8 @@ static int32_t mndPauseStreamTask(STrans *pTrans, SStreamTask *pTask) { ...@@ -1361,6 +1363,8 @@ static int32_t mndPauseStreamTask(STrans *pTrans, SStreamTask *pTask) {
} }
pReq->head.vgId = htonl(pTask->info.nodeId); pReq->head.vgId = htonl(pTask->info.nodeId);
pReq->taskId = pTask->id.taskId; pReq->taskId = pTask->id.taskId;
pReq->streamId = pTask->id.streamId;
STransAction action = {0}; STransAction action = {0};
memcpy(&action.epSet, &pTask->info.epSet, sizeof(SEpSet)); memcpy(&action.epSet, &pTask->info.epSet, sizeof(SEpSet));
action.pCont = pReq; action.pCont = pReq;
...@@ -1501,7 +1505,9 @@ static int32_t mndResumeStreamTask(STrans *pTrans, SStreamTask *pTask, int8_t ig ...@@ -1501,7 +1505,9 @@ static int32_t mndResumeStreamTask(STrans *pTrans, SStreamTask *pTask, int8_t ig
} }
pReq->head.vgId = htonl(pTask->info.nodeId); pReq->head.vgId = htonl(pTask->info.nodeId);
pReq->taskId = pTask->id.taskId; pReq->taskId = pTask->id.taskId;
pReq->streamId = pTask->id.streamId;
pReq->igUntreated = igUntreated; pReq->igUntreated = igUntreated;
STransAction action = {0}; STransAction action = {0};
memcpy(&action.epSet, &pTask->info.epSet, sizeof(SEpSet)); memcpy(&action.epSet, &pTask->info.epSet, sizeof(SEpSet));
action.pCont = pReq; action.pCont = pReq;
......
...@@ -35,9 +35,7 @@ void sndEnqueueStreamDispatch(SSnode *pSnode, SRpcMsg *pMsg) { ...@@ -35,9 +35,7 @@ void sndEnqueueStreamDispatch(SSnode *pSnode, SRpcMsg *pMsg) {
tDecoderClear(&decoder); tDecoderClear(&decoder);
int32_t taskId = req.taskId; SStreamTask *pTask = streamMetaAcquireTask(pSnode->pMeta, req.streamId, req.taskId);
SStreamTask *pTask = streamMetaAcquireTask(pSnode->pMeta, taskId);
if (pTask) { if (pTask) {
SRpcMsg rsp = { SRpcMsg rsp = {
.info = pMsg->info, .info = pMsg->info,
...@@ -88,7 +86,7 @@ int32_t sndExpandTask(SSnode *pSnode, SStreamTask *pTask, int64_t ver) { ...@@ -88,7 +86,7 @@ int32_t sndExpandTask(SSnode *pSnode, SStreamTask *pTask, int64_t ver) {
SReadHandle handle = { .vnode = NULL, .numOfVgroups = numOfChildEp, .pStateBackend = pTask->pState, .fillHistory = pTask->info.fillHistory }; SReadHandle handle = { .vnode = NULL, .numOfVgroups = numOfChildEp, .pStateBackend = pTask->pState, .fillHistory = pTask->info.fillHistory };
initStreamStateAPI(&handle.api); initStreamStateAPI(&handle.api);
pTask->exec.pExecutor = qCreateStreamExecTaskInfo(pTask->exec.qmsg, &handle, 0); pTask->exec.pExecutor = qCreateStreamExecTaskInfo(pTask->exec.qmsg, &handle, 0, pTask->id.taskId);
ASSERT(pTask->exec.pExecutor); ASSERT(pTask->exec.pExecutor);
taosThreadMutexInit(&pTask->lock, NULL); taosThreadMutexInit(&pTask->lock, NULL);
...@@ -181,21 +179,21 @@ int32_t sndProcessTaskDropReq(SSnode *pSnode, char *msg, int32_t msgLen) { ...@@ -181,21 +179,21 @@ int32_t sndProcessTaskDropReq(SSnode *pSnode, char *msg, int32_t msgLen) {
SVDropStreamTaskReq *pReq = (SVDropStreamTaskReq *)msg; SVDropStreamTaskReq *pReq = (SVDropStreamTaskReq *)msg;
qDebug("snode:%d receive msg to drop stream task:0x%x", pSnode->pMeta->vgId, pReq->taskId); qDebug("snode:%d receive msg to drop stream task:0x%x", pSnode->pMeta->vgId, pReq->taskId);
SStreamTask* pTask = streamMetaAcquireTask(pSnode->pMeta, pReq->taskId); SStreamTask* pTask = streamMetaAcquireTask(pSnode->pMeta, pReq->streamId, pReq->taskId);
if (pTask == NULL) { if (pTask == NULL) {
qError("vgId:%d failed to acquire s-task:0x%x when dropping it", pSnode->pMeta->vgId, pReq->taskId); qError("vgId:%d failed to acquire s-task:0x%x when dropping it", pSnode->pMeta->vgId, pReq->taskId);
return 0; return 0;
} }
streamMetaUnregisterTask(pSnode->pMeta, pReq->taskId); streamMetaUnregisterTask(pSnode->pMeta, pReq->streamId, pReq->taskId);
streamMetaReleaseTask(pSnode->pMeta, pTask); streamMetaReleaseTask(pSnode->pMeta, pTask);
return 0; return 0;
} }
int32_t sndProcessTaskRunReq(SSnode *pSnode, SRpcMsg *pMsg) { int32_t sndProcessTaskRunReq(SSnode *pSnode, SRpcMsg *pMsg) {
SStreamTaskRunReq *pReq = pMsg->pCont; SStreamTaskRunReq *pReq = pMsg->pCont;
int32_t taskId = pReq->taskId;
SStreamTask *pTask = streamMetaAcquireTask(pSnode->pMeta, taskId); SStreamTask *pTask = streamMetaAcquireTask(pSnode->pMeta, pReq->streamId, pReq->taskId);
if (pTask) { if (pTask) {
streamProcessRunReq(pTask); streamProcessRunReq(pTask);
streamMetaReleaseTask(pSnode->pMeta, pTask); streamMetaReleaseTask(pSnode->pMeta, pTask);
...@@ -213,9 +211,8 @@ int32_t sndProcessTaskDispatchReq(SSnode *pSnode, SRpcMsg *pMsg, bool exec) { ...@@ -213,9 +211,8 @@ int32_t sndProcessTaskDispatchReq(SSnode *pSnode, SRpcMsg *pMsg, bool exec) {
SDecoder decoder; SDecoder decoder;
tDecoderInit(&decoder, (uint8_t *)msgBody, msgLen); tDecoderInit(&decoder, (uint8_t *)msgBody, msgLen);
tDecodeStreamDispatchReq(&decoder, &req); tDecodeStreamDispatchReq(&decoder, &req);
int32_t taskId = req.taskId;
SStreamTask *pTask = streamMetaAcquireTask(pSnode->pMeta, taskId); SStreamTask *pTask = streamMetaAcquireTask(pSnode->pMeta, req.streamId, req.taskId);
if (pTask) { if (pTask) {
SRpcMsg rsp = { .info = pMsg->info, .code = 0 }; SRpcMsg rsp = { .info = pMsg->info, .code = 0 };
streamProcessDispatchMsg(pTask, &req, &rsp, exec); streamProcessDispatchMsg(pTask, &req, &rsp, exec);
...@@ -235,8 +232,7 @@ int32_t sndProcessTaskRetrieveReq(SSnode *pSnode, SRpcMsg *pMsg) { ...@@ -235,8 +232,7 @@ int32_t sndProcessTaskRetrieveReq(SSnode *pSnode, SRpcMsg *pMsg) {
tDecoderInit(&decoder, msgBody, msgLen); tDecoderInit(&decoder, msgBody, msgLen);
tDecodeStreamRetrieveReq(&decoder, &req); tDecodeStreamRetrieveReq(&decoder, &req);
tDecoderClear(&decoder); tDecoderClear(&decoder);
int32_t taskId = req.dstTaskId; SStreamTask *pTask = streamMetaAcquireTask(pSnode->pMeta, req.streamId, req.dstTaskId);
SStreamTask *pTask = streamMetaAcquireTask(pSnode->pMeta, taskId);
if (pTask) { if (pTask) {
SRpcMsg rsp = { .info = pMsg->info, .code = 0}; SRpcMsg rsp = { .info = pMsg->info, .code = 0};
...@@ -251,8 +247,11 @@ int32_t sndProcessTaskRetrieveReq(SSnode *pSnode, SRpcMsg *pMsg) { ...@@ -251,8 +247,11 @@ int32_t sndProcessTaskRetrieveReq(SSnode *pSnode, SRpcMsg *pMsg) {
int32_t sndProcessTaskDispatchRsp(SSnode *pSnode, SRpcMsg *pMsg) { int32_t sndProcessTaskDispatchRsp(SSnode *pSnode, SRpcMsg *pMsg) {
SStreamDispatchRsp *pRsp = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)); SStreamDispatchRsp *pRsp = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
int32_t taskId = ntohl(pRsp->upstreamTaskId);
SStreamTask *pTask = streamMetaAcquireTask(pSnode->pMeta, taskId); int32_t taskId = htonl(pRsp->upstreamTaskId);
int64_t streamId = htobe64(pRsp->streamId);
SStreamTask *pTask = streamMetaAcquireTask(pSnode->pMeta, streamId, taskId);
if (pTask) { if (pTask) {
streamProcessDispatchRsp(pTask, pRsp, pMsg->code); streamProcessDispatchRsp(pTask, pRsp, pMsg->code);
streamMetaReleaseTask(pSnode->pMeta, pTask); streamMetaReleaseTask(pSnode->pMeta, pTask);
...@@ -260,7 +259,6 @@ int32_t sndProcessTaskDispatchRsp(SSnode *pSnode, SRpcMsg *pMsg) { ...@@ -260,7 +259,6 @@ int32_t sndProcessTaskDispatchRsp(SSnode *pSnode, SRpcMsg *pMsg) {
} else { } else {
return -1; return -1;
} }
return 0;
} }
int32_t sndProcessTaskRetrieveRsp(SSnode *pSnode, SRpcMsg *pMsg) { int32_t sndProcessTaskRetrieveRsp(SSnode *pSnode, SRpcMsg *pMsg) {
...@@ -297,7 +295,7 @@ int32_t sndProcessStreamTaskScanHistoryFinishReq(SSnode *pSnode, SRpcMsg *pMsg) ...@@ -297,7 +295,7 @@ int32_t sndProcessStreamTaskScanHistoryFinishReq(SSnode *pSnode, SRpcMsg *pMsg)
tDecoderClear(&decoder); tDecoderClear(&decoder);
// find task // find task
SStreamTask *pTask = streamMetaAcquireTask(pSnode->pMeta, req.downstreamTaskId); SStreamTask *pTask = streamMetaAcquireTask(pSnode->pMeta, req.streamId, req.downstreamTaskId);
if (pTask == NULL) { if (pTask == NULL) {
return -1; return -1;
} }
...@@ -340,7 +338,7 @@ int32_t sndProcessStreamTaskCheckReq(SSnode *pSnode, SRpcMsg *pMsg) { ...@@ -340,7 +338,7 @@ int32_t sndProcessStreamTaskCheckReq(SSnode *pSnode, SRpcMsg *pMsg) {
.upstreamTaskId = req.upstreamTaskId, .upstreamTaskId = req.upstreamTaskId,
}; };
SStreamTask *pTask = streamMetaAcquireTask(pSnode->pMeta, taskId); SStreamTask *pTask = streamMetaAcquireTask(pSnode->pMeta, req.streamId, taskId);
if (pTask != NULL) { if (pTask != NULL) {
rsp.status = streamTaskCheckStatus(pTask); rsp.status = streamTaskCheckStatus(pTask);
...@@ -400,7 +398,7 @@ int32_t sndProcessStreamTaskCheckRsp(SSnode* pSnode, SRpcMsg* pMsg) { ...@@ -400,7 +398,7 @@ int32_t sndProcessStreamTaskCheckRsp(SSnode* pSnode, SRpcMsg* pMsg) {
qDebug("tq task:0x%x (vgId:%d) recv check rsp(reqId:0x%" PRIx64 ") from 0x%x (vgId:%d) status %d", qDebug("tq task:0x%x (vgId:%d) recv check rsp(reqId:0x%" PRIx64 ") from 0x%x (vgId:%d) status %d",
rsp.upstreamTaskId, rsp.upstreamNodeId, rsp.reqId, rsp.downstreamTaskId, rsp.downstreamNodeId, rsp.status); rsp.upstreamTaskId, rsp.upstreamNodeId, rsp.reqId, rsp.downstreamTaskId, rsp.downstreamNodeId, rsp.status);
SStreamTask* pTask = streamMetaAcquireTask(pSnode->pMeta, rsp.upstreamTaskId); SStreamTask* pTask = streamMetaAcquireTask(pSnode->pMeta, rsp.streamId, rsp.upstreamTaskId);
if (pTask == NULL) { if (pTask == NULL) {
qError("tq failed to locate the stream task:0x%x (vgId:%d), it may have been destroyed", rsp.upstreamTaskId, qError("tq failed to locate the stream task:0x%x (vgId:%d), it may have been destroyed", rsp.upstreamTaskId,
pSnode->pMeta->vgId); pSnode->pMeta->vgId);
......
...@@ -267,7 +267,7 @@ static int32_t tdSetRSmaInfoItemParams(SSma *pSma, SRSmaParam *param, SRSmaStat ...@@ -267,7 +267,7 @@ static int32_t tdSetRSmaInfoItemParams(SSma *pSma, SRSmaParam *param, SRSmaStat
SReadHandle handle = {.vnode = pVnode, .initTqReader = 1, .pStateBackend = pStreamState}; SReadHandle handle = {.vnode = pVnode, .initTqReader = 1, .pStateBackend = pStreamState};
initStorageAPI(&handle.api); initStorageAPI(&handle.api);
pRSmaInfo->taskInfo[idx] = qCreateStreamExecTaskInfo(param->qmsg[idx], &handle, TD_VID(pVnode)); pRSmaInfo->taskInfo[idx] = qCreateStreamExecTaskInfo(param->qmsg[idx], &handle, TD_VID(pVnode), 0);
if (!pRSmaInfo->taskInfo[idx]) { if (!pRSmaInfo->taskInfo[idx]) {
terrno = TSDB_CODE_RSMA_QTASKINFO_CREATE; terrno = TSDB_CODE_RSMA_QTASKINFO_CREATE;
return TSDB_CODE_FAILED; return TSDB_CODE_FAILED;
......
...@@ -956,7 +956,7 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver) { ...@@ -956,7 +956,7 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver) {
.winRange = pTask->dataRange.window}; .winRange = pTask->dataRange.window};
initStorageAPI(&handle.api); initStorageAPI(&handle.api);
pTask->exec.pExecutor = qCreateStreamExecTaskInfo(pTask->exec.qmsg, &handle, vgId); pTask->exec.pExecutor = qCreateStreamExecTaskInfo(pTask->exec.qmsg, &handle, vgId, pTask->id.taskId);
if (pTask->exec.pExecutor == NULL) { if (pTask->exec.pExecutor == NULL) {
return -1; return -1;
} }
...@@ -983,7 +983,7 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver) { ...@@ -983,7 +983,7 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver) {
.winRange = pTask->dataRange.window}; .winRange = pTask->dataRange.window};
initStorageAPI(&handle.api); initStorageAPI(&handle.api);
pTask->exec.pExecutor = qCreateStreamExecTaskInfo(pTask->exec.qmsg, &handle, vgId); pTask->exec.pExecutor = qCreateStreamExecTaskInfo(pTask->exec.qmsg, &handle, vgId, pTask->id.taskId);
if (pTask->exec.pExecutor == NULL) { if (pTask->exec.pExecutor == NULL) {
return -1; return -1;
} }
...@@ -1062,7 +1062,7 @@ int32_t tqProcessStreamTaskCheckReq(STQ* pTq, SRpcMsg* pMsg) { ...@@ -1062,7 +1062,7 @@ int32_t tqProcessStreamTaskCheckReq(STQ* pTq, SRpcMsg* pMsg) {
.upstreamTaskId = req.upstreamTaskId, .upstreamTaskId = req.upstreamTaskId,
}; };
SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, taskId); SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, req.streamId, taskId);
if (pTask != NULL) { if (pTask != NULL) {
rsp.status = streamTaskCheckStatus(pTask); rsp.status = streamTaskCheckStatus(pTask);
streamMetaReleaseTask(pTq->pStreamMeta, pTask); streamMetaReleaseTask(pTq->pStreamMeta, pTask);
...@@ -1072,8 +1072,9 @@ int32_t tqProcessStreamTaskCheckReq(STQ* pTq, SRpcMsg* pMsg) { ...@@ -1072,8 +1072,9 @@ int32_t tqProcessStreamTaskCheckReq(STQ* pTq, SRpcMsg* pMsg) {
pTask->id.idStr, pStatus, rsp.reqId, rsp.upstreamTaskId, rsp.upstreamNodeId, rsp.status); pTask->id.idStr, pStatus, rsp.reqId, rsp.upstreamTaskId, rsp.upstreamNodeId, rsp.status);
} else { } else {
rsp.status = 0; rsp.status = 0;
tqDebug("tq recv task check(taskId:0x%x not built yet) req(reqId:0x%" PRIx64 ") from task:0x%x (vgId:%d), rsp status %d", tqDebug("tq recv task check(taskId:0x%" PRIx64 "-0x%x not built yet) req(reqId:0x%" PRIx64
taskId, rsp.reqId, rsp.upstreamTaskId, rsp.upstreamNodeId, rsp.status); ") from task:0x%x (vgId:%d), rsp status %d",
req.streamId, taskId, rsp.reqId, rsp.upstreamTaskId, rsp.upstreamNodeId, rsp.status);
} }
return streamSendCheckRsp(pTq->pStreamMeta, &req, &rsp, &pMsg->info, taskId); return streamSendCheckRsp(pTq->pStreamMeta, &req, &rsp, &pMsg->info, taskId);
...@@ -1099,7 +1100,7 @@ int32_t tqProcessStreamTaskCheckRsp(STQ* pTq, int64_t sversion, SRpcMsg* pMsg) { ...@@ -1099,7 +1100,7 @@ int32_t tqProcessStreamTaskCheckRsp(STQ* pTq, int64_t sversion, SRpcMsg* pMsg) {
tqDebug("tq task:0x%x (vgId:%d) recv check rsp(reqId:0x%" PRIx64 ") from 0x%x (vgId:%d) status %d", tqDebug("tq task:0x%x (vgId:%d) recv check rsp(reqId:0x%" PRIx64 ") from 0x%x (vgId:%d) status %d",
rsp.upstreamTaskId, rsp.upstreamNodeId, rsp.reqId, rsp.downstreamTaskId, rsp.downstreamNodeId, rsp.status); rsp.upstreamTaskId, rsp.upstreamNodeId, rsp.reqId, rsp.downstreamTaskId, rsp.downstreamNodeId, rsp.status);
SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, rsp.upstreamTaskId); SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, rsp.streamId, rsp.upstreamTaskId);
if (pTask == NULL) { if (pTask == NULL) {
tqError("tq failed to locate the stream task:0x%x (vgId:%d), it may have been destroyed", rsp.upstreamTaskId, tqError("tq failed to locate the stream task:0x%x (vgId:%d), it may have been destroyed", rsp.upstreamTaskId,
pTq->pStreamMeta->vgId); pTq->pStreamMeta->vgId);
...@@ -1149,32 +1150,27 @@ int32_t tqProcessTaskDeployReq(STQ* pTq, int64_t sversion, char* msg, int32_t ms ...@@ -1149,32 +1150,27 @@ int32_t tqProcessTaskDeployReq(STQ* pTq, int64_t sversion, char* msg, int32_t ms
taosWLockLatch(&pStreamMeta->lock); taosWLockLatch(&pStreamMeta->lock);
code = streamMetaRegisterTask(pStreamMeta, sversion, pTask, &added); code = streamMetaRegisterTask(pStreamMeta, sversion, pTask, &added);
int32_t numOfTasks = streamMetaGetNumOfTasks(pStreamMeta); int32_t numOfTasks = streamMetaGetNumOfTasks(pStreamMeta);
taosWUnLockLatch(&pStreamMeta->lock);
if (code < 0) { if (code < 0) {
tqError("vgId:%d failed to add s-task:0x%x, total:%d", vgId, pTask->id.taskId, numOfTasks); tqError("vgId:%d failed to add s-task:0x%x, total:%d", vgId, pTask->id.taskId, numOfTasks);
tFreeStreamTask(pTask); tFreeStreamTask(pTask);
taosWUnLockLatch(&pStreamMeta->lock);
return -1; return -1;
} }
// not added into meta store // not added into meta store
if (!added) { if (added) {
tqWarn("vgId:%d failed to add s-task:0x%x, already exists in meta store", vgId, taskId);
tFreeStreamTask(pTask);
pTask = NULL;
}
taosWUnLockLatch(&pStreamMeta->lock);
tqDebug("vgId:%d s-task:0x%x is deployed and add into meta, numOfTasks:%d", vgId, taskId, numOfTasks); tqDebug("vgId:%d s-task:0x%x is deployed and add into meta, numOfTasks:%d", vgId, taskId, numOfTasks);
SStreamTask* p = streamMetaAcquireTask(pStreamMeta, pTask->id.streamId, taskId);
// 3. It's an fill history task, do nothing. wait for the main task to start it
SStreamTask* p = streamMetaAcquireTask(pStreamMeta, taskId);
if (p != NULL) { // reset the downstreamReady flag. if (p != NULL) { // reset the downstreamReady flag.
streamTaskCheckDownstreamTasks(p); streamTaskCheckDownstreamTasks(p);
} }
streamMetaReleaseTask(pStreamMeta, p); streamMetaReleaseTask(pStreamMeta, p);
} else {
tqWarn("vgId:%d failed to add s-task:0x%x, already exists in meta store", vgId, taskId);
tFreeStreamTask(pTask);
}
return 0; return 0;
} }
...@@ -1183,7 +1179,7 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) { ...@@ -1183,7 +1179,7 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) {
SStreamMeta* pMeta = pTq->pStreamMeta; SStreamMeta* pMeta = pTq->pStreamMeta;
int32_t code = TSDB_CODE_SUCCESS; int32_t code = TSDB_CODE_SUCCESS;
SStreamTask* pTask = streamMetaAcquireTask(pMeta, pReq->taskId); SStreamTask* pTask = streamMetaAcquireTask(pMeta, pReq->streamId, pReq->taskId);
if (pTask == NULL) { if (pTask == NULL) {
tqError("vgId:%d failed to acquire stream task:0x%x during stream recover, task may have been destroyed", tqError("vgId:%d failed to acquire stream task:0x%x during stream recover, task may have been destroyed",
pMeta->vgId, pReq->taskId); pMeta->vgId, pReq->taskId);
...@@ -1239,7 +1235,7 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) { ...@@ -1239,7 +1235,7 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) {
bool done = false; bool done = false;
// 1. get the related stream task // 1. get the related stream task
pStreamTask = streamMetaAcquireTask(pMeta, pTask->streamTaskId.taskId); pStreamTask = streamMetaAcquireTask(pMeta, pTask->streamTaskId.streamId, pTask->streamTaskId.taskId);
if (pStreamTask == NULL) { if (pStreamTask == NULL) {
// todo delete this task, if the related stream task is dropped // todo delete this task, if the related stream task is dropped
qError("failed to find s-task:0x%x, it may have been destroyed, drop fill-history task:%s", qError("failed to find s-task:0x%x, it may have been destroyed, drop fill-history task:%s",
...@@ -1247,7 +1243,7 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) { ...@@ -1247,7 +1243,7 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) {
tqDebug("s-task:%s fill-history task set status to be dropping", id); tqDebug("s-task:%s fill-history task set status to be dropping", id);
streamMetaUnregisterTask(pMeta, pTask->id.taskId); streamMetaUnregisterTask(pMeta, pTask->id.streamId, pTask->id.taskId);
streamMetaReleaseTask(pMeta, pTask); streamMetaReleaseTask(pMeta, pTask);
return -1; return -1;
} }
...@@ -1355,7 +1351,7 @@ int32_t tqProcessTaskTransferStateReq(STQ* pTq, SRpcMsg* pMsg) { ...@@ -1355,7 +1351,7 @@ int32_t tqProcessTaskTransferStateReq(STQ* pTq, SRpcMsg* pMsg) {
tqDebug("vgId:%d start to process transfer state msg, from s-task:0x%x", pTq->pStreamMeta->vgId, req.downstreamTaskId); tqDebug("vgId:%d start to process transfer state msg, from s-task:0x%x", pTq->pStreamMeta->vgId, req.downstreamTaskId);
SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, req.downstreamTaskId); SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, req.streamId, req.downstreamTaskId);
if (pTask == NULL) { if (pTask == NULL) {
tqError("failed to find task:0x%x, it may have been dropped already. process transfer state failed", req.downstreamTaskId); tqError("failed to find task:0x%x, it may have been dropped already. process transfer state failed", req.downstreamTaskId);
return -1; return -1;
...@@ -1391,7 +1387,7 @@ int32_t tqProcessTaskScanHistoryFinishReq(STQ* pTq, SRpcMsg* pMsg) { ...@@ -1391,7 +1387,7 @@ int32_t tqProcessTaskScanHistoryFinishReq(STQ* pTq, SRpcMsg* pMsg) {
tDecodeStreamScanHistoryFinishReq(&decoder, &req); tDecodeStreamScanHistoryFinishReq(&decoder, &req);
tDecoderClear(&decoder); tDecoderClear(&decoder);
SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, req.downstreamTaskId); SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, req.streamId, req.downstreamTaskId);
if (pTask == NULL) { if (pTask == NULL) {
tqError("vgId:%d process scan history finish msg, failed to find task:0x%x, it may be destroyed", tqError("vgId:%d process scan history finish msg, failed to find task:0x%x, it may be destroyed",
pTq->pStreamMeta->vgId, req.downstreamTaskId); pTq->pStreamMeta->vgId, req.downstreamTaskId);
...@@ -1417,7 +1413,7 @@ int32_t tqProcessTaskScanHistoryFinishRsp(STQ* pTq, SRpcMsg* pMsg) { ...@@ -1417,7 +1413,7 @@ int32_t tqProcessTaskScanHistoryFinishRsp(STQ* pTq, SRpcMsg* pMsg) {
tDecodeCompleteHistoryDataMsg(&decoder, &req); tDecodeCompleteHistoryDataMsg(&decoder, &req);
tDecoderClear(&decoder); tDecoderClear(&decoder);
SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, req.upstreamTaskId); SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, req.streamId, req.upstreamTaskId);
if (pTask == NULL) { if (pTask == NULL) {
tqError("vgId:%d process scan history finish rsp, failed to find task:0x%x, it may be destroyed", tqError("vgId:%d process scan history finish rsp, failed to find task:0x%x, it may be destroyed",
pTq->pStreamMeta->vgId, req.upstreamTaskId); pTq->pStreamMeta->vgId, req.upstreamTaskId);
...@@ -1508,7 +1504,7 @@ int32_t tqProcessTaskRunReq(STQ* pTq, SRpcMsg* pMsg) { ...@@ -1508,7 +1504,7 @@ int32_t tqProcessTaskRunReq(STQ* pTq, SRpcMsg* pMsg) {
return 0; return 0;
} }
SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, taskId); SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, pReq->streamId, taskId);
if (pTask != NULL) { if (pTask != NULL) {
// even in halt status, the data in inputQ must be processed // even in halt status, the data in inputQ must be processed
int8_t st = pTask->status.taskStatus; int8_t st = pTask->status.taskStatus;
...@@ -1543,7 +1539,7 @@ int32_t tqProcessTaskDispatchReq(STQ* pTq, SRpcMsg* pMsg, bool exec) { ...@@ -1543,7 +1539,7 @@ int32_t tqProcessTaskDispatchReq(STQ* pTq, SRpcMsg* pMsg, bool exec) {
tDecoderInit(&decoder, (uint8_t*)msgBody, msgLen); tDecoderInit(&decoder, (uint8_t*)msgBody, msgLen);
tDecodeStreamDispatchReq(&decoder, &req); tDecodeStreamDispatchReq(&decoder, &req);
SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, req.taskId); SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, req.streamId, req.taskId);
if (pTask) { if (pTask) {
SRpcMsg rsp = {.info = pMsg->info, .code = 0}; SRpcMsg rsp = {.info = pMsg->info, .code = 0};
streamProcessDispatchMsg(pTask, &req, &rsp, exec); streamProcessDispatchMsg(pTask, &req, &rsp, exec);
...@@ -1557,10 +1553,12 @@ int32_t tqProcessTaskDispatchReq(STQ* pTq, SRpcMsg* pMsg, bool exec) { ...@@ -1557,10 +1553,12 @@ int32_t tqProcessTaskDispatchReq(STQ* pTq, SRpcMsg* pMsg, bool exec) {
int32_t tqProcessTaskDispatchRsp(STQ* pTq, SRpcMsg* pMsg) { int32_t tqProcessTaskDispatchRsp(STQ* pTq, SRpcMsg* pMsg) {
SStreamDispatchRsp* pRsp = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)); SStreamDispatchRsp* pRsp = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
int32_t taskId = ntohl(pRsp->upstreamTaskId);
SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, taskId);
int32_t vgId = pTq->pStreamMeta->vgId; int32_t vgId = pTq->pStreamMeta->vgId;
int32_t taskId = htonl(pRsp->upstreamTaskId);
int64_t streamId = htobe64(pRsp->streamId);
SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, streamId, taskId);
if (pTask) { if (pTask) {
streamProcessDispatchRsp(pTask, pRsp, pMsg->code); streamProcessDispatchRsp(pTask, pRsp, pMsg->code);
streamMetaReleaseTask(pTq->pStreamMeta, pTask); streamMetaReleaseTask(pTq->pStreamMeta, pTask);
...@@ -1574,13 +1572,13 @@ int32_t tqProcessTaskDispatchRsp(STQ* pTq, SRpcMsg* pMsg) { ...@@ -1574,13 +1572,13 @@ int32_t tqProcessTaskDispatchRsp(STQ* pTq, SRpcMsg* pMsg) {
int32_t tqProcessTaskDropReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) { int32_t tqProcessTaskDropReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) {
SVDropStreamTaskReq* pReq = (SVDropStreamTaskReq*)msg; SVDropStreamTaskReq* pReq = (SVDropStreamTaskReq*)msg;
tqDebug("vgId:%d receive msg to drop stream task:0x%x", TD_VID(pTq->pVnode), pReq->taskId); tqDebug("vgId:%d receive msg to drop stream task:0x%x", TD_VID(pTq->pVnode), pReq->taskId);
SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, pReq->taskId); SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, pReq->streamId, pReq->taskId);
if (pTask == NULL) { if (pTask == NULL) {
tqError("vgId:%d failed to acquire s-task:0x%x when dropping it", pTq->pStreamMeta->vgId, pReq->taskId); tqError("vgId:%d failed to acquire s-task:0x%x when dropping it", pTq->pStreamMeta->vgId, pReq->taskId);
return 0; return 0;
} }
streamMetaUnregisterTask(pTq->pStreamMeta, pReq->taskId); streamMetaUnregisterTask(pTq->pStreamMeta, pReq->streamId, pReq->taskId);
streamMetaReleaseTask(pTq->pStreamMeta, pTask); streamMetaReleaseTask(pTq->pStreamMeta, pTask);
return 0; return 0;
} }
...@@ -1589,7 +1587,7 @@ int32_t tqProcessTaskPauseReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg ...@@ -1589,7 +1587,7 @@ int32_t tqProcessTaskPauseReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg
SVPauseStreamTaskReq* pReq = (SVPauseStreamTaskReq*)msg; SVPauseStreamTaskReq* pReq = (SVPauseStreamTaskReq*)msg;
SStreamMeta* pMeta = pTq->pStreamMeta; SStreamMeta* pMeta = pTq->pStreamMeta;
SStreamTask* pTask = streamMetaAcquireTask(pMeta, pReq->taskId); SStreamTask* pTask = streamMetaAcquireTask(pMeta, pReq->streamId, pReq->taskId);
if (pTask == NULL) { if (pTask == NULL) {
tqError("vgId:%d process pause req, failed to acquire task:0x%x, it may have been dropped already", pMeta->vgId, tqError("vgId:%d process pause req, failed to acquire task:0x%x, it may have been dropped already", pMeta->vgId,
pReq->taskId); pReq->taskId);
...@@ -1602,7 +1600,7 @@ int32_t tqProcessTaskPauseReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg ...@@ -1602,7 +1600,7 @@ int32_t tqProcessTaskPauseReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg
SStreamTask* pHistoryTask = NULL; SStreamTask* pHistoryTask = NULL;
if (pTask->historyTaskId.taskId != 0) { if (pTask->historyTaskId.taskId != 0) {
pHistoryTask = streamMetaAcquireTask(pMeta, pTask->historyTaskId.taskId); pHistoryTask = streamMetaAcquireTask(pMeta, pTask->historyTaskId.streamId, pTask->historyTaskId.taskId);
if (pHistoryTask == NULL) { if (pHistoryTask == NULL) {
tqError("vgId:%d process pause req, failed to acquire fill-history task:0x%x, it may have been dropped already", tqError("vgId:%d process pause req, failed to acquire fill-history task:0x%x, it may have been dropped already",
pMeta->vgId, pTask->historyTaskId.taskId); pMeta->vgId, pTask->historyTaskId.taskId);
...@@ -1661,13 +1659,13 @@ int32_t tqProcessTaskResumeImpl(STQ* pTq, SStreamTask* pTask, int64_t sversion, ...@@ -1661,13 +1659,13 @@ int32_t tqProcessTaskResumeImpl(STQ* pTq, SStreamTask* pTask, int64_t sversion,
int32_t tqProcessTaskResumeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) { int32_t tqProcessTaskResumeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) {
SVResumeStreamTaskReq* pReq = (SVResumeStreamTaskReq*)msg; SVResumeStreamTaskReq* pReq = (SVResumeStreamTaskReq*)msg;
SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, pReq->taskId); SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, pReq->streamId, pReq->taskId);
int32_t code = tqProcessTaskResumeImpl(pTq, pTask, sversion, pReq->igUntreated); int32_t code = tqProcessTaskResumeImpl(pTq, pTask, sversion, pReq->igUntreated);
if (code != 0) { if (code != 0) {
return code; return code;
} }
SStreamTask* pHistoryTask = streamMetaAcquireTask(pTq->pStreamMeta, pTask->historyTaskId.taskId); SStreamTask* pHistoryTask = streamMetaAcquireTask(pTq->pStreamMeta, pTask->historyTaskId.streamId, pTask->historyTaskId.taskId);
if (pHistoryTask) { if (pHistoryTask) {
code = tqProcessTaskResumeImpl(pTq, pHistoryTask, sversion, pReq->igUntreated); code = tqProcessTaskResumeImpl(pTq, pHistoryTask, sversion, pReq->igUntreated);
} }
...@@ -1686,8 +1684,7 @@ int32_t tqProcessTaskRetrieveReq(STQ* pTq, SRpcMsg* pMsg) { ...@@ -1686,8 +1684,7 @@ int32_t tqProcessTaskRetrieveReq(STQ* pTq, SRpcMsg* pMsg) {
tDecodeStreamRetrieveReq(&decoder, &req); tDecodeStreamRetrieveReq(&decoder, &req);
tDecoderClear(&decoder); tDecoderClear(&decoder);
int32_t taskId = req.dstTaskId; SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, req.streamId, req.dstTaskId);
SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, taskId);
if (pTask) { if (pTask) {
SRpcMsg rsp = {.info = pMsg->info, .code = 0}; SRpcMsg rsp = {.info = pMsg->info, .code = 0};
...@@ -1725,7 +1722,7 @@ int32_t vnodeEnqueueStreamMsg(SVnode* pVnode, SRpcMsg* pMsg) { ...@@ -1725,7 +1722,7 @@ int32_t vnodeEnqueueStreamMsg(SVnode* pVnode, SRpcMsg* pMsg) {
tDecoderClear(&decoder); tDecoderClear(&decoder);
int32_t taskId = req.taskId; int32_t taskId = req.taskId;
SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, taskId); SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, req.streamId, req.taskId);
if (pTask != NULL) { if (pTask != NULL) {
SRpcMsg rsp = {.info = pMsg->info, .code = 0}; SRpcMsg rsp = {.info = pMsg->info, .code = 0};
streamProcessDispatchMsg(pTask, &req, &rsp, false); streamProcessDispatchMsg(pTask, &req, &rsp, false);
......
...@@ -72,8 +72,8 @@ int32_t tqStreamTasksStatusCheck(STQ* pTq) { ...@@ -72,8 +72,8 @@ int32_t tqStreamTasksStatusCheck(STQ* pTq) {
taosWUnLockLatch(&pMeta->lock); taosWUnLockLatch(&pMeta->lock);
for (int32_t i = 0; i < numOfTasks; ++i) { for (int32_t i = 0; i < numOfTasks; ++i) {
int32_t* pTaskId = taosArrayGet(pTaskList, i); SStreamId* pTaskId = taosArrayGet(pTaskList, i);
SStreamTask* pTask = streamMetaAcquireTask(pMeta, *pTaskId); SStreamTask* pTask = streamMetaAcquireTask(pMeta, pTaskId->streamId, pTaskId->taskId);
if (pTask == NULL) { if (pTask == NULL) {
continue; continue;
} }
...@@ -242,8 +242,8 @@ int32_t createStreamTaskRunReq(SStreamMeta* pStreamMeta, bool* pScanIdle) { ...@@ -242,8 +242,8 @@ int32_t createStreamTaskRunReq(SStreamMeta* pStreamMeta, bool* pScanIdle) {
numOfTasks = taosArrayGetSize(pTaskList); numOfTasks = taosArrayGetSize(pTaskList);
for (int32_t i = 0; i < numOfTasks; ++i) { for (int32_t i = 0; i < numOfTasks; ++i) {
int32_t* pTaskId = taosArrayGet(pTaskList, i); SStreamId* pTaskId = taosArrayGet(pTaskList, i);
SStreamTask* pTask = streamMetaAcquireTask(pStreamMeta, *pTaskId); SStreamTask* pTask = streamMetaAcquireTask(pStreamMeta, pTaskId->streamId, pTaskId->taskId);
if (pTask == NULL) { if (pTask == NULL) {
continue; continue;
} }
......
...@@ -439,7 +439,7 @@ static int32_t tsdbReaderCreate(SVnode* pVnode, SQueryTableDataCond* pCond, void ...@@ -439,7 +439,7 @@ static int32_t tsdbReaderCreate(SVnode* pVnode, SQueryTableDataCond* pCond, void
return code; return code;
_end: _end:
tsdbReaderClose(pReader); tsdbReaderClose2(pReader);
*ppReader = NULL; *ppReader = NULL;
return code; return code;
} }
...@@ -1731,12 +1731,21 @@ static int32_t mergeFileBlockAndLastBlock(STsdbReader* pReader, SLastBlockReader ...@@ -1731,12 +1731,21 @@ static int32_t mergeFileBlockAndLastBlock(STsdbReader* pReader, SLastBlockReader
// row in last file block // row in last file block
TSDBROW fRow = tsdbRowFromBlockData(pBlockData, pDumpInfo->rowIndex); TSDBROW fRow = tsdbRowFromBlockData(pBlockData, pDumpInfo->rowIndex);
int64_t ts = getCurrentKeyInLastBlock(pLastBlockReader); int64_t tsLast = getCurrentKeyInLastBlock(pLastBlockReader);
if (ASCENDING_TRAVERSE(pReader->info.order)) { if (ASCENDING_TRAVERSE(pReader->info.order)) {
if (key < ts) { // imem, mem are all empty, file blocks (data blocks and last block) exist if (key < tsLast) {
return mergeRowsInFileBlocks(pBlockData, pBlockScanInfo, key, pReader);
} else if (key > tsLast) {
return doMergeFileBlockAndLastBlock(pLastBlockReader, pReader, pBlockScanInfo, NULL, false);
}
} else {
if (key > tsLast) {
return mergeRowsInFileBlocks(pBlockData, pBlockScanInfo, key, pReader); return mergeRowsInFileBlocks(pBlockData, pBlockScanInfo, key, pReader);
} else if (key == ts) { } else if (key < tsLast) {
return doMergeFileBlockAndLastBlock(pLastBlockReader, pReader, pBlockScanInfo, NULL, false);
}
}
// the following for key == tsLast
SRow* pTSRow = NULL; SRow* pTSRow = NULL;
int32_t code = tsdbRowMergerAdd(pMerger, &fRow, pReader->info.pSchema); int32_t code = tsdbRowMergerAdd(pMerger, &fRow, pReader->info.pSchema);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
...@@ -1748,7 +1757,7 @@ static int32_t mergeFileBlockAndLastBlock(STsdbReader* pReader, SLastBlockReader ...@@ -1748,7 +1757,7 @@ static int32_t mergeFileBlockAndLastBlock(STsdbReader* pReader, SLastBlockReader
TSDBROW* pRow1 = tMergeTreeGetRow(&pLastBlockReader->mergeTree); TSDBROW* pRow1 = tMergeTreeGetRow(&pLastBlockReader->mergeTree);
tsdbRowMergerAdd(pMerger, pRow1, NULL); tsdbRowMergerAdd(pMerger, pRow1, NULL);
doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, ts, pMerger, &pReader->info.verRange, pReader->idStr); doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, tsLast, pMerger, &pReader->info.verRange, pReader->idStr);
code = tsdbRowMergerGetRow(pMerger, &pTSRow); code = tsdbRowMergerGetRow(pMerger, &pTSRow);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
...@@ -1760,12 +1769,7 @@ static int32_t mergeFileBlockAndLastBlock(STsdbReader* pReader, SLastBlockReader ...@@ -1760,12 +1769,7 @@ static int32_t mergeFileBlockAndLastBlock(STsdbReader* pReader, SLastBlockReader
taosMemoryFree(pTSRow); taosMemoryFree(pTSRow);
tsdbRowMergerClear(pMerger); tsdbRowMergerClear(pMerger);
return code; return code;
} else { // key > ts
return doMergeFileBlockAndLastBlock(pLastBlockReader, pReader, pBlockScanInfo, NULL, false);
}
} else { // desc order
return doMergeFileBlockAndLastBlock(pLastBlockReader, pReader, pBlockScanInfo, pBlockData, true);
}
} else { // only last block exists } else { // only last block exists
return doMergeFileBlockAndLastBlock(pLastBlockReader, pReader, pBlockScanInfo, NULL, false); return doMergeFileBlockAndLastBlock(pLastBlockReader, pReader, pBlockScanInfo, NULL, false);
} }
...@@ -2192,7 +2196,8 @@ static int32_t buildComposedDataBlockImpl(STsdbReader* pReader, STableBlockScanI ...@@ -2192,7 +2196,8 @@ static int32_t buildComposedDataBlockImpl(STsdbReader* pReader, STableBlockScanI
SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo; SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;
TSDBROW *pRow = NULL, *piRow = NULL; TSDBROW *pRow = NULL, *piRow = NULL;
int64_t key = (pBlockData->nRow > 0 && (!pDumpInfo->allDumped)) ? pBlockData->aTSKEY[pDumpInfo->rowIndex] : INT64_MIN; int64_t key = (pBlockData->nRow > 0 && (!pDumpInfo->allDumped)) ? pBlockData->aTSKEY[pDumpInfo->rowIndex] :
(ASCENDING_TRAVERSE(pReader->info.order) ? INT64_MAX : INT64_MIN);
if (pBlockScanInfo->iter.hasVal) { if (pBlockScanInfo->iter.hasVal) {
pRow = getValidMemRow(&pBlockScanInfo->iter, pBlockScanInfo->delSkyline, pReader); pRow = getValidMemRow(&pBlockScanInfo->iter, pBlockScanInfo->delSkyline, pReader);
} }
...@@ -2566,9 +2571,8 @@ static int32_t doLoadLastBlockSequentially(STsdbReader* pReader) { ...@@ -2566,9 +2571,8 @@ static int32_t doLoadLastBlockSequentially(STsdbReader* pReader) {
// load the last data block of current table // load the last data block of current table
STableBlockScanInfo* pScanInfo = *(STableBlockScanInfo**)pStatus->pTableIter; STableBlockScanInfo* pScanInfo = *(STableBlockScanInfo**)pStatus->pTableIter;
if (pReader->pIgnoreTables && taosHashGet(*pReader->pIgnoreTables, &pScanInfo->uid, sizeof(pScanInfo->uid))) { if (pScanInfo == NULL) {
// reset the index in last block when handing a new file tsdbError("table Iter is null, invalid pScanInfo, try next table %s", pReader->idStr);
// doCleanupTableScanInfo(pScanInfo);
bool hasNexTable = moveToNextTable(pUidList, pStatus); bool hasNexTable = moveToNextTable(pUidList, pStatus);
if (!hasNexTable) { if (!hasNexTable) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
...@@ -2577,8 +2581,15 @@ static int32_t doLoadLastBlockSequentially(STsdbReader* pReader) { ...@@ -2577,8 +2581,15 @@ static int32_t doLoadLastBlockSequentially(STsdbReader* pReader) {
continue; continue;
} }
if (pReader->pIgnoreTables && taosHashGet(*pReader->pIgnoreTables, &pScanInfo->uid, sizeof(pScanInfo->uid))) {
// reset the index in last block when handing a new file // reset the index in last block when handing a new file
// doCleanupTableScanInfo(pScanInfo); bool hasNexTable = moveToNextTable(pUidList, pStatus);
if (!hasNexTable) {
return TSDB_CODE_SUCCESS;
}
continue;
}
bool hasDataInLastFile = initLastBlockReader(pLastBlockReader, pScanInfo, pReader); bool hasDataInLastFile = initLastBlockReader(pLastBlockReader, pScanInfo, pReader);
if (!hasDataInLastFile) { if (!hasDataInLastFile) {
...@@ -2669,16 +2680,32 @@ static int32_t doBuildDataBlock(STsdbReader* pReader) { ...@@ -2669,16 +2680,32 @@ static int32_t doBuildDataBlock(STsdbReader* pReader) {
(ASCENDING_TRAVERSE(pReader->info.order)) ? pBlockInfo->record.firstKey : pBlockInfo->record.lastKey; (ASCENDING_TRAVERSE(pReader->info.order)) ? pBlockInfo->record.firstKey : pBlockInfo->record.lastKey;
code = buildDataBlockFromBuf(pReader, pScanInfo, endKey); code = buildDataBlockFromBuf(pReader, pScanInfo, endKey);
} else { } else {
if (hasDataInLastBlock(pLastBlockReader) && !ASCENDING_TRAVERSE(pReader->info.order)) { bool bHasDataInLastBlock = hasDataInLastBlock(pLastBlockReader);
// only return the rows in last block int64_t tsLast = bHasDataInLastBlock ? getCurrentKeyInLastBlock(pLastBlockReader) : INT64_MIN;
int64_t tsLast = getCurrentKeyInLastBlock(pLastBlockReader); if (!bHasDataInLastBlock || ((ASCENDING_TRAVERSE(pReader->info.order) && pBlockInfo->record.lastKey < tsLast) ||
ASSERT(tsLast >= pBlockInfo->record.lastKey); (!ASCENDING_TRAVERSE(pReader->info.order) && pBlockInfo->record.firstKey > tsLast))) {
// whole block is required, return it directly
SDataBlockInfo* pInfo = &pReader->resBlockInfo.pResBlock->info;
pInfo->rows = pBlockInfo->record.numRow;
pInfo->id.uid = pScanInfo->uid;
pInfo->dataLoad = 0;
pInfo->window = (STimeWindow){.skey = pBlockInfo->record.firstKey, .ekey = pBlockInfo->record.lastKey};
setComposedBlockFlag(pReader, false);
setBlockAllDumped(&pStatus->fBlockDumpInfo, pBlockInfo->record.lastKey, pReader->info.order);
// update the last key for the corresponding table
pScanInfo->lastKey = ASCENDING_TRAVERSE(pReader->info.order) ? pInfo->window.ekey : pInfo->window.skey;
tsdbDebug("%p uid:%" PRIu64
" clean file block retrieved from file, global index:%d, "
"table index:%d, rows:%d, brange:%" PRId64 "-%" PRId64 ", %s",
pReader, pScanInfo->uid, pBlockIter->index, pBlockInfo->tbBlockIdx, pBlockInfo->record.numRow,
pBlockInfo->record.firstKey, pBlockInfo->record.lastKey, pReader->idStr);
} else {
SBlockData* pBData = &pReader->status.fileBlockData; SBlockData* pBData = &pReader->status.fileBlockData;
tBlockDataReset(pBData); tBlockDataReset(pBData);
SSDataBlock* pResBlock = pReader->resBlockInfo.pResBlock; SSDataBlock* pResBlock = pReader->resBlockInfo.pResBlock;
tsdbDebug("load data in last block firstly, due to desc scan data, %s", pReader->idStr); tsdbDebug("load data in last block firstly %s", pReader->idStr);
int64_t st = taosGetTimestampUs(); int64_t st = taosGetTimestampUs();
...@@ -2709,23 +2736,8 @@ static int32_t doBuildDataBlock(STsdbReader* pReader) { ...@@ -2709,23 +2736,8 @@ static int32_t doBuildDataBlock(STsdbReader* pReader) {
pReader, pResBlock->info.id.uid, pResBlock->info.window.skey, pResBlock->info.window.ekey, pReader, pResBlock->info.id.uid, pResBlock->info.window.skey, pResBlock->info.window.ekey,
pResBlock->info.rows, el, pReader->idStr); pResBlock->info.rows, el, pReader->idStr);
} }
} else { // whole block is required, return it directly
SDataBlockInfo* pInfo = &pReader->resBlockInfo.pResBlock->info;
pInfo->rows = pBlockInfo->record.numRow;
pInfo->id.uid = pScanInfo->uid;
pInfo->dataLoad = 0;
pInfo->window = (STimeWindow){.skey = pBlockInfo->record.firstKey, .ekey = pBlockInfo->record.lastKey};
setComposedBlockFlag(pReader, false);
setBlockAllDumped(&pStatus->fBlockDumpInfo, pBlockInfo->record.lastKey, pReader->info.order);
// update the last key for the corresponding table
pScanInfo->lastKey = ASCENDING_TRAVERSE(pReader->info.order) ? pInfo->window.ekey : pInfo->window.skey;
tsdbDebug("%p uid:%" PRIu64
" clean file block retrieved from file, global index:%d, "
"table index:%d, rows:%d, brange:%" PRId64 "-%" PRId64 ", %s",
pReader, pScanInfo->uid, pBlockIter->index, pBlockInfo->tbBlockIdx, pBlockInfo->record.numRow,
pBlockInfo->record.firstKey, pBlockInfo->record.lastKey, pReader->idStr);
} }
} }
return (pReader->code != TSDB_CODE_SUCCESS) ? pReader->code : code; return (pReader->code != TSDB_CODE_SUCCESS) ? pReader->code : code;
...@@ -4098,12 +4110,10 @@ int32_t tsdbReaderSuspend2(STsdbReader* pReader) { ...@@ -4098,12 +4110,10 @@ int32_t tsdbReaderSuspend2(STsdbReader* pReader) {
} }
tsdbDataFileReaderClose(&pReader->pFileReader); tsdbDataFileReaderClose(&pReader->pFileReader);
int64_t loadBlocks = 0; int64_t loadBlocks = 0;
double elapse = 0; double elapse = 0;
pReader->status.pLDataIterArray = destroySttBlockReader(pReader->status.pLDataIterArray, &loadBlocks, &elapse); pReader->status.pLDataIterArray = destroySttBlockReader(pReader->status.pLDataIterArray, &loadBlocks, &elapse);
pReader->status.pLDataIterArray = taosArrayInit(4, POINTER_BYTES); pReader->status.pLDataIterArray = taosArrayInit(4, POINTER_BYTES);
// resetDataBlockScanInfo excluding lastKey // resetDataBlockScanInfo excluding lastKey
STableBlockScanInfo** p = NULL; STableBlockScanInfo** p = NULL;
int32_t iter = 0; int32_t iter = 0;
......
...@@ -304,7 +304,7 @@ qTaskInfo_t qCreateQueueExecTaskInfo(void* msg, SReadHandle* pReaderHandle, int3 ...@@ -304,7 +304,7 @@ qTaskInfo_t qCreateQueueExecTaskInfo(void* msg, SReadHandle* pReaderHandle, int3
return pTaskInfo; return pTaskInfo;
} }
qTaskInfo_t qCreateStreamExecTaskInfo(void* msg, SReadHandle* readers, int32_t vgId) { qTaskInfo_t qCreateStreamExecTaskInfo(void* msg, SReadHandle* readers, int32_t vgId, int32_t taskId) {
if (msg == NULL) { if (msg == NULL) {
return NULL; return NULL;
} }
...@@ -317,7 +317,7 @@ qTaskInfo_t qCreateStreamExecTaskInfo(void* msg, SReadHandle* readers, int32_t v ...@@ -317,7 +317,7 @@ qTaskInfo_t qCreateStreamExecTaskInfo(void* msg, SReadHandle* readers, int32_t v
} }
qTaskInfo_t pTaskInfo = NULL; qTaskInfo_t pTaskInfo = NULL;
code = qCreateExecTask(readers, vgId, 0, pPlan, &pTaskInfo, NULL, NULL, OPTR_EXEC_MODEL_STREAM); code = qCreateExecTask(readers, vgId, taskId, pPlan, &pTaskInfo, NULL, NULL, OPTR_EXEC_MODEL_STREAM);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
nodesDestroyNode((SNode*)pPlan); nodesDestroyNode((SNode*)pPlan);
qDestroyTask(pTaskInfo); qDestroyTask(pTaskInfo);
......
...@@ -290,7 +290,7 @@ static void waitForTaskIdle(SStreamTask* pTask, SStreamTask* pStreamTask) { ...@@ -290,7 +290,7 @@ static void waitForTaskIdle(SStreamTask* pTask, SStreamTask* pStreamTask) {
static int32_t streamDoTransferStateToStreamTask(SStreamTask* pTask) { static int32_t streamDoTransferStateToStreamTask(SStreamTask* pTask) {
SStreamMeta* pMeta = pTask->pMeta; SStreamMeta* pMeta = pTask->pMeta;
SStreamTask* pStreamTask = streamMetaAcquireTask(pMeta, pTask->streamTaskId.taskId); SStreamTask* pStreamTask = streamMetaAcquireTask(pMeta, pTask->streamTaskId.streamId, pTask->streamTaskId.taskId);
if (pStreamTask == NULL) { if (pStreamTask == NULL) {
// todo: destroy the fill-history task here // todo: destroy the fill-history task here
qError("s-task:%s failed to find related stream task:0x%x, it may have been destroyed or closed", pTask->id.idStr, qError("s-task:%s failed to find related stream task:0x%x, it may have been destroyed or closed", pTask->id.idStr,
...@@ -350,10 +350,9 @@ static int32_t streamDoTransferStateToStreamTask(SStreamTask* pTask) { ...@@ -350,10 +350,9 @@ static int32_t streamDoTransferStateToStreamTask(SStreamTask* pTask) {
streamTaskResumeFromHalt(pStreamTask); streamTaskResumeFromHalt(pStreamTask);
qDebug("s-task:%s fill-history task set status to be dropping, save the state into disk", pTask->id.idStr); qDebug("s-task:%s fill-history task set status to be dropping, save the state into disk", pTask->id.idStr);
int32_t taskId = pTask->id.taskId;
// 5. free it and remove fill-history task from disk meta-store // 5. free it and remove fill-history task from disk meta-store
streamMetaUnregisterTask(pMeta, taskId); streamMetaUnregisterTask(pMeta, pTask->id.streamId, pTask->id.taskId);
// 6. save to disk // 6. save to disk
taosWLockLatch(&pMeta->lock); taosWLockLatch(&pMeta->lock);
......
...@@ -66,14 +66,14 @@ SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandF ...@@ -66,14 +66,14 @@ SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandF
goto _err; goto _err;
} }
_hash_fn_t fp = taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT); _hash_fn_t fp = taosGetDefaultHashFunction(TSDB_DATA_TYPE_VARCHAR);
pMeta->pTasks = taosHashInit(64, fp, true, HASH_NO_LOCK); pMeta->pTasks = taosHashInit(64, fp, true, HASH_NO_LOCK);
if (pMeta->pTasks == NULL) { if (pMeta->pTasks == NULL) {
goto _err; goto _err;
} }
// task list // task list
pMeta->pTaskList = taosArrayInit(4, sizeof(int32_t)); pMeta->pTaskList = taosArrayInit(4, sizeof(SStreamId));
if (pMeta->pTaskList == NULL) { if (pMeta->pTaskList == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
goto _err; goto _err;
...@@ -161,43 +161,6 @@ void streamMetaClose(SStreamMeta* pMeta) { ...@@ -161,43 +161,6 @@ void streamMetaClose(SStreamMeta* pMeta) {
taosMemoryFree(pMeta); taosMemoryFree(pMeta);
} }
#if 0
int32_t streamMetaAddSerializedTask(SStreamMeta* pMeta, int64_t ver, char* msg, int32_t msgLen) {
SStreamTask* pTask = taosMemoryCalloc(1, sizeof(SStreamTask));
if (pTask == NULL) {
return -1;
}
SDecoder decoder;
tDecoderInit(&decoder, (uint8_t*)msg, msgLen);
if (tDecodeStreamTask(&decoder, pTask) < 0) {
tDecoderClear(&decoder);
goto FAIL;
}
tDecoderClear(&decoder);
if (pMeta->expandFunc(pMeta->ahandle, pTask, ver) < 0) {
ASSERT(0);
goto FAIL;
}
if (taosHashPut(pMeta->pTasks, &pTask->id.taskId, sizeof(int32_t), &pTask, sizeof(void*)) < 0) {
goto FAIL;
}
if (tdbTbUpsert(pMeta->pTaskDb, &pTask->id.taskId, sizeof(int32_t), msg, msgLen, pMeta->txn) < 0) {
taosHashRemove(pMeta->pTasks, &pTask->id.taskId, sizeof(int32_t));
ASSERT(0);
goto FAIL;
}
return 0;
FAIL:
if (pTask) tFreeStreamTask(pTask);
return -1;
}
#endif
int32_t streamMetaSaveTask(SStreamMeta* pMeta, SStreamTask* pTask) { int32_t streamMetaSaveTask(SStreamMeta* pMeta, SStreamTask* pTask) {
void* buf = NULL; void* buf = NULL;
int32_t len; int32_t len;
...@@ -241,14 +204,15 @@ int32_t streamMetaRemoveTask(SStreamMeta* pMeta, int32_t taskId) { ...@@ -241,14 +204,15 @@ int32_t streamMetaRemoveTask(SStreamMeta* pMeta, int32_t taskId) {
int32_t streamMetaRegisterTask(SStreamMeta* pMeta, int64_t ver, SStreamTask* pTask, bool* pAdded) { int32_t streamMetaRegisterTask(SStreamMeta* pMeta, int64_t ver, SStreamTask* pTask, bool* pAdded) {
*pAdded = false; *pAdded = false;
void* p = taosHashGet(pMeta->pTasks, &pTask->id.taskId, sizeof(pTask->id.taskId)); int64_t keys[2] = {pTask->id.streamId, pTask->id.taskId};
void* p = taosHashGet(pMeta->pTasks, keys, sizeof(keys));
if (p == NULL) { if (p == NULL) {
if (pMeta->expandFunc(pMeta->ahandle, pTask, ver) < 0) { if (pMeta->expandFunc(pMeta->ahandle, pTask, ver) < 0) {
tFreeStreamTask(pTask); tFreeStreamTask(pTask);
return -1; return -1;
} }
taosArrayPush(pMeta->pTaskList, &pTask->id.taskId); taosArrayPush(pMeta->pTaskList, &pTask->id);
if (streamMetaSaveTask(pMeta, pTask) < 0) { if (streamMetaSaveTask(pMeta, pTask) < 0) {
tFreeStreamTask(pTask); tFreeStreamTask(pTask);
...@@ -263,7 +227,7 @@ int32_t streamMetaRegisterTask(SStreamMeta* pMeta, int64_t ver, SStreamTask* pTa ...@@ -263,7 +227,7 @@ int32_t streamMetaRegisterTask(SStreamMeta* pMeta, int64_t ver, SStreamTask* pTa
return 0; return 0;
} }
taosHashPut(pMeta->pTasks, &pTask->id.taskId, sizeof(pTask->id.taskId), &pTask, POINTER_BYTES); taosHashPut(pMeta->pTasks, keys, sizeof(keys), &pTask, POINTER_BYTES);
*pAdded = true; *pAdded = true;
return 0; return 0;
} }
...@@ -274,10 +238,11 @@ int32_t streamMetaGetNumOfTasks(SStreamMeta* pMeta) { ...@@ -274,10 +238,11 @@ int32_t streamMetaGetNumOfTasks(SStreamMeta* pMeta) {
return (int32_t)size; return (int32_t)size;
} }
SStreamTask* streamMetaAcquireTask(SStreamMeta* pMeta, int32_t taskId) { SStreamTask* streamMetaAcquireTask(SStreamMeta* pMeta, int64_t streamId, int32_t taskId) {
taosRLockLatch(&pMeta->lock); taosRLockLatch(&pMeta->lock);
SStreamTask** ppTask = (SStreamTask**)taosHashGet(pMeta->pTasks, &taskId, sizeof(int32_t)); int64_t keys[2] = {streamId, taskId};
SStreamTask** ppTask = (SStreamTask**)taosHashGet(pMeta->pTasks, keys, sizeof(keys));
if (ppTask != NULL) { if (ppTask != NULL) {
if (!streamTaskShouldStop(&(*ppTask)->status)) { if (!streamTaskShouldStop(&(*ppTask)->status)) {
int32_t ref = atomic_add_fetch_32(&(*ppTask)->refCnt, 1); int32_t ref = atomic_add_fetch_32(&(*ppTask)->refCnt, 1);
...@@ -304,22 +269,24 @@ void streamMetaReleaseTask(SStreamMeta* pMeta, SStreamTask* pTask) { ...@@ -304,22 +269,24 @@ void streamMetaReleaseTask(SStreamMeta* pMeta, SStreamTask* pTask) {
} }
} }
static void doRemoveIdFromList(SStreamMeta* pMeta, int32_t num, int32_t taskId) { static void doRemoveIdFromList(SStreamMeta* pMeta, int32_t num, SStreamId* id) {
for (int32_t i = 0; i < num; ++i) { for (int32_t i = 0; i < num; ++i) {
int32_t* pTaskId = taosArrayGet(pMeta->pTaskList, i); SStreamId* pTaskId = taosArrayGet(pMeta->pTaskList, i);
if (*pTaskId == taskId) { if (pTaskId->streamId == id->streamId && pTaskId->taskId == id->taskId) {
taosArrayRemove(pMeta->pTaskList, i); taosArrayRemove(pMeta->pTaskList, i);
break; break;
} }
} }
} }
int32_t streamMetaUnregisterTask(SStreamMeta* pMeta, int32_t taskId) { int32_t streamMetaUnregisterTask(SStreamMeta* pMeta, int64_t streamId, int32_t taskId) {
SStreamTask* pTask = NULL; SStreamTask* pTask = NULL;
// pre-delete operation // pre-delete operation
taosWLockLatch(&pMeta->lock); taosWLockLatch(&pMeta->lock);
SStreamTask** ppTask = (SStreamTask**)taosHashGet(pMeta->pTasks, &taskId, sizeof(int32_t));
int64_t keys[2] = {streamId, taskId};
SStreamTask** ppTask = (SStreamTask**)taosHashGet(pMeta->pTasks, keys, sizeof(keys));
if (ppTask) { if (ppTask) {
pTask = *ppTask; pTask = *ppTask;
atomic_store_8(&pTask->status.taskStatus, TASK_STATUS__DROPPING); atomic_store_8(&pTask->status.taskStatus, TASK_STATUS__DROPPING);
...@@ -335,7 +302,7 @@ int32_t streamMetaUnregisterTask(SStreamMeta* pMeta, int32_t taskId) { ...@@ -335,7 +302,7 @@ int32_t streamMetaUnregisterTask(SStreamMeta* pMeta, int32_t taskId) {
while (1) { while (1) {
taosRLockLatch(&pMeta->lock); taosRLockLatch(&pMeta->lock);
ppTask = (SStreamTask**)taosHashGet(pMeta->pTasks, &taskId, sizeof(int32_t)); ppTask = (SStreamTask**)taosHashGet(pMeta->pTasks, keys, sizeof(keys));
if (ppTask) { if (ppTask) {
if ((*ppTask)->status.timerActive == 0) { if ((*ppTask)->status.timerActive == 0) {
...@@ -354,15 +321,13 @@ int32_t streamMetaUnregisterTask(SStreamMeta* pMeta, int32_t taskId) { ...@@ -354,15 +321,13 @@ int32_t streamMetaUnregisterTask(SStreamMeta* pMeta, int32_t taskId) {
// let's do delete of stream task // let's do delete of stream task
taosWLockLatch(&pMeta->lock); taosWLockLatch(&pMeta->lock);
ppTask = (SStreamTask**)taosHashGet(pMeta->pTasks, &taskId, sizeof(int32_t)); ppTask = (SStreamTask**)taosHashGet(pMeta->pTasks, keys, sizeof(keys));
if (ppTask) { if (ppTask) {
taosHashRemove(pMeta->pTasks, &taskId, sizeof(int32_t)); taosHashRemove(pMeta->pTasks, keys, sizeof(keys));
atomic_store_8(&pTask->status.taskStatus, TASK_STATUS__DROPPING); atomic_store_8(&pTask->status.taskStatus, TASK_STATUS__DROPPING);
ASSERT(pTask->status.timerActive == 0); ASSERT(pTask->status.timerActive == 0);
doRemoveIdFromList(pMeta, (int32_t)taosArrayGetSize(pMeta->pTaskList), &pTask->id);
int32_t num = taosArrayGetSize(pMeta->pTaskList);
doRemoveIdFromList(pMeta, num, pTask->id.taskId);
// remove the ref by timer // remove the ref by timer
if (pTask->triggerParam != 0) { if (pTask->triggerParam != 0) {
...@@ -473,7 +438,8 @@ int32_t streamLoadTasks(SStreamMeta* pMeta, int64_t ver) { ...@@ -473,7 +438,8 @@ int32_t streamLoadTasks(SStreamMeta* pMeta, int64_t ver) {
} }
// do duplicate task check. // do duplicate task check.
void* p = taosHashGet(pMeta->pTasks, &pTask->id.taskId, sizeof(pTask->id.taskId)); int64_t keys[2] = {pTask->id.streamId, pTask->id.taskId};
void* p = taosHashGet(pMeta->pTasks, keys, sizeof(keys));
if (p == NULL) { if (p == NULL) {
if (pMeta->expandFunc(pMeta->ahandle, pTask, pTask->chkInfo.version) < 0) { if (pMeta->expandFunc(pMeta->ahandle, pTask, pTask->chkInfo.version) < 0) {
tdbFree(pKey); tdbFree(pKey);
...@@ -484,7 +450,7 @@ int32_t streamLoadTasks(SStreamMeta* pMeta, int64_t ver) { ...@@ -484,7 +450,7 @@ int32_t streamLoadTasks(SStreamMeta* pMeta, int64_t ver) {
return -1; return -1;
} }
taosArrayPush(pMeta->pTaskList, &pTask->id.taskId); taosArrayPush(pMeta->pTaskList, &pTask->id);
} else { } else {
tdbFree(pKey); tdbFree(pKey);
tdbFree(pVal); tdbFree(pVal);
...@@ -493,7 +459,7 @@ int32_t streamLoadTasks(SStreamMeta* pMeta, int64_t ver) { ...@@ -493,7 +459,7 @@ int32_t streamLoadTasks(SStreamMeta* pMeta, int64_t ver) {
continue; continue;
} }
if (taosHashPut(pMeta->pTasks, &pTask->id.taskId, sizeof(pTask->id.taskId), &pTask, sizeof(void*)) < 0) { if (taosHashPut(pMeta->pTasks, keys, sizeof(keys), &pTask, sizeof(void*)) < 0) {
tdbFree(pKey); tdbFree(pKey);
tdbFree(pVal); tdbFree(pVal);
tdbTbcClose(pCur); tdbTbcClose(pCur);
......
...@@ -20,6 +20,7 @@ ...@@ -20,6 +20,7 @@
typedef struct SStreamTaskRetryInfo { typedef struct SStreamTaskRetryInfo {
SStreamMeta* pMeta; SStreamMeta* pMeta;
int32_t taskId; int32_t taskId;
int64_t streamId;
} SStreamTaskRetryInfo; } SStreamTaskRetryInfo;
static int32_t streamSetParamForScanHistory(SStreamTask* pTask); static int32_t streamSetParamForScanHistory(SStreamTask* pTask);
...@@ -540,7 +541,9 @@ static void tryLaunchHistoryTask(void* param, void* tmrId) { ...@@ -540,7 +541,9 @@ static void tryLaunchHistoryTask(void* param, void* tmrId) {
qDebug("s-task:0x%x in timer to launch related history task", pInfo->taskId); qDebug("s-task:0x%x in timer to launch related history task", pInfo->taskId);
taosWLockLatch(&pMeta->lock); taosWLockLatch(&pMeta->lock);
SStreamTask** ppTask = (SStreamTask**)taosHashGet(pMeta->pTasks, &pInfo->taskId, sizeof(int32_t)); int64_t keys[2] = {pInfo->streamId, pInfo->taskId};
SStreamTask** ppTask = (SStreamTask**)taosHashGet(pMeta->pTasks, keys, sizeof(keys));
if (ppTask) { if (ppTask) {
ASSERT((*ppTask)->status.timerActive == 1); ASSERT((*ppTask)->status.timerActive == 1);
...@@ -556,12 +559,12 @@ static void tryLaunchHistoryTask(void* param, void* tmrId) { ...@@ -556,12 +559,12 @@ static void tryLaunchHistoryTask(void* param, void* tmrId) {
} }
taosWUnLockLatch(&pMeta->lock); taosWUnLockLatch(&pMeta->lock);
SStreamTask* pTask = streamMetaAcquireTask(pMeta, pInfo->taskId); SStreamTask* pTask = streamMetaAcquireTask(pMeta, pInfo->streamId, pInfo->taskId);
if (pTask != NULL) { if (pTask != NULL) {
ASSERT(pTask->status.timerActive == 1); ASSERT(pTask->status.timerActive == 1);
// abort the timer if intend to stop task // abort the timer if intend to stop task
SStreamTask* pHTask = streamMetaAcquireTask(pMeta, pTask->historyTaskId.taskId); SStreamTask* pHTask = streamMetaAcquireTask(pMeta, pTask->historyTaskId.streamId, pTask->historyTaskId.taskId);
if (pHTask == NULL && (!streamTaskShouldStop(&pTask->status))) { if (pHTask == NULL && (!streamTaskShouldStop(&pTask->status))) {
const char* pStatus = streamGetTaskStatusStr(pTask->status.taskStatus); const char* pStatus = streamGetTaskStatusStr(pTask->status.taskStatus);
qWarn( qWarn(
...@@ -595,14 +598,16 @@ int32_t streamLaunchFillHistoryTask(SStreamTask* pTask) { ...@@ -595,14 +598,16 @@ int32_t streamLaunchFillHistoryTask(SStreamTask* pTask) {
SStreamMeta* pMeta = pTask->pMeta; SStreamMeta* pMeta = pTask->pMeta;
int32_t hTaskId = pTask->historyTaskId.taskId; int32_t hTaskId = pTask->historyTaskId.taskId;
int64_t keys[2] = {pTask->historyTaskId.streamId, pTask->historyTaskId.taskId};
// Set the execute conditions, including the query time window and the version range // Set the execute conditions, including the query time window and the version range
SStreamTask** pHTask = taosHashGet(pMeta->pTasks, &hTaskId, sizeof(hTaskId)); SStreamTask** pHTask = taosHashGet(pMeta->pTasks, keys, sizeof(keys));
if (pHTask == NULL) { if (pHTask == NULL) {
qWarn("s-task:%s vgId:%d failed to launch history task:0x%x, since it is not built yet", pTask->id.idStr, qWarn("s-task:%s vgId:%d failed to launch history task:0x%x, since it is not built yet", pTask->id.idStr,
pMeta->vgId, hTaskId); pMeta->vgId, hTaskId);
SStreamTaskRetryInfo* pInfo = taosMemoryCalloc(1, sizeof(SStreamTaskRetryInfo)); SStreamTaskRetryInfo* pInfo = taosMemoryCalloc(1, sizeof(SStreamTaskRetryInfo));
pInfo->taskId = pTask->id.taskId; pInfo->taskId = pTask->id.taskId;
pInfo->streamId = pTask->id.streamId;
pInfo->pMeta = pTask->pMeta; pInfo->pMeta = pTask->pMeta;
if (pTask->launchTaskTimer == NULL) { if (pTask->launchTaskTimer == NULL) {
...@@ -797,7 +802,8 @@ void launchFillHistoryTask(SStreamTask* pTask) { ...@@ -797,7 +802,8 @@ void launchFillHistoryTask(SStreamTask* pTask) {
} }
ASSERT(pTask->status.downstreamReady == 1); ASSERT(pTask->status.downstreamReady == 1);
qDebug("s-task:%s start to launch related fill-history task:0x%x", pTask->id.idStr, tId); qDebug("s-task:%s start to launch related fill-history task:0x%" PRIx64 "-0x%x", pTask->id.idStr,
pTask->historyTaskId.streamId, tId);
// launch associated fill history task // launch associated fill history task
streamLaunchFillHistoryTask(pTask); streamLaunchFillHistoryTask(pTask);
......
...@@ -216,7 +216,7 @@ static void freeItem(void* p) { ...@@ -216,7 +216,7 @@ static void freeItem(void* p) {
} }
void tFreeStreamTask(SStreamTask* pTask) { void tFreeStreamTask(SStreamTask* pTask) {
qDebug("free s-task:%s, %p", pTask->id.idStr, pTask); qDebug("free s-task:0x%x, %p", pTask->id.taskId, pTask);
int32_t status = atomic_load_8((int8_t*)&(pTask->status.taskStatus)); int32_t status = atomic_load_8((int8_t*)&(pTask->status.taskStatus));
if (pTask->inputQueue) { if (pTask->inputQueue) {
......
...@@ -53,7 +53,7 @@ typedef struct { ...@@ -53,7 +53,7 @@ typedef struct {
int32_t retryMinInterval; // retry init interval int32_t retryMinInterval; // retry init interval
int32_t retryStepFactor; // retry interval factor int32_t retryStepFactor; // retry interval factor
int32_t retryMaxInterval; // retry max interval int32_t retryMaxInterval; // retry max interval
int32_t retryMaxTimouet; int32_t retryMaxTimeout;
int32_t failFastThreshold; int32_t failFastThreshold;
int32_t failFastInterval; int32_t failFastInterval;
......
...@@ -55,7 +55,7 @@ void* rpcOpen(const SRpcInit* pInit) { ...@@ -55,7 +55,7 @@ void* rpcOpen(const SRpcInit* pInit) {
pRpc->retryMinInterval = pInit->retryMinInterval; // retry init interval pRpc->retryMinInterval = pInit->retryMinInterval; // retry init interval
pRpc->retryStepFactor = pInit->retryStepFactor; pRpc->retryStepFactor = pInit->retryStepFactor;
pRpc->retryMaxInterval = pInit->retryMaxInterval; pRpc->retryMaxInterval = pInit->retryMaxInterval;
pRpc->retryMaxTimouet = pInit->retryMaxTimouet; pRpc->retryMaxTimeout = pInit->retryMaxTimeout;
pRpc->failFastThreshold = pInit->failFastThreshold; pRpc->failFastThreshold = pInit->failFastThreshold;
pRpc->failFastInterval = pInit->failFastInterval; pRpc->failFastInterval = pInit->failFastInterval;
......
...@@ -2287,7 +2287,7 @@ bool cliGenRetryRule(SCliConn* pConn, STransMsg* pResp, SCliMsg* pMsg) { ...@@ -2287,7 +2287,7 @@ bool cliGenRetryRule(SCliConn* pConn, STransMsg* pResp, SCliMsg* pMsg) {
pCtx->retryMinInterval = pTransInst->retryMinInterval; pCtx->retryMinInterval = pTransInst->retryMinInterval;
pCtx->retryMaxInterval = pTransInst->retryMaxInterval; pCtx->retryMaxInterval = pTransInst->retryMaxInterval;
pCtx->retryStepFactor = pTransInst->retryStepFactor; pCtx->retryStepFactor = pTransInst->retryStepFactor;
pCtx->retryMaxTimeout = pTransInst->retryMaxTimouet; pCtx->retryMaxTimeout = pTransInst->retryMaxTimeout;
pCtx->retryInitTimestamp = taosGetTimestampMs(); pCtx->retryInitTimestamp = taosGetTimestampMs();
pCtx->retryNextInterval = pCtx->retryMinInterval; pCtx->retryNextInterval = pCtx->retryMinInterval;
pCtx->retryStep = 0; pCtx->retryStep = 0;
......
...@@ -726,7 +726,7 @@ void uvOnConnectionCb(uv_stream_t* q, ssize_t nread, const uv_buf_t* buf) { ...@@ -726,7 +726,7 @@ void uvOnConnectionCb(uv_stream_t* q, ssize_t nread, const uv_buf_t* buf) {
tError("read error %s", uv_err_name(nread)); tError("read error %s", uv_err_name(nread));
} }
// TODO(log other failure reason) // TODO(log other failure reason)
tWarn("failed to create connect:%p", q); tWarn("failed to create connect:%p, reason: %s", q, uv_err_name(nread));
taosMemoryFree(buf->base); taosMemoryFree(buf->base);
uv_close((uv_handle_t*)q, NULL); uv_close((uv_handle_t*)q, NULL);
return; return;
...@@ -741,10 +741,17 @@ void uvOnConnectionCb(uv_stream_t* q, ssize_t nread, const uv_buf_t* buf) { ...@@ -741,10 +741,17 @@ void uvOnConnectionCb(uv_stream_t* q, ssize_t nread, const uv_buf_t* buf) {
uv_pipe_t* pipe = (uv_pipe_t*)q; uv_pipe_t* pipe = (uv_pipe_t*)q;
if (!uv_pipe_pending_count(pipe)) { if (!uv_pipe_pending_count(pipe)) {
tError("No pending count"); tError("No pending count");
uv_close((uv_handle_t*)q, NULL);
return;
}
if (pThrd->quit) {
tWarn("thread already received quit msg, ignore incoming conn");
uv_close((uv_handle_t*)q, NULL);
return; return;
} }
uv_handle_type pending = uv_pipe_pending_type(pipe); // uv_handle_type pending = uv_pipe_pending_type(pipe);
SSvrConn* pConn = createConn(pThrd); SSvrConn* pConn = createConn(pThrd);
......
...@@ -452,7 +452,7 @@ ...@@ -452,7 +452,7 @@
,,y,system-test,./pytest.sh python3 ./test.py -f 6-cluster/5dnode3mnodeRestartDnodeInsertDataAsync.py -N 6 -M 3 ,,y,system-test,./pytest.sh python3 ./test.py -f 6-cluster/5dnode3mnodeRestartDnodeInsertDataAsync.py -N 6 -M 3
#,,y,system-test,./pytest.sh python3 ./test.py -f 6-cluster/5dnode3mnodeRestartDnodeInsertDataAsync.py -N 6 -M 3 -n 3 #,,y,system-test,./pytest.sh python3 ./test.py -f 6-cluster/5dnode3mnodeRestartDnodeInsertDataAsync.py -N 6 -M 3 -n 3
,,n,system-test,python3 ./test.py -f 6-cluster/manually-test/6dnode3mnodeInsertLessDataAlterRep3to1to3.py -N 6 -M 3 ,,n,system-test,python3 ./test.py -f 6-cluster/manually-test/6dnode3mnodeInsertLessDataAlterRep3to1to3.py -N 6 -M 3
#,,n,system-test,python ./test.py -f 6-cluster/5dnode3mnodeRoll.py -N 3 -C 1 ,,n,system-test,python ./test.py -f 6-cluster/5dnode3mnodeRoll.py -N 3 -C 1
,,y,system-test,./pytest.sh python3 ./test.py -f 6-cluster/5dnode3mnodeAdd1Ddnoe.py -N 7 -M 3 -C 6 ,,y,system-test,./pytest.sh python3 ./test.py -f 6-cluster/5dnode3mnodeAdd1Ddnoe.py -N 7 -M 3 -C 6
,,y,system-test,./pytest.sh python3 ./test.py -f 6-cluster/5dnode3mnodeAdd1Ddnoe.py -N 7 -M 3 -C 6 -n 3 ,,y,system-test,./pytest.sh python3 ./test.py -f 6-cluster/5dnode3mnodeAdd1Ddnoe.py -N 7 -M 3 -C 6 -n 3
#,,y,system-test,./pytest.sh python3 ./test.py -f 6-cluster/5dnode3mnodeDrop.py -N 5 #,,y,system-test,./pytest.sh python3 ./test.py -f 6-cluster/5dnode3mnodeDrop.py -N 5
...@@ -794,9 +794,10 @@ ...@@ -794,9 +794,10 @@
,,y,script,./test.sh -f tsim/user/basic.sim ,,y,script,./test.sh -f tsim/user/basic.sim
,,y,script,./test.sh -f tsim/user/password.sim ,,y,script,./test.sh -f tsim/user/password.sim
,,y,script,./test.sh -f tsim/user/privilege_db.sim ,,y,script,./test.sh -f tsim/user/privilege_db.sim
#,,y,script,./test.sh -f tsim/user/privilege_sysinfo.sim ,,y,script,./test.sh -f tsim/user/privilege_sysinfo.sim
,,y,script,./test.sh -f tsim/user/privilege_topic.sim ,,y,script,./test.sh -f tsim/user/privilege_topic.sim
,,y,script,./test.sh -f tsim/user/privilege_table.sim ,,y,script,./test.sh -f tsim/user/privilege_table.sim
,,y,script,./test.sh -f tsim/user/privilege_create_db.sim
,,y,script,./test.sh -f tsim/db/alter_option.sim ,,y,script,./test.sh -f tsim/db/alter_option.sim
,,y,script,./test.sh -f tsim/db/alter_replica_31.sim ,,y,script,./test.sh -f tsim/db/alter_replica_31.sim
,,y,script,./test.sh -f tsim/db/basic1.sim ,,y,script,./test.sh -f tsim/db/basic1.sim
...@@ -969,6 +970,7 @@ ...@@ -969,6 +970,7 @@
,,y,script,./test.sh -f tsim/query/tag_scan.sim ,,y,script,./test.sh -f tsim/query/tag_scan.sim
,,y,script,./test.sh -f tsim/query/nullColSma.sim ,,y,script,./test.sh -f tsim/query/nullColSma.sim
,,y,script,./test.sh -f tsim/query/bug3398.sim ,,y,script,./test.sh -f tsim/query/bug3398.sim
,,y,script,./test.sh -f tsim/query/explain_tsorder.sim
,,y,script,./test.sh -f tsim/qnode/basic1.sim ,,y,script,./test.sh -f tsim/qnode/basic1.sim
,,y,script,./test.sh -f tsim/snode/basic1.sim ,,y,script,./test.sh -f tsim/snode/basic1.sim
,,y,script,./test.sh -f tsim/mnode/basic1.sim ,,y,script,./test.sh -f tsim/mnode/basic1.sim
......
...@@ -30,7 +30,15 @@ class TDTestCase: ...@@ -30,7 +30,15 @@ class TDTestCase:
self.replicaVar = int(replicaVar) self.replicaVar = int(replicaVar)
tdLog.debug(f"start to excute {__file__}") tdLog.debug(f"start to excute {__file__}")
tdSql.init(conn.cursor()) tdSql.init(conn.cursor())
self.deletedDataSql= '''drop database if exists deldata;create database deldata duration 300;use deldata;
create table deldata.stb1 (ts timestamp, c1 int, c2 bigint, c3 smallint, c4 tinyint, c5 float, c6 double, c7 bool, c8 binary(16),c9 nchar(32), c10 timestamp) tags (t1 int);
create table deldata.ct1 using deldata.stb1 tags ( 1 );
insert into deldata.ct1 values ( now()-0s, 0, 0, 0, 0, 0.0, 0.0, 0, 'binary0', 'nchar0', now()+0a ) ( now()-10s, 1, 11111, 111, 11, 1.11, 11.11, 1, 'binary1', 'nchar1', now()+1a ) ( now()-20s, 2, 22222, 222, 22, 2.22, 22.22, 0, 'binary2', 'nchar2', now()+2a ) ( now()-30s, 3, 33333, 333, 33, 3.33, 33.33, 1, 'binary3', 'nchar3', now()+3a );
select avg(c1) from deldata.ct1;
delete from deldata.stb1;
flush database deldata;
insert into deldata.ct1 values ( now()-0s, 0, 0, 0, 0, 0.0, 0.0, 0, 'binary0', 'nchar0', now()+0a ) ( now()-10s, 1, 11111, 111, 11, 1.11, 11.11, 1, 'binary1', 'nchar1', now()+1a ) ( now()-20s, 2, 22222, 222, 22, 2.22, 22.22, 0, 'binary2', 'nchar2', now()+2a ) ( now()-30s, 3, 33333, 333, 33, 3.33, 33.33, 1, 'binary3', 'nchar3', now()+3a );
delete from deldata.ct1;'''
def checkProcessPid(self,processName): def checkProcessPid(self,processName):
i=0 i=0
while i<60: while i<60:
...@@ -138,6 +146,8 @@ class TDTestCase: ...@@ -138,6 +146,8 @@ class TDTestCase:
tdLog.printNoPrefix(f"==========step1:prepare and check data in old version-{BASEVERSION}") tdLog.printNoPrefix(f"==========step1:prepare and check data in old version-{BASEVERSION}")
tdLog.info(f" LD_LIBRARY_PATH=/usr/lib taosBenchmark -t {tableNumbers} -n {recordNumbers1} -y ") tdLog.info(f" LD_LIBRARY_PATH=/usr/lib taosBenchmark -t {tableNumbers} -n {recordNumbers1} -y ")
os.system(f"LD_LIBRARY_PATH=/usr/lib taosBenchmark -t {tableNumbers} -n {recordNumbers1} -y ") os.system(f"LD_LIBRARY_PATH=/usr/lib taosBenchmark -t {tableNumbers} -n {recordNumbers1} -y ")
os.system("LD_LIBRARY_PATH=/usr/lib taos -s 'flush database test '")
# os.system(f"LD_LIBRARY_PATH=/usr/lib taos -s 'use test;create stream current_stream into current_stream_output_stb as select _wstart as `start`, _wend as wend, max(current) as max_current from meters where voltage <= 220 interval (5s);' ") # os.system(f"LD_LIBRARY_PATH=/usr/lib taos -s 'use test;create stream current_stream into current_stream_output_stb as select _wstart as `start`, _wend as wend, max(current) as max_current from meters where voltage <= 220 interval (5s);' ")
# os.system('LD_LIBRARY_PATH=/usr/lib taos -s "use test;create stream power_stream into power_stream_output_stb as select ts, concat_ws(\\".\\", location, tbname) as meter_location, current*voltage*cos(phase) as active_power, current*voltage*sin(phase) as reactive_power from meters partition by tbname;" ') # os.system('LD_LIBRARY_PATH=/usr/lib taos -s "use test;create stream power_stream into power_stream_output_stb as select ts, concat_ws(\\".\\", location, tbname) as meter_location, current*voltage*cos(phase) as active_power, current*voltage*sin(phase) as reactive_power from meters partition by tbname;" ')
# os.system('LD_LIBRARY_PATH=/usr/lib taos -s "use test;show streams;" ') # os.system('LD_LIBRARY_PATH=/usr/lib taos -s "use test;show streams;" ')
...@@ -151,6 +161,10 @@ class TDTestCase: ...@@ -151,6 +161,10 @@ class TDTestCase:
os.system("LD_LIBRARY_PATH=/usr/lib taos -s 'flush database db4096 '") os.system("LD_LIBRARY_PATH=/usr/lib taos -s 'flush database db4096 '")
os.system("LD_LIBRARY_PATH=/usr/lib taos -f 0-others/TS-3131.tsql") os.system("LD_LIBRARY_PATH=/usr/lib taos -f 0-others/TS-3131.tsql")
# add deleted data
os.system(f'LD_LIBRARY_PATH=/usr/lib taos -s "{self.deletedDataSql}" ')
cmd = f" LD_LIBRARY_PATH={bPath}/build/lib {bPath}/build/bin/taos -h localhost ;" cmd = f" LD_LIBRARY_PATH={bPath}/build/lib {bPath}/build/bin/taos -h localhost ;"
tdLog.info(f"new client version connect to old version taosd, commad return value:{cmd}") tdLog.info(f"new client version connect to old version taosd, commad return value:{cmd}")
if os.system(cmd) == 0: if os.system(cmd) == 0:
...@@ -185,11 +199,19 @@ class TDTestCase: ...@@ -185,11 +199,19 @@ class TDTestCase:
# tdsql.query("show streams;") # tdsql.query("show streams;")
# tdsql.query(f"select count(*) from {stb}") # tdsql.query(f"select count(*) from {stb}")
# tdsql.checkData(0,0,tableNumbers*recordNumbers2) # tdsql.checkData(0,0,tableNumbers*recordNumbers2)
tdsql.query(f"select count(*) from db4096.stb0")
# checkout db4096
tdsql.query("select count(*) from db4096.stb0")
tdsql.checkData(0,0,50000) tdsql.checkData(0,0,50000)
# checkout deleted data
tdsql.execute("insert into deldata.ct1 values ( now()-0s, 0, 0, 0, 0, 0.0, 0.0, 0, 'binary0', 'nchar0', now()+0a ) ( now()-10s, 1, 11111, 111, 11, 1.11, 11.11, 1, 'binary1', 'nchar1', now()+1a ) ( now()-20s, 2, 22222, 222, 22, 2.22, 22.22, 0, 'binary2', 'nchar2', now()+2a ) ( now()-30s, 3, 33333, 333, 33, 3.33, 33.33, 1, 'binary3', 'nchar3', now()+3a );")
tdsql.execute("flush database deldata;")
tdsql.query("select avg(c1) from deldata.ct1;")
tdsql=tdCom.newTdSql() tdsql=tdCom.newTdSql()
tdLog.printNoPrefix(f"==========step4:verify backticks in taos Sql-TD18542") tdLog.printNoPrefix("==========step4:verify backticks in taos Sql-TD18542")
tdsql.execute("drop database if exists db") tdsql.execute("drop database if exists db")
tdsql.execute("create database db") tdsql.execute("create database db")
tdsql.execute("use db") tdsql.execute("use db")
...@@ -203,6 +225,8 @@ class TDTestCase: ...@@ -203,6 +225,8 @@ class TDTestCase:
tdsql.execute("insert into db.`ct4` using db.stb1 TAGS(4) values(now(),14);") tdsql.execute("insert into db.`ct4` using db.stb1 TAGS(4) values(now(),14);")
tdsql.query("select * from db.ct4") tdsql.query("select * from db.ct4")
tdsql.checkData(0,1,14) tdsql.checkData(0,1,14)
#check retentions
tdsql=tdCom.newTdSql() tdsql=tdCom.newTdSql()
tdsql.query("describe information_schema.ins_databases;") tdsql.query("describe information_schema.ins_databases;")
qRows=tdsql.queryRows qRows=tdsql.queryRows
...@@ -222,8 +246,12 @@ class TDTestCase: ...@@ -222,8 +246,12 @@ class TDTestCase:
caller = inspect.getframeinfo(inspect.stack()[0][0]) caller = inspect.getframeinfo(inspect.stack()[0][0])
args = (caller.filename, caller.lineno) args = (caller.filename, caller.lineno)
tdLog.exit("%s(%d) failed" % args) tdLog.exit("%s(%d) failed" % args)
# check stream
tdsql.query("show streams;") tdsql.query("show streams;")
tdsql.checkRows(0) tdsql.checkRows(0)
#check TS-3131
tdsql.query("select *,tbname from d0.almlog where mcid='m0103';") tdsql.query("select *,tbname from d0.almlog where mcid='m0103';")
tdsql.checkRows(6) tdsql.checkRows(6)
expectList = [0,3003,20031,20032,20033,30031] expectList = [0,3003,20031,20032,20033,30031]
...@@ -238,6 +266,8 @@ class TDTestCase: ...@@ -238,6 +266,8 @@ class TDTestCase:
tdsql.execute("insert into test.d80 values (now+1s, 11, 103, 0.21);") tdsql.execute("insert into test.d80 values (now+1s, 11, 103, 0.21);")
tdsql.execute("insert into test.d9 values (now+5s, 4.3, 104, 0.4);") tdsql.execute("insert into test.d9 values (now+5s, 4.3, 104, 0.4);")
# check tmq
conn = taos.connect() conn = taos.connect()
consumer = Consumer( consumer = Consumer(
...@@ -265,6 +295,8 @@ class TDTestCase: ...@@ -265,6 +295,8 @@ class TDTestCase:
print(block.fetchall()) print(block.fetchall())
tdsql.query("show topics;") tdsql.query("show topics;")
tdsql.checkRows(1) tdsql.checkRows(1)
def stop(self): def stop(self):
tdSql.close() tdSql.close()
tdLog.success(f"{__file__} successfully executed") tdLog.success(f"{__file__} successfully executed")
......
drop database if exists deldata;
create database deldata duration 300;
use deldata;
create table deldata.stb1 (ts timestamp, c1 int, c2 bigint, c3 smallint, c4 tinyint, c5 float, c6 double, c7 bool, c8 binary(16),c9 nchar(32), c10 timestamp) tags (t1 int);
create table deldata.ct1 using deldata.stb1 tags ( 1 );
insert into deldata.ct1 values ( now()-0s, 0, 0, 0, 0, 0.0, 0.0, 0, 'binary0', 'nchar0', now()+0a ) ( now()-10s, 1, 11111, 111, 11, 1.11, 11.11, 1, 'binary1', 'nchar1', now()+1a ) ( now()-20s, 2, 22222, 222, 22, 2.22, 22.22, 0, 'binary2', 'nchar2', now()+2a ) ( now()-30s, 3, 33333, 333, 33, 3.33, 33.33, 1, 'binary3', 'nchar3', now()+3a );
select avg(c1) from deldata.ct1;
delete from deldata.stb1;
flush database deldata;
insert into deldata.ct1 values ( now()-0s, 0, 0, 0, 0, 0.0, 0.0, 0, 'binary0', 'nchar0', now()+0a ) ( now()-10s, 1, 11111, 111, 11, 1.11, 11.11, 1, 'binary1', 'nchar1', now()+1a ) ( now()-20s, 2, 22222, 222, 22, 2.22, 22.22, 0, 'binary2', 'nchar2', now()+2a ) ( now()-30s, 3, 33333, 333, 33, 3.33, 33.33, 1, 'binary3', 'nchar3', now()+3a );
delete from deldata.ct1;
...@@ -27,7 +27,7 @@ import threading ...@@ -27,7 +27,7 @@ import threading
import time import time
import json import json
BASEVERSION = "3.0.7.0" BASEVERSION = "3.1.0.0"
class TDTestCase: class TDTestCase:
...@@ -37,6 +37,15 @@ class TDTestCase: ...@@ -37,6 +37,15 @@ class TDTestCase:
tdSql.init(conn.cursor()) tdSql.init(conn.cursor())
self.host = socket.gethostname() self.host = socket.gethostname()
self.replicaVar = int(replicaVar) self.replicaVar = int(replicaVar)
self.deletedDataSql= '''drop database if exists deldata;create database deldata duration 300;use deldata;
create table deldata.stb1 (ts timestamp, c1 int, c2 bigint, c3 smallint, c4 tinyint, c5 float, c6 double, c7 bool, c8 binary(16),c9 nchar(32), c10 timestamp) tags (t1 int);
create table deldata.ct1 using deldata.stb1 tags ( 1 );
insert into deldata.ct1 values ( now()-0s, 0, 0, 0, 0, 0.0, 0.0, 0, 'binary0', 'nchar0', now()+0a ) ( now()-10s, 1, 11111, 111, 11, 1.11, 11.11, 1, 'binary1', 'nchar1', now()+1a ) ( now()-20s, 2, 22222, 222, 22, 2.22, 22.22, 0, 'binary2', 'nchar2', now()+2a ) ( now()-30s, 3, 33333, 333, 33, 3.33, 33.33, 1, 'binary3', 'nchar3', now()+3a );
select avg(c1) from deldata.ct1;
delete from deldata.stb1;
flush database deldata;
insert into deldata.ct1 values ( now()-0s, 0, 0, 0, 0, 0.0, 0.0, 0, 'binary0', 'nchar0', now()+0a ) ( now()-10s, 1, 11111, 111, 11, 1.11, 11.11, 1, 'binary1', 'nchar1', now()+1a ) ( now()-20s, 2, 22222, 222, 22, 2.22, 22.22, 0, 'binary2', 'nchar2', now()+2a ) ( now()-30s, 3, 33333, 333, 33, 3.33, 33.33, 1, 'binary3', 'nchar3', now()+3a );
delete from deldata.ct1;'''
def checkProcessPid(self,processName): def checkProcessPid(self,processName):
i=0 i=0
...@@ -245,6 +254,9 @@ class TDTestCase: ...@@ -245,6 +254,9 @@ class TDTestCase:
os.system("LD_LIBRARY_PATH=/usr/lib taos -f 0-others/TS-3131.tsql") os.system("LD_LIBRARY_PATH=/usr/lib taos -f 0-others/TS-3131.tsql")
# self.buildTaosd(bPath) # self.buildTaosd(bPath)
# add deleted data
os.system(f'LD_LIBRARY_PATH=/usr/lib taos -s "{self.deletedDataSql}" ')
threads=[] threads=[]
threads.append(threading.Thread(target=self.insertAllData, args=(cPath_temp,dbname,tableNumbers1,recordNumbers1))) threads.append(threading.Thread(target=self.insertAllData, args=(cPath_temp,dbname,tableNumbers1,recordNumbers1)))
for tr in threads: for tr in threads:
...@@ -285,6 +297,11 @@ class TDTestCase: ...@@ -285,6 +297,11 @@ class TDTestCase:
tdsql1.query(f"select count(*) from db4096.stb0") tdsql1.query(f"select count(*) from db4096.stb0")
tdsql1.checkData(0,0,50000) tdsql1.checkData(0,0,50000)
# checkout deleted data
tdsql.execute("insert into deldata.ct1 values ( now()-0s, 0, 0, 0, 0, 0.0, 0.0, 0, 'binary0', 'nchar0', now()+0a ) ( now()-10s, 1, 11111, 111, 11, 1.11, 11.11, 1, 'binary1', 'nchar1', now()+1a ) ( now()-20s, 2, 22222, 222, 22, 2.22, 22.22, 0, 'binary2', 'nchar2', now()+2a ) ( now()-30s, 3, 33333, 333, 33, 3.33, 33.33, 1, 'binary3', 'nchar3', now()+3a );")
tdsql.query("flush database deldata;select avg(c1) from deldata.ct1;")
# tdsql1.query("show streams;") # tdsql1.query("show streams;")
# tdsql1.checkRows(2) # tdsql1.checkRows(2)
tdsql1.query("select *,tbname from d0.almlog where mcid='m0103';") tdsql1.query("select *,tbname from d0.almlog where mcid='m0103';")
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册