“6d093c34f12b88e40cf33dd055f9b7c63e15a26e”上不存在“docs-examples/c/error_handle_example.c”
提交 34972fbf 编写于 作者: S Shengliang Guan

Merge remote-tracking branch 'origin/feature/sangshuduo/TD-4068-taosdemo-stmt' into feature/m2d

...@@ -98,7 +98,7 @@ TEST(testCase, parse_time) { ...@@ -98,7 +98,7 @@ TEST(testCase, parse_time) {
taosParseTime(t41, &time, strlen(t41), TSDB_TIME_PRECISION_MILLI, 0); taosParseTime(t41, &time, strlen(t41), TSDB_TIME_PRECISION_MILLI, 0);
EXPECT_EQ(time, 852048000999); EXPECT_EQ(time, 852048000999);
int64_t k = timezone; // int64_t k = timezone;
char t42[] = "1997-1-1T0:0:0.999999999Z"; char t42[] = "1997-1-1T0:0:0.999999999Z";
taosParseTime(t42, &time, strlen(t42), TSDB_TIME_PRECISION_MILLI, 0); taosParseTime(t42, &time, strlen(t42), TSDB_TIME_PRECISION_MILLI, 0);
EXPECT_EQ(time, 852048000999 - timezone * MILLISECOND_PER_SECOND); EXPECT_EQ(time, 852048000999 - timezone * MILLISECOND_PER_SECOND);
...@@ -163,7 +163,7 @@ TEST(testCase, parse_time) { ...@@ -163,7 +163,7 @@ TEST(testCase, parse_time) {
taosParseTime(t13, &time, strlen(t13), TSDB_TIME_PRECISION_MILLI, 0); taosParseTime(t13, &time, strlen(t13), TSDB_TIME_PRECISION_MILLI, 0);
EXPECT_EQ(time, -28800 * MILLISECOND_PER_SECOND); EXPECT_EQ(time, -28800 * MILLISECOND_PER_SECOND);
char* t = "2021-01-08T02:11:40.000+00:00"; char t[] = "2021-01-08T02:11:40.000+00:00";
taosParseTime(t, &time, strlen(t), TSDB_TIME_PRECISION_MILLI, 0); taosParseTime(t, &time, strlen(t), TSDB_TIME_PRECISION_MILLI, 0);
printf("%ld\n", time); printf("%ld\n", time);
} }
......
...@@ -120,11 +120,18 @@ enum MODE { ...@@ -120,11 +120,18 @@ enum MODE {
MODE_BUT MODE_BUT
}; };
typedef enum enum_INSERT_MODE { enum enum_TAOS_INTERFACE {
TAOSC_IFACE,
REST_IFACE,
STMT_IFACE,
INTERFACE_BUT
};
typedef enum enum_PROGRESSIVE_OR_INTERLACE {
PROGRESSIVE_INSERT_MODE, PROGRESSIVE_INSERT_MODE,
INTERLACE_INSERT_MODE, INTERLACE_INSERT_MODE,
INVALID_INSERT_MODE INVALID_INSERT_MODE
} INSERT_MODE; } PROG_OR_INTERLACE_MODE;
typedef enum enumQUERY_TYPE { typedef enum enumQUERY_TYPE {
NO_INSERT_TYPE, NO_INSERT_TYPE,
...@@ -188,6 +195,7 @@ typedef struct SArguments_S { ...@@ -188,6 +195,7 @@ typedef struct SArguments_S {
uint32_t test_mode; uint32_t test_mode;
char * host; char * host;
uint16_t port; uint16_t port;
uint16_t iface;
char * user; char * user;
char * password; char * password;
char * database; char * database;
...@@ -238,7 +246,7 @@ typedef struct SSuperTable_S { ...@@ -238,7 +246,7 @@ typedef struct SSuperTable_S {
uint8_t autoCreateTable; // 0: create sub table, 1: auto create sub table uint8_t autoCreateTable; // 0: create sub table, 1: auto create sub table
char childTblPrefix[MAX_TB_NAME_SIZE]; char childTblPrefix[MAX_TB_NAME_SIZE];
char dataSource[MAX_TB_NAME_SIZE+1]; // rand_gen or sample char dataSource[MAX_TB_NAME_SIZE+1]; // rand_gen or sample
char insertMode[MAX_TB_NAME_SIZE]; // taosc, rest uint16_t insertMode; // 0: taosc, 1: rest, 2: stmt
int64_t childTblLimit; int64_t childTblLimit;
uint64_t childTblOffset; uint64_t childTblOffset;
...@@ -405,6 +413,7 @@ typedef struct SQueryMetaInfo_S { ...@@ -405,6 +413,7 @@ typedef struct SQueryMetaInfo_S {
typedef struct SThreadInfo_S { typedef struct SThreadInfo_S {
TAOS * taos; TAOS * taos;
TAOS_STMT *stmt;
int threadID; int threadID;
char db_name[MAX_DB_NAME_SIZE+1]; char db_name[MAX_DB_NAME_SIZE+1];
uint32_t time_precision; uint32_t time_precision;
...@@ -418,6 +427,7 @@ typedef struct SThreadInfo_S { ...@@ -418,6 +427,7 @@ typedef struct SThreadInfo_S {
char* cols; char* cols;
bool use_metric; bool use_metric;
SSuperTable* superTblInfo; SSuperTable* superTblInfo;
char *buffer; // sql cmd buffer
// for async insert // for async insert
tsem_t lock_sem; tsem_t lock_sem;
...@@ -536,6 +546,7 @@ SArguments g_args = { ...@@ -536,6 +546,7 @@ SArguments g_args = {
0, // test_mode 0, // test_mode
"127.0.0.1", // host "127.0.0.1", // host
6030, // port 6030, // port
TAOSC_IFACE, // iface
"root", // user "root", // user
#ifdef _TD_POWER_ #ifdef _TD_POWER_
"powerdb", // password "powerdb", // password
...@@ -652,6 +663,8 @@ static void printHelp() { ...@@ -652,6 +663,8 @@ static void printHelp() {
"The host to connect to TDengine. Default is localhost."); "The host to connect to TDengine. Default is localhost.");
printf("%s%s%s%s\n", indent, "-p", indent, printf("%s%s%s%s\n", indent, "-p", indent,
"The TCP/IP port number to use for the connection. Default is 0."); "The TCP/IP port number to use for the connection. Default is 0.");
printf("%s%s%s%s\n", indent, "-I", indent,
"The interface (taosc, rest, and stmt) taosdemo uses. Default is 'taosc'.");
printf("%s%s%s%s\n", indent, "-d", indent, printf("%s%s%s%s\n", indent, "-d", indent,
"Destination database. Default is 'test'."); "Destination database. Default is 'test'.");
printf("%s%s%s%s\n", indent, "-a", indent, printf("%s%s%s%s\n", indent, "-a", indent,
...@@ -740,6 +753,23 @@ static void parse_args(int argc, char *argv[], SArguments *arguments) { ...@@ -740,6 +753,23 @@ static void parse_args(int argc, char *argv[], SArguments *arguments) {
exit(EXIT_FAILURE); exit(EXIT_FAILURE);
} }
arguments->port = atoi(argv[++i]); arguments->port = atoi(argv[++i]);
} else if (strcmp(argv[i], "-I") == 0) {
if (argc == i+1) {
printHelp();
errorPrint("%s", "\n\t-I need a valid string following!\n");
exit(EXIT_FAILURE);
}
++i;
if (0 == strcasecmp(argv[i], "taosc")) {
arguments->iface = TAOSC_IFACE;
} else if (0 == strcasecmp(argv[i], "rest")) {
arguments->iface = REST_IFACE;
} else if (0 == strcasecmp(argv[i], "stmt")) {
arguments->iface = STMT_IFACE;
} else {
errorPrint("%s", "\n\t-I need a valid string following!\n");
exit(EXIT_FAILURE);
}
} else if (strcmp(argv[i], "-u") == 0) { } else if (strcmp(argv[i], "-u") == 0) {
if (argc == i+1) { if (argc == i+1) {
printHelp(); printHelp();
...@@ -997,7 +1027,8 @@ static void parse_args(int argc, char *argv[], SArguments *arguments) { ...@@ -997,7 +1027,8 @@ static void parse_args(int argc, char *argv[], SArguments *arguments) {
arguments->port ); arguments->port );
printf("# User: %s\n", arguments->user); printf("# User: %s\n", arguments->user);
printf("# Password: %s\n", arguments->password); printf("# Password: %s\n", arguments->password);
printf("# Use metric: %s\n", arguments->use_metric ? "true" : "false"); printf("# Use metric: %s\n",
arguments->use_metric ? "true" : "false");
if (*(arguments->datatype)) { if (*(arguments->datatype)) {
printf("# Specified data type: "); printf("# Specified data type: ");
for (int i = 0; i < MAX_NUM_DATATYPE; i++) for (int i = 0; i < MAX_NUM_DATATYPE; i++)
...@@ -1059,7 +1090,7 @@ static int queryDbExec(TAOS *taos, char *command, QUERY_TYPE type, bool quiet) { ...@@ -1059,7 +1090,7 @@ static int queryDbExec(TAOS *taos, char *command, QUERY_TYPE type, bool quiet) {
TAOS_RES *res = NULL; TAOS_RES *res = NULL;
int32_t code = -1; int32_t code = -1;
for (i = 0; i < 5; i++) { for (i = 0; i < 5 /* retry */; i++) {
if (NULL != res) { if (NULL != res) {
taos_free_result(res); taos_free_result(res);
res = NULL; res = NULL;
...@@ -1105,7 +1136,6 @@ static void appendResultBufToFile(char *resultBuf, char *resultFile) ...@@ -1105,7 +1136,6 @@ static void appendResultBufToFile(char *resultBuf, char *resultFile)
} }
} }
fprintf(fp, "%s", resultBuf); fprintf(fp, "%s", resultBuf);
tmfclose(fp); tmfclose(fp);
} }
...@@ -1146,8 +1176,8 @@ static void appendResultToFile(TAOS_RES *res, char* resultFile) { ...@@ -1146,8 +1176,8 @@ static void appendResultToFile(TAOS_RES *res, char* resultFile) {
free(databuf); free(databuf);
} }
static void selectAndGetResult(threadInfo *pThreadInfo, char *command, char* resultFile) static void selectAndGetResult(
{ threadInfo *pThreadInfo, char *command, char* resultFile) {
if (0 == strncasecmp(g_queryInfo.queryMode, "taosc", strlen("taosc"))) { if (0 == strncasecmp(g_queryInfo.queryMode, "taosc", strlen("taosc"))) {
TAOS_RES *res = taos_query(pThreadInfo->taos, command); TAOS_RES *res = taos_query(pThreadInfo->taos, command);
if (res == NULL || taos_errno(res) != 0) { if (res == NULL || taos_errno(res) != 0) {
...@@ -1291,6 +1321,8 @@ static void init_rand_data() { ...@@ -1291,6 +1321,8 @@ static void init_rand_data() {
static int printfInsertMeta() { static int printfInsertMeta() {
SHOW_PARSE_RESULT_START(); SHOW_PARSE_RESULT_START();
printf("interface: \033[33m%s\033[0m\n",
(g_args.iface==TAOSC_IFACE)?"taosc":(g_args.iface==REST_IFACE)?"rest":"stmt");
printf("host: \033[33m%s:%u\033[0m\n", g_Dbs.host, g_Dbs.port); printf("host: \033[33m%s:%u\033[0m\n", g_Dbs.host, g_Dbs.port);
printf("user: \033[33m%s\033[0m\n", g_Dbs.user); printf("user: \033[33m%s\033[0m\n", g_Dbs.user);
printf("password: \033[33m%s\033[0m\n", g_Dbs.password); printf("password: \033[33m%s\033[0m\n", g_Dbs.password);
...@@ -1395,7 +1427,8 @@ static int printfInsertMeta() { ...@@ -1395,7 +1427,8 @@ static int printfInsertMeta() {
printf(" dataSource: \033[33m%s\033[0m\n", printf(" dataSource: \033[33m%s\033[0m\n",
g_Dbs.db[i].superTbls[j].dataSource); g_Dbs.db[i].superTbls[j].dataSource);
printf(" insertMode: \033[33m%s\033[0m\n", printf(" insertMode: \033[33m%s\033[0m\n",
g_Dbs.db[i].superTbls[j].insertMode); (g_Dbs.db[i].superTbls[j].insertMode==TAOSC_IFACE)?"taosc":
(g_Dbs.db[i].superTbls[j].insertMode==REST_IFACE)?"rest":"stmt");
if (g_Dbs.db[i].superTbls[j].childTblLimit > 0) { if (g_Dbs.db[i].superTbls[j].childTblLimit > 0) {
printf(" childTblLimit: \033[33m%"PRId64"\033[0m\n", printf(" childTblLimit: \033[33m%"PRId64"\033[0m\n",
g_Dbs.db[i].superTbls[j].childTblLimit); g_Dbs.db[i].superTbls[j].childTblLimit);
...@@ -1550,8 +1583,8 @@ static void printfInsertMetaToFile(FILE* fp) { ...@@ -1550,8 +1583,8 @@ static void printfInsertMetaToFile(FILE* fp) {
} }
fprintf(fp, " super table count: %"PRIu64"\n", g_Dbs.db[i].superTblCount); fprintf(fp, " super table count: %"PRIu64"\n", g_Dbs.db[i].superTblCount);
for (int j = 0; j < g_Dbs.db[i].superTblCount; j++) { for (uint64_t j = 0; j < g_Dbs.db[i].superTblCount; j++) {
fprintf(fp, " super table[%d]:\n", j); fprintf(fp, " super table[%"PRIu64"]:\n", j);
fprintf(fp, " stbName: %s\n", g_Dbs.db[i].superTbls[j].sTblName); fprintf(fp, " stbName: %s\n", g_Dbs.db[i].superTbls[j].sTblName);
...@@ -1578,7 +1611,8 @@ static void printfInsertMetaToFile(FILE* fp) { ...@@ -1578,7 +1611,8 @@ static void printfInsertMetaToFile(FILE* fp) {
fprintf(fp, " dataSource: %s\n", fprintf(fp, " dataSource: %s\n",
g_Dbs.db[i].superTbls[j].dataSource); g_Dbs.db[i].superTbls[j].dataSource);
fprintf(fp, " insertMode: %s\n", fprintf(fp, " insertMode: %s\n",
g_Dbs.db[i].superTbls[j].insertMode); (g_Dbs.db[i].superTbls[j].insertMode==TAOSC_IFACE)?"taosc":
(g_Dbs.db[i].superTbls[j].insertMode==REST_IFACE)?"rest":"stmt");
fprintf(fp, " insertRows: %"PRId64"\n", fprintf(fp, " insertRows: %"PRId64"\n",
g_Dbs.db[i].superTbls[j].insertRows); g_Dbs.db[i].superTbls[j].insertRows);
fprintf(fp, " interlace rows: %"PRIu64"\n", fprintf(fp, " interlace rows: %"PRIu64"\n",
...@@ -2749,7 +2783,7 @@ static int createDatabasesAndStables() { ...@@ -2749,7 +2783,7 @@ static int createDatabasesAndStables() {
int validStbCount = 0; int validStbCount = 0;
for (int j = 0; j < g_Dbs.db[i].superTblCount; j++) { for (uint64_t j = 0; j < g_Dbs.db[i].superTblCount; j++) {
sprintf(command, "describe %s.%s;", g_Dbs.db[i].dbName, sprintf(command, "describe %s.%s;", g_Dbs.db[i].dbName,
g_Dbs.db[i].superTbls[j].sTblName); g_Dbs.db[i].superTbls[j].sTblName);
verbosePrint("%s() %d command: %s\n", __func__, __LINE__, command); verbosePrint("%s() %d command: %s\n", __func__, __LINE__, command);
...@@ -2761,7 +2795,7 @@ static int createDatabasesAndStables() { ...@@ -2761,7 +2795,7 @@ static int createDatabasesAndStables() {
&g_Dbs.db[i].superTbls[j]); &g_Dbs.db[i].superTbls[j]);
if (0 != ret) { if (0 != ret) {
errorPrint("create super table %d failed!\n\n", j); errorPrint("create super table %"PRIu64" failed!\n\n", j);
continue; continue;
} }
} }
...@@ -2789,7 +2823,7 @@ static void* createTable(void *sarg) ...@@ -2789,7 +2823,7 @@ static void* createTable(void *sarg)
threadInfo *pThreadInfo = (threadInfo *)sarg; threadInfo *pThreadInfo = (threadInfo *)sarg;
SSuperTable* superTblInfo = pThreadInfo->superTblInfo; SSuperTable* superTblInfo = pThreadInfo->superTblInfo;
int64_t lastPrintTime = taosGetTimestampMs(); uint64_t lastPrintTime = taosGetTimestampMs();
int buff_len; int buff_len;
buff_len = BUFFER_SIZE / 8; buff_len = BUFFER_SIZE / 8;
...@@ -2864,7 +2898,7 @@ static void* createTable(void *sarg) ...@@ -2864,7 +2898,7 @@ static void* createTable(void *sarg)
return NULL; return NULL;
} }
int64_t currentPrintTime = taosGetTimestampMs(); uint64_t currentPrintTime = taosGetTimestampMs();
if (currentPrintTime - lastPrintTime > 30*1000) { if (currentPrintTime - lastPrintTime > 30*1000) {
printf("thread[%d] already create %"PRIu64" - %"PRIu64" tables\n", printf("thread[%d] already create %"PRIu64" - %"PRIu64" tables\n",
pThreadInfo->threadID, pThreadInfo->start_table_from, i); pThreadInfo->threadID, pThreadInfo->start_table_from, i);
...@@ -2888,7 +2922,7 @@ static int startMultiThreadCreateChildTable( ...@@ -2888,7 +2922,7 @@ static int startMultiThreadCreateChildTable(
char* db_name, SSuperTable* superTblInfo) { char* db_name, SSuperTable* superTblInfo) {
pthread_t *pids = malloc(threads * sizeof(pthread_t)); pthread_t *pids = malloc(threads * sizeof(pthread_t));
threadInfo *infos = malloc(threads * sizeof(threadInfo)); threadInfo *infos = calloc(1, threads * sizeof(threadInfo));
if ((NULL == pids) || (NULL == infos)) { if ((NULL == pids) || (NULL == infos)) {
printf("malloc failed\n"); printf("malloc failed\n");
...@@ -2908,7 +2942,7 @@ static int startMultiThreadCreateChildTable( ...@@ -2908,7 +2942,7 @@ static int startMultiThreadCreateChildTable(
int64_t b = 0; int64_t b = 0;
b = ntables % threads; b = ntables % threads;
for (int64_t i = 0; i < threads; i++) { for (int i = 0; i < threads; i++) {
threadInfo *t_info = infos + i; threadInfo *t_info = infos + i;
t_info->threadID = i; t_info->threadID = i;
tstrncpy(t_info->db_name, db_name, MAX_DB_NAME_SIZE); tstrncpy(t_info->db_name, db_name, MAX_DB_NAME_SIZE);
...@@ -2961,7 +2995,7 @@ static void createChildTables() { ...@@ -2961,7 +2995,7 @@ static void createChildTables() {
if (g_Dbs.use_metric) { if (g_Dbs.use_metric) {
if (g_Dbs.db[i].superTblCount > 0) { if (g_Dbs.db[i].superTblCount > 0) {
// with super table // with super table
for (int j = 0; j < g_Dbs.db[i].superTblCount; j++) { for (uint64_t j = 0; j < g_Dbs.db[i].superTblCount; j++) {
if ((AUTO_CREATE_SUBTBL == g_Dbs.db[i].superTbls[j].autoCreateTable) if ((AUTO_CREATE_SUBTBL == g_Dbs.db[i].superTbls[j].autoCreateTable)
|| (TBL_ALREADY_EXISTS == g_Dbs.db[i].superTbls[j].childTblExists)) { || (TBL_ALREADY_EXISTS == g_Dbs.db[i].superTbls[j].childTblExists)) {
continue; continue;
...@@ -3082,10 +3116,12 @@ static int readTagFromCsvFileToMem(SSuperTable * superTblInfo) { ...@@ -3082,10 +3116,12 @@ static int readTagFromCsvFileToMem(SSuperTable * superTblInfo) {
return 0; return 0;
} }
#if 0
int readSampleFromJsonFileToMem(SSuperTable * superTblInfo) { int readSampleFromJsonFileToMem(SSuperTable * superTblInfo) {
// TODO // TODO
return 0; return 0;
} }
#endif
/* /*
Read 10000 lines at most. If more than 10000 lines, continue to read after using Read 10000 lines at most. If more than 10000 lines, continue to read after using
...@@ -3785,15 +3821,24 @@ static bool getMetaFromInsertJsonFile(cJSON* root) { ...@@ -3785,15 +3821,24 @@ static bool getMetaFromInsertJsonFile(cJSON* root) {
goto PARSE_OVER; goto PARSE_OVER;
} }
cJSON *insertMode = cJSON_GetObjectItem(stbInfo, "insert_mode"); // taosc , rest cJSON *insertMode = cJSON_GetObjectItem(stbInfo, "insert_mode"); // taosc , rest, stmt
if (insertMode && insertMode->type == cJSON_String if (insertMode && insertMode->type == cJSON_String
&& insertMode->valuestring != NULL) { && insertMode->valuestring != NULL) {
tstrncpy(g_Dbs.db[i].superTbls[j].insertMode, if (0 == strcasecmp(insertMode->valuestring, "taosc")) {
insertMode->valuestring, MAX_DB_NAME_SIZE); g_Dbs.db[i].superTbls[j].insertMode = TAOSC_IFACE;
} else if (0 == strcasecmp(insertMode->valuestring, "rest")) {
g_Dbs.db[i].superTbls[j].insertMode = REST_IFACE;
} else if (0 == strcasecmp(insertMode->valuestring, "stmt")) {
g_Dbs.db[i].superTbls[j].insertMode = STMT_IFACE;
} else {
errorPrint("%s() LN%d, failed to read json, insert_mode %s not recognized\n",
__func__, __LINE__, insertMode->valuestring);
goto PARSE_OVER;
}
} else if (!insertMode) { } else if (!insertMode) {
tstrncpy(g_Dbs.db[i].superTbls[j].insertMode, "taosc", MAX_DB_NAME_SIZE); g_Dbs.db[i].superTbls[j].insertMode = TAOSC_IFACE;
} else { } else {
printf("ERROR: failed to read json, insert_mode not found\n"); errorPrint("%s", "failed to read json, insert_mode not found\n");
goto PARSE_OVER; goto PARSE_OVER;
} }
...@@ -4510,7 +4555,7 @@ static void prepareSampleData() { ...@@ -4510,7 +4555,7 @@ static void prepareSampleData() {
static void postFreeResource() { static void postFreeResource() {
tmfclose(g_fpOfInsertResult); tmfclose(g_fpOfInsertResult);
for (int i = 0; i < g_Dbs.dbCount; i++) { for (int i = 0; i < g_Dbs.dbCount; i++) {
for (int j = 0; j < g_Dbs.db[i].superTblCount; j++) { for (uint64_t j = 0; j < g_Dbs.db[i].superTblCount; j++) {
if (0 != g_Dbs.db[i].superTbls[j].colsOfCreateChildTable) { if (0 != g_Dbs.db[i].superTbls[j].colsOfCreateChildTable) {
free(g_Dbs.db[i].superTbls[j].colsOfCreateChildTable); free(g_Dbs.db[i].superTbls[j].colsOfCreateChildTable);
g_Dbs.db[i].superTbls[j].colsOfCreateChildTable = NULL; g_Dbs.db[i].superTbls[j].colsOfCreateChildTable = NULL;
...@@ -4715,32 +4760,43 @@ static int prepareSampleDataForSTable(SSuperTable *superTblInfo) { ...@@ -4715,32 +4760,43 @@ static int prepareSampleDataForSTable(SSuperTable *superTblInfo) {
return 0; return 0;
} }
static int64_t execInsert(threadInfo *pThreadInfo, char *buffer, uint64_t k) static int64_t execInsert(threadInfo *pThreadInfo, uint64_t k)
{ {
int affectedRows; int affectedRows;
SSuperTable* superTblInfo = pThreadInfo->superTblInfo; SSuperTable* superTblInfo = pThreadInfo->superTblInfo;
verbosePrint("[%d] %s() LN%d %s\n", pThreadInfo->threadID, verbosePrint("[%d] %s() LN%d %s\n", pThreadInfo->threadID,
__func__, __LINE__, buffer); __func__, __LINE__, pThreadInfo->buffer);
if (superTblInfo) { if (superTblInfo) {
if (0 == strncasecmp(superTblInfo->insertMode, "taosc", strlen("taosc"))) { if (superTblInfo->insertMode == TAOSC_IFACE) {
affectedRows = queryDbExec(pThreadInfo->taos, buffer, INSERT_TYPE, false); affectedRows = queryDbExec(
} else if (0 == strncasecmp(superTblInfo->insertMode, "rest", strlen("rest"))) { pThreadInfo->taos,
pThreadInfo->buffer, INSERT_TYPE, false);
} else if (superTblInfo->insertMode == REST_IFACE) {
if (0 != postProceSql(g_Dbs.host, &g_Dbs.serv_addr, g_Dbs.port, if (0 != postProceSql(g_Dbs.host, &g_Dbs.serv_addr, g_Dbs.port,
buffer, NULL /* not set result file */)) { pThreadInfo->buffer, NULL /* not set result file */)) {
affectedRows = -1; affectedRows = -1;
printf("========restful return fail, threadID[%d]\n", printf("========restful return fail, threadID[%d]\n",
pThreadInfo->threadID); pThreadInfo->threadID);
} else { } else {
affectedRows = k; affectedRows = k;
} }
} else if (superTblInfo->insertMode == STMT_IFACE) {
debugPrint("%s() LN%d, stmt=%p", __func__, __LINE__, pThreadInfo->stmt);
if (0 != taos_stmt_execute(pThreadInfo->stmt)) {
errorPrint("%s() LN%d, failied to execute insert statement\n",
__func__, __LINE__);
exit(-1);
}
affectedRows = k;
} else { } else {
errorPrint("%s() LN%d: unknown insert mode: %s\n", errorPrint("%s() LN%d: unknown insert mode: %d\n",
__func__, __LINE__, superTblInfo->insertMode); __func__, __LINE__, superTblInfo->insertMode);
affectedRows = 0; affectedRows = 0;
} }
} else { } else {
affectedRows = queryDbExec(pThreadInfo->taos, buffer, INSERT_TYPE, false); affectedRows = queryDbExec(pThreadInfo->taos, pThreadInfo->buffer, INSERT_TYPE, false);
} }
return affectedRows; return affectedRows;
...@@ -5086,20 +5142,17 @@ static void* syncWriteInterlace(threadInfo *pThreadInfo) { ...@@ -5086,20 +5142,17 @@ static void* syncWriteInterlace(threadInfo *pThreadInfo) {
if (interlaceRows > g_args.num_of_RPR) if (interlaceRows > g_args.num_of_RPR)
interlaceRows = g_args.num_of_RPR; interlaceRows = g_args.num_of_RPR;
int insertMode; int progOrInterlace;
if (interlaceRows > 0) { if (interlaceRows > 0) {
insertMode = INTERLACE_INSERT_MODE; progOrInterlace= INTERLACE_INSERT_MODE;
} else { } else {
insertMode = PROGRESSIVE_INSERT_MODE; progOrInterlace = PROGRESSIVE_INSERT_MODE;
} }
// TODO: prompt tbl count multple interlace rows and batch
//
uint64_t maxSqlLen = superTblInfo?superTblInfo->maxSqlLen:g_args.max_sql_len; uint64_t maxSqlLen = superTblInfo?superTblInfo->maxSqlLen:g_args.max_sql_len;
char* buffer = calloc(maxSqlLen, 1); pThreadInfo->buffer = calloc(maxSqlLen, 1);
if (NULL == buffer) { if (NULL == pThreadInfo->buffer) {
errorPrint( "%s() LN%d, Failed to alloc %"PRIu64" Bytes, reason:%s\n", errorPrint( "%s() LN%d, Failed to alloc %"PRIu64" Bytes, reason:%s\n",
__func__, __LINE__, maxSqlLen, strerror(errno)); __func__, __LINE__, maxSqlLen, strerror(errno));
return NULL; return NULL;
...@@ -5152,10 +5205,10 @@ static void* syncWriteInterlace(threadInfo *pThreadInfo) { ...@@ -5152,10 +5205,10 @@ static void* syncWriteInterlace(threadInfo *pThreadInfo) {
flagSleep = false; flagSleep = false;
} }
// generate data // generate data
memset(buffer, 0, maxSqlLen); memset(pThreadInfo->buffer, 0, maxSqlLen);
uint64_t remainderBufLen = maxSqlLen; uint64_t remainderBufLen = maxSqlLen;
char *pstr = buffer; char *pstr = pThreadInfo->buffer;
int len = snprintf(pstr, nInsertBufLen + 1, "%s", strInsertInto); int len = snprintf(pstr, nInsertBufLen + 1, "%s", strInsertInto);
pstr += len; pstr += len;
...@@ -5168,7 +5221,7 @@ static void* syncWriteInterlace(threadInfo *pThreadInfo) { ...@@ -5168,7 +5221,7 @@ static void* syncWriteInterlace(threadInfo *pThreadInfo) {
if (0 == strlen(tableName)) { if (0 == strlen(tableName)) {
errorPrint("[%d] %s() LN%d, getTableName return null\n", errorPrint("[%d] %s() LN%d, getTableName return null\n",
pThreadInfo->threadID, __func__, __LINE__); pThreadInfo->threadID, __func__, __LINE__);
free(buffer); free(pThreadInfo->buffer);
return NULL; return NULL;
} }
...@@ -5200,7 +5253,7 @@ static void* syncWriteInterlace(threadInfo *pThreadInfo) { ...@@ -5200,7 +5253,7 @@ static void* syncWriteInterlace(threadInfo *pThreadInfo) {
pThreadInfo->threadID, __func__, __LINE__, pThreadInfo->threadID, __func__, __LINE__,
batchPerTbl, recOfBatch); batchPerTbl, recOfBatch);
if (insertMode == INTERLACE_INSERT_MODE) { if (progOrInterlace == INTERLACE_INSERT_MODE) {
if (tableSeq == pThreadInfo->start_table_from + pThreadInfo->ntables) { if (tableSeq == pThreadInfo->start_table_from + pThreadInfo->ntables) {
// turn to first table // turn to first table
tableSeq = pThreadInfo->start_table_from; tableSeq = pThreadInfo->start_table_from;
...@@ -5234,7 +5287,7 @@ static void* syncWriteInterlace(threadInfo *pThreadInfo) { ...@@ -5234,7 +5287,7 @@ static void* syncWriteInterlace(threadInfo *pThreadInfo) {
pThreadInfo->threadID, __func__, __LINE__, recOfBatch, pThreadInfo->threadID, __func__, __LINE__, recOfBatch,
pThreadInfo->totalInsertRows); pThreadInfo->totalInsertRows);
verbosePrint("[%d] %s() LN%d, buffer=%s\n", verbosePrint("[%d] %s() LN%d, buffer=%s\n",
pThreadInfo->threadID, __func__, __LINE__, buffer); pThreadInfo->threadID, __func__, __LINE__, pThreadInfo->buffer);
startTs = taosGetTimestampMs(); startTs = taosGetTimestampMs();
...@@ -5245,7 +5298,7 @@ static void* syncWriteInterlace(threadInfo *pThreadInfo) { ...@@ -5245,7 +5298,7 @@ static void* syncWriteInterlace(threadInfo *pThreadInfo) {
errorPrint("%s\n", "\tPlease check if the batch or the buffer length is proper value!\n"); errorPrint("%s\n", "\tPlease check if the batch or the buffer length is proper value!\n");
goto free_of_interlace; goto free_of_interlace;
} }
int64_t affectedRows = execInsert(pThreadInfo, buffer, recOfBatch); int64_t affectedRows = execInsert(pThreadInfo, recOfBatch);
endTs = taosGetTimestampMs(); endTs = taosGetTimestampMs();
uint64_t delay = endTs - startTs; uint64_t delay = endTs - startTs;
...@@ -5263,7 +5316,7 @@ static void* syncWriteInterlace(threadInfo *pThreadInfo) { ...@@ -5263,7 +5316,7 @@ static void* syncWriteInterlace(threadInfo *pThreadInfo) {
if (recOfBatch != affectedRows) { if (recOfBatch != affectedRows) {
errorPrint("[%d] %s() LN%d execInsert insert %"PRIu64", affected rows: %"PRId64"\n%s\n", errorPrint("[%d] %s() LN%d execInsert insert %"PRIu64", affected rows: %"PRId64"\n%s\n",
pThreadInfo->threadID, __func__, __LINE__, pThreadInfo->threadID, __func__, __LINE__,
recOfBatch, affectedRows, buffer); recOfBatch, affectedRows, pThreadInfo->buffer);
goto free_of_interlace; goto free_of_interlace;
} }
...@@ -5282,8 +5335,8 @@ static void* syncWriteInterlace(threadInfo *pThreadInfo) { ...@@ -5282,8 +5335,8 @@ static void* syncWriteInterlace(threadInfo *pThreadInfo) {
et = taosGetTimestampMs(); et = taosGetTimestampMs();
if (insert_interval > (et - st) ) { if (insert_interval > (et - st) ) {
int sleepTime = insert_interval - (et -st); uint64_t sleepTime = insert_interval - (et -st);
performancePrint("%s() LN%d sleep: %d ms for insert interval\n", performancePrint("%s() LN%d sleep: %"PRId64" ms for insert interval\n",
__func__, __LINE__, sleepTime); __func__, __LINE__, sleepTime);
taosMsleep(sleepTime); // ms taosMsleep(sleepTime); // ms
sleepTimeTotal += insert_interval; sleepTimeTotal += insert_interval;
...@@ -5292,7 +5345,7 @@ static void* syncWriteInterlace(threadInfo *pThreadInfo) { ...@@ -5292,7 +5345,7 @@ static void* syncWriteInterlace(threadInfo *pThreadInfo) {
} }
free_of_interlace: free_of_interlace:
tmfree(buffer); tmfree(pThreadInfo->buffer);
printStatPerThread(pThreadInfo); printStatPerThread(pThreadInfo);
return NULL; return NULL;
} }
...@@ -5311,8 +5364,8 @@ static void* syncWriteProgressive(threadInfo *pThreadInfo) { ...@@ -5311,8 +5364,8 @@ static void* syncWriteProgressive(threadInfo *pThreadInfo) {
SSuperTable* superTblInfo = pThreadInfo->superTblInfo; SSuperTable* superTblInfo = pThreadInfo->superTblInfo;
uint64_t maxSqlLen = superTblInfo?superTblInfo->maxSqlLen:g_args.max_sql_len; uint64_t maxSqlLen = superTblInfo?superTblInfo->maxSqlLen:g_args.max_sql_len;
char* buffer = calloc(maxSqlLen, 1); pThreadInfo->buffer = calloc(maxSqlLen, 1);
if (NULL == buffer) { if (NULL == pThreadInfo->buffer) {
errorPrint( "Failed to alloc %"PRIu64" Bytes, reason:%s\n", errorPrint( "Failed to alloc %"PRIu64" Bytes, reason:%s\n",
maxSqlLen, maxSqlLen,
strerror(errno)); strerror(errno));
...@@ -5358,7 +5411,7 @@ static void* syncWriteProgressive(threadInfo *pThreadInfo) { ...@@ -5358,7 +5411,7 @@ static void* syncWriteProgressive(threadInfo *pThreadInfo) {
pThreadInfo->threadID, tableSeq, tableName); pThreadInfo->threadID, tableSeq, tableName);
int64_t remainderBufLen = maxSqlLen; int64_t remainderBufLen = maxSqlLen;
char *pstr = buffer; char *pstr = pThreadInfo->buffer;
int nInsertBufLen = strlen("insert into "); int nInsertBufLen = strlen("insert into ");
int len = snprintf(pstr, nInsertBufLen + 1, "%s", "insert into "); int len = snprintf(pstr, nInsertBufLen + 1, "%s", "insert into ");
...@@ -5381,7 +5434,7 @@ static void* syncWriteProgressive(threadInfo *pThreadInfo) { ...@@ -5381,7 +5434,7 @@ static void* syncWriteProgressive(threadInfo *pThreadInfo) {
startTs = taosGetTimestampMs(); startTs = taosGetTimestampMs();
int64_t affectedRows = execInsert(pThreadInfo, buffer, generated); int64_t affectedRows = execInsert(pThreadInfo, generated);
endTs = taosGetTimestampMs(); endTs = taosGetTimestampMs();
uint64_t delay = endTs - startTs; uint64_t delay = endTs - startTs;
...@@ -5440,7 +5493,7 @@ static void* syncWriteProgressive(threadInfo *pThreadInfo) { ...@@ -5440,7 +5493,7 @@ static void* syncWriteProgressive(threadInfo *pThreadInfo) {
} // tableSeq } // tableSeq
free_of_progressive: free_of_progressive:
tmfree(buffer); tmfree(pThreadInfo->buffer);
printStatPerThread(pThreadInfo); printStatPerThread(pThreadInfo);
return NULL; return NULL;
} }
...@@ -5579,15 +5632,6 @@ static int convertHostToServAddr(char *host, uint16_t port, struct sockaddr_in * ...@@ -5579,15 +5632,6 @@ static int convertHostToServAddr(char *host, uint16_t port, struct sockaddr_in *
static void startMultiThreadInsertData(int threads, char* db_name, static void startMultiThreadInsertData(int threads, char* db_name,
char* precision,SSuperTable* superTblInfo) { char* precision,SSuperTable* superTblInfo) {
pthread_t *pids = malloc(threads * sizeof(pthread_t));
assert(pids != NULL);
threadInfo *infos = malloc(threads * sizeof(threadInfo));
assert(infos != NULL);
memset(pids, 0, threads * sizeof(pthread_t));
memset(infos, 0, threads * sizeof(threadInfo));
//TAOS* taos; //TAOS* taos;
//if (0 == strncasecmp(superTblInfo->insertMode, "taosc", 5)) { //if (0 == strncasecmp(superTblInfo->insertMode, "taosc", 5)) {
// taos = taos_connect(g_Dbs.host, g_Dbs.user, g_Dbs.password, db_name, g_Dbs.port); // taos = taos_connect(g_Dbs.host, g_Dbs.user, g_Dbs.password, db_name, g_Dbs.port);
...@@ -5648,10 +5692,10 @@ static void startMultiThreadInsertData(int threads, char* db_name, ...@@ -5648,10 +5692,10 @@ static void startMultiThreadInsertData(int threads, char* db_name,
} }
} }
TAOS* taos = taos_connect( TAOS* taos0 = taos_connect(
g_Dbs.host, g_Dbs.user, g_Dbs.host, g_Dbs.user,
g_Dbs.password, db_name, g_Dbs.port); g_Dbs.password, db_name, g_Dbs.port);
if (NULL == taos) { if (NULL == taos0) {
errorPrint("%s() LN%d, connect to server fail , reason: %s\n", errorPrint("%s() LN%d, connect to server fail , reason: %s\n",
__func__, __LINE__, taos_errstr(NULL)); __func__, __LINE__, taos_errstr(NULL));
exit(-1); exit(-1);
...@@ -5710,13 +5754,13 @@ static void startMultiThreadInsertData(int threads, char* db_name, ...@@ -5710,13 +5754,13 @@ static void startMultiThreadInsertData(int threads, char* db_name,
limit * TSDB_TABLE_NAME_LEN); limit * TSDB_TABLE_NAME_LEN);
if (superTblInfo->childTblName == NULL) { if (superTblInfo->childTblName == NULL) {
errorPrint("%s() LN%d, alloc memory failed!\n", __func__, __LINE__); errorPrint("%s() LN%d, alloc memory failed!\n", __func__, __LINE__);
taos_close(taos); taos_close(taos0);
exit(-1); exit(-1);
} }
int64_t childTblCount; int64_t childTblCount;
getChildNameOfSuperTableWithLimitAndOffset( getChildNameOfSuperTableWithLimitAndOffset(
taos, taos0,
db_name, superTblInfo->sTblName, db_name, superTblInfo->sTblName,
&superTblInfo->childTblName, &childTblCount, &superTblInfo->childTblName, &childTblCount,
limit, limit,
...@@ -5726,7 +5770,7 @@ static void startMultiThreadInsertData(int threads, char* db_name, ...@@ -5726,7 +5770,7 @@ static void startMultiThreadInsertData(int threads, char* db_name,
startFrom = 0; startFrom = 0;
} }
taos_close(taos); taos_close(taos0);
int64_t a = ntables / threads; int64_t a = ntables / threads;
if (a < 1) { if (a < 1) {
...@@ -5740,11 +5784,21 @@ static void startMultiThreadInsertData(int threads, char* db_name, ...@@ -5740,11 +5784,21 @@ static void startMultiThreadInsertData(int threads, char* db_name,
} }
if ((superTblInfo) if ((superTblInfo)
&& (0 == strncasecmp(superTblInfo->insertMode, "rest", strlen("rest")))) { && (superTblInfo->insertMode == REST_IFACE)) {
if (convertHostToServAddr(g_Dbs.host, g_Dbs.port, &(g_Dbs.serv_addr)) != 0) if (convertHostToServAddr(g_Dbs.host, g_Dbs.port, &(g_Dbs.serv_addr)) != 0) {
exit(-1); exit(-1);
}
} }
pthread_t *pids = malloc(threads * sizeof(pthread_t));
assert(pids != NULL);
threadInfo *infos = calloc(1, threads * sizeof(threadInfo));
assert(infos != NULL);
memset(pids, 0, threads * sizeof(pthread_t));
memset(infos, 0, threads * sizeof(threadInfo));
for (int i = 0; i < threads; i++) { for (int i = 0; i < threads; i++) {
threadInfo *t_info = infos + i; threadInfo *t_info = infos + i;
t_info->threadID = i; t_info->threadID = i;
...@@ -5756,17 +5810,32 @@ static void startMultiThreadInsertData(int threads, char* db_name, ...@@ -5756,17 +5810,32 @@ static void startMultiThreadInsertData(int threads, char* db_name,
t_info->minDelay = UINT64_MAX; t_info->minDelay = UINT64_MAX;
if ((NULL == superTblInfo) || if ((NULL == superTblInfo) ||
(0 == strncasecmp(superTblInfo->insertMode, "taosc", 5))) { (superTblInfo->insertMode != REST_IFACE)) {
//t_info->taos = taos; //t_info->taos = taos;
t_info->taos = taos_connect( t_info->taos = taos_connect(
g_Dbs.host, g_Dbs.user, g_Dbs.host, g_Dbs.user,
g_Dbs.password, db_name, g_Dbs.port); g_Dbs.password, db_name, g_Dbs.port);
if (NULL == t_info->taos) { if (NULL == t_info->taos) {
errorPrint( errorPrint(
"connect to server fail from insert sub thread, reason: %s\n", "%s() LN%d, connect to server fail from insert sub thread, reason: %s\n",
__func__, __LINE__,
taos_errstr(NULL)); taos_errstr(NULL));
free(infos);
exit(-1); exit(-1);
} }
if ((superTblInfo) && (superTblInfo->insertMode == STMT_IFACE)) {
t_info->stmt = taos_stmt_init(t_info->taos);
if (NULL == t_info->stmt) {
errorPrint(
"%s() LN%d, failed init stmt, reason: %s\n",
__func__, __LINE__,
taos_errstr(NULL));
free(pids);
free(infos);
exit(-1);
}
}
} else { } else {
t_info->taos = NULL; t_info->taos = NULL;
} }
...@@ -5806,6 +5875,10 @@ static void startMultiThreadInsertData(int threads, char* db_name, ...@@ -5806,6 +5875,10 @@ static void startMultiThreadInsertData(int threads, char* db_name,
threadInfo *t_info = infos + i; threadInfo *t_info = infos + i;
tsem_destroy(&(t_info->lock_sem)); tsem_destroy(&(t_info->lock_sem));
if (t_info->stmt) {
taos_stmt_close(t_info->stmt);
}
taos_close(t_info->taos); taos_close(t_info->taos);
debugPrint("%s() LN%d, [%d] totalInsert=%"PRIu64" totalAffected=%"PRIu64"\n", debugPrint("%s() LN%d, [%d] totalInsert=%"PRIu64" totalAffected=%"PRIu64"\n",
...@@ -6083,7 +6156,7 @@ static int insertTestProcess() { ...@@ -6083,7 +6156,7 @@ static int insertTestProcess() {
for (int i = 0; i < g_Dbs.dbCount; i++) { for (int i = 0; i < g_Dbs.dbCount; i++) {
if (g_Dbs.use_metric) { if (g_Dbs.use_metric) {
if (g_Dbs.db[i].superTblCount > 0) { if (g_Dbs.db[i].superTblCount > 0) {
for (int j = 0; j < g_Dbs.db[i].superTblCount; j++) { for (uint64_t j = 0; j < g_Dbs.db[i].superTblCount; j++) {
SSuperTable* superTblInfo = &g_Dbs.db[i].superTbls[j]; SSuperTable* superTblInfo = &g_Dbs.db[i].superTbls[j];
...@@ -6878,7 +6951,7 @@ static void setParaFromArg(){ ...@@ -6878,7 +6951,7 @@ static void setParaFromArg(){
tstrncpy(g_Dbs.db[0].superTbls[0].childTblPrefix, tstrncpy(g_Dbs.db[0].superTbls[0].childTblPrefix,
g_args.tb_prefix, MAX_TB_NAME_SIZE); g_args.tb_prefix, MAX_TB_NAME_SIZE);
tstrncpy(g_Dbs.db[0].superTbls[0].dataSource, "rand", MAX_TB_NAME_SIZE); tstrncpy(g_Dbs.db[0].superTbls[0].dataSource, "rand", MAX_TB_NAME_SIZE);
tstrncpy(g_Dbs.db[0].superTbls[0].insertMode, "taosc", MAX_TB_NAME_SIZE); g_Dbs.db[0].superTbls[0].insertMode = g_args.iface;
tstrncpy(g_Dbs.db[0].superTbls[0].startTimestamp, tstrncpy(g_Dbs.db[0].superTbls[0].startTimestamp,
"2017-07-14 10:40:00.000", MAX_TB_NAME_SIZE); "2017-07-14 10:40:00.000", MAX_TB_NAME_SIZE);
g_Dbs.db[0].superTbls[0].timeStampStep = DEFAULT_TIMESTAMP_STEP; g_Dbs.db[0].superTbls[0].timeStampStep = DEFAULT_TIMESTAMP_STEP;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册