# subscribeDb4.py (5.4 KB) -- text exported from a web blame view (committer shown: plum-lihui).
import sys
import time
import socket
import os
import threading

import taos
from util.log import *
from util.sql import *
from util.cases import *
from util.dnodes import *
from util.common import *
sys.path.append("./7-tmq")
from tmqCommon import *

class TDTestCase:
    """TMQ test case 12: consume a database topic while a super table is dropped.

    The case creates one database holding two super tables (``stb1`` and
    ``stb2``), fills both with data, creates a topic on the whole database,
    starts the external consumer process and, once the consumer reports its
    first commit, drops ``stb2``.  The number of consumed rows must then lie
    between half of and all of the inserted rows.
    """

    # Default parameters for database / table creation and data insertion.
    # tmqCase12() works on a per-call copy, so this shared dict is never mutated.
    paraDict = {
        'dbName':     'db12',
        'dropFlag':   1,
        'event':      '',
        'vgroups':    4,
        'stbName':    'stb0',
        'colPrefix':  'c',
        'tagPrefix':  't',
        'colSchema':   [{'type': 'INT', 'count':2}, {'type': 'binary', 'len':16, 'count':1}, {'type': 'timestamp','count':1}],
        'tagSchema':   [{'type': 'INT', 'count':1}, {'type': 'binary', 'len':20, 'count':1}],
        'ctbPrefix':  'ctb',
        'ctbStartIdx': 0,
        'ctbNum':     10,
        'rowsPerTbl': 10000,
        'batchNum':   10,
        'startTs':    1640966400000,  # 2022-01-01 00:00:00.000 (UTC+8)
        'pollDelay':  20,
        'showMsg':    1,
        'showRow':    1,
    }

    # Database name handed to the consumer-side helpers.
    cdbName = 'cdb'

    # Parameters passed to the consumer processor.
    consumerId = 0
    expectrowcnt = 0
    topicList = ''
    ifcheckdata = 0
    ifManualCommit = 1

    # Consumer settings as 'key:value' strings; tmqCase12() joins them with commas.
    groupId = 'group.id:cgrp1'
    autoCommit = 'enable.auto.commit:true'
    autoCommitInterval = 'auto.commit.interval.ms:1000'
    autoOffset = 'auto.offset.reset:earliest'

    # Arguments forwarded to tmqCom.startTmqSimProcess().
    pollDelay = 20
    showMsg   = 1
    showRow   = 1

    hostname = socket.gethostname()

    def init(self, conn, logSql, replicaVar=1):
        """Initialise the shared ``tdSql`` helper with a cursor from ``conn``.

        Args:
            conn: connection object; only ``conn.cursor()`` is used.
            logSql: accepted for interface compatibility but overridden below.
            replicaVar: accepted for interface compatibility; unused here.
        """
        tdLog.debug(f"start to excute {__file__}")
        # NOTE(review): the caller-supplied logSql is always replaced by False,
        # so SQL logging cannot be enabled from outside -- confirm this is intended.
        logSql = False
        tdSql.init(conn.cursor(), logSql)

    @staticmethod
    def _isRowCountInRange(totalRows, expectRowCnt):
        """Return True when ``expectRowCnt / 2 <= totalRows <= expectRowCnt``."""
        return expectRowCnt / 2 <= totalRows <= expectRowCnt

    def _createStbWithData(self, paraDict):
        """Create the super table named in ``paraDict``, its child tables, and insert rows."""
        tdCom.create_stable(tdSql,dbname=paraDict["dbName"],stbname=paraDict["stbName"],column_elm_list=paraDict["colSchema"],tag_elm_list=paraDict["tagSchema"],count=1, default_stbname_prefix=paraDict["stbName"])
        tdCom.create_ctable(tdSql,dbname=paraDict["dbName"],stbname=paraDict["stbName"],tag_elm_list=paraDict['tagSchema'],count=paraDict["ctbNum"],default_ctbname_prefix=paraDict["ctbPrefix"])
        tmqCom.insert_data_2(tdSql,paraDict["dbName"],paraDict["ctbPrefix"],paraDict["ctbNum"],paraDict["rowsPerTbl"],paraDict["batchNum"],paraDict["startTs"],paraDict["ctbStartIdx"])

    def tmqCase12(self):
        """Run test case 12: drop a super table while its database topic is consumed.

        Side effects: sets ``self.expectrowcnt`` to the total number of rows
        inserted into both super tables.  Calls ``tdLog.exit`` when the number
        of consumed rows falls outside the accepted range.
        """
        tdLog.printNoPrefix("======== test case 12: ")
        tdLog.info("step 1: create database, stb, ctb and insert data")

        # Work on a private copy: the class-level dict is shared by every
        # instance, and mutating it would leak 'stb2'/'newctb' into later runs.
        paraDict = dict(self.paraDict)

        tmqCom.initConsumerTable(self.cdbName)

        tdCom.create_database(tdSql,paraDict["dbName"],paraDict["dropFlag"])

        paraDict["stbName"] = 'stb1'
        self._createStbWithData(paraDict)

        paraDict["stbName"] = 'stb2'
        paraDict["ctbPrefix"] = 'newctb'
        self._createStbWithData(paraDict)

        tdLog.info("create topics from db")
        topicName1 = 'topic_%s'%(paraDict['dbName'])
        tdSql.execute("create topic %s as database %s" %(topicName1, paraDict['dbName']))

        topicList = topicName1
        keyList = '%s,%s,%s,%s'%(self.groupId,self.autoCommit,self.autoCommitInterval,self.autoOffset)
        # Two super tables were filled, hence the factor of 2.
        self.expectrowcnt = paraDict["rowsPerTbl"] * paraDict["ctbNum"] * 2
        tmqCom.insertConsumerInfo(self.consumerId, self.expectrowcnt,topicList,keyList,self.ifcheckdata,self.ifManualCommit)

        tdLog.info("start consume processor")
        tmqCom.startTmqSimProcess(self.pollDelay,paraDict["dbName"],self.showMsg, self.showRow,self.cdbName)

        tdLog.info("After waiting for a commit notify, drop one stable")
        # Wait for the consumer's commit notification, then drop the super
        # table created last (stb2).
        tmqCom.getStartCommitNotifyFromTmqsim()
        tdSql.execute("drop table %s.%s" %(paraDict['dbName'], paraDict['stbName']))

        tdLog.info("wait result from consumer, then check it")
        expectRows = 1
        resultList = tmqCom.selectConsumeResult(expectRows)

        totalConsumeRows = sum(resultList[i] for i in range(expectRows))

        # One of the two super tables is dropped mid-consumption, so any count
        # from half of the inserted rows up to all of them is accepted.
        if not self._isRowCountInRange(totalConsumeRows, self.expectrowcnt):
            tdLog.info("act consume rows: %d, expect consume rows: between %d and %d"%(totalConsumeRows, self.expectrowcnt/2, self.expectrowcnt))
            tdLog.exit("tmq consume rows error!")

        time.sleep(10)
        # DDL goes through execute(), like the create-topic and drop-table statements above.
        tdSql.execute("drop topic %s"%topicName1)

        tdLog.printNoPrefix("======== test case 12 end ...... ")

    def run(self):
        """Prepare the SQL helper and run test case 12."""
        tdSql.prepare()
        self.tmqCase12()

    def stop(self):
        """Close the SQL helper and report success for this file."""
        tdSql.close()
        tdLog.success(f"{__file__} successfully executed")

# Module-level threading.Event; nothing in the visible part of this file references it.
event = threading.Event()

# Hand a separate TDTestCase instance to tdCases for each platform list.
# NOTE(review): presumably the framework later calls init()/run()/stop() on the
# instance matching the current platform -- confirm against util.cases.
tdCases.addLinux(__file__, TDTestCase())
tdCases.addWindows(__file__, TDTestCase())