未验证 提交 2a9ad2a3 编写于 作者: Y Yang Zhao 提交者: GitHub

Merge pull request #8661 from taosdata/zhaoyang/refactor/taosdemo-split-files

taosdemo refactor split file
......@@ -2,12 +2,13 @@ CMAKE_MINIMUM_REQUIRED(VERSION 3.0...3.20)
PROJECT(TDengine)
INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/client/inc)
INCLUDE_DIRECTORIES(inc)
FIND_PACKAGE(Git)
IF (GIT_FOUND)
MESSAGE("Git found")
EXECUTE_PROCESS(
COMMAND ${GIT_EXECUTABLE} log --pretty=oneline -n 1 ${CMAKE_CURRENT_LIST_DIR}/taosdemo.c
COMMAND ${GIT_EXECUTABLE} log --pretty=oneline -n 1 ${CMAKE_CURRENT_LIST_DIR}
WORKING_DIRECTORY ${CMAKE_CURRENT_LIST_DIR}
RESULT_VARIABLE RESULT
OUTPUT_VARIABLE TAOSDEMO_COMMIT_SHA1)
......@@ -18,7 +19,7 @@ IF (GIT_FOUND)
STRING(STRIP "${TAOSDEMO_COMMIT_SHA1}" TAOSDEMO_COMMIT_SHA1)
ENDIF ()
EXECUTE_PROCESS(
COMMAND ${GIT_EXECUTABLE} status -z -s ${CMAKE_CURRENT_LIST_DIR}/taosdemo.c
COMMAND ${GIT_EXECUTABLE} status -z -s ${CMAKE_CURRENT_LIST_DIR}
RESULT_VARIABLE RESULT
OUTPUT_VARIABLE TAOSDEMO_STATUS)
IF (TD_LINUX)
......@@ -64,7 +65,7 @@ ELSE ()
ENDIF ()
IF (TD_LINUX)
AUX_SOURCE_DIRECTORY(. SRC)
AUX_SOURCE_DIRECTORY(./src SRC)
ADD_EXECUTABLE(taosdemo ${SRC})
IF (TD_SOMODE_STATIC)
......@@ -73,9 +74,9 @@ IF (TD_LINUX)
TARGET_LINK_LIBRARIES(taosdemo taos cJson ${LINK_JEMALLOC})
ENDIF ()
ELSEIF (TD_WINDOWS)
AUX_SOURCE_DIRECTORY(. SRC)
AUX_SOURCE_DIRECTORY(./src SRC)
ADD_EXECUTABLE(taosdemo ${SRC})
SET_SOURCE_FILES_PROPERTIES(./taosdemo.c PROPERTIES COMPILE_FLAGS -w)
SET_SOURCE_FILES_PROPERTIES(./src/demoUtil.c PROPERTIES COMPILE_FLAGS -w)
IF (TD_SOMODE_STATIC)
TARGET_LINK_LIBRARIES(taosdemo taos_static cJson lua)
ELSE ()
......@@ -83,7 +84,7 @@ ELSEIF (TD_WINDOWS)
ENDIF ()
ELSEIF (TD_DARWIN)
# missing a few dependencies, such as <argp.h>
AUX_SOURCE_DIRECTORY(. SRC)
AUX_SOURCE_DIRECTORY(./src SRC)
ADD_EXECUTABLE(taosdemo ${SRC})
IF (TD_SOMODE_STATIC)
......
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef __DEMO__
#define __DEMO__
#include <stdint.h>
#include <taos.h>
#include <taoserror.h>
#define _GNU_SOURCE
#define CURL_STATICLIB
#ifdef LINUX
#include <argp.h>
#include <inttypes.h>
#ifndef _ALPINE
#include <error.h>
#endif
#include <pthread.h>
#include <regex.h>
#include <semaphore.h>
#include <stdbool.h>
#include <stdio.h>
#include <string.h>
#include <sys/time.h>
#include <time.h>
#include <unistd.h>
#include <wordexp.h>
#else
#include <regex.h>
#include <stdio.h>
#endif
#include <assert.h>
#include <stdlib.h>
// #include "os.h"
#include "taos.h"
#include "taoserror.h"
#include "tutil.h"
#define REQ_EXTRA_BUF_LEN 1024
#define RESP_BUF_LEN 4096
#define SQL_BUFF_LEN 1024
extern char configDir[];
#define STR_INSERT_INTO "INSERT INTO "
#define MAX_RECORDS_PER_REQ 32766
#define HEAD_BUFF_LEN \
TSDB_MAX_COLUMNS * 24 // 16*MAX_COLUMNS + (192+32)*2 + insert into ..
#define BUFFER_SIZE TSDB_MAX_ALLOWED_SQL_LEN
#define FETCH_BUFFER_SIZE 100 * TSDB_MAX_ALLOWED_SQL_LEN
#define COND_BUF_LEN (BUFFER_SIZE - 30)
#define COL_BUFFER_LEN ((TSDB_COL_NAME_LEN + 15) * TSDB_MAX_COLUMNS)
#define MAX_USERNAME_SIZE 64
#define MAX_HOSTNAME_SIZE \
253 // https://man7.org/linux/man-pages/man7/hostname.7.html
#define MAX_TB_NAME_SIZE 64
#define MAX_DATA_SIZE \
(16 * TSDB_MAX_COLUMNS) + 20 // max record len: 16*MAX_COLUMNS, timestamp
// string and ,('') need extra space
#define OPT_ABORT 1 /* –abort */
#define MAX_FILE_NAME_LEN 256 // max file name length on linux is 255.
#define MAX_PATH_LEN 4096
#define DEFAULT_START_TIME 1500000000000
#define MAX_PREPARED_RAND 1000000
#define INT_BUFF_LEN 12
#define BIGINT_BUFF_LEN 21
#define SMALLINT_BUFF_LEN 7
#define TINYINT_BUFF_LEN 5
#define BOOL_BUFF_LEN 6
#define FLOAT_BUFF_LEN 22
#define DOUBLE_BUFF_LEN 42
#define TIMESTAMP_BUFF_LEN 21
#define PRINT_STAT_INTERVAL 30 * 1000
#define MAX_SAMPLES 10000
#define MAX_NUM_COLUMNS \
(TSDB_MAX_COLUMNS - 1) // exclude first column timestamp
#define MAX_DB_COUNT 8
#define MAX_SUPER_TABLE_COUNT 200
#define MAX_QUERY_SQL_COUNT 100
#define MAX_DATABASE_COUNT 256
#define MAX_JSON_BUFF 6400000
#define INPUT_BUF_LEN 256
#define EXTRA_SQL_LEN 256
#define TBNAME_PREFIX_LEN \
(TSDB_TABLE_NAME_LEN - 20) // 20 characters reserved for seq
#define SMALL_BUFF_LEN 8
#define DATATYPE_BUFF_LEN (SMALL_BUFF_LEN * 3)
#define NOTE_BUFF_LEN (SMALL_BUFF_LEN * 16)
#define DEFAULT_NTHREADS 8
#define DEFAULT_TIMESTAMP_STEP 1
#define DEFAULT_INTERLACE_ROWS 0
#define DEFAULT_DATATYPE_NUM 1
#define DEFAULT_CHILDTABLES 10000
#define DEFAULT_TEST_MODE 0
#define DEFAULT_METAFILE NULL
#define DEFAULT_SQLFILE NULL
#define DEFAULT_HOST "localhost"
#define DEFAULT_PORT 6030
#define DEFAULT_IFACE INTERFACE_BUT
#define DEFAULT_DATABASE "test"
#define DEFAULT_REPLICA 1
#define DEFAULT_TB_PREFIX "d"
#define DEFAULT_ESCAPE_CHAR false
#define DEFAULT_USE_METRIC true
#define DEFAULT_DROP_DB true
#define DEFAULT_AGGR_FUNC false
#define DEFAULT_DEBUG false
#define DEFAULT_VERBOSE false
#define DEFAULT_PERF_STAT false
#define DEFAULT_ANS_YES false
#define DEFAULT_OUTPUT "./output.txt"
#define DEFAULT_SYNC_MODE 0
#define DEFAULT_DATA_TYPE \
{ TSDB_DATA_TYPE_FLOAT, TSDB_DATA_TYPE_INT, TSDB_DATA_TYPE_FLOAT }
#define DEFAULT_DATATYPE \
{ "FLOAT", "INT", "FLOAT" }
#define DEFAULT_BINWIDTH 64
#define DEFAULT_COL_COUNT 4
#define DEFAULT_LEN_ONE_ROW 76
#define DEFAULT_INSERT_INTERVAL 0
#define DEFAULT_QUERY_TIME 1
#define DEFAULT_PREPARED_RAND 10000
#define DEFAULT_REQ_PER_REQ 30000
#define DEFAULT_INSERT_ROWS 10000
#define DEFAULT_ABORT 0
#define DEFAULT_RATIO 0
#define DEFAULT_DISORDER_RANGE 1000
#define DEFAULT_METHOD_DEL 1
#define DEFAULT_TOTAL_INSERT 0
#define DEFAULT_TOTAL_AFFECT 0
#define DEFAULT_DEMO_MODE true
#define DEFAULT_CREATE_BATCH 10
#define DEFAULT_SUB_INTERVAL 10000
#define DEFAULT_QUERY_INTERVAL 10000
#define SML_LINE_SQL_SYNTAX_OFFSET 7
#if _MSC_VER <= 1900
#define __func__ __FUNCTION__
#endif
#define debugPrint(fmt, ...) \
do { \
if (g_args.debug_print || g_args.verbose_print) \
fprintf(stderr, "DEBG: " fmt, __VA_ARGS__); \
} while (0)
#define verbosePrint(fmt, ...) \
do { \
if (g_args.verbose_print) fprintf(stderr, "VERB: " fmt, __VA_ARGS__); \
} while (0)
#define performancePrint(fmt, ...) \
do { \
if (g_args.performance_print) \
fprintf(stderr, "PERF: " fmt, __VA_ARGS__); \
} while (0)
#define errorPrint(fmt, ...) \
do { \
fprintf(stderr, "\033[31m"); \
fprintf(stderr, "%s(%d) ", __FILE__, __LINE__); \
fprintf(stderr, "ERROR: " fmt, __VA_ARGS__); \
fprintf(stderr, "\033[0m"); \
} while (0)
enum TEST_MODE {
INSERT_TEST, // 0
QUERY_TEST, // 1
SUBSCRIBE_TEST, // 2
INVAID_TEST
};
typedef enum CREATE_SUB_TABLE_MOD_EN {
PRE_CREATE_SUBTBL,
AUTO_CREATE_SUBTBL,
NO_CREATE_SUBTBL
} CREATE_SUB_TABLE_MOD_EN;
typedef enum TABLE_EXISTS_EN {
TBL_NO_EXISTS,
TBL_ALREADY_EXISTS,
TBL_EXISTS_BUTT
} TABLE_EXISTS_EN;
enum enumSYNC_MODE { SYNC_MODE, ASYNC_MODE, MODE_BUT };
enum enum_TAOS_INTERFACE {
TAOSC_IFACE,
REST_IFACE,
STMT_IFACE,
SML_IFACE,
INTERFACE_BUT
};
typedef enum enumQUERY_CLASS {
SPECIFIED_CLASS,
STABLE_CLASS,
CLASS_BUT
} QUERY_CLASS;
typedef enum enum_PROGRESSIVE_OR_INTERLACE {
PROGRESSIVE_INSERT_MODE,
INTERLACE_INSERT_MODE,
INVALID_INSERT_MODE
} PROG_OR_INTERLACE_MODE;
typedef enum enumQUERY_TYPE {
NO_INSERT_TYPE,
INSERT_TYPE,
QUERY_TYPE_BUT
} QUERY_TYPE;
enum _show_db_index {
TSDB_SHOW_DB_NAME_INDEX,
TSDB_SHOW_DB_CREATED_TIME_INDEX,
TSDB_SHOW_DB_NTABLES_INDEX,
TSDB_SHOW_DB_VGROUPS_INDEX,
TSDB_SHOW_DB_REPLICA_INDEX,
TSDB_SHOW_DB_QUORUM_INDEX,
TSDB_SHOW_DB_DAYS_INDEX,
TSDB_SHOW_DB_KEEP_INDEX,
TSDB_SHOW_DB_CACHE_INDEX,
TSDB_SHOW_DB_BLOCKS_INDEX,
TSDB_SHOW_DB_MINROWS_INDEX,
TSDB_SHOW_DB_MAXROWS_INDEX,
TSDB_SHOW_DB_WALLEVEL_INDEX,
TSDB_SHOW_DB_FSYNC_INDEX,
TSDB_SHOW_DB_COMP_INDEX,
TSDB_SHOW_DB_CACHELAST_INDEX,
TSDB_SHOW_DB_PRECISION_INDEX,
TSDB_SHOW_DB_UPDATE_INDEX,
TSDB_SHOW_DB_STATUS_INDEX,
TSDB_MAX_SHOW_DB
};
// -----------------------------------------SHOW TABLES CONFIGURE
// -------------------------------------
enum _show_stables_index {
TSDB_SHOW_STABLES_NAME_INDEX,
TSDB_SHOW_STABLES_CREATED_TIME_INDEX,
TSDB_SHOW_STABLES_COLUMNS_INDEX,
TSDB_SHOW_STABLES_METRIC_INDEX,
TSDB_SHOW_STABLES_UID_INDEX,
TSDB_SHOW_STABLES_TID_INDEX,
TSDB_SHOW_STABLES_VGID_INDEX,
TSDB_MAX_SHOW_STABLES
};
enum _describe_table_index {
TSDB_DESCRIBE_METRIC_FIELD_INDEX,
TSDB_DESCRIBE_METRIC_TYPE_INDEX,
TSDB_DESCRIBE_METRIC_LENGTH_INDEX,
TSDB_DESCRIBE_METRIC_NOTE_INDEX,
TSDB_MAX_DESCRIBE_METRIC
};
typedef struct SArguments_S {
char * metaFile;
uint32_t test_mode;
char * host;
uint16_t port;
uint16_t iface;
char * user;
char password[SHELL_MAX_PASSWORD_LEN];
char * database;
int replica;
char * tb_prefix;
bool escapeChar;
char * sqlFile;
bool use_metric;
bool drop_database;
bool aggr_func;
bool answer_yes;
bool debug_print;
bool verbose_print;
bool performance_print;
char * output_file;
bool async_mode;
char data_type[MAX_NUM_COLUMNS + 1];
char * dataType[MAX_NUM_COLUMNS + 1];
uint32_t binwidth;
uint32_t columnCount;
uint64_t lenOfOneRow;
uint32_t nthreads;
uint64_t insert_interval;
uint64_t timestamp_step;
int64_t query_times;
int64_t prepared_rand;
uint32_t interlaceRows;
uint32_t reqPerReq; // num_of_records_per_req
uint64_t max_sql_len;
int64_t ntables;
int64_t insertRows;
int abort;
uint32_t disorderRatio; // 0: no disorder, >0: x%
int disorderRange; // ms, us or ns. according to database precision
uint32_t method_of_delete;
uint64_t totalInsertRows;
uint64_t totalAffectedRows;
bool demo_mode; // use default column name and semi-random data
} SArguments;
typedef struct SColumn_S {
char field[TSDB_COL_NAME_LEN];
char data_type;
char dataType[DATATYPE_BUFF_LEN];
uint32_t dataLen;
char note[NOTE_BUFF_LEN];
} StrColumn;
typedef struct SSuperTable_S {
char stbName[TSDB_TABLE_NAME_LEN];
char dataSource[SMALL_BUFF_LEN]; // rand_gen or sample
char childTblPrefix[TBNAME_PREFIX_LEN];
uint16_t childTblExists;
int64_t childTblCount;
uint64_t batchCreateTableNum; // 0: no batch, > 0: batch table number in
// one sql
uint8_t autoCreateTable; // 0: create sub table, 1: auto create sub table
uint16_t iface; // 0: taosc, 1: rest, 2: stmt
uint16_t lineProtocol;
int64_t childTblLimit;
uint64_t childTblOffset;
// int multiThreadWriteOneTbl; // 0: no, 1: yes
uint32_t interlaceRows; //
int disorderRatio; // 0: no disorder, >0: x%
int disorderRange; // ms, us or ns. according to database precision
uint64_t maxSqlLen; //
uint64_t insertInterval; // insert interval, will override global insert
// interval
int64_t insertRows;
int64_t timeStampStep;
int tsPrecision;
char startTimestamp[MAX_TB_NAME_SIZE];
char sampleFormat[SMALL_BUFF_LEN]; // csv, json
char sampleFile[MAX_FILE_NAME_LEN];
char tagsFile[MAX_FILE_NAME_LEN];
uint32_t columnCount;
StrColumn columns[TSDB_MAX_COLUMNS];
uint32_t tagCount;
StrColumn tags[TSDB_MAX_TAGS];
char * childTblName;
bool escapeChar;
char * colsOfCreateChildTable;
uint64_t lenOfOneRow;
uint64_t lenOfTagOfOneRow;
char *sampleDataBuf;
bool useSampleTs;
uint32_t tagSource; // 0: rand, 1: tag sample
char * tagDataBuf;
uint32_t tagSampleCount;
uint32_t tagUsePos;
// bind param batch
char *sampleBindBatchArray;
// statistics
uint64_t totalInsertRows;
uint64_t totalAffectedRows;
} SSuperTable;
typedef struct {
char name[TSDB_DB_NAME_LEN];
char create_time[32];
int64_t ntables;
int32_t vgroups;
int16_t replica;
int16_t quorum;
int16_t days;
char keeplist[64];
int32_t cache; // MB
int32_t blocks;
int32_t minrows;
int32_t maxrows;
int8_t wallevel;
int32_t fsync;
int8_t comp;
int8_t cachelast;
char precision[SMALL_BUFF_LEN]; // time resolution
int8_t update;
char status[16];
} SDbInfo;
typedef struct SDbCfg_S {
// int maxtablesPerVnode;
uint32_t minRows; // 0 means default
uint32_t maxRows; // 0 means default
int comp;
int walLevel;
int cacheLast;
int fsync;
int replica;
int update;
int keep;
int days;
int cache;
int blocks;
int quorum;
char precision[SMALL_BUFF_LEN];
} SDbCfg;
typedef struct SDataBase_S {
char dbName[TSDB_DB_NAME_LEN];
bool drop; // 0: use exists, 1: if exists, drop then new create
SDbCfg dbCfg;
uint64_t superTblCount;
SSuperTable *superTbls;
} SDataBase;
typedef struct SDbs_S {
char cfgDir[MAX_FILE_NAME_LEN];
char host[MAX_HOSTNAME_SIZE];
struct sockaddr_in serv_addr;
uint16_t port;
char user[MAX_USERNAME_SIZE];
char password[SHELL_MAX_PASSWORD_LEN];
char resultFile[MAX_FILE_NAME_LEN];
bool use_metric;
bool aggr_func;
bool asyncMode;
uint32_t threadCount;
uint32_t threadCountForCreateTbl;
uint32_t dbCount;
// statistics
uint64_t totalInsertRows;
uint64_t totalAffectedRows;
SDataBase *db;
} SDbs;
typedef struct SpecifiedQueryInfo_S {
uint64_t queryInterval; // 0: unlimited > 0 loop/s
uint32_t concurrent;
int sqlCount;
uint32_t asyncMode; // 0: sync, 1: async
uint64_t subscribeInterval; // ms
uint64_t queryTimes;
bool subscribeRestart;
int subscribeKeepProgress;
char sql[MAX_QUERY_SQL_COUNT][BUFFER_SIZE + 1];
char result[MAX_QUERY_SQL_COUNT][MAX_FILE_NAME_LEN];
int resubAfterConsume[MAX_QUERY_SQL_COUNT];
int endAfterConsume[MAX_QUERY_SQL_COUNT];
TAOS_SUB *tsub[MAX_QUERY_SQL_COUNT];
char topic[MAX_QUERY_SQL_COUNT][32];
int consumed[MAX_QUERY_SQL_COUNT];
TAOS_RES *res[MAX_QUERY_SQL_COUNT];
uint64_t totalQueried;
} SpecifiedQueryInfo;
typedef struct SuperQueryInfo_S {
char stbName[TSDB_TABLE_NAME_LEN];
uint64_t queryInterval; // 0: unlimited > 0 loop/s
uint32_t threadCnt;
uint32_t asyncMode; // 0: sync, 1: async
uint64_t subscribeInterval; // ms
bool subscribeRestart;
int subscribeKeepProgress;
uint64_t queryTimes;
int64_t childTblCount;
char childTblPrefix[TBNAME_PREFIX_LEN]; // 20 characters reserved for seq
int sqlCount;
char sql[MAX_QUERY_SQL_COUNT][BUFFER_SIZE + 1];
char result[MAX_QUERY_SQL_COUNT][MAX_FILE_NAME_LEN];
int resubAfterConsume;
int endAfterConsume;
TAOS_SUB *tsub[MAX_QUERY_SQL_COUNT];
char * childTblName;
uint64_t totalQueried;
} SuperQueryInfo;
typedef struct SQueryMetaInfo_S {
char cfgDir[MAX_FILE_NAME_LEN];
char host[MAX_HOSTNAME_SIZE];
uint16_t port;
struct sockaddr_in serv_addr;
char user[MAX_USERNAME_SIZE];
char password[SHELL_MAX_PASSWORD_LEN];
char dbName[TSDB_DB_NAME_LEN];
char queryMode[SMALL_BUFF_LEN]; // taosc, rest
SpecifiedQueryInfo specifiedQueryInfo;
SuperQueryInfo superQueryInfo;
uint64_t totalQueried;
} SQueryMetaInfo;
typedef struct SThreadInfo_S {
TAOS * taos;
TAOS_STMT * stmt;
int64_t * bind_ts;
int64_t * bind_ts_array;
char * bindParams;
char * is_null;
int threadID;
char db_name[TSDB_DB_NAME_LEN];
uint32_t time_precision;
char filePath[MAX_PATH_LEN];
FILE * fp;
char tb_prefix[TSDB_TABLE_NAME_LEN];
uint64_t start_table_from;
uint64_t end_table_to;
int64_t ntables;
int64_t tables_created;
uint64_t data_of_rate;
int64_t start_time;
char * cols;
bool use_metric;
SSuperTable *stbInfo;
char * buffer; // sql cmd buffer
// for async insert
tsem_t lock_sem;
int64_t counter;
uint64_t st;
uint64_t et;
uint64_t lastTs;
// sample data
int64_t samplePos;
// statistics
uint64_t totalInsertRows;
uint64_t totalAffectedRows;
// insert delay statistics
uint64_t cntDelay;
uint64_t totalDelay;
uint64_t avgDelay;
uint64_t maxDelay;
uint64_t minDelay;
// seq of query or subscribe
uint64_t querySeq; // sequence number of sql command
TAOS_SUB *tsub;
char **lines;
SOCKET sockfd;
} threadInfo;
/* ************ Global variables ************ */
extern char * g_aggreFuncDemo[];
extern char * g_aggreFunc[];
extern SArguments g_args;
extern SDbs g_Dbs;
extern char * g_dupstr;
extern int64_t g_totalChildTables;
extern int64_t g_actualChildTables;
extern SQueryMetaInfo g_queryInfo;
extern FILE * g_fpOfInsertResult;
#define min(a, b) (((a) < (b)) ? (a) : (b))
/* ************ Function declares ************ */
/* demoCommandOpt.c */
int parse_args(int argc, char *argv[]);
void setParaFromArg();
void querySqlFile(TAOS *taos, char *sqlFile);
void testCmdLine();
/* demoJsonOpt.c */
int getInfoFromJsonFile(char *file);
int testMetaFile();
/* demoUtil.c */
int isCommentLine(char *line);
void replaceChildTblName(char *inSql, char *outSql, int tblIndex);
void setupForAnsiEscape(void);
void resetAfterAnsiEscape(void);
int taosRandom();
void tmfree(void *buf);
void tmfclose(FILE *fp);
void fetchResult(TAOS_RES *res, threadInfo *pThreadInfo);
void prompt();
void ERROR_EXIT(const char *msg);
int postProceSql(char *host, uint16_t port, char *sqlstr,
threadInfo *pThreadInfo);
int queryDbExec(TAOS *taos, char *command, QUERY_TYPE type, bool quiet);
int regexMatch(const char *s, const char *reg, int cflags);
int convertHostToServAddr(char *host, uint16_t port,
struct sockaddr_in *serv_addr);
char *formatTimestamp(char *buf, int64_t val, int precision);
void errorWrongValue(char *program, char *wrong_arg, char *wrong_value);
void errorUnrecognized(char *program, char *wrong_arg);
void errorPrintReqArg(char *program, char *wrong_arg);
void errorPrintReqArg2(char *program, char *wrong_arg);
void errorPrintReqArg3(char *program, char *wrong_arg);
bool isStringNumber(char *input);
int getAllChildNameOfSuperTable(TAOS *taos, char *dbName, char *stbName,
char ** childTblNameOfSuperTbl,
int64_t *childTblCountOfSuperTbl);
int getChildNameOfSuperTableWithLimitAndOffset(TAOS *taos, char *dbName,
char * stbName,
char ** childTblNameOfSuperTbl,
int64_t *childTblCountOfSuperTbl,
int64_t limit, uint64_t offset,
bool escapChar);
/* demoInsert.c */
int insertTestProcess();
void postFreeResource();
/* demoOutput.c */
void printVersion();
void printfInsertMeta();
void printfInsertMetaToFile(FILE *fp);
void printStatPerThread(threadInfo *pThreadInfo);
void appendResultBufToFile(char *resultBuf, threadInfo *pThreadInfo);
void printfQueryMeta();
void printHelp();
void printfQuerySystemInfo(TAOS *taos);
/* demoQuery.c */
int queryTestProcess();
/* demoSubscribe.c */
int subscribeTestProcess();
#endif
\ No newline at end of file
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef __DEMODATA__
#define __DEMODATA__
#include "cJSON.h"
#include "demo.h"
/***** Global variables ******/
extern char * g_sampleDataBuf;
extern char * g_sampleBindBatchArray;
extern int32_t * g_randint;
extern uint32_t *g_randuint;
extern int64_t * g_randbigint;
extern uint64_t *g_randubigint;
extern float * g_randfloat;
extern double * g_randdouble;
extern char * g_randbool_buff;
extern char * g_randint_buff;
extern char * g_randuint_buff;
extern char * g_rand_voltage_buff;
extern char * g_randbigint_buff;
extern char * g_randubigint_buff;
extern char * g_randsmallint_buff;
extern char * g_randusmallint_buff;
extern char * g_randtinyint_buff;
extern char * g_randutinyint_buff;
extern char * g_randfloat_buff;
extern char * g_rand_current_buff;
extern char * g_rand_phase_buff;
extern char * g_randdouble_buff;
/***** Declare functions *****/
int init_rand_data();
char * rand_bool_str();
int32_t rand_bool();
char * rand_tinyint_str();
int32_t rand_tinyint();
char * rand_utinyint_str();
int32_t rand_utinyint();
char * rand_smallint_str();
int32_t rand_smallint();
char * rand_usmallint_str();
int32_t rand_usmallint();
char * rand_int_str();
int32_t rand_int();
char * rand_uint_str();
int32_t rand_uint();
char * rand_bigint_str();
int64_t rand_bigint();
char * rand_ubigint_str();
int64_t rand_ubigint();
char * rand_float_str();
float rand_float();
char * demo_current_float_str();
float UNUSED_FUNC demo_current_float();
char * demo_voltage_int_str();
int32_t UNUSED_FUNC demo_voltage_int();
char * demo_phase_float_str();
float UNUSED_FUNC demo_phase_float();
void rand_string(char *str, int size);
char * rand_double_str();
double rand_double();
int generateTagValuesForStb(SSuperTable *stbInfo, int64_t tableSeq,
char *tagsValBuf);
int64_t getTSRandTail(int64_t timeStampStep, int32_t seq, int disorderRatio,
int disorderRange);
int32_t prepareStbStmtBindTag(char *bindArray, SSuperTable *stbInfo,
char *tagsVal, int32_t timePrec);
int32_t prepareStmtWithoutStb(threadInfo *pThreadInfo, char *tableName,
uint32_t batch, int64_t insertRows,
int64_t recordFrom, int64_t startTime);
int32_t generateStbInterlaceData(threadInfo *pThreadInfo, char *tableName,
uint32_t batchPerTbl, uint64_t i,
uint32_t batchPerTblTimes, uint64_t tableSeq,
char *buffer, int64_t insertRows,
int64_t startTime, uint64_t *pRemainderBufLen);
int64_t generateInterlaceDataWithoutStb(char *tableName, uint32_t batch,
uint64_t tableSeq, char *dbName,
char *buffer, int64_t insertRows,
int64_t startTime,
uint64_t *pRemainderBufLen);
int32_t generateStbProgressiveData(SSuperTable *stbInfo, char *tableName,
int64_t tableSeq, char *dbName, char *buffer,
int64_t insertRows, uint64_t recordFrom,
int64_t startTime, int64_t *pSamplePos,
int64_t *pRemainderBufLen);
int32_t generateProgressiveDataWithoutStb(
char *tableName, threadInfo *pThreadInfo, char *buffer, int64_t insertRows,
uint64_t recordFrom, int64_t startTime, int64_t *pRemainderBufLen);
int64_t generateStbRowData(SSuperTable *stbInfo, char *recBuf,
int64_t remainderBufLen, int64_t timestamp);
int prepareSampleForStb(SSuperTable *stbInfo);
int prepareSampleForNtb();
int parseSamplefileToStmtBatch(SSuperTable *stbInfo);
int parseStbSampleToStmtBatchForThread(threadInfo * pThreadInfo,
SSuperTable *stbInfo, uint32_t timePrec,
uint32_t batch);
int parseNtbSampleToStmtBatchForThread(threadInfo *pThreadInfo,
uint32_t timePrec, uint32_t batch);
int prepareSampleData();
int32_t generateSmlConstPart(char *sml, SSuperTable *stbInfo,
threadInfo *pThreadInfo, int tbSeq);
int32_t generateSmlMutablePart(char *line, char *sml, SSuperTable *stbInfo,
threadInfo *pThreadInfo, int64_t timestamp);
int32_t generateSmlJsonTags(cJSON *tagsList, SSuperTable *stbInfo,
threadInfo *pThreadInfo, int tbSeq);
int32_t generateSmlJsonCols(cJSON *array, cJSON *tag, SSuperTable *stbInfo,
threadInfo *pThreadInfo, int64_t timestamp);
#endif
\ No newline at end of file
......@@ -21,7 +21,7 @@
"cache": 16,
"blocks": 8,
"precision": "ms",
"keep": 365,
"keep": 36500,
"minRows": 100,
"maxRows": 4096,
"comp":2,
......
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "demo.h"
int64_t g_totalChildTables = DEFAULT_CHILDTABLES;
int64_t g_actualChildTables = 0;
FILE * g_fpOfInsertResult = NULL;
char * g_dupstr = NULL;
SDbs g_Dbs;
SQueryMetaInfo g_queryInfo;
SArguments g_args = {
DEFAULT_METAFILE, // metaFile
DEFAULT_TEST_MODE, // test_mode
DEFAULT_HOST, // host
DEFAULT_PORT, // port
DEFAULT_IFACE, // iface
TSDB_DEFAULT_USER, // user
TSDB_DEFAULT_PASS, // password
DEFAULT_DATABASE, // database
DEFAULT_REPLICA, // replica
DEFAULT_TB_PREFIX, // tb_prefix
DEFAULT_ESCAPE_CHAR, // escapeChar
DEFAULT_SQLFILE, // sqlFile
DEFAULT_USE_METRIC, // use_metric
DEFAULT_DROP_DB, // drop_database
DEFAULT_AGGR_FUNC, // aggr_func
DEFAULT_DEBUG, // debug_print
DEFAULT_VERBOSE, // verbose_print
DEFAULT_PERF_STAT, // performance statistic print
DEFAULT_ANS_YES, // answer_yes;
DEFAULT_OUTPUT, // output_file
DEFAULT_SYNC_MODE, // mode : sync or async
DEFAULT_DATA_TYPE, // data_type
DEFAULT_DATATYPE, // dataType
DEFAULT_BINWIDTH, // binwidth
DEFAULT_COL_COUNT, // columnCount, timestamp + float + int + float
DEFAULT_LEN_ONE_ROW, // lenOfOneRow
DEFAULT_NTHREADS, // nthreads
DEFAULT_INSERT_INTERVAL, // insert_interval
DEFAULT_TIMESTAMP_STEP, // timestamp_step
DEFAULT_QUERY_TIME, // query_times
DEFAULT_PREPARED_RAND, // prepared_rand
DEFAULT_INTERLACE_ROWS, // interlaceRows;
DEFAULT_REQ_PER_REQ, // reqPerReq
TSDB_MAX_ALLOWED_SQL_LEN, // max_sql_len
DEFAULT_CHILDTABLES, // ntables
DEFAULT_INSERT_ROWS, // insertRows
DEFAULT_ABORT, // abort
DEFAULT_RATIO, // disorderRatio
DEFAULT_DISORDER_RANGE, // disorderRange
DEFAULT_METHOD_DEL, // method_of_delete
DEFAULT_TOTAL_INSERT, // totalInsertRows;
DEFAULT_TOTAL_AFFECT, // totalAffectedRows;
DEFAULT_DEMO_MODE, // demo_mode;
};
int main(int argc, char *argv[]) {
if (parse_args(argc, argv)) {
exit(EXIT_FAILURE);
}
debugPrint("meta file: %s\n", g_args.metaFile);
if (g_args.metaFile) {
g_totalChildTables = 0;
if (getInfoFromJsonFile(g_args.metaFile)) {
exit(EXIT_FAILURE);
}
if (testMetaFile()) {
exit(EXIT_FAILURE);
}
} else {
memset(&g_Dbs, 0, sizeof(SDbs));
g_Dbs.db = calloc(1, sizeof(SDataBase));
if (NULL == g_Dbs.db) {
errorPrint("%s", "failed to allocate memory\n");
}
g_Dbs.db[0].superTbls = calloc(1, sizeof(SSuperTable));
if (NULL == g_Dbs.db[0].superTbls) {
errorPrint("%s", "failed to allocate memory\n");
}
setParaFromArg();
if (NULL != g_args.sqlFile) {
TAOS *qtaos = taos_connect(g_Dbs.host, g_Dbs.user, g_Dbs.password,
g_Dbs.db[0].dbName, g_Dbs.port);
querySqlFile(qtaos, g_args.sqlFile);
taos_close(qtaos);
} else {
testCmdLine();
}
}
postFreeResource();
return 0;
}
\ No newline at end of file
此差异已折叠。
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "demo.h"
void selectAndGetResult(threadInfo *pThreadInfo, char *command) {
if (0 == strncasecmp(g_queryInfo.queryMode, "taosc", strlen("taosc"))) {
TAOS_RES *res = taos_query(pThreadInfo->taos, command);
if (res == NULL || taos_errno(res) != 0) {
errorPrint("failed to execute sql:%s, reason:%s\n", command,
taos_errstr(res));
taos_free_result(res);
return;
}
fetchResult(res, pThreadInfo);
taos_free_result(res);
} else if (0 ==
strncasecmp(g_queryInfo.queryMode, "rest", strlen("rest"))) {
int retCode = postProceSql(g_queryInfo.host, g_queryInfo.port, command,
pThreadInfo);
if (0 != retCode) {
printf("====restful return fail, threadID[%d]\n",
pThreadInfo->threadID);
}
} else {
errorPrint("unknown query mode: %s\n", g_queryInfo.queryMode);
}
}
void *specifiedTableQuery(void *sarg) {
threadInfo *pThreadInfo = (threadInfo *)sarg;
setThreadName("specTableQuery");
if (pThreadInfo->taos == NULL) {
TAOS *taos = NULL;
taos = taos_connect(g_queryInfo.host, g_queryInfo.user,
g_queryInfo.password, NULL, g_queryInfo.port);
if (taos == NULL) {
errorPrint("[%d] Failed to connect to TDengine, reason:%s\n",
pThreadInfo->threadID, taos_errstr(NULL));
return NULL;
} else {
pThreadInfo->taos = taos;
}
}
char sqlStr[TSDB_DB_NAME_LEN + 5];
sprintf(sqlStr, "use %s", g_queryInfo.dbName);
if (0 != queryDbExec(pThreadInfo->taos, sqlStr, NO_INSERT_TYPE, false)) {
taos_close(pThreadInfo->taos);
errorPrint("use database %s failed!\n\n", g_queryInfo.dbName);
return NULL;
}
uint64_t st = 0;
uint64_t et = 0;
uint64_t queryTimes = g_queryInfo.specifiedQueryInfo.queryTimes;
uint64_t totalQueried = 0;
uint64_t lastPrintTime = taosGetTimestampMs();
uint64_t startTs = taosGetTimestampMs();
if (g_queryInfo.specifiedQueryInfo.result[pThreadInfo->querySeq][0] !=
'\0') {
sprintf(pThreadInfo->filePath, "%s-%d",
g_queryInfo.specifiedQueryInfo.result[pThreadInfo->querySeq],
pThreadInfo->threadID);
}
while (queryTimes--) {
if (g_queryInfo.specifiedQueryInfo.queryInterval &&
(et - st) < (int64_t)g_queryInfo.specifiedQueryInfo.queryInterval) {
taosMsleep((int32_t)(g_queryInfo.specifiedQueryInfo.queryInterval -
(et - st))); // ms
}
st = taosGetTimestampMs();
selectAndGetResult(
pThreadInfo,
g_queryInfo.specifiedQueryInfo.sql[pThreadInfo->querySeq]);
et = taosGetTimestampMs();
printf("=thread[%" PRId64 "] use %s complete one sql, Spent %10.3f s\n",
taosGetSelfPthreadId(), g_queryInfo.queryMode,
(et - st) / 1000.0);
totalQueried++;
g_queryInfo.specifiedQueryInfo.totalQueried++;
uint64_t currentPrintTime = taosGetTimestampMs();
uint64_t endTs = taosGetTimestampMs();
if (currentPrintTime - lastPrintTime > 30 * 1000) {
debugPrint("%s() LN%d, endTs=%" PRIu64 " ms, startTs=%" PRIu64
" ms\n",
__func__, __LINE__, endTs, startTs);
printf("thread[%d] has currently completed queries: %" PRIu64
", QPS: %10.6f\n",
pThreadInfo->threadID, totalQueried,
(double)(totalQueried / ((endTs - startTs) / 1000.0)));
lastPrintTime = currentPrintTime;
}
}
return NULL;
}
void *superTableQuery(void *sarg) {
char *sqlstr = calloc(1, BUFFER_SIZE);
if (NULL == sqlstr) {
errorPrint("%s", "failed to allocate memory\n");
return NULL;
}
threadInfo *pThreadInfo = (threadInfo *)sarg;
setThreadName("superTableQuery");
if (pThreadInfo->taos == NULL) {
TAOS *taos = NULL;
taos = taos_connect(g_queryInfo.host, g_queryInfo.user,
g_queryInfo.password, NULL, g_queryInfo.port);
if (taos == NULL) {
errorPrint("[%d] Failed to connect to TDengine, reason:%s\n",
pThreadInfo->threadID, taos_errstr(NULL));
free(sqlstr);
return NULL;
} else {
pThreadInfo->taos = taos;
}
}
uint64_t st = 0;
uint64_t et = (int64_t)g_queryInfo.superQueryInfo.queryInterval;
uint64_t queryTimes = g_queryInfo.superQueryInfo.queryTimes;
uint64_t totalQueried = 0;
uint64_t startTs = taosGetTimestampMs();
uint64_t lastPrintTime = taosGetTimestampMs();
while (queryTimes--) {
if (g_queryInfo.superQueryInfo.queryInterval &&
(et - st) < (int64_t)g_queryInfo.superQueryInfo.queryInterval) {
taosMsleep((int32_t)(g_queryInfo.superQueryInfo.queryInterval -
(et - st))); // ms
// printf("========sleep duration:%"PRId64 "========inserted
// rows:%d, table range:%d - %d\n", (1000 - (et - st)), i,
// pThreadInfo->start_table_from, pThreadInfo->end_table_to);
}
st = taosGetTimestampMs();
for (int i = (int)pThreadInfo->start_table_from;
i <= pThreadInfo->end_table_to; i++) {
for (int j = 0; j < g_queryInfo.superQueryInfo.sqlCount; j++) {
memset(sqlstr, 0, BUFFER_SIZE);
replaceChildTblName(g_queryInfo.superQueryInfo.sql[j], sqlstr,
i);
if (g_queryInfo.superQueryInfo.result[j][0] != '\0') {
sprintf(pThreadInfo->filePath, "%s-%d",
g_queryInfo.superQueryInfo.result[j],
pThreadInfo->threadID);
}
selectAndGetResult(pThreadInfo, sqlstr);
totalQueried++;
g_queryInfo.superQueryInfo.totalQueried++;
int64_t currentPrintTime = taosGetTimestampMs();
int64_t endTs = taosGetTimestampMs();
if (currentPrintTime - lastPrintTime > 30 * 1000) {
printf(
"thread[%d] has currently completed queries: %" PRIu64
", QPS: %10.3f\n",
pThreadInfo->threadID, totalQueried,
(double)(totalQueried / ((endTs - startTs) / 1000.0)));
lastPrintTime = currentPrintTime;
}
}
}
et = taosGetTimestampMs();
printf("####thread[%" PRId64
"] complete all sqls to allocate all sub-tables[%" PRIu64
" - %" PRIu64 "] once queries duration:%.4fs\n\n",
taosGetSelfPthreadId(), pThreadInfo->start_table_from,
pThreadInfo->end_table_to, (double)(et - st) / 1000.0);
}
free(sqlstr);
return NULL;
}
int queryTestProcess() {
printfQueryMeta();
TAOS *taos = NULL;
taos = taos_connect(g_queryInfo.host, g_queryInfo.user,
g_queryInfo.password, NULL, g_queryInfo.port);
if (taos == NULL) {
errorPrint("Failed to connect to TDengine, reason:%s\n",
taos_errstr(NULL));
exit(EXIT_FAILURE);
}
if (0 != g_queryInfo.superQueryInfo.sqlCount) {
getAllChildNameOfSuperTable(taos, g_queryInfo.dbName,
g_queryInfo.superQueryInfo.stbName,
&g_queryInfo.superQueryInfo.childTblName,
&g_queryInfo.superQueryInfo.childTblCount);
}
prompt();
if (g_args.debug_print || g_args.verbose_print) {
printfQuerySystemInfo(taos);
}
if (0 == strncasecmp(g_queryInfo.queryMode, "rest", strlen("rest"))) {
if (convertHostToServAddr(g_queryInfo.host, g_queryInfo.port,
&g_queryInfo.serv_addr) != 0)
ERROR_EXIT("convert host to server address");
}
pthread_t * pids = NULL;
threadInfo *infos = NULL;
//==== create sub threads for query from specify table
int nConcurrent = g_queryInfo.specifiedQueryInfo.concurrent;
uint64_t nSqlCount = g_queryInfo.specifiedQueryInfo.sqlCount;
uint64_t startTs = taosGetTimestampMs();
if ((nSqlCount > 0) && (nConcurrent > 0)) {
pids = calloc(1, nConcurrent * nSqlCount * sizeof(pthread_t));
infos = calloc(1, nConcurrent * nSqlCount * sizeof(threadInfo));
if ((NULL == pids) || (NULL == infos)) {
taos_close(taos);
ERROR_EXIT("memory allocation failed for create threads\n");
}
for (uint64_t i = 0; i < nSqlCount; i++) {
for (int j = 0; j < nConcurrent; j++) {
uint64_t seq = i * nConcurrent + j;
threadInfo *pThreadInfo = infos + seq;
pThreadInfo->threadID = (int)seq;
pThreadInfo->querySeq = i;
if (0 == strncasecmp(g_queryInfo.queryMode, "taosc", 5)) {
char sqlStr[TSDB_DB_NAME_LEN + 5];
sprintf(sqlStr, "USE %s", g_queryInfo.dbName);
if (0 != queryDbExec(taos, sqlStr, NO_INSERT_TYPE, false)) {
taos_close(taos);
free(infos);
free(pids);
errorPrint("use database %s failed!\n\n",
g_queryInfo.dbName);
return -1;
}
}
if (0 == strncasecmp(g_queryInfo.queryMode, "rest", 4)) {
#ifdef WINDOWS
WSADATA wsaData;
WSAStartup(MAKEWORD(2, 2), &wsaData);
SOCKET sockfd;
#else
int sockfd;
#endif
sockfd = socket(AF_INET, SOCK_STREAM, 0);
if (sockfd < 0) {
#ifdef WINDOWS
errorPrint("Could not create socket : %d",
WSAGetLastError());
#endif
debugPrint("%s() LN%d, sockfd=%d\n", __func__, __LINE__,
sockfd);
ERROR_EXIT("opening socket");
}
int retConn = connect(
sockfd, (struct sockaddr *)&(g_queryInfo.serv_addr),
sizeof(struct sockaddr));
debugPrint("%s() LN%d connect() return %d\n", __func__,
__LINE__, retConn);
if (retConn < 0) {
ERROR_EXIT("connecting");
}
pThreadInfo->sockfd = sockfd;
}
pThreadInfo->taos =
NULL; // workaround to use separate taos connection;
pthread_create(pids + seq, NULL, specifiedTableQuery,
pThreadInfo);
}
}
} else {
g_queryInfo.specifiedQueryInfo.concurrent = 0;
}
taos_close(taos);
pthread_t * pidsOfSub = NULL;
threadInfo *infosOfSub = NULL;
//==== create sub threads for query from all sub table of the super table
if ((g_queryInfo.superQueryInfo.sqlCount > 0) &&
(g_queryInfo.superQueryInfo.threadCnt > 0)) {
pidsOfSub =
calloc(1, g_queryInfo.superQueryInfo.threadCnt * sizeof(pthread_t));
infosOfSub = calloc(
1, g_queryInfo.superQueryInfo.threadCnt * sizeof(threadInfo));
if ((NULL == pidsOfSub) || (NULL == infosOfSub)) {
free(infos);
free(pids);
ERROR_EXIT("memory allocation failed for create threads\n");
}
int64_t ntables = g_queryInfo.superQueryInfo.childTblCount;
int threads = g_queryInfo.superQueryInfo.threadCnt;
int64_t a = ntables / threads;
if (a < 1) {
threads = (int)ntables;
a = 1;
}
int64_t b = 0;
if (threads != 0) {
b = ntables % threads;
}
uint64_t tableFrom = 0;
for (int i = 0; i < threads; i++) {
threadInfo *pThreadInfo = infosOfSub + i;
pThreadInfo->threadID = i;
pThreadInfo->start_table_from = tableFrom;
pThreadInfo->ntables = i < b ? a + 1 : a;
pThreadInfo->end_table_to =
i < b ? tableFrom + a : tableFrom + a - 1;
tableFrom = pThreadInfo->end_table_to + 1;
pThreadInfo->taos =
NULL; // workaround to use separate taos connection;
if (0 == strncasecmp(g_queryInfo.queryMode, "rest", 4)) {
#ifdef WINDOWS
WSADATA wsaData;
WSAStartup(MAKEWORD(2, 2), &wsaData);
SOCKET sockfd;
#else
int sockfd;
#endif
sockfd = socket(AF_INET, SOCK_STREAM, 0);
if (sockfd < 0) {
#ifdef WINDOWS
errorPrint("Could not create socket : %d",
WSAGetLastError());
#endif
debugPrint("%s() LN%d, sockfd=%d\n", __func__, __LINE__,
sockfd);
ERROR_EXIT("opening socket");
}
int retConn =
connect(sockfd, (struct sockaddr *)&(g_queryInfo.serv_addr),
sizeof(struct sockaddr));
debugPrint("%s() LN%d connect() return %d\n", __func__,
__LINE__, retConn);
if (retConn < 0) {
ERROR_EXIT("connecting");
}
pThreadInfo->sockfd = sockfd;
}
pthread_create(pidsOfSub + i, NULL, superTableQuery, pThreadInfo);
}
g_queryInfo.superQueryInfo.threadCnt = threads;
} else {
g_queryInfo.superQueryInfo.threadCnt = 0;
}
if ((nSqlCount > 0) && (nConcurrent > 0)) {
for (int i = 0; i < nConcurrent; i++) {
for (int j = 0; j < nSqlCount; j++) {
pthread_join(pids[i * nSqlCount + j], NULL);
if (0 == strncasecmp(g_queryInfo.queryMode, "rest", 4)) {
threadInfo *pThreadInfo = infos + i * nSqlCount + j;
#ifdef WINDOWS
closesocket(pThreadInfo->sockfd);
WSACleanup();
#else
close(pThreadInfo->sockfd);
#endif
}
}
}
}
tmfree((char *)pids);
tmfree((char *)infos);
for (int i = 0; i < g_queryInfo.superQueryInfo.threadCnt; i++) {
pthread_join(pidsOfSub[i], NULL);
if (0 == strncasecmp(g_queryInfo.queryMode, "rest", 4)) {
threadInfo *pThreadInfo = infosOfSub + i;
#ifdef WINDOWS
closesocket(pThreadInfo->sockfd);
WSACleanup();
#else
close(pThreadInfo->sockfd);
#endif
}
}
tmfree((char *)pidsOfSub);
tmfree((char *)infosOfSub);
// taos_close(taos);// workaround to use separate taos connection;
uint64_t endTs = taosGetTimestampMs();
uint64_t totalQueried = g_queryInfo.specifiedQueryInfo.totalQueried +
g_queryInfo.superQueryInfo.totalQueried;
fprintf(stderr,
"==== completed total queries: %" PRIu64
", the QPS of all threads: %10.3f====\n",
totalQueried,
(double)(totalQueried / ((endTs - startTs) / 1000.0)));
return 0;
}
\ No newline at end of file
此差异已折叠。
此差异已折叠。
此差异已折叠。
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册