Commit d6dc4f8c authored by tickduan

Merge branch 'master' into cq .INT64_MIN

......@@ -769,6 +769,10 @@ static int32_t tscCheckIfCreateTable(char **sqlstr, SSqlObj *pSql, char** boundC
index = 0;
sToken = tStrGetToken(sql, &index, false);
if (sToken.type == TK_ILLEGAL) {
return tscSQLSyntaxErrMsg(pCmd->payload, "unrecognized token", sToken.z);
}
if (sToken.type == TK_RP) {
break;
}
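// TK_RP (right parenthesis) ends the bound-column list; any other unrecognizable
// token is reported as a SQL syntax error instead of being silently skipped.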
......
......@@ -2,6 +2,10 @@
from .connection import TDengineConnection
from .cursor import TDengineCursor
# For some reason, the following is needed for VS Code (through PyLance) to
# recognize that "error" is a valid module of the "taos" package.
from .error import ProgrammingError
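# The import also re-exports ProgrammingError at package level. Illustrative use
# (assumes the package-level connect() helper and a reachable server):
#   conn = taos.connect(host="localhost")
#   try:
#       conn.cursor().execute("not valid sql")
#   except taos.error.ProgrammingError as err:
#       print(err)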
# Globals
threadsafety = 0
paramstyle = 'pyformat'
......
......@@ -19,6 +19,7 @@
*/
#include <stdint.h>
#include <taos.h>
#define _GNU_SOURCE
#define CURL_STATICLIB
......@@ -52,6 +53,8 @@
#include "taoserror.h"
#include "tutil.h"
#define STMT_IFACE_ENABLED 0
#define REQ_EXTRA_BUF_LEN 1024
#define RESP_BUF_LEN 4096
......@@ -61,6 +64,8 @@ extern char configDir[];
#define QUERY_JSON_NAME "query.json"
#define SUBSCRIBE_JSON_NAME "subscribe.json"
#define STR_INSERT_INTO "INSERT INTO "
enum TEST_MODE {
INSERT_TEST, // 0
QUERY_TEST, // 1
......@@ -70,6 +75,8 @@ enum TEST_MODE {
#define MAX_RECORDS_PER_REQ 32766
#define HEAD_BUFF_LEN 1024*24 // 16*1024 + (192+32)*2 + insert into ..
#define MAX_SQL_SIZE 65536
#define BUFFER_SIZE (65536*2)
#define COND_BUF_LEN (BUFFER_SIZE - 30)
......@@ -120,17 +127,24 @@ enum enumSYNC_MODE {
MODE_BUT
};
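// Insert interface selection: native client (taosc), RESTful HTTP (rest), or
// prepared statements (stmt); INTERFACE_BUT is the sentinel/count value.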
enum enum_TAOS_INTERFACE {
TAOSC_IFACE,
REST_IFACE,
STMT_IFACE,
INTERFACE_BUT
};
typedef enum enumQUERY_CLASS {
SPECIFIED_CLASS,
STABLE_CLASS,
CLASS_BUT
} QUERY_CLASS;
typedef enum enum_INSERT_MODE {
typedef enum enum_PROGRESSIVE_OR_INTERLACE {
PROGRESSIVE_INSERT_MODE,
INTERLACE_INSERT_MODE,
INVALID_INSERT_MODE
} INSERT_MODE;
} PROG_OR_INTERLACE_MODE;
typedef enum enumQUERY_TYPE {
NO_INSERT_TYPE,
......@@ -196,6 +210,7 @@ typedef struct SArguments_S {
uint32_t test_mode;
char * host;
uint16_t port;
uint16_t iface;
char * user;
char * password;
char * database;
......@@ -217,13 +232,13 @@ typedef struct SArguments_S {
uint32_t num_of_threads;
uint64_t insert_interval;
int64_t query_times;
uint64_t interlace_rows;
uint64_t num_of_RPR; // num_of_records_per_req
uint32_t interlace_rows;
uint32_t num_of_RPR; // num_of_records_per_req
uint64_t max_sql_len;
int64_t num_of_tables;
int64_t num_of_DPT;
int abort;
int disorderRatio; // 0: no disorder, >0: x%
uint32_t disorderRatio; // 0: no disorder, >0: x%
int disorderRange; // ms or us by database precision
uint32_t method_of_delete;
char ** arg_list;
......@@ -240,18 +255,19 @@ typedef struct SColumn_S {
typedef struct SSuperTable_S {
char sTblName[MAX_TB_NAME_SIZE+1];
char dataSource[MAX_TB_NAME_SIZE+1]; // rand_gen or sample
char childTblPrefix[MAX_TB_NAME_SIZE];
char insertMode[MAX_TB_NAME_SIZE]; // taosc, rest
uint16_t childTblExists;
int64_t childTblCount;
bool childTblExists; // 0: no, 1: yes
uint64_t batchCreateTableNum; // 0: no batch, > 0: batch table number in one sql
uint8_t autoCreateTable; // 0: create sub table, 1: auto create sub table
char childTblPrefix[MAX_TB_NAME_SIZE];
char dataSource[MAX_TB_NAME_SIZE+1]; // rand_gen or sample
char insertMode[MAX_TB_NAME_SIZE]; // taosc, rest
uint16_t iface; // 0: taosc, 1: rest, 2: stmt
int64_t childTblLimit;
uint64_t childTblOffset;
// int multiThreadWriteOneTbl; // 0: no, 1: yes
uint64_t interlaceRows; //
uint32_t interlaceRows; //
int disorderRatio; // 0: no disorder, >0: x%
int disorderRange; // ms or us by database precision
uint64_t maxSqlLen; //
......@@ -420,6 +436,7 @@ typedef struct SQueryMetaInfo_S {
typedef struct SThreadInfo_S {
TAOS * taos;
TAOS_STMT *stmt;
int threadID;
char db_name[MAX_DB_NAME_SIZE+1];
uint32_t time_precision;
......@@ -434,6 +451,7 @@ typedef struct SThreadInfo_S {
char* cols;
bool use_metric;
SSuperTable* superTblInfo;
char *buffer; // sql cmd buffer
// for async insert
tsem_t lock_sem;
......@@ -557,6 +575,7 @@ SArguments g_args = {
0, // test_mode
"127.0.0.1", // host
6030, // port
TAOSC_IFACE, // iface
"root", // user
#ifdef _TD_POWER_
"powerdb", // password
......@@ -673,6 +692,12 @@ static void printHelp() {
"The host to connect to TDengine. Default is localhost.");
printf("%s%s%s%s\n", indent, "-p", indent,
"The TCP/IP port number to use for the connection. Default is 0.");
printf("%s%s%s%s\n", indent, "-I", indent,
#if STMT_IFACE_ENABLED == 1
"The interface (taosc, rest, and stmt) taosdemo uses. Default is 'taosc'.");
#else
"The interface (taosc, rest) taosdemo uses. Default is 'taosc'.");
#endif
printf("%s%s%s%s\n", indent, "-d", indent,
"Destination database. Default is 'test'.");
printf("%s%s%s%s\n", indent, "-a", indent,
......@@ -761,6 +786,25 @@ static void parse_args(int argc, char *argv[], SArguments *arguments) {
exit(EXIT_FAILURE);
}
arguments->port = atoi(argv[++i]);
} else if (strcmp(argv[i], "-I") == 0) {
if (argc == i+1) {
printHelp();
errorPrint("%s", "\n\t-I need a valid string following!\n");
exit(EXIT_FAILURE);
}
++i;
if (0 == strcasecmp(argv[i], "taosc")) {
arguments->iface = TAOSC_IFACE;
} else if (0 == strcasecmp(argv[i], "rest")) {
arguments->iface = REST_IFACE;
#if STMT_IFACE_ENABLED == 1
} else if (0 == strcasecmp(argv[i], "stmt")) {
arguments->iface = STMT_IFACE;
#endif
} else {
errorPrint("%s", "\n\t-I need a valid string following!\n");
exit(EXIT_FAILURE);
}
} else if (strcmp(argv[i], "-u") == 0) {
if (argc == i+1) {
printHelp();
......@@ -793,7 +837,7 @@ static void parse_args(int argc, char *argv[], SArguments *arguments) {
if ((argc == i+1)
|| (!isStringNumber(argv[i+1]))) {
printHelp();
errorPrint("%s", "\n\t-q need a number following!\nQuery mode -- 0: SYNC, 1: ASYNC. Default is SYNC.\n");
errorPrint("%s", "\n\t-q need a number following!\nQuery mode -- 0: SYNC, not-0: ASYNC. Default is SYNC.\n");
exit(EXIT_FAILURE);
}
arguments->async_mode = atoi(argv[++i]);
......@@ -897,6 +941,7 @@ static void parse_args(int argc, char *argv[], SArguments *arguments) {
&& strcasecmp(argv[i], "BIGINT")
&& strcasecmp(argv[i], "DOUBLE")
&& strcasecmp(argv[i], "BINARY")
&& strcasecmp(argv[i], "TIMESTAMP")
&& strcasecmp(argv[i], "NCHAR")) {
printHelp();
errorPrint("%s", "-b: Invalid data_type!\n");
......@@ -918,6 +963,7 @@ static void parse_args(int argc, char *argv[], SArguments *arguments) {
&& strcasecmp(token, "BIGINT")
&& strcasecmp(token, "DOUBLE")
&& strcasecmp(token, "BINARY")
&& strcasecmp(token, "TIMESTAMP")
&& strcasecmp(token, "NCHAR")) {
printHelp();
free(g_dupstr);
......@@ -1019,6 +1065,19 @@ static void parse_args(int argc, char *argv[], SArguments *arguments) {
}
}
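// Derive the number of value columns (num_of_CPR) from the NULL-terminated
// data type list built by -b; abort if no type was configured.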
int columnCount;
for (columnCount = 0; columnCount < MAX_NUM_DATATYPE; columnCount ++) {
if (g_args.datatype[columnCount] == NULL) {
break;
}
}
if (0 == columnCount) {
perror("data type error!");
exit(-1);
}
g_args.num_of_CPR = columnCount;
if (((arguments->debug_print) && (arguments->metaFile == NULL))
|| arguments->verbose_print) {
printf("###################################################################\n");
......@@ -1028,7 +1087,8 @@ static void parse_args(int argc, char *argv[], SArguments *arguments) {
arguments->port );
printf("# User: %s\n", arguments->user);
printf("# Password: %s\n", arguments->password);
printf("# Use metric: %s\n", arguments->use_metric ? "true" : "false");
printf("# Use metric: %s\n",
arguments->use_metric ? "true" : "false");
if (*(arguments->datatype)) {
printf("# Specified data type: ");
for (int i = 0; i < MAX_NUM_DATATYPE; i++)
......@@ -1040,7 +1100,7 @@ static void parse_args(int argc, char *argv[], SArguments *arguments) {
}
printf("# Insertion interval: %"PRIu64"\n",
arguments->insert_interval);
printf("# Number of records per req: %"PRIu64"\n",
printf("# Number of records per req: %u\n",
arguments->num_of_RPR);
printf("# Max SQL length: %"PRIu64"\n",
arguments->max_sql_len);
......@@ -1068,8 +1128,6 @@ static void parse_args(int argc, char *argv[], SArguments *arguments) {
}
static bool getInfoFromJsonFile(char* file);
//static int generateOneRowDataForStb(SSuperTable* stbInfo);
//static int getDataIntoMemForStb(SSuperTable* stbInfo);
static void init_rand_data();
static void tmfclose(FILE *fp) {
if (NULL != fp) {
......@@ -1088,7 +1146,7 @@ static int queryDbExec(TAOS *taos, char *command, QUERY_TYPE type, bool quiet) {
TAOS_RES *res = NULL;
int32_t code = -1;
for (i = 0; i < 5; i++) {
for (i = 0; i < 5 /* retry */; i++) {
if (NULL != res) {
taos_free_result(res);
res = NULL;
......@@ -1104,7 +1162,8 @@ static int queryDbExec(TAOS *taos, char *command, QUERY_TYPE type, bool quiet) {
verbosePrint("%s() LN%d - command: %s\n", __func__, __LINE__, command);
if (code != 0) {
if (!quiet) {
errorPrint("Failed to execute %s, reason: %s\n", command, taos_errstr(res));
errorPrint("Failed to execute %s, reason: %s\n",
command, taos_errstr(res));
}
taos_free_result(res);
//taos_close(taos);
......@@ -1320,6 +1379,8 @@ static void init_rand_data() {
static int printfInsertMeta() {
SHOW_PARSE_RESULT_START();
printf("interface: \033[33m%s\033[0m\n",
(g_args.iface==TAOSC_IFACE)?"taosc":(g_args.iface==REST_IFACE)?"rest":"stmt");
printf("host: \033[33m%s:%u\033[0m\n",
g_Dbs.host, g_Dbs.port);
printf("user: \033[33m%s\033[0m\n", g_Dbs.user);
......@@ -1331,7 +1392,7 @@ static int printfInsertMeta() {
g_Dbs.threadCountByCreateTbl);
printf("top insert interval: \033[33m%"PRIu64"\033[0m\n",
g_args.insert_interval);
printf("number of records per req: \033[33m%"PRIu64"\033[0m\n",
printf("number of records per req: \033[33m%u\033[0m\n",
g_args.num_of_RPR);
printf("max sql length: \033[33m%"PRIu64"\033[0m\n",
g_args.max_sql_len);
......@@ -1417,7 +1478,8 @@ static int printfInsertMeta() {
if (PRE_CREATE_SUBTBL == g_Dbs.db[i].superTbls[j].autoCreateTable) {
printf(" autoCreateTable: \033[33m%s\033[0m\n", "no");
} else if (AUTO_CREATE_SUBTBL == g_Dbs.db[i].superTbls[j].autoCreateTable) {
} else if (AUTO_CREATE_SUBTBL ==
g_Dbs.db[i].superTbls[j].autoCreateTable) {
printf(" autoCreateTable: \033[33m%s\033[0m\n", "yes");
} else {
printf(" autoCreateTable: \033[33m%s\033[0m\n", "error");
......@@ -1437,8 +1499,9 @@ static int printfInsertMeta() {
g_Dbs.db[i].superTbls[j].childTblPrefix);
printf(" dataSource: \033[33m%s\033[0m\n",
g_Dbs.db[i].superTbls[j].dataSource);
printf(" insertMode: \033[33m%s\033[0m\n",
g_Dbs.db[i].superTbls[j].insertMode);
printf(" iface: \033[33m%s\033[0m\n",
(g_Dbs.db[i].superTbls[j].iface==TAOSC_IFACE)?"taosc":
(g_Dbs.db[i].superTbls[j].iface==REST_IFACE)?"rest":"stmt");
if (g_Dbs.db[i].superTbls[j].childTblLimit > 0) {
printf(" childTblLimit: \033[33m%"PRId64"\033[0m\n",
g_Dbs.db[i].superTbls[j].childTblLimit);
......@@ -1456,7 +1519,7 @@ static int printfInsertMeta() {
printf(" multiThreadWriteOneTbl: \033[33myes\033[0m\n");
}
*/
printf(" interlaceRows: \033[33m%"PRIu64"\033[0m\n",
printf(" interlaceRows: \033[33m%u\033[0m\n",
g_Dbs.db[i].superTbls[j].interlaceRows);
if (g_Dbs.db[i].superTbls[j].interlaceRows > 0) {
......@@ -1534,7 +1597,7 @@ static void printfInsertMetaToFile(FILE* fp) {
fprintf(fp, "resultFile: %s\n", g_Dbs.resultFile);
fprintf(fp, "thread num of insert data: %d\n", g_Dbs.threadCount);
fprintf(fp, "thread num of create table: %d\n", g_Dbs.threadCountByCreateTbl);
fprintf(fp, "number of records per req: %"PRIu64"\n", g_args.num_of_RPR);
fprintf(fp, "number of records per req: %u\n", g_args.num_of_RPR);
fprintf(fp, "max sql length: %"PRIu64"\n", g_args.max_sql_len);
fprintf(fp, "database count: %d\n", g_Dbs.dbCount);
......@@ -1626,11 +1689,12 @@ static void printfInsertMetaToFile(FILE* fp) {
g_Dbs.db[i].superTbls[j].childTblPrefix);
fprintf(fp, " dataSource: %s\n",
g_Dbs.db[i].superTbls[j].dataSource);
fprintf(fp, " insertMode: %s\n",
g_Dbs.db[i].superTbls[j].insertMode);
fprintf(fp, " iface: %s\n",
(g_Dbs.db[i].superTbls[j].iface==TAOSC_IFACE)?"taosc":
(g_Dbs.db[i].superTbls[j].iface==REST_IFACE)?"rest":"stmt");
fprintf(fp, " insertRows: %"PRId64"\n",
g_Dbs.db[i].superTbls[j].insertRows);
fprintf(fp, " interlace rows: %"PRIu64"\n",
fprintf(fp, " interlace rows: %u\n",
g_Dbs.db[i].superTbls[j].interlaceRows);
if (g_Dbs.db[i].superTbls[j].interlaceRows > 0) {
fprintf(fp, " stable insert interval: %"PRIu64"\n",
......@@ -1643,7 +1707,7 @@ static void printfInsertMetaToFile(FILE* fp) {
fprintf(fp, " multiThreadWriteOneTbl: yes\n");
}
*/
fprintf(fp, " interlaceRows: %"PRIu64"\n",
fprintf(fp, " interlaceRows: %u\n",
g_Dbs.db[i].superTbls[j].interlaceRows);
fprintf(fp, " disorderRange: %d\n",
g_Dbs.db[i].superTbls[j].disorderRange);
......@@ -2803,7 +2867,7 @@ static int createDatabasesAndStables() {
int validStbCount = 0;
for (int j = 0; j < g_Dbs.db[i].superTblCount; j++) {
for (uint64_t j = 0; j < g_Dbs.db[i].superTblCount; j++) {
sprintf(command, "describe %s.%s;", g_Dbs.db[i].dbName,
g_Dbs.db[i].superTbls[j].sTblName);
ret = queryDbExec(taos, command, NO_INSERT_TYPE, true);
......@@ -2813,7 +2877,7 @@ static int createDatabasesAndStables() {
&g_Dbs.db[i].superTbls[j]);
if (0 != ret) {
errorPrint("create super table %d failed!\n\n", j);
errorPrint("create super table %"PRIu64" failed!\n\n", j);
continue;
}
}
......@@ -2841,13 +2905,13 @@ static void* createTable(void *sarg)
threadInfo *pThreadInfo = (threadInfo *)sarg;
SSuperTable* superTblInfo = pThreadInfo->superTblInfo;
int64_t lastPrintTime = taosGetTimestampMs();
uint64_t lastPrintTime = taosGetTimestampMs();
int buff_len;
buff_len = BUFFER_SIZE / 8;
char *buffer = calloc(buff_len, 1);
if (buffer == NULL) {
pThreadInfo->buffer = calloc(buff_len, 1);
if (pThreadInfo->buffer == NULL) {
errorPrint("%s() LN%d, Memory allocated failed!\n", __func__, __LINE__);
exit(-1);
}
......@@ -2862,7 +2926,7 @@ static void* createTable(void *sarg)
for (uint64_t i = pThreadInfo->start_table_from;
i <= pThreadInfo->end_table_to; i++) {
if (0 == g_Dbs.use_metric) {
snprintf(buffer, buff_len,
snprintf(pThreadInfo->buffer, buff_len,
"create table if not exists %s.%s%"PRIu64" %s;",
pThreadInfo->db_name,
g_args.tb_prefix, i,
......@@ -2871,13 +2935,13 @@ static void* createTable(void *sarg)
if (superTblInfo == NULL) {
errorPrint("%s() LN%d, use metric, but super table info is NULL\n",
__func__, __LINE__);
free(buffer);
free(pThreadInfo->buffer);
exit(-1);
} else {
if (0 == len) {
batchNum = 0;
memset(buffer, 0, buff_len);
len += snprintf(buffer + len,
memset(pThreadInfo->buffer, 0, buff_len);
len += snprintf(pThreadInfo->buffer + len,
buff_len - len, "create table ");
}
char* tagsValBuf = NULL;
......@@ -2889,10 +2953,10 @@ static void* createTable(void *sarg)
i % superTblInfo->tagSampleCount);
}
if (NULL == tagsValBuf) {
free(buffer);
free(pThreadInfo->buffer);
return NULL;
}
len += snprintf(buffer + len,
len += snprintf(pThreadInfo->buffer + len,
buff_len - len,
"if not exists %s.%s%"PRIu64" using %s.%s tags %s ",
pThreadInfo->db_name, superTblInfo->childTblPrefix,
......@@ -2909,13 +2973,14 @@ static void* createTable(void *sarg)
}
len = 0;
if (0 != queryDbExec(pThreadInfo->taos, buffer, NO_INSERT_TYPE, false)){
errorPrint( "queryDbExec() failed. buffer:\n%s\n", buffer);
free(buffer);
if (0 != queryDbExec(pThreadInfo->taos, pThreadInfo->buffer,
NO_INSERT_TYPE, false)){
errorPrint( "queryDbExec() failed. buffer:\n%s\n", pThreadInfo->buffer);
free(pThreadInfo->buffer);
return NULL;
}
int64_t currentPrintTime = taosGetTimestampMs();
uint64_t currentPrintTime = taosGetTimestampMs();
if (currentPrintTime - lastPrintTime > 30*1000) {
printf("thread[%d] already create %"PRIu64" - %"PRIu64" tables\n",
pThreadInfo->threadID, pThreadInfo->start_table_from, i);
......@@ -2924,21 +2989,22 @@ static void* createTable(void *sarg)
}
if (0 != len) {
if (0 != queryDbExec(pThreadInfo->taos, buffer, NO_INSERT_TYPE, false)) {
errorPrint( "queryDbExec() failed. buffer:\n%s\n", buffer);
if (0 != queryDbExec(pThreadInfo->taos, pThreadInfo->buffer,
NO_INSERT_TYPE, false)) {
errorPrint( "queryDbExec() failed. buffer:\n%s\n", pThreadInfo->buffer);
}
}
free(buffer);
free(pThreadInfo->buffer);
return NULL;
}
static int startMultiThreadCreateChildTable(
char* cols, int threads, uint64_t startFrom, int64_t ntables,
char* cols, int threads, uint64_t tableFrom, int64_t ntables,
char* db_name, SSuperTable* superTblInfo) {
pthread_t *pids = malloc(threads * sizeof(pthread_t));
threadInfo *infos = malloc(threads * sizeof(threadInfo));
threadInfo *infos = calloc(1, threads * sizeof(threadInfo));
if ((NULL == pids) || (NULL == infos)) {
printf("malloc failed\n");
......@@ -2978,10 +3044,10 @@ static int startMultiThreadCreateChildTable(
return -1;
}
pThreadInfo->start_table_from = startFrom;
pThreadInfo->start_table_from = tableFrom;
pThreadInfo->ntables = i<b?a+1:a;
pThreadInfo->end_table_to = i < b ? startFrom + a : startFrom + a - 1;
startFrom = pThreadInfo->end_table_to + 1;
pThreadInfo->end_table_to = i < b ? tableFrom + a : tableFrom + a - 1;
tableFrom = pThreadInfo->end_table_to + 1;
pThreadInfo->use_metric = true;
pThreadInfo->cols = cols;
pThreadInfo->minDelay = UINT64_MAX;
......@@ -3016,7 +3082,6 @@ static void createChildTables() {
|| (TBL_ALREADY_EXISTS == g_Dbs.db[i].superTbls[j].childTblExists)) {
continue;
}
verbosePrint("%s() LN%d: %s\n", __func__, __LINE__,
g_Dbs.db[i].superTbls[j].colsOfCreateChildTable);
uint64_t startFrom = 0;
......@@ -3024,6 +3089,7 @@ static void createChildTables() {
verbosePrint("%s() LN%d: create %"PRId64" child tables from %"PRIu64"\n",
__func__, __LINE__, g_totalChildTables, startFrom);
startMultiThreadCreateChildTable(
g_Dbs.db[i].superTbls[j].colsOfCreateChildTable,
g_Dbs.threadCountByCreateTbl,
......@@ -3132,10 +3198,12 @@ static int readTagFromCsvFileToMem(SSuperTable * superTblInfo) {
return 0;
}
#if 0
int readSampleFromJsonFileToMem(SSuperTable * superTblInfo) {
// TODO
return 0;
}
#endif
/*
Read 10000 lines at most. If more than 10000 lines, continue to read after using
......@@ -3520,9 +3588,9 @@ static bool getMetaFromInsertJsonFile(cJSON* root) {
// rows per table need to be less than insert batch
if (g_args.interlace_rows > g_args.num_of_RPR) {
printf("NOTICE: interlace rows value %"PRIu64" > num_of_records_per_req %"PRIu64"\n\n",
printf("NOTICE: interlace rows value %u > num_of_records_per_req %u\n\n",
g_args.interlace_rows, g_args.num_of_RPR);
printf(" interlace rows value will be set to num_of_records_per_req %"PRIu64"\n\n",
printf(" interlace rows value will be set to num_of_records_per_req %u\n\n",
g_args.num_of_RPR);
prompt();
g_args.interlace_rows = g_args.num_of_RPR;
......@@ -3753,25 +3821,29 @@ static bool getMetaFromInsertJsonFile(cJSON* root) {
// dbinfo
cJSON *stbName = cJSON_GetObjectItem(stbInfo, "name");
if (!stbName || stbName->type != cJSON_String || stbName->valuestring == NULL) {
if (!stbName || stbName->type != cJSON_String
|| stbName->valuestring == NULL) {
errorPrint("%s() LN%d, failed to read json, stb name not found\n",
__func__, __LINE__);
goto PARSE_OVER;
}
tstrncpy(g_Dbs.db[i].superTbls[j].sTblName, stbName->valuestring, MAX_TB_NAME_SIZE);
tstrncpy(g_Dbs.db[i].superTbls[j].sTblName, stbName->valuestring,
MAX_TB_NAME_SIZE);
cJSON *prefix = cJSON_GetObjectItem(stbInfo, "childtable_prefix");
if (!prefix || prefix->type != cJSON_String || prefix->valuestring == NULL) {
printf("ERROR: failed to read json, childtable_prefix not found\n");
goto PARSE_OVER;
}
tstrncpy(g_Dbs.db[i].superTbls[j].childTblPrefix, prefix->valuestring, MAX_DB_NAME_SIZE);
tstrncpy(g_Dbs.db[i].superTbls[j].childTblPrefix, prefix->valuestring,
MAX_DB_NAME_SIZE);
cJSON *autoCreateTbl = cJSON_GetObjectItem(stbInfo, "auto_create_table"); // yes, no, null
cJSON *autoCreateTbl = cJSON_GetObjectItem(stbInfo, "auto_create_table");
if (autoCreateTbl
&& autoCreateTbl->type == cJSON_String
&& autoCreateTbl->valuestring != NULL) {
if (0 == strncasecmp(autoCreateTbl->valuestring, "yes", 3)) {
if ((0 == strncasecmp(autoCreateTbl->valuestring, "yes", 3))
&& (TBL_ALREADY_EXISTS != g_Dbs.db[i].superTbls[j].childTblExists)) {
g_Dbs.db[i].superTbls[j].autoCreateTable = AUTO_CREATE_SUBTBL;
} else if (0 == strncasecmp(autoCreateTbl->valuestring, "no", 2)) {
g_Dbs.db[i].superTbls[j].autoCreateTable = PRE_CREATE_SUBTBL;
......@@ -3816,6 +3888,10 @@ static bool getMetaFromInsertJsonFile(cJSON* root) {
goto PARSE_OVER;
}
if (TBL_ALREADY_EXISTS == g_Dbs.db[i].superTbls[j].childTblExists) {
g_Dbs.db[i].superTbls[j].autoCreateTable = PRE_CREATE_SUBTBL;
}
cJSON* count = cJSON_GetObjectItem(stbInfo, "childtable_count");
if (!count || count->type != cJSON_Number || 0 >= count->valueint) {
errorPrint("%s() LN%d, failed to read json, childtable_count input mistake\n",
......@@ -3837,15 +3913,26 @@ static bool getMetaFromInsertJsonFile(cJSON* root) {
goto PARSE_OVER;
}
cJSON *insertMode = cJSON_GetObjectItem(stbInfo, "insert_mode"); // taosc , rest
if (insertMode && insertMode->type == cJSON_String
&& insertMode->valuestring != NULL) {
tstrncpy(g_Dbs.db[i].superTbls[j].insertMode,
insertMode->valuestring, MAX_DB_NAME_SIZE);
} else if (!insertMode) {
tstrncpy(g_Dbs.db[i].superTbls[j].insertMode, "taosc", MAX_DB_NAME_SIZE);
cJSON *stbIface = cJSON_GetObjectItem(stbInfo, "insert_mode"); // taosc , rest, stmt
if (stbIface && stbIface->type == cJSON_String
&& stbIface->valuestring != NULL) {
if (0 == strcasecmp(stbIface->valuestring, "taosc")) {
g_Dbs.db[i].superTbls[j].iface= TAOSC_IFACE;
} else if (0 == strcasecmp(stbIface->valuestring, "rest")) {
g_Dbs.db[i].superTbls[j].iface= REST_IFACE;
#if STMT_IFACE_ENABLED == 1
} else if (0 == strcasecmp(stbIface->valuestring, "stmt")) {
g_Dbs.db[i].superTbls[j].iface= STMT_IFACE;
#endif
} else {
errorPrint("%s() LN%d, failed to read json, insert_mode %s not recognized\n",
__func__, __LINE__, stbIface->valuestring);
goto PARSE_OVER;
}
} else if (!stbIface) {
g_Dbs.db[i].superTbls[j].iface = TAOSC_IFACE;
} else {
printf("ERROR: failed to read json, insert_mode not found\n");
errorPrint("%s", "failed to read json, insert_mode not found\n");
goto PARSE_OVER;
}
......@@ -3864,7 +3951,8 @@ static bool getMetaFromInsertJsonFile(cJSON* root) {
cJSON* childTbl_offset = cJSON_GetObjectItem(stbInfo, "childtable_offset");
if ((childTbl_offset) && (g_Dbs.db[i].drop != true)
&& (g_Dbs.db[i].superTbls[j].childTblExists == TBL_ALREADY_EXISTS)) {
if (childTbl_offset->type != cJSON_Number || 0 > childTbl_offset->valueint) {
if ((childTbl_offset->type != cJSON_Number)
|| (0 > childTbl_offset->valueint)) {
printf("ERROR: failed to read json, childtable_offset\n");
goto PARSE_OVER;
}
......@@ -3920,7 +4008,8 @@ static bool getMetaFromInsertJsonFile(cJSON* root) {
}
cJSON *tagsFile = cJSON_GetObjectItem(stbInfo, "tags_file");
if (tagsFile && tagsFile->type == cJSON_String && tagsFile->valuestring != NULL) {
if ((tagsFile && tagsFile->type == cJSON_String)
&& (tagsFile->valuestring != NULL)) {
tstrncpy(g_Dbs.db[i].superTbls[j].tagsFile,
tagsFile->valuestring, MAX_FILE_NAME_LEN);
if (0 == g_Dbs.db[i].superTbls[j].tagsFile[0]) {
......@@ -3936,9 +4025,9 @@ static bool getMetaFromInsertJsonFile(cJSON* root) {
goto PARSE_OVER;
}
cJSON* maxSqlLen = cJSON_GetObjectItem(stbInfo, "max_sql_len");
if (maxSqlLen && maxSqlLen->type == cJSON_Number) {
int32_t len = maxSqlLen->valueint;
cJSON* stbMaxSqlLen = cJSON_GetObjectItem(stbInfo, "max_sql_len");
if (stbMaxSqlLen && stbMaxSqlLen->type == cJSON_Number) {
int32_t len = stbMaxSqlLen->valueint;
if (len > TSDB_MAX_ALLOWED_SQL_LEN) {
len = TSDB_MAX_ALLOWED_SQL_LEN;
} else if (len < 5) {
......@@ -3948,7 +4037,7 @@ static bool getMetaFromInsertJsonFile(cJSON* root) {
} else if (!maxSqlLen) {
g_Dbs.db[i].superTbls[j].maxSqlLen = g_args.max_sql_len;
} else {
errorPrint("%s() LN%d, failed to read json, maxSqlLen input mistake\n",
errorPrint("%s() LN%d, failed to read json, stbMaxSqlLen input mistake\n",
__func__, __LINE__);
goto PARSE_OVER;
}
......@@ -3970,24 +4059,25 @@ static bool getMetaFromInsertJsonFile(cJSON* root) {
goto PARSE_OVER;
}
*/
cJSON* interlaceRows = cJSON_GetObjectItem(stbInfo, "interlace_rows");
if (interlaceRows && interlaceRows->type == cJSON_Number) {
if (interlaceRows->valueint < 0) {
cJSON* stbInterlaceRows = cJSON_GetObjectItem(stbInfo, "interlace_rows");
if (stbInterlaceRows && stbInterlaceRows->type == cJSON_Number) {
if (stbInterlaceRows->valueint < 0) {
errorPrint("%s() LN%d, failed to read json, interlace rows input mistake\n",
__func__, __LINE__);
goto PARSE_OVER;
}
g_Dbs.db[i].superTbls[j].interlaceRows = interlaceRows->valueint;
g_Dbs.db[i].superTbls[j].interlaceRows = stbInterlaceRows->valueint;
// rows per table need to be less than insert batch
if (g_Dbs.db[i].superTbls[j].interlaceRows > g_args.num_of_RPR) {
printf("NOTICE: db[%d].superTbl[%d]'s interlace rows value %"PRIu64" > num_of_records_per_req %"PRIu64"\n\n",
i, j, g_Dbs.db[i].superTbls[j].interlaceRows, g_args.num_of_RPR);
printf(" interlace rows value will be set to num_of_records_per_req %"PRIu64"\n\n",
printf("NOTICE: db[%d].superTbl[%d]'s interlace rows value %u > num_of_records_per_req %u\n\n",
i, j, g_Dbs.db[i].superTbls[j].interlaceRows,
g_args.num_of_RPR);
printf(" interlace rows value will be set to num_of_records_per_req %u\n\n",
g_args.num_of_RPR);
prompt();
g_Dbs.db[i].superTbls[j].interlaceRows = g_args.num_of_RPR;
}
} else if (!interlaceRows) {
} else if (!stbInterlaceRows) {
g_Dbs.db[i].superTbls[j].interlaceRows = 0; // 0 means progressive mode, > 0 means interlace mode; max value is less than or equal to num_of_records_per_req
} else {
errorPrint(
......@@ -4311,8 +4401,9 @@ static bool getMetaFromQueryJsonFile(cJSON* root) {
cJSON* resubAfterConsume =
cJSON_GetObjectItem(specifiedQuery, "resubAfterConsume");
if (resubAfterConsume
&& resubAfterConsume->type == cJSON_Number) {
if ((resubAfterConsume)
&& (resubAfterConsume->type == cJSON_Number)
&& (resubAfterConsume->valueint >= 0)) {
g_queryInfo.specifiedQueryInfo.resubAfterConsume[j]
= resubAfterConsume->valueint;
} else if (!resubAfterConsume) {
......@@ -4473,14 +4564,15 @@ static bool getMetaFromQueryJsonFile(cJSON* root) {
}
cJSON* superResubAfterConsume =
cJSON_GetObjectItem(superQuery, "endAfterConsume");
if (superResubAfterConsume
&& superResubAfterConsume->type == cJSON_Number) {
g_queryInfo.superQueryInfo.endAfterConsume =
cJSON_GetObjectItem(superQuery, "resubAfterConsume");
if ((superResubAfterConsume)
&& (superResubAfterConsume->type == cJSON_Number)
&& (superResubAfterConsume->valueint >= 0)) {
g_queryInfo.superQueryInfo.resubAfterConsume =
superResubAfterConsume->valueint;
} else if (!superResubAfterConsume) {
// default value is -1, which mean do not resub
g_queryInfo.superQueryInfo.endAfterConsume = -1;
g_queryInfo.superQueryInfo.resubAfterConsume = -1;
}
// super table sqls
......@@ -4613,7 +4705,7 @@ static void prepareSampleData() {
static void postFreeResource() {
tmfclose(g_fpOfInsertResult);
for (int i = 0; i < g_Dbs.dbCount; i++) {
for (int j = 0; j < g_Dbs.db[i].superTblCount; j++) {
for (uint64_t j = 0; j < g_Dbs.db[i].superTblCount; j++) {
if (0 != g_Dbs.db[i].superTbls[j].colsOfCreateChildTable) {
free(g_Dbs.db[i].superTbls[j].colsOfCreateChildTable);
g_Dbs.db[i].superTbls[j].colsOfCreateChildTable = NULL;
......@@ -4661,16 +4753,22 @@ static int getRowDataFromSample(
return dataLen;
}
static int64_t generateRowData(char* recBuf, int64_t timestamp, SSuperTable* stbInfo) {
static int64_t generateStbRowData(
SSuperTable* stbInfo,
char* recBuf, int64_t timestamp)
{
int64_t dataLen = 0;
char *pstr = recBuf;
int64_t maxLen = MAX_DATA_SIZE;
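// Render one row as SQL value text "(timestamp,col1,col2,...)" into recBuf,
// bounded by MAX_DATA_SIZE; string columns are generated randomly and quoted.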
dataLen += snprintf(pstr + dataLen, maxLen - dataLen, "(%" PRId64 ",", timestamp);
dataLen += snprintf(pstr + dataLen, maxLen - dataLen,
"(%" PRId64 ",", timestamp);
for (int i = 0; i < stbInfo->columnCount; i++) {
if ((0 == strncasecmp(stbInfo->columns[i].dataType, "BINARY", strlen("BINARY")))
|| (0 == strncasecmp(stbInfo->columns[i].dataType, "NCHAR", strlen("NCHAR")))) {
if ((0 == strncasecmp(stbInfo->columns[i].dataType,
"BINARY", strlen("BINARY")))
|| (0 == strncasecmp(stbInfo->columns[i].dataType,
"NCHAR", strlen("NCHAR")))) {
if (stbInfo->columns[i].dataLen > TSDB_MAX_BINARY_LEN) {
errorPrint( "binary or nchar length overflow, max size:%u\n",
(uint32_t)TSDB_MAX_BINARY_LEN);
......@@ -4686,23 +4784,23 @@ static int64_t generateRowData(char* recBuf, int64_t timestamp, SSuperTable* stb
dataLen += snprintf(pstr + dataLen, maxLen - dataLen, "\'%s\',", buf);
tmfree(buf);
} else if (0 == strncasecmp(stbInfo->columns[i].dataType,
"INT", 3)) {
"INT", strlen("INT"))) {
dataLen += snprintf(pstr + dataLen, maxLen - dataLen,
"%d,", rand_int());
} else if (0 == strncasecmp(stbInfo->columns[i].dataType,
"BIGINT", 6)) {
"BIGINT", strlen("BIGINT"))) {
dataLen += snprintf(pstr + dataLen, maxLen - dataLen,
"%"PRId64",", rand_bigint());
} else if (0 == strncasecmp(stbInfo->columns[i].dataType,
"FLOAT", 5)) {
"FLOAT", strlen("FLOAT"))) {
dataLen += snprintf(pstr + dataLen, maxLen - dataLen,
"%f,", rand_float());
} else if (0 == strncasecmp(stbInfo->columns[i].dataType,
"DOUBLE", 6)) {
"DOUBLE", strlen("DOUBLE"))) {
dataLen += snprintf(pstr + dataLen, maxLen - dataLen,
"%f,", rand_double());
} else if (0 == strncasecmp(stbInfo->columns[i].dataType,
"SMALLINT", 8)) {
"SMALLINT", strlen("SMALLINT"))) {
dataLen += snprintf(pstr + dataLen, maxLen - dataLen,
"%d,", rand_smallint());
} else if (0 == strncasecmp(stbInfo->columns[i].dataType,
......@@ -4732,46 +4830,38 @@ static int64_t generateRowData(char* recBuf, int64_t timestamp, SSuperTable* stb
}
static int64_t generateData(char *recBuf, char **data_type,
int num_of_cols, int64_t timestamp, int lenOfBinary) {
int64_t timestamp, int lenOfBinary) {
memset(recBuf, 0, MAX_DATA_SIZE);
char *pstr = recBuf;
pstr += sprintf(pstr, "(%" PRId64, timestamp);
int c = 0;
for (; c < MAX_NUM_DATATYPE; c++) {
if (data_type[c] == NULL) {
break;
}
}
if (0 == c) {
perror("data type error!");
exit(-1);
}
int columnCount = g_args.num_of_CPR;
for (int i = 0; i < c; i++) {
if (strcasecmp(data_type[i % c], "TINYINT") == 0) {
for (int i = 0; i < columnCount; i++) {
if (strcasecmp(data_type[i % columnCount], "TINYINT") == 0) {
pstr += sprintf(pstr, ",%d", rand_tinyint() );
} else if (strcasecmp(data_type[i % c], "SMALLINT") == 0) {
} else if (strcasecmp(data_type[i % columnCount], "SMALLINT") == 0) {
pstr += sprintf(pstr, ",%d", rand_smallint());
} else if (strcasecmp(data_type[i % c], "INT") == 0) {
} else if (strcasecmp(data_type[i % columnCount], "INT") == 0) {
pstr += sprintf(pstr, ",%d", rand_int());
} else if (strcasecmp(data_type[i % c], "BIGINT") == 0) {
} else if (strcasecmp(data_type[i % columnCount], "BIGINT") == 0) {
pstr += sprintf(pstr, ",%" PRId64, rand_bigint());
} else if (strcasecmp(data_type[i % columnCount], "TIMESTAMP") == 0) {
pstr += sprintf(pstr, ",%" PRId64, rand_bigint());
} else if (strcasecmp(data_type[i % c], "FLOAT") == 0) {
} else if (strcasecmp(data_type[i % columnCount], "FLOAT") == 0) {
pstr += sprintf(pstr, ",%10.4f", rand_float());
} else if (strcasecmp(data_type[i % c], "DOUBLE") == 0) {
} else if (strcasecmp(data_type[i % columnCount], "DOUBLE") == 0) {
double t = rand_double();
pstr += sprintf(pstr, ",%20.8f", t);
} else if (strcasecmp(data_type[i % c], "BOOL") == 0) {
} else if (strcasecmp(data_type[i % columnCount], "BOOL") == 0) {
bool b = rand_bool() & 1;
pstr += sprintf(pstr, ",%s", b ? "true" : "false");
} else if (strcasecmp(data_type[i % c], "BINARY") == 0) {
} else if (strcasecmp(data_type[i % columnCount], "BINARY") == 0) {
char *s = malloc(lenOfBinary);
rand_string(s, lenOfBinary);
pstr += sprintf(pstr, ",\"%s\"", s);
free(s);
} else if (strcasecmp(data_type[i % c], "NCHAR") == 0) {
} else if (strcasecmp(data_type[i % columnCount], "NCHAR") == 0) {
char *s = malloc(lenOfBinary);
rand_string(s, lenOfBinary);
pstr += sprintf(pstr, ",\"%s\"", s);
......@@ -4818,32 +4908,59 @@ static int prepareSampleDataForSTable(SSuperTable *superTblInfo) {
return 0;
}
static int64_t execInsert(threadInfo *pThreadInfo, char *buffer, uint64_t k)
static int32_t execInsert(threadInfo *pThreadInfo, uint32_t k)
{
int affectedRows;
int32_t affectedRows;
SSuperTable* superTblInfo = pThreadInfo->superTblInfo;
verbosePrint("[%d] %s() LN%d %s\n", pThreadInfo->threadID,
__func__, __LINE__, buffer);
if (superTblInfo) {
if (0 == strncasecmp(superTblInfo->insertMode, "taosc", strlen("taosc"))) {
affectedRows = queryDbExec(pThreadInfo->taos, buffer, INSERT_TYPE, false);
} else if (0 == strncasecmp(superTblInfo->insertMode, "rest", strlen("rest"))) {
__func__, __LINE__, pThreadInfo->buffer);
uint16_t iface;
if (superTblInfo)
iface = superTblInfo->iface;
else
iface = g_args.iface;
debugPrint("[%d] %s() LN%d %s\n", pThreadInfo->threadID,
__func__, __LINE__,
(g_args.iface==TAOSC_IFACE)?
"taosc":(g_args.iface==REST_IFACE)?"rest":"stmt");
switch(iface) {
case TAOSC_IFACE:
affectedRows = queryDbExec(
pThreadInfo->taos,
pThreadInfo->buffer, INSERT_TYPE, false);
break;
case REST_IFACE:
if (0 != postProceSql(g_Dbs.host, &g_Dbs.serv_addr, g_Dbs.port,
buffer, NULL /* not set result file */)) {
pThreadInfo->buffer, pThreadInfo)) {
affectedRows = -1;
printf("========restful return fail, threadID[%d]\n",
pThreadInfo->threadID);
} else {
affectedRows = k;
}
} else {
errorPrint("%s() LN%d: unknown insert mode: %s\n",
__func__, __LINE__, superTblInfo->insertMode);
affectedRows = 0;
break;
#if STMT_IFACE_ENABLED == 1
case STMT_IFACE:
debugPrint("%s() LN%d, stmt=%p", __func__, __LINE__, pThreadInfo->stmt);
if (0 != taos_stmt_execute(pThreadInfo->stmt)) {
errorPrint("%s() LN%d, failied to execute insert statement\n",
__func__, __LINE__);
exit(-1);
}
} else {
affectedRows = queryDbExec(pThreadInfo->taos, buffer, INSERT_TYPE, false);
affectedRows = k;
break;
#endif
default:
errorPrint("%s() LN%d: unknown insert mode: %d\n",
__func__, __LINE__, superTblInfo->iface);
affectedRows = 0;
}
return affectedRows;
......@@ -4853,14 +4970,13 @@ static void getTableName(char *pTblName,
threadInfo* pThreadInfo, uint64_t tableSeq)
{
SSuperTable* superTblInfo = pThreadInfo->superTblInfo;
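// Resolve the target table name: pre-existing child tables come from the cached
// childTblName list (honoring childtable_limit/offset), auto-created ones are
// composed as childTblPrefix + sequence, and without a super table the global
// tb_prefix + sequence is used.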
if ((superTblInfo)
&& (AUTO_CREATE_SUBTBL != superTblInfo->autoCreateTable)) {
if (superTblInfo) {
if (AUTO_CREATE_SUBTBL != superTblInfo->autoCreateTable) {
if (superTblInfo->childTblLimit > 0) {
snprintf(pTblName, TSDB_TABLE_NAME_LEN, "%s",
superTblInfo->childTblName +
(tableSeq - superTblInfo->childTblOffset) * TSDB_TABLE_NAME_LEN);
} else {
verbosePrint("[%d] %s() LN%d: from=%"PRIu64" count=%"PRId64" seq=%"PRIu64"\n",
pThreadInfo->threadID, __func__, __LINE__,
pThreadInfo->start_table_from,
......@@ -4868,55 +4984,114 @@ static void getTableName(char *pTblName,
snprintf(pTblName, TSDB_TABLE_NAME_LEN, "%s",
superTblInfo->childTblName + tableSeq * TSDB_TABLE_NAME_LEN);
}
} else {
snprintf(pTblName, TSDB_TABLE_NAME_LEN, "%s%"PRIu64"",
superTblInfo->childTblPrefix, tableSeq);
}
} else {
snprintf(pTblName, TSDB_TABLE_NAME_LEN, "%s%"PRIu64"",
g_args.tb_prefix, tableSeq);
}
}
static int64_t generateDataTail(
SSuperTable* superTblInfo,
uint64_t batch, char* buffer, int64_t remainderBufLen, int64_t insertRows,
uint64_t startFrom, int64_t startTime, int64_t *pSamplePos, int64_t *dataLen) {
uint64_t len = 0;
uint32_t ncols_per_record = 1; // count first col ts
static int32_t generateDataTailWithoutStb(
uint32_t batch, char* buffer,
int64_t remainderBufLen, int64_t insertRows,
uint64_t recordFrom, int64_t startTime,
/* int64_t *pSamplePos, */int64_t *dataLen) {
uint64_t len = 0;
char *pstr = buffer;
if (superTblInfo == NULL) {
uint32_t datatypeSeq = 0;
while(g_args.datatype[datatypeSeq]) {
datatypeSeq ++;
ncols_per_record ++;
verbosePrint("%s() LN%d batch=%d\n", __func__, __LINE__, batch);
int32_t k = 0;
for (k = 0; k < batch;) {
char data[MAX_DATA_SIZE];
memset(data, 0, MAX_DATA_SIZE);
int64_t retLen = 0;
char **data_type = g_args.datatype;
int lenOfBinary = g_args.len_of_binary;
retLen = generateData(data, data_type,
startTime + getTSRandTail(
(int64_t) DEFAULT_TIMESTAMP_STEP, k,
g_args.disorderRatio,
g_args.disorderRange),
lenOfBinary);
if (len > remainderBufLen)
break;
pstr += sprintf(pstr, "%s", data);
k++;
len += retLen;
remainderBufLen -= retLen;
verbosePrint("%s() LN%d len=%"PRIu64" k=%d \nbuffer=%s\n",
__func__, __LINE__, len, k, buffer);
recordFrom ++;
if (recordFrom >= insertRows) {
break;
}
}
*dataLen = len;
return k;
}
static int64_t getTSRandTail(int64_t timeStampStep, int32_t seq,
int disorderRatio, int disorderRange)
{
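// Normally advance the timestamp by timeStampStep * seq; with probability
// disorderRatio percent return a negative offset (up to disorderRange earlier)
// to simulate out-of-order data.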
int64_t randTail = timeStampStep * seq;
if (disorderRatio > 0) {
int rand_num = taosRandom() % 100;
if(rand_num < disorderRatio) {
randTail = (randTail +
(taosRandom() % disorderRange + 1)) * (-1);
debugPrint("rand data generated, back %"PRId64"\n", randTail);
}
}
verbosePrint("%s() LN%d batch=%"PRIu64"\n", __func__, __LINE__, batch);
return randTail;
}
static int32_t generateStbDataTail(
SSuperTable* superTblInfo,
uint32_t batch, char* buffer,
int64_t remainderBufLen, int64_t insertRows,
uint64_t recordFrom, int64_t startTime,
int64_t *pSamplePos, int64_t *dataLen) {
uint64_t len = 0;
char *pstr = buffer;
bool tsRand;
if ((superTblInfo) && (0 == strncasecmp(superTblInfo->dataSource,
"rand", strlen("rand")))) {
if (0 == strncasecmp(superTblInfo->dataSource, "rand", strlen("rand"))) {
tsRand = true;
} else {
tsRand = false;
}
verbosePrint("%s() LN%d batch=%u\n", __func__, __LINE__, batch);
uint64_t k = 0;
int32_t k = 0;
for (k = 0; k < batch;) {
char data[MAX_DATA_SIZE];
memset(data, 0, MAX_DATA_SIZE);
int64_t retLen = 0;
if (superTblInfo) {
if (tsRand) {
retLen = generateRowData(
data,
retLen = generateStbRowData(superTblInfo, data,
startTime + getTSRandTail(
superTblInfo->timeStampStep, k,
superTblInfo->disorderRatio,
superTblInfo->disorderRange),
superTblInfo);
superTblInfo->disorderRange)
);
} else {
retLen = getRowDataFromSample(
data,
......@@ -4925,6 +5100,7 @@ static int64_t generateDataTail(
superTblInfo,
pSamplePos);
}
if (retLen > remainderBufLen) {
break;
}
......@@ -4933,31 +5109,13 @@ static int64_t generateDataTail(
k++;
len += retLen;
remainderBufLen -= retLen;
} else {
char **data_type = g_args.datatype;
int lenOfBinary = g_args.len_of_binary;
retLen = generateData(data, data_type,
ncols_per_record,
startTime + getTSRandTail(
DEFAULT_TIMESTAMP_STEP, k,
g_args.disorderRatio,
g_args.disorderRange),
lenOfBinary);
if (len > remainderBufLen)
break;
pstr += sprintf(pstr, "%s", data);
k++;
len += retLen;
remainderBufLen -= retLen;
}
verbosePrint("%s() LN%d len=%"PRIu64" k=%"PRIu64" \nbuffer=%s\n",
verbosePrint("%s() LN%d len=%"PRIu64" k=%u \nbuffer=%s\n",
__func__, __LINE__, len, k, buffer);
startFrom ++;
recordFrom ++;
if (startFrom >= insertRows) {
if (recordFrom >= insertRows) {
break;
}
}
......@@ -4966,16 +5124,40 @@ static int64_t generateDataTail(
return k;
}
static int generateSQLHead(char *tableName, int32_t tableSeq,
threadInfo* pThreadInfo, SSuperTable* superTblInfo,
static int generateSQLHeadWithoutStb(char *tableName,
char *dbName,
char *buffer, int remainderBufLen)
{
int len;
char headBuf[HEAD_BUFF_LEN];
len = snprintf(
headBuf,
HEAD_BUFF_LEN,
"%s.%s values",
dbName,
tableName);
if (len > remainderBufLen)
return -1;
tstrncpy(buffer, headBuf, len + 1);
return len;
}
static int generateStbSQLHead(
SSuperTable* superTblInfo,
char *tableName, int32_t tableSeq,
char *dbName,
char *buffer, int remainderBufLen)
{
int len;
#define HEAD_BUFF_LEN 1024*24 // 16*1024 + (192+32)*2 + insert into ..
char headBuf[HEAD_BUFF_LEN];
if (superTblInfo) {
if (AUTO_CREATE_SUBTBL == superTblInfo->autoCreateTable) {
char* tagsValBuf = NULL;
if (0 == superTblInfo->tagSource) {
......@@ -4995,9 +5177,9 @@ static int generateSQLHead(char *tableName, int32_t tableSeq,
headBuf,
HEAD_BUFF_LEN,
"%s.%s using %s.%s tags %s values",
pThreadInfo->db_name,
dbName,
tableName,
pThreadInfo->db_name,
dbName,
superTblInfo->sTblName,
tagsValBuf);
tmfree(tagsValBuf);
......@@ -5006,22 +5188,14 @@ static int generateSQLHead(char *tableName, int32_t tableSeq,
headBuf,
HEAD_BUFF_LEN,
"%s.%s values",
pThreadInfo->db_name,
tableName);
} else {
len = snprintf(
headBuf,
HEAD_BUFF_LEN,
"%s.%s values",
pThreadInfo->db_name,
dbName,
tableName);
}
} else {
len = snprintf(
headBuf,
HEAD_BUFF_LEN,
"%s.%s values",
pThreadInfo->db_name,
dbName,
tableName);
}
......@@ -5033,8 +5207,11 @@ static int generateSQLHead(char *tableName, int32_t tableSeq,
return len;
}
static int64_t generateInterlaceDataBuffer(
char *tableName, uint64_t batchPerTbl, uint64_t i, uint64_t batchPerTblTimes,
static int32_t generateStbInterlaceData(
SSuperTable *superTblInfo,
char *tableName, uint32_t batchPerTbl,
uint64_t i,
uint32_t batchPerTblTimes,
uint64_t tableSeq,
threadInfo *pThreadInfo, char *buffer,
int64_t insertRows,
......@@ -5043,10 +5220,11 @@ static int64_t generateInterlaceDataBuffer(
{
assert(buffer);
char *pstr = buffer;
SSuperTable* superTblInfo = pThreadInfo->superTblInfo;
int headLen = generateSQLHead(tableName, tableSeq, pThreadInfo,
superTblInfo, pstr, *pRemainderBufLen);
int headLen = generateStbSQLHead(
superTblInfo,
tableName, tableSeq, pThreadInfo->db_name,
pstr, *pRemainderBufLen);
if (headLen <= 0) {
return 0;
......@@ -5060,19 +5238,15 @@ static int64_t generateInterlaceDataBuffer(
int64_t dataLen = 0;
verbosePrint("[%d] %s() LN%d i=%"PRIu64" batchPerTblTimes=%"PRIu64" batchPerTbl = %"PRIu64"\n",
verbosePrint("[%d] %s() LN%d i=%"PRIu64" batchPerTblTimes=%u batchPerTbl = %u\n",
pThreadInfo->threadID, __func__, __LINE__,
i, batchPerTblTimes, batchPerTbl);
if (superTblInfo) {
if (0 == strncasecmp(superTblInfo->startTimestamp, "now", 3)) {
startTime = taosGetTimestamp(pThreadInfo->time_precision);
}
} else {
startTime = 1500000000000;
}
int64_t k = generateDataTail(
int32_t k = generateStbDataTail(
superTblInfo,
batchPerTbl, pstr, *pRemainderBufLen, insertRows, 0,
startTime,
......@@ -5082,7 +5256,7 @@ static int64_t generateInterlaceDataBuffer(
pstr += dataLen;
*pRemainderBufLen -= dataLen;
} else {
debugPrint("%s() LN%d, generated data tail: %"PRIu64", not equal batch per table: %"PRIu64"\n",
debugPrint("%s() LN%d, generated data tail: %u, not equal batch per table: %u\n",
__func__, __LINE__, k, batchPerTbl);
pstr -= headLen;
pstr[0] = '\0';
......@@ -5092,50 +5266,363 @@ static int64_t generateInterlaceDataBuffer(
return k;
}
static int64_t getTSRandTail(int64_t timeStampStep, int32_t seq,
int disorderRatio, int disorderRange)
{
int64_t randTail = timeStampStep * seq;
if (disorderRatio > 0) {
int rand_num = taosRandom() % 100;
if(rand_num < disorderRatio) {
randTail = (randTail +
(taosRandom() % disorderRange + 1)) * (-1);
debugPrint("rand data generated, back %"PRId64"\n", randTail);
}
}
return randTail;
}
static int64_t generateProgressiveDataBuffer(
char *tableName,
int64_t tableSeq,
threadInfo *pThreadInfo, char *buffer,
static int64_t generateInterlaceDataWithoutStb(
char *tableName, uint32_t batch,
uint64_t tableSeq,
char *dbName, char *buffer,
int64_t insertRows,
uint64_t startFrom, int64_t startTime, int64_t *pSamplePos,
int64_t *pRemainderBufLen)
int64_t startTime,
uint64_t *pRemainderBufLen)
{
SSuperTable* superTblInfo = pThreadInfo->superTblInfo;
assert(buffer);
char *pstr = buffer;
int ncols_per_record = 1; // count first col ts
int headLen = generateSQLHeadWithoutStb(
tableName, dbName,
pstr, *pRemainderBufLen);
if (superTblInfo == NULL) {
int datatypeSeq = 0;
while(g_args.datatype[datatypeSeq]) {
datatypeSeq ++;
ncols_per_record ++;
}
if (headLen <= 0) {
return 0;
}
assert(buffer != NULL);
char *pstr = buffer;
pstr += headLen;
*pRemainderBufLen -= headLen;
int64_t k = 0;
int64_t dataLen = 0;
int32_t k = generateDataTailWithoutStb(
batch, pstr, *pRemainderBufLen, insertRows, 0,
startTime,
&dataLen);
if (k == batch) {
pstr += dataLen;
*pRemainderBufLen -= dataLen;
} else {
debugPrint("%s() LN%d, generated data tail: %d, not equal batch per table: %u\n",
__func__, __LINE__, k, batch);
pstr -= headLen;
pstr[0] = '\0';
k = 0;
}
return k;
}
#if STMT_IFACE_ENABLED == 1
static int32_t prepareStmtBindArrayByType(TAOS_BIND *bind,
char *dataType, int32_t dataLen, char **ptr)
{
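// Fill one TAOS_BIND slot for the given column type: generate a random value in
// the caller's scratch buffer (*ptr), set buffer/length/type, and advance *ptr
// past the bytes consumed so the next column binds behind it.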
if (0 == strncasecmp(dataType,
"BINARY", strlen("BINARY"))) {
if (dataLen > TSDB_MAX_BINARY_LEN) {
errorPrint( "binary length overflow, max size:%u\n",
(uint32_t)TSDB_MAX_BINARY_LEN);
return -1;
}
char *bind_binary = (char *)*ptr;
rand_string(bind_binary, dataLen);
bind->buffer_type = TSDB_DATA_TYPE_BINARY;
bind->buffer_length = dataLen;
bind->buffer = bind_binary;
bind->length = &bind->buffer_length;
bind->is_null = NULL;
*ptr += bind->buffer_length;
} else if (0 == strncasecmp(dataType,
"NCHAR", strlen("NCHAR"))) {
if (dataLen > TSDB_MAX_BINARY_LEN) {
errorPrint( "nchar length overflow, max size:%u\n",
(uint32_t)TSDB_MAX_BINARY_LEN);
return -1;
}
char *bind_nchar = (char *)*ptr;
rand_string(bind_nchar, dataLen);
bind->buffer_type = TSDB_DATA_TYPE_NCHAR;
bind->buffer_length = strlen(bind_nchar);
bind->buffer = bind_nchar;
bind->length = &bind->buffer_length;
bind->is_null = NULL;
*ptr += bind->buffer_length;
} else if (0 == strncasecmp(dataType,
"INT", strlen("INT"))) {
int32_t *bind_int = (int32_t *)*ptr;
*bind_int = rand_int();
bind->buffer_type = TSDB_DATA_TYPE_INT;
bind->buffer_length = sizeof(int32_t);
bind->buffer = bind_int;
bind->length = &bind->buffer_length;
bind->is_null = NULL;
*ptr += bind->buffer_length;
} else if (0 == strncasecmp(dataType,
"BIGINT", strlen("BIGINT"))) {
int64_t *bind_bigint = (int64_t *)*ptr;
*bind_bigint = rand_bigint();
bind->buffer_type = TSDB_DATA_TYPE_BIGINT;
bind->buffer_length = sizeof(int64_t);
bind->buffer = bind_bigint;
bind->length = &bind->buffer_length;
bind->is_null = NULL;
*ptr += bind->buffer_length;
} else if (0 == strncasecmp(dataType,
"FLOAT", strlen("FLOAT"))) {
float *bind_float = (float *) *ptr;
*bind_float = rand_float();
bind->buffer_type = TSDB_DATA_TYPE_FLOAT;
bind->buffer_length = sizeof(float);
bind->buffer = bind_float;
bind->length = &bind->buffer_length;
bind->is_null = NULL;
*ptr += bind->buffer_length;
} else if (0 == strncasecmp(dataType,
"DOUBLE", strlen("DOUBLE"))) {
double *bind_double = (double *)*ptr;
*bind_double = rand_double();
bind->buffer_type = TSDB_DATA_TYPE_DOUBLE;
bind->buffer_length = sizeof(double);
bind->buffer = bind_double;
bind->length = &bind->buffer_length;
bind->is_null = NULL;
*ptr += bind->buffer_length;
} else if (0 == strncasecmp(dataType,
"SMALLINT", strlen("SMALLINT"))) {
int16_t *bind_smallint = (int16_t *)*ptr;
*bind_smallint = rand_smallint();
bind->buffer_type = TSDB_DATA_TYPE_SMALLINT;
bind->buffer_length = sizeof(int16_t);
bind->buffer = bind_smallint;
bind->length = &bind->buffer_length;
bind->is_null = NULL;
*ptr += bind->buffer_length;
} else if (0 == strncasecmp(dataType,
"TINYINT", strlen("TINYINT"))) {
int8_t *bind_tinyint = (int8_t *)*ptr;
*bind_tinyint = rand_tinyint();
bind->buffer_type = TSDB_DATA_TYPE_TINYINT;
bind->buffer_length = sizeof(int8_t);
bind->buffer = bind_tinyint;
bind->length = &bind->buffer_length;
bind->is_null = NULL;
*ptr += bind->buffer_length;
} else if (0 == strncasecmp(dataType,
"BOOL", strlen("BOOL"))) {
int8_t *bind_bool = (int8_t *)*ptr;
*bind_bool = rand_bool();
bind->buffer_type = TSDB_DATA_TYPE_BOOL;
bind->buffer_length = sizeof(int8_t);
bind->buffer = bind_bool;
bind->length = &bind->buffer_length;
bind->is_null = NULL;
*ptr += bind->buffer_length;
} else if (0 == strncasecmp(dataType,
"TIMESTAMP", strlen("TIMESTAMP"))) {
int64_t *bind_ts2 = (int64_t *) *ptr;
*bind_ts2 = rand_bigint();
bind->buffer_type = TSDB_DATA_TYPE_TIMESTAMP;
bind->buffer_length = sizeof(int64_t);
bind->buffer = bind_ts2;
bind->length = &bind->buffer_length;
bind->is_null = NULL;
*ptr += bind->buffer_length;
} else {
errorPrint( "No support data type: %s\n",
dataType);
return -1;
}
return 0;
}
static int32_t prepareStmtWithoutStb(
TAOS_STMT *stmt,
char *tableName,
uint32_t batch,
int64_t insertRows,
int64_t recordFrom,
int64_t startTime)
{
int ret = taos_stmt_set_tbname(stmt, tableName);
if (ret != 0) {
errorPrint("failed to execute taos_stmt_set_tbname(%s). return 0x%x. reason: %s\n",
tableName, ret, taos_errstr(NULL));
return ret;
}
char **data_type = g_args.datatype;
char *bindArray = malloc(sizeof(TAOS_BIND) * (g_args.num_of_CPR + 1));
if (bindArray == NULL) {
errorPrint("Failed to allocate %d bind params\n",
(g_args.num_of_CPR + 1));
return -1;
}
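// For each row in the batch: bind the timestamp as column 0, then one TAOS_BIND
// per configured data column, and queue the row with taos_stmt_add_batch().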
int32_t k = 0;
for (k = 0; k < batch;) {
/* columnCount + 1 (ts) */
char data[MAX_DATA_SIZE];
memset(data, 0, MAX_DATA_SIZE);
char *ptr = data;
TAOS_BIND *bind = (TAOS_BIND *)(bindArray + 0);
int64_t *bind_ts;
bind_ts = (int64_t *)ptr;
bind->buffer_type = TSDB_DATA_TYPE_TIMESTAMP;
*bind_ts = startTime + getTSRandTail(
(int64_t)DEFAULT_TIMESTAMP_STEP, k,
g_args.disorderRatio,
g_args.disorderRange);
bind->buffer_length = sizeof(int64_t);
bind->buffer = bind_ts;
bind->length = &bind->buffer_length;
bind->is_null = NULL;
ptr += bind->buffer_length;
for (int i = 0; i < g_args.num_of_CPR; i ++) {
bind = (TAOS_BIND *)((char *)bindArray + (sizeof(TAOS_BIND) * (i + 1)));
if ( -1 == prepareStmtBindArrayByType(
bind,
data_type[i],
g_args.len_of_binary,
&ptr)) {
free(bindArray);
return -1;
}
}
taos_stmt_bind_param(stmt, (TAOS_BIND *)bindArray);
// if msg > 3MB, break
taos_stmt_add_batch(stmt);
k++;
recordFrom ++;
if (recordFrom >= insertRows) {
break;
}
}
free(bindArray);
return k;
}
static int32_t prepareStbStmt(SSuperTable *stbInfo,
TAOS_STMT *stmt,
char *tableName, uint32_t batch,
uint64_t insertRows,
uint64_t recordFrom,
int64_t startTime, char *buffer)
{
int ret = taos_stmt_set_tbname(stmt, tableName);
if (ret != 0) {
errorPrint("failed to execute taos_stmt_set_tbname(%s). return 0x%x. reason: %s\n",
tableName, ret, taos_errstr(NULL));
return ret;
}
char *bindArray = malloc(sizeof(TAOS_BIND) * (stbInfo->columnCount + 1));
if (bindArray == NULL) {
errorPrint("Failed to allocate %d bind params\n",
(stbInfo->columnCount + 1));
return -1;
}
bool tsRand;
if (0 == strncasecmp(stbInfo->dataSource, "rand", strlen("rand"))) {
tsRand = true;
} else {
tsRand = false;
}
uint32_t k;
for (k = 0; k < batch;) {
/* columnCount + 1 (ts) */
char data[MAX_DATA_SIZE];
memset(data, 0, MAX_DATA_SIZE);
char *ptr = data;
TAOS_BIND *bind = (TAOS_BIND *)(bindArray + 0);
int64_t *bind_ts;
bind_ts = (int64_t *)ptr;
bind->buffer_type = TSDB_DATA_TYPE_TIMESTAMP;
if (tsRand) {
*bind_ts = startTime + getTSRandTail(
stbInfo->timeStampStep, k,
stbInfo->disorderRatio,
stbInfo->disorderRange);
} else {
*bind_ts = startTime + stbInfo->timeStampStep * k;
}
bind->buffer_length = sizeof(int64_t);
bind->buffer = bind_ts;
bind->length = &bind->buffer_length;
bind->is_null = NULL;
ptr += bind->buffer_length;
for (int i = 0; i < stbInfo->columnCount; i ++) {
bind = (TAOS_BIND *)((char *)bindArray + (sizeof(TAOS_BIND) * (i + 1)));
if ( -1 == prepareStmtBindArrayByType(
bind,
stbInfo->columns[i].dataType,
stbInfo->columns[i].dataLen,
&ptr)) {
free(bindArray);
return -1;
}
}
taos_stmt_bind_param(stmt, (TAOS_BIND *)bindArray);
// if msg > 3MB, break
taos_stmt_add_batch(stmt);
k++;
recordFrom ++;
if (recordFrom >= insertRows) {
break;
}
}
free(bindArray);
return k;
}
#endif
static int32_t generateStbProgressiveData(
SSuperTable *superTblInfo,
char *tableName,
int64_t tableSeq,
char *dbName, char *buffer,
int64_t insertRows,
uint64_t recordFrom, int64_t startTime, int64_t *pSamplePos,
int64_t *pRemainderBufLen)
{
assert(buffer != NULL);
char *pstr = buffer;
memset(buffer, 0, *pRemainderBufLen);
int64_t headLen = generateSQLHead(tableName, tableSeq, pThreadInfo, superTblInfo,
int64_t headLen = generateStbSQLHead(
superTblInfo,
tableName, tableSeq, dbName,
buffer, *pRemainderBufLen);
if (headLen <= 0) {
......@@ -5145,12 +5632,43 @@ static int64_t generateProgressiveDataBuffer(
*pRemainderBufLen -= headLen;
int64_t dataLen;
k = generateDataTail(superTblInfo,
g_args.num_of_RPR, pstr, *pRemainderBufLen, insertRows, startFrom,
return generateStbDataTail(superTblInfo,
g_args.num_of_RPR, pstr, *pRemainderBufLen,
insertRows, recordFrom,
startTime,
pSamplePos, &dataLen);
}
return k;
static int32_t generateProgressiveDataWithoutStb(
char *tableName,
/* int64_t tableSeq, */
threadInfo *pThreadInfo, char *buffer,
int64_t insertRows,
uint64_t recordFrom, int64_t startTime, /*int64_t *pSamplePos, */
int64_t *pRemainderBufLen)
{
assert(buffer != NULL);
char *pstr = buffer;
memset(buffer, 0, *pRemainderBufLen);
int64_t headLen = generateSQLHeadWithoutStb(
tableName, pThreadInfo->db_name,
buffer, *pRemainderBufLen);
if (headLen <= 0) {
return 0;
}
pstr += headLen;
*pRemainderBufLen -= headLen;
int64_t dataLen;
return generateDataTailWithoutStb(
g_args.num_of_RPR, pstr, *pRemainderBufLen, insertRows, recordFrom,
startTime,
/*pSamplePos, */&dataLen);
}
static void printStatPerThread(threadInfo *pThreadInfo)
......@@ -5162,12 +5680,16 @@ static void printStatPerThread(threadInfo *pThreadInfo)
(double)(pThreadInfo->totalAffectedRows / (pThreadInfo->totalDelay/1000.0)));
}
// sync write interlace data
static void* syncWriteInterlace(threadInfo *pThreadInfo) {
debugPrint("[%d] %s() LN%d: ### interlace write\n",
pThreadInfo->threadID, __func__, __LINE__);
int64_t insertRows;
uint64_t interlaceRows;
uint32_t interlaceRows;
uint64_t maxSqlLen;
int64_t nTimeStampStep;
uint64_t insert_interval;
SSuperTable* superTblInfo = pThreadInfo->superTblInfo;
......@@ -5180,45 +5702,48 @@ static void* syncWriteInterlace(threadInfo *pThreadInfo) {
} else {
interlaceRows = superTblInfo->interlaceRows;
}
maxSqlLen = superTblInfo->maxSqlLen;
nTimeStampStep = superTblInfo->timeStampStep;
insert_interval = superTblInfo->insertInterval;
} else {
insertRows = g_args.num_of_DPT;
interlaceRows = g_args.interlace_rows;
maxSqlLen = g_args.max_sql_len;
nTimeStampStep = DEFAULT_TIMESTAMP_STEP;
insert_interval = g_args.insert_interval;
}
debugPrint("[%d] %s() LN%d: start_table_from=%"PRIu64" ntables=%"PRId64" insertRows=%"PRIu64"\n",
pThreadInfo->threadID, __func__, __LINE__,
pThreadInfo->start_table_from,
pThreadInfo->ntables, insertRows);
if (interlaceRows > insertRows)
interlaceRows = insertRows;
if (interlaceRows > g_args.num_of_RPR)
interlaceRows = g_args.num_of_RPR;
int insertMode;
uint32_t batchPerTbl = interlaceRows;
uint32_t batchPerTblTimes;
if (interlaceRows > 0) {
insertMode = INTERLACE_INSERT_MODE;
if ((interlaceRows > 0) && (pThreadInfo->ntables > 1)) {
batchPerTblTimes =
g_args.num_of_RPR / interlaceRows;
} else {
insertMode = PROGRESSIVE_INSERT_MODE;
batchPerTblTimes = 1;
}
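// In interlace mode each request carries batchPerTbl (= interlaceRows) rows for
// up to batchPerTblTimes tables, so one statement never exceeds num_of_RPR records.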
// TODO: prompt tbl count multple interlace rows and batch
//
uint64_t maxSqlLen = superTblInfo?superTblInfo->maxSqlLen:g_args.max_sql_len;
char* buffer = calloc(maxSqlLen, 1);
if (NULL == buffer) {
pThreadInfo->buffer = calloc(maxSqlLen, 1);
if (NULL == pThreadInfo->buffer) {
errorPrint( "%s() LN%d, Failed to alloc %"PRIu64" Bytes, reason:%s\n",
__func__, __LINE__, maxSqlLen, strerror(errno));
return NULL;
}
char tableName[TSDB_TABLE_NAME_LEN];
pThreadInfo->totalInsertRows = 0;
pThreadInfo->totalAffectedRows = 0;
int64_t nTimeStampStep = superTblInfo?superTblInfo->timeStampStep:DEFAULT_TIMESTAMP_STEP;
uint64_t insert_interval =
superTblInfo?superTblInfo->insertInterval:g_args.insert_interval;
uint64_t st = 0;
uint64_t et = UINT64_MAX;
......@@ -5227,69 +5752,98 @@ static void* syncWriteInterlace(threadInfo *pThreadInfo) {
uint64_t endTs;
uint64_t tableSeq = pThreadInfo->start_table_from;
debugPrint("[%d] %s() LN%d: start_table_from=%"PRIu64" ntables=%"PRId64" insertRows=%"PRIu64"\n",
pThreadInfo->threadID, __func__, __LINE__, pThreadInfo->start_table_from,
pThreadInfo->ntables, insertRows);
int64_t startTime = pThreadInfo->start_time;
uint64_t batchPerTbl = interlaceRows;
uint64_t batchPerTblTimes;
if ((interlaceRows > 0) && (pThreadInfo->ntables > 1)) {
batchPerTblTimes =
g_args.num_of_RPR / interlaceRows;
} else {
batchPerTblTimes = 1;
}
uint64_t generatedRecPerTbl = 0;
bool flagSleep = true;
uint64_t sleepTimeTotal = 0;
char *strInsertInto = "insert into ";
int nInsertBufLen = strlen(strInsertInto);
while(pThreadInfo->totalInsertRows < pThreadInfo->ntables * insertRows) {
if ((flagSleep) && (insert_interval)) {
st = taosGetTimestampMs();
flagSleep = false;
}
// generate data
memset(buffer, 0, maxSqlLen);
memset(pThreadInfo->buffer, 0, maxSqlLen);
uint64_t remainderBufLen = maxSqlLen;
char *pstr = buffer;
char *pstr = pThreadInfo->buffer;
int len = snprintf(pstr, nInsertBufLen + 1, "%s", strInsertInto);
int len = snprintf(pstr,
strlen(STR_INSERT_INTO) + 1, "%s", STR_INSERT_INTO);
pstr += len;
remainderBufLen -= len;
uint64_t recOfBatch = 0;
uint32_t recOfBatch = 0;
for (uint32_t i = 0; i < batchPerTblTimes; i ++) {
char tableName[TSDB_TABLE_NAME_LEN];
for (uint64_t i = 0; i < batchPerTblTimes; i ++) {
getTableName(tableName, pThreadInfo, tableSeq);
if (0 == strlen(tableName)) {
errorPrint("[%d] %s() LN%d, getTableName return null\n",
pThreadInfo->threadID, __func__, __LINE__);
free(buffer);
free(pThreadInfo->buffer);
return NULL;
}
uint64_t oldRemainderLen = remainderBufLen;
int64_t generated = generateInterlaceDataBuffer(
tableName, batchPerTbl, i, batchPerTblTimes,
int32_t generated;
if (superTblInfo) {
if (superTblInfo->iface == STMT_IFACE) {
#if STMT_IFACE_ENABLED == 1
generated = prepareStbStmt(superTblInfo,
pThreadInfo->stmt,
tableName,
batchPerTbl,
insertRows, i,
startTime,
pThreadInfo->buffer);
#else
generated = -1;
#endif
} else {
generated = generateStbInterlaceData(
superTblInfo,
tableName, batchPerTbl, i,
batchPerTblTimes,
tableSeq,
pThreadInfo, pstr,
insertRows,
startTime,
&remainderBufLen);
}
} else {
if (g_args.iface == STMT_IFACE) {
debugPrint("[%d] %s() LN%d, tableName:%s, batch:%d startTime:%"PRId64"\n",
pThreadInfo->threadID,
__func__, __LINE__,
tableName, batchPerTbl, startTime);
#if STMT_IFACE_ENABLED == 1
generated = prepareStmtWithoutStb(
pThreadInfo->stmt, tableName,
batchPerTbl,
insertRows, i,
startTime);
#else
generated = -1;
#endif
} else {
generated = generateInterlaceDataWithoutStb(
tableName, batchPerTbl,
tableSeq,
pThreadInfo->db_name, pstr,
insertRows,
startTime,
&remainderBufLen);
}
}
debugPrint("[%d] %s() LN%d, generated records is %"PRId64"\n",
debugPrint("[%d] %s() LN%d, generated records is %d\n",
pThreadInfo->threadID, __func__, __LINE__, generated);
if (generated < 0) {
errorPrint("[%d] %s() LN%d, generated records is %"PRId64"\n",
errorPrint("[%d] %s() LN%d, generated records is %d\n",
pThreadInfo->threadID, __func__, __LINE__, generated);
goto free_of_interlace;
} else if (generated == 0) {
......@@ -5298,14 +5852,14 @@ static void* syncWriteInterlace(threadInfo *pThreadInfo) {
tableSeq ++;
recOfBatch += batchPerTbl;
pstr += (oldRemainderLen - remainderBufLen);
// startTime += batchPerTbl * superTblInfo->timeStampStep;
pThreadInfo->totalInsertRows += batchPerTbl;
verbosePrint("[%d] %s() LN%d batchPerTbl=%"PRId64" recOfBatch=%"PRId64"\n",
verbosePrint("[%d] %s() LN%d batchPerTbl=%d recOfBatch=%d\n",
pThreadInfo->threadID, __func__, __LINE__,
batchPerTbl, recOfBatch);
if (insertMode == INTERLACE_INSERT_MODE) {
if (tableSeq == pThreadInfo->start_table_from + pThreadInfo->ntables) {
// turn to first table
tableSeq = pThreadInfo->start_table_from;
......@@ -5318,14 +5872,13 @@ static void* syncWriteInterlace(threadInfo *pThreadInfo) {
if (generatedRecPerTbl >= insertRows)
break;
int remainRows = insertRows - generatedRecPerTbl;
int64_t remainRows = insertRows - generatedRecPerTbl;
if ((remainRows > 0) && (batchPerTbl > remainRows))
batchPerTbl = remainRows;
if (pThreadInfo->ntables * batchPerTbl < g_args.num_of_RPR)
break;
}
}
verbosePrint("[%d] %s() LN%d generatedRecPerTbl=%"PRId64" insertRows=%"PRId64"\n",
pThreadInfo->threadID, __func__, __LINE__,
......@@ -5335,22 +5888,22 @@ static void* syncWriteInterlace(threadInfo *pThreadInfo) {
break;
}
verbosePrint("[%d] %s() LN%d recOfBatch=%"PRIu64" totalInsertRows=%"PRIu64"\n",
verbosePrint("[%d] %s() LN%d recOfBatch=%d totalInsertRows=%"PRIu64"\n",
pThreadInfo->threadID, __func__, __LINE__, recOfBatch,
pThreadInfo->totalInsertRows);
verbosePrint("[%d] %s() LN%d, buffer=%s\n",
pThreadInfo->threadID, __func__, __LINE__, buffer);
pThreadInfo->threadID, __func__, __LINE__, pThreadInfo->buffer);
startTs = taosGetTimestampMs();
if (recOfBatch == 0) {
errorPrint("[%d] %s() LN%d try inserting records of batch is %"PRIu64"\n",
errorPrint("[%d] %s() LN%d try inserting records of batch is %d\n",
pThreadInfo->threadID, __func__, __LINE__,
recOfBatch);
errorPrint("%s\n", "\tPlease check if the batch or the buffer length is proper value!\n");
goto free_of_interlace;
}
int64_t affectedRows = execInsert(pThreadInfo, buffer, recOfBatch);
int64_t affectedRows = execInsert(pThreadInfo, recOfBatch);
endTs = taosGetTimestampMs();
uint64_t delay = endTs - startTs;
......@@ -5366,9 +5919,9 @@ static void* syncWriteInterlace(threadInfo *pThreadInfo) {
pThreadInfo->totalDelay += delay;
if (recOfBatch != affectedRows) {
errorPrint("[%d] %s() LN%d execInsert insert %"PRIu64", affected rows: %"PRId64"\n%s\n",
errorPrint("[%d] %s() LN%d execInsert insert %d, affected rows: %"PRId64"\n%s\n",
pThreadInfo->threadID, __func__, __LINE__,
recOfBatch, affectedRows, buffer);
recOfBatch, affectedRows, pThreadInfo->buffer);
goto free_of_interlace;
}
......@@ -5387,8 +5940,8 @@ static void* syncWriteInterlace(threadInfo *pThreadInfo) {
et = taosGetTimestampMs();
if (insert_interval > (et - st) ) {
int sleepTime = insert_interval - (et -st);
performancePrint("%s() LN%d sleep: %d ms for insert interval\n",
uint64_t sleepTime = insert_interval - (et -st);
performancePrint("%s() LN%d sleep: %"PRId64" ms for insert interval\n",
__func__, __LINE__, sleepTime);
taosMsleep(sleepTime); // ms
sleepTimeTotal += insert_interval;
......@@ -5397,27 +5950,26 @@ static void* syncWriteInterlace(threadInfo *pThreadInfo) {
}
free_of_interlace:
tmfree(buffer);
tmfree(pThreadInfo->buffer);
printStatPerThread(pThreadInfo);
return NULL;
}
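To make the interlace path above easier to follow: each request takes interlaceRows rows from num_of_RPR / interlaceRows consecutive child tables and then moves on, wrapping back to the first table once tableSeq reaches the end of this thread's range. The following standalone sketch uses illustrative names and values (it is not taosdemo code) to show how batchPerTbl and batchPerTblTimes bound one request:

/* Round-robin batching sketch: illustrative values, not taosdemo's own code. */
#include <stdio.h>
#include <stdint.h>

int main(void) {
    uint32_t recordsPerRequest = 12;  /* stand-in for g_args.num_of_RPR        */
    uint32_t interlaceRows     = 3;   /* rows taken from one table per request */
    uint64_t tableCount        = 4;   /* tables owned by this writer thread    */
    uint64_t rowsPerTable      = 6;   /* stand-in for insertRows               */

    uint32_t tablesPerRequest  = recordsPerRequest / interlaceRows; /* like batchPerTblTimes */
    uint64_t tableSeq = 0;
    uint64_t written  = 0;

    while (written < tableCount * rowsPerTable) {
        printf("request:");
        for (uint32_t i = 0; i < tablesPerRequest; i++) {
            printf(" t%llu x%u", (unsigned long long)tableSeq, interlaceRows);
            written += interlaceRows;
            if (++tableSeq == tableCount) tableSeq = 0;  /* wrap back to the first table */
        }
        printf("\n");
    }
    return 0;
}

With recordsPerRequest = 12 and interlaceRows = 3 the sketch emits two requests of four tables each, matching how recOfBatch grows by batchPerTbl per table until the request is handed to execInsert().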
// sync insertion
/*
1 thread: 100 tables * 2000 rows/s
1 thread: 10 tables * 20000 rows/s
6 thread: 300 tables * 2000 rows/s
2 taosinsertdata , 1 thread: 10 tables * 20000 rows/s
*/
// sync insertion progressive data
static void* syncWriteProgressive(threadInfo *pThreadInfo) {
debugPrint("%s() LN%d: ### progressive write\n", __func__, __LINE__);
SSuperTable* superTblInfo = pThreadInfo->superTblInfo;
uint64_t maxSqlLen = superTblInfo?superTblInfo->maxSqlLen:g_args.max_sql_len;
int64_t timeStampStep =
superTblInfo?superTblInfo->timeStampStep:DEFAULT_TIMESTAMP_STEP;
int64_t insertRows =
(superTblInfo)?superTblInfo->insertRows:g_args.num_of_DPT;
verbosePrint("%s() LN%d insertRows=%"PRId64"\n",
__func__, __LINE__, insertRows);
char* buffer = calloc(maxSqlLen, 1);
if (NULL == buffer) {
pThreadInfo->buffer = calloc(maxSqlLen, 1);
if (NULL == pThreadInfo->buffer) {
errorPrint( "Failed to alloc %"PRIu64" Bytes, reason:%s\n",
maxSqlLen,
strerror(errno));
......@@ -5428,35 +5980,17 @@ static void* syncWriteProgressive(threadInfo *pThreadInfo) {
uint64_t startTs = taosGetTimestampMs();
uint64_t endTs;
int64_t timeStampStep =
superTblInfo?superTblInfo->timeStampStep:DEFAULT_TIMESTAMP_STEP;
/* int insert_interval =
superTblInfo?superTblInfo->insertInterval:g_args.insert_interval;
uint64_t st = 0;
uint64_t et = 0xffffffff;
*/
pThreadInfo->totalInsertRows = 0;
pThreadInfo->totalAffectedRows = 0;
pThreadInfo->samplePos = 0;
for (uint64_t tableSeq =
pThreadInfo->start_table_from; tableSeq <= pThreadInfo->end_table_to;
for (uint64_t tableSeq = pThreadInfo->start_table_from;
tableSeq <= pThreadInfo->end_table_to;
tableSeq ++) {
int64_t start_time = pThreadInfo->start_time;
int64_t insertRows = (superTblInfo)?superTblInfo->insertRows:g_args.num_of_DPT;
verbosePrint("%s() LN%d insertRows=%"PRId64"\n", __func__, __LINE__, insertRows);
for (uint64_t i = 0; i < insertRows;) {
/*
if (insert_interval) {
st = taosGetTimestampMs();
}
*/
char tableName[TSDB_TABLE_NAME_LEN];
getTableName(tableName, pThreadInfo, tableSeq);
verbosePrint("%s() LN%d: tid=%d seq=%"PRId64" tableName=%s\n",
......@@ -5464,19 +5998,57 @@ static void* syncWriteProgressive(threadInfo *pThreadInfo) {
pThreadInfo->threadID, tableSeq, tableName);
int64_t remainderBufLen = maxSqlLen;
char *pstr = buffer;
int nInsertBufLen = strlen("insert into ");
char *pstr = pThreadInfo->buffer;
int len = snprintf(pstr, nInsertBufLen + 1, "%s", "insert into ");
int len = snprintf(pstr,
strlen(STR_INSERT_INTO) + 1, "%s", STR_INSERT_INTO);
pstr += len;
remainderBufLen -= len;
int64_t generated = generateProgressiveDataBuffer(
tableName, tableSeq, pThreadInfo, pstr, insertRows,
i, start_time,
int32_t generated;
if (superTblInfo) {
if (superTblInfo->iface == STMT_IFACE) {
#if STMT_IFACE_ENABLED == 1
generated = prepareStbStmt(
superTblInfo,
pThreadInfo->stmt,
tableName,
g_args.num_of_RPR,
insertRows, i, start_time, pstr);
#else
generated = -1;
#endif
} else {
generated = generateStbProgressiveData(
superTblInfo,
tableName, tableSeq, pThreadInfo->db_name, pstr,
insertRows, i, start_time,
&(pThreadInfo->samplePos),
&remainderBufLen);
}
} else {
if (g_args.iface == STMT_IFACE) {
#if STMT_IFACE_ENABLED == 1
generated = prepareStmtWithoutStb(
pThreadInfo->stmt,
tableName,
g_args.num_of_RPR,
insertRows, i,
start_time);
#else
generated = -1;
#endif
} else {
generated = generateProgressiveDataWithoutStb(
tableName,
/* tableSeq, */
pThreadInfo, pstr, insertRows,
i, start_time,
/* &(pThreadInfo->samplePos), */
&remainderBufLen);
}
}
if (generated > 0)
i += generated;
else
......@@ -5487,13 +6059,13 @@ static void* syncWriteProgressive(threadInfo *pThreadInfo) {
startTs = taosGetTimestampMs();
int64_t affectedRows = execInsert(pThreadInfo, buffer, generated);
int32_t affectedRows = execInsert(pThreadInfo, generated);
endTs = taosGetTimestampMs();
uint64_t delay = endTs - startTs;
performancePrint("%s() LN%d, insert execution time is %"PRId64"ms\n",
__func__, __LINE__, delay);
verbosePrint("[%d] %s() LN%d affectedRows=%"PRId64"\n",
verbosePrint("[%d] %s() LN%d affectedRows=%d\n",
pThreadInfo->threadID,
__func__, __LINE__, affectedRows);
......@@ -5503,7 +6075,7 @@ static void* syncWriteProgressive(threadInfo *pThreadInfo) {
pThreadInfo->totalDelay += delay;
if (affectedRows < 0) {
errorPrint("%s() LN%d, affected rows: %"PRId64"\n",
errorPrint("%s() LN%d, affected rows: %d\n",
__func__, __LINE__, affectedRows);
goto free_of_progressive;
}
......@@ -5521,32 +6093,19 @@ static void* syncWriteProgressive(threadInfo *pThreadInfo) {
if (i >= insertRows)
break;
/*
if (insert_interval) {
et = taosGetTimestampMs();
if (insert_interval > ((et - st)) ) {
int sleep_time = insert_interval - (et -st);
performancePrint("%s() LN%d sleep: %d ms for insert interval\n",
__func__, __LINE__, sleep_time);
taosMsleep(sleep_time); // ms
}
}
*/
} // num_of_DPT
if (g_args.verbose_print) {
if ((tableSeq == pThreadInfo->ntables - 1) && superTblInfo &&
if ((g_args.verbose_print) &&
(tableSeq == pThreadInfo->ntables - 1) && (superTblInfo) &&
(0 == strncasecmp(
superTblInfo->dataSource, "sample", strlen("sample")))) {
verbosePrint("%s() LN%d samplePos=%"PRId64"\n",
__func__, __LINE__, pThreadInfo->samplePos);
}
}
} // tableSeq
free_of_progressive:
tmfree(buffer);
tmfree(pThreadInfo->buffer);
printStatPerThread(pThreadInfo);
return NULL;
}
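Both writers assemble one SQL string per request and track the remaining space through remainderBufLen, giving up on the batch when the buffer would overflow. The helper below is a hedged sketch of that bookkeeping pattern only; it is not generateStbProgressiveData() or generateInterlaceDataWithoutStb():

/* Buffer bookkeeping sketch: append one table's rows and report what is left.
 * Hypothetical helper, shown only to clarify the remainderBufLen pattern. */
#include <stdio.h>
#include <string.h>
#include <stdint.h>
#include <inttypes.h>

static int appendRows(char *pstr, int64_t *remainderBufLen,
                      const char *tableName, int64_t startTs,
                      int64_t tsStep, int rows) {
    int64_t used = 0;
    int len = snprintf(pstr, (size_t)*remainderBufLen, "%s values ", tableName);
    if (len < 0 || len >= *remainderBufLen) return -1;        /* buffer exhausted */
    pstr += len; used += len;
    for (int k = 0; k < rows; k++) {
        int64_t avail = *remainderBufLen - used;
        len = snprintf(pstr, (size_t)avail, "(%" PRId64 ", %d) ",
                       startTs + k * tsStep, k);
        if (len < 0 || len >= avail) return -1;               /* buffer exhausted */
        pstr += len; used += len;
    }
    *remainderBufLen -= used;                                 /* caller sees the space left */
    return rows;                                              /* rows generated             */
}

int main(void) {
    char buffer[256] = "INSERT INTO ";
    char *pstr = buffer + strlen(buffer);
    int64_t remainder = (int64_t)(sizeof(buffer) - strlen(buffer));
    if (appendRows(pstr, &remainder, "d0", 1500000000000LL, 1, 3) > 0) {
        printf("%s\n", buffer);
    }
    return 0;
}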
......@@ -5556,7 +6115,7 @@ static void* syncWrite(void *sarg) {
threadInfo *pThreadInfo = (threadInfo *)sarg;
SSuperTable* superTblInfo = pThreadInfo->superTblInfo;
int interlaceRows;
uint32_t interlaceRows;
if (superTblInfo) {
if ((superTblInfo->interlaceRows == 0)
......@@ -5576,7 +6135,6 @@ static void* syncWrite(void *sarg) {
// progressive mode
return syncWriteProgressive(pThreadInfo);
}
}
static void callBack(void *param, TAOS_RES *res, int code) {
......@@ -5616,9 +6174,10 @@ static void callBack(void *param, TAOS_RES *res, int code) {
&& rand_num < pThreadInfo->superTblInfo->disorderRatio) {
int64_t d = pThreadInfo->lastTs
- (taosRandom() % pThreadInfo->superTblInfo->disorderRange + 1);
generateRowData(data, d, pThreadInfo->superTblInfo);
generateStbRowData(pThreadInfo->superTblInfo, data, d);
} else {
generateRowData(data, pThreadInfo->lastTs += 1000, pThreadInfo->superTblInfo);
generateStbRowData(pThreadInfo->superTblInfo,
data, pThreadInfo->lastTs += 1000);
}
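The branch above decides, per row, whether to inject an out-of-order timestamp: with probability disorderRatio percent, the timestamp is pulled back by up to disorderRange. A compact sketch with stand-in names (taosdemo itself uses taosRandom() and the super table's disorder settings):

/* Disorder sketch: illustrative only. */
#include <stdio.h>
#include <stdint.h>
#include <stdlib.h>

static int64_t nextTimestamp(int64_t lastTs, int disorderRatio, int disorderRange) {
    if ((disorderRatio > 0) && (rand() % 100 < disorderRatio)) {
        return lastTs - (rand() % disorderRange + 1);   /* pulled-back, out-of-order row */
    }
    return lastTs;                                      /* normal, in-order row          */
}

int main(void) {
    int64_t ts = 1500000000000LL;
    for (int i = 0; i < 5; i++) {
        ts += 1000;
        printf("%lld\n", (long long)nextTimestamp(ts, 10, 1000));
    }
    return 0;
}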
pstr += sprintf(pstr, "%s", data);
pThreadInfo->counter++;
......@@ -5686,24 +6245,6 @@ static int convertHostToServAddr(char *host, uint16_t port, struct sockaddr_in *
static void startMultiThreadInsertData(int threads, char* db_name,
char* precision,SSuperTable* superTblInfo) {
pthread_t *pids = malloc(threads * sizeof(pthread_t));
assert(pids != NULL);
threadInfo *infos = malloc(threads * sizeof(threadInfo));
assert(infos != NULL);
memset(pids, 0, threads * sizeof(pthread_t));
memset(infos, 0, threads * sizeof(threadInfo));
//TAOS* taos;
//if (0 == strncasecmp(superTblInfo->insertMode, "taosc", 5)) {
// taos = taos_connect(g_Dbs.host, g_Dbs.user, g_Dbs.password, db_name, g_Dbs.port);
// if (NULL == taos) {
// printf("connect to server fail, reason: %s\n", taos_errstr(NULL));
// exit(-1);
// }
//}
int32_t timePrec = TSDB_TIME_PRECISION_MILLI;
if (0 != precision[0]) {
if (0 == strncasecmp(precision, "ms", 2)) {
......@@ -5755,17 +6296,17 @@ static void startMultiThreadInsertData(int threads, char* db_name,
}
}
TAOS* taos = taos_connect(
TAOS* taos0 = taos_connect(
g_Dbs.host, g_Dbs.user,
g_Dbs.password, db_name, g_Dbs.port);
if (NULL == taos) {
if (NULL == taos0) {
errorPrint("%s() LN%d, connect to server fail , reason: %s\n",
__func__, __LINE__, taos_errstr(NULL));
exit(-1);
}
int64_t ntables = 0;
uint64_t startFrom;
uint64_t tableFrom;
if (superTblInfo) {
int64_t limit;
......@@ -5792,7 +6333,7 @@ static void startMultiThreadInsertData(int threads, char* db_name,
}
ntables = limit;
startFrom = offset;
tableFrom = offset;
if ((superTblInfo->childTblExists != TBL_NO_EXISTS)
&& ((superTblInfo->childTblOffset + superTblInfo->childTblLimit )
......@@ -5811,23 +6352,23 @@ static void startMultiThreadInsertData(int threads, char* db_name,
limit * TSDB_TABLE_NAME_LEN);
if (superTblInfo->childTblName == NULL) {
errorPrint("%s() LN%d, alloc memory failed!\n", __func__, __LINE__);
taos_close(taos);
taos_close(taos0);
exit(-1);
}
int64_t childTblCount;
getChildNameOfSuperTableWithLimitAndOffset(
taos,
taos0,
db_name, superTblInfo->sTblName,
&superTblInfo->childTblName, &childTblCount,
limit,
offset);
} else {
ntables = g_args.num_of_tables;
startFrom = 0;
tableFrom = 0;
}
taos_close(taos);
taos_close(taos0);
int64_t a = ntables / threads;
if (a < 1) {
......@@ -5841,10 +6382,21 @@ static void startMultiThreadInsertData(int threads, char* db_name,
}
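The a/b arithmetic here, together with the per-thread assignment further down in this function, splits ntables across the threads so that the first b threads (b is computed as ntables % threads in a part of this hunk that is collapsed) each take one extra table. A small self-contained sketch of the resulting ranges:

/* Table-range split sketch: illustrative values only. */
#include <stdio.h>
#include <stdint.h>

int main(void) {
    int64_t ntables = 10;
    int     threads = 4;
    int64_t a = ntables / threads;
    if (a < 1) { threads = (int)ntables; a = 1; }
    int64_t b = (threads != 0) ? ntables % threads : 0;

    uint64_t tableFrom = 0;
    for (int i = 0; i < threads; i++) {
        int64_t  n  = (i < b) ? a + 1 : a;                   /* tables for this thread */
        uint64_t to = (i < b) ? tableFrom + a : tableFrom + a - 1;
        printf("thread %d: tables [%llu, %llu], count %lld\n",
               i, (unsigned long long)tableFrom, (unsigned long long)to, (long long)n);
        tableFrom = to + 1;                                  /* next thread starts here */
    }
    return 0;
}

With ntables = 10 and threads = 4 the sketch prints ranges [0,2], [3,5], [6,7], [8,9], which is exactly how start_table_from and end_table_to are assigned below.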
if ((superTblInfo)
&& (0 == strncasecmp(superTblInfo->insertMode, "rest", strlen("rest")))) {
if (convertHostToServAddr(g_Dbs.host, g_Dbs.port, &(g_Dbs.serv_addr)) != 0)
&& (superTblInfo->iface == REST_IFACE)) {
if (convertHostToServAddr(
g_Dbs.host, g_Dbs.port, &(g_Dbs.serv_addr)) != 0) {
exit(-1);
}
}
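For the REST interface the host name has to be resolved into g_Dbs.serv_addr before the worker threads start. convertHostToServAddr() itself is not shown in this hunk, so the function below is only an assumed equivalent (gethostbyname into a sockaddr_in); 6041 is used here as the usual RESTful port:

/* Assumed host-resolution sketch; not the actual convertHostToServAddr(). */
#include <stdio.h>
#include <string.h>
#include <stdint.h>
#include <netdb.h>
#include <netinet/in.h>
#include <arpa/inet.h>

static int hostToServAddr(const char *host, uint16_t port, struct sockaddr_in *serv_addr) {
    struct hostent *server = gethostbyname(host);   /* resolve the host name */
    if (server == NULL) return -1;
    memset(serv_addr, 0, sizeof(*serv_addr));
    serv_addr->sin_family = AF_INET;
    serv_addr->sin_port = htons(port);              /* network byte order */
    memcpy(&serv_addr->sin_addr.s_addr, server->h_addr_list[0], server->h_length);
    return 0;
}

int main(void) {
    struct sockaddr_in addr;
    if (hostToServAddr("localhost", 6041, &addr) == 0) {
        printf("resolved to %s:%u\n", inet_ntoa(addr.sin_addr), (unsigned)ntohs(addr.sin_port));
    }
    return 0;
}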
pthread_t *pids = malloc(threads * sizeof(pthread_t));
assert(pids != NULL);
threadInfo *infos = calloc(1, threads * sizeof(threadInfo));
assert(infos != NULL);
memset(pids, 0, threads * sizeof(pthread_t));
memset(infos, 0, threads * sizeof(threadInfo));
for (int i = 0; i < threads; i++) {
threadInfo *pThreadInfo = infos + i;
......@@ -5857,17 +6409,61 @@ static void startMultiThreadInsertData(int threads, char* db_name,
pThreadInfo->minDelay = UINT64_MAX;
if ((NULL == superTblInfo) ||
(0 == strncasecmp(superTblInfo->insertMode, "taosc", 5))) {
//pThreadInfo->taos = taos;
(superTblInfo->iface != REST_IFACE)) {
//t_info->taos = taos;
pThreadInfo->taos = taos_connect(
g_Dbs.host, g_Dbs.user,
g_Dbs.password, db_name, g_Dbs.port);
if (NULL == pThreadInfo->taos) {
errorPrint(
"connect to server fail from insert sub thread, reason: %s\n",
"%s() LN%d, connect to server fail from insert sub thread, reason: %s\n",
__func__, __LINE__,
taos_errstr(NULL));
free(infos);
exit(-1);
}
#if STMT_IFACE_ENABLED == 1
if ((g_args.iface == STMT_IFACE)
|| ((superTblInfo) && (superTblInfo->iface == STMT_IFACE))) {
int columnCount;
if (superTblInfo) {
columnCount = superTblInfo->columnCount;
} else {
columnCount = g_args.num_of_CPR;
}
pThreadInfo->stmt = taos_stmt_init(pThreadInfo->taos);
if (NULL == pThreadInfo->stmt) {
errorPrint(
"%s() LN%d, failed init stmt, reason: %s\n",
__func__, __LINE__,
taos_errstr(NULL));
free(pids);
free(infos);
exit(-1);
}
char buffer[3000];
char *pstr = buffer;
pstr += sprintf(pstr, "INSERT INTO ? values(?");
for (int col = 0; col < columnCount; col ++) {
pstr += sprintf(pstr, ",?");
}
pstr += sprintf(pstr, ")");
int ret = taos_stmt_prepare(pThreadInfo->stmt, buffer, 0);
if (ret != 0){
errorPrint("failed to execute taos_stmt_prepare. return 0x%x. reason: %s\n",
ret, taos_errstr(NULL));
free(pids);
free(infos);
exit(-1);
}
}
#endif
} else {
pThreadInfo->taos = NULL;
}
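The statement prepared above is only the skeleton INSERT INTO ? VALUES(?,...); the per-row binding happens later in prepareStbStmt()/prepareStmtWithoutStb(), which this hunk does not show. As a rough illustration only, assuming the TDengine 2.x statement API (taos_stmt_set_tbname, TAOS_BIND, taos_stmt_bind_param, taos_stmt_add_batch, taos_stmt_execute), binding one (timestamp, int) row against an already prepared statement could look like:

#include <taos.h>
#include <stdint.h>
#include <string.h>

/* Hypothetical helper: bind and execute a single (ts, value) row against a
 * statement prepared as "INSERT INTO ? VALUES(?,?)". Illustrative only. */
static int bindOneRow(TAOS_STMT *stmt, const char *tbName, int64_t ts, int32_t val) {
    if (taos_stmt_set_tbname(stmt, tbName) != 0) return -1;   /* choose the child table */

    TAOS_BIND params[2];
    memset(params, 0, sizeof(params));

    params[0].buffer_type   = TSDB_DATA_TYPE_TIMESTAMP;
    params[0].buffer        = &ts;
    params[0].buffer_length = sizeof(ts);
    params[0].length        = &params[0].buffer_length;
    params[0].is_null       = NULL;

    params[1].buffer_type   = TSDB_DATA_TYPE_INT;
    params[1].buffer        = &val;
    params[1].buffer_length = sizeof(val);
    params[1].length        = &params[1].buffer_length;
    params[1].is_null       = NULL;

    if (taos_stmt_bind_param(stmt, params) != 0) return -1;   /* bind the two columns */
    if (taos_stmt_add_batch(stmt) != 0) return -1;            /* queue the row        */
    return taos_stmt_execute(stmt);                           /* flush the batch      */
}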
......@@ -5875,10 +6471,10 @@ static void startMultiThreadInsertData(int threads, char* db_name,
/* if ((NULL == superTblInfo)
|| (0 == superTblInfo->multiThreadWriteOneTbl)) {
*/
pThreadInfo->start_table_from = startFrom;
pThreadInfo->start_table_from = tableFrom;
pThreadInfo->ntables = i<b?a+1:a;
pThreadInfo->end_table_to = i < b ? startFrom + a : startFrom + a - 1;
startFrom = pThreadInfo->end_table_to + 1;
pThreadInfo->end_table_to = i < b ? tableFrom + a : tableFrom + a - 1;
tableFrom = pThreadInfo->end_table_to + 1;
/* } else {
pThreadInfo->start_table_from = 0;
pThreadInfo->ntables = superTblInfo->childTblCount;
......@@ -5907,6 +6503,13 @@ static void startMultiThreadInsertData(int threads, char* db_name,
threadInfo *pThreadInfo = infos + i;
tsem_destroy(&(pThreadInfo->lock_sem));
#if STMT_IFACE_ENABLED == 1
if (pThreadInfo->stmt) {
taos_stmt_close(pThreadInfo->stmt);
}
#endif
tsem_destroy(&(pThreadInfo->lock_sem));
taos_close(pThreadInfo->taos);
debugPrint("%s() LN%d, [%d] totalInsert=%"PRIu64" totalAffected=%"PRIu64"\n",
......@@ -6182,13 +6785,12 @@ static int insertTestProcess() {
}
}
taosMsleep(1000);
// create sub threads for inserting data
//start = taosGetTimestampMs();
for (int i = 0; i < g_Dbs.dbCount; i++) {
if (g_Dbs.use_metric) {
if (g_Dbs.db[i].superTblCount > 0) {
for (int j = 0; j < g_Dbs.db[i].superTblCount; j++) {
for (uint64_t j = 0; j < g_Dbs.db[i].superTblCount; j++) {
SSuperTable* superTblInfo = &g_Dbs.db[i].superTbls[j];
......@@ -6512,15 +7114,15 @@ static int queryTestProcess() {
b = ntables % threads;
}
uint64_t startFrom = 0;
uint64_t tableFrom = 0;
for (int i = 0; i < threads; i++) {
threadInfo *pThreadInfo = infosOfSub + i;
pThreadInfo->threadID = i;
pThreadInfo->start_table_from = startFrom;
pThreadInfo->start_table_from = tableFrom;
pThreadInfo->ntables = i<b?a+1:a;
pThreadInfo->end_table_to = i < b ? startFrom + a : startFrom + a - 1;
startFrom = pThreadInfo->end_table_to + 1;
pThreadInfo->end_table_to = i < b ? tableFrom + a : tableFrom + a - 1;
tableFrom = pThreadInfo->end_table_to + 1;
pThreadInfo->taos = NULL; // TODO: workaround to use separate taos connection;
pthread_create(pidsOfSub + i, NULL, superTableQuery, pThreadInfo);
}
......@@ -6826,11 +7428,14 @@ static void *specifiedSubscribe(void *sarg) {
g_queryInfo.specifiedQueryInfo.res[pThreadInfo->threadID] = taos_consume(
g_queryInfo.specifiedQueryInfo.tsub[pThreadInfo->threadID]);
if (g_queryInfo.specifiedQueryInfo.res[pThreadInfo->threadID]) {
if (g_queryInfo.specifiedQueryInfo.result[pThreadInfo->querySeq][0] != 0) {
if (g_queryInfo.specifiedQueryInfo.result[pThreadInfo->querySeq][0]
!= 0) {
sprintf(pThreadInfo->filePath, "%s-%d",
g_queryInfo.specifiedQueryInfo.result[pThreadInfo->querySeq],
pThreadInfo->threadID);
fetchResult(g_queryInfo.specifiedQueryInfo.res[pThreadInfo->threadID], pThreadInfo);
fetchResult(
g_queryInfo.specifiedQueryInfo.res[pThreadInfo->threadID],
pThreadInfo);
}
g_queryInfo.specifiedQueryInfo.consumed[pThreadInfo->threadID] ++;
......@@ -6843,7 +7448,8 @@ static void *specifiedSubscribe(void *sarg) {
g_queryInfo.specifiedQueryInfo.consumed[pThreadInfo->threadID] = 0;
taos_unsubscribe(g_queryInfo.specifiedQueryInfo.tsub[pThreadInfo->threadID],
g_queryInfo.specifiedQueryInfo.subscribeKeepProgress);
g_queryInfo.specifiedQueryInfo.tsub[pThreadInfo->threadID] = subscribeImpl(
g_queryInfo.specifiedQueryInfo.tsub[pThreadInfo->threadID] =
subscribeImpl(
SPECIFIED_CLASS,
pThreadInfo,
g_queryInfo.specifiedQueryInfo.sql[pThreadInfo->querySeq],
......@@ -6975,17 +7581,17 @@ static int subscribeTestProcess() {
}
for (uint64_t i = 0; i < g_queryInfo.superQueryInfo.sqlCount; i++) {
uint64_t startFrom = 0;
uint64_t tableFrom = 0;
for (int j = 0; j < threads; j++) {
uint64_t seq = i * threads + j;
threadInfo *pThreadInfo = infosOfStable + seq;
pThreadInfo->threadID = seq;
pThreadInfo->querySeq = i;
pThreadInfo->start_table_from = startFrom;
pThreadInfo->start_table_from = tableFrom;
pThreadInfo->ntables = j<b?a+1:a;
pThreadInfo->end_table_to = j<b?startFrom+a:startFrom+a-1;
startFrom = pThreadInfo->end_table_to + 1;
pThreadInfo->end_table_to = j<b?tableFrom+a:tableFrom+a-1;
tableFrom = pThreadInfo->end_table_to + 1;
pThreadInfo->taos = NULL; // TODO: workaround to use separate taos connection;
pthread_create(pidsOfStable + seq,
NULL, superSubscribe, pThreadInfo);
......@@ -7065,7 +7671,7 @@ static void setParaFromArg(){
g_Dbs.threadCountByCreateTbl = g_args.num_of_threads;
g_Dbs.dbCount = 1;
g_Dbs.db[0].drop = 1;
g_Dbs.db[0].drop = true;
tstrncpy(g_Dbs.db[0].dbName, g_args.database, MAX_DB_NAME_SIZE);
g_Dbs.db[0].dbCfg.replica = g_args.replica;
......@@ -7104,7 +7710,7 @@ static void setParaFromArg(){
tstrncpy(g_Dbs.db[0].superTbls[0].childTblPrefix,
g_args.tb_prefix, MAX_TB_NAME_SIZE);
tstrncpy(g_Dbs.db[0].superTbls[0].dataSource, "rand", MAX_TB_NAME_SIZE);
tstrncpy(g_Dbs.db[0].superTbls[0].insertMode, "taosc", MAX_TB_NAME_SIZE);
g_Dbs.db[0].superTbls[0].iface = g_args.iface;
tstrncpy(g_Dbs.db[0].superTbls[0].startTimestamp,
"2017-07-14 10:40:00.000", MAX_TB_NAME_SIZE);
g_Dbs.db[0].superTbls[0].timeStampStep = DEFAULT_TIMESTAMP_STEP;
......
......@@ -87,12 +87,12 @@ static int32_t (*parseLocaltimeFp[]) (char* timestr, int64_t* time, int32_t time
int32_t taosGetTimestampSec() { return (int32_t)time(NULL); }
int32_t taosParseTime(char* timestr, int64_t* time, int32_t len, int32_t timePrec, int8_t daylight) {
int32_t taosParseTime(char* timestr, int64_t* time, int32_t len, int32_t timePrec, int8_t day_light) {
/* parse datatime string in with tz */
if (strnchr(timestr, 'T', len, false) != NULL) {
return parseTimeWithTz(timestr, time, timePrec);
} else {
return (*parseLocaltimeFp[daylight])(timestr, time, timePrec);
return (*parseLocaltimeFp[day_light])(timestr, time, timePrec);
}
}
......
......@@ -2845,6 +2845,8 @@ static void doSetTagValueInParam(void* pTable, int32_t tagColId, tVariant *tag,
if (tagColId == TSDB_TBNAME_COLUMN_INDEX) {
val = tsdbGetTableName(pTable);
assert(val != NULL);
} else if (tagColId == TSDB_BLOCK_DIST_COLUMN_INDEX) {
val = NULL;
} else {
val = tsdbGetTableTagVal(pTable, tagColId, type, bytes);
}
......
......@@ -68,7 +68,7 @@ int tsdbCreateTable(STsdbRepo *repo, STableCfg *pCfg) {
TABLE_CHAR_NAME(pMeta->tables[tid]), TABLE_TID(pMeta->tables[tid]), TABLE_UID(pMeta->tables[tid]));
return 0;
} else {
tsdbError("vgId:%d table %s at tid %d uid %" PRIu64
tsdbInfo("vgId:%d table %s at tid %d uid %" PRIu64
" exists, replace it with new table, this can be not reasonable",
REPO_ID(pRepo), TABLE_CHAR_NAME(pMeta->tables[tid]), TABLE_TID(pMeta->tables[tid]),
TABLE_UID(pMeta->tables[tid]));
......
......@@ -367,20 +367,21 @@ static STsdbQueryHandle* tsdbQueryTablesImpl(STsdbRepo* tsdb, STsdbQueryCond* pC
goto out_of_memory;
}
assert(pCond != NULL && pCond->numOfCols > 0 && pMemRef != NULL);
assert(pCond != NULL && pMemRef != NULL);
if (ASCENDING_TRAVERSE(pCond->order)) {
assert(pQueryHandle->window.skey <= pQueryHandle->window.ekey);
} else {
assert(pQueryHandle->window.skey >= pQueryHandle->window.ekey);
}
if (pCond->numOfCols > 0) {
// allocate buffer in order to load data blocks from file
pQueryHandle->statis = calloc(pCond->numOfCols, sizeof(SDataStatis));
if (pQueryHandle->statis == NULL) {
goto out_of_memory;
}
pQueryHandle->pColumns = taosArrayInit(pCond->numOfCols, sizeof(SColumnInfoData)); // todo: use list instead of array?
pQueryHandle->pColumns =
taosArrayInit(pCond->numOfCols, sizeof(SColumnInfoData)); // todo: use list instead of array?
if (pQueryHandle->pColumns == NULL) {
goto out_of_memory;
}
......@@ -397,10 +398,8 @@ static STsdbQueryHandle* tsdbQueryTablesImpl(STsdbRepo* tsdb, STsdbQueryCond* pC
pQueryHandle->statis[i].colId = colInfo.info.colId;
}
if (pCond->numOfCols > 0) {
pQueryHandle->defaultLoadColumn = getDefaultLoadColumns(pQueryHandle, true);
}
STsdbMeta* pMeta = tsdbGetMeta(tsdb);
assert(pMeta != NULL);
......
......@@ -91,18 +91,18 @@ static void vnodeIncRef(void *ptNode) {
}
void *vnodeAcquire(int32_t vgId) {
SVnodeObj **ppVnode = NULL;
SVnodeObj *pVnode = NULL;
if (tsVnodesHash != NULL) {
ppVnode = taosHashGetClone(tsVnodesHash, &vgId, sizeof(int32_t), vnodeIncRef, NULL, sizeof(void *));
taosHashGetClone(tsVnodesHash, &vgId, sizeof(int32_t), vnodeIncRef, &pVnode, sizeof(void *));
}
if (ppVnode == NULL || *ppVnode == NULL) {
if (pVnode == NULL) {
terrno = TSDB_CODE_VND_INVALID_VGROUP_ID;
vDebug("vgId:%d, not exist", vgId);
return NULL;
}
return *ppVnode;
return pVnode;
}
void vnodeRelease(void *vparam) {
......
......@@ -303,6 +303,17 @@ static int32_t vnodeWriteToWQueueImp(SVWriteMsg *pWrite) {
}
int32_t vnodeWriteToWQueue(void *vparam, void *wparam, int32_t qtype, void *rparam) {
SVnodeObj *pVnode = vparam;
if (qtype == TAOS_QTYPE_RPC) {
if (!vnodeInReadyStatus(pVnode)) {
return TSDB_CODE_APP_NOT_READY; // it may be in deleting or closing state
}
if (pVnode->role != TAOS_SYNC_ROLE_MASTER) {
return TSDB_CODE_APP_NOT_READY;
}
}
SVWriteMsg *pWrite = vnodeBuildVWriteMsg(vparam, wparam, qtype, rparam);
if (pWrite == NULL) {
assert(terrno != 0);
......
......@@ -37,6 +37,7 @@ import requests
import gc
import taos
from .shared.types import TdColumns, TdTags
# from crash_gen import ServiceManager, TdeInstance, TdeSubProcess
......@@ -160,6 +161,7 @@ class WorkerThread:
Logging.debug("[TRD] Thread Coordinator not running any more, worker thread now stopping...")
break
# Before we fetch the task and run it, let's ensure we properly "use" the database (not needed any more)
try:
if (Config.getConfig().per_thread_db_connection): # most likely TRUE
......@@ -1362,9 +1364,12 @@ class Task():
Progress.emit(Progress.ACCEPTABLE_ERROR)
self._err = err
else: # not an acceptable error
errMsg = "[=] Unexpected Taos library exception ({}): errno=0x{:X}, msg: {}, SQL: {}".format(
shortTid = threading.get_ident() % 10000
errMsg = "[=] Unexpected Taos library exception ({}): errno=0x{:X}, thread={}, msg: {}, SQL: {}".format(
self.__class__.__name__,
errno2, err, wt.getDbConn().getLastSql())
errno2,
shortTid,
err, wt.getDbConn().getLastSql())
self.logDebug(errMsg)
if Config.getConfig().debug:
# raise # so that we see full stack
......@@ -1411,11 +1416,15 @@ class Task():
def lockTable(self, ftName): # full table name
# print(" <<" + ftName + '_', end="", flush=True)
with Task._lock:
if not ftName in Task._tableLocks:
with Task._lock: # SHORT lock! so we only protect lock creation
if not ftName in Task._tableLocks: # Create new lock and add to list, if needed
Task._tableLocks[ftName] = threading.Lock()
Task._tableLocks[ftName].acquire()
# No lock protection, anybody can do this any time
lock = Task._tableLocks[ftName]
# Logging.info("Acquiring lock: {}, {}".format(ftName, lock))
lock.acquire()
# Logging.info("Acquiring lock successful: {}".format(lock))
def unlockTable(self, ftName):
# print('_' + ftName + ">> ", end="", flush=True)
......@@ -1425,7 +1434,13 @@ class Task():
lock = Task._tableLocks[ftName]
if not lock.locked():
raise RuntimeError("Corrupte state, already unlocked")
# Important note, we want to protect unlocking under the task level
# locking, because we don't want the lock to be deleted (maybe in the future)
# while we unlock it
# Logging.info("Releasing lock: {}".format(lock))
lock.release()
# Logging.info("Releasing lock successful: {}".format(lock))
class ExecutionStats:
......@@ -1696,6 +1711,11 @@ class TdSuperTable:
return dbc.query("SELECT * FROM {}.{}".format(self._dbName, self._stName)) > 0
def ensureRegTable(self, task: Optional[Task], dbc: DbConn, regTableName: str):
'''
Make sure a regular table exists for this super table, creating it if necessary.
If there is an associated "Task" that wants to do this, "lock" this table so that
others don't access it while we create it.
'''
dbName = self._dbName
sql = "select tbname from {}.{} where tbname in ('{}')".format(dbName, self._stName, regTableName)
if dbc.query(sql) >= 1 : # reg table exists already
......@@ -1703,18 +1723,24 @@ class TdSuperTable:
# acquire a lock first, so as to be able to *verify*. More details in TD-1471
fullTableName = dbName + '.' + regTableName
if task is not None: # TODO: what happens if we don't lock the table
task.lockTable(fullTableName)
if task is not None: # Sometimes this operation is requested on behalf of a "task"
# Logging.info("Locking table for creation: {}".format(fullTableName))
task.lockTable(fullTableName) # in which case we'll lock this table to ensure serialized access
# Logging.info("Table locked for creation".format(fullTableName))
Progress.emit(Progress.CREATE_TABLE_ATTEMPT) # ATTEMPT to create a new table
# print("(" + fullTableName[-3:] + ")", end="", flush=True)
try:
sql = "CREATE TABLE {} USING {}.{} tags ({})".format(
fullTableName, dbName, self._stName, self._getTagStrForSql(dbc)
)
# Logging.info("Creating regular with SQL: {}".format(sql))
dbc.execute(sql)
# Logging.info("Regular table created: {}".format(sql))
finally:
if task is not None:
# Logging.info("Unlocking table after creation: {}".format(fullTableName))
task.unlockTable(fullTableName) # no matter what
# Logging.info("Table unlocked after creation: {}".format(fullTableName))
def _getTagStrForSql(self, dbc) :
tags = self._getTags(dbc)
......@@ -2011,9 +2037,30 @@ class TaskAddData(StateTransitionTask):
def canBeginFrom(cls, state: AnyState):
return state.canAddData()
def _lockTableIfNeeded(self, fullTableName, extraMsg = ''):
if Config.getConfig().verify_data:
# Logging.info("Locking table: {}".format(fullTableName))
self.lockTable(fullTableName)
# Logging.info("Table locked {}: {}".format(extraMsg, fullTableName))
# print("_w" + str(nextInt % 100), end="", flush=True) # Trace what was written
else:
# Logging.info("Skipping locking table")
pass
def _unlockTableIfNeeded(self, fullTableName):
if Config.getConfig().verify_data:
# Logging.info("Unlocking table: {}".format(fullTableName))
self.unlockTable(fullTableName)
# Logging.info("Table unlocked: {}".format(fullTableName))
else:
pass
# Logging.info("Skipping unlocking table")
def _addDataInBatch(self, db, dbc, regTableName, te: TaskExecutor):
numRecords = self.LARGE_NUMBER_OF_RECORDS if Config.getConfig().larger_data else self.SMALL_NUMBER_OF_RECORDS
fullTableName = db.getName() + '.' + regTableName
self._lockTableIfNeeded(fullTableName, 'batch')
sql = "INSERT INTO {} VALUES ".format(fullTableName)
for j in range(numRecords): # number of records per table
......@@ -2021,51 +2068,60 @@ class TaskAddData(StateTransitionTask):
nextTick = db.getNextTick()
nextColor = db.getNextColor()
sql += "('{}', {}, '{}');".format(nextTick, nextInt, nextColor)
# Logging.info("Adding data in batch: {}".format(sql))
try:
dbc.execute(sql)
finally:
# Logging.info("Data added in batch: {}".format(sql))
self._unlockTableIfNeeded(fullTableName)
def _addData(self, db: Database, dbc, regTableName, te: TaskExecutor): # implied: NOT in batches
numRecords = self.LARGE_NUMBER_OF_RECORDS if Config.getConfig().larger_data else self.SMALL_NUMBER_OF_RECORDS
for j in range(numRecords): # number of records per table
nextInt = db.getNextInt()
intToWrite = db.getNextInt()
nextTick = db.getNextTick()
nextColor = db.getNextColor()
if Config.getConfig().record_ops:
self.prepToRecordOps()
if self.fAddLogReady is None:
raise CrashGenError("Unexpected empty fAddLogReady")
self.fAddLogReady.write("Ready to write {} to {}\n".format(nextInt, regTableName))
self.fAddLogReady.write("Ready to write {} to {}\n".format(intToWrite, regTableName))
self.fAddLogReady.flush()
os.fsync(self.fAddLogReady.fileno())
# TODO: too ugly trying to lock the table reliably, refactor...
fullTableName = db.getName() + '.' + regTableName
if Config.getConfig().verify_data:
self.lockTable(fullTableName)
# print("_w" + str(nextInt % 100), end="", flush=True) # Trace what was written
self._lockTableIfNeeded(fullTableName) # so that we can verify the read-back. TODO: deal with exceptions before unlock
try:
sql = "INSERT INTO {} VALUES ('{}', {}, '{}');".format( # removed: tags ('{}', {})
fullTableName,
# ds.getFixedSuperTableName(),
# ds.getNextBinary(), ds.getNextFloat(),
nextTick, nextInt, nextColor)
nextTick, intToWrite, nextColor)
# Logging.info("Adding data: {}".format(sql))
dbc.execute(sql)
# Logging.info("Data added: {}".format(sql))
intWrote = intToWrite
# Quick hack, attach an update statement here. TODO: create an "update" task
if (not Config.getConfig().use_shadow_db) and Dice.throw(5) == 0: # 1 in N chance, plus not using shadow DB
nextInt = db.getNextInt()
intToUpdate = db.getNextInt() # Updated, but should not succeed
nextColor = db.getNextColor()
sql = "INSERt INTO {} VALUES ('{}', {}, '{}');".format( # "INSERt" means "update" here
fullTableName,
nextTick, nextInt, nextColor)
nextTick, intToUpdate, nextColor)
# sql = "UPDATE {} set speed={}, color='{}' WHERE ts='{}'".format(
# fullTableName, db.getNextInt(), db.getNextColor(), nextTick)
dbc.execute(sql)
intWrote = intToUpdate # We updated, seems TDengine non-cluster accepts this.
except: # Any exception at all
if Config.getConfig().verify_data:
self.unlockTable(fullTableName)
self._unlockTableIfNeeded(fullTableName)
raise
# Now read it back and verify, we might encounter an error if table is dropped
......@@ -2073,33 +2129,41 @@ class TaskAddData(StateTransitionTask):
try:
readBack = dbc.queryScalar("SELECT speed from {}.{} WHERE ts='{}'".
format(db.getName(), regTableName, nextTick))
if readBack != nextInt :
if readBack != intWrote :
raise taos.error.ProgrammingError(
"Failed to read back same data, wrote: {}, read: {}"
.format(nextInt, readBack), 0x999)
.format(intWrote, readBack), 0x999)
except taos.error.ProgrammingError as err:
errno = Helper.convertErrno(err.errno)
if errno in [CrashGenError.INVALID_EMPTY_RESULT, CrashGenError.INVALID_MULTIPLE_RESULT] : # not a single result
if errno == CrashGenError.INVALID_EMPTY_RESULT: # empty result
raise taos.error.ProgrammingError(
"Failed to read back same data for tick: {}, wrote: {}, read: {}"
.format(nextTick, nextInt, "Empty Result" if errno == CrashGenError.INVALID_EMPTY_RESULT else "Multiple Result"),
"Failed to read back same data for tick: {}, wrote: {}, read: EMPTY"
.format(nextTick, intWrote),
errno)
elif errno == CrashGenError.INVALID_MULTIPLE_RESULT : # multiple results
raise taos.error.ProgrammingError(
"Failed to read back same data for tick: {}, wrote: {}, read: MULTIPLE RESULTS"
.format(nextTick, intWrote),
errno)
elif errno in [0x218, 0x362]: # table doesn't exist
# do nothing
dummy = 0
pass
else:
# Re-throw otherwise
raise
finally:
self.unlockTable(fullTableName) # Unlock the table no matter what
self._unlockTableIfNeeded(fullTableName) # Quite ugly, refactor lock/unlock
# Done with read-back verification, unlock the table now
else:
self._unlockTableIfNeeded(fullTableName)
# Successfully wrote the data into the DB, let's record it somehow
te.recordDataMark(nextInt)
te.recordDataMark(intWrote)
if Config.getConfig().record_ops:
if self.fAddLogDone is None:
raise CrashGenError("Unexpected empty fAddLogDone")
self.fAddLogDone.write("Wrote {} to {}\n".format(nextInt, regTableName))
self.fAddLogDone.write("Wrote {} to {}\n".format(intWrote, regTableName))
self.fAddLogDone.flush()
os.fsync(self.fAddLogDone.fileno())
......@@ -2137,15 +2201,16 @@ class TaskAddData(StateTransitionTask):
class ThreadStacks: # stack info for all threads
def __init__(self):
self._allStacks = {}
allFrames = sys._current_frames()
for th in threading.enumerate():
allFrames = sys._current_frames() # All current stack frames
for th in threading.enumerate(): # For each thread
if th.ident is None:
continue
stack = traceback.extract_stack(allFrames[th.ident])
self._allStacks[th.native_id] = stack
stack = traceback.extract_stack(allFrames[th.ident]) # Get stack for a thread
shortTid = th.ident % 10000
self._allStacks[shortTid] = stack # Was using th.native_id
def print(self, filteredEndName = None, filterInternal = False):
for thNid, stack in self._allStacks.items(): # for each thread, stack frames top to bottom
for tIdent, stack in self._allStacks.items(): # for each thread, stack frames top to bottom
lastFrame = stack[-1]
if filteredEndName: # we need to filter out stacks that match this name
if lastFrame.name == filteredEndName : # end did not match
......@@ -2157,7 +2222,7 @@ class ThreadStacks: # stack info for all threads
'__init__']: # the thread that extracted the stack
continue # ignore
# Now print
print("\n<----- Thread Info for LWP/ID: {} (most recent call last) <-----".format(thNid))
print("\n<----- Thread Info for LWP/ID: {} (most recent call last) <-----".format(tIdent))
stackFrame = 0
for frame in stack: # was using: reversed(stack)
# print(frame)
......@@ -2376,7 +2441,7 @@ class MainExec:
action='store',
default=0,
type=int,
help='Maximum number of DBs to keep, set to disable dropping DB. (default: 0)')
help='Number of DBs to use, set to disable dropping DB. (default: 0)')
parser.add_argument(
'-c',
'--connector-type',
......
......@@ -179,7 +179,7 @@ quorum 2
def getServiceCmdLine(self): # to start the instance
if Config.getConfig().track_memory_leaks:
Logging.info("Invoking VALGRIND on service...")
return ['exec /usr/bin/valgrind', '--leak-check=yes', self.getExecFile(), '-c', self.getCfgDir()]
return ['exec valgrind', '--leak-check=yes', self.getExecFile(), '-c', self.getCfgDir()]
else:
# TODO: move "exec -c" into Popen(), we can both "use shell" and NOT fork so ask to lose kill control
return ["exec " + self.getExecFile(), '-c', self.getCfgDir()] # used in subproce.Popen()
......@@ -310,7 +310,7 @@ class TdeSubProcess:
# print("Starting TDengine with env: ", myEnv.items())
print("Starting TDengine: {}".format(cmdLine))
return Popen(
ret = Popen(
' '.join(cmdLine), # ' '.join(cmdLine) if useShell else cmdLine,
shell=True, # Always use shell, since we need to pass ENV vars
stdout=PIPE,
......@@ -318,6 +318,10 @@ class TdeSubProcess:
close_fds=ON_POSIX,
env=myEnv
) # had text=True, which interferred with reading EOF
time.sleep(0.01) # very brief wait, then let's check if sub process started successfully.
if ret.poll():
raise CrashGenError("Sub process failed to start with command line: {}".format(cmdLine))
return ret
STOP_SIGNAL = signal.SIGINT # signal.SIGKILL/SIGINT # What signal to use (in kill) to stop a taosd process?
SIG_KILL_RETCODE = 137 # ref: https://stackoverflow.com/questions/43268156/process-finished-with-exit-code-137-in-pycharm
......@@ -614,7 +618,7 @@ class ServiceManager:
# Find if there's already a taosd service, and then kill it
for proc in psutil.process_iter():
if proc.name() == 'taosd':
if proc.name() == 'taosd' or proc.name() == 'memcheck-amd64-': # Regular or under Valgrind
Logging.info("Killing an existing TAOSD process in 2 seconds... press CTRL-C to interrupt")
time.sleep(2.0)
proc.kill()
......
......@@ -35,7 +35,8 @@ class LoggingFilter(logging.Filter):
class MyLoggingAdapter(logging.LoggerAdapter):
def process(self, msg, kwargs):
return "[{:04d}] {}".format(threading.get_ident() % 10000, msg), kwargs
shortTid = threading.get_ident() % 10000
return "[{:04d}] {}".format(shortTid, msg), kwargs
# return '[%s] %s' % (self.extra['connid'], msg), kwargs
......
......@@ -31,7 +31,7 @@ python3 ./test.py -f table/column_name.py
python3 ./test.py -f table/column_num.py
python3 ./test.py -f table/db_table.py
python3 ./test.py -f table/create_sensitive.py
#python3 ./test.py -f table/tablename-boundary.py
python3 ./test.py -f table/tablename-boundary.py
python3 ./test.py -f table/max_table_length.py
python3 ./test.py -f table/alter_column.py
python3 ./test.py -f table/boundary.py
......@@ -332,5 +332,5 @@ python3 ./test.py -f tag_lite/alter_tag.py
python3 test.py -f tools/taosdemoAllTest/taosdemoTestInsertWithJson.py
python3 test.py -f tools/taosdemoAllTest/taosdemoTestQueryWithJson.py
python3 ./test.py -f tag_lite/drop_auto_create.py
#======================p4-end===============
......@@ -82,6 +82,8 @@ class TDTestCase:
tdSql.execute("import into tbx file \'%s\'"%(self.csvfile))
tdSql.query('select * from tbx')
tdSql.checkRows(self.rows)
#TD-4447 import the same csv twice
tdSql.execute("import into tbx file \'%s\'"%(self.csvfile))
def stop(self):
self.destroyCSVFile()
......
......@@ -37,6 +37,10 @@ class TDTestCase:
tdSql.error("insert into tb values (now, 'taosdata001')")
tdSql.error("insert into tb(now, 😀)")
tdSql.query("select * from tb")
tdSql.checkRows(2)
def stop(self):
tdSql.close()
tdLog.success("%s successfully executed" % __file__)
......
......@@ -14,6 +14,13 @@ class TDTestCase:
tdLog.debug("start to execute %s" % __file__)
tdSql.init(conn.cursor(), logSql)
self.ts = 1622100000000
def get_random_string(self, length):
letters = string.ascii_lowercase
result_str = ''.join(random.choice(letters) for i in range(length))
return result_str
def run(self):
tdSql.prepare()
......@@ -24,20 +31,63 @@ class TDTestCase:
shell=True)) - 1
tdLog.info("table name max length is %d" % tableNameMaxLen)
chars = string.ascii_uppercase + string.ascii_lowercase
tb_name = ''.join(random.choices(chars, k=tableNameMaxLen))
tb_name = ''.join(random.choices(chars, k=tableNameMaxLen + 1))
tdLog.info('tb_name length %d' % len(tb_name))
tdLog.info('create table %s (ts timestamp, value int)' % tb_name)
tdSql.error(
'create table %s (ts timestamp, speed binary(4089))' %
tb_name)
tdSql.error('create table %s (ts timestamp, speed binary(4089))' % tb_name)
tb_name = ''.join(random.choices(chars, k=191))
tb_name = ''.join(random.choices(chars, k=tableNameMaxLen))
tdLog.info('tb_name length %d' % len(tb_name))
tdLog.info('create table %s (ts timestamp, value int)' % tb_name)
tdSql.execute(
'create table %s (ts timestamp, speed binary(4089))' %
tb_name)
db_name = self.get_random_string(33)
tdSql.error("create database %s" % db_name)
db_name = self.get_random_string(32)
tdSql.execute("create database %s" % db_name)
tdSql.execute("use %s" % db_name)
tb_name = self.get_random_string(193)
tdSql.error("create table %s(ts timestamp, val int)" % tb_name)
tb_name = self.get_random_string(192)
tdSql.execute("create table %s.%s(ts timestamp, val int)" % (db_name, tb_name))
tdSql.query("show %s.tables" % db_name)
tdSql.checkRows(1)
tdSql.checkData(0, 0, tb_name)
tdSql.execute("insert into %s.%s values(now, 1)" % (db_name, tb_name))
tdSql.query("select * from %s.%s" %(db_name, tb_name))
tdSql.checkRows(1)
db_name = self.get_random_string(32)
tdSql.execute("create database %s update 1" % db_name)
stb_name = self.get_random_string(192)
tdSql.execute("create table %s.%s(ts timestamp, val int) tags(id int)" % (db_name, stb_name))
tb_name1 = self.get_random_string(192)
tdSql.execute("insert into %s.%s using %s.%s tags(1) values(%d, 1)(%d, 2)(%d, 3)" % (db_name, tb_name1, db_name, stb_name, self.ts, self.ts + 1, self.ts + 2))
tb_name2 = self.get_random_string(192)
tdSql.execute("insert into %s.%s using %s.%s tags(2) values(%d, 1)(%d, 2)(%d, 3)" % (db_name, tb_name2, db_name, stb_name, self.ts, self.ts + 1, self.ts + 2))
tdSql.query("show %s.tables" % db_name)
tdSql.checkRows(2)
tdSql.query("select * from %s.%s" % (db_name, stb_name))
tdSql.checkRows(6)
tdSql.execute("insert into %s.%s using %s.%s tags(1) values(%d, null)" % (db_name, tb_name1, db_name, stb_name, self.ts))
tdSql.query("select * from %s.%s" % (db_name, stb_name))
tdSql.checkRows(6)
def stop(self):
tdSql.close()
tdLog.success("%s successfully executed" % __file__)
......
###################################################################
# Copyright (c) 2016 by TAOS Technologies, Inc.
# All rights reserved.
#
# This file is proprietary and confidential to TAOS Technologies.
# No part of this file may be reproduced, stored, transmitted,
# disclosed or used in any form or by any means other than as
# expressly provided by the written permission from Jianhui Tao
#
###################################################################
# -*- coding: utf-8 -*-
import sys
from util.log import *
from util.cases import *
from util.sql import *
class TDTestCase:
def init(self, conn, logSql):
tdLog.debug("start to execute %s" % __file__)
tdSql.init(conn.cursor(), logSql)
def run(self):
tdSql.prepare()
tdSql.execute('create table m1(ts timestamp, k int) tags(a binary(12), b int, c double);')
tdSql.execute('insert into tm0 using m1(b,c) tags(1, 99) values(now, 1);')
tdSql.execute('insert into tm1 using m1(b,c) tags(2, 100) values(now, 2);')
tdLog.info("2 rows inserted")
tdSql.query('select * from m1;')
tdSql.checkRows(2)
tdSql.query('select *,tbname from m1;')
tdSql.execute("drop table tm0; ")
tdSql.query('select * from m1')
tdSql.checkRows(1)
def stop(self):
tdSql.close()
tdLog.success("%s successfully executed" % __file__)
tdCases.addWindows(__file__, TDTestCase())
tdCases.addLinux(__file__, TDTestCase())
......@@ -33,6 +33,7 @@ run general/compute/percentile.sim
run general/compute/stddev.sim
run general/compute/sum.sim
run general/compute/top.sim
run general/compute/block_dist.sim
run general/db/alter_option.sim
run general/db/alter_tables_d2.sim
run general/db/alter_tables_v1.sim
......
system sh/stop_dnodes.sh
system sh/deploy.sh -n dnode1 -i 1
system sh/cfg.sh -n dnode1 -c walLevel -v 1
system sh/exec.sh -n dnode1 -s start
sleep 2000
sql connect
$dbPrefix = m_di_db
$tbPrefix = m_di_tb
$mtPrefix = m_di_mt
$ntPrefix = m_di_nt
$tbNum = 1
$rowNum = 2000
print =============== step1
$i = 0
$db = $dbPrefix . $i
$mt = $mtPrefix . $i
$nt = $ntPrefix . $i
sql drop database $db -x step1
step1:
sql create database $db
sql use $db
sql create table $mt (ts timestamp, tbcol int) TAGS(tgcol int)
$i = 0
while $i < $tbNum
$tb = $tbPrefix . $i
sql create table $tb using $mt tags( $i )
$x = 0
while $x < $rowNum
$cc = $x * 60000
$ms = 1601481600000 + $cc
sql insert into $tb values ($ms , $x )
$x = $x + 1
endw
$i = $i + 1
endw
sql create table $nt (ts timestamp, tbcol int)
$x = 0
while $x < $rowNum
$cc = $x * 60000
$ms = 1601481600000 + $cc
sql insert into $nt values ($ms , $x )
$x = $x + 1
endw
sleep 100
print =============== step2
$i = 0
$tb = $tbPrefix . $i
sql select _block_dist() from $tb
if $rows != 1 then
print expect 1, actual:$rows
return -1
endi
print =============== step3
$i = 0
$mt = $mtPrefix . $i
sql select _block_dist() from $mt
if $rows != 1 then
print expect 1, actual:$rows
return -1
endi
print =============== step4
$i = 0
$nt = $ntPrefix . $i
sql select _block_dist() from $nt
if $rows != 1 then
print expect 1, actual:$rows
return -1
endi
print =============== clear
sql drop database $db
sql show databases
if $rows != 0 then
return -1
endi
system sh/exec.sh -n dnode1 -s stop -x SIGINT
\ No newline at end of file
......@@ -14,3 +14,4 @@ run general/compute/percentile.sim
run general/compute/stddev.sim
run general/compute/sum.sim
run general/compute/top.sim
run general/compute/block_dist.sim
......@@ -32,6 +32,7 @@ run general/compute/percentile.sim
run general/compute/stddev.sim
run general/compute/sum.sim
run general/compute/top.sim
run general/compute/block_dist.sim
run general/db/alter_option.sim
run general/db/alter_tables_d2.sim
run general/db/alter_tables_v1.sim
......