提交 85176949 编写于 作者: S Shuduo Sang

[TD-4068]<feature>: taosdemo support stmt interface.

construct framework.
上级 d0aaeced
......@@ -121,9 +121,9 @@ enum MODE {
};
enum INTERFACE {
TAOSC_INTERFACE,
REST_INTERFACE,
STMT_INTERFACE,
TAOSC_IFACE,
REST_IFACE,
STMT_IFACE,
INTERFACE_BUT
};
......@@ -131,7 +131,7 @@ typedef enum enum_INSERT_MODE {
PROGRESSIVE_INSERT_MODE,
INTERLACE_INSERT_MODE,
INVALID_INSERT_MODE
} INSERT_MODE;
} PROG_OR_INTERLACE_MODE;
typedef enum enumQUERY_TYPE {
NO_INSERT_TYPE,
......@@ -246,7 +246,7 @@ typedef struct SSuperTable_S {
uint8_t autoCreateTable; // 0: create sub table, 1: auto create sub table
char childTblPrefix[MAX_TB_NAME_SIZE];
char dataSource[MAX_TB_NAME_SIZE+1]; // rand_gen or sample
char insertMode[MAX_TB_NAME_SIZE]; // taosc, rest
uint16_t insertMode; // 0: taosc, 1: rest, 2: stmt
int64_t childTblLimit;
uint64_t childTblOffset;
......@@ -291,7 +291,7 @@ typedef struct SSuperTable_S {
typedef struct {
char name[TSDB_DB_NAME_LEN + 1];
char create_time[32];
int32_t ntables;
uint64_t ntables;
int32_t vgroups;
int16_t replica;
int16_t quorum;
......@@ -413,6 +413,7 @@ typedef struct SQueryMetaInfo_S {
typedef struct SThreadInfo_S {
TAOS * taos;
TAOS_STMT *stmt;
int threadID;
char db_name[MAX_DB_NAME_SIZE+1];
uint32_t time_precision;
......@@ -544,7 +545,7 @@ SArguments g_args = {
0, // test_mode
"127.0.0.1", // host
6030, // port
TAOSC_INTERFACE, // interface
TAOSC_IFACE, // interface
"root", // user
#ifdef _TD_POWER_
"powerdb", // password
......@@ -759,11 +760,11 @@ static void parse_args(int argc, char *argv[], SArguments *arguments) {
}
++i;
if (0 == strcasecmp(argv[i], "taosc")) {
arguments->interface = TAOSC_INTERFACE;
arguments->interface = TAOSC_IFACE;
} else if (0 == strcasecmp(argv[i], "rest")) {
arguments->interface = REST_INTERFACE;
arguments->interface = REST_IFACE;
} else if (0 == strcasecmp(argv[i], "stmt")) {
arguments->interface = STMT_INTERFACE;
arguments->interface = STMT_IFACE;
} else {
errorPrint("%s", "\n\t-I need a valid string following!\n");
exit(EXIT_FAILURE);
......@@ -1025,7 +1026,8 @@ static void parse_args(int argc, char *argv[], SArguments *arguments) {
arguments->port );
printf("# User: %s\n", arguments->user);
printf("# Password: %s\n", arguments->password);
printf("# Use metric: %s\n", arguments->use_metric ? "true" : "false");
printf("# Use metric: %s\n",
arguments->use_metric ? "true" : "false");
if (*(arguments->datatype)) {
printf("# Specified data type: ");
for (int i = 0; i < MAX_NUM_DATATYPE; i++)
......@@ -1319,6 +1321,8 @@ static void init_rand_data() {
static int printfInsertMeta() {
SHOW_PARSE_RESULT_START();
printf("interface: \033[33m%s\033[0m\n",
(g_args.interface==TAOSC_IFACE)?"taosc":(g_args.interface==REST_IFACE)?"rest":"stmt");
printf("host: \033[33m%s:%u\033[0m\n", g_Dbs.host, g_Dbs.port);
printf("user: \033[33m%s\033[0m\n", g_Dbs.user);
printf("password: \033[33m%s\033[0m\n", g_Dbs.password);
......@@ -1423,7 +1427,8 @@ static int printfInsertMeta() {
printf(" dataSource: \033[33m%s\033[0m\n",
g_Dbs.db[i].superTbls[j].dataSource);
printf(" insertMode: \033[33m%s\033[0m\n",
g_Dbs.db[i].superTbls[j].insertMode);
(g_Dbs.db[i].superTbls[j].insertMode==TAOSC_IFACE)?"taosc":
(g_Dbs.db[i].superTbls[j].insertMode==REST_IFACE)?"rest":"stmt");
if (g_Dbs.db[i].superTbls[j].childTblLimit > 0) {
printf(" childTblLimit: \033[33m%"PRId64"\033[0m\n",
g_Dbs.db[i].superTbls[j].childTblLimit);
......@@ -1606,7 +1611,8 @@ static void printfInsertMetaToFile(FILE* fp) {
fprintf(fp, " dataSource: %s\n",
g_Dbs.db[i].superTbls[j].dataSource);
fprintf(fp, " insertMode: %s\n",
g_Dbs.db[i].superTbls[j].insertMode);
(g_Dbs.db[i].superTbls[j].insertMode==TAOSC_IFACE)?"taosc":
(g_Dbs.db[i].superTbls[j].insertMode==REST_IFACE)?"rest":"stmt");
fprintf(fp, " insertRows: %"PRIu64"\n",
g_Dbs.db[i].superTbls[j].insertRows);
fprintf(fp, " interlace rows: %"PRIu64"\n",
......@@ -2916,7 +2922,7 @@ static int startMultiThreadCreateChildTable(
char* db_name, SSuperTable* superTblInfo) {
pthread_t *pids = malloc(threads * sizeof(pthread_t));
threadInfo *infos = malloc(threads * sizeof(threadInfo));
threadInfo *infos = calloc(1, threads * sizeof(threadInfo));
if ((NULL == pids) || (NULL == infos)) {
printf("malloc failed\n");
......@@ -3110,10 +3116,12 @@ static int readTagFromCsvFileToMem(SSuperTable * superTblInfo) {
return 0;
}
#if 0
int readSampleFromJsonFileToMem(SSuperTable * superTblInfo) {
// TODO
return 0;
}
#endif
/*
Read 10000 lines at most. If more than 10000 lines, continue to read after using
......@@ -3813,15 +3821,24 @@ static bool getMetaFromInsertJsonFile(cJSON* root) {
goto PARSE_OVER;
}
cJSON *insertMode = cJSON_GetObjectItem(stbInfo, "insert_mode"); // taosc , rest
cJSON *insertMode = cJSON_GetObjectItem(stbInfo, "insert_mode"); // taosc , rest, stmt
if (insertMode && insertMode->type == cJSON_String
&& insertMode->valuestring != NULL) {
tstrncpy(g_Dbs.db[i].superTbls[j].insertMode,
insertMode->valuestring, MAX_DB_NAME_SIZE);
if (0 == strcasecmp(insertMode->valuestring, "taosc")) {
g_Dbs.db[i].superTbls[j].insertMode = TAOSC_IFACE;
} else if (0 == strcasecmp(insertMode->valuestring, "rest")) {
g_Dbs.db[i].superTbls[j].insertMode = REST_IFACE;
} else if (0 == strcasecmp(insertMode->valuestring, "stmt")) {
g_Dbs.db[i].superTbls[j].insertMode = STMT_IFACE;
} else {
errorPrint("%s() LN%d, failed to read json, insert_mode %s not recognized\n",
__func__, __LINE__, insertMode->valuestring);
goto PARSE_OVER;
}
} else if (!insertMode) {
tstrncpy(g_Dbs.db[i].superTbls[j].insertMode, "taosc", MAX_DB_NAME_SIZE);
g_Dbs.db[i].superTbls[j].insertMode = TAOSC_IFACE;
} else {
printf("ERROR: failed to read json, insert_mode not found\n");
errorPrint("%s", "failed to read json, insert_mode not found\n");
goto PARSE_OVER;
}
......@@ -4751,9 +4768,9 @@ static int64_t execInsert(threadInfo *pThreadInfo, char *buffer, uint64_t k)
verbosePrint("[%d] %s() LN%d %s\n", pThreadInfo->threadID,
__func__, __LINE__, buffer);
if (superTblInfo) {
if (0 == strncasecmp(superTblInfo->insertMode, "taosc", strlen("taosc"))) {
if (superTblInfo->insertMode == TAOSC_IFACE) {
affectedRows = queryDbExec(pThreadInfo->taos, buffer, INSERT_TYPE, false);
} else if (0 == strncasecmp(superTblInfo->insertMode, "rest", strlen("rest"))) {
} else if (superTblInfo->insertMode == REST_IFACE) {
if (0 != postProceSql(g_Dbs.host, &g_Dbs.serv_addr, g_Dbs.port,
buffer, NULL /* not set result file */)) {
affectedRows = -1;
......@@ -4762,8 +4779,13 @@ static int64_t execInsert(threadInfo *pThreadInfo, char *buffer, uint64_t k)
} else {
affectedRows = k;
}
} else if (superTblInfo->insertMode == STMT_IFACE) {
// TODO: add stmt support
errorPrint("%s() LN%d, %s\n",
__func__, __LINE__, "!!! need support stmt here");
exit(-1);
} else {
errorPrint("%s() LN%d: unknown insert mode: %s\n",
errorPrint("%s() LN%d: unknown insert mode: %d\n",
__func__, __LINE__, superTblInfo->insertMode);
affectedRows = 0;
}
......@@ -4800,7 +4822,7 @@ static void getTableName(char *pTblName, threadInfo* pThreadInfo, uint64_t table
static int64_t generateDataTail(
SSuperTable* superTblInfo,
uint64_t batch, char* buffer, int64_t remainderBufLen, int64_t insertRows,
int64_t startFrom, int64_t startTime, int64_t *pSamplePos, int64_t *dataLen) {
uint64_t startFrom, int64_t startTime, int64_t *pSamplePos, int64_t *dataLen) {
uint64_t len = 0;
uint32_t ncols_per_record = 1; // count first col ts
......@@ -5114,17 +5136,14 @@ static void* syncWriteInterlace(threadInfo *pThreadInfo) {
if (interlaceRows > g_args.num_of_RPR)
interlaceRows = g_args.num_of_RPR;
int insertMode;
int progOrInterlace;
if (interlaceRows > 0) {
insertMode = INTERLACE_INSERT_MODE;
progOrInterlace= INTERLACE_INSERT_MODE;
} else {
insertMode = PROGRESSIVE_INSERT_MODE;
progOrInterlace = PROGRESSIVE_INSERT_MODE;
}
// TODO: prompt tbl count multple interlace rows and batch
//
uint64_t maxSqlLen = superTblInfo?superTblInfo->maxSqlLen:g_args.max_sql_len;
char* buffer = calloc(maxSqlLen, 1);
if (NULL == buffer) {
......@@ -5230,7 +5249,7 @@ static void* syncWriteInterlace(threadInfo *pThreadInfo) {
pThreadInfo->threadID, __func__, __LINE__,
batchPerTbl, recOfBatch);
if (insertMode == INTERLACE_INSERT_MODE) {
if (progOrInterlace == INTERLACE_INSERT_MODE) {
if (tableSeq == pThreadInfo->start_table_from + pThreadInfo->ntables) {
// turn to first table
tableSeq = pThreadInfo->start_table_from;
......@@ -5609,15 +5628,6 @@ static int convertHostToServAddr(char *host, uint16_t port, struct sockaddr_in *
static void startMultiThreadInsertData(int threads, char* db_name,
char* precision,SSuperTable* superTblInfo) {
pthread_t *pids = malloc(threads * sizeof(pthread_t));
assert(pids != NULL);
threadInfo *infos = malloc(threads * sizeof(threadInfo));
assert(infos != NULL);
memset(pids, 0, threads * sizeof(pthread_t));
memset(infos, 0, threads * sizeof(threadInfo));
//TAOS* taos;
//if (0 == strncasecmp(superTblInfo->insertMode, "taosc", 5)) {
// taos = taos_connect(g_Dbs.host, g_Dbs.user, g_Dbs.password, db_name, g_Dbs.port);
......@@ -5678,17 +5688,17 @@ static void startMultiThreadInsertData(int threads, char* db_name,
}
}
TAOS* taos = taos_connect(
TAOS* taos0 = taos_connect(
g_Dbs.host, g_Dbs.user,
g_Dbs.password, db_name, g_Dbs.port);
if (NULL == taos) {
if (NULL == taos0) {
errorPrint("%s() LN%d, connect to server fail , reason: %s\n",
__func__, __LINE__, taos_errstr(NULL));
exit(-1);
}
int ntables = 0;
int startFrom;
uint64_t ntables = 0;
uint startFrom;
if (superTblInfo) {
int64_t limit;
......@@ -5740,13 +5750,13 @@ static void startMultiThreadInsertData(int threads, char* db_name,
limit * TSDB_TABLE_NAME_LEN);
if (superTblInfo->childTblName == NULL) {
errorPrint("%s() LN%d, alloc memory failed!\n", __func__, __LINE__);
taos_close(taos);
taos_close(taos0);
exit(-1);
}
uint64_t childTblCount;
getChildNameOfSuperTableWithLimitAndOffset(
taos,
taos0,
db_name, superTblInfo->sTblName,
&superTblInfo->childTblName, &childTblCount,
limit,
......@@ -5756,7 +5766,7 @@ static void startMultiThreadInsertData(int threads, char* db_name,
startFrom = 0;
}
taos_close(taos);
taos_close(taos0);
uint64_t a = ntables / threads;
if (a < 1) {
......@@ -5770,10 +5780,20 @@ static void startMultiThreadInsertData(int threads, char* db_name,
}
if ((superTblInfo)
&& (0 == strncasecmp(superTblInfo->insertMode, "rest", strlen("rest")))) {
if (convertHostToServAddr(g_Dbs.host, g_Dbs.port, &(g_Dbs.serv_addr)) != 0)
&& (superTblInfo->insertMode == REST_IFACE)) {
if (convertHostToServAddr(g_Dbs.host, g_Dbs.port, &(g_Dbs.serv_addr)) != 0) {
exit(-1);
}
}
pthread_t *pids = malloc(threads * sizeof(pthread_t));
assert(pids != NULL);
threadInfo *infos = calloc(1, threads * sizeof(threadInfo));
assert(infos != NULL);
memset(pids, 0, threads * sizeof(pthread_t));
memset(infos, 0, threads * sizeof(threadInfo));
for (int i = 0; i < threads; i++) {
threadInfo *t_info = infos + i;
......@@ -5786,17 +5806,32 @@ static void startMultiThreadInsertData(int threads, char* db_name,
t_info->minDelay = UINT64_MAX;
if ((NULL == superTblInfo) ||
(0 == strncasecmp(superTblInfo->insertMode, "taosc", 5))) {
(superTblInfo->insertMode != REST_IFACE)) {
//t_info->taos = taos;
t_info->taos = taos_connect(
g_Dbs.host, g_Dbs.user,
g_Dbs.password, db_name, g_Dbs.port);
if (NULL == t_info->taos) {
errorPrint(
"connect to server fail from insert sub thread, reason: %s\n",
"%s() LN%d, connect to server fail from insert sub thread, reason: %s\n",
__func__, __LINE__,
taos_errstr(NULL));
free(infos);
exit(-1);
}
if ((superTblInfo) && (superTblInfo->insertMode == STMT_IFACE)) {
t_info->stmt = taos_stmt_init(t_info->taos);
if (NULL == t_info->stmt) {
errorPrint(
"%s() LN%d, failed init stmt, reason: %s\n",
__func__, __LINE__,
taos_errstr(NULL));
free(pids);
free(infos);
exit(-1);
}
}
} else {
t_info->taos = NULL;
}
......@@ -5836,6 +5871,10 @@ static void startMultiThreadInsertData(int threads, char* db_name,
threadInfo *t_info = infos + i;
tsem_destroy(&(t_info->lock_sem));
if (t_info->stmt) {
taos_stmt_close(t_info->stmt);
}
taos_close(t_info->taos);
debugPrint("%s() LN%d, [%d] totalInsert=%"PRIu64" totalAffected=%"PRIu64"\n",
......@@ -6908,7 +6947,7 @@ static void setParaFromArg(){
tstrncpy(g_Dbs.db[0].superTbls[0].childTblPrefix,
g_args.tb_prefix, MAX_TB_NAME_SIZE);
tstrncpy(g_Dbs.db[0].superTbls[0].dataSource, "rand", MAX_TB_NAME_SIZE);
tstrncpy(g_Dbs.db[0].superTbls[0].insertMode, "taosc", MAX_TB_NAME_SIZE);
g_Dbs.db[0].superTbls[0].insertMode = g_args.interface;
tstrncpy(g_Dbs.db[0].superTbls[0].startTimestamp,
"2017-07-14 10:40:00.000", MAX_TB_NAME_SIZE);
g_Dbs.db[0].superTbls[0].timeStampStep = DEFAULT_TIMESTAMP_STEP;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册