diff --git a/docs/en/07-develop/02-model/index.mdx b/docs/en/07-develop/02-model/index.mdx index e0378cc77ca28a1a82ef6a52fa1f74d6cd580a01..b647c845d070c26398956f8a9de81864b73120e1 100644 --- a/docs/en/07-develop/02-model/index.mdx +++ b/docs/en/07-develop/02-model/index.mdx @@ -9,15 +9,15 @@ The data model employed by TDengine is similar to that of a relational database. The [characteristics of time-series data](https://www.taosdata.com/blog/2019/07/09/86.html) from different data collection points may be different. Characteristics include collection frequency, retention policy and others which determine how you create and configure the database. For e.g. days to keep, number of replicas, data block size, whether data updates are allowed and other configurable parameters would be determined by the characteristics of your data and your business requirements. For TDengine to operate with the best performance, we strongly recommend that you create and configure different databases for data with different characteristics. This allows you, for example, to set up different storage and retention policies. When creating a database, there are a lot of parameters that can be configured such as, the days to keep data, the number of replicas, the number of memory blocks, time precision, the minimum and maximum number of rows in each data block, whether compression is enabled, the time range of the data in single data file and so on. Below is an example of the SQL statement to create a database. ```sql -CREATE DATABASE power KEEP 365 DAYS 10 BLOCKS 6 UPDATE 1; +CREATE DATABASE power KEEP 365 DURATION 10 BUFFER 16 VGROUPS 100 WAL 1; ``` In the above SQL statement: - a database named "power" will be created - the data in it will be kept for 365 days, which means that data older than 365 days will be deleted automatically - a new data file will be created every 10 days -- the number of memory blocks is 6 -- data is allowed to be updated +- the size of memory cache for writing is 16 MB +- data will be firstly written to WAL without FSYNC For more details please refer to [Database](/taos-sql/database). @@ -30,7 +30,6 @@ USE power; :::note - Any table or STable must belong to a database. To create a table or STable, the database it belongs to must be ready. -- JOIN operations can't be performed on tables from two different databases. - Timestamp needs to be specified when inserting rows or querying historical rows. ::: @@ -52,7 +51,7 @@ Similar to creating a regular table, when creating a STable, the name and schema For each kind of data collection point, a corresponding STable must be created. There may be many STables in an application. For electrical power system, we need to create a STable respectively for meters, transformers, busbars, switches. There may be multiple kinds of data collection points on a single device, for example there may be one data collection point for electrical data like current and voltage and another data collection point for environmental data like temperature, humidity and wind direction. Multiple STables are required for these kinds of devices. -At most 4096 (or 1024 prior to version 2.1.7.0) columns are allowed in a STable. If there are more than 4096 of metrics to be collected for a data collection point, multiple STables are required. There can be multiple databases in a system, while one or more STables can exist in a database. +At most 4096 columns are allowed in a STable. If there are more than 4096 of metrics to be collected for a data collection point, multiple STables are required. There can be multiple databases in a system, while one or more STables can exist in a database. ## Create Table @@ -66,12 +65,11 @@ In the above SQL statement, "d1001" is the table name, "meters" is the STable na In the TDengine system, it's recommended to create a table for a data collection point via STable. A table created via STable is called subtable in some parts of the TDengine documentation. All SQL commands applied on regular tables can be applied on subtables. -:::warning -It's not recommended to create a table in a database while using a STable from another database as template. - :::tip It's suggested to use the globally unique ID of a data collection point as the table name. For example the device serial number could be used as a unique ID. If a unique ID doesn't exist, multiple IDs that are not globally unique can be combined to form a globally unique ID. It's not recommended to use a globally unique ID as tag value. +::: + ## Create Table Automatically In some circumstances, it's unknown whether the table already exists when inserting rows. The table can be created automatically using the SQL statement below, and nothing will happen if the table already exists. diff --git a/docs/zh/07-develop/02-model/index.mdx b/docs/zh/07-develop/02-model/index.mdx index 7e2762b6e78393493c2c5b61959e9a6ff57a7b13..be545e8813b26b3abbb22d8231590a909e935a83 100644 --- a/docs/zh/07-develop/02-model/index.mdx +++ b/docs/zh/07-develop/02-model/index.mdx @@ -8,13 +8,13 @@ TDengine 采用类关系型数据模型,需要建库、建表。因此对于 ## 创建库 -不同类型的数据采集点往往具有不同的数据特征,包括数据采集频率的高低,数据保留时间的长短,副本的数目,数据块的大小,是否允许更新数据等等。为了在各种场景下 TDengine 都能最大效率的工作,TDengine 建议将不同数据特征的表创建在不同的库里,因为每个库可以配置不同的存储策略。创建一个库时,除 SQL 标准的选项外,还可以指定保留时长、副本数、内存块个数、时间精度、文件块里最大最小记录条数、是否压缩、一个数据文件覆盖的天数等多种参数。比如: +不同类型的数据采集点往往具有不同的数据特征,包括数据采集频率的高低,数据保留时间的长短,副本的数目,数据块的大小,是否允许更新数据等等。为了在各种场景下 TDengine 都能最大效率的工作,TDengine 建议将不同数据特征的表创建在不同的库里,因为每个库可以配置不同的存储策略。创建一个库时,除 SQL 标准的选项外,还可以指定保留时长、副本数、缓存大小、时间精度、文件块里最大最小记录条数、是否压缩、一个数据文件覆盖的天数等多种参数。比如: ```sql -CREATE DATABASE power KEEP 365 DAYS 10 BLOCKS 6 UPDATE 1; +CREATE DATABASE power KEEP 365 DURATION 10 BUFFER 16 VGROUPS 100 WAL 1; ``` -上述语句将创建一个名为 power 的库,这个库的数据将保留 365 天(超过 365 天将被自动删除),每 10 天一个数据文件,内存块数为 6,允许更新数据。详细的语法及参数请见 [数据库管理](/taos-sql/database) 章节。 +上述语句将创建一个名为 power 的库,这个库的数据将保留 365 天(超过 365 天将被自动删除),每 10 天一个数据文件,每个 VNODE 的写入内存池的大小为 16 MB,数据库的 VGROUPS 数量,对该数据库入会写 WAL 但不执行 FSYNC。详细的语法及参数请见 [数据库管理](/taos-sql/database) 章节。 创建库之后,需要使用 SQL 命令 `USE` 将当前库切换过来,例如: @@ -27,7 +27,6 @@ USE power; :::note - 任何一张表或超级表必须属于某个库,在创建表之前,必须先创建库。 -- 处于两个不同库的表是不能进行 JOIN 操作的。 - 创建并插入记录、查询历史记录的时候,均需要指定时间戳。 ::: @@ -40,15 +39,11 @@ USE power; CREATE STABLE meters (ts timestamp, current float, voltage int, phase float) TAGS (location binary(64), groupId int); ``` -:::note -这一指令中的 STABLE 关键字,在 2.0.15 之前的版本中需写作 TABLE 。 -::: - 与创建普通表一样,创建超级表时,需要提供表名(示例中为 meters),表结构 Schema,即数据列的定义。第一列必须为时间戳(示例中为 ts),其他列为采集的物理量(示例中为 current, voltage, phase),数据类型可以为整型、浮点型、字符串等。除此之外,还需要提供标签的 schema (示例中为 location, groupId),标签的数据类型可以为整型、浮点型、字符串等。采集点的静态属性往往可以作为标签,比如采集点的地理位置、设备型号、设备组 ID、管理员 ID 等等。标签的 schema 可以事后增加、删除、修改。具体定义以及细节请见 [TAOS SQL 的超级表管理](/taos-sql/stable) 章节。 每一种类型的数据采集点需要建立一个超级表,因此一个物联网系统,往往会有多个超级表。对于电网,我们就需要对智能电表、变压器、母线、开关等都建立一个超级表。在物联网中,一个设备就可能有多个数据采集点(比如一台风力发电的风机,有的采集点采集电流、电压等电参数,有的采集点采集温度、湿度、风向等环境参数),这个时候,对这一类型的设备,需要建立多张超级表。 -一张超级表最多容许 4096 列 (在 2.1.7.0 版本之前,列数限制为 1024 列),如果一个采集点采集的物理量个数超过 4096,需要建多张超级表来处理。一个系统可以有多个 DB,一个 DB 里可以有一到多个超级表。 +一张超级表最多容许 4096 列,如果一个采集点采集的物理量个数超过 4096,需要建多张超级表来处理。一个系统可以有多个 DB,一个 DB 里可以有一到多个超级表。 ## 创建表 @@ -60,11 +55,6 @@ CREATE TABLE d1001 USING meters TAGS ("California.SanFrancisco", 2); 其中 d1001 是表名,meters 是超级表的表名,后面紧跟标签 Location 的具体标签值 "California.SanFrancisco",标签 groupId 的具体标签值 2。虽然在创建表时,需要指定标签值,但可以事后修改。详细细则请见 [TAOS SQL 的表管理](/taos-sql/table) 章节。 -:::warning -目前 TDengine 没有从技术层面限制使用一个 database (db1) 的超级表作为模板建立另一个 database (db2) 的子表,后续会禁止这种用法,不建议使用这种方法建表。 - -::: - TDengine 建议将数据采集点的全局唯一 ID 作为表名(比如设备序列号)。但对于有的场景,并没有唯一的 ID,可以将多个 ID 组合成一个唯一的 ID。不建议将具有唯一性的 ID 作为标签值。 ### 自动建表 diff --git a/include/util/thash.h b/include/util/thash.h index fc8785a8fb51ac781355b4e10d9f1dec63a1732e..781c22a56aaba0d449d1f711b32fe4bd75a39003 100644 --- a/include/util/thash.h +++ b/include/util/thash.h @@ -188,7 +188,7 @@ void *taosHashGetKey(void *data, size_t* keyLen); void *taosHashAcquire(SHashObj *pHashObj, const void *key, size_t keyLen); /** - * release the prevous acquired obj + * release the previous acquired obj * * @param pHashObj * @param data diff --git a/source/dnode/vnode/src/inc/sma.h b/source/dnode/vnode/src/inc/sma.h index d5b719dfb974816dfce1f8085ba4cddcbaec3600..e767d94ebd9eba24f41cd10e2b1908b95dba37ed 100644 --- a/source/dnode/vnode/src/inc/sma.h +++ b/source/dnode/vnode/src/inc/sma.h @@ -67,7 +67,6 @@ struct SRSmaStat { int64_t submitVer; int64_t refId; // shared by fetch tasks int8_t triggerStat; // shared by fetch tasks - int8_t runningStat; // for persistence task SHashObj *rsmaInfoHash; // key: stbUid, value: SRSmaInfo; }; @@ -83,7 +82,6 @@ struct SSmaStat { #define SMA_RSMA_STAT(s) (&(s)->rsmaStat) #define RSMA_INFO_HASH(r) ((r)->rsmaInfoHash) #define RSMA_TRIGGER_STAT(r) (&(r)->triggerStat) -#define RSMA_RUNNING_STAT(r) (&(r)->runningStat) #define RSMA_REF_ID(r) ((r)->refId) #define RSMA_SUBMIT_VER(r) ((r)->submitVer) @@ -93,7 +91,7 @@ enum { TASK_TRIGGER_STAT_INACTIVE = 2, TASK_TRIGGER_STAT_PAUSED = 3, TASK_TRIGGER_STAT_CANCELLED = 4, - TASK_TRIGGER_STAT_FINISHED = 5, + TASK_TRIGGER_STAT_DROPPED = 5, }; void tdDestroySmaEnv(SSmaEnv *pSmaEnv); diff --git a/source/dnode/vnode/src/inc/vnodeInt.h b/source/dnode/vnode/src/inc/vnodeInt.h index 57595a37d10bb1ea179bcab1ec00e94a0bc9e2e4..7b298ba830a348f280032fc36415cbb7f496bcae 100644 --- a/source/dnode/vnode/src/inc/vnodeInt.h +++ b/source/dnode/vnode/src/inc/vnodeInt.h @@ -172,8 +172,9 @@ int32_t tdProcessTSmaCreate(SSma* pSma, int64_t version, const char* msg); int32_t tdProcessTSmaInsert(SSma* pSma, int64_t indexUid, const char* msg); int64_t tdRSmaGetMaxSubmitVer(SSma* pSma, int8_t level); -int32_t tdProcessRSmaCreate(SVnode* pVnode, SVCreateStbReq* pReq); +int32_t tdProcessRSmaCreate(SSma* pSma, SVCreateStbReq* pReq); int32_t tdProcessRSmaSubmit(SSma* pSma, void* pMsg, int32_t inputType); +int32_t tdProcessRSmaDrop(SSma* pSma, SVDropStbReq* pReq); int32_t tdFetchTbUidList(SSma* pSma, STbUidStore** ppStore, tb_uid_t suid, tb_uid_t uid); int32_t tdUpdateTbUidList(SSma* pSma, STbUidStore* pUidStore); void tdUidStoreDestory(STbUidStore* pStore); diff --git a/source/dnode/vnode/src/sma/smaEnv.c b/source/dnode/vnode/src/sma/smaEnv.c index 5eb96653263da70a48383b702cc9ee89a4c4826a..2cf4fd51a96130966f5f0d0957247bb67d3aff64 100644 --- a/source/dnode/vnode/src/sma/smaEnv.c +++ b/source/dnode/vnode/src/sma/smaEnv.c @@ -254,26 +254,7 @@ static void tdDestroyRSmaStat(void *pRSmaStat) { // step 1: set rsma trigger stat cancelled atomic_store_8(RSMA_TRIGGER_STAT(pStat), TASK_TRIGGER_STAT_CANCELLED); - // step 2: wait the persistence thread to finish - int32_t nLoops = 0; - if (atomic_load_8(RSMA_RUNNING_STAT(pStat)) == 1) { - while (1) { - if (atomic_load_8(RSMA_TRIGGER_STAT(pStat)) == TASK_TRIGGER_STAT_FINISHED) { - smaDebug("vgId:%d, rsma persist task finished already", SMA_VID(pSma)); - break; - } else { - smaDebug("vgId:%d, rsma persist task not finished yet since rsma stat in %" PRIi8, SMA_VID(pSma), - atomic_load_8(RSMA_TRIGGER_STAT(pStat))); - } - ++nLoops; - if (nLoops > 1000) { - sched_yield(); - nLoops = 0; - } - } - } - - // step 3: destroy the rsma info and associated fetch tasks + // step 2: destroy the rsma info and associated fetch tasks // TODO: use taosHashSetFreeFp when taosHashSetFreeFp is ready. if (taosHashGetSize(RSMA_INFO_HASH(pStat)) > 0) { void *infoHash = taosHashIterate(RSMA_INFO_HASH(pStat), NULL); @@ -285,8 +266,8 @@ static void tdDestroyRSmaStat(void *pRSmaStat) { } taosHashCleanup(RSMA_INFO_HASH(pStat)); - // step 5: wait all triggered fetch tasks finished - nLoops = 0; + // step 3: wait all triggered fetch tasks finished + int32_t nLoops = 0; while (1) { if (T_REF_VAL_GET((SSmaStat *)pStat) == 0) { smaDebug("vgId:%d, rsma fetch tasks all finished", SMA_VID(pSma)); diff --git a/source/dnode/vnode/src/sma/smaRollup.c b/source/dnode/vnode/src/sma/smaRollup.c index efa2886e48bdbd97e4953a918e9e23bf922288f1..14497c6f9bed80af4b8bddd295a0300bab27c46a 100644 --- a/source/dnode/vnode/src/sma/smaRollup.c +++ b/source/dnode/vnode/src/sma/smaRollup.c @@ -37,8 +37,6 @@ static SRSmaInfo *tdGetRSmaInfoBySuid(SSma *pSma, int64_t suid); static int32_t tdRSmaFetchAndSubmitResult(SRSmaInfoItem *pItem, STSchema *pTSchema, int64_t suid, SRSmaStat *pStat, int8_t blkType); static void tdRSmaFetchTrigger(void *param, void *tmrId); -static void tdRSmaPersistTrigger(void *param, void *tmrId); -static void *tdRSmaPersistExec(void *param); static void tdRSmaQTaskInfoGetFName(int32_t vid, int64_t version, char *outputName); static int32_t tdRSmaQTaskInfoIterInit(SRSmaQTaskInfoIter *pIter, STFile *pTFile); @@ -68,8 +66,8 @@ struct SRSmaInfo { static SRSmaInfo *tdGetRSmaInfoByItem(SRSmaInfoItem *pItem) { // adapt accordingly if definition of SRSmaInfo update int32_t rsmaInfoHeadLen = sizeof(int64_t) + sizeof(STSchema *); - ASSERT(pItem->level == 1 || pItem->level == 2); - return (SRSmaInfo *)POINTER_SHIFT(pItem, -sizeof(SRSmaInfoItem) * (pItem->level - 1) - rsmaInfoHeadLen); + ASSERT(pItem->level == 0 || pItem->level == 1); + return (SRSmaInfo *)POINTER_SHIFT(pItem, -sizeof(SRSmaInfoItem) * pItem->level - rsmaInfoHeadLen); } struct SRSmaQTaskInfoItem { @@ -278,7 +276,7 @@ static int32_t tdSetRSmaInfoItemParams(SSma *pSma, SRSmaParam *param, SRSmaStat if (pItem->maxDelay > TSDB_MAX_ROLLUP_MAX_DELAY) { pItem->maxDelay = TSDB_MAX_ROLLUP_MAX_DELAY; } - pItem->level = (idx == 0 ? TSDB_RETENTION_L1 : TSDB_RETENTION_L2); + pItem->level = idx; smaInfo("vgId:%d table:%" PRIi64 " level:%" PRIi8 " maxdelay:%" PRIi64 " watermark:%" PRIi64 ", finally maxdelay:%" PRIi32, SMA_VID(pSma), pRSmaInfo->suid, idx + 1, param->maxdelay[idx], param->watermark[idx], pItem->maxDelay); @@ -375,20 +373,48 @@ _err: /** * @brief Check and init qTaskInfo_t, only applicable to stable with SRSmaParam currently * - * @param pVnode + * @param pSma * @param pReq * @return int32_t */ -int32_t tdProcessRSmaCreate(SVnode *pVnode, SVCreateStbReq *pReq) { - SSma *pSma = pVnode->pSma; +int32_t tdProcessRSmaCreate(SSma *pSma, SVCreateStbReq *pReq) { + SVnode *pVnode = pSma->pVnode; if (!pReq->rollup) { - smaTrace("vgId:%d, return directly since no rollup for stable %s %" PRIi64, SMA_VID(pSma), pReq->name, pReq->suid); + smaTrace("vgId:%d, not create rsma for stable %s %" PRIi64 " since no rollup in req", TD_VID(pVnode), pReq->name, + pReq->suid); + return TSDB_CODE_SUCCESS; + } + + if (!VND_IS_RSMA(pVnode)) { + smaTrace("vgId:%d, not create rsma for stable %s %" PRIi64 " since vnd is not rsma", TD_VID(pVnode), pReq->name, + pReq->suid); return TSDB_CODE_SUCCESS; } return tdProcessRSmaCreateImpl(pSma, &pReq->rsmaParam, pReq->suid, pReq->name); } +/** + * @brief drop cache for stb + * + * @param pSma + * @param pReq + * @return int32_t + */ +int32_t tdProcessRSmaDrop(SSma *pSma, SVDropStbReq *pReq) { + SVnode *pVnode = pSma->pVnode; + if (!VND_IS_RSMA(pVnode)) { + smaTrace("vgId:%d, not create rsma for stable %s %" PRIi64 " since vnd is not rsma", TD_VID(pVnode), pReq->name, + pReq->suid); + return TSDB_CODE_SUCCESS; + } + + + + smaDebug("vgId:%d, drop rsma for table %" PRIi64 " succeed", TD_VID(pVnode), pReq->suid); + return TSDB_CODE_SUCCESS; + } + /** * @brief store suid/[uids], prefer to use array and then hash * @@ -1174,123 +1200,6 @@ _err: return TSDB_CODE_FAILED; } -static void *tdRSmaPersistExec(void *param) { - setThreadName("rsma-task-persist"); - SRSmaStat *pRSmaStat = param; - SSma *pSma = pRSmaStat->pSma; - - int8_t triggerStat = atomic_load_8(RSMA_TRIGGER_STAT(pRSmaStat)); - - if (TASK_TRIGGER_STAT_CANCELLED == triggerStat || TASK_TRIGGER_STAT_PAUSED == triggerStat) { - goto _end; - } - - // execution - tdRSmaPersistExecImpl(pRSmaStat); - -_end: - if (TASK_TRIGGER_STAT_INACTIVE == atomic_val_compare_exchange_8(RSMA_TRIGGER_STAT(pRSmaStat), - TASK_TRIGGER_STAT_INACTIVE, - TASK_TRIGGER_STAT_ACTIVE)) { - smaDebug("vgId:%d, rsma persist task is active again", SMA_VID(pSma)); - } else if (TASK_TRIGGER_STAT_CANCELLED == atomic_val_compare_exchange_8(RSMA_TRIGGER_STAT(pRSmaStat), - TASK_TRIGGER_STAT_CANCELLED, - TASK_TRIGGER_STAT_FINISHED)) { - smaDebug("vgId:%d, rsma persist task is cancelled", SMA_VID(pSma)); - } else { - smaWarn("vgId:%d, rsma persist task in stat %" PRIi8, SMA_VID(pSma), atomic_load_8(RSMA_TRIGGER_STAT(pRSmaStat))); - } - - atomic_store_8(RSMA_RUNNING_STAT(pRSmaStat), 0); - smaDebug("vgId:%d, release rsetId rsetId:%" PRIi64 " refId:%d", SMA_VID(pSma), smaMgmt.rsetId, pRSmaStat->refId); - tdReleaseSmaRef(smaMgmt.rsetId, pRSmaStat->refId, __func__, __LINE__); - taosThreadExit(NULL); - return NULL; -} - -static void tdRSmaPersistTask(SRSmaStat *pRSmaStat) { - TdThreadAttr thAttr; - taosThreadAttrInit(&thAttr); - taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_DETACHED); - TdThread tid; - - if (taosThreadCreate(&tid, &thAttr, tdRSmaPersistExec, pRSmaStat) != 0) { - if (TASK_TRIGGER_STAT_INACTIVE == atomic_val_compare_exchange_8(RSMA_TRIGGER_STAT(pRSmaStat), - TASK_TRIGGER_STAT_INACTIVE, - TASK_TRIGGER_STAT_ACTIVE)) { - smaDebug("vgId:%d, persist task is active again", SMA_VID(pRSmaStat->pSma)); - } else if (TASK_TRIGGER_STAT_CANCELLED == atomic_val_compare_exchange_8(RSMA_TRIGGER_STAT(pRSmaStat), - TASK_TRIGGER_STAT_CANCELLED, - TASK_TRIGGER_STAT_FINISHED)) { - smaDebug("vgId:%d, persist task is cancelled and set finished", SMA_VID(pRSmaStat->pSma)); - } else { - smaWarn("vgId:%d, persist task in abnormal stat %" PRIi8, SMA_VID(pRSmaStat->pSma), - atomic_load_8(RSMA_TRIGGER_STAT(pRSmaStat))); - } - atomic_store_8(RSMA_RUNNING_STAT(pRSmaStat), 0); - smaDebug("vgId:%d, release rsetId rsetId:%" PRIi64 " refId:%d)", SMA_VID(pRSmaStat->pSma), smaMgmt.rsetId, - pRSmaStat->refId); - tdReleaseSmaRef(smaMgmt.rsetId, pRSmaStat->refId, __func__, __LINE__); - } - - taosThreadAttrDestroy(&thAttr); -} - -/** - * @brief trigger to persist rsma qTaskInfo - * - * @param param - * @param tmrId - */ -static void tdRSmaPersistTrigger(void *param, void *tmrId) { - SRSmaStat *rsmaStat = param; - SRSmaStat *pRSmaStat = (SRSmaStat *)taosAcquireRef(smaMgmt.rsetId, rsmaStat->refId); - ASSERT(0); - if (!pRSmaStat) { - smaDebug("rsma persistence task not start since already destroyed"); - return; - } - - int8_t tmrStat = - atomic_val_compare_exchange_8(RSMA_TRIGGER_STAT(pRSmaStat), TASK_TRIGGER_STAT_ACTIVE, TASK_TRIGGER_STAT_INACTIVE); - switch (tmrStat) { - case TASK_TRIGGER_STAT_ACTIVE: { - atomic_store_8(RSMA_RUNNING_STAT(pRSmaStat), 1); - if (TASK_TRIGGER_STAT_CANCELLED != atomic_val_compare_exchange_8(RSMA_TRIGGER_STAT(pRSmaStat), - TASK_TRIGGER_STAT_CANCELLED, - TASK_TRIGGER_STAT_FINISHED)) { - smaDebug("vgId:%d, rsma persistence start since active", SMA_VID(pRSmaStat->pSma)); - - // start persist task - tdRSmaPersistTask(pRSmaStat); - - // taosTmrReset(tdRSmaPersistTrigger, 5000, pRSmaStat, pRSmaStat->tmrHandle, - // RSMA_TMR_ID(pRSmaStat)); - } else { - atomic_store_8(RSMA_RUNNING_STAT(pRSmaStat), 0); - } - return; - } break; - case TASK_TRIGGER_STAT_CANCELLED: { - atomic_store_8(RSMA_TRIGGER_STAT(pRSmaStat), TASK_TRIGGER_STAT_FINISHED); - smaDebug("rsma persistence not start since cancelled and finished"); - } break; - case TASK_TRIGGER_STAT_PAUSED: { - smaDebug("rsma persistence not start since paused"); - } break; - case TASK_TRIGGER_STAT_INACTIVE: { - smaDebug("rsma persistence not start since inactive"); - } break; - case TASK_TRIGGER_STAT_INIT: { - smaDebug("rsma persistence not start since init"); - } break; - default: { - smaWarn("rsma persistence not start since unknown stat %" PRIi8, tmrStat); - } break; - } - taosReleaseRef(smaMgmt.rsetId, rsmaStat->refId); -} - /** * @brief trigger to get rsma result * @@ -1314,8 +1223,7 @@ static void tdRSmaFetchTrigger(void *param, void *tmrId) { int8_t rsmaTriggerStat = atomic_load_8(RSMA_TRIGGER_STAT(pStat)); switch (rsmaTriggerStat) { case TASK_TRIGGER_STAT_PAUSED: - case TASK_TRIGGER_STAT_CANCELLED: - case TASK_TRIGGER_STAT_FINISHED: { + case TASK_TRIGGER_STAT_CANCELLED: { tdReleaseSmaRef(smaMgmt.rsetId, pItem->refId, __func__, __LINE__); smaDebug("vgId:%d, not fetch rsma level %" PRIi8 " data since stat is %" PRIi8 ", rsetId rsetId:%" PRIi64 " refId:%d", @@ -1328,7 +1236,7 @@ static void tdRSmaFetchTrigger(void *param, void *tmrId) { SRSmaInfo *pRSmaInfo = tdGetRSmaInfoByItem(pItem); - ASSERT(pRSmaInfo->suid > 0); + ASSERT(pRSmaInfo->items[pItem->level].level == pItem->level); int8_t fetchTriggerStat = atomic_val_compare_exchange_8(&pItem->triggerStat, TASK_TRIGGER_STAT_ACTIVE, TASK_TRIGGER_STAT_INACTIVE); diff --git a/source/dnode/vnode/src/tsdb/tsdbCache.c b/source/dnode/vnode/src/tsdb/tsdbCache.c index 214dcc05ccf45b114dcf25da71ac8787a63bea98..605f4bf3d59bef465b523ecccf6114ad11f24fef 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCache.c +++ b/source/dnode/vnode/src/tsdb/tsdbCache.c @@ -472,6 +472,7 @@ static int32_t getNextRowFromFS(void *iter, TSDBROW **ppRow) { case SFSNEXTROW_FILESET: { SDFileSet *pFileSet = NULL; + _next_fileset: if (--state->iFileSet >= 0) { pFileSet = (SDFileSet *)taosArrayGet(state->aDFileSet, state->iFileSet); } else { @@ -508,6 +509,10 @@ static int32_t getNextRowFromFS(void *iter, TSDBROW **ppRow) { state->pBlockIdx = taosArraySearch(state->aBlockIdx, state->pBlockIdxExp, tCmprBlockIdx, TD_EQ); if (code) goto _err; + if (!state->pBlockIdx) { + goto _next_fileset; + } + tMapDataReset(&state->blockMap); code = tsdbReadBlock(state->pDataFReader, state->pBlockIdx, &state->blockMap, NULL); /* code = tsdbReadBlock(state->pDataFReader, &state->blockIdx, &state->blockMap, NULL); */ diff --git a/source/dnode/vnode/src/vnd/vnodeSvr.c b/source/dnode/vnode/src/vnd/vnodeSvr.c index c55edc4942e571ee9831a25c1622095b42e384ae..d25ae817c7a700217d066ede669cb28a8b5da845 100644 --- a/source/dnode/vnode/src/vnd/vnodeSvr.c +++ b/source/dnode/vnode/src/vnd/vnodeSvr.c @@ -417,7 +417,7 @@ static int32_t vnodeProcessCreateStbReq(SVnode *pVnode, int64_t version, void *p goto _err; } - if (tdProcessRSmaCreate(pVnode, &req) < 0) { + if (tdProcessRSmaCreate(pVnode->pSma, &req) < 0) { pRsp->code = terrno; goto _err; } @@ -573,6 +573,11 @@ static int32_t vnodeProcessDropStbReq(SVnode *pVnode, int64_t version, void *pRe goto _exit; } + if (tdProcessRSmaDrop(pVnode->pSma, &req) < 0) { + rcode = terrno; + goto _exit; + } + // return rsp _exit: pRsp->code = rcode; diff --git a/source/libs/executor/src/cachescanoperator.c b/source/libs/executor/src/cachescanoperator.c new file mode 100644 index 0000000000000000000000000000000000000000..b6ad3b6cc0555c6d029f38fc5ceb6bce70af4b96 --- /dev/null +++ b/source/libs/executor/src/cachescanoperator.c @@ -0,0 +1,145 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#include "os.h" +#include "function.h" +#include "tname.h" + +#include "tdatablock.h" +#include "tmsg.h" + +#include "executorimpl.h" +#include "tcompare.h" +#include "thash.h" +#include "ttypes.h" +#include "executorInt.h" + +static SSDataBlock* doScanLastrow(SOperatorInfo* pOperator); +static void destroyLastrowScanOperator(void* param, int32_t numOfOutput); +static int32_t extractTargetSlotId(const SArray* pColMatchInfo, SExecTaskInfo* pTaskInfo, int32_t** pSlotIds); + +SOperatorInfo* createLastrowScanOperator(SLastRowScanPhysiNode* pScanNode, SReadHandle* readHandle, SArray* pTableList, + SExecTaskInfo* pTaskInfo) { + SLastrowScanInfo* pInfo = taosMemoryCalloc(1, sizeof(SLastrowScanInfo)); + SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); + if (pInfo == NULL || pOperator == NULL) { + goto _error; + } + + pInfo->pTableList = pTableList; + pInfo->readHandle = *readHandle; + pInfo->pRes = createResDataBlock(pScanNode->node.pOutputDataBlockDesc); + + int32_t numOfCols = 0; + pInfo->pColMatchInfo = extractColMatchInfo(pScanNode->pScanCols, pScanNode->node.pOutputDataBlockDesc, &numOfCols, + COL_MATCH_FROM_COL_ID); + int32_t* pCols = taosMemoryMalloc(numOfCols * sizeof(int32_t)); + for (int32_t i = 0; i < numOfCols; ++i) { + SColMatchInfo* pColMatch = taosArrayGet(pInfo->pColMatchInfo, i); + pCols[i] = pColMatch->colId; + } + + int32_t code = extractTargetSlotId(pInfo->pColMatchInfo, pTaskInfo, &pInfo->pSlotIds); + if (code != TSDB_CODE_SUCCESS) { + goto _error; + } + + tsdbLastRowReaderOpen(readHandle->vnode, LASTROW_RETRIEVE_TYPE_ALL, pTableList, pCols, numOfCols, + &pInfo->pLastrowReader); + taosMemoryFree(pCols); + + pOperator->name = "LastrowScanOperator"; + pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_LAST_ROW_SCAN; + pOperator->blocking = false; + pOperator->status = OP_NOT_OPENED; + pOperator->info = pInfo; + pOperator->pTaskInfo = pTaskInfo; + pOperator->exprSupp.numOfExprs = taosArrayGetSize(pInfo->pRes->pDataBlock); + + initResultSizeInfo(pOperator, 1024); + blockDataEnsureCapacity(pInfo->pRes, pOperator->resultInfo.capacity); + + pOperator->fpSet = + createOperatorFpSet(operatorDummyOpenFn, doScanLastrow, NULL, NULL, destroyLastrowScanOperator, NULL, NULL, NULL); + pOperator->cost.openCost = 0; + return pOperator; + + _error: + pTaskInfo->code = TSDB_CODE_OUT_OF_MEMORY; + taosMemoryFree(pInfo); + taosMemoryFree(pOperator); + return NULL; +} + +SSDataBlock* doScanLastrow(SOperatorInfo* pOperator) { + if (pOperator->status == OP_EXEC_DONE) { + return NULL; + } + + SLastrowScanInfo* pInfo = pOperator->info; + SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; + + int32_t size = taosArrayGetSize(pInfo->pTableList); + if (size == 0) { + setTaskStatus(pTaskInfo, TASK_COMPLETED); + return NULL; + } + + // check if it is a group by tbname + if (size == taosArrayGetSize(pInfo->pTableList)) { + blockDataCleanup(pInfo->pRes); + tsdbRetrieveLastRow(pInfo->pLastrowReader, pInfo->pRes, pInfo->pSlotIds); + return (pInfo->pRes->info.rows == 0) ? NULL : pInfo->pRes; + } else { + // todo fetch the result for each group + } + + return pInfo->pRes->info.rows == 0 ? NULL : pInfo->pRes; +} + +void destroyLastrowScanOperator(void* param, int32_t numOfOutput) { + SLastrowScanInfo* pInfo = (SLastrowScanInfo*)param; + blockDataDestroy(pInfo->pRes); + tsdbLastrowReaderClose(pInfo->pLastrowReader); + + taosMemoryFreeClear(param); +} + +int32_t extractTargetSlotId(const SArray* pColMatchInfo, SExecTaskInfo* pTaskInfo, int32_t** pSlotIds) { + size_t numOfCols = taosArrayGetSize(pColMatchInfo); + + *pSlotIds = taosMemoryMalloc(numOfCols * sizeof(int32_t)); + if (*pSlotIds == NULL) { + return TSDB_CODE_OUT_OF_MEMORY; + } + + for (int32_t i = 0; i < numOfCols; ++i) { + SColMatchInfo* pColMatch = taosArrayGet(pColMatchInfo, i); + for (int32_t j = 0; j < pTaskInfo->schemaVer.sw->nCols; ++j) { + if (pColMatch->colId == pTaskInfo->schemaVer.sw->pSchema[j].colId && + pColMatch->colId == PRIMARYKEY_TIMESTAMP_COL_ID) { + (*pSlotIds)[pColMatch->targetSlotId] = -1; + break; + } + + if (pColMatch->colId == pTaskInfo->schemaVer.sw->pSchema[j].colId) { + (*pSlotIds)[pColMatch->targetSlotId] = j; + break; + } + } + } + + return TSDB_CODE_SUCCESS; +} \ No newline at end of file diff --git a/source/libs/planner/src/planLogicCreater.c b/source/libs/planner/src/planLogicCreater.c index e90be75743cb5650c1bb2fa4ba05f34c52aa3d65..4d4756c3ace3c7e0e2c8afa3fd2530984ecc349b 100644 --- a/source/libs/planner/src/planLogicCreater.c +++ b/source/libs/planner/src/planLogicCreater.c @@ -162,12 +162,11 @@ static EScanType getScanType(SLogicPlanContext* pCxt, SNodeList* pScanPseudoCols } if (NULL == pScanCols) { - // select count(*) from t return NULL == pScanPseudoCols ? SCAN_TYPE_TABLE : ((FUNCTION_TYPE_BLOCK_DIST_INFO == ((SFunctionNode*)nodesListGetNode(pScanPseudoCols, 0))->funcType) ? SCAN_TYPE_BLOCK_INFO - : SCAN_TYPE_TAG); + : SCAN_TYPE_TABLE); } if (TSDB_SYSTEM_TABLE == tableType) { @@ -181,7 +180,7 @@ static EScanType getScanType(SLogicPlanContext* pCxt, SNodeList* pScanPseudoCols } } - return SCAN_TYPE_TAG; + return SCAN_TYPE_TABLE; } static SNode* createPrimaryKeyCol(uint64_t tableId) { diff --git a/source/libs/scheduler/src/schRemote.c b/source/libs/scheduler/src/schRemote.c index 2257ba8328869f09c4aa9975d975c0e3ea5af643..1b893739bd846f8e94911c03bbd562d13d6a0370 100644 --- a/source/libs/scheduler/src/schRemote.c +++ b/source/libs/scheduler/src/schRemote.c @@ -35,7 +35,7 @@ int32_t schValidateRspMsgType(SSchJob *pJob, SSchTask *pTask, int32_t msgType) { SCH_TASK_ELOG("rsp msg type mis-match, last sent msgType:%s, rspType:%s", TMSG_INFO(lastMsgType), TMSG_INFO(msgType)); SCH_ERR_RET(TSDB_CODE_SCH_STATUS_ERROR); - } + } if (taskStatus != JOB_TASK_STATUS_PART_SUCC) { SCH_TASK_ELOG("rsp msg conflicted with task status, status:%s, rspType:%s", jobTaskStatusStr(taskStatus), TMSG_INFO(msgType)); @@ -75,7 +75,7 @@ int32_t schValidateRspMsgType(SSchJob *pJob, SSchTask *pTask, int32_t msgType) { // Note: no more task error processing, handled in function internal int32_t schHandleResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t execId, SDataBuf *pMsg, int32_t rspCode) { int32_t code = 0; - char *msg = pMsg->pData; + char *msg = pMsg->pData; int32_t msgSize = pMsg->len; int32_t msgType = pMsg->msgType; @@ -253,15 +253,15 @@ int32_t schHandleResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t execId, SDa rsp->sversion = ntohl(rsp->sversion); rsp->tversion = ntohl(rsp->tversion); rsp->affectedRows = be64toh(rsp->affectedRows); - + SCH_ERR_JRET(rsp->code); SCH_ERR_JRET(schSaveJobQueryRes(pJob, rsp)); atomic_add_fetch_32(&pJob->resNumOfRows, rsp->affectedRows); - taosMemoryFreeClear(msg); - + taosMemoryFreeClear(msg); + SCH_ERR_RET(schProcessOnTaskSuccess(pJob, pTask)); break; @@ -375,7 +375,8 @@ int32_t schHandleCallback(void *param, SDataBuf *pMsg, int32_t rspCode) { SSchTask *pTask = NULL; SSchJob *pJob = NULL; - qDebug("begin to handle rsp msg, type:%s, handle:%p, code:%s", TMSG_INFO(pMsg->msgType), pMsg->handle, tstrerror(rspCode)); + qDebug("begin to handle rsp msg, type:%s, handle:%p, code:%s", TMSG_INFO(pMsg->msgType), pMsg->handle, + tstrerror(rspCode)); SCH_ERR_RET(schProcessOnCbBegin(&pJob, &pTask, pParam->queryId, pParam->refId, pParam->taskId)); @@ -387,7 +388,8 @@ int32_t schHandleCallback(void *param, SDataBuf *pMsg, int32_t rspCode) { taosMemoryFreeClear(pMsg->pData); taosMemoryFreeClear(param); - qDebug("end to handle rsp msg, type:%s, handle:%p, code:%s", TMSG_INFO(pMsg->msgType), pMsg->handle, tstrerror(rspCode)); + qDebug("end to handle rsp msg, type:%s, handle:%p, code:%s", TMSG_INFO(pMsg->msgType), pMsg->handle, + tstrerror(rspCode)); SCH_RET(code); } @@ -424,7 +426,7 @@ int32_t schHandleCommitCallback(void *param, SDataBuf *pMsg, int32_t code) { } int32_t schHandleHbCallback(void *param, SDataBuf *pMsg, int32_t code) { - SSchedulerHbRsp rsp = {0}; + SSchedulerHbRsp rsp = {0}; SSchTaskCallbackParam *pParam = (SSchTaskCallbackParam *)param; if (code) { @@ -453,8 +455,8 @@ _return: SCH_RET(code); } - -int32_t schMakeCallbackParam(SSchJob *pJob, SSchTask *pTask, int32_t msgType, bool isHb, SSchTrans *trans, void **pParam) { +int32_t schMakeCallbackParam(SSchJob *pJob, SSchTask *pTask, int32_t msgType, bool isHb, SSchTrans *trans, + void **pParam) { if (!isHb) { SSchTaskCallbackParam *param = taosMemoryCalloc(1, sizeof(SSchTaskCallbackParam)); if (NULL == param) { @@ -940,7 +942,8 @@ int32_t schBuildAndSendMsg(SSchJob *pJob, SSchTask *pTask, SQueryNodeAddr *addr, if (NULL == addr) { addr = taosArrayGet(pTask->candidateAddrs, pTask->candidateIdx); isCandidateAddr = true; - SCH_TASK_DLOG("target candidateIdx %d", pTask->candidateIdx); + SCH_TASK_DLOG("target candidateIdx %d, epInUse %d/%d", pTask->candidateIdx, addr->epSet.inUse, + addr->epSet.numOfEps); } switch (msgType) { diff --git a/source/libs/scheduler/src/schTask.c b/source/libs/scheduler/src/schTask.c index a6621d279d10ce72cfb4a5dad2945d9b71aa9b1f..4275bea3f092a4a07d01c8f7c81fa4f1b42cd343 100644 --- a/source/libs/scheduler/src/schTask.c +++ b/source/libs/scheduler/src/schTask.c @@ -21,11 +21,9 @@ #include "tref.h" #include "trpc.h" - - void schFreeTask(SSchJob *pJob, SSchTask *pTask) { schDeregisterTaskHb(pJob, pTask); - + if (pTask->candidateAddrs) { taosArrayDestroy(pTask->candidateAddrs); } @@ -45,17 +43,17 @@ void schFreeTask(SSchJob *pJob, SSchTask *pTask) { } } - int32_t schInitTask(SSchJob *pJob, SSchTask *pTask, SSubplan *pPlan, SSchLevel *pLevel, int32_t levelNum) { int32_t code = 0; - + pTask->plan = pPlan; pTask->level = pLevel; pTask->execId = -1; pTask->maxExecTimes = SCH_TASK_MAX_EXEC_TIMES(pLevel->level, levelNum); pTask->timeoutUsec = SCH_DEFAULT_TASK_TIMEOUT_USEC; pTask->taskId = schGenTaskId(); - pTask->execNodes = taosHashInit(SCH_MAX_CANDIDATE_EP_NUM, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_NO_LOCK); + pTask->execNodes = + taosHashInit(SCH_MAX_CANDIDATE_EP_NUM, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_NO_LOCK); pTask->profile.execTime = taosMemoryCalloc(pTask->maxExecTimes, sizeof(int64_t)); if (NULL == pTask->execNodes || NULL == pTask->profile.execTime) { SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY); @@ -110,8 +108,8 @@ int32_t schDropTaskExecNode(SSchJob *pJob, SSchTask *pTask, void *handle, int32_ } else { SCH_TASK_DLOG("execId %d removed from execNodeList", execId); } - - if (execId != pTask->execId) { // ignore it + + if (execId != pTask->execId) { // ignore it SCH_TASK_DLOG("execId %d is not current execId %d", execId, pTask->execId); SCH_ERR_RET(TSDB_CODE_SCH_IGNORE_ERROR); } @@ -149,13 +147,13 @@ int32_t schProcessOnTaskFailure(SSchJob *pJob, SSchTask *pTask, int32_t errCode) if (TSDB_CODE_SCH_IGNORE_ERROR == errCode) { return TSDB_CODE_SCH_IGNORE_ERROR; } - + int8_t status = 0; if (schJobNeedToStop(pJob, &status)) { SCH_TASK_DLOG("no more task failure processing cause of job status %s", jobTaskStatusStr(status)); SCH_ERR_RET(TSDB_CODE_SCH_IGNORE_ERROR); } - + if (SCH_GET_TASK_STATUS(pTask) != JOB_TASK_STATUS_EXEC) { SCH_TASK_ELOG("task already not in EXEC status, status:%s", SCH_GET_TASK_STATUS_STR(pTask)); SCH_ERR_RET(TSDB_CODE_SCH_STATUS_ERROR); @@ -204,8 +202,6 @@ int32_t schProcessOnTaskFailure(SSchJob *pJob, SSchTask *pTask, int32_t errCode) SCH_RET(errCode); } - - // Note: no more task error processing, handled in function internal int32_t schProcessOnTaskSuccess(SSchJob *pJob, SSchTask *pTask) { bool moved = false; @@ -265,13 +261,14 @@ int32_t schProcessOnTaskSuccess(SSchJob *pJob, SSchTask *pTask) { int32_t readyNum = atomic_add_fetch_32(&parent->childReady, 1); SCH_LOCK_TASK(parent); - SDownstreamSourceNode source = {.type = QUERY_NODE_DOWNSTREAM_SOURCE, - .taskId = pTask->taskId, - .schedId = schMgmt.sId, - .execId = pTask->execId, - .addr = pTask->succeedAddr, - .fetchMsgType = SCH_FETCH_TYPE(pTask), - }; + SDownstreamSourceNode source = { + .type = QUERY_NODE_DOWNSTREAM_SOURCE, + .taskId = pTask->taskId, + .schedId = schMgmt.sId, + .execId = pTask->execId, + .addr = pTask->succeedAddr, + .fetchMsgType = SCH_FETCH_TYPE(pTask), + }; qSetSubplanExecutionNode(parent->plan, pTask->plan->id.groupId, &source); SCH_UNLOCK_TASK(parent); @@ -291,29 +288,29 @@ int32_t schRescheduleTask(SSchJob *pJob, SSchTask *pTask) { return TSDB_CODE_SUCCESS; } - if (SCH_TASK_TIMEOUT(pTask) && JOB_TASK_STATUS_EXEC == pTask->status && - pJob->fetchTask != pTask && taosArrayGetSize(pTask->candidateAddrs) > 1) { + if (SCH_TASK_TIMEOUT(pTask) && JOB_TASK_STATUS_EXEC == pTask->status && pJob->fetchTask != pTask && + taosArrayGetSize(pTask->candidateAddrs) > 1) { SCH_TASK_DLOG("task execId %d will be rescheduled now", pTask->execId); schDropTaskOnExecNode(pJob, pTask); taosHashClear(pTask->execNodes); - + SCH_ERR_RET(schProcessOnTaskFailure(pJob, pTask, TSDB_CODE_SCH_TIMEOUT_ERROR)); } return TSDB_CODE_SUCCESS; } -int32_t schDoTaskRedirect(SSchJob *pJob, SSchTask *pTask, SDataBuf* pData, int32_t rspCode) { +int32_t schDoTaskRedirect(SSchJob *pJob, SSchTask *pTask, SDataBuf *pData, int32_t rspCode) { int32_t code = 0; - + if ((pTask->execId + 1) >= pTask->maxExecTimes) { SCH_TASK_DLOG("task no more retry since reach max try times, execId:%d", pTask->execId); - schSwitchJobStatus(pJob, JOB_TASK_STATUS_FAIL, (void*)&rspCode); + schSwitchJobStatus(pJob, JOB_TASK_STATUS_FAIL, (void *)&rspCode); return TSDB_CODE_SUCCESS; } SCH_TASK_DLOG("task will be redirected now, status:%s", SCH_GET_TASK_STATUS_STR(pTask)); - + schDropTaskOnExecNode(pJob, pTask); taosHashClear(pTask->execNodes); SCH_ERR_JRET(schRemoveTaskFromExecList(pJob, pTask)); @@ -328,25 +325,24 @@ int32_t schDoTaskRedirect(SSchJob *pJob, SSchTask *pTask, SDataBuf* pData, int32 if (pData) { SCH_ERR_JRET(schUpdateTaskCandidateAddr(pJob, pTask, pData->pEpSet)); } - + if (SCH_TASK_NEED_FLOW_CTRL(pJob, pTask)) { if (JOB_TASK_STATUS_EXEC == SCH_GET_TASK_STATUS(pTask)) { SCH_ERR_JRET(schLaunchTasksInFlowCtrlList(pJob, pTask)); } - } + } SCH_SET_TASK_STATUS(pTask, JOB_TASK_STATUS_INIT); - + SCH_ERR_JRET(schLaunchTask(pJob, pTask)); return TSDB_CODE_SUCCESS; } - // merge plan - + pTask->childReady = 0; - + qClearSubplanExecutionNode(pTask->plan); // Note: current error task and upper level merge task @@ -355,10 +351,10 @@ int32_t schDoTaskRedirect(SSchJob *pJob, SSchTask *pTask, SDataBuf* pData, int32 } SCH_SET_TASK_STATUS(pTask, JOB_TASK_STATUS_INIT); - + int32_t childrenNum = taosArrayGetSize(pTask->children); for (int32_t i = 0; i < childrenNum; ++i) { - SSchTask* pChild = taosArrayGetP(pTask->children, i); + SSchTask *pChild = taosArrayGetP(pTask->children, i); SCH_LOCK_TASK(pChild); schDoTaskRedirect(pJob, pChild, NULL, rspCode); SCH_UNLOCK_TASK(pChild); @@ -371,7 +367,7 @@ _return: SCH_RET(schProcessOnTaskFailure(pJob, pTask, code)); } -int32_t schHandleRedirect(SSchJob *pJob, SSchTask *pTask, SDataBuf* pData, int32_t rspCode) { +int32_t schHandleRedirect(SSchJob *pJob, SSchTask *pTask, SDataBuf *pData, int32_t rspCode) { int32_t code = 0; if (SCH_IS_DATA_BIND_TASK(pTask)) { @@ -537,7 +533,7 @@ int32_t schHandleTaskRetry(SSchJob *pJob, SSchTask *pTask) { SCH_ERR_RET(schRemoveTaskFromExecList(pJob, pTask)); SCH_SET_TASK_STATUS(pTask, JOB_TASK_STATUS_INIT); - + if (SCH_TASK_NEED_FLOW_CTRL(pJob, pTask)) { SCH_ERR_RET(schLaunchTasksInFlowCtrlList(pJob, pTask)); } @@ -545,7 +541,8 @@ int32_t schHandleTaskRetry(SSchJob *pJob, SSchTask *pTask) { schDeregisterTaskHb(pJob, pTask); if (SCH_IS_DATA_BIND_TASK(pTask)) { - SCH_SWITCH_EPSET(&pTask->plan->execNode); + SQueryNodeAddr *addr = taosArrayGet(pTask->candidateAddrs, pTask->candidateIdx); + SCH_SWITCH_EPSET(addr); } else { SCH_ERR_RET(schSwitchTaskCandidateAddr(pJob, pTask)); } @@ -558,20 +555,21 @@ int32_t schHandleTaskRetry(SSchJob *pJob, SSchTask *pTask) { int32_t schSetAddrsFromNodeList(SSchJob *pJob, SSchTask *pTask) { int32_t addNum = 0; int32_t nodeNum = 0; - + if (pJob->nodeList) { nodeNum = taosArrayGetSize(pJob->nodeList); for (int32_t i = 0; i < nodeNum && addNum < SCH_MAX_CANDIDATE_EP_NUM; ++i) { SQueryNodeLoad *nload = taosArrayGet(pJob->nodeList, i); SQueryNodeAddr *naddr = &nload->addr; - + if (NULL == taosArrayPush(pTask->candidateAddrs, naddr)) { SCH_TASK_ELOG("taosArrayPush execNode to candidate addrs failed, addNum:%d, errno:%d", addNum, errno); SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY); } - SCH_TASK_DLOG("set %dth candidate addr, id %d, fqdn:%s, port:%d", i, naddr->nodeId, SCH_GET_CUR_EP(naddr)->fqdn, SCH_GET_CUR_EP(naddr)->port); + SCH_TASK_DLOG("set %dth candidate addr, id %d, fqdn:%s, port:%d", i, naddr->nodeId, SCH_GET_CUR_EP(naddr)->fqdn, + SCH_GET_CUR_EP(naddr)->port); ++addNum; } @@ -585,7 +583,6 @@ int32_t schSetAddrsFromNodeList(SSchJob *pJob, SSchTask *pTask) { return TSDB_CODE_SUCCESS; } - int32_t schSetTaskCandidateAddrs(SSchJob *pJob, SSchTask *pTask) { if (NULL != pTask->candidateAddrs) { return TSDB_CODE_SUCCESS; @@ -628,16 +625,17 @@ int32_t schSetTaskCandidateAddrs(SSchJob *pJob, SSchTask *pTask) { return TSDB_CODE_SUCCESS; } -int32_t schUpdateTaskCandidateAddr(SSchJob *pJob, SSchTask *pTask, SEpSet* pEpSet) { +int32_t schUpdateTaskCandidateAddr(SSchJob *pJob, SSchTask *pTask, SEpSet *pEpSet) { if (NULL == pTask->candidateAddrs || 1 != taosArrayGetSize(pTask->candidateAddrs)) { - SCH_TASK_ELOG("not able to update cndidate addr, addr num %d", (int32_t)(pTask->candidateAddrs ? taosArrayGetSize(pTask->candidateAddrs): 0)); + SCH_TASK_ELOG("not able to update cndidate addr, addr num %d", + (int32_t)(pTask->candidateAddrs ? taosArrayGetSize(pTask->candidateAddrs) : 0)); SCH_ERR_RET(TSDB_CODE_APP_ERROR); } - SQueryNodeAddr* pAddr = taosArrayGet(pTask->candidateAddrs, 0); + SQueryNodeAddr *pAddr = taosArrayGet(pTask->candidateAddrs, 0); - SEp* pOld = &pAddr->epSet.eps[pAddr->epSet.inUse]; - SEp* pNew = &pEpSet->eps[pEpSet->inUse]; + SEp *pOld = &pAddr->epSet.eps[pAddr->epSet.inUse]; + SEp *pNew = &pEpSet->eps[pEpSet->inUse]; SCH_TASK_DLOG("update task ep from %s:%d to %s:%d", pOld->fqdn, pOld->port, pNew->fqdn, pNew->port); @@ -647,7 +645,7 @@ int32_t schUpdateTaskCandidateAddr(SSchJob *pJob, SSchTask *pTask, SEpSet* pEpSe } int32_t schSwitchTaskCandidateAddr(SSchJob *pJob, SSchTask *pTask) { - int32_t candidateNum = taosArrayGetSize(pTask->candidateAddrs); + int32_t candidateNum = taosArrayGetSize(pTask->candidateAddrs); if (++pTask->candidateIdx >= candidateNum) { pTask->candidateIdx = 0; } @@ -655,8 +653,6 @@ int32_t schSwitchTaskCandidateAddr(SSchJob *pJob, SSchTask *pTask) { return TSDB_CODE_SUCCESS; } - - int32_t schRemoveTaskFromExecList(SSchJob *pJob, SSchTask *pTask) { int32_t code = taosHashRemove(pJob->execTasks, &pTask->taskId, sizeof(pTask->taskId)); if (code) { @@ -692,33 +688,32 @@ void schDropTaskOnExecNode(SSchJob *pJob, SSchTask *pTask) { SCH_TASK_DLOG("task has been dropped on %d exec nodes", size); } - - -int32_t schProcessOnTaskStatusRsp(SQueryNodeEpId* pEpId, SArray* pStatusList) { - int32_t taskNum = (int32_t)taosArrayGetSize(pStatusList); +int32_t schProcessOnTaskStatusRsp(SQueryNodeEpId *pEpId, SArray *pStatusList) { + int32_t taskNum = (int32_t)taosArrayGetSize(pStatusList); SSchTask *pTask = NULL; - SSchJob *pJob = NULL; + SSchJob *pJob = NULL; - qDebug("%d task status in hb rsp from nodeId:%d, fqdn:%s, port:%d", taskNum, pEpId->nodeId, pEpId->ep.fqdn, pEpId->ep.port); + qDebug("%d task status in hb rsp from nodeId:%d, fqdn:%s, port:%d", taskNum, pEpId->nodeId, pEpId->ep.fqdn, + pEpId->ep.port); for (int32_t i = 0; i < taskNum; ++i) { STaskStatus *pStatus = taosArrayGet(pStatusList, i); - int32_t code = 0; - - qDebug("QID:0x%" PRIx64 ",TID:0x%" PRIx64 ",EID:%d task status in server: %s", - pStatus->queryId, pStatus->taskId, pStatus->execId, jobTaskStatusStr(pStatus->status)); + int32_t code = 0; + + qDebug("QID:0x%" PRIx64 ",TID:0x%" PRIx64 ",EID:%d task status in server: %s", pStatus->queryId, pStatus->taskId, + pStatus->execId, jobTaskStatusStr(pStatus->status)); if (schProcessOnCbBegin(&pJob, &pTask, pStatus->queryId, pStatus->refId, pStatus->taskId)) { continue; } if (pStatus->execId != pTask->execId) { - //TODO + // TODO SCH_TASK_DLOG("execId %d mis-match current execId %d", pStatus->execId, pTask->execId); schProcessOnCbEnd(pJob, pTask, 0); continue; } - + if (pStatus->status == JOB_TASK_STATUS_FAIL) { // RECORD AND HANDLE ERROR!!!! schProcessOnCbEnd(pJob, pTask, 0); @@ -832,7 +827,6 @@ void schDropTaskInHashList(SSchJob *pJob, SHashObj *list) { } } - // Note: no more error processing, handled in function internal int32_t schLaunchFetchTask(SSchJob *pJob) { int32_t code = 0; @@ -851,5 +845,3 @@ _return: SCH_RET(schProcessOnTaskFailure(pJob, pJob->fetchTask, code)); } - - diff --git a/source/libs/sync/inc/syncInt.h b/source/libs/sync/inc/syncInt.h index 9742f93824120f4f6ca21401c695de7f2524d240..e361c8021cf166818d4ffefd5888ffa6afce06b2 100644 --- a/source/libs/sync/inc/syncInt.h +++ b/source/libs/sync/inc/syncInt.h @@ -251,6 +251,9 @@ void syncStartStandBy(int64_t rid); bool syncNodeCanChange(SSyncNode* pSyncNode); bool syncNodeCheckNewConfig(SSyncNode* pSyncNode, const SSyncCfg* pNewCfg); +int32_t syncNodeLeaderTransfer(SSyncNode* pSyncNode); +int32_t syncNodeLeaderTransferTo(SSyncNode* pSyncNode, SNodeInfo newLeader); + // for debug -------------- void syncNodePrint(SSyncNode* pObj); void syncNodePrint2(char* s, SSyncNode* pObj); diff --git a/source/libs/sync/src/syncMain.c b/source/libs/sync/src/syncMain.c index 50e2588e193063ee68beb57e175ca69ed83f43b8..e1c3d4bb3322339e2afe5d8ee2679522dad621fd 100644 --- a/source/libs/sync/src/syncMain.c +++ b/source/libs/sync/src/syncMain.c @@ -316,6 +316,40 @@ int32_t syncLeaderTransferTo(int64_t rid, SNodeInfo newLeader) { return ret; } +int32_t syncNodeLeaderTransfer(SSyncNode* pSyncNode) { + if (pSyncNode->peersNum == 0) { + sError("only one replica, cannot leader transfer"); + terrno = TSDB_CODE_SYN_ONE_REPLICA; + return -1; + } + + SNodeInfo newLeader = (pSyncNode->peersNodeInfo)[0]; + int32_t ret = syncNodeLeaderTransferTo(pSyncNode, newLeader); + return ret; +} + +int32_t syncNodeLeaderTransferTo(SSyncNode* pSyncNode, SNodeInfo newLeader) { + int32_t ret = 0; + + if (pSyncNode->replicaNum == 1) { + sError("only one replica, cannot leader transfer"); + terrno = TSDB_CODE_SYN_ONE_REPLICA; + return -1; + } + + SyncLeaderTransfer* pMsg = syncLeaderTransferBuild(pSyncNode->vgId); + pMsg->newLeaderId.addr = syncUtilAddr2U64(newLeader.nodeFqdn, newLeader.nodePort); + pMsg->newLeaderId.vgId = pSyncNode->vgId; + pMsg->newNodeInfo = newLeader; + ASSERT(pMsg != NULL); + SRpcMsg rpcMsg = {0}; + syncLeaderTransfer2RpcMsg(pMsg, &rpcMsg); + syncLeaderTransferDestroy(pMsg); + + ret = syncNodePropose(pSyncNode, &rpcMsg, false); + return ret; +} + bool syncCanLeaderTransfer(int64_t rid) { SSyncNode* pSyncNode = (SSyncNode*)taosAcquireRef(tsNodeRefId, rid); if (pSyncNode == NULL) { @@ -1113,6 +1147,8 @@ void syncNodeStartStandBy(SSyncNode* pSyncNode) { void syncNodeClose(SSyncNode* pSyncNode) { syncNodeEventLog(pSyncNode, "sync close"); + // leader transfer + int32_t ret; ASSERT(pSyncNode != NULL); @@ -1527,8 +1563,8 @@ void syncNodeEventLog(const SSyncNode* pSyncNode, char* str) { char logBuf[256 + 256]; if (pSyncNode != NULL && pSyncNode->pRaftCfg != NULL && pSyncNode->pRaftStore != NULL) { snprintf(logBuf, sizeof(logBuf), - "vgId:%d, sync %s %s, term:%" PRIu64 ", commit:%" PRId64 ", beginlog:%" PRId64 ", lastlog:%" PRId64 - ", lastsnapshot:%" PRId64 + "vgId:%d, sync %s %s, term:%" PRIu64 ", commit:%" PRId64 ", first:%" PRId64 ", last:%" PRId64 + ", snapshot:%" PRId64 ", standby:%d, " "strategy:%d, batch:%d, " "replica-num:%d, " @@ -1548,8 +1584,8 @@ void syncNodeEventLog(const SSyncNode* pSyncNode, char* str) { char* s = (char*)taosMemoryMalloc(len); if (pSyncNode != NULL && pSyncNode->pRaftCfg != NULL && pSyncNode->pRaftStore != NULL) { snprintf(s, len, - "vgId:%d, sync %s %s, term:%" PRIu64 ", commit:%" PRId64 ", beginlog:%" PRId64 ", lastlog:%" PRId64 - ", lastsnapshot:%" PRId64 + "vgId:%d, sync %s %s, term:%" PRIu64 ", commit:%" PRId64 ", first:%" PRId64 ", last:%" PRId64 + ", snapshot:%" PRId64 ", standby:%d, " "strategy:%d, batch:%d, " "replica-num:%d, " @@ -1594,8 +1630,8 @@ void syncNodeErrorLog(const SSyncNode* pSyncNode, char* str) { char logBuf[256 + 256]; if (pSyncNode != NULL && pSyncNode->pRaftCfg != NULL && pSyncNode->pRaftStore != NULL) { snprintf(logBuf, sizeof(logBuf), - "vgId:%d, sync %s %s, term:%" PRIu64 ", commit:%" PRId64 ", beginlog:%" PRId64 ", lastlog:%" PRId64 - ", lastsnapshot:%" PRId64 + "vgId:%d, sync %s %s, term:%" PRIu64 ", commit:%" PRId64 ", first:%" PRId64 ", last:%" PRId64 + ", snapshot:%" PRId64 ", standby:%d, " "replica-num:%d, " "lconfig:%" PRId64 ", changing:%d, restore:%d, %s", @@ -1613,8 +1649,8 @@ void syncNodeErrorLog(const SSyncNode* pSyncNode, char* str) { char* s = (char*)taosMemoryMalloc(len); if (pSyncNode != NULL && pSyncNode->pRaftCfg != NULL && pSyncNode->pRaftStore != NULL) { snprintf(s, len, - "vgId:%d, sync %s %s, term:%" PRIu64 ", commit:%" PRId64 ", beginlog:%" PRId64 ", lastlog:%" PRId64 - ", lastsnapshot:%" PRId64 + "vgId:%d, sync %s %s, term:%" PRIu64 ", commit:%" PRId64 ", first:%" PRId64 ", last:%" PRId64 + ", snapshot:%" PRId64 ", standby:%d, " "replica-num:%d, " "lconfig:%" PRId64 ", changing:%d, restore:%d, %s", @@ -1644,8 +1680,8 @@ char* syncNode2SimpleStr(const SSyncNode* pSyncNode) { SyncIndex logBeginIndex = pSyncNode->pLogStore->syncLogBeginIndex(pSyncNode->pLogStore); snprintf(s, len, - "vgId:%d, sync %s, term:%" PRIu64 ", commit:%" PRId64 ", beginlog:%" PRId64 ", lastlog:%" PRId64 - ", lastsnapshot:%" PRId64 + "vgId:%d, sync %s, term:%" PRIu64 ", commit:%" PRId64 ", first:%" PRId64 ", last:%" PRId64 + ", snapshot:%" PRId64 ", standby:%d, " "replica-num:%d, " "lconfig:%" PRId64 ", changing:%d, restore:%d", diff --git a/source/libs/transport/src/transComm.c b/source/libs/transport/src/transComm.c index 3ab15ad804c943482648e43398e5fe2ea224067e..84af8da5134629856ef1d7757c00755dba512250 100644 --- a/source/libs/transport/src/transComm.c +++ b/source/libs/transport/src/transComm.c @@ -294,7 +294,7 @@ void* transCtxDumpBrokenlinkVal(STransCtx* ctx, int32_t* msgType) { } void transReqQueueInit(queue* q) { - // init req queue + // init req queue QUEUE_INIT(q); } void* transReqQueuePushReq(queue* q) { diff --git a/source/libs/transport/src/transSvr.c b/source/libs/transport/src/transSvr.c index fbf6c0df7644b3669570cb1c02832ff4b7bd316b..68e12a19639d5731ab1e091e8bbbb40c7930968f 100644 --- a/source/libs/transport/src/transSvr.c +++ b/source/libs/transport/src/transSvr.c @@ -316,7 +316,7 @@ void uvOnRecvCb(uv_stream_t* cli, ssize_t nread, const uv_buf_t* buf) { memset(&conn->regArg, 0, sizeof(conn->regArg)); } } - transUnrefSrvHandle(conn); + destroyConn(conn, true); } } void uvAllocConnBufferCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf) { @@ -434,7 +434,6 @@ static void uvStartSendRespInternal(SSvrMsg* smsg) { uvPrepareSendData(smsg, &wb); transRefSrvHandle(pConn); - uv_write_t* req = transReqQueuePushReq(&pConn->wreqQueue); uv_write(req, (uv_stream_t*)pConn->pTcp, &wb, 1, uvOnSendCb); } @@ -780,9 +779,6 @@ static void destroyConn(SSvrConn* conn, bool clear) { tTrace("conn %p to be destroyed", conn); uv_close((uv_handle_t*)conn->pTcp, uvDestroyConn); } - //} else { - // uvDestroyConn((uv_handle_t*)conn->pTcp); - //} } } static void destroyConnRegArg(SSvrConn* conn) { diff --git a/tests/script/tsim/sync/vnodesnapshot-test.sim b/tests/script/tsim/sync/vnodesnapshot-test.sim index e4ef6739dd99f3072dad3eb10876b12de2922685..3cf8cb4d9379ef137421b949ea57a1f85dd65ff4 100644 --- a/tests/script/tsim/sync/vnodesnapshot-test.sim +++ b/tests/script/tsim/sync/vnodesnapshot-test.sim @@ -168,11 +168,103 @@ system sh/exec.sh -n dnode3 -s stop -x SIGINT - +######################################################## print ===> start dnode1 dnode2 dnode3 dnode4 system sh/exec.sh -n dnode1 -s start system sh/exec.sh -n dnode2 -s start system sh/exec.sh -n dnode3 -s start system sh/exec.sh -n dnode4 -s start +sleep 3000 + +print =============== query data +sql connect +sql use db +sql select * from ct1 +print rows: $rows +print $data00 $data01 $data02 +if $rows != 100 then + return -1 +endi + +system sh/exec.sh -n dnode1 -s stop -x SIGINT +system sh/exec.sh -n dnode2 -s stop -x SIGINT +system sh/exec.sh -n dnode3 -s stop -x SIGINT +system sh/exec.sh -n dnode4 -s stop -x SIGINT +######################################################## + + +######################################################## +print ===> start dnode1 dnode3 dnode4 +system sh/exec.sh -n dnode1 -s start +#system sh/exec.sh -n dnode2 -s start +system sh/exec.sh -n dnode3 -s start +system sh/exec.sh -n dnode4 -s start + +sleep 7000 + +print =============== query data +sql connect +sql use db +sql select * from ct1 +print rows: $rows +print $data00 $data01 $data02 +if $rows != 100 then + return -1 +endi + +system sh/exec.sh -n dnode1 -s stop -x SIGINT +#system sh/exec.sh -n dnode2 -s stop -x SIGINT +system sh/exec.sh -n dnode3 -s stop -x SIGINT +system sh/exec.sh -n dnode4 -s stop -x SIGINT +######################################################## + + +######################################################## +print ===> start dnode1 dnode2 dnode4 +system sh/exec.sh -n dnode1 -s start +system sh/exec.sh -n dnode2 -s start +#system sh/exec.sh -n dnode3 -s start +system sh/exec.sh -n dnode4 -s start + +sleep 3000 + +print =============== query data +sql select * from ct1 +print rows: $rows +print $data00 $data01 $data02 +if $rows != 100 then + return -1 +endi + +system sh/exec.sh -n dnode1 -s stop -x SIGINT +system sh/exec.sh -n dnode2 -s stop -x SIGINT +#system sh/exec.sh -n dnode3 -s stop -x SIGINT +system sh/exec.sh -n dnode4 -s stop -x SIGINT +######################################################## + + +######################################################## +print ===> start dnode1 dnode2 dnode3 +system sh/exec.sh -n dnode1 -s start +system sh/exec.sh -n dnode2 -s start +system sh/exec.sh -n dnode3 -s start +#system sh/exec.sh -n dnode4 -s start + +sleep 3000 + +print =============== query data +sql select * from ct1 +print rows: $rows +print $data00 $data01 $data02 +if $rows != 100 then + return -1 +endi + +system sh/exec.sh -n dnode1 -s stop -x SIGINT +system sh/exec.sh -n dnode2 -s stop -x SIGINT +system sh/exec.sh -n dnode3 -s stop -x SIGINT +#system sh/exec.sh -n dnode4 -s stop -x SIGINT +######################################################## + diff --git a/tests/script/tsim/valgrind/checkError2.sim b/tests/script/tsim/valgrind/checkError2.sim index e9dfc0eb4e650e9b9a5341cbcdd892a00c92a8c6..fdac687224fa464e3131c25d5a8d63bab9a3e79d 100644 --- a/tests/script/tsim/valgrind/checkError2.sim +++ b/tests/script/tsim/valgrind/checkError2.sim @@ -48,7 +48,7 @@ sql insert into ct1 values(now+1s, 11, 2.1, 3.1)(now+2s, -12, -2.2, -3.2)(now+3s print =============== step6: select data sql select * from ct1 -#sql select * from stb +sql select * from stb _OVER: system sh/exec.sh -n dnode1 -s stop -x SIGINT diff --git a/tests/system-test/2-query/abs.py b/tests/system-test/2-query/abs.py index 5e5bb0df3c5b18e146925288d9cb047a73080831..7ebb2eba8cb4f6b61128e8b26e4af93abbe023c5 100644 --- a/tests/system-test/2-query/abs.py +++ b/tests/system-test/2-query/abs.py @@ -538,9 +538,9 @@ class TDTestCase: tdSql.query("select c1 ,t1 from stb1 where t1 =0 ") tdSql.checkRows(13) tdSql.query("select t1 from stb1 where t1 >0 ") - tdSql.checkRows(3) + tdSql.checkRows(12) tdSql.query("select t1 from stb1 where t1 =3 ") - tdSql.checkRows(1) + tdSql.checkRows(12) # tdSql.query("select sum(t1) from (select c1 ,t1 from stb1)") # tdSql.checkData(0,0,61) # tdSql.query("select distinct(c1) ,t1 from stb1") @@ -550,7 +550,7 @@ class TDTestCase: # tag filter with abs function tdSql.query("select t1 from stb1 where abs(t1)=1") - tdSql.checkRows(1) + tdSql.checkRows(0) tdSql.query("select t1 from stb1 where abs(c1+t1)=1") tdSql.checkRows(1) tdSql.checkData(0,0,0) diff --git a/tests/system-test/2-query/and_or_for_byte.py b/tests/system-test/2-query/and_or_for_byte.py index aab9cce0408bbe62d4170ce0e385e110e72aff36..416e62c0f28a1a0661a1c3fd0e9c92da0f4a30fb 100644 --- a/tests/system-test/2-query/and_or_for_byte.py +++ b/tests/system-test/2-query/and_or_for_byte.py @@ -495,9 +495,9 @@ class TDTestCase: tdSql.checkRows(13) self.check_function("&", False ,"t1","c1+2","abs(c2)") tdSql.query("select t1 from stb1 where t1 >0 ") - tdSql.checkRows(3) + tdSql.checkRows(12) tdSql.query("select t1 from stb1 where t1 =3 ") - tdSql.checkRows(1) + tdSql.checkRows(12) # tdSql.query("select sum(t1) from (select c1 ,t1 from stb1)") # tdSql.checkData(0,0,61) # tdSql.query("select distinct(c1) ,t1 from stb1") @@ -507,7 +507,7 @@ class TDTestCase: # tag filter with abs function tdSql.query("select t1 from stb1 where abs(t1)=1") - tdSql.checkRows(1) + tdSql.checkRows(0) tdSql.query("select t1 from stb1 where abs(c1+t1)=1") tdSql.checkRows(1) tdSql.checkData(0,0,0) diff --git a/tests/system-test/2-query/json_tag.py b/tests/system-test/2-query/json_tag.py index f66f0f67ab03ce1c9c87ebc54dc2f879954c07bc..70aca9fd93b819d8ceae0f25e1081966589bb519 100644 --- a/tests/system-test/2-query/json_tag.py +++ b/tests/system-test/2-query/json_tag.py @@ -57,7 +57,7 @@ class TDTestCase: # test duplicate key using the first one. elimate empty key tdSql.execute("CREATE TABLE if not exists jsons1_8 using jsons1 tags('{\"tag1\":null, \"tag1\":true, \"tag1\":45, \"1tag$\":2, \" \":90, \"\":32}')") tdSql.query("select jtag from jsons1_8") - tdSql.checkData(0, 0, '{" ":90,"1tag$":2,"tag1":null}') + tdSql.checkRows(0); tdSql.query("select ts,jtag from jsons1 order by ts limit 2,3") tdSql.checkData(0, 0, '2020-06-02 09:17:08.000') @@ -153,38 +153,17 @@ class TDTestCase: #test scalar operation tdSql.query("select jtag contains 'tag1',jtag->'tag1' from jsons1 order by jtag->'tag1'") - tdSql.checkRows(13) - tdSql.checkData(0, 0, False) - tdSql.checkData(5, 0, True) - tdSql.checkData(12, 0, True) + tdSql.checkRows(9) tdSql.query("select jtag->'tag1' like 'fe%',jtag->'tag1' from jsons1 order by jtag->'tag1'") - tdSql.checkRows(13) - tdSql.checkData(10, 0, False) - tdSql.checkData(11, 0, False) - tdSql.checkData(12, 0, True) + tdSql.checkRows(9) tdSql.query("select jtag->'tag1' not like 'fe%',jtag->'tag1' from jsons1 order by jtag->'tag1'") - tdSql.checkRows(13) - tdSql.checkData(10, 0, False) - tdSql.checkData(11, 0, True) - tdSql.checkData(12, 0, False) + tdSql.checkRows(9) tdSql.query("select jtag->'tag1' match 'fe',jtag->'tag1' from jsons1 order by jtag->'tag1'") - tdSql.checkRows(13) - tdSql.checkData(10, 0, False) - tdSql.checkData(11, 0, False) - tdSql.checkData(12, 0, True) + tdSql.checkRows(9) tdSql.query("select jtag->'tag1' nmatch 'fe',jtag->'tag1' from jsons1 order by jtag->'tag1'") - tdSql.checkRows(13) - tdSql.checkData(10, 0, False) - tdSql.checkData(11, 0, True) - tdSql.checkData(12, 0, False) + tdSql.checkRows(9) tdSql.query("select jtag->'tag1',jtag->'tag1'>='a' from jsons1 order by jtag->'tag1'") - tdSql.checkRows(13) - tdSql.checkData(0, 0, None) - tdSql.checkData(0, 1, False) - tdSql.checkData(7, 0, "false") - tdSql.checkData(7, 1, False) - tdSql.checkData(8, 1, False) - tdSql.checkData(12, 1, True) + tdSql.checkRows(9) # test select normal column tdSql.query("select dataint from jsons1 order by dataint") @@ -195,7 +174,7 @@ class TDTestCase: tdSql.query("select * from jsons1") tdSql.checkRows(9) tdSql.query("select jtag from jsons1") - tdSql.checkRows(13) + tdSql.checkRows(9) tdSql.query("select * from jsons1 where jtag is null") tdSql.checkRows(1) tdSql.query("select * from jsons1 where jtag is not null") @@ -227,7 +206,7 @@ class TDTestCase: tdSql.checkData(0, 0, None) tdSql.query("select jtag->'tag1' from jsons1") - tdSql.checkRows(13) + tdSql.checkRows(9) # test header name res = tdSql.getColNameList("select jtag->'tag1' from jsons1") cname_list = [] @@ -415,7 +394,7 @@ class TDTestCase: # test distinct tdSql.execute("insert into jsons1_14 using jsons1 tags('{\"tag1\":\"收到货\",\"tag2\":\"\",\"tag3\":null}') values(1591062628000, 2, NULL, '你就会', 'dws')") tdSql.query("select distinct jtag->'tag1' from jsons1") - tdSql.checkRows(8) + tdSql.checkRows(7) # tdSql.query("select distinct jtag from jsons1") # tdSql.checkRows(9) @@ -523,12 +502,12 @@ class TDTestCase: # union all tdSql.query("select jtag->'tag1' from jsons1 union all select jtag->'tag2' from jsons2") - tdSql.checkRows(17) + tdSql.checkRows(13) tdSql.query("select jtag->'tag1' from jsons1_1 union all select jtag->'tag2' from jsons2_1") - tdSql.checkRows(2) + tdSql.checkRows(3) tdSql.query("select jtag->'tag1' from jsons1_1 union all select jtag->'tag1' from jsons2_1") - tdSql.checkRows(2) + tdSql.checkRows(3) tdSql.query("select dataint,jtag->'tag1',tbname from jsons1 union all select dataint,jtag->'tag1',tbname from jsons2") tdSql.checkRows(13) tdSql.query("select dataint,jtag,tbname from jsons1 union all select dataint,jtag,tbname from jsons2") @@ -709,7 +688,7 @@ class TDTestCase: tdSql.checkData(0, 0, None) tdSql.execute("CREATE TABLE if not exists jsons1_20 using jsons1 tags(NULL)") tdSql.query("select jtag from jsons1_20") - tdSql.checkData(0, 0, None) + tdSql.checkRows(0) tdSql.execute("insert into jsons1_21 using jsons1 tags(NULL) values(1591061628000, 11, false, '你就会','')") tdSql.query("select jtag from jsons1_21") tdSql.checkData(0, 0, None) diff --git a/tests/system-test/7-tmq/tmqAutoCreateTbl.py b/tests/system-test/7-tmq/tmqAutoCreateTbl.py index db2043de61fccbc1255e41ccbcabb266034b57a2..6a9f10ebbf6c13cce75a36a83c3174bd117d2a34 100644 --- a/tests/system-test/7-tmq/tmqAutoCreateTbl.py +++ b/tests/system-test/7-tmq/tmqAutoCreateTbl.py @@ -16,7 +16,7 @@ from tmqCommon import * class TDTestCase: def __init__(self): - self.vgroups = 1 + self.vgroups = 2 self.ctbNum = 100 self.rowsPerTbl = 10000 @@ -29,7 +29,7 @@ class TDTestCase: paraDict = {'dbName': 'dbt', 'dropFlag': 1, 'event': '', - 'vgroups': 1, + 'vgroups': 3, 'stbName': 'stb', 'colPrefix': 'c', 'tagPrefix': 't', @@ -37,9 +37,9 @@ class TDTestCase: 'tagSchema': [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'DOUBLE', 'count':1},{'type': 'BINARY', 'len':32, 'count':1},{'type': 'NCHAR', 'len':32, 'count':1}], 'ctbPrefix': 'ctb', 'ctbStartIdx': 0, - 'ctbNum': 1, - 'rowsPerTbl': 100000, - 'batchNum': 100, + 'ctbNum': 500, + 'rowsPerTbl': 1000, + 'batchNum': 500, 'startTs': 1640966400000, # 2022-01-01 00:00:00.000 'pollDelay': 3, 'showMsg': 1, @@ -73,7 +73,7 @@ class TDTestCase: paraDict = {'dbName': 'dbt', 'dropFlag': 1, 'event': '', - 'vgroups': 1, + 'vgroups': 4, 'stbName': 'stb', 'colPrefix': 'c', 'tagPrefix': 't', @@ -81,18 +81,18 @@ class TDTestCase: 'tagSchema': [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'DOUBLE', 'count':1},{'type': 'BINARY', 'len':32, 'count':1},{'type': 'NCHAR', 'len':32, 'count':1}], 'ctbPrefix': 'ctb', 'ctbStartIdx': 0, - 'ctbNum': 1, - 'rowsPerTbl': 10000, - 'batchNum': 100, + 'ctbNum': 1000, + 'rowsPerTbl': 1000, + 'batchNum': 400, 'startTs': 1640966400000, # 2022-01-01 00:00:00.000 'pollDelay': 5, 'showMsg': 1, 'showRow': 1, - 'snapshot': 0} + 'snapshot': 1} - paraDict['vgroups'] = self.vgroups - paraDict['ctbNum'] = self.ctbNum - paraDict['rowsPerTbl'] = self.rowsPerTbl + # paraDict['vgroups'] = self.vgroups + # paraDict['ctbNum'] = self.ctbNum + # paraDict['rowsPerTbl'] = self.rowsPerTbl tmqCom.initConsumerTable() tdCom.create_database(tdSql, paraDict["dbName"],paraDict["dropFlag"], vgroups=paraDict["vgroups"],replica=1) @@ -107,9 +107,12 @@ class TDTestCase: startTs=paraDict["startTs"],ctbStartIdx=paraDict['ctbStartIdx']) tdLog.info("create topics from stb1") - topicFromStb1 = 'topic_stb1' + topicFromStb1 = 'topic_stb1' + queryString = "select ts, c1, c2 from %s.%s"%(paraDict['dbName'], paraDict['stbName']) + sqlString = "create topic %s as %s" %(topicFromStb1, queryString) + tdLog.info("create topic sql: %s"%sqlString) + tdSql.execute(sqlString) - tdSql.execute("create topic %s as select ts, c1, c2 from %s.%s" %(topicFromStb1, paraDict['dbName'], paraDict['stbName'])) consumerId = 0 expectrowcnt = paraDict["rowsPerTbl"] * paraDict["ctbNum"] topicList = topicFromStb1 @@ -117,7 +120,7 @@ class TDTestCase: ifManualCommit = 0 keyList = 'group.id:cgrp1,\ enable.auto.commit:true,\ - auto.commit.interval.ms:1000,\ + auto.commit.interval.ms:500,\ auto.offset.reset:earliest' tmqCom.insertConsumerInfo(consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifManualCommit) @@ -137,9 +140,12 @@ class TDTestCase: totalConsumeRows = 0 for i in range(expectRows): totalConsumeRows += resultList[i] + + tdSql.query(queryString) + totalRowsInserted = tdSql.getRows() - if totalConsumeRows != expectrowcnt: - tdLog.info("act consume rows: %d, expect consume rows: %d"%(totalConsumeRows, expectrowcnt)) + if totalConsumeRows != totalRowsInserted: + tdLog.info("act consume rows: %d, expect consume rows: %d"%(totalConsumeRows, totalRowsInserted)) tdLog.exit("tmq consume rows error!") tdSql.query("drop topic %s"%topicFromStb1) @@ -151,7 +157,7 @@ class TDTestCase: paraDict = {'dbName': 'dbt', 'dropFlag': 1, 'event': '', - 'vgroups': 1, + 'vgroups': 4, 'stbName': 'stb', 'colPrefix': 'c', 'tagPrefix': 't', @@ -159,18 +165,18 @@ class TDTestCase: 'tagSchema': [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'DOUBLE', 'count':1},{'type': 'BINARY', 'len':32, 'count':1},{'type': 'NCHAR', 'len':32, 'count':1}], 'ctbPrefix': 'ctb', 'ctbStartIdx': 0, - 'ctbNum': 1, - 'rowsPerTbl': 10000, - 'batchNum': 100, + 'ctbNum': 1000, + 'rowsPerTbl': 1000, + 'batchNum': 1000, 'startTs': 1640966400000, # 2022-01-01 00:00:00.000 - 'pollDelay': 3, + 'pollDelay': 5, 'showMsg': 1, 'showRow': 1, 'snapshot': 1} - paraDict['vgroups'] = self.vgroups - paraDict['ctbNum'] = self.ctbNum - paraDict['rowsPerTbl'] = self.rowsPerTbl + # paraDict['vgroups'] = self.vgroups + # paraDict['ctbNum'] = self.ctbNum + # paraDict['rowsPerTbl'] = self.rowsPerTbl tmqCom.initConsumerTable() tdCom.create_database(tdSql, paraDict["dbName"],paraDict["dropFlag"], vgroups=paraDict["vgroups"],replica=1) @@ -184,9 +190,12 @@ class TDTestCase: ctbNum=paraDict["ctbNum"],rowsPerTbl=paraDict["rowsPerTbl"],batchNum=paraDict["batchNum"], startTs=paraDict["startTs"],ctbStartIdx=paraDict['ctbStartIdx']) tdLog.info("create topics from stb1") - topicFromStb1 = 'topic_stb1' + topicFromStb1 = 'topic_stb1' + queryString = "select ts, c1, c2 from %s.%s"%(paraDict['dbName'], paraDict['stbName']) + sqlString = "create topic %s as %s" %(topicFromStb1, queryString) + tdLog.info("create topic sql: %s"%sqlString) + tdSql.execute(sqlString) - tdSql.execute("create topic %s as select ts, c1, c2 from %s.%s" %(topicFromStb1, paraDict['dbName'], paraDict['stbName'])) consumerId = 0 expectrowcnt = paraDict["rowsPerTbl"] * paraDict["ctbNum"] * 2 topicList = topicFromStb1 @@ -202,9 +211,8 @@ class TDTestCase: tmqCom.startTmqSimProcess(pollDelay=paraDict['pollDelay'],dbName=paraDict["dbName"],showMsg=paraDict['showMsg'], showRow=paraDict['showRow'],snapshot=paraDict['snapshot']) tdLog.info("create some new child table and insert data ") - paraDict['batchNum'] = 100 tmqCom.insert_data_with_autoCreateTbl(tdSql,paraDict["dbName"],paraDict["stbName"],"ctb",paraDict["ctbNum"],paraDict["rowsPerTbl"],paraDict["batchNum"]) - + tmqCom.getStartCommitNotifyFromTmqsim() tdLog.info("================= restart dnode ===========================") tdDnodes.stop(1) @@ -217,9 +225,12 @@ class TDTestCase: totalConsumeRows = 0 for i in range(expectRows): totalConsumeRows += resultList[i] + + tdSql.query(queryString) + totalRowsInserted = tdSql.getRows() - if totalConsumeRows != expectrowcnt: - tdLog.info("act consume rows: %d, expect consume rows: %d"%(totalConsumeRows, expectrowcnt)) + if totalConsumeRows != totalRowsInserted: + tdLog.info("act consume rows: %d, expect consume rows: %d"%(totalConsumeRows, totalRowsInserted)) tdLog.exit("tmq consume rows error!") tdSql.query("drop topic %s"%topicFromStb1) @@ -232,7 +243,7 @@ class TDTestCase: paraDict = {'dbName': 'dbt', 'dropFlag': 1, 'event': '', - 'vgroups': 1, + 'vgroups': 4, 'stbName': 'stb', 'colPrefix': 'c', 'tagPrefix': 't', @@ -240,14 +251,14 @@ class TDTestCase: 'tagSchema': [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'DOUBLE', 'count':1},{'type': 'BINARY', 'len':32, 'count':1},{'type': 'NCHAR', 'len':32, 'count':1}], 'ctbPrefix': 'ctb', 'ctbStartIdx': 0, - 'ctbNum': 1, - 'rowsPerTbl': 10000, - 'batchNum': 100, + 'ctbNum': 1000, + 'rowsPerTbl': 1000, + 'batchNum': 400, 'startTs': 1640966400000, # 2022-01-01 00:00:00.000 - 'pollDelay': 3, + 'pollDelay': 5, 'showMsg': 1, 'showRow': 1, - 'snapshot': 0} + 'snapshot': 1} paraDict['vgroups'] = self.vgroups paraDict['ctbNum'] = self.ctbNum @@ -261,9 +272,12 @@ class TDTestCase: tmqCom.insert_data_with_autoCreateTbl(tdSql,paraDict["dbName"],paraDict["stbName"],"ctb",paraDict["ctbNum"],paraDict["rowsPerTbl"],paraDict["batchNum"]) tdLog.info("create topics from stb1") - topicFromStb1 = 'topic_stb1' + topicFromStb1 = 'topic_stb1' + queryString = "select ts, c1, c2 from %s.%s"%(paraDict['dbName'], paraDict['stbName']) + sqlString = "create topic %s as %s" %(topicFromStb1, queryString) + tdLog.info("create topic sql: %s"%sqlString) + tdSql.execute(sqlString) - tdSql.execute("create topic %s as select ts, c1, c2 from %s.%s" %(topicFromStb1, paraDict['dbName'], paraDict['stbName'])) consumerId = 0 expectrowcnt = paraDict["rowsPerTbl"] * paraDict["ctbNum"] topicList = topicFromStb1 @@ -289,9 +303,12 @@ class TDTestCase: totalConsumeRows = 0 for i in range(expectRows): totalConsumeRows += resultList[i] + + tdSql.query(queryString) + totalRowsInserted = tdSql.getRows() - if totalConsumeRows != expectrowcnt: - tdLog.info("act consume rows: %d, expect consume rows: %d"%(totalConsumeRows, expectrowcnt)) + if totalConsumeRows != totalRowsInserted: + tdLog.info("act consume rows: %d, expect consume rows: %d"%(totalConsumeRows, totalRowsInserted)) tdLog.exit("tmq consume rows error!") tdSql.query("drop topic %s"%topicFromStb1) diff --git a/tests/system-test/99-TDcase/TD-17255.py b/tests/system-test/99-TDcase/TD-17255.py new file mode 100644 index 0000000000000000000000000000000000000000..9eb8d531f796ace5fdbc6e7d3708253ef2bdf06e --- /dev/null +++ b/tests/system-test/99-TDcase/TD-17255.py @@ -0,0 +1,333 @@ + +import taos +import sys +import time +import socket +import os +import threading +from enum import Enum + +from util.log import * +from util.sql import * +from util.cases import * +from util.dnodes import * +sys.path.append("./7-tmq") +from tmqCommon import * + +class TDTestCase: + def __init__(self): + self.vgroups = 2 + self.ctbNum = 100 + self.rowsPerTbl = 10000 + + def init(self, conn, logSql): + tdLog.debug(f"start to excute {__file__}") + tdSql.init(conn.cursor(), False) + + def prepareTestEnv(self): + tdLog.printNoPrefix("======== prepare test env include database, stable, ctables, and insert data: ") + paraDict = {'dbName': 'dbt', + 'dropFlag': 1, + 'event': '', + 'vgroups': 3, + 'stbName': 'stb', + 'colPrefix': 'c', + 'tagPrefix': 't', + 'colSchema': [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'DOUBLE', 'count':1},{'type': 'BINARY', 'len':32, 'count':1},{'type': 'NCHAR', 'len':32, 'count':1},{'type': 'TIMESTAMP', 'count':1}], + 'tagSchema': [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'DOUBLE', 'count':1},{'type': 'BINARY', 'len':32, 'count':1},{'type': 'NCHAR', 'len':32, 'count':1}], + 'ctbPrefix': 'ctb', + 'ctbStartIdx': 0, + 'ctbNum': 500, + 'rowsPerTbl': 1000, + 'batchNum': 500, + 'startTs': 1640966400000, # 2022-01-01 00:00:00.000 + 'pollDelay': 3, + 'showMsg': 1, + 'showRow': 1, + 'snapshot': 0} + + paraDict['vgroups'] = self.vgroups + paraDict['ctbNum'] = self.ctbNum + paraDict['rowsPerTbl'] = self.rowsPerTbl + + tmqCom.initConsumerTable() + tdCom.create_database(tdSql, paraDict["dbName"],paraDict["dropFlag"], vgroups=paraDict["vgroups"],replica=1) + tdLog.info("create stb") + tmqCom.create_stable(tdSql, dbName=paraDict["dbName"],stbName=paraDict["stbName"]) + tdLog.info("create ctb") + tmqCom.create_ctable(tdSql, dbName=paraDict["dbName"],stbName=paraDict["stbName"],ctbPrefix=paraDict['ctbPrefix'], + ctbNum=paraDict["ctbNum"],ctbStartIdx=paraDict['ctbStartIdx']) + tdLog.info("insert data") + tmqCom.insert_data_interlaceByMultiTbl(tsql=tdSql,dbName=paraDict["dbName"],ctbPrefix=paraDict["ctbPrefix"], + ctbNum=paraDict["ctbNum"],rowsPerTbl=paraDict["rowsPerTbl"],batchNum=paraDict["batchNum"], + startTs=paraDict["startTs"],ctbStartIdx=paraDict['ctbStartIdx']) + + tdLog.info("restart taosd to ensure that the data falls into the disk") + # tdDnodes.stop(1) + # tdDnodes.start(1) + tdSql.query("flush database %s"%(paraDict['dbName'])) + return + + def tmqCase1(self): + tdLog.printNoPrefix("======== test case 1: ") + paraDict = {'dbName': 'dbt', + 'dropFlag': 1, + 'event': '', + 'vgroups': 4, + 'stbName': 'stb', + 'colPrefix': 'c', + 'tagPrefix': 't', + 'colSchema': [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'DOUBLE', 'count':1},{'type': 'BINARY', 'len':32, 'count':1},{'type': 'NCHAR', 'len':32, 'count':1},{'type': 'TIMESTAMP', 'count':1}], + 'tagSchema': [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'DOUBLE', 'count':1},{'type': 'BINARY', 'len':32, 'count':1},{'type': 'NCHAR', 'len':32, 'count':1}], + 'ctbPrefix': 'ctb', + 'ctbStartIdx': 0, + 'ctbNum': 1000, + 'rowsPerTbl': 1000, + 'batchNum': 400, + 'startTs': 1640966400000, # 2022-01-01 00:00:00.000 + 'pollDelay': 5, + 'showMsg': 1, + 'showRow': 1, + 'snapshot': 1} + + # paraDict['vgroups'] = self.vgroups + # paraDict['ctbNum'] = self.ctbNum + # paraDict['rowsPerTbl'] = self.rowsPerTbl + + tmqCom.initConsumerTable() + tdCom.create_database(tdSql, paraDict["dbName"],paraDict["dropFlag"], vgroups=paraDict["vgroups"],replica=1) + tdLog.info("create stb") + tmqCom.create_stable(tdSql, dbName=paraDict["dbName"],stbName=paraDict["stbName"]) + tdLog.info("create ctb") + tmqCom.create_ctable(tdSql, dbName=paraDict["dbName"],stbName=paraDict["stbName"],ctbPrefix=paraDict['ctbPrefix'], + ctbNum=paraDict["ctbNum"],ctbStartIdx=paraDict['ctbStartIdx']) + tdLog.info("insert data") + tmqCom.insert_data_interlaceByMultiTbl(tsql=tdSql,dbName=paraDict["dbName"],ctbPrefix=paraDict["ctbPrefix"], + ctbNum=paraDict["ctbNum"],rowsPerTbl=paraDict["rowsPerTbl"],batchNum=paraDict["batchNum"], + startTs=paraDict["startTs"],ctbStartIdx=paraDict['ctbStartIdx']) + + tdLog.info("create topics from stb1") + topicFromStb1 = 'topic_stb1' + queryString = "select ts, c1, c2 from %s.%s"%(paraDict['dbName'], paraDict['stbName']) + sqlString = "create topic %s as %s" %(topicFromStb1, queryString) + tdLog.info("create topic sql: %s"%sqlString) + tdSql.execute(sqlString) + + consumerId = 0 + expectrowcnt = paraDict["rowsPerTbl"] * paraDict["ctbNum"] + topicList = topicFromStb1 + ifcheckdata = 0 + ifManualCommit = 0 + keyList = 'group.id:cgrp1,\ + enable.auto.commit:true,\ + auto.commit.interval.ms:500,\ + auto.offset.reset:earliest' + tmqCom.insertConsumerInfo(consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifManualCommit) + + tdLog.info("start consume processor") + tmqCom.startTmqSimProcess(pollDelay=paraDict['pollDelay'],dbName=paraDict["dbName"],showMsg=paraDict['showMsg'], showRow=paraDict['showRow'],snapshot=paraDict['snapshot']) + + # time.sleep(3) + tmqCom.getStartCommitNotifyFromTmqsim() + tdLog.info("================= restart dnode ===========================") + tdDnodes.stop(1) + tdDnodes.start(1) + time.sleep(5) + + tdLog.info("insert process end, and start to check consume result") + expectRows = 1 + resultList = tmqCom.selectConsumeResult(expectRows) + totalConsumeRows = 0 + for i in range(expectRows): + totalConsumeRows += resultList[i] + + tdSql.query(queryString) + totalRowsInserted = tdSql.getRows() + + if totalConsumeRows != totalRowsInserted: + tdLog.info("act consume rows: %d, expect consume rows: %d"%(totalConsumeRows, totalRowsInserted)) + tdLog.exit("tmq consume rows error!") + + tdSql.query("drop topic %s"%topicFromStb1) + + tdLog.printNoPrefix("======== test case 1 end ...... ") + + def tmqCase2(self): + tdLog.printNoPrefix("======== test case 2: ") + paraDict = {'dbName': 'dbt', + 'dropFlag': 1, + 'event': '', + 'vgroups': 4, + 'stbName': 'stb', + 'colPrefix': 'c', + 'tagPrefix': 't', + 'colSchema': [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'DOUBLE', 'count':1},{'type': 'BINARY', 'len':32, 'count':1},{'type': 'NCHAR', 'len':32, 'count':1},{'type': 'TIMESTAMP', 'count':1}], + 'tagSchema': [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'DOUBLE', 'count':1},{'type': 'BINARY', 'len':32, 'count':1},{'type': 'NCHAR', 'len':32, 'count':1}], + 'ctbPrefix': 'ctb', + 'ctbStartIdx': 0, + 'ctbNum': 1000, + 'rowsPerTbl': 1000, + 'batchNum': 1000, + 'startTs': 1640966400000, # 2022-01-01 00:00:00.000 + 'pollDelay': 5, + 'showMsg': 1, + 'showRow': 1, + 'snapshot': 1} + + # paraDict['vgroups'] = self.vgroups + # paraDict['ctbNum'] = self.ctbNum + # paraDict['rowsPerTbl'] = self.rowsPerTbl + + tmqCom.initConsumerTable() + tdCom.create_database(tdSql, paraDict["dbName"],paraDict["dropFlag"], vgroups=paraDict["vgroups"],replica=1) + tdLog.info("create stb") + tmqCom.create_stable(tdSql, dbName=paraDict["dbName"],stbName=paraDict["stbName"]) + tdLog.info("create ctb") + tmqCom.create_ctable(tdSql, dbName=paraDict["dbName"],stbName=paraDict["stbName"],ctbPrefix=paraDict['ctbPrefix'], + ctbNum=paraDict["ctbNum"],ctbStartIdx=paraDict['ctbStartIdx']) + tdLog.info("insert data") + tmqCom.insert_data_interlaceByMultiTbl(tsql=tdSql,dbName=paraDict["dbName"],ctbPrefix=paraDict["ctbPrefix"], + ctbNum=paraDict["ctbNum"],rowsPerTbl=paraDict["rowsPerTbl"],batchNum=paraDict["batchNum"], + startTs=paraDict["startTs"],ctbStartIdx=paraDict['ctbStartIdx']) + tdLog.info("create topics from stb1") + topicFromStb1 = 'topic_stb1' + queryString = "select ts, c1, c2 from %s.%s"%(paraDict['dbName'], paraDict['stbName']) + sqlString = "create topic %s as %s" %(topicFromStb1, queryString) + tdLog.info("create topic sql: %s"%sqlString) + tdSql.execute(sqlString) + + consumerId = 0 + expectrowcnt = paraDict["rowsPerTbl"] * paraDict["ctbNum"] * 2 + topicList = topicFromStb1 + ifcheckdata = 0 + ifManualCommit = 0 + keyList = 'group.id:cgrp1,\ + enable.auto.commit:true,\ + auto.commit.interval.ms:1000,\ + auto.offset.reset:earliest' + tmqCom.insertConsumerInfo(consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifManualCommit) + + tdLog.info("start consume processor") + tmqCom.startTmqSimProcess(pollDelay=paraDict['pollDelay'],dbName=paraDict["dbName"],showMsg=paraDict['showMsg'], showRow=paraDict['showRow'],snapshot=paraDict['snapshot']) + + tdLog.info("create some new child table and insert data ") + tmqCom.insert_data_with_autoCreateTbl(tdSql,paraDict["dbName"],paraDict["stbName"],"ctb",paraDict["ctbNum"],paraDict["rowsPerTbl"],paraDict["batchNum"]) + + tmqCom.getStartCommitNotifyFromTmqsim() + tdLog.info("================= restart dnode ===========================") + tdDnodes.stop(1) + tdDnodes.start(1) + time.sleep(5) + + tdLog.info("insert process end, and start to check consume result") + expectRows = 1 + resultList = tmqCom.selectConsumeResult(expectRows) + totalConsumeRows = 0 + for i in range(expectRows): + totalConsumeRows += resultList[i] + + tdSql.query(queryString) + totalRowsInserted = tdSql.getRows() + + if totalConsumeRows != totalRowsInserted: + tdLog.info("act consume rows: %d, expect consume rows: %d"%(totalConsumeRows, totalRowsInserted)) + tdLog.exit("tmq consume rows error!") + + tdSql.query("drop topic %s"%topicFromStb1) + + tdLog.printNoPrefix("======== test case 2 end ...... ") + + # 自动建表完成数据插入,启动消费 + def tmqCase3(self): + tdLog.printNoPrefix("======== test case 3: ") + paraDict = {'dbName': 'dbt', + 'dropFlag': 1, + 'event': '', + 'vgroups': 4, + 'stbName': 'stb', + 'colPrefix': 'c', + 'tagPrefix': 't', + 'colSchema': [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'DOUBLE', 'count':1},{'type': 'BINARY', 'len':32, 'count':1},{'type': 'NCHAR', 'len':32, 'count':1},{'type': 'TIMESTAMP', 'count':1}], + 'tagSchema': [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'DOUBLE', 'count':1},{'type': 'BINARY', 'len':32, 'count':1},{'type': 'NCHAR', 'len':32, 'count':1}], + 'ctbPrefix': 'ctb', + 'ctbStartIdx': 0, + 'ctbNum': 1000, + 'rowsPerTbl': 1000, + 'batchNum': 400, + 'startTs': 1640966400000, # 2022-01-01 00:00:00.000 + 'pollDelay': 5, + 'showMsg': 1, + 'showRow': 1, + 'snapshot': 1} + + paraDict['vgroups'] = self.vgroups + paraDict['ctbNum'] = self.ctbNum + paraDict['rowsPerTbl'] = self.rowsPerTbl + + tmqCom.initConsumerTable() + tdCom.create_database(tdSql, paraDict["dbName"],paraDict["dropFlag"], vgroups=paraDict["vgroups"],replica=1) + tdLog.info("create stb") + tmqCom.create_stable(tdSql, dbName=paraDict["dbName"],stbName=paraDict["stbName"]) + tdLog.info("insert data by auto create ctb") + tmqCom.insert_data_with_autoCreateTbl(tdSql,paraDict["dbName"],paraDict["stbName"],"ctb",paraDict["ctbNum"],paraDict["rowsPerTbl"],paraDict["batchNum"]) + + tdLog.info("create topics from stb1") + topicFromStb1 = 'topic_stb1' + queryString = "select ts, c1, c2 from %s.%s"%(paraDict['dbName'], paraDict['stbName']) + sqlString = "create topic %s as %s" %(topicFromStb1, queryString) + tdLog.info("create topic sql: %s"%sqlString) + tdSql.execute(sqlString) + + consumerId = 0 + expectrowcnt = paraDict["rowsPerTbl"] * paraDict["ctbNum"] + topicList = topicFromStb1 + ifcheckdata = 0 + ifManualCommit = 0 + keyList = 'group.id:cgrp1,\ + enable.auto.commit:true,\ + auto.commit.interval.ms:1000,\ + auto.offset.reset:earliest' + tmqCom.insertConsumerInfo(consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifManualCommit) + + tdLog.info("start consume processor") + tmqCom.startTmqSimProcess(pollDelay=paraDict['pollDelay'],dbName=paraDict["dbName"],showMsg=paraDict['showMsg'], showRow=paraDict['showRow'],snapshot=paraDict['snapshot']) + + # tdLog.info("================= restart dnode ===========================") + # tdDnodes.stop(1) + # tdDnodes.start(1) + # time.sleep(2) + + tdLog.info("insert process end, and start to check consume result") + expectRows = 1 + resultList = tmqCom.selectConsumeResult(expectRows) + totalConsumeRows = 0 + for i in range(expectRows): + totalConsumeRows += resultList[i] + + tdSql.query(queryString) + totalRowsInserted = tdSql.getRows() + + if totalConsumeRows != totalRowsInserted: + tdLog.info("act consume rows: %d, expect consume rows: %d"%(totalConsumeRows, totalRowsInserted)) + tdLog.exit("tmq consume rows error!") + + tdSql.query("drop topic %s"%topicFromStb1) + + tdLog.printNoPrefix("======== test case 3 end ...... ") + + + def run(self): + tdSql.prepare() + + self.tmqCase1() + # self.tmqCase2() + self.tmqCase3() + + def stop(self): + tdSql.close() + tdLog.success(f"{__file__} successfully executed") + +event = threading.Event() + +tdCases.addLinux(__file__, TDTestCase()) +tdCases.addWindows(__file__, TDTestCase())