未验证 提交 7610e811 编写于 作者: sangshuduo's avatar sangshuduo 提交者: GitHub

Feature/sangshuduo/td 3317 taosdemo interlace (#5485)

* [TD-3316] <fix>: add testcase for taosdemo limit and offset.

check offset 0.

* [TD-3316] <fix>: add testcase for taosdemo limit and offset.

fix sample file import bug.

* [TD-3316] <fix>: add test case for limit and offset. fix sample data issue.

* [TD-3327] <fix>: fix taosdemo segfault when import data from sample data file.

* [TD-3317] <feature>: make taosdemo support interlace mode.

json parameter rows_per_tbl support.

* [TD-3317] <feature>: support interlace mode.

refactor

* [TD-3317] <feature>: support interlace mode.

refactor

* [TD-3317] <feature>: support interlace mode insertion.

refactor.

* [TD-3317] <feature>: support interlace mode insertion.

change json file.

* [TD-3317] <feature>: support interlace mode insertion.

fix multithread create table regression.

* [TD-3317] <feature>: support interlace mode insertion.

working but not perfect.

* [TD-3317] <feature>: support interlace mode insertion.

rename lowaTest with taosdemoTestWithJson

* [TD-3317] <feature>: support interlace mode insertion.

perfect

* [TD-3317] <feature>: support interlace mode insertion.

cleanup.

* [TD-3317] <feature>: support interlace mode insertion.

adjust algorithm of loop times.
Co-authored-by: NShuduo Sang <sdsang@taosdata.com>
上级 1835aaad
......@@ -567,12 +567,19 @@ static FILE * g_fpOfInsertResult = NULL;
#define debugPrint(fmt, ...) \
do { if (g_args.debug_print || g_args.verbose_print) \
fprintf(stderr, "DEBG: "fmt, __VA_ARGS__); } while(0)
#define verbosePrint(fmt, ...) \
do { if (g_args.verbose_print) \
fprintf(stderr, "VERB: "fmt, __VA_ARGS__); } while(0)
#define errorPrint(fmt, ...) \
do { fprintf(stderr, "ERROR: "fmt, __VA_ARGS__); } while(0)
///////////////////////////////////////////////////
static void ERROR_EXIT(const char *msg) { perror(msg); exit(-1); }
void printHelp() {
char indent[10] = " ";
printf("%s%s%s%s\n", indent, "-f", indent,
......@@ -645,7 +652,7 @@ void parse_args(int argc, char *argv[], SArguments *arguments) {
} else if (strcmp(argv[i], "-c") == 0) {
char *configPath = argv[++i];
if (wordexp(configPath, &full_path, 0) != 0) {
fprintf(stderr, "Invalid path %s\n", configPath);
errorPrint( "Invalid path %s\n", configPath);
return;
}
taos_options(TSDB_OPTION_CONFIGDIR, full_path.we_wordv[0]);
......@@ -694,8 +701,8 @@ void parse_args(int argc, char *argv[], SArguments *arguments) {
&& strcasecmp(argv[i], "DOUBLE")
&& strcasecmp(argv[i], "BINARY")
&& strcasecmp(argv[i], "NCHAR")) {
fprintf(stderr, "Invalid data_type!\n");
printHelp();
ERROR_EXIT( "Invalid data_type!\n");
exit(EXIT_FAILURE);
}
sptr[0] = argv[i];
......@@ -715,8 +722,8 @@ void parse_args(int argc, char *argv[], SArguments *arguments) {
&& strcasecmp(token, "DOUBLE")
&& strcasecmp(token, "BINARY")
&& strcasecmp(token, "NCHAR")) {
fprintf(stderr, "Invalid data_type!\n");
printHelp();
ERROR_EXIT("Invalid data_type!\n");
exit(EXIT_FAILURE);
}
sptr[index++] = token;
......@@ -771,8 +778,8 @@ void parse_args(int argc, char *argv[], SArguments *arguments) {
printHelp();
exit(0);
} else {
fprintf(stderr, "wrong options\n");
printHelp();
ERROR_EXIT("ERROR: wrong options\n");
exit(EXIT_FAILURE);
}
}
......@@ -858,7 +865,7 @@ static int queryDbExec(TAOS *taos, char *command, int type) {
if (code != 0) {
debugPrint("%s() LN%d - command: %s\n", __func__, __LINE__, command);
fprintf(stderr, "Failed to run %s, reason: %s\n", command, taos_errstr(res));
errorPrint( "Failed to run %s, reason: %s\n", command, taos_errstr(res));
taos_free_result(res);
//taos_close(taos);
return -1;
......@@ -884,13 +891,13 @@ static void getResult(TAOS_RES *res, char* resultFileName) {
if (resultFileName[0] != 0) {
fp = fopen(resultFileName, "at");
if (fp == NULL) {
fprintf(stderr, "failed to open result file: %s, result will not save to file\n", resultFileName);
errorPrint("%s() LN%d, failed to open result file: %s, result will not save to file\n", __func__, __LINE__, resultFileName);
}
}
char* databuf = (char*) calloc(1, 100*1024*1024);
if (databuf == NULL) {
fprintf(stderr, "failed to malloc, warning: save result to file slowly!\n");
errorPrint("%s() LN%d, failed to malloc, warning: save result to file slowly!\n", __func__, __LINE__);
if (fp)
fclose(fp);
return ;
......@@ -1484,7 +1491,7 @@ static int xDumpResultToFile(const char* fname, TAOS_RES* tres) {
FILE* fp = fopen(fname, "at");
if (fp == NULL) {
fprintf(stderr, "ERROR: failed to open file: %s\n", fname);
errorPrint("%s() LN%d, failed to open file: %s\n", __func__, __LINE__, fname);
return -1;
}
......@@ -1529,7 +1536,7 @@ static int getDbFromServer(TAOS * taos, SDbInfo** dbInfos) {
int32_t code = taos_errno(res);
if (code != 0) {
fprintf(stderr, "failed to run <show databases>, reason: %s\n", taos_errstr(res));
errorPrint( "failed to run <show databases>, reason: %s\n", taos_errstr(res));
return -1;
}
......@@ -1541,7 +1548,7 @@ static int getDbFromServer(TAOS * taos, SDbInfo** dbInfos) {
dbInfos[count] = (SDbInfo *)calloc(1, sizeof(SDbInfo));
if (dbInfos[count] == NULL) {
fprintf(stderr, "failed to allocate memory for some dbInfo[%d]\n", count);
errorPrint( "failed to allocate memory for some dbInfo[%d]\n", count);
return -1;
}
......@@ -1576,7 +1583,7 @@ static int getDbFromServer(TAOS * taos, SDbInfo** dbInfos) {
count++;
if (count > MAX_DATABASE_COUNT) {
fprintf(stderr, "The database count overflow than %d\n", MAX_DATABASE_COUNT);
errorPrint( "The database count overflow than %d\n", MAX_DATABASE_COUNT);
break;
}
}
......@@ -1590,7 +1597,7 @@ static void printfDbInfoForQueryToFile(char* filename, SDbInfo* dbInfos, int ind
FILE *fp = fopen(filename, "at");
if (fp == NULL) {
fprintf(stderr, "failed to open file: %s\n", filename);
errorPrint( "failed to open file: %s\n", filename);
return;
}
......@@ -1646,7 +1653,7 @@ static void printfQuerySystemInfo(TAOS * taos) {
res = taos_query(taos, "show databases;");
SDbInfo** dbInfos = (SDbInfo **)calloc(MAX_DATABASE_COUNT, sizeof(SDbInfo *));
if (dbInfos == NULL) {
fprintf(stderr, "failed to allocate memory\n");
errorPrint("%s() LN%d, failed to allocate memory\n", __func__, __LINE__);
return;
}
int dbCount = getDbFromServer(taos, dbInfos);
......@@ -1676,8 +1683,6 @@ static void printfQuerySystemInfo(TAOS * taos) {
}
static void ERROR_EXIT(const char *msg) { perror(msg); exit(-1); }
static int postProceSql(char* host, uint16_t port, char* sqlstr)
{
char *req_fmt = "POST %s HTTP/1.1\r\nHost: %s:%d\r\nAccept: */*\r\nAuthorization: Basic %s\r\nContent-Length: %d\r\nContent-Type: application/x-www-form-urlencoded\r\n\r\n%s";
......@@ -1725,9 +1730,9 @@ static int postProceSql(char* host, uint16_t port, char* sqlstr)
sockfd = socket(AF_INET, SOCK_STREAM, 0);
if (sockfd < 0) {
#ifdef WINDOWS
fprintf(stderr, "Could not create socket : %d" , WSAGetLastError());
errorPrint( "Could not create socket : %d" , WSAGetLastError());
#endif
debugPrint("%s() LN%d sockfd=%d\n", __func__, __LINE__, sockfd);
debugPrint("%s() LN%d, sockfd=%d\n", __func__, __LINE__, sockfd);
free(request_buf);
ERROR_EXIT("ERROR opening socket");
}
......@@ -1847,7 +1852,7 @@ static int postProceSql(char* host, uint16_t port, char* sqlstr)
static char* getTagValueFromTagSample(SSuperTable* stbInfo, int tagUsePos) {
char* dataBuf = (char*)calloc(TSDB_MAX_SQL_LEN+1, 1);
if (NULL == dataBuf) {
printf("calloc failed! size:%d\n", TSDB_MAX_SQL_LEN+1);
errorPrint("%s() LN%d, calloc failed! size:%d\n", __func__, __LINE__, TSDB_MAX_SQL_LEN+1);
return NULL;
}
......@@ -2140,7 +2145,7 @@ static int getSuperTableFromServer(TAOS * taos, char* dbName,
int childTblCount = 10000;
superTbls->childTblName = (char*)calloc(1, childTblCount * TSDB_TABLE_NAME_LEN);
if (superTbls->childTblName == NULL) {
fprintf(stderr, "alloc memory failed!");
errorPrint("%s() LN%d, alloc memory failed!\n", __func__, __LINE__);
return -1;
}
getAllChildNameOfSuperTable(taos, dbName,
......@@ -2279,7 +2284,7 @@ static int createSuperTable(TAOS * taos, char* dbName, SSuperTable* superTbls,
verbosePrint("%s() LN%d: %s\n", __func__, __LINE__, command);
if (0 != queryDbExec(taos, command, NO_INSERT_TYPE)) {
fprintf(stderr, "create supertable %s failed!\n\n",
errorPrint( "create supertable %s failed!\n\n",
superTbls->sTblName);
return -1;
}
......@@ -2293,7 +2298,7 @@ static int createDatabases() {
int ret = 0;
taos = taos_connect(g_Dbs.host, g_Dbs.user, g_Dbs.password, NULL, g_Dbs.port);
if (taos == NULL) {
fprintf(stderr, "Failed to connect to TDengine, reason:%s\n", taos_errstr(NULL));
errorPrint( "Failed to connect to TDengine, reason:%s\n", taos_errstr(NULL));
return -1;
}
char command[BUFFER_SIZE] = "\0";
......@@ -2378,7 +2383,7 @@ static int createDatabases() {
debugPrint("%s() %d command: %s\n", __func__, __LINE__, command);
if (0 != queryDbExec(taos, command, NO_INSERT_TYPE)) {
taos_close(taos);
fprintf(stderr, "\ncreate database %s failed!\n\n", g_Dbs.db[i].dbName);
errorPrint( "\ncreate database %s failed!\n\n", g_Dbs.db[i].dbName);
return -1;
}
printf("\ncreate database %s success!\n\n", g_Dbs.db[i].dbName);
......@@ -2427,7 +2432,7 @@ static void* createTable(void *sarg)
char *buffer = calloc(buff_len, 1);
if (buffer == NULL) {
fprintf(stderr, "Memory allocated failed!");
errorPrint("%s() LN%d, Memory allocated failed!\n", __func__, __LINE__);
exit(-1);
}
......@@ -2485,7 +2490,7 @@ static void* createTable(void *sarg)
len = 0;
verbosePrint("%s() LN%d %s\n", __func__, __LINE__, buffer);
if (0 != queryDbExec(winfo->taos, buffer, NO_INSERT_TYPE)){
fprintf(stderr, "queryDbExec() failed. buffer:\n%s\n", buffer);
errorPrint( "queryDbExec() failed. buffer:\n%s\n", buffer);
free(buffer);
return NULL;
}
......@@ -2501,7 +2506,7 @@ static void* createTable(void *sarg)
if (0 != len) {
verbosePrint("%s() %d buffer: %s\n", __func__, __LINE__, buffer);
if (0 != queryDbExec(winfo->taos, buffer, NO_INSERT_TYPE)) {
fprintf(stderr, "queryDbExec() failed. buffer:\n%s\n", buffer);
errorPrint( "queryDbExec() failed. buffer:\n%s\n", buffer);
}
}
......@@ -2546,7 +2551,7 @@ static int startMultiThreadCreateChildTable(
db_name,
g_Dbs.port);
if (t_info->taos == NULL) {
fprintf(stderr, "Failed to connect to TDengine, reason:%s\n", taos_errstr(NULL));
errorPrint( "Failed to connect to TDengine, reason:%s\n", taos_errstr(NULL));
free(pids);
free(infos);
return -1;
......@@ -2724,7 +2729,7 @@ static int readSampleFromCsvFileToMem(
FILE* fp = fopen(superTblInfo->sampleFile, "r");
if (fp == NULL) {
fprintf(stderr, "Failed to open sample file: %s, reason:%s\n",
errorPrint( "Failed to open sample file: %s, reason:%s\n",
superTblInfo->sampleFile, strerror(errno));
return -1;
}
......@@ -2736,7 +2741,7 @@ static int readSampleFromCsvFileToMem(
readLen = tgetline(&line, &n, fp);
if (-1 == readLen) {
if(0 != fseek(fp, 0, SEEK_SET)) {
fprintf(stderr, "Failed to fseek file: %s, reason:%s\n",
errorPrint( "Failed to fseek file: %s, reason:%s\n",
superTblInfo->sampleFile, strerror(errno));
fclose(fp);
return -1;
......@@ -2805,8 +2810,8 @@ static bool getColumnAndTagTypeFromInsertJsonFile(cJSON* stbInfo, SSuperTable* s
int columnSize = cJSON_GetArraySize(columns);
if (columnSize > MAX_COLUMN_COUNT) {
printf("ERROR: failed to read json, column size overflow, max column size is %d\n",
MAX_COLUMN_COUNT);
errorPrint("%s() LN%d, failed to read json, column size overflow, max column size is %d\n",
__func__, __LINE__, MAX_COLUMN_COUNT);
goto PARSE_OVER;
}
......@@ -2824,7 +2829,7 @@ static bool getColumnAndTagTypeFromInsertJsonFile(cJSON* stbInfo, SSuperTable* s
if (countObj && countObj->type == cJSON_Number) {
count = countObj->valueint;
} else if (countObj && countObj->type != cJSON_Number) {
printf("ERROR: failed to read json, column count not found\n");
errorPrint("%s() LN%d, failed to read json, column count not found\n", __func__, __LINE__);
goto PARSE_OVER;
} else {
count = 1;
......@@ -2834,7 +2839,7 @@ static bool getColumnAndTagTypeFromInsertJsonFile(cJSON* stbInfo, SSuperTable* s
memset(&columnCase, 0, sizeof(StrColumn));
cJSON *dataType = cJSON_GetObjectItem(column, "type");
if (!dataType || dataType->type != cJSON_String || dataType->valuestring == NULL) {
printf("ERROR: failed to read json, column type not found\n");
errorPrint("%s() LN%d: failed to read json, column type not found\n", __func__, __LINE__);
goto PARSE_OVER;
}
//tstrncpy(superTbls->columns[k].dataType, dataType->valuestring, MAX_TB_NAME_SIZE);
......@@ -2844,7 +2849,7 @@ static bool getColumnAndTagTypeFromInsertJsonFile(cJSON* stbInfo, SSuperTable* s
if (dataLen && dataLen->type == cJSON_Number) {
columnCase.dataLen = dataLen->valueint;
} else if (dataLen && dataLen->type != cJSON_Number) {
printf("ERROR: failed to read json, column len not found\n");
debugPrint("%s() LN%d: failed to read json, column len not found\n", __func__, __LINE__);
goto PARSE_OVER;
} else {
columnCase.dataLen = 8;
......@@ -2863,13 +2868,13 @@ static bool getColumnAndTagTypeFromInsertJsonFile(cJSON* stbInfo, SSuperTable* s
// tags
cJSON *tags = cJSON_GetObjectItem(stbInfo, "tags");
if (!tags || tags->type != cJSON_Array) {
printf("ERROR: failed to read json, tags not found\n");
debugPrint("%s() LN%d, failed to read json, tags not found\n", __func__, __LINE__);
goto PARSE_OVER;
}
int tagSize = cJSON_GetArraySize(tags);
if (tagSize > MAX_TAG_COUNT) {
printf("ERROR: failed to read json, tags size overflow, max tag size is %d\n", MAX_TAG_COUNT);
debugPrint("%s() LN%d, failed to read json, tags size overflow, max tag size is %d\n", __func__, __LINE__, MAX_TAG_COUNT);
goto PARSE_OVER;
}
......@@ -2997,7 +3002,7 @@ static bool getMetaFromInsertJsonFile(cJSON* root) {
} else if (!gInsertInterval) {
g_args.insert_interval = 0;
} else {
fprintf(stderr, "ERROR: failed to read json, insert_interval input mistake\n");
errorPrint("%s() LN%d, failed to read json, insert_interval input mistake\n", __func__, __LINE__);
goto PARSE_OVER;
}
......@@ -3007,7 +3012,7 @@ static bool getMetaFromInsertJsonFile(cJSON* root) {
} else if (!rowsPerTbl) {
g_args.rows_per_tbl = 0; // 0 means progressive mode, > 0 mean interlace mode. max value is less or equ num_of_records_per_req
} else {
fprintf(stderr, "ERROR: failed to read json, rows_per_tbl input mistake\n");
errorPrint("%s() LN%d, failed to read json, rows_per_tbl input mistake\n", __func__, __LINE__);
goto PARSE_OVER;
}
......@@ -3017,7 +3022,7 @@ static bool getMetaFromInsertJsonFile(cJSON* root) {
} else if (!maxSqlLen) {
g_args.max_sql_len = TSDB_PAYLOAD_SIZE;
} else {
fprintf(stderr, "ERROR: failed to read json, max_sql_len input mistake\n");
errorPrint("%s() LN%d, failed to read json, max_sql_len input mistake\n", __func__, __LINE__);
goto PARSE_OVER;
}
......@@ -3028,7 +3033,7 @@ static bool getMetaFromInsertJsonFile(cJSON* root) {
} else if (!numRecPerReq) {
g_args.num_of_RPR = 100;
} else {
printf("ERROR: failed to read json, num_of_records_per_req not found\n");
errorPrint("%s() LN%d, failed to read json, num_of_records_per_req not found\n", __func__, __LINE__);
goto PARSE_OVER;
}
......@@ -3058,7 +3063,7 @@ static bool getMetaFromInsertJsonFile(cJSON* root) {
int dbSize = cJSON_GetArraySize(dbs);
if (dbSize > MAX_DB_COUNT) {
fprintf(stderr,
errorPrint(
"ERROR: failed to read json, databases size overflow, max database is %d\n",
MAX_DB_COUNT);
goto PARSE_OVER;
......@@ -3257,7 +3262,7 @@ static bool getMetaFromInsertJsonFile(cJSON* root) {
int stbSize = cJSON_GetArraySize(stables);
if (stbSize > MAX_SUPER_TABLE_COUNT) {
fprintf(stderr,
errorPrint(
"ERROR: failed to read json, databases size overflow, max database is %d\n",
MAX_SUPER_TABLE_COUNT);
goto PARSE_OVER;
......@@ -3384,9 +3389,11 @@ static bool getMetaFromInsertJsonFile(cJSON* root) {
cJSON *ts = cJSON_GetObjectItem(stbInfo, "start_timestamp");
if (ts && ts->type == cJSON_String && ts->valuestring != NULL) {
tstrncpy(g_Dbs.db[i].superTbls[j].startTimestamp, ts->valuestring, MAX_DB_NAME_SIZE);
tstrncpy(g_Dbs.db[i].superTbls[j].startTimestamp,
ts->valuestring, MAX_DB_NAME_SIZE);
} else if (!ts) {
tstrncpy(g_Dbs.db[i].superTbls[j].startTimestamp, "now", MAX_DB_NAME_SIZE);
tstrncpy(g_Dbs.db[i].superTbls[j].startTimestamp,
"now", MAX_DB_NAME_SIZE);
} else {
printf("ERROR: failed to read json, start_timestamp not found\n");
goto PARSE_OVER;
......@@ -3493,7 +3500,7 @@ static bool getMetaFromInsertJsonFile(cJSON* root) {
} else if (!rowsPerTbl) {
g_Dbs.db[i].superTbls[j].rowsPerTbl = 0; // 0 means progressive mode, > 0 mean interlace mode. max value is less or equ num_of_records_per_req
} else {
fprintf(stderr, "ERROR: failed to read json, rowsPerTbl input mistake\n");
errorPrint("%s() LN%d, failed to read json, rowsPerTbl input mistake\n", __func__, __LINE__);
goto PARSE_OVER;
}
......@@ -3523,7 +3530,7 @@ static bool getMetaFromInsertJsonFile(cJSON* root) {
} else if (!insertRows) {
g_Dbs.db[i].superTbls[j].insertRows = 0x7FFFFFFFFFFFFFFF;
} else {
fprintf(stderr, "failed to read json, insert_rows input mistake");
errorPrint("%s() LN%d, failed to read json, insert_rows input mistake\n", __func__, __LINE__);
goto PARSE_OVER;
}
......@@ -3535,7 +3542,7 @@ static bool getMetaFromInsertJsonFile(cJSON* root) {
__func__, __LINE__, g_args.insert_interval);
g_Dbs.db[i].superTbls[j].insertInterval = g_args.insert_interval;
} else {
fprintf(stderr, "failed to read json, insert_interval input mistake");
errorPrint("%s() LN%d, failed to read json, insert_interval input mistake\n", __func__, __LINE__);
goto PARSE_OVER;
}
......@@ -3942,7 +3949,7 @@ static bool getInfoFromJsonFile(char* file) {
} else if (SUBSCRIBE_TEST == g_args.test_mode) {
ret = getMetaFromQueryJsonFile(root);
} else {
printf("ERROR: input json file type error! please input correct file type: insert or query or subscribe\n");
errorPrint("%s() LN%d, input json file type error! please input correct file type: insert or query or subscribe\n", __func__, __LINE__);
goto PARSE_OVER;
}
......@@ -4024,14 +4031,14 @@ static int generateRowData(char* dataBuf, int maxLen, int64_t timestamp, SSuper
if ((0 == strncasecmp(stbInfo->columns[i].dataType, "binary", 6))
|| (0 == strncasecmp(stbInfo->columns[i].dataType, "nchar", 5))) {
if (stbInfo->columns[i].dataLen > TSDB_MAX_BINARY_LEN) {
printf("binary or nchar length overflow, max size:%u\n",
errorPrint( "binary or nchar length overflow, max size:%u\n",
(uint32_t)TSDB_MAX_BINARY_LEN);
return (-1);
}
char* buf = (char*)calloc(stbInfo->columns[i].dataLen+1, 1);
if (NULL == buf) {
printf("calloc failed! size:%d\n", stbInfo->columns[i].dataLen);
errorPrint( "calloc failed! size:%d\n", stbInfo->columns[i].dataLen);
return (-1);
}
rand_string(buf, stbInfo->columns[i].dataLen);
......@@ -4063,7 +4070,7 @@ static int generateRowData(char* dataBuf, int maxLen, int64_t timestamp, SSuper
} else if (0 == strncasecmp(stbInfo->columns[i].dataType, "timestamp", 9)) {
dataLen += snprintf(dataBuf + dataLen, maxLen - dataLen, "%"PRId64", ", rand_bigint());
} else {
printf("No support data type: %s\n", stbInfo->columns[i].dataType);
errorPrint( "No support data type: %s\n", stbInfo->columns[i].dataType);
return (-1);
}
}
......@@ -4138,7 +4145,8 @@ static int prepareSampleDataForSTable(SSuperTable *superTblInfo) {
sampleDataBuf = calloc(
superTblInfo->lenOfOneRow * MAX_SAMPLES_ONCE_FROM_FILE, 1);
if (sampleDataBuf == NULL) {
fprintf(stderr, "Failed to calloc %d Bytes, reason:%s\n",
errorPrint("%s() LN%d, Failed to calloc %d Bytes, reason:%s\n",
__func__, __LINE__,
superTblInfo->lenOfOneRow * MAX_SAMPLES_ONCE_FROM_FILE,
strerror(errno));
return -1;
......@@ -4148,7 +4156,7 @@ static int prepareSampleDataForSTable(SSuperTable *superTblInfo) {
int ret = readSampleFromCsvFileToMem(superTblInfo);
if (0 != ret) {
fprintf(stderr, "read sample from csv file failed.\n");
errorPrint("%s() LN%d, read sample from csv file failed.\n", __func__, __LINE__);
tmfree(sampleDataBuf);
superTblInfo->sampleDataBuf = NULL;
return -1;
......@@ -4157,29 +4165,26 @@ static int prepareSampleDataForSTable(SSuperTable *superTblInfo) {
return 0;
}
static int execInsert(threadInfo *winfo, char *buffer, int k)
static int execInsert(threadInfo *pThreadInfo, char *buffer, int k)
{
int affectedRows;
SSuperTable* superTblInfo = winfo->superTblInfo;
SSuperTable* superTblInfo = pThreadInfo->superTblInfo;
verbosePrint("[%d] %s() LN%d %s\n", pThreadInfo->threadID,
__func__, __LINE__, buffer);
if (superTblInfo) {
if (0 == strncasecmp(superTblInfo->insertMode, "taosc", strlen("taosc"))) {
verbosePrint("%s() LN%d %s\n", __func__, __LINE__, buffer);
affectedRows = queryDbExec(winfo->taos, buffer, INSERT_TYPE);
affectedRows = queryDbExec(pThreadInfo->taos, buffer, INSERT_TYPE);
} else {
verbosePrint("%s() LN%d %s\n", __func__, __LINE__, buffer);
int retCode = postProceSql(g_Dbs.host, g_Dbs.port, buffer);
if (0 != retCode) {
if (0 != postProceSql(g_Dbs.host, g_Dbs.port, buffer)) {
affectedRows = -1;
printf("========restful return fail, threadID[%d]\n", winfo->threadID);
printf("========restful return fail, threadID[%d]\n", pThreadInfo->threadID);
} else {
affectedRows = k;
}
}
} else {
verbosePrint("%s() LN%d %s\n", __func__, __LINE__, buffer);
affectedRows = queryDbExec(winfo->taos, buffer, 1);
affectedRows = queryDbExec(pThreadInfo->taos, buffer, 1);
}
return affectedRows;
......@@ -4195,8 +4200,9 @@ static void getTableName(char *pTblName, threadInfo* pThreadInfo, int tableSeq)
superTblInfo->childTblName + (tableSeq - superTblInfo->childTblOffset) * TSDB_TABLE_NAME_LEN);
} else {
verbosePrint("%s() LN%d: from=%d count=%d seq=%d\n",
__func__, __LINE__, pThreadInfo->start_table_from,
verbosePrint("[%d] %s() LN%d: from=%d count=%d seq=%d\n",
pThreadInfo->threadID, __func__, __LINE__,
pThreadInfo->start_table_from,
pThreadInfo->ntables, tableSeq);
snprintf(pTblName, TSDB_TABLE_NAME_LEN, "%s",
superTblInfo->childTblName + tableSeq * TSDB_TABLE_NAME_LEN);
......@@ -4323,7 +4329,7 @@ static int generateSQLHead(char *tableName, int32_t tableSeq, threadInfo* pThrea
tableSeq % superTblInfo->tagSampleCount);
}
if (NULL == tagsValBuf) {
fprintf(stderr, "tag buf failed to allocate memory\n");
errorPrint("%s() LN%d, tag buf failed to allocate memory\n", __func__, __LINE__);
return -1;
}
......@@ -4396,13 +4402,14 @@ static int generateDataBuffer(char *pTblName,
}
static void* syncWriteInterlace(threadInfo *pThreadInfo) {
printf("### CBD: interlace write\n");
debugPrint("[%d] %s() LN%d: ### interlace write\n",
pThreadInfo->threadID, __func__, __LINE__);
SSuperTable* superTblInfo = pThreadInfo->superTblInfo;
char* buffer = calloc(superTblInfo?superTblInfo->maxSqlLen:g_args.max_sql_len, 1);
if (NULL == buffer) {
fprintf(stderr, "Failed to alloc %d Bytes, reason:%s\n",
errorPrint( "Failed to alloc %d Bytes, reason:%s\n",
superTblInfo?superTblInfo->maxSqlLen:g_args.max_sql_len,
strerror(errno));
return NULL;
......@@ -4437,8 +4444,8 @@ static void* syncWriteInterlace(threadInfo *pThreadInfo) {
int tableSeq = pThreadInfo->start_table_from;
debugPrint("%s() LN%d: start_table_from=%d ntables=%d insertRows=%"PRId64"\n",
__func__, __LINE__, pThreadInfo->start_table_from,
debugPrint("[%d] %s() LN%d: start_table_from=%d ntables=%d insertRows=%"PRId64"\n",
pThreadInfo->threadID, __func__, __LINE__, pThreadInfo->start_table_from,
pThreadInfo->ntables, insertRows);
int64_t startTime = pThreadInfo->start_time;
......@@ -4446,31 +4453,40 @@ static void* syncWriteInterlace(threadInfo *pThreadInfo) {
int batchPerTblTimes;
int batchPerTbl;
if ((rowsPerTbl > 0) && (pThreadInfo->ntables > 1)) {
batchPerTblTimes = g_args.num_of_RPR / rowsPerTbl;
assert(pThreadInfo->ntables > 0);
if (rowsPerTbl > g_args.num_of_RPR)
rowsPerTbl = g_args.num_of_RPR;
batchPerTbl = rowsPerTbl;
if ((rowsPerTbl > 0) && (pThreadInfo->ntables > 1)) {
batchPerTblTimes =
(g_args.num_of_RPR / (rowsPerTbl * pThreadInfo->ntables)) + 1;
} else {
batchPerTblTimes = 1;
batchPerTbl = g_args.num_of_RPR;
}
int generatedRecPerTbl = 0;
bool flagSleep = true;
int sleepTimeTotal = 0;
while(pThreadInfo->totalInsertRows < pThreadInfo->ntables * insertRows) {
if (insert_interval) {
if ((flagSleep) && (insert_interval)) {
st = taosGetTimestampUs();
flagSleep = false;
}
// generate data
memset(buffer, 0, superTblInfo?superTblInfo->maxSqlLen:g_args.max_sql_len);
char *pstr = buffer;
int recGenerated = 0;
int recOfBatch = 0;
for (int i = 0; i < batchPerTblTimes; i ++) {
getTableName(tableName, pThreadInfo, tableSeq);
int headLen;
if (i == 0) {
headLen = generateSQLHead(tableName, tableSeq, pThreadInfo, superTblInfo, pstr);
headLen = generateSQLHead(tableName, tableSeq, pThreadInfo,
superTblInfo, pstr);
} else {
headLen = snprintf(pstr, TSDB_TABLE_NAME_LEN, "%s.%s values",
pThreadInfo->db_name,
......@@ -4478,48 +4494,70 @@ static void* syncWriteInterlace(threadInfo *pThreadInfo) {
}
// generate data buffer
verbosePrint("%s() LN%d i=%d buffer:\n%s\n",
__func__, __LINE__, i, buffer);
verbosePrint("[%d] %s() LN%d i=%d buffer:\n%s\n",
pThreadInfo->threadID, __func__, __LINE__, i, buffer);
pstr += headLen;
int dataLen = 0;
printf("%s() LN%d i=%d batchPerTblTimes=%d batchPerTbl = %d\n",
__func__, __LINE__, i, batchPerTblTimes, batchPerTbl);
int numOfRecGenerated = generateDataTail(
debugPrint("[%d] %s() LN%d i=%d batchPerTblTimes=%d batchPerTbl = %d\n",
pThreadInfo->threadID, __func__, __LINE__,
i, batchPerTblTimes, batchPerTbl);
generateDataTail(
tableName, tableSeq, pThreadInfo, superTblInfo,
batchPerTbl, pstr, insertRows, 0,
startTime + pThreadInfo->totalInsertRows * superTblInfo->timeStampStep,
startTime + sleepTimeTotal +
pThreadInfo->totalInsertRows * superTblInfo->timeStampStep,
&(pThreadInfo->samplePos), &dataLen);
verbosePrint("%s() LN%d numOfRecGenerated= %d\n",
__func__, __LINE__, numOfRecGenerated);
pstr += dataLen;
recGenerated += numOfRecGenerated;
recOfBatch += batchPerTbl;
pThreadInfo->totalInsertRows += batchPerTbl;
debugPrint("[%d] %s() LN%d batchPerTbl=%d recOfBatch=%d\n",
pThreadInfo->threadID, __func__, __LINE__,
batchPerTbl, recOfBatch);
tableSeq ++;
if (insertMode == INTERLACE_INSERT_MODE) {
if (tableSeq == pThreadInfo->start_table_from + pThreadInfo->ntables) {
// turn to first table
tableSeq = pThreadInfo->start_table_from;
generatedRecPerTbl += numOfRecGenerated;
generatedRecPerTbl += batchPerTbl;
flagSleep = true;
if (generatedRecPerTbl >= insertRows)
break;
if (pThreadInfo->ntables * batchPerTbl < g_args.num_of_RPR)
break;
}
}
int remainRows = insertRows - generatedRecPerTbl;
if (batchPerTbl > remainRows)
if ((remainRows > 0) && (batchPerTbl > remainRows))
batchPerTbl = remainRows;
if ((g_args.num_of_RPR - recGenerated) < batchPerTbl)
debugPrint("[%d] %s() LN%d generatedRecPerTbl=%d insertRows=%"PRId64"\n",
pThreadInfo->threadID, __func__, __LINE__,
generatedRecPerTbl, insertRows);
if ((g_args.num_of_RPR - recOfBatch) < batchPerTbl)
break;
}
pThreadInfo->totalInsertRows += recGenerated;
printf("%s() LN%d recGenerated=%d totalInsertRows=%"PRId64" buffer:\n%s\n",
__func__, __LINE__, recGenerated,
pThreadInfo->totalInsertRows, buffer);
int affectedRows = execInsert(pThreadInfo, buffer, recGenerated);
if (affectedRows < 0)
debugPrint("[%d] %s() LN%d recOfBatch=%d totalInsertRows=%"PRId64"\n",
pThreadInfo->threadID, __func__, __LINE__, recOfBatch,
pThreadInfo->totalInsertRows);
verbosePrint("[%d] %s() LN%d, buffer=%s\n",
pThreadInfo->threadID, __func__, __LINE__, buffer);
int affectedRows = execInsert(pThreadInfo, buffer, recOfBatch);
verbosePrint("[%d] %s() LN%d affectedRows=%d\n", pThreadInfo->threadID,
__func__, __LINE__, affectedRows);
if (affectedRows < 0) {
errorPrint("[%d] %s() LN%d execInsert affected rows: %d\n%s\n",
pThreadInfo->threadID, __func__, __LINE__,
affectedRows, buffer);
goto free_and_statistics_interlace;
}
pThreadInfo->totalAffectedRows += affectedRows;
......@@ -4539,14 +4577,15 @@ static void* syncWriteInterlace(threadInfo *pThreadInfo) {
lastPrintTime = currentPrintTime;
}
if (insert_interval) {
if ((insert_interval) && flagSleep) {
et = taosGetTimestampUs();
if (insert_interval > ((et - st)/1000) ) {
int sleep_time = insert_interval - (et -st)/1000;
verbosePrint("%s() LN%d sleep: %d ms for insert interval\n",
__func__, __LINE__, sleep_time);
taosMsleep(sleep_time); // ms
int sleepTime = insert_interval - (et -st)/1000;
// verbosePrint("%s() LN%d sleep: %d ms for insert interval\n",
// __func__, __LINE__, sleepTime);
taosMsleep(sleepTime); // ms
sleepTimeTotal += insert_interval;
}
}
}
......@@ -4570,12 +4609,13 @@ free_and_statistics_interlace:
2 taosinsertdata , 1 thread: 10 tables * 20000 rows/s
*/
static void* syncWriteProgressive(threadInfo *pThreadInfo) {
debugPrint("%s() LN%d: ### progressive write\n", __func__, __LINE__);
SSuperTable* superTblInfo = pThreadInfo->superTblInfo;
char* buffer = calloc(superTblInfo?superTblInfo->maxSqlLen:g_args.max_sql_len, 1);
if (NULL == buffer) {
fprintf(stderr, "Failed to alloc %d Bytes, reason:%s\n",
errorPrint( "Failed to alloc %d Bytes, reason:%s\n",
superTblInfo?superTblInfo->maxSqlLen:g_args.max_sql_len,
strerror(errno));
return NULL;
......@@ -4821,7 +4861,7 @@ static void startMultiThreadInsertData(int threads, char* db_name,
} else if (0 == strncasecmp(precision, "us", 2)) {
timePrec = TSDB_TIME_PRECISION_MICRO;
} else {
fprintf(stderr, "No support precision: %s\n", precision);
errorPrint( "No support precision: %s\n", precision);
exit(-1);
}
}
......@@ -4836,7 +4876,7 @@ static void startMultiThreadInsertData(int threads, char* db_name,
&start_time,
strlen(superTblInfo->startTimestamp),
timePrec, 0)) {
fprintf(stderr, "ERROR to parse time!\n");
errorPrint("%s() LN%d, failed to parse time!\n", __func__, __LINE__);
exit(-1);
}
}
......@@ -4857,7 +4897,7 @@ static void startMultiThreadInsertData(int threads, char* db_name,
if ((superTblInfo) && (0 == strncasecmp(superTblInfo->dataSource,
"sample", strlen("sample")))) {
if (0 != prepareSampleDataForSTable(superTblInfo)) {
fprintf(stderr, "prepare sample data for stable failed!\n");
errorPrint("%s() LN%d, prepare sample data for stable failed!\n", __func__, __LINE__);
exit(-1);
}
}
......@@ -4869,15 +4909,15 @@ static void startMultiThreadInsertData(int threads, char* db_name,
g_Dbs.host, g_Dbs.user,
g_Dbs.password, db_name, g_Dbs.port);
if (NULL == taos) {
fprintf(stderr, "connect to server fail , reason: %s\n",
taos_errstr(NULL));
errorPrint("%s() LN%d, connect to server fail , reason: %s\n",
__func__, __LINE__, taos_errstr(NULL));
exit(-1);
}
superTblInfo->childTblName = (char*)calloc(1,
superTblInfo->childTblLimit * TSDB_TABLE_NAME_LEN);
if (superTblInfo->childTblName == NULL) {
fprintf(stderr, "alloc memory failed!");
errorPrint("%s() LN%d, alloc memory failed!\n", __func__, __LINE__);
taos_close(taos);
exit(-1);
}
......@@ -4896,7 +4936,7 @@ static void startMultiThreadInsertData(int threads, char* db_name,
if ((superTblInfo) && (0 == strncasecmp(superTblInfo->dataSource,
"sample", strlen("sample")))) {
if (0 != prepareSampleDataForSTable(superTblInfo)) {
fprintf(stderr, "prepare sample data for stable failed!\n");
errorPrint("%s() LN%d, prepare sample data for stable failed!\n", __func__, __LINE__);
exit(-1);
}
}
......@@ -4906,8 +4946,8 @@ static void startMultiThreadInsertData(int threads, char* db_name,
g_Dbs.host, g_Dbs.user,
g_Dbs.password, db_name, g_Dbs.port);
if (NULL == taos) {
fprintf(stderr, "connect to server fail , reason: %s\n",
taos_errstr(NULL));
errorPrint("%s() LN%d, connect to server fail , reason: %s\n",
__func__, __LINE__, taos_errstr(NULL));
exit(-1);
}
......@@ -4926,7 +4966,7 @@ static void startMultiThreadInsertData(int threads, char* db_name,
superTblInfo->childTblName = (char*)calloc(1,
limit * TSDB_TABLE_NAME_LEN);
if (superTblInfo->childTblName == NULL) {
fprintf(stderr, "alloc memory failed!");
errorPrint("%s() LN%d, alloc memory failed!\n", __func__, __LINE__);
taos_close(taos);
exit(-1);
}
......@@ -4957,7 +4997,7 @@ static void startMultiThreadInsertData(int threads, char* db_name,
g_Dbs.host, g_Dbs.user,
g_Dbs.password, db_name, g_Dbs.port);
if (NULL == t_info->taos) {
fprintf(stderr, "connect to server fail from insert sub thread, reason: %s\n",
errorPrint( "connect to server fail from insert sub thread, reason: %s\n",
taos_errstr(NULL));
exit(-1);
}
......@@ -5001,6 +5041,10 @@ static void startMultiThreadInsertData(int threads, char* db_name,
tsem_destroy(&(t_info->lock_sem));
taos_close(t_info->taos);
debugPrint("%s() LN%d, [%d] totalInsert=%"PRId64" totalAffected=%"PRId64"\n",
__func__, __LINE__,
t_info->threadID, t_info->totalInsertRows,
t_info->totalAffectedRows);
if (superTblInfo) {
superTblInfo->totalAffectedRows += t_info->totalAffectedRows;
superTblInfo->totalInsertRows += t_info->totalInsertRows;
......@@ -5068,7 +5112,7 @@ void *readTable(void *sarg) {
char *tb_prefix = rinfo->tb_prefix;
FILE *fp = fopen(rinfo->fp, "a");
if (NULL == fp) {
fprintf(stderr, "fopen %s fail, reason:%s.\n", rinfo->fp, strerror(errno));
errorPrint( "fopen %s fail, reason:%s.\n", rinfo->fp, strerror(errno));
return NULL;
}
......@@ -5102,7 +5146,7 @@ void *readTable(void *sarg) {
int32_t code = taos_errno(pSql);
if (code != 0) {
fprintf(stderr, "Failed to query:%s\n", taos_errstr(pSql));
errorPrint( "Failed to query:%s\n", taos_errstr(pSql));
taos_free_result(pSql);
taos_close(taos);
fclose(fp);
......@@ -5178,7 +5222,7 @@ void *readMetric(void *sarg) {
int32_t code = taos_errno(pSql);
if (code != 0) {
fprintf(stderr, "Failed to query:%s\n", taos_errstr(pSql));
errorPrint( "Failed to query:%s\n", taos_errstr(pSql));
taos_free_result(pSql);
taos_close(taos);
fclose(fp);
......@@ -5216,7 +5260,7 @@ static int insertTestProcess() {
debugPrint("%d result file: %s\n", __LINE__, g_Dbs.resultFile);
g_fpOfInsertResult = fopen(g_Dbs.resultFile, "a");
if (NULL == g_fpOfInsertResult) {
fprintf(stderr, "Failed to open %s for save result\n", g_Dbs.resultFile);
errorPrint( "Failed to open %s for save result\n", g_Dbs.resultFile);
return -1;
}
......@@ -5407,7 +5451,7 @@ static int queryTestProcess() {
NULL,
g_queryInfo.port);
if (taos == NULL) {
fprintf(stderr, "Failed to connect to TDengine, reason:%s\n", taos_errstr(NULL));
errorPrint( "Failed to connect to TDengine, reason:%s\n", taos_errstr(NULL));
exit(-1);
}
......@@ -5707,7 +5751,7 @@ static int subscribeTestProcess() {
g_queryInfo.dbName,
g_queryInfo.port);
if (taos == NULL) {
fprintf(stderr, "Failed to connect to TDengine, reason:%s\n", taos_errstr(NULL));
errorPrint( "Failed to connect to TDengine, reason:%s\n", taos_errstr(NULL));
exit(-1);
}
......@@ -6065,7 +6109,7 @@ static void queryResult() {
g_Dbs.db[0].dbName,
g_Dbs.port);
if (rInfo->taos == NULL) {
fprintf(stderr, "Failed to connect to TDengine, reason:%s\n", taos_errstr(NULL));
errorPrint( "Failed to connect to TDengine, reason:%s\n", taos_errstr(NULL));
free(rInfo);
exit(-1);
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册