未验证 提交 dc564d2f 编写于 作者: sangshuduo's avatar sangshuduo 提交者: GitHub

[TD-5445]<fix>: taosdemo bug for stmt interface with sample data. (#6969)

上级 4fc364a1
......@@ -16,7 +16,7 @@
/*
when in some thread query return error, thread don't exit, but return, otherwise coredump in other thread.
*/
*/
#include <stdint.h>
#include <taos.h>
......@@ -24,24 +24,24 @@
#define CURL_STATICLIB
#ifdef LINUX
#include <argp.h>
#include <inttypes.h>
#ifndef _ALPINE
#include <error.h>
#endif
#include <pthread.h>
#include <semaphore.h>
#include <stdbool.h>
#include <stdio.h>
#include <string.h>
#include <sys/time.h>
#include <time.h>
#include <unistd.h>
#include <wordexp.h>
#include <regex.h>
#include <argp.h>
#include <inttypes.h>
#ifndef _ALPINE
#include <error.h>
#endif
#include <pthread.h>
#include <semaphore.h>
#include <stdbool.h>
#include <stdio.h>
#include <string.h>
#include <sys/time.h>
#include <time.h>
#include <unistd.h>
#include <wordexp.h>
#include <regex.h>
#else
#include <regex.h>
#include <stdio.h>
#include <regex.h>
#include <stdio.h>
#endif
#include <assert.h>
......@@ -485,42 +485,42 @@ typedef unsigned __int32 uint32_t;
#pragma comment ( lib, "ws2_32.lib" )
// Some old MinGW/CYGWIN distributions don't define this:
#ifndef ENABLE_VIRTUAL_TERMINAL_PROCESSING
#define ENABLE_VIRTUAL_TERMINAL_PROCESSING 0x0004
#define ENABLE_VIRTUAL_TERMINAL_PROCESSING 0x0004
#endif // ENABLE_VIRTUAL_TERMINAL_PROCESSING
static HANDLE g_stdoutHandle;
static DWORD g_consoleMode;
static void setupForAnsiEscape(void) {
DWORD mode = 0;
g_stdoutHandle = GetStdHandle(STD_OUTPUT_HANDLE);
DWORD mode = 0;
g_stdoutHandle = GetStdHandle(STD_OUTPUT_HANDLE);
if(g_stdoutHandle == INVALID_HANDLE_VALUE) {
exit(GetLastError());
}
if(g_stdoutHandle == INVALID_HANDLE_VALUE) {
exit(GetLastError());
}
if(!GetConsoleMode(g_stdoutHandle, &mode)) {
exit(GetLastError());
}
if(!GetConsoleMode(g_stdoutHandle, &mode)) {
exit(GetLastError());
}
g_consoleMode = mode;
g_consoleMode = mode;
// Enable ANSI escape codes
mode |= ENABLE_VIRTUAL_TERMINAL_PROCESSING;
// Enable ANSI escape codes
mode |= ENABLE_VIRTUAL_TERMINAL_PROCESSING;
if(!SetConsoleMode(g_stdoutHandle, mode)) {
exit(GetLastError());
}
if(!SetConsoleMode(g_stdoutHandle, mode)) {
exit(GetLastError());
}
}
static void resetAfterAnsiEscape(void) {
// Reset colors
printf("\x1b[0m");
// Reset colors
printf("\x1b[0m");
// Reset console mode
if(!SetConsoleMode(g_stdoutHandle, g_consoleMode)) {
exit(GetLastError());
}
// Reset console mode
if(!SetConsoleMode(g_stdoutHandle, g_consoleMode)) {
exit(GetLastError());
}
}
static int taosRandom()
......@@ -534,15 +534,15 @@ static int taosRandom()
static void setupForAnsiEscape(void) {}
static void resetAfterAnsiEscape(void) {
// Reset colors
printf("\x1b[0m");
// Reset colors
printf("\x1b[0m");
}
#include <time.h>
static int taosRandom()
{
return rand();
return rand();
}
#endif // ifdef Windows
......@@ -634,7 +634,7 @@ static FILE * g_fpOfInsertResult = NULL;
#define debugPrint(fmt, ...) \
do { if (g_args.debug_print || g_args.verbose_print) \
fprintf(stderr, "DEBG: "fmt, __VA_ARGS__); } while(0)
fprintf(stderr, "DEBG: "fmt, __VA_ARGS__); } while(0)
#define verbosePrint(fmt, ...) \
do { if (g_args.verbose_print) \
......@@ -1363,14 +1363,14 @@ static const char charNum[] = "0123456789";
static void nonrand_string(char *, int) __attribute__ ((unused)); // reserve for debugging purpose
static void nonrand_string(char *str, int size)
{
str[0] = 0;
if (size > 0) {
int n;
for (n = 0; n < size; n++) {
str[n] = charNum[n % 10];
}
str[n] = 0;
}
str[0] = 0;
if (size > 0) {
int n;
for (n = 0; n < size; n++) {
str[n] = charNum[n % 10];
}
str[n] = 0;
}
}
#endif
......@@ -1436,8 +1436,8 @@ static int printfInsertMeta() {
if (g_args.iface != INTERFACE_BUT) {
// first time if no iface specified
printf("interface: \033[33m%s\033[0m\n",
(g_args.iface==TAOSC_IFACE)?"taosc":
(g_args.iface==REST_IFACE)?"rest":"stmt");
(g_args.iface==TAOSC_IFACE)?"taosc":
(g_args.iface==REST_IFACE)?"rest":"stmt");
}
printf("host: \033[33m%s:%u\033[0m\n",
......@@ -2221,24 +2221,24 @@ static int postProceSql(char *host, struct sockaddr_in *pServAddr, uint16_t port
request_buf = malloc(req_buf_len);
if (NULL == request_buf) {
errorPrint("%s", "ERROR, cannot allocate memory.\n");
exit(EXIT_FAILURE);
errorPrint("%s", "ERROR, cannot allocate memory.\n");
exit(EXIT_FAILURE);
}
char userpass_buf[INPUT_BUF_LEN];
int mod_table[] = {0, 2, 1};
static char base64[] = {'A', 'B', 'C', 'D', 'E', 'F', 'G', 'H',
'I', 'J', 'K', 'L', 'M', 'N', 'O', 'P',
'Q', 'R', 'S', 'T', 'U', 'V', 'W', 'X',
'Y', 'Z', 'a', 'b', 'c', 'd', 'e', 'f',
'g', 'h', 'i', 'j', 'k', 'l', 'm', 'n',
'o', 'p', 'q', 'r', 's', 't', 'u', 'v',
'w', 'x', 'y', 'z', '0', '1', '2', '3',
'4', '5', '6', '7', '8', '9', '+', '/'};
'I', 'J', 'K', 'L', 'M', 'N', 'O', 'P',
'Q', 'R', 'S', 'T', 'U', 'V', 'W', 'X',
'Y', 'Z', 'a', 'b', 'c', 'd', 'e', 'f',
'g', 'h', 'i', 'j', 'k', 'l', 'm', 'n',
'o', 'p', 'q', 'r', 's', 't', 'u', 'v',
'w', 'x', 'y', 'z', '0', '1', '2', '3',
'4', '5', '6', '7', '8', '9', '+', '/'};
snprintf(userpass_buf, INPUT_BUF_LEN, "%s:%s",
g_Dbs.user, g_Dbs.password);
g_Dbs.user, g_Dbs.password);
size_t userpass_buf_len = strlen(userpass_buf);
size_t encoded_len = 4 * ((userpass_buf_len +2) / 3);
......@@ -2270,22 +2270,22 @@ static int postProceSql(char *host, struct sockaddr_in *pServAddr, uint16_t port
memset(base64_buf, 0, INPUT_BUF_LEN);
for (int n = 0, m = 0; n < userpass_buf_len;) {
uint32_t oct_a = n < userpass_buf_len ?
(unsigned char) userpass_buf[n++]:0;
uint32_t oct_b = n < userpass_buf_len ?
(unsigned char) userpass_buf[n++]:0;
uint32_t oct_c = n < userpass_buf_len ?
(unsigned char) userpass_buf[n++]:0;
uint32_t triple = (oct_a << 0x10) + (oct_b << 0x08) + oct_c;
uint32_t oct_a = n < userpass_buf_len ?
(unsigned char) userpass_buf[n++]:0;
uint32_t oct_b = n < userpass_buf_len ?
(unsigned char) userpass_buf[n++]:0;
uint32_t oct_c = n < userpass_buf_len ?
(unsigned char) userpass_buf[n++]:0;
uint32_t triple = (oct_a << 0x10) + (oct_b << 0x08) + oct_c;
base64_buf[m++] = base64[(triple >> 3* 6) & 0x3f];
base64_buf[m++] = base64[(triple >> 2* 6) & 0x3f];
base64_buf[m++] = base64[(triple >> 1* 6) & 0x3f];
base64_buf[m++] = base64[(triple >> 0* 6) & 0x3f];
base64_buf[m++] = base64[(triple >> 3* 6) & 0x3f];
base64_buf[m++] = base64[(triple >> 2* 6) & 0x3f];
base64_buf[m++] = base64[(triple >> 1* 6) & 0x3f];
base64_buf[m++] = base64[(triple >> 0* 6) & 0x3f];
}
for (int l = 0; l < mod_table[userpass_buf_len % 3]; l++)
base64_buf[encoded_len - 1 - l] = '=';
base64_buf[encoded_len - 1 - l] = '=';
debugPrint("%s() LN%d: auth string base64 encoded: %s\n",
__func__, __LINE__, base64_buf);
......@@ -2343,7 +2343,7 @@ static int postProceSql(char *host, struct sockaddr_in *pServAddr, uint16_t port
printf("Response:\n%s\n", response_buf);
if (strlen(pThreadInfo->filePath) > 0) {
appendResultBufToFile(response_buf, pThreadInfo);
appendResultBufToFile(response_buf, pThreadInfo);
}
free(request_buf);
......@@ -2425,11 +2425,11 @@ static char* generateTagValuesForStb(SSuperTable* stbInfo, int64_t tableSeq) {
if ((g_args.demo_mode) && (i == 0)) {
dataLen += snprintf(dataBuf + dataLen,
TSDB_MAX_SQL_LEN - dataLen,
"%"PRId64",", tableSeq % 10);
"%"PRId64",", tableSeq % 10);
} else {
dataLen += snprintf(dataBuf + dataLen,
TSDB_MAX_SQL_LEN - dataLen,
"%"PRId64",", tableSeq);
"%"PRId64",", tableSeq);
}
} else if (0 == strncasecmp(stbInfo->tags[i].dataType,
"bigint", strlen("bigint"))) {
......@@ -2472,72 +2472,72 @@ static char* generateTagValuesForStb(SSuperTable* stbInfo, int64_t tableSeq) {
}
static int calcRowLen(SSuperTable* superTbls) {
int colIndex;
int lenOfOneRow = 0;
for (colIndex = 0; colIndex < superTbls->columnCount; colIndex++) {
char* dataType = superTbls->columns[colIndex].dataType;
if (strcasecmp(dataType, "BINARY") == 0) {
lenOfOneRow += superTbls->columns[colIndex].dataLen + 3;
} else if (strcasecmp(dataType, "NCHAR") == 0) {
lenOfOneRow += superTbls->columns[colIndex].dataLen + 3;
} else if (strcasecmp(dataType, "INT") == 0) {
lenOfOneRow += 11;
} else if (strcasecmp(dataType, "BIGINT") == 0) {
lenOfOneRow += 21;
} else if (strcasecmp(dataType, "SMALLINT") == 0) {
lenOfOneRow += 6;
} else if (strcasecmp(dataType, "TINYINT") == 0) {
lenOfOneRow += 4;
} else if (strcasecmp(dataType, "BOOL") == 0) {
lenOfOneRow += 6;
} else if (strcasecmp(dataType, "FLOAT") == 0) {
lenOfOneRow += 22;
} else if (strcasecmp(dataType, "DOUBLE") == 0) {
lenOfOneRow += 42;
} else if (strcasecmp(dataType, "TIMESTAMP") == 0) {
lenOfOneRow += 21;
} else {
printf("get error data type : %s\n", dataType);
exit(-1);
}
}
superTbls->lenOfOneRow = lenOfOneRow + 20; // timestamp
int tagIndex;
int lenOfTagOfOneRow = 0;
for (tagIndex = 0; tagIndex < superTbls->tagCount; tagIndex++) {
char* dataType = superTbls->tags[tagIndex].dataType;
if (strcasecmp(dataType, "BINARY") == 0) {
lenOfTagOfOneRow += superTbls->tags[tagIndex].dataLen + 3;
} else if (strcasecmp(dataType, "NCHAR") == 0) {
lenOfTagOfOneRow += superTbls->tags[tagIndex].dataLen + 3;
} else if (strcasecmp(dataType, "INT") == 0) {
lenOfTagOfOneRow += superTbls->tags[tagIndex].dataLen + 11;
} else if (strcasecmp(dataType, "BIGINT") == 0) {
lenOfTagOfOneRow += superTbls->tags[tagIndex].dataLen + 21;
} else if (strcasecmp(dataType, "SMALLINT") == 0) {
lenOfTagOfOneRow += superTbls->tags[tagIndex].dataLen + 6;
} else if (strcasecmp(dataType, "TINYINT") == 0) {
lenOfTagOfOneRow += superTbls->tags[tagIndex].dataLen + 4;
} else if (strcasecmp(dataType, "BOOL") == 0) {
lenOfTagOfOneRow += superTbls->tags[tagIndex].dataLen + 6;
} else if (strcasecmp(dataType, "FLOAT") == 0) {
lenOfTagOfOneRow += superTbls->tags[tagIndex].dataLen + 22;
} else if (strcasecmp(dataType, "DOUBLE") == 0) {
lenOfTagOfOneRow += superTbls->tags[tagIndex].dataLen + 42;
} else {
printf("get error tag type : %s\n", dataType);
exit(-1);
int colIndex;
int lenOfOneRow = 0;
for (colIndex = 0; colIndex < superTbls->columnCount; colIndex++) {
char* dataType = superTbls->columns[colIndex].dataType;
if (strcasecmp(dataType, "BINARY") == 0) {
lenOfOneRow += superTbls->columns[colIndex].dataLen + 3;
} else if (strcasecmp(dataType, "NCHAR") == 0) {
lenOfOneRow += superTbls->columns[colIndex].dataLen + 3;
} else if (strcasecmp(dataType, "INT") == 0) {
lenOfOneRow += 11;
} else if (strcasecmp(dataType, "BIGINT") == 0) {
lenOfOneRow += 21;
} else if (strcasecmp(dataType, "SMALLINT") == 0) {
lenOfOneRow += 6;
} else if (strcasecmp(dataType, "TINYINT") == 0) {
lenOfOneRow += 4;
} else if (strcasecmp(dataType, "BOOL") == 0) {
lenOfOneRow += 6;
} else if (strcasecmp(dataType, "FLOAT") == 0) {
lenOfOneRow += 22;
} else if (strcasecmp(dataType, "DOUBLE") == 0) {
lenOfOneRow += 42;
} else if (strcasecmp(dataType, "TIMESTAMP") == 0) {
lenOfOneRow += 21;
} else {
printf("get error data type : %s\n", dataType);
exit(-1);
}
}
superTbls->lenOfOneRow = lenOfOneRow + 20; // timestamp
int tagIndex;
int lenOfTagOfOneRow = 0;
for (tagIndex = 0; tagIndex < superTbls->tagCount; tagIndex++) {
char* dataType = superTbls->tags[tagIndex].dataType;
if (strcasecmp(dataType, "BINARY") == 0) {
lenOfTagOfOneRow += superTbls->tags[tagIndex].dataLen + 3;
} else if (strcasecmp(dataType, "NCHAR") == 0) {
lenOfTagOfOneRow += superTbls->tags[tagIndex].dataLen + 3;
} else if (strcasecmp(dataType, "INT") == 0) {
lenOfTagOfOneRow += superTbls->tags[tagIndex].dataLen + 11;
} else if (strcasecmp(dataType, "BIGINT") == 0) {
lenOfTagOfOneRow += superTbls->tags[tagIndex].dataLen + 21;
} else if (strcasecmp(dataType, "SMALLINT") == 0) {
lenOfTagOfOneRow += superTbls->tags[tagIndex].dataLen + 6;
} else if (strcasecmp(dataType, "TINYINT") == 0) {
lenOfTagOfOneRow += superTbls->tags[tagIndex].dataLen + 4;
} else if (strcasecmp(dataType, "BOOL") == 0) {
lenOfTagOfOneRow += superTbls->tags[tagIndex].dataLen + 6;
} else if (strcasecmp(dataType, "FLOAT") == 0) {
lenOfTagOfOneRow += superTbls->tags[tagIndex].dataLen + 22;
} else if (strcasecmp(dataType, "DOUBLE") == 0) {
lenOfTagOfOneRow += superTbls->tags[tagIndex].dataLen + 42;
} else {
printf("get error tag type : %s\n", dataType);
exit(-1);
}
}
}
superTbls->lenOfTagOfOneRow = lenOfTagOfOneRow;
superTbls->lenOfTagOfOneRow = lenOfTagOfOneRow;
return 0;
return 0;
}
......@@ -2545,84 +2545,84 @@ static int getChildNameOfSuperTableWithLimitAndOffset(TAOS * taos,
char* dbName, char* sTblName, char** childTblNameOfSuperTbl,
int64_t* childTblCountOfSuperTbl, int64_t limit, uint64_t offset) {
char command[BUFFER_SIZE] = "\0";
char limitBuf[100] = "\0";
char command[BUFFER_SIZE] = "\0";
char limitBuf[100] = "\0";
TAOS_RES * res;
TAOS_ROW row = NULL;
TAOS_RES * res;
TAOS_ROW row = NULL;
char* childTblName = *childTblNameOfSuperTbl;
char* childTblName = *childTblNameOfSuperTbl;
if (offset >= 0) {
snprintf(limitBuf, 100, " limit %"PRId64" offset %"PRIu64"",
limit, offset);
}
if (offset >= 0) {
snprintf(limitBuf, 100, " limit %"PRId64" offset %"PRIu64"",
limit, offset);
}
//get all child table name use cmd: select tbname from superTblName;
snprintf(command, BUFFER_SIZE, "select tbname from %s.%s %s",
dbName, sTblName, limitBuf);
//get all child table name use cmd: select tbname from superTblName;
snprintf(command, BUFFER_SIZE, "select tbname from %s.%s %s",
dbName, sTblName, limitBuf);
res = taos_query(taos, command);
int32_t code = taos_errno(res);
if (code != 0) {
taos_free_result(res);
taos_close(taos);
errorPrint("%s() LN%d, failed to run command %s\n",
__func__, __LINE__, command);
exit(-1);
}
int64_t childTblCount = (limit < 0)?10000:limit;
int64_t count = 0;
if (childTblName == NULL) {
childTblName = (char*)calloc(1, childTblCount * TSDB_TABLE_NAME_LEN);
if (NULL == childTblName) {
taos_free_result(res);
res = taos_query(taos, command);
int32_t code = taos_errno(res);
if (code != 0) {
taos_free_result(res);
taos_close(taos);
errorPrint("%s() LN%d, failed to allocate memory!\n", __func__, __LINE__);
errorPrint("%s() LN%d, failed to run command %s\n",
__func__, __LINE__, command);
exit(-1);
}
}
char* pTblName = childTblName;
while((row = taos_fetch_row(res)) != NULL) {
int32_t* len = taos_fetch_lengths(res);
if (0 == strlen((char *)row[0])) {
errorPrint("%s() LN%d, No.%"PRId64" table return empty name\n",
__func__, __LINE__, count);
exit(-1);
int64_t childTblCount = (limit < 0)?10000:limit;
int64_t count = 0;
if (childTblName == NULL) {
childTblName = (char*)calloc(1, childTblCount * TSDB_TABLE_NAME_LEN);
if (NULL == childTblName) {
taos_free_result(res);
taos_close(taos);
errorPrint("%s() LN%d, failed to allocate memory!\n", __func__, __LINE__);
exit(-1);
}
}
tstrncpy(pTblName, (char *)row[0], len[0]+1);
//printf("==== sub table name: %s\n", pTblName);
count++;
if (count >= childTblCount - 1) {
char *tmp = realloc(childTblName,
(size_t)childTblCount*1.5*TSDB_TABLE_NAME_LEN+1);
if (tmp != NULL) {
childTblName = tmp;
childTblCount = (int)(childTblCount*1.5);
memset(childTblName + count*TSDB_TABLE_NAME_LEN, 0,
(size_t)((childTblCount-count)*TSDB_TABLE_NAME_LEN));
} else {
// exit, if allocate more memory failed
errorPrint("%s() LN%d, realloc fail for save child table name of %s.%s\n",
__func__, __LINE__, dbName, sTblName);
tmfree(childTblName);
taos_free_result(res);
taos_close(taos);
exit(-1);
}
char* pTblName = childTblName;
while((row = taos_fetch_row(res)) != NULL) {
int32_t* len = taos_fetch_lengths(res);
if (0 == strlen((char *)row[0])) {
errorPrint("%s() LN%d, No.%"PRId64" table return empty name\n",
__func__, __LINE__, count);
exit(-1);
}
tstrncpy(pTblName, (char *)row[0], len[0]+1);
//printf("==== sub table name: %s\n", pTblName);
count++;
if (count >= childTblCount - 1) {
char *tmp = realloc(childTblName,
(size_t)childTblCount*1.5*TSDB_TABLE_NAME_LEN+1);
if (tmp != NULL) {
childTblName = tmp;
childTblCount = (int)(childTblCount*1.5);
memset(childTblName + count*TSDB_TABLE_NAME_LEN, 0,
(size_t)((childTblCount-count)*TSDB_TABLE_NAME_LEN));
} else {
// exit, if allocate more memory failed
errorPrint("%s() LN%d, realloc fail for save child table name of %s.%s\n",
__func__, __LINE__, dbName, sTblName);
tmfree(childTblName);
taos_free_result(res);
taos_close(taos);
exit(-1);
}
}
pTblName = childTblName + count * TSDB_TABLE_NAME_LEN;
}
pTblName = childTblName + count * TSDB_TABLE_NAME_LEN;
}
*childTblCountOfSuperTbl = count;
*childTblNameOfSuperTbl = childTblName;
*childTblCountOfSuperTbl = count;
*childTblNameOfSuperTbl = childTblName;
taos_free_result(res);
return 0;
taos_free_result(res);
return 0;
}
static int getAllChildNameOfSuperTable(TAOS * taos, char* dbName,
......@@ -2637,82 +2637,82 @@ static int getAllChildNameOfSuperTable(TAOS * taos, char* dbName,
static int getSuperTableFromServer(TAOS * taos, char* dbName,
SSuperTable* superTbls) {
char command[BUFFER_SIZE] = "\0";
TAOS_RES * res;
TAOS_ROW row = NULL;
int count = 0;
//get schema use cmd: describe superTblName;
snprintf(command, BUFFER_SIZE, "describe %s.%s", dbName, superTbls->sTblName);
res = taos_query(taos, command);
int32_t code = taos_errno(res);
if (code != 0) {
printf("failed to run command %s\n", command);
char command[BUFFER_SIZE] = "\0";
TAOS_RES * res;
TAOS_ROW row = NULL;
int count = 0;
//get schema use cmd: describe superTblName;
snprintf(command, BUFFER_SIZE, "describe %s.%s", dbName, superTbls->sTblName);
res = taos_query(taos, command);
int32_t code = taos_errno(res);
if (code != 0) {
printf("failed to run command %s\n", command);
taos_free_result(res);
return -1;
}
int tagIndex = 0;
int columnIndex = 0;
TAOS_FIELD *fields = taos_fetch_fields(res);
while((row = taos_fetch_row(res)) != NULL) {
if (0 == count) {
count++;
continue;
}
if (strcmp((char *)row[TSDB_DESCRIBE_METRIC_NOTE_INDEX], "TAG") == 0) {
tstrncpy(superTbls->tags[tagIndex].field,
(char *)row[TSDB_DESCRIBE_METRIC_FIELD_INDEX],
fields[TSDB_DESCRIBE_METRIC_FIELD_INDEX].bytes);
tstrncpy(superTbls->tags[tagIndex].dataType,
(char *)row[TSDB_DESCRIBE_METRIC_TYPE_INDEX],
min(15, fields[TSDB_DESCRIBE_METRIC_TYPE_INDEX].bytes));
superTbls->tags[tagIndex].dataLen =
*((int *)row[TSDB_DESCRIBE_METRIC_LENGTH_INDEX]);
tstrncpy(superTbls->tags[tagIndex].note,
(char *)row[TSDB_DESCRIBE_METRIC_NOTE_INDEX],
fields[TSDB_DESCRIBE_METRIC_NOTE_INDEX].bytes);
tagIndex++;
} else {
tstrncpy(superTbls->columns[columnIndex].field,
(char *)row[TSDB_DESCRIBE_METRIC_FIELD_INDEX],
fields[TSDB_DESCRIBE_METRIC_FIELD_INDEX].bytes);
tstrncpy(superTbls->columns[columnIndex].dataType,
(char *)row[TSDB_DESCRIBE_METRIC_TYPE_INDEX],
min(15, fields[TSDB_DESCRIBE_METRIC_TYPE_INDEX].bytes));
superTbls->columns[columnIndex].dataLen =
*((int *)row[TSDB_DESCRIBE_METRIC_LENGTH_INDEX]);
tstrncpy(superTbls->columns[columnIndex].note,
(char *)row[TSDB_DESCRIBE_METRIC_NOTE_INDEX],
fields[TSDB_DESCRIBE_METRIC_NOTE_INDEX].bytes);
columnIndex++;
}
count++;
}
superTbls->columnCount = columnIndex;
superTbls->tagCount = tagIndex;
taos_free_result(res);
return -1;
}
int tagIndex = 0;
int columnIndex = 0;
TAOS_FIELD *fields = taos_fetch_fields(res);
while((row = taos_fetch_row(res)) != NULL) {
if (0 == count) {
count++;
continue;
}
if (strcmp((char *)row[TSDB_DESCRIBE_METRIC_NOTE_INDEX], "TAG") == 0) {
tstrncpy(superTbls->tags[tagIndex].field,
(char *)row[TSDB_DESCRIBE_METRIC_FIELD_INDEX],
fields[TSDB_DESCRIBE_METRIC_FIELD_INDEX].bytes);
tstrncpy(superTbls->tags[tagIndex].dataType,
(char *)row[TSDB_DESCRIBE_METRIC_TYPE_INDEX],
min(15, fields[TSDB_DESCRIBE_METRIC_TYPE_INDEX].bytes));
superTbls->tags[tagIndex].dataLen =
*((int *)row[TSDB_DESCRIBE_METRIC_LENGTH_INDEX]);
tstrncpy(superTbls->tags[tagIndex].note,
(char *)row[TSDB_DESCRIBE_METRIC_NOTE_INDEX],
fields[TSDB_DESCRIBE_METRIC_NOTE_INDEX].bytes);
tagIndex++;
} else {
tstrncpy(superTbls->columns[columnIndex].field,
(char *)row[TSDB_DESCRIBE_METRIC_FIELD_INDEX],
fields[TSDB_DESCRIBE_METRIC_FIELD_INDEX].bytes);
tstrncpy(superTbls->columns[columnIndex].dataType,
(char *)row[TSDB_DESCRIBE_METRIC_TYPE_INDEX],
min(15, fields[TSDB_DESCRIBE_METRIC_TYPE_INDEX].bytes));
superTbls->columns[columnIndex].dataLen =
*((int *)row[TSDB_DESCRIBE_METRIC_LENGTH_INDEX]);
tstrncpy(superTbls->columns[columnIndex].note,
(char *)row[TSDB_DESCRIBE_METRIC_NOTE_INDEX],
fields[TSDB_DESCRIBE_METRIC_NOTE_INDEX].bytes);
columnIndex++;
}
count++;
}
superTbls->columnCount = columnIndex;
superTbls->tagCount = tagIndex;
taos_free_result(res);
calcRowLen(superTbls);
/*
if (TBL_ALREADY_EXISTS == superTbls->childTblExists) {
calcRowLen(superTbls);
/*
if (TBL_ALREADY_EXISTS == superTbls->childTblExists) {
//get all child table name use cmd: select tbname from superTblName;
int childTblCount = 10000;
superTbls->childTblName = (char*)calloc(1, childTblCount * TSDB_TABLE_NAME_LEN);
if (superTbls->childTblName == NULL) {
errorPrint("%s() LN%d, alloc memory failed!\n", __func__, __LINE__);
return -1;
errorPrint("%s() LN%d, alloc memory failed!\n", __func__, __LINE__);
return -1;
}
getAllChildNameOfSuperTable(taos, dbName,
superTbls->sTblName,
&superTbls->childTblName,
&superTbls->childTblCount);
}
*/
return 0;
superTbls->sTblName,
&superTbls->childTblName,
&superTbls->childTblCount);
}
*/
return 0;
}
static int createSuperTable(
......@@ -2748,8 +2748,8 @@ static int createSuperTable(
lenOfOneRow += superTbl->columns[colIndex].dataLen + 3;
} else if (strcasecmp(dataType, "INT") == 0) {
if ((g_args.demo_mode) && (colIndex == 1)) {
len += snprintf(cols + len, COL_BUFFER_LEN - len,
", VOLTAGE INT");
len += snprintf(cols + len, COL_BUFFER_LEN - len,
", VOLTAGE INT");
} else {
len += snprintf(cols + len, COL_BUFFER_LEN - len, ",C%d %s", colIndex, "INT");
}
......@@ -3221,7 +3221,7 @@ static void createChildTables() {
continue;
}
verbosePrint("%s() LN%d: %s\n", __func__, __LINE__,
g_Dbs.db[i].superTbls[j].colsOfCreateChildTable);
g_Dbs.db[i].superTbls[j].colsOfCreateChildTable);
uint64_t startFrom = 0;
g_totalChildTables += g_Dbs.db[i].superTbls[j].childTblCount;
......@@ -3269,295 +3269,295 @@ static void createChildTables() {
}
/*
Read 10000 lines at most. If more than 10000 lines, continue to read after using
*/
Read 10000 lines at most. If more than 10000 lines, continue to read after using
*/
static int readTagFromCsvFileToMem(SSuperTable * superTblInfo) {
size_t n = 0;
ssize_t readLen = 0;
char * line = NULL;
FILE *fp = fopen(superTblInfo->tagsFile, "r");
if (fp == NULL) {
printf("Failed to open tags file: %s, reason:%s\n",
superTblInfo->tagsFile, strerror(errno));
return -1;
}
if (superTblInfo->tagDataBuf) {
free(superTblInfo->tagDataBuf);
superTblInfo->tagDataBuf = NULL;
}
int tagCount = 10000;
int count = 0;
char* tagDataBuf = calloc(1, superTblInfo->lenOfTagOfOneRow * tagCount);
if (tagDataBuf == NULL) {
printf("Failed to calloc, reason:%s\n", strerror(errno));
fclose(fp);
return -1;
}
size_t n = 0;
ssize_t readLen = 0;
char * line = NULL;
while((readLen = tgetline(&line, &n, fp)) != -1) {
if (('\r' == line[readLen - 1]) || ('\n' == line[readLen - 1])) {
line[--readLen] = 0;
FILE *fp = fopen(superTblInfo->tagsFile, "r");
if (fp == NULL) {
printf("Failed to open tags file: %s, reason:%s\n",
superTblInfo->tagsFile, strerror(errno));
return -1;
}
if (readLen == 0) {
continue;
if (superTblInfo->tagDataBuf) {
free(superTblInfo->tagDataBuf);
superTblInfo->tagDataBuf = NULL;
}
memcpy(tagDataBuf + count * superTblInfo->lenOfTagOfOneRow, line, readLen);
count++;
if (count >= tagCount - 1) {
char *tmp = realloc(tagDataBuf,
(size_t)tagCount*1.5*superTblInfo->lenOfTagOfOneRow);
if (tmp != NULL) {
tagDataBuf = tmp;
tagCount = (int)(tagCount*1.5);
memset(tagDataBuf + count*superTblInfo->lenOfTagOfOneRow,
0, (size_t)((tagCount-count)*superTblInfo->lenOfTagOfOneRow));
} else {
// exit, if allocate more memory failed
printf("realloc fail for save tag val from %s\n", superTblInfo->tagsFile);
tmfree(tagDataBuf);
free(line);
int tagCount = 10000;
int count = 0;
char* tagDataBuf = calloc(1, superTblInfo->lenOfTagOfOneRow * tagCount);
if (tagDataBuf == NULL) {
printf("Failed to calloc, reason:%s\n", strerror(errno));
fclose(fp);
return -1;
}
}
}
superTblInfo->tagDataBuf = tagDataBuf;
superTblInfo->tagSampleCount = count;
while((readLen = tgetline(&line, &n, fp)) != -1) {
if (('\r' == line[readLen - 1]) || ('\n' == line[readLen - 1])) {
line[--readLen] = 0;
}
if (readLen == 0) {
continue;
}
memcpy(tagDataBuf + count * superTblInfo->lenOfTagOfOneRow, line, readLen);
count++;
if (count >= tagCount - 1) {
char *tmp = realloc(tagDataBuf,
(size_t)tagCount*1.5*superTblInfo->lenOfTagOfOneRow);
if (tmp != NULL) {
tagDataBuf = tmp;
tagCount = (int)(tagCount*1.5);
memset(tagDataBuf + count*superTblInfo->lenOfTagOfOneRow,
0, (size_t)((tagCount-count)*superTblInfo->lenOfTagOfOneRow));
} else {
// exit, if allocate more memory failed
printf("realloc fail for save tag val from %s\n", superTblInfo->tagsFile);
tmfree(tagDataBuf);
free(line);
fclose(fp);
return -1;
}
}
}
free(line);
fclose(fp);
return 0;
superTblInfo->tagDataBuf = tagDataBuf;
superTblInfo->tagSampleCount = count;
free(line);
fclose(fp);
return 0;
}
/*
Read 10000 lines at most. If more than 10000 lines, continue to read after using
*/
Read 10000 lines at most. If more than 10000 lines, continue to read after using
*/
static int readSampleFromCsvFileToMem(
SSuperTable* superTblInfo) {
size_t n = 0;
ssize_t readLen = 0;
char * line = NULL;
int getRows = 0;
FILE* fp = fopen(superTblInfo->sampleFile, "r");
if (fp == NULL) {
errorPrint( "Failed to open sample file: %s, reason:%s\n",
superTblInfo->sampleFile, strerror(errno));
return -1;
}
assert(superTblInfo->sampleDataBuf);
memset(superTblInfo->sampleDataBuf, 0,
MAX_SAMPLES_ONCE_FROM_FILE * superTblInfo->lenOfOneRow);
while(1) {
readLen = tgetline(&line, &n, fp);
if (-1 == readLen) {
if(0 != fseek(fp, 0, SEEK_SET)) {
errorPrint( "Failed to fseek file: %s, reason:%s\n",
size_t n = 0;
ssize_t readLen = 0;
char * line = NULL;
int getRows = 0;
FILE* fp = fopen(superTblInfo->sampleFile, "r");
if (fp == NULL) {
errorPrint( "Failed to open sample file: %s, reason:%s\n",
superTblInfo->sampleFile, strerror(errno));
fclose(fp);
return -1;
}
continue;
}
if (('\r' == line[readLen - 1]) || ('\n' == line[readLen - 1])) {
line[--readLen] = 0;
}
assert(superTblInfo->sampleDataBuf);
memset(superTblInfo->sampleDataBuf, 0,
MAX_SAMPLES_ONCE_FROM_FILE * superTblInfo->lenOfOneRow);
while(1) {
readLen = tgetline(&line, &n, fp);
if (-1 == readLen) {
if(0 != fseek(fp, 0, SEEK_SET)) {
errorPrint( "Failed to fseek file: %s, reason:%s\n",
superTblInfo->sampleFile, strerror(errno));
fclose(fp);
return -1;
}
continue;
}
if (readLen == 0) {
continue;
}
if (('\r' == line[readLen - 1]) || ('\n' == line[readLen - 1])) {
line[--readLen] = 0;
}
if (readLen > superTblInfo->lenOfOneRow) {
printf("sample row len[%d] overflow define schema len[%"PRIu64"], so discard this row\n",
(int32_t)readLen, superTblInfo->lenOfOneRow);
continue;
}
if (readLen == 0) {
continue;
}
memcpy(superTblInfo->sampleDataBuf + getRows * superTblInfo->lenOfOneRow,
line, readLen);
getRows++;
if (readLen > superTblInfo->lenOfOneRow) {
printf("sample row len[%d] overflow define schema len[%"PRIu64"], so discard this row\n",
(int32_t)readLen, superTblInfo->lenOfOneRow);
continue;
}
if (getRows == MAX_SAMPLES_ONCE_FROM_FILE) {
break;
memcpy(superTblInfo->sampleDataBuf + getRows * superTblInfo->lenOfOneRow,
line, readLen);
getRows++;
if (getRows == MAX_SAMPLES_ONCE_FROM_FILE) {
break;
}
}
}
fclose(fp);
tmfree(line);
return 0;
fclose(fp);
tmfree(line);
return 0;
}
static bool getColumnAndTagTypeFromInsertJsonFile(
cJSON* stbInfo, SSuperTable* superTbls) {
bool ret = false;
// columns
cJSON *columns = cJSON_GetObjectItem(stbInfo, "columns");
if (columns && columns->type != cJSON_Array) {
printf("ERROR: failed to read json, columns not found\n");
goto PARSE_OVER;
} else if (NULL == columns) {
superTbls->columnCount = 0;
superTbls->tagCount = 0;
return true;
}
bool ret = false;
// columns
cJSON *columns = cJSON_GetObjectItem(stbInfo, "columns");
if (columns && columns->type != cJSON_Array) {
printf("ERROR: failed to read json, columns not found\n");
goto PARSE_OVER;
} else if (NULL == columns) {
superTbls->columnCount = 0;
superTbls->tagCount = 0;
return true;
}
int columnSize = cJSON_GetArraySize(columns);
if ((columnSize + 1/* ts */) > TSDB_MAX_COLUMNS) {
errorPrint("%s() LN%d, failed to read json, column size overflow, max column size is %d\n",
__func__, __LINE__, TSDB_MAX_COLUMNS);
goto PARSE_OVER;
}
int columnSize = cJSON_GetArraySize(columns);
if ((columnSize + 1/* ts */) > TSDB_MAX_COLUMNS) {
errorPrint("%s() LN%d, failed to read json, column size overflow, max column size is %d\n",
__func__, __LINE__, TSDB_MAX_COLUMNS);
goto PARSE_OVER;
}
int count = 1;
int index = 0;
StrColumn columnCase;
int count = 1;
int index = 0;
StrColumn columnCase;
//superTbls->columnCount = columnSize;
for (int k = 0; k < columnSize; ++k) {
cJSON* column = cJSON_GetArrayItem(columns, k);
if (column == NULL) continue;
//superTbls->columnCount = columnSize;
for (int k = 0; k < columnSize; ++k) {
cJSON* column = cJSON_GetArrayItem(columns, k);
if (column == NULL) continue;
count = 1;
cJSON* countObj = cJSON_GetObjectItem(column, "count");
if (countObj && countObj->type == cJSON_Number) {
count = countObj->valueint;
} else if (countObj && countObj->type != cJSON_Number) {
errorPrint("%s() LN%d, failed to read json, column count not found\n",
__func__, __LINE__);
goto PARSE_OVER;
} else {
count = 1;
}
// column info
memset(&columnCase, 0, sizeof(StrColumn));
cJSON *dataType = cJSON_GetObjectItem(column, "type");
if (!dataType || dataType->type != cJSON_String
|| dataType->valuestring == NULL) {
errorPrint("%s() LN%d: failed to read json, column type not found\n",
__func__, __LINE__);
goto PARSE_OVER;
}
//tstrncpy(superTbls->columns[k].dataType, dataType->valuestring, MAX_TB_NAME_SIZE);
tstrncpy(columnCase.dataType, dataType->valuestring, strlen(dataType->valuestring) + 1);
cJSON* dataLen = cJSON_GetObjectItem(column, "len");
if (dataLen && dataLen->type == cJSON_Number) {
columnCase.dataLen = dataLen->valueint;
} else if (dataLen && dataLen->type != cJSON_Number) {
debugPrint("%s() LN%d: failed to read json, column len not found\n",
__func__, __LINE__);
goto PARSE_OVER;
} else {
columnCase.dataLen = 8;
}
for (int n = 0; n < count; ++n) {
tstrncpy(superTbls->columns[index].dataType,
columnCase.dataType, strlen(columnCase.dataType) + 1);
superTbls->columns[index].dataLen = columnCase.dataLen;
index++;
}
}
if ((index + 1 /* ts */) > MAX_NUM_COLUMNS) {
errorPrint("%s() LN%d, failed to read json, column size overflow, allowed max column size is %d\n",
__func__, __LINE__, MAX_NUM_COLUMNS);
goto PARSE_OVER;
}
superTbls->columnCount = index;
count = 1;
index = 0;
// tags
cJSON *tags = cJSON_GetObjectItem(stbInfo, "tags");
if (!tags || tags->type != cJSON_Array) {
errorPrint("%s() LN%d, failed to read json, tags not found\n",
__func__, __LINE__);
goto PARSE_OVER;
}
int tagSize = cJSON_GetArraySize(tags);
if (tagSize > TSDB_MAX_TAGS) {
errorPrint("%s() LN%d, failed to read json, tags size overflow, max tag size is %d\n",
__func__, __LINE__, TSDB_MAX_TAGS);
goto PARSE_OVER;
}
//superTbls->tagCount = tagSize;
for (int k = 0; k < tagSize; ++k) {
cJSON* tag = cJSON_GetArrayItem(tags, k);
if (tag == NULL) continue;
count = 1;
cJSON* countObj = cJSON_GetObjectItem(column, "count");
if (countObj && countObj->type == cJSON_Number) {
count = countObj->valueint;
} else if (countObj && countObj->type != cJSON_Number) {
errorPrint("%s() LN%d, failed to read json, column count not found\n",
__func__, __LINE__);
goto PARSE_OVER;
} else {
count = 1;
}
// column info
memset(&columnCase, 0, sizeof(StrColumn));
cJSON *dataType = cJSON_GetObjectItem(column, "type");
if (!dataType || dataType->type != cJSON_String
|| dataType->valuestring == NULL) {
errorPrint("%s() LN%d: failed to read json, column type not found\n",
__func__, __LINE__);
goto PARSE_OVER;
}
//tstrncpy(superTbls->columns[k].dataType, dataType->valuestring, MAX_TB_NAME_SIZE);
tstrncpy(columnCase.dataType, dataType->valuestring, strlen(dataType->valuestring) + 1);
cJSON* dataLen = cJSON_GetObjectItem(column, "len");
if (dataLen && dataLen->type == cJSON_Number) {
columnCase.dataLen = dataLen->valueint;
} else if (dataLen && dataLen->type != cJSON_Number) {
debugPrint("%s() LN%d: failed to read json, column len not found\n",
__func__, __LINE__);
goto PARSE_OVER;
} else {
columnCase.dataLen = 8;
}
for (int n = 0; n < count; ++n) {
tstrncpy(superTbls->columns[index].dataType,
columnCase.dataType, strlen(columnCase.dataType) + 1);
superTbls->columns[index].dataLen = columnCase.dataLen;
index++;
}
}
if ((index + 1 /* ts */) > MAX_NUM_COLUMNS) {
errorPrint("%s() LN%d, failed to read json, column size overflow, allowed max column size is %d\n",
__func__, __LINE__, MAX_NUM_COLUMNS);
goto PARSE_OVER;
}
superTbls->columnCount = index;
count = 1;
cJSON* countObj = cJSON_GetObjectItem(tag, "count");
if (countObj && countObj->type == cJSON_Number) {
count = countObj->valueint;
} else if (countObj && countObj->type != cJSON_Number) {
printf("ERROR: failed to read json, column count not found\n");
goto PARSE_OVER;
} else {
count = 1;
}
// column info
memset(&columnCase, 0, sizeof(StrColumn));
cJSON *dataType = cJSON_GetObjectItem(tag, "type");
if (!dataType || dataType->type != cJSON_String
|| dataType->valuestring == NULL) {
errorPrint("%s() LN%d, failed to read json, tag type not found\n",
__func__, __LINE__);
goto PARSE_OVER;
}
tstrncpy(columnCase.dataType, dataType->valuestring, strlen(dataType->valuestring) + 1);
cJSON* dataLen = cJSON_GetObjectItem(tag, "len");
if (dataLen && dataLen->type == cJSON_Number) {
columnCase.dataLen = dataLen->valueint;
} else if (dataLen && dataLen->type != cJSON_Number) {
errorPrint("%s() LN%d, failed to read json, column len not found\n",
__func__, __LINE__);
goto PARSE_OVER;
} else {
columnCase.dataLen = 0;
index = 0;
// tags
cJSON *tags = cJSON_GetObjectItem(stbInfo, "tags");
if (!tags || tags->type != cJSON_Array) {
errorPrint("%s() LN%d, failed to read json, tags not found\n",
__func__, __LINE__);
goto PARSE_OVER;
}
int tagSize = cJSON_GetArraySize(tags);
if (tagSize > TSDB_MAX_TAGS) {
errorPrint("%s() LN%d, failed to read json, tags size overflow, max tag size is %d\n",
__func__, __LINE__, TSDB_MAX_TAGS);
goto PARSE_OVER;
}
for (int n = 0; n < count; ++n) {
tstrncpy(superTbls->tags[index].dataType, columnCase.dataType,
strlen(columnCase.dataType) + 1);
superTbls->tags[index].dataLen = columnCase.dataLen;
index++;
//superTbls->tagCount = tagSize;
for (int k = 0; k < tagSize; ++k) {
cJSON* tag = cJSON_GetArrayItem(tags, k);
if (tag == NULL) continue;
count = 1;
cJSON* countObj = cJSON_GetObjectItem(tag, "count");
if (countObj && countObj->type == cJSON_Number) {
count = countObj->valueint;
} else if (countObj && countObj->type != cJSON_Number) {
printf("ERROR: failed to read json, column count not found\n");
goto PARSE_OVER;
} else {
count = 1;
}
// column info
memset(&columnCase, 0, sizeof(StrColumn));
cJSON *dataType = cJSON_GetObjectItem(tag, "type");
if (!dataType || dataType->type != cJSON_String
|| dataType->valuestring == NULL) {
errorPrint("%s() LN%d, failed to read json, tag type not found\n",
__func__, __LINE__);
goto PARSE_OVER;
}
tstrncpy(columnCase.dataType, dataType->valuestring, strlen(dataType->valuestring) + 1);
cJSON* dataLen = cJSON_GetObjectItem(tag, "len");
if (dataLen && dataLen->type == cJSON_Number) {
columnCase.dataLen = dataLen->valueint;
} else if (dataLen && dataLen->type != cJSON_Number) {
errorPrint("%s() LN%d, failed to read json, column len not found\n",
__func__, __LINE__);
goto PARSE_OVER;
} else {
columnCase.dataLen = 0;
}
for (int n = 0; n < count; ++n) {
tstrncpy(superTbls->tags[index].dataType, columnCase.dataType,
strlen(columnCase.dataType) + 1);
superTbls->tags[index].dataLen = columnCase.dataLen;
index++;
}
}
}
if (index > TSDB_MAX_TAGS) {
errorPrint("%s() LN%d, failed to read json, tags size overflow, allowed max tag count is %d\n",
__func__, __LINE__, TSDB_MAX_TAGS);
goto PARSE_OVER;
}
if (index > TSDB_MAX_TAGS) {
errorPrint("%s() LN%d, failed to read json, tags size overflow, allowed max tag count is %d\n",
__func__, __LINE__, TSDB_MAX_TAGS);
goto PARSE_OVER;
}
superTbls->tagCount = index;
superTbls->tagCount = index;
if ((superTbls->columnCount + superTbls->tagCount + 1 /* ts */) > TSDB_MAX_COLUMNS) {
errorPrint("%s() LN%d, columns + tags is more than allowed max columns count: %d\n",
__func__, __LINE__, TSDB_MAX_COLUMNS);
goto PARSE_OVER;
}
ret = true;
if ((superTbls->columnCount + superTbls->tagCount + 1 /* ts */) > TSDB_MAX_COLUMNS) {
errorPrint("%s() LN%d, columns + tags is more than allowed max columns count: %d\n",
__func__, __LINE__, TSDB_MAX_COLUMNS);
goto PARSE_OVER;
}
ret = true;
PARSE_OVER:
return ret;
return ret;
}
static bool getMetaFromInsertJsonFile(cJSON* root) {
......@@ -4866,7 +4866,7 @@ static void postFreeResource() {
static int getRowDataFromSample(
char* dataBuf, int64_t maxLen, int64_t timestamp,
SSuperTable* superTblInfo, int64_t* sampleUsePos)
SSuperTable* superTblInfo, int64_t* sampleUsePos)
{
if ((*sampleUsePos) == MAX_SAMPLES_ONCE_FROM_FILE) {
/* int ret = readSampleFromCsvFileToMem(superTblInfo);
......@@ -5118,7 +5118,8 @@ static int32_t execInsert(threadInfo *pThreadInfo, uint32_t k)
#if STMT_IFACE_ENABLED == 1
case STMT_IFACE:
debugPrint("%s() LN%d, stmt=%p", __func__, __LINE__, pThreadInfo->stmt);
debugPrint("%s() LN%d, stmt=%p",
__func__, __LINE__, pThreadInfo->stmt);
if (0 != taos_stmt_execute(pThreadInfo->stmt)) {
errorPrint("%s() LN%d, failied to execute insert statement\n",
__func__, __LINE__);
......@@ -5458,39 +5459,39 @@ static int64_t generateInterlaceDataWithoutStb(
int64_t startTime,
uint64_t *pRemainderBufLen)
{
assert(buffer);
char *pstr = buffer;
assert(buffer);
char *pstr = buffer;
int headLen = generateSQLHeadWithoutStb(
tableName, dbName,
int headLen = generateSQLHeadWithoutStb(
tableName, dbName,
pstr, *pRemainderBufLen);
if (headLen <= 0) {
return 0;
}
if (headLen <= 0) {
return 0;
}
pstr += headLen;
*pRemainderBufLen -= headLen;
pstr += headLen;
*pRemainderBufLen -= headLen;
int64_t dataLen = 0;
int64_t dataLen = 0;
int32_t k = generateDataTailWithoutStb(
int32_t k = generateDataTailWithoutStb(
batch, pstr, *pRemainderBufLen, insertRows, 0,
startTime,
&dataLen);
if (k == batch) {
pstr += dataLen;
*pRemainderBufLen -= dataLen;
} else {
debugPrint("%s() LN%d, generated data tail: %d, not equal batch per table: %u\n",
__func__, __LINE__, k, batch);
pstr -= headLen;
pstr[0] = '\0';
k = 0;
}
return k;
if (k == batch) {
pstr += dataLen;
*pRemainderBufLen -= dataLen;
} else {
debugPrint("%s() LN%d, generated data tail: %d, not equal batch per table: %u\n",
__func__, __LINE__, k, batch);
pstr -= headLen;
pstr[0] = '\0';
k = 0;
}
return k;
}
#if STMT_IFACE_ENABLED == 1
......@@ -5771,6 +5772,8 @@ static int32_t prepareStbStmtBind(
TAOS_BIND *bind;
if (isColumn) {
int cursor = 0;
for (int i = 0; i < stbInfo->columnCount + 1; i ++) {
bind = (TAOS_BIND *)((char *)bindArray + (sizeof(TAOS_BIND) * i));
......@@ -5794,7 +5797,6 @@ static int32_t prepareStbStmtBind(
ptr += bind->buffer_length;
} else {
int cursor = 0;
if (sourceRand) {
if ( -1 == prepareStmtBindArrayByType(
......@@ -5851,6 +5853,7 @@ static int32_t prepareStbStmtBind(
}
free(bindBuffer);
return 0;
}
......@@ -6043,27 +6046,27 @@ static int32_t generateProgressiveDataWithoutStb(
uint64_t recordFrom, int64_t startTime, /*int64_t *pSamplePos, */
int64_t *pRemainderBufLen)
{
assert(buffer != NULL);
char *pstr = buffer;
assert(buffer != NULL);
char *pstr = buffer;
memset(buffer, 0, *pRemainderBufLen);
memset(buffer, 0, *pRemainderBufLen);
int64_t headLen = generateSQLHeadWithoutStb(
tableName, pThreadInfo->db_name,
buffer, *pRemainderBufLen);
int64_t headLen = generateSQLHeadWithoutStb(
tableName, pThreadInfo->db_name,
buffer, *pRemainderBufLen);
if (headLen <= 0) {
return 0;
}
pstr += headLen;
*pRemainderBufLen -= headLen;
if (headLen <= 0) {
return 0;
}
pstr += headLen;
*pRemainderBufLen -= headLen;
int64_t dataLen;
int64_t dataLen;
return generateDataTailWithoutStb(
g_args.num_of_RPR, pstr, *pRemainderBufLen, insertRows, recordFrom,
startTime,
/*pSamplePos, */&dataLen);
return generateDataTailWithoutStb(
g_args.num_of_RPR, pstr, *pRemainderBufLen, insertRows, recordFrom,
startTime,
/*pSamplePos, */&dataLen);
}
static void printStatPerThread(threadInfo *pThreadInfo)
......@@ -6552,110 +6555,110 @@ static void* syncWrite(void *sarg) {
}
static void callBack(void *param, TAOS_RES *res, int code) {
threadInfo* pThreadInfo = (threadInfo*)param;
SSuperTable* superTblInfo = pThreadInfo->superTblInfo;
int insert_interval =
superTblInfo?superTblInfo->insertInterval:g_args.insert_interval;
if (insert_interval) {
pThreadInfo->et = taosGetTimestampMs();
if ((pThreadInfo->et - pThreadInfo->st) < insert_interval) {
taosMsleep(insert_interval - (pThreadInfo->et - pThreadInfo->st)); // ms
}
}
char *buffer = calloc(1, pThreadInfo->superTblInfo->maxSqlLen);
char data[MAX_DATA_SIZE];
char *pstr = buffer;
pstr += sprintf(pstr, "insert into %s.%s%"PRId64" values",
pThreadInfo->db_name, pThreadInfo->tb_prefix,
pThreadInfo->start_table_from);
// if (pThreadInfo->counter >= pThreadInfo->superTblInfo->insertRows) {
if (pThreadInfo->counter >= g_args.num_of_RPR) {
pThreadInfo->start_table_from++;
pThreadInfo->counter = 0;
}
if (pThreadInfo->start_table_from > pThreadInfo->end_table_to) {
tsem_post(&pThreadInfo->lock_sem);
free(buffer);
taos_free_result(res);
return;
}
for (int i = 0; i < g_args.num_of_RPR; i++) {
int rand_num = taosRandom() % 100;
if (0 != pThreadInfo->superTblInfo->disorderRatio
&& rand_num < pThreadInfo->superTblInfo->disorderRatio) {
int64_t d = pThreadInfo->lastTs
- (taosRandom() % pThreadInfo->superTblInfo->disorderRange + 1);
generateStbRowData(pThreadInfo->superTblInfo, data, d);
} else {
generateStbRowData(pThreadInfo->superTblInfo,
data, pThreadInfo->lastTs += 1000);
threadInfo* pThreadInfo = (threadInfo*)param;
SSuperTable* superTblInfo = pThreadInfo->superTblInfo;
int insert_interval =
superTblInfo?superTblInfo->insertInterval:g_args.insert_interval;
if (insert_interval) {
pThreadInfo->et = taosGetTimestampMs();
if ((pThreadInfo->et - pThreadInfo->st) < insert_interval) {
taosMsleep(insert_interval - (pThreadInfo->et - pThreadInfo->st)); // ms
}
}
pstr += sprintf(pstr, "%s", data);
pThreadInfo->counter++;
if (pThreadInfo->counter >= pThreadInfo->superTblInfo->insertRows) {
break;
char *buffer = calloc(1, pThreadInfo->superTblInfo->maxSqlLen);
char data[MAX_DATA_SIZE];
char *pstr = buffer;
pstr += sprintf(pstr, "insert into %s.%s%"PRId64" values",
pThreadInfo->db_name, pThreadInfo->tb_prefix,
pThreadInfo->start_table_from);
// if (pThreadInfo->counter >= pThreadInfo->superTblInfo->insertRows) {
if (pThreadInfo->counter >= g_args.num_of_RPR) {
pThreadInfo->start_table_from++;
pThreadInfo->counter = 0;
}
if (pThreadInfo->start_table_from > pThreadInfo->end_table_to) {
tsem_post(&pThreadInfo->lock_sem);
free(buffer);
taos_free_result(res);
return;
}
}
if (insert_interval) {
pThreadInfo->st = taosGetTimestampMs();
}
taos_query_a(pThreadInfo->taos, buffer, callBack, pThreadInfo);
free(buffer);
for (int i = 0; i < g_args.num_of_RPR; i++) {
int rand_num = taosRandom() % 100;
if (0 != pThreadInfo->superTblInfo->disorderRatio
&& rand_num < pThreadInfo->superTblInfo->disorderRatio) {
int64_t d = pThreadInfo->lastTs
- (taosRandom() % pThreadInfo->superTblInfo->disorderRange + 1);
generateStbRowData(pThreadInfo->superTblInfo, data, d);
} else {
generateStbRowData(pThreadInfo->superTblInfo,
data, pThreadInfo->lastTs += 1000);
}
pstr += sprintf(pstr, "%s", data);
pThreadInfo->counter++;
taos_free_result(res);
if (pThreadInfo->counter >= pThreadInfo->superTblInfo->insertRows) {
break;
}
}
if (insert_interval) {
pThreadInfo->st = taosGetTimestampMs();
}
taos_query_a(pThreadInfo->taos, buffer, callBack, pThreadInfo);
free(buffer);
taos_free_result(res);
}
static void *asyncWrite(void *sarg) {
threadInfo *pThreadInfo = (threadInfo *)sarg;
SSuperTable* superTblInfo = pThreadInfo->superTblInfo;
threadInfo *pThreadInfo = (threadInfo *)sarg;
SSuperTable* superTblInfo = pThreadInfo->superTblInfo;
setThreadName("asyncWrite");
setThreadName("asyncWrite");
pThreadInfo->st = 0;
pThreadInfo->et = 0;
pThreadInfo->lastTs = pThreadInfo->start_time;
pThreadInfo->st = 0;
pThreadInfo->et = 0;
pThreadInfo->lastTs = pThreadInfo->start_time;
int insert_interval =
superTblInfo?superTblInfo->insertInterval:g_args.insert_interval;
if (insert_interval) {
pThreadInfo->st = taosGetTimestampMs();
}
taos_query_a(pThreadInfo->taos, "show databases", callBack, pThreadInfo);
int insert_interval =
superTblInfo?superTblInfo->insertInterval:g_args.insert_interval;
if (insert_interval) {
pThreadInfo->st = taosGetTimestampMs();
}
taos_query_a(pThreadInfo->taos, "show databases", callBack, pThreadInfo);
tsem_wait(&(pThreadInfo->lock_sem));
tsem_wait(&(pThreadInfo->lock_sem));
return NULL;
return NULL;
}
static int convertHostToServAddr(char *host, uint16_t port, struct sockaddr_in *serv_addr)
{
uint16_t rest_port = port + TSDB_PORT_HTTP;
struct hostent *server = gethostbyname(host);
if ((server == NULL) || (server->h_addr == NULL)) {
errorPrint("%s", "ERROR, no such host");
return -1;
}
uint16_t rest_port = port + TSDB_PORT_HTTP;
struct hostent *server = gethostbyname(host);
if ((server == NULL) || (server->h_addr == NULL)) {
errorPrint("%s", "ERROR, no such host");
return -1;
}
debugPrint("h_name: %s\nh_addr=%p\nh_addretype: %s\nh_length: %d\n",
debugPrint("h_name: %s\nh_addr=%p\nh_addretype: %s\nh_length: %d\n",
server->h_name,
server->h_addr,
(server->h_addrtype == AF_INET)?"ipv4":"ipv6",
server->h_length);
memset(serv_addr, 0, sizeof(struct sockaddr_in));
serv_addr->sin_family = AF_INET;
serv_addr->sin_port = htons(rest_port);
memset(serv_addr, 0, sizeof(struct sockaddr_in));
serv_addr->sin_family = AF_INET;
serv_addr->sin_port = htons(rest_port);
#ifdef WINDOWS
serv_addr->sin_addr.s_addr = inet_addr(host);
serv_addr->sin_addr.s_addr = inet_addr(host);
#else
memcpy(&(serv_addr->sin_addr.s_addr), server->h_addr, server->h_length);
memcpy(&(serv_addr->sin_addr.s_addr), server->h_addr, server->h_length);
#endif
return 0;
return 0;
}
static void startMultiThreadInsertData(int threads, char* db_name,
......@@ -6724,14 +6727,17 @@ static void startMultiThreadInsertData(int threads, char* db_name,
int64_t limit;
uint64_t offset;
if ((NULL != g_args.sqlFile) && (superTblInfo->childTblExists == TBL_NO_EXISTS) &&
((superTblInfo->childTblOffset != 0) || (superTblInfo->childTblLimit >= 0))) {
if ((NULL != g_args.sqlFile)
&& (superTblInfo->childTblExists == TBL_NO_EXISTS)
&& ((superTblInfo->childTblOffset != 0)
|| (superTblInfo->childTblLimit >= 0))) {
printf("WARNING: offset and limit will not be used since the child tables not exists!\n");
}
if (superTblInfo->childTblExists == TBL_ALREADY_EXISTS) {
if ((superTblInfo->childTblLimit < 0)
|| ((superTblInfo->childTblOffset + superTblInfo->childTblLimit)
|| ((superTblInfo->childTblOffset
+ superTblInfo->childTblLimit)
> (superTblInfo->childTblCount))) {
superTblInfo->childTblLimit =
superTblInfo->childTblCount - superTblInfo->childTblOffset;
......@@ -6837,7 +6843,8 @@ static void startMultiThreadInsertData(int threads, char* db_name,
#if STMT_IFACE_ENABLED == 1
if ((g_args.iface == STMT_IFACE)
|| ((superTblInfo) && (superTblInfo->iface == STMT_IFACE))) {
|| ((superTblInfo)
&& (superTblInfo->iface == STMT_IFACE))) {
int columnCount;
if (superTblInfo) {
......@@ -6865,7 +6872,8 @@ static void startMultiThreadInsertData(int threads, char* db_name,
== superTblInfo->autoCreateTable)) {
pstr += sprintf(pstr, "INSERT INTO ? USING %s TAGS(?",
superTblInfo->sTblName);
for (int tag = 0; tag < (superTblInfo->tagCount - 1); tag ++ ) {
for (int tag = 0; tag < (superTblInfo->tagCount - 1);
tag ++ ) {
pstr += sprintf(pstr, ",?");
}
pstr += sprintf(pstr, ") VALUES(?");
......@@ -7014,157 +7022,157 @@ static void startMultiThreadInsertData(int threads, char* db_name,
static void *readTable(void *sarg) {
#if 1
threadInfo *pThreadInfo = (threadInfo *)sarg;
TAOS *taos = pThreadInfo->taos;
setThreadName("readTable");
char command[BUFFER_SIZE] = "\0";
uint64_t sTime = pThreadInfo->start_time;
char *tb_prefix = pThreadInfo->tb_prefix;
FILE *fp = fopen(pThreadInfo->filePath, "a");
if (NULL == fp) {
errorPrint( "fopen %s fail, reason:%s.\n", pThreadInfo->filePath, strerror(errno));
return NULL;
}
int64_t num_of_DPT;
/* if (pThreadInfo->superTblInfo) {
num_of_DPT = pThreadInfo->superTblInfo->insertRows; // nrecords_per_table;
} else {
*/
num_of_DPT = g_args.num_of_DPT;
// }
int64_t num_of_tables = pThreadInfo->ntables; // rinfo->end_table_to - rinfo->start_table_from + 1;
int64_t totalData = num_of_DPT * num_of_tables;
bool do_aggreFunc = g_Dbs.do_aggreFunc;
int n = do_aggreFunc ? (sizeof(aggreFunc) / sizeof(aggreFunc[0])) : 2;
if (!do_aggreFunc) {
printf("\nThe first field is either Binary or Bool. Aggregation functions are not supported.\n");
}
printf("%"PRId64" records:\n", totalData);
fprintf(fp, "| QFunctions | QRecords | QSpeed(R/s) | QLatency(ms) |\n");
for (int j = 0; j < n; j++) {
double totalT = 0;
uint64_t count = 0;
for (int64_t i = 0; i < num_of_tables; i++) {
sprintf(command, "select %s from %s%"PRId64" where ts>= %" PRIu64,
aggreFunc[j], tb_prefix, i, sTime);
double t = taosGetTimestampMs();
TAOS_RES *pSql = taos_query(taos, command);
int32_t code = taos_errno(pSql);
if (code != 0) {
errorPrint( "Failed to query:%s\n", taos_errstr(pSql));
taos_free_result(pSql);
taos_close(taos);
fclose(fp);
threadInfo *pThreadInfo = (threadInfo *)sarg;
TAOS *taos = pThreadInfo->taos;
setThreadName("readTable");
char command[BUFFER_SIZE] = "\0";
uint64_t sTime = pThreadInfo->start_time;
char *tb_prefix = pThreadInfo->tb_prefix;
FILE *fp = fopen(pThreadInfo->filePath, "a");
if (NULL == fp) {
errorPrint( "fopen %s fail, reason:%s.\n", pThreadInfo->filePath, strerror(errno));
return NULL;
}
}
while(taos_fetch_row(pSql) != NULL) {
count++;
}
int64_t num_of_DPT;
/* if (pThreadInfo->superTblInfo) {
num_of_DPT = pThreadInfo->superTblInfo->insertRows; // nrecords_per_table;
} else {
*/
num_of_DPT = g_args.num_of_DPT;
// }
int64_t num_of_tables = pThreadInfo->ntables; // rinfo->end_table_to - rinfo->start_table_from + 1;
int64_t totalData = num_of_DPT * num_of_tables;
bool do_aggreFunc = g_Dbs.do_aggreFunc;
int n = do_aggreFunc ? (sizeof(aggreFunc) / sizeof(aggreFunc[0])) : 2;
if (!do_aggreFunc) {
printf("\nThe first field is either Binary or Bool. Aggregation functions are not supported.\n");
}
printf("%"PRId64" records:\n", totalData);
fprintf(fp, "| QFunctions | QRecords | QSpeed(R/s) | QLatency(ms) |\n");
for (int j = 0; j < n; j++) {
double totalT = 0;
uint64_t count = 0;
for (int64_t i = 0; i < num_of_tables; i++) {
sprintf(command, "select %s from %s%"PRId64" where ts>= %" PRIu64,
aggreFunc[j], tb_prefix, i, sTime);
double t = taosGetTimestampMs();
TAOS_RES *pSql = taos_query(taos, command);
int32_t code = taos_errno(pSql);
if (code != 0) {
errorPrint( "Failed to query:%s\n", taos_errstr(pSql));
taos_free_result(pSql);
taos_close(taos);
fclose(fp);
return NULL;
}
t = taosGetTimestampMs() - t;
totalT += t;
while(taos_fetch_row(pSql) != NULL) {
count++;
}
taos_free_result(pSql);
}
t = taosGetTimestampMs() - t;
totalT += t;
taos_free_result(pSql);
}
fprintf(fp, "|%10s | %"PRId64" | %12.2f | %10.2f |\n",
aggreFunc[j][0] == '*' ? " * " : aggreFunc[j], totalData,
(double)(num_of_tables * num_of_DPT) / totalT, totalT * 1000);
printf("select %10s took %.6f second(s)\n", aggreFunc[j], totalT * 1000);
}
fprintf(fp, "\n");
fclose(fp);
fprintf(fp, "|%10s | %"PRId64" | %12.2f | %10.2f |\n",
aggreFunc[j][0] == '*' ? " * " : aggreFunc[j], totalData,
(double)(num_of_tables * num_of_DPT) / totalT, totalT * 1000);
printf("select %10s took %.6f second(s)\n", aggreFunc[j], totalT * 1000);
}
fprintf(fp, "\n");
fclose(fp);
#endif
return NULL;
return NULL;
}
static void *readMetric(void *sarg) {
#if 1
threadInfo *pThreadInfo = (threadInfo *)sarg;
TAOS *taos = pThreadInfo->taos;
setThreadName("readMetric");
char command[BUFFER_SIZE] = "\0";
FILE *fp = fopen(pThreadInfo->filePath, "a");
if (NULL == fp) {
printf("fopen %s fail, reason:%s.\n", pThreadInfo->filePath, strerror(errno));
return NULL;
}
threadInfo *pThreadInfo = (threadInfo *)sarg;
TAOS *taos = pThreadInfo->taos;
setThreadName("readMetric");
char command[BUFFER_SIZE] = "\0";
FILE *fp = fopen(pThreadInfo->filePath, "a");
if (NULL == fp) {
printf("fopen %s fail, reason:%s.\n", pThreadInfo->filePath, strerror(errno));
return NULL;
}
int64_t num_of_DPT = pThreadInfo->superTblInfo->insertRows;
int64_t num_of_tables = pThreadInfo->ntables; // rinfo->end_table_to - rinfo->start_table_from + 1;
int64_t totalData = num_of_DPT * num_of_tables;
bool do_aggreFunc = g_Dbs.do_aggreFunc;
int64_t num_of_DPT = pThreadInfo->superTblInfo->insertRows;
int64_t num_of_tables = pThreadInfo->ntables; // rinfo->end_table_to - rinfo->start_table_from + 1;
int64_t totalData = num_of_DPT * num_of_tables;
bool do_aggreFunc = g_Dbs.do_aggreFunc;
int n = do_aggreFunc ? (sizeof(aggreFunc) / sizeof(aggreFunc[0])) : 2;
if (!do_aggreFunc) {
printf("\nThe first field is either Binary or Bool. Aggregation functions are not supported.\n");
}
printf("%"PRId64" records:\n", totalData);
fprintf(fp, "Querying On %"PRId64" records:\n", totalData);
int n = do_aggreFunc ? (sizeof(aggreFunc) / sizeof(aggreFunc[0])) : 2;
if (!do_aggreFunc) {
printf("\nThe first field is either Binary or Bool. Aggregation functions are not supported.\n");
}
printf("%"PRId64" records:\n", totalData);
fprintf(fp, "Querying On %"PRId64" records:\n", totalData);
for (int j = 0; j < n; j++) {
char condition[COND_BUF_LEN] = "\0";
char tempS[64] = "\0";
for (int j = 0; j < n; j++) {
char condition[COND_BUF_LEN] = "\0";
char tempS[64] = "\0";
int64_t m = 10 < num_of_tables ? 10 : num_of_tables;
int64_t m = 10 < num_of_tables ? 10 : num_of_tables;
for (int64_t i = 1; i <= m; i++) {
if (i == 1) {
sprintf(tempS, "t1 = %"PRId64"", i);
} else {
sprintf(tempS, " or t1 = %"PRId64" ", i);
}
strncat(condition, tempS, COND_BUF_LEN - 1);
for (int64_t i = 1; i <= m; i++) {
if (i == 1) {
sprintf(tempS, "t1 = %"PRId64"", i);
} else {
sprintf(tempS, " or t1 = %"PRId64" ", i);
}
strncat(condition, tempS, COND_BUF_LEN - 1);
sprintf(command, "select %s from meters where %s", aggreFunc[j], condition);
sprintf(command, "select %s from meters where %s", aggreFunc[j], condition);
printf("Where condition: %s\n", condition);
fprintf(fp, "%s\n", command);
printf("Where condition: %s\n", condition);
fprintf(fp, "%s\n", command);
double t = taosGetTimestampMs();
double t = taosGetTimestampMs();
TAOS_RES *pSql = taos_query(taos, command);
int32_t code = taos_errno(pSql);
TAOS_RES *pSql = taos_query(taos, command);
int32_t code = taos_errno(pSql);
if (code != 0) {
errorPrint( "Failed to query:%s\n", taos_errstr(pSql));
taos_free_result(pSql);
taos_close(taos);
fclose(fp);
return NULL;
}
int count = 0;
while(taos_fetch_row(pSql) != NULL) {
count++;
}
t = taosGetTimestampMs() - t;
if (code != 0) {
errorPrint( "Failed to query:%s\n", taos_errstr(pSql));
taos_free_result(pSql);
taos_close(taos);
fclose(fp);
return NULL;
}
int count = 0;
while(taos_fetch_row(pSql) != NULL) {
count++;
}
t = taosGetTimestampMs() - t;
fprintf(fp, "| Speed: %12.2f(per s) | Latency: %.4f(ms) |\n",
num_of_tables * num_of_DPT / (t * 1000.0), t);
printf("select %10s took %.6f second(s)\n\n", aggreFunc[j], t * 1000.0);
fprintf(fp, "| Speed: %12.2f(per s) | Latency: %.4f(ms) |\n",
num_of_tables * num_of_DPT / (t * 1000.0), t);
printf("select %10s took %.6f second(s)\n\n", aggreFunc[j], t * 1000.0);
taos_free_result(pSql);
taos_free_result(pSql);
}
fprintf(fp, "\n");
}
fprintf(fp, "\n");
}
fclose(fp);
fclose(fp);
#endif
return NULL;
return NULL;
}
static void prompt()
{
if (!g_args.answer_yes) {
printf(" Press enter key to continue or Ctrl-C to stop\n\n");
(void)getchar();
}
if (!g_args.answer_yes) {
printf(" Press enter key to continue or Ctrl-C to stop\n\n");
(void)getchar();
}
}
static int insertTestProcess() {
......@@ -7264,369 +7272,369 @@ static int insertTestProcess() {
}
static void *specifiedTableQuery(void *sarg) {
threadInfo *pThreadInfo = (threadInfo *)sarg;
threadInfo *pThreadInfo = (threadInfo *)sarg;
setThreadName("specTableQuery");
setThreadName("specTableQuery");
if (pThreadInfo->taos == NULL) {
TAOS * taos = NULL;
taos = taos_connect(g_queryInfo.host,
g_queryInfo.user,
g_queryInfo.password,
NULL,
g_queryInfo.port);
if (taos == NULL) {
errorPrint("[%d] Failed to connect to TDengine, reason:%s\n",
pThreadInfo->threadID, taos_errstr(NULL));
return NULL;
} else {
pThreadInfo->taos = taos;
if (pThreadInfo->taos == NULL) {
TAOS * taos = NULL;
taos = taos_connect(g_queryInfo.host,
g_queryInfo.user,
g_queryInfo.password,
NULL,
g_queryInfo.port);
if (taos == NULL) {
errorPrint("[%d] Failed to connect to TDengine, reason:%s\n",
pThreadInfo->threadID, taos_errstr(NULL));
return NULL;
} else {
pThreadInfo->taos = taos;
}
}
}
char sqlStr[TSDB_DB_NAME_LEN + 5];
sprintf(sqlStr, "use %s", g_queryInfo.dbName);
if (0 != queryDbExec(pThreadInfo->taos, sqlStr, NO_INSERT_TYPE, false)) {
taos_close(pThreadInfo->taos);
errorPrint( "use database %s failed!\n\n",
char sqlStr[TSDB_DB_NAME_LEN + 5];
sprintf(sqlStr, "use %s", g_queryInfo.dbName);
if (0 != queryDbExec(pThreadInfo->taos, sqlStr, NO_INSERT_TYPE, false)) {
taos_close(pThreadInfo->taos);
errorPrint( "use database %s failed!\n\n",
g_queryInfo.dbName);
return NULL;
}
return NULL;
}
uint64_t st = 0;
uint64_t et = 0;
uint64_t st = 0;
uint64_t et = 0;
uint64_t queryTimes = g_queryInfo.specifiedQueryInfo.queryTimes;
uint64_t queryTimes = g_queryInfo.specifiedQueryInfo.queryTimes;
uint64_t totalQueried = 0;
uint64_t lastPrintTime = taosGetTimestampMs();
uint64_t startTs = taosGetTimestampMs();
uint64_t totalQueried = 0;
uint64_t lastPrintTime = taosGetTimestampMs();
uint64_t startTs = taosGetTimestampMs();
if (g_queryInfo.specifiedQueryInfo.result[pThreadInfo->querySeq][0] != '\0') {
sprintf(pThreadInfo->filePath, "%s-%d",
if (g_queryInfo.specifiedQueryInfo.result[pThreadInfo->querySeq][0] != '\0') {
sprintf(pThreadInfo->filePath, "%s-%d",
g_queryInfo.specifiedQueryInfo.result[pThreadInfo->querySeq],
pThreadInfo->threadID);
}
while(queryTimes --) {
if (g_queryInfo.specifiedQueryInfo.queryInterval && (et - st) <
(int64_t)g_queryInfo.specifiedQueryInfo.queryInterval) {
taosMsleep(g_queryInfo.specifiedQueryInfo.queryInterval - (et - st)); // ms
}
st = taosGetTimestampMs();
while(queryTimes --) {
if (g_queryInfo.specifiedQueryInfo.queryInterval && (et - st) <
(int64_t)g_queryInfo.specifiedQueryInfo.queryInterval) {
taosMsleep(g_queryInfo.specifiedQueryInfo.queryInterval - (et - st)); // ms
}
st = taosGetTimestampMs();
selectAndGetResult(pThreadInfo,
g_queryInfo.specifiedQueryInfo.sql[pThreadInfo->querySeq]);
selectAndGetResult(pThreadInfo,
g_queryInfo.specifiedQueryInfo.sql[pThreadInfo->querySeq]);
et = taosGetTimestampMs();
printf("=thread[%"PRId64"] use %s complete one sql, Spent %10.3f s\n",
taosGetSelfPthreadId(), g_queryInfo.queryMode, (et - st)/1000.0);
et = taosGetTimestampMs();
printf("=thread[%"PRId64"] use %s complete one sql, Spent %10.3f s\n",
taosGetSelfPthreadId(), g_queryInfo.queryMode, (et - st)/1000.0);
totalQueried ++;
g_queryInfo.specifiedQueryInfo.totalQueried ++;
totalQueried ++;
g_queryInfo.specifiedQueryInfo.totalQueried ++;
uint64_t currentPrintTime = taosGetTimestampMs();
uint64_t endTs = taosGetTimestampMs();
if (currentPrintTime - lastPrintTime > 30*1000) {
debugPrint("%s() LN%d, endTs=%"PRIu64"ms, startTs=%"PRIu64"ms\n",
__func__, __LINE__, endTs, startTs);
printf("thread[%d] has currently completed queries: %"PRIu64", QPS: %10.6f\n",
uint64_t currentPrintTime = taosGetTimestampMs();
uint64_t endTs = taosGetTimestampMs();
if (currentPrintTime - lastPrintTime > 30*1000) {
debugPrint("%s() LN%d, endTs=%"PRIu64"ms, startTs=%"PRIu64"ms\n",
__func__, __LINE__, endTs, startTs);
printf("thread[%d] has currently completed queries: %"PRIu64", QPS: %10.6f\n",
pThreadInfo->threadID,
totalQueried,
(double)(totalQueried/((endTs-startTs)/1000.0)));
lastPrintTime = currentPrintTime;
lastPrintTime = currentPrintTime;
}
}
}
return NULL;
return NULL;
}
static void replaceChildTblName(char* inSql, char* outSql, int tblIndex) {
char sourceString[32] = "xxxx";
char subTblName[MAX_TB_NAME_SIZE*3];
sprintf(subTblName, "%s.%s",
g_queryInfo.dbName,
g_queryInfo.superQueryInfo.childTblName + tblIndex*TSDB_TABLE_NAME_LEN);
char sourceString[32] = "xxxx";
char subTblName[MAX_TB_NAME_SIZE*3];
sprintf(subTblName, "%s.%s",
g_queryInfo.dbName,
g_queryInfo.superQueryInfo.childTblName + tblIndex*TSDB_TABLE_NAME_LEN);
//printf("inSql: %s\n", inSql);
//printf("inSql: %s\n", inSql);
char* pos = strstr(inSql, sourceString);
if (0 == pos) {
return;
}
tstrncpy(outSql, inSql, pos - inSql + 1);
//printf("1: %s\n", outSql);
strncat(outSql, subTblName, MAX_QUERY_SQL_LENGTH - 1);
//printf("2: %s\n", outSql);
strncat(outSql, pos+strlen(sourceString), MAX_QUERY_SQL_LENGTH - 1);
//printf("3: %s\n", outSql);
char* pos = strstr(inSql, sourceString);
if (0 == pos) {
return;
}
tstrncpy(outSql, inSql, pos - inSql + 1);
//printf("1: %s\n", outSql);
strncat(outSql, subTblName, MAX_QUERY_SQL_LENGTH - 1);
//printf("2: %s\n", outSql);
strncat(outSql, pos+strlen(sourceString), MAX_QUERY_SQL_LENGTH - 1);
//printf("3: %s\n", outSql);
}
static void *superTableQuery(void *sarg) {
char sqlstr[MAX_QUERY_SQL_LENGTH];
threadInfo *pThreadInfo = (threadInfo *)sarg;
char sqlstr[MAX_QUERY_SQL_LENGTH];
threadInfo *pThreadInfo = (threadInfo *)sarg;
setThreadName("superTableQuery");
setThreadName("superTableQuery");
if (pThreadInfo->taos == NULL) {
TAOS * taos = NULL;
taos = taos_connect(g_queryInfo.host,
g_queryInfo.user,
g_queryInfo.password,
NULL,
g_queryInfo.port);
if (taos == NULL) {
errorPrint("[%d] Failed to connect to TDengine, reason:%s\n",
pThreadInfo->threadID, taos_errstr(NULL));
return NULL;
} else {
pThreadInfo->taos = taos;
if (pThreadInfo->taos == NULL) {
TAOS * taos = NULL;
taos = taos_connect(g_queryInfo.host,
g_queryInfo.user,
g_queryInfo.password,
NULL,
g_queryInfo.port);
if (taos == NULL) {
errorPrint("[%d] Failed to connect to TDengine, reason:%s\n",
pThreadInfo->threadID, taos_errstr(NULL));
return NULL;
} else {
pThreadInfo->taos = taos;
}
}
}
uint64_t st = 0;
uint64_t et = (int64_t)g_queryInfo.superQueryInfo.queryInterval;
uint64_t queryTimes = g_queryInfo.superQueryInfo.queryTimes;
uint64_t totalQueried = 0;
uint64_t startTs = taosGetTimestampMs();
uint64_t st = 0;
uint64_t et = (int64_t)g_queryInfo.superQueryInfo.queryInterval;
uint64_t lastPrintTime = taosGetTimestampMs();
while(queryTimes --) {
if (g_queryInfo.superQueryInfo.queryInterval
&& (et - st) < (int64_t)g_queryInfo.superQueryInfo.queryInterval) {
taosMsleep(g_queryInfo.superQueryInfo.queryInterval - (et - st)); // ms
//printf("========sleep duration:%"PRId64 "========inserted rows:%d, table range:%d - %d\n", (1000 - (et - st)), i, pThreadInfo->start_table_from, pThreadInfo->end_table_to);
}
uint64_t queryTimes = g_queryInfo.superQueryInfo.queryTimes;
uint64_t totalQueried = 0;
uint64_t startTs = taosGetTimestampMs();
st = taosGetTimestampMs();
for (int i = pThreadInfo->start_table_from; i <= pThreadInfo->end_table_to; i++) {
for (int j = 0; j < g_queryInfo.superQueryInfo.sqlCount; j++) {
memset(sqlstr,0,sizeof(sqlstr));
replaceChildTblName(g_queryInfo.superQueryInfo.sql[j], sqlstr, i);
if (g_queryInfo.superQueryInfo.result[j][0] != '\0') {
sprintf(pThreadInfo->filePath, "%s-%d",
g_queryInfo.superQueryInfo.result[j],
pThreadInfo->threadID);
}
selectAndGetResult(pThreadInfo, sqlstr);
uint64_t lastPrintTime = taosGetTimestampMs();
while(queryTimes --) {
if (g_queryInfo.superQueryInfo.queryInterval
&& (et - st) < (int64_t)g_queryInfo.superQueryInfo.queryInterval) {
taosMsleep(g_queryInfo.superQueryInfo.queryInterval - (et - st)); // ms
//printf("========sleep duration:%"PRId64 "========inserted rows:%d, table range:%d - %d\n", (1000 - (et - st)), i, pThreadInfo->start_table_from, pThreadInfo->end_table_to);
}
st = taosGetTimestampMs();
for (int i = pThreadInfo->start_table_from; i <= pThreadInfo->end_table_to; i++) {
for (int j = 0; j < g_queryInfo.superQueryInfo.sqlCount; j++) {
memset(sqlstr,0,sizeof(sqlstr));
replaceChildTblName(g_queryInfo.superQueryInfo.sql[j], sqlstr, i);
if (g_queryInfo.superQueryInfo.result[j][0] != '\0') {
sprintf(pThreadInfo->filePath, "%s-%d",
g_queryInfo.superQueryInfo.result[j],
pThreadInfo->threadID);
}
selectAndGetResult(pThreadInfo, sqlstr);
totalQueried++;
g_queryInfo.superQueryInfo.totalQueried ++;
totalQueried++;
g_queryInfo.superQueryInfo.totalQueried ++;
int64_t currentPrintTime = taosGetTimestampMs();
int64_t endTs = taosGetTimestampMs();
if (currentPrintTime - lastPrintTime > 30*1000) {
printf("thread[%d] has currently completed queries: %"PRIu64", QPS: %10.3f\n",
pThreadInfo->threadID,
totalQueried,
(double)(totalQueried/((endTs-startTs)/1000.0)));
lastPrintTime = currentPrintTime;
int64_t currentPrintTime = taosGetTimestampMs();
int64_t endTs = taosGetTimestampMs();
if (currentPrintTime - lastPrintTime > 30*1000) {
printf("thread[%d] has currently completed queries: %"PRIu64", QPS: %10.3f\n",
pThreadInfo->threadID,
totalQueried,
(double)(totalQueried/((endTs-startTs)/1000.0)));
lastPrintTime = currentPrintTime;
}
}
}
}
et = taosGetTimestampMs();
printf("####thread[%"PRId64"] complete all sqls to allocate all sub-tables[%"PRIu64" - %"PRIu64"] once queries duration:%.4fs\n\n",
taosGetSelfPthreadId(),
pThreadInfo->start_table_from,
pThreadInfo->end_table_to,
(double)(et - st)/1000.0);
}
et = taosGetTimestampMs();
printf("####thread[%"PRId64"] complete all sqls to allocate all sub-tables[%"PRIu64" - %"PRIu64"] once queries duration:%.4fs\n\n",
taosGetSelfPthreadId(),
pThreadInfo->start_table_from,
pThreadInfo->end_table_to,
(double)(et - st)/1000.0);
}
return NULL;
return NULL;
}
static int queryTestProcess() {
setupForAnsiEscape();
printfQueryMeta();
resetAfterAnsiEscape();
TAOS * taos = NULL;
taos = taos_connect(g_queryInfo.host,
g_queryInfo.user,
g_queryInfo.password,
NULL,
g_queryInfo.port);
if (taos == NULL) {
errorPrint( "Failed to connect to TDengine, reason:%s\n",
taos_errstr(NULL));
exit(-1);
}
if (0 != g_queryInfo.superQueryInfo.sqlCount) {
getAllChildNameOfSuperTable(taos,
g_queryInfo.dbName,
g_queryInfo.superQueryInfo.sTblName,
&g_queryInfo.superQueryInfo.childTblName,
&g_queryInfo.superQueryInfo.childTblCount);
}
setupForAnsiEscape();
printfQueryMeta();
resetAfterAnsiEscape();
prompt();
TAOS * taos = NULL;
taos = taos_connect(g_queryInfo.host,
g_queryInfo.user,
g_queryInfo.password,
NULL,
g_queryInfo.port);
if (taos == NULL) {
errorPrint( "Failed to connect to TDengine, reason:%s\n",
taos_errstr(NULL));
exit(-1);
}
if (g_args.debug_print || g_args.verbose_print) {
printfQuerySystemInfo(taos);
}
if (0 != g_queryInfo.superQueryInfo.sqlCount) {
getAllChildNameOfSuperTable(taos,
g_queryInfo.dbName,
g_queryInfo.superQueryInfo.sTblName,
&g_queryInfo.superQueryInfo.childTblName,
&g_queryInfo.superQueryInfo.childTblCount);
}
if (0 == strncasecmp(g_queryInfo.queryMode, "rest", strlen("rest"))) {
if (convertHostToServAddr(
g_queryInfo.host, g_queryInfo.port, &g_queryInfo.serv_addr) != 0)
exit(-1);
}
prompt();
pthread_t *pids = NULL;
threadInfo *infos = NULL;
//==== create sub threads for query from specify table
int nConcurrent = g_queryInfo.specifiedQueryInfo.concurrent;
uint64_t nSqlCount = g_queryInfo.specifiedQueryInfo.sqlCount;
if (g_args.debug_print || g_args.verbose_print) {
printfQuerySystemInfo(taos);
}
uint64_t startTs = taosGetTimestampMs();
if (0 == strncasecmp(g_queryInfo.queryMode, "rest", strlen("rest"))) {
if (convertHostToServAddr(
g_queryInfo.host, g_queryInfo.port, &g_queryInfo.serv_addr) != 0)
exit(-1);
}
if ((nSqlCount > 0) && (nConcurrent > 0)) {
pthread_t *pids = NULL;
threadInfo *infos = NULL;
//==== create sub threads for query from specify table
int nConcurrent = g_queryInfo.specifiedQueryInfo.concurrent;
uint64_t nSqlCount = g_queryInfo.specifiedQueryInfo.sqlCount;
pids = calloc(1, nConcurrent * nSqlCount * sizeof(pthread_t));
infos = calloc(1, nConcurrent * nSqlCount * sizeof(threadInfo));
uint64_t startTs = taosGetTimestampMs();
if ((NULL == pids) || (NULL == infos)) {
taos_close(taos);
ERROR_EXIT("memory allocation failed for create threads\n");
}
if ((nSqlCount > 0) && (nConcurrent > 0)) {
for (uint64_t i = 0; i < nSqlCount; i++) {
for (int j = 0; j < nConcurrent; j++) {
uint64_t seq = i * nConcurrent + j;
threadInfo *pThreadInfo = infos + seq;
pThreadInfo->threadID = seq;
pThreadInfo->querySeq = i;
pids = calloc(1, nConcurrent * nSqlCount * sizeof(pthread_t));
infos = calloc(1, nConcurrent * nSqlCount * sizeof(threadInfo));
if (0 == strncasecmp(g_queryInfo.queryMode, "taosc", 5)) {
if ((NULL == pids) || (NULL == infos)) {
taos_close(taos);
ERROR_EXIT("memory allocation failed for create threads\n");
}
char sqlStr[MAX_TB_NAME_SIZE*2];
sprintf(sqlStr, "use %s", g_queryInfo.dbName);
if (0 != queryDbExec(taos, sqlStr, NO_INSERT_TYPE, false)) {
taos_close(taos);
free(infos);
free(pids);
errorPrint( "use database %s failed!\n\n",
g_queryInfo.dbName);
return -1;
for (uint64_t i = 0; i < nSqlCount; i++) {
for (int j = 0; j < nConcurrent; j++) {
uint64_t seq = i * nConcurrent + j;
threadInfo *pThreadInfo = infos + seq;
pThreadInfo->threadID = seq;
pThreadInfo->querySeq = i;
if (0 == strncasecmp(g_queryInfo.queryMode, "taosc", 5)) {
char sqlStr[MAX_TB_NAME_SIZE*2];
sprintf(sqlStr, "use %s", g_queryInfo.dbName);
if (0 != queryDbExec(taos, sqlStr, NO_INSERT_TYPE, false)) {
taos_close(taos);
free(infos);
free(pids);
errorPrint( "use database %s failed!\n\n",
g_queryInfo.dbName);
return -1;
}
}
}
pThreadInfo->taos = NULL;// TODO: workaround to use separate taos connection;
pThreadInfo->taos = NULL;// TODO: workaround to use separate taos connection;
pthread_create(pids + seq, NULL, specifiedTableQuery,
pThreadInfo);
pthread_create(pids + seq, NULL, specifiedTableQuery,
pThreadInfo);
}
}
} else {
g_queryInfo.specifiedQueryInfo.concurrent = 0;
}
} else {
g_queryInfo.specifiedQueryInfo.concurrent = 0;
}
taos_close(taos);
taos_close(taos);
pthread_t *pidsOfSub = NULL;
threadInfo *infosOfSub = NULL;
//==== create sub threads for query from all sub table of the super table
if ((g_queryInfo.superQueryInfo.sqlCount > 0)
&& (g_queryInfo.superQueryInfo.threadCnt > 0)) {
pidsOfSub = calloc(1, g_queryInfo.superQueryInfo.threadCnt * sizeof(pthread_t));
infosOfSub = calloc(1, g_queryInfo.superQueryInfo.threadCnt * sizeof(threadInfo));
pthread_t *pidsOfSub = NULL;
threadInfo *infosOfSub = NULL;
//==== create sub threads for query from all sub table of the super table
if ((g_queryInfo.superQueryInfo.sqlCount > 0)
&& (g_queryInfo.superQueryInfo.threadCnt > 0)) {
pidsOfSub = calloc(1, g_queryInfo.superQueryInfo.threadCnt * sizeof(pthread_t));
infosOfSub = calloc(1, g_queryInfo.superQueryInfo.threadCnt * sizeof(threadInfo));
if ((NULL == pidsOfSub) || (NULL == infosOfSub)) {
free(infos);
free(pids);
if ((NULL == pidsOfSub) || (NULL == infosOfSub)) {
free(infos);
free(pids);
ERROR_EXIT("memory allocation failed for create threads\n");
}
ERROR_EXIT("memory allocation failed for create threads\n");
}
int64_t ntables = g_queryInfo.superQueryInfo.childTblCount;
int threads = g_queryInfo.superQueryInfo.threadCnt;
int64_t ntables = g_queryInfo.superQueryInfo.childTblCount;
int threads = g_queryInfo.superQueryInfo.threadCnt;
int64_t a = ntables / threads;
if (a < 1) {
threads = ntables;
a = 1;
}
int64_t a = ntables / threads;
if (a < 1) {
threads = ntables;
a = 1;
}
int64_t b = 0;
if (threads != 0) {
b = ntables % threads;
}
int64_t b = 0;
if (threads != 0) {
b = ntables % threads;
}
uint64_t tableFrom = 0;
for (int i = 0; i < threads; i++) {
threadInfo *pThreadInfo = infosOfSub + i;
pThreadInfo->threadID = i;
uint64_t tableFrom = 0;
for (int i = 0; i < threads; i++) {
threadInfo *pThreadInfo = infosOfSub + i;
pThreadInfo->threadID = i;
pThreadInfo->start_table_from = tableFrom;
pThreadInfo->ntables = i<b?a+1:a;
pThreadInfo->end_table_to = i < b ? tableFrom + a : tableFrom + a - 1;
tableFrom = pThreadInfo->end_table_to + 1;
pThreadInfo->taos = NULL; // TODO: workaround to use separate taos connection;
pthread_create(pidsOfSub + i, NULL, superTableQuery, pThreadInfo);
}
pThreadInfo->start_table_from = tableFrom;
pThreadInfo->ntables = i<b?a+1:a;
pThreadInfo->end_table_to = i < b ? tableFrom + a : tableFrom + a - 1;
tableFrom = pThreadInfo->end_table_to + 1;
pThreadInfo->taos = NULL; // TODO: workaround to use separate taos connection;
pthread_create(pidsOfSub + i, NULL, superTableQuery, pThreadInfo);
}
g_queryInfo.superQueryInfo.threadCnt = threads;
} else {
g_queryInfo.superQueryInfo.threadCnt = 0;
}
g_queryInfo.superQueryInfo.threadCnt = threads;
} else {
g_queryInfo.superQueryInfo.threadCnt = 0;
}
if ((nSqlCount > 0) && (nConcurrent > 0)) {
for (int i = 0; i < nConcurrent; i++) {
for (int j = 0; j < nSqlCount; j++) {
pthread_join(pids[i * nSqlCount + j], NULL);
}
if ((nSqlCount > 0) && (nConcurrent > 0)) {
for (int i = 0; i < nConcurrent; i++) {
for (int j = 0; j < nSqlCount; j++) {
pthread_join(pids[i * nSqlCount + j], NULL);
}
}
}
}
tmfree((char*)pids);
tmfree((char*)infos);
tmfree((char*)pids);
tmfree((char*)infos);
for (int i = 0; i < g_queryInfo.superQueryInfo.threadCnt; i++) {
pthread_join(pidsOfSub[i], NULL);
}
for (int i = 0; i < g_queryInfo.superQueryInfo.threadCnt; i++) {
pthread_join(pidsOfSub[i], NULL);
}
tmfree((char*)pidsOfSub);
tmfree((char*)infosOfSub);
tmfree((char*)pidsOfSub);
tmfree((char*)infosOfSub);
// taos_close(taos);// TODO: workaround to use separate taos connection;
uint64_t endTs = taosGetTimestampMs();
// taos_close(taos);// TODO: workaround to use separate taos connection;
uint64_t endTs = taosGetTimestampMs();
uint64_t totalQueried = g_queryInfo.specifiedQueryInfo.totalQueried +
g_queryInfo.superQueryInfo.totalQueried;
uint64_t totalQueried = g_queryInfo.specifiedQueryInfo.totalQueried +
g_queryInfo.superQueryInfo.totalQueried;
fprintf(stderr, "==== completed total queries: %"PRIu64", the QPS of all threads: %10.3f====\n",
totalQueried,
(double)(totalQueried/((endTs-startTs)/1000.0)));
return 0;
fprintf(stderr, "==== completed total queries: %"PRIu64", the QPS of all threads: %10.3f====\n",
totalQueried,
(double)(totalQueried/((endTs-startTs)/1000.0)));
return 0;
}
static void stable_sub_callback(
TAOS_SUB* tsub, TAOS_RES *res, void* param, int code) {
if (res == NULL || taos_errno(res) != 0) {
errorPrint("%s() LN%d, failed to subscribe result, code:%d, reason:%s\n",
__func__, __LINE__, code, taos_errstr(res));
return;
}
if (res == NULL || taos_errno(res) != 0) {
errorPrint("%s() LN%d, failed to subscribe result, code:%d, reason:%s\n",
__func__, __LINE__, code, taos_errstr(res));
return;
}
if (param)
fetchResult(res, (threadInfo *)param);
// tao_unscribe() will free result.
if (param)
fetchResult(res, (threadInfo *)param);
// tao_unscribe() will free result.
}
static void specified_sub_callback(
TAOS_SUB* tsub, TAOS_RES *res, void* param, int code) {
if (res == NULL || taos_errno(res) != 0) {
errorPrint("%s() LN%d, failed to subscribe result, code:%d, reason:%s\n",
__func__, __LINE__, code, taos_errstr(res));
return;
}
if (res == NULL || taos_errno(res) != 0) {
errorPrint("%s() LN%d, failed to subscribe result, code:%d, reason:%s\n",
__func__, __LINE__, code, taos_errstr(res));
return;
}
if (param)
fetchResult(res, (threadInfo *)param);
// tao_unscribe() will free result.
if (param)
fetchResult(res, (threadInfo *)param);
// tao_unscribe() will free result.
}
static TAOS_SUB* subscribeImpl(
......@@ -7634,35 +7642,35 @@ static TAOS_SUB* subscribeImpl(
threadInfo *pThreadInfo,
char *sql, char* topic, bool restart, uint64_t interval)
{
TAOS_SUB* tsub = NULL;
if ((SPECIFIED_CLASS == class)
&& (ASYNC_MODE == g_queryInfo.specifiedQueryInfo.asyncMode)) {
tsub = taos_subscribe(
pThreadInfo->taos,
restart,
topic, sql, specified_sub_callback, (void*)pThreadInfo,
g_queryInfo.specifiedQueryInfo.subscribeInterval);
} else if ((STABLE_CLASS == class)
&& (ASYNC_MODE == g_queryInfo.superQueryInfo.asyncMode)) {
tsub = taos_subscribe(
pThreadInfo->taos,
restart,
topic, sql, stable_sub_callback, (void*)pThreadInfo,
g_queryInfo.superQueryInfo.subscribeInterval);
} else {
tsub = taos_subscribe(
pThreadInfo->taos,
restart,
topic, sql, NULL, NULL, interval);
}
if (tsub == NULL) {
errorPrint("failed to create subscription. topic:%s, sql:%s\n", topic, sql);
return NULL;
}
return tsub;
TAOS_SUB* tsub = NULL;
if ((SPECIFIED_CLASS == class)
&& (ASYNC_MODE == g_queryInfo.specifiedQueryInfo.asyncMode)) {
tsub = taos_subscribe(
pThreadInfo->taos,
restart,
topic, sql, specified_sub_callback, (void*)pThreadInfo,
g_queryInfo.specifiedQueryInfo.subscribeInterval);
} else if ((STABLE_CLASS == class)
&& (ASYNC_MODE == g_queryInfo.superQueryInfo.asyncMode)) {
tsub = taos_subscribe(
pThreadInfo->taos,
restart,
topic, sql, stable_sub_callback, (void*)pThreadInfo,
g_queryInfo.superQueryInfo.subscribeInterval);
} else {
tsub = taos_subscribe(
pThreadInfo->taos,
restart,
topic, sql, NULL, NULL, interval);
}
if (tsub == NULL) {
errorPrint("failed to create subscription. topic:%s, sql:%s\n", topic, sql);
return NULL;
}
return tsub;
}
static void *superSubscribe(void *sarg) {
......@@ -7816,291 +7824,291 @@ static void *superSubscribe(void *sarg) {
}
static void *specifiedSubscribe(void *sarg) {
threadInfo *pThreadInfo = (threadInfo *)sarg;
// TAOS_SUB* tsub = NULL;
threadInfo *pThreadInfo = (threadInfo *)sarg;
// TAOS_SUB* tsub = NULL;
setThreadName("specSub");
setThreadName("specSub");
if (pThreadInfo->taos == NULL) {
pThreadInfo->taos = taos_connect(g_queryInfo.host,
g_queryInfo.user,
g_queryInfo.password,
g_queryInfo.dbName,
g_queryInfo.port);
if (pThreadInfo->taos == NULL) {
errorPrint("[%d] Failed to connect to TDengine, reason:%s\n",
pThreadInfo->threadID, taos_errstr(NULL));
return NULL;
pThreadInfo->taos = taos_connect(g_queryInfo.host,
g_queryInfo.user,
g_queryInfo.password,
g_queryInfo.dbName,
g_queryInfo.port);
if (pThreadInfo->taos == NULL) {
errorPrint("[%d] Failed to connect to TDengine, reason:%s\n",
pThreadInfo->threadID, taos_errstr(NULL));
return NULL;
}
}
}
char sqlStr[MAX_TB_NAME_SIZE*2];
sprintf(sqlStr, "use %s", g_queryInfo.dbName);
if (0 != queryDbExec(pThreadInfo->taos, sqlStr, NO_INSERT_TYPE, false)) {
taos_close(pThreadInfo->taos);
return NULL;
}
sprintf(g_queryInfo.specifiedQueryInfo.topic[pThreadInfo->threadID],
"taosdemo-subscribe-%"PRIu64"-%d",
pThreadInfo->querySeq,
pThreadInfo->threadID);
if (g_queryInfo.specifiedQueryInfo.result[pThreadInfo->querySeq][0] != '\0') {
sprintf(pThreadInfo->filePath, "%s-%d",
char sqlStr[MAX_TB_NAME_SIZE*2];
sprintf(sqlStr, "use %s", g_queryInfo.dbName);
if (0 != queryDbExec(pThreadInfo->taos, sqlStr, NO_INSERT_TYPE, false)) {
taos_close(pThreadInfo->taos);
return NULL;
}
sprintf(g_queryInfo.specifiedQueryInfo.topic[pThreadInfo->threadID],
"taosdemo-subscribe-%"PRIu64"-%d",
pThreadInfo->querySeq,
pThreadInfo->threadID);
if (g_queryInfo.specifiedQueryInfo.result[pThreadInfo->querySeq][0] != '\0') {
sprintf(pThreadInfo->filePath, "%s-%d",
g_queryInfo.specifiedQueryInfo.result[pThreadInfo->querySeq],
pThreadInfo->threadID);
}
g_queryInfo.specifiedQueryInfo.tsub[pThreadInfo->threadID] = subscribeImpl(
SPECIFIED_CLASS, pThreadInfo,
g_queryInfo.specifiedQueryInfo.sql[pThreadInfo->querySeq],
g_queryInfo.specifiedQueryInfo.topic[pThreadInfo->threadID],
g_queryInfo.specifiedQueryInfo.subscribeRestart,
g_queryInfo.specifiedQueryInfo.subscribeInterval);
if (NULL == g_queryInfo.specifiedQueryInfo.tsub[pThreadInfo->threadID]) {
taos_close(pThreadInfo->taos);
return NULL;
}
// start loop to consume result
g_queryInfo.specifiedQueryInfo.consumed[pThreadInfo->threadID] = 0;
while((g_queryInfo.specifiedQueryInfo.endAfterConsume[pThreadInfo->querySeq] == -1)
|| (g_queryInfo.specifiedQueryInfo.consumed[pThreadInfo->threadID] <
g_queryInfo.specifiedQueryInfo.endAfterConsume[pThreadInfo->querySeq])) {
printf("consumed[%d]: %d, endAfterConsum[%"PRId64"]: %d\n",
pThreadInfo->threadID,
g_queryInfo.specifiedQueryInfo.consumed[pThreadInfo->threadID],
pThreadInfo->querySeq,
g_queryInfo.specifiedQueryInfo.endAfterConsume[pThreadInfo->querySeq]);
if (ASYNC_MODE == g_queryInfo.specifiedQueryInfo.asyncMode) {
continue;
}
g_queryInfo.specifiedQueryInfo.res[pThreadInfo->threadID] = taos_consume(
g_queryInfo.specifiedQueryInfo.tsub[pThreadInfo->threadID]);
if (g_queryInfo.specifiedQueryInfo.res[pThreadInfo->threadID]) {
if (g_queryInfo.specifiedQueryInfo.result[pThreadInfo->querySeq][0]
!= 0) {
sprintf(pThreadInfo->filePath, "%s-%d",
g_queryInfo.specifiedQueryInfo.result[pThreadInfo->querySeq],
pThreadInfo->threadID);
}
fetchResult(
g_queryInfo.specifiedQueryInfo.res[pThreadInfo->threadID],
pThreadInfo);
g_queryInfo.specifiedQueryInfo.consumed[pThreadInfo->threadID] ++;
if ((g_queryInfo.specifiedQueryInfo.resubAfterConsume[pThreadInfo->querySeq] != -1)
&& (g_queryInfo.specifiedQueryInfo.consumed[pThreadInfo->threadID] >=
g_queryInfo.specifiedQueryInfo.resubAfterConsume[pThreadInfo->querySeq])) {
printf("keepProgress:%d, resub specified query: %"PRIu64"\n",
g_queryInfo.specifiedQueryInfo.subscribeKeepProgress,
pThreadInfo->querySeq);
g_queryInfo.specifiedQueryInfo.consumed[pThreadInfo->threadID] = 0;
taos_unsubscribe(g_queryInfo.specifiedQueryInfo.tsub[pThreadInfo->threadID],
g_queryInfo.specifiedQueryInfo.subscribeKeepProgress);
g_queryInfo.specifiedQueryInfo.tsub[pThreadInfo->threadID] =
subscribeImpl(
SPECIFIED_CLASS,
pThreadInfo,
g_queryInfo.specifiedQueryInfo.sql[pThreadInfo->querySeq],
g_queryInfo.specifiedQueryInfo.topic[pThreadInfo->threadID],
g_queryInfo.specifiedQueryInfo.subscribeRestart,
g_queryInfo.specifiedQueryInfo.subscribeInterval);
if (NULL == g_queryInfo.specifiedQueryInfo.tsub[pThreadInfo->threadID]) {
taos_close(pThreadInfo->taos);
return NULL;
}
}
}
}
taos_free_result(g_queryInfo.specifiedQueryInfo.res[pThreadInfo->threadID]);
taos_close(pThreadInfo->taos);
}
g_queryInfo.specifiedQueryInfo.tsub[pThreadInfo->threadID] = subscribeImpl(
SPECIFIED_CLASS, pThreadInfo,
g_queryInfo.specifiedQueryInfo.sql[pThreadInfo->querySeq],
g_queryInfo.specifiedQueryInfo.topic[pThreadInfo->threadID],
g_queryInfo.specifiedQueryInfo.subscribeRestart,
g_queryInfo.specifiedQueryInfo.subscribeInterval);
if (NULL == g_queryInfo.specifiedQueryInfo.tsub[pThreadInfo->threadID]) {
taos_close(pThreadInfo->taos);
return NULL;
}
// start loop to consume result
return NULL;
g_queryInfo.specifiedQueryInfo.consumed[pThreadInfo->threadID] = 0;
while((g_queryInfo.specifiedQueryInfo.endAfterConsume[pThreadInfo->querySeq] == -1)
|| (g_queryInfo.specifiedQueryInfo.consumed[pThreadInfo->threadID] <
g_queryInfo.specifiedQueryInfo.endAfterConsume[pThreadInfo->querySeq])) {
printf("consumed[%d]: %d, endAfterConsum[%"PRId64"]: %d\n",
pThreadInfo->threadID,
g_queryInfo.specifiedQueryInfo.consumed[pThreadInfo->threadID],
pThreadInfo->querySeq,
g_queryInfo.specifiedQueryInfo.endAfterConsume[pThreadInfo->querySeq]);
if (ASYNC_MODE == g_queryInfo.specifiedQueryInfo.asyncMode) {
continue;
}
g_queryInfo.specifiedQueryInfo.res[pThreadInfo->threadID] = taos_consume(
g_queryInfo.specifiedQueryInfo.tsub[pThreadInfo->threadID]);
if (g_queryInfo.specifiedQueryInfo.res[pThreadInfo->threadID]) {
if (g_queryInfo.specifiedQueryInfo.result[pThreadInfo->querySeq][0]
!= 0) {
sprintf(pThreadInfo->filePath, "%s-%d",
g_queryInfo.specifiedQueryInfo.result[pThreadInfo->querySeq],
pThreadInfo->threadID);
}
fetchResult(
g_queryInfo.specifiedQueryInfo.res[pThreadInfo->threadID],
pThreadInfo);
g_queryInfo.specifiedQueryInfo.consumed[pThreadInfo->threadID] ++;
if ((g_queryInfo.specifiedQueryInfo.resubAfterConsume[pThreadInfo->querySeq] != -1)
&& (g_queryInfo.specifiedQueryInfo.consumed[pThreadInfo->threadID] >=
g_queryInfo.specifiedQueryInfo.resubAfterConsume[pThreadInfo->querySeq])) {
printf("keepProgress:%d, resub specified query: %"PRIu64"\n",
g_queryInfo.specifiedQueryInfo.subscribeKeepProgress,
pThreadInfo->querySeq);
g_queryInfo.specifiedQueryInfo.consumed[pThreadInfo->threadID] = 0;
taos_unsubscribe(g_queryInfo.specifiedQueryInfo.tsub[pThreadInfo->threadID],
g_queryInfo.specifiedQueryInfo.subscribeKeepProgress);
g_queryInfo.specifiedQueryInfo.tsub[pThreadInfo->threadID] =
subscribeImpl(
SPECIFIED_CLASS,
pThreadInfo,
g_queryInfo.specifiedQueryInfo.sql[pThreadInfo->querySeq],
g_queryInfo.specifiedQueryInfo.topic[pThreadInfo->threadID],
g_queryInfo.specifiedQueryInfo.subscribeRestart,
g_queryInfo.specifiedQueryInfo.subscribeInterval);
if (NULL == g_queryInfo.specifiedQueryInfo.tsub[pThreadInfo->threadID]) {
taos_close(pThreadInfo->taos);
return NULL;
}
}
}
}
taos_free_result(g_queryInfo.specifiedQueryInfo.res[pThreadInfo->threadID]);
taos_close(pThreadInfo->taos);
return NULL;
}
static int subscribeTestProcess() {
setupForAnsiEscape();
printfQueryMeta();
resetAfterAnsiEscape();
prompt();
TAOS * taos = NULL;
taos = taos_connect(g_queryInfo.host,
g_queryInfo.user,
g_queryInfo.password,
g_queryInfo.dbName,
g_queryInfo.port);
if (taos == NULL) {
errorPrint( "Failed to connect to TDengine, reason:%s\n",
taos_errstr(NULL));
exit(-1);
}
if (0 != g_queryInfo.superQueryInfo.sqlCount) {
getAllChildNameOfSuperTable(taos,
setupForAnsiEscape();
printfQueryMeta();
resetAfterAnsiEscape();
prompt();
TAOS * taos = NULL;
taos = taos_connect(g_queryInfo.host,
g_queryInfo.user,
g_queryInfo.password,
g_queryInfo.dbName,
g_queryInfo.superQueryInfo.sTblName,
&g_queryInfo.superQueryInfo.childTblName,
&g_queryInfo.superQueryInfo.childTblCount);
}
taos_close(taos); // TODO: workaround to use separate taos connection;
pthread_t *pids = NULL;
threadInfo *infos = NULL;
pthread_t *pidsOfStable = NULL;
threadInfo *infosOfStable = NULL;
//==== create threads for query for specified table
if (g_queryInfo.specifiedQueryInfo.sqlCount <= 0) {
debugPrint("%s() LN%d, sepcified query sqlCount %d.\n",
__func__, __LINE__,
g_queryInfo.specifiedQueryInfo.sqlCount);
} else {
if (g_queryInfo.specifiedQueryInfo.concurrent <= 0) {
errorPrint("%s() LN%d, sepcified query sqlCount %d.\n",
__func__, __LINE__,
g_queryInfo.specifiedQueryInfo.sqlCount);
g_queryInfo.port);
if (taos == NULL) {
errorPrint( "Failed to connect to TDengine, reason:%s\n",
taos_errstr(NULL));
exit(-1);
}
pids = calloc(
1,
g_queryInfo.specifiedQueryInfo.sqlCount *
g_queryInfo.specifiedQueryInfo.concurrent *
sizeof(pthread_t));
infos = calloc(
1,
g_queryInfo.specifiedQueryInfo.sqlCount *
g_queryInfo.specifiedQueryInfo.concurrent *
sizeof(threadInfo));
if ((NULL == pids) || (NULL == infos)) {
errorPrint("%s() LN%d, malloc failed for create threads\n", __func__, __LINE__);
exit(-1);
if (0 != g_queryInfo.superQueryInfo.sqlCount) {
getAllChildNameOfSuperTable(taos,
g_queryInfo.dbName,
g_queryInfo.superQueryInfo.sTblName,
&g_queryInfo.superQueryInfo.childTblName,
&g_queryInfo.superQueryInfo.childTblCount);
}
for (int i = 0; i < g_queryInfo.specifiedQueryInfo.sqlCount; i++) {
for (int j = 0; j < g_queryInfo.specifiedQueryInfo.concurrent; j++) {
uint64_t seq = i * g_queryInfo.specifiedQueryInfo.concurrent + j;
threadInfo *pThreadInfo = infos + seq;
pThreadInfo->threadID = seq;
pThreadInfo->querySeq = i;
pThreadInfo->taos = NULL; // TODO: workaround to use separate taos connection;
pthread_create(pids + seq, NULL, specifiedSubscribe, pThreadInfo);
taos_close(taos); // TODO: workaround to use separate taos connection;
pthread_t *pids = NULL;
threadInfo *infos = NULL;
pthread_t *pidsOfStable = NULL;
threadInfo *infosOfStable = NULL;
//==== create threads for query for specified table
if (g_queryInfo.specifiedQueryInfo.sqlCount <= 0) {
debugPrint("%s() LN%d, sepcified query sqlCount %d.\n",
__func__, __LINE__,
g_queryInfo.specifiedQueryInfo.sqlCount);
} else {
if (g_queryInfo.specifiedQueryInfo.concurrent <= 0) {
errorPrint("%s() LN%d, sepcified query sqlCount %d.\n",
__func__, __LINE__,
g_queryInfo.specifiedQueryInfo.sqlCount);
exit(-1);
}
}
}
//==== create threads for super table query
if (g_queryInfo.superQueryInfo.sqlCount <= 0) {
debugPrint("%s() LN%d, super table query sqlCount %d.\n",
__func__, __LINE__,
g_queryInfo.superQueryInfo.sqlCount);
} else {
if ((g_queryInfo.superQueryInfo.sqlCount > 0)
&& (g_queryInfo.superQueryInfo.threadCnt > 0)) {
pidsOfStable = calloc(
pids = calloc(
1,
g_queryInfo.superQueryInfo.sqlCount *
g_queryInfo.superQueryInfo.threadCnt *
sizeof(pthread_t));
infosOfStable = calloc(
g_queryInfo.specifiedQueryInfo.sqlCount *
g_queryInfo.specifiedQueryInfo.concurrent *
sizeof(pthread_t));
infos = calloc(
1,
g_queryInfo.superQueryInfo.sqlCount *
g_queryInfo.superQueryInfo.threadCnt *
sizeof(threadInfo));
if ((NULL == pidsOfStable) || (NULL == infosOfStable)) {
errorPrint("%s() LN%d, malloc failed for create threads\n",
__func__, __LINE__);
// taos_close(taos);
g_queryInfo.specifiedQueryInfo.sqlCount *
g_queryInfo.specifiedQueryInfo.concurrent *
sizeof(threadInfo));
if ((NULL == pids) || (NULL == infos)) {
errorPrint("%s() LN%d, malloc failed for create threads\n", __func__, __LINE__);
exit(-1);
}
int64_t ntables = g_queryInfo.superQueryInfo.childTblCount;
int threads = g_queryInfo.superQueryInfo.threadCnt;
int64_t a = ntables / threads;
if (a < 1) {
threads = ntables;
a = 1;
for (int i = 0; i < g_queryInfo.specifiedQueryInfo.sqlCount; i++) {
for (int j = 0; j < g_queryInfo.specifiedQueryInfo.concurrent; j++) {
uint64_t seq = i * g_queryInfo.specifiedQueryInfo.concurrent + j;
threadInfo *pThreadInfo = infos + seq;
pThreadInfo->threadID = seq;
pThreadInfo->querySeq = i;
pThreadInfo->taos = NULL; // TODO: workaround to use separate taos connection;
pthread_create(pids + seq, NULL, specifiedSubscribe, pThreadInfo);
}
}
}
int64_t b = 0;
if (threads != 0) {
b = ntables % threads;
}
//==== create threads for super table query
if (g_queryInfo.superQueryInfo.sqlCount <= 0) {
debugPrint("%s() LN%d, super table query sqlCount %d.\n",
__func__, __LINE__,
g_queryInfo.superQueryInfo.sqlCount);
} else {
if ((g_queryInfo.superQueryInfo.sqlCount > 0)
&& (g_queryInfo.superQueryInfo.threadCnt > 0)) {
pidsOfStable = calloc(
1,
g_queryInfo.superQueryInfo.sqlCount *
g_queryInfo.superQueryInfo.threadCnt *
sizeof(pthread_t));
infosOfStable = calloc(
1,
g_queryInfo.superQueryInfo.sqlCount *
g_queryInfo.superQueryInfo.threadCnt *
sizeof(threadInfo));
if ((NULL == pidsOfStable) || (NULL == infosOfStable)) {
errorPrint("%s() LN%d, malloc failed for create threads\n",
__func__, __LINE__);
// taos_close(taos);
exit(-1);
}
for (uint64_t i = 0; i < g_queryInfo.superQueryInfo.sqlCount; i++) {
uint64_t tableFrom = 0;
for (int j = 0; j < threads; j++) {
uint64_t seq = i * threads + j;
threadInfo *pThreadInfo = infosOfStable + seq;
pThreadInfo->threadID = seq;
pThreadInfo->querySeq = i;
int64_t ntables = g_queryInfo.superQueryInfo.childTblCount;
int threads = g_queryInfo.superQueryInfo.threadCnt;
pThreadInfo->start_table_from = tableFrom;
pThreadInfo->ntables = j<b?a+1:a;
pThreadInfo->end_table_to = j<b?tableFrom+a:tableFrom+a-1;
tableFrom = pThreadInfo->end_table_to + 1;
pThreadInfo->taos = NULL; // TODO: workaround to use separate taos connection;
pthread_create(pidsOfStable + seq,
NULL, superSubscribe, pThreadInfo);
int64_t a = ntables / threads;
if (a < 1) {
threads = ntables;
a = 1;
}
}
g_queryInfo.superQueryInfo.threadCnt = threads;
int64_t b = 0;
if (threads != 0) {
b = ntables % threads;
}
for (uint64_t i = 0; i < g_queryInfo.superQueryInfo.sqlCount; i++) {
uint64_t tableFrom = 0;
for (int j = 0; j < threads; j++) {
uint64_t seq = i * threads + j;
threadInfo *pThreadInfo = infosOfStable + seq;
pThreadInfo->threadID = seq;
pThreadInfo->querySeq = i;
pThreadInfo->start_table_from = tableFrom;
pThreadInfo->ntables = j<b?a+1:a;
pThreadInfo->end_table_to = j<b?tableFrom+a:tableFrom+a-1;
tableFrom = pThreadInfo->end_table_to + 1;
pThreadInfo->taos = NULL; // TODO: workaround to use separate taos connection;
pthread_create(pidsOfStable + seq,
NULL, superSubscribe, pThreadInfo);
}
}
for (int i = 0; i < g_queryInfo.superQueryInfo.sqlCount; i++) {
for (int j = 0; j < threads; j++) {
uint64_t seq = i * threads + j;
pthread_join(pidsOfStable[seq], NULL);
g_queryInfo.superQueryInfo.threadCnt = threads;
for (int i = 0; i < g_queryInfo.superQueryInfo.sqlCount; i++) {
for (int j = 0; j < threads; j++) {
uint64_t seq = i * threads + j;
pthread_join(pidsOfStable[seq], NULL);
}
}
}
}
}
for (int i = 0; i < g_queryInfo.specifiedQueryInfo.sqlCount; i++) {
for (int j = 0; j < g_queryInfo.specifiedQueryInfo.concurrent; j++) {
uint64_t seq = i * g_queryInfo.specifiedQueryInfo.concurrent + j;
pthread_join(pids[seq], NULL);
for (int i = 0; i < g_queryInfo.specifiedQueryInfo.sqlCount; i++) {
for (int j = 0; j < g_queryInfo.specifiedQueryInfo.concurrent; j++) {
uint64_t seq = i * g_queryInfo.specifiedQueryInfo.concurrent + j;
pthread_join(pids[seq], NULL);
}
}
}
tmfree((char*)pids);
tmfree((char*)infos);
tmfree((char*)pids);
tmfree((char*)infos);
tmfree((char*)pidsOfStable);
tmfree((char*)infosOfStable);
// taos_close(taos);
return 0;
tmfree((char*)pidsOfStable);
tmfree((char*)infosOfStable);
// taos_close(taos);
return 0;
}
static void initOfInsertMeta() {
memset(&g_Dbs, 0, sizeof(SDbs));
memset(&g_Dbs, 0, sizeof(SDbs));
// set default values
tstrncpy(g_Dbs.host, "127.0.0.1", MAX_HOSTNAME_SIZE);
g_Dbs.port = 6030;
tstrncpy(g_Dbs.user, TSDB_DEFAULT_USER, MAX_USERNAME_SIZE);
tstrncpy(g_Dbs.password, TSDB_DEFAULT_PASS, MAX_PASSWORD_SIZE);
g_Dbs.threadCount = 2;
// set default values
tstrncpy(g_Dbs.host, "127.0.0.1", MAX_HOSTNAME_SIZE);
g_Dbs.port = 6030;
tstrncpy(g_Dbs.user, TSDB_DEFAULT_USER, MAX_USERNAME_SIZE);
tstrncpy(g_Dbs.password, TSDB_DEFAULT_PASS, MAX_PASSWORD_SIZE);
g_Dbs.threadCount = 2;
g_Dbs.use_metric = g_args.use_metric;
g_Dbs.use_metric = g_args.use_metric;
}
static void initOfQueryMeta() {
memset(&g_queryInfo, 0, sizeof(SQueryMetaInfo));
memset(&g_queryInfo, 0, sizeof(SQueryMetaInfo));
// set default values
tstrncpy(g_queryInfo.host, "127.0.0.1", MAX_HOSTNAME_SIZE);
g_queryInfo.port = 6030;
tstrncpy(g_queryInfo.user, TSDB_DEFAULT_USER, MAX_USERNAME_SIZE);
tstrncpy(g_queryInfo.password, TSDB_DEFAULT_PASS, MAX_PASSWORD_SIZE);
// set default values
tstrncpy(g_queryInfo.host, "127.0.0.1", MAX_HOSTNAME_SIZE);
g_queryInfo.port = 6030;
tstrncpy(g_queryInfo.user, TSDB_DEFAULT_USER, MAX_USERNAME_SIZE);
tstrncpy(g_queryInfo.password, TSDB_DEFAULT_PASS, MAX_PASSWORD_SIZE);
}
static void setParaFromArg() {
......@@ -8218,37 +8226,37 @@ static void setParaFromArg() {
/* Function to do regular expression check */
static int regexMatch(const char *s, const char *reg, int cflags) {
regex_t regex;
char msgbuf[100] = {0};
/* Compile regular expression */
if (regcomp(&regex, reg, cflags) != 0) {
printf("Fail to compile regex\n");
exit(-1);
}
/* Execute regular expression */
int reti = regexec(&regex, s, 0, NULL, 0);
if (!reti) {
regfree(&regex);
return 1;
} else if (reti == REG_NOMATCH) {
regfree(&regex);
regex_t regex;
char msgbuf[100] = {0};
/* Compile regular expression */
if (regcomp(&regex, reg, cflags) != 0) {
printf("Fail to compile regex\n");
exit(-1);
}
/* Execute regular expression */
int reti = regexec(&regex, s, 0, NULL, 0);
if (!reti) {
regfree(&regex);
return 1;
} else if (reti == REG_NOMATCH) {
regfree(&regex);
return 0;
} else {
regerror(reti, &regex, msgbuf, sizeof(msgbuf));
printf("Regex match failed: %s\n", msgbuf);
regfree(&regex);
exit(-1);
}
return 0;
} else {
regerror(reti, &regex, msgbuf, sizeof(msgbuf));
printf("Regex match failed: %s\n", msgbuf);
regfree(&regex);
exit(-1);
}
return 0;
}
static int isCommentLine(char *line) {
if (line == NULL) return 1;
if (line == NULL) return 1;
return regexMatch(line, "^\\s*#.*", REG_EXTENDED);
return regexMatch(line, "^\\s*#.*", REG_EXTENDED);
}
static void querySqlFile(TAOS* taos, char* sqlFile)
......@@ -8306,131 +8314,131 @@ static void querySqlFile(TAOS* taos, char* sqlFile)
static void testMetaFile() {
if (INSERT_TEST == g_args.test_mode) {
if (g_Dbs.cfgDir[0])
taos_options(TSDB_OPTION_CONFIGDIR, g_Dbs.cfgDir);
if (g_Dbs.cfgDir[0])
taos_options(TSDB_OPTION_CONFIGDIR, g_Dbs.cfgDir);
insertTestProcess();
insertTestProcess();
} else if (QUERY_TEST == g_args.test_mode) {
if (g_queryInfo.cfgDir[0])
taos_options(TSDB_OPTION_CONFIGDIR, g_queryInfo.cfgDir);
if (g_queryInfo.cfgDir[0])
taos_options(TSDB_OPTION_CONFIGDIR, g_queryInfo.cfgDir);
queryTestProcess();
queryTestProcess();
} else if (SUBSCRIBE_TEST == g_args.test_mode) {
if (g_queryInfo.cfgDir[0])
taos_options(TSDB_OPTION_CONFIGDIR, g_queryInfo.cfgDir);
if (g_queryInfo.cfgDir[0])
taos_options(TSDB_OPTION_CONFIGDIR, g_queryInfo.cfgDir);
subscribeTestProcess();
subscribeTestProcess();
} else {
;
;
}
}
static void queryResult() {
// query data
pthread_t read_id;
threadInfo *pThreadInfo = calloc(1, sizeof(threadInfo));
assert(pThreadInfo);
pThreadInfo->start_time = 1500000000000; // 2017-07-14 10:40:00.000
pThreadInfo->start_table_from = 0;
//pThreadInfo->do_aggreFunc = g_Dbs.do_aggreFunc;
if (g_args.use_metric) {
pThreadInfo->ntables = g_Dbs.db[0].superTbls[0].childTblCount;
pThreadInfo->end_table_to = g_Dbs.db[0].superTbls[0].childTblCount - 1;
pThreadInfo->superTblInfo = &g_Dbs.db[0].superTbls[0];
tstrncpy(pThreadInfo->tb_prefix,
g_Dbs.db[0].superTbls[0].childTblPrefix, TSDB_TABLE_NAME_LEN - 20);
} else {
pThreadInfo->ntables = g_args.num_of_tables;
pThreadInfo->end_table_to = g_args.num_of_tables -1;
tstrncpy(pThreadInfo->tb_prefix, g_args.tb_prefix, TSDB_TABLE_NAME_LEN);
}
pThreadInfo->taos = taos_connect(
g_Dbs.host,
g_Dbs.user,
g_Dbs.password,
g_Dbs.db[0].dbName,
g_Dbs.port);
if (pThreadInfo->taos == NULL) {
errorPrint( "Failed to connect to TDengine, reason:%s\n",
taos_errstr(NULL));
// query data
pthread_t read_id;
threadInfo *pThreadInfo = calloc(1, sizeof(threadInfo));
assert(pThreadInfo);
pThreadInfo->start_time = 1500000000000; // 2017-07-14 10:40:00.000
pThreadInfo->start_table_from = 0;
//pThreadInfo->do_aggreFunc = g_Dbs.do_aggreFunc;
if (g_args.use_metric) {
pThreadInfo->ntables = g_Dbs.db[0].superTbls[0].childTblCount;
pThreadInfo->end_table_to = g_Dbs.db[0].superTbls[0].childTblCount - 1;
pThreadInfo->superTblInfo = &g_Dbs.db[0].superTbls[0];
tstrncpy(pThreadInfo->tb_prefix,
g_Dbs.db[0].superTbls[0].childTblPrefix, TSDB_TABLE_NAME_LEN - 20);
} else {
pThreadInfo->ntables = g_args.num_of_tables;
pThreadInfo->end_table_to = g_args.num_of_tables -1;
tstrncpy(pThreadInfo->tb_prefix, g_args.tb_prefix, TSDB_TABLE_NAME_LEN);
}
pThreadInfo->taos = taos_connect(
g_Dbs.host,
g_Dbs.user,
g_Dbs.password,
g_Dbs.db[0].dbName,
g_Dbs.port);
if (pThreadInfo->taos == NULL) {
errorPrint( "Failed to connect to TDengine, reason:%s\n",
taos_errstr(NULL));
free(pThreadInfo);
exit(-1);
}
tstrncpy(pThreadInfo->filePath, g_Dbs.resultFile, MAX_FILE_NAME_LEN);
if (!g_Dbs.use_metric) {
pthread_create(&read_id, NULL, readTable, pThreadInfo);
} else {
pthread_create(&read_id, NULL, readMetric, pThreadInfo);
}
pthread_join(read_id, NULL);
taos_close(pThreadInfo->taos);
free(pThreadInfo);
exit(-1);
}
tstrncpy(pThreadInfo->filePath, g_Dbs.resultFile, MAX_FILE_NAME_LEN);
if (!g_Dbs.use_metric) {
pthread_create(&read_id, NULL, readTable, pThreadInfo);
} else {
pthread_create(&read_id, NULL, readMetric, pThreadInfo);
}
pthread_join(read_id, NULL);
taos_close(pThreadInfo->taos);
free(pThreadInfo);
}
static void testCmdLine() {
if (strlen(configDir)) {
wordexp_t full_path;
if (wordexp(configDir, &full_path, 0) != 0) {
errorPrint( "Invalid path %s\n", configDir);
return;
if (strlen(configDir)) {
wordexp_t full_path;
if (wordexp(configDir, &full_path, 0) != 0) {
errorPrint( "Invalid path %s\n", configDir);
return;
}
taos_options(TSDB_OPTION_CONFIGDIR, full_path.we_wordv[0]);
wordfree(&full_path);
}
taos_options(TSDB_OPTION_CONFIGDIR, full_path.we_wordv[0]);
wordfree(&full_path);
}
g_args.test_mode = INSERT_TEST;
insertTestProcess();
g_args.test_mode = INSERT_TEST;
insertTestProcess();
if (false == g_Dbs.insert_only)
queryResult();
if (false == g_Dbs.insert_only)
queryResult();
}
int main(int argc, char *argv[]) {
parse_args(argc, argv, &g_args);
debugPrint("meta file: %s\n", g_args.metaFile);
parse_args(argc, argv, &g_args);
if (g_args.metaFile) {
initOfInsertMeta();
initOfQueryMeta();
debugPrint("meta file: %s\n", g_args.metaFile);
if (false == getInfoFromJsonFile(g_args.metaFile)) {
printf("Failed to read %s\n", g_args.metaFile);
return 1;
}
if (g_args.metaFile) {
initOfInsertMeta();
initOfQueryMeta();
testMetaFile();
} else {
memset(&g_Dbs, 0, sizeof(SDbs));
setParaFromArg();
if (NULL != g_args.sqlFile) {
TAOS* qtaos = taos_connect(
g_Dbs.host,
g_Dbs.user,
g_Dbs.password,
g_Dbs.db[0].dbName,
g_Dbs.port);
querySqlFile(qtaos, g_args.sqlFile);
taos_close(qtaos);
if (false == getInfoFromJsonFile(g_args.metaFile)) {
printf("Failed to read %s\n", g_args.metaFile);
return 1;
}
testMetaFile();
} else {
testCmdLine();
}
memset(&g_Dbs, 0, sizeof(SDbs));
setParaFromArg();
if (NULL != g_args.sqlFile) {
TAOS* qtaos = taos_connect(
g_Dbs.host,
g_Dbs.user,
g_Dbs.password,
g_Dbs.db[0].dbName,
g_Dbs.port);
querySqlFile(qtaos, g_args.sqlFile);
taos_close(qtaos);
if (g_dupstr)
free(g_dupstr);
}
} else {
testCmdLine();
}
return 0;
if (g_dupstr)
free(g_dupstr);
}
return 0;
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册