diff --git a/importSampleData/README.md b/importSampleData/README.md index 0678676d4e85d568068dfa138904baf7a8ef03e5..ee3a6e073c18b618af49a9c0b6d2d6d07718f00f 100644 --- a/importSampleData/README.md +++ b/importSampleData/README.md @@ -97,6 +97,10 @@ go build -o bin/taosimport app/main.go 是否保存统计信息到 tdengine 的 statistic 表中,1 是,0 否, 默认 0。 +* -savetb int + + 当 save 为 1 时保存统计信息的表名, 默认 statistic。 + * -auto int 是否自动生成样例数据中的主键时间戳,1 是,0 否, 默认 0。 diff --git a/importSampleData/app/main.go b/importSampleData/app/main.go index 087b7bb7614e8a03da0ce9fae0c5693340314236..aef413320710012fec79e56677e16864a881ca8f 100644 --- a/importSampleData/app/main.go +++ b/importSampleData/app/main.go @@ -28,6 +28,7 @@ const ( DEFAULT_STARTTIME int64 = -1 DEFAULT_INTERVAL int64 = 1*1000 DEFAULT_DELAY int64 = -1 + DEFAULT_STATISTIC_TABLE = "statistic" JSON_FORMAT = "json" CSV_FORMAT = "csv" @@ -37,7 +38,6 @@ const ( DRIVER_NAME = "taosSql" STARTTIME_LAYOUT = "2006-01-02 15:04:05.000" INSERT_PREFIX = "insert into " - STATISTIC_TABLE = "statistic" ) var ( @@ -75,6 +75,7 @@ var ( delay int64 // default 10 milliseconds tick int64 save int + saveTable string ) type superTableConfig struct { @@ -278,9 +279,9 @@ func staticSpeed(){ if save == 1 { connection.Exec("use " + db) - _, err := connection.Exec("create table if not exists " + STATISTIC_TABLE +"(ts timestamp, speed int)") + _, err := connection.Exec("create table if not exists " + saveTable +"(ts timestamp, speed int)") if err != nil { - log.Fatalf("create %s Table error: %s\n", STATISTIC_TABLE, err) + log.Fatalf("create %s Table error: %s\n", saveTable, err) } } @@ -297,7 +298,7 @@ func staticSpeed(){ log.Printf("insert %d rows, used %d ms, speed %d rows/s", currentSuccessRows, usedTime/1e6, speed) if save == 1 { - insertSql := fmt.Sprintf("insert into %s values(%d, %d)", STATISTIC_TABLE, currentTime.UnixNano()/1e6, speed) + insertSql := fmt.Sprintf("insert into %s values(%d, %d)", saveTable, currentTime.UnixNano()/1e6, speed) connection.Exec(insertSql) } @@ -353,7 +354,7 @@ func createStatisticTable(){ connection := getConnection() defer connection.Close() - _, err := connection.Exec("create table if not exist " + db + "."+ STATISTIC_TABLE +"(ts timestamp, speed int)") + _, err := connection.Exec("create table if not exist " + db + "."+ saveTable +"(ts timestamp, speed int)") if err != nil { log.Fatalf("createStatisticTable error: %s\n", err) } @@ -1037,6 +1038,7 @@ func parseArg() { flag.Int64Var(&delay, "delay", DEFAULT_DELAY, "the delay time interval(millisecond) to continue generating data when vnum set 0.") flag.Int64Var(&tick, "tick", 2000, "the tick time interval(millisecond) to print statistic info.") flag.IntVar(&save, "save", 0, "whether to save the statistical info into 'statistic' table. 0 is disabled and 1 is enabled.") + flag.StringVar(&saveTable, "savetb", DEFAULT_STATISTIC_TABLE, "the table to save 'statistic' info when save set 1.") flag.IntVar(&thread, "thread", 10, "number of threads to import data.") flag.IntVar(&batch, "batch", 100, "rows of records in one import batch.") flag.IntVar(&auto, "auto", 0, "whether to use the starttime and interval specified by users when simulating the data. 0 is disabled and 1 is enabled.") @@ -1062,6 +1064,7 @@ func printArg() { fmt.Println("-delay:", delay) fmt.Println("-tick:", tick) fmt.Println("-save:", save) + fmt.Println("-savetb:", saveTable) fmt.Println("-thread:", thread) fmt.Println("-batch:", batch) fmt.Println("-auto:", auto) diff --git a/importSampleData/bin/taosimport b/importSampleData/bin/taosimport index 1cb3c12926ec6657190471ea590e79f4a6b191b6..b042549341bced364e0fd77909b115d1b5b6dc04 100755 Binary files a/importSampleData/bin/taosimport and b/importSampleData/bin/taosimport differ diff --git a/src/client/inc/tsclient.h b/src/client/inc/tsclient.h index 147d9d23cd698d35d903d8f9fa3b378a282b84d5..6df78f6b7bc47adf628569410002e316b59d78d1 100644 --- a/src/client/inc/tsclient.h +++ b/src/client/inc/tsclient.h @@ -276,7 +276,8 @@ typedef struct { bool existsCheck; // check if the table exists int8_t showType; // show command type }; - + + int8_t isParseFinish; int8_t isInsertFromFile; // load data from file or not bool import; // import/insert type uint8_t msgType; diff --git a/src/client/src/tscAsync.c b/src/client/src/tscAsync.c index 99b9b571d7ffe513e87206c5cd0c5d380318ca95..9c543952b4cb7349224ff1fb088f1099a179f280 100644 --- a/src/client/src/tscAsync.c +++ b/src/client/src/tscAsync.c @@ -507,8 +507,21 @@ void tscMeterMetaCallBack(void *param, TAOS_RES *res, int code) { if (code == TSDB_CODE_ACTION_IN_PROGRESS) return; } else { // normal async query continues - code = tsParseSql(pSql, pObj->acctId, pObj->db, false); - if (code == TSDB_CODE_ACTION_IN_PROGRESS) return; + if (pCmd->isParseFinish) { + tscTrace("%p resend data to vnode in metermeta callback since sql has been parsed completed", pSql); + + SMeterMetaInfo* pMeterMetaInfo = tscGetMeterMetaInfo(pCmd, 0); + code = tscGetMeterMeta(pSql, pMeterMetaInfo->name, 0); + assert(code == TSDB_CODE_SUCCESS); + + if (pMeterMetaInfo->pMeterMeta) { + code = tscSendMsgToServer(pSql); + if (code == TSDB_CODE_SUCCESS) return; + } + } else { + code = tsParseSql(pSql, pObj->acctId, pObj->db, false); + if (code == TSDB_CODE_ACTION_IN_PROGRESS) return; + } } } else { // stream computing diff --git a/src/client/src/tscParseInsert.c b/src/client/src/tscParseInsert.c index 8c9497c53006286b6cc82e02a2a6181881927c8d..f0824959b09ef41b4f4ede92373c6648868bf789 100644 --- a/src/client/src/tscParseInsert.c +++ b/src/client/src/tscParseInsert.c @@ -975,7 +975,7 @@ int doParserInsertSql(SSqlObj *pSql, char *str) { str = pSql->asyncTblPos; } - tscTrace("%p create data block list for submit data, %p", pSql, pSql->cmd.pDataBlocks); + tscTrace("%p create data block list for submit data:%p, asyncTblPos:%p, pTableHashList:%p", pSql, pSql->cmd.pDataBlocks, pSql->asyncTblPos, pSql->pTableHashList); while (1) { int32_t index = 0; @@ -1223,6 +1223,7 @@ _clean: taosCleanUpIntHash(pSql->pTableHashList); pSql->pTableHashList = NULL; pSql->asyncTblPos = NULL; + pCmd->isParseFinish = 1; return code; } diff --git a/src/util/src/tglobalcfg.c b/src/util/src/tglobalcfg.c index 2db036824deea494682a3ae0a8bcd7e0bd84d0c5..7e18e2e74b1922a6744349d8d9443c2858344a7f 100644 --- a/src/util/src/tglobalcfg.c +++ b/src/util/src/tglobalcfg.c @@ -1108,9 +1108,9 @@ void tsSetAllDebugFlag() { * In case that the setLocale failed to be executed, the right charset needs to be set. */ void tsSetLocale() { - char msgLocale[] = "Invalid locale:%s, please set the valid locale in config file"; - char msgCharset[] = "Invalid charset:%s, please set the valid charset in config file"; - char msgCharset1[] = "failed to get charset, please set the valid charset in config file"; + char msgLocale[] = "Invalid locale:%s, please set the valid locale in config file\n"; + char msgCharset[] = "Invalid charset:%s, please set the valid charset in config file\n"; + char msgCharset1[] = "failed to get charset, please set the valid charset in config file\n"; char *locale = setlocale(LC_CTYPE, tsLocale); diff --git a/src/util/src/tlog.c b/src/util/src/tlog.c index 1a7f672e00321c0891aa54ae9f3cc3efedb89d54..21818e572f3fc49a2841c2f494362e2a7103f9f0 100644 --- a/src/util/src/tlog.c +++ b/src/util/src/tlog.c @@ -45,6 +45,7 @@ typedef struct { uint32_t uDebugFlag = 131; // all the messages short tsAsyncLog = 1; +static pid_t logPid = 0; static SLogBuff *logHandle = NULL; static int taosLogFileNum = 1; static int taosLogMaxLines = 0; @@ -82,6 +83,11 @@ int taosStartLog() { } int taosInitLog(char *logName, int numOfLogLines, int maxFiles) { + +#ifdef LINUX + logPid = (pid_t)syscall(SYS_gettid); +#endif + logHandle = taosLogBuffNew(TSDB_DEFAULT_LOG_BUF_SIZE); if (logHandle == NULL) return -1; @@ -306,8 +312,8 @@ char *tprefix(char *prefix) { sprintf(prefix, "%02d/%02d %02d:%02d:%02d.%06d 0x%lld ", ptm->tm_mon + 1, ptm->tm_mday, ptm->tm_hour, ptm->tm_min, ptm->tm_sec, (int)timeSecs.tv_usec, taosGetPthreadId()); #else - sprintf(prefix, "%02d/%02d %02d:%02d:%02d.%06d 0x%lx ", ptm->tm_mon + 1, ptm->tm_mday, ptm->tm_hour, ptm->tm_min, - ptm->tm_sec, (int)timeSecs.tv_usec, pthread_self()); + sprintf(prefix, "%02d/%02d %02d:%02d:%02d.%06d %d 0x%lx ", ptm->tm_mon + 1, ptm->tm_mday, ptm->tm_hour, ptm->tm_min, + ptm->tm_sec, (int)timeSecs.tv_usec, logPid, pthread_self()); #endif return prefix; } @@ -333,8 +339,8 @@ void tprintf(const char *const flags, int dflag, const char *const format, ...) len = sprintf(buffer, "%02d/%02d %02d:%02d:%02d.%06d 0x%lld ", ptm->tm_mon + 1, ptm->tm_mday, ptm->tm_hour, ptm->tm_min, ptm->tm_sec, (int)timeSecs.tv_usec, taosGetPthreadId()); #else - len = sprintf(buffer, "%02d/%02d %02d:%02d:%02d.%06d %lx ", ptm->tm_mon + 1, ptm->tm_mday, ptm->tm_hour, ptm->tm_min, - ptm->tm_sec, (int)timeSecs.tv_usec, pthread_self()); + len = sprintf(buffer, "%02d/%02d %02d:%02d:%02d.%06d %d %lx ", ptm->tm_mon + 1, ptm->tm_mday, ptm->tm_hour, ptm->tm_min, + ptm->tm_sec, (int)timeSecs.tv_usec, logPid, pthread_self()); #endif len += sprintf(buffer + len, "%s", flags); @@ -424,8 +430,8 @@ void taosPrintLongString(const char *const flags, int dflag, const char *const f len = sprintf(buffer, "%02d/%02d %02d:%02d:%02d.%06d 0x%lld ", ptm->tm_mon + 1, ptm->tm_mday, ptm->tm_hour, ptm->tm_min, ptm->tm_sec, (int)timeSecs.tv_usec, taosGetPthreadId()); #else - len = sprintf(buffer, "%02d/%02d %02d:%02d:%02d.%06d %lx ", ptm->tm_mon + 1, ptm->tm_mday, ptm->tm_hour, ptm->tm_min, - ptm->tm_sec, (int)timeSecs.tv_usec, pthread_self()); + len = sprintf(buffer, "%02d/%02d %02d:%02d:%02d.%06d %d %lx ", ptm->tm_mon + 1, ptm->tm_mday, ptm->tm_hour, ptm->tm_min, + ptm->tm_sec, (int)timeSecs.tv_usec, logPid, pthread_self()); #endif len += sprintf(buffer + len, "%s", flags); diff --git a/tests/examples/c/stream.c b/tests/examples/c/stream.c index 623775c7801c196f2a78ac99cd3ca5fc39fe86f5..060f5b84ff276579019d3278552e424b2a4198e9 100755 --- a/tests/examples/c/stream.c +++ b/tests/examples/c/stream.c @@ -162,12 +162,13 @@ void* insert_rows(void *sarg) } // insert data - int index = 0; + int64_t begin = (int64_t)time(NULL); + int index = 0; while (1) { if (g_thread_exit_flag) break; index++; - sprintf(command, "insert into %s values (%ld, %d)", winfo->tbl_name, 1546300800000+index*1000, index); + sprintf(command, "insert into %s values (%ld, %d)", winfo->tbl_name, (begin + index) * 1000, index); if (taos_query(taos, command)) { printf("failed to insert row [%s], reason:%s\n", command, taos_errstr(taos)); }