未验证 提交 12228d0d 编写于 作者: sangshuduo's avatar sangshuduo 提交者: GitHub

[TD-4073]<fix>: taosdemo rest query output to result file. (#6072)

* [TD-4073]<fix>: taosdemo rest query output to result file.

* [TD-4073]<fix>: taosdemo rest query output to result file.

fix insecure strcat().

* [TD-4073]<fix>: taosdemo rest query output to result file.

fix postSql command mistake

* [TD-4073]<fix>: taosdemo rest query output to result file.

fix append function bug.

* [TD-4073]<fix>: taosdemo rest query output to result file.

prevent potential null file passed.
Co-authored-by: NShuduo Sang <sdsang@taosdata.com>
上级 a9c59571
...@@ -76,6 +76,7 @@ enum QUERY_MODE { ...@@ -76,6 +76,7 @@ enum QUERY_MODE {
#define MAX_SQL_SIZE 65536 #define MAX_SQL_SIZE 65536
#define BUFFER_SIZE (65536*2) #define BUFFER_SIZE (65536*2)
#define COND_BUF_LEN BUFFER_SIZE - 30
#define MAX_USERNAME_SIZE 64 #define MAX_USERNAME_SIZE 64
#define MAX_PASSWORD_SIZE 64 #define MAX_PASSWORD_SIZE 64
#define MAX_DB_NAME_SIZE 64 #define MAX_DB_NAME_SIZE 64
...@@ -522,6 +523,8 @@ static int taosRandom() ...@@ -522,6 +523,8 @@ static int taosRandom()
static int createDatabasesAndStables(); static int createDatabasesAndStables();
static void createChildTables(); static void createChildTables();
static int queryDbExec(TAOS *taos, char *command, QUERY_TYPE type, bool quiet); static int queryDbExec(TAOS *taos, char *command, QUERY_TYPE type, bool quiet);
static int postProceSql(char *host, struct sockaddr_in *pServAddr, uint16_t port,
char* sqlstr, char *resultFile);
/* ************ Global variables ************ */ /* ************ Global variables ************ */
...@@ -1090,27 +1093,33 @@ static int queryDbExec(TAOS *taos, char *command, QUERY_TYPE type, bool quiet) { ...@@ -1090,27 +1093,33 @@ static int queryDbExec(TAOS *taos, char *command, QUERY_TYPE type, bool quiet) {
return 0; return 0;
} }
static void getResult(TAOS_RES *res, char* resultFileName) { static void appendResultBufToFile(char *resultBuf, char *resultFile)
TAOS_ROW row = NULL; {
int num_rows = 0;
int num_fields = taos_field_count(res);
TAOS_FIELD *fields = taos_fetch_fields(res);
FILE *fp = NULL; FILE *fp = NULL;
if (resultFileName[0] != 0) { if (resultFile[0] != 0) {
fp = fopen(resultFileName, "at"); fp = fopen(resultFile, "at");
if (fp == NULL) { if (fp == NULL) {
errorPrint("%s() LN%d, failed to open result file: %s, result will not save to file\n", errorPrint(
__func__, __LINE__, resultFileName); "%s() LN%d, failed to open result file: %s, result will not save to file\n",
__func__, __LINE__, resultFile);
return;
} }
} }
fprintf(fp, "%s", resultBuf);
tmfclose(fp);
}
static void appendResultToFile(TAOS_RES *res, char* resultFile) {
TAOS_ROW row = NULL;
int num_rows = 0;
int num_fields = taos_field_count(res);
TAOS_FIELD *fields = taos_fetch_fields(res);
char* databuf = (char*) calloc(1, 100*1024*1024); char* databuf = (char*) calloc(1, 100*1024*1024);
if (databuf == NULL) { if (databuf == NULL) {
errorPrint("%s() LN%d, failed to malloc, warning: save result to file slowly!\n", errorPrint("%s() LN%d, failed to malloc, warning: save result to file slowly!\n",
__func__, __LINE__); __func__, __LINE__);
if (fp)
fclose(fp);
return ; return ;
} }
...@@ -1120,7 +1129,7 @@ static void getResult(TAOS_RES *res, char* resultFileName) { ...@@ -1120,7 +1129,7 @@ static void getResult(TAOS_RES *res, char* resultFileName) {
// fetch the records row by row // fetch the records row by row
while((row = taos_fetch_row(res))) { while((row = taos_fetch_row(res))) {
if (totalLen >= 100*1024*1024 - 32000) { if (totalLen >= 100*1024*1024 - 32000) {
if (fp) fprintf(fp, "%s", databuf); appendResultBufToFile(databuf, resultFile);
totalLen = 0; totalLen = 0;
memset(databuf, 0, 100*1024*1024); memset(databuf, 0, 100*1024*1024);
} }
...@@ -1132,13 +1141,14 @@ static void getResult(TAOS_RES *res, char* resultFileName) { ...@@ -1132,13 +1141,14 @@ static void getResult(TAOS_RES *res, char* resultFileName) {
totalLen += len; totalLen += len;
} }
if (fp) fprintf(fp, "%s", databuf); appendResultBufToFile(databuf, resultFile);
tmfclose(fp);
free(databuf); free(databuf);
} }
static void selectAndGetResult(TAOS *taos, char *command, char* resultFileName) { static void selectAndGetResult(threadInfo *pThreadInfo, char *command, char* resultFile)
TAOS_RES *res = taos_query(taos, command); {
if (0 == strncasecmp(g_queryInfo.queryMode, "taosc", strlen("taosc"))) {
TAOS_RES *res = taos_query(pThreadInfo->taos, command);
if (res == NULL || taos_errno(res) != 0) { if (res == NULL || taos_errno(res) != 0) {
errorPrint("%s() LN%d, failed to execute sql:%s, reason:%s\n", errorPrint("%s() LN%d, failed to execute sql:%s, reason:%s\n",
__func__, __LINE__, command, taos_errstr(res)); __func__, __LINE__, command, taos_errstr(res));
...@@ -1146,8 +1156,24 @@ static void selectAndGetResult(TAOS *taos, char *command, char* resultFileName) ...@@ -1146,8 +1156,24 @@ static void selectAndGetResult(TAOS *taos, char *command, char* resultFileName)
return; return;
} }
getResult(res, resultFileName); if ((resultFile) && (strlen(resultFile))) {
appendResultToFile(res, resultFile);
}
taos_free_result(res); taos_free_result(res);
} else if (0 == strncasecmp(g_queryInfo.queryMode, "rest", strlen("rest"))) {
int retCode = postProceSql(
g_queryInfo.host, &(g_queryInfo.serv_addr), g_queryInfo.port,
command,
resultFile);
if (0 != retCode) {
printf("====restful return fail, threadID[%d]\n", pThreadInfo->threadID);
}
} else {
errorPrint("%s() LN%d, unknown query mode: %s\n",
__func__, __LINE__, g_queryInfo.queryMode);
}
} }
static int32_t rand_bool(){ static int32_t rand_bool(){
...@@ -1940,13 +1966,13 @@ static void printfQuerySystemInfo(TAOS * taos) { ...@@ -1940,13 +1966,13 @@ static void printfQuerySystemInfo(TAOS * taos) {
// show variables // show variables
res = taos_query(taos, "show variables;"); res = taos_query(taos, "show variables;");
//getResult(res, filename); //appendResultToFile(res, filename);
xDumpResultToFile(filename, res); xDumpResultToFile(filename, res);
// show dnodes // show dnodes
res = taos_query(taos, "show dnodes;"); res = taos_query(taos, "show dnodes;");
xDumpResultToFile(filename, res); xDumpResultToFile(filename, res);
//getResult(res, filename); //appendResultToFile(res, filename);
// show databases // show databases
res = taos_query(taos, "show databases;"); res = taos_query(taos, "show databases;");
...@@ -1981,7 +2007,8 @@ static void printfQuerySystemInfo(TAOS * taos) { ...@@ -1981,7 +2007,8 @@ static void printfQuerySystemInfo(TAOS * taos) {
free(dbInfos); free(dbInfos);
} }
static int postProceSql(char *host, struct sockaddr_in *pServAddr, uint16_t port, char* sqlstr) static int postProceSql(char *host, struct sockaddr_in *pServAddr, uint16_t port,
char* sqlstr, char *resultFile)
{ {
char *req_fmt = "POST %s HTTP/1.1\r\nHost: %s:%d\r\nAccept: */*\r\nAuthorization: Basic %s\r\nContent-Length: %d\r\nContent-Type: application/x-www-form-urlencoded\r\n\r\n%s"; char *req_fmt = "POST %s HTTP/1.1\r\nHost: %s:%d\r\nAccept: */*\r\nAuthorization: Basic %s\r\nContent-Length: %d\r\nContent-Type: application/x-www-form-urlencoded\r\n\r\n%s";
...@@ -2117,6 +2144,10 @@ static int postProceSql(char *host, struct sockaddr_in *pServAddr, uint16_t port ...@@ -2117,6 +2144,10 @@ static int postProceSql(char *host, struct sockaddr_in *pServAddr, uint16_t port
response_buf[RESP_BUF_LEN - 1] = '\0'; response_buf[RESP_BUF_LEN - 1] = '\0';
printf("Response:\n%s\n", response_buf); printf("Response:\n%s\n", response_buf);
if (resultFile) {
appendResultBufToFile(response_buf, resultFile);
}
free(request_buf); free(request_buf);
#ifdef WINDOWS #ifdef WINDOWS
closesocket(sockfd); closesocket(sockfd);
...@@ -4688,7 +4719,8 @@ static int64_t execInsert(threadInfo *pThreadInfo, char *buffer, uint64_t k) ...@@ -4688,7 +4719,8 @@ static int64_t execInsert(threadInfo *pThreadInfo, char *buffer, uint64_t k)
if (0 == strncasecmp(superTblInfo->insertMode, "taosc", strlen("taosc"))) { if (0 == strncasecmp(superTblInfo->insertMode, "taosc", strlen("taosc"))) {
affectedRows = queryDbExec(pThreadInfo->taos, buffer, INSERT_TYPE, false); affectedRows = queryDbExec(pThreadInfo->taos, buffer, INSERT_TYPE, false);
} else if (0 == strncasecmp(superTblInfo->insertMode, "rest", strlen("rest"))) { } else if (0 == strncasecmp(superTblInfo->insertMode, "rest", strlen("rest"))) {
if (0 != postProceSql(g_Dbs.host, &g_Dbs.serv_addr, g_Dbs.port, buffer)) { if (0 != postProceSql(g_Dbs.host, &g_Dbs.serv_addr, g_Dbs.port,
buffer, NULL /* not set result file */)) {
affectedRows = -1; affectedRows = -1;
printf("========restful return fail, threadID[%d]\n", printf("========restful return fail, threadID[%d]\n",
pThreadInfo->threadID); pThreadInfo->threadID);
...@@ -5936,7 +5968,7 @@ static void *readMetric(void *sarg) { ...@@ -5936,7 +5968,7 @@ static void *readMetric(void *sarg) {
fprintf(fp, "Querying On %d records:\n", totalData); fprintf(fp, "Querying On %d records:\n", totalData);
for (int j = 0; j < n; j++) { for (int j = 0; j < n; j++) {
char condition[BUFFER_SIZE - 30] = "\0"; char condition[COND_BUF_LEN] = "\0";
char tempS[64] = "\0"; char tempS[64] = "\0";
int m = 10 < num_of_tables ? 10 : num_of_tables; int m = 10 < num_of_tables ? 10 : num_of_tables;
...@@ -5947,7 +5979,7 @@ static void *readMetric(void *sarg) { ...@@ -5947,7 +5979,7 @@ static void *readMetric(void *sarg) {
} else { } else {
sprintf(tempS, " or t1 = %d ", i); sprintf(tempS, " or t1 = %d ", i);
} }
strcat(condition, tempS); strncat(condition, tempS, COND_BUF_LEN - 1);
sprintf(command, "select %s from meters where %s", aggreFunc[j], condition); sprintf(command, "select %s from meters where %s", aggreFunc[j], condition);
...@@ -6125,44 +6157,25 @@ static void *specifiedTableQuery(void *sarg) { ...@@ -6125,44 +6157,25 @@ static void *specifiedTableQuery(void *sarg) {
taosMsleep(g_queryInfo.specifiedQueryInfo.queryInterval - (et - st)); // ms taosMsleep(g_queryInfo.specifiedQueryInfo.queryInterval - (et - st)); // ms
} }
st = taosGetTimestampMs();
if (0 == strncasecmp(g_queryInfo.queryMode, "taosc", strlen("taosc"))) {
int64_t t1 = taosGetTimestampMs();
char tmpFile[MAX_FILE_NAME_LEN*2] = {0}; char tmpFile[MAX_FILE_NAME_LEN*2] = {0};
if (g_queryInfo.specifiedQueryInfo.result[pThreadInfo->querySeq][0] != 0) { if (g_queryInfo.specifiedQueryInfo.result[pThreadInfo->querySeq][0] != 0) {
sprintf(tmpFile, "%s-%d", sprintf(tmpFile, "%s-%d",
g_queryInfo.specifiedQueryInfo.result[pThreadInfo->querySeq], g_queryInfo.specifiedQueryInfo.result[pThreadInfo->querySeq],
pThreadInfo->threadID); pThreadInfo->threadID);
} }
selectAndGetResult(pThreadInfo->taos,
st = taosGetTimestampMs();
selectAndGetResult(pThreadInfo,
g_queryInfo.specifiedQueryInfo.sql[pThreadInfo->querySeq], tmpFile); g_queryInfo.specifiedQueryInfo.sql[pThreadInfo->querySeq], tmpFile);
int64_t t2 = taosGetTimestampMs();
printf("=[taosc] thread[%"PRId64"] complete one sql, Spent %10.3f s\n",
taosGetSelfPthreadId(), (t2 - t1)/1000.0);
} else if (0 == strncasecmp(g_queryInfo.queryMode, "rest", strlen("rest"))) {
int64_t t1 = taosGetTimestampMs();
int retCode = postProceSql(g_queryInfo.host, &(g_queryInfo.serv_addr),
g_queryInfo.port,
g_queryInfo.specifiedQueryInfo.sql[pThreadInfo->querySeq]);
if (0 != retCode) {
printf("====restful return fail, threadID[%d]\n", pThreadInfo->threadID);
return NULL;
}
int64_t t2 = taosGetTimestampMs();
printf("=[restful] thread[%"PRId64"] complete one sql, Spent %10.3f s\n",
taosGetSelfPthreadId(), (t2 - t1)/1000.0);
} else { et = taosGetTimestampMs();
errorPrint("%s() LN%d, unknown query mode: %s\n", printf("=thread[%"PRId64"] use %s complete one sql, Spent %10.3f s\n",
__func__, __LINE__, g_queryInfo.queryMode); taosGetSelfPthreadId(), g_queryInfo.queryMode, (et - st)/1000.0);
return NULL;
}
totalQueried ++; totalQueried ++;
g_queryInfo.specifiedQueryInfo.totalQueried ++; g_queryInfo.specifiedQueryInfo.totalQueried ++;
et = taosGetTimestampMs();
uint64_t currentPrintTime = taosGetTimestampMs(); uint64_t currentPrintTime = taosGetTimestampMs();
uint64_t endTs = taosGetTimestampMs(); uint64_t endTs = taosGetTimestampMs();
if (currentPrintTime - lastPrintTime > 30*1000) { if (currentPrintTime - lastPrintTime > 30*1000) {
...@@ -6194,14 +6207,14 @@ static void replaceChildTblName(char* inSql, char* outSql, int tblIndex) { ...@@ -6194,14 +6207,14 @@ static void replaceChildTblName(char* inSql, char* outSql, int tblIndex) {
tstrncpy(outSql, inSql, pos - inSql + 1); tstrncpy(outSql, inSql, pos - inSql + 1);
//printf("1: %s\n", outSql); //printf("1: %s\n", outSql);
strcat(outSql, subTblName); strncat(outSql, subTblName, MAX_QUERY_SQL_LENGTH - 1);
//printf("2: %s\n", outSql); //printf("2: %s\n", outSql);
strcat(outSql, pos+strlen(sourceString)); strncat(outSql, pos+strlen(sourceString), MAX_QUERY_SQL_LENGTH - 1);
//printf("3: %s\n", outSql); //printf("3: %s\n", outSql);
} }
static void *superTableQuery(void *sarg) { static void *superTableQuery(void *sarg) {
char sqlstr[1024]; char sqlstr[MAX_QUERY_SQL_LENGTH];
threadInfo *pThreadInfo = (threadInfo *)sarg; threadInfo *pThreadInfo = (threadInfo *)sarg;
if (pThreadInfo->taos == NULL) { if (pThreadInfo->taos == NULL) {
...@@ -6246,7 +6259,7 @@ static void *superTableQuery(void *sarg) { ...@@ -6246,7 +6259,7 @@ static void *superTableQuery(void *sarg) {
g_queryInfo.superQueryInfo.result[j], g_queryInfo.superQueryInfo.result[j],
pThreadInfo->threadID); pThreadInfo->threadID);
} }
selectAndGetResult(pThreadInfo->taos, sqlstr, tmpFile); selectAndGetResult(pThreadInfo, sqlstr, tmpFile);
totalQueried++; totalQueried++;
g_queryInfo.superQueryInfo.totalQueried ++; g_queryInfo.superQueryInfo.totalQueried ++;
...@@ -6447,7 +6460,8 @@ static void subscribe_callback(TAOS_SUB* tsub, TAOS_RES *res, void* param, int c ...@@ -6447,7 +6460,8 @@ static void subscribe_callback(TAOS_SUB* tsub, TAOS_RES *res, void* param, int c
return; return;
} }
getResult(res, (char*)param); if (param)
appendResultToFile(res, (char*)param);
// tao_unscribe() will free result. // tao_unscribe() will free result.
} }
...@@ -6476,7 +6490,7 @@ static TAOS_SUB* subscribeImpl( ...@@ -6476,7 +6490,7 @@ static TAOS_SUB* subscribeImpl(
static void *superSubscribe(void *sarg) { static void *superSubscribe(void *sarg) {
threadInfo *pThreadInfo = (threadInfo *)sarg; threadInfo *pThreadInfo = (threadInfo *)sarg;
char subSqlstr[1024]; char subSqlstr[MAX_QUERY_SQL_LENGTH];
TAOS_SUB* tsub[MAX_QUERY_SQL_COUNT] = {0}; TAOS_SUB* tsub[MAX_QUERY_SQL_COUNT] = {0};
if (g_queryInfo.superQueryInfo.sqlCount == 0) if (g_queryInfo.superQueryInfo.sqlCount == 0)
...@@ -6551,8 +6565,8 @@ static void *superSubscribe(void *sarg) { ...@@ -6551,8 +6565,8 @@ static void *superSubscribe(void *sarg) {
sprintf(tmpFile, "%s-%d", sprintf(tmpFile, "%s-%d",
g_queryInfo.superQueryInfo.result[i], g_queryInfo.superQueryInfo.result[i],
pThreadInfo->threadID); pThreadInfo->threadID);
appendResultToFile(res, tmpFile);
} }
getResult(res, tmpFile);
} }
} }
} }
...@@ -6639,8 +6653,8 @@ static void *specifiedSubscribe(void *sarg) { ...@@ -6639,8 +6653,8 @@ static void *specifiedSubscribe(void *sarg) {
if (g_queryInfo.specifiedQueryInfo.result[i][0] != 0) { if (g_queryInfo.specifiedQueryInfo.result[i][0] != 0) {
sprintf(tmpFile, "%s-%d", sprintf(tmpFile, "%s-%d",
g_queryInfo.specifiedQueryInfo.result[i], pThreadInfo->threadID); g_queryInfo.specifiedQueryInfo.result[i], pThreadInfo->threadID);
appendResultToFile(res, tmpFile);
} }
getResult(res, tmpFile);
} }
} }
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册