tmqDropNtb-snapshot0.py 10.8 KB
Newer Older
P
plum-lihui 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22

import taos
import sys
import time
import socket
import os
import threading
from enum import Enum

from util.log import *
from util.sql import *
from util.cases import *
from util.dnodes import *
sys.path.append("./7-tmq")
from tmqCommon import *

class TDTestCase:
    def __init__(self):
        self.snapshot   = 0
        self.vgroups    = 4
        self.ctbNum     = 1000
        self.rowsPerTbl = 10
G
Ganlin Zhao 已提交
23

P
plum-lihui 已提交
24 25 26
    def init(self, conn, logSql):
        tdLog.debug(f"start to excute {__file__}")
        tdSql.init(conn.cursor(), False)
G
Ganlin Zhao 已提交
27

P
plum-lihui 已提交
28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53
    # drop some ntbs
    def tmqCase1(self):
        tdLog.printNoPrefix("======== test case 1: ")
        paraDict = {'dbName':     'dbt',
                    'dropFlag':   1,
                    'event':      '',
                    'vgroups':    4,
                    'stbName':    'stb',
                    'colPrefix':  'c',
                    'tagPrefix':  't',
                    'colSchema':   [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'DOUBLE', 'count':1},{'type': 'TIMESTAMP', 'count':1}],
                    'tagSchema':   [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'DOUBLE', 'count':1},{'type': 'BINARY', 'len':32, 'count':1},{'type': 'NCHAR', 'len':32, 'count':1}],
                    'ctbPrefix':  'ntb',
                    'ctbStartIdx': 0,
                    'ctbNum':     1000,
                    'rowsPerTbl': 100,
                    'batchNum':   100,
                    'startTs':    1640966400000,  # 2022-01-01 00:00:00.000
                    'endTs': 0,
                    'pollDelay':  5,
                    'showMsg':    1,
                    'showRow':    1,
                    'snapshot':   0}
        paraDict['snapshot'] = self.snapshot
        paraDict['vgroups'] = self.vgroups
        paraDict['ctbNum'] = self.ctbNum
G
Ganlin Zhao 已提交
54 55
        paraDict['rowsPerTbl'] = self.rowsPerTbl

P
plum-lihui 已提交
56 57 58 59 60 61 62
        tmqCom.initConsumerTable()
        tdLog.info("start create database....")
        tdCom.create_database(tdSql, paraDict["dbName"],paraDict["dropFlag"], vgroups=paraDict["vgroups"],replica=1)
        tdLog.info("start create normal tables....")
        tmqCom.create_ntable(tsql=tdSql, dbname=paraDict["dbName"], tbname_prefix=paraDict["ctbPrefix"], tbname_index_start_num = 1, column_elm_list=paraDict["colSchema"], colPrefix='c', tblNum=paraDict["ctbNum"])
        tdLog.info("start insert data into normal tables....")
        tmqCom.insert_rows_into_ntbl(tsql=tdSql, dbname=paraDict["dbName"], tbname_prefix=paraDict["ctbPrefix"], tbname_index_start_num = 1, column_ele_list=paraDict["colSchema"],startTs=paraDict["startTs"], tblNum=paraDict["ctbNum"], rows=paraDict["rowsPerTbl"])
G
Ganlin Zhao 已提交
63

P
plum-lihui 已提交
64
        tdLog.info("create topics from database")
G
Ganlin Zhao 已提交
65
        topicFromDb = 'topic_dbt'
P
plum-lihui 已提交
66
        tdSql.execute("create topic %s as database %s" %(topicFromDb, paraDict['dbName']))
G
Ganlin Zhao 已提交
67

P
plum-lihui 已提交
68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85
        if self.snapshot == 0:
            consumerId     = 0
        elif self.snapshot == 1:
            consumerId     = 1

        expectrowcnt   = int(paraDict["rowsPerTbl"] * paraDict["ctbNum"])
        topicList      = topicFromDb
        ifcheckdata    = 1
        ifManualCommit = 1
        keyList        = 'group.id:cgrp1,\
                        enable.auto.commit:true,\
                        auto.commit.interval.ms:1000,\
                        auto.offset.reset:earliest'
        tmqCom.insertConsumerInfo(consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifManualCommit)

        tdLog.info("start consume processor")
        tmqCom.startTmqSimProcess(pollDelay=paraDict['pollDelay'],dbName=paraDict["dbName"],showMsg=paraDict['showMsg'], showRow=paraDict['showRow'],snapshot=paraDict['snapshot'])

G
Ganlin Zhao 已提交
86
        tmqCom.getStartConsumeNotifyFromTmqsim()
P
plum-lihui 已提交
87 88 89 90 91
        tdLog.info("drop some ntables")
        # drop 1/4 ctbls from half offset
        paraDict["ctbStartIdx"] = paraDict["ctbStartIdx"] + int(paraDict["ctbNum"] * 1 / 2)
        paraDict["ctbNum"] = int(paraDict["ctbNum"] / 4)
        tmqCom.drop_ctable(tdSql, dbname=paraDict['dbName'], count=paraDict["ctbNum"], default_ctbname_prefix=paraDict["ctbPrefix"], ctbStartIdx=paraDict["ctbStartIdx"])
G
Ganlin Zhao 已提交
92

P
plum-lihui 已提交
93 94 95 96 97 98 99 100
        tdLog.info("start to check consume result")
        expectRows = 1
        resultList = tmqCom.selectConsumeResult(expectRows)
        totalConsumeRows = 0
        for i in range(expectRows):
            totalConsumeRows += resultList[i]

        tdLog.info("act consume rows: %d, expect consume rows: %d"%(totalConsumeRows, expectrowcnt))
G
Ganlin Zhao 已提交
101

P
plum-lihui 已提交
102 103
        if not ((totalConsumeRows >= expectrowcnt * 3/4) and (totalConsumeRows < expectrowcnt)):
            tdLog.exit("tmq consume rows error with snapshot = 0!")
G
Ganlin Zhao 已提交
104 105

        tdLog.info("wait subscriptions exit ....")
P
plum-lihui 已提交
106
        tmqCom.waitSubscriptionExit(tdSql, topicFromDb)
G
Ganlin Zhao 已提交
107

P
plum-lihui 已提交
108 109 110 111
        tdSql.query("drop topic %s"%topicFromDb)
        tdLog.info("success dorp topic: %s"%topicFromDb)
        tdLog.printNoPrefix("======== test case 1 end ...... ")

G
Ganlin Zhao 已提交
112 113


P
plum-lihui 已提交
114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139
    # drop some ntbs and create some new ntbs
    def tmqCase2(self):
        tdLog.printNoPrefix("======== test case 2: ")
        paraDict = {'dbName':     'dbt',
                    'dropFlag':   1,
                    'event':      '',
                    'vgroups':    4,
                    'stbName':    'stb',
                    'colPrefix':  'c',
                    'tagPrefix':  't',
                    'colSchema':   [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'DOUBLE', 'count':1},{'type': 'TIMESTAMP', 'count':1}],
                    'tagSchema':   [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'DOUBLE', 'count':1},{'type': 'BINARY', 'len':32, 'count':1},{'type': 'NCHAR', 'len':32, 'count':1}],
                    'ctbPrefix':  'ntb',
                    'ctbStartIdx': 0,
                    'ctbNum':     1000,
                    'rowsPerTbl': 100,
                    'batchNum':   100,
                    'startTs':    1640966400000,  # 2022-01-01 00:00:00.000
                    'endTs': 0,
                    'pollDelay':  10,
                    'showMsg':    1,
                    'showRow':    1,
                    'snapshot':   0}
        paraDict['snapshot'] = self.snapshot
        paraDict['vgroups'] = self.vgroups
        paraDict['ctbNum'] = self.ctbNum
G
Ganlin Zhao 已提交
140 141
        paraDict['rowsPerTbl'] = self.rowsPerTbl

P
plum-lihui 已提交
142 143 144 145 146 147 148
        tmqCom.initConsumerTable()
        tdLog.info("start create database....")
        tdCom.create_database(tdSql, paraDict["dbName"],paraDict["dropFlag"], vgroups=paraDict["vgroups"],replica=1)
        tdLog.info("start create normal tables....")
        tmqCom.create_ntable(tsql=tdSql, dbname=paraDict["dbName"], tbname_prefix=paraDict["ctbPrefix"], tbname_index_start_num = 1, column_elm_list=paraDict["colSchema"], colPrefix='c', tblNum=paraDict["ctbNum"])
        tdLog.info("start insert data into normal tables....")
        tmqCom.insert_rows_into_ntbl(tsql=tdSql, dbname=paraDict["dbName"], tbname_prefix=paraDict["ctbPrefix"], tbname_index_start_num = 1, column_ele_list=paraDict["colSchema"],startTs=paraDict["startTs"], tblNum=paraDict["ctbNum"], rows=paraDict["rowsPerTbl"])
G
Ganlin Zhao 已提交
149

P
plum-lihui 已提交
150
        tdLog.info("create topics from database")
G
Ganlin Zhao 已提交
151
        topicFromDb = 'topic_dbt'
P
plum-lihui 已提交
152
        tdSql.execute("create topic %s as database %s" %(topicFromDb, paraDict['dbName']))
G
Ganlin Zhao 已提交
153

P
plum-lihui 已提交
154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171
        if self.snapshot == 0:
            consumerId     = 2
        elif self.snapshot == 1:
            consumerId     = 3

        expectrowcnt   = int(paraDict["rowsPerTbl"] * paraDict["ctbNum"] * 2)
        topicList      = topicFromDb
        ifcheckdata    = 1
        ifManualCommit = 1
        keyList        = 'group.id:cgrp1,\
                        enable.auto.commit:true,\
                        auto.commit.interval.ms:1000,\
                        auto.offset.reset:earliest'
        tmqCom.insertConsumerInfo(consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifManualCommit)

        tdLog.info("start consume processor")
        tmqCom.startTmqSimProcess(pollDelay=paraDict['pollDelay'],dbName=paraDict["dbName"],showMsg=paraDict['showMsg'], showRow=paraDict['showRow'],snapshot=paraDict['snapshot'])

G
Ganlin Zhao 已提交
172
        tmqCom.getStartConsumeNotifyFromTmqsim()
P
plum-lihui 已提交
173 174 175 176 177
        tdLog.info("drop some ntables")
        # drop 1/4 ctbls from half offset
        paraDict["ctbStartIdx"] = paraDict["ctbStartIdx"] + int(paraDict["ctbNum"] * 1 / 2)
        paraDict["ctbNum"] = int(paraDict["ctbNum"] / 4)
        tmqCom.drop_ctable(tdSql, dbname=paraDict['dbName'], count=paraDict["ctbNum"], default_ctbname_prefix=paraDict["ctbPrefix"], ctbStartIdx=paraDict["ctbStartIdx"])
G
Ganlin Zhao 已提交
178

P
plum-lihui 已提交
179 180 181 182 183 184
        tdLog.info("start create some new normal tables....")
        paraDict["ctbPrefix"] = 'newCtb'
        paraDict["ctbNum"] = self.ctbNum
        tmqCom.create_ntable(tsql=tdSql, dbname=paraDict["dbName"], tbname_prefix=paraDict["ctbPrefix"], tbname_index_start_num = 1, column_elm_list=paraDict["colSchema"], colPrefix='c', tblNum=paraDict["ctbNum"])
        tdLog.info("start insert data into these new normal tables....")
        tmqCom.insert_rows_into_ntbl(tsql=tdSql, dbname=paraDict["dbName"], tbname_prefix=paraDict["ctbPrefix"], tbname_index_start_num = 1, column_ele_list=paraDict["colSchema"],startTs=paraDict["startTs"], tblNum=paraDict["ctbNum"], rows=paraDict["rowsPerTbl"])
G
Ganlin Zhao 已提交
185

P
plum-lihui 已提交
186 187 188 189 190 191 192 193
        tdLog.info("start to check consume result")
        expectRows = 1
        resultList = tmqCom.selectConsumeResult(expectRows)
        totalConsumeRows = 0
        for i in range(expectRows):
            totalConsumeRows += resultList[i]

        tdLog.info("act consume rows: %d, expect consume rows: %d"%(totalConsumeRows, expectrowcnt))
G
Ganlin Zhao 已提交
194

P
plum-lihui 已提交
195 196
        if not ((totalConsumeRows >= expectrowcnt / 2 * (1 + 3/4)) and (totalConsumeRows < expectrowcnt)):
            tdLog.exit("tmq consume rows error with snapshot = 0!")
G
Ganlin Zhao 已提交
197 198

        tdLog.info("wait subscriptions exit ....")
P
plum-lihui 已提交
199
        tmqCom.waitSubscriptionExit(tdSql, topicFromDb)
G
Ganlin Zhao 已提交
200

P
plum-lihui 已提交
201 202 203
        tdSql.query("drop topic %s"%topicFromDb)
        tdLog.info("success dorp topic: %s"%topicFromDb)
        tdLog.printNoPrefix("======== test case 2 end ...... ")
G
Ganlin Zhao 已提交
204 205

    def run(self):
P
plum-lihui 已提交
206 207 208 209
        tdLog.printNoPrefix("=============================================")
        tdLog.printNoPrefix("======== snapshot is 0: only consume from wal")
        self.snapshot = 0
        self.tmqCase1()
G
Ganlin Zhao 已提交
210 211
        self.tmqCase2()

P
plum-lihui 已提交
212 213 214 215 216 217 218 219 220 221 222 223 224 225
        # tdLog.printNoPrefix("====================================================================")
        # tdLog.printNoPrefix("======== snapshot is 1: firstly consume from tsbs, and then from wal")
        # self.snapshot = 1
        # self.tmqCase1()
        # self.tmqCase2()

    def stop(self):
        tdSql.close()
        tdLog.success(f"{__file__} successfully executed")

event = threading.Event()

tdCases.addLinux(__file__, TDTestCase())
tdCases.addWindows(__file__, TDTestCase())