未验证 提交 0388ffa7 编写于 作者: sangshuduo's avatar sangshuduo 提交者: GitHub

Hotfix/sangshuduo/td 5306 taosdemo stmt autocreatetable (#6950)

* [TD-5300]<fix>: taosdemo stmt debug print.

* fix default iface is unknown.
上级 b538d580
...@@ -90,7 +90,7 @@ extern char configDir[]; ...@@ -90,7 +90,7 @@ extern char configDir[];
#define MAX_SUPER_TABLE_COUNT 200 #define MAX_SUPER_TABLE_COUNT 200
#define MAX_QUERY_SQL_COUNT 100 #define MAX_QUERY_SQL_COUNT 100
#define MAX_QUERY_SQL_LENGTH 1024 #define MAX_QUERY_SQL_LENGTH BUFFER_SIZE
#define MAX_DATABASE_COUNT 256 #define MAX_DATABASE_COUNT 256
#define INPUT_BUF_LEN 256 #define INPUT_BUF_LEN 256
...@@ -2367,7 +2367,25 @@ static char* getTagValueFromTagSample(SSuperTable* stbInfo, int tagUsePos) { ...@@ -2367,7 +2367,25 @@ static char* getTagValueFromTagSample(SSuperTable* stbInfo, int tagUsePos) {
return dataBuf; return dataBuf;
} }
static char* generateTagValuesForStb(SSuperTable* stbInfo, int32_t tableSeq) { static char *generateBinaryNCharTagValues(int64_t tableSeq, uint32_t len)
{
char* buf = (char*)calloc(len, 1);
if (NULL == buf) {
printf("calloc failed! size:%d\n", len);
return NULL;
}
if (tableSeq % 2) {
tstrncpy(buf, "beijing", len);
} else {
tstrncpy(buf, "shanghai", len);
}
//rand_string(buf, stbInfo->tags[i].dataLen);
return buf;
}
static char* generateTagValuesForStb(SSuperTable* stbInfo, int64_t tableSeq) {
char* dataBuf = (char*)calloc(TSDB_MAX_SQL_LEN+1, 1); char* dataBuf = (char*)calloc(TSDB_MAX_SQL_LEN+1, 1);
if (NULL == dataBuf) { if (NULL == dataBuf) {
printf("calloc failed! size:%d\n", TSDB_MAX_SQL_LEN+1); printf("calloc failed! size:%d\n", TSDB_MAX_SQL_LEN+1);
...@@ -2388,20 +2406,12 @@ static char* generateTagValuesForStb(SSuperTable* stbInfo, int32_t tableSeq) { ...@@ -2388,20 +2406,12 @@ static char* generateTagValuesForStb(SSuperTable* stbInfo, int32_t tableSeq) {
return NULL; return NULL;
} }
int tagBufLen = stbInfo->tags[i].dataLen + 1; int32_t tagBufLen = stbInfo->tags[i].dataLen + 1;
char* buf = (char*)calloc(tagBufLen, 1); char *buf = generateBinaryNCharTagValues(tableSeq, tagBufLen);
if (NULL == buf) { if (NULL == buf) {
printf("calloc failed! size:%d\n", stbInfo->tags[i].dataLen);
tmfree(dataBuf); tmfree(dataBuf);
return NULL; return NULL;
} }
if (tableSeq % 2) {
tstrncpy(buf, "beijing", tagBufLen);
} else {
tstrncpy(buf, "shanghai", tagBufLen);
}
//rand_string(buf, stbInfo->tags[i].dataLen);
dataLen += snprintf(dataBuf + dataLen, TSDB_MAX_SQL_LEN - dataLen, dataLen += snprintf(dataBuf + dataLen, TSDB_MAX_SQL_LEN - dataLen,
"\'%s\',", buf); "\'%s\',", buf);
tmfree(buf); tmfree(buf);
...@@ -2410,11 +2420,11 @@ static char* generateTagValuesForStb(SSuperTable* stbInfo, int32_t tableSeq) { ...@@ -2410,11 +2420,11 @@ static char* generateTagValuesForStb(SSuperTable* stbInfo, int32_t tableSeq) {
if ((g_args.demo_mode) && (i == 0)) { if ((g_args.demo_mode) && (i == 0)) {
dataLen += snprintf(dataBuf + dataLen, dataLen += snprintf(dataBuf + dataLen,
TSDB_MAX_SQL_LEN - dataLen, TSDB_MAX_SQL_LEN - dataLen,
"%d,", tableSeq % 10); "%"PRId64",", tableSeq % 10);
} else { } else {
dataLen += snprintf(dataBuf + dataLen, dataLen += snprintf(dataBuf + dataLen,
TSDB_MAX_SQL_LEN - dataLen, TSDB_MAX_SQL_LEN - dataLen,
"%d,", tableSeq); "%"PRId64",", tableSeq);
} }
} else if (0 == strncasecmp(stbInfo->tags[i].dataType, } else if (0 == strncasecmp(stbInfo->tags[i].dataType,
"bigint", strlen("bigint"))) { "bigint", strlen("bigint"))) {
...@@ -2445,7 +2455,7 @@ static char* generateTagValuesForStb(SSuperTable* stbInfo, int32_t tableSeq) { ...@@ -2445,7 +2455,7 @@ static char* generateTagValuesForStb(SSuperTable* stbInfo, int32_t tableSeq) {
dataLen += snprintf(dataBuf + dataLen, TSDB_MAX_SQL_LEN - dataLen, dataLen += snprintf(dataBuf + dataLen, TSDB_MAX_SQL_LEN - dataLen,
"%"PRId64",", rand_bigint()); "%"PRId64",", rand_bigint());
} else { } else {
printf("No support data type: %s\n", stbInfo->tags[i].dataType); errorPrint("No support data type: %s\n", stbInfo->tags[i].dataType);
tmfree(dataBuf); tmfree(dataBuf);
return NULL; return NULL;
} }
...@@ -2734,7 +2744,7 @@ static int createSuperTable( ...@@ -2734,7 +2744,7 @@ static int createSuperTable(
} else if (strcasecmp(dataType, "INT") == 0) { } else if (strcasecmp(dataType, "INT") == 0) {
if ((g_args.demo_mode) && (colIndex == 1)) { if ((g_args.demo_mode) && (colIndex == 1)) {
len += snprintf(cols + len, COL_BUFFER_LEN - len, len += snprintf(cols + len, COL_BUFFER_LEN - len,
",VOLTAGE INT"); ", VOLTAGE INT");
} else { } else {
len += snprintf(cols + len, COL_BUFFER_LEN - len, ",C%d %s", colIndex, "INT"); len += snprintf(cols + len, COL_BUFFER_LEN - len, ",C%d %s", colIndex, "INT");
} }
...@@ -2756,9 +2766,9 @@ static int createSuperTable( ...@@ -2756,9 +2766,9 @@ static int createSuperTable(
} else if (strcasecmp(dataType, "FLOAT") == 0) { } else if (strcasecmp(dataType, "FLOAT") == 0) {
if (g_args.demo_mode) { if (g_args.demo_mode) {
if (colIndex == 0) { if (colIndex == 0) {
len += snprintf(cols + len, COL_BUFFER_LEN - len, ",CURRENT FLOAT"); len += snprintf(cols + len, COL_BUFFER_LEN - len, ", CURRENT FLOAT");
} else if (colIndex == 2) { } else if (colIndex == 2) {
len += snprintf(cols + len, COL_BUFFER_LEN - len, ",PHASE FLOAT"); len += snprintf(cols + len, COL_BUFFER_LEN - len, ", PHASE FLOAT");
} }
} else { } else {
len += snprintf(cols + len, COL_BUFFER_LEN - len, ",C%d %s", colIndex, "FLOAT"); len += snprintf(cols + len, COL_BUFFER_LEN - len, ",C%d %s", colIndex, "FLOAT");
...@@ -4276,577 +4286,577 @@ PARSE_OVER: ...@@ -4276,577 +4286,577 @@ PARSE_OVER:
} }
static bool getMetaFromQueryJsonFile(cJSON* root) { static bool getMetaFromQueryJsonFile(cJSON* root) {
bool ret = false; bool ret = false;
cJSON* cfgdir = cJSON_GetObjectItem(root, "cfgdir");
if (cfgdir && cfgdir->type == cJSON_String && cfgdir->valuestring != NULL) {
tstrncpy(g_queryInfo.cfgDir, cfgdir->valuestring, MAX_FILE_NAME_LEN);
}
cJSON* host = cJSON_GetObjectItem(root, "host");
if (host && host->type == cJSON_String && host->valuestring != NULL) {
tstrncpy(g_queryInfo.host, host->valuestring, MAX_HOSTNAME_SIZE);
} else if (!host) {
tstrncpy(g_queryInfo.host, "127.0.0.1", MAX_HOSTNAME_SIZE);
} else {
printf("ERROR: failed to read json, host not found\n");
goto PARSE_OVER;
}
cJSON* port = cJSON_GetObjectItem(root, "port");
if (port && port->type == cJSON_Number) {
g_queryInfo.port = port->valueint;
} else if (!port) {
g_queryInfo.port = 6030;
}
cJSON* user = cJSON_GetObjectItem(root, "user");
if (user && user->type == cJSON_String && user->valuestring != NULL) {
tstrncpy(g_queryInfo.user, user->valuestring, MAX_USERNAME_SIZE);
} else if (!user) {
tstrncpy(g_queryInfo.user, "root", MAX_USERNAME_SIZE); ;
}
cJSON* password = cJSON_GetObjectItem(root, "password"); cJSON* cfgdir = cJSON_GetObjectItem(root, "cfgdir");
if (password && password->type == cJSON_String && password->valuestring != NULL) { if (cfgdir && cfgdir->type == cJSON_String && cfgdir->valuestring != NULL) {
tstrncpy(g_queryInfo.password, password->valuestring, MAX_PASSWORD_SIZE); tstrncpy(g_queryInfo.cfgDir, cfgdir->valuestring, MAX_FILE_NAME_LEN);
} else if (!password) { }
tstrncpy(g_queryInfo.password, "taosdata", MAX_PASSWORD_SIZE);;
}
cJSON *answerPrompt = cJSON_GetObjectItem(root, "confirm_parameter_prompt"); // yes, no, cJSON* host = cJSON_GetObjectItem(root, "host");
if (answerPrompt && answerPrompt->type == cJSON_String if (host && host->type == cJSON_String && host->valuestring != NULL) {
&& answerPrompt->valuestring != NULL) { tstrncpy(g_queryInfo.host, host->valuestring, MAX_HOSTNAME_SIZE);
if (0 == strncasecmp(answerPrompt->valuestring, "yes", 3)) { } else if (!host) {
g_args.answer_yes = false; tstrncpy(g_queryInfo.host, "127.0.0.1", MAX_HOSTNAME_SIZE);
} else if (0 == strncasecmp(answerPrompt->valuestring, "no", 2)) {
g_args.answer_yes = true;
} else { } else {
g_args.answer_yes = false; printf("ERROR: failed to read json, host not found\n");
goto PARSE_OVER;
} }
} else if (!answerPrompt) {
g_args.answer_yes = false;
} else {
printf("ERROR: failed to read json, confirm_parameter_prompt not found\n");
goto PARSE_OVER;
}
cJSON* gQueryTimes = cJSON_GetObjectItem(root, "query_times"); cJSON* port = cJSON_GetObjectItem(root, "port");
if (gQueryTimes && gQueryTimes->type == cJSON_Number) { if (port && port->type == cJSON_Number) {
if (gQueryTimes->valueint <= 0) { g_queryInfo.port = port->valueint;
errorPrint("%s() LN%d, failed to read json, query_times input mistake\n", } else if (!port) {
__func__, __LINE__); g_queryInfo.port = 6030;
goto PARSE_OVER;
} }
g_args.query_times = gQueryTimes->valueint;
} else if (!gQueryTimes) {
g_args.query_times = 1;
} else {
errorPrint("%s() LN%d, failed to read json, query_times input mistake\n",
__func__, __LINE__);
goto PARSE_OVER;
}
cJSON* dbs = cJSON_GetObjectItem(root, "databases");
if (dbs && dbs->type == cJSON_String && dbs->valuestring != NULL) {
tstrncpy(g_queryInfo.dbName, dbs->valuestring, TSDB_DB_NAME_LEN);
} else if (!dbs) {
printf("ERROR: failed to read json, databases not found\n");
goto PARSE_OVER;
}
cJSON* queryMode = cJSON_GetObjectItem(root, "query_mode"); cJSON* user = cJSON_GetObjectItem(root, "user");
if (queryMode && queryMode->type == cJSON_String && queryMode->valuestring != NULL) { if (user && user->type == cJSON_String && user->valuestring != NULL) {
tstrncpy(g_queryInfo.queryMode, queryMode->valuestring, MAX_TB_NAME_SIZE); tstrncpy(g_queryInfo.user, user->valuestring, MAX_USERNAME_SIZE);
} else if (!queryMode) { } else if (!user) {
tstrncpy(g_queryInfo.queryMode, "taosc", MAX_TB_NAME_SIZE); tstrncpy(g_queryInfo.user, "root", MAX_USERNAME_SIZE); ;
} else {
printf("ERROR: failed to read json, query_mode not found\n");
goto PARSE_OVER;
}
// specified_table_query
cJSON *specifiedQuery = cJSON_GetObjectItem(root, "specified_table_query");
if (!specifiedQuery) {
g_queryInfo.specifiedQueryInfo.concurrent = 1;
g_queryInfo.specifiedQueryInfo.sqlCount = 0;
} else if (specifiedQuery->type != cJSON_Object) {
printf("ERROR: failed to read json, super_table_query not found\n");
goto PARSE_OVER;
} else {
cJSON* queryInterval = cJSON_GetObjectItem(specifiedQuery, "query_interval");
if (queryInterval && queryInterval->type == cJSON_Number) {
g_queryInfo.specifiedQueryInfo.queryInterval = queryInterval->valueint;
} else if (!queryInterval) {
g_queryInfo.specifiedQueryInfo.queryInterval = 0;
} }
cJSON* specifiedQueryTimes = cJSON_GetObjectItem(specifiedQuery, cJSON* password = cJSON_GetObjectItem(root, "password");
"query_times"); if (password && password->type == cJSON_String && password->valuestring != NULL) {
if (specifiedQueryTimes && specifiedQueryTimes->type == cJSON_Number) { tstrncpy(g_queryInfo.password, password->valuestring, MAX_PASSWORD_SIZE);
if (specifiedQueryTimes->valueint <= 0) { } else if (!password) {
errorPrint( tstrncpy(g_queryInfo.password, "taosdata", MAX_PASSWORD_SIZE);;
"%s() LN%d, failed to read json, query_times: %"PRId64", need be a valid (>0) number\n", }
__func__, __LINE__, specifiedQueryTimes->valueint);
goto PARSE_OVER;
} cJSON *answerPrompt = cJSON_GetObjectItem(root, "confirm_parameter_prompt"); // yes, no,
g_queryInfo.specifiedQueryInfo.queryTimes = specifiedQueryTimes->valueint; if (answerPrompt && answerPrompt->type == cJSON_String
} else if (!specifiedQueryTimes) { && answerPrompt->valuestring != NULL) {
g_queryInfo.specifiedQueryInfo.queryTimes = g_args.query_times; if (0 == strncasecmp(answerPrompt->valuestring, "yes", 3)) {
g_args.answer_yes = false;
} else if (0 == strncasecmp(answerPrompt->valuestring, "no", 2)) {
g_args.answer_yes = true;
} else {
g_args.answer_yes = false;
}
} else if (!answerPrompt) {
g_args.answer_yes = false;
} else { } else {
errorPrint("%s() LN%d, failed to read json, query_times input mistake\n", printf("ERROR: failed to read json, confirm_parameter_prompt not found\n");
__func__, __LINE__); goto PARSE_OVER;
goto PARSE_OVER;
} }
cJSON* concurrent = cJSON_GetObjectItem(specifiedQuery, "concurrent"); cJSON* gQueryTimes = cJSON_GetObjectItem(root, "query_times");
if (concurrent && concurrent->type == cJSON_Number) { if (gQueryTimes && gQueryTimes->type == cJSON_Number) {
if (concurrent->valueint <= 0) { if (gQueryTimes->valueint <= 0) {
errorPrint( errorPrint("%s() LN%d, failed to read json, query_times input mistake\n",
"%s() LN%d, query sqlCount %d or concurrent %d is not correct.\n", __func__, __LINE__);
__func__, __LINE__, goto PARSE_OVER;
g_queryInfo.specifiedQueryInfo.sqlCount, }
g_queryInfo.specifiedQueryInfo.concurrent); g_args.query_times = gQueryTimes->valueint;
goto PARSE_OVER; } else if (!gQueryTimes) {
} g_args.query_times = 1;
g_queryInfo.specifiedQueryInfo.concurrent = concurrent->valueint;
} else if (!concurrent) {
g_queryInfo.specifiedQueryInfo.concurrent = 1;
}
cJSON* specifiedAsyncMode = cJSON_GetObjectItem(specifiedQuery, "mode");
if (specifiedAsyncMode && specifiedAsyncMode->type == cJSON_String
&& specifiedAsyncMode->valuestring != NULL) {
if (0 == strcmp("sync", specifiedAsyncMode->valuestring)) {
g_queryInfo.specifiedQueryInfo.asyncMode = SYNC_MODE;
} else if (0 == strcmp("async", specifiedAsyncMode->valuestring)) {
g_queryInfo.specifiedQueryInfo.asyncMode = ASYNC_MODE;
} else {
errorPrint("%s() LN%d, failed to read json, async mode input error\n",
__func__, __LINE__);
goto PARSE_OVER;
}
} else { } else {
g_queryInfo.specifiedQueryInfo.asyncMode = SYNC_MODE; errorPrint("%s() LN%d, failed to read json, query_times input mistake\n",
__func__, __LINE__);
goto PARSE_OVER;
} }
cJSON* interval = cJSON_GetObjectItem(specifiedQuery, "interval"); cJSON* dbs = cJSON_GetObjectItem(root, "databases");
if (interval && interval->type == cJSON_Number) { if (dbs && dbs->type == cJSON_String && dbs->valuestring != NULL) {
g_queryInfo.specifiedQueryInfo.subscribeInterval = interval->valueint; tstrncpy(g_queryInfo.dbName, dbs->valuestring, TSDB_DB_NAME_LEN);
} else if (!interval) { } else if (!dbs) {
//printf("failed to read json, subscribe interval no found\n"); printf("ERROR: failed to read json, databases not found\n");
//goto PARSE_OVER; goto PARSE_OVER;
g_queryInfo.specifiedQueryInfo.subscribeInterval = 10000;
} }
cJSON* restart = cJSON_GetObjectItem(specifiedQuery, "restart"); cJSON* queryMode = cJSON_GetObjectItem(root, "query_mode");
if (restart && restart->type == cJSON_String && restart->valuestring != NULL) { if (queryMode && queryMode->type == cJSON_String && queryMode->valuestring != NULL) {
if (0 == strcmp("yes", restart->valuestring)) { tstrncpy(g_queryInfo.queryMode, queryMode->valuestring, MAX_TB_NAME_SIZE);
g_queryInfo.specifiedQueryInfo.subscribeRestart = true; } else if (!queryMode) {
} else if (0 == strcmp("no", restart->valuestring)) { tstrncpy(g_queryInfo.queryMode, "taosc", MAX_TB_NAME_SIZE);
g_queryInfo.specifiedQueryInfo.subscribeRestart = false;
} else {
printf("ERROR: failed to read json, subscribe restart error\n");
goto PARSE_OVER;
}
} else { } else {
g_queryInfo.specifiedQueryInfo.subscribeRestart = true; printf("ERROR: failed to read json, query_mode not found\n");
goto PARSE_OVER;
} }
cJSON* keepProgress = cJSON_GetObjectItem(specifiedQuery, "keepProgress"); // specified_table_query
if (keepProgress cJSON *specifiedQuery = cJSON_GetObjectItem(root, "specified_table_query");
&& keepProgress->type == cJSON_String if (!specifiedQuery) {
&& keepProgress->valuestring != NULL) { g_queryInfo.specifiedQueryInfo.concurrent = 1;
if (0 == strcmp("yes", keepProgress->valuestring)) { g_queryInfo.specifiedQueryInfo.sqlCount = 0;
g_queryInfo.specifiedQueryInfo.subscribeKeepProgress = 1; } else if (specifiedQuery->type != cJSON_Object) {
} else if (0 == strcmp("no", keepProgress->valuestring)) { printf("ERROR: failed to read json, super_table_query not found\n");
g_queryInfo.specifiedQueryInfo.subscribeKeepProgress = 0;
} else {
printf("ERROR: failed to read json, subscribe keepProgress error\n");
goto PARSE_OVER; goto PARSE_OVER;
}
} else { } else {
g_queryInfo.specifiedQueryInfo.subscribeKeepProgress = 0; cJSON* queryInterval = cJSON_GetObjectItem(specifiedQuery, "query_interval");
} if (queryInterval && queryInterval->type == cJSON_Number) {
g_queryInfo.specifiedQueryInfo.queryInterval = queryInterval->valueint;
} else if (!queryInterval) {
g_queryInfo.specifiedQueryInfo.queryInterval = 0;
}
// sqls cJSON* specifiedQueryTimes = cJSON_GetObjectItem(specifiedQuery,
cJSON* specifiedSqls = cJSON_GetObjectItem(specifiedQuery, "sqls"); "query_times");
if (!specifiedSqls) { if (specifiedQueryTimes && specifiedQueryTimes->type == cJSON_Number) {
g_queryInfo.specifiedQueryInfo.sqlCount = 0; if (specifiedQueryTimes->valueint <= 0) {
} else if (specifiedSqls->type != cJSON_Array) { errorPrint(
errorPrint("%s() LN%d, failed to read json, super sqls not found\n", "%s() LN%d, failed to read json, query_times: %"PRId64", need be a valid (>0) number\n",
__func__, __LINE__); __func__, __LINE__, specifiedQueryTimes->valueint);
goto PARSE_OVER; goto PARSE_OVER;
} else {
int superSqlSize = cJSON_GetArraySize(specifiedSqls);
if (superSqlSize * g_queryInfo.specifiedQueryInfo.concurrent
> MAX_QUERY_SQL_COUNT) {
errorPrint("%s() LN%d, failed to read json, query sql(%d) * concurrent(%d) overflow, max is %d\n",
__func__, __LINE__,
superSqlSize,
g_queryInfo.specifiedQueryInfo.concurrent,
MAX_QUERY_SQL_COUNT);
goto PARSE_OVER;
}
g_queryInfo.specifiedQueryInfo.sqlCount = superSqlSize; }
for (int j = 0; j < superSqlSize; ++j) { g_queryInfo.specifiedQueryInfo.queryTimes = specifiedQueryTimes->valueint;
cJSON* sql = cJSON_GetArrayItem(specifiedSqls, j); } else if (!specifiedQueryTimes) {
if (sql == NULL) continue; g_queryInfo.specifiedQueryInfo.queryTimes = g_args.query_times;
cJSON *sqlStr = cJSON_GetObjectItem(sql, "sql");
if (!sqlStr || sqlStr->type != cJSON_String || sqlStr->valuestring == NULL) {
printf("ERROR: failed to read json, sql not found\n");
goto PARSE_OVER;
}
tstrncpy(g_queryInfo.specifiedQueryInfo.sql[j],
sqlStr->valuestring, MAX_QUERY_SQL_LENGTH);
// default value is -1, which mean infinite loop
g_queryInfo.specifiedQueryInfo.endAfterConsume[j] = -1;
cJSON* endAfterConsume =
cJSON_GetObjectItem(specifiedQuery, "endAfterConsume");
if (endAfterConsume
&& endAfterConsume->type == cJSON_Number) {
g_queryInfo.specifiedQueryInfo.endAfterConsume[j]
= endAfterConsume->valueint;
}
if (g_queryInfo.specifiedQueryInfo.endAfterConsume[j] < -1)
g_queryInfo.specifiedQueryInfo.endAfterConsume[j] = -1;
g_queryInfo.specifiedQueryInfo.resubAfterConsume[j] = -1;
cJSON* resubAfterConsume =
cJSON_GetObjectItem(specifiedQuery, "resubAfterConsume");
if ((resubAfterConsume)
&& (resubAfterConsume->type == cJSON_Number)
&& (resubAfterConsume->valueint >= 0)) {
g_queryInfo.specifiedQueryInfo.resubAfterConsume[j]
= resubAfterConsume->valueint;
}
if (g_queryInfo.specifiedQueryInfo.resubAfterConsume[j] < -1)
g_queryInfo.specifiedQueryInfo.resubAfterConsume[j] = -1;
cJSON *result = cJSON_GetObjectItem(sql, "result");
if ((NULL != result) && (result->type == cJSON_String)
&& (result->valuestring != NULL)) {
tstrncpy(g_queryInfo.specifiedQueryInfo.result[j],
result->valuestring, MAX_FILE_NAME_LEN);
} else if (NULL == result) {
memset(g_queryInfo.specifiedQueryInfo.result[j],
0, MAX_FILE_NAME_LEN);
} else { } else {
printf("ERROR: failed to read json, super query result file not found\n"); errorPrint("%s() LN%d, failed to read json, query_times input mistake\n",
goto PARSE_OVER; __func__, __LINE__);
goto PARSE_OVER;
} }
}
}
}
// super_table_query cJSON* concurrent = cJSON_GetObjectItem(specifiedQuery, "concurrent");
cJSON *superQuery = cJSON_GetObjectItem(root, "super_table_query"); if (concurrent && concurrent->type == cJSON_Number) {
if (!superQuery) { if (concurrent->valueint <= 0) {
g_queryInfo.superQueryInfo.threadCnt = 1; errorPrint(
g_queryInfo.superQueryInfo.sqlCount = 0; "%s() LN%d, query sqlCount %d or concurrent %d is not correct.\n",
} else if (superQuery->type != cJSON_Object) { __func__, __LINE__,
printf("ERROR: failed to read json, sub_table_query not found\n"); g_queryInfo.specifiedQueryInfo.sqlCount,
ret = true; g_queryInfo.specifiedQueryInfo.concurrent);
goto PARSE_OVER; goto PARSE_OVER;
} else { }
cJSON* subrate = cJSON_GetObjectItem(superQuery, "query_interval"); g_queryInfo.specifiedQueryInfo.concurrent = concurrent->valueint;
if (subrate && subrate->type == cJSON_Number) { } else if (!concurrent) {
g_queryInfo.superQueryInfo.queryInterval = subrate->valueint; g_queryInfo.specifiedQueryInfo.concurrent = 1;
} else if (!subrate) { }
g_queryInfo.superQueryInfo.queryInterval = 0;
}
cJSON* superQueryTimes = cJSON_GetObjectItem(superQuery, "query_times");
if (superQueryTimes && superQueryTimes->type == cJSON_Number) {
if (superQueryTimes->valueint <= 0) {
errorPrint("%s() LN%d, failed to read json, query_times: %"PRId64", need be a valid (>0) number\n",
__func__, __LINE__, superQueryTimes->valueint);
goto PARSE_OVER;
}
g_queryInfo.superQueryInfo.queryTimes = superQueryTimes->valueint;
} else if (!superQueryTimes) {
g_queryInfo.superQueryInfo.queryTimes = g_args.query_times;
} else {
errorPrint("%s() LN%d, failed to read json, query_times input mistake\n",
__func__, __LINE__);
goto PARSE_OVER;
}
cJSON* threads = cJSON_GetObjectItem(superQuery, "threads"); cJSON* specifiedAsyncMode = cJSON_GetObjectItem(specifiedQuery, "mode");
if (threads && threads->type == cJSON_Number) { if (specifiedAsyncMode && specifiedAsyncMode->type == cJSON_String
if (threads->valueint <= 0) { && specifiedAsyncMode->valuestring != NULL) {
errorPrint("%s() LN%d, failed to read json, threads input mistake\n", if (0 == strcmp("sync", specifiedAsyncMode->valuestring)) {
__func__, __LINE__); g_queryInfo.specifiedQueryInfo.asyncMode = SYNC_MODE;
goto PARSE_OVER; } else if (0 == strcmp("async", specifiedAsyncMode->valuestring)) {
g_queryInfo.specifiedQueryInfo.asyncMode = ASYNC_MODE;
} else {
errorPrint("%s() LN%d, failed to read json, async mode input error\n",
__func__, __LINE__);
goto PARSE_OVER;
}
} else {
g_queryInfo.specifiedQueryInfo.asyncMode = SYNC_MODE;
}
} cJSON* interval = cJSON_GetObjectItem(specifiedQuery, "interval");
g_queryInfo.superQueryInfo.threadCnt = threads->valueint; if (interval && interval->type == cJSON_Number) {
} else if (!threads) { g_queryInfo.specifiedQueryInfo.subscribeInterval = interval->valueint;
g_queryInfo.superQueryInfo.threadCnt = 1; } else if (!interval) {
} //printf("failed to read json, subscribe interval no found\n");
//goto PARSE_OVER;
g_queryInfo.specifiedQueryInfo.subscribeInterval = 10000;
}
//cJSON* subTblCnt = cJSON_GetObjectItem(superQuery, "childtable_count"); cJSON* restart = cJSON_GetObjectItem(specifiedQuery, "restart");
//if (subTblCnt && subTblCnt->type == cJSON_Number) { if (restart && restart->type == cJSON_String && restart->valuestring != NULL) {
// g_queryInfo.superQueryInfo.childTblCount = subTblCnt->valueint; if (0 == strcmp("yes", restart->valuestring)) {
//} else if (!subTblCnt) { g_queryInfo.specifiedQueryInfo.subscribeRestart = true;
// g_queryInfo.superQueryInfo.childTblCount = 0; } else if (0 == strcmp("no", restart->valuestring)) {
//} g_queryInfo.specifiedQueryInfo.subscribeRestart = false;
} else {
printf("ERROR: failed to read json, subscribe restart error\n");
goto PARSE_OVER;
}
} else {
g_queryInfo.specifiedQueryInfo.subscribeRestart = true;
}
cJSON* stblname = cJSON_GetObjectItem(superQuery, "stblname"); cJSON* keepProgress = cJSON_GetObjectItem(specifiedQuery, "keepProgress");
if (stblname && stblname->type == cJSON_String if (keepProgress
&& stblname->valuestring != NULL) { && keepProgress->type == cJSON_String
tstrncpy(g_queryInfo.superQueryInfo.sTblName, stblname->valuestring, && keepProgress->valuestring != NULL) {
TSDB_TABLE_NAME_LEN); if (0 == strcmp("yes", keepProgress->valuestring)) {
} else { g_queryInfo.specifiedQueryInfo.subscribeKeepProgress = 1;
errorPrint("%s() LN%d, failed to read json, super table name input error\n", } else if (0 == strcmp("no", keepProgress->valuestring)) {
__func__, __LINE__); g_queryInfo.specifiedQueryInfo.subscribeKeepProgress = 0;
goto PARSE_OVER; } else {
} printf("ERROR: failed to read json, subscribe keepProgress error\n");
goto PARSE_OVER;
}
} else {
g_queryInfo.specifiedQueryInfo.subscribeKeepProgress = 0;
}
cJSON* superAsyncMode = cJSON_GetObjectItem(superQuery, "mode"); // sqls
if (superAsyncMode && superAsyncMode->type == cJSON_String cJSON* specifiedSqls = cJSON_GetObjectItem(specifiedQuery, "sqls");
&& superAsyncMode->valuestring != NULL) { if (!specifiedSqls) {
if (0 == strcmp("sync", superAsyncMode->valuestring)) { g_queryInfo.specifiedQueryInfo.sqlCount = 0;
g_queryInfo.superQueryInfo.asyncMode = SYNC_MODE; } else if (specifiedSqls->type != cJSON_Array) {
} else if (0 == strcmp("async", superAsyncMode->valuestring)) { errorPrint("%s() LN%d, failed to read json, super sqls not found\n",
g_queryInfo.superQueryInfo.asyncMode = ASYNC_MODE; __func__, __LINE__);
} else { goto PARSE_OVER;
errorPrint("%s() LN%d, failed to read json, async mode input error\n", } else {
__func__, __LINE__); int superSqlSize = cJSON_GetArraySize(specifiedSqls);
goto PARSE_OVER; if (superSqlSize * g_queryInfo.specifiedQueryInfo.concurrent
} > MAX_QUERY_SQL_COUNT) {
} else { errorPrint("%s() LN%d, failed to read json, query sql(%d) * concurrent(%d) overflow, max is %d\n",
g_queryInfo.superQueryInfo.asyncMode = SYNC_MODE; __func__, __LINE__,
} superSqlSize,
g_queryInfo.specifiedQueryInfo.concurrent,
MAX_QUERY_SQL_COUNT);
goto PARSE_OVER;
}
cJSON* superInterval = cJSON_GetObjectItem(superQuery, "interval"); g_queryInfo.specifiedQueryInfo.sqlCount = superSqlSize;
if (superInterval && superInterval->type == cJSON_Number) { for (int j = 0; j < superSqlSize; ++j) {
if (superInterval->valueint < 0) { cJSON* sql = cJSON_GetArrayItem(specifiedSqls, j);
errorPrint("%s() LN%d, failed to read json, interval input mistake\n", if (sql == NULL) continue;
__func__, __LINE__);
goto PARSE_OVER; cJSON *sqlStr = cJSON_GetObjectItem(sql, "sql");
} if (!sqlStr || sqlStr->type != cJSON_String || sqlStr->valuestring == NULL) {
g_queryInfo.superQueryInfo.subscribeInterval = superInterval->valueint; printf("ERROR: failed to read json, sql not found\n");
} else if (!superInterval) { goto PARSE_OVER;
//printf("failed to read json, subscribe interval no found\n"); }
//goto PARSE_OVER; tstrncpy(g_queryInfo.specifiedQueryInfo.sql[j],
g_queryInfo.superQueryInfo.subscribeInterval = 10000; sqlStr->valuestring, MAX_QUERY_SQL_LENGTH);
}
// default value is -1, which mean infinite loop
cJSON* subrestart = cJSON_GetObjectItem(superQuery, "restart"); g_queryInfo.specifiedQueryInfo.endAfterConsume[j] = -1;
if (subrestart && subrestart->type == cJSON_String cJSON* endAfterConsume =
&& subrestart->valuestring != NULL) { cJSON_GetObjectItem(specifiedQuery, "endAfterConsume");
if (0 == strcmp("yes", subrestart->valuestring)) { if (endAfterConsume
g_queryInfo.superQueryInfo.subscribeRestart = true; && endAfterConsume->type == cJSON_Number) {
} else if (0 == strcmp("no", subrestart->valuestring)) { g_queryInfo.specifiedQueryInfo.endAfterConsume[j]
g_queryInfo.superQueryInfo.subscribeRestart = false; = endAfterConsume->valueint;
} else { }
printf("ERROR: failed to read json, subscribe restart error\n"); if (g_queryInfo.specifiedQueryInfo.endAfterConsume[j] < -1)
goto PARSE_OVER; g_queryInfo.specifiedQueryInfo.endAfterConsume[j] = -1;
}
} else { g_queryInfo.specifiedQueryInfo.resubAfterConsume[j] = -1;
g_queryInfo.superQueryInfo.subscribeRestart = true; cJSON* resubAfterConsume =
cJSON_GetObjectItem(specifiedQuery, "resubAfterConsume");
if ((resubAfterConsume)
&& (resubAfterConsume->type == cJSON_Number)
&& (resubAfterConsume->valueint >= 0)) {
g_queryInfo.specifiedQueryInfo.resubAfterConsume[j]
= resubAfterConsume->valueint;
}
if (g_queryInfo.specifiedQueryInfo.resubAfterConsume[j] < -1)
g_queryInfo.specifiedQueryInfo.resubAfterConsume[j] = -1;
cJSON *result = cJSON_GetObjectItem(sql, "result");
if ((NULL != result) && (result->type == cJSON_String)
&& (result->valuestring != NULL)) {
tstrncpy(g_queryInfo.specifiedQueryInfo.result[j],
result->valuestring, MAX_FILE_NAME_LEN);
} else if (NULL == result) {
memset(g_queryInfo.specifiedQueryInfo.result[j],
0, MAX_FILE_NAME_LEN);
} else {
printf("ERROR: failed to read json, super query result file not found\n");
goto PARSE_OVER;
}
}
}
} }
cJSON* superkeepProgress = cJSON_GetObjectItem(superQuery, "keepProgress"); // super_table_query
if (superkeepProgress && cJSON *superQuery = cJSON_GetObjectItem(root, "super_table_query");
superkeepProgress->type == cJSON_String if (!superQuery) {
&& superkeepProgress->valuestring != NULL) { g_queryInfo.superQueryInfo.threadCnt = 1;
if (0 == strcmp("yes", superkeepProgress->valuestring)) { g_queryInfo.superQueryInfo.sqlCount = 0;
g_queryInfo.superQueryInfo.subscribeKeepProgress = 1; } else if (superQuery->type != cJSON_Object) {
} else if (0 == strcmp("no", superkeepProgress->valuestring)) { printf("ERROR: failed to read json, sub_table_query not found\n");
g_queryInfo.superQueryInfo.subscribeKeepProgress = 0; ret = true;
} else {
printf("ERROR: failed to read json, subscribe super table keepProgress error\n");
goto PARSE_OVER; goto PARSE_OVER;
}
} else { } else {
g_queryInfo.superQueryInfo.subscribeKeepProgress = 0; cJSON* subrate = cJSON_GetObjectItem(superQuery, "query_interval");
} if (subrate && subrate->type == cJSON_Number) {
g_queryInfo.superQueryInfo.queryInterval = subrate->valueint;
} else if (!subrate) {
g_queryInfo.superQueryInfo.queryInterval = 0;
}
cJSON* superQueryTimes = cJSON_GetObjectItem(superQuery, "query_times");
if (superQueryTimes && superQueryTimes->type == cJSON_Number) {
if (superQueryTimes->valueint <= 0) {
errorPrint("%s() LN%d, failed to read json, query_times: %"PRId64", need be a valid (>0) number\n",
__func__, __LINE__, superQueryTimes->valueint);
goto PARSE_OVER;
}
g_queryInfo.superQueryInfo.queryTimes = superQueryTimes->valueint;
} else if (!superQueryTimes) {
g_queryInfo.superQueryInfo.queryTimes = g_args.query_times;
} else {
errorPrint("%s() LN%d, failed to read json, query_times input mistake\n",
__func__, __LINE__);
goto PARSE_OVER;
}
// default value is -1, which mean do not resub cJSON* threads = cJSON_GetObjectItem(superQuery, "threads");
g_queryInfo.superQueryInfo.endAfterConsume = -1; if (threads && threads->type == cJSON_Number) {
cJSON* superEndAfterConsume = if (threads->valueint <= 0) {
cJSON_GetObjectItem(superQuery, "endAfterConsume"); errorPrint("%s() LN%d, failed to read json, threads input mistake\n",
if (superEndAfterConsume __func__, __LINE__);
&& superEndAfterConsume->type == cJSON_Number) { goto PARSE_OVER;
g_queryInfo.superQueryInfo.endAfterConsume =
superEndAfterConsume->valueint; }
} g_queryInfo.superQueryInfo.threadCnt = threads->valueint;
if (g_queryInfo.superQueryInfo.endAfterConsume < -1) } else if (!threads) {
g_queryInfo.superQueryInfo.threadCnt = 1;
}
//cJSON* subTblCnt = cJSON_GetObjectItem(superQuery, "childtable_count");
//if (subTblCnt && subTblCnt->type == cJSON_Number) {
// g_queryInfo.superQueryInfo.childTblCount = subTblCnt->valueint;
//} else if (!subTblCnt) {
// g_queryInfo.superQueryInfo.childTblCount = 0;
//}
cJSON* stblname = cJSON_GetObjectItem(superQuery, "stblname");
if (stblname && stblname->type == cJSON_String
&& stblname->valuestring != NULL) {
tstrncpy(g_queryInfo.superQueryInfo.sTblName, stblname->valuestring,
TSDB_TABLE_NAME_LEN);
} else {
errorPrint("%s() LN%d, failed to read json, super table name input error\n",
__func__, __LINE__);
goto PARSE_OVER;
}
cJSON* superAsyncMode = cJSON_GetObjectItem(superQuery, "mode");
if (superAsyncMode && superAsyncMode->type == cJSON_String
&& superAsyncMode->valuestring != NULL) {
if (0 == strcmp("sync", superAsyncMode->valuestring)) {
g_queryInfo.superQueryInfo.asyncMode = SYNC_MODE;
} else if (0 == strcmp("async", superAsyncMode->valuestring)) {
g_queryInfo.superQueryInfo.asyncMode = ASYNC_MODE;
} else {
errorPrint("%s() LN%d, failed to read json, async mode input error\n",
__func__, __LINE__);
goto PARSE_OVER;
}
} else {
g_queryInfo.superQueryInfo.asyncMode = SYNC_MODE;
}
cJSON* superInterval = cJSON_GetObjectItem(superQuery, "interval");
if (superInterval && superInterval->type == cJSON_Number) {
if (superInterval->valueint < 0) {
errorPrint("%s() LN%d, failed to read json, interval input mistake\n",
__func__, __LINE__);
goto PARSE_OVER;
}
g_queryInfo.superQueryInfo.subscribeInterval = superInterval->valueint;
} else if (!superInterval) {
//printf("failed to read json, subscribe interval no found\n");
//goto PARSE_OVER;
g_queryInfo.superQueryInfo.subscribeInterval = 10000;
}
cJSON* subrestart = cJSON_GetObjectItem(superQuery, "restart");
if (subrestart && subrestart->type == cJSON_String
&& subrestart->valuestring != NULL) {
if (0 == strcmp("yes", subrestart->valuestring)) {
g_queryInfo.superQueryInfo.subscribeRestart = true;
} else if (0 == strcmp("no", subrestart->valuestring)) {
g_queryInfo.superQueryInfo.subscribeRestart = false;
} else {
printf("ERROR: failed to read json, subscribe restart error\n");
goto PARSE_OVER;
}
} else {
g_queryInfo.superQueryInfo.subscribeRestart = true;
}
cJSON* superkeepProgress = cJSON_GetObjectItem(superQuery, "keepProgress");
if (superkeepProgress &&
superkeepProgress->type == cJSON_String
&& superkeepProgress->valuestring != NULL) {
if (0 == strcmp("yes", superkeepProgress->valuestring)) {
g_queryInfo.superQueryInfo.subscribeKeepProgress = 1;
} else if (0 == strcmp("no", superkeepProgress->valuestring)) {
g_queryInfo.superQueryInfo.subscribeKeepProgress = 0;
} else {
printf("ERROR: failed to read json, subscribe super table keepProgress error\n");
goto PARSE_OVER;
}
} else {
g_queryInfo.superQueryInfo.subscribeKeepProgress = 0;
}
// default value is -1, which mean do not resub
g_queryInfo.superQueryInfo.endAfterConsume = -1; g_queryInfo.superQueryInfo.endAfterConsume = -1;
cJSON* superEndAfterConsume =
cJSON_GetObjectItem(superQuery, "endAfterConsume");
if (superEndAfterConsume
&& superEndAfterConsume->type == cJSON_Number) {
g_queryInfo.superQueryInfo.endAfterConsume =
superEndAfterConsume->valueint;
}
if (g_queryInfo.superQueryInfo.endAfterConsume < -1)
g_queryInfo.superQueryInfo.endAfterConsume = -1;
// default value is -1, which mean do not resub // default value is -1, which mean do not resub
g_queryInfo.superQueryInfo.resubAfterConsume = -1;
cJSON* superResubAfterConsume =
cJSON_GetObjectItem(superQuery, "resubAfterConsume");
if ((superResubAfterConsume)
&& (superResubAfterConsume->type == cJSON_Number)
&& (superResubAfterConsume->valueint >= 0)) {
g_queryInfo.superQueryInfo.resubAfterConsume =
superResubAfterConsume->valueint;
}
if (g_queryInfo.superQueryInfo.resubAfterConsume < -1)
g_queryInfo.superQueryInfo.resubAfterConsume = -1; g_queryInfo.superQueryInfo.resubAfterConsume = -1;
cJSON* superResubAfterConsume =
cJSON_GetObjectItem(superQuery, "resubAfterConsume");
if ((superResubAfterConsume)
&& (superResubAfterConsume->type == cJSON_Number)
&& (superResubAfterConsume->valueint >= 0)) {
g_queryInfo.superQueryInfo.resubAfterConsume =
superResubAfterConsume->valueint;
}
if (g_queryInfo.superQueryInfo.resubAfterConsume < -1)
g_queryInfo.superQueryInfo.resubAfterConsume = -1;
// supert table sqls
cJSON* superSqls = cJSON_GetObjectItem(superQuery, "sqls");
if (!superSqls) {
g_queryInfo.superQueryInfo.sqlCount = 0;
} else if (superSqls->type != cJSON_Array) {
errorPrint("%s() LN%d: failed to read json, super sqls not found\n",
__func__, __LINE__);
goto PARSE_OVER;
} else {
int superSqlSize = cJSON_GetArraySize(superSqls);
if (superSqlSize > MAX_QUERY_SQL_COUNT) {
errorPrint("%s() LN%d, failed to read json, query sql size overflow, max is %d\n",
__func__, __LINE__, MAX_QUERY_SQL_COUNT);
goto PARSE_OVER;
}
// supert table sqls g_queryInfo.superQueryInfo.sqlCount = superSqlSize;
cJSON* superSqls = cJSON_GetObjectItem(superQuery, "sqls"); for (int j = 0; j < superSqlSize; ++j) {
if (!superSqls) { cJSON* sql = cJSON_GetArrayItem(superSqls, j);
g_queryInfo.superQueryInfo.sqlCount = 0; if (sql == NULL) continue;
} else if (superSqls->type != cJSON_Array) {
errorPrint("%s() LN%d: failed to read json, super sqls not found\n",
__func__, __LINE__);
goto PARSE_OVER;
} else {
int superSqlSize = cJSON_GetArraySize(superSqls);
if (superSqlSize > MAX_QUERY_SQL_COUNT) {
errorPrint("%s() LN%d, failed to read json, query sql size overflow, max is %d\n",
__func__, __LINE__, MAX_QUERY_SQL_COUNT);
goto PARSE_OVER;
}
g_queryInfo.superQueryInfo.sqlCount = superSqlSize;
for (int j = 0; j < superSqlSize; ++j) {
cJSON* sql = cJSON_GetArrayItem(superSqls, j);
if (sql == NULL) continue;
cJSON *sqlStr = cJSON_GetObjectItem(sql, "sql"); cJSON *sqlStr = cJSON_GetObjectItem(sql, "sql");
if (!sqlStr || sqlStr->type != cJSON_String if (!sqlStr || sqlStr->type != cJSON_String
|| sqlStr->valuestring == NULL) { || sqlStr->valuestring == NULL) {
errorPrint("%s() LN%d, failed to read json, sql not found\n", errorPrint("%s() LN%d, failed to read json, sql not found\n",
__func__, __LINE__); __func__, __LINE__);
goto PARSE_OVER; goto PARSE_OVER;
} }
tstrncpy(g_queryInfo.superQueryInfo.sql[j], sqlStr->valuestring, tstrncpy(g_queryInfo.superQueryInfo.sql[j], sqlStr->valuestring,
MAX_QUERY_SQL_LENGTH); MAX_QUERY_SQL_LENGTH);
cJSON *result = cJSON_GetObjectItem(sql, "result"); cJSON *result = cJSON_GetObjectItem(sql, "result");
if (result != NULL && result->type == cJSON_String if (result != NULL && result->type == cJSON_String
&& result->valuestring != NULL){ && result->valuestring != NULL){
tstrncpy(g_queryInfo.superQueryInfo.result[j], tstrncpy(g_queryInfo.superQueryInfo.result[j],
result->valuestring, MAX_FILE_NAME_LEN); result->valuestring, MAX_FILE_NAME_LEN);
} else if (NULL == result) { } else if (NULL == result) {
memset(g_queryInfo.superQueryInfo.result[j], 0, MAX_FILE_NAME_LEN); memset(g_queryInfo.superQueryInfo.result[j], 0, MAX_FILE_NAME_LEN);
} else { } else {
errorPrint("%s() LN%d, failed to read json, sub query result file not found\n", errorPrint("%s() LN%d, failed to read json, sub query result file not found\n",
__func__, __LINE__); __func__, __LINE__);
goto PARSE_OVER; goto PARSE_OVER;
}
}
} }
}
} }
}
ret = true; ret = true;
PARSE_OVER: PARSE_OVER:
return ret; return ret;
} }
static bool getInfoFromJsonFile(char* file) { static bool getInfoFromJsonFile(char* file) {
debugPrint("%s %d %s\n", __func__, __LINE__, file); debugPrint("%s %d %s\n", __func__, __LINE__, file);
FILE *fp = fopen(file, "r"); FILE *fp = fopen(file, "r");
if (!fp) { if (!fp) {
printf("failed to read %s, reason:%s\n", file, strerror(errno)); printf("failed to read %s, reason:%s\n", file, strerror(errno));
return false; return false;
} }
bool ret = false; bool ret = false;
int maxLen = 6400000; int maxLen = 6400000;
char *content = calloc(1, maxLen + 1); char *content = calloc(1, maxLen + 1);
int len = fread(content, 1, maxLen, fp); int len = fread(content, 1, maxLen, fp);
if (len <= 0) { if (len <= 0) {
free(content); free(content);
fclose(fp); fclose(fp);
printf("failed to read %s, content is null", file); printf("failed to read %s, content is null", file);
return false; return false;
} }
content[len] = 0; content[len] = 0;
cJSON* root = cJSON_Parse(content); cJSON* root = cJSON_Parse(content);
if (root == NULL) { if (root == NULL) {
printf("ERROR: failed to cjson parse %s, invalid json format\n", file); printf("ERROR: failed to cjson parse %s, invalid json format\n", file);
goto PARSE_OVER; goto PARSE_OVER;
} }
cJSON* filetype = cJSON_GetObjectItem(root, "filetype"); cJSON* filetype = cJSON_GetObjectItem(root, "filetype");
if (filetype && filetype->type == cJSON_String && filetype->valuestring != NULL) { if (filetype && filetype->type == cJSON_String && filetype->valuestring != NULL) {
if (0 == strcasecmp("insert", filetype->valuestring)) { if (0 == strcasecmp("insert", filetype->valuestring)) {
g_args.test_mode = INSERT_TEST; g_args.test_mode = INSERT_TEST;
} else if (0 == strcasecmp("query", filetype->valuestring)) { } else if (0 == strcasecmp("query", filetype->valuestring)) {
g_args.test_mode = QUERY_TEST; g_args.test_mode = QUERY_TEST;
} else if (0 == strcasecmp("subscribe", filetype->valuestring)) { } else if (0 == strcasecmp("subscribe", filetype->valuestring)) {
g_args.test_mode = SUBSCRIBE_TEST; g_args.test_mode = SUBSCRIBE_TEST;
} else {
printf("ERROR: failed to read json, filetype not support\n");
goto PARSE_OVER;
}
} else if (!filetype) {
g_args.test_mode = INSERT_TEST;
} else { } else {
printf("ERROR: failed to read json, filetype not support\n"); printf("ERROR: failed to read json, filetype not found\n");
goto PARSE_OVER; goto PARSE_OVER;
} }
} else if (!filetype) {
g_args.test_mode = INSERT_TEST;
} else {
printf("ERROR: failed to read json, filetype not found\n");
goto PARSE_OVER;
}
if (INSERT_TEST == g_args.test_mode) { if (INSERT_TEST == g_args.test_mode) {
ret = getMetaFromInsertJsonFile(root); ret = getMetaFromInsertJsonFile(root);
} else if ((QUERY_TEST == g_args.test_mode) } else if ((QUERY_TEST == g_args.test_mode)
|| (SUBSCRIBE_TEST == g_args.test_mode)) { || (SUBSCRIBE_TEST == g_args.test_mode)) {
ret = getMetaFromQueryJsonFile(root); ret = getMetaFromQueryJsonFile(root);
} else { } else {
errorPrint("%s() LN%d, input json file type error! please input correct file type: insert or query or subscribe\n", errorPrint("%s() LN%d, input json file type error! please input correct file type: insert or query or subscribe\n",
__func__, __LINE__); __func__, __LINE__);
goto PARSE_OVER; goto PARSE_OVER;
} }
PARSE_OVER: PARSE_OVER:
free(content); free(content);
cJSON_Delete(root); cJSON_Delete(root);
fclose(fp); fclose(fp);
return ret; return ret;
} }
static int prepareSampleData() { static int prepareSampleData() {
for (int i = 0; i < g_Dbs.dbCount; i++) { for (int i = 0; i < g_Dbs.dbCount; i++) {
for (int j = 0; j < g_Dbs.db[i].superTblCount; j++) { for (int j = 0; j < g_Dbs.db[i].superTblCount; j++) {
if (g_Dbs.db[i].superTbls[j].tagsFile[0] != 0) { if (g_Dbs.db[i].superTbls[j].tagsFile[0] != 0) {
if (readTagFromCsvFileToMem(&g_Dbs.db[i].superTbls[j]) != 0) { if (readTagFromCsvFileToMem(&g_Dbs.db[i].superTbls[j]) != 0) {
return -1; return -1;
}
}
} }
}
} }
}
return 0; return 0;
} }
static void postFreeResource() { static void postFreeResource() {
tmfclose(g_fpOfInsertResult); tmfclose(g_fpOfInsertResult);
for (int i = 0; i < g_Dbs.dbCount; i++) { for (int i = 0; i < g_Dbs.dbCount; i++) {
for (uint64_t j = 0; j < g_Dbs.db[i].superTblCount; j++) { for (uint64_t j = 0; j < g_Dbs.db[i].superTblCount; j++) {
if (0 != g_Dbs.db[i].superTbls[j].colsOfCreateChildTable) { if (0 != g_Dbs.db[i].superTbls[j].colsOfCreateChildTable) {
free(g_Dbs.db[i].superTbls[j].colsOfCreateChildTable); free(g_Dbs.db[i].superTbls[j].colsOfCreateChildTable);
g_Dbs.db[i].superTbls[j].colsOfCreateChildTable = NULL; g_Dbs.db[i].superTbls[j].colsOfCreateChildTable = NULL;
} }
if (0 != g_Dbs.db[i].superTbls[j].sampleDataBuf) { if (0 != g_Dbs.db[i].superTbls[j].sampleDataBuf) {
free(g_Dbs.db[i].superTbls[j].sampleDataBuf); free(g_Dbs.db[i].superTbls[j].sampleDataBuf);
g_Dbs.db[i].superTbls[j].sampleDataBuf = NULL; g_Dbs.db[i].superTbls[j].sampleDataBuf = NULL;
} }
if (0 != g_Dbs.db[i].superTbls[j].tagDataBuf) { if (0 != g_Dbs.db[i].superTbls[j].tagDataBuf) {
free(g_Dbs.db[i].superTbls[j].tagDataBuf); free(g_Dbs.db[i].superTbls[j].tagDataBuf);
g_Dbs.db[i].superTbls[j].tagDataBuf = NULL; g_Dbs.db[i].superTbls[j].tagDataBuf = NULL;
} }
if (0 != g_Dbs.db[i].superTbls[j].childTblName) { if (0 != g_Dbs.db[i].superTbls[j].childTblName) {
free(g_Dbs.db[i].superTbls[j].childTblName); free(g_Dbs.db[i].superTbls[j].childTblName);
g_Dbs.db[i].superTbls[j].childTblName = NULL; g_Dbs.db[i].superTbls[j].childTblName = NULL;
} }
}
} }
}
} }
static int getRowDataFromSample( static int getRowDataFromSample(
...@@ -5034,30 +5044,30 @@ static int64_t generateData(char *recBuf, char **data_type, ...@@ -5034,30 +5044,30 @@ static int64_t generateData(char *recBuf, char **data_type,
} }
static int prepareSampleDataForSTable(SSuperTable *superTblInfo) { static int prepareSampleDataForSTable(SSuperTable *superTblInfo) {
char* sampleDataBuf = NULL; char* sampleDataBuf = NULL;
sampleDataBuf = calloc( sampleDataBuf = calloc(
superTblInfo->lenOfOneRow * MAX_SAMPLES_ONCE_FROM_FILE, 1); superTblInfo->lenOfOneRow * MAX_SAMPLES_ONCE_FROM_FILE, 1);
if (sampleDataBuf == NULL) { if (sampleDataBuf == NULL) {
errorPrint("%s() LN%d, Failed to calloc %"PRIu64" Bytes, reason:%s\n", errorPrint("%s() LN%d, Failed to calloc %"PRIu64" Bytes, reason:%s\n",
__func__, __LINE__, __func__, __LINE__,
superTblInfo->lenOfOneRow * MAX_SAMPLES_ONCE_FROM_FILE, superTblInfo->lenOfOneRow * MAX_SAMPLES_ONCE_FROM_FILE,
strerror(errno)); strerror(errno));
return -1; return -1;
} }
superTblInfo->sampleDataBuf = sampleDataBuf; superTblInfo->sampleDataBuf = sampleDataBuf;
int ret = readSampleFromCsvFileToMem(superTblInfo); int ret = readSampleFromCsvFileToMem(superTblInfo);
if (0 != ret) { if (0 != ret) {
errorPrint("%s() LN%d, read sample from csv file failed.\n", errorPrint("%s() LN%d, read sample from csv file failed.\n",
__func__, __LINE__); __func__, __LINE__);
tmfree(sampleDataBuf); tmfree(sampleDataBuf);
superTblInfo->sampleDataBuf = NULL; superTblInfo->sampleDataBuf = NULL;
return -1; return -1;
} }
return 0; return 0;
} }
static int32_t execInsert(threadInfo *pThreadInfo, uint32_t k) static int32_t execInsert(threadInfo *pThreadInfo, uint32_t k)
...@@ -5156,54 +5166,54 @@ static int32_t generateDataTailWithoutStb( ...@@ -5156,54 +5166,54 @@ static int32_t generateDataTailWithoutStb(
uint64_t recordFrom, int64_t startTime, uint64_t recordFrom, int64_t startTime,
/* int64_t *pSamplePos, */int64_t *dataLen) { /* int64_t *pSamplePos, */int64_t *dataLen) {
uint64_t len = 0; uint64_t len = 0;
char *pstr = buffer; char *pstr = buffer;
verbosePrint("%s() LN%d batch=%d\n", __func__, __LINE__, batch); verbosePrint("%s() LN%d batch=%d\n", __func__, __LINE__, batch);
int32_t k = 0; int32_t k = 0;
for (k = 0; k < batch;) { for (k = 0; k < batch;) {
char data[MAX_DATA_SIZE]; char data[MAX_DATA_SIZE];
memset(data, 0, MAX_DATA_SIZE); memset(data, 0, MAX_DATA_SIZE);
int64_t retLen = 0; int64_t retLen = 0;
char **data_type = g_args.datatype; char **data_type = g_args.datatype;
int lenOfBinary = g_args.len_of_binary; int lenOfBinary = g_args.len_of_binary;
if (g_args.disorderRatio) { if (g_args.disorderRatio) {
retLen = generateData(data, data_type, retLen = generateData(data, data_type,
startTime + getTSRandTail( startTime + getTSRandTail(
(int64_t) DEFAULT_TIMESTAMP_STEP, k, (int64_t) DEFAULT_TIMESTAMP_STEP, k,
g_args.disorderRatio, g_args.disorderRatio,
g_args.disorderRange), g_args.disorderRange),
lenOfBinary); lenOfBinary);
} else { } else {
retLen = generateData(data, data_type, retLen = generateData(data, data_type,
startTime + (int64_t) (DEFAULT_TIMESTAMP_STEP* k), startTime + (int64_t) (DEFAULT_TIMESTAMP_STEP* k),
lenOfBinary); lenOfBinary);
} }
if (len > remainderBufLen) if (len > remainderBufLen)
break; break;
pstr += sprintf(pstr, "%s", data); pstr += sprintf(pstr, "%s", data);
k++; k++;
len += retLen; len += retLen;
remainderBufLen -= retLen; remainderBufLen -= retLen;
verbosePrint("%s() LN%d len=%"PRIu64" k=%d \nbuffer=%s\n", verbosePrint("%s() LN%d len=%"PRIu64" k=%d \nbuffer=%s\n",
__func__, __LINE__, len, k, buffer); __func__, __LINE__, len, k, buffer);
recordFrom ++; recordFrom ++;
if (recordFrom >= insertRows) { if (recordFrom >= insertRows) {
break; break;
}
} }
}
*dataLen = len; *dataLen = len;
return k; return k;
} }
static int64_t getTSRandTail(int64_t timeStampStep, int32_t seq, static int64_t getTSRandTail(int64_t timeStampStep, int32_t seq,
...@@ -5298,82 +5308,82 @@ static int generateSQLHeadWithoutStb(char *tableName, ...@@ -5298,82 +5308,82 @@ static int generateSQLHeadWithoutStb(char *tableName,
char *dbName, char *dbName,
char *buffer, int remainderBufLen) char *buffer, int remainderBufLen)
{ {
int len; int len;
char headBuf[HEAD_BUFF_LEN]; char headBuf[HEAD_BUFF_LEN];
len = snprintf( len = snprintf(
headBuf, headBuf,
HEAD_BUFF_LEN, HEAD_BUFF_LEN,
"%s.%s values", "%s.%s values",
dbName, dbName,
tableName); tableName);
if (len > remainderBufLen) if (len > remainderBufLen)
return -1; return -1;
tstrncpy(buffer, headBuf, len + 1); tstrncpy(buffer, headBuf, len + 1);
return len; return len;
} }
static int generateStbSQLHead( static int generateStbSQLHead(
SSuperTable* superTblInfo, SSuperTable* superTblInfo,
char *tableName, int32_t tableSeq, char *tableName, int64_t tableSeq,
char *dbName, char *dbName,
char *buffer, int remainderBufLen) char *buffer, int remainderBufLen)
{ {
int len; int len;
char headBuf[HEAD_BUFF_LEN]; char headBuf[HEAD_BUFF_LEN];
if (AUTO_CREATE_SUBTBL == superTblInfo->autoCreateTable) { if (AUTO_CREATE_SUBTBL == superTblInfo->autoCreateTable) {
char* tagsValBuf = NULL; char* tagsValBuf = NULL;
if (0 == superTblInfo->tagSource) { if (0 == superTblInfo->tagSource) {
tagsValBuf = generateTagValuesForStb(superTblInfo, tableSeq); tagsValBuf = generateTagValuesForStb(superTblInfo, tableSeq);
} else { } else {
tagsValBuf = getTagValueFromTagSample( tagsValBuf = getTagValueFromTagSample(
superTblInfo, superTblInfo,
tableSeq % superTblInfo->tagSampleCount); tableSeq % superTblInfo->tagSampleCount);
} }
if (NULL == tagsValBuf) { if (NULL == tagsValBuf) {
errorPrint("%s() LN%d, tag buf failed to allocate memory\n", errorPrint("%s() LN%d, tag buf failed to allocate memory\n",
__func__, __LINE__); __func__, __LINE__);
return -1; return -1;
} }
len = snprintf( len = snprintf(
headBuf, headBuf,
HEAD_BUFF_LEN, HEAD_BUFF_LEN,
"%s.%s using %s.%s TAGS%s values", "%s.%s using %s.%s TAGS%s values",
dbName, dbName,
tableName, tableName,
dbName, dbName,
superTblInfo->sTblName, superTblInfo->sTblName,
tagsValBuf); tagsValBuf);
tmfree(tagsValBuf); tmfree(tagsValBuf);
} else if (TBL_ALREADY_EXISTS == superTblInfo->childTblExists) { } else if (TBL_ALREADY_EXISTS == superTblInfo->childTblExists) {
len = snprintf( len = snprintf(
headBuf, headBuf,
HEAD_BUFF_LEN, HEAD_BUFF_LEN,
"%s.%s values", "%s.%s values",
dbName, dbName,
tableName); tableName);
} else { } else {
len = snprintf( len = snprintf(
headBuf, headBuf,
HEAD_BUFF_LEN, HEAD_BUFF_LEN,
"%s.%s values", "%s.%s values",
dbName, dbName,
tableName); tableName);
} }
if (len > remainderBufLen) if (len > remainderBufLen)
return -1; return -1;
tstrncpy(buffer, headBuf, len + 1); tstrncpy(buffer, headBuf, len + 1);
return len; return len;
} }
static int32_t generateStbInterlaceData( static int32_t generateStbInterlaceData(
...@@ -5651,8 +5661,7 @@ static int32_t prepareStmtBindArrayByType(TAOS_BIND *bind, ...@@ -5651,8 +5661,7 @@ static int32_t prepareStmtBindArrayByType(TAOS_BIND *bind,
*ptr += bind->buffer_length; *ptr += bind->buffer_length;
} else { } else {
errorPrint( "No support data type: %s\n", errorPrint( "No support data type: %s\n", dataType);
dataType);
return -1; return -1;
} }
...@@ -5738,28 +5747,120 @@ static int32_t prepareStmtWithoutStb( ...@@ -5738,28 +5747,120 @@ static int32_t prepareStmtWithoutStb(
return k; return k;
} }
static int32_t prepareStbStmtBind(
char *bindArray, SSuperTable *stbInfo, bool sourceRand,
int64_t startTime, int32_t recSeq,
bool isColumn)
{
char *bindBuffer = calloc(1, g_args.len_of_binary);
if (bindBuffer == NULL) {
errorPrint("%s() LN%d, Failed to allocate %d bind buffer\n",
__func__, __LINE__, g_args.len_of_binary);
return -1;
}
char data[MAX_DATA_SIZE];
memset(data, 0, MAX_DATA_SIZE);
char *ptr = data;
TAOS_BIND *bind;
if (isColumn) {
for (int i = 0; i < stbInfo->columnCount + 1; i ++) {
bind = (TAOS_BIND *)((char *)bindArray + (sizeof(TAOS_BIND) * i));
if (i == 0) {
int64_t *bind_ts;
bind_ts = (int64_t *)ptr;
bind->buffer_type = TSDB_DATA_TYPE_TIMESTAMP;
if (stbInfo->disorderRatio) {
*bind_ts = startTime + getTSRandTail(
stbInfo->timeStampStep, recSeq,
stbInfo->disorderRatio,
stbInfo->disorderRange);
} else {
*bind_ts = startTime + stbInfo->timeStampStep * recSeq;
}
bind->buffer_length = sizeof(int64_t);
bind->buffer = bind_ts;
bind->length = &bind->buffer_length;
bind->is_null = NULL;
ptr += bind->buffer_length;
} else {
int cursor = 0;
if (sourceRand) {
if ( -1 == prepareStmtBindArrayByType(
bind,
stbInfo->columns[i-1].dataType,
stbInfo->columns[i-1].dataLen,
&ptr,
NULL)) {
free(bindBuffer);
return -1;
}
} else {
char *restStr = stbInfo->sampleDataBuf + cursor;
int lengthOfRest = strlen(restStr);
int index = 0;
for (index = 0; index < lengthOfRest; index ++) {
if (restStr[index] == ',') {
break;
}
}
memset(bindBuffer, 0, g_args.len_of_binary);
strncpy(bindBuffer, restStr, index);
cursor += index + 1; // skip ',' too
if ( -1 == prepareStmtBindArrayByType(
bind,
stbInfo->columns[i-1].dataType,
stbInfo->columns[i-1].dataLen,
&ptr,
bindBuffer)) {
free(bindBuffer);
return -1;
}
}
}
}
} else {
TAOS_BIND *tag;
for (int t = 0; t < stbInfo->tagCount; t ++) {
tag = (TAOS_BIND *)((char *)bindArray + (sizeof(TAOS_BIND) * t));
if ( -1 == prepareStmtBindArrayByType(
tag,
stbInfo->tags[t].dataType,
stbInfo->tags[t].dataLen,
&ptr,
NULL)) {
free(bindBuffer);
return -1;
}
}
}
return 0;
}
static int32_t prepareStbStmt( static int32_t prepareStbStmt(
SSuperTable *stbInfo, SSuperTable *stbInfo,
TAOS_STMT *stmt, TAOS_STMT *stmt,
char *tableName, uint32_t batch, char *tableName,
int64_t tableSeq,
uint32_t batch,
uint64_t insertRows, uint64_t insertRows,
uint64_t recordFrom, uint64_t recordFrom,
int64_t startTime, int64_t startTime,
int64_t *pSamplePos) int64_t *pSamplePos)
{ {
int ret = taos_stmt_set_tbname(stmt, tableName); int ret;
if (ret != 0) {
errorPrint("failed to execute taos_stmt_set_tbname(%s). return 0x%x. reason: %s\n",
tableName, ret, taos_errstr(NULL));
return ret;
}
char *bindArray = malloc(sizeof(TAOS_BIND) * (stbInfo->columnCount + 1));
if (bindArray == NULL) {
errorPrint("%s() LN%d, Failed to allocate %d bind params\n",
__func__, __LINE__, (stbInfo->columnCount + 1));
return -1;
}
bool sourceRand; bool sourceRand;
if (0 == strncasecmp(stbInfo->dataSource, "rand", strlen("rand"))) { if (0 == strncasecmp(stbInfo->dataSource, "rand", strlen("rand"))) {
...@@ -5768,83 +5869,68 @@ static int32_t prepareStbStmt( ...@@ -5768,83 +5869,68 @@ static int32_t prepareStbStmt(
sourceRand = false; // from sample data file sourceRand = false; // from sample data file
} }
char *bindBuffer = malloc(g_args.len_of_binary); if (AUTO_CREATE_SUBTBL == stbInfo->autoCreateTable) {
if (bindBuffer == NULL) { char* tagsValBuf = NULL;
errorPrint("%s() LN%d, Failed to allocate %d bind buffer\n",
__func__, __LINE__, g_args.len_of_binary);
free(bindArray);
return -1;
}
uint32_t k; bool tagRand;
for (k = 0; k < batch;) { if (0 == stbInfo->tagSource) {
/* columnCount + 1 (ts) */ tagRand = true;
char data[MAX_DATA_SIZE]; tagsValBuf = generateTagValuesForStb(stbInfo, tableSeq);
memset(data, 0, MAX_DATA_SIZE); } else {
tagRand = false;
tagsValBuf = getTagValueFromTagSample(
stbInfo,
tableSeq % stbInfo->tagSampleCount);
}
char *ptr = data; if (NULL == tagsValBuf) {
TAOS_BIND *bind = (TAOS_BIND *)(bindArray + 0); errorPrint("%s() LN%d, tag buf failed to allocate memory\n",
__func__, __LINE__);
return -1;
}
int64_t *bind_ts; char *tagsArray = calloc(1, sizeof(TAOS_BIND) * stbInfo->tagCount);
if (NULL == tagsArray) {
tmfree(tagsValBuf);
errorPrint("%s() LN%d, tag buf failed to allocate memory\n",
__func__, __LINE__);
return -1;
}
bind_ts = (int64_t *)ptr; if (-1 == prepareStbStmtBind(
bind->buffer_type = TSDB_DATA_TYPE_TIMESTAMP; tagsArray, stbInfo, tagRand, -1, -1, false /* is tag */)) {
if (stbInfo->disorderRatio) { free(tagsArray);
*bind_ts = startTime + getTSRandTail( return -1;
stbInfo->timeStampStep, k,
stbInfo->disorderRatio,
stbInfo->disorderRange);
} else {
*bind_ts = startTime + stbInfo->timeStampStep * k;
} }
bind->buffer_length = sizeof(int64_t);
bind->buffer = bind_ts;
bind->length = &bind->buffer_length;
bind->is_null = NULL;
ptr += bind->buffer_length; ret = taos_stmt_set_tbname_tags(stmt, tableName, (TAOS_BIND *)tagsArray);
int cursor = 0; tmfree(tagsValBuf);
for (int i = 0; i < stbInfo->columnCount; i ++) { tmfree((char *)tagsArray);
bind = (TAOS_BIND *)((char *)bindArray + (sizeof(TAOS_BIND) * (i + 1))); } else {
ret = taos_stmt_set_tbname(stmt, tableName);
if (sourceRand) { }
if ( -1 == prepareStmtBindArrayByType(
bind,
stbInfo->columns[i].dataType,
stbInfo->columns[i].dataLen,
&ptr,
NULL)) {
free(bindArray);
free(bindBuffer);
return -1;
}
} else {
char *restStr = stbInfo->sampleDataBuf + cursor;
int lengthOfRest = strlen(restStr);
int index = 0; if (ret != 0) {
for (index = 0; index < lengthOfRest; index ++) { errorPrint("failed to execute taos_stmt_set_tbname(%s). return 0x%x. reason: %s\n",
if (restStr[index] == ',') { tableName, ret, taos_errstr(NULL));
break; return ret;
} }
}
memset(bindBuffer, 0, g_args.len_of_binary); char *bindArray = calloc(1, sizeof(TAOS_BIND) * (stbInfo->columnCount + 1));
strncpy(bindBuffer, restStr, index); if (bindArray == NULL) {
cursor += index + 1; // skip ',' too errorPrint("%s() LN%d, Failed to allocate %d bind params\n",
__func__, __LINE__, (stbInfo->columnCount + 1));
if ( -1 == prepareStmtBindArrayByType( return -1;
bind, }
stbInfo->columns[i].dataType,
stbInfo->columns[i].dataLen, uint32_t k;
&ptr, for (k = 0; k < batch;) {
bindBuffer)) { /* columnCount + 1 (ts) */
free(bindArray); if (-1 == prepareStbStmtBind(bindArray, stbInfo, sourceRand,
free(bindBuffer); startTime, k, true /* is column */)) {
return -1; free(bindArray);
} return -1;
}
} }
taos_stmt_bind_param(stmt, (TAOS_BIND *)bindArray); taos_stmt_bind_param(stmt, (TAOS_BIND *)bindArray);
// if msg > 3MB, break // if msg > 3MB, break
...@@ -5862,7 +5948,6 @@ static int32_t prepareStbStmt( ...@@ -5862,7 +5948,6 @@ static int32_t prepareStbStmt(
} }
} }
free(bindBuffer);
free(bindArray); free(bindArray);
return k; return k;
} }
...@@ -5870,7 +5955,9 @@ static int32_t prepareStbStmt( ...@@ -5870,7 +5955,9 @@ static int32_t prepareStbStmt(
static int32_t prepareStbStmtInterlace( static int32_t prepareStbStmtInterlace(
SSuperTable *stbInfo, SSuperTable *stbInfo,
TAOS_STMT *stmt, TAOS_STMT *stmt,
char *tableName, uint32_t batch, char *tableName,
int64_t tableSeq,
uint32_t batch,
uint64_t insertRows, uint64_t insertRows,
uint64_t recordFrom, uint64_t recordFrom,
int64_t startTime, int64_t startTime,
...@@ -5880,6 +5967,7 @@ static int32_t prepareStbStmtInterlace( ...@@ -5880,6 +5967,7 @@ static int32_t prepareStbStmtInterlace(
stbInfo, stbInfo,
stmt, stmt,
tableName, tableName,
tableSeq,
batch, batch,
insertRows, 0, startTime, insertRows, 0, startTime,
pSamplePos); pSamplePos);
...@@ -5888,7 +5976,9 @@ static int32_t prepareStbStmtInterlace( ...@@ -5888,7 +5976,9 @@ static int32_t prepareStbStmtInterlace(
static int32_t prepareStbStmtProgressive( static int32_t prepareStbStmtProgressive(
SSuperTable *stbInfo, SSuperTable *stbInfo,
TAOS_STMT *stmt, TAOS_STMT *stmt,
char *tableName, uint32_t batch, char *tableName,
int64_t tableSeq,
uint32_t batch,
uint64_t insertRows, uint64_t insertRows,
uint64_t recordFrom, uint64_t recordFrom,
int64_t startTime, int64_t startTime,
...@@ -5898,6 +5988,7 @@ static int32_t prepareStbStmtProgressive( ...@@ -5898,6 +5988,7 @@ static int32_t prepareStbStmtProgressive(
stbInfo, stbInfo,
stmt, stmt,
tableName, tableName,
tableSeq,
g_args.num_of_RPR, g_args.num_of_RPR,
insertRows, recordFrom, startTime, insertRows, recordFrom, startTime,
pSamplePos); pSamplePos);
...@@ -6098,6 +6189,7 @@ static void* syncWriteInterlace(threadInfo *pThreadInfo) { ...@@ -6098,6 +6189,7 @@ static void* syncWriteInterlace(threadInfo *pThreadInfo) {
superTblInfo, superTblInfo,
pThreadInfo->stmt, pThreadInfo->stmt,
tableName, tableName,
tableSeq,
batchPerTbl, batchPerTbl,
insertRows, i, insertRows, i,
startTime, startTime,
...@@ -6264,164 +6356,165 @@ free_of_interlace: ...@@ -6264,164 +6356,165 @@ free_of_interlace:
// sync insertion progressive data // sync insertion progressive data
static void* syncWriteProgressive(threadInfo *pThreadInfo) { static void* syncWriteProgressive(threadInfo *pThreadInfo) {
debugPrint("%s() LN%d: ### progressive write\n", __func__, __LINE__); debugPrint("%s() LN%d: ### progressive write\n", __func__, __LINE__);
SSuperTable* superTblInfo = pThreadInfo->superTblInfo; SSuperTable* superTblInfo = pThreadInfo->superTblInfo;
uint64_t maxSqlLen = superTblInfo?superTblInfo->maxSqlLen:g_args.max_sql_len; uint64_t maxSqlLen = superTblInfo?superTblInfo->maxSqlLen:g_args.max_sql_len;
int64_t timeStampStep = int64_t timeStampStep =
superTblInfo?superTblInfo->timeStampStep:DEFAULT_TIMESTAMP_STEP; superTblInfo?superTblInfo->timeStampStep:DEFAULT_TIMESTAMP_STEP;
int64_t insertRows = int64_t insertRows =
(superTblInfo)?superTblInfo->insertRows:g_args.num_of_DPT; (superTblInfo)?superTblInfo->insertRows:g_args.num_of_DPT;
verbosePrint("%s() LN%d insertRows=%"PRId64"\n", verbosePrint("%s() LN%d insertRows=%"PRId64"\n",
__func__, __LINE__, insertRows); __func__, __LINE__, insertRows);
pThreadInfo->buffer = calloc(maxSqlLen, 1); pThreadInfo->buffer = calloc(maxSqlLen, 1);
if (NULL == pThreadInfo->buffer) { if (NULL == pThreadInfo->buffer) {
errorPrint( "Failed to alloc %"PRIu64" Bytes, reason:%s\n", errorPrint( "Failed to alloc %"PRIu64" Bytes, reason:%s\n",
maxSqlLen, maxSqlLen,
strerror(errno)); strerror(errno));
return NULL;
}
uint64_t lastPrintTime = taosGetTimestampMs();
uint64_t startTs = taosGetTimestampMs();
uint64_t endTs;
pThreadInfo->totalInsertRows = 0;
pThreadInfo->totalAffectedRows = 0;
pThreadInfo->samplePos = 0;
for (uint64_t tableSeq = pThreadInfo->start_table_from;
tableSeq <= pThreadInfo->end_table_to;
tableSeq ++) {
int64_t start_time = pThreadInfo->start_time;
for (uint64_t i = 0; i < insertRows;) {
char tableName[TSDB_TABLE_NAME_LEN];
getTableName(tableName, pThreadInfo, tableSeq);
verbosePrint("%s() LN%d: tid=%d seq=%"PRId64" tableName=%s\n",
__func__, __LINE__,
pThreadInfo->threadID, tableSeq, tableName);
if (0 == strlen(tableName)) {
errorPrint("[%d] %s() LN%d, getTableName return null\n",
pThreadInfo->threadID, __func__, __LINE__);
free(pThreadInfo->buffer);
return NULL; return NULL;
} }
uint64_t lastPrintTime = taosGetTimestampMs();
uint64_t startTs = taosGetTimestampMs();
uint64_t endTs;
pThreadInfo->totalInsertRows = 0;
pThreadInfo->totalAffectedRows = 0;
pThreadInfo->samplePos = 0;
for (uint64_t tableSeq = pThreadInfo->start_table_from;
tableSeq <= pThreadInfo->end_table_to;
tableSeq ++) {
int64_t start_time = pThreadInfo->start_time;
for (uint64_t i = 0; i < insertRows;) {
char tableName[TSDB_TABLE_NAME_LEN];
getTableName(tableName, pThreadInfo, tableSeq);
verbosePrint("%s() LN%d: tid=%d seq=%"PRId64" tableName=%s\n",
__func__, __LINE__,
pThreadInfo->threadID, tableSeq, tableName);
if (0 == strlen(tableName)) {
errorPrint("[%d] %s() LN%d, getTableName return null\n",
pThreadInfo->threadID, __func__, __LINE__);
free(pThreadInfo->buffer);
return NULL;
}
int64_t remainderBufLen = maxSqlLen; int64_t remainderBufLen = maxSqlLen;
char *pstr = pThreadInfo->buffer; char *pstr = pThreadInfo->buffer;
int len = snprintf(pstr, int len = snprintf(pstr,
strlen(STR_INSERT_INTO) + 1, "%s", STR_INSERT_INTO); strlen(STR_INSERT_INTO) + 1, "%s", STR_INSERT_INTO);
pstr += len; pstr += len;
remainderBufLen -= len; remainderBufLen -= len;
int32_t generated; int32_t generated;
if (superTblInfo) { if (superTblInfo) {
if (superTblInfo->iface == STMT_IFACE) { if (superTblInfo->iface == STMT_IFACE) {
#if STMT_IFACE_ENABLED == 1 #if STMT_IFACE_ENABLED == 1
generated = prepareStbStmtProgressive( generated = prepareStbStmtProgressive(
superTblInfo, superTblInfo,
pThreadInfo->stmt, pThreadInfo->stmt,
tableName, tableName,
g_args.num_of_RPR, tableSeq,
insertRows, i, start_time, g_args.num_of_RPR,
&(pThreadInfo->samplePos)); insertRows, i, start_time,
&(pThreadInfo->samplePos));
#else #else
generated = -1; generated = -1;
#endif #endif
} else { } else {
generated = generateStbProgressiveData( generated = generateStbProgressiveData(
superTblInfo, superTblInfo,
tableName, tableSeq, pThreadInfo->db_name, pstr, tableName, tableSeq, pThreadInfo->db_name, pstr,
insertRows, i, start_time, insertRows, i, start_time,
&(pThreadInfo->samplePos), &(pThreadInfo->samplePos),
&remainderBufLen); &remainderBufLen);
} }
} else { } else {
if (g_args.iface == STMT_IFACE) { if (g_args.iface == STMT_IFACE) {
#if STMT_IFACE_ENABLED == 1 #if STMT_IFACE_ENABLED == 1
generated = prepareStmtWithoutStb( generated = prepareStmtWithoutStb(
pThreadInfo->stmt, pThreadInfo->stmt,
tableName, tableName,
g_args.num_of_RPR, g_args.num_of_RPR,
insertRows, i, insertRows, i,
start_time); start_time);
#else #else
generated = -1; generated = -1;
#endif #endif
} else { } else {
generated = generateProgressiveDataWithoutStb( generated = generateProgressiveDataWithoutStb(
tableName, tableName,
/* tableSeq, */ /* tableSeq, */
pThreadInfo, pstr, insertRows, pThreadInfo, pstr, insertRows,
i, start_time, i, start_time,
/* &(pThreadInfo->samplePos), */ /* &(pThreadInfo->samplePos), */
&remainderBufLen); &remainderBufLen);
} }
} }
if (generated > 0) if (generated > 0)
i += generated; i += generated;
else else
goto free_of_progressive; goto free_of_progressive;
start_time += generated * timeStampStep; start_time += generated * timeStampStep;
pThreadInfo->totalInsertRows += generated; pThreadInfo->totalInsertRows += generated;
startTs = taosGetTimestampMs(); startTs = taosGetTimestampMs();
int32_t affectedRows = execInsert(pThreadInfo, generated); int32_t affectedRows = execInsert(pThreadInfo, generated);
endTs = taosGetTimestampMs(); endTs = taosGetTimestampMs();
uint64_t delay = endTs - startTs; uint64_t delay = endTs - startTs;
performancePrint("%s() LN%d, insert execution time is %"PRId64"ms\n", performancePrint("%s() LN%d, insert execution time is %"PRId64"ms\n",
__func__, __LINE__, delay); __func__, __LINE__, delay);
verbosePrint("[%d] %s() LN%d affectedRows=%d\n", verbosePrint("[%d] %s() LN%d affectedRows=%d\n",
pThreadInfo->threadID, pThreadInfo->threadID,
__func__, __LINE__, affectedRows); __func__, __LINE__, affectedRows);
if (delay > pThreadInfo->maxDelay) pThreadInfo->maxDelay = delay; if (delay > pThreadInfo->maxDelay) pThreadInfo->maxDelay = delay;
if (delay < pThreadInfo->minDelay) pThreadInfo->minDelay = delay; if (delay < pThreadInfo->minDelay) pThreadInfo->minDelay = delay;
pThreadInfo->cntDelay++; pThreadInfo->cntDelay++;
pThreadInfo->totalDelay += delay; pThreadInfo->totalDelay += delay;
if (affectedRows < 0) { if (affectedRows < 0) {
errorPrint("%s() LN%d, affected rows: %d\n", errorPrint("%s() LN%d, affected rows: %d\n",
__func__, __LINE__, affectedRows); __func__, __LINE__, affectedRows);
goto free_of_progressive; goto free_of_progressive;
} }
pThreadInfo->totalAffectedRows += affectedRows; pThreadInfo->totalAffectedRows += affectedRows;
int64_t currentPrintTime = taosGetTimestampMs(); int64_t currentPrintTime = taosGetTimestampMs();
if (currentPrintTime - lastPrintTime > 30*1000) { if (currentPrintTime - lastPrintTime > 30*1000) {
printf("thread[%d] has currently inserted rows: %"PRId64 ", affected rows: %"PRId64 "\n", printf("thread[%d] has currently inserted rows: %"PRId64 ", affected rows: %"PRId64 "\n",
pThreadInfo->threadID, pThreadInfo->threadID,
pThreadInfo->totalInsertRows, pThreadInfo->totalInsertRows,
pThreadInfo->totalAffectedRows); pThreadInfo->totalAffectedRows);
lastPrintTime = currentPrintTime; lastPrintTime = currentPrintTime;
} }
if (i >= insertRows) if (i >= insertRows)
break; break;
} // num_of_DPT } // num_of_DPT
if ((g_args.verbose_print) && if ((g_args.verbose_print) &&
(tableSeq == pThreadInfo->ntables - 1) && (superTblInfo) && (tableSeq == pThreadInfo->ntables - 1) && (superTblInfo) &&
(0 == strncasecmp( (0 == strncasecmp(
superTblInfo->dataSource, "sample", strlen("sample")))) { superTblInfo->dataSource, "sample", strlen("sample")))) {
verbosePrint("%s() LN%d samplePos=%"PRId64"\n", verbosePrint("%s() LN%d samplePos=%"PRId64"\n",
__func__, __LINE__, pThreadInfo->samplePos); __func__, __LINE__, pThreadInfo->samplePos);
} }
} // tableSeq } // tableSeq
free_of_progressive: free_of_progressive:
tmfree(pThreadInfo->buffer); tmfree(pThreadInfo->buffer);
printStatPerThread(pThreadInfo); printStatPerThread(pThreadInfo);
return NULL; return NULL;
} }
static void* syncWrite(void *sarg) { static void* syncWrite(void *sarg) {
...@@ -6759,7 +6852,7 @@ static void startMultiThreadInsertData(int threads, char* db_name, ...@@ -6759,7 +6852,7 @@ static void startMultiThreadInsertData(int threads, char* db_name,
exit(-1); exit(-1);
} }
char buffer[3000]; char buffer[BUFFER_SIZE];
char *pstr = buffer; char *pstr = buffer;
if ((superTblInfo) if ((superTblInfo)
...@@ -8155,55 +8248,55 @@ static int isCommentLine(char *line) { ...@@ -8155,55 +8248,55 @@ static int isCommentLine(char *line) {
static void querySqlFile(TAOS* taos, char* sqlFile) static void querySqlFile(TAOS* taos, char* sqlFile)
{ {
FILE *fp = fopen(sqlFile, "r"); FILE *fp = fopen(sqlFile, "r");
if (fp == NULL) { if (fp == NULL) {
printf("failed to open file %s, reason:%s\n", sqlFile, strerror(errno)); printf("failed to open file %s, reason:%s\n", sqlFile, strerror(errno));
return; return;
} }
int read_len = 0; int read_len = 0;
char * cmd = calloc(1, TSDB_MAX_BYTES_PER_ROW); char * cmd = calloc(1, TSDB_MAX_BYTES_PER_ROW);
size_t cmd_len = 0; size_t cmd_len = 0;
char * line = NULL; char * line = NULL;
size_t line_len = 0; size_t line_len = 0;
double t = taosGetTimestampMs(); double t = taosGetTimestampMs();
while((read_len = tgetline(&line, &line_len, fp)) != -1) { while((read_len = tgetline(&line, &line_len, fp)) != -1) {
if (read_len >= TSDB_MAX_BYTES_PER_ROW) continue; if (read_len >= TSDB_MAX_BYTES_PER_ROW) continue;
line[--read_len] = '\0'; line[--read_len] = '\0';
if (read_len == 0 || isCommentLine(line)) { // line starts with # if (read_len == 0 || isCommentLine(line)) { // line starts with #
continue; continue;
} }
if (line[read_len - 1] == '\\') { if (line[read_len - 1] == '\\') {
line[read_len - 1] = ' '; line[read_len - 1] = ' ';
memcpy(cmd + cmd_len, line, read_len); memcpy(cmd + cmd_len, line, read_len);
cmd_len += read_len; cmd_len += read_len;
continue; continue;
} }
memcpy(cmd + cmd_len, line, read_len); memcpy(cmd + cmd_len, line, read_len);
if (0 != queryDbExec(taos, cmd, NO_INSERT_TYPE, false)) { if (0 != queryDbExec(taos, cmd, NO_INSERT_TYPE, false)) {
errorPrint("%s() LN%d, queryDbExec %s failed!\n", errorPrint("%s() LN%d, queryDbExec %s failed!\n",
__func__, __LINE__, cmd); __func__, __LINE__, cmd);
tmfree(cmd); tmfree(cmd);
tmfree(line); tmfree(line);
tmfclose(fp); tmfclose(fp);
return; return;
}
memset(cmd, 0, TSDB_MAX_BYTES_PER_ROW);
cmd_len = 0;
} }
memset(cmd, 0, TSDB_MAX_BYTES_PER_ROW);
cmd_len = 0;
}
t = taosGetTimestampMs() - t; t = taosGetTimestampMs() - t;
printf("run %s took %.6f second(s)\n\n", sqlFile, t); printf("run %s took %.6f second(s)\n\n", sqlFile, t);
tmfree(cmd); tmfree(cmd);
tmfree(line); tmfree(line);
tmfclose(fp); tmfclose(fp);
return; return;
} }
static void testMetaFile() { static void testMetaFile() {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册