提交 a721ba58 编写于 作者: X Xiaoyu Wang

Merge remote-tracking branch 'origin/hotfix/TS-544' into fix/TD-5903

IF (TD_LINUX) IF (TD_LINUX)
SET(TD_MAKE_INSTALL_SH "${TD_COMMUNITY_DIR}/packaging/tools/make_install.sh") SET(TD_MAKE_INSTALL_SH "${TD_COMMUNITY_DIR}/packaging/tools/make_install.sh")
INSTALL(CODE "MESSAGE(\"make install script: ${TD_MAKE_INSTALL_SH}\")") INSTALL(CODE "MESSAGE(\"make install script: ${TD_MAKE_INSTALL_SH}\")")
INSTALL(CODE "execute_process(COMMAND chmod 777 ${TD_MAKE_INSTALL_SH})") INSTALL(CODE "execute_process(COMMAND bash ${TD_MAKE_INSTALL_SH} ${TD_COMMUNITY_DIR} ${PROJECT_BINARY_DIR} Linux ${TD_VER_NUMBER})")
INSTALL(CODE "execute_process(COMMAND ${TD_MAKE_INSTALL_SH} ${TD_COMMUNITY_DIR} ${PROJECT_BINARY_DIR} Linux ${TD_VER_NUMBER})")
ELSEIF (TD_WINDOWS) ELSEIF (TD_WINDOWS)
IF (TD_POWER) IF (TD_POWER)
SET(CMAKE_INSTALL_PREFIX C:/PowerDB) SET(CMAKE_INSTALL_PREFIX C:/PowerDB)
...@@ -41,6 +40,5 @@ ELSEIF (TD_WINDOWS) ...@@ -41,6 +40,5 @@ ELSEIF (TD_WINDOWS)
ELSEIF (TD_DARWIN) ELSEIF (TD_DARWIN)
SET(TD_MAKE_INSTALL_SH "${TD_COMMUNITY_DIR}/packaging/tools/make_install.sh") SET(TD_MAKE_INSTALL_SH "${TD_COMMUNITY_DIR}/packaging/tools/make_install.sh")
INSTALL(CODE "MESSAGE(\"make install script: ${TD_MAKE_INSTALL_SH}\")") INSTALL(CODE "MESSAGE(\"make install script: ${TD_MAKE_INSTALL_SH}\")")
INSTALL(CODE "execute_process(COMMAND chmod 777 ${TD_MAKE_INSTALL_SH})") INSTALL(CODE "execute_process(COMMAND bash ${TD_MAKE_INSTALL_SH} ${TD_COMMUNITY_DIR} ${PROJECT_BINARY_DIR} Darwin ${TD_VER_NUMBER})")
INSTALL(CODE "execute_process(COMMAND ${TD_MAKE_INSTALL_SH} ${TD_COMMUNITY_DIR} ${PROJECT_BINARY_DIR} Darwin ${TD_VER_NUMBER})")
ENDIF () ENDIF ()
...@@ -5610,7 +5610,7 @@ int32_t validateOrderbyNode(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SSqlNode* pSq ...@@ -5610,7 +5610,7 @@ int32_t validateOrderbyNode(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SSqlNode* pSq
bool udf = false; bool udf = false;
if (pQueryInfo->pUdfInfo && taosArrayGetSize(pQueryInfo->pUdfInfo) > 0) { if (pQueryInfo->pUdfInfo && taosArrayGetSize(pQueryInfo->pUdfInfo) > 0) {
int32_t usize = taosArrayGetSize(pQueryInfo->pUdfInfo); int32_t usize = (int32_t)taosArrayGetSize(pQueryInfo->pUdfInfo);
for (int32_t i = 0; i < usize; ++i) { for (int32_t i = 0; i < usize; ++i) {
SUdfInfo* pUdfInfo = taosArrayGet(pQueryInfo->pUdfInfo, i); SUdfInfo* pUdfInfo = taosArrayGet(pQueryInfo->pUdfInfo, i);
...@@ -8406,7 +8406,7 @@ int32_t loadAllTableMeta(SSqlObj* pSql, struct SSqlInfo* pInfo) { ...@@ -8406,7 +8406,7 @@ int32_t loadAllTableMeta(SSqlObj* pSql, struct SSqlInfo* pInfo) {
for (int32_t j = 0; j < usize; ++j) { for (int32_t j = 0; j < usize; ++j) {
SUdfInfo* pUdfInfo = taosArrayGet(pQueryInfo->pUdfInfo, j); SUdfInfo* pUdfInfo = taosArrayGet(pQueryInfo->pUdfInfo, j);
int32_t len = strlen(pUdfInfo->name); int32_t len = (int32_t)strlen(pUdfInfo->name);
if (len == t->n && strncasecmp(info.name, pUdfInfo->name, t->n) == 0) { if (len == t->n && strncasecmp(info.name, pUdfInfo->name, t->n) == 0) {
exist = 1; exist = 1;
break; break;
......
...@@ -121,7 +121,7 @@ public class Utils { ...@@ -121,7 +121,7 @@ public class Utils {
} }
private static void findValuesClauseRangeSet(String preparedSql, RangeSet<Integer> clauseRangeSet) { private static void findValuesClauseRangeSet(String preparedSql, RangeSet<Integer> clauseRangeSet) {
Matcher matcher = Pattern.compile("(values|,)\\s*(\\([^)]*\\))").matcher(preparedSql); Matcher matcher = Pattern.compile("(values||,)\\s*(\\([^)]*\\))").matcher(preparedSql);
while (matcher.find()) { while (matcher.find()) {
int start = matcher.start(2); int start = matcher.start(2);
int end = matcher.end(2); int end = matcher.end(2);
......
...@@ -73,6 +73,48 @@ public class UtilsTest { ...@@ -73,6 +73,48 @@ public class UtilsTest {
Assert.assertEquals(expected, actual); Assert.assertEquals(expected, actual);
} }
@Test
public void multiValuesAndWhitespace() {
// given
String nativeSql = "INSERT INTO ? (TS,CURRENT,VOLTAGE,PHASE) USING METERS TAGS (?) VALUES (?,?,?,?) (?,?,?,?) (?,?,?,?)";
Object[] parameters = Stream.of("d1", 1, 100, 3.14, "abc", 4, 200, 3.1415, "xyz", 5, 300, 3.141592, "uvw", 6).toArray();
// when
String actual = Utils.getNativeSql(nativeSql, parameters);
// then
String expected = "INSERT INTO d1 (TS,CURRENT,VOLTAGE,PHASE) USING METERS TAGS (1) VALUES (100,3.14,'abc',4) (200,3.1415,'xyz',5) (300,3.141592,'uvw',6)";
Assert.assertEquals(expected, actual);
}
@Test
public void multiValuesNoSeparator() {
// given
String nativeSql = "INSERT INTO ? (TS,CURRENT,VOLTAGE,PHASE) USING METERS TAGS (?) VALUES (?,?,?,?)(?,?,?,?)(?,?,?,?)";
Object[] parameters = Stream.of("d1", 1, 100, 3.14, "abc", 4, 200, 3.1415, "xyz", 5, 300, 3.141592, "uvw", 6).toArray();
// when
String actual = Utils.getNativeSql(nativeSql, parameters);
// then
String expected = "INSERT INTO d1 (TS,CURRENT,VOLTAGE,PHASE) USING METERS TAGS (1) VALUES (100,3.14,'abc',4)(200,3.1415,'xyz',5)(300,3.141592,'uvw',6)";
Assert.assertEquals(expected, actual);
}
@Test
public void multiValuesMultiSeparator() {
// given
String nativeSql = "INSERT INTO ? (TS,CURRENT,VOLTAGE,PHASE) USING METERS TAGS (?) VALUES (?,?,?,?) (?,?,?,?), (?,?,?,?)";
Object[] parameters = Stream.of("d1", 1, 100, 3.14, "abc", 4, 200, 3.1415, "xyz", 5, 300, 3.141592, "uvw", 6).toArray();
// when
String actual = Utils.getNativeSql(nativeSql, parameters);
// then
String expected = "INSERT INTO d1 (TS,CURRENT,VOLTAGE,PHASE) USING METERS TAGS (1) VALUES (100,3.14,'abc',4) (200,3.1415,'xyz',5), (300,3.141592,'uvw',6)";
Assert.assertEquals(expected, actual);
}
@Test @Test
public void lineTerminator() { public void lineTerminator() {
// given // given
...@@ -100,6 +142,32 @@ public class UtilsTest { ...@@ -100,6 +142,32 @@ public class UtilsTest {
Assert.assertEquals(expected, actual); Assert.assertEquals(expected, actual);
} }
@Test
public void lineTerminatorAndMultiValuesAndNoneOrMoreWhitespace() {
String nativeSql = "INSERT Into ? TAGS(?) VALUES(?,?,\r\n?,?),(?,? ,\r\n?,?) t? tags (?) Values (?,?,?\r\n,?) (?,?,?,?) t? Tags(?) values (?,?,?,?) , (?,?,?,?)";
Object[] parameters = Stream.of("t1", "abc", 100, 1.1, "xxx", "xxx", 200, 2.2, "xxx", "xxx", 2, "bcd", 300, 3.3, "xxx", "xxx", 400, 4.4, "xxx", "xxx", 3, "cde", 500, 5.5, "xxx", "xxx", 600, 6.6, "xxx", "xxx").toArray();
// when
String actual = Utils.getNativeSql(nativeSql, parameters);
// then
String expected = "INSERT Into t1 TAGS('abc') VALUES(100,1.1,\r\n'xxx','xxx'),(200,2.2 ,\r\n'xxx','xxx') t2 tags ('bcd') Values (300,3.3,'xxx'\r\n,'xxx') (400,4.4,'xxx','xxx') t3 Tags('cde') values (500,5.5,'xxx','xxx') , (600,6.6,'xxx','xxx')";
Assert.assertEquals(expected, actual);
}
@Test
public void multiValuesAndNoneOrMoreWhitespace() {
String nativeSql = "INSERT INTO ? USING traces TAGS (?, ?) VALUES (?, ?, ?, ?, ?, ?, ?) (?, ?, ?, ?, ?, ?, ?)";
Object[] parameters = Stream.of("t1", "t1", "t2", 1632968284000L, 111.111, 119.001, 0.4, 90, 99.1, "WGS84", 1632968285000L, 111.21109999999999, 120.001, 0.5, 91, 99.19999999999999, "WGS84").toArray();
// when
String actual = Utils.getNativeSql(nativeSql, parameters);
// then
String expected = "INSERT INTO t1 USING traces TAGS ('t1', 't2') VALUES (1632968284000, 111.111, 119.001, 0.4, 90, 99.1, 'WGS84') (1632968285000, 111.21109999999999, 120.001, 0.5, 91, 99.19999999999999, 'WGS84')";
Assert.assertEquals(expected, actual);
}
@Test @Test
public void replaceNothing() { public void replaceNothing() {
// given // given
......
...@@ -26,6 +26,8 @@ ENDIF () ...@@ -26,6 +26,8 @@ ENDIF ()
SET_TARGET_PROPERTIES(shell PROPERTIES OUTPUT_NAME taos) SET_TARGET_PROPERTIES(shell PROPERTIES OUTPUT_NAME taos)
ELSEIF (TD_WINDOWS) ELSEIF (TD_WINDOWS)
ADD_DEFINITIONS(-DUNICODE)
ADD_DEFINITIONS(-D_UNICODE)
LIST(APPEND SRC ./src/shellEngine.c) LIST(APPEND SRC ./src/shellEngine.c)
LIST(APPEND SRC ./src/shellMain.c) LIST(APPEND SRC ./src/shellMain.c)
LIST(APPEND SRC ./src/shellWindows.c) LIST(APPEND SRC ./src/shellWindows.c)
......
...@@ -95,6 +95,9 @@ SShellArguments args = { ...@@ -95,6 +95,9 @@ SShellArguments args = {
*/ */
int main(int argc, char* argv[]) { int main(int argc, char* argv[]) {
/*setlocale(LC_ALL, "en_US.UTF-8"); */ /*setlocale(LC_ALL, "en_US.UTF-8"); */
#ifdef WINDOWS
SetConsoleOutputCP(CP_UTF8);
#endif
if (!checkVersion()) { if (!checkVersion()) {
exit(EXIT_FAILURE); exit(EXIT_FAILURE);
......
...@@ -256,13 +256,16 @@ int32_t shellReadCommand(TAOS *con, char command[]) { ...@@ -256,13 +256,16 @@ int32_t shellReadCommand(TAOS *con, char command[]) {
cmd.command = (char *)calloc(1, MAX_COMMAND_SIZE); cmd.command = (char *)calloc(1, MAX_COMMAND_SIZE);
// Read input. // Read input.
char c; void *console = GetStdHandle(STD_INPUT_HANDLE);
unsigned long read;
wchar_t c;
char mbStr[16];
while (1) { while (1) {
c = getchar(); int ret = ReadConsole(console, &c, 1, &read, NULL);
int size = WideCharToMultiByte(CP_UTF8, 0, &c, read, mbStr, sizeof(mbStr), NULL, NULL);
mbStr[size] = 0;
switch (c) { switch (c) {
case '\n': case '\n':
case '\r':
if (isReadyGo(&cmd)) { if (isReadyGo(&cmd)) {
sprintf(command, "%s%s", cmd.buffer, cmd.command); sprintf(command, "%s%s", cmd.buffer, cmd.command);
free(cmd.buffer); free(cmd.buffer);
...@@ -275,8 +278,12 @@ int32_t shellReadCommand(TAOS *con, char command[]) { ...@@ -275,8 +278,12 @@ int32_t shellReadCommand(TAOS *con, char command[]) {
updateBuffer(&cmd); updateBuffer(&cmd);
} }
break; break;
case '\r':
break;
default: default:
insertChar(&cmd, c); for (int i = 0; i < size; ++i) {
insertChar(&cmd, mbStr[i]);
}
} }
} }
......
...@@ -75,6 +75,7 @@ extern char configDir[]; ...@@ -75,6 +75,7 @@ extern char configDir[];
#define MAX_DATA_SIZE (16*TSDB_MAX_COLUMNS)+20 // max record len: 16*MAX_COLUMNS, timestamp string and ,('') need extra space #define MAX_DATA_SIZE (16*TSDB_MAX_COLUMNS)+20 // max record len: 16*MAX_COLUMNS, timestamp string and ,('') need extra space
#define OPT_ABORT 1 /* –abort */ #define OPT_ABORT 1 /* –abort */
#define MAX_FILE_NAME_LEN 256 // max file name length on linux is 255. #define MAX_FILE_NAME_LEN 256 // max file name length on linux is 255.
#define MAX_PATH_LEN 4096
#define DEFAULT_START_TIME 1500000000000 #define DEFAULT_START_TIME 1500000000000
...@@ -244,6 +245,7 @@ typedef struct SArguments_S { ...@@ -244,6 +245,7 @@ typedef struct SArguments_S {
uint64_t insert_interval; uint64_t insert_interval;
uint64_t timestamp_step; uint64_t timestamp_step;
int64_t query_times; int64_t query_times;
int64_t prepared_rand;
uint32_t interlaceRows; uint32_t interlaceRows;
uint32_t reqPerReq; // num_of_records_per_req uint32_t reqPerReq; // num_of_records_per_req
uint64_t max_sql_len; uint64_t max_sql_len;
...@@ -303,6 +305,7 @@ typedef struct SSuperTable_S { ...@@ -303,6 +305,7 @@ typedef struct SSuperTable_S {
uint64_t lenOfTagOfOneRow; uint64_t lenOfTagOfOneRow;
char* sampleDataBuf; char* sampleDataBuf;
bool useSampleTs;
uint32_t tagSource; // 0: rand, 1: tag sample uint32_t tagSource; // 0: rand, 1: tag sample
char* tagDataBuf; char* tagDataBuf;
...@@ -363,7 +366,7 @@ typedef struct SDataBase_S { ...@@ -363,7 +366,7 @@ typedef struct SDataBase_S {
bool drop; // 0: use exists, 1: if exists, drop then new create bool drop; // 0: use exists, 1: if exists, drop then new create
SDbCfg dbCfg; SDbCfg dbCfg;
uint64_t superTblCount; uint64_t superTblCount;
SSuperTable superTbls[MAX_SUPER_TABLE_COUNT]; SSuperTable* superTbls;
} SDataBase; } SDataBase;
typedef struct SDbs_S { typedef struct SDbs_S {
...@@ -382,12 +385,11 @@ typedef struct SDbs_S { ...@@ -382,12 +385,11 @@ typedef struct SDbs_S {
uint32_t threadCount; uint32_t threadCount;
uint32_t threadCountForCreateTbl; uint32_t threadCountForCreateTbl;
uint32_t dbCount; uint32_t dbCount;
SDataBase db[MAX_DB_COUNT];
// statistics // statistics
uint64_t totalInsertRows; uint64_t totalInsertRows;
uint64_t totalAffectedRows; uint64_t totalAffectedRows;
SDataBase* db;
} SDbs; } SDbs;
typedef struct SpecifiedQueryInfo_S { typedef struct SpecifiedQueryInfo_S {
...@@ -501,6 +503,7 @@ typedef struct SThreadInfo_S { ...@@ -501,6 +503,7 @@ typedef struct SThreadInfo_S {
uint64_t querySeq; // sequence number of sql command uint64_t querySeq; // sequence number of sql command
TAOS_SUB* tsub; TAOS_SUB* tsub;
int sockfd;
} threadInfo; } threadInfo;
#ifdef WINDOWS #ifdef WINDOWS
...@@ -580,8 +583,7 @@ static void prompt(); ...@@ -580,8 +583,7 @@ static void prompt();
static int createDatabasesAndStables(); static int createDatabasesAndStables();
static void createChildTables(); static void createChildTables();
static int queryDbExec(TAOS *taos, char *command, QUERY_TYPE type, bool quiet); static int queryDbExec(TAOS *taos, char *command, QUERY_TYPE type, bool quiet);
static int postProceSql(char *host, struct sockaddr_in *pServAddr, static int postProceSql(char *host, uint16_t port, char* sqlstr, threadInfo *pThreadInfo);
uint16_t port, char* sqlstr, threadInfo *pThreadInfo);
static int64_t getTSRandTail(int64_t timeStampStep, int32_t seq, static int64_t getTSRandTail(int64_t timeStampStep, int32_t seq,
int disorderRatio, int disorderRange); int disorderRatio, int disorderRange);
static bool getInfoFromJsonFile(char* file); static bool getInfoFromJsonFile(char* file);
...@@ -590,12 +592,12 @@ static int regexMatch(const char *s, const char *reg, int cflags); ...@@ -590,12 +592,12 @@ static int regexMatch(const char *s, const char *reg, int cflags);
/* ************ Global variables ************ */ /* ************ Global variables ************ */
int32_t g_randint[MAX_PREPARED_RAND]; int32_t* g_randint;
uint32_t g_randuint[MAX_PREPARED_RAND]; uint32_t* g_randuint;
int64_t g_randbigint[MAX_PREPARED_RAND]; int64_t* g_randbigint;
uint64_t g_randubigint[MAX_PREPARED_RAND]; uint64_t* g_randubigint;
float g_randfloat[MAX_PREPARED_RAND]; float* g_randfloat;
double g_randdouble[MAX_PREPARED_RAND]; double* g_randdouble;
char *g_randbool_buff = NULL; char *g_randbool_buff = NULL;
char *g_randint_buff = NULL; char *g_randint_buff = NULL;
...@@ -662,6 +664,7 @@ SArguments g_args = { ...@@ -662,6 +664,7 @@ SArguments g_args = {
0, // insert_interval 0, // insert_interval
DEFAULT_TIMESTAMP_STEP, // timestamp_step DEFAULT_TIMESTAMP_STEP, // timestamp_step
1, // query_times 1, // query_times
10000, // prepared_rand
DEFAULT_INTERLACE_ROWS, // interlaceRows; DEFAULT_INTERLACE_ROWS, // interlaceRows;
30000, // reqPerReq 30000, // reqPerReq
(1024*1024), // max_sql_len (1024*1024), // max_sql_len
...@@ -796,6 +799,8 @@ static void printHelp() { ...@@ -796,6 +799,8 @@ static void printHelp() {
"Set the replica parameters of the database, By default use 1, min: 1, max: 3."); "Set the replica parameters of the database, By default use 1, min: 1, max: 3.");
printf("%s%s%s%s\n", indent, "-m, --table-prefix=TABLEPREFIX", "\t", printf("%s%s%s%s\n", indent, "-m, --table-prefix=TABLEPREFIX", "\t",
"Table prefix name. By default use 'd'."); "Table prefix name. By default use 'd'.");
printf("%s%s%s%s\n", indent, "-E, --escape-character", "\t",
"Use escape character for Both Stable and normmal table name");
printf("%s%s%s%s\n", indent, "-s, --sql-file=FILE", "\t\t", printf("%s%s%s%s\n", indent, "-s, --sql-file=FILE", "\t\t",
"The select sql file."); "The select sql file.");
printf("%s%s%s%s\n", indent, "-N, --normal-table", "\t\t", "Use normal table flag."); printf("%s%s%s%s\n", indent, "-N, --normal-table", "\t\t", "Use normal table flag.");
...@@ -1687,10 +1692,10 @@ static void parse_args(int argc, char *argv[], SArguments *arguments) { ...@@ -1687,10 +1692,10 @@ static void parse_args(int argc, char *argv[], SArguments *arguments) {
arguments->data_type[index] = TSDB_DATA_TYPE_DOUBLE; arguments->data_type[index] = TSDB_DATA_TYPE_DOUBLE;
} else if (0 == strcasecmp(token, "TINYINT")) { } else if (0 == strcasecmp(token, "TINYINT")) {
arguments->data_type[index] = TSDB_DATA_TYPE_TINYINT; arguments->data_type[index] = TSDB_DATA_TYPE_TINYINT;
} else if (1 == regexMatch(token, "^BINARY(\\([1-9][0-9]*\\))?$", REG_ICASE | } else if (1 == regexMatch(token, "^BINARY(\\([1-9][0-9]*\\))?$", REG_ICASE |
REG_EXTENDED)) { REG_EXTENDED)) {
arguments->data_type[index] = TSDB_DATA_TYPE_BINARY; arguments->data_type[index] = TSDB_DATA_TYPE_BINARY;
} else if (1 == regexMatch(token, "^NCHAR(\\([1-9][0-9]*\\))?$", REG_ICASE | } else if (1 == regexMatch(token, "^NCHAR(\\([1-9][0-9]*\\))?$", REG_ICASE |
REG_EXTENDED)) { REG_EXTENDED)) {
arguments->data_type[index] = TSDB_DATA_TYPE_NCHAR; arguments->data_type[index] = TSDB_DATA_TYPE_NCHAR;
} else if (0 == strcasecmp(token, "BOOL")) { } else if (0 == strcasecmp(token, "BOOL")) {
...@@ -2097,7 +2102,7 @@ static void tmfclose(FILE *fp) { ...@@ -2097,7 +2102,7 @@ static void tmfclose(FILE *fp) {
} }
} }
static void tmfree(char *buf) { static void tmfree(void *buf) {
if (NULL != buf) { if (NULL != buf) {
free(buf); free(buf);
buf = NULL; buf = NULL;
...@@ -2205,7 +2210,7 @@ static void selectAndGetResult( ...@@ -2205,7 +2210,7 @@ static void selectAndGetResult(
} else if (0 == strncasecmp(g_queryInfo.queryMode, "rest", strlen("rest"))) { } else if (0 == strncasecmp(g_queryInfo.queryMode, "rest", strlen("rest"))) {
int retCode = postProceSql( int retCode = postProceSql(
g_queryInfo.host, &(g_queryInfo.serv_addr), g_queryInfo.port, g_queryInfo.host, g_queryInfo.port,
command, command,
pThreadInfo); pThreadInfo);
if (0 != retCode) { if (0 != retCode) {
...@@ -2221,157 +2226,157 @@ static void selectAndGetResult( ...@@ -2221,157 +2226,157 @@ static void selectAndGetResult(
static char *rand_bool_str() { static char *rand_bool_str() {
static int cursor; static int cursor;
cursor++; cursor++;
if (cursor > (MAX_PREPARED_RAND - 1)) cursor = 0; if (cursor > (g_args.prepared_rand - 1)) cursor = 0;
return g_randbool_buff + ((cursor % MAX_PREPARED_RAND) * BOOL_BUFF_LEN); return g_randbool_buff + ((cursor % g_args.prepared_rand) * BOOL_BUFF_LEN);
} }
static int32_t rand_bool() { static int32_t rand_bool() {
static int cursor; static int cursor;
cursor++; cursor++;
if (cursor > (MAX_PREPARED_RAND - 1)) cursor = 0; if (cursor > (g_args.prepared_rand - 1)) cursor = 0;
return g_randint[cursor % MAX_PREPARED_RAND] % 2; return g_randint[cursor % g_args.prepared_rand] % 2;
} }
static char *rand_tinyint_str() static char *rand_tinyint_str()
{ {
static int cursor; static int cursor;
cursor++; cursor++;
if (cursor > (MAX_PREPARED_RAND - 1)) cursor = 0; if (cursor > (g_args.prepared_rand - 1)) cursor = 0;
return g_randtinyint_buff + return g_randtinyint_buff +
((cursor % MAX_PREPARED_RAND) * TINYINT_BUFF_LEN); ((cursor % g_args.prepared_rand) * TINYINT_BUFF_LEN);
} }
static int32_t rand_tinyint() static int32_t rand_tinyint()
{ {
static int cursor; static int cursor;
cursor++; cursor++;
if (cursor > (MAX_PREPARED_RAND - 1)) cursor = 0; if (cursor > (g_args.prepared_rand - 1)) cursor = 0;
return g_randint[cursor % MAX_PREPARED_RAND] % 128; return g_randint[cursor % g_args.prepared_rand] % 128;
} }
static char *rand_utinyint_str() static char *rand_utinyint_str()
{ {
static int cursor; static int cursor;
cursor++; cursor++;
if (cursor > (MAX_PREPARED_RAND - 1)) cursor = 0; if (cursor > (g_args.prepared_rand - 1)) cursor = 0;
return g_randutinyint_buff + return g_randutinyint_buff +
((cursor % MAX_PREPARED_RAND) * TINYINT_BUFF_LEN); ((cursor % g_args.prepared_rand) * TINYINT_BUFF_LEN);
} }
static int32_t rand_utinyint() static int32_t rand_utinyint()
{ {
static int cursor; static int cursor;
cursor++; cursor++;
if (cursor > (MAX_PREPARED_RAND - 1)) cursor = 0; if (cursor > (g_args.prepared_rand - 1)) cursor = 0;
return g_randuint[cursor % MAX_PREPARED_RAND] % 255; return g_randuint[cursor % g_args.prepared_rand] % 255;
} }
static char *rand_smallint_str() static char *rand_smallint_str()
{ {
static int cursor; static int cursor;
cursor++; cursor++;
if (cursor > (MAX_PREPARED_RAND - 1)) cursor = 0; if (cursor > (g_args.prepared_rand - 1)) cursor = 0;
return g_randsmallint_buff + return g_randsmallint_buff +
((cursor % MAX_PREPARED_RAND) * SMALLINT_BUFF_LEN); ((cursor % g_args.prepared_rand) * SMALLINT_BUFF_LEN);
} }
static int32_t rand_smallint() static int32_t rand_smallint()
{ {
static int cursor; static int cursor;
cursor++; cursor++;
if (cursor > (MAX_PREPARED_RAND - 1)) cursor = 0; if (cursor > (g_args.prepared_rand - 1)) cursor = 0;
return g_randint[cursor % MAX_PREPARED_RAND] % 32768; return g_randint[cursor % g_args.prepared_rand] % 32768;
} }
static char *rand_usmallint_str() static char *rand_usmallint_str()
{ {
static int cursor; static int cursor;
cursor++; cursor++;
if (cursor > (MAX_PREPARED_RAND - 1)) cursor = 0; if (cursor > (g_args.prepared_rand - 1)) cursor = 0;
return g_randusmallint_buff + return g_randusmallint_buff +
((cursor % MAX_PREPARED_RAND) * SMALLINT_BUFF_LEN); ((cursor % g_args.prepared_rand) * SMALLINT_BUFF_LEN);
} }
static int32_t rand_usmallint() static int32_t rand_usmallint()
{ {
static int cursor; static int cursor;
cursor++; cursor++;
if (cursor > (MAX_PREPARED_RAND - 1)) cursor = 0; if (cursor > (g_args.prepared_rand - 1)) cursor = 0;
return g_randuint[cursor % MAX_PREPARED_RAND] % 65535; return g_randuint[cursor % g_args.prepared_rand] % 65535;
} }
static char *rand_int_str() static char *rand_int_str()
{ {
static int cursor; static int cursor;
cursor++; cursor++;
if (cursor > (MAX_PREPARED_RAND - 1)) cursor = 0; if (cursor > (g_args.prepared_rand - 1)) cursor = 0;
return g_randint_buff + ((cursor % MAX_PREPARED_RAND) * INT_BUFF_LEN); return g_randint_buff + ((cursor % g_args.prepared_rand) * INT_BUFF_LEN);
} }
static int32_t rand_int() static int32_t rand_int()
{ {
static int cursor; static int cursor;
cursor++; cursor++;
if (cursor > (MAX_PREPARED_RAND - 1)) cursor = 0; if (cursor > (g_args.prepared_rand - 1)) cursor = 0;
return g_randint[cursor % MAX_PREPARED_RAND]; return g_randint[cursor % g_args.prepared_rand];
} }
static char *rand_uint_str() static char *rand_uint_str()
{ {
static int cursor; static int cursor;
cursor++; cursor++;
if (cursor > (MAX_PREPARED_RAND - 1)) cursor = 0; if (cursor > (g_args.prepared_rand - 1)) cursor = 0;
return g_randuint_buff + ((cursor % MAX_PREPARED_RAND) * INT_BUFF_LEN); return g_randuint_buff + ((cursor % g_args.prepared_rand) * INT_BUFF_LEN);
} }
static int32_t rand_uint() static int32_t rand_uint()
{ {
static int cursor; static int cursor;
cursor++; cursor++;
if (cursor > (MAX_PREPARED_RAND - 1)) cursor = 0; if (cursor > (g_args.prepared_rand - 1)) cursor = 0;
return g_randuint[cursor % MAX_PREPARED_RAND]; return g_randuint[cursor % g_args.prepared_rand];
} }
static char *rand_bigint_str() static char *rand_bigint_str()
{ {
static int cursor; static int cursor;
cursor++; cursor++;
if (cursor > (MAX_PREPARED_RAND - 1)) cursor = 0; if (cursor > (g_args.prepared_rand - 1)) cursor = 0;
return g_randbigint_buff + return g_randbigint_buff +
((cursor % MAX_PREPARED_RAND) * BIGINT_BUFF_LEN); ((cursor % g_args.prepared_rand) * BIGINT_BUFF_LEN);
} }
static int64_t rand_bigint() static int64_t rand_bigint()
{ {
static int cursor; static int cursor;
cursor++; cursor++;
if (cursor > (MAX_PREPARED_RAND - 1)) cursor = 0; if (cursor > (g_args.prepared_rand - 1)) cursor = 0;
return g_randbigint[cursor % MAX_PREPARED_RAND]; return g_randbigint[cursor % g_args.prepared_rand];
} }
static char *rand_ubigint_str() static char *rand_ubigint_str()
{ {
static int cursor; static int cursor;
cursor++; cursor++;
if (cursor > (MAX_PREPARED_RAND - 1)) cursor = 0; if (cursor > (g_args.prepared_rand - 1)) cursor = 0;
return g_randubigint_buff + return g_randubigint_buff +
((cursor % MAX_PREPARED_RAND) * BIGINT_BUFF_LEN); ((cursor % g_args.prepared_rand) * BIGINT_BUFF_LEN);
} }
static int64_t rand_ubigint() static int64_t rand_ubigint()
{ {
static int cursor; static int cursor;
cursor++; cursor++;
if (cursor > (MAX_PREPARED_RAND - 1)) cursor = 0; if (cursor > (g_args.prepared_rand - 1)) cursor = 0;
return g_randubigint[cursor % MAX_PREPARED_RAND]; return g_randubigint[cursor % g_args.prepared_rand];
} }
static char *rand_float_str() static char *rand_float_str()
{ {
static int cursor; static int cursor;
cursor++; cursor++;
if (cursor > (MAX_PREPARED_RAND - 1)) cursor = 0; if (cursor > (g_args.prepared_rand - 1)) cursor = 0;
return g_randfloat_buff + ((cursor % MAX_PREPARED_RAND) * FLOAT_BUFF_LEN); return g_randfloat_buff + ((cursor % g_args.prepared_rand) * FLOAT_BUFF_LEN);
} }
...@@ -2379,58 +2384,58 @@ static float rand_float() ...@@ -2379,58 +2384,58 @@ static float rand_float()
{ {
static int cursor; static int cursor;
cursor++; cursor++;
if (cursor > (MAX_PREPARED_RAND - 1)) cursor = 0; if (cursor > (g_args.prepared_rand - 1)) cursor = 0;
return g_randfloat[cursor % MAX_PREPARED_RAND]; return g_randfloat[cursor % g_args.prepared_rand];
} }
static char *demo_current_float_str() static char *demo_current_float_str()
{ {
static int cursor; static int cursor;
cursor++; cursor++;
if (cursor > (MAX_PREPARED_RAND - 1)) cursor = 0; if (cursor > (g_args.prepared_rand - 1)) cursor = 0;
return g_rand_current_buff + return g_rand_current_buff +
((cursor % MAX_PREPARED_RAND) * FLOAT_BUFF_LEN); ((cursor % g_args.prepared_rand) * FLOAT_BUFF_LEN);
} }
static float UNUSED_FUNC demo_current_float() static float UNUSED_FUNC demo_current_float()
{ {
static int cursor; static int cursor;
cursor++; cursor++;
if (cursor > (MAX_PREPARED_RAND - 1)) cursor = 0; if (cursor > (g_args.prepared_rand - 1)) cursor = 0;
return (float)(9.8 + 0.04 * (g_randint[cursor % MAX_PREPARED_RAND] % 10) return (float)(9.8 + 0.04 * (g_randint[cursor % g_args.prepared_rand] % 10)
+ g_randfloat[cursor % MAX_PREPARED_RAND]/1000000000); + g_randfloat[cursor % g_args.prepared_rand]/1000000000);
} }
static char *demo_voltage_int_str() static char *demo_voltage_int_str()
{ {
static int cursor; static int cursor;
cursor++; cursor++;
if (cursor > (MAX_PREPARED_RAND - 1)) cursor = 0; if (cursor > (g_args.prepared_rand - 1)) cursor = 0;
return g_rand_voltage_buff + return g_rand_voltage_buff +
((cursor % MAX_PREPARED_RAND) * INT_BUFF_LEN); ((cursor % g_args.prepared_rand) * INT_BUFF_LEN);
} }
static int32_t UNUSED_FUNC demo_voltage_int() static int32_t UNUSED_FUNC demo_voltage_int()
{ {
static int cursor; static int cursor;
cursor++; cursor++;
if (cursor > (MAX_PREPARED_RAND - 1)) cursor = 0; if (cursor > (g_args.prepared_rand - 1)) cursor = 0;
return 215 + g_randint[cursor % MAX_PREPARED_RAND] % 10; return 215 + g_randint[cursor % g_args.prepared_rand] % 10;
} }
static char *demo_phase_float_str() { static char *demo_phase_float_str() {
static int cursor; static int cursor;
cursor++; cursor++;
if (cursor > (MAX_PREPARED_RAND - 1)) cursor = 0; if (cursor > (g_args.prepared_rand - 1)) cursor = 0;
return g_rand_phase_buff + ((cursor % MAX_PREPARED_RAND) * FLOAT_BUFF_LEN); return g_rand_phase_buff + ((cursor % g_args.prepared_rand) * FLOAT_BUFF_LEN);
} }
static float UNUSED_FUNC demo_phase_float() { static float UNUSED_FUNC demo_phase_float() {
static int cursor; static int cursor;
cursor++; cursor++;
if (cursor > (MAX_PREPARED_RAND - 1)) cursor = 0; if (cursor > (g_args.prepared_rand - 1)) cursor = 0;
return (float)((115 + g_randint[cursor % MAX_PREPARED_RAND] % 10 return (float)((115 + g_randint[cursor % g_args.prepared_rand] % 10
+ g_randfloat[cursor % MAX_PREPARED_RAND]/1000000000)/360); + g_randfloat[cursor % g_args.prepared_rand]/1000000000)/360);
} }
#if 0 #if 0
...@@ -2469,7 +2474,7 @@ static char *rand_double_str() ...@@ -2469,7 +2474,7 @@ static char *rand_double_str()
{ {
static int cursor; static int cursor;
cursor++; cursor++;
if (cursor > (MAX_PREPARED_RAND - 1)) cursor = 0; if (cursor > (g_args.prepared_rand - 1)) cursor = 0;
return g_randdouble_buff + (cursor * DOUBLE_BUFF_LEN); return g_randdouble_buff + (cursor * DOUBLE_BUFF_LEN);
} }
...@@ -2477,42 +2482,54 @@ static double rand_double() ...@@ -2477,42 +2482,54 @@ static double rand_double()
{ {
static int cursor; static int cursor;
cursor++; cursor++;
cursor = cursor % MAX_PREPARED_RAND; cursor = cursor % g_args.prepared_rand;
return g_randdouble[cursor]; return g_randdouble[cursor];
} }
static void init_rand_data() { static void init_rand_data() {
g_randint_buff = calloc(1, INT_BUFF_LEN * MAX_PREPARED_RAND); g_randint_buff = calloc(1, INT_BUFF_LEN * g_args.prepared_rand);
assert(g_randint_buff); assert(g_randint_buff);
g_rand_voltage_buff = calloc(1, INT_BUFF_LEN * MAX_PREPARED_RAND); g_rand_voltage_buff = calloc(1, INT_BUFF_LEN * g_args.prepared_rand);
assert(g_rand_voltage_buff); assert(g_rand_voltage_buff);
g_randbigint_buff = calloc(1, BIGINT_BUFF_LEN * MAX_PREPARED_RAND); g_randbigint_buff = calloc(1, BIGINT_BUFF_LEN * g_args.prepared_rand);
assert(g_randbigint_buff); assert(g_randbigint_buff);
g_randsmallint_buff = calloc(1, SMALLINT_BUFF_LEN * MAX_PREPARED_RAND); g_randsmallint_buff = calloc(1, SMALLINT_BUFF_LEN * g_args.prepared_rand);
assert(g_randsmallint_buff); assert(g_randsmallint_buff);
g_randtinyint_buff = calloc(1, TINYINT_BUFF_LEN * MAX_PREPARED_RAND); g_randtinyint_buff = calloc(1, TINYINT_BUFF_LEN * g_args.prepared_rand);
assert(g_randtinyint_buff); assert(g_randtinyint_buff);
g_randbool_buff = calloc(1, BOOL_BUFF_LEN * MAX_PREPARED_RAND); g_randbool_buff = calloc(1, BOOL_BUFF_LEN * g_args.prepared_rand);
assert(g_randbool_buff); assert(g_randbool_buff);
g_randfloat_buff = calloc(1, FLOAT_BUFF_LEN * MAX_PREPARED_RAND); g_randfloat_buff = calloc(1, FLOAT_BUFF_LEN * g_args.prepared_rand);
assert(g_randfloat_buff); assert(g_randfloat_buff);
g_rand_current_buff = calloc(1, FLOAT_BUFF_LEN * MAX_PREPARED_RAND); g_rand_current_buff = calloc(1, FLOAT_BUFF_LEN * g_args.prepared_rand);
assert(g_rand_current_buff); assert(g_rand_current_buff);
g_rand_phase_buff = calloc(1, FLOAT_BUFF_LEN * MAX_PREPARED_RAND); g_rand_phase_buff = calloc(1, FLOAT_BUFF_LEN * g_args.prepared_rand);
assert(g_rand_phase_buff); assert(g_rand_phase_buff);
g_randdouble_buff = calloc(1, DOUBLE_BUFF_LEN * MAX_PREPARED_RAND); g_randdouble_buff = calloc(1, DOUBLE_BUFF_LEN * g_args.prepared_rand);
assert(g_randdouble_buff); assert(g_randdouble_buff);
g_randuint_buff = calloc(1, INT_BUFF_LEN * MAX_PREPARED_RAND); g_randuint_buff = calloc(1, INT_BUFF_LEN * g_args.prepared_rand);
assert(g_randuint_buff); assert(g_randuint_buff);
g_randutinyint_buff = calloc(1, TINYINT_BUFF_LEN * MAX_PREPARED_RAND); g_randutinyint_buff = calloc(1, TINYINT_BUFF_LEN * g_args.prepared_rand);
assert(g_randutinyint_buff); assert(g_randutinyint_buff);
g_randusmallint_buff = calloc(1, SMALLINT_BUFF_LEN * MAX_PREPARED_RAND); g_randusmallint_buff = calloc(1, SMALLINT_BUFF_LEN * g_args.prepared_rand);
assert(g_randusmallint_buff); assert(g_randusmallint_buff);
g_randubigint_buff = calloc(1, BIGINT_BUFF_LEN * MAX_PREPARED_RAND); g_randubigint_buff = calloc(1, BIGINT_BUFF_LEN * g_args.prepared_rand);
assert(g_randubigint_buff); assert(g_randubigint_buff);
g_randint = calloc(1, sizeof(int32_t) * g_args.prepared_rand);
for (int i = 0; i < MAX_PREPARED_RAND; i++) { assert(g_randint);
g_randuint = calloc(1, sizeof(uint32_t) * g_args.prepared_rand);
assert(g_randuint);
g_randbigint = calloc(1, sizeof(int64_t) * g_args.prepared_rand);
assert(g_randbigint);
g_randubigint = calloc(1, sizeof(uint64_t) * g_args.prepared_rand);
assert(g_randubigint);
g_randfloat = calloc(1, sizeof(float) * g_args.prepared_rand);
assert(g_randfloat);
g_randdouble = calloc(1, sizeof(double) * g_args.prepared_rand);
assert(g_randdouble);
for (int i = 0; i < g_args.prepared_rand; i++) {
g_randint[i] = (int)(taosRandom() % RAND_MAX - (RAND_MAX >> 1)); g_randint[i] = (int)(taosRandom() % RAND_MAX - (RAND_MAX >> 1));
g_randuint[i] = (int)(taosRandom()); g_randuint[i] = (int)(taosRandom());
sprintf(g_randint_buff + i * INT_BUFF_LEN, "%d", sprintf(g_randint_buff + i * INT_BUFF_LEN, "%d",
...@@ -2755,6 +2772,8 @@ static int printfInsertMeta() { ...@@ -2755,6 +2772,8 @@ static int printfInsertMeta() {
g_Dbs.db[i].superTbls[j].sampleFormat); g_Dbs.db[i].superTbls[j].sampleFormat);
printf(" sampleFile: \033[33m%s\033[0m\n", printf(" sampleFile: \033[33m%s\033[0m\n",
g_Dbs.db[i].superTbls[j].sampleFile); g_Dbs.db[i].superTbls[j].sampleFile);
printf(" useSampleTs: \033[33m%s\033[0m\n",
g_Dbs.db[i].superTbls[j].useSampleTs ? "yes (warning: disorderRange/disorderRatio is disabled)" : "no");
printf(" tagsFile: \033[33m%s\033[0m\n", printf(" tagsFile: \033[33m%s\033[0m\n",
g_Dbs.db[i].superTbls[j].tagsFile); g_Dbs.db[i].superTbls[j].tagsFile);
printf(" columnCount: \033[33m%d\033[0m\n ", printf(" columnCount: \033[33m%d\033[0m\n ",
...@@ -2799,8 +2818,6 @@ static int printfInsertMeta() { ...@@ -2799,8 +2818,6 @@ static int printfInsertMeta() {
printf(" insertRows: \033[33m%"PRId64"\033[0m\n", printf(" insertRows: \033[33m%"PRId64"\033[0m\n",
g_args.insertRows); g_args.insertRows);
} }
printf("\n"); printf("\n");
} }
...@@ -3384,7 +3401,7 @@ static void printfQuerySystemInfo(TAOS * taos) { ...@@ -3384,7 +3401,7 @@ static void printfQuerySystemInfo(TAOS * taos) {
free(dbInfos); free(dbInfos);
} }
static int postProceSql(char *host, struct sockaddr_in *pServAddr, uint16_t port, static int postProceSql(char *host, uint16_t port,
char* sqlstr, threadInfo *pThreadInfo) char* sqlstr, threadInfo *pThreadInfo)
{ {
char *req_fmt = "POST %s HTTP/1.1\r\nHost: %s:%d\r\nAccept: */*\r\nAuthorization: Basic %s\r\nContent-Length: %d\r\nContent-Type: application/x-www-form-urlencoded\r\n\r\n%s"; char *req_fmt = "POST %s HTTP/1.1\r\nHost: %s:%d\r\nAccept: */*\r\nAuthorization: Basic %s\r\nContent-Length: %d\r\nContent-Type: application/x-www-form-urlencoded\r\n\r\n%s";
...@@ -3416,35 +3433,18 @@ static int postProceSql(char *host, struct sockaddr_in *pServAddr, uint16_t port ...@@ -3416,35 +3433,18 @@ static int postProceSql(char *host, struct sockaddr_in *pServAddr, uint16_t port
'w', 'x', 'y', 'z', '0', '1', '2', '3', 'w', 'x', 'y', 'z', '0', '1', '2', '3',
'4', '5', '6', '7', '8', '9', '+', '/'}; '4', '5', '6', '7', '8', '9', '+', '/'};
snprintf(userpass_buf, INPUT_BUF_LEN, "%s:%s", if (g_args.test_mode == INSERT_TEST) {
snprintf(userpass_buf, INPUT_BUF_LEN, "%s:%s",
g_Dbs.user, g_Dbs.password); g_Dbs.user, g_Dbs.password);
} else {
snprintf(userpass_buf, INPUT_BUF_LEN, "%s:%s",
g_queryInfo.user, g_queryInfo.password);
}
size_t userpass_buf_len = strlen(userpass_buf); size_t userpass_buf_len = strlen(userpass_buf);
size_t encoded_len = 4 * ((userpass_buf_len +2) / 3); size_t encoded_len = 4 * ((userpass_buf_len +2) / 3);
char base64_buf[INPUT_BUF_LEN]; char base64_buf[INPUT_BUF_LEN];
#ifdef WINDOWS
WSADATA wsaData;
WSAStartup(MAKEWORD(2, 2), &wsaData);
SOCKET sockfd;
#else
int sockfd;
#endif
sockfd = socket(AF_INET, SOCK_STREAM, 0);
if (sockfd < 0) {
#ifdef WINDOWS
errorPrint( "Could not create socket : %d" , WSAGetLastError());
#endif
debugPrint("%s() LN%d, sockfd=%d\n", __func__, __LINE__, sockfd);
free(request_buf);
ERROR_EXIT("opening socket");
}
int retConn = connect(sockfd, (struct sockaddr *)pServAddr, sizeof(struct sockaddr));
debugPrint("%s() LN%d connect() return %d\n", __func__, __LINE__, retConn);
if (retConn < 0) {
free(request_buf);
ERROR_EXIT("connecting");
}
memset(base64_buf, 0, INPUT_BUF_LEN); memset(base64_buf, 0, INPUT_BUF_LEN);
...@@ -3484,9 +3484,9 @@ static int postProceSql(char *host, struct sockaddr_in *pServAddr, uint16_t port ...@@ -3484,9 +3484,9 @@ static int postProceSql(char *host, struct sockaddr_in *pServAddr, uint16_t port
sent = 0; sent = 0;
do { do {
#ifdef WINDOWS #ifdef WINDOWS
bytes = send(sockfd, request_buf + sent, req_str_len - sent, 0); bytes = send(pThreadInfo->sockfd, request_buf + sent, req_str_len - sent, 0);
#else #else
bytes = write(sockfd, request_buf + sent, req_str_len - sent); bytes = write(pThreadInfo->sockfd, request_buf + sent, req_str_len - sent);
#endif #endif
if (bytes < 0) if (bytes < 0)
ERROR_EXIT("writing message to socket"); ERROR_EXIT("writing message to socket");
...@@ -3501,15 +3501,13 @@ static int postProceSql(char *host, struct sockaddr_in *pServAddr, uint16_t port ...@@ -3501,15 +3501,13 @@ static int postProceSql(char *host, struct sockaddr_in *pServAddr, uint16_t port
char resEncodingChunk[] = "Encoding: chunked"; char resEncodingChunk[] = "Encoding: chunked";
char resHttp[] = "HTTP/1.1 "; char resHttp[] = "HTTP/1.1 ";
int resHttpLen = strlen(resHttp);
char resHttpOk[] = "HTTP/1.1 200 OK"; char resHttpOk[] = "HTTP/1.1 200 OK";
int resHttpOkLen = strlen(resHttpOk);
do { do {
#ifdef WINDOWS #ifdef WINDOWS
bytes = recv(sockfd, response_buf + received, resp_len - received, 0); bytes = recv(pThreadInfo->sockfd, response_buf + received, resp_len - received, 0);
#else #else
bytes = read(sockfd, response_buf + received, resp_len - received); bytes = read(pThreadInfo->sockfd, response_buf + received, resp_len - received);
#endif #endif
verbosePrint("%s() LN%d: bytes:%d\n", __func__, __LINE__, bytes); verbosePrint("%s() LN%d: bytes:%d\n", __func__, __LINE__, bytes);
if (bytes < 0) { if (bytes < 0) {
...@@ -3520,15 +3518,14 @@ static int postProceSql(char *host, struct sockaddr_in *pServAddr, uint16_t port ...@@ -3520,15 +3518,14 @@ static int postProceSql(char *host, struct sockaddr_in *pServAddr, uint16_t port
break; break;
received += bytes; received += bytes;
response_buf[RESP_BUF_LEN - 1] = '\0'; verbosePrint("%s() LN%d: received:%d resp_len:%d, response_buf:\n%s\n",
__func__, __LINE__, received, resp_len, response_buf);
response_buf[RESP_BUF_LEN - 1] = '\0';
if (strlen(response_buf)) { if (strlen(response_buf)) {
verbosePrint("%s() LN%d: received:%d resp_len:%d, response_buf:\n%s\n",
__func__, __LINE__, received, resp_len, response_buf);
if (((NULL == strstr(response_buf, resEncodingChunk)) if (((NULL == strstr(response_buf, resEncodingChunk))
&& (0 == strncmp(response_buf, resHttp, resHttpLen))) && (NULL != strstr(response_buf, resHttp)))
|| ((0 == strncmp(response_buf, resHttpOk, resHttpOkLen)) || ((NULL != strstr(response_buf, resHttpOk))
&& (NULL != strstr(response_buf, "\"status\":")))) { && (NULL != strstr(response_buf, "\"status\":")))) {
debugPrint( debugPrint(
"%s() LN%d: received:%d resp_len:%d, response_buf:\n%s\n", "%s() LN%d: received:%d resp_len:%d, response_buf:\n%s\n",
...@@ -3548,14 +3545,9 @@ static int postProceSql(char *host, struct sockaddr_in *pServAddr, uint16_t port ...@@ -3548,14 +3545,9 @@ static int postProceSql(char *host, struct sockaddr_in *pServAddr, uint16_t port
} }
free(request_buf); free(request_buf);
#ifdef WINDOWS
closesocket(sockfd);
WSACleanup();
#else
close(sockfd);
#endif
if (NULL == strstr(response_buf, "\"status\":\"succ\"")) { response_buf[RESP_BUF_LEN - 1] = '\0';
if (NULL == strstr(response_buf, resHttpOk)) {
errorPrint("%s() LN%d, Response:\n%s\n", errorPrint("%s() LN%d, Response:\n%s\n",
__func__, __LINE__, response_buf); __func__, __LINE__, response_buf);
return -1; return -1;
...@@ -3817,8 +3809,7 @@ static int getChildNameOfSuperTableWithLimitAndOffset(TAOS * taos, ...@@ -3817,8 +3809,7 @@ static int getChildNameOfSuperTableWithLimitAndOffset(TAOS * taos,
limit, offset); limit, offset);
//get all child table name use cmd: select tbname from superTblName; //get all child table name use cmd: select tbname from superTblName;
snprintf(command, 1024, "select tbname from %s.%s %s", snprintf(command, 1024, "select tbname from %s.%s %s", dbName, stbName, limitBuf);
dbName, stbName, limitBuf);
res = taos_query(taos, command); res = taos_query(taos, command);
int32_t code = taos_errno(res); int32_t code = taos_errno(res);
...@@ -3999,21 +3990,21 @@ static int getSuperTableFromServer(TAOS * taos, char* dbName, ...@@ -3999,21 +3990,21 @@ static int getSuperTableFromServer(TAOS * taos, char* dbName,
(char *)row[TSDB_DESCRIBE_METRIC_FIELD_INDEX], (char *)row[TSDB_DESCRIBE_METRIC_FIELD_INDEX],
fields[TSDB_DESCRIBE_METRIC_FIELD_INDEX].bytes); fields[TSDB_DESCRIBE_METRIC_FIELD_INDEX].bytes);
if (0 == strncasecmp((char *)row[TSDB_DESCRIBE_METRIC_TYPE_INDEX], if (0 == strncasecmp((char *)row[TSDB_DESCRIBE_METRIC_TYPE_INDEX],
"INT", strlen("INT")) && "INT", strlen("INT")) &&
strstr((char *)row[TSDB_DESCRIBE_METRIC_TYPE_INDEX], "UNSIGNED") == NULL) { strstr((char *)row[TSDB_DESCRIBE_METRIC_TYPE_INDEX], "UNSIGNED") == NULL) {
superTbls->columns[columnIndex].data_type = TSDB_DATA_TYPE_INT; superTbls->columns[columnIndex].data_type = TSDB_DATA_TYPE_INT;
} else if (0 == strncasecmp((char *)row[TSDB_DESCRIBE_METRIC_TYPE_INDEX], } else if (0 == strncasecmp((char *)row[TSDB_DESCRIBE_METRIC_TYPE_INDEX],
"TINYINT", strlen("TINYINT")) && "TINYINT", strlen("TINYINT")) &&
strstr((char *)row[TSDB_DESCRIBE_METRIC_TYPE_INDEX], "UNSIGNED") == NULL) { strstr((char *)row[TSDB_DESCRIBE_METRIC_TYPE_INDEX], "UNSIGNED") == NULL) {
superTbls->columns[columnIndex].data_type = TSDB_DATA_TYPE_TINYINT; superTbls->columns[columnIndex].data_type = TSDB_DATA_TYPE_TINYINT;
} else if (0 == strncasecmp((char *)row[TSDB_DESCRIBE_METRIC_TYPE_INDEX], } else if (0 == strncasecmp((char *)row[TSDB_DESCRIBE_METRIC_TYPE_INDEX],
"SMALLINT", strlen("SMALLINT")) && "SMALLINT", strlen("SMALLINT")) &&
strstr((char *)row[TSDB_DESCRIBE_METRIC_TYPE_INDEX], "UNSIGNED") == NULL) { strstr((char *)row[TSDB_DESCRIBE_METRIC_TYPE_INDEX], "UNSIGNED") == NULL) {
superTbls->columns[columnIndex].data_type = TSDB_DATA_TYPE_SMALLINT; superTbls->columns[columnIndex].data_type = TSDB_DATA_TYPE_SMALLINT;
} else if (0 == strncasecmp((char *)row[TSDB_DESCRIBE_METRIC_TYPE_INDEX], } else if (0 == strncasecmp((char *)row[TSDB_DESCRIBE_METRIC_TYPE_INDEX],
"BIGINT", strlen("BIGINT")) && "BIGINT", strlen("BIGINT")) &&
strstr((char *)row[TSDB_DESCRIBE_METRIC_TYPE_INDEX], "UNSIGNED") == NULL) { strstr((char *)row[TSDB_DESCRIBE_METRIC_TYPE_INDEX], "UNSIGNED") == NULL) {
superTbls->columns[columnIndex].data_type = TSDB_DATA_TYPE_BIGINT; superTbls->columns[columnIndex].data_type = TSDB_DATA_TYPE_BIGINT;
} else if (0 == strncasecmp((char *)row[TSDB_DESCRIBE_METRIC_TYPE_INDEX], } else if (0 == strncasecmp((char *)row[TSDB_DESCRIBE_METRIC_TYPE_INDEX],
...@@ -4067,7 +4058,7 @@ static int getSuperTableFromServer(TAOS * taos, char* dbName, ...@@ -4067,7 +4058,7 @@ static int getSuperTableFromServer(TAOS * taos, char* dbName,
(char *)row[TSDB_DESCRIBE_METRIC_NOTE_INDEX], (char *)row[TSDB_DESCRIBE_METRIC_NOTE_INDEX],
min(NOTE_BUFF_LEN, min(NOTE_BUFF_LEN,
fields[TSDB_DESCRIBE_METRIC_NOTE_INDEX].bytes) + 1); fields[TSDB_DESCRIBE_METRIC_NOTE_INDEX].bytes) + 1);
if (strstr((char *)row[TSDB_DESCRIBE_METRIC_TYPE_INDEX], "UNSIGNED") == NULL) { if (strstr((char *)row[TSDB_DESCRIBE_METRIC_TYPE_INDEX], "UNSIGNED") == NULL) {
tstrncpy(superTbls->columns[columnIndex].dataType, tstrncpy(superTbls->columns[columnIndex].dataType,
(char *)row[TSDB_DESCRIBE_METRIC_TYPE_INDEX], (char *)row[TSDB_DESCRIBE_METRIC_TYPE_INDEX],
...@@ -4346,9 +4337,10 @@ static int createSuperTable( ...@@ -4346,9 +4337,10 @@ static int createSuperTable(
superTbl->lenOfTagOfOneRow = lenOfTagOfOneRow; superTbl->lenOfTagOfOneRow = lenOfTagOfOneRow;
snprintf(command, BUFFER_SIZE, snprintf(command, BUFFER_SIZE,
"CREATE TABLE IF NOT EXISTS %s.%s (ts TIMESTAMP%s) TAGS %s", "CREATE TABLE IF NOT EXISTS %s.%s (ts TIMESTAMP%s) TAGS %s",
dbName, superTbl->stbName, cols, tags); dbName, superTbl->stbName, cols, tags);
if (0 != queryDbExec(taos, command, NO_INSERT_TYPE, false)) { if (0 != queryDbExec(taos, command, NO_INSERT_TYPE, false)) {
errorPrint2("create supertable %s failed!\n\n", errorPrint2("create supertable %s failed!\n\n",
superTbl->stbName); superTbl->stbName);
...@@ -4594,7 +4586,6 @@ static void* createTable(void *sarg) ...@@ -4594,7 +4586,6 @@ static void* createTable(void *sarg)
return NULL; return NULL;
} }
pThreadInfo->tables_created += batchNum; pThreadInfo->tables_created += batchNum;
uint64_t currentPrintTime = taosGetTimestampMs(); uint64_t currentPrintTime = taosGetTimestampMs();
if (currentPrintTime - lastPrintTime > 30*1000) { if (currentPrintTime - lastPrintTime > 30*1000) {
printf("thread[%d] already create %"PRIu64" - %"PRIu64" tables\n", printf("thread[%d] already create %"PRIu64" - %"PRIu64" tables\n",
...@@ -4608,8 +4599,8 @@ static void* createTable(void *sarg) ...@@ -4608,8 +4599,8 @@ static void* createTable(void *sarg)
NO_INSERT_TYPE, false)) { NO_INSERT_TYPE, false)) {
errorPrint2("queryDbExec() failed. buffer:\n%s\n", pThreadInfo->buffer); errorPrint2("queryDbExec() failed. buffer:\n%s\n", pThreadInfo->buffer);
} }
pThreadInfo->tables_created += batchNum;
} }
free(pThreadInfo->buffer); free(pThreadInfo->buffer);
return NULL; return NULL;
} }
...@@ -4816,6 +4807,23 @@ static int readTagFromCsvFileToMem(SSuperTable * stbInfo) { ...@@ -4816,6 +4807,23 @@ static int readTagFromCsvFileToMem(SSuperTable * stbInfo) {
return 0; return 0;
} }
static void getAndSetRowsFromCsvFile(SSuperTable *stbInfo) {
FILE *fp = fopen(stbInfo->sampleFile, "r");
int line_count = 0;
if (fp == NULL) {
errorPrint("Failed to open sample file: %s, reason:%s\n",
stbInfo->sampleFile, strerror(errno));
exit(EXIT_FAILURE);
}
char *buf = calloc(1, stbInfo->maxSqlLen);
while (fgets(buf, stbInfo->maxSqlLen, fp)) {
line_count++;
}
fclose(fp);
tmfree(buf);
stbInfo->insertRows = line_count;
}
/* /*
Read 10000 lines at most. If more than 10000 lines, continue to read after using Read 10000 lines at most. If more than 10000 lines, continue to read after using
*/ */
...@@ -5269,6 +5277,22 @@ static bool getMetaFromInsertJsonFile(cJSON* root) { ...@@ -5269,6 +5277,22 @@ static bool getMetaFromInsertJsonFile(cJSON* root) {
goto PARSE_OVER; goto PARSE_OVER;
} }
cJSON* prepareRand = cJSON_GetObjectItem(root, "prepared_rand");
if (prepareRand && prepareRand->type == cJSON_Number) {
if (prepareRand->valueint <= 0) {
errorPrint("%s() LN%d, failed to read json, prepared_rand input mistake\n",
__func__, __LINE__);
goto PARSE_OVER;
}
g_args.prepared_rand = prepareRand->valueint;
} else if (!prepareRand) {
g_args.prepared_rand = 10000;
} else {
errorPrint("%s() LN%d, failed to read json, prepared_rand not found\n",
__func__, __LINE__);
goto PARSE_OVER;
}
cJSON *answerPrompt = cJSON_GetObjectItem(root, "confirm_parameter_prompt"); // yes, no, cJSON *answerPrompt = cJSON_GetObjectItem(root, "confirm_parameter_prompt"); // yes, no,
if (answerPrompt if (answerPrompt
&& answerPrompt->type == cJSON_String && answerPrompt->type == cJSON_String
...@@ -5310,7 +5334,8 @@ static bool getMetaFromInsertJsonFile(cJSON* root) { ...@@ -5310,7 +5334,8 @@ static bool getMetaFromInsertJsonFile(cJSON* root) {
MAX_DB_COUNT); MAX_DB_COUNT);
goto PARSE_OVER; goto PARSE_OVER;
} }
g_Dbs.db = calloc(1, sizeof(SDataBase)*dbSize);
assert(g_Dbs.db);
g_Dbs.dbCount = dbSize; g_Dbs.dbCount = dbSize;
for (int i = 0; i < dbSize; ++i) { for (int i = 0; i < dbSize; ++i) {
cJSON* dbinfos = cJSON_GetArrayItem(dbs, i); cJSON* dbinfos = cJSON_GetArrayItem(dbs, i);
...@@ -5510,7 +5535,8 @@ static bool getMetaFromInsertJsonFile(cJSON* root) { ...@@ -5510,7 +5535,8 @@ static bool getMetaFromInsertJsonFile(cJSON* root) {
MAX_SUPER_TABLE_COUNT); MAX_SUPER_TABLE_COUNT);
goto PARSE_OVER; goto PARSE_OVER;
} }
g_Dbs.db[i].superTbls = calloc(1, stbSize * sizeof(SSuperTable));
assert(g_Dbs.db[i].superTbls);
g_Dbs.db[i].superTblCount = stbSize; g_Dbs.db[i].superTblCount = stbSize;
for (int j = 0; j < stbSize; ++j) { for (int j = 0; j < stbSize; ++j) {
cJSON* stbInfo = cJSON_GetArrayItem(stables, j); cJSON* stbInfo = cJSON_GetArrayItem(stables, j);
...@@ -5709,6 +5735,23 @@ static bool getMetaFromInsertJsonFile(cJSON* root) { ...@@ -5709,6 +5735,23 @@ static bool getMetaFromInsertJsonFile(cJSON* root) {
goto PARSE_OVER; goto PARSE_OVER;
} }
cJSON *useSampleTs = cJSON_GetObjectItem(stbInfo, "use_sample_ts");
if (useSampleTs && useSampleTs->type == cJSON_String
&& useSampleTs->valuestring != NULL) {
if (0 == strncasecmp(useSampleTs->valuestring, "yes", 3)) {
g_Dbs.db[i].superTbls[j].useSampleTs = true;
} else if (0 == strncasecmp(useSampleTs->valuestring, "no", 2)){
g_Dbs.db[i].superTbls[j].useSampleTs = false;
} else {
g_Dbs.db[i].superTbls[j].useSampleTs = false;
}
} else if (!useSampleTs) {
g_Dbs.db[i].superTbls[j].useSampleTs = false;
} else {
errorPrint("%s", "failed to read json, use_sample_ts not found\n");
goto PARSE_OVER;
}
cJSON *tagsFile = cJSON_GetObjectItem(stbInfo, "tags_file"); cJSON *tagsFile = cJSON_GetObjectItem(stbInfo, "tags_file");
if ((tagsFile && tagsFile->type == cJSON_String) if ((tagsFile && tagsFile->type == cJSON_String)
&& (tagsFile->valuestring != NULL)) { && (tagsFile->valuestring != NULL)) {
...@@ -6366,9 +6409,12 @@ static bool getInfoFromJsonFile(char* file) { ...@@ -6366,9 +6409,12 @@ static bool getInfoFromJsonFile(char* file) {
} }
if (INSERT_TEST == g_args.test_mode) { if (INSERT_TEST == g_args.test_mode) {
memset(&g_Dbs, 0, sizeof(SDbs));
g_Dbs.use_metric = g_args.use_metric;
ret = getMetaFromInsertJsonFile(root); ret = getMetaFromInsertJsonFile(root);
} else if ((QUERY_TEST == g_args.test_mode) } else if ((QUERY_TEST == g_args.test_mode)
|| (SUBSCRIBE_TEST == g_args.test_mode)) { || (SUBSCRIBE_TEST == g_args.test_mode)) {
memset(&g_queryInfo, 0, sizeof(SQueryMetaInfo));
ret = getMetaFromQueryJsonFile(root); ret = getMetaFromQueryJsonFile(root);
} else { } else {
errorPrint("%s", errorPrint("%s",
...@@ -6433,8 +6479,9 @@ static void postFreeResource() { ...@@ -6433,8 +6479,9 @@ static void postFreeResource() {
g_Dbs.db[i].superTbls[j].childTblName = NULL; g_Dbs.db[i].superTbls[j].childTblName = NULL;
} }
} }
tmfree(g_Dbs.db[i].superTbls);
} }
tmfree(g_Dbs.db);
tmfree(g_randbool_buff); tmfree(g_randbool_buff);
tmfree(g_randint_buff); tmfree(g_randint_buff);
tmfree(g_rand_voltage_buff); tmfree(g_rand_voltage_buff);
...@@ -6457,6 +6504,7 @@ static void postFreeResource() { ...@@ -6457,6 +6504,7 @@ static void postFreeResource() {
} }
} }
tmfree(g_sampleBindBatchArray); tmfree(g_sampleBindBatchArray);
#endif #endif
} }
...@@ -6469,13 +6517,20 @@ static int getRowDataFromSample( ...@@ -6469,13 +6517,20 @@ static int getRowDataFromSample(
} }
int dataLen = 0; int dataLen = 0;
if(stbInfo->useSampleTs) {
dataLen += snprintf(dataBuf + dataLen, maxLen - dataLen, dataLen += snprintf(dataBuf + dataLen, maxLen - dataLen,
"(%s",
stbInfo->sampleDataBuf
+ stbInfo->lenOfOneRow * (*sampleUsePos));
} else {
dataLen += snprintf(dataBuf + dataLen, maxLen - dataLen,
"(%" PRId64 ", ", timestamp); "(%" PRId64 ", ", timestamp);
dataLen += snprintf(dataBuf + dataLen, maxLen - dataLen, dataLen += snprintf(dataBuf + dataLen, maxLen - dataLen,
"%s", "%s",
stbInfo->sampleDataBuf stbInfo->sampleDataBuf
+ stbInfo->lenOfOneRow * (*sampleUsePos)); + stbInfo->lenOfOneRow * (*sampleUsePos));
}
dataLen += snprintf(dataBuf + dataLen, maxLen - dataLen, ")"); dataLen += snprintf(dataBuf + dataLen, maxLen - dataLen, ")");
(*sampleUsePos)++; (*sampleUsePos)++;
...@@ -6547,7 +6602,7 @@ static int64_t generateStbRowData( ...@@ -6547,7 +6602,7 @@ static int64_t generateStbRowData(
tmpLen = strlen(tmp); tmpLen = strlen(tmp);
tstrncpy(pstr + dataLen, tmp, min(tmpLen + 1, BIGINT_BUFF_LEN)); tstrncpy(pstr + dataLen, tmp, min(tmpLen + 1, BIGINT_BUFF_LEN));
break; break;
case TSDB_DATA_TYPE_UBIGINT: case TSDB_DATA_TYPE_UBIGINT:
tmp = rand_ubigint_str(); tmp = rand_ubigint_str();
tmpLen = strlen(tmp); tmpLen = strlen(tmp);
...@@ -6908,6 +6963,9 @@ static int prepareSampleForStb(SSuperTable *stbInfo) { ...@@ -6908,6 +6963,9 @@ static int prepareSampleForStb(SSuperTable *stbInfo) {
int ret; int ret;
if (0 == strncasecmp(stbInfo->dataSource, "sample", strlen("sample"))) { if (0 == strncasecmp(stbInfo->dataSource, "sample", strlen("sample"))) {
if(stbInfo->useSampleTs) {
getAndSetRowsFromCsvFile(stbInfo);
}
ret = generateSampleFromCsvForStb(stbInfo); ret = generateSampleFromCsvForStb(stbInfo);
} else { } else {
ret = generateSampleFromRandForStb(stbInfo); ret = generateSampleFromRandForStb(stbInfo);
...@@ -6958,7 +7016,7 @@ static int32_t execInsert(threadInfo *pThreadInfo, uint32_t k) ...@@ -6958,7 +7016,7 @@ static int32_t execInsert(threadInfo *pThreadInfo, uint32_t k)
verbosePrint("[%d] %s() LN%d %s\n", pThreadInfo->threadID, verbosePrint("[%d] %s() LN%d %s\n", pThreadInfo->threadID,
__func__, __LINE__, pThreadInfo->buffer); __func__, __LINE__, pThreadInfo->buffer);
if (0 != postProceSql(g_Dbs.host, &g_Dbs.serv_addr, g_Dbs.port, if (0 != postProceSql(g_Dbs.host, g_Dbs.port,
pThreadInfo->buffer, pThreadInfo)) { pThreadInfo->buffer, pThreadInfo)) {
affectedRows = -1; affectedRows = -1;
printf("========restful return fail, threadID[%d]\n", printf("========restful return fail, threadID[%d]\n",
...@@ -7009,12 +7067,11 @@ static void getTableName(char *pTblName, ...@@ -7009,12 +7067,11 @@ static void getTableName(char *pTblName,
stbInfo->childTblName + tableSeq * TSDB_TABLE_NAME_LEN); stbInfo->childTblName + tableSeq * TSDB_TABLE_NAME_LEN);
} }
} else { } else {
snprintf(pTblName, TSDB_TABLE_NAME_LEN, "%s%"PRIu64"", snprintf(pTblName, TSDB_TABLE_NAME_LEN,
stbInfo->childTblPrefix, tableSeq); "%s%"PRIu64"", stbInfo->childTblPrefix, tableSeq);
} }
} else { } else {
snprintf(pTblName, TSDB_TABLE_NAME_LEN, "%s%"PRIu64"", snprintf(pTblName, TSDB_TABLE_NAME_LEN, "%s%"PRIu64"", g_args.tb_prefix, tableSeq);
g_args.tb_prefix, tableSeq);
} }
} }
...@@ -7434,7 +7491,7 @@ static int32_t prepareStmtBindArrayByType( ...@@ -7434,7 +7491,7 @@ static int32_t prepareStmtBindArrayByType(
bind->length = &bind->buffer_length; bind->length = &bind->buffer_length;
bind->is_null = NULL; bind->is_null = NULL;
break; break;
case TSDB_DATA_TYPE_UINT: case TSDB_DATA_TYPE_UINT:
bind_uint = malloc(sizeof(uint32_t)); bind_uint = malloc(sizeof(uint32_t));
assert(bind_uint); assert(bind_uint);
...@@ -10497,6 +10554,33 @@ static void startMultiThreadInsertData(int threads, char* db_name, ...@@ -10497,6 +10554,33 @@ static void startMultiThreadInsertData(int threads, char* db_name,
pThreadInfo->start_time = pThreadInfo->start_time + rand_int() % 10000 - rand_tinyint(); pThreadInfo->start_time = pThreadInfo->start_time + rand_int() % 10000 - rand_tinyint();
} }
*/ */
if (g_args.iface == REST_IFACE || ((stbInfo) && (stbInfo->iface == REST_IFACE))) {
#ifdef WINDOWS
WSADATA wsaData;
WSAStartup(MAKEWORD(2, 2), &wsaData);
SOCKET sockfd;
#else
int sockfd;
#endif
sockfd = socket(AF_INET, SOCK_STREAM, 0);
if (sockfd < 0) {
#ifdef WINDOWS
errorPrint( "Could not create socket : %d" , WSAGetLastError());
#endif
debugPrint("%s() LN%d, sockfd=%d\n", __func__, __LINE__, sockfd);
ERROR_EXIT("opening socket");
}
int retConn = connect(sockfd, (struct sockaddr *)&(g_Dbs.serv_addr), sizeof(struct sockaddr));
debugPrint("%s() LN%d connect() return %d\n", __func__, __LINE__, retConn);
if (retConn < 0) {
ERROR_EXIT("connecting");
}
pThreadInfo->sockfd = sockfd;
}
tsem_init(&(pThreadInfo->lock_sem), 0, 0); tsem_init(&(pThreadInfo->lock_sem), 0, 0);
if (ASYNC_MODE == g_Dbs.asyncMode) { if (ASYNC_MODE == g_Dbs.asyncMode) {
pthread_create(pids + i, NULL, asyncWrite, pThreadInfo); pthread_create(pids + i, NULL, asyncWrite, pThreadInfo);
...@@ -10534,6 +10618,14 @@ static void startMultiThreadInsertData(int threads, char* db_name, ...@@ -10534,6 +10618,14 @@ static void startMultiThreadInsertData(int threads, char* db_name,
tmfree((char *)pThreadInfo->bind_ts_array); tmfree((char *)pThreadInfo->bind_ts_array);
tmfree(pThreadInfo->bindParams); tmfree(pThreadInfo->bindParams);
tmfree(pThreadInfo->is_null); tmfree(pThreadInfo->is_null);
if (g_args.iface == REST_IFACE || ((stbInfo) && (stbInfo->iface == REST_IFACE))) {
#ifdef WINDOWS
closesocket(pThreadInfo->sockfd);
WSACleanup();
#else
close(pThreadInfo->sockfd);
#endif
}
#else #else
if (pThreadInfo->sampleBindArray) { if (pThreadInfo->sampleBindArray) {
for (int k = 0; k < MAX_SAMPLES; k++) { for (int k = 0; k < MAX_SAMPLES; k++) {
...@@ -11196,6 +11288,31 @@ static int queryTestProcess() { ...@@ -11196,6 +11288,31 @@ static int queryTestProcess() {
} }
} }
if (0 == strncasecmp(g_queryInfo.queryMode, "rest", 4)) {
#ifdef WINDOWS
WSADATA wsaData;
WSAStartup(MAKEWORD(2, 2), &wsaData);
SOCKET sockfd;
#else
int sockfd;
#endif
sockfd = socket(AF_INET, SOCK_STREAM, 0);
if (sockfd < 0) {
#ifdef WINDOWS
errorPrint( "Could not create socket : %d" , WSAGetLastError());
#endif
debugPrint("%s() LN%d, sockfd=%d\n", __func__, __LINE__, sockfd);
ERROR_EXIT("opening socket");
}
int retConn = connect(sockfd, (struct sockaddr *)&(g_queryInfo.serv_addr),
sizeof(struct sockaddr));
debugPrint("%s() LN%d connect() return %d\n", __func__, __LINE__, retConn);
if (retConn < 0) {
ERROR_EXIT("connecting");
}
pThreadInfo->sockfd = sockfd;
}
pThreadInfo->taos = NULL;// workaround to use separate taos connection; pThreadInfo->taos = NULL;// workaround to use separate taos connection;
pthread_create(pids + seq, NULL, specifiedTableQuery, pthread_create(pids + seq, NULL, specifiedTableQuery,
...@@ -11247,6 +11364,31 @@ static int queryTestProcess() { ...@@ -11247,6 +11364,31 @@ static int queryTestProcess() {
pThreadInfo->end_table_to = i < b ? tableFrom + a : tableFrom + a - 1; pThreadInfo->end_table_to = i < b ? tableFrom + a : tableFrom + a - 1;
tableFrom = pThreadInfo->end_table_to + 1; tableFrom = pThreadInfo->end_table_to + 1;
pThreadInfo->taos = NULL; // workaround to use separate taos connection; pThreadInfo->taos = NULL; // workaround to use separate taos connection;
if (0 == strncasecmp(g_queryInfo.queryMode, "rest", 4)) {
#ifdef WINDOWS
WSADATA wsaData;
WSAStartup(MAKEWORD(2, 2), &wsaData);
SOCKET sockfd;
#else
int sockfd;
#endif
sockfd = socket(AF_INET, SOCK_STREAM, 0);
if (sockfd < 0) {
#ifdef WINDOWS
errorPrint( "Could not create socket : %d" , WSAGetLastError());
#endif
debugPrint("%s() LN%d, sockfd=%d\n", __func__, __LINE__, sockfd);
ERROR_EXIT("opening socket");
}
int retConn = connect(sockfd, (struct sockaddr *)&(g_queryInfo.serv_addr),
sizeof(struct sockaddr));
debugPrint("%s() LN%d connect() return %d\n", __func__, __LINE__, retConn);
if (retConn < 0) {
ERROR_EXIT("connecting");
}
pThreadInfo->sockfd = sockfd;
}
pthread_create(pidsOfSub + i, NULL, superTableQuery, pThreadInfo); pthread_create(pidsOfSub + i, NULL, superTableQuery, pThreadInfo);
} }
...@@ -11259,6 +11401,15 @@ static int queryTestProcess() { ...@@ -11259,6 +11401,15 @@ static int queryTestProcess() {
for (int i = 0; i < nConcurrent; i++) { for (int i = 0; i < nConcurrent; i++) {
for (int j = 0; j < nSqlCount; j++) { for (int j = 0; j < nSqlCount; j++) {
pthread_join(pids[i * nSqlCount + j], NULL); pthread_join(pids[i * nSqlCount + j], NULL);
if (0 == strncasecmp(g_queryInfo.queryMode, "rest", 4)) {
threadInfo *pThreadInfo = infos + i * nSqlCount + j;
#ifdef WINDOWS
closesocket(pThreadInfo->sockfd);
WSACleanup();
#else
close(pThreadInfo->sockfd);
#endif
}
} }
} }
} }
...@@ -11268,6 +11419,15 @@ static int queryTestProcess() { ...@@ -11268,6 +11419,15 @@ static int queryTestProcess() {
for (int i = 0; i < g_queryInfo.superQueryInfo.threadCnt; i++) { for (int i = 0; i < g_queryInfo.superQueryInfo.threadCnt; i++) {
pthread_join(pidsOfSub[i], NULL); pthread_join(pidsOfSub[i], NULL);
if (0 == strncasecmp(g_queryInfo.queryMode, "rest", 4)) {
threadInfo *pThreadInfo = infosOfSub + i;
#ifdef WINDOWS
closesocket(pThreadInfo->sockfd);
WSACleanup();
#else
close(pThreadInfo->sockfd);
#endif
}
} }
tmfree((char*)pidsOfSub); tmfree((char*)pidsOfSub);
...@@ -11770,29 +11930,6 @@ static int subscribeTestProcess() { ...@@ -11770,29 +11930,6 @@ static int subscribeTestProcess() {
return 0; return 0;
} }
static void initOfInsertMeta() {
memset(&g_Dbs, 0, sizeof(SDbs));
// set default values
tstrncpy(g_Dbs.host, "127.0.0.1", MAX_HOSTNAME_SIZE);
g_Dbs.port = 6030;
tstrncpy(g_Dbs.user, TSDB_DEFAULT_USER, MAX_USERNAME_SIZE);
tstrncpy(g_Dbs.password, TSDB_DEFAULT_PASS, SHELL_MAX_PASSWORD_LEN);
g_Dbs.threadCount = 2;
g_Dbs.use_metric = g_args.use_metric;
}
static void initOfQueryMeta() {
memset(&g_queryInfo, 0, sizeof(SQueryMetaInfo));
// set default values
tstrncpy(g_queryInfo.host, "127.0.0.1", MAX_HOSTNAME_SIZE);
g_queryInfo.port = 6030;
tstrncpy(g_queryInfo.user, TSDB_DEFAULT_USER, MAX_USERNAME_SIZE);
tstrncpy(g_queryInfo.password, TSDB_DEFAULT_PASS, SHELL_MAX_PASSWORD_LEN);
}
static void setParaFromArg() { static void setParaFromArg() {
char type[20]; char type[20];
char length[20]; char length[20];
...@@ -11825,7 +11962,7 @@ static void setParaFromArg() { ...@@ -11825,7 +11962,7 @@ static void setParaFromArg() {
tstrncpy(g_Dbs.resultFile, g_args.output_file, MAX_FILE_NAME_LEN); tstrncpy(g_Dbs.resultFile, g_args.output_file, MAX_FILE_NAME_LEN);
g_Dbs.use_metric = g_args.use_metric; g_Dbs.use_metric = g_args.use_metric;
g_args.prepared_rand = min(g_args.insertRows, MAX_PREPARED_RAND);
g_Dbs.aggr_func = g_args.aggr_func; g_Dbs.aggr_func = g_args.aggr_func;
char dataString[TSDB_MAX_BYTES_PER_ROW]; char dataString[TSDB_MAX_BYTES_PER_ROW];
...@@ -11942,7 +12079,6 @@ static int regexMatch(const char *s, const char *reg, int cflags) { ...@@ -11942,7 +12079,6 @@ static int regexMatch(const char *s, const char *reg, int cflags) {
printf("Regex match failed: %s\n", msgbuf); printf("Regex match failed: %s\n", msgbuf);
exit(EXIT_FAILURE); exit(EXIT_FAILURE);
} }
return 0; return 0;
} }
...@@ -12102,8 +12238,6 @@ int main(int argc, char *argv[]) { ...@@ -12102,8 +12238,6 @@ int main(int argc, char *argv[]) {
if (g_args.metaFile) { if (g_args.metaFile) {
g_totalChildTables = 0; g_totalChildTables = 0;
initOfInsertMeta();
initOfQueryMeta();
if (false == getInfoFromJsonFile(g_args.metaFile)) { if (false == getInfoFromJsonFile(g_args.metaFile)) {
printf("Failed to read %s\n", g_args.metaFile); printf("Failed to read %s\n", g_args.metaFile);
...@@ -12113,6 +12247,10 @@ int main(int argc, char *argv[]) { ...@@ -12113,6 +12247,10 @@ int main(int argc, char *argv[]) {
testMetaFile(); testMetaFile();
} else { } else {
memset(&g_Dbs, 0, sizeof(SDbs)); memset(&g_Dbs, 0, sizeof(SDbs));
g_Dbs.db = calloc(1, sizeof(SDataBase));
assert(g_Dbs.db);
g_Dbs.db[0].superTbls = calloc(1, sizeof(SSuperTable));
assert(g_Dbs.db[0].superTbls);
setParaFromArg(); setParaFromArg();
if (NULL != g_args.sqlFile) { if (NULL != g_args.sqlFile) {
......
...@@ -41,6 +41,7 @@ typedef struct STable { ...@@ -41,6 +41,7 @@ typedef struct STable {
int16_t restoreColumnNum; int16_t restoreColumnNum;
bool hasRestoreLastColumn; bool hasRestoreLastColumn;
int lastColSVersion; int lastColSVersion;
int16_t cacheLastConfigVersion;
T_REF_DECLARE() T_REF_DECLARE()
} STable; } STable;
......
...@@ -79,8 +79,8 @@ struct STsdbRepo { ...@@ -79,8 +79,8 @@ struct STsdbRepo {
STsdbCfg save_config; // save apply config STsdbCfg save_config; // save apply config
bool config_changed; // config changed flag bool config_changed; // config changed flag
pthread_mutex_t save_mutex; // protect save config pthread_mutex_t save_mutex; // protect save config
uint8_t hasCachedLastColumn; int16_t cacheLastConfigVersion;
STsdbAppH appH; STsdbAppH appH;
STsdbStat stat; STsdbStat stat;
...@@ -110,7 +110,8 @@ int tsdbUnlockRepo(STsdbRepo* pRepo); ...@@ -110,7 +110,8 @@ int tsdbUnlockRepo(STsdbRepo* pRepo);
STsdbMeta* tsdbGetMeta(STsdbRepo* pRepo); STsdbMeta* tsdbGetMeta(STsdbRepo* pRepo);
int tsdbCheckCommit(STsdbRepo* pRepo); int tsdbCheckCommit(STsdbRepo* pRepo);
int tsdbRestoreInfo(STsdbRepo* pRepo); int tsdbRestoreInfo(STsdbRepo* pRepo);
int tsdbCacheLastData(STsdbRepo *pRepo, STsdbCfg* oldCfg); UNUSED_FUNC int tsdbCacheLastData(STsdbRepo *pRepo, STsdbCfg* oldCfg);
int32_t tsdbLoadLastCache(STsdbRepo *pRepo, STable* pTable);
void tsdbGetRootDir(int repoid, char dirName[]); void tsdbGetRootDir(int repoid, char dirName[]);
void tsdbGetDataDir(int repoid, char dirName[]); void tsdbGetDataDir(int repoid, char dirName[]);
......
...@@ -146,7 +146,9 @@ static void tsdbApplyRepoConfig(STsdbRepo *pRepo) { ...@@ -146,7 +146,9 @@ static void tsdbApplyRepoConfig(STsdbRepo *pRepo) {
if (oldCfg.cacheLastRow != pRepo->config.cacheLastRow) { if (oldCfg.cacheLastRow != pRepo->config.cacheLastRow) {
if (tsdbLockRepo(pRepo) < 0) return; if (tsdbLockRepo(pRepo) < 0) return;
tsdbCacheLastData(pRepo, &oldCfg); // tsdbCacheLastData(pRepo, &oldCfg);
// lazy load last cache when query or update
++pRepo->cacheLastConfigVersion;
tsdbUnlockRepo(pRepo); tsdbUnlockRepo(pRepo);
} }
......
...@@ -64,7 +64,9 @@ int tsdbCompact(STsdbRepo *pRepo) { return tsdbAsyncCompact(pRepo); } ...@@ -64,7 +64,9 @@ int tsdbCompact(STsdbRepo *pRepo) { return tsdbAsyncCompact(pRepo); }
void *tsdbCompactImpl(STsdbRepo *pRepo) { void *tsdbCompactImpl(STsdbRepo *pRepo) {
// Check if there are files in TSDB FS to compact // Check if there are files in TSDB FS to compact
if (REPO_FS(pRepo)->cstatus->pmf == NULL) { if (REPO_FS(pRepo)->cstatus->pmf == NULL) {
tsdbInfo("vgId:%d no file to compact in FS", REPO_ID(pRepo)); pRepo->compactState = TSDB_NO_COMPACT;
tsem_post(&(pRepo->readyToCommit));
tsdbInfo("vgId:%d compact over, no file to compact in FS", REPO_ID(pRepo));
return NULL; return NULL;
} }
......
...@@ -562,7 +562,7 @@ static STsdbRepo *tsdbNewRepo(STsdbCfg *pCfg, STsdbAppH *pAppH) { ...@@ -562,7 +562,7 @@ static STsdbRepo *tsdbNewRepo(STsdbCfg *pCfg, STsdbAppH *pAppH) {
return NULL; return NULL;
} }
pRepo->config_changed = false; pRepo->config_changed = false;
atomic_store_8(&pRepo->hasCachedLastColumn, 0); pRepo->cacheLastConfigVersion = 0;
code = tsem_init(&(pRepo->readyToCommit), 0, 1); code = tsem_init(&(pRepo->readyToCommit), 0, 1);
if (code != 0) { if (code != 0) {
...@@ -711,7 +711,7 @@ static int tsdbRestoreLastColumns(STsdbRepo *pRepo, STable *pTable, SReadH* pRea ...@@ -711,7 +711,7 @@ static int tsdbRestoreLastColumns(STsdbRepo *pRepo, STable *pTable, SReadH* pRea
tsdbGetBlockStatis(pReadh, pBlockStatis, (int)numColumns); tsdbGetBlockStatis(pReadh, pBlockStatis, (int)numColumns);
loadStatisData = true; loadStatisData = true;
} }
TSDB_WLOCK_TABLE(pTable); // lock when update pTable->lastCols[]
for (int16_t i = 0; i < numColumns && numColumns > pTable->restoreColumnNum; ++i) { for (int16_t i = 0; i < numColumns && numColumns > pTable->restoreColumnNum; ++i) {
STColumn *pCol = schemaColAt(pSchema, i); STColumn *pCol = schemaColAt(pSchema, i);
// ignore loaded columns // ignore loaded columns
...@@ -760,6 +760,7 @@ static int tsdbRestoreLastColumns(STsdbRepo *pRepo, STable *pTable, SReadH* pRea ...@@ -760,6 +760,7 @@ static int tsdbRestoreLastColumns(STsdbRepo *pRepo, STable *pTable, SReadH* pRea
break; break;
} }
} }
TSDB_WUNLOCK_TABLE(pTable);
} }
out: out:
...@@ -774,7 +775,6 @@ out: ...@@ -774,7 +775,6 @@ out:
} }
static int tsdbRestoreLastRow(STsdbRepo *pRepo, STable *pTable, SReadH* pReadh, SBlockIdx *pIdx) { static int tsdbRestoreLastRow(STsdbRepo *pRepo, STable *pTable, SReadH* pReadh, SBlockIdx *pIdx) {
ASSERT(pTable->lastRow == NULL);
if (tsdbLoadBlockInfo(pReadh, NULL) < 0) { if (tsdbLoadBlockInfo(pReadh, NULL) < 0) {
return -1; return -1;
} }
...@@ -788,21 +788,32 @@ static int tsdbRestoreLastRow(STsdbRepo *pRepo, STable *pTable, SReadH* pReadh, ...@@ -788,21 +788,32 @@ static int tsdbRestoreLastRow(STsdbRepo *pRepo, STable *pTable, SReadH* pReadh,
// Get the data in row // Get the data in row
STSchema *pSchema = tsdbGetTableSchema(pTable); STSchema *pSchema = tsdbGetTableSchema(pTable);
pTable->lastRow = taosTMalloc(memRowMaxBytesFromSchema(pSchema)); SMemRow lastRow = taosTMalloc(memRowMaxBytesFromSchema(pSchema));
if (pTable->lastRow == NULL) { if (lastRow == NULL) {
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
return -1; return -1;
} }
memRowSetType(pTable->lastRow, SMEM_ROW_DATA); memRowSetType(lastRow, SMEM_ROW_DATA);
tdInitDataRow(memRowDataBody(pTable->lastRow), pSchema); tdInitDataRow(memRowDataBody(lastRow), pSchema);
for (int icol = 0; icol < schemaNCols(pSchema); icol++) { for (int icol = 0; icol < schemaNCols(pSchema); icol++) {
STColumn *pCol = schemaColAt(pSchema, icol); STColumn *pCol = schemaColAt(pSchema, icol);
SDataCol *pDataCol = pReadh->pDCols[0]->cols + icol; SDataCol *pDataCol = pReadh->pDCols[0]->cols + icol;
tdAppendColVal(memRowDataBody(pTable->lastRow), tdGetColDataOfRow(pDataCol, pBlock->numOfRows - 1), pCol->type, tdAppendColVal(memRowDataBody(lastRow), tdGetColDataOfRow(pDataCol, pBlock->numOfRows - 1), pCol->type,
pCol->offset); pCol->offset);
} }
pTable->lastKey = memRowKey(pTable->lastRow); TSKEY lastKey = memRowKey(lastRow);
// during the load data in file, new data would be inserted and last row has been updated
TSDB_WLOCK_TABLE(pTable);
if (pTable->lastRow == NULL) {
pTable->lastKey = lastKey;
pTable->lastRow = lastRow;
TSDB_WUNLOCK_TABLE(pTable);
} else {
TSDB_WUNLOCK_TABLE(pTable);
taosTZfree(lastRow);
}
return 0; return 0;
} }
...@@ -874,14 +885,105 @@ int tsdbRestoreInfo(STsdbRepo *pRepo) { ...@@ -874,14 +885,105 @@ int tsdbRestoreInfo(STsdbRepo *pRepo) {
tsdbDestroyReadH(&readh); tsdbDestroyReadH(&readh);
if (CACHE_LAST_NULL_COLUMN(pCfg)) { // if (CACHE_LAST_NULL_COLUMN(pCfg)) {
atomic_store_8(&pRepo->hasCachedLastColumn, 1); // atomic_store_8(&pRepo->hasCachedLastColumn, 1);
// }
return 0;
}
int32_t tsdbLoadLastCache(STsdbRepo *pRepo, STable *pTable) {
SFSIter fsiter;
SReadH readh;
SDFileSet *pSet;
int cacheLastRowTableNum = 0;
int cacheLastColTableNum = 0;
bool cacheLastRow = CACHE_LAST_ROW(&(pRepo->config));
bool cacheLastCol = CACHE_LAST_NULL_COLUMN(&(pRepo->config));
tsdbDebug("tsdbLoadLastCache for %s, cacheLastRow:%d, cacheLastCol:%d", pTable->name->data, cacheLastRow, cacheLastCol);
pTable->cacheLastConfigVersion = pRepo->cacheLastConfigVersion;
if (!cacheLastRow && pTable->lastRow != NULL) {
taosTZfree(pTable->lastRow);
pTable->lastRow = NULL;
} }
if (!cacheLastCol && pTable->lastCols != NULL) {
tsdbFreeLastColumns(pTable);
}
if (!cacheLastRow && !cacheLastCol) {
return 0;
}
cacheLastRowTableNum = (cacheLastRow && pTable->lastRow == NULL) ? 1 : 0;
cacheLastColTableNum = (cacheLastCol && pTable->lastCols == NULL) ? 1 : 0;
if (cacheLastRowTableNum == 0 && cacheLastColTableNum == 0) {
return 0;
}
if (tsdbInitReadH(&readh, pRepo) < 0) {
return -1;
}
tsdbRLockFS(REPO_FS(pRepo));
tsdbFSIterInit(&fsiter, REPO_FS(pRepo), TSDB_FS_ITER_BACKWARD);
while ((cacheLastRowTableNum > 0 || cacheLastColTableNum > 0) && (pSet = tsdbFSIterNext(&fsiter)) != NULL) {
if (tsdbSetAndOpenReadFSet(&readh, pSet) < 0) {
tsdbUnLockFS(REPO_FS(pRepo));
tsdbDestroyReadH(&readh);
return -1;
}
if (tsdbLoadBlockIdx(&readh) < 0) {
tsdbUnLockFS(REPO_FS(pRepo));
tsdbDestroyReadH(&readh);
return -1;
}
// tsdbDebug("tsdbRestoreInfo restore vgId:%d,table:%s", REPO_ID(pRepo), pTable->name->data);
if (tsdbSetReadTable(&readh, pTable) < 0) {
tsdbUnLockFS(REPO_FS(pRepo));
tsdbDestroyReadH(&readh);
return -1;
}
SBlockIdx *pIdx = readh.pBlkIdx;
if (pIdx && (cacheLastRowTableNum > 0) && (pTable->lastRow == NULL)) {
if (tsdbRestoreLastRow(pRepo, pTable, &readh, pIdx) != 0) {
tsdbUnLockFS(REPO_FS(pRepo));
tsdbDestroyReadH(&readh);
return -1;
}
cacheLastRowTableNum -= 1;
}
// restore NULL columns
if (pIdx && (cacheLastColTableNum > 0) && !pTable->hasRestoreLastColumn) {
if (tsdbRestoreLastColumns(pRepo, pTable, &readh) != 0) {
tsdbUnLockFS(REPO_FS(pRepo));
tsdbDestroyReadH(&readh);
return -1;
}
if (pTable->hasRestoreLastColumn) {
cacheLastColTableNum -= 1;
}
}
}
tsdbUnLockFS(REPO_FS(pRepo));
tsdbDestroyReadH(&readh);
return 0; return 0;
} }
int tsdbCacheLastData(STsdbRepo *pRepo, STsdbCfg* oldCfg) { UNUSED_FUNC int tsdbCacheLastData(STsdbRepo *pRepo, STsdbCfg* oldCfg) {
bool cacheLastRow = false, cacheLastCol = false; bool cacheLastRow = false, cacheLastCol = false;
SFSIter fsiter; SFSIter fsiter;
SReadH readh; SReadH readh;
...@@ -915,9 +1017,9 @@ int tsdbCacheLastData(STsdbRepo *pRepo, STsdbCfg* oldCfg) { ...@@ -915,9 +1017,9 @@ int tsdbCacheLastData(STsdbRepo *pRepo, STsdbCfg* oldCfg) {
// if close last option,need to free data // if close last option,need to free data
if (need_free_last_row || need_free_last_col) { if (need_free_last_row || need_free_last_col) {
if (need_free_last_col) { // if (need_free_last_col) {
atomic_store_8(&pRepo->hasCachedLastColumn, 0); // atomic_store_8(&pRepo->hasCachedLastColumn, 0);
} // }
tsdbInfo("free cache last data since cacheLast option changed"); tsdbInfo("free cache last data since cacheLast option changed");
for (int i = 1; i <= maxTableIdx; i++) { for (int i = 1; i <= maxTableIdx; i++) {
STable *pTable = pMeta->tables[i]; STable *pTable = pMeta->tables[i];
...@@ -995,9 +1097,9 @@ int tsdbCacheLastData(STsdbRepo *pRepo, STsdbCfg* oldCfg) { ...@@ -995,9 +1097,9 @@ int tsdbCacheLastData(STsdbRepo *pRepo, STsdbCfg* oldCfg) {
tsdbDestroyReadH(&readh); tsdbDestroyReadH(&readh);
if (cacheLastCol) { // if (cacheLastCol) {
atomic_store_8(&pRepo->hasCachedLastColumn, 1); // atomic_store_8(&pRepo->hasCachedLastColumn, 1);
} // }
return 0; return 0;
} }
...@@ -996,7 +996,8 @@ static void updateTableLatestColumn(STsdbRepo *pRepo, STable *pTable, SMemRow ro ...@@ -996,7 +996,8 @@ static void updateTableLatestColumn(STsdbRepo *pRepo, STable *pTable, SMemRow ro
if ((value == NULL) || isNull(value, pTCol->type)) { if ((value == NULL) || isNull(value, pTCol->type)) {
continue; continue;
} }
// lock
TSDB_WLOCK_TABLE(pTable);
SDataCol *pDataCol = &(pLatestCols[idx]); SDataCol *pDataCol = &(pLatestCols[idx]);
if (pDataCol->pData == NULL) { if (pDataCol->pData == NULL) {
pDataCol->pData = malloc(pTCol->bytes); pDataCol->pData = malloc(pTCol->bytes);
...@@ -1012,6 +1013,8 @@ static void updateTableLatestColumn(STsdbRepo *pRepo, STable *pTable, SMemRow ro ...@@ -1012,6 +1013,8 @@ static void updateTableLatestColumn(STsdbRepo *pRepo, STable *pTable, SMemRow ro
memcpy(pDataCol->pData, value, bytes); memcpy(pDataCol->pData, value, bytes);
//tsdbInfo("updateTableLatestColumn vgId:%d cache column %d for %d,%s", REPO_ID(pRepo), j, pDataCol->bytes, (char*)pDataCol->pData); //tsdbInfo("updateTableLatestColumn vgId:%d cache column %d for %d,%s", REPO_ID(pRepo), j, pDataCol->bytes, (char*)pDataCol->pData);
pDataCol->ts = memRowKey(row); pDataCol->ts = memRowKey(row);
// unlock
TSDB_WUNLOCK_TABLE(pTable);
} }
} }
...@@ -1058,5 +1061,8 @@ static int tsdbUpdateTableLatestInfo(STsdbRepo *pRepo, STable *pTable, SMemRow r ...@@ -1058,5 +1061,8 @@ static int tsdbUpdateTableLatestInfo(STsdbRepo *pRepo, STable *pTable, SMemRow r
updateTableLatestColumn(pRepo, pTable, row); updateTableLatestColumn(pRepo, pTable, row);
} }
} }
pTable->cacheLastConfigVersion = pRepo->cacheLastConfigVersion;
return 0; return 0;
} }
...@@ -346,7 +346,7 @@ int tsdbUpdateTableTagValue(STsdbRepo *repo, SUpdateTableTagValMsg *pMsg) { ...@@ -346,7 +346,7 @@ int tsdbUpdateTableTagValue(STsdbRepo *repo, SUpdateTableTagValMsg *pMsg) {
tsdbError( tsdbError(
"vgId:%d failed to update tag value of table %s since version out of date, client tag version %d server tag " "vgId:%d failed to update tag value of table %s since version out of date, client tag version %d server tag "
"version %d", "version %d",
REPO_ID(pRepo), TABLE_CHAR_NAME(pTable), pMsg->tversion, schemaVersion(pTable->tagSchema)); REPO_ID(pRepo), TABLE_CHAR_NAME(pTable), pMsg->tversion, schemaVersion(pTable->pSuper->tagSchema));
terrno = TSDB_CODE_TDB_TAG_VER_OUT_OF_DATE; terrno = TSDB_CODE_TDB_TAG_VER_OUT_OF_DATE;
return -1; return -1;
} }
...@@ -627,27 +627,30 @@ int16_t tsdbGetLastColumnsIndexByColId(STable* pTable, int16_t colId) { ...@@ -627,27 +627,30 @@ int16_t tsdbGetLastColumnsIndexByColId(STable* pTable, int16_t colId) {
} }
int tsdbInitColIdCacheWithSchema(STable* pTable, STSchema* pSchema) { int tsdbInitColIdCacheWithSchema(STable* pTable, STSchema* pSchema) {
ASSERT(pTable->lastCols == NULL); TSDB_WLOCK_TABLE(pTable);
if (pTable->lastCols == NULL) {
int16_t numOfColumn = pSchema->numOfCols;
int16_t numOfColumn = pSchema->numOfCols; pTable->lastCols = (SDataCol *)malloc(numOfColumn * sizeof(SDataCol));
if (pTable->lastCols == NULL) {
TSDB_WUNLOCK_TABLE(pTable);
return -1;
}
pTable->lastCols = (SDataCol*)malloc(numOfColumn * sizeof(SDataCol)); for (int16_t i = 0; i < numOfColumn; ++i) {
if (pTable->lastCols == NULL) { STColumn *pCol = schemaColAt(pSchema, i);
return -1; SDataCol *pDataCol = &(pTable->lastCols[i]);
} pDataCol->bytes = 0;
pDataCol->pData = NULL;
pDataCol->colId = pCol->colId;
}
for (int16_t i = 0; i < numOfColumn; ++i) { pTable->lastColSVersion = schemaVersion(pSchema);
STColumn *pCol = schemaColAt(pSchema, i); pTable->maxColNum = numOfColumn;
SDataCol* pDataCol = &(pTable->lastCols[i]); pTable->restoreColumnNum = 0;
pDataCol->bytes = 0; pTable->hasRestoreLastColumn = false;
pDataCol->pData = NULL;
pDataCol->colId = pCol->colId;
} }
TSDB_WUNLOCK_TABLE(pTable);
pTable->lastColSVersion = schemaVersion(pSchema);
pTable->maxColNum = numOfColumn;
pTable->restoreColumnNum = 0;
pTable->hasRestoreLastColumn = false;
return 0; return 0;
} }
...@@ -797,6 +800,7 @@ static STable *tsdbNewTable() { ...@@ -797,6 +800,7 @@ static STable *tsdbNewTable() {
pTable->lastCols = NULL; pTable->lastCols = NULL;
pTable->restoreColumnNum = 0; pTable->restoreColumnNum = 0;
pTable->cacheLastConfigVersion = 0;
pTable->maxColNum = 0; pTable->maxColNum = 0;
pTable->hasRestoreLastColumn = false; pTable->hasRestoreLastColumn = false;
pTable->lastColSVersion = -1; pTable->lastColSVersion = -1;
......
...@@ -156,6 +156,7 @@ typedef struct STableGroupSupporter { ...@@ -156,6 +156,7 @@ typedef struct STableGroupSupporter {
static STimeWindow updateLastrowForEachGroup(STableGroupInfo *groupList); static STimeWindow updateLastrowForEachGroup(STableGroupInfo *groupList);
static int32_t checkForCachedLastRow(STsdbQueryHandle* pQueryHandle, STableGroupInfo *groupList); static int32_t checkForCachedLastRow(STsdbQueryHandle* pQueryHandle, STableGroupInfo *groupList);
static int32_t checkForCachedLast(STsdbQueryHandle* pQueryHandle); static int32_t checkForCachedLast(STsdbQueryHandle* pQueryHandle);
static int32_t lazyLoadCacheLast(STsdbQueryHandle* pQueryHandle);
static int32_t tsdbGetCachedLastRow(STable* pTable, SMemRow* pRes, TSKEY* lastKey); static int32_t tsdbGetCachedLastRow(STable* pTable, SMemRow* pRes, TSKEY* lastKey);
static void changeQueryHandleForInterpQuery(TsdbQueryHandleT pHandle); static void changeQueryHandleForInterpQuery(TsdbQueryHandleT pHandle);
...@@ -589,6 +590,28 @@ void tsdbResetQueryHandleForNewTable(TsdbQueryHandleT queryHandle, STsdbQueryCon ...@@ -589,6 +590,28 @@ void tsdbResetQueryHandleForNewTable(TsdbQueryHandleT queryHandle, STsdbQueryCon
pQueryHandle->next = doFreeColumnInfoData(pQueryHandle->next); pQueryHandle->next = doFreeColumnInfoData(pQueryHandle->next);
} }
static int32_t lazyLoadCacheLast(STsdbQueryHandle* pQueryHandle) {
STsdbRepo* pRepo = pQueryHandle->pTsdb;
size_t numOfTables = taosArrayGetSize(pQueryHandle->pTableCheckInfo);
int32_t code = 0;
for (size_t i = 0; i < numOfTables; ++i) {
STableCheckInfo* pCheckInfo = taosArrayGet(pQueryHandle->pTableCheckInfo, i);
STable* pTable = pCheckInfo->pTableObj;
if (pTable->cacheLastConfigVersion == pRepo->cacheLastConfigVersion) {
continue;
}
code = tsdbLoadLastCache(pRepo, pTable);
if (code != 0) {
tsdbError("%p uid:%" PRId64 ", tid:%d, failed to load last cache since %s", pQueryHandle, pTable->tableId.uid,
pTable->tableId.tid, tstrerror(terrno));
break;
}
}
return code;
}
TsdbQueryHandleT tsdbQueryLastRow(STsdbRepo *tsdb, STsdbQueryCond *pCond, STableGroupInfo *groupList, uint64_t qId, SMemRef* pMemRef) { TsdbQueryHandleT tsdbQueryLastRow(STsdbRepo *tsdb, STsdbQueryCond *pCond, STableGroupInfo *groupList, uint64_t qId, SMemRef* pMemRef) {
pCond->twindow = updateLastrowForEachGroup(groupList); pCond->twindow = updateLastrowForEachGroup(groupList);
...@@ -602,6 +625,8 @@ TsdbQueryHandleT tsdbQueryLastRow(STsdbRepo *tsdb, STsdbQueryCond *pCond, STable ...@@ -602,6 +625,8 @@ TsdbQueryHandleT tsdbQueryLastRow(STsdbRepo *tsdb, STsdbQueryCond *pCond, STable
return NULL; return NULL;
} }
lazyLoadCacheLast(pQueryHandle);
int32_t code = checkForCachedLastRow(pQueryHandle, groupList); int32_t code = checkForCachedLastRow(pQueryHandle, groupList);
if (code != TSDB_CODE_SUCCESS) { // set the numOfTables to be 0 if (code != TSDB_CODE_SUCCESS) { // set the numOfTables to be 0
terrno = code; terrno = code;
...@@ -616,13 +641,14 @@ TsdbQueryHandleT tsdbQueryLastRow(STsdbRepo *tsdb, STsdbQueryCond *pCond, STable ...@@ -616,13 +641,14 @@ TsdbQueryHandleT tsdbQueryLastRow(STsdbRepo *tsdb, STsdbQueryCond *pCond, STable
return pQueryHandle; return pQueryHandle;
} }
TsdbQueryHandleT tsdbQueryCacheLast(STsdbRepo *tsdb, STsdbQueryCond *pCond, STableGroupInfo *groupList, uint64_t qId, SMemRef* pMemRef) { TsdbQueryHandleT tsdbQueryCacheLast(STsdbRepo *tsdb, STsdbQueryCond *pCond, STableGroupInfo *groupList, uint64_t qId, SMemRef* pMemRef) {
STsdbQueryHandle *pQueryHandle = (STsdbQueryHandle*) tsdbQueryTables(tsdb, pCond, groupList, qId, pMemRef); STsdbQueryHandle *pQueryHandle = (STsdbQueryHandle*) tsdbQueryTables(tsdb, pCond, groupList, qId, pMemRef);
if (pQueryHandle == NULL) { if (pQueryHandle == NULL) {
return NULL; return NULL;
} }
lazyLoadCacheLast(pQueryHandle);
int32_t code = checkForCachedLast(pQueryHandle); int32_t code = checkForCachedLast(pQueryHandle);
if (code != TSDB_CODE_SUCCESS) { // set the numOfTables to be 0 if (code != TSDB_CODE_SUCCESS) { // set the numOfTables to be 0
terrno = code; terrno = code;
...@@ -2781,6 +2807,9 @@ static bool loadCachedLast(STsdbQueryHandle* pQueryHandle) { ...@@ -2781,6 +2807,9 @@ static bool loadCachedLast(STsdbQueryHandle* pQueryHandle) {
} }
int32_t i = 0, j = 0; int32_t i = 0, j = 0;
// lock pTable->lastCols[i] as it would be released when schema update(tsdbUpdateLastColSchema)
TSDB_RLOCK_TABLE(pTable);
while(i < tgNumOfCols && j < numOfCols) { while(i < tgNumOfCols && j < numOfCols) {
pColInfo = taosArrayGet(pQueryHandle->pColumns, i); pColInfo = taosArrayGet(pQueryHandle->pColumns, i);
if (pTable->lastCols[j].colId < pColInfo->info.colId) { if (pTable->lastCols[j].colId < pColInfo->info.colId) {
...@@ -2867,6 +2896,7 @@ static bool loadCachedLast(STsdbQueryHandle* pQueryHandle) { ...@@ -2867,6 +2896,7 @@ static bool loadCachedLast(STsdbQueryHandle* pQueryHandle) {
i++; i++;
j++; j++;
} }
TSDB_RUNLOCK_TABLE(pTable);
// leave the real ts column as the last row, because last function only (not stable) use the last row as res // leave the real ts column as the last row, because last function only (not stable) use the last row as res
if (priKey != TSKEY_INITIAL_VAL) { if (priKey != TSKEY_INITIAL_VAL) {
...@@ -3198,7 +3228,9 @@ int32_t checkForCachedLast(STsdbQueryHandle* pQueryHandle) { ...@@ -3198,7 +3228,9 @@ int32_t checkForCachedLast(STsdbQueryHandle* pQueryHandle) {
int32_t code = 0; int32_t code = 0;
if (pQueryHandle->pTsdb && atomic_load_8(&pQueryHandle->pTsdb->hasCachedLastColumn)){ STsdbRepo* pRepo = pQueryHandle->pTsdb;
if (pRepo && CACHE_LAST_NULL_COLUMN(&(pRepo->config))) {
pQueryHandle->cachelastrow = TSDB_CACHED_TYPE_LAST; pQueryHandle->cachelastrow = TSDB_CACHED_TYPE_LAST;
} }
......
...@@ -346,7 +346,7 @@ int tsCompressBoolImp(const char *const input, const int nelements, char *const ...@@ -346,7 +346,7 @@ int tsCompressBoolImp(const char *const input, const int nelements, char *const
/* t = (~((( uint8_t)1) << (7-i%BITS_PER_BYTE))); */ /* t = (~((( uint8_t)1) << (7-i%BITS_PER_BYTE))); */
output[pos] |= t; output[pos] |= t;
} else { } else {
uError("Invalid compress bool value:%d", output[pos]); uError("Invalid compress bool value:%d", input[i]);
return -1; return -1;
} }
} }
......
###################################################################
# Copyright (c) 2016 by TAOS Technologies, Inc.
# All rights reserved.
#
# This file is proprietary and confidential to TAOS Technologies.
# No part of this file may be reproduced, stored, transmitted,
# disclosed or used in any form or by any means other than as
# expressly provided by the written permission from Jianhui Tao
#
###################################################################
# -*- coding: utf-8 -*-
import os
import sys
sys.path.insert(0, os.getcwd())
from util.log import *
from util.sql import *
from util.dnodes import *
import taos
import threading
import subprocess
from random import choice
class TwoClients:
def initConnection(self):
self.host = "chenhaoran01"
self.user = "root"
self.password = "taosdata"
self.config = "/home/chr/cfg/single/"
self.port =6030
self.rowNum = 10
self.ts = 1537146000000
def getBuildPath(self):
selfPath = os.path.dirname(os.path.realpath(__file__))
if ("community" in selfPath):
projPath = selfPath[:selfPath.find("community")]
else:
projPath = selfPath[:selfPath.find("tests")]
for root, dirs, files in os.walk(projPath):
if ("taosd" in files):
rootRealPath = os.path.dirname(os.path.realpath(root))
if ("packaging" not in rootRealPath):
buildPath = root[:len(root)-len("/build/bin")]
break
return buildPath
def run(self):
buildPath = self.getBuildPath()
if (buildPath == ""):
tdLog.exit("taosd not found!")
else:
tdLog.info("taosd found in %s" % buildPath)
binPath = buildPath+ "/build/bin/"
walFilePath = "/var/lib/taos/mnode_bak/wal/"
# new taos client
conn1 = taos.connect(host=self.host, user=self.user, password=self.password, config=self.config )
print(conn1)
cur1 = conn1.cursor()
tdSql.init(cur1, True)
# create backgroud db and tb
tdSql.execute("drop database if exists db1")
os.system("%staosdemo -f compress/insertDataDb1.json -y " % binPath)
# create foreground db and tb
tdSql.execute("drop database if exists foredb")
tdSql.execute("create database foredb")
tdSql.execute("use foredb")
print("123test")
tdSql.execute("create stable if not exists stb (ts timestamp, dataInt int, dataDouble double,dataStr nchar(200)) tags(loc nchar(50),t1 int)")
tdSql.execute("create table tb1 using stb tags('beijing1', 10)")
tdSql.execute("insert into tb1 values(1614218412000,8635,98.861,'qazwsxedcrfvtgbyhnujmikolp1')(1614218422000,8636,98.862,'qazwsxedcrfvtgbyhnujmikolp2')")
tdSql.execute("create table tb2 using stb tags('beijing2', 11)")
tdSql.execute("insert into tb2 values(1614218432000,8647,98.863,'qazwsxedcrfvtgbyhnujmikolp3')")
tdSql.execute("insert into tb2 values(1614218442000,8648,98.864,'qazwsxedcrfvtgbyhnujmikolp4')")
# check data correct
tdSql.execute("use db1")
tdSql.query("select count(tbname) from stb0")
tdSql.checkData(0, 0, 50000)
tdSql.query("select count(*) from stb0")
tdSql.checkData(0, 0, 5000000)
tdSql.execute("use foredb")
tdSql.query("select count (tbname) from stb")
tdSql.checkData(0, 0, 2)
tdSql.query("select count (*) from stb")
tdSql.checkData(0, 0, 4)
tdSql.query("select * from tb1 order by ts")
tdSql.checkData(0, 3, "qazwsxedcrfvtgbyhnujmikolp1")
tdSql.query("select * from tb2 order by ts")
tdSql.checkData(1, 3, "qazwsxedcrfvtgbyhnujmikolp4")
# delete useless file
testcaseFilename = os.path.split(__file__)[-1]
os.system("rm -rf ./insert_res.txt")
# os.system("rm -rf compress/%s.sql" % testcaseFilename )
clients = TwoClients()
clients.initConnection()
# clients.getBuildPath()
clients.run()
\ No newline at end of file
{
"filetype": "insert",
"cfgdir": "/etc/taos",
"host": "127.0.0.1",
"port": 6030,
"user": "root",
"password": "taosdata",
"thread_count": 4,
"thread_count_create_tbl": 4,
"result_file": "./insert_res.txt",
"confirm_parameter_prompt": "no",
"insert_interval": 0,
"interlace_rows": 10,
"num_of_records_per_req": 1000,
"max_sql_len": 1024000,
"databases": [{
"dbinfo": {
"name": "db1",
"drop": "yes",
"replica": 1,
"days": 10,
"cache": 50,
"blocks": 8,
"precision": "ms",
"keep": 3650,
"minRows": 100,
"maxRows": 4096,
"comp":2,
"walLevel":1,
"cachelast":0,
"quorum":1,
"fsync":3000,
"update": 0
},
"super_tables": [{
"name": "stb0",
"child_table_exists":"no",
"childtable_count": 50000,
"childtable_prefix": "stb00_",
"auto_create_table": "no",
"batch_create_tbl_num": 100,
"data_source": "rand",
"insert_mode": "taosc",
"insert_rows": 100,
"childtable_limit": 0,
"childtable_offset":0,
"interlace_rows": 0,
"insert_interval":0,
"max_sql_len": 1024000,
"disorder_ratio": 0,
"disorder_range": 1000,
"timestamp_step": 1,
"start_timestamp": "2020-10-01 00:00:00.000",
"sample_format": "csv",
"sample_file": "./sample.csv",
"tags_file": "",
"columns": [{"type": "INT"},{"type": "TINYINT"},{"type": "smallint"},{"type": "bool"},{"type": "bigint"},{"type": "float"},{"type": "double"}, {"type": "BINARY","len": 32}, {"type": "nchar","len": 32}],
"tags": [{"type": "TINYINT", "count":2}, {"type": "BINARY", "len": 16, "count":1}]
}]
}]
}
...@@ -54,27 +54,36 @@ class TDTestCase: ...@@ -54,27 +54,36 @@ class TDTestCase:
binPath = buildPath + "/build/bin/" binPath = buildPath + "/build/bin/"
if(threadID == 0): if(threadID == 0):
os.system("%staosdemo -y -t %d -n %d -b INT,INT,INT,INT -m t" % print("%staosdemo -y -t %d -n %d -b INT,INT,INT,INT" %
(binPath, self.numberOfTables, self.numberOfRecords))
os.system("%staosdemo -y -t %d -n %d -b INT,INT,INT,INT" %
(binPath, self.numberOfTables, self.numberOfRecords)) (binPath, self.numberOfTables, self.numberOfRecords))
if(threadID == 1): if(threadID == 1):
time.sleep(2) time.sleep(2)
print("use test") print("use test")
while True: max_try = 100
count = 0
while (count < max_try):
try: try:
tdSql.execute("use test") tdSql.execute("use test")
break break
except Exception as e: except Exception as e:
tdLog.info("use database test failed") tdLog.info("use database test failed")
time.sleep(1) time.sleep(2)
count += 1
print("try %d times" % count)
continue continue
# check if all the tables have heen created # check if all the tables have heen created
while True: count = 0
while (count < max_try):
try: try:
tdSql.query("show tables") tdSql.query("show tables")
except Exception as e: except Exception as e:
tdLog.info("show tables test failed") tdLog.info("show tables test failed")
time.sleep(1) time.sleep(2)
count += 1
print("try %d times" % count)
continue continue
rows = tdSql.queryRows rows = tdSql.queryRows
...@@ -83,13 +92,17 @@ class TDTestCase: ...@@ -83,13 +92,17 @@ class TDTestCase:
break break
time.sleep(1) time.sleep(1)
# check if there are any records in the last created table # check if there are any records in the last created table
while True: count = 0
while (count < max_try):
print("query started") print("query started")
print("try %d times" % count)
try: try:
tdSql.query("select * from test.t7") tdSql.query("select * from test.d7")
except Exception as e: except Exception as e:
tdLog.info("select * test failed") tdLog.info("select * test failed")
time.sleep(2) time.sleep(2)
count += 1
print("try %d times" % count)
continue continue
rows = tdSql.queryRows rows = tdSql.queryRows
...@@ -100,8 +113,8 @@ class TDTestCase: ...@@ -100,8 +113,8 @@ class TDTestCase:
print("alter table test.meters add column c10 int") print("alter table test.meters add column c10 int")
tdSql.execute("alter table test.meters add column c10 int") tdSql.execute("alter table test.meters add column c10 int")
print("insert into test.t7 values (now, 1, 2, 3, 4, 0)") print("insert into test.d7 values (now, 1, 2, 3, 4, 0)")
tdSql.execute("insert into test.t7 values (now, 1, 2, 3, 4, 0)") tdSql.execute("insert into test.d7 values (now, 1, 2, 3, 4, 0)")
def run(self): def run(self):
tdSql.prepare() tdSql.prepare()
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册