未验证 提交 c41d23da 编写于 作者: sangshuduo's avatar sangshuduo 提交者: GitHub

copy from develop branch. stmt: 0, nanosec: 0 (#6955)

* copy from develop branch. stmt: 0, nanosec: 0

* set thread name disabled on master branch.
上级 d9cc1eb7
......@@ -55,6 +55,11 @@
#define STMT_IFACE_ENABLED 0
#define NANO_SECOND_ENABLED 0
#define SET_THREADNAME_ENABLED 0
#if SET_THREADNAME_ENABLED == 0
#define setThreadName(name)
#endif
#define REQ_EXTRA_BUF_LEN 1024
#define RESP_BUF_LEN 4096
......@@ -90,7 +95,7 @@ extern char configDir[];
#define MAX_SUPER_TABLE_COUNT 200
#define MAX_QUERY_SQL_COUNT 100
#define MAX_QUERY_SQL_LENGTH 1024
#define MAX_QUERY_SQL_LENGTH BUFFER_SIZE
#define MAX_DATABASE_COUNT 256
#define INPUT_BUF_LEN 256
......@@ -2367,7 +2372,25 @@ static char* getTagValueFromTagSample(SSuperTable* stbInfo, int tagUsePos) {
return dataBuf;
}
static char* generateTagValuesForStb(SSuperTable* stbInfo, int32_t tableSeq) {
static char *generateBinaryNCharTagValues(int64_t tableSeq, uint32_t len)
{
char* buf = (char*)calloc(len, 1);
if (NULL == buf) {
printf("calloc failed! size:%d\n", len);
return NULL;
}
if (tableSeq % 2) {
tstrncpy(buf, "beijing", len);
} else {
tstrncpy(buf, "shanghai", len);
}
//rand_string(buf, stbInfo->tags[i].dataLen);
return buf;
}
static char* generateTagValuesForStb(SSuperTable* stbInfo, int64_t tableSeq) {
char* dataBuf = (char*)calloc(TSDB_MAX_SQL_LEN+1, 1);
if (NULL == dataBuf) {
printf("calloc failed! size:%d\n", TSDB_MAX_SQL_LEN+1);
......@@ -2388,20 +2411,12 @@ static char* generateTagValuesForStb(SSuperTable* stbInfo, int32_t tableSeq) {
return NULL;
}
int tagBufLen = stbInfo->tags[i].dataLen + 1;
char* buf = (char*)calloc(tagBufLen, 1);
int32_t tagBufLen = stbInfo->tags[i].dataLen + 1;
char *buf = generateBinaryNCharTagValues(tableSeq, tagBufLen);
if (NULL == buf) {
printf("calloc failed! size:%d\n", stbInfo->tags[i].dataLen);
tmfree(dataBuf);
return NULL;
}
if (tableSeq % 2) {
tstrncpy(buf, "beijing", tagBufLen);
} else {
tstrncpy(buf, "shanghai", tagBufLen);
}
//rand_string(buf, stbInfo->tags[i].dataLen);
dataLen += snprintf(dataBuf + dataLen, TSDB_MAX_SQL_LEN - dataLen,
"\'%s\',", buf);
tmfree(buf);
......@@ -2410,11 +2425,11 @@ static char* generateTagValuesForStb(SSuperTable* stbInfo, int32_t tableSeq) {
if ((g_args.demo_mode) && (i == 0)) {
dataLen += snprintf(dataBuf + dataLen,
TSDB_MAX_SQL_LEN - dataLen,
"%d,", tableSeq % 10);
"%"PRId64",", tableSeq % 10);
} else {
dataLen += snprintf(dataBuf + dataLen,
TSDB_MAX_SQL_LEN - dataLen,
"%d,", tableSeq);
"%"PRId64",", tableSeq);
}
} else if (0 == strncasecmp(stbInfo->tags[i].dataType,
"bigint", strlen("bigint"))) {
......@@ -2445,7 +2460,7 @@ static char* generateTagValuesForStb(SSuperTable* stbInfo, int32_t tableSeq) {
dataLen += snprintf(dataBuf + dataLen, TSDB_MAX_SQL_LEN - dataLen,
"%"PRId64",", rand_bigint());
} else {
printf("No support data type: %s\n", stbInfo->tags[i].dataType);
errorPrint("No support data type: %s\n", stbInfo->tags[i].dataType);
tmfree(dataBuf);
return NULL;
}
......@@ -2734,7 +2749,7 @@ static int createSuperTable(
} else if (strcasecmp(dataType, "INT") == 0) {
if ((g_args.demo_mode) && (colIndex == 1)) {
len += snprintf(cols + len, COL_BUFFER_LEN - len,
",VOLTAGE INT");
", VOLTAGE INT");
} else {
len += snprintf(cols + len, COL_BUFFER_LEN - len, ",C%d %s", colIndex, "INT");
}
......@@ -2756,9 +2771,9 @@ static int createSuperTable(
} else if (strcasecmp(dataType, "FLOAT") == 0) {
if (g_args.demo_mode) {
if (colIndex == 0) {
len += snprintf(cols + len, COL_BUFFER_LEN - len, ",CURRENT FLOAT");
len += snprintf(cols + len, COL_BUFFER_LEN - len, ", CURRENT FLOAT");
} else if (colIndex == 2) {
len += snprintf(cols + len, COL_BUFFER_LEN - len, ",PHASE FLOAT");
len += snprintf(cols + len, COL_BUFFER_LEN - len, ", PHASE FLOAT");
}
} else {
len += snprintf(cols + len, COL_BUFFER_LEN - len, ",C%d %s", colIndex, "FLOAT");
......@@ -3025,10 +3040,11 @@ static void* createTable(void *sarg)
threadInfo *pThreadInfo = (threadInfo *)sarg;
SSuperTable* superTblInfo = pThreadInfo->superTblInfo;
setThreadName("createTable");
uint64_t lastPrintTime = taosGetTimestampMs();
int buff_len;
buff_len = BUFFER_SIZE;
int buff_len = BUFFER_SIZE;
pThreadInfo->buffer = calloc(buff_len, 1);
if (pThreadInfo->buffer == NULL) {
......@@ -4275,577 +4291,577 @@ PARSE_OVER:
}
static bool getMetaFromQueryJsonFile(cJSON* root) {
bool ret = false;
cJSON* cfgdir = cJSON_GetObjectItem(root, "cfgdir");
if (cfgdir && cfgdir->type == cJSON_String && cfgdir->valuestring != NULL) {
tstrncpy(g_queryInfo.cfgDir, cfgdir->valuestring, MAX_FILE_NAME_LEN);
}
cJSON* host = cJSON_GetObjectItem(root, "host");
if (host && host->type == cJSON_String && host->valuestring != NULL) {
tstrncpy(g_queryInfo.host, host->valuestring, MAX_HOSTNAME_SIZE);
} else if (!host) {
tstrncpy(g_queryInfo.host, "127.0.0.1", MAX_HOSTNAME_SIZE);
} else {
printf("ERROR: failed to read json, host not found\n");
goto PARSE_OVER;
}
cJSON* port = cJSON_GetObjectItem(root, "port");
if (port && port->type == cJSON_Number) {
g_queryInfo.port = port->valueint;
} else if (!port) {
g_queryInfo.port = 6030;
}
cJSON* user = cJSON_GetObjectItem(root, "user");
if (user && user->type == cJSON_String && user->valuestring != NULL) {
tstrncpy(g_queryInfo.user, user->valuestring, MAX_USERNAME_SIZE);
} else if (!user) {
tstrncpy(g_queryInfo.user, "root", MAX_USERNAME_SIZE); ;
}
cJSON* password = cJSON_GetObjectItem(root, "password");
if (password && password->type == cJSON_String && password->valuestring != NULL) {
tstrncpy(g_queryInfo.password, password->valuestring, MAX_PASSWORD_SIZE);
} else if (!password) {
tstrncpy(g_queryInfo.password, "taosdata", MAX_PASSWORD_SIZE);;
}
cJSON *answerPrompt = cJSON_GetObjectItem(root, "confirm_parameter_prompt"); // yes, no,
if (answerPrompt && answerPrompt->type == cJSON_String
&& answerPrompt->valuestring != NULL) {
if (0 == strncasecmp(answerPrompt->valuestring, "yes", 3)) {
g_args.answer_yes = false;
} else if (0 == strncasecmp(answerPrompt->valuestring, "no", 2)) {
g_args.answer_yes = true;
} else {
g_args.answer_yes = false;
}
} else if (!answerPrompt) {
g_args.answer_yes = false;
} else {
printf("ERROR: failed to read json, confirm_parameter_prompt not found\n");
goto PARSE_OVER;
}
cJSON* gQueryTimes = cJSON_GetObjectItem(root, "query_times");
if (gQueryTimes && gQueryTimes->type == cJSON_Number) {
if (gQueryTimes->valueint <= 0) {
errorPrint("%s() LN%d, failed to read json, query_times input mistake\n",
__func__, __LINE__);
goto PARSE_OVER;
}
g_args.query_times = gQueryTimes->valueint;
} else if (!gQueryTimes) {
g_args.query_times = 1;
} else {
errorPrint("%s() LN%d, failed to read json, query_times input mistake\n",
__func__, __LINE__);
goto PARSE_OVER;
}
cJSON* dbs = cJSON_GetObjectItem(root, "databases");
if (dbs && dbs->type == cJSON_String && dbs->valuestring != NULL) {
tstrncpy(g_queryInfo.dbName, dbs->valuestring, TSDB_DB_NAME_LEN);
} else if (!dbs) {
printf("ERROR: failed to read json, databases not found\n");
goto PARSE_OVER;
}
cJSON* queryMode = cJSON_GetObjectItem(root, "query_mode");
if (queryMode && queryMode->type == cJSON_String && queryMode->valuestring != NULL) {
tstrncpy(g_queryInfo.queryMode, queryMode->valuestring, MAX_TB_NAME_SIZE);
} else if (!queryMode) {
tstrncpy(g_queryInfo.queryMode, "taosc", MAX_TB_NAME_SIZE);
} else {
printf("ERROR: failed to read json, query_mode not found\n");
goto PARSE_OVER;
}
bool ret = false;
// specified_table_query
cJSON *specifiedQuery = cJSON_GetObjectItem(root, "specified_table_query");
if (!specifiedQuery) {
g_queryInfo.specifiedQueryInfo.concurrent = 1;
g_queryInfo.specifiedQueryInfo.sqlCount = 0;
} else if (specifiedQuery->type != cJSON_Object) {
printf("ERROR: failed to read json, super_table_query not found\n");
goto PARSE_OVER;
} else {
cJSON* queryInterval = cJSON_GetObjectItem(specifiedQuery, "query_interval");
if (queryInterval && queryInterval->type == cJSON_Number) {
g_queryInfo.specifiedQueryInfo.queryInterval = queryInterval->valueint;
} else if (!queryInterval) {
g_queryInfo.specifiedQueryInfo.queryInterval = 0;
cJSON* cfgdir = cJSON_GetObjectItem(root, "cfgdir");
if (cfgdir && cfgdir->type == cJSON_String && cfgdir->valuestring != NULL) {
tstrncpy(g_queryInfo.cfgDir, cfgdir->valuestring, MAX_FILE_NAME_LEN);
}
cJSON* specifiedQueryTimes = cJSON_GetObjectItem(specifiedQuery,
"query_times");
if (specifiedQueryTimes && specifiedQueryTimes->type == cJSON_Number) {
if (specifiedQueryTimes->valueint <= 0) {
errorPrint(
"%s() LN%d, failed to read json, query_times: %"PRId64", need be a valid (>0) number\n",
__func__, __LINE__, specifiedQueryTimes->valueint);
goto PARSE_OVER;
}
g_queryInfo.specifiedQueryInfo.queryTimes = specifiedQueryTimes->valueint;
} else if (!specifiedQueryTimes) {
g_queryInfo.specifiedQueryInfo.queryTimes = g_args.query_times;
cJSON* host = cJSON_GetObjectItem(root, "host");
if (host && host->type == cJSON_String && host->valuestring != NULL) {
tstrncpy(g_queryInfo.host, host->valuestring, MAX_HOSTNAME_SIZE);
} else if (!host) {
tstrncpy(g_queryInfo.host, "127.0.0.1", MAX_HOSTNAME_SIZE);
} else {
errorPrint("%s() LN%d, failed to read json, query_times input mistake\n",
__func__, __LINE__);
goto PARSE_OVER;
}
cJSON* concurrent = cJSON_GetObjectItem(specifiedQuery, "concurrent");
if (concurrent && concurrent->type == cJSON_Number) {
if (concurrent->valueint <= 0) {
errorPrint(
"%s() LN%d, query sqlCount %d or concurrent %d is not correct.\n",
__func__, __LINE__,
g_queryInfo.specifiedQueryInfo.sqlCount,
g_queryInfo.specifiedQueryInfo.concurrent);
goto PARSE_OVER;
}
g_queryInfo.specifiedQueryInfo.concurrent = concurrent->valueint;
} else if (!concurrent) {
g_queryInfo.specifiedQueryInfo.concurrent = 1;
}
cJSON* specifiedAsyncMode = cJSON_GetObjectItem(specifiedQuery, "mode");
if (specifiedAsyncMode && specifiedAsyncMode->type == cJSON_String
&& specifiedAsyncMode->valuestring != NULL) {
if (0 == strcmp("sync", specifiedAsyncMode->valuestring)) {
g_queryInfo.specifiedQueryInfo.asyncMode = SYNC_MODE;
} else if (0 == strcmp("async", specifiedAsyncMode->valuestring)) {
g_queryInfo.specifiedQueryInfo.asyncMode = ASYNC_MODE;
} else {
errorPrint("%s() LN%d, failed to read json, async mode input error\n",
__func__, __LINE__);
printf("ERROR: failed to read json, host not found\n");
goto PARSE_OVER;
}
} else {
g_queryInfo.specifiedQueryInfo.asyncMode = SYNC_MODE;
}
cJSON* interval = cJSON_GetObjectItem(specifiedQuery, "interval");
if (interval && interval->type == cJSON_Number) {
g_queryInfo.specifiedQueryInfo.subscribeInterval = interval->valueint;
} else if (!interval) {
//printf("failed to read json, subscribe interval no found\n");
//goto PARSE_OVER;
g_queryInfo.specifiedQueryInfo.subscribeInterval = 10000;
cJSON* port = cJSON_GetObjectItem(root, "port");
if (port && port->type == cJSON_Number) {
g_queryInfo.port = port->valueint;
} else if (!port) {
g_queryInfo.port = 6030;
}
cJSON* restart = cJSON_GetObjectItem(specifiedQuery, "restart");
if (restart && restart->type == cJSON_String && restart->valuestring != NULL) {
if (0 == strcmp("yes", restart->valuestring)) {
g_queryInfo.specifiedQueryInfo.subscribeRestart = true;
} else if (0 == strcmp("no", restart->valuestring)) {
g_queryInfo.specifiedQueryInfo.subscribeRestart = false;
} else {
printf("ERROR: failed to read json, subscribe restart error\n");
goto PARSE_OVER;
}
} else {
g_queryInfo.specifiedQueryInfo.subscribeRestart = true;
cJSON* user = cJSON_GetObjectItem(root, "user");
if (user && user->type == cJSON_String && user->valuestring != NULL) {
tstrncpy(g_queryInfo.user, user->valuestring, MAX_USERNAME_SIZE);
} else if (!user) {
tstrncpy(g_queryInfo.user, "root", MAX_USERNAME_SIZE); ;
}
cJSON* keepProgress = cJSON_GetObjectItem(specifiedQuery, "keepProgress");
if (keepProgress
&& keepProgress->type == cJSON_String
&& keepProgress->valuestring != NULL) {
if (0 == strcmp("yes", keepProgress->valuestring)) {
g_queryInfo.specifiedQueryInfo.subscribeKeepProgress = 1;
} else if (0 == strcmp("no", keepProgress->valuestring)) {
g_queryInfo.specifiedQueryInfo.subscribeKeepProgress = 0;
} else {
printf("ERROR: failed to read json, subscribe keepProgress error\n");
goto PARSE_OVER;
}
} else {
g_queryInfo.specifiedQueryInfo.subscribeKeepProgress = 0;
cJSON* password = cJSON_GetObjectItem(root, "password");
if (password && password->type == cJSON_String && password->valuestring != NULL) {
tstrncpy(g_queryInfo.password, password->valuestring, MAX_PASSWORD_SIZE);
} else if (!password) {
tstrncpy(g_queryInfo.password, "taosdata", MAX_PASSWORD_SIZE);;
}
// sqls
cJSON* specifiedSqls = cJSON_GetObjectItem(specifiedQuery, "sqls");
if (!specifiedSqls) {
g_queryInfo.specifiedQueryInfo.sqlCount = 0;
} else if (specifiedSqls->type != cJSON_Array) {
errorPrint("%s() LN%d, failed to read json, super sqls not found\n",
__func__, __LINE__);
goto PARSE_OVER;
} else {
int superSqlSize = cJSON_GetArraySize(specifiedSqls);
if (superSqlSize * g_queryInfo.specifiedQueryInfo.concurrent
> MAX_QUERY_SQL_COUNT) {
errorPrint("%s() LN%d, failed to read json, query sql(%d) * concurrent(%d) overflow, max is %d\n",
__func__, __LINE__,
superSqlSize,
g_queryInfo.specifiedQueryInfo.concurrent,
MAX_QUERY_SQL_COUNT);
goto PARSE_OVER;
}
g_queryInfo.specifiedQueryInfo.sqlCount = superSqlSize;
for (int j = 0; j < superSqlSize; ++j) {
cJSON* sql = cJSON_GetArrayItem(specifiedSqls, j);
if (sql == NULL) continue;
cJSON *sqlStr = cJSON_GetObjectItem(sql, "sql");
if (!sqlStr || sqlStr->type != cJSON_String || sqlStr->valuestring == NULL) {
printf("ERROR: failed to read json, sql not found\n");
goto PARSE_OVER;
}
tstrncpy(g_queryInfo.specifiedQueryInfo.sql[j],
sqlStr->valuestring, MAX_QUERY_SQL_LENGTH);
// default value is -1, which mean infinite loop
g_queryInfo.specifiedQueryInfo.endAfterConsume[j] = -1;
cJSON* endAfterConsume =
cJSON_GetObjectItem(specifiedQuery, "endAfterConsume");
if (endAfterConsume
&& endAfterConsume->type == cJSON_Number) {
g_queryInfo.specifiedQueryInfo.endAfterConsume[j]
= endAfterConsume->valueint;
}
if (g_queryInfo.specifiedQueryInfo.endAfterConsume[j] < -1)
g_queryInfo.specifiedQueryInfo.endAfterConsume[j] = -1;
g_queryInfo.specifiedQueryInfo.resubAfterConsume[j] = -1;
cJSON* resubAfterConsume =
cJSON_GetObjectItem(specifiedQuery, "resubAfterConsume");
if ((resubAfterConsume)
&& (resubAfterConsume->type == cJSON_Number)
&& (resubAfterConsume->valueint >= 0)) {
g_queryInfo.specifiedQueryInfo.resubAfterConsume[j]
= resubAfterConsume->valueint;
}
if (g_queryInfo.specifiedQueryInfo.resubAfterConsume[j] < -1)
g_queryInfo.specifiedQueryInfo.resubAfterConsume[j] = -1;
cJSON *result = cJSON_GetObjectItem(sql, "result");
if ((NULL != result) && (result->type == cJSON_String)
&& (result->valuestring != NULL)) {
tstrncpy(g_queryInfo.specifiedQueryInfo.result[j],
result->valuestring, MAX_FILE_NAME_LEN);
} else if (NULL == result) {
memset(g_queryInfo.specifiedQueryInfo.result[j],
0, MAX_FILE_NAME_LEN);
cJSON *answerPrompt = cJSON_GetObjectItem(root, "confirm_parameter_prompt"); // yes, no,
if (answerPrompt && answerPrompt->type == cJSON_String
&& answerPrompt->valuestring != NULL) {
if (0 == strncasecmp(answerPrompt->valuestring, "yes", 3)) {
g_args.answer_yes = false;
} else if (0 == strncasecmp(answerPrompt->valuestring, "no", 2)) {
g_args.answer_yes = true;
} else {
printf("ERROR: failed to read json, super query result file not found\n");
goto PARSE_OVER;
g_args.answer_yes = false;
}
}
}
}
// super_table_query
cJSON *superQuery = cJSON_GetObjectItem(root, "super_table_query");
if (!superQuery) {
g_queryInfo.superQueryInfo.threadCnt = 1;
g_queryInfo.superQueryInfo.sqlCount = 0;
} else if (superQuery->type != cJSON_Object) {
printf("ERROR: failed to read json, sub_table_query not found\n");
ret = true;
goto PARSE_OVER;
} else {
cJSON* subrate = cJSON_GetObjectItem(superQuery, "query_interval");
if (subrate && subrate->type == cJSON_Number) {
g_queryInfo.superQueryInfo.queryInterval = subrate->valueint;
} else if (!subrate) {
g_queryInfo.superQueryInfo.queryInterval = 0;
}
cJSON* superQueryTimes = cJSON_GetObjectItem(superQuery, "query_times");
if (superQueryTimes && superQueryTimes->type == cJSON_Number) {
if (superQueryTimes->valueint <= 0) {
errorPrint("%s() LN%d, failed to read json, query_times: %"PRId64", need be a valid (>0) number\n",
__func__, __LINE__, superQueryTimes->valueint);
goto PARSE_OVER;
}
g_queryInfo.superQueryInfo.queryTimes = superQueryTimes->valueint;
} else if (!superQueryTimes) {
g_queryInfo.superQueryInfo.queryTimes = g_args.query_times;
} else if (!answerPrompt) {
g_args.answer_yes = false;
} else {
errorPrint("%s() LN%d, failed to read json, query_times input mistake\n",
__func__, __LINE__);
goto PARSE_OVER;
}
cJSON* threads = cJSON_GetObjectItem(superQuery, "threads");
if (threads && threads->type == cJSON_Number) {
if (threads->valueint <= 0) {
errorPrint("%s() LN%d, failed to read json, threads input mistake\n",
__func__, __LINE__);
printf("ERROR: failed to read json, confirm_parameter_prompt not found\n");
goto PARSE_OVER;
}
g_queryInfo.superQueryInfo.threadCnt = threads->valueint;
} else if (!threads) {
g_queryInfo.superQueryInfo.threadCnt = 1;
}
//cJSON* subTblCnt = cJSON_GetObjectItem(superQuery, "childtable_count");
//if (subTblCnt && subTblCnt->type == cJSON_Number) {
// g_queryInfo.superQueryInfo.childTblCount = subTblCnt->valueint;
//} else if (!subTblCnt) {
// g_queryInfo.superQueryInfo.childTblCount = 0;
//}
cJSON* stblname = cJSON_GetObjectItem(superQuery, "stblname");
if (stblname && stblname->type == cJSON_String
&& stblname->valuestring != NULL) {
tstrncpy(g_queryInfo.superQueryInfo.sTblName, stblname->valuestring,
TSDB_TABLE_NAME_LEN);
cJSON* gQueryTimes = cJSON_GetObjectItem(root, "query_times");
if (gQueryTimes && gQueryTimes->type == cJSON_Number) {
if (gQueryTimes->valueint <= 0) {
errorPrint("%s() LN%d, failed to read json, query_times input mistake\n",
__func__, __LINE__);
goto PARSE_OVER;
}
g_args.query_times = gQueryTimes->valueint;
} else if (!gQueryTimes) {
g_args.query_times = 1;
} else {
errorPrint("%s() LN%d, failed to read json, super table name input error\n",
__func__, __LINE__);
goto PARSE_OVER;
errorPrint("%s() LN%d, failed to read json, query_times input mistake\n",
__func__, __LINE__);
goto PARSE_OVER;
}
cJSON* superAsyncMode = cJSON_GetObjectItem(superQuery, "mode");
if (superAsyncMode && superAsyncMode->type == cJSON_String
&& superAsyncMode->valuestring != NULL) {
if (0 == strcmp("sync", superAsyncMode->valuestring)) {
g_queryInfo.superQueryInfo.asyncMode = SYNC_MODE;
} else if (0 == strcmp("async", superAsyncMode->valuestring)) {
g_queryInfo.superQueryInfo.asyncMode = ASYNC_MODE;
} else {
errorPrint("%s() LN%d, failed to read json, async mode input error\n",
__func__, __LINE__);
cJSON* dbs = cJSON_GetObjectItem(root, "databases");
if (dbs && dbs->type == cJSON_String && dbs->valuestring != NULL) {
tstrncpy(g_queryInfo.dbName, dbs->valuestring, TSDB_DB_NAME_LEN);
} else if (!dbs) {
printf("ERROR: failed to read json, databases not found\n");
goto PARSE_OVER;
}
} else {
g_queryInfo.superQueryInfo.asyncMode = SYNC_MODE;
}
cJSON* superInterval = cJSON_GetObjectItem(superQuery, "interval");
if (superInterval && superInterval->type == cJSON_Number) {
if (superInterval->valueint < 0) {
errorPrint("%s() LN%d, failed to read json, interval input mistake\n",
__func__, __LINE__);
goto PARSE_OVER;
}
g_queryInfo.superQueryInfo.subscribeInterval = superInterval->valueint;
} else if (!superInterval) {
//printf("failed to read json, subscribe interval no found\n");
//goto PARSE_OVER;
g_queryInfo.superQueryInfo.subscribeInterval = 10000;
}
cJSON* subrestart = cJSON_GetObjectItem(superQuery, "restart");
if (subrestart && subrestart->type == cJSON_String
&& subrestart->valuestring != NULL) {
if (0 == strcmp("yes", subrestart->valuestring)) {
g_queryInfo.superQueryInfo.subscribeRestart = true;
} else if (0 == strcmp("no", subrestart->valuestring)) {
g_queryInfo.superQueryInfo.subscribeRestart = false;
} else {
printf("ERROR: failed to read json, subscribe restart error\n");
goto PARSE_OVER;
}
cJSON* queryMode = cJSON_GetObjectItem(root, "query_mode");
if (queryMode && queryMode->type == cJSON_String && queryMode->valuestring != NULL) {
tstrncpy(g_queryInfo.queryMode, queryMode->valuestring, MAX_TB_NAME_SIZE);
} else if (!queryMode) {
tstrncpy(g_queryInfo.queryMode, "taosc", MAX_TB_NAME_SIZE);
} else {
g_queryInfo.superQueryInfo.subscribeRestart = true;
printf("ERROR: failed to read json, query_mode not found\n");
goto PARSE_OVER;
}
cJSON* superkeepProgress = cJSON_GetObjectItem(superQuery, "keepProgress");
if (superkeepProgress &&
superkeepProgress->type == cJSON_String
&& superkeepProgress->valuestring != NULL) {
if (0 == strcmp("yes", superkeepProgress->valuestring)) {
g_queryInfo.superQueryInfo.subscribeKeepProgress = 1;
} else if (0 == strcmp("no", superkeepProgress->valuestring)) {
g_queryInfo.superQueryInfo.subscribeKeepProgress = 0;
} else {
printf("ERROR: failed to read json, subscribe super table keepProgress error\n");
// specified_table_query
cJSON *specifiedQuery = cJSON_GetObjectItem(root, "specified_table_query");
if (!specifiedQuery) {
g_queryInfo.specifiedQueryInfo.concurrent = 1;
g_queryInfo.specifiedQueryInfo.sqlCount = 0;
} else if (specifiedQuery->type != cJSON_Object) {
printf("ERROR: failed to read json, super_table_query not found\n");
goto PARSE_OVER;
}
} else {
g_queryInfo.superQueryInfo.subscribeKeepProgress = 0;
}
// default value is -1, which mean do not resub
g_queryInfo.superQueryInfo.endAfterConsume = -1;
cJSON* superEndAfterConsume =
cJSON_GetObjectItem(superQuery, "endAfterConsume");
if (superEndAfterConsume
&& superEndAfterConsume->type == cJSON_Number) {
g_queryInfo.superQueryInfo.endAfterConsume =
superEndAfterConsume->valueint;
}
if (g_queryInfo.superQueryInfo.endAfterConsume < -1)
g_queryInfo.superQueryInfo.endAfterConsume = -1;
cJSON* queryInterval = cJSON_GetObjectItem(specifiedQuery, "query_interval");
if (queryInterval && queryInterval->type == cJSON_Number) {
g_queryInfo.specifiedQueryInfo.queryInterval = queryInterval->valueint;
} else if (!queryInterval) {
g_queryInfo.specifiedQueryInfo.queryInterval = 0;
}
// default value is -1, which mean do not resub
g_queryInfo.superQueryInfo.resubAfterConsume = -1;
cJSON* superResubAfterConsume =
cJSON_GetObjectItem(superQuery, "resubAfterConsume");
if ((superResubAfterConsume)
&& (superResubAfterConsume->type == cJSON_Number)
&& (superResubAfterConsume->valueint >= 0)) {
g_queryInfo.superQueryInfo.resubAfterConsume =
superResubAfterConsume->valueint;
}
if (g_queryInfo.superQueryInfo.resubAfterConsume < -1)
g_queryInfo.superQueryInfo.resubAfterConsume = -1;
cJSON* specifiedQueryTimes = cJSON_GetObjectItem(specifiedQuery,
"query_times");
if (specifiedQueryTimes && specifiedQueryTimes->type == cJSON_Number) {
if (specifiedQueryTimes->valueint <= 0) {
errorPrint(
"%s() LN%d, failed to read json, query_times: %"PRId64", need be a valid (>0) number\n",
__func__, __LINE__, specifiedQueryTimes->valueint);
goto PARSE_OVER;
// supert table sqls
cJSON* superSqls = cJSON_GetObjectItem(superQuery, "sqls");
if (!superSqls) {
g_queryInfo.superQueryInfo.sqlCount = 0;
} else if (superSqls->type != cJSON_Array) {
errorPrint("%s() LN%d: failed to read json, super sqls not found\n",
__func__, __LINE__);
goto PARSE_OVER;
} else {
int superSqlSize = cJSON_GetArraySize(superSqls);
if (superSqlSize > MAX_QUERY_SQL_COUNT) {
errorPrint("%s() LN%d, failed to read json, query sql size overflow, max is %d\n",
__func__, __LINE__, MAX_QUERY_SQL_COUNT);
goto PARSE_OVER;
}
}
g_queryInfo.specifiedQueryInfo.queryTimes = specifiedQueryTimes->valueint;
} else if (!specifiedQueryTimes) {
g_queryInfo.specifiedQueryInfo.queryTimes = g_args.query_times;
} else {
errorPrint("%s() LN%d, failed to read json, query_times input mistake\n",
__func__, __LINE__);
goto PARSE_OVER;
}
g_queryInfo.superQueryInfo.sqlCount = superSqlSize;
for (int j = 0; j < superSqlSize; ++j) {
cJSON* sql = cJSON_GetArrayItem(superSqls, j);
if (sql == NULL) continue;
cJSON* concurrent = cJSON_GetObjectItem(specifiedQuery, "concurrent");
if (concurrent && concurrent->type == cJSON_Number) {
if (concurrent->valueint <= 0) {
errorPrint(
"%s() LN%d, query sqlCount %d or concurrent %d is not correct.\n",
__func__, __LINE__,
g_queryInfo.specifiedQueryInfo.sqlCount,
g_queryInfo.specifiedQueryInfo.concurrent);
goto PARSE_OVER;
}
g_queryInfo.specifiedQueryInfo.concurrent = concurrent->valueint;
} else if (!concurrent) {
g_queryInfo.specifiedQueryInfo.concurrent = 1;
}
cJSON *sqlStr = cJSON_GetObjectItem(sql, "sql");
if (!sqlStr || sqlStr->type != cJSON_String
|| sqlStr->valuestring == NULL) {
errorPrint("%s() LN%d, failed to read json, sql not found\n",
__func__, __LINE__);
goto PARSE_OVER;
}
tstrncpy(g_queryInfo.superQueryInfo.sql[j], sqlStr->valuestring,
MAX_QUERY_SQL_LENGTH);
cJSON *result = cJSON_GetObjectItem(sql, "result");
if (result != NULL && result->type == cJSON_String
&& result->valuestring != NULL){
tstrncpy(g_queryInfo.superQueryInfo.result[j],
result->valuestring, MAX_FILE_NAME_LEN);
} else if (NULL == result) {
memset(g_queryInfo.superQueryInfo.result[j], 0, MAX_FILE_NAME_LEN);
} else {
errorPrint("%s() LN%d, failed to read json, sub query result file not found\n",
__func__, __LINE__);
goto PARSE_OVER;
cJSON* specifiedAsyncMode = cJSON_GetObjectItem(specifiedQuery, "mode");
if (specifiedAsyncMode && specifiedAsyncMode->type == cJSON_String
&& specifiedAsyncMode->valuestring != NULL) {
if (0 == strcmp("sync", specifiedAsyncMode->valuestring)) {
g_queryInfo.specifiedQueryInfo.asyncMode = SYNC_MODE;
} else if (0 == strcmp("async", specifiedAsyncMode->valuestring)) {
g_queryInfo.specifiedQueryInfo.asyncMode = ASYNC_MODE;
} else {
errorPrint("%s() LN%d, failed to read json, async mode input error\n",
__func__, __LINE__);
goto PARSE_OVER;
}
} else {
g_queryInfo.specifiedQueryInfo.asyncMode = SYNC_MODE;
}
}
}
}
ret = true;
cJSON* interval = cJSON_GetObjectItem(specifiedQuery, "interval");
if (interval && interval->type == cJSON_Number) {
g_queryInfo.specifiedQueryInfo.subscribeInterval = interval->valueint;
} else if (!interval) {
//printf("failed to read json, subscribe interval no found\n");
//goto PARSE_OVER;
g_queryInfo.specifiedQueryInfo.subscribeInterval = 10000;
}
PARSE_OVER:
return ret;
}
cJSON* restart = cJSON_GetObjectItem(specifiedQuery, "restart");
if (restart && restart->type == cJSON_String && restart->valuestring != NULL) {
if (0 == strcmp("yes", restart->valuestring)) {
g_queryInfo.specifiedQueryInfo.subscribeRestart = true;
} else if (0 == strcmp("no", restart->valuestring)) {
g_queryInfo.specifiedQueryInfo.subscribeRestart = false;
} else {
printf("ERROR: failed to read json, subscribe restart error\n");
goto PARSE_OVER;
}
} else {
g_queryInfo.specifiedQueryInfo.subscribeRestart = true;
}
static bool getInfoFromJsonFile(char* file) {
debugPrint("%s %d %s\n", __func__, __LINE__, file);
cJSON* keepProgress = cJSON_GetObjectItem(specifiedQuery, "keepProgress");
if (keepProgress
&& keepProgress->type == cJSON_String
&& keepProgress->valuestring != NULL) {
if (0 == strcmp("yes", keepProgress->valuestring)) {
g_queryInfo.specifiedQueryInfo.subscribeKeepProgress = 1;
} else if (0 == strcmp("no", keepProgress->valuestring)) {
g_queryInfo.specifiedQueryInfo.subscribeKeepProgress = 0;
} else {
printf("ERROR: failed to read json, subscribe keepProgress error\n");
goto PARSE_OVER;
}
} else {
g_queryInfo.specifiedQueryInfo.subscribeKeepProgress = 0;
}
FILE *fp = fopen(file, "r");
if (!fp) {
printf("failed to read %s, reason:%s\n", file, strerror(errno));
return false;
}
// sqls
cJSON* specifiedSqls = cJSON_GetObjectItem(specifiedQuery, "sqls");
if (!specifiedSqls) {
g_queryInfo.specifiedQueryInfo.sqlCount = 0;
} else if (specifiedSqls->type != cJSON_Array) {
errorPrint("%s() LN%d, failed to read json, super sqls not found\n",
__func__, __LINE__);
goto PARSE_OVER;
} else {
int superSqlSize = cJSON_GetArraySize(specifiedSqls);
if (superSqlSize * g_queryInfo.specifiedQueryInfo.concurrent
> MAX_QUERY_SQL_COUNT) {
errorPrint("%s() LN%d, failed to read json, query sql(%d) * concurrent(%d) overflow, max is %d\n",
__func__, __LINE__,
superSqlSize,
g_queryInfo.specifiedQueryInfo.concurrent,
MAX_QUERY_SQL_COUNT);
goto PARSE_OVER;
}
bool ret = false;
int maxLen = 6400000;
char *content = calloc(1, maxLen + 1);
int len = fread(content, 1, maxLen, fp);
if (len <= 0) {
free(content);
fclose(fp);
printf("failed to read %s, content is null", file);
return false;
}
g_queryInfo.specifiedQueryInfo.sqlCount = superSqlSize;
for (int j = 0; j < superSqlSize; ++j) {
cJSON* sql = cJSON_GetArrayItem(specifiedSqls, j);
if (sql == NULL) continue;
content[len] = 0;
cJSON* root = cJSON_Parse(content);
if (root == NULL) {
printf("ERROR: failed to cjson parse %s, invalid json format\n", file);
goto PARSE_OVER;
}
cJSON *sqlStr = cJSON_GetObjectItem(sql, "sql");
if (!sqlStr || sqlStr->type != cJSON_String || sqlStr->valuestring == NULL) {
printf("ERROR: failed to read json, sql not found\n");
goto PARSE_OVER;
}
tstrncpy(g_queryInfo.specifiedQueryInfo.sql[j],
sqlStr->valuestring, MAX_QUERY_SQL_LENGTH);
// default value is -1, which mean infinite loop
g_queryInfo.specifiedQueryInfo.endAfterConsume[j] = -1;
cJSON* endAfterConsume =
cJSON_GetObjectItem(specifiedQuery, "endAfterConsume");
if (endAfterConsume
&& endAfterConsume->type == cJSON_Number) {
g_queryInfo.specifiedQueryInfo.endAfterConsume[j]
= endAfterConsume->valueint;
}
if (g_queryInfo.specifiedQueryInfo.endAfterConsume[j] < -1)
g_queryInfo.specifiedQueryInfo.endAfterConsume[j] = -1;
g_queryInfo.specifiedQueryInfo.resubAfterConsume[j] = -1;
cJSON* resubAfterConsume =
cJSON_GetObjectItem(specifiedQuery, "resubAfterConsume");
if ((resubAfterConsume)
&& (resubAfterConsume->type == cJSON_Number)
&& (resubAfterConsume->valueint >= 0)) {
g_queryInfo.specifiedQueryInfo.resubAfterConsume[j]
= resubAfterConsume->valueint;
}
cJSON* filetype = cJSON_GetObjectItem(root, "filetype");
if (filetype && filetype->type == cJSON_String && filetype->valuestring != NULL) {
if (0 == strcasecmp("insert", filetype->valuestring)) {
g_args.test_mode = INSERT_TEST;
} else if (0 == strcasecmp("query", filetype->valuestring)) {
g_args.test_mode = QUERY_TEST;
} else if (0 == strcasecmp("subscribe", filetype->valuestring)) {
g_args.test_mode = SUBSCRIBE_TEST;
} else {
printf("ERROR: failed to read json, filetype not support\n");
goto PARSE_OVER;
if (g_queryInfo.specifiedQueryInfo.resubAfterConsume[j] < -1)
g_queryInfo.specifiedQueryInfo.resubAfterConsume[j] = -1;
cJSON *result = cJSON_GetObjectItem(sql, "result");
if ((NULL != result) && (result->type == cJSON_String)
&& (result->valuestring != NULL)) {
tstrncpy(g_queryInfo.specifiedQueryInfo.result[j],
result->valuestring, MAX_FILE_NAME_LEN);
} else if (NULL == result) {
memset(g_queryInfo.specifiedQueryInfo.result[j],
0, MAX_FILE_NAME_LEN);
} else {
printf("ERROR: failed to read json, super query result file not found\n");
goto PARSE_OVER;
}
}
}
}
} else if (!filetype) {
g_args.test_mode = INSERT_TEST;
} else {
printf("ERROR: failed to read json, filetype not found\n");
goto PARSE_OVER;
}
if (INSERT_TEST == g_args.test_mode) {
ret = getMetaFromInsertJsonFile(root);
} else if ((QUERY_TEST == g_args.test_mode)
|| (SUBSCRIBE_TEST == g_args.test_mode)) {
ret = getMetaFromQueryJsonFile(root);
} else {
errorPrint("%s() LN%d, input json file type error! please input correct file type: insert or query or subscribe\n",
__func__, __LINE__);
goto PARSE_OVER;
}
// super_table_query
cJSON *superQuery = cJSON_GetObjectItem(root, "super_table_query");
if (!superQuery) {
g_queryInfo.superQueryInfo.threadCnt = 1;
g_queryInfo.superQueryInfo.sqlCount = 0;
} else if (superQuery->type != cJSON_Object) {
printf("ERROR: failed to read json, sub_table_query not found\n");
ret = true;
goto PARSE_OVER;
} else {
cJSON* subrate = cJSON_GetObjectItem(superQuery, "query_interval");
if (subrate && subrate->type == cJSON_Number) {
g_queryInfo.superQueryInfo.queryInterval = subrate->valueint;
} else if (!subrate) {
g_queryInfo.superQueryInfo.queryInterval = 0;
}
cJSON* superQueryTimes = cJSON_GetObjectItem(superQuery, "query_times");
if (superQueryTimes && superQueryTimes->type == cJSON_Number) {
if (superQueryTimes->valueint <= 0) {
errorPrint("%s() LN%d, failed to read json, query_times: %"PRId64", need be a valid (>0) number\n",
__func__, __LINE__, superQueryTimes->valueint);
goto PARSE_OVER;
}
g_queryInfo.superQueryInfo.queryTimes = superQueryTimes->valueint;
} else if (!superQueryTimes) {
g_queryInfo.superQueryInfo.queryTimes = g_args.query_times;
} else {
errorPrint("%s() LN%d, failed to read json, query_times input mistake\n",
__func__, __LINE__);
goto PARSE_OVER;
}
PARSE_OVER:
free(content);
cJSON_Delete(root);
fclose(fp);
return ret;
}
cJSON* threads = cJSON_GetObjectItem(superQuery, "threads");
if (threads && threads->type == cJSON_Number) {
if (threads->valueint <= 0) {
errorPrint("%s() LN%d, failed to read json, threads input mistake\n",
__func__, __LINE__);
goto PARSE_OVER;
static int prepareSampleData() {
for (int i = 0; i < g_Dbs.dbCount; i++) {
for (int j = 0; j < g_Dbs.db[i].superTblCount; j++) {
if (g_Dbs.db[i].superTbls[j].tagsFile[0] != 0) {
if (readTagFromCsvFileToMem(&g_Dbs.db[i].superTbls[j]) != 0) {
return -1;
}
g_queryInfo.superQueryInfo.threadCnt = threads->valueint;
} else if (!threads) {
g_queryInfo.superQueryInfo.threadCnt = 1;
}
}
}
}
return 0;
}
//cJSON* subTblCnt = cJSON_GetObjectItem(superQuery, "childtable_count");
//if (subTblCnt && subTblCnt->type == cJSON_Number) {
// g_queryInfo.superQueryInfo.childTblCount = subTblCnt->valueint;
//} else if (!subTblCnt) {
// g_queryInfo.superQueryInfo.childTblCount = 0;
//}
static void postFreeResource() {
tmfclose(g_fpOfInsertResult);
for (int i = 0; i < g_Dbs.dbCount; i++) {
for (uint64_t j = 0; j < g_Dbs.db[i].superTblCount; j++) {
if (0 != g_Dbs.db[i].superTbls[j].colsOfCreateChildTable) {
free(g_Dbs.db[i].superTbls[j].colsOfCreateChildTable);
g_Dbs.db[i].superTbls[j].colsOfCreateChildTable = NULL;
}
if (0 != g_Dbs.db[i].superTbls[j].sampleDataBuf) {
free(g_Dbs.db[i].superTbls[j].sampleDataBuf);
g_Dbs.db[i].superTbls[j].sampleDataBuf = NULL;
}
if (0 != g_Dbs.db[i].superTbls[j].tagDataBuf) {
free(g_Dbs.db[i].superTbls[j].tagDataBuf);
g_Dbs.db[i].superTbls[j].tagDataBuf = NULL;
}
if (0 != g_Dbs.db[i].superTbls[j].childTblName) {
free(g_Dbs.db[i].superTbls[j].childTblName);
g_Dbs.db[i].superTbls[j].childTblName = NULL;
}
cJSON* stblname = cJSON_GetObjectItem(superQuery, "stblname");
if (stblname && stblname->type == cJSON_String
&& stblname->valuestring != NULL) {
tstrncpy(g_queryInfo.superQueryInfo.sTblName, stblname->valuestring,
TSDB_TABLE_NAME_LEN);
} else {
errorPrint("%s() LN%d, failed to read json, super table name input error\n",
__func__, __LINE__);
goto PARSE_OVER;
}
cJSON* superAsyncMode = cJSON_GetObjectItem(superQuery, "mode");
if (superAsyncMode && superAsyncMode->type == cJSON_String
&& superAsyncMode->valuestring != NULL) {
if (0 == strcmp("sync", superAsyncMode->valuestring)) {
g_queryInfo.superQueryInfo.asyncMode = SYNC_MODE;
} else if (0 == strcmp("async", superAsyncMode->valuestring)) {
g_queryInfo.superQueryInfo.asyncMode = ASYNC_MODE;
} else {
errorPrint("%s() LN%d, failed to read json, async mode input error\n",
__func__, __LINE__);
goto PARSE_OVER;
}
} else {
g_queryInfo.superQueryInfo.asyncMode = SYNC_MODE;
}
cJSON* superInterval = cJSON_GetObjectItem(superQuery, "interval");
if (superInterval && superInterval->type == cJSON_Number) {
if (superInterval->valueint < 0) {
errorPrint("%s() LN%d, failed to read json, interval input mistake\n",
__func__, __LINE__);
goto PARSE_OVER;
}
g_queryInfo.superQueryInfo.subscribeInterval = superInterval->valueint;
} else if (!superInterval) {
//printf("failed to read json, subscribe interval no found\n");
//goto PARSE_OVER;
g_queryInfo.superQueryInfo.subscribeInterval = 10000;
}
cJSON* subrestart = cJSON_GetObjectItem(superQuery, "restart");
if (subrestart && subrestart->type == cJSON_String
&& subrestart->valuestring != NULL) {
if (0 == strcmp("yes", subrestart->valuestring)) {
g_queryInfo.superQueryInfo.subscribeRestart = true;
} else if (0 == strcmp("no", subrestart->valuestring)) {
g_queryInfo.superQueryInfo.subscribeRestart = false;
} else {
printf("ERROR: failed to read json, subscribe restart error\n");
goto PARSE_OVER;
}
} else {
g_queryInfo.superQueryInfo.subscribeRestart = true;
}
cJSON* superkeepProgress = cJSON_GetObjectItem(superQuery, "keepProgress");
if (superkeepProgress &&
superkeepProgress->type == cJSON_String
&& superkeepProgress->valuestring != NULL) {
if (0 == strcmp("yes", superkeepProgress->valuestring)) {
g_queryInfo.superQueryInfo.subscribeKeepProgress = 1;
} else if (0 == strcmp("no", superkeepProgress->valuestring)) {
g_queryInfo.superQueryInfo.subscribeKeepProgress = 0;
} else {
printf("ERROR: failed to read json, subscribe super table keepProgress error\n");
goto PARSE_OVER;
}
} else {
g_queryInfo.superQueryInfo.subscribeKeepProgress = 0;
}
// default value is -1, which mean do not resub
g_queryInfo.superQueryInfo.endAfterConsume = -1;
cJSON* superEndAfterConsume =
cJSON_GetObjectItem(superQuery, "endAfterConsume");
if (superEndAfterConsume
&& superEndAfterConsume->type == cJSON_Number) {
g_queryInfo.superQueryInfo.endAfterConsume =
superEndAfterConsume->valueint;
}
if (g_queryInfo.superQueryInfo.endAfterConsume < -1)
g_queryInfo.superQueryInfo.endAfterConsume = -1;
// default value is -1, which mean do not resub
g_queryInfo.superQueryInfo.resubAfterConsume = -1;
cJSON* superResubAfterConsume =
cJSON_GetObjectItem(superQuery, "resubAfterConsume");
if ((superResubAfterConsume)
&& (superResubAfterConsume->type == cJSON_Number)
&& (superResubAfterConsume->valueint >= 0)) {
g_queryInfo.superQueryInfo.resubAfterConsume =
superResubAfterConsume->valueint;
}
if (g_queryInfo.superQueryInfo.resubAfterConsume < -1)
g_queryInfo.superQueryInfo.resubAfterConsume = -1;
// supert table sqls
cJSON* superSqls = cJSON_GetObjectItem(superQuery, "sqls");
if (!superSqls) {
g_queryInfo.superQueryInfo.sqlCount = 0;
} else if (superSqls->type != cJSON_Array) {
errorPrint("%s() LN%d: failed to read json, super sqls not found\n",
__func__, __LINE__);
goto PARSE_OVER;
} else {
int superSqlSize = cJSON_GetArraySize(superSqls);
if (superSqlSize > MAX_QUERY_SQL_COUNT) {
errorPrint("%s() LN%d, failed to read json, query sql size overflow, max is %d\n",
__func__, __LINE__, MAX_QUERY_SQL_COUNT);
goto PARSE_OVER;
}
g_queryInfo.superQueryInfo.sqlCount = superSqlSize;
for (int j = 0; j < superSqlSize; ++j) {
cJSON* sql = cJSON_GetArrayItem(superSqls, j);
if (sql == NULL) continue;
cJSON *sqlStr = cJSON_GetObjectItem(sql, "sql");
if (!sqlStr || sqlStr->type != cJSON_String
|| sqlStr->valuestring == NULL) {
errorPrint("%s() LN%d, failed to read json, sql not found\n",
__func__, __LINE__);
goto PARSE_OVER;
}
tstrncpy(g_queryInfo.superQueryInfo.sql[j], sqlStr->valuestring,
MAX_QUERY_SQL_LENGTH);
cJSON *result = cJSON_GetObjectItem(sql, "result");
if (result != NULL && result->type == cJSON_String
&& result->valuestring != NULL){
tstrncpy(g_queryInfo.superQueryInfo.result[j],
result->valuestring, MAX_FILE_NAME_LEN);
} else if (NULL == result) {
memset(g_queryInfo.superQueryInfo.result[j], 0, MAX_FILE_NAME_LEN);
} else {
errorPrint("%s() LN%d, failed to read json, sub query result file not found\n",
__func__, __LINE__);
goto PARSE_OVER;
}
}
}
}
ret = true;
PARSE_OVER:
return ret;
}
static bool getInfoFromJsonFile(char* file) {
debugPrint("%s %d %s\n", __func__, __LINE__, file);
FILE *fp = fopen(file, "r");
if (!fp) {
printf("failed to read %s, reason:%s\n", file, strerror(errno));
return false;
}
bool ret = false;
int maxLen = 6400000;
char *content = calloc(1, maxLen + 1);
int len = fread(content, 1, maxLen, fp);
if (len <= 0) {
free(content);
fclose(fp);
printf("failed to read %s, content is null", file);
return false;
}
content[len] = 0;
cJSON* root = cJSON_Parse(content);
if (root == NULL) {
printf("ERROR: failed to cjson parse %s, invalid json format\n", file);
goto PARSE_OVER;
}
cJSON* filetype = cJSON_GetObjectItem(root, "filetype");
if (filetype && filetype->type == cJSON_String && filetype->valuestring != NULL) {
if (0 == strcasecmp("insert", filetype->valuestring)) {
g_args.test_mode = INSERT_TEST;
} else if (0 == strcasecmp("query", filetype->valuestring)) {
g_args.test_mode = QUERY_TEST;
} else if (0 == strcasecmp("subscribe", filetype->valuestring)) {
g_args.test_mode = SUBSCRIBE_TEST;
} else {
printf("ERROR: failed to read json, filetype not support\n");
goto PARSE_OVER;
}
} else if (!filetype) {
g_args.test_mode = INSERT_TEST;
} else {
printf("ERROR: failed to read json, filetype not found\n");
goto PARSE_OVER;
}
if (INSERT_TEST == g_args.test_mode) {
ret = getMetaFromInsertJsonFile(root);
} else if ((QUERY_TEST == g_args.test_mode)
|| (SUBSCRIBE_TEST == g_args.test_mode)) {
ret = getMetaFromQueryJsonFile(root);
} else {
errorPrint("%s() LN%d, input json file type error! please input correct file type: insert or query or subscribe\n",
__func__, __LINE__);
goto PARSE_OVER;
}
PARSE_OVER:
free(content);
cJSON_Delete(root);
fclose(fp);
return ret;
}
static int prepareSampleData() {
for (int i = 0; i < g_Dbs.dbCount; i++) {
for (int j = 0; j < g_Dbs.db[i].superTblCount; j++) {
if (g_Dbs.db[i].superTbls[j].tagsFile[0] != 0) {
if (readTagFromCsvFileToMem(&g_Dbs.db[i].superTbls[j]) != 0) {
return -1;
}
}
}
}
return 0;
}
static void postFreeResource() {
tmfclose(g_fpOfInsertResult);
for (int i = 0; i < g_Dbs.dbCount; i++) {
for (uint64_t j = 0; j < g_Dbs.db[i].superTblCount; j++) {
if (0 != g_Dbs.db[i].superTbls[j].colsOfCreateChildTable) {
free(g_Dbs.db[i].superTbls[j].colsOfCreateChildTable);
g_Dbs.db[i].superTbls[j].colsOfCreateChildTable = NULL;
}
if (0 != g_Dbs.db[i].superTbls[j].sampleDataBuf) {
free(g_Dbs.db[i].superTbls[j].sampleDataBuf);
g_Dbs.db[i].superTbls[j].sampleDataBuf = NULL;
}
if (0 != g_Dbs.db[i].superTbls[j].tagDataBuf) {
free(g_Dbs.db[i].superTbls[j].tagDataBuf);
g_Dbs.db[i].superTbls[j].tagDataBuf = NULL;
}
if (0 != g_Dbs.db[i].superTbls[j].childTblName) {
free(g_Dbs.db[i].superTbls[j].childTblName);
g_Dbs.db[i].superTbls[j].childTblName = NULL;
}
}
}
}
}
static int getRowDataFromSample(
......@@ -5033,30 +5049,30 @@ static int64_t generateData(char *recBuf, char **data_type,
}
static int prepareSampleDataForSTable(SSuperTable *superTblInfo) {
char* sampleDataBuf = NULL;
char* sampleDataBuf = NULL;
sampleDataBuf = calloc(
sampleDataBuf = calloc(
superTblInfo->lenOfOneRow * MAX_SAMPLES_ONCE_FROM_FILE, 1);
if (sampleDataBuf == NULL) {
errorPrint("%s() LN%d, Failed to calloc %"PRIu64" Bytes, reason:%s\n",
__func__, __LINE__,
superTblInfo->lenOfOneRow * MAX_SAMPLES_ONCE_FROM_FILE,
strerror(errno));
return -1;
}
if (sampleDataBuf == NULL) {
errorPrint("%s() LN%d, Failed to calloc %"PRIu64" Bytes, reason:%s\n",
__func__, __LINE__,
superTblInfo->lenOfOneRow * MAX_SAMPLES_ONCE_FROM_FILE,
strerror(errno));
return -1;
}
superTblInfo->sampleDataBuf = sampleDataBuf;
int ret = readSampleFromCsvFileToMem(superTblInfo);
superTblInfo->sampleDataBuf = sampleDataBuf;
int ret = readSampleFromCsvFileToMem(superTblInfo);
if (0 != ret) {
errorPrint("%s() LN%d, read sample from csv file failed.\n",
__func__, __LINE__);
tmfree(sampleDataBuf);
superTblInfo->sampleDataBuf = NULL;
return -1;
}
if (0 != ret) {
errorPrint("%s() LN%d, read sample from csv file failed.\n",
__func__, __LINE__);
tmfree(sampleDataBuf);
superTblInfo->sampleDataBuf = NULL;
return -1;
}
return 0;
return 0;
}
static int32_t execInsert(threadInfo *pThreadInfo, uint32_t k)
......@@ -5155,54 +5171,54 @@ static int32_t generateDataTailWithoutStb(
uint64_t recordFrom, int64_t startTime,
/* int64_t *pSamplePos, */int64_t *dataLen) {
uint64_t len = 0;
char *pstr = buffer;
uint64_t len = 0;
char *pstr = buffer;
verbosePrint("%s() LN%d batch=%d\n", __func__, __LINE__, batch);
verbosePrint("%s() LN%d batch=%d\n", __func__, __LINE__, batch);
int32_t k = 0;
for (k = 0; k < batch;) {
char data[MAX_DATA_SIZE];
memset(data, 0, MAX_DATA_SIZE);
int32_t k = 0;
for (k = 0; k < batch;) {
char data[MAX_DATA_SIZE];
memset(data, 0, MAX_DATA_SIZE);
int64_t retLen = 0;
int64_t retLen = 0;
char **data_type = g_args.datatype;
int lenOfBinary = g_args.len_of_binary;
char **data_type = g_args.datatype;
int lenOfBinary = g_args.len_of_binary;
if (g_args.disorderRatio) {
retLen = generateData(data, data_type,
startTime + getTSRandTail(
(int64_t) DEFAULT_TIMESTAMP_STEP, k,
g_args.disorderRatio,
g_args.disorderRange),
lenOfBinary);
} else {
retLen = generateData(data, data_type,
startTime + (int64_t) (DEFAULT_TIMESTAMP_STEP* k),
lenOfBinary);
}
if (g_args.disorderRatio) {
retLen = generateData(data, data_type,
startTime + getTSRandTail(
(int64_t) DEFAULT_TIMESTAMP_STEP, k,
g_args.disorderRatio,
g_args.disorderRange),
lenOfBinary);
} else {
retLen = generateData(data, data_type,
startTime + (int64_t) (DEFAULT_TIMESTAMP_STEP* k),
lenOfBinary);
}
if (len > remainderBufLen)
break;
if (len > remainderBufLen)
break;
pstr += sprintf(pstr, "%s", data);
k++;
len += retLen;
remainderBufLen -= retLen;
pstr += sprintf(pstr, "%s", data);
k++;
len += retLen;
remainderBufLen -= retLen;
verbosePrint("%s() LN%d len=%"PRIu64" k=%d \nbuffer=%s\n",
__func__, __LINE__, len, k, buffer);
verbosePrint("%s() LN%d len=%"PRIu64" k=%d \nbuffer=%s\n",
__func__, __LINE__, len, k, buffer);
recordFrom ++;
recordFrom ++;
if (recordFrom >= insertRows) {
break;
if (recordFrom >= insertRows) {
break;
}
}
}
*dataLen = len;
return k;
*dataLen = len;
return k;
}
static int64_t getTSRandTail(int64_t timeStampStep, int32_t seq,
......@@ -5297,82 +5313,82 @@ static int generateSQLHeadWithoutStb(char *tableName,
char *dbName,
char *buffer, int remainderBufLen)
{
int len;
int len;
char headBuf[HEAD_BUFF_LEN];
char headBuf[HEAD_BUFF_LEN];
len = snprintf(
headBuf,
HEAD_BUFF_LEN,
"%s.%s values",
dbName,
tableName);
len = snprintf(
headBuf,
HEAD_BUFF_LEN,
"%s.%s values",
dbName,
tableName);
if (len > remainderBufLen)
return -1;
if (len > remainderBufLen)
return -1;
tstrncpy(buffer, headBuf, len + 1);
tstrncpy(buffer, headBuf, len + 1);
return len;
return len;
}
static int generateStbSQLHead(
SSuperTable* superTblInfo,
char *tableName, int32_t tableSeq,
char *tableName, int64_t tableSeq,
char *dbName,
char *buffer, int remainderBufLen)
{
int len;
int len;
char headBuf[HEAD_BUFF_LEN];
char headBuf[HEAD_BUFF_LEN];
if (AUTO_CREATE_SUBTBL == superTblInfo->autoCreateTable) {
char* tagsValBuf = NULL;
if (0 == superTblInfo->tagSource) {
if (AUTO_CREATE_SUBTBL == superTblInfo->autoCreateTable) {
char* tagsValBuf = NULL;
if (0 == superTblInfo->tagSource) {
tagsValBuf = generateTagValuesForStb(superTblInfo, tableSeq);
} else {
} else {
tagsValBuf = getTagValueFromTagSample(
superTblInfo,
tableSeq % superTblInfo->tagSampleCount);
}
if (NULL == tagsValBuf) {
errorPrint("%s() LN%d, tag buf failed to allocate memory\n",
__func__, __LINE__);
return -1;
}
}
if (NULL == tagsValBuf) {
errorPrint("%s() LN%d, tag buf failed to allocate memory\n",
__func__, __LINE__);
return -1;
}
len = snprintf(
headBuf,
HEAD_BUFF_LEN,
"%s.%s using %s.%s TAGS%s values",
dbName,
tableName,
dbName,
superTblInfo->sTblName,
tagsValBuf);
tmfree(tagsValBuf);
len = snprintf(
headBuf,
HEAD_BUFF_LEN,
"%s.%s using %s.%s TAGS%s values",
dbName,
tableName,
dbName,
superTblInfo->sTblName,
tagsValBuf);
tmfree(tagsValBuf);
} else if (TBL_ALREADY_EXISTS == superTblInfo->childTblExists) {
len = snprintf(
headBuf,
HEAD_BUFF_LEN,
"%s.%s values",
dbName,
tableName);
len = snprintf(
headBuf,
HEAD_BUFF_LEN,
"%s.%s values",
dbName,
tableName);
} else {
len = snprintf(
headBuf,
HEAD_BUFF_LEN,
"%s.%s values",
dbName,
tableName);
}
len = snprintf(
headBuf,
HEAD_BUFF_LEN,
"%s.%s values",
dbName,
tableName);
}
if (len > remainderBufLen)
return -1;
if (len > remainderBufLen)
return -1;
tstrncpy(buffer, headBuf, len + 1);
tstrncpy(buffer, headBuf, len + 1);
return len;
return len;
}
static int32_t generateStbInterlaceData(
......@@ -5650,8 +5666,7 @@ static int32_t prepareStmtBindArrayByType(TAOS_BIND *bind,
*ptr += bind->buffer_length;
} else {
errorPrint( "No support data type: %s\n",
dataType);
errorPrint( "No support data type: %s\n", dataType);
return -1;
}
......@@ -5737,28 +5752,120 @@ static int32_t prepareStmtWithoutStb(
return k;
}
static int32_t prepareStbStmtBind(
char *bindArray, SSuperTable *stbInfo, bool sourceRand,
int64_t startTime, int32_t recSeq,
bool isColumn)
{
char *bindBuffer = calloc(1, g_args.len_of_binary);
if (bindBuffer == NULL) {
errorPrint("%s() LN%d, Failed to allocate %d bind buffer\n",
__func__, __LINE__, g_args.len_of_binary);
return -1;
}
char data[MAX_DATA_SIZE];
memset(data, 0, MAX_DATA_SIZE);
char *ptr = data;
TAOS_BIND *bind;
if (isColumn) {
for (int i = 0; i < stbInfo->columnCount + 1; i ++) {
bind = (TAOS_BIND *)((char *)bindArray + (sizeof(TAOS_BIND) * i));
if (i == 0) {
int64_t *bind_ts;
bind_ts = (int64_t *)ptr;
bind->buffer_type = TSDB_DATA_TYPE_TIMESTAMP;
if (stbInfo->disorderRatio) {
*bind_ts = startTime + getTSRandTail(
stbInfo->timeStampStep, recSeq,
stbInfo->disorderRatio,
stbInfo->disorderRange);
} else {
*bind_ts = startTime + stbInfo->timeStampStep * recSeq;
}
bind->buffer_length = sizeof(int64_t);
bind->buffer = bind_ts;
bind->length = &bind->buffer_length;
bind->is_null = NULL;
ptr += bind->buffer_length;
} else {
int cursor = 0;
if (sourceRand) {
if ( -1 == prepareStmtBindArrayByType(
bind,
stbInfo->columns[i-1].dataType,
stbInfo->columns[i-1].dataLen,
&ptr,
NULL)) {
free(bindBuffer);
return -1;
}
} else {
char *restStr = stbInfo->sampleDataBuf + cursor;
int lengthOfRest = strlen(restStr);
int index = 0;
for (index = 0; index < lengthOfRest; index ++) {
if (restStr[index] == ',') {
break;
}
}
memset(bindBuffer, 0, g_args.len_of_binary);
strncpy(bindBuffer, restStr, index);
cursor += index + 1; // skip ',' too
if ( -1 == prepareStmtBindArrayByType(
bind,
stbInfo->columns[i-1].dataType,
stbInfo->columns[i-1].dataLen,
&ptr,
bindBuffer)) {
free(bindBuffer);
return -1;
}
}
}
}
} else {
TAOS_BIND *tag;
for (int t = 0; t < stbInfo->tagCount; t ++) {
tag = (TAOS_BIND *)((char *)bindArray + (sizeof(TAOS_BIND) * t));
if ( -1 == prepareStmtBindArrayByType(
tag,
stbInfo->tags[t].dataType,
stbInfo->tags[t].dataLen,
&ptr,
NULL)) {
free(bindBuffer);
return -1;
}
}
}
return 0;
}
static int32_t prepareStbStmt(
SSuperTable *stbInfo,
TAOS_STMT *stmt,
char *tableName, uint32_t batch,
char *tableName,
int64_t tableSeq,
uint32_t batch,
uint64_t insertRows,
uint64_t recordFrom,
int64_t startTime,
int64_t *pSamplePos)
{
int ret = taos_stmt_set_tbname(stmt, tableName);
if (ret != 0) {
errorPrint("failed to execute taos_stmt_set_tbname(%s). return 0x%x. reason: %s\n",
tableName, ret, taos_errstr(NULL));
return ret;
}
char *bindArray = malloc(sizeof(TAOS_BIND) * (stbInfo->columnCount + 1));
if (bindArray == NULL) {
errorPrint("%s() LN%d, Failed to allocate %d bind params\n",
__func__, __LINE__, (stbInfo->columnCount + 1));
return -1;
}
int ret;
bool sourceRand;
if (0 == strncasecmp(stbInfo->dataSource, "rand", strlen("rand"))) {
......@@ -5767,83 +5874,68 @@ static int32_t prepareStbStmt(
sourceRand = false; // from sample data file
}
char *bindBuffer = malloc(g_args.len_of_binary);
if (bindBuffer == NULL) {
errorPrint("%s() LN%d, Failed to allocate %d bind buffer\n",
__func__, __LINE__, g_args.len_of_binary);
free(bindArray);
return -1;
}
if (AUTO_CREATE_SUBTBL == stbInfo->autoCreateTable) {
char* tagsValBuf = NULL;
uint32_t k;
for (k = 0; k < batch;) {
/* columnCount + 1 (ts) */
char data[MAX_DATA_SIZE];
memset(data, 0, MAX_DATA_SIZE);
bool tagRand;
if (0 == stbInfo->tagSource) {
tagRand = true;
tagsValBuf = generateTagValuesForStb(stbInfo, tableSeq);
} else {
tagRand = false;
tagsValBuf = getTagValueFromTagSample(
stbInfo,
tableSeq % stbInfo->tagSampleCount);
}
char *ptr = data;
TAOS_BIND *bind = (TAOS_BIND *)(bindArray + 0);
if (NULL == tagsValBuf) {
errorPrint("%s() LN%d, tag buf failed to allocate memory\n",
__func__, __LINE__);
return -1;
}
int64_t *bind_ts;
char *tagsArray = calloc(1, sizeof(TAOS_BIND) * stbInfo->tagCount);
if (NULL == tagsArray) {
tmfree(tagsValBuf);
errorPrint("%s() LN%d, tag buf failed to allocate memory\n",
__func__, __LINE__);
return -1;
}
bind_ts = (int64_t *)ptr;
bind->buffer_type = TSDB_DATA_TYPE_TIMESTAMP;
if (stbInfo->disorderRatio) {
*bind_ts = startTime + getTSRandTail(
stbInfo->timeStampStep, k,
stbInfo->disorderRatio,
stbInfo->disorderRange);
} else {
*bind_ts = startTime + stbInfo->timeStampStep * k;
if (-1 == prepareStbStmtBind(
tagsArray, stbInfo, tagRand, -1, -1, false /* is tag */)) {
free(tagsArray);
return -1;
}
bind->buffer_length = sizeof(int64_t);
bind->buffer = bind_ts;
bind->length = &bind->buffer_length;
bind->is_null = NULL;
ptr += bind->buffer_length;
ret = taos_stmt_set_tbname_tags(stmt, tableName, (TAOS_BIND *)tagsArray);
int cursor = 0;
for (int i = 0; i < stbInfo->columnCount; i ++) {
bind = (TAOS_BIND *)((char *)bindArray + (sizeof(TAOS_BIND) * (i + 1)));
if (sourceRand) {
if ( -1 == prepareStmtBindArrayByType(
bind,
stbInfo->columns[i].dataType,
stbInfo->columns[i].dataLen,
&ptr,
NULL)) {
free(bindArray);
free(bindBuffer);
return -1;
}
} else {
char *restStr = stbInfo->sampleDataBuf + cursor;
int lengthOfRest = strlen(restStr);
tmfree(tagsValBuf);
tmfree((char *)tagsArray);
} else {
ret = taos_stmt_set_tbname(stmt, tableName);
}
int index = 0;
for (index = 0; index < lengthOfRest; index ++) {
if (restStr[index] == ',') {
break;
}
}
if (ret != 0) {
errorPrint("failed to execute taos_stmt_set_tbname(%s). return 0x%x. reason: %s\n",
tableName, ret, taos_errstr(NULL));
return ret;
}
memset(bindBuffer, 0, g_args.len_of_binary);
strncpy(bindBuffer, restStr, index);
cursor += index + 1; // skip ',' too
if ( -1 == prepareStmtBindArrayByType(
bind,
stbInfo->columns[i].dataType,
stbInfo->columns[i].dataLen,
&ptr,
bindBuffer)) {
free(bindArray);
free(bindBuffer);
return -1;
}
}
char *bindArray = calloc(1, sizeof(TAOS_BIND) * (stbInfo->columnCount + 1));
if (bindArray == NULL) {
errorPrint("%s() LN%d, Failed to allocate %d bind params\n",
__func__, __LINE__, (stbInfo->columnCount + 1));
return -1;
}
uint32_t k;
for (k = 0; k < batch;) {
/* columnCount + 1 (ts) */
if (-1 == prepareStbStmtBind(bindArray, stbInfo, sourceRand,
startTime, k, true /* is column */)) {
free(bindArray);
return -1;
}
taos_stmt_bind_param(stmt, (TAOS_BIND *)bindArray);
// if msg > 3MB, break
......@@ -5861,7 +5953,6 @@ static int32_t prepareStbStmt(
}
}
free(bindBuffer);
free(bindArray);
return k;
}
......@@ -5869,7 +5960,9 @@ static int32_t prepareStbStmt(
static int32_t prepareStbStmtInterlace(
SSuperTable *stbInfo,
TAOS_STMT *stmt,
char *tableName, uint32_t batch,
char *tableName,
int64_t tableSeq,
uint32_t batch,
uint64_t insertRows,
uint64_t recordFrom,
int64_t startTime,
......@@ -5879,6 +5972,7 @@ static int32_t prepareStbStmtInterlace(
stbInfo,
stmt,
tableName,
tableSeq,
batch,
insertRows, 0, startTime,
pSamplePos);
......@@ -5887,7 +5981,9 @@ static int32_t prepareStbStmtInterlace(
static int32_t prepareStbStmtProgressive(
SSuperTable *stbInfo,
TAOS_STMT *stmt,
char *tableName, uint32_t batch,
char *tableName,
int64_t tableSeq,
uint32_t batch,
uint64_t insertRows,
uint64_t recordFrom,
int64_t startTime,
......@@ -5897,6 +5993,7 @@ static int32_t prepareStbStmtProgressive(
stbInfo,
stmt,
tableName,
tableSeq,
g_args.num_of_RPR,
insertRows, recordFrom, startTime,
pSamplePos);
......@@ -6097,6 +6194,7 @@ static void* syncWriteInterlace(threadInfo *pThreadInfo) {
superTblInfo,
pThreadInfo->stmt,
tableName,
tableSeq,
batchPerTbl,
insertRows, i,
startTime,
......@@ -6263,164 +6361,165 @@ free_of_interlace:
// sync insertion progressive data
static void* syncWriteProgressive(threadInfo *pThreadInfo) {
debugPrint("%s() LN%d: ### progressive write\n", __func__, __LINE__);
debugPrint("%s() LN%d: ### progressive write\n", __func__, __LINE__);
SSuperTable* superTblInfo = pThreadInfo->superTblInfo;
uint64_t maxSqlLen = superTblInfo?superTblInfo->maxSqlLen:g_args.max_sql_len;
int64_t timeStampStep =
superTblInfo?superTblInfo->timeStampStep:DEFAULT_TIMESTAMP_STEP;
int64_t insertRows =
SSuperTable* superTblInfo = pThreadInfo->superTblInfo;
uint64_t maxSqlLen = superTblInfo?superTblInfo->maxSqlLen:g_args.max_sql_len;
int64_t timeStampStep =
superTblInfo?superTblInfo->timeStampStep:DEFAULT_TIMESTAMP_STEP;
int64_t insertRows =
(superTblInfo)?superTblInfo->insertRows:g_args.num_of_DPT;
verbosePrint("%s() LN%d insertRows=%"PRId64"\n",
verbosePrint("%s() LN%d insertRows=%"PRId64"\n",
__func__, __LINE__, insertRows);
pThreadInfo->buffer = calloc(maxSqlLen, 1);
if (NULL == pThreadInfo->buffer) {
errorPrint( "Failed to alloc %"PRIu64" Bytes, reason:%s\n",
maxSqlLen,
strerror(errno));
return NULL;
}
uint64_t lastPrintTime = taosGetTimestampMs();
uint64_t startTs = taosGetTimestampMs();
uint64_t endTs;
pThreadInfo->totalInsertRows = 0;
pThreadInfo->totalAffectedRows = 0;
pThreadInfo->samplePos = 0;
for (uint64_t tableSeq = pThreadInfo->start_table_from;
tableSeq <= pThreadInfo->end_table_to;
tableSeq ++) {
int64_t start_time = pThreadInfo->start_time;
for (uint64_t i = 0; i < insertRows;) {
char tableName[TSDB_TABLE_NAME_LEN];
getTableName(tableName, pThreadInfo, tableSeq);
verbosePrint("%s() LN%d: tid=%d seq=%"PRId64" tableName=%s\n",
__func__, __LINE__,
pThreadInfo->threadID, tableSeq, tableName);
if (0 == strlen(tableName)) {
errorPrint("[%d] %s() LN%d, getTableName return null\n",
pThreadInfo->threadID, __func__, __LINE__);
free(pThreadInfo->buffer);
pThreadInfo->buffer = calloc(maxSqlLen, 1);
if (NULL == pThreadInfo->buffer) {
errorPrint( "Failed to alloc %"PRIu64" Bytes, reason:%s\n",
maxSqlLen,
strerror(errno));
return NULL;
}
}
uint64_t lastPrintTime = taosGetTimestampMs();
uint64_t startTs = taosGetTimestampMs();
uint64_t endTs;
pThreadInfo->totalInsertRows = 0;
pThreadInfo->totalAffectedRows = 0;
pThreadInfo->samplePos = 0;
for (uint64_t tableSeq = pThreadInfo->start_table_from;
tableSeq <= pThreadInfo->end_table_to;
tableSeq ++) {
int64_t start_time = pThreadInfo->start_time;
for (uint64_t i = 0; i < insertRows;) {
char tableName[TSDB_TABLE_NAME_LEN];
getTableName(tableName, pThreadInfo, tableSeq);
verbosePrint("%s() LN%d: tid=%d seq=%"PRId64" tableName=%s\n",
__func__, __LINE__,
pThreadInfo->threadID, tableSeq, tableName);
if (0 == strlen(tableName)) {
errorPrint("[%d] %s() LN%d, getTableName return null\n",
pThreadInfo->threadID, __func__, __LINE__);
free(pThreadInfo->buffer);
return NULL;
}
int64_t remainderBufLen = maxSqlLen;
char *pstr = pThreadInfo->buffer;
int64_t remainderBufLen = maxSqlLen;
char *pstr = pThreadInfo->buffer;
int len = snprintf(pstr,
strlen(STR_INSERT_INTO) + 1, "%s", STR_INSERT_INTO);
int len = snprintf(pstr,
strlen(STR_INSERT_INTO) + 1, "%s", STR_INSERT_INTO);
pstr += len;
remainderBufLen -= len;
pstr += len;
remainderBufLen -= len;
int32_t generated;
if (superTblInfo) {
if (superTblInfo->iface == STMT_IFACE) {
int32_t generated;
if (superTblInfo) {
if (superTblInfo->iface == STMT_IFACE) {
#if STMT_IFACE_ENABLED == 1
generated = prepareStbStmtProgressive(
superTblInfo,
pThreadInfo->stmt,
tableName,
g_args.num_of_RPR,
insertRows, i, start_time,
&(pThreadInfo->samplePos));
generated = prepareStbStmtProgressive(
superTblInfo,
pThreadInfo->stmt,
tableName,
tableSeq,
g_args.num_of_RPR,
insertRows, i, start_time,
&(pThreadInfo->samplePos));
#else
generated = -1;
generated = -1;
#endif
} else {
generated = generateStbProgressiveData(
superTblInfo,
tableName, tableSeq, pThreadInfo->db_name, pstr,
insertRows, i, start_time,
&(pThreadInfo->samplePos),
&remainderBufLen);
}
} else {
if (g_args.iface == STMT_IFACE) {
} else {
generated = generateStbProgressiveData(
superTblInfo,
tableName, tableSeq, pThreadInfo->db_name, pstr,
insertRows, i, start_time,
&(pThreadInfo->samplePos),
&remainderBufLen);
}
} else {
if (g_args.iface == STMT_IFACE) {
#if STMT_IFACE_ENABLED == 1
generated = prepareStmtWithoutStb(
pThreadInfo->stmt,
tableName,
g_args.num_of_RPR,
insertRows, i,
start_time);
generated = prepareStmtWithoutStb(
pThreadInfo->stmt,
tableName,
g_args.num_of_RPR,
insertRows, i,
start_time);
#else
generated = -1;
generated = -1;
#endif
} else {
generated = generateProgressiveDataWithoutStb(
tableName,
/* tableSeq, */
pThreadInfo, pstr, insertRows,
i, start_time,
/* &(pThreadInfo->samplePos), */
&remainderBufLen);
}
}
if (generated > 0)
i += generated;
else
goto free_of_progressive;
} else {
generated = generateProgressiveDataWithoutStb(
tableName,
/* tableSeq, */
pThreadInfo, pstr, insertRows,
i, start_time,
/* &(pThreadInfo->samplePos), */
&remainderBufLen);
}
}
if (generated > 0)
i += generated;
else
goto free_of_progressive;
start_time += generated * timeStampStep;
pThreadInfo->totalInsertRows += generated;
start_time += generated * timeStampStep;
pThreadInfo->totalInsertRows += generated;
startTs = taosGetTimestampMs();
startTs = taosGetTimestampMs();
int32_t affectedRows = execInsert(pThreadInfo, generated);
int32_t affectedRows = execInsert(pThreadInfo, generated);
endTs = taosGetTimestampMs();
uint64_t delay = endTs - startTs;
performancePrint("%s() LN%d, insert execution time is %"PRId64"ms\n",
__func__, __LINE__, delay);
verbosePrint("[%d] %s() LN%d affectedRows=%d\n",
pThreadInfo->threadID,
__func__, __LINE__, affectedRows);
endTs = taosGetTimestampMs();
uint64_t delay = endTs - startTs;
performancePrint("%s() LN%d, insert execution time is %"PRId64"ms\n",
__func__, __LINE__, delay);
verbosePrint("[%d] %s() LN%d affectedRows=%d\n",
pThreadInfo->threadID,
__func__, __LINE__, affectedRows);
if (delay > pThreadInfo->maxDelay) pThreadInfo->maxDelay = delay;
if (delay < pThreadInfo->minDelay) pThreadInfo->minDelay = delay;
pThreadInfo->cntDelay++;
pThreadInfo->totalDelay += delay;
if (delay > pThreadInfo->maxDelay) pThreadInfo->maxDelay = delay;
if (delay < pThreadInfo->minDelay) pThreadInfo->minDelay = delay;
pThreadInfo->cntDelay++;
pThreadInfo->totalDelay += delay;
if (affectedRows < 0) {
errorPrint("%s() LN%d, affected rows: %d\n",
__func__, __LINE__, affectedRows);
goto free_of_progressive;
}
if (affectedRows < 0) {
errorPrint("%s() LN%d, affected rows: %d\n",
__func__, __LINE__, affectedRows);
goto free_of_progressive;
}
pThreadInfo->totalAffectedRows += affectedRows;
pThreadInfo->totalAffectedRows += affectedRows;
int64_t currentPrintTime = taosGetTimestampMs();
if (currentPrintTime - lastPrintTime > 30*1000) {
printf("thread[%d] has currently inserted rows: %"PRId64 ", affected rows: %"PRId64 "\n",
pThreadInfo->threadID,
pThreadInfo->totalInsertRows,
pThreadInfo->totalAffectedRows);
lastPrintTime = currentPrintTime;
}
int64_t currentPrintTime = taosGetTimestampMs();
if (currentPrintTime - lastPrintTime > 30*1000) {
printf("thread[%d] has currently inserted rows: %"PRId64 ", affected rows: %"PRId64 "\n",
pThreadInfo->threadID,
pThreadInfo->totalInsertRows,
pThreadInfo->totalAffectedRows);
lastPrintTime = currentPrintTime;
}
if (i >= insertRows)
break;
} // num_of_DPT
if (i >= insertRows)
break;
} // num_of_DPT
if ((g_args.verbose_print) &&
(tableSeq == pThreadInfo->ntables - 1) && (superTblInfo) &&
(0 == strncasecmp(
superTblInfo->dataSource, "sample", strlen("sample")))) {
verbosePrint("%s() LN%d samplePos=%"PRId64"\n",
__func__, __LINE__, pThreadInfo->samplePos);
}
} // tableSeq
if ((g_args.verbose_print) &&
(tableSeq == pThreadInfo->ntables - 1) && (superTblInfo) &&
(0 == strncasecmp(
superTblInfo->dataSource, "sample", strlen("sample")))) {
verbosePrint("%s() LN%d samplePos=%"PRId64"\n",
__func__, __LINE__, pThreadInfo->samplePos);
}
} // tableSeq
free_of_progressive:
tmfree(pThreadInfo->buffer);
printStatPerThread(pThreadInfo);
return NULL;
tmfree(pThreadInfo->buffer);
printStatPerThread(pThreadInfo);
return NULL;
}
static void* syncWrite(void *sarg) {
......@@ -6428,6 +6527,8 @@ static void* syncWrite(void *sarg) {
threadInfo *pThreadInfo = (threadInfo *)sarg;
SSuperTable* superTblInfo = pThreadInfo->superTblInfo;
setThreadName("syncWrite");
uint32_t interlaceRows;
if (superTblInfo) {
......@@ -6513,6 +6614,8 @@ static void *asyncWrite(void *sarg) {
threadInfo *pThreadInfo = (threadInfo *)sarg;
SSuperTable* superTblInfo = pThreadInfo->superTblInfo;
setThreadName("asyncWrite");
pThreadInfo->st = 0;
pThreadInfo->et = 0;
pThreadInfo->lastTs = pThreadInfo->start_time;
......@@ -6754,7 +6857,7 @@ static void startMultiThreadInsertData(int threads, char* db_name,
exit(-1);
}
char buffer[3000];
char buffer[BUFFER_SIZE];
char *pstr = buffer;
if ((superTblInfo)
......@@ -6913,6 +7016,7 @@ static void *readTable(void *sarg) {
#if 1
threadInfo *pThreadInfo = (threadInfo *)sarg;
TAOS *taos = pThreadInfo->taos;
setThreadName("readTable");
char command[BUFFER_SIZE] = "\0";
uint64_t sTime = pThreadInfo->start_time;
char *tb_prefix = pThreadInfo->tb_prefix;
......@@ -6985,6 +7089,7 @@ static void *readMetric(void *sarg) {
#if 1
threadInfo *pThreadInfo = (threadInfo *)sarg;
TAOS *taos = pThreadInfo->taos;
setThreadName("readMetric");
char command[BUFFER_SIZE] = "\0";
FILE *fp = fopen(pThreadInfo->filePath, "a");
if (NULL == fp) {
......@@ -7161,6 +7266,8 @@ static int insertTestProcess() {
static void *specifiedTableQuery(void *sarg) {
threadInfo *pThreadInfo = (threadInfo *)sarg;
setThreadName("specTableQuery");
if (pThreadInfo->taos == NULL) {
TAOS * taos = NULL;
taos = taos_connect(g_queryInfo.host,
......@@ -7260,6 +7367,8 @@ static void *superTableQuery(void *sarg) {
char sqlstr[MAX_QUERY_SQL_LENGTH];
threadInfo *pThreadInfo = (threadInfo *)sarg;
setThreadName("superTableQuery");
if (pThreadInfo->taos == NULL) {
TAOS * taos = NULL;
taos = taos_connect(g_queryInfo.host,
......@@ -7562,6 +7671,8 @@ static void *superSubscribe(void *sarg) {
TAOS_SUB* tsub[MAX_QUERY_SQL_COUNT] = {0};
uint64_t tsubSeq;
setThreadName("superSub");
if (pThreadInfo->ntables > MAX_QUERY_SQL_COUNT) {
errorPrint("The table number(%"PRId64") of the thread is more than max query sql count: %d\n",
pThreadInfo->ntables, MAX_QUERY_SQL_COUNT);
......@@ -7708,6 +7819,8 @@ static void *specifiedSubscribe(void *sarg) {
threadInfo *pThreadInfo = (threadInfo *)sarg;
// TAOS_SUB* tsub = NULL;
setThreadName("specSub");
if (pThreadInfo->taos == NULL) {
pThreadInfo->taos = taos_connect(g_queryInfo.host,
g_queryInfo.user,
......@@ -8140,55 +8253,55 @@ static int isCommentLine(char *line) {
static void querySqlFile(TAOS* taos, char* sqlFile)
{
FILE *fp = fopen(sqlFile, "r");
if (fp == NULL) {
printf("failed to open file %s, reason:%s\n", sqlFile, strerror(errno));
return;
}
FILE *fp = fopen(sqlFile, "r");
if (fp == NULL) {
printf("failed to open file %s, reason:%s\n", sqlFile, strerror(errno));
return;
}
int read_len = 0;
char * cmd = calloc(1, TSDB_MAX_BYTES_PER_ROW);
size_t cmd_len = 0;
char * line = NULL;
size_t line_len = 0;
int read_len = 0;
char * cmd = calloc(1, TSDB_MAX_BYTES_PER_ROW);
size_t cmd_len = 0;
char * line = NULL;
size_t line_len = 0;
double t = taosGetTimestampMs();
double t = taosGetTimestampMs();
while((read_len = tgetline(&line, &line_len, fp)) != -1) {
if (read_len >= TSDB_MAX_BYTES_PER_ROW) continue;
line[--read_len] = '\0';
while((read_len = tgetline(&line, &line_len, fp)) != -1) {
if (read_len >= TSDB_MAX_BYTES_PER_ROW) continue;
line[--read_len] = '\0';
if (read_len == 0 || isCommentLine(line)) { // line starts with #
continue;
}
if (read_len == 0 || isCommentLine(line)) { // line starts with #
continue;
}
if (line[read_len - 1] == '\\') {
line[read_len - 1] = ' ';
memcpy(cmd + cmd_len, line, read_len);
cmd_len += read_len;
continue;
}
if (line[read_len - 1] == '\\') {
line[read_len - 1] = ' ';
memcpy(cmd + cmd_len, line, read_len);
cmd_len += read_len;
continue;
}
memcpy(cmd + cmd_len, line, read_len);
if (0 != queryDbExec(taos, cmd, NO_INSERT_TYPE, false)) {
errorPrint("%s() LN%d, queryDbExec %s failed!\n",
__func__, __LINE__, cmd);
tmfree(cmd);
tmfree(line);
tmfclose(fp);
return;
memcpy(cmd + cmd_len, line, read_len);
if (0 != queryDbExec(taos, cmd, NO_INSERT_TYPE, false)) {
errorPrint("%s() LN%d, queryDbExec %s failed!\n",
__func__, __LINE__, cmd);
tmfree(cmd);
tmfree(line);
tmfclose(fp);
return;
}
memset(cmd, 0, TSDB_MAX_BYTES_PER_ROW);
cmd_len = 0;
}
memset(cmd, 0, TSDB_MAX_BYTES_PER_ROW);
cmd_len = 0;
}
t = taosGetTimestampMs() - t;
printf("run %s took %.6f second(s)\n\n", sqlFile, t);
t = taosGetTimestampMs() - t;
printf("run %s took %.6f second(s)\n\n", sqlFile, t);
tmfree(cmd);
tmfree(line);
tmfclose(fp);
return;
tmfree(cmd);
tmfree(line);
tmfclose(fp);
return;
}
static void testMetaFile() {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册