tmqDropNtb-snapshot0.py 10.9 KB
Newer Older
P
plum-lihui 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22

import taos
import sys
import time
import socket
import os
import threading
from enum import Enum

from util.log import *
from util.sql import *
from util.cases import *
from util.dnodes import *
sys.path.append("./7-tmq")
from tmqCommon import *

class TDTestCase:
    def __init__(self):
        self.snapshot   = 0
        self.vgroups    = 4
        self.ctbNum     = 1000
        self.rowsPerTbl = 10
G
Ganlin Zhao 已提交
23

24
    def init(self, conn, logSql, replicaVar=1):
25
        self.replicaVar = int(replicaVar)
P
plum-lihui 已提交
26 27
        tdLog.debug(f"start to excute {__file__}")
        tdSql.init(conn.cursor(), False)
G
Ganlin Zhao 已提交
28

P
plum-lihui 已提交
29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54
    # drop some ntbs
    def tmqCase1(self):
        tdLog.printNoPrefix("======== test case 1: ")
        paraDict = {'dbName':     'dbt',
                    'dropFlag':   1,
                    'event':      '',
                    'vgroups':    4,
                    'stbName':    'stb',
                    'colPrefix':  'c',
                    'tagPrefix':  't',
                    'colSchema':   [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'DOUBLE', 'count':1},{'type': 'TIMESTAMP', 'count':1}],
                    'tagSchema':   [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'DOUBLE', 'count':1},{'type': 'BINARY', 'len':32, 'count':1},{'type': 'NCHAR', 'len':32, 'count':1}],
                    'ctbPrefix':  'ntb',
                    'ctbStartIdx': 0,
                    'ctbNum':     1000,
                    'rowsPerTbl': 100,
                    'batchNum':   100,
                    'startTs':    1640966400000,  # 2022-01-01 00:00:00.000
                    'endTs': 0,
                    'pollDelay':  5,
                    'showMsg':    1,
                    'showRow':    1,
                    'snapshot':   0}
        paraDict['snapshot'] = self.snapshot
        paraDict['vgroups'] = self.vgroups
        paraDict['ctbNum'] = self.ctbNum
G
Ganlin Zhao 已提交
55 56
        paraDict['rowsPerTbl'] = self.rowsPerTbl

P
plum-lihui 已提交
57 58 59 60 61 62 63
        tmqCom.initConsumerTable()
        tdLog.info("start create database....")
        tdCom.create_database(tdSql, paraDict["dbName"],paraDict["dropFlag"], vgroups=paraDict["vgroups"],replica=1)
        tdLog.info("start create normal tables....")
        tmqCom.create_ntable(tsql=tdSql, dbname=paraDict["dbName"], tbname_prefix=paraDict["ctbPrefix"], tbname_index_start_num = 1, column_elm_list=paraDict["colSchema"], colPrefix='c', tblNum=paraDict["ctbNum"])
        tdLog.info("start insert data into normal tables....")
        tmqCom.insert_rows_into_ntbl(tsql=tdSql, dbname=paraDict["dbName"], tbname_prefix=paraDict["ctbPrefix"], tbname_index_start_num = 1, column_ele_list=paraDict["colSchema"],startTs=paraDict["startTs"], tblNum=paraDict["ctbNum"], rows=paraDict["rowsPerTbl"])
G
Ganlin Zhao 已提交
64

P
plum-lihui 已提交
65
        tdLog.info("create topics from database")
G
Ganlin Zhao 已提交
66
        topicFromDb = 'topic_dbt'
P
plum-lihui 已提交
67
        tdSql.execute("create topic %s as database %s" %(topicFromDb, paraDict['dbName']))
G
Ganlin Zhao 已提交
68

P
plum-lihui 已提交
69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86
        if self.snapshot == 0:
            consumerId     = 0
        elif self.snapshot == 1:
            consumerId     = 1

        expectrowcnt   = int(paraDict["rowsPerTbl"] * paraDict["ctbNum"])
        topicList      = topicFromDb
        ifcheckdata    = 1
        ifManualCommit = 1
        keyList        = 'group.id:cgrp1,\
                        enable.auto.commit:true,\
                        auto.commit.interval.ms:1000,\
                        auto.offset.reset:earliest'
        tmqCom.insertConsumerInfo(consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifManualCommit)

        tdLog.info("start consume processor")
        tmqCom.startTmqSimProcess(pollDelay=paraDict['pollDelay'],dbName=paraDict["dbName"],showMsg=paraDict['showMsg'], showRow=paraDict['showRow'],snapshot=paraDict['snapshot'])

G
Ganlin Zhao 已提交
87
        tmqCom.getStartConsumeNotifyFromTmqsim()
P
plum-lihui 已提交
88 89 90 91 92
        tdLog.info("drop some ntables")
        # drop 1/4 ctbls from half offset
        paraDict["ctbStartIdx"] = paraDict["ctbStartIdx"] + int(paraDict["ctbNum"] * 1 / 2)
        paraDict["ctbNum"] = int(paraDict["ctbNum"] / 4)
        tmqCom.drop_ctable(tdSql, dbname=paraDict['dbName'], count=paraDict["ctbNum"], default_ctbname_prefix=paraDict["ctbPrefix"], ctbStartIdx=paraDict["ctbStartIdx"])
G
Ganlin Zhao 已提交
93

P
plum-lihui 已提交
94 95 96 97 98 99 100 101
        tdLog.info("start to check consume result")
        expectRows = 1
        resultList = tmqCom.selectConsumeResult(expectRows)
        totalConsumeRows = 0
        for i in range(expectRows):
            totalConsumeRows += resultList[i]

        tdLog.info("act consume rows: %d, expect consume rows: %d"%(totalConsumeRows, expectrowcnt))
G
Ganlin Zhao 已提交
102

103
        if not ((totalConsumeRows >= expectrowcnt * 3/4) and (totalConsumeRows <= expectrowcnt)):
P
plum-lihui 已提交
104
            tdLog.exit("tmq consume rows error with snapshot = 0!")
G
Ganlin Zhao 已提交
105 106

        tdLog.info("wait subscriptions exit ....")
P
plum-lihui 已提交
107
        tmqCom.waitSubscriptionExit(tdSql, topicFromDb)
G
Ganlin Zhao 已提交
108

P
plum-lihui 已提交
109 110 111 112
        tdSql.query("drop topic %s"%topicFromDb)
        tdLog.info("success dorp topic: %s"%topicFromDb)
        tdLog.printNoPrefix("======== test case 1 end ...... ")

G
Ganlin Zhao 已提交
113 114


P
plum-lihui 已提交
115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133
    # drop some ntbs and create some new ntbs
    def tmqCase2(self):
        tdLog.printNoPrefix("======== test case 2: ")
        paraDict = {'dbName':     'dbt',
                    'dropFlag':   1,
                    'event':      '',
                    'vgroups':    4,
                    'stbName':    'stb',
                    'colPrefix':  'c',
                    'tagPrefix':  't',
                    'colSchema':   [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'DOUBLE', 'count':1},{'type': 'TIMESTAMP', 'count':1}],
                    'tagSchema':   [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'DOUBLE', 'count':1},{'type': 'BINARY', 'len':32, 'count':1},{'type': 'NCHAR', 'len':32, 'count':1}],
                    'ctbPrefix':  'ntb',
                    'ctbStartIdx': 0,
                    'ctbNum':     1000,
                    'rowsPerTbl': 100,
                    'batchNum':   100,
                    'startTs':    1640966400000,  # 2022-01-01 00:00:00.000
                    'endTs': 0,
134
                    'pollDelay':  20,
P
plum-lihui 已提交
135 136 137 138 139 140
                    'showMsg':    1,
                    'showRow':    1,
                    'snapshot':   0}
        paraDict['snapshot'] = self.snapshot
        paraDict['vgroups'] = self.vgroups
        paraDict['ctbNum'] = self.ctbNum
G
Ganlin Zhao 已提交
141 142
        paraDict['rowsPerTbl'] = self.rowsPerTbl

P
plum-lihui 已提交
143 144 145 146 147 148 149
        tmqCom.initConsumerTable()
        tdLog.info("start create database....")
        tdCom.create_database(tdSql, paraDict["dbName"],paraDict["dropFlag"], vgroups=paraDict["vgroups"],replica=1)
        tdLog.info("start create normal tables....")
        tmqCom.create_ntable(tsql=tdSql, dbname=paraDict["dbName"], tbname_prefix=paraDict["ctbPrefix"], tbname_index_start_num = 1, column_elm_list=paraDict["colSchema"], colPrefix='c', tblNum=paraDict["ctbNum"])
        tdLog.info("start insert data into normal tables....")
        tmqCom.insert_rows_into_ntbl(tsql=tdSql, dbname=paraDict["dbName"], tbname_prefix=paraDict["ctbPrefix"], tbname_index_start_num = 1, column_ele_list=paraDict["colSchema"],startTs=paraDict["startTs"], tblNum=paraDict["ctbNum"], rows=paraDict["rowsPerTbl"])
G
Ganlin Zhao 已提交
150

P
plum-lihui 已提交
151
        tdLog.info("create topics from database")
G
Ganlin Zhao 已提交
152
        topicFromDb = 'topic_dbt'
P
plum-lihui 已提交
153
        tdSql.execute("create topic %s as database %s" %(topicFromDb, paraDict['dbName']))
G
Ganlin Zhao 已提交
154

P
plum-lihui 已提交
155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172
        if self.snapshot == 0:
            consumerId     = 2
        elif self.snapshot == 1:
            consumerId     = 3

        expectrowcnt   = int(paraDict["rowsPerTbl"] * paraDict["ctbNum"] * 2)
        topicList      = topicFromDb
        ifcheckdata    = 1
        ifManualCommit = 1
        keyList        = 'group.id:cgrp1,\
                        enable.auto.commit:true,\
                        auto.commit.interval.ms:1000,\
                        auto.offset.reset:earliest'
        tmqCom.insertConsumerInfo(consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifManualCommit)

        tdLog.info("start consume processor")
        tmqCom.startTmqSimProcess(pollDelay=paraDict['pollDelay'],dbName=paraDict["dbName"],showMsg=paraDict['showMsg'], showRow=paraDict['showRow'],snapshot=paraDict['snapshot'])

G
Ganlin Zhao 已提交
173
        tmqCom.getStartConsumeNotifyFromTmqsim()
P
plum-lihui 已提交
174 175 176 177 178
        tdLog.info("drop some ntables")
        # drop 1/4 ctbls from half offset
        paraDict["ctbStartIdx"] = paraDict["ctbStartIdx"] + int(paraDict["ctbNum"] * 1 / 2)
        paraDict["ctbNum"] = int(paraDict["ctbNum"] / 4)
        tmqCom.drop_ctable(tdSql, dbname=paraDict['dbName'], count=paraDict["ctbNum"], default_ctbname_prefix=paraDict["ctbPrefix"], ctbStartIdx=paraDict["ctbStartIdx"])
G
Ganlin Zhao 已提交
179

P
plum-lihui 已提交
180 181 182 183 184 185
        tdLog.info("start create some new normal tables....")
        paraDict["ctbPrefix"] = 'newCtb'
        paraDict["ctbNum"] = self.ctbNum
        tmqCom.create_ntable(tsql=tdSql, dbname=paraDict["dbName"], tbname_prefix=paraDict["ctbPrefix"], tbname_index_start_num = 1, column_elm_list=paraDict["colSchema"], colPrefix='c', tblNum=paraDict["ctbNum"])
        tdLog.info("start insert data into these new normal tables....")
        tmqCom.insert_rows_into_ntbl(tsql=tdSql, dbname=paraDict["dbName"], tbname_prefix=paraDict["ctbPrefix"], tbname_index_start_num = 1, column_ele_list=paraDict["colSchema"],startTs=paraDict["startTs"], tblNum=paraDict["ctbNum"], rows=paraDict["rowsPerTbl"])
G
Ganlin Zhao 已提交
186

P
plum-lihui 已提交
187 188 189 190 191 192 193 194
        tdLog.info("start to check consume result")
        expectRows = 1
        resultList = tmqCom.selectConsumeResult(expectRows)
        totalConsumeRows = 0
        for i in range(expectRows):
            totalConsumeRows += resultList[i]

        tdLog.info("act consume rows: %d, expect consume rows: %d"%(totalConsumeRows, expectrowcnt))
G
Ganlin Zhao 已提交
195

196
        if not ((totalConsumeRows >= expectrowcnt / 2 * (1 + 3/4)) and (totalConsumeRows <= expectrowcnt)):
P
plum-lihui 已提交
197
            tdLog.exit("tmq consume rows error with snapshot = 0!")
G
Ganlin Zhao 已提交
198 199

        tdLog.info("wait subscriptions exit ....")
P
plum-lihui 已提交
200
        tmqCom.waitSubscriptionExit(tdSql, topicFromDb)
G
Ganlin Zhao 已提交
201

P
plum-lihui 已提交
202 203 204
        tdSql.query("drop topic %s"%topicFromDb)
        tdLog.info("success dorp topic: %s"%topicFromDb)
        tdLog.printNoPrefix("======== test case 2 end ...... ")
G
Ganlin Zhao 已提交
205 206

    def run(self):
P
plum-lihui 已提交
207 208 209 210
        tdLog.printNoPrefix("=============================================")
        tdLog.printNoPrefix("======== snapshot is 0: only consume from wal")
        self.snapshot = 0
        self.tmqCase1()
G
Ganlin Zhao 已提交
211 212
        self.tmqCase2()

P
plum-lihui 已提交
213 214 215 216 217 218 219 220 221 222 223 224 225 226
        # tdLog.printNoPrefix("====================================================================")
        # tdLog.printNoPrefix("======== snapshot is 1: firstly consume from tsbs, and then from wal")
        # self.snapshot = 1
        # self.tmqCase1()
        # self.tmqCase2()

    def stop(self):
        tdSql.close()
        tdLog.success(f"{__file__} successfully executed")

event = threading.Event()

tdCases.addLinux(__file__, TDTestCase())
tdCases.addWindows(__file__, TDTestCase())