taosdemo.py 24.2 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23
#!/usr/bin/python3
#  * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
#  *
#  * This program is free software: you can use, redistribute, and/or modify
#  * it under the terms of the GNU Affero General Public License, version 3
#  * or later ("AGPL"), as published by the Free Software Foundation.
#  *
#  * This program is distributed in the hope that it will be useful, but WITHOUT
#  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
#  * FITNESS FOR A PARTICULAR PURPOSE.
#  *
#  * You should have received a copy of the GNU Affero General Public License
#  * along with this program. If not, see <http://www.gnu.org/licenses/>.

# -*- coding: utf-8 -*-

import sys
import getopt
import requests
import json
import random
import time
import datetime
24
from multiprocessing import Manager, Pool, Lock
25 26 27 28 29 30 31 32 33 34
from multipledispatch import dispatch
from concurrent.futures import ThreadPoolExecutor, wait, ALL_COMPLETED


@dispatch(str, str)
def v_print(msg: str, arg: str):
    if verbose:
        print(msg % arg)


35 36 37 38 39 40 41 42 43 44 45 46
@dispatch(str, str, str)
def v_print(msg: str, arg1: str, arg2: str):
    if verbose:
        print(msg % (arg1, arg2))


@dispatch(str, str, str, str)
def v_print(msg: str, arg1: str, arg2: str, arg3: str):
    if verbose:
        print(msg % (arg1, arg2, arg3))


47 48 49 50 51 52
@dispatch(str, str, str, str, str)
def v_print(msg: str, arg1: str, arg2: str, arg3: str, arg4: str):
    if verbose:
        print(msg % (arg1, arg2, arg3, arg4))


53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95
@dispatch(str, int)
def v_print(msg: str, arg: int):
    if verbose:
        print(msg % int(arg))


@dispatch(str, int, str)
def v_print(msg: str, arg1: int, arg2: str):
    if verbose:
        print(msg % (int(arg1), str(arg2)))


@dispatch(str, str, int)
def v_print(msg: str, arg1: str, arg2: int):
    if verbose:
        print(msg % (arg1, int(arg2)))


@dispatch(str, int, int)
def v_print(msg: str, arg1: int, arg2: int):
    if verbose:
        print(msg % (int(arg1), int(arg2)))


@dispatch(str, int, int, str)
def v_print(msg: str, arg1: int, arg2: int, arg3: str):
    if verbose:
        print(msg % (int(arg1), int(arg2), str(arg3)))


@dispatch(str, int, int, int)
def v_print(msg: str, arg1: int, arg2: int, arg3: int):
    if verbose:
        print(msg % (int(arg1), int(arg2), int(arg3)))


@dispatch(str, int, int, int, int)
def v_print(msg: str, arg1: int, arg2: int, arg3: int, arg4: int):
    if verbose:
        print(msg % (int(arg1), int(arg2), int(arg3), int(arg4)))


def restful_execute(host: str, port: int, user: str, password: str, cmd: str):
96
    url = "http://%s:%d/rest/sql" % (host, restPort)
97

98
    v_print("restful_execute - cmd: %s", cmd)
99 100 101 102 103

    resp = requests.post(url, cmd, auth=(user, password))

    v_print("resp status: %d", resp.status_code)

104
    if debug:
105 106 107 108 109 110 111 112 113 114 115 116
        v_print(
            "resp text: %s",
            json.dumps(
                resp.json(),
                sort_keys=True,
                indent=2))
    else:
        print("resp: %s" % json.dumps(resp.json()))


def query_func(process: int, thread: int, cmd: str):
    v_print("%d process %d thread cmd: %s", process, thread, cmd)
117

118 119 120
    if oneMoreHost != "NotSupported" and random.randint(
            0, 1) == 1:
        v_print("%s", "Send to second host")
121 122 123 124 125
        if native:
            cursor2.execute(cmd)
        else:
            restful_execute(
                oneMoreHost, port, user, password, cmd)
126
    else:
127
        v_print("%s%s%s", "Send ", cmd, " to the host")
128
        if native:
129 130
            pass
#            cursor.execute(cmd)
131 132 133
        else:
            restful_execute(
                host, port, user, password, cmd)
134 135


136 137 138 139 140 141 142 143 144 145
def query_data_process(cmd: str):
    # establish connection if native
    if native:
        v_print("host:%s, user:%s passwd:%s configDir:%s ", host, user, password, configDir)
        try:
            conn = taos.connect(
                host=host,
                user=user,
                password=password,
                config=configDir)
146
            v_print("conn: %s", str(conn.__class__))
147 148 149
        except Exception as e:
            print("Error: %s" % e.args[0])
            sys.exit(1)
150

151 152
        try:
            cursor = conn.cursor()
153
            v_print("cursor:%d %s", id(cursor), str(cursor.__class__))
154 155
        except Exception as e:
            print("Error: %s" % e.args[0])
156
            conn.close()
157
            sys.exit(1)
158

159 160 161 162 163
    if native:
        try:
            cursor.execute(cmd)
            cols = cursor.description
            data = cursor.fetchall()
164

165 166 167 168 169 170
            for col in data:
                print(col)
        except Exception as e:
            conn.close()
            print("Error: %s" % e.args[0])
            sys.exit(1)
171

172 173 174 175 176 177 178
    else:
        restful_execute(
                host,
                port,
                user,
                password,
                cmd)
179

180 181 182
    if native:
        cursor.close()
        conn.close()
183 184 185 186


def create_stb():
    for i in range(0, numOfStb):
187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207
        if native:
            cursor.execute(
                "CREATE TABLE IF NOT EXISTS %s%d (ts timestamp, value float) TAGS (uuid binary(50))" %
                (stbName, i))
        else:
            restful_execute(
                host,
                port,
                user,
                password,
                "CREATE TABLE IF NOT EXISTS %s%d (ts timestamp, value float) TAGS (uuid binary(50))" %
                (stbName, i)
            )


def use_database():

    if native:
        cursor.execute("USE %s" % current_db)
    else:
        restful_execute(host, port, user, password, "USE %s" % current_db)
208 209 210 211 212


def create_databases():
    for i in range(0, numOfDb):
        v_print("will create database db%d", int(i))
213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235

        if native:
            cursor.execute(
                "CREATE DATABASE IF NOT EXISTS %s%d" % (dbName, i))
        else:
            restful_execute(
                host,
                port,
                user,
                password,
                "CREATE DATABASE IF NOT EXISTS %s%d" % (dbName, i))


def drop_tables():
    # TODO
    v_print("TODO: drop tables total %d", numOfTb)
    pass


def drop_stable():
    # TODO
    v_print("TODO: drop stables total %d", numOfStb)
    pass
236 237 238 239 240 241 242 243


def drop_databases():
    v_print("drop databases total %d", numOfDb)

    # drop exist databases first
    for i in range(0, numOfDb):
        v_print("will drop database db%d", int(i))
244 245 246 247 248 249 250 251 252 253 254 255 256

        if native:
            cursor.execute(
                "DROP DATABASE IF EXISTS %s%d" %
                (dbName, i))
        else:
            restful_execute(
                host,
                port,
                user,
                password,
                "DROP DATABASE IF EXISTS %s%d" %
                (dbName, i))
257 258 259 260 261 262 263 264 265 266


def insert_func(process: int, thread: int):
    v_print("%d process %d thread, insert_func ", process, thread)

    # generate uuid
    uuid_int = random.randint(0, numOfTb + 1)
    uuid = "%s" % uuid_int
    v_print("uuid is: %s", uuid)

267 268 269 270 271 272 273 274 275
    # establish connection if native
    if native:
        v_print("host:%s, user:%s passwd:%s configDir:%s ", host, user, password, configDir)
        try:
            conn = taos.connect(
                host=host,
                user=user,
                password=password,
                config=configDir)
276
            v_print("conn: %s", str(conn.__class__))
277 278 279 280 281 282
        except Exception as e:
            print("Error: %s" % e.args[0])
            sys.exit(1)

        try:
            cursor = conn.cursor()
283
            v_print("cursor:%d %s", id(cursor), str(cursor.__class__))
284 285
        except Exception as e:
            print("Error: %s" % e.args[0])
286
            conn.close()
287 288
            sys.exit(1)

289 290
    v_print("numOfRec %d:", numOfRec)

291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317
    row = 0
    while row < numOfRec:
        v_print("row: %d", row)
        sqlCmd = ['INSERT INTO ']
        try:
            sqlCmd.append(
                "%s.%s%d " % (current_db, tbName, thread))

            if (numOfStb > 0 and autosubtable):
                sqlCmd.append("USING %s.%s%d TAGS('%s') " %
                              (current_db, stbName, numOfStb - 1, uuid))

            start_time = datetime.datetime(
                2021, 1, 25) + datetime.timedelta(seconds=row)

            sqlCmd.append("VALUES ")
            for batchIter in range(0, batch):
                sqlCmd.append("('%s', %f) " %
                              (
                               start_time +
                                datetime.timedelta(
                                    milliseconds=batchIter),
                                  random.random()))
                row = row + 1
                if row >= numOfRec:
                    v_print("BREAK, row: %d numOfRec:%d", row, numOfRec)
                    break
318

319 320
        except Exception as e:
            print("Error: %s" % e.args[0])
321

322
        cmd = ' '.join(sqlCmd)
323

324 325
        if measure:
            exec_start_time = datetime.datetime.now()
326

327 328 329 330 331
        if native:
            affectedRows = cursor.execute(cmd)
        else:
            restful_execute(
                host, port, user, password, cmd)
332

333 334 335
        if measure:
            exec_end_time = datetime.datetime.now()
            exec_delta = exec_end_time - exec_start_time
336 337 338
            v_print(
                "consume %d microseconds",
                 exec_delta.microseconds)
339 340 341 342 343 344

        v_print("cmd: %s, length:%d", cmd, len(cmd))

    if native:
        cursor.close()
        conn.close()
345 346 347 348 349 350 351 352 353 354


def create_tb_using_stb():
    # TODO:
    pass


def create_tb():
    v_print("create_tb() numOfTb: %d", numOfTb)
    for i in range(0, numOfDb):
355 356 357
        if native:
            cursor.execute("USE %s%d" % (dbName, i))
        else:
358
            restful_execute(
359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374
                host, port, user, password, "USE %s%d" %
                (dbName, i))

        for j in range(0, numOfTb):
            if native:
                cursor.execute(
                    "CREATE TABLE %s%d (ts timestamp, value float)" %
                    (tbName, j))
            else:
                restful_execute(
                    host,
                    port,
                    user,
                    password,
                    "CREATE TABLE %s%d (ts timestamp, value float)" %
                    (tbName, j))
375 376


377 378
def insert_data_process(lock, i: int, begin: int, end: int):
    lock.acquire()
379
    tasks = end - begin
380
    v_print("insert_data_process:%d table from %d to %d, tasks %d", i, begin, end, tasks)
381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403

    if (threads < (end - begin)):
        for j in range(begin, end, threads):
            with ThreadPoolExecutor(max_workers=threads) as executor:
                k = end if ((j + threads) > end) else (j + threads)
                workers = [
                    executor.submit(
                        insert_func,
                        i,
                        n) for n in range(
                        j,
                        k)]
                wait(workers, return_when=ALL_COMPLETED)
    else:
        with ThreadPoolExecutor(max_workers=threads) as executor:
            workers = [
                executor.submit(
                    insert_func,
                    i,
                    j) for j in range(
                    begin,
                    end)]
            wait(workers, return_when=ALL_COMPLETED)
404 405 406

    lock.release()

407

408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424
def query_db(i):
    if native:
        cursor.execute("USE %s%d" % (dbName, i))
    else:
        restful_execute(
            host, port, user, password, "USE %s%d" %
            (dbName, i))

    for j in range(0, numOfTb):
        if native:
            cursor.execute(
                "SELECT COUNT(*) FROM %s%d" % (tbName, j))
        else:
            restful_execute(
                host, port, user, password, "SELECT COUNT(*) FROM %s%d" %
                (tbName, j))

425

426 427
def printConfig():

428 429 430
    print("###################################################################")
    print("# Use native interface:              %s" % native)
    print("# Server IP:                         %s" % host)
431
    if native:
432
        print("# Server port:                       %s" % port)
433
    else:
434 435 436 437 438 439 440 441 442 443 444 445 446 447
        print("# Server port:                       %s" % restPort)

    print("# Configuration Dir:                 %s" % configDir)
    print("# User:                              %s" % user)
    print("# Password:                          %s" % password)
    print("# Number of Columns per record:      %s" % colsPerRecord)
    print("# Number of Threads:                 %s" % threads)
    print("# Number of Processes:               %s" % processes)
    print("# Number of Tables:                  %s" % numOfTb)
    print("# Number of records per Table:       %s" % numOfRec)
    print("# Records/Request:                   %s" % batch)
    print("# Database name:                     %s" % dbName)
    print("# Replica:                           %s" % replica)
    print("# Use STable:                        %s" % useStable)
448
    print("# Table prefix:                      %s" % tbName)
449
    if useStable:
450
        print("# STable prefix:                     %s" % stbName)
451 452 453 454 455 456 457 458 459 460

    print("# Data order:                        %s" % outOfOrder)
    print("# Data out of order rate:            %s" % rateOOOO)
    print("# Delete method:                     %s" % deleteMethod)
    print("# Query command:                     %s" % queryCmd)
    print("# Insert Only:                       %s" % insertOnly)
    print("# Verbose output                     %s" % verbose)
    print("# Test time:                         %s" %
          datetime.datetime.now().strftime("%d/%m/%Y %H:%M:%S"))
    print("###################################################################")
461

462 463 464

if __name__ == "__main__":

465
    native = False
466
    verbose = False
467
    debug = False
468
    measure = True
469
    dropDbOnly = False
470
    colsPerRecord = 3
471
    numOfDb = 1
472 473
    dbName = "test"
    replica = 1
474 475
    batch = 1
    numOfTb = 1
476
    tbName = "tb"
477
    useStable = False
478
    numOfStb = 0
479
    stbName = "stb"
480 481 482
    numOfRec = 10
    ieration = 1
    host = "127.0.0.1"
483
    configDir = "/etc/taos"
484
    oneMoreHost = "NotSupported"
485 486
    port = 6030
    restPort = 6041
487 488 489 490
    user = "root"
    defaultPass = "taosdata"
    processes = 1
    threads = 1
491
    insertOnly = False
492
    autosubtable = False
493
    queryCmd = "NO"
494 495 496 497
    outOfOrder = 0
    rateOOOO = 0
    deleteMethod = 0
    skipPrompt = False
498 499 500

    try:
        opts, args = getopt.gnu_getopt(sys.argv[1:],
501
                                       'Nh:p:u:P:d:a:m:Ms:Q:T:C:r:l:t:n:c:xOR:D:vgyH',
502 503
                                       [
            'native', 'host', 'port', 'user', 'password', 'dbname', 'replica', 'tbname',
504
            'stable', 'stbname', 'query', 'threads', 'processes',
505
            'recPerReq', 'colsPerRecord', 'numOfTb', 'numOfRec', 'config',
506
            'insertOnly', 'outOfOrder', 'rateOOOO', 'deleteMethod',
507
            'verbose', 'debug', 'skipPrompt', 'help'
508 509 510 511 512 513 514 515 516 517 518 519 520 521 522 523 524 525 526
        ])
    except getopt.GetoptError as err:
        print('ERROR:', err)
        print('Try `taosdemo.py --help` for more options.')
        sys.exit(1)

    if bool(opts) is False:
        print('Try `taosdemo.py --help` for more options.')
        sys.exit(1)

    for key, value in opts:
        if key in ['-H', '--help']:
            print('')
            print(
                'taosdemo.py for TDengine')
            print('')
            print('Author: Shuduo Sang <sangshuduo@gmail.com>')
            print('')

527
            print('\t-H, --help                        Show usage.')
528
            print('')
529

530 531 532 533 534 535
            print('\t-N, --native                      flag, Use native interface if set. Default is using RESTful interface.')
            print('\t-h, --host <hostname>             host, The host to connect to TDengine. Default is localhost.')
            print('\t-p, --port <port>                 port, The TCP/IP port number to use for the connection. Default is 0.')
            print('\t-u, --user <username>             user, The user name to use when connecting to the server. Default is \'root\'.')
            print('\t-P, --password <password>         password, The password to use when connecting to the server. Default is \'taosdata\'.')
            print('\t-l, --colsPerRec <number>         num_of_columns_per_record, The number of columns per record. Default is 3.')
536
            print(
537 538
                  '\t-d, --dbname <dbname>             database, Destination database. Default is \'test\'.')
            print('\t-a, --replica <replications>      replica, Set the replica parameters of the database, Default 1, min: 1, max: 5.')
539
            print(
540
                  '\t-m, --tbname <table prefix>       table_prefix, Table prefix name. Default is \'t\'.')
541
            print(
542
                  '\t-M, --stable                      flag, Use super table. Default is no')
543
            print(
544
                  '\t-s, --stbname <stable prefix>     stable_prefix, STable prefix name. Default is \'st\'')
545
            print('\t-Q, --query [NO|EACHTB|command]   query, Execute query command. set \'EACHTB\' means select * from each table')
546
            print(
547
                  '\t-T, --threads <number>            num_of_threads, The number of threads. Default is 1.')
548
            print(
549 550
                  '\t-C, --processes <number>          num_of_processes, The number of threads. Default is 1.')
            print('\t-r, --batch <number>              num_of_records_per_req, The number of records per request. Default is 1000.')
551
            print(
552 553 554 555 556 557 558 559 560
                  '\t-t, --numOfTb <number>            num_of_tables, The number of tables. Default is 1.')
            print('\t-n, --numOfRec <number>           num_of_records_per_table, The number of records per table. Default is 1.')
            print('\t-c, --config <path>               config_directory, Configuration directory. Default is \'/etc/taos/\'.')
            print('\t-x, --inserOnly                   flag, Insert only flag.')
            print('\t-O, --outOfOrder                  out of order data insert, 0: In order, 1: Out of order. Default is in order.')
            print('\t-R, --rateOOOO <number>           rate, Out of order data\'s rate--if order=1 Default 10, min: 0, max: 50.')
            print('\t-D, --deleteMethod <number>       Delete data methods 0: don\'t delete, 1: delete by table, 2: delete by stable, 3: delete by database.')
            print('\t-v, --verbose                     Print verbose output')
            print('\t-g, --debug                       Print debug output')
561
            print(
562
                  '\t-y, --skipPrompt                  Skip read key for continous test, default is not skip')
563 564 565
            print('')
            sys.exit(0)

566 567 568 569 570 571 572
        if key in ['-N', '--native']:
            try:
                import taos
            except Exception as e:
                print("Error: %s" % e.args[0])
                sys.exit(1)
            native = True
573

574 575
        if key in ['-h', '--host']:
            host = value
576

577
        if key in ['-p', '--port']:
578 579
            port = int(value)

580
        if key in ['-u', '--user']:
581 582
            user = value

583
        if key in ['-P', '--password']:
584 585 586 587
            password = value
        else:
            password = defaultPass

588 589 590 591 592 593 594 595
        if key in ['-d', '--dbname']:
            dbName = value

        if key in ['-a', '--replica']:
            replica = int(value)
            if replica < 1:
                print("FATAL: number of replica need > 0")
                sys.exit(1)
596

597
        if key in ['-m', '--tbname']:
598
            tbName = value
599

600 601 602
        if key in ['-M', '--stable']:
            useStable = True
            numOfStb = 1
603

604
        if key in ['-s', '--stbname']:
605
            stbName = value
606 607 608

        if key in ['-Q', '--query']:
            queryCmd = str(value)
609

610
        if key in ['-T', '--threads']:
611 612 613 614 615
            threads = int(value)
            if threads < 1:
                print("FATAL: number of threads must be larger than 0")
                sys.exit(1)

616
        if key in ['-C', '--processes']:
617 618 619
            processes = int(value)
            if processes < 1:
                print("FATAL: number of processes must be larger than 0")
620 621
                sys.exit(1)

622
        if key in ['-r', '--batch']:
623 624
            batch = int(value)

625 626 627 628
        if key in ['-l', '--colsPerRec']:
            colsPerRec = int(value)

        if key in ['-t', '--numOfTb']:
629 630 631
            numOfTb = int(value)
            v_print("numOfTb is %d", numOfTb)

632
        if key in ['-n', '--numOfRec']:
633 634
            numOfRec = int(value)
            v_print("numOfRec is %d", numOfRec)
635 636 637 638
            if numOfRec < 1:
                print("FATAL: number of records must be larger than 0")
                sys.exit(1)

639

640 641 642 643 644
        if key in ['-c', '--config']:
            configDir = value
            v_print("config dir: %s", configDir)

        if key in ['-x', '--insertOnly']:
645 646 647 648 649 650 651 652 653 654
            insertOnly = True
            v_print("insert only: %d", insertOnly)

        if key in ['-O', '--outOfOrder']:
            outOfOrder = int(value)
            v_print("out of order is %d", outOfOrder)

        if key in ['-R', '--rateOOOO']:
            rateOOOO = int(value)
            v_print("the rate of out of order is %d", rateOOOO)
655

656
        if key in ['-D', '--deleteMethod']:
657 658 659 660 661 662
            deleteMethod = int(value)
            if (deleteMethod < 0) or (deleteMethod > 3):
                print(
                    "inputed delete method is %d, valid value is 0~3, set to default 0" %
                    deleteMethod)
                deleteMethod = 0
663 664 665 666 667 668 669 670 671 672 673 674 675 676
            v_print("the delete method is %d", deleteMethod)

        if key in ['-v', '--verbose']:
            verbose = True

        if key in ['-g', '--debug']:
            debug = True

        if key in ['-y', '--skipPrompt']:
            skipPrompt = True

    if verbose:
        printConfig()

677
    if not skipPrompt:
678
        input("Press any key to continue..")
679

680
    # establish connection first if native
681 682 683 684 685 686 687 688
    if native:
        v_print("host:%s, user:%s passwd:%s configDir:%s ", host, user, password, configDir)
        try:
            conn = taos.connect(
                host=host,
                user=user,
                password=password,
                config=configDir)
689
            v_print("conn: %s", str(conn.__class__))
690 691 692 693 694 695
        except Exception as e:
            print("Error: %s" % e.args[0])
            sys.exit(1)

        try:
            cursor = conn.cursor()
696
            v_print("cursor:%d %s", id(cursor), str(cursor.__class__))
697 698
        except Exception as e:
            print("Error: %s" % e.args[0])
699
            conn.close()
700 701
            sys.exit(1)

702
    # drop data only if delete method be set
703 704 705 706 707 708 709 710 711 712
    if deleteMethod > 0:
        if deleteMethod == 1:
            drop_tables()
            print("Drop tables done.")
        elif deleteMethod == 2:
            drop_stables()
            print("Drop super tables done.")
        elif deleteMethod == 3:
            drop_databases()
            print("Drop Database done.")
713 714 715
        sys.exit(0)

    # create databases
716
    drop_databases()
717 718 719
    create_databases()

    # use last database
720
    current_db = "%s%d" % (dbName, (numOfDb - 1))
721
    use_database()
722

723 724 725
    if measure:
        start_time_begin = time.time()

726 727 728 729
    if numOfStb > 0:
        create_stb()
        if (autosubtable == False):
            create_tb_using_stb()
730 731
    else:
        create_tb()
732

733 734 735 736 737
    if measure:
        end_time = time.time()
        print(
            "Total time consumed {} seconds for create table.".format(
            (end_time - start_time_begin)))
738

739 740 741
    if native:
        cursor.close()
        conn.close()
742

743 744 745
    # start insert data
    if measure:
        start_time = time.time()
746

747 748 749
    manager = Manager()
    lock = manager.Lock()
    pool = Pool(processes)
750

751 752
    begin = 0
    end = 0
753

754 755 756 757
    quotient = numOfTb // processes
    if quotient < 1:
        processes = numOfTb
        quotient = 1
758

759 760 761 762 763 764 765 766 767
    remainder = numOfTb % processes
    v_print(
        "num of tables: %d, quotient: %d, remainder: %d",
        numOfTb,
        quotient,
        remainder)

    for i in range(processes):
        begin = end
768

769 770 771 772 773 774 775 776 777 778 779 780 781 782 783 784 785 786
        if i < remainder:
            end = begin + quotient + 1
        else:
            end = begin + quotient
        pool.apply_async(insert_data_process, args=(lock, i, begin, end,))

    pool.close()
    pool.join()
    time.sleep(1)

    if measure:
        end_time = time.time()
        print(
            "Total time consumed {} seconds for insert data.".format(
            (end_time - start_time)))


    # query data
787
    if queryCmd != "NO":
788
        print("queryCmd: %s" % queryCmd)
789
        query_data_process(queryCmd)
790

791 792 793 794
    if measure:
        end_time = time.time()
        print(
            "Total time consumed {} seconds.".format(
795
                (end_time - start_time_begin)))
796 797

    print("done")