# tmqDropNtb-snapshot0.py

import taos
import sys
import time
import socket
import os
import threading
from enum import Enum

from util.log import *
from util.sql import *
from util.cases import *
from util.dnodes import *
sys.path.append("./7-tmq")
from tmqCommon import *

class TDTestCase:
    def __init__(self):
        self.snapshot   = 0
        self.vgroups    = 4
        self.ctbNum     = 1000
        self.rowsPerTbl = 10
    def init(self, conn, logSql, replicaVar=1):
        """Initialise the test case with a database connection.

        Args:
            conn: connection object; a cursor obtained from it is handed to
                ``tdSql.init``.
            logSql: accepted as part of the method signature; this method does
                not use it (``tdSql.init`` is always called with ``False``).
            replicaVar: replica setting, stored as an int in ``self.replicaVar``.
        """
        self.replicaVar = int(replicaVar)
        tdLog.debug(f"start to excute {__file__}")
        tdSql.init(conn.cursor(), False)
    # drop some ntbs
    def tmqCase1(self):
        """Consume a database topic while a quarter of the normal tables is dropped.

        Steps:
          1. create the database and ``ctbNum`` normal tables, insert
             ``rowsPerTbl`` rows into each;
          2. create a topic on the whole database and start the consumer process;
          3. once the consumer signals it has started, drop ``ctbNum / 4`` tables
             beginning at index ``ctbNum / 2``;
          4. verify the consumed row count lies in ``[3/4 * expected, expected)``.

        Raises:
            ValueError: if ``self.snapshot`` is neither 0 nor 1.
        """
        tdLog.printNoPrefix("======== test case 1: ")
        paraDict = {'dbName':     'dbt',
                    'dropFlag':   1,
                    'event':      '',
                    'vgroups':    4,
                    'stbName':    'stb',
                    'colPrefix':  'c',
                    'tagPrefix':  't',
                    'colSchema':   [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'DOUBLE', 'count':1},{'type': 'TIMESTAMP', 'count':1}],
                    'tagSchema':   [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'DOUBLE', 'count':1},{'type': 'BINARY', 'len':32, 'count':1},{'type': 'NCHAR', 'len':32, 'count':1}],
                    'ctbPrefix':  'ntb',
                    'ctbStartIdx': 0,
                    'ctbNum':     1000,
                    'rowsPerTbl': 100,
                    'batchNum':   100,
                    'startTs':    1640966400000,  # 2022-01-01 00:00:00.000
                    'endTs': 0,
                    'pollDelay':  5,
                    'showMsg':    1,
                    'showRow':    1,
                    'snapshot':   0}
        # Override the template defaults with the values configured on the instance.
        paraDict['snapshot'] = self.snapshot
        paraDict['vgroups'] = self.vgroups
        paraDict['ctbNum'] = self.ctbNum
        paraDict['rowsPerTbl'] = self.rowsPerTbl

        tmqCom.initConsumerTable()
        tdLog.info("start create database....")
        tdCom.create_database(tdSql, paraDict["dbName"],paraDict["dropFlag"], vgroups=paraDict["vgroups"],replica=1)
        tdSql.execute("alter database %s wal_retention_period 3600" % (paraDict['dbName']))
        tdLog.info("start create normal tables....")
        tmqCom.create_ntable(tsql=tdSql, dbname=paraDict["dbName"], tbname_prefix=paraDict["ctbPrefix"], tbname_index_start_num = 1, column_elm_list=paraDict["colSchema"], colPrefix='c', tblNum=paraDict["ctbNum"])
        tdLog.info("start insert data into normal tables....")
        tmqCom.insert_rows_into_ntbl(tsql=tdSql, dbname=paraDict["dbName"], tbname_prefix=paraDict["ctbPrefix"], tbname_index_start_num = 1, column_ele_list=paraDict["colSchema"],startTs=paraDict["startTs"], tblNum=paraDict["ctbNum"], rows=paraDict["rowsPerTbl"])

        tdLog.info("create topics from database")
        topicFromDb = 'topic_dbt'
        tdSql.execute("create topic %s as database %s" %(topicFromDb, paraDict['dbName']))

        # Each (case, snapshot mode) pair uses its own consumer id.
        if self.snapshot == 0:
            consumerId     = 0
        elif self.snapshot == 1:
            consumerId     = 1
        else:
            # Fail with a clear message instead of an unbound-variable error below.
            raise ValueError("unsupported snapshot value: %r (expected 0 or 1)" % (self.snapshot,))

        # Row count that would be consumed if no table were dropped.
        expectrowcnt   = int(paraDict["rowsPerTbl"] * paraDict["ctbNum"])
        topicList      = topicFromDb
        ifcheckdata    = 1
        ifManualCommit = 1
        keyList        = 'group.id:cgrp1,\
                        enable.auto.commit:true,\
                        auto.commit.interval.ms:1000,\
                        auto.offset.reset:earliest'
        tmqCom.insertConsumerInfo(consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifManualCommit)

        tdLog.info("start consume processor")
        tmqCom.startTmqSimProcess(pollDelay=paraDict['pollDelay'],dbName=paraDict["dbName"],showMsg=paraDict['showMsg'], showRow=paraDict['showRow'],snapshot=paraDict['snapshot'])

        # Drop the tables only after the consumer has reported that it started.
        tmqCom.getStartConsumeNotifyFromTmqsim()
        tdLog.info("drop some ntables")
        # drop 1/4 of the tables, starting at the half-way index
        paraDict["ctbStartIdx"] = paraDict["ctbStartIdx"] + int(paraDict["ctbNum"] * 1 / 2)
        paraDict["ctbNum"] = int(paraDict["ctbNum"] / 4)
        tmqCom.drop_ctable(tdSql, dbname=paraDict['dbName'], count=paraDict["ctbNum"], default_ctbname_prefix=paraDict["ctbPrefix"], ctbStartIdx=paraDict["ctbStartIdx"])

        tdLog.info("start to check consume result")
        expectRows = 1
        resultList = tmqCom.selectConsumeResult(expectRows)
        totalConsumeRows = sum(resultList[i] for i in range(expectRows))

        tdLog.info("act consume rows: %d, expect consume rows: %d"%(totalConsumeRows, expectrowcnt))

        # Accept only totals in [3/4 * expected, expected).
        if not ((totalConsumeRows >= expectrowcnt * 3/4) and (totalConsumeRows < expectrowcnt)):
            # Report the snapshot mode actually in use rather than a fixed 0.
            tdLog.exit("tmq consume rows error with snapshot = %d!" % self.snapshot)

        tdLog.info("wait subscriptions exit ....")
        tmqCom.waitSubscriptionExit(tdSql, topicFromDb)

        tdSql.query("drop topic %s"%topicFromDb)
        tdLog.info("success dorp topic: %s"%topicFromDb)
        tdLog.printNoPrefix("======== test case 1 end ...... ")

    # drop some ntbs and create some new ntbs
    def tmqCase2(self):
        """Consume a database topic while tables are dropped and new ones are added.

        Steps:
          1. create the database and ``ctbNum`` normal tables, insert
             ``rowsPerTbl`` rows into each;
          2. create a topic on the whole database and start the consumer process;
          3. once the consumer signals it has started, drop ``ctbNum / 4`` tables
             beginning at index ``ctbNum / 2``;
          4. create ``ctbNum`` new tables with prefix ``newCtb`` and insert
             ``rowsPerTbl`` rows into each;
          5. verify the consumed row count lies in
             ``[expected / 2 * (1 + 3/4), expected)`` where ``expected`` is
             ``rowsPerTbl * ctbNum * 2``.

        Raises:
            ValueError: if ``self.snapshot`` is neither 0 nor 1.
        """
        tdLog.printNoPrefix("======== test case 2: ")
        paraDict = {'dbName':     'dbt',
                    'dropFlag':   1,
                    'event':      '',
                    'vgroups':    4,
                    'stbName':    'stb',
                    'colPrefix':  'c',
                    'tagPrefix':  't',
                    'colSchema':   [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'DOUBLE', 'count':1},{'type': 'TIMESTAMP', 'count':1}],
                    'tagSchema':   [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'DOUBLE', 'count':1},{'type': 'BINARY', 'len':32, 'count':1},{'type': 'NCHAR', 'len':32, 'count':1}],
                    'ctbPrefix':  'ntb',
                    'ctbStartIdx': 0,
                    'ctbNum':     1000,
                    'rowsPerTbl': 100,
                    'batchNum':   100,
                    'startTs':    1640966400000,  # 2022-01-01 00:00:00.000
                    'endTs': 0,
                    'pollDelay':  10,
                    'showMsg':    1,
                    'showRow':    1,
                    'snapshot':   0}
        # Override the template defaults with the values configured on the instance.
        paraDict['snapshot'] = self.snapshot
        paraDict['vgroups'] = self.vgroups
        paraDict['ctbNum'] = self.ctbNum
        paraDict['rowsPerTbl'] = self.rowsPerTbl

        tmqCom.initConsumerTable()
        tdLog.info("start create database....")
        tdCom.create_database(tdSql, paraDict["dbName"],paraDict["dropFlag"], vgroups=paraDict["vgroups"],replica=1)
        tdSql.execute("alter database %s wal_retention_period 3600" % (paraDict['dbName']))
        tdLog.info("start create normal tables....")
        tmqCom.create_ntable(tsql=tdSql, dbname=paraDict["dbName"], tbname_prefix=paraDict["ctbPrefix"], tbname_index_start_num = 1, column_elm_list=paraDict["colSchema"], colPrefix='c', tblNum=paraDict["ctbNum"])
        tdLog.info("start insert data into normal tables....")
        tmqCom.insert_rows_into_ntbl(tsql=tdSql, dbname=paraDict["dbName"], tbname_prefix=paraDict["ctbPrefix"], tbname_index_start_num = 1, column_ele_list=paraDict["colSchema"],startTs=paraDict["startTs"], tblNum=paraDict["ctbNum"], rows=paraDict["rowsPerTbl"])

        tdLog.info("create topics from database")
        topicFromDb = 'topic_dbt'
        tdSql.execute("create topic %s as database %s" %(topicFromDb, paraDict['dbName']))

        # Each (case, snapshot mode) pair uses its own consumer id.
        if self.snapshot == 0:
            consumerId     = 2
        elif self.snapshot == 1:
            consumerId     = 3
        else:
            # Fail with a clear message instead of an unbound-variable error below.
            raise ValueError("unsupported snapshot value: %r (expected 0 or 1)" % (self.snapshot,))

        # Rows of the original tables plus rows of the equally sized new batch.
        expectrowcnt   = int(paraDict["rowsPerTbl"] * paraDict["ctbNum"] * 2)
        topicList      = topicFromDb
        ifcheckdata    = 1
        ifManualCommit = 1
        keyList        = 'group.id:cgrp1,\
                        enable.auto.commit:true,\
                        auto.commit.interval.ms:1000,\
                        auto.offset.reset:earliest'
        tmqCom.insertConsumerInfo(consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifManualCommit)

        tdLog.info("start consume processor")
        tmqCom.startTmqSimProcess(pollDelay=paraDict['pollDelay'],dbName=paraDict["dbName"],showMsg=paraDict['showMsg'], showRow=paraDict['showRow'],snapshot=paraDict['snapshot'])

        # Drop the tables only after the consumer has reported that it started.
        tmqCom.getStartConsumeNotifyFromTmqsim()
        tdLog.info("drop some ntables")
        # drop 1/4 of the tables, starting at the half-way index
        paraDict["ctbStartIdx"] = paraDict["ctbStartIdx"] + int(paraDict["ctbNum"] * 1 / 2)
        paraDict["ctbNum"] = int(paraDict["ctbNum"] / 4)
        tmqCom.drop_ctable(tdSql, dbname=paraDict['dbName'], count=paraDict["ctbNum"], default_ctbname_prefix=paraDict["ctbPrefix"], ctbStartIdx=paraDict["ctbStartIdx"])

        tdLog.info("start create some new normal tables....")
        paraDict["ctbPrefix"] = 'newCtb'
        # ctbNum was reduced for the drop above; restore the full table count.
        paraDict["ctbNum"] = self.ctbNum
        tmqCom.create_ntable(tsql=tdSql, dbname=paraDict["dbName"], tbname_prefix=paraDict["ctbPrefix"], tbname_index_start_num = 1, column_elm_list=paraDict["colSchema"], colPrefix='c', tblNum=paraDict["ctbNum"])
        tdLog.info("start insert data into these new normal tables....")
        tmqCom.insert_rows_into_ntbl(tsql=tdSql, dbname=paraDict["dbName"], tbname_prefix=paraDict["ctbPrefix"], tbname_index_start_num = 1, column_ele_list=paraDict["colSchema"],startTs=paraDict["startTs"], tblNum=paraDict["ctbNum"], rows=paraDict["rowsPerTbl"])

        tdLog.info("start to check consume result")
        expectRows = 1
        resultList = tmqCom.selectConsumeResult(expectRows)
        totalConsumeRows = sum(resultList[i] for i in range(expectRows))

        tdLog.info("act consume rows: %d, expect consume rows: %d"%(totalConsumeRows, expectrowcnt))

        # Accept only totals in [expected / 2 * (1 + 3/4), expected).
        if not ((totalConsumeRows >= expectrowcnt / 2 * (1 + 3/4)) and (totalConsumeRows < expectrowcnt)):
            # Report the snapshot mode actually in use rather than a fixed 0.
            tdLog.exit("tmq consume rows error with snapshot = %d!" % self.snapshot)

        tdLog.info("wait subscriptions exit ....")
        tmqCom.waitSubscriptionExit(tdSql, topicFromDb)

        tdSql.query("drop topic %s"%topicFromDb)
        tdLog.info("success dorp topic: %s"%topicFromDb)
        tdLog.printNoPrefix("======== test case 2 end ...... ")
    def run(self):
        """Run both drop-table scenarios with ``snapshot`` set to 0."""
        tdLog.printNoPrefix("=============================================")
        tdLog.printNoPrefix("======== snapshot is 0: only consume from wal")
        self.snapshot = 0
        self.tmqCase1()
        self.tmqCase2()

        # The snapshot = 1 pass below is currently disabled.
        # tdLog.printNoPrefix("====================================================================")
        # tdLog.printNoPrefix("======== snapshot is 1: firstly consume from tsbs, and then from wal")
        # self.snapshot = 1
        # self.tmqCase1()
        # self.tmqCase2()

    def stop(self):
        """Close the shared ``tdSql`` handle and log that this file completed."""
        tdSql.close()
        done_msg = f"{__file__} successfully executed"
        tdLog.success(done_msg)

# Module-level event object; not referenced by the code visible in this file.
event = threading.Event()

# Register this test case for both platforms, each with its own TDTestCase instance.
tdCases.addLinux(__file__, TDTestCase())
tdCases.addWindows(__file__, TDTestCase())