提交 aafd7e1d 编写于 作者: Z zhaoyanggh

merge sml

上级 c9bf5339
......@@ -500,9 +500,8 @@ typedef struct SuperQueryInfo_S {
int resubAfterConsume;
int endAfterConsume;
TAOS_SUB *tsub[MAX_QUERY_SQL_COUNT];
char * childTblName;
uint64_t totalQueried;
char * childTblName;
uint64_t totalQueried;
} SuperQueryInfo;
typedef struct SQueryMetaInfo_S {
......@@ -514,21 +513,18 @@ typedef struct SQueryMetaInfo_S {
char password[SHELL_MAX_PASSWORD_LEN];
char dbName[TSDB_DB_NAME_LEN];
char queryMode[SMALL_BUFF_LEN]; // taosc, rest
SpecifiedQueryInfo specifiedQueryInfo;
SuperQueryInfo superQueryInfo;
uint64_t totalQueried;
} SQueryMetaInfo;
typedef struct SThreadInfo_S {
TAOS * taos;
TAOS_STMT *stmt;
int64_t * bind_ts;
int64_t *bind_ts_array;
char * bindParams;
char * is_null;
TAOS * taos;
TAOS_STMT * stmt;
int64_t * bind_ts;
int64_t * bind_ts_array;
char * bindParams;
char * is_null;
int threadID;
char db_name[TSDB_DB_NAME_LEN];
uint32_t time_precision;
......@@ -588,54 +584,61 @@ extern FILE * g_fpOfInsertResult;
#define min(a, b) (((a) < (b)) ? (a) : (b))
/* ************ Function declares ************ */
int parse_args(int argc, char *argv[]);
int getInfoFromJsonFile(char *file);
/* demoCommandOpt.c */
int parse_args(int argc, char *argv[]);
void setParaFromArg();
void querySqlFile(TAOS *taos, char *sqlFile);
void testCmdLine();
/* demoJsonOpt.c */
int getInfoFromJsonFile(char *file);
int testMetaFile();
/* demoUtil.c */
int isCommentLine(char *line);
void replaceChildTblName(char *inSql, char *outSql, int tblIndex);
void setupForAnsiEscape(void);
void resetAfterAnsiEscape(void);
int taosRandom();
int testMetaFile();
int insertTestProcess();
void printfInsertMeta();
void printfInsertMetaToFile(FILE *fp);
void prompt();
void postFreeResource();
void setParaFromArg();
void printStatPerThread(threadInfo *pThreadInfo);
void tmfree(void *buf);
void tmfclose(FILE *fp);
void fetchResult(TAOS_RES *res, threadInfo *pThreadInfo);
void prompt();
void ERROR_EXIT(const char *msg);
int postProceSql(char *host, uint16_t port, char *sqlstr,
threadInfo *pThreadInfo);
void appendResultBufToFile(char *resultBuf, threadInfo *pThreadInfo);
int queryDbExec(TAOS *taos, char *command, QUERY_TYPE type, bool quiet);
int regexMatch(const char *s, const char *reg, int cflags);
int convertHostToServAddr(char *host, uint16_t port,
struct sockaddr_in *serv_addr);
char *formatTimestamp(char *buf, int64_t val, int precision);
int getChildNameOfSuperTableWithLimitAndOffset(TAOS *taos, char *dbName,
char * stbName,
char ** childTblNameOfSuperTbl,
int64_t *childTblCountOfSuperTbl,
int64_t limit, uint64_t offset,
bool escapChar);
void errorWrongValue(char *program, char *wrong_arg, char *wrong_value);
void errorUnrecognized(char *program, char *wrong_arg);
void errorPrintReqArg(char *program, char *wrong_arg);
void errorPrintReqArg2(char *program, char *wrong_arg);
void errorPrintReqArg3(char *program, char *wrong_arg);
bool isStringNumber(char *input);
void printHelp();
int queryTestProcess();
void ERROR_EXIT(const char *msg);
void querySqlFile(TAOS *taos, char *sqlFile);
void printVersion();
int subscribeTestProcess();
void testCmdLine();
int isCommentLine(char *line);
void replaceChildTblName(char *inSql, char *outSql, int tblIndex);
void printfQueryMeta();
int getAllChildNameOfSuperTable(TAOS *taos, char *dbName, char *stbName,
char ** childTblNameOfSuperTbl,
int64_t *childTblCountOfSuperTbl);
void resetAfterAnsiEscape(void);
void printfQuerySystemInfo(TAOS *taos);
int getChildNameOfSuperTableWithLimitAndOffset(TAOS *taos, char *dbName,
char * stbName,
char ** childTblNameOfSuperTbl,
int64_t *childTblCountOfSuperTbl,
int64_t limit, uint64_t offset,
bool escapChar);
/* demoInsert.c */
int insertTestProcess();
void postFreeResource();
/* demoOutput.c */
void printVersion();
void printfInsertMeta();
void printfInsertMetaToFile(FILE *fp);
void printStatPerThread(threadInfo *pThreadInfo);
void appendResultBufToFile(char *resultBuf, threadInfo *pThreadInfo);
void printfQueryMeta();
void printHelp();
void printfQuerySystemInfo(TAOS *taos);
/* demoQuery.c */
int queryTestProcess();
/* demoSubscribe.c */
int subscribeTestProcess();
#endif
\ No newline at end of file
......@@ -15,6 +15,7 @@
#ifndef __DEMODATA__
#define __DEMODATA__
#include "cJSON.h"
#include "demo.h"
/***** Global variables ******/
......@@ -71,4 +72,52 @@ float UNUSED_FUNC demo_phase_float();
void rand_string(char *str, int size);
char * rand_double_str();
double rand_double();
int generateTagValuesForStb(SSuperTable *stbInfo, int64_t tableSeq,
char *tagsValBuf);
int64_t getTSRandTail(int64_t timeStampStep, int32_t seq, int disorderRatio,
int disorderRange);
int32_t prepareStbStmtBindTag(char *bindArray, SSuperTable *stbInfo,
char *tagsVal, int32_t timePrec);
int32_t prepareStmtWithoutStb(threadInfo *pThreadInfo, char *tableName,
uint32_t batch, int64_t insertRows,
int64_t recordFrom, int64_t startTime);
int32_t generateStbInterlaceData(threadInfo *pThreadInfo, char *tableName,
uint32_t batchPerTbl, uint64_t i,
uint32_t batchPerTblTimes, uint64_t tableSeq,
char *buffer, int64_t insertRows,
int64_t startTime, uint64_t *pRemainderBufLen);
int64_t generateInterlaceDataWithoutStb(char *tableName, uint32_t batch,
uint64_t tableSeq, char *dbName,
char *buffer, int64_t insertRows,
int64_t startTime,
uint64_t *pRemainderBufLen);
int32_t generateStbProgressiveData(SSuperTable *stbInfo, char *tableName,
int64_t tableSeq, char *dbName, char *buffer,
int64_t insertRows, uint64_t recordFrom,
int64_t startTime, int64_t *pSamplePos,
int64_t *pRemainderBufLen);
int32_t generateProgressiveDataWithoutStb(
char *tableName, threadInfo *pThreadInfo, char *buffer, int64_t insertRows,
uint64_t recordFrom, int64_t startTime, int64_t *pRemainderBufLen);
int64_t generateStbRowData(SSuperTable *stbInfo, char *recBuf,
int64_t remainderBufLen, int64_t timestamp);
int prepareSampleForStb(SSuperTable *stbInfo);
int prepareSampleForNtb();
int parseSamplefileToStmtBatch(SSuperTable *stbInfo);
int parseStbSampleToStmtBatchForThread(threadInfo * pThreadInfo,
SSuperTable *stbInfo, uint32_t timePrec,
uint32_t batch);
int parseNtbSampleToStmtBatchForThread(threadInfo *pThreadInfo,
uint32_t timePrec, uint32_t batch);
int prepareSampleData();
int32_t generateSmlConstPart(char *sml, SSuperTable *stbInfo,
threadInfo *pThreadInfo, int tbSeq);
int32_t generateSmlMutablePart(char *line, char *sml, SSuperTable *stbInfo,
threadInfo *pThreadInfo, int64_t timestamp);
int32_t generateSmlJsonTags(cJSON *tagsList, SSuperTable *stbInfo,
threadInfo *pThreadInfo, int tbSeq);
int32_t generateSmlJsonCols(cJSON *array, cJSON *tag, SSuperTable *stbInfo,
threadInfo *pThreadInfo, int64_t timestamp);
#endif
\ No newline at end of file
......@@ -21,7 +21,7 @@
"cache": 16,
"blocks": 8,
"precision": "ms",
"keep": 365,
"keep": 36500,
"minRows": 100,
"maxRows": 4096,
"comp":2,
......
......@@ -458,12 +458,12 @@ int parse_args(int argc, char *argv[]) {
errorPrintReqArg2(argv[0], "S");
goto end_parse_command;
}
g_args.async_mode = atoi(argv[++i]);
g_args.timestamp_step = atoi(argv[++i]);
} else if (0 == strncmp(argv[i],
"--time-step=", strlen("--time-step="))) {
if (isStringNumber(
(char *)(argv[i] + strlen("--time-step=")))) {
g_args.async_mode =
g_args.timestamp_step =
atoi((char *)(argv[i] + strlen("--time-step=")));
} else {
errorPrintReqArg2(argv[0], "--time-step");
......@@ -471,7 +471,8 @@ int parse_args(int argc, char *argv[]) {
}
} else if (0 == strncmp(argv[i], "-S", strlen("-S"))) {
if (isStringNumber((char *)(argv[i] + strlen("-S")))) {
g_args.async_mode = atoi((char *)(argv[i] + strlen("-S")));
g_args.timestamp_step =
atoi((char *)(argv[i] + strlen("-S")));
} else {
errorPrintReqArg2(argv[0], "-S");
goto end_parse_command;
......@@ -484,7 +485,7 @@ int parse_args(int argc, char *argv[]) {
errorPrintReqArg2(argv[0], "--time-step");
goto end_parse_command;
}
g_args.async_mode = atoi(argv[++i]);
g_args.timestamp_step = atoi(argv[++i]);
} else {
errorUnrecognized(argv[0], argv[i]);
goto end_parse_command;
......@@ -1142,11 +1143,9 @@ int parse_args(int argc, char *argv[]) {
} else if ((strcmp(argv[i], "--version") == 0) ||
(strcmp(argv[i], "-V") == 0)) {
printVersion();
return 0;
} else if ((strcmp(argv[i], "--help") == 0) ||
(strcmp(argv[i], "-?") == 0)) {
printHelp();
return 0;
} else if (strcmp(argv[i], "--usage") == 0) {
printf(
" Usage: taosdemo [-f JSONFILE] [-u USER] [-p PASSWORD] [-c CONFIG_DIR]\n\
......@@ -1156,7 +1155,7 @@ int parse_args(int argc, char *argv[]) {
[-i SLEEPTIME] [-S TIME_STEP] [-B INTERLACE_ROWS] [-t TABLES]\n\
[-n RECORDS] [-M] [-x] [-y] [-O ORDERMODE] [-R RANGE] [-a REPLIcA][-g]\n\
[--help] [--usage] [--version]\n");
return 0;
exit(EXIT_SUCCESS);
} else {
// to simulate argp_option output
if (strlen(argv[i]) > 2) {
......@@ -1374,6 +1373,8 @@ void setParaFromArg() {
} else {
g_Dbs.db[0].superTbls[0].iface = g_args.iface;
}
g_Dbs.db[0].superTbls[0].lineProtocol = TSDB_SML_LINE_PROTOCOL;
g_Dbs.db[0].superTbls[0].tsPrecision = TSDB_SML_TIMESTAMP_MILLI_SECONDS;
tstrncpy(g_Dbs.db[0].superTbls[0].startTimestamp,
"2017-07-14 10:40:00.000", MAX_TB_NAME_SIZE);
g_Dbs.db[0].superTbls[0].timeStampStep = g_args.timestamp_step;
......
此差异已折叠。
此差异已折叠。
......@@ -20,34 +20,35 @@ FILE * g_fpOfInsertResult = NULL;
char * g_dupstr = NULL;
SDbs g_Dbs;
SQueryMetaInfo g_queryInfo;
SArguments g_args = {
DEFAULT_METAFILE, // metaFile
DEFAULT_TEST_MODE, // test_mode
DEFAULT_HOST, // host
DEFAULT_PORT, // port
DEFAULT_IFACE, // iface
TSDB_DEFAULT_USER, // user
TSDB_DEFAULT_PASS, // password
DEFAULT_DATABASE, // database
DEFAULT_REPLICA, // replica
DEFAULT_TB_PREFIX, // tb_prefix
DEFAULT_ESCAPE_CHAR, // escapeChar
DEFAULT_SQLFILE, // sqlFile
DEFAULT_USE_METRIC, // use_metric
DEFAULT_DROP_DB, // drop_database
DEFAULT_AGGR_FUNC, // aggr_func
DEFAULT_DEBUG, // debug_print
DEFAULT_VERBOSE, // verbose_print
DEFAULT_PERF_STAT, // performance statistic print
DEFAULT_ANS_YES, // answer_yes;
DEFAULT_OUTPUT, // output_file
DEFAULT_SYNC_MODE, // mode : sync or async
DEFAULT_DATA_TYPE, // data_type
DEFAULT_DATATYPE, // dataType
DEFAULT_BINWIDTH, // binwidth
DEFAULT_COL_COUNT, // columnCount, timestamp + float + int + float
DEFAULT_LEN_ONE_ROW, // lenOfOneRow
DEFAULT_NTHREADS, // nthreads
SArguments g_args = {
DEFAULT_METAFILE, // metaFile
DEFAULT_TEST_MODE, // test_mode
DEFAULT_HOST, // host
DEFAULT_PORT, // port
DEFAULT_IFACE, // iface
TSDB_DEFAULT_USER, // user
TSDB_DEFAULT_PASS, // password
DEFAULT_DATABASE, // database
DEFAULT_REPLICA, // replica
DEFAULT_TB_PREFIX, // tb_prefix
DEFAULT_ESCAPE_CHAR, // escapeChar
DEFAULT_SQLFILE, // sqlFile
DEFAULT_USE_METRIC, // use_metric
DEFAULT_DROP_DB, // drop_database
DEFAULT_AGGR_FUNC, // aggr_func
DEFAULT_DEBUG, // debug_print
DEFAULT_VERBOSE, // verbose_print
DEFAULT_PERF_STAT, // performance statistic print
DEFAULT_ANS_YES, // answer_yes;
DEFAULT_OUTPUT, // output_file
DEFAULT_SYNC_MODE, // mode : sync or async
DEFAULT_DATA_TYPE, // data_type
DEFAULT_DATATYPE, // dataType
DEFAULT_BINWIDTH, // binwidth
DEFAULT_COL_COUNT, // columnCount, timestamp + float + int + float
DEFAULT_LEN_ONE_ROW, // lenOfOneRow
DEFAULT_NTHREADS, // nthreads
DEFAULT_INSERT_INTERVAL, // insert_interval
DEFAULT_TIMESTAMP_STEP, // timestamp_step
DEFAULT_QUERY_TIME, // query_times
......
......@@ -301,7 +301,8 @@ void printHelp() {
printf("%s%s%s%s\n", indent, "-P, --port=PORT", "\t\t",
"The TCP/IP port number to use for the connection.");
printf("%s%s%s%s\n", indent, "-I, --interface=INTERFACE", "\t",
"The interface (taosc, rest, and stmt) taosdemo uses. By default "
"The interface (taosc, rest, stmt, and sml(line protocol)) taosdemo "
"uses. By default "
"use 'taosc'.");
printf("%s%s%s%s\n", indent, "-d, --database=DATABASE", "\t",
"Destination database. By default is 'test'.");
......@@ -374,6 +375,7 @@ void printHelp() {
for any corresponding short options.\n\
\n\
Report bugs to <support@taosdata.com>.\n");
exit(EXIT_SUCCESS);
}
void printfInsertMeta() {
......@@ -531,6 +533,17 @@ void printfInsertMeta() {
: (g_Dbs.db[i].superTbls[j].iface == STMT_IFACE)
? "stmt"
: "sml");
if (g_Dbs.db[i].superTbls[j].iface == SML_IFACE) {
printf(" lineProtocol: \033[33m%s\033[0m\n",
(g_Dbs.db[i].superTbls[j].lineProtocol ==
TSDB_SML_LINE_PROTOCOL)
? "line"
: (g_Dbs.db[i].superTbls[j].lineProtocol ==
TSDB_SML_TELNET_PROTOCOL)
? "telnet"
: "json");
}
if (g_Dbs.db[i].superTbls[j].childTblLimit > 0) {
printf(" childTblLimit: \033[33m%" PRId64
"\033[0m\n",
......@@ -1009,4 +1022,31 @@ void printfQuerySystemInfo(TAOS *taos) {
free(dbInfos);
resetAfterAnsiEscape();
}
void printStatPerThread(threadInfo *pThreadInfo) {
if (0 == pThreadInfo->totalDelay) pThreadInfo->totalDelay = 1;
fprintf(stderr,
"====thread[%d] completed total inserted rows: %" PRIu64
", total affected rows: %" PRIu64 ". %.2f records/second====\n",
pThreadInfo->threadID, pThreadInfo->totalInsertRows,
pThreadInfo->totalAffectedRows,
(double)(pThreadInfo->totalAffectedRows /
((double)pThreadInfo->totalDelay / 1000000.0)));
}
void appendResultBufToFile(char *resultBuf, threadInfo *pThreadInfo) {
pThreadInfo->fp = fopen(pThreadInfo->filePath, "at");
if (pThreadInfo->fp == NULL) {
errorPrint(
"%s() LN%d, failed to open result file: %s, result will not save "
"to file\n",
__func__, __LINE__, pThreadInfo->filePath);
return;
}
fprintf(pThreadInfo->fp, "%s", resultBuf);
tmfclose(pThreadInfo->fp);
pThreadInfo->fp = NULL;
}
\ No newline at end of file
......@@ -282,18 +282,6 @@ int getAllChildNameOfSuperTable(TAOS *taos, char *dbName, char *stbName,
-1, 0, false);
}
void printStatPerThread(threadInfo *pThreadInfo) {
if (0 == pThreadInfo->totalDelay) pThreadInfo->totalDelay = 1;
fprintf(stderr,
"====thread[%d] completed total inserted rows: %" PRIu64
", total affected rows: %" PRIu64 ". %.2f records/second====\n",
pThreadInfo->threadID, pThreadInfo->totalInsertRows,
pThreadInfo->totalAffectedRows,
(double)(pThreadInfo->totalAffectedRows /
((double)pThreadInfo->totalDelay / 1000000.0)));
}
int convertHostToServAddr(char *host, uint16_t port,
struct sockaddr_in *serv_addr) {
uint16_t rest_port = port + TSDB_PORT_HTTP;
......@@ -406,21 +394,6 @@ int queryDbExec(TAOS *taos, char *command, QUERY_TYPE type, bool quiet) {
return 0;
}
void appendResultBufToFile(char *resultBuf, threadInfo *pThreadInfo) {
pThreadInfo->fp = fopen(pThreadInfo->filePath, "at");
if (pThreadInfo->fp == NULL) {
errorPrint(
"%s() LN%d, failed to open result file: %s, result will not save "
"to file\n",
__func__, __LINE__, pThreadInfo->filePath);
return;
}
fprintf(pThreadInfo->fp, "%s", resultBuf);
tmfclose(pThreadInfo->fp);
pThreadInfo->fp = NULL;
}
int postProceSql(char *host, uint16_t port, char *sqlstr,
threadInfo *pThreadInfo) {
char *req_fmt =
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册