From 8517694940f73be2aa479076f7b543d0d20c2d68 Mon Sep 17 00:00:00 2001 From: Shuduo Sang Date: Thu, 13 May 2021 20:37:34 +0800 Subject: [PATCH] [TD-4068]: taosdemo support stmt interface. construct framework. --- src/kit/taosdemo/taosdemo.c | 147 +++++++++++++++++++++++------------- 1 file changed, 93 insertions(+), 54 deletions(-) diff --git a/src/kit/taosdemo/taosdemo.c b/src/kit/taosdemo/taosdemo.c index e5d8556467..108acfc78a 100644 --- a/src/kit/taosdemo/taosdemo.c +++ b/src/kit/taosdemo/taosdemo.c @@ -121,9 +121,9 @@ enum MODE { }; enum INTERFACE { - TAOSC_INTERFACE, - REST_INTERFACE, - STMT_INTERFACE, + TAOSC_IFACE, + REST_IFACE, + STMT_IFACE, INTERFACE_BUT }; @@ -131,7 +131,7 @@ typedef enum enum_INSERT_MODE { PROGRESSIVE_INSERT_MODE, INTERLACE_INSERT_MODE, INVALID_INSERT_MODE -} INSERT_MODE; +} PROG_OR_INTERLACE_MODE; typedef enum enumQUERY_TYPE { NO_INSERT_TYPE, @@ -246,9 +246,9 @@ typedef struct SSuperTable_S { uint8_t autoCreateTable; // 0: create sub table, 1: auto create sub table char childTblPrefix[MAX_TB_NAME_SIZE]; char dataSource[MAX_TB_NAME_SIZE+1]; // rand_gen or sample - char insertMode[MAX_TB_NAME_SIZE]; // taosc, rest + uint16_t insertMode; // 0: taosc, 1: rest, 2: stmt int64_t childTblLimit; - uint64_t childTblOffset; + uint64_t childTblOffset; // int multiThreadWriteOneTbl; // 0: no, 1: yes uint64_t interlaceRows; // @@ -266,7 +266,7 @@ typedef struct SSuperTable_S { uint32_t columnCount; StrColumn columns[MAX_COLUMN_COUNT]; - uint32_t tagCount; + uint32_t tagCount; StrColumn tags[MAX_TAG_COUNT]; char* childTblName; @@ -291,7 +291,7 @@ typedef struct SSuperTable_S { typedef struct { char name[TSDB_DB_NAME_LEN + 1]; char create_time[32]; - int32_t ntables; + uint64_t ntables; int32_t vgroups; int16_t replica; int16_t quorum; @@ -413,6 +413,7 @@ typedef struct SQueryMetaInfo_S { typedef struct SThreadInfo_S { TAOS * taos; + TAOS_STMT *stmt; int threadID; char db_name[MAX_DB_NAME_SIZE+1]; uint32_t time_precision; @@ -544,7 +545,7 @@ SArguments g_args = { 0, // test_mode "127.0.0.1", // host 6030, // port - TAOSC_INTERFACE, // interface + TAOSC_IFACE, // interface "root", // user #ifdef _TD_POWER_ "powerdb", // password @@ -759,11 +760,11 @@ static void parse_args(int argc, char *argv[], SArguments *arguments) { } ++i; if (0 == strcasecmp(argv[i], "taosc")) { - arguments->interface = TAOSC_INTERFACE; + arguments->interface = TAOSC_IFACE; } else if (0 == strcasecmp(argv[i], "rest")) { - arguments->interface = REST_INTERFACE; + arguments->interface = REST_IFACE; } else if (0 == strcasecmp(argv[i], "stmt")) { - arguments->interface = STMT_INTERFACE; + arguments->interface = STMT_IFACE; } else { errorPrint("%s", "\n\t-I need a valid string following!\n"); exit(EXIT_FAILURE); @@ -1025,7 +1026,8 @@ static void parse_args(int argc, char *argv[], SArguments *arguments) { arguments->port ); printf("# User: %s\n", arguments->user); printf("# Password: %s\n", arguments->password); - printf("# Use metric: %s\n", arguments->use_metric ? "true" : "false"); + printf("# Use metric: %s\n", + arguments->use_metric ? "true" : "false"); if (*(arguments->datatype)) { printf("# Specified data type: "); for (int i = 0; i < MAX_NUM_DATATYPE; i++) @@ -1319,6 +1321,8 @@ static void init_rand_data() { static int printfInsertMeta() { SHOW_PARSE_RESULT_START(); + printf("interface: \033[33m%s\033[0m\n", + (g_args.interface==TAOSC_IFACE)?"taosc":(g_args.interface==REST_IFACE)?"rest":"stmt"); printf("host: \033[33m%s:%u\033[0m\n", g_Dbs.host, g_Dbs.port); printf("user: \033[33m%s\033[0m\n", g_Dbs.user); printf("password: \033[33m%s\033[0m\n", g_Dbs.password); @@ -1423,7 +1427,8 @@ static int printfInsertMeta() { printf(" dataSource: \033[33m%s\033[0m\n", g_Dbs.db[i].superTbls[j].dataSource); printf(" insertMode: \033[33m%s\033[0m\n", - g_Dbs.db[i].superTbls[j].insertMode); + (g_Dbs.db[i].superTbls[j].insertMode==TAOSC_IFACE)?"taosc": + (g_Dbs.db[i].superTbls[j].insertMode==REST_IFACE)?"rest":"stmt"); if (g_Dbs.db[i].superTbls[j].childTblLimit > 0) { printf(" childTblLimit: \033[33m%"PRId64"\033[0m\n", g_Dbs.db[i].superTbls[j].childTblLimit); @@ -1606,7 +1611,8 @@ static void printfInsertMetaToFile(FILE* fp) { fprintf(fp, " dataSource: %s\n", g_Dbs.db[i].superTbls[j].dataSource); fprintf(fp, " insertMode: %s\n", - g_Dbs.db[i].superTbls[j].insertMode); + (g_Dbs.db[i].superTbls[j].insertMode==TAOSC_IFACE)?"taosc": + (g_Dbs.db[i].superTbls[j].insertMode==REST_IFACE)?"rest":"stmt"); fprintf(fp, " insertRows: %"PRIu64"\n", g_Dbs.db[i].superTbls[j].insertRows); fprintf(fp, " interlace rows: %"PRIu64"\n", @@ -2916,7 +2922,7 @@ static int startMultiThreadCreateChildTable( char* db_name, SSuperTable* superTblInfo) { pthread_t *pids = malloc(threads * sizeof(pthread_t)); - threadInfo *infos = malloc(threads * sizeof(threadInfo)); + threadInfo *infos = calloc(1, threads * sizeof(threadInfo)); if ((NULL == pids) || (NULL == infos)) { printf("malloc failed\n"); @@ -3110,10 +3116,12 @@ static int readTagFromCsvFileToMem(SSuperTable * superTblInfo) { return 0; } +#if 0 int readSampleFromJsonFileToMem(SSuperTable * superTblInfo) { // TODO return 0; } +#endif /* Read 10000 lines at most. If more than 10000 lines, continue to read after using @@ -3813,15 +3821,24 @@ static bool getMetaFromInsertJsonFile(cJSON* root) { goto PARSE_OVER; } - cJSON *insertMode = cJSON_GetObjectItem(stbInfo, "insert_mode"); // taosc , rest + cJSON *insertMode = cJSON_GetObjectItem(stbInfo, "insert_mode"); // taosc , rest, stmt if (insertMode && insertMode->type == cJSON_String && insertMode->valuestring != NULL) { - tstrncpy(g_Dbs.db[i].superTbls[j].insertMode, - insertMode->valuestring, MAX_DB_NAME_SIZE); + if (0 == strcasecmp(insertMode->valuestring, "taosc")) { + g_Dbs.db[i].superTbls[j].insertMode = TAOSC_IFACE; + } else if (0 == strcasecmp(insertMode->valuestring, "rest")) { + g_Dbs.db[i].superTbls[j].insertMode = REST_IFACE; + } else if (0 == strcasecmp(insertMode->valuestring, "stmt")) { + g_Dbs.db[i].superTbls[j].insertMode = STMT_IFACE; + } else { + errorPrint("%s() LN%d, failed to read json, insert_mode %s not recognized\n", + __func__, __LINE__, insertMode->valuestring); + goto PARSE_OVER; + } } else if (!insertMode) { - tstrncpy(g_Dbs.db[i].superTbls[j].insertMode, "taosc", MAX_DB_NAME_SIZE); + g_Dbs.db[i].superTbls[j].insertMode = TAOSC_IFACE; } else { - printf("ERROR: failed to read json, insert_mode not found\n"); + errorPrint("%s", "failed to read json, insert_mode not found\n"); goto PARSE_OVER; } @@ -4751,9 +4768,9 @@ static int64_t execInsert(threadInfo *pThreadInfo, char *buffer, uint64_t k) verbosePrint("[%d] %s() LN%d %s\n", pThreadInfo->threadID, __func__, __LINE__, buffer); if (superTblInfo) { - if (0 == strncasecmp(superTblInfo->insertMode, "taosc", strlen("taosc"))) { + if (superTblInfo->insertMode == TAOSC_IFACE) { affectedRows = queryDbExec(pThreadInfo->taos, buffer, INSERT_TYPE, false); - } else if (0 == strncasecmp(superTblInfo->insertMode, "rest", strlen("rest"))) { + } else if (superTblInfo->insertMode == REST_IFACE) { if (0 != postProceSql(g_Dbs.host, &g_Dbs.serv_addr, g_Dbs.port, buffer, NULL /* not set result file */)) { affectedRows = -1; @@ -4762,8 +4779,13 @@ static int64_t execInsert(threadInfo *pThreadInfo, char *buffer, uint64_t k) } else { affectedRows = k; } + } else if (superTblInfo->insertMode == STMT_IFACE) { + // TODO: add stmt support + errorPrint("%s() LN%d, %s\n", + __func__, __LINE__, "!!! need support stmt here"); + exit(-1); } else { - errorPrint("%s() LN%d: unknown insert mode: %s\n", + errorPrint("%s() LN%d: unknown insert mode: %d\n", __func__, __LINE__, superTblInfo->insertMode); affectedRows = 0; } @@ -4800,7 +4822,7 @@ static void getTableName(char *pTblName, threadInfo* pThreadInfo, uint64_t table static int64_t generateDataTail( SSuperTable* superTblInfo, uint64_t batch, char* buffer, int64_t remainderBufLen, int64_t insertRows, - int64_t startFrom, int64_t startTime, int64_t *pSamplePos, int64_t *dataLen) { + uint64_t startFrom, int64_t startTime, int64_t *pSamplePos, int64_t *dataLen) { uint64_t len = 0; uint32_t ncols_per_record = 1; // count first col ts @@ -5114,17 +5136,14 @@ static void* syncWriteInterlace(threadInfo *pThreadInfo) { if (interlaceRows > g_args.num_of_RPR) interlaceRows = g_args.num_of_RPR; - int insertMode; + int progOrInterlace; if (interlaceRows > 0) { - insertMode = INTERLACE_INSERT_MODE; + progOrInterlace= INTERLACE_INSERT_MODE; } else { - insertMode = PROGRESSIVE_INSERT_MODE; + progOrInterlace = PROGRESSIVE_INSERT_MODE; } - // TODO: prompt tbl count multple interlace rows and batch - // - uint64_t maxSqlLen = superTblInfo?superTblInfo->maxSqlLen:g_args.max_sql_len; char* buffer = calloc(maxSqlLen, 1); if (NULL == buffer) { @@ -5230,7 +5249,7 @@ static void* syncWriteInterlace(threadInfo *pThreadInfo) { pThreadInfo->threadID, __func__, __LINE__, batchPerTbl, recOfBatch); - if (insertMode == INTERLACE_INSERT_MODE) { + if (progOrInterlace == INTERLACE_INSERT_MODE) { if (tableSeq == pThreadInfo->start_table_from + pThreadInfo->ntables) { // turn to first table tableSeq = pThreadInfo->start_table_from; @@ -5609,15 +5628,6 @@ static int convertHostToServAddr(char *host, uint16_t port, struct sockaddr_in * static void startMultiThreadInsertData(int threads, char* db_name, char* precision,SSuperTable* superTblInfo) { - pthread_t *pids = malloc(threads * sizeof(pthread_t)); - assert(pids != NULL); - - threadInfo *infos = malloc(threads * sizeof(threadInfo)); - assert(infos != NULL); - - memset(pids, 0, threads * sizeof(pthread_t)); - memset(infos, 0, threads * sizeof(threadInfo)); - //TAOS* taos; //if (0 == strncasecmp(superTblInfo->insertMode, "taosc", 5)) { // taos = taos_connect(g_Dbs.host, g_Dbs.user, g_Dbs.password, db_name, g_Dbs.port); @@ -5678,17 +5688,17 @@ static void startMultiThreadInsertData(int threads, char* db_name, } } - TAOS* taos = taos_connect( + TAOS* taos0 = taos_connect( g_Dbs.host, g_Dbs.user, g_Dbs.password, db_name, g_Dbs.port); - if (NULL == taos) { + if (NULL == taos0) { errorPrint("%s() LN%d, connect to server fail , reason: %s\n", __func__, __LINE__, taos_errstr(NULL)); exit(-1); } - int ntables = 0; - int startFrom; + uint64_t ntables = 0; + uint startFrom; if (superTblInfo) { int64_t limit; @@ -5740,13 +5750,13 @@ static void startMultiThreadInsertData(int threads, char* db_name, limit * TSDB_TABLE_NAME_LEN); if (superTblInfo->childTblName == NULL) { errorPrint("%s() LN%d, alloc memory failed!\n", __func__, __LINE__); - taos_close(taos); + taos_close(taos0); exit(-1); } uint64_t childTblCount; getChildNameOfSuperTableWithLimitAndOffset( - taos, + taos0, db_name, superTblInfo->sTblName, &superTblInfo->childTblName, &childTblCount, limit, @@ -5756,7 +5766,7 @@ static void startMultiThreadInsertData(int threads, char* db_name, startFrom = 0; } - taos_close(taos); + taos_close(taos0); uint64_t a = ntables / threads; if (a < 1) { @@ -5770,11 +5780,21 @@ static void startMultiThreadInsertData(int threads, char* db_name, } if ((superTblInfo) - && (0 == strncasecmp(superTblInfo->insertMode, "rest", strlen("rest")))) { - if (convertHostToServAddr(g_Dbs.host, g_Dbs.port, &(g_Dbs.serv_addr)) != 0) - exit(-1); + && (superTblInfo->insertMode == REST_IFACE)) { + if (convertHostToServAddr(g_Dbs.host, g_Dbs.port, &(g_Dbs.serv_addr)) != 0) { + exit(-1); + } } + pthread_t *pids = malloc(threads * sizeof(pthread_t)); + assert(pids != NULL); + + threadInfo *infos = calloc(1, threads * sizeof(threadInfo)); + assert(infos != NULL); + + memset(pids, 0, threads * sizeof(pthread_t)); + memset(infos, 0, threads * sizeof(threadInfo)); + for (int i = 0; i < threads; i++) { threadInfo *t_info = infos + i; t_info->threadID = i; @@ -5786,17 +5806,32 @@ static void startMultiThreadInsertData(int threads, char* db_name, t_info->minDelay = UINT64_MAX; if ((NULL == superTblInfo) || - (0 == strncasecmp(superTblInfo->insertMode, "taosc", 5))) { + (superTblInfo->insertMode != REST_IFACE)) { //t_info->taos = taos; t_info->taos = taos_connect( g_Dbs.host, g_Dbs.user, g_Dbs.password, db_name, g_Dbs.port); if (NULL == t_info->taos) { errorPrint( - "connect to server fail from insert sub thread, reason: %s\n", + "%s() LN%d, connect to server fail from insert sub thread, reason: %s\n", + __func__, __LINE__, taos_errstr(NULL)); + free(infos); exit(-1); } + + if ((superTblInfo) && (superTblInfo->insertMode == STMT_IFACE)) { + t_info->stmt = taos_stmt_init(t_info->taos); + if (NULL == t_info->stmt) { + errorPrint( + "%s() LN%d, failed init stmt, reason: %s\n", + __func__, __LINE__, + taos_errstr(NULL)); + free(pids); + free(infos); + exit(-1); + } + } } else { t_info->taos = NULL; } @@ -5836,6 +5871,10 @@ static void startMultiThreadInsertData(int threads, char* db_name, threadInfo *t_info = infos + i; tsem_destroy(&(t_info->lock_sem)); + + if (t_info->stmt) { + taos_stmt_close(t_info->stmt); + } taos_close(t_info->taos); debugPrint("%s() LN%d, [%d] totalInsert=%"PRIu64" totalAffected=%"PRIu64"\n", @@ -6908,7 +6947,7 @@ static void setParaFromArg(){ tstrncpy(g_Dbs.db[0].superTbls[0].childTblPrefix, g_args.tb_prefix, MAX_TB_NAME_SIZE); tstrncpy(g_Dbs.db[0].superTbls[0].dataSource, "rand", MAX_TB_NAME_SIZE); - tstrncpy(g_Dbs.db[0].superTbls[0].insertMode, "taosc", MAX_TB_NAME_SIZE); + g_Dbs.db[0].superTbls[0].insertMode = g_args.interface; tstrncpy(g_Dbs.db[0].superTbls[0].startTimestamp, "2017-07-14 10:40:00.000", MAX_TB_NAME_SIZE); g_Dbs.db[0].superTbls[0].timeStampStep = DEFAULT_TIMESTAMP_STEP; -- GitLab