未验证 提交 dfb29f75 编写于 作者: S slguan 提交者: GitHub

Merge branch 'develop' into beta/v1.6.5.3

...@@ -97,6 +97,10 @@ go build -o bin/taosimport app/main.go ...@@ -97,6 +97,10 @@ go build -o bin/taosimport app/main.go
是否保存统计信息到 tdengine 的 statistic 表中,1 是,0 否, 默认 0。 是否保存统计信息到 tdengine 的 statistic 表中,1 是,0 否, 默认 0。
* -savetb int
当 save 为 1 时保存统计信息的表名, 默认 statistic。
* -auto int * -auto int
是否自动生成样例数据中的主键时间戳,1 是,0 否, 默认 0。 是否自动生成样例数据中的主键时间戳,1 是,0 否, 默认 0。
......
...@@ -28,6 +28,7 @@ const ( ...@@ -28,6 +28,7 @@ const (
DEFAULT_STARTTIME int64 = -1 DEFAULT_STARTTIME int64 = -1
DEFAULT_INTERVAL int64 = 1*1000 DEFAULT_INTERVAL int64 = 1*1000
DEFAULT_DELAY int64 = -1 DEFAULT_DELAY int64 = -1
DEFAULT_STATISTIC_TABLE = "statistic"
JSON_FORMAT = "json" JSON_FORMAT = "json"
CSV_FORMAT = "csv" CSV_FORMAT = "csv"
...@@ -37,7 +38,6 @@ const ( ...@@ -37,7 +38,6 @@ const (
DRIVER_NAME = "taosSql" DRIVER_NAME = "taosSql"
STARTTIME_LAYOUT = "2006-01-02 15:04:05.000" STARTTIME_LAYOUT = "2006-01-02 15:04:05.000"
INSERT_PREFIX = "insert into " INSERT_PREFIX = "insert into "
STATISTIC_TABLE = "statistic"
) )
var ( var (
...@@ -75,6 +75,7 @@ var ( ...@@ -75,6 +75,7 @@ var (
delay int64 // default 10 milliseconds delay int64 // default 10 milliseconds
tick int64 tick int64
save int save int
saveTable string
) )
type superTableConfig struct { type superTableConfig struct {
...@@ -278,9 +279,9 @@ func staticSpeed(){ ...@@ -278,9 +279,9 @@ func staticSpeed(){
if save == 1 { if save == 1 {
connection.Exec("use " + db) connection.Exec("use " + db)
_, err := connection.Exec("create table if not exists " + STATISTIC_TABLE +"(ts timestamp, speed int)") _, err := connection.Exec("create table if not exists " + saveTable +"(ts timestamp, speed int)")
if err != nil { if err != nil {
log.Fatalf("create %s Table error: %s\n", STATISTIC_TABLE, err) log.Fatalf("create %s Table error: %s\n", saveTable, err)
} }
} }
...@@ -297,7 +298,7 @@ func staticSpeed(){ ...@@ -297,7 +298,7 @@ func staticSpeed(){
log.Printf("insert %d rows, used %d ms, speed %d rows/s", currentSuccessRows, usedTime/1e6, speed) log.Printf("insert %d rows, used %d ms, speed %d rows/s", currentSuccessRows, usedTime/1e6, speed)
if save == 1 { if save == 1 {
insertSql := fmt.Sprintf("insert into %s values(%d, %d)", STATISTIC_TABLE, currentTime.UnixNano()/1e6, speed) insertSql := fmt.Sprintf("insert into %s values(%d, %d)", saveTable, currentTime.UnixNano()/1e6, speed)
connection.Exec(insertSql) connection.Exec(insertSql)
} }
...@@ -353,7 +354,7 @@ func createStatisticTable(){ ...@@ -353,7 +354,7 @@ func createStatisticTable(){
connection := getConnection() connection := getConnection()
defer connection.Close() defer connection.Close()
_, err := connection.Exec("create table if not exist " + db + "."+ STATISTIC_TABLE +"(ts timestamp, speed int)") _, err := connection.Exec("create table if not exist " + db + "."+ saveTable +"(ts timestamp, speed int)")
if err != nil { if err != nil {
log.Fatalf("createStatisticTable error: %s\n", err) log.Fatalf("createStatisticTable error: %s\n", err)
} }
...@@ -1037,6 +1038,7 @@ func parseArg() { ...@@ -1037,6 +1038,7 @@ func parseArg() {
flag.Int64Var(&delay, "delay", DEFAULT_DELAY, "the delay time interval(millisecond) to continue generating data when vnum set 0.") flag.Int64Var(&delay, "delay", DEFAULT_DELAY, "the delay time interval(millisecond) to continue generating data when vnum set 0.")
flag.Int64Var(&tick, "tick", 2000, "the tick time interval(millisecond) to print statistic info.") flag.Int64Var(&tick, "tick", 2000, "the tick time interval(millisecond) to print statistic info.")
flag.IntVar(&save, "save", 0, "whether to save the statistical info into 'statistic' table. 0 is disabled and 1 is enabled.") flag.IntVar(&save, "save", 0, "whether to save the statistical info into 'statistic' table. 0 is disabled and 1 is enabled.")
flag.StringVar(&saveTable, "savetb", DEFAULT_STATISTIC_TABLE, "the table to save 'statistic' info when save set 1.")
flag.IntVar(&thread, "thread", 10, "number of threads to import data.") flag.IntVar(&thread, "thread", 10, "number of threads to import data.")
flag.IntVar(&batch, "batch", 100, "rows of records in one import batch.") flag.IntVar(&batch, "batch", 100, "rows of records in one import batch.")
flag.IntVar(&auto, "auto", 0, "whether to use the starttime and interval specified by users when simulating the data. 0 is disabled and 1 is enabled.") flag.IntVar(&auto, "auto", 0, "whether to use the starttime and interval specified by users when simulating the data. 0 is disabled and 1 is enabled.")
...@@ -1062,6 +1064,7 @@ func printArg() { ...@@ -1062,6 +1064,7 @@ func printArg() {
fmt.Println("-delay:", delay) fmt.Println("-delay:", delay)
fmt.Println("-tick:", tick) fmt.Println("-tick:", tick)
fmt.Println("-save:", save) fmt.Println("-save:", save)
fmt.Println("-savetb:", saveTable)
fmt.Println("-thread:", thread) fmt.Println("-thread:", thread)
fmt.Println("-batch:", batch) fmt.Println("-batch:", batch)
fmt.Println("-auto:", auto) fmt.Println("-auto:", auto)
......
...@@ -276,7 +276,8 @@ typedef struct { ...@@ -276,7 +276,8 @@ typedef struct {
bool existsCheck; // check if the table exists bool existsCheck; // check if the table exists
int8_t showType; // show command type int8_t showType; // show command type
}; };
int8_t isParseFinish;
int8_t isInsertFromFile; // load data from file or not int8_t isInsertFromFile; // load data from file or not
bool import; // import/insert type bool import; // import/insert type
uint8_t msgType; uint8_t msgType;
......
...@@ -507,8 +507,21 @@ void tscMeterMetaCallBack(void *param, TAOS_RES *res, int code) { ...@@ -507,8 +507,21 @@ void tscMeterMetaCallBack(void *param, TAOS_RES *res, int code) {
if (code == TSDB_CODE_ACTION_IN_PROGRESS) return; if (code == TSDB_CODE_ACTION_IN_PROGRESS) return;
} else { // normal async query continues } else { // normal async query continues
code = tsParseSql(pSql, pObj->acctId, pObj->db, false); if (pCmd->isParseFinish) {
if (code == TSDB_CODE_ACTION_IN_PROGRESS) return; tscTrace("%p resend data to vnode in metermeta callback since sql has been parsed completed", pSql);
SMeterMetaInfo* pMeterMetaInfo = tscGetMeterMetaInfo(pCmd, 0);
code = tscGetMeterMeta(pSql, pMeterMetaInfo->name, 0);
assert(code == TSDB_CODE_SUCCESS);
if (pMeterMetaInfo->pMeterMeta) {
code = tscSendMsgToServer(pSql);
if (code == TSDB_CODE_SUCCESS) return;
}
} else {
code = tsParseSql(pSql, pObj->acctId, pObj->db, false);
if (code == TSDB_CODE_ACTION_IN_PROGRESS) return;
}
} }
} else { // stream computing } else { // stream computing
......
...@@ -975,7 +975,7 @@ int doParserInsertSql(SSqlObj *pSql, char *str) { ...@@ -975,7 +975,7 @@ int doParserInsertSql(SSqlObj *pSql, char *str) {
str = pSql->asyncTblPos; str = pSql->asyncTblPos;
} }
tscTrace("%p create data block list for submit data, %p", pSql, pSql->cmd.pDataBlocks); tscTrace("%p create data block list for submit data:%p, asyncTblPos:%p, pTableHashList:%p", pSql, pSql->cmd.pDataBlocks, pSql->asyncTblPos, pSql->pTableHashList);
while (1) { while (1) {
int32_t index = 0; int32_t index = 0;
...@@ -1223,6 +1223,7 @@ _clean: ...@@ -1223,6 +1223,7 @@ _clean:
taosCleanUpIntHash(pSql->pTableHashList); taosCleanUpIntHash(pSql->pTableHashList);
pSql->pTableHashList = NULL; pSql->pTableHashList = NULL;
pSql->asyncTblPos = NULL; pSql->asyncTblPos = NULL;
pCmd->isParseFinish = 1;
return code; return code;
} }
......
...@@ -1108,9 +1108,9 @@ void tsSetAllDebugFlag() { ...@@ -1108,9 +1108,9 @@ void tsSetAllDebugFlag() {
* In case that the setLocale failed to be executed, the right charset needs to be set. * In case that the setLocale failed to be executed, the right charset needs to be set.
*/ */
void tsSetLocale() { void tsSetLocale() {
char msgLocale[] = "Invalid locale:%s, please set the valid locale in config file"; char msgLocale[] = "Invalid locale:%s, please set the valid locale in config file\n";
char msgCharset[] = "Invalid charset:%s, please set the valid charset in config file"; char msgCharset[] = "Invalid charset:%s, please set the valid charset in config file\n";
char msgCharset1[] = "failed to get charset, please set the valid charset in config file"; char msgCharset1[] = "failed to get charset, please set the valid charset in config file\n";
char *locale = setlocale(LC_CTYPE, tsLocale); char *locale = setlocale(LC_CTYPE, tsLocale);
......
...@@ -45,6 +45,7 @@ typedef struct { ...@@ -45,6 +45,7 @@ typedef struct {
uint32_t uDebugFlag = 131; // all the messages uint32_t uDebugFlag = 131; // all the messages
short tsAsyncLog = 1; short tsAsyncLog = 1;
static pid_t logPid = 0;
static SLogBuff *logHandle = NULL; static SLogBuff *logHandle = NULL;
static int taosLogFileNum = 1; static int taosLogFileNum = 1;
static int taosLogMaxLines = 0; static int taosLogMaxLines = 0;
...@@ -82,6 +83,11 @@ int taosStartLog() { ...@@ -82,6 +83,11 @@ int taosStartLog() {
} }
int taosInitLog(char *logName, int numOfLogLines, int maxFiles) { int taosInitLog(char *logName, int numOfLogLines, int maxFiles) {
#ifdef LINUX
logPid = (pid_t)syscall(SYS_gettid);
#endif
logHandle = taosLogBuffNew(TSDB_DEFAULT_LOG_BUF_SIZE); logHandle = taosLogBuffNew(TSDB_DEFAULT_LOG_BUF_SIZE);
if (logHandle == NULL) return -1; if (logHandle == NULL) return -1;
...@@ -306,8 +312,8 @@ char *tprefix(char *prefix) { ...@@ -306,8 +312,8 @@ char *tprefix(char *prefix) {
sprintf(prefix, "%02d/%02d %02d:%02d:%02d.%06d 0x%lld ", ptm->tm_mon + 1, ptm->tm_mday, ptm->tm_hour, ptm->tm_min, sprintf(prefix, "%02d/%02d %02d:%02d:%02d.%06d 0x%lld ", ptm->tm_mon + 1, ptm->tm_mday, ptm->tm_hour, ptm->tm_min,
ptm->tm_sec, (int)timeSecs.tv_usec, taosGetPthreadId()); ptm->tm_sec, (int)timeSecs.tv_usec, taosGetPthreadId());
#else #else
sprintf(prefix, "%02d/%02d %02d:%02d:%02d.%06d 0x%lx ", ptm->tm_mon + 1, ptm->tm_mday, ptm->tm_hour, ptm->tm_min, sprintf(prefix, "%02d/%02d %02d:%02d:%02d.%06d %d 0x%lx ", ptm->tm_mon + 1, ptm->tm_mday, ptm->tm_hour, ptm->tm_min,
ptm->tm_sec, (int)timeSecs.tv_usec, pthread_self()); ptm->tm_sec, (int)timeSecs.tv_usec, logPid, pthread_self());
#endif #endif
return prefix; return prefix;
} }
...@@ -333,8 +339,8 @@ void tprintf(const char *const flags, int dflag, const char *const format, ...) ...@@ -333,8 +339,8 @@ void tprintf(const char *const flags, int dflag, const char *const format, ...)
len = sprintf(buffer, "%02d/%02d %02d:%02d:%02d.%06d 0x%lld ", ptm->tm_mon + 1, ptm->tm_mday, ptm->tm_hour, len = sprintf(buffer, "%02d/%02d %02d:%02d:%02d.%06d 0x%lld ", ptm->tm_mon + 1, ptm->tm_mday, ptm->tm_hour,
ptm->tm_min, ptm->tm_sec, (int)timeSecs.tv_usec, taosGetPthreadId()); ptm->tm_min, ptm->tm_sec, (int)timeSecs.tv_usec, taosGetPthreadId());
#else #else
len = sprintf(buffer, "%02d/%02d %02d:%02d:%02d.%06d %lx ", ptm->tm_mon + 1, ptm->tm_mday, ptm->tm_hour, ptm->tm_min, len = sprintf(buffer, "%02d/%02d %02d:%02d:%02d.%06d %d %lx ", ptm->tm_mon + 1, ptm->tm_mday, ptm->tm_hour, ptm->tm_min,
ptm->tm_sec, (int)timeSecs.tv_usec, pthread_self()); ptm->tm_sec, (int)timeSecs.tv_usec, logPid, pthread_self());
#endif #endif
len += sprintf(buffer + len, "%s", flags); len += sprintf(buffer + len, "%s", flags);
...@@ -424,8 +430,8 @@ void taosPrintLongString(const char *const flags, int dflag, const char *const f ...@@ -424,8 +430,8 @@ void taosPrintLongString(const char *const flags, int dflag, const char *const f
len = sprintf(buffer, "%02d/%02d %02d:%02d:%02d.%06d 0x%lld ", ptm->tm_mon + 1, ptm->tm_mday, ptm->tm_hour, len = sprintf(buffer, "%02d/%02d %02d:%02d:%02d.%06d 0x%lld ", ptm->tm_mon + 1, ptm->tm_mday, ptm->tm_hour,
ptm->tm_min, ptm->tm_sec, (int)timeSecs.tv_usec, taosGetPthreadId()); ptm->tm_min, ptm->tm_sec, (int)timeSecs.tv_usec, taosGetPthreadId());
#else #else
len = sprintf(buffer, "%02d/%02d %02d:%02d:%02d.%06d %lx ", ptm->tm_mon + 1, ptm->tm_mday, ptm->tm_hour, ptm->tm_min, len = sprintf(buffer, "%02d/%02d %02d:%02d:%02d.%06d %d %lx ", ptm->tm_mon + 1, ptm->tm_mday, ptm->tm_hour, ptm->tm_min,
ptm->tm_sec, (int)timeSecs.tv_usec, pthread_self()); ptm->tm_sec, (int)timeSecs.tv_usec, logPid, pthread_self());
#endif #endif
len += sprintf(buffer + len, "%s", flags); len += sprintf(buffer + len, "%s", flags);
......
...@@ -162,12 +162,13 @@ void* insert_rows(void *sarg) ...@@ -162,12 +162,13 @@ void* insert_rows(void *sarg)
} }
// insert data // insert data
int index = 0; int64_t begin = (int64_t)time(NULL);
int index = 0;
while (1) { while (1) {
if (g_thread_exit_flag) break; if (g_thread_exit_flag) break;
index++; index++;
sprintf(command, "insert into %s values (%ld, %d)", winfo->tbl_name, 1546300800000+index*1000, index); sprintf(command, "insert into %s values (%ld, %d)", winfo->tbl_name, (begin + index) * 1000, index);
if (taos_query(taos, command)) { if (taos_query(taos, command)) {
printf("failed to insert row [%s], reason:%s\n", command, taos_errstr(taos)); printf("failed to insert row [%s], reason:%s\n", command, taos_errstr(taos));
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册