diff --git a/docs/en/12-taos-sql/20-keywords.md b/docs/en/12-taos-sql/20-keywords.md index 3c441ed8d40f1028ca2accfa4f4413e259e01152..d563181b876afe0a2884f514de1cc09102f708c7 100644 --- a/docs/en/12-taos-sql/20-keywords.md +++ b/docs/en/12-taos-sql/20-keywords.md @@ -178,6 +178,7 @@ The following list shows all reserved keywords: - MATCH - MAX_DELAY +- MAX_SPEED - MAXROWS - MERGE - META diff --git a/docs/en/12-taos-sql/27-index.md b/docs/en/12-taos-sql/27-indexing.md similarity index 100% rename from docs/en/12-taos-sql/27-index.md rename to docs/en/12-taos-sql/27-indexing.md diff --git a/docs/zh/12-taos-sql/05-insert.md b/docs/zh/12-taos-sql/05-insert.md index b72754b15437a081b89056fca3a76030cac62363..c03ad9bd8f3ef1485df6ee1e536f15825bcb459c 100644 --- a/docs/zh/12-taos-sql/05-insert.md +++ b/docs/zh/12-taos-sql/05-insert.md @@ -82,7 +82,7 @@ INSERT INTO d1001 (ts, current, phase) VALUES ('2021-07-13 14:06:33.196', 10.27, ```sql INSERT INTO d1001 VALUES ('2021-07-13 14:06:34.630', 10.2, 219, 0.32) ('2021-07-13 14:06:35.779', 10.15, 217, 0.33) - d1002 (ts, current, phase) VALUES ('2021-07-13 14:06:34.255', 10.27, 0.31); + d1002 (ts, current, phase) VALUES ('2021-07-13 14:06:34.255', 10.27, 0.31); ``` ## 插入记录时自动建表 diff --git a/docs/zh/12-taos-sql/20-keywords.md b/docs/zh/12-taos-sql/20-keywords.md index 35dafc52efcce1412d1ddc6871d8e4eafc1f0d4e..f52af2f282c7ecdab89dcd5b1236746a2d111348 100644 --- a/docs/zh/12-taos-sql/20-keywords.md +++ b/docs/zh/12-taos-sql/20-keywords.md @@ -178,6 +178,7 @@ description: TDengine 保留关键字的详细列表 - MATCH - MAX_DELAY +- MAX_SPEED - MAXROWS - MERGE - META diff --git a/docs/zh/14-reference/12-config/index.md b/docs/zh/14-reference/12-config/index.md index 2f5f0fc3e81f57e0f1438dab897428be9c83f17d..519b84ba71be996ff3f5811fad7314a261927b8b 100755 --- a/docs/zh/14-reference/12-config/index.md +++ b/docs/zh/14-reference/12-config/index.md @@ -95,30 +95,11 @@ taos -C ### maxShellConns | 属性 | 说明 | -| --------| ----------------------- | +| -------- | ----------------------- | | 适用范围 | 仅服务端适用 | -| 含义 | 一个 dnode 容许的连接数 | +| 含义 | 一个 dnode 容许的连接数 | | 取值范围 | 10-50000000 | -| 缺省值 | 5000 | - -### numOfRpcSessions - -| 属性 | 说明 | -| --------| ---------------------- | -| 适用范围 | 客户端和服务端都适用 | -| 含义 | 一个客户端能创建的最大连接数| -| 取值范围 | 100-100000 | -| 缺省值 | 10000 | - -### timeToGetAvailableConn - -| 属性 | 说明 | -| -------- | --------------------| -| 适用范围 | 客户端和服务端都适用 | -| 含义 |获得可用连接的最长等待时间| -| 取值范围 | 10-50000000(单位为毫秒)| -| 缺省值 | 500000 | - +| 缺省值 | 5000 | ### numOfRpcSessions @@ -127,7 +108,7 @@ taos -C | 适用范围 | 客户端和服务端都适用 | | 含义 | 一个客户端能创建的最大连接数 | | 取值范围 | 100-100000 | -| 缺省值 | 10000 | +| 缺省值 | 30000 | ### timeToGetAvailableConn @@ -392,12 +373,12 @@ charset 的有效值是 UTF-8。 ### metaCacheMaxSize -| 属性 | 说明 | -| -------- | ---------------------------------------------- | -| 适用范围 | 仅客户端适用 | -| 含义 | 指定单个客户端元数据缓存大小的最大值 | -| 单位 | MB | -| 缺省值 | -1 (无限制) | +| 属性 | 说明 | +| -------- | ------------------------------------ | +| 适用范围 | 仅客户端适用 | +| 含义 | 指定单个客户端元数据缓存大小的最大值 | +| 单位 | MB | +| 缺省值 | -1 (无限制) | ## 集群相关 @@ -479,13 +460,13 @@ charset 的有效值是 UTF-8。 ### slowLogScope -| 属性 | 说明 | -| -------- | --------------------------------------------------------------| -| 适用范围 | 仅客户端适用 | -| 含义 | 指定启动记录哪些类型的慢查询 | -| 可选值 | ALL, QUERY, INSERT, OTHERS, NONE | -| 缺省值 | ALL | -| 补充说明 | 默认记录所有类型的慢查询,可通过配置只记录某一类型的慢查询 | +| 属性 | 说明 | +| -------- | ---------------------------------------------------------- | +| 适用范围 | 仅客户端适用 | +| 含义 | 指定启动记录哪些类型的慢查询 | +| 可选值 | ALL, QUERY, INSERT, OTHERS, NONE | +| 缺省值 | ALL | +| 补充说明 | 默认记录所有类型的慢查询,可通过配置只记录某一类型的慢查询 | ### debugFlag @@ -685,16 +666,16 @@ charset 的有效值是 UTF-8。 | 适用范围 | 仅客户端适用 | | 含义 | schemaless 列数据是否顺序一致,从3.0.3.0开始,该配置废弃 | | 值域 | 0:不一致;1: 一致 | -| 缺省值 | 0 +| 缺省值 | 0 | ### smlTsDefaultName -| 属性 | 说明 | -| -------- | -------------------------------------------------------- | -| 适用范围 | 仅客户端适用 | +| 属性 | 说明 | +| -------- | -------------------------------------------- | +| 适用范围 | 仅客户端适用 | | 含义 | schemaless自动建表的时间列名字通过该配置设置 | | 类型 | 字符串 | -| 缺省值 | _ts | +| 缺省值 | _ts | ## 其他 @@ -728,31 +709,31 @@ charset 的有效值是 UTF-8。 ### ttlChangeOnWrite -| 属性 | 说明 | -| -------- | ------------------ | -| 适用范围 | 仅服务端适用 | -| 含义 | ttl 到期时间是否伴随表的修改操作改变 | -| 取值范围 | 0: 不改变;1:改变 | -| 缺省值 | 0 | +| 属性 | 说明 | +| -------- | ------------------------------------ | +| 适用范围 | 仅服务端适用 | +| 含义 | ttl 到期时间是否伴随表的修改操作改变 | +| 取值范围 | 0: 不改变;1:改变 | +| 缺省值 | 0 | ### keepTimeOffset -| 属性 | 说明 | -| -------- | ------------------ | -| 适用范围 | 仅服务端适用 | -| 含义 | 迁移操作的延时 | -| 单位 | 小时 | -| 取值范围 | 0-23 | -| 缺省值 | 0 | +| 属性 | 说明 | +| -------- | -------------- | +| 适用范围 | 仅服务端适用 | +| 含义 | 迁移操作的延时 | +| 单位 | 小时 | +| 取值范围 | 0-23 | +| 缺省值 | 0 | ### tmqMaxTopicNum -| 属性 | 说明 | -| -------- | ------------------ | -| 适用范围 | 仅服务端适用 | -| 含义 | 订阅最多可建立的 topic 数量 | -| 取值范围 | 1-10000| -| 缺省值 | 20 | +| 属性 | 说明 | +| -------- | --------------------------- | +| 适用范围 | 仅服务端适用 | +| 含义 | 订阅最多可建立的 topic 数量 | +| 取值范围 | 1-10000 | +| 缺省值 | 20 | ## 压缩参数 diff --git a/include/common/tmsg.h b/include/common/tmsg.h index e772a47e3dc79aec6f474f1f9ad93b4398c67f6e..01923d2b30af0b4f4de5eddac607503261f32fc1 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -2767,6 +2767,7 @@ typedef struct { typedef struct { SMsgHead head; int64_t leftForVer; + int64_t streamId; int32_t taskId; } SVDropStreamTaskReq; @@ -2958,6 +2959,7 @@ int32_t tDecodeMqVgOffset(SDecoder* pDecoder, SMqVgOffset* pOffset); typedef struct { SMsgHead head; + int64_t streamId; int32_t taskId; } SVPauseStreamTaskReq; @@ -2976,6 +2978,7 @@ int32_t tDeserializeSMPauseStreamReq(void* buf, int32_t bufLen, SMPauseStreamReq typedef struct { SMsgHead head; int32_t taskId; + int64_t streamId; int8_t igUntreated; } SVResumeStreamTaskReq; diff --git a/include/libs/executor/executor.h b/include/libs/executor/executor.h index f90c38f341edccf801d7f7d470228c524a8f794d..634d708260790fb26ec5c6fb28987fedbecbc75f 100644 --- a/include/libs/executor/executor.h +++ b/include/libs/executor/executor.h @@ -74,7 +74,7 @@ typedef enum { * @param vgId * @return */ -qTaskInfo_t qCreateStreamExecTaskInfo(void* msg, SReadHandle* readers, int32_t vgId); +qTaskInfo_t qCreateStreamExecTaskInfo(void* msg, SReadHandle* readers, int32_t vgId, int32_t taskId); /** * Create the exec task for queue mode @@ -95,8 +95,6 @@ int32_t qGetTableList(int64_t suid, void* pVnode, void* node, SArray **tableList */ void qSetTaskId(qTaskInfo_t tinfo, uint64_t taskId, uint64_t queryId); -//void qSetTaskCode(qTaskInfo_t tinfo, int32_t code); - int32_t qSetStreamOpOpen(qTaskInfo_t tinfo); // todo refactor diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index b241ae9b4125d4fd0fbaf40ef7477d819229572f..b9b24917f3dcf3a2a31780e1f9980b93da74efd4 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -644,9 +644,9 @@ void streamMetaClose(SStreamMeta* streamMeta); int32_t streamMetaSaveTask(SStreamMeta* pMeta, SStreamTask* pTask); int32_t streamMetaRemoveTask(SStreamMeta* pMeta, int32_t taskId); int32_t streamMetaRegisterTask(SStreamMeta* pMeta, int64_t ver, SStreamTask* pTask, bool* pAdded); -int32_t streamMetaUnregisterTask(SStreamMeta* pMeta, int32_t taskId); +int32_t streamMetaUnregisterTask(SStreamMeta* pMeta, int64_t streamId, int32_t taskId); int32_t streamMetaGetNumOfTasks(SStreamMeta* pMeta); // todo remove it -SStreamTask* streamMetaAcquireTask(SStreamMeta* pMeta, int32_t taskId); +SStreamTask* streamMetaAcquireTask(SStreamMeta* pMeta, int64_t streamId, int32_t taskId); void streamMetaReleaseTask(SStreamMeta* pMeta, SStreamTask* pTask); int32_t streamMetaBegin(SStreamMeta* pMeta); diff --git a/include/libs/transport/trpc.h b/include/libs/transport/trpc.h index 93e4d72ad741e1a0d4cf9d9d4b0cf2ac0b2ef194..e5955aad5449b955d3d3b3a7470692f3ad5b7feb 100644 --- a/include/libs/transport/trpc.h +++ b/include/libs/transport/trpc.h @@ -89,7 +89,7 @@ typedef struct SRpcInit { int32_t retryMinInterval; // retry init interval int32_t retryStepFactor; // retry interval factor int32_t retryMaxInterval; // retry max interval - int64_t retryMaxTimouet; + int64_t retryMaxTimeout; int32_t failFastThreshold; int32_t failFastInterval; diff --git a/packaging/tools/remove.sh b/packaging/tools/remove.sh index be2c26c3093fcbd5dedaab780e461400f4c8460d..eca0c5e973ed0d8e69843e8a7c6603bb133b95ad 100755 --- a/packaging/tools/remove.sh +++ b/packaging/tools/remove.sh @@ -123,8 +123,8 @@ function clean_bin() { ${csudo}rm -f ${bin_link_dir}/set_core || : ${csudo}rm -f ${bin_link_dir}/TDinsight.sh || : ${csudo}rm -f ${bin_link_dir}/${keeperName2} || : - ${csudo}rm -f ${bin_link_dir}/${xName2} || : - ${csudo}rm -f ${bin_link_dir}/${explorerName2} || : + # ${csudo}rm -f ${bin_link_dir}/${xName2} || : + # ${csudo}rm -f ${bin_link_dir}/${explorerName2} || : if [ "$verMode" == "cluster" ] && [ "$clientName" != "$clientName2" ]; then ${csudo}rm -f ${bin_link_dir}/${clientName2} || : @@ -194,26 +194,26 @@ function clean_service_on_systemd() { fi ${csudo}systemctl disable ${tarbitrator_service_name} &>/dev/null || echo &>/dev/null - x_service_config="${service_config_dir}/${xName2}.service" - if [ -e "$x_service_config" ]; then - if systemctl is-active --quiet ${xName2}; then - echo "${productName2} ${xName2} is running, stopping it..." - ${csudo}systemctl stop ${xName2} &>/dev/null || echo &>/dev/null - fi - ${csudo}systemctl disable ${xName2} &>/dev/null || echo &>/dev/null - ${csudo}rm -f ${x_service_config} - fi - - explorer_service_config="${service_config_dir}/${explorerName2}.service" - if [ -e "$explorer_service_config" ]; then - if systemctl is-active --quiet ${explorerName2}; then - echo "${productName2} ${explorerName2} is running, stopping it..." - ${csudo}systemctl stop ${explorerName2} &>/dev/null || echo &>/dev/null - fi - ${csudo}systemctl disable ${explorerName2} &>/dev/null || echo &>/dev/null - ${csudo}rm -f ${explorer_service_config} - ${csudo}rm -f /etc/${clientName2}/explorer.toml - fi + # x_service_config="${service_config_dir}/${xName2}.service" + # if [ -e "$x_service_config" ]; then + # if systemctl is-active --quiet ${xName2}; then + # echo "${productName2} ${xName2} is running, stopping it..." + # ${csudo}systemctl stop ${xName2} &>/dev/null || echo &>/dev/null + # fi + # ${csudo}systemctl disable ${xName2} &>/dev/null || echo &>/dev/null + # ${csudo}rm -f ${x_service_config} + # fi + + # explorer_service_config="${service_config_dir}/${explorerName2}.service" + # if [ -e "$explorer_service_config" ]; then + # if systemctl is-active --quiet ${explorerName2}; then + # echo "${productName2} ${explorerName2} is running, stopping it..." + # ${csudo}systemctl stop ${explorerName2} &>/dev/null || echo &>/dev/null + # fi + # ${csudo}systemctl disable ${explorerName2} &>/dev/null || echo &>/dev/null + # ${csudo}rm -f ${explorer_service_config} + # ${csudo}rm -f /etc/${clientName2}/explorer.toml + # fi } function clean_service_on_sysvinit() { diff --git a/source/client/src/clientEnv.c b/source/client/src/clientEnv.c index 238b3613f572339edd40c073e6a0f7039c2e5f98..40c27bf164b448a43ce624d3b5a4f21033ad2259 100644 --- a/source/client/src/clientEnv.c +++ b/source/client/src/clientEnv.c @@ -169,7 +169,7 @@ void *openTransporter(const char *user, const char *auth, int32_t numOfThread) { rpcInit.retryMinInterval = tsRedirectPeriod; rpcInit.retryStepFactor = tsRedirectFactor; rpcInit.retryMaxInterval = tsRedirectMaxPeriod; - rpcInit.retryMaxTimouet = tsMaxRetryWaitTime; + rpcInit.retryMaxTimeout = tsMaxRetryWaitTime; int32_t connLimitNum = tsNumOfRpcSessions / (tsNumOfRpcThreads * 3); connLimitNum = TMAX(connLimitNum, 10); diff --git a/source/common/src/tglobal.c b/source/common/src/tglobal.c index d35f9a24a8ac06b7a87ef3870d71731a2ba38be4..a772efc33cf8ab1cbae6236921df6e3e624683bf 100644 --- a/source/common/src/tglobal.c +++ b/source/common/src/tglobal.c @@ -47,7 +47,7 @@ bool tsPrintAuth = false; // queue & threads int32_t tsNumOfRpcThreads = 1; -int32_t tsNumOfRpcSessions = 10000; +int32_t tsNumOfRpcSessions = 30000; int32_t tsTimeToGetAvailableConn = 500000; int32_t tsKeepAliveIdle = 60; @@ -1281,9 +1281,9 @@ int32_t taosApplyLocalCfg(SConfig *pCfg, char *name) { // tsSmlDataFormat = cfgGetItem(pCfg, "smlDataFormat")->bval; // } else if (strcasecmp("smlBatchSize", name) == 0) { // tsSmlBatchSize = cfgGetItem(pCfg, "smlBatchSize")->i32; - } else if(strcasecmp("smlTsDefaultName", name) == 0) { + } else if (strcasecmp("smlTsDefaultName", name) == 0) { tstrncpy(tsSmlTsDefaultName, cfgGetItem(pCfg, "smlTsDefaultName")->str, TSDB_COL_NAME_LEN); - } else if(strcasecmp("smlDot2Underline", name) == 0) { + } else if (strcasecmp("smlDot2Underline", name) == 0) { tsSmlDot2Underline = cfgGetItem(pCfg, "smlDot2Underline")->bval; } else if (strcasecmp("shellActivityTimer", name) == 0) { tsShellActivityTimer = cfgGetItem(pCfg, "shellActivityTimer")->i32; diff --git a/source/dnode/mgmt/node_mgmt/src/dmTransport.c b/source/dnode/mgmt/node_mgmt/src/dmTransport.c index df54f8abbad86a9c23c96c0b533bbd8f432ae7a3..e0f7da3ac49fb74838ddc75c2e5403f807bf8b63 100644 --- a/source/dnode/mgmt/node_mgmt/src/dmTransport.c +++ b/source/dnode/mgmt/node_mgmt/src/dmTransport.c @@ -299,7 +299,7 @@ int32_t dmInitClient(SDnode *pDnode) { rpcInit.retryMinInterval = tsRedirectPeriod; rpcInit.retryStepFactor = tsRedirectFactor; rpcInit.retryMaxInterval = tsRedirectMaxPeriod; - rpcInit.retryMaxTimouet = tsMaxRetryWaitTime; + rpcInit.retryMaxTimeout = tsMaxRetryWaitTime; rpcInit.failFastInterval = 5000; // interval threshold(ms) rpcInit.failFastThreshold = 3; // failed threshold diff --git a/source/dnode/mnode/impl/src/mndScheduler.c b/source/dnode/mnode/impl/src/mndScheduler.c index 2aac05b22d2c126780b479a3cbc2fa255d2f3e5f..36771147a9b5c312cb225ccae27bb426db39b3f5 100644 --- a/source/dnode/mnode/impl/src/mndScheduler.c +++ b/source/dnode/mnode/impl/src/mndScheduler.c @@ -232,7 +232,8 @@ int32_t mndAddShuffleSinkTasksToStream(SMnode* pMnode, SArray* pTaskList, SStrea int32_t mndAddSinkTaskToStream(SStreamObj* pStream, SArray* pTaskList, SMnode* pMnode, int32_t vgId, SVgObj* pVgroup, int32_t fillHistory) { - SStreamTask* pTask = tNewStreamTask(pStream->uid, TASK_LEVEL__SINK, fillHistory, 0, pTaskList); + int64_t uid = (fillHistory == 0)? pStream->uid:pStream->hTaskUid; + SStreamTask* pTask = tNewStreamTask(uid, TASK_LEVEL__SINK, fillHistory, 0, pTaskList); if (pTask == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; return -1; @@ -335,8 +336,8 @@ static void setHTasksId(SArray* pTaskList, const SArray* pHTaskList) { (*pHTask)->streamTaskId.taskId = (*pStreamTask)->id.taskId; (*pHTask)->streamTaskId.streamId = (*pStreamTask)->id.streamId; - mDebug("s-task:0x%x related history task:0x%x, level:%d", (*pStreamTask)->id.taskId, (*pHTask)->id.taskId, - (*pHTask)->info.taskLevel); + mDebug("s-task:0x%" PRIx64 "-0x%x related history task:0x%" PRIx64 "-0x%x, level:%d", (*pStreamTask)->id.streamId, + (*pStreamTask)->id.taskId, (*pHTask)->id.streamId, (*pHTask)->id.taskId, (*pHTask)->info.taskLevel); } } diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index 4001202254d7717752712a97e2c38471be773182..a0d53ec780e26b969d1d543761582261b6251474 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -649,6 +649,8 @@ static int32_t mndPersistTaskDropReq(STrans *pTrans, SStreamTask *pTask) { pReq->head.vgId = htonl(pTask->info.nodeId); pReq->taskId = pTask->id.taskId; + pReq->streamId = pTask->id.streamId; + STransAction action = {0}; memcpy(&action.epSet, &pTask->info.epSet, sizeof(SEpSet)); action.pCont = pReq; @@ -1361,6 +1363,8 @@ static int32_t mndPauseStreamTask(STrans *pTrans, SStreamTask *pTask) { } pReq->head.vgId = htonl(pTask->info.nodeId); pReq->taskId = pTask->id.taskId; + pReq->streamId = pTask->id.streamId; + STransAction action = {0}; memcpy(&action.epSet, &pTask->info.epSet, sizeof(SEpSet)); action.pCont = pReq; @@ -1501,7 +1505,9 @@ static int32_t mndResumeStreamTask(STrans *pTrans, SStreamTask *pTask, int8_t ig } pReq->head.vgId = htonl(pTask->info.nodeId); pReq->taskId = pTask->id.taskId; + pReq->streamId = pTask->id.streamId; pReq->igUntreated = igUntreated; + STransAction action = {0}; memcpy(&action.epSet, &pTask->info.epSet, sizeof(SEpSet)); action.pCont = pReq; diff --git a/source/dnode/snode/src/snode.c b/source/dnode/snode/src/snode.c index 7235a56691ce580d8cb67d923245e26fdb53c82f..4000e728359b7f88f339519474b2033b14502bf6 100644 --- a/source/dnode/snode/src/snode.c +++ b/source/dnode/snode/src/snode.c @@ -35,9 +35,7 @@ void sndEnqueueStreamDispatch(SSnode *pSnode, SRpcMsg *pMsg) { tDecoderClear(&decoder); - int32_t taskId = req.taskId; - - SStreamTask *pTask = streamMetaAcquireTask(pSnode->pMeta, taskId); + SStreamTask *pTask = streamMetaAcquireTask(pSnode->pMeta, req.streamId, req.taskId); if (pTask) { SRpcMsg rsp = { .info = pMsg->info, @@ -88,7 +86,7 @@ int32_t sndExpandTask(SSnode *pSnode, SStreamTask *pTask, int64_t ver) { SReadHandle handle = { .vnode = NULL, .numOfVgroups = numOfChildEp, .pStateBackend = pTask->pState, .fillHistory = pTask->info.fillHistory }; initStreamStateAPI(&handle.api); - pTask->exec.pExecutor = qCreateStreamExecTaskInfo(pTask->exec.qmsg, &handle, 0); + pTask->exec.pExecutor = qCreateStreamExecTaskInfo(pTask->exec.qmsg, &handle, 0, pTask->id.taskId); ASSERT(pTask->exec.pExecutor); taosThreadMutexInit(&pTask->lock, NULL); @@ -181,21 +179,21 @@ int32_t sndProcessTaskDropReq(SSnode *pSnode, char *msg, int32_t msgLen) { SVDropStreamTaskReq *pReq = (SVDropStreamTaskReq *)msg; qDebug("snode:%d receive msg to drop stream task:0x%x", pSnode->pMeta->vgId, pReq->taskId); - SStreamTask* pTask = streamMetaAcquireTask(pSnode->pMeta, pReq->taskId); + SStreamTask* pTask = streamMetaAcquireTask(pSnode->pMeta, pReq->streamId, pReq->taskId); if (pTask == NULL) { qError("vgId:%d failed to acquire s-task:0x%x when dropping it", pSnode->pMeta->vgId, pReq->taskId); return 0; } - streamMetaUnregisterTask(pSnode->pMeta, pReq->taskId); + streamMetaUnregisterTask(pSnode->pMeta, pReq->streamId, pReq->taskId); streamMetaReleaseTask(pSnode->pMeta, pTask); return 0; } int32_t sndProcessTaskRunReq(SSnode *pSnode, SRpcMsg *pMsg) { SStreamTaskRunReq *pReq = pMsg->pCont; - int32_t taskId = pReq->taskId; - SStreamTask *pTask = streamMetaAcquireTask(pSnode->pMeta, taskId); + + SStreamTask *pTask = streamMetaAcquireTask(pSnode->pMeta, pReq->streamId, pReq->taskId); if (pTask) { streamProcessRunReq(pTask); streamMetaReleaseTask(pSnode->pMeta, pTask); @@ -213,9 +211,8 @@ int32_t sndProcessTaskDispatchReq(SSnode *pSnode, SRpcMsg *pMsg, bool exec) { SDecoder decoder; tDecoderInit(&decoder, (uint8_t *)msgBody, msgLen); tDecodeStreamDispatchReq(&decoder, &req); - int32_t taskId = req.taskId; - SStreamTask *pTask = streamMetaAcquireTask(pSnode->pMeta, taskId); + SStreamTask *pTask = streamMetaAcquireTask(pSnode->pMeta, req.streamId, req.taskId); if (pTask) { SRpcMsg rsp = { .info = pMsg->info, .code = 0 }; streamProcessDispatchMsg(pTask, &req, &rsp, exec); @@ -235,8 +232,7 @@ int32_t sndProcessTaskRetrieveReq(SSnode *pSnode, SRpcMsg *pMsg) { tDecoderInit(&decoder, msgBody, msgLen); tDecodeStreamRetrieveReq(&decoder, &req); tDecoderClear(&decoder); - int32_t taskId = req.dstTaskId; - SStreamTask *pTask = streamMetaAcquireTask(pSnode->pMeta, taskId); + SStreamTask *pTask = streamMetaAcquireTask(pSnode->pMeta, req.streamId, req.dstTaskId); if (pTask) { SRpcMsg rsp = { .info = pMsg->info, .code = 0}; @@ -251,8 +247,11 @@ int32_t sndProcessTaskRetrieveReq(SSnode *pSnode, SRpcMsg *pMsg) { int32_t sndProcessTaskDispatchRsp(SSnode *pSnode, SRpcMsg *pMsg) { SStreamDispatchRsp *pRsp = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)); - int32_t taskId = ntohl(pRsp->upstreamTaskId); - SStreamTask *pTask = streamMetaAcquireTask(pSnode->pMeta, taskId); + + int32_t taskId = htonl(pRsp->upstreamTaskId); + int64_t streamId = htobe64(pRsp->streamId); + + SStreamTask *pTask = streamMetaAcquireTask(pSnode->pMeta, streamId, taskId); if (pTask) { streamProcessDispatchRsp(pTask, pRsp, pMsg->code); streamMetaReleaseTask(pSnode->pMeta, pTask); @@ -260,7 +259,6 @@ int32_t sndProcessTaskDispatchRsp(SSnode *pSnode, SRpcMsg *pMsg) { } else { return -1; } - return 0; } int32_t sndProcessTaskRetrieveRsp(SSnode *pSnode, SRpcMsg *pMsg) { @@ -297,7 +295,7 @@ int32_t sndProcessStreamTaskScanHistoryFinishReq(SSnode *pSnode, SRpcMsg *pMsg) tDecoderClear(&decoder); // find task - SStreamTask *pTask = streamMetaAcquireTask(pSnode->pMeta, req.downstreamTaskId); + SStreamTask *pTask = streamMetaAcquireTask(pSnode->pMeta, req.streamId, req.downstreamTaskId); if (pTask == NULL) { return -1; } @@ -340,7 +338,7 @@ int32_t sndProcessStreamTaskCheckReq(SSnode *pSnode, SRpcMsg *pMsg) { .upstreamTaskId = req.upstreamTaskId, }; - SStreamTask *pTask = streamMetaAcquireTask(pSnode->pMeta, taskId); + SStreamTask *pTask = streamMetaAcquireTask(pSnode->pMeta, req.streamId, taskId); if (pTask != NULL) { rsp.status = streamTaskCheckStatus(pTask); @@ -400,7 +398,7 @@ int32_t sndProcessStreamTaskCheckRsp(SSnode* pSnode, SRpcMsg* pMsg) { qDebug("tq task:0x%x (vgId:%d) recv check rsp(reqId:0x%" PRIx64 ") from 0x%x (vgId:%d) status %d", rsp.upstreamTaskId, rsp.upstreamNodeId, rsp.reqId, rsp.downstreamTaskId, rsp.downstreamNodeId, rsp.status); - SStreamTask* pTask = streamMetaAcquireTask(pSnode->pMeta, rsp.upstreamTaskId); + SStreamTask* pTask = streamMetaAcquireTask(pSnode->pMeta, rsp.streamId, rsp.upstreamTaskId); if (pTask == NULL) { qError("tq failed to locate the stream task:0x%x (vgId:%d), it may have been destroyed", rsp.upstreamTaskId, pSnode->pMeta->vgId); diff --git a/source/dnode/vnode/src/sma/smaRollup.c b/source/dnode/vnode/src/sma/smaRollup.c index 9fd493844877d343e07d9f251b6b0882f25f89e0..1e7de3c5263049a02984b47d171104bf43ba496c 100644 --- a/source/dnode/vnode/src/sma/smaRollup.c +++ b/source/dnode/vnode/src/sma/smaRollup.c @@ -267,7 +267,7 @@ static int32_t tdSetRSmaInfoItemParams(SSma *pSma, SRSmaParam *param, SRSmaStat SReadHandle handle = {.vnode = pVnode, .initTqReader = 1, .pStateBackend = pStreamState}; initStorageAPI(&handle.api); - pRSmaInfo->taskInfo[idx] = qCreateStreamExecTaskInfo(param->qmsg[idx], &handle, TD_VID(pVnode)); + pRSmaInfo->taskInfo[idx] = qCreateStreamExecTaskInfo(param->qmsg[idx], &handle, TD_VID(pVnode), 0); if (!pRSmaInfo->taskInfo[idx]) { terrno = TSDB_CODE_RSMA_QTASKINFO_CREATE; return TSDB_CODE_FAILED; diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 41e9268452ebb8f24be8a0376b5b40103ca219d1..ad1af080fdef9b6f8cd844dfb1061dd69efb2d2e 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -956,7 +956,7 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver) { .winRange = pTask->dataRange.window}; initStorageAPI(&handle.api); - pTask->exec.pExecutor = qCreateStreamExecTaskInfo(pTask->exec.qmsg, &handle, vgId); + pTask->exec.pExecutor = qCreateStreamExecTaskInfo(pTask->exec.qmsg, &handle, vgId, pTask->id.taskId); if (pTask->exec.pExecutor == NULL) { return -1; } @@ -983,7 +983,7 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver) { .winRange = pTask->dataRange.window}; initStorageAPI(&handle.api); - pTask->exec.pExecutor = qCreateStreamExecTaskInfo(pTask->exec.qmsg, &handle, vgId); + pTask->exec.pExecutor = qCreateStreamExecTaskInfo(pTask->exec.qmsg, &handle, vgId, pTask->id.taskId); if (pTask->exec.pExecutor == NULL) { return -1; } @@ -1062,7 +1062,7 @@ int32_t tqProcessStreamTaskCheckReq(STQ* pTq, SRpcMsg* pMsg) { .upstreamTaskId = req.upstreamTaskId, }; - SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, taskId); + SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, req.streamId, taskId); if (pTask != NULL) { rsp.status = streamTaskCheckStatus(pTask); streamMetaReleaseTask(pTq->pStreamMeta, pTask); @@ -1072,8 +1072,9 @@ int32_t tqProcessStreamTaskCheckReq(STQ* pTq, SRpcMsg* pMsg) { pTask->id.idStr, pStatus, rsp.reqId, rsp.upstreamTaskId, rsp.upstreamNodeId, rsp.status); } else { rsp.status = 0; - tqDebug("tq recv task check(taskId:0x%x not built yet) req(reqId:0x%" PRIx64 ") from task:0x%x (vgId:%d), rsp status %d", - taskId, rsp.reqId, rsp.upstreamTaskId, rsp.upstreamNodeId, rsp.status); + tqDebug("tq recv task check(taskId:0x%" PRIx64 "-0x%x not built yet) req(reqId:0x%" PRIx64 + ") from task:0x%x (vgId:%d), rsp status %d", + req.streamId, taskId, rsp.reqId, rsp.upstreamTaskId, rsp.upstreamNodeId, rsp.status); } return streamSendCheckRsp(pTq->pStreamMeta, &req, &rsp, &pMsg->info, taskId); @@ -1099,7 +1100,7 @@ int32_t tqProcessStreamTaskCheckRsp(STQ* pTq, int64_t sversion, SRpcMsg* pMsg) { tqDebug("tq task:0x%x (vgId:%d) recv check rsp(reqId:0x%" PRIx64 ") from 0x%x (vgId:%d) status %d", rsp.upstreamTaskId, rsp.upstreamNodeId, rsp.reqId, rsp.downstreamTaskId, rsp.downstreamNodeId, rsp.status); - SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, rsp.upstreamTaskId); + SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, rsp.streamId, rsp.upstreamTaskId); if (pTask == NULL) { tqError("tq failed to locate the stream task:0x%x (vgId:%d), it may have been destroyed", rsp.upstreamTaskId, pTq->pStreamMeta->vgId); @@ -1149,32 +1150,27 @@ int32_t tqProcessTaskDeployReq(STQ* pTq, int64_t sversion, char* msg, int32_t ms taosWLockLatch(&pStreamMeta->lock); code = streamMetaRegisterTask(pStreamMeta, sversion, pTask, &added); int32_t numOfTasks = streamMetaGetNumOfTasks(pStreamMeta); + taosWUnLockLatch(&pStreamMeta->lock); if (code < 0) { tqError("vgId:%d failed to add s-task:0x%x, total:%d", vgId, pTask->id.taskId, numOfTasks); tFreeStreamTask(pTask); - taosWUnLockLatch(&pStreamMeta->lock); return -1; } // not added into meta store - if (!added) { + if (added) { + tqDebug("vgId:%d s-task:0x%x is deployed and add into meta, numOfTasks:%d", vgId, taskId, numOfTasks); + SStreamTask* p = streamMetaAcquireTask(pStreamMeta, pTask->id.streamId, taskId); + if (p != NULL) { // reset the downstreamReady flag. + streamTaskCheckDownstreamTasks(p); + } + streamMetaReleaseTask(pStreamMeta, p); + } else { tqWarn("vgId:%d failed to add s-task:0x%x, already exists in meta store", vgId, taskId); tFreeStreamTask(pTask); - pTask = NULL; - } - - taosWUnLockLatch(&pStreamMeta->lock); - - tqDebug("vgId:%d s-task:0x%x is deployed and add into meta, numOfTasks:%d", vgId, taskId, numOfTasks); - - // 3. It's an fill history task, do nothing. wait for the main task to start it - SStreamTask* p = streamMetaAcquireTask(pStreamMeta, taskId); - if (p != NULL) { // reset the downstreamReady flag. - streamTaskCheckDownstreamTasks(p); } - streamMetaReleaseTask(pStreamMeta, p); return 0; } @@ -1183,7 +1179,7 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) { SStreamMeta* pMeta = pTq->pStreamMeta; int32_t code = TSDB_CODE_SUCCESS; - SStreamTask* pTask = streamMetaAcquireTask(pMeta, pReq->taskId); + SStreamTask* pTask = streamMetaAcquireTask(pMeta, pReq->streamId, pReq->taskId); if (pTask == NULL) { tqError("vgId:%d failed to acquire stream task:0x%x during stream recover, task may have been destroyed", pMeta->vgId, pReq->taskId); @@ -1239,7 +1235,7 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) { bool done = false; // 1. get the related stream task - pStreamTask = streamMetaAcquireTask(pMeta, pTask->streamTaskId.taskId); + pStreamTask = streamMetaAcquireTask(pMeta, pTask->streamTaskId.streamId, pTask->streamTaskId.taskId); if (pStreamTask == NULL) { // todo delete this task, if the related stream task is dropped qError("failed to find s-task:0x%x, it may have been destroyed, drop fill-history task:%s", @@ -1247,7 +1243,7 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) { tqDebug("s-task:%s fill-history task set status to be dropping", id); - streamMetaUnregisterTask(pMeta, pTask->id.taskId); + streamMetaUnregisterTask(pMeta, pTask->id.streamId, pTask->id.taskId); streamMetaReleaseTask(pMeta, pTask); return -1; } @@ -1355,7 +1351,7 @@ int32_t tqProcessTaskTransferStateReq(STQ* pTq, SRpcMsg* pMsg) { tqDebug("vgId:%d start to process transfer state msg, from s-task:0x%x", pTq->pStreamMeta->vgId, req.downstreamTaskId); - SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, req.downstreamTaskId); + SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, req.streamId, req.downstreamTaskId); if (pTask == NULL) { tqError("failed to find task:0x%x, it may have been dropped already. process transfer state failed", req.downstreamTaskId); return -1; @@ -1391,7 +1387,7 @@ int32_t tqProcessTaskScanHistoryFinishReq(STQ* pTq, SRpcMsg* pMsg) { tDecodeStreamScanHistoryFinishReq(&decoder, &req); tDecoderClear(&decoder); - SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, req.downstreamTaskId); + SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, req.streamId, req.downstreamTaskId); if (pTask == NULL) { tqError("vgId:%d process scan history finish msg, failed to find task:0x%x, it may be destroyed", pTq->pStreamMeta->vgId, req.downstreamTaskId); @@ -1417,7 +1413,7 @@ int32_t tqProcessTaskScanHistoryFinishRsp(STQ* pTq, SRpcMsg* pMsg) { tDecodeCompleteHistoryDataMsg(&decoder, &req); tDecoderClear(&decoder); - SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, req.upstreamTaskId); + SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, req.streamId, req.upstreamTaskId); if (pTask == NULL) { tqError("vgId:%d process scan history finish rsp, failed to find task:0x%x, it may be destroyed", pTq->pStreamMeta->vgId, req.upstreamTaskId); @@ -1508,7 +1504,7 @@ int32_t tqProcessTaskRunReq(STQ* pTq, SRpcMsg* pMsg) { return 0; } - SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, taskId); + SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, pReq->streamId, taskId); if (pTask != NULL) { // even in halt status, the data in inputQ must be processed int8_t st = pTask->status.taskStatus; @@ -1543,7 +1539,7 @@ int32_t tqProcessTaskDispatchReq(STQ* pTq, SRpcMsg* pMsg, bool exec) { tDecoderInit(&decoder, (uint8_t*)msgBody, msgLen); tDecodeStreamDispatchReq(&decoder, &req); - SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, req.taskId); + SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, req.streamId, req.taskId); if (pTask) { SRpcMsg rsp = {.info = pMsg->info, .code = 0}; streamProcessDispatchMsg(pTask, &req, &rsp, exec); @@ -1557,10 +1553,12 @@ int32_t tqProcessTaskDispatchReq(STQ* pTq, SRpcMsg* pMsg, bool exec) { int32_t tqProcessTaskDispatchRsp(STQ* pTq, SRpcMsg* pMsg) { SStreamDispatchRsp* pRsp = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)); - int32_t taskId = ntohl(pRsp->upstreamTaskId); - SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, taskId); - int32_t vgId = pTq->pStreamMeta->vgId; + int32_t vgId = pTq->pStreamMeta->vgId; + int32_t taskId = htonl(pRsp->upstreamTaskId); + int64_t streamId = htobe64(pRsp->streamId); + SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, streamId, taskId); + if (pTask) { streamProcessDispatchRsp(pTask, pRsp, pMsg->code); streamMetaReleaseTask(pTq->pStreamMeta, pTask); @@ -1574,13 +1572,13 @@ int32_t tqProcessTaskDispatchRsp(STQ* pTq, SRpcMsg* pMsg) { int32_t tqProcessTaskDropReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) { SVDropStreamTaskReq* pReq = (SVDropStreamTaskReq*)msg; tqDebug("vgId:%d receive msg to drop stream task:0x%x", TD_VID(pTq->pVnode), pReq->taskId); - SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, pReq->taskId); + SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, pReq->streamId, pReq->taskId); if (pTask == NULL) { tqError("vgId:%d failed to acquire s-task:0x%x when dropping it", pTq->pStreamMeta->vgId, pReq->taskId); return 0; } - streamMetaUnregisterTask(pTq->pStreamMeta, pReq->taskId); + streamMetaUnregisterTask(pTq->pStreamMeta, pReq->streamId, pReq->taskId); streamMetaReleaseTask(pTq->pStreamMeta, pTask); return 0; } @@ -1589,7 +1587,7 @@ int32_t tqProcessTaskPauseReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg SVPauseStreamTaskReq* pReq = (SVPauseStreamTaskReq*)msg; SStreamMeta* pMeta = pTq->pStreamMeta; - SStreamTask* pTask = streamMetaAcquireTask(pMeta, pReq->taskId); + SStreamTask* pTask = streamMetaAcquireTask(pMeta, pReq->streamId, pReq->taskId); if (pTask == NULL) { tqError("vgId:%d process pause req, failed to acquire task:0x%x, it may have been dropped already", pMeta->vgId, pReq->taskId); @@ -1602,7 +1600,7 @@ int32_t tqProcessTaskPauseReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg SStreamTask* pHistoryTask = NULL; if (pTask->historyTaskId.taskId != 0) { - pHistoryTask = streamMetaAcquireTask(pMeta, pTask->historyTaskId.taskId); + pHistoryTask = streamMetaAcquireTask(pMeta, pTask->historyTaskId.streamId, pTask->historyTaskId.taskId); if (pHistoryTask == NULL) { tqError("vgId:%d process pause req, failed to acquire fill-history task:0x%x, it may have been dropped already", pMeta->vgId, pTask->historyTaskId.taskId); @@ -1661,13 +1659,13 @@ int32_t tqProcessTaskResumeImpl(STQ* pTq, SStreamTask* pTask, int64_t sversion, int32_t tqProcessTaskResumeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) { SVResumeStreamTaskReq* pReq = (SVResumeStreamTaskReq*)msg; - SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, pReq->taskId); + SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, pReq->streamId, pReq->taskId); int32_t code = tqProcessTaskResumeImpl(pTq, pTask, sversion, pReq->igUntreated); if (code != 0) { return code; } - SStreamTask* pHistoryTask = streamMetaAcquireTask(pTq->pStreamMeta, pTask->historyTaskId.taskId); + SStreamTask* pHistoryTask = streamMetaAcquireTask(pTq->pStreamMeta, pTask->historyTaskId.streamId, pTask->historyTaskId.taskId); if (pHistoryTask) { code = tqProcessTaskResumeImpl(pTq, pHistoryTask, sversion, pReq->igUntreated); } @@ -1686,8 +1684,7 @@ int32_t tqProcessTaskRetrieveReq(STQ* pTq, SRpcMsg* pMsg) { tDecodeStreamRetrieveReq(&decoder, &req); tDecoderClear(&decoder); - int32_t taskId = req.dstTaskId; - SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, taskId); + SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, req.streamId, req.dstTaskId); if (pTask) { SRpcMsg rsp = {.info = pMsg->info, .code = 0}; @@ -1725,7 +1722,7 @@ int32_t vnodeEnqueueStreamMsg(SVnode* pVnode, SRpcMsg* pMsg) { tDecoderClear(&decoder); int32_t taskId = req.taskId; - SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, taskId); + SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, req.streamId, req.taskId); if (pTask != NULL) { SRpcMsg rsp = {.info = pMsg->info, .code = 0}; streamProcessDispatchMsg(pTask, &req, &rsp, false); diff --git a/source/dnode/vnode/src/tq/tqRestore.c b/source/dnode/vnode/src/tq/tqRestore.c index c3e7d03e4397f5d21c1866e3d2117646cfe5cf2e..3d9a91899cb2f91e811fb66c15610a9d0717ddca 100644 --- a/source/dnode/vnode/src/tq/tqRestore.c +++ b/source/dnode/vnode/src/tq/tqRestore.c @@ -72,8 +72,8 @@ int32_t tqStreamTasksStatusCheck(STQ* pTq) { taosWUnLockLatch(&pMeta->lock); for (int32_t i = 0; i < numOfTasks; ++i) { - int32_t* pTaskId = taosArrayGet(pTaskList, i); - SStreamTask* pTask = streamMetaAcquireTask(pMeta, *pTaskId); + SStreamId* pTaskId = taosArrayGet(pTaskList, i); + SStreamTask* pTask = streamMetaAcquireTask(pMeta, pTaskId->streamId, pTaskId->taskId); if (pTask == NULL) { continue; } @@ -242,8 +242,8 @@ int32_t createStreamTaskRunReq(SStreamMeta* pStreamMeta, bool* pScanIdle) { numOfTasks = taosArrayGetSize(pTaskList); for (int32_t i = 0; i < numOfTasks; ++i) { - int32_t* pTaskId = taosArrayGet(pTaskList, i); - SStreamTask* pTask = streamMetaAcquireTask(pStreamMeta, *pTaskId); + SStreamId* pTaskId = taosArrayGet(pTaskList, i); + SStreamTask* pTask = streamMetaAcquireTask(pStreamMeta, pTaskId->streamId, pTaskId->taskId); if (pTask == NULL) { continue; } diff --git a/source/dnode/vnode/src/tsdb/tsdbRead2.c b/source/dnode/vnode/src/tsdb/tsdbRead2.c index b68f82d8476c705d0ae574dcb9edf9ad5ccf48fc..57a649d682827c3e42c208922e097a7d4a23634e 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead2.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead2.c @@ -439,7 +439,7 @@ static int32_t tsdbReaderCreate(SVnode* pVnode, SQueryTableDataCond* pCond, void return code; _end: - tsdbReaderClose(pReader); + tsdbReaderClose2(pReader); *ppReader = NULL; return code; } @@ -1731,41 +1731,45 @@ static int32_t mergeFileBlockAndLastBlock(STsdbReader* pReader, SLastBlockReader // row in last file block TSDBROW fRow = tsdbRowFromBlockData(pBlockData, pDumpInfo->rowIndex); - int64_t ts = getCurrentKeyInLastBlock(pLastBlockReader); - + int64_t tsLast = getCurrentKeyInLastBlock(pLastBlockReader); if (ASCENDING_TRAVERSE(pReader->info.order)) { - if (key < ts) { // imem, mem are all empty, file blocks (data blocks and last block) exist + if (key < tsLast) { return mergeRowsInFileBlocks(pBlockData, pBlockScanInfo, key, pReader); - } else if (key == ts) { - SRow* pTSRow = NULL; - int32_t code = tsdbRowMergerAdd(pMerger, &fRow, pReader->info.pSchema); - if (code != TSDB_CODE_SUCCESS) { - return code; - } + } else if (key > tsLast) { + return doMergeFileBlockAndLastBlock(pLastBlockReader, pReader, pBlockScanInfo, NULL, false); + } + } else { + if (key > tsLast) { + return mergeRowsInFileBlocks(pBlockData, pBlockScanInfo, key, pReader); + } else if (key < tsLast) { + return doMergeFileBlockAndLastBlock(pLastBlockReader, pReader, pBlockScanInfo, NULL, false); + } + } + // the following for key == tsLast + SRow* pTSRow = NULL; + int32_t code = tsdbRowMergerAdd(pMerger, &fRow, pReader->info.pSchema); + if (code != TSDB_CODE_SUCCESS) { + return code; + } - doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader); + doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader); - TSDBROW* pRow1 = tMergeTreeGetRow(&pLastBlockReader->mergeTree); - tsdbRowMergerAdd(pMerger, pRow1, NULL); + TSDBROW* pRow1 = tMergeTreeGetRow(&pLastBlockReader->mergeTree); + tsdbRowMergerAdd(pMerger, pRow1, NULL); - doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, ts, pMerger, &pReader->info.verRange, pReader->idStr); + doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, tsLast, pMerger, &pReader->info.verRange, pReader->idStr); - code = tsdbRowMergerGetRow(pMerger, &pTSRow); - if (code != TSDB_CODE_SUCCESS) { - return code; - } + code = tsdbRowMergerGetRow(pMerger, &pTSRow); + if (code != TSDB_CODE_SUCCESS) { + return code; + } - code = doAppendRowFromTSRow(pReader->resBlockInfo.pResBlock, pReader, pTSRow, pBlockScanInfo); + code = doAppendRowFromTSRow(pReader->resBlockInfo.pResBlock, pReader, pTSRow, pBlockScanInfo); + + taosMemoryFree(pTSRow); + tsdbRowMergerClear(pMerger); + return code; - taosMemoryFree(pTSRow); - tsdbRowMergerClear(pMerger); - return code; - } else { // key > ts - return doMergeFileBlockAndLastBlock(pLastBlockReader, pReader, pBlockScanInfo, NULL, false); - } - } else { // desc order - return doMergeFileBlockAndLastBlock(pLastBlockReader, pReader, pBlockScanInfo, pBlockData, true); - } } else { // only last block exists return doMergeFileBlockAndLastBlock(pLastBlockReader, pReader, pBlockScanInfo, NULL, false); } @@ -2192,7 +2196,8 @@ static int32_t buildComposedDataBlockImpl(STsdbReader* pReader, STableBlockScanI SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo; TSDBROW *pRow = NULL, *piRow = NULL; - int64_t key = (pBlockData->nRow > 0 && (!pDumpInfo->allDumped)) ? pBlockData->aTSKEY[pDumpInfo->rowIndex] : INT64_MIN; + int64_t key = (pBlockData->nRow > 0 && (!pDumpInfo->allDumped)) ? pBlockData->aTSKEY[pDumpInfo->rowIndex] : + (ASCENDING_TRAVERSE(pReader->info.order) ? INT64_MAX : INT64_MIN); if (pBlockScanInfo->iter.hasVal) { pRow = getValidMemRow(&pBlockScanInfo->iter, pBlockScanInfo->delSkyline, pReader); } @@ -2566,9 +2571,18 @@ static int32_t doLoadLastBlockSequentially(STsdbReader* pReader) { // load the last data block of current table STableBlockScanInfo* pScanInfo = *(STableBlockScanInfo**)pStatus->pTableIter; + if (pScanInfo == NULL) { + tsdbError("table Iter is null, invalid pScanInfo, try next table %s", pReader->idStr); + bool hasNexTable = moveToNextTable(pUidList, pStatus); + if (!hasNexTable) { + return TSDB_CODE_SUCCESS; + } + + continue; + } + if (pReader->pIgnoreTables && taosHashGet(*pReader->pIgnoreTables, &pScanInfo->uid, sizeof(pScanInfo->uid))) { // reset the index in last block when handing a new file - // doCleanupTableScanInfo(pScanInfo); bool hasNexTable = moveToNextTable(pUidList, pStatus); if (!hasNexTable) { return TSDB_CODE_SUCCESS; @@ -2577,9 +2591,6 @@ static int32_t doLoadLastBlockSequentially(STsdbReader* pReader) { continue; } - // reset the index in last block when handing a new file - // doCleanupTableScanInfo(pScanInfo); - bool hasDataInLastFile = initLastBlockReader(pLastBlockReader, pScanInfo, pReader); if (!hasDataInLastFile) { bool hasNexTable = moveToNextTable(pUidList, pStatus); @@ -2669,16 +2680,32 @@ static int32_t doBuildDataBlock(STsdbReader* pReader) { (ASCENDING_TRAVERSE(pReader->info.order)) ? pBlockInfo->record.firstKey : pBlockInfo->record.lastKey; code = buildDataBlockFromBuf(pReader, pScanInfo, endKey); } else { - if (hasDataInLastBlock(pLastBlockReader) && !ASCENDING_TRAVERSE(pReader->info.order)) { - // only return the rows in last block - int64_t tsLast = getCurrentKeyInLastBlock(pLastBlockReader); - ASSERT(tsLast >= pBlockInfo->record.lastKey); + bool bHasDataInLastBlock = hasDataInLastBlock(pLastBlockReader); + int64_t tsLast = bHasDataInLastBlock ? getCurrentKeyInLastBlock(pLastBlockReader) : INT64_MIN; + if (!bHasDataInLastBlock || ((ASCENDING_TRAVERSE(pReader->info.order) && pBlockInfo->record.lastKey < tsLast) || + (!ASCENDING_TRAVERSE(pReader->info.order) && pBlockInfo->record.firstKey > tsLast))) { + // whole block is required, return it directly + SDataBlockInfo* pInfo = &pReader->resBlockInfo.pResBlock->info; + pInfo->rows = pBlockInfo->record.numRow; + pInfo->id.uid = pScanInfo->uid; + pInfo->dataLoad = 0; + pInfo->window = (STimeWindow){.skey = pBlockInfo->record.firstKey, .ekey = pBlockInfo->record.lastKey}; + setComposedBlockFlag(pReader, false); + setBlockAllDumped(&pStatus->fBlockDumpInfo, pBlockInfo->record.lastKey, pReader->info.order); + // update the last key for the corresponding table + pScanInfo->lastKey = ASCENDING_TRAVERSE(pReader->info.order) ? pInfo->window.ekey : pInfo->window.skey; + tsdbDebug("%p uid:%" PRIu64 + " clean file block retrieved from file, global index:%d, " + "table index:%d, rows:%d, brange:%" PRId64 "-%" PRId64 ", %s", + pReader, pScanInfo->uid, pBlockIter->index, pBlockInfo->tbBlockIdx, pBlockInfo->record.numRow, + pBlockInfo->record.firstKey, pBlockInfo->record.lastKey, pReader->idStr); + } else { SBlockData* pBData = &pReader->status.fileBlockData; tBlockDataReset(pBData); SSDataBlock* pResBlock = pReader->resBlockInfo.pResBlock; - tsdbDebug("load data in last block firstly, due to desc scan data, %s", pReader->idStr); + tsdbDebug("load data in last block firstly %s", pReader->idStr); int64_t st = taosGetTimestampUs(); @@ -2709,23 +2736,8 @@ static int32_t doBuildDataBlock(STsdbReader* pReader) { pReader, pResBlock->info.id.uid, pResBlock->info.window.skey, pResBlock->info.window.ekey, pResBlock->info.rows, el, pReader->idStr); } - } else { // whole block is required, return it directly - SDataBlockInfo* pInfo = &pReader->resBlockInfo.pResBlock->info; - pInfo->rows = pBlockInfo->record.numRow; - pInfo->id.uid = pScanInfo->uid; - pInfo->dataLoad = 0; - pInfo->window = (STimeWindow){.skey = pBlockInfo->record.firstKey, .ekey = pBlockInfo->record.lastKey}; - setComposedBlockFlag(pReader, false); - setBlockAllDumped(&pStatus->fBlockDumpInfo, pBlockInfo->record.lastKey, pReader->info.order); - - // update the last key for the corresponding table - pScanInfo->lastKey = ASCENDING_TRAVERSE(pReader->info.order) ? pInfo->window.ekey : pInfo->window.skey; - tsdbDebug("%p uid:%" PRIu64 - " clean file block retrieved from file, global index:%d, " - "table index:%d, rows:%d, brange:%" PRId64 "-%" PRId64 ", %s", - pReader, pScanInfo->uid, pBlockIter->index, pBlockInfo->tbBlockIdx, pBlockInfo->record.numRow, - pBlockInfo->record.firstKey, pBlockInfo->record.lastKey, pReader->idStr); } + } return (pReader->code != TSDB_CODE_SUCCESS) ? pReader->code : code; @@ -4098,12 +4110,10 @@ int32_t tsdbReaderSuspend2(STsdbReader* pReader) { } tsdbDataFileReaderClose(&pReader->pFileReader); - int64_t loadBlocks = 0; double elapse = 0; pReader->status.pLDataIterArray = destroySttBlockReader(pReader->status.pLDataIterArray, &loadBlocks, &elapse); pReader->status.pLDataIterArray = taosArrayInit(4, POINTER_BYTES); - // resetDataBlockScanInfo excluding lastKey STableBlockScanInfo** p = NULL; int32_t iter = 0; diff --git a/source/libs/executor/src/executor.c b/source/libs/executor/src/executor.c index 7832834cee13fe64364da4c0eefd3564c65bf5e9..a6059c7c4200551c654847cb00c3f738be9e36fc 100644 --- a/source/libs/executor/src/executor.c +++ b/source/libs/executor/src/executor.c @@ -304,7 +304,7 @@ qTaskInfo_t qCreateQueueExecTaskInfo(void* msg, SReadHandle* pReaderHandle, int3 return pTaskInfo; } -qTaskInfo_t qCreateStreamExecTaskInfo(void* msg, SReadHandle* readers, int32_t vgId) { +qTaskInfo_t qCreateStreamExecTaskInfo(void* msg, SReadHandle* readers, int32_t vgId, int32_t taskId) { if (msg == NULL) { return NULL; } @@ -317,7 +317,7 @@ qTaskInfo_t qCreateStreamExecTaskInfo(void* msg, SReadHandle* readers, int32_t v } qTaskInfo_t pTaskInfo = NULL; - code = qCreateExecTask(readers, vgId, 0, pPlan, &pTaskInfo, NULL, NULL, OPTR_EXEC_MODEL_STREAM); + code = qCreateExecTask(readers, vgId, taskId, pPlan, &pTaskInfo, NULL, NULL, OPTR_EXEC_MODEL_STREAM); if (code != TSDB_CODE_SUCCESS) { nodesDestroyNode((SNode*)pPlan); qDestroyTask(pTaskInfo); diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c index 1fd2f7edf47c63b14360aa023b1c128c27a170c5..c7da80fdaf216896a6838d372cda4650990b7e90 100644 --- a/source/libs/stream/src/streamExec.c +++ b/source/libs/stream/src/streamExec.c @@ -290,7 +290,7 @@ static void waitForTaskIdle(SStreamTask* pTask, SStreamTask* pStreamTask) { static int32_t streamDoTransferStateToStreamTask(SStreamTask* pTask) { SStreamMeta* pMeta = pTask->pMeta; - SStreamTask* pStreamTask = streamMetaAcquireTask(pMeta, pTask->streamTaskId.taskId); + SStreamTask* pStreamTask = streamMetaAcquireTask(pMeta, pTask->streamTaskId.streamId, pTask->streamTaskId.taskId); if (pStreamTask == NULL) { // todo: destroy the fill-history task here qError("s-task:%s failed to find related stream task:0x%x, it may have been destroyed or closed", pTask->id.idStr, @@ -350,10 +350,9 @@ static int32_t streamDoTransferStateToStreamTask(SStreamTask* pTask) { streamTaskResumeFromHalt(pStreamTask); qDebug("s-task:%s fill-history task set status to be dropping, save the state into disk", pTask->id.idStr); - int32_t taskId = pTask->id.taskId; // 5. free it and remove fill-history task from disk meta-store - streamMetaUnregisterTask(pMeta, taskId); + streamMetaUnregisterTask(pMeta, pTask->id.streamId, pTask->id.taskId); // 6. save to disk taosWLockLatch(&pMeta->lock); diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index 80b690e20d0504c0b36b0754a596b39306129001..fe455c0190f3abe013fdfd1609b7cb473afda85b 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -66,14 +66,14 @@ SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandF goto _err; } - _hash_fn_t fp = taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT); + _hash_fn_t fp = taosGetDefaultHashFunction(TSDB_DATA_TYPE_VARCHAR); pMeta->pTasks = taosHashInit(64, fp, true, HASH_NO_LOCK); if (pMeta->pTasks == NULL) { goto _err; } // task list - pMeta->pTaskList = taosArrayInit(4, sizeof(int32_t)); + pMeta->pTaskList = taosArrayInit(4, sizeof(SStreamId)); if (pMeta->pTaskList == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; goto _err; @@ -161,43 +161,6 @@ void streamMetaClose(SStreamMeta* pMeta) { taosMemoryFree(pMeta); } -#if 0 -int32_t streamMetaAddSerializedTask(SStreamMeta* pMeta, int64_t ver, char* msg, int32_t msgLen) { - SStreamTask* pTask = taosMemoryCalloc(1, sizeof(SStreamTask)); - if (pTask == NULL) { - return -1; - } - SDecoder decoder; - tDecoderInit(&decoder, (uint8_t*)msg, msgLen); - if (tDecodeStreamTask(&decoder, pTask) < 0) { - tDecoderClear(&decoder); - goto FAIL; - } - tDecoderClear(&decoder); - - if (pMeta->expandFunc(pMeta->ahandle, pTask, ver) < 0) { - ASSERT(0); - goto FAIL; - } - - if (taosHashPut(pMeta->pTasks, &pTask->id.taskId, sizeof(int32_t), &pTask, sizeof(void*)) < 0) { - goto FAIL; - } - - if (tdbTbUpsert(pMeta->pTaskDb, &pTask->id.taskId, sizeof(int32_t), msg, msgLen, pMeta->txn) < 0) { - taosHashRemove(pMeta->pTasks, &pTask->id.taskId, sizeof(int32_t)); - ASSERT(0); - goto FAIL; - } - - return 0; - -FAIL: - if (pTask) tFreeStreamTask(pTask); - return -1; -} -#endif - int32_t streamMetaSaveTask(SStreamMeta* pMeta, SStreamTask* pTask) { void* buf = NULL; int32_t len; @@ -241,14 +204,15 @@ int32_t streamMetaRemoveTask(SStreamMeta* pMeta, int32_t taskId) { int32_t streamMetaRegisterTask(SStreamMeta* pMeta, int64_t ver, SStreamTask* pTask, bool* pAdded) { *pAdded = false; - void* p = taosHashGet(pMeta->pTasks, &pTask->id.taskId, sizeof(pTask->id.taskId)); + int64_t keys[2] = {pTask->id.streamId, pTask->id.taskId}; + void* p = taosHashGet(pMeta->pTasks, keys, sizeof(keys)); if (p == NULL) { if (pMeta->expandFunc(pMeta->ahandle, pTask, ver) < 0) { tFreeStreamTask(pTask); return -1; } - taosArrayPush(pMeta->pTaskList, &pTask->id.taskId); + taosArrayPush(pMeta->pTaskList, &pTask->id); if (streamMetaSaveTask(pMeta, pTask) < 0) { tFreeStreamTask(pTask); @@ -263,7 +227,7 @@ int32_t streamMetaRegisterTask(SStreamMeta* pMeta, int64_t ver, SStreamTask* pTa return 0; } - taosHashPut(pMeta->pTasks, &pTask->id.taskId, sizeof(pTask->id.taskId), &pTask, POINTER_BYTES); + taosHashPut(pMeta->pTasks, keys, sizeof(keys), &pTask, POINTER_BYTES); *pAdded = true; return 0; } @@ -274,10 +238,11 @@ int32_t streamMetaGetNumOfTasks(SStreamMeta* pMeta) { return (int32_t)size; } -SStreamTask* streamMetaAcquireTask(SStreamMeta* pMeta, int32_t taskId) { +SStreamTask* streamMetaAcquireTask(SStreamMeta* pMeta, int64_t streamId, int32_t taskId) { taosRLockLatch(&pMeta->lock); - SStreamTask** ppTask = (SStreamTask**)taosHashGet(pMeta->pTasks, &taskId, sizeof(int32_t)); + int64_t keys[2] = {streamId, taskId}; + SStreamTask** ppTask = (SStreamTask**)taosHashGet(pMeta->pTasks, keys, sizeof(keys)); if (ppTask != NULL) { if (!streamTaskShouldStop(&(*ppTask)->status)) { int32_t ref = atomic_add_fetch_32(&(*ppTask)->refCnt, 1); @@ -304,22 +269,24 @@ void streamMetaReleaseTask(SStreamMeta* pMeta, SStreamTask* pTask) { } } -static void doRemoveIdFromList(SStreamMeta* pMeta, int32_t num, int32_t taskId) { +static void doRemoveIdFromList(SStreamMeta* pMeta, int32_t num, SStreamId* id) { for (int32_t i = 0; i < num; ++i) { - int32_t* pTaskId = taosArrayGet(pMeta->pTaskList, i); - if (*pTaskId == taskId) { + SStreamId* pTaskId = taosArrayGet(pMeta->pTaskList, i); + if (pTaskId->streamId == id->streamId && pTaskId->taskId == id->taskId) { taosArrayRemove(pMeta->pTaskList, i); break; } } } -int32_t streamMetaUnregisterTask(SStreamMeta* pMeta, int32_t taskId) { +int32_t streamMetaUnregisterTask(SStreamMeta* pMeta, int64_t streamId, int32_t taskId) { SStreamTask* pTask = NULL; // pre-delete operation taosWLockLatch(&pMeta->lock); - SStreamTask** ppTask = (SStreamTask**)taosHashGet(pMeta->pTasks, &taskId, sizeof(int32_t)); + + int64_t keys[2] = {streamId, taskId}; + SStreamTask** ppTask = (SStreamTask**)taosHashGet(pMeta->pTasks, keys, sizeof(keys)); if (ppTask) { pTask = *ppTask; atomic_store_8(&pTask->status.taskStatus, TASK_STATUS__DROPPING); @@ -335,7 +302,7 @@ int32_t streamMetaUnregisterTask(SStreamMeta* pMeta, int32_t taskId) { while (1) { taosRLockLatch(&pMeta->lock); - ppTask = (SStreamTask**)taosHashGet(pMeta->pTasks, &taskId, sizeof(int32_t)); + ppTask = (SStreamTask**)taosHashGet(pMeta->pTasks, keys, sizeof(keys)); if (ppTask) { if ((*ppTask)->status.timerActive == 0) { @@ -354,15 +321,13 @@ int32_t streamMetaUnregisterTask(SStreamMeta* pMeta, int32_t taskId) { // let's do delete of stream task taosWLockLatch(&pMeta->lock); - ppTask = (SStreamTask**)taosHashGet(pMeta->pTasks, &taskId, sizeof(int32_t)); + ppTask = (SStreamTask**)taosHashGet(pMeta->pTasks, keys, sizeof(keys)); if (ppTask) { - taosHashRemove(pMeta->pTasks, &taskId, sizeof(int32_t)); + taosHashRemove(pMeta->pTasks, keys, sizeof(keys)); atomic_store_8(&pTask->status.taskStatus, TASK_STATUS__DROPPING); ASSERT(pTask->status.timerActive == 0); - - int32_t num = taosArrayGetSize(pMeta->pTaskList); - doRemoveIdFromList(pMeta, num, pTask->id.taskId); + doRemoveIdFromList(pMeta, (int32_t)taosArrayGetSize(pMeta->pTaskList), &pTask->id); // remove the ref by timer if (pTask->triggerParam != 0) { @@ -473,7 +438,8 @@ int32_t streamLoadTasks(SStreamMeta* pMeta, int64_t ver) { } // do duplicate task check. - void* p = taosHashGet(pMeta->pTasks, &pTask->id.taskId, sizeof(pTask->id.taskId)); + int64_t keys[2] = {pTask->id.streamId, pTask->id.taskId}; + void* p = taosHashGet(pMeta->pTasks, keys, sizeof(keys)); if (p == NULL) { if (pMeta->expandFunc(pMeta->ahandle, pTask, pTask->chkInfo.version) < 0) { tdbFree(pKey); @@ -484,7 +450,7 @@ int32_t streamLoadTasks(SStreamMeta* pMeta, int64_t ver) { return -1; } - taosArrayPush(pMeta->pTaskList, &pTask->id.taskId); + taosArrayPush(pMeta->pTaskList, &pTask->id); } else { tdbFree(pKey); tdbFree(pVal); @@ -493,7 +459,7 @@ int32_t streamLoadTasks(SStreamMeta* pMeta, int64_t ver) { continue; } - if (taosHashPut(pMeta->pTasks, &pTask->id.taskId, sizeof(pTask->id.taskId), &pTask, sizeof(void*)) < 0) { + if (taosHashPut(pMeta->pTasks, keys, sizeof(keys), &pTask, sizeof(void*)) < 0) { tdbFree(pKey); tdbFree(pVal); tdbTbcClose(pCur); diff --git a/source/libs/stream/src/streamRecover.c b/source/libs/stream/src/streamRecover.c index ad486c3f201d3dab112b417d323c77ab7ccb5c01..e59b3f682d35ea498e6ae6fe9ecf8d0df367cbcc 100644 --- a/source/libs/stream/src/streamRecover.c +++ b/source/libs/stream/src/streamRecover.c @@ -19,7 +19,8 @@ typedef struct SStreamTaskRetryInfo { SStreamMeta* pMeta; - int32_t taskId; + int32_t taskId; + int64_t streamId; } SStreamTaskRetryInfo; static int32_t streamSetParamForScanHistory(SStreamTask* pTask); @@ -540,7 +541,9 @@ static void tryLaunchHistoryTask(void* param, void* tmrId) { qDebug("s-task:0x%x in timer to launch related history task", pInfo->taskId); taosWLockLatch(&pMeta->lock); - SStreamTask** ppTask = (SStreamTask**)taosHashGet(pMeta->pTasks, &pInfo->taskId, sizeof(int32_t)); + int64_t keys[2] = {pInfo->streamId, pInfo->taskId}; + + SStreamTask** ppTask = (SStreamTask**)taosHashGet(pMeta->pTasks, keys, sizeof(keys)); if (ppTask) { ASSERT((*ppTask)->status.timerActive == 1); @@ -556,12 +559,12 @@ static void tryLaunchHistoryTask(void* param, void* tmrId) { } taosWUnLockLatch(&pMeta->lock); - SStreamTask* pTask = streamMetaAcquireTask(pMeta, pInfo->taskId); + SStreamTask* pTask = streamMetaAcquireTask(pMeta, pInfo->streamId, pInfo->taskId); if (pTask != NULL) { ASSERT(pTask->status.timerActive == 1); // abort the timer if intend to stop task - SStreamTask* pHTask = streamMetaAcquireTask(pMeta, pTask->historyTaskId.taskId); + SStreamTask* pHTask = streamMetaAcquireTask(pMeta, pTask->historyTaskId.streamId, pTask->historyTaskId.taskId); if (pHTask == NULL && (!streamTaskShouldStop(&pTask->status))) { const char* pStatus = streamGetTaskStatusStr(pTask->status.taskStatus); qWarn( @@ -595,14 +598,16 @@ int32_t streamLaunchFillHistoryTask(SStreamTask* pTask) { SStreamMeta* pMeta = pTask->pMeta; int32_t hTaskId = pTask->historyTaskId.taskId; + int64_t keys[2] = {pTask->historyTaskId.streamId, pTask->historyTaskId.taskId}; // Set the execute conditions, including the query time window and the version range - SStreamTask** pHTask = taosHashGet(pMeta->pTasks, &hTaskId, sizeof(hTaskId)); + SStreamTask** pHTask = taosHashGet(pMeta->pTasks, keys, sizeof(keys)); if (pHTask == NULL) { qWarn("s-task:%s vgId:%d failed to launch history task:0x%x, since it is not built yet", pTask->id.idStr, pMeta->vgId, hTaskId); SStreamTaskRetryInfo* pInfo = taosMemoryCalloc(1, sizeof(SStreamTaskRetryInfo)); pInfo->taskId = pTask->id.taskId; + pInfo->streamId = pTask->id.streamId; pInfo->pMeta = pTask->pMeta; if (pTask->launchTaskTimer == NULL) { @@ -797,7 +802,8 @@ void launchFillHistoryTask(SStreamTask* pTask) { } ASSERT(pTask->status.downstreamReady == 1); - qDebug("s-task:%s start to launch related fill-history task:0x%x", pTask->id.idStr, tId); + qDebug("s-task:%s start to launch related fill-history task:0x%" PRIx64 "-0x%x", pTask->id.idStr, + pTask->historyTaskId.streamId, tId); // launch associated fill history task streamLaunchFillHistoryTask(pTask); diff --git a/source/libs/stream/src/streamTask.c b/source/libs/stream/src/streamTask.c index dc4e5ff4a65a3326c3079e45ef9cd09b99343375..9056fa8d93397bea55b216c202d5090259ab5419 100644 --- a/source/libs/stream/src/streamTask.c +++ b/source/libs/stream/src/streamTask.c @@ -216,7 +216,7 @@ static void freeItem(void* p) { } void tFreeStreamTask(SStreamTask* pTask) { - qDebug("free s-task:%s, %p", pTask->id.idStr, pTask); + qDebug("free s-task:0x%x, %p", pTask->id.taskId, pTask); int32_t status = atomic_load_8((int8_t*)&(pTask->status.taskStatus)); if (pTask->inputQueue) { diff --git a/source/libs/transport/inc/transportInt.h b/source/libs/transport/inc/transportInt.h index ca48da690b6d359d80ff1ce21b14f28379621705..cc2c0d4e845fd0f970f15ae7ce049024579d07c7 100644 --- a/source/libs/transport/inc/transportInt.h +++ b/source/libs/transport/inc/transportInt.h @@ -46,14 +46,14 @@ typedef struct { int8_t connType; char label[TSDB_LABEL_LEN]; char user[TSDB_UNI_LEN]; // meter ID - int32_t compatibilityVer; + int32_t compatibilityVer; int32_t compressSize; // -1: no compress, 0 : all data compressed, size: compress data if larger than size int8_t encryption; // encrypt or not - + int32_t retryMinInterval; // retry init interval int32_t retryStepFactor; // retry interval factor int32_t retryMaxInterval; // retry max interval - int32_t retryMaxTimouet; + int32_t retryMaxTimeout; int32_t failFastThreshold; int32_t failFastInterval; diff --git a/source/libs/transport/src/trans.c b/source/libs/transport/src/trans.c index 08b04519827defca584e768b14322a03b72e8f63..ed94521df09b52df65985ed11b1e21760512f4e0 100644 --- a/source/libs/transport/src/trans.c +++ b/source/libs/transport/src/trans.c @@ -55,7 +55,7 @@ void* rpcOpen(const SRpcInit* pInit) { pRpc->retryMinInterval = pInit->retryMinInterval; // retry init interval pRpc->retryStepFactor = pInit->retryStepFactor; pRpc->retryMaxInterval = pInit->retryMaxInterval; - pRpc->retryMaxTimouet = pInit->retryMaxTimouet; + pRpc->retryMaxTimeout = pInit->retryMaxTimeout; pRpc->failFastThreshold = pInit->failFastThreshold; pRpc->failFastInterval = pInit->failFastInterval; diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index 71379daa50c012cbba39467a24359115ff8feeb8..cfdc5b5e8bd8eff783af0001e38ca093ea2fe582 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -2287,7 +2287,7 @@ bool cliGenRetryRule(SCliConn* pConn, STransMsg* pResp, SCliMsg* pMsg) { pCtx->retryMinInterval = pTransInst->retryMinInterval; pCtx->retryMaxInterval = pTransInst->retryMaxInterval; pCtx->retryStepFactor = pTransInst->retryStepFactor; - pCtx->retryMaxTimeout = pTransInst->retryMaxTimouet; + pCtx->retryMaxTimeout = pTransInst->retryMaxTimeout; pCtx->retryInitTimestamp = taosGetTimestampMs(); pCtx->retryNextInterval = pCtx->retryMinInterval; pCtx->retryStep = 0; diff --git a/source/libs/transport/src/transSvr.c b/source/libs/transport/src/transSvr.c index a546ee815927eb518979b4343c754d4a729ed846..c6c412022a001487b9f50939c3c6a0c6b2de6844 100644 --- a/source/libs/transport/src/transSvr.c +++ b/source/libs/transport/src/transSvr.c @@ -726,7 +726,7 @@ void uvOnConnectionCb(uv_stream_t* q, ssize_t nread, const uv_buf_t* buf) { tError("read error %s", uv_err_name(nread)); } // TODO(log other failure reason) - tWarn("failed to create connect:%p", q); + tWarn("failed to create connect:%p, reason: %s", q, uv_err_name(nread)); taosMemoryFree(buf->base); uv_close((uv_handle_t*)q, NULL); return; @@ -741,10 +741,17 @@ void uvOnConnectionCb(uv_stream_t* q, ssize_t nread, const uv_buf_t* buf) { uv_pipe_t* pipe = (uv_pipe_t*)q; if (!uv_pipe_pending_count(pipe)) { tError("No pending count"); + uv_close((uv_handle_t*)q, NULL); + return; + } + if (pThrd->quit) { + tWarn("thread already received quit msg, ignore incoming conn"); + + uv_close((uv_handle_t*)q, NULL); return; } - uv_handle_type pending = uv_pipe_pending_type(pipe); + // uv_handle_type pending = uv_pipe_pending_type(pipe); SSvrConn* pConn = createConn(pThrd); diff --git a/tests/parallel_test/cases.task b/tests/parallel_test/cases.task index fb67ee51cdca57d889c74f2fc85f4685cc689887..612ed86e414d0b5d9bbdb1c866ff9a8d8bf84950 100644 --- a/tests/parallel_test/cases.task +++ b/tests/parallel_test/cases.task @@ -452,7 +452,7 @@ ,,y,system-test,./pytest.sh python3 ./test.py -f 6-cluster/5dnode3mnodeRestartDnodeInsertDataAsync.py -N 6 -M 3 #,,y,system-test,./pytest.sh python3 ./test.py -f 6-cluster/5dnode3mnodeRestartDnodeInsertDataAsync.py -N 6 -M 3 -n 3 ,,n,system-test,python3 ./test.py -f 6-cluster/manually-test/6dnode3mnodeInsertLessDataAlterRep3to1to3.py -N 6 -M 3 -#,,n,system-test,python ./test.py -f 6-cluster/5dnode3mnodeRoll.py -N 3 -C 1 +,,n,system-test,python ./test.py -f 6-cluster/5dnode3mnodeRoll.py -N 3 -C 1 ,,y,system-test,./pytest.sh python3 ./test.py -f 6-cluster/5dnode3mnodeAdd1Ddnoe.py -N 7 -M 3 -C 6 ,,y,system-test,./pytest.sh python3 ./test.py -f 6-cluster/5dnode3mnodeAdd1Ddnoe.py -N 7 -M 3 -C 6 -n 3 #,,y,system-test,./pytest.sh python3 ./test.py -f 6-cluster/5dnode3mnodeDrop.py -N 5 @@ -794,9 +794,10 @@ ,,y,script,./test.sh -f tsim/user/basic.sim ,,y,script,./test.sh -f tsim/user/password.sim ,,y,script,./test.sh -f tsim/user/privilege_db.sim -#,,y,script,./test.sh -f tsim/user/privilege_sysinfo.sim +,,y,script,./test.sh -f tsim/user/privilege_sysinfo.sim ,,y,script,./test.sh -f tsim/user/privilege_topic.sim ,,y,script,./test.sh -f tsim/user/privilege_table.sim +,,y,script,./test.sh -f tsim/user/privilege_create_db.sim ,,y,script,./test.sh -f tsim/db/alter_option.sim ,,y,script,./test.sh -f tsim/db/alter_replica_31.sim ,,y,script,./test.sh -f tsim/db/basic1.sim @@ -969,6 +970,7 @@ ,,y,script,./test.sh -f tsim/query/tag_scan.sim ,,y,script,./test.sh -f tsim/query/nullColSma.sim ,,y,script,./test.sh -f tsim/query/bug3398.sim +,,y,script,./test.sh -f tsim/query/explain_tsorder.sim ,,y,script,./test.sh -f tsim/qnode/basic1.sim ,,y,script,./test.sh -f tsim/snode/basic1.sim ,,y,script,./test.sh -f tsim/mnode/basic1.sim diff --git a/tests/system-test/0-others/compatibility.py b/tests/system-test/0-others/compatibility.py index 98a0fbe18d2ebadb253cd003f563811476141a7d..cb804aad0cb17f2fd726f5cb93f754c3b1791da6 100644 --- a/tests/system-test/0-others/compatibility.py +++ b/tests/system-test/0-others/compatibility.py @@ -30,7 +30,15 @@ class TDTestCase: self.replicaVar = int(replicaVar) tdLog.debug(f"start to excute {__file__}") tdSql.init(conn.cursor()) - + self.deletedDataSql= '''drop database if exists deldata;create database deldata duration 300;use deldata; + create table deldata.stb1 (ts timestamp, c1 int, c2 bigint, c3 smallint, c4 tinyint, c5 float, c6 double, c7 bool, c8 binary(16),c9 nchar(32), c10 timestamp) tags (t1 int); + create table deldata.ct1 using deldata.stb1 tags ( 1 ); + insert into deldata.ct1 values ( now()-0s, 0, 0, 0, 0, 0.0, 0.0, 0, 'binary0', 'nchar0', now()+0a ) ( now()-10s, 1, 11111, 111, 11, 1.11, 11.11, 1, 'binary1', 'nchar1', now()+1a ) ( now()-20s, 2, 22222, 222, 22, 2.22, 22.22, 0, 'binary2', 'nchar2', now()+2a ) ( now()-30s, 3, 33333, 333, 33, 3.33, 33.33, 1, 'binary3', 'nchar3', now()+3a ); + select avg(c1) from deldata.ct1; + delete from deldata.stb1; + flush database deldata; + insert into deldata.ct1 values ( now()-0s, 0, 0, 0, 0, 0.0, 0.0, 0, 'binary0', 'nchar0', now()+0a ) ( now()-10s, 1, 11111, 111, 11, 1.11, 11.11, 1, 'binary1', 'nchar1', now()+1a ) ( now()-20s, 2, 22222, 222, 22, 2.22, 22.22, 0, 'binary2', 'nchar2', now()+2a ) ( now()-30s, 3, 33333, 333, 33, 3.33, 33.33, 1, 'binary3', 'nchar3', now()+3a ); + delete from deldata.ct1;''' def checkProcessPid(self,processName): i=0 while i<60: @@ -138,6 +146,8 @@ class TDTestCase: tdLog.printNoPrefix(f"==========step1:prepare and check data in old version-{BASEVERSION}") tdLog.info(f" LD_LIBRARY_PATH=/usr/lib taosBenchmark -t {tableNumbers} -n {recordNumbers1} -y ") os.system(f"LD_LIBRARY_PATH=/usr/lib taosBenchmark -t {tableNumbers} -n {recordNumbers1} -y ") + os.system("LD_LIBRARY_PATH=/usr/lib taos -s 'flush database test '") + # os.system(f"LD_LIBRARY_PATH=/usr/lib taos -s 'use test;create stream current_stream into current_stream_output_stb as select _wstart as `start`, _wend as wend, max(current) as max_current from meters where voltage <= 220 interval (5s);' ") # os.system('LD_LIBRARY_PATH=/usr/lib taos -s "use test;create stream power_stream into power_stream_output_stb as select ts, concat_ws(\\".\\", location, tbname) as meter_location, current*voltage*cos(phase) as active_power, current*voltage*sin(phase) as reactive_power from meters partition by tbname;" ') # os.system('LD_LIBRARY_PATH=/usr/lib taos -s "use test;show streams;" ') @@ -151,6 +161,10 @@ class TDTestCase: os.system("LD_LIBRARY_PATH=/usr/lib taos -s 'flush database db4096 '") os.system("LD_LIBRARY_PATH=/usr/lib taos -f 0-others/TS-3131.tsql") + # add deleted data + os.system(f'LD_LIBRARY_PATH=/usr/lib taos -s "{self.deletedDataSql}" ') + + cmd = f" LD_LIBRARY_PATH={bPath}/build/lib {bPath}/build/bin/taos -h localhost ;" tdLog.info(f"new client version connect to old version taosd, commad return value:{cmd}") if os.system(cmd) == 0: @@ -185,11 +199,19 @@ class TDTestCase: # tdsql.query("show streams;") # tdsql.query(f"select count(*) from {stb}") # tdsql.checkData(0,0,tableNumbers*recordNumbers2) - tdsql.query(f"select count(*) from db4096.stb0") + + # checkout db4096 + tdsql.query("select count(*) from db4096.stb0") tdsql.checkData(0,0,50000) + + # checkout deleted data + tdsql.execute("insert into deldata.ct1 values ( now()-0s, 0, 0, 0, 0, 0.0, 0.0, 0, 'binary0', 'nchar0', now()+0a ) ( now()-10s, 1, 11111, 111, 11, 1.11, 11.11, 1, 'binary1', 'nchar1', now()+1a ) ( now()-20s, 2, 22222, 222, 22, 2.22, 22.22, 0, 'binary2', 'nchar2', now()+2a ) ( now()-30s, 3, 33333, 333, 33, 3.33, 33.33, 1, 'binary3', 'nchar3', now()+3a );") + tdsql.execute("flush database deldata;") + tdsql.query("select avg(c1) from deldata.ct1;") + tdsql=tdCom.newTdSql() - tdLog.printNoPrefix(f"==========step4:verify backticks in taos Sql-TD18542") + tdLog.printNoPrefix("==========step4:verify backticks in taos Sql-TD18542") tdsql.execute("drop database if exists db") tdsql.execute("create database db") tdsql.execute("use db") @@ -203,6 +225,8 @@ class TDTestCase: tdsql.execute("insert into db.`ct4` using db.stb1 TAGS(4) values(now(),14);") tdsql.query("select * from db.ct4") tdsql.checkData(0,1,14) + + #check retentions tdsql=tdCom.newTdSql() tdsql.query("describe information_schema.ins_databases;") qRows=tdsql.queryRows @@ -222,8 +246,12 @@ class TDTestCase: caller = inspect.getframeinfo(inspect.stack()[0][0]) args = (caller.filename, caller.lineno) tdLog.exit("%s(%d) failed" % args) + + # check stream tdsql.query("show streams;") tdsql.checkRows(0) + + #check TS-3131 tdsql.query("select *,tbname from d0.almlog where mcid='m0103';") tdsql.checkRows(6) expectList = [0,3003,20031,20032,20033,30031] @@ -238,6 +266,8 @@ class TDTestCase: tdsql.execute("insert into test.d80 values (now+1s, 11, 103, 0.21);") tdsql.execute("insert into test.d9 values (now+5s, 4.3, 104, 0.4);") + + # check tmq conn = taos.connect() consumer = Consumer( @@ -265,6 +295,8 @@ class TDTestCase: print(block.fetchall()) tdsql.query("show topics;") tdsql.checkRows(1) + + def stop(self): tdSql.close() tdLog.success(f"{__file__} successfully executed") diff --git a/tests/system-test/0-others/deletedData.sql b/tests/system-test/0-others/deletedData.sql new file mode 100644 index 0000000000000000000000000000000000000000..781b9562cfebf498374092ed93cbde402b77ba9c --- /dev/null +++ b/tests/system-test/0-others/deletedData.sql @@ -0,0 +1,11 @@ +drop database if exists deldata; +create database deldata duration 300; +use deldata; +create table deldata.stb1 (ts timestamp, c1 int, c2 bigint, c3 smallint, c4 tinyint, c5 float, c6 double, c7 bool, c8 binary(16),c9 nchar(32), c10 timestamp) tags (t1 int); +create table deldata.ct1 using deldata.stb1 tags ( 1 ); +insert into deldata.ct1 values ( now()-0s, 0, 0, 0, 0, 0.0, 0.0, 0, 'binary0', 'nchar0', now()+0a ) ( now()-10s, 1, 11111, 111, 11, 1.11, 11.11, 1, 'binary1', 'nchar1', now()+1a ) ( now()-20s, 2, 22222, 222, 22, 2.22, 22.22, 0, 'binary2', 'nchar2', now()+2a ) ( now()-30s, 3, 33333, 333, 33, 3.33, 33.33, 1, 'binary3', 'nchar3', now()+3a ); +select avg(c1) from deldata.ct1; +delete from deldata.stb1; +flush database deldata; +insert into deldata.ct1 values ( now()-0s, 0, 0, 0, 0, 0.0, 0.0, 0, 'binary0', 'nchar0', now()+0a ) ( now()-10s, 1, 11111, 111, 11, 1.11, 11.11, 1, 'binary1', 'nchar1', now()+1a ) ( now()-20s, 2, 22222, 222, 22, 2.22, 22.22, 0, 'binary2', 'nchar2', now()+2a ) ( now()-30s, 3, 33333, 333, 33, 3.33, 33.33, 1, 'binary3', 'nchar3', now()+3a ); +delete from deldata.ct1; diff --git a/tests/system-test/6-cluster/5dnode3mnodeRoll.py b/tests/system-test/6-cluster/5dnode3mnodeRoll.py index 8d7d4fb3e52c5ab733a385601dc10f4aacba6701..38ac47f777171c7ae32f600bc40c0cfe2e756ea6 100644 --- a/tests/system-test/6-cluster/5dnode3mnodeRoll.py +++ b/tests/system-test/6-cluster/5dnode3mnodeRoll.py @@ -27,7 +27,7 @@ import threading import time import json -BASEVERSION = "3.0.7.0" +BASEVERSION = "3.1.0.0" class TDTestCase: @@ -37,6 +37,15 @@ class TDTestCase: tdSql.init(conn.cursor()) self.host = socket.gethostname() self.replicaVar = int(replicaVar) + self.deletedDataSql= '''drop database if exists deldata;create database deldata duration 300;use deldata; + create table deldata.stb1 (ts timestamp, c1 int, c2 bigint, c3 smallint, c4 tinyint, c5 float, c6 double, c7 bool, c8 binary(16),c9 nchar(32), c10 timestamp) tags (t1 int); + create table deldata.ct1 using deldata.stb1 tags ( 1 ); + insert into deldata.ct1 values ( now()-0s, 0, 0, 0, 0, 0.0, 0.0, 0, 'binary0', 'nchar0', now()+0a ) ( now()-10s, 1, 11111, 111, 11, 1.11, 11.11, 1, 'binary1', 'nchar1', now()+1a ) ( now()-20s, 2, 22222, 222, 22, 2.22, 22.22, 0, 'binary2', 'nchar2', now()+2a ) ( now()-30s, 3, 33333, 333, 33, 3.33, 33.33, 1, 'binary3', 'nchar3', now()+3a ); + select avg(c1) from deldata.ct1; + delete from deldata.stb1; + flush database deldata; + insert into deldata.ct1 values ( now()-0s, 0, 0, 0, 0, 0.0, 0.0, 0, 'binary0', 'nchar0', now()+0a ) ( now()-10s, 1, 11111, 111, 11, 1.11, 11.11, 1, 'binary1', 'nchar1', now()+1a ) ( now()-20s, 2, 22222, 222, 22, 2.22, 22.22, 0, 'binary2', 'nchar2', now()+2a ) ( now()-30s, 3, 33333, 333, 33, 3.33, 33.33, 1, 'binary3', 'nchar3', now()+3a ); + delete from deldata.ct1;''' def checkProcessPid(self,processName): i=0 @@ -245,6 +254,9 @@ class TDTestCase: os.system("LD_LIBRARY_PATH=/usr/lib taos -f 0-others/TS-3131.tsql") # self.buildTaosd(bPath) + # add deleted data + os.system(f'LD_LIBRARY_PATH=/usr/lib taos -s "{self.deletedDataSql}" ') + threads=[] threads.append(threading.Thread(target=self.insertAllData, args=(cPath_temp,dbname,tableNumbers1,recordNumbers1))) for tr in threads: @@ -285,6 +297,11 @@ class TDTestCase: tdsql1.query(f"select count(*) from db4096.stb0") tdsql1.checkData(0,0,50000) + # checkout deleted data + tdsql.execute("insert into deldata.ct1 values ( now()-0s, 0, 0, 0, 0, 0.0, 0.0, 0, 'binary0', 'nchar0', now()+0a ) ( now()-10s, 1, 11111, 111, 11, 1.11, 11.11, 1, 'binary1', 'nchar1', now()+1a ) ( now()-20s, 2, 22222, 222, 22, 2.22, 22.22, 0, 'binary2', 'nchar2', now()+2a ) ( now()-30s, 3, 33333, 333, 33, 3.33, 33.33, 1, 'binary3', 'nchar3', now()+3a );") + tdsql.query("flush database deldata;select avg(c1) from deldata.ct1;") + + # tdsql1.query("show streams;") # tdsql1.checkRows(2) tdsql1.query("select *,tbname from d0.almlog where mcid='m0103';")