未验证 提交 92615c6d 编写于 作者: Y Yang Zhao 提交者: GitHub

[TD-10689]<improve>optimize memory using for arm platform(master) (#8381)

* [TD-10689]optimize memory using for arm platform

* [TD-10689]optimize memory using for arm platform(master)

* fix

* remove empty line

* remove escape Char

* resolve conflict:

* remove unused variables
Co-authored-by: NShuduo Sang <sdsang@taosdata.com>
上级 135a70e0
......@@ -75,6 +75,7 @@ extern char configDir[];
#define MAX_DATA_SIZE (16*TSDB_MAX_COLUMNS)+20 // max record len: 16*MAX_COLUMNS, timestamp string and ,('') need extra space
#define OPT_ABORT 1 /* –abort */
#define MAX_FILE_NAME_LEN 256 // max file name length on linux is 255.
#define MAX_PATH_LEN 4096
#define DEFAULT_START_TIME 1500000000000
......@@ -244,6 +245,7 @@ typedef struct SArguments_S {
uint64_t insert_interval;
uint64_t timestamp_step;
int64_t query_times;
int64_t prepared_rand;
uint32_t interlaceRows;
uint32_t reqPerReq; // num_of_records_per_req
uint64_t max_sql_len;
......@@ -303,6 +305,7 @@ typedef struct SSuperTable_S {
uint64_t lenOfTagOfOneRow;
char* sampleDataBuf;
bool useSampleTs;
uint32_t tagSource; // 0: rand, 1: tag sample
char* tagDataBuf;
......@@ -363,7 +366,7 @@ typedef struct SDataBase_S {
bool drop; // 0: use exists, 1: if exists, drop then new create
SDbCfg dbCfg;
uint64_t superTblCount;
SSuperTable superTbls[MAX_SUPER_TABLE_COUNT];
SSuperTable* superTbls;
} SDataBase;
typedef struct SDbs_S {
......@@ -382,12 +385,11 @@ typedef struct SDbs_S {
uint32_t threadCount;
uint32_t threadCountForCreateTbl;
uint32_t dbCount;
SDataBase db[MAX_DB_COUNT];
// statistics
uint64_t totalInsertRows;
uint64_t totalAffectedRows;
SDataBase* db;
} SDbs;
typedef struct SpecifiedQueryInfo_S {
......@@ -501,6 +503,7 @@ typedef struct SThreadInfo_S {
uint64_t querySeq; // sequence number of sql command
TAOS_SUB* tsub;
int sockfd;
} threadInfo;
#ifdef WINDOWS
......@@ -580,8 +583,7 @@ static void prompt();
static int createDatabasesAndStables();
static void createChildTables();
static int queryDbExec(TAOS *taos, char *command, QUERY_TYPE type, bool quiet);
static int postProceSql(char *host, struct sockaddr_in *pServAddr,
uint16_t port, char* sqlstr, threadInfo *pThreadInfo);
static int postProceSql(char *host, uint16_t port, char* sqlstr, threadInfo *pThreadInfo);
static int64_t getTSRandTail(int64_t timeStampStep, int32_t seq,
int disorderRatio, int disorderRange);
static bool getInfoFromJsonFile(char* file);
......@@ -590,12 +592,12 @@ static int regexMatch(const char *s, const char *reg, int cflags);
/* ************ Global variables ************ */
int32_t g_randint[MAX_PREPARED_RAND];
uint32_t g_randuint[MAX_PREPARED_RAND];
int64_t g_randbigint[MAX_PREPARED_RAND];
uint64_t g_randubigint[MAX_PREPARED_RAND];
float g_randfloat[MAX_PREPARED_RAND];
double g_randdouble[MAX_PREPARED_RAND];
int32_t* g_randint;
uint32_t* g_randuint;
int64_t* g_randbigint;
uint64_t* g_randubigint;
float* g_randfloat;
double* g_randdouble;
char *g_randbool_buff = NULL;
char *g_randint_buff = NULL;
......@@ -662,6 +664,7 @@ SArguments g_args = {
0, // insert_interval
DEFAULT_TIMESTAMP_STEP, // timestamp_step
1, // query_times
10000, // prepared_rand
DEFAULT_INTERLACE_ROWS, // interlaceRows;
30000, // reqPerReq
(1024*1024), // max_sql_len
......@@ -796,6 +799,8 @@ static void printHelp() {
"Set the replica parameters of the database, By default use 1, min: 1, max: 3.");
printf("%s%s%s%s\n", indent, "-m, --table-prefix=TABLEPREFIX", "\t",
"Table prefix name. By default use 'd'.");
printf("%s%s%s%s\n", indent, "-E, --escape-character", "\t",
"Use escape character for Both Stable and normmal table name");
printf("%s%s%s%s\n", indent, "-s, --sql-file=FILE", "\t\t",
"The select sql file.");
printf("%s%s%s%s\n", indent, "-N, --normal-table", "\t\t", "Use normal table flag.");
......@@ -2097,7 +2102,7 @@ static void tmfclose(FILE *fp) {
}
}
static void tmfree(char *buf) {
static void tmfree(void *buf) {
if (NULL != buf) {
free(buf);
buf = NULL;
......@@ -2205,7 +2210,7 @@ static void selectAndGetResult(
} else if (0 == strncasecmp(g_queryInfo.queryMode, "rest", strlen("rest"))) {
int retCode = postProceSql(
g_queryInfo.host, &(g_queryInfo.serv_addr), g_queryInfo.port,
g_queryInfo.host, g_queryInfo.port,
command,
pThreadInfo);
if (0 != retCode) {
......@@ -2221,157 +2226,157 @@ static void selectAndGetResult(
static char *rand_bool_str() {
static int cursor;
cursor++;
if (cursor > (MAX_PREPARED_RAND - 1)) cursor = 0;
return g_randbool_buff + ((cursor % MAX_PREPARED_RAND) * BOOL_BUFF_LEN);
if (cursor > (g_args.prepared_rand - 1)) cursor = 0;
return g_randbool_buff + ((cursor % g_args.prepared_rand) * BOOL_BUFF_LEN);
}
static int32_t rand_bool() {
static int cursor;
cursor++;
if (cursor > (MAX_PREPARED_RAND - 1)) cursor = 0;
return g_randint[cursor % MAX_PREPARED_RAND] % 2;
if (cursor > (g_args.prepared_rand - 1)) cursor = 0;
return g_randint[cursor % g_args.prepared_rand] % 2;
}
static char *rand_tinyint_str()
{
static int cursor;
cursor++;
if (cursor > (MAX_PREPARED_RAND - 1)) cursor = 0;
if (cursor > (g_args.prepared_rand - 1)) cursor = 0;
return g_randtinyint_buff +
((cursor % MAX_PREPARED_RAND) * TINYINT_BUFF_LEN);
((cursor % g_args.prepared_rand) * TINYINT_BUFF_LEN);
}
static int32_t rand_tinyint()
{
static int cursor;
cursor++;
if (cursor > (MAX_PREPARED_RAND - 1)) cursor = 0;
return g_randint[cursor % MAX_PREPARED_RAND] % 128;
if (cursor > (g_args.prepared_rand - 1)) cursor = 0;
return g_randint[cursor % g_args.prepared_rand] % 128;
}
static char *rand_utinyint_str()
{
static int cursor;
cursor++;
if (cursor > (MAX_PREPARED_RAND - 1)) cursor = 0;
if (cursor > (g_args.prepared_rand - 1)) cursor = 0;
return g_randutinyint_buff +
((cursor % MAX_PREPARED_RAND) * TINYINT_BUFF_LEN);
((cursor % g_args.prepared_rand) * TINYINT_BUFF_LEN);
}
static int32_t rand_utinyint()
{
static int cursor;
cursor++;
if (cursor > (MAX_PREPARED_RAND - 1)) cursor = 0;
return g_randuint[cursor % MAX_PREPARED_RAND] % 255;
if (cursor > (g_args.prepared_rand - 1)) cursor = 0;
return g_randuint[cursor % g_args.prepared_rand] % 255;
}
static char *rand_smallint_str()
{
static int cursor;
cursor++;
if (cursor > (MAX_PREPARED_RAND - 1)) cursor = 0;
if (cursor > (g_args.prepared_rand - 1)) cursor = 0;
return g_randsmallint_buff +
((cursor % MAX_PREPARED_RAND) * SMALLINT_BUFF_LEN);
((cursor % g_args.prepared_rand) * SMALLINT_BUFF_LEN);
}
static int32_t rand_smallint()
{
static int cursor;
cursor++;
if (cursor > (MAX_PREPARED_RAND - 1)) cursor = 0;
return g_randint[cursor % MAX_PREPARED_RAND] % 32768;
if (cursor > (g_args.prepared_rand - 1)) cursor = 0;
return g_randint[cursor % g_args.prepared_rand] % 32768;
}
static char *rand_usmallint_str()
{
static int cursor;
cursor++;
if (cursor > (MAX_PREPARED_RAND - 1)) cursor = 0;
if (cursor > (g_args.prepared_rand - 1)) cursor = 0;
return g_randusmallint_buff +
((cursor % MAX_PREPARED_RAND) * SMALLINT_BUFF_LEN);
((cursor % g_args.prepared_rand) * SMALLINT_BUFF_LEN);
}
static int32_t rand_usmallint()
{
static int cursor;
cursor++;
if (cursor > (MAX_PREPARED_RAND - 1)) cursor = 0;
return g_randuint[cursor % MAX_PREPARED_RAND] % 65535;
if (cursor > (g_args.prepared_rand - 1)) cursor = 0;
return g_randuint[cursor % g_args.prepared_rand] % 65535;
}
static char *rand_int_str()
{
static int cursor;
cursor++;
if (cursor > (MAX_PREPARED_RAND - 1)) cursor = 0;
return g_randint_buff + ((cursor % MAX_PREPARED_RAND) * INT_BUFF_LEN);
if (cursor > (g_args.prepared_rand - 1)) cursor = 0;
return g_randint_buff + ((cursor % g_args.prepared_rand) * INT_BUFF_LEN);
}
static int32_t rand_int()
{
static int cursor;
cursor++;
if (cursor > (MAX_PREPARED_RAND - 1)) cursor = 0;
return g_randint[cursor % MAX_PREPARED_RAND];
if (cursor > (g_args.prepared_rand - 1)) cursor = 0;
return g_randint[cursor % g_args.prepared_rand];
}
static char *rand_uint_str()
{
static int cursor;
cursor++;
if (cursor > (MAX_PREPARED_RAND - 1)) cursor = 0;
return g_randuint_buff + ((cursor % MAX_PREPARED_RAND) * INT_BUFF_LEN);
if (cursor > (g_args.prepared_rand - 1)) cursor = 0;
return g_randuint_buff + ((cursor % g_args.prepared_rand) * INT_BUFF_LEN);
}
static int32_t rand_uint()
{
static int cursor;
cursor++;
if (cursor > (MAX_PREPARED_RAND - 1)) cursor = 0;
return g_randuint[cursor % MAX_PREPARED_RAND];
if (cursor > (g_args.prepared_rand - 1)) cursor = 0;
return g_randuint[cursor % g_args.prepared_rand];
}
static char *rand_bigint_str()
{
static int cursor;
cursor++;
if (cursor > (MAX_PREPARED_RAND - 1)) cursor = 0;
if (cursor > (g_args.prepared_rand - 1)) cursor = 0;
return g_randbigint_buff +
((cursor % MAX_PREPARED_RAND) * BIGINT_BUFF_LEN);
((cursor % g_args.prepared_rand) * BIGINT_BUFF_LEN);
}
static int64_t rand_bigint()
{
static int cursor;
cursor++;
if (cursor > (MAX_PREPARED_RAND - 1)) cursor = 0;
return g_randbigint[cursor % MAX_PREPARED_RAND];
if (cursor > (g_args.prepared_rand - 1)) cursor = 0;
return g_randbigint[cursor % g_args.prepared_rand];
}
static char *rand_ubigint_str()
{
static int cursor;
cursor++;
if (cursor > (MAX_PREPARED_RAND - 1)) cursor = 0;
if (cursor > (g_args.prepared_rand - 1)) cursor = 0;
return g_randubigint_buff +
((cursor % MAX_PREPARED_RAND) * BIGINT_BUFF_LEN);
((cursor % g_args.prepared_rand) * BIGINT_BUFF_LEN);
}
static int64_t rand_ubigint()
{
static int cursor;
cursor++;
if (cursor > (MAX_PREPARED_RAND - 1)) cursor = 0;
return g_randubigint[cursor % MAX_PREPARED_RAND];
if (cursor > (g_args.prepared_rand - 1)) cursor = 0;
return g_randubigint[cursor % g_args.prepared_rand];
}
static char *rand_float_str()
{
static int cursor;
cursor++;
if (cursor > (MAX_PREPARED_RAND - 1)) cursor = 0;
return g_randfloat_buff + ((cursor % MAX_PREPARED_RAND) * FLOAT_BUFF_LEN);
if (cursor > (g_args.prepared_rand - 1)) cursor = 0;
return g_randfloat_buff + ((cursor % g_args.prepared_rand) * FLOAT_BUFF_LEN);
}
......@@ -2379,58 +2384,58 @@ static float rand_float()
{
static int cursor;
cursor++;
if (cursor > (MAX_PREPARED_RAND - 1)) cursor = 0;
return g_randfloat[cursor % MAX_PREPARED_RAND];
if (cursor > (g_args.prepared_rand - 1)) cursor = 0;
return g_randfloat[cursor % g_args.prepared_rand];
}
static char *demo_current_float_str()
{
static int cursor;
cursor++;
if (cursor > (MAX_PREPARED_RAND - 1)) cursor = 0;
if (cursor > (g_args.prepared_rand - 1)) cursor = 0;
return g_rand_current_buff +
((cursor % MAX_PREPARED_RAND) * FLOAT_BUFF_LEN);
((cursor % g_args.prepared_rand) * FLOAT_BUFF_LEN);
}
static float UNUSED_FUNC demo_current_float()
{
static int cursor;
cursor++;
if (cursor > (MAX_PREPARED_RAND - 1)) cursor = 0;
return (float)(9.8 + 0.04 * (g_randint[cursor % MAX_PREPARED_RAND] % 10)
+ g_randfloat[cursor % MAX_PREPARED_RAND]/1000000000);
if (cursor > (g_args.prepared_rand - 1)) cursor = 0;
return (float)(9.8 + 0.04 * (g_randint[cursor % g_args.prepared_rand] % 10)
+ g_randfloat[cursor % g_args.prepared_rand]/1000000000);
}
static char *demo_voltage_int_str()
{
static int cursor;
cursor++;
if (cursor > (MAX_PREPARED_RAND - 1)) cursor = 0;
if (cursor > (g_args.prepared_rand - 1)) cursor = 0;
return g_rand_voltage_buff +
((cursor % MAX_PREPARED_RAND) * INT_BUFF_LEN);
((cursor % g_args.prepared_rand) * INT_BUFF_LEN);
}
static int32_t UNUSED_FUNC demo_voltage_int()
{
static int cursor;
cursor++;
if (cursor > (MAX_PREPARED_RAND - 1)) cursor = 0;
return 215 + g_randint[cursor % MAX_PREPARED_RAND] % 10;
if (cursor > (g_args.prepared_rand - 1)) cursor = 0;
return 215 + g_randint[cursor % g_args.prepared_rand] % 10;
}
static char *demo_phase_float_str() {
static int cursor;
cursor++;
if (cursor > (MAX_PREPARED_RAND - 1)) cursor = 0;
return g_rand_phase_buff + ((cursor % MAX_PREPARED_RAND) * FLOAT_BUFF_LEN);
if (cursor > (g_args.prepared_rand - 1)) cursor = 0;
return g_rand_phase_buff + ((cursor % g_args.prepared_rand) * FLOAT_BUFF_LEN);
}
static float UNUSED_FUNC demo_phase_float() {
static int cursor;
cursor++;
if (cursor > (MAX_PREPARED_RAND - 1)) cursor = 0;
return (float)((115 + g_randint[cursor % MAX_PREPARED_RAND] % 10
+ g_randfloat[cursor % MAX_PREPARED_RAND]/1000000000)/360);
if (cursor > (g_args.prepared_rand - 1)) cursor = 0;
return (float)((115 + g_randint[cursor % g_args.prepared_rand] % 10
+ g_randfloat[cursor % g_args.prepared_rand]/1000000000)/360);
}
#if 0
......@@ -2469,7 +2474,7 @@ static char *rand_double_str()
{
static int cursor;
cursor++;
if (cursor > (MAX_PREPARED_RAND - 1)) cursor = 0;
if (cursor > (g_args.prepared_rand - 1)) cursor = 0;
return g_randdouble_buff + (cursor * DOUBLE_BUFF_LEN);
}
......@@ -2477,42 +2482,54 @@ static double rand_double()
{
static int cursor;
cursor++;
cursor = cursor % MAX_PREPARED_RAND;
cursor = cursor % g_args.prepared_rand;
return g_randdouble[cursor];
}
static void init_rand_data() {
g_randint_buff = calloc(1, INT_BUFF_LEN * MAX_PREPARED_RAND);
g_randint_buff = calloc(1, INT_BUFF_LEN * g_args.prepared_rand);
assert(g_randint_buff);
g_rand_voltage_buff = calloc(1, INT_BUFF_LEN * MAX_PREPARED_RAND);
g_rand_voltage_buff = calloc(1, INT_BUFF_LEN * g_args.prepared_rand);
assert(g_rand_voltage_buff);
g_randbigint_buff = calloc(1, BIGINT_BUFF_LEN * MAX_PREPARED_RAND);
g_randbigint_buff = calloc(1, BIGINT_BUFF_LEN * g_args.prepared_rand);
assert(g_randbigint_buff);
g_randsmallint_buff = calloc(1, SMALLINT_BUFF_LEN * MAX_PREPARED_RAND);
g_randsmallint_buff = calloc(1, SMALLINT_BUFF_LEN * g_args.prepared_rand);
assert(g_randsmallint_buff);
g_randtinyint_buff = calloc(1, TINYINT_BUFF_LEN * MAX_PREPARED_RAND);
g_randtinyint_buff = calloc(1, TINYINT_BUFF_LEN * g_args.prepared_rand);
assert(g_randtinyint_buff);
g_randbool_buff = calloc(1, BOOL_BUFF_LEN * MAX_PREPARED_RAND);
g_randbool_buff = calloc(1, BOOL_BUFF_LEN * g_args.prepared_rand);
assert(g_randbool_buff);
g_randfloat_buff = calloc(1, FLOAT_BUFF_LEN * MAX_PREPARED_RAND);
g_randfloat_buff = calloc(1, FLOAT_BUFF_LEN * g_args.prepared_rand);
assert(g_randfloat_buff);
g_rand_current_buff = calloc(1, FLOAT_BUFF_LEN * MAX_PREPARED_RAND);
g_rand_current_buff = calloc(1, FLOAT_BUFF_LEN * g_args.prepared_rand);
assert(g_rand_current_buff);
g_rand_phase_buff = calloc(1, FLOAT_BUFF_LEN * MAX_PREPARED_RAND);
g_rand_phase_buff = calloc(1, FLOAT_BUFF_LEN * g_args.prepared_rand);
assert(g_rand_phase_buff);
g_randdouble_buff = calloc(1, DOUBLE_BUFF_LEN * MAX_PREPARED_RAND);
g_randdouble_buff = calloc(1, DOUBLE_BUFF_LEN * g_args.prepared_rand);
assert(g_randdouble_buff);
g_randuint_buff = calloc(1, INT_BUFF_LEN * MAX_PREPARED_RAND);
g_randuint_buff = calloc(1, INT_BUFF_LEN * g_args.prepared_rand);
assert(g_randuint_buff);
g_randutinyint_buff = calloc(1, TINYINT_BUFF_LEN * MAX_PREPARED_RAND);
g_randutinyint_buff = calloc(1, TINYINT_BUFF_LEN * g_args.prepared_rand);
assert(g_randutinyint_buff);
g_randusmallint_buff = calloc(1, SMALLINT_BUFF_LEN * MAX_PREPARED_RAND);
g_randusmallint_buff = calloc(1, SMALLINT_BUFF_LEN * g_args.prepared_rand);
assert(g_randusmallint_buff);
g_randubigint_buff = calloc(1, BIGINT_BUFF_LEN * MAX_PREPARED_RAND);
g_randubigint_buff = calloc(1, BIGINT_BUFF_LEN * g_args.prepared_rand);
assert(g_randubigint_buff);
for (int i = 0; i < MAX_PREPARED_RAND; i++) {
g_randint = calloc(1, sizeof(int32_t) * g_args.prepared_rand);
assert(g_randint);
g_randuint = calloc(1, sizeof(uint32_t) * g_args.prepared_rand);
assert(g_randuint);
g_randbigint = calloc(1, sizeof(int64_t) * g_args.prepared_rand);
assert(g_randbigint);
g_randubigint = calloc(1, sizeof(uint64_t) * g_args.prepared_rand);
assert(g_randubigint);
g_randfloat = calloc(1, sizeof(float) * g_args.prepared_rand);
assert(g_randfloat);
g_randdouble = calloc(1, sizeof(double) * g_args.prepared_rand);
assert(g_randdouble);
for (int i = 0; i < g_args.prepared_rand; i++) {
g_randint[i] = (int)(taosRandom() % RAND_MAX - (RAND_MAX >> 1));
g_randuint[i] = (int)(taosRandom());
sprintf(g_randint_buff + i * INT_BUFF_LEN, "%d",
......@@ -2755,6 +2772,8 @@ static int printfInsertMeta() {
g_Dbs.db[i].superTbls[j].sampleFormat);
printf(" sampleFile: \033[33m%s\033[0m\n",
g_Dbs.db[i].superTbls[j].sampleFile);
printf(" useSampleTs: \033[33m%s\033[0m\n",
g_Dbs.db[i].superTbls[j].useSampleTs ? "yes (warning: disorderRange/disorderRatio is disabled)" : "no");
printf(" tagsFile: \033[33m%s\033[0m\n",
g_Dbs.db[i].superTbls[j].tagsFile);
printf(" columnCount: \033[33m%d\033[0m\n ",
......@@ -2799,8 +2818,6 @@ static int printfInsertMeta() {
printf(" insertRows: \033[33m%"PRId64"\033[0m\n",
g_args.insertRows);
}
printf("\n");
}
......@@ -3384,7 +3401,7 @@ static void printfQuerySystemInfo(TAOS * taos) {
free(dbInfos);
}
static int postProceSql(char *host, struct sockaddr_in *pServAddr, uint16_t port,
static int postProceSql(char *host, uint16_t port,
char* sqlstr, threadInfo *pThreadInfo)
{
char *req_fmt = "POST %s HTTP/1.1\r\nHost: %s:%d\r\nAccept: */*\r\nAuthorization: Basic %s\r\nContent-Length: %d\r\nContent-Type: application/x-www-form-urlencoded\r\n\r\n%s";
......@@ -3416,35 +3433,18 @@ static int postProceSql(char *host, struct sockaddr_in *pServAddr, uint16_t port
'w', 'x', 'y', 'z', '0', '1', '2', '3',
'4', '5', '6', '7', '8', '9', '+', '/'};
if (g_args.test_mode == INSERT_TEST) {
snprintf(userpass_buf, INPUT_BUF_LEN, "%s:%s",
g_Dbs.user, g_Dbs.password);
} else {
snprintf(userpass_buf, INPUT_BUF_LEN, "%s:%s",
g_queryInfo.user, g_queryInfo.password);
}
size_t userpass_buf_len = strlen(userpass_buf);
size_t encoded_len = 4 * ((userpass_buf_len +2) / 3);
char base64_buf[INPUT_BUF_LEN];
#ifdef WINDOWS
WSADATA wsaData;
WSAStartup(MAKEWORD(2, 2), &wsaData);
SOCKET sockfd;
#else
int sockfd;
#endif
sockfd = socket(AF_INET, SOCK_STREAM, 0);
if (sockfd < 0) {
#ifdef WINDOWS
errorPrint( "Could not create socket : %d" , WSAGetLastError());
#endif
debugPrint("%s() LN%d, sockfd=%d\n", __func__, __LINE__, sockfd);
free(request_buf);
ERROR_EXIT("opening socket");
}
int retConn = connect(sockfd, (struct sockaddr *)pServAddr, sizeof(struct sockaddr));
debugPrint("%s() LN%d connect() return %d\n", __func__, __LINE__, retConn);
if (retConn < 0) {
free(request_buf);
ERROR_EXIT("connecting");
}
memset(base64_buf, 0, INPUT_BUF_LEN);
......@@ -3484,9 +3484,9 @@ static int postProceSql(char *host, struct sockaddr_in *pServAddr, uint16_t port
sent = 0;
do {
#ifdef WINDOWS
bytes = send(sockfd, request_buf + sent, req_str_len - sent, 0);
bytes = send(pThreadInfo->sockfd, request_buf + sent, req_str_len - sent, 0);
#else
bytes = write(sockfd, request_buf + sent, req_str_len - sent);
bytes = write(pThreadInfo->sockfd, request_buf + sent, req_str_len - sent);
#endif
if (bytes < 0)
ERROR_EXIT("writing message to socket");
......@@ -3505,9 +3505,9 @@ static int postProceSql(char *host, struct sockaddr_in *pServAddr, uint16_t port
do {
#ifdef WINDOWS
bytes = recv(sockfd, response_buf + received, resp_len - received, 0);
bytes = recv(pThreadInfo->sockfds, response_buf + received, resp_len - received, 0);
#else
bytes = read(sockfd, response_buf + received, resp_len - received);
bytes = read(pThreadInfo->sockfd, response_buf + received, resp_len - received);
#endif
verbosePrint("%s() LN%d: bytes:%d\n", __func__, __LINE__, bytes);
if (bytes < 0) {
......@@ -3518,12 +3518,11 @@ static int postProceSql(char *host, struct sockaddr_in *pServAddr, uint16_t port
break;
received += bytes;
response_buf[RESP_BUF_LEN - 1] = '\0';
if (strlen(response_buf)) {
verbosePrint("%s() LN%d: received:%d resp_len:%d, response_buf:\n%s\n",
__func__, __LINE__, received, resp_len, response_buf);
response_buf[RESP_BUF_LEN - 1] = '\0';
if (strlen(response_buf)) {
if (((NULL == strstr(response_buf, resEncodingChunk))
&& (NULL != strstr(response_buf, resHttp)))
|| ((NULL != strstr(response_buf, resHttpOk))
......@@ -3546,14 +3545,9 @@ static int postProceSql(char *host, struct sockaddr_in *pServAddr, uint16_t port
}
free(request_buf);
#ifdef WINDOWS
closesocket(sockfd);
WSACleanup();
#else
close(sockfd);
#endif
if (NULL == strstr(response_buf, "\"status\":\"succ\"")) {
response_buf[RESP_BUF_LEN - 1] = '\0';
if (NULL == strstr(response_buf, resHttpOk)) {
errorPrint("%s() LN%d, Response:\n%s\n",
__func__, __LINE__, response_buf);
return -1;
......@@ -3815,8 +3809,7 @@ static int getChildNameOfSuperTableWithLimitAndOffset(TAOS * taos,
limit, offset);
//get all child table name use cmd: select tbname from superTblName;
snprintf(command, 1024, "select tbname from %s.%s %s",
dbName, stbName, limitBuf);
snprintf(command, 1024, "select tbname from %s.%s %s", dbName, stbName, limitBuf);
res = taos_query(taos, command);
int32_t code = taos_errno(res);
......@@ -4344,6 +4337,7 @@ static int createSuperTable(
superTbl->lenOfTagOfOneRow = lenOfTagOfOneRow;
snprintf(command, BUFFER_SIZE,
"CREATE TABLE IF NOT EXISTS %s.%s (ts TIMESTAMP%s) TAGS %s",
dbName, superTbl->stbName, cols, tags);
......@@ -4592,7 +4586,6 @@ static void* createTable(void *sarg)
return NULL;
}
pThreadInfo->tables_created += batchNum;
uint64_t currentPrintTime = taosGetTimestampMs();
if (currentPrintTime - lastPrintTime > 30*1000) {
printf("thread[%d] already create %"PRIu64" - %"PRIu64" tables\n",
......@@ -4606,8 +4599,8 @@ static void* createTable(void *sarg)
NO_INSERT_TYPE, false)) {
errorPrint2("queryDbExec() failed. buffer:\n%s\n", pThreadInfo->buffer);
}
pThreadInfo->tables_created += batchNum;
}
free(pThreadInfo->buffer);
return NULL;
}
......@@ -4814,6 +4807,23 @@ static int readTagFromCsvFileToMem(SSuperTable * stbInfo) {
return 0;
}
static void getAndSetRowsFromCsvFile(SSuperTable *stbInfo) {
FILE *fp = fopen(stbInfo->sampleFile, "r");
int line_count = 0;
if (fp == NULL) {
errorPrint("Failed to open sample file: %s, reason:%s\n",
stbInfo->sampleFile, strerror(errno));
exit(EXIT_FAILURE);
}
char *buf = calloc(1, stbInfo->maxSqlLen);
while (fgets(buf, stbInfo->maxSqlLen, fp)) {
line_count++;
}
fclose(fp);
tmfree(buf);
stbInfo->insertRows = line_count;
}
/*
Read 10000 lines at most. If more than 10000 lines, continue to read after using
*/
......@@ -5267,6 +5277,22 @@ static bool getMetaFromInsertJsonFile(cJSON* root) {
goto PARSE_OVER;
}
cJSON* prepareRand = cJSON_GetObjectItem(root, "prepared_rand");
if (prepareRand && prepareRand->type == cJSON_Number) {
if (prepareRand->valueint <= 0) {
errorPrint("%s() LN%d, failed to read json, prepared_rand input mistake\n",
__func__, __LINE__);
goto PARSE_OVER;
}
g_args.prepared_rand = prepareRand->valueint;
} else if (!prepareRand) {
g_args.prepared_rand = 10000;
} else {
errorPrint("%s() LN%d, failed to read json, prepared_rand not found\n",
__func__, __LINE__);
goto PARSE_OVER;
}
cJSON *answerPrompt = cJSON_GetObjectItem(root, "confirm_parameter_prompt"); // yes, no,
if (answerPrompt
&& answerPrompt->type == cJSON_String
......@@ -5308,7 +5334,8 @@ static bool getMetaFromInsertJsonFile(cJSON* root) {
MAX_DB_COUNT);
goto PARSE_OVER;
}
g_Dbs.db = calloc(1, sizeof(SDataBase)*dbSize);
assert(g_Dbs.db);
g_Dbs.dbCount = dbSize;
for (int i = 0; i < dbSize; ++i) {
cJSON* dbinfos = cJSON_GetArrayItem(dbs, i);
......@@ -5508,7 +5535,8 @@ static bool getMetaFromInsertJsonFile(cJSON* root) {
MAX_SUPER_TABLE_COUNT);
goto PARSE_OVER;
}
g_Dbs.db[i].superTbls = calloc(1, stbSize * sizeof(SSuperTable));
assert(g_Dbs.db[i].superTbls);
g_Dbs.db[i].superTblCount = stbSize;
for (int j = 0; j < stbSize; ++j) {
cJSON* stbInfo = cJSON_GetArrayItem(stables, j);
......@@ -5707,6 +5735,23 @@ static bool getMetaFromInsertJsonFile(cJSON* root) {
goto PARSE_OVER;
}
cJSON *useSampleTs = cJSON_GetObjectItem(stbInfo, "use_sample_ts");
if (useSampleTs && useSampleTs->type == cJSON_String
&& useSampleTs->valuestring != NULL) {
if (0 == strncasecmp(useSampleTs->valuestring, "yes", 3)) {
g_Dbs.db[i].superTbls[j].useSampleTs = true;
} else if (0 == strncasecmp(useSampleTs->valuestring, "no", 2)){
g_Dbs.db[i].superTbls[j].useSampleTs = false;
} else {
g_Dbs.db[i].superTbls[j].useSampleTs = false;
}
} else if (!useSampleTs) {
g_Dbs.db[i].superTbls[j].useSampleTs = false;
} else {
errorPrint("%s", "failed to read json, use_sample_ts not found\n");
goto PARSE_OVER;
}
cJSON *tagsFile = cJSON_GetObjectItem(stbInfo, "tags_file");
if ((tagsFile && tagsFile->type == cJSON_String)
&& (tagsFile->valuestring != NULL)) {
......@@ -6364,9 +6409,12 @@ static bool getInfoFromJsonFile(char* file) {
}
if (INSERT_TEST == g_args.test_mode) {
memset(&g_Dbs, 0, sizeof(SDbs));
g_Dbs.use_metric = g_args.use_metric;
ret = getMetaFromInsertJsonFile(root);
} else if ((QUERY_TEST == g_args.test_mode)
|| (SUBSCRIBE_TEST == g_args.test_mode)) {
memset(&g_queryInfo, 0, sizeof(SQueryMetaInfo));
ret = getMetaFromQueryJsonFile(root);
} else {
errorPrint("%s",
......@@ -6431,8 +6479,9 @@ static void postFreeResource() {
g_Dbs.db[i].superTbls[j].childTblName = NULL;
}
}
tmfree(g_Dbs.db[i].superTbls);
}
tmfree(g_Dbs.db);
tmfree(g_randbool_buff);
tmfree(g_randint_buff);
tmfree(g_rand_voltage_buff);
......@@ -6455,6 +6504,7 @@ static void postFreeResource() {
}
}
tmfree(g_sampleBindBatchArray);
#endif
}
......@@ -6467,13 +6517,20 @@ static int getRowDataFromSample(
}
int dataLen = 0;
if(stbInfo->useSampleTs) {
dataLen += snprintf(dataBuf + dataLen, maxLen - dataLen,
"(%s",
stbInfo->sampleDataBuf
+ stbInfo->lenOfOneRow * (*sampleUsePos));
} else {
dataLen += snprintf(dataBuf + dataLen, maxLen - dataLen,
"(%" PRId64 ", ", timestamp);
dataLen += snprintf(dataBuf + dataLen, maxLen - dataLen,
"%s",
stbInfo->sampleDataBuf
+ stbInfo->lenOfOneRow * (*sampleUsePos));
}
dataLen += snprintf(dataBuf + dataLen, maxLen - dataLen, ")");
(*sampleUsePos)++;
......@@ -6906,6 +6963,9 @@ static int prepareSampleForStb(SSuperTable *stbInfo) {
int ret;
if (0 == strncasecmp(stbInfo->dataSource, "sample", strlen("sample"))) {
if(stbInfo->useSampleTs) {
getAndSetRowsFromCsvFile(stbInfo);
}
ret = generateSampleFromCsvForStb(stbInfo);
} else {
ret = generateSampleFromRandForStb(stbInfo);
......@@ -6956,7 +7016,7 @@ static int32_t execInsert(threadInfo *pThreadInfo, uint32_t k)
verbosePrint("[%d] %s() LN%d %s\n", pThreadInfo->threadID,
__func__, __LINE__, pThreadInfo->buffer);
if (0 != postProceSql(g_Dbs.host, &g_Dbs.serv_addr, g_Dbs.port,
if (0 != postProceSql(g_Dbs.host, g_Dbs.port,
pThreadInfo->buffer, pThreadInfo)) {
affectedRows = -1;
printf("========restful return fail, threadID[%d]\n",
......@@ -7007,12 +7067,11 @@ static void getTableName(char *pTblName,
stbInfo->childTblName + tableSeq * TSDB_TABLE_NAME_LEN);
}
} else {
snprintf(pTblName, TSDB_TABLE_NAME_LEN, "%s%"PRIu64"",
stbInfo->childTblPrefix, tableSeq);
snprintf(pTblName, TSDB_TABLE_NAME_LEN,
"%s%"PRIu64"", stbInfo->childTblPrefix, tableSeq);
}
} else {
snprintf(pTblName, TSDB_TABLE_NAME_LEN, "%s%"PRIu64"",
g_args.tb_prefix, tableSeq);
snprintf(pTblName, TSDB_TABLE_NAME_LEN, "%s%"PRIu64"", g_args.tb_prefix, tableSeq);
}
}
......@@ -10495,6 +10554,33 @@ static void startMultiThreadInsertData(int threads, char* db_name,
pThreadInfo->start_time = pThreadInfo->start_time + rand_int() % 10000 - rand_tinyint();
}
*/
if (g_args.iface == REST_IFACE || ((stbInfo) && (stbInfo->iface == REST_IFACE))) {
#ifdef WINDOWS
WSADATA wsaData;
WSAStartup(MAKEWORD(2, 2), &wsaData);
SOCKET sockfd;
#else
int sockfd;
#endif
sockfd = socket(AF_INET, SOCK_STREAM, 0);
if (sockfd < 0) {
#ifdef WINDOWS
errorPrint( "Could not create socket : %d" , WSAGetLastError());
#endif
debugPrint("%s() LN%d, sockfd=%d\n", __func__, __LINE__, sockfd);
ERROR_EXIT("opening socket");
}
int retConn = connect(sockfd, (struct sockaddr *)&(g_Dbs.serv_addr), sizeof(struct sockaddr));
debugPrint("%s() LN%d connect() return %d\n", __func__, __LINE__, retConn);
if (retConn < 0) {
ERROR_EXIT("connecting");
}
pThreadInfo->sockfd = sockfd;
}
tsem_init(&(pThreadInfo->lock_sem), 0, 0);
if (ASYNC_MODE == g_Dbs.asyncMode) {
pthread_create(pids + i, NULL, asyncWrite, pThreadInfo);
......@@ -10532,6 +10618,14 @@ static void startMultiThreadInsertData(int threads, char* db_name,
tmfree((char *)pThreadInfo->bind_ts_array);
tmfree(pThreadInfo->bindParams);
tmfree(pThreadInfo->is_null);
if (g_args.iface == REST_IFACE || ((stbInfo) && (stbInfo->iface == REST_IFACE))) {
#ifdef WINDOWS
closesocket(pThreadInfo->sockfd);
WSACleanup();
#else
close(pThreadInfo->sockfd);
#endif
}
#else
if (pThreadInfo->sampleBindArray) {
for (int k = 0; k < MAX_SAMPLES; k++) {
......@@ -11194,6 +11288,31 @@ static int queryTestProcess() {
}
}
if (0 == strncasecmp(g_queryInfo.queryMode, "rest", 4)) {
#ifdef WINDOWS
WSADATA wsaData;
WSAStartup(MAKEWORD(2, 2), &wsaData);
SOCKET sockfd;
#else
int sockfd;
#endif
sockfd = socket(AF_INET, SOCK_STREAM, 0);
if (sockfd < 0) {
#ifdef WINDOWS
errorPrint( "Could not create socket : %d" , WSAGetLastError());
#endif
debugPrint("%s() LN%d, sockfd=%d\n", __func__, __LINE__, sockfd);
ERROR_EXIT("opening socket");
}
int retConn = connect(sockfd, (struct sockaddr *)&(g_queryInfo.serv_addr),
sizeof(struct sockaddr));
debugPrint("%s() LN%d connect() return %d\n", __func__, __LINE__, retConn);
if (retConn < 0) {
ERROR_EXIT("connecting");
}
pThreadInfo->sockfd = sockfd;
}
pThreadInfo->taos = NULL;// workaround to use separate taos connection;
pthread_create(pids + seq, NULL, specifiedTableQuery,
......@@ -11245,6 +11364,31 @@ static int queryTestProcess() {
pThreadInfo->end_table_to = i < b ? tableFrom + a : tableFrom + a - 1;
tableFrom = pThreadInfo->end_table_to + 1;
pThreadInfo->taos = NULL; // workaround to use separate taos connection;
if (0 == strncasecmp(g_queryInfo.queryMode, "rest", 4)) {
#ifdef WINDOWS
WSADATA wsaData;
WSAStartup(MAKEWORD(2, 2), &wsaData);
SOCKET sockfd;
#else
int sockfd;
#endif
sockfd = socket(AF_INET, SOCK_STREAM, 0);
if (sockfd < 0) {
#ifdef WINDOWS
errorPrint( "Could not create socket : %d" , WSAGetLastError());
#endif
debugPrint("%s() LN%d, sockfd=%d\n", __func__, __LINE__, sockfd);
ERROR_EXIT("opening socket");
}
int retConn = connect(sockfd, (struct sockaddr *)&(g_queryInfo.serv_addr),
sizeof(struct sockaddr));
debugPrint("%s() LN%d connect() return %d\n", __func__, __LINE__, retConn);
if (retConn < 0) {
ERROR_EXIT("connecting");
}
pThreadInfo->sockfd = sockfd;
}
pthread_create(pidsOfSub + i, NULL, superTableQuery, pThreadInfo);
}
......@@ -11257,6 +11401,15 @@ static int queryTestProcess() {
for (int i = 0; i < nConcurrent; i++) {
for (int j = 0; j < nSqlCount; j++) {
pthread_join(pids[i * nSqlCount + j], NULL);
if (0 == strncasecmp(g_queryInfo.queryMode, "rest", 4)) {
threadInfo *pThreadInfo = infos + i * nSqlCount + j;
#ifdef WINDOWS
closesocket(pThreadInfo->sockfd);
WSACleanup();
#else
close(pThreadInfo->sockfd);
#endif
}
}
}
}
......@@ -11266,6 +11419,15 @@ static int queryTestProcess() {
for (int i = 0; i < g_queryInfo.superQueryInfo.threadCnt; i++) {
pthread_join(pidsOfSub[i], NULL);
if (0 == strncasecmp(g_queryInfo.queryMode, "rest", 4)) {
threadInfo *pThreadInfo = infosOfSub + i;
#ifdef WINDOWS
closesocket(pThreadInfo->sockfd);
WSACleanup();
#else
close(pThreadInfo->sockfd);
#endif
}
}
tmfree((char*)pidsOfSub);
......@@ -11768,29 +11930,6 @@ static int subscribeTestProcess() {
return 0;
}
static void initOfInsertMeta() {
memset(&g_Dbs, 0, sizeof(SDbs));
// set default values
tstrncpy(g_Dbs.host, "127.0.0.1", MAX_HOSTNAME_SIZE);
g_Dbs.port = 6030;
tstrncpy(g_Dbs.user, TSDB_DEFAULT_USER, MAX_USERNAME_SIZE);
tstrncpy(g_Dbs.password, TSDB_DEFAULT_PASS, SHELL_MAX_PASSWORD_LEN);
g_Dbs.threadCount = 2;
g_Dbs.use_metric = g_args.use_metric;
}
static void initOfQueryMeta() {
memset(&g_queryInfo, 0, sizeof(SQueryMetaInfo));
// set default values
tstrncpy(g_queryInfo.host, "127.0.0.1", MAX_HOSTNAME_SIZE);
g_queryInfo.port = 6030;
tstrncpy(g_queryInfo.user, TSDB_DEFAULT_USER, MAX_USERNAME_SIZE);
tstrncpy(g_queryInfo.password, TSDB_DEFAULT_PASS, SHELL_MAX_PASSWORD_LEN);
}
static void setParaFromArg() {
char type[20];
char length[20];
......@@ -11823,7 +11962,7 @@ static void setParaFromArg() {
tstrncpy(g_Dbs.resultFile, g_args.output_file, MAX_FILE_NAME_LEN);
g_Dbs.use_metric = g_args.use_metric;
g_args.prepared_rand = min(g_args.insertRows, MAX_PREPARED_RAND);
g_Dbs.aggr_func = g_args.aggr_func;
char dataString[TSDB_MAX_BYTES_PER_ROW];
......@@ -11940,7 +12079,6 @@ static int regexMatch(const char *s, const char *reg, int cflags) {
printf("Regex match failed: %s\n", msgbuf);
exit(EXIT_FAILURE);
}
return 0;
}
......@@ -12100,8 +12238,6 @@ int main(int argc, char *argv[]) {
if (g_args.metaFile) {
g_totalChildTables = 0;
initOfInsertMeta();
initOfQueryMeta();
if (false == getInfoFromJsonFile(g_args.metaFile)) {
printf("Failed to read %s\n", g_args.metaFile);
......@@ -12111,6 +12247,10 @@ int main(int argc, char *argv[]) {
testMetaFile();
} else {
memset(&g_Dbs, 0, sizeof(SDbs));
g_Dbs.db = calloc(1, sizeof(SDataBase));
assert(g_Dbs.db);
g_Dbs.db[0].superTbls = calloc(1, sizeof(SSuperTable));
assert(g_Dbs.db[0].superTbls);
setParaFromArg();
if (NULL != g_args.sqlFile) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册