tmqDropStbCtb.py 13.5 KB
Newer Older
P
plum-lihui 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22

import taos
import sys
import time
import socket
import os
import threading
from enum import Enum

from util.log import *
from util.sql import *
from util.cases import *
from util.dnodes import *
sys.path.append("./7-tmq")
from tmqCommon import *

class TDTestCase:
    def __init__(self):
        self.snapshot   = 0
        self.vgroups    = 4
        self.ctbNum     = 100
        self.rowsPerTbl = 1000
G
Ganlin Zhao 已提交
23

P
plum-lihui 已提交
24 25 26
    def init(self, conn, logSql):
        tdLog.debug(f"start to excute {__file__}")
        tdSql.init(conn.cursor(), False)
G
Ganlin Zhao 已提交
27

P
plum-lihui 已提交
28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52
    def prepareTestEnv(self):
        tdLog.printNoPrefix("======== prepare test env include database, stable, ctables, and insert data: ")
        paraDict = {'dbName':     'dbt',
                    'dropFlag':   1,
                    'event':      '',
                    'vgroups':    4,
                    'stbName':    'stb',
                    'colPrefix':  'c',
                    'tagPrefix':  't',
                    'colSchema':   [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'DOUBLE', 'count':1},{'type': 'BINARY', 'len':32, 'count':1},{'type': 'NCHAR', 'len':32, 'count':1},{'type': 'TIMESTAMP', 'count':1}],
                    'tagSchema':   [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'DOUBLE', 'count':1},{'type': 'BINARY', 'len':32, 'count':1},{'type': 'NCHAR', 'len':32, 'count':1}],
                    'ctbPrefix':  'ctb',
                    'ctbStartIdx': 0,
                    'ctbNum':     100,
                    'rowsPerTbl': 1000,
                    'batchNum':   1000,
                    'startTs':    1640966400000,  # 2022-01-01 00:00:00.000
                    'pollDelay':  3,
                    'showMsg':    1,
                    'showRow':    1,
                    'snapshot':   0}

        paraDict['vgroups'] = self.vgroups
        paraDict['ctbNum'] = self.ctbNum
        paraDict['rowsPerTbl'] = self.rowsPerTbl
G
Ganlin Zhao 已提交
53

P
plum-lihui 已提交
54
        # tmqCom.initConsumerTable()
P
plum-lihui 已提交
55 56 57 58 59 60 61 62 63 64 65 66 67
        tdCom.create_database(tdSql, paraDict["dbName"],paraDict["dropFlag"], vgroups=paraDict["vgroups"],replica=1)
        tdLog.info("create stb")
        tmqCom.create_stable(tdSql, dbName=paraDict["dbName"],stbName=paraDict["stbName"])
        tdLog.info("create ctb")
        tmqCom.create_ctable(tdSql, dbName=paraDict["dbName"],stbName=paraDict["stbName"],ctbPrefix=paraDict['ctbPrefix'],
                             ctbNum=paraDict["ctbNum"],ctbStartIdx=paraDict['ctbStartIdx'])
        tdLog.info("insert data")
        tmqCom.insert_data_interlaceByMultiTbl(tsql=tdSql,dbName=paraDict["dbName"],ctbPrefix=paraDict["ctbPrefix"],
                                               ctbNum=paraDict["ctbNum"],rowsPerTbl=paraDict["rowsPerTbl"],batchNum=paraDict["batchNum"],
                                               startTs=paraDict["startTs"],ctbStartIdx=paraDict['ctbStartIdx'])
        # tmqCom.insert_data_with_autoCreateTbl(tsql=tdSql,dbName=paraDict["dbName"],stbName=paraDict["stbName"],ctbPrefix="ctbx",
        #                                       ctbNum=paraDict["ctbNum"],rowsPerTbl=paraDict["rowsPerTbl"],batchNum=paraDict["batchNum"],
        #                                       startTs=paraDict["startTs"],ctbStartIdx=paraDict['ctbStartIdx'])
G
Ganlin Zhao 已提交
68 69

        # tdLog.info("restart taosd to ensure that the data falls into the disk")
P
plum-lihui 已提交
70 71
        # tdSql.query("flush database %s"%(paraDict['dbName']))
        return
G
Ganlin Zhao 已提交
72

P
plum-lihui 已提交
73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98
    # drop some ctbs
    def tmqCase1(self):
        tdLog.printNoPrefix("======== test case 1: ")
        paraDict = {'dbName':     'dbt',
                    'dropFlag':   1,
                    'event':      '',
                    'vgroups':    4,
                    'stbName':    'stb',
                    'colPrefix':  'c',
                    'tagPrefix':  't',
                    'colSchema':   [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'DOUBLE', 'count':1},{'type': 'BINARY', 'len':32, 'count':1},{'type': 'NCHAR', 'len':32, 'count':1},{'type': 'TIMESTAMP', 'count':1}],
                    'tagSchema':   [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'DOUBLE', 'count':1},{'type': 'BINARY', 'len':32, 'count':1},{'type': 'NCHAR', 'len':32, 'count':1}],
                    'ctbPrefix':  'ctb',
                    'ctbStartIdx': 0,
                    'ctbNum':     100,
                    'rowsPerTbl': 1000,
                    'batchNum':   1000,
                    'startTs':    1640966400000,  # 2022-01-01 00:00:00.000
                    'endTs': 0,
                    'pollDelay':  5,
                    'showMsg':    1,
                    'showRow':    1,
                    'snapshot':   0}
        paraDict['snapshot'] = self.snapshot
        paraDict['vgroups'] = self.vgroups
        paraDict['ctbNum'] = self.ctbNum
G
Ganlin Zhao 已提交
99 100
        paraDict['rowsPerTbl'] = self.rowsPerTbl

P
plum-lihui 已提交
101
        tmqCom.initConsumerTable()
G
Ganlin Zhao 已提交
102

P
plum-lihui 已提交
103 104 105 106 107 108 109 110 111 112 113 114
        # again create one new stb1
        paraDict["stbName"] = 'stb1'
        paraDict['ctbPrefix'] = 'ctb1n_'
        tmqCom.create_stable(tdSql, dbName=paraDict["dbName"],stbName=paraDict["stbName"])
        tdLog.info("create ctb")
        tmqCom.create_ctable(tdSql, dbName=paraDict["dbName"],stbName=paraDict["stbName"],ctbPrefix=paraDict['ctbPrefix'],
                             ctbNum=paraDict["ctbNum"],ctbStartIdx=paraDict['ctbStartIdx'])
        tdLog.info("async insert data")
        # tmqCom.insert_data_interlaceByMultiTbl(tsql=tdSql,dbName=paraDict["dbName"],ctbPrefix=paraDict["ctbPrefix"],
        #                                        ctbNum=paraDict["ctbNum"],rowsPerTbl=paraDict["rowsPerTbl"],batchNum=paraDict["batchNum"],
        #                                        startTs=paraDict["startTs"],ctbStartIdx=paraDict['ctbStartIdx'])
        pInsertThread = tmqCom.asyncInsertDataByInterlace(paraDict)
G
Ganlin Zhao 已提交
115

P
plum-lihui 已提交
116
        tdLog.info("create topics from database")
G
Ganlin Zhao 已提交
117
        topicFromDb = 'topic_dbt'
P
plum-lihui 已提交
118
        tdSql.execute("create topic %s as database %s" %(topicFromDb, paraDict['dbName']))
G
Ganlin Zhao 已提交
119

P
plum-lihui 已提交
120 121 122 123
        if self.snapshot == 0:
            consumerId     = 0
        elif self.snapshot == 1:
            consumerId     = 1
G
Ganlin Zhao 已提交
124

P
plum-lihui 已提交
125 126 127 128 129 130 131 132 133 134 135 136 137
        expectrowcnt   = int(paraDict["rowsPerTbl"] * paraDict["ctbNum"] * 2)
        topicList      = topicFromDb
        ifcheckdata    = 1
        ifManualCommit = 1
        keyList        = 'group.id:cgrp1,\
                        enable.auto.commit:true,\
                        auto.commit.interval.ms:1000,\
                        auto.offset.reset:earliest'
        tmqCom.insertConsumerInfo(consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifManualCommit)

        tdLog.info("start consume processor")
        tmqCom.startTmqSimProcess(pollDelay=paraDict['pollDelay'],dbName=paraDict["dbName"],showMsg=paraDict['showMsg'], showRow=paraDict['showRow'],snapshot=paraDict['snapshot'])

G
Ganlin Zhao 已提交
138
        tmqCom.getStartConsumeNotifyFromTmqsim()
P
plum-lihui 已提交
139
        tdLog.info("drop some ctables")
G
Ganlin Zhao 已提交
140
        paraDict["stbName"] = 'stb'
P
plum-lihui 已提交
141 142 143
        paraDict['ctbPrefix'] = 'ctb'
        paraDict["ctbStartIdx"] = paraDict["ctbStartIdx"] + int(paraDict["ctbNum"] * 3 / 4)  # drop 1/4 ctbls
        paraDict["ctbNum"] = int(paraDict["ctbNum"] / 4)
G
Ganlin Zhao 已提交
144
        # tdSql.execute("drop table %s.%s" %(paraDict['dbName'], paraDict['stbName']))
P
plum-lihui 已提交
145
        tmqCom.drop_ctable(tdSql, dbname=paraDict['dbName'], count=paraDict["ctbNum"], default_ctbname_prefix=paraDict["ctbPrefix"], ctbStartIdx=paraDict["ctbStartIdx"])
G
Ganlin Zhao 已提交
146

P
plum-lihui 已提交
147
        pInsertThread.join()
G
Ganlin Zhao 已提交
148

P
plum-lihui 已提交
149 150 151 152 153 154 155 156
        tdLog.info("start to check consume result")
        expectRows = 1
        resultList = tmqCom.selectConsumeResult(expectRows)
        totalConsumeRows = 0
        for i in range(expectRows):
            totalConsumeRows += resultList[i]

        tdLog.info("act consume rows: %d, expect consume rows: %d"%(totalConsumeRows, expectrowcnt))
G
Ganlin Zhao 已提交
157

158 159 160
        if self.snapshot == 0:
            if not ((totalConsumeRows > expectrowcnt / 2) and (totalConsumeRows < expectrowcnt)):
                tdLog.exit("tmq consume rows error with snapshot = 0!")
P
plum-lihui 已提交
161

G
Ganlin Zhao 已提交
162
        tdLog.info("wait subscriptions exit ....")
P
plum-lihui 已提交
163
        tmqCom.waitSubscriptionExit(tdSql, topicFromDb)
G
Ganlin Zhao 已提交
164

P
plum-lihui 已提交
165 166 167
        tdSql.query("drop topic %s"%topicFromDb)
        tdLog.info("success dorp topic: %s"%topicFromDb)
        tdLog.printNoPrefix("======== test case 1 end ...... ")
G
Ganlin Zhao 已提交
168

P
plum-lihui 已提交
169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194
    # drop one stb
    def tmqCase2(self):
        tdLog.printNoPrefix("======== test case 2: ")
        paraDict = {'dbName':     'dbt',
                    'dropFlag':   1,
                    'event':      '',
                    'vgroups':    4,
                    'stbName':    'stb',
                    'colPrefix':  'c',
                    'tagPrefix':  't',
                    'colSchema':   [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'DOUBLE', 'count':1},{'type': 'BINARY', 'len':32, 'count':1},{'type': 'NCHAR', 'len':32, 'count':1},{'type': 'TIMESTAMP', 'count':1}],
                    'tagSchema':   [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'DOUBLE', 'count':1},{'type': 'BINARY', 'len':32, 'count':1},{'type': 'NCHAR', 'len':32, 'count':1}],
                    'ctbPrefix':  'ctb',
                    'ctbStartIdx': 0,
                    'ctbNum':     100,
                    'rowsPerTbl': 1000,
                    'batchNum':   1000,
                    'startTs':    1640966400000,  # 2022-01-01 00:00:00.000
                    'endTs': 0,
                    'pollDelay':  5,
                    'showMsg':    1,
                    'showRow':    1,
                    'snapshot':   0}
        paraDict['snapshot'] = self.snapshot
        paraDict['vgroups'] = self.vgroups
        paraDict['ctbNum'] = self.ctbNum
G
Ganlin Zhao 已提交
195 196
        paraDict['rowsPerTbl'] = self.rowsPerTbl

P
plum-lihui 已提交
197
        tmqCom.initConsumerTable()
G
Ganlin Zhao 已提交
198

P
plum-lihui 已提交
199 200 201 202 203 204 205 206 207 208 209 210
        # again create one new stb1
        paraDict["stbName"] = 'stb2'
        paraDict['ctbPrefix'] = 'ctb2n_'
        tmqCom.create_stable(tdSql, dbName=paraDict["dbName"],stbName=paraDict["stbName"])
        tdLog.info("create ctb")
        tmqCom.create_ctable(tdSql, dbName=paraDict["dbName"],stbName=paraDict["stbName"],ctbPrefix=paraDict['ctbPrefix'],
                             ctbNum=paraDict["ctbNum"],ctbStartIdx=paraDict['ctbStartIdx'])
        tdLog.info("async insert data")
        # tmqCom.insert_data_interlaceByMultiTbl(tsql=tdSql,dbName=paraDict["dbName"],ctbPrefix=paraDict["ctbPrefix"],
        #                                        ctbNum=paraDict["ctbNum"],rowsPerTbl=paraDict["rowsPerTbl"],batchNum=paraDict["batchNum"],
        #                                        startTs=paraDict["startTs"],ctbStartIdx=paraDict['ctbStartIdx'])
        pInsertThread = tmqCom.asyncInsertDataByInterlace(paraDict)
G
Ganlin Zhao 已提交
211

P
plum-lihui 已提交
212
        tdLog.info("create topics from database")
G
Ganlin Zhao 已提交
213
        topicFromDb = 'topic_dbt'
P
plum-lihui 已提交
214
        tdSql.execute("create topic %s as database %s" %(topicFromDb, paraDict['dbName']))
G
Ganlin Zhao 已提交
215

P
plum-lihui 已提交
216
        if self.snapshot == 0:
P
plum-lihui 已提交
217
            consumerId     = 2
P
plum-lihui 已提交
218
        elif self.snapshot == 1:
P
plum-lihui 已提交
219
            consumerId     = 3
G
Ganlin Zhao 已提交
220

P
plum-lihui 已提交
221 222 223 224 225 226 227 228 229 230 231 232 233
        expectrowcnt   = int(paraDict["rowsPerTbl"] * paraDict["ctbNum"] * 2)
        topicList      = topicFromDb
        ifcheckdata    = 1
        ifManualCommit = 1
        keyList        = 'group.id:cgrp1,\
                        enable.auto.commit:true,\
                        auto.commit.interval.ms:1000,\
                        auto.offset.reset:earliest'
        tmqCom.insertConsumerInfo(consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifManualCommit)

        tdLog.info("start consume processor")
        tmqCom.startTmqSimProcess(pollDelay=paraDict['pollDelay'],dbName=paraDict["dbName"],showMsg=paraDict['showMsg'], showRow=paraDict['showRow'],snapshot=paraDict['snapshot'])

G
Ganlin Zhao 已提交
234
        tmqCom.getStartConsumeNotifyFromTmqsim()
P
plum-lihui 已提交
235 236
        tdLog.info("drop one stable")
        paraDict["stbName"] = 'stb1'
G
Ganlin Zhao 已提交
237
        tdSql.execute("drop table %s.%s" %(paraDict['dbName'], paraDict['stbName']))
P
plum-lihui 已提交
238 239

        pInsertThread.join()
G
Ganlin Zhao 已提交
240

P
plum-lihui 已提交
241 242 243 244 245 246 247 248
        tdLog.info("start to check consume result")
        expectRows = 1
        resultList = tmqCom.selectConsumeResult(expectRows)
        totalConsumeRows = 0
        for i in range(expectRows):
            totalConsumeRows += resultList[i]

        tdLog.info("act consume rows: %d, expect consume rows: %d"%(totalConsumeRows, expectrowcnt))
G
Ganlin Zhao 已提交
249

250 251 252
        if self.snapshot == 0:
            if not ((totalConsumeRows > expectrowcnt / 2) and (totalConsumeRows < expectrowcnt)):
                tdLog.exit("tmq consume rows error with snapshot = 0!")
P
plum-lihui 已提交
253

G
Ganlin Zhao 已提交
254
        tdLog.info("wait subscriptions exit ....")
P
plum-lihui 已提交
255
        tmqCom.waitSubscriptionExit(tdSql, topicFromDb)
G
Ganlin Zhao 已提交
256

P
plum-lihui 已提交
257 258 259 260
        tdSql.query("drop topic %s"%topicFromDb)
        tdLog.info("success dorp topic: %s"%topicFromDb)
        tdLog.printNoPrefix("======== test case 2 end ...... ")

G
Ganlin Zhao 已提交
261
    def run(self):
P
plum-lihui 已提交
262 263 264 265 266
        tdLog.printNoPrefix("=============================================")
        tdLog.printNoPrefix("======== snapshot is 0: only consume from wal")
        self.snapshot = 0
        self.prepareTestEnv()
        self.tmqCase1()
G
Ganlin Zhao 已提交
267 268
        self.tmqCase2()

P
plum-lihui 已提交
269 270 271 272 273 274 275 276 277 278 279 280 281 282 283
        tdLog.printNoPrefix("====================================================================")
        tdLog.printNoPrefix("======== snapshot is 1: firstly consume from tsbs, and then from wal")
        self.snapshot = 1
        self.prepareTestEnv()
        self.tmqCase1()
        self.tmqCase2()

    def stop(self):
        tdSql.close()
        tdLog.success(f"{__file__} successfully executed")

event = threading.Event()

tdCases.addLinux(__file__, TDTestCase())
tdCases.addWindows(__file__, TDTestCase())