提交 28b875d6 编写于 作者: Z zhaoyanggh

Merge branch 'master' of https://github.com/taosdata/TDengine into test/TD-6167

......@@ -190,6 +190,7 @@ void tscFieldInfoClear(SFieldInfo* pFieldInfo);
void tscFieldInfoCopy(SFieldInfo* pFieldInfo, const SFieldInfo* pSrc, const SArray* pExprList);
static FORCE_INLINE int32_t tscNumOfFields(SQueryInfo* pQueryInfo) { return pQueryInfo->fieldsInfo.numOfOutput; }
int32_t tscGetFirstInvisibleFieldPos(SQueryInfo* pQueryInfo);
int32_t tscFieldInfoCompare(const SFieldInfo* pFieldInfo1, const SFieldInfo* pFieldInfo2, int32_t *diffSize);
void tscInsertPrimaryTsSourceColumn(SQueryInfo* pQueryInfo, uint64_t uid);
......
......@@ -6943,9 +6943,7 @@ static int32_t doAddGroupbyColumnsOnDemand(SSqlCmd* pCmd, SQueryInfo* pQueryInfo
s = &pSchema[colIndex];
}
}
size_t size = tscNumOfExprs(pQueryInfo);
if (TSDB_COL_IS_TAG(pColIndex->flag)) {
int32_t f = TSDB_FUNC_TAG;
......@@ -6953,8 +6951,10 @@ static int32_t doAddGroupbyColumnsOnDemand(SSqlCmd* pCmd, SQueryInfo* pQueryInfo
f = TSDB_FUNC_TAGPRJ;
}
int32_t pos = tscGetFirstInvisibleFieldPos(pQueryInfo);
SColumnIndex index = {.tableIndex = pQueryInfo->groupbyExpr.tableIndex, .columnIndex = colIndex};
SExprInfo* pExpr = tscExprAppend(pQueryInfo, f, &index, s->type, s->bytes, getNewResColId(pCmd), s->bytes, true);
SExprInfo* pExpr = tscExprInsert(pQueryInfo, pos, f, &index, s->type, s->bytes, getNewResColId(pCmd), s->bytes, true);
memset(pExpr->base.aliasName, 0, sizeof(pExpr->base.aliasName));
tstrncpy(pExpr->base.aliasName, s->name, sizeof(pExpr->base.aliasName));
......@@ -6964,13 +6964,15 @@ static int32_t doAddGroupbyColumnsOnDemand(SSqlCmd* pCmd, SQueryInfo* pQueryInfo
// NOTE: tag column does not add to source column list
SColumnList ids = createColumnList(1, 0, pColIndex->colIndex);
insertResultField(pQueryInfo, (int32_t)size, &ids, s->bytes, (int8_t)s->type, s->name, pExpr);
insertResultField(pQueryInfo, pos, &ids, s->bytes, (int8_t)s->type, s->name, pExpr);
} else {
// if this query is "group by" normal column, time window query is not allowed
if (isTimeWindowQuery(pQueryInfo)) {
return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg1);
}
size_t size = tscNumOfExprs(pQueryInfo);
bool hasGroupColumn = false;
for (int32_t j = 0; j < size; ++j) {
SExprInfo* pExpr = tscExprGet(pQueryInfo, j);
......
......@@ -2093,6 +2093,22 @@ TAOS_FIELD tscCreateField(int8_t type, const char* name, int16_t bytes) {
return f;
}
int32_t tscGetFirstInvisibleFieldPos(SQueryInfo* pQueryInfo) {
if (pQueryInfo->fieldsInfo.numOfOutput <= 0 || pQueryInfo->fieldsInfo.internalField == NULL) {
return 0;
}
for (int32_t i = 0; i < pQueryInfo->fieldsInfo.numOfOutput; ++i) {
SInternalField* pField = taosArrayGet(pQueryInfo->fieldsInfo.internalField, i);
if (!pField->visible) {
return i;
}
}
return pQueryInfo->fieldsInfo.numOfOutput;
}
SInternalField* tscFieldInfoAppend(SFieldInfo* pFieldInfo, TAOS_FIELD* pField) {
assert(pFieldInfo != NULL);
pFieldInfo->numOfOutput++;
......
......@@ -18,7 +18,7 @@ public class RestfulConnection extends AbstractConnection {
private final String url;
private final String database;
private final String token;
/******************************************************/
private boolean isClosed;
private final DatabaseMetaData metadata;
......
......@@ -88,17 +88,24 @@ public class RestfulStatement extends AbstractStatement {
}
private String getUrl() throws SQLException {
String dbname = conn.getClientInfo(TSDBDriver.PROPERTY_KEY_DBNAME);
if (dbname == null || dbname.trim().isEmpty()) {
dbname = "";
} else {
dbname = "/" + dbname.toLowerCase();
}
TimestampFormat timestampFormat = TimestampFormat.valueOf(conn.getClientInfo(TSDBDriver.PROPERTY_KEY_TIMESTAMP_FORMAT).trim().toUpperCase());
String url;
switch (timestampFormat) {
case TIMESTAMP:
url = "http://" + conn.getHost() + ":" + conn.getPort() + "/rest/sqlt";
url = "http://" + conn.getHost() + ":" + conn.getPort() + "/rest/sqlt" + dbname;
break;
case UTC:
url = "http://" + conn.getHost() + ":" + conn.getPort() + "/rest/sqlutc";
url = "http://" + conn.getHost() + ":" + conn.getPort() + "/rest/sqlutc" + dbname;
break;
default:
url = "http://" + conn.getHost() + ":" + conn.getPort() + "/rest/sql";
url = "http://" + conn.getHost() + ":" + conn.getPort() + "/rest/sql" + dbname;
}
return url;
}
......
package com.taosdata.jdbc.cases;
import org.junit.Before;
import org.junit.Test;
import java.sql.*;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
public class MultiConnectionWithDifferentDbTest {
private static String host = "127.0.0.1";
private static String db1 = "db1";
private static String db2 = "db2";
private long ts;
@Test
public void test() {
List<Thread> threads = IntStream.range(1, 3).mapToObj(i -> new Thread(new Runnable() {
@Override
public void run() {
for (int j = 0; j < 10; j++) {
queryDb();
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
private void queryDb() {
String url = "jdbc:TAOS-RS://" + host + ":6041/db" + i + "?user=root&password=taosdata";
try (Connection connection = DriverManager.getConnection(url)) {
Statement stmt = connection.createStatement();
ResultSet rs = stmt.executeQuery("select * from weather");
assertNotNull(rs);
rs.next();
long actual = rs.getTimestamp("ts").getTime();
assertEquals(ts, actual);
int f1 = rs.getInt("f1");
assertEquals(i, f1);
String loc = i == 1 ? "beijing" : "shanghai";
String loc_actual = rs.getString("loc");
assertEquals(loc, loc_actual);
stmt.close();
} catch (SQLException e) {
e.printStackTrace();
}
}
}, "thread-" + i)).collect(Collectors.toList());
threads.forEach(Thread::start);
for (Thread t : threads) {
try {
t.join();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
@Before
public void before() {
ts = System.currentTimeMillis();
try {
Connection conn = DriverManager.getConnection("jdbc:TAOS-RS://" + host + ":6041/?user=root&password=taosdata");
Statement stmt = conn.createStatement();
stmt.execute("drop database if exists " + db1);
stmt.execute("create database if not exists " + db1);
stmt.execute("use " + db1);
stmt.execute("create table weather(ts timestamp, f1 int) tags(loc nchar(10))");
stmt.execute("insert into t1 using weather tags('beijing') values(" + ts + ", 1)");
stmt.execute("drop database if exists " + db2);
stmt.execute("create database if not exists " + db2);
stmt.execute("use " + db2);
stmt.execute("create table weather(ts timestamp, f1 int) tags(loc nchar(10))");
stmt.execute("insert into t1 using weather tags('shanghai') values(" + ts + ", 2)");
conn.close();
} catch (SQLException e) {
e.printStackTrace();
}
}
}
package com.taosdata.jdbc.rs;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import java.sql.*;
import static org.junit.Assert.*;
public class DatabaseSpecifiedTest {
private static String host = "127.0.0.1";
private static String dbname = "test_db_spec";
private Connection connection;
private long ts;
@Test
public void test() throws SQLException {
// when
connection = DriverManager.getConnection("jdbc:TAOS-RS://" + host + ":6041/" + dbname + "?user=root&password=taosdata");
try (Statement stmt = connection.createStatement();) {
ResultSet rs = stmt.executeQuery("select * from weather");
//then
assertNotNull(rs);
rs.next();
long now = rs.getTimestamp("ts").getTime();
assertEquals(ts, now);
int f1 = rs.getInt(2);
assertEquals(1, f1);
String loc = rs.getString("loc");
assertEquals("beijing", loc);
}
connection.close();
}
@Before
public void before() {
ts = System.currentTimeMillis();
try {
Connection connection = DriverManager.getConnection("jdbc:TAOS-RS://" + host + ":6041/?user=root&password=taosdata");
Statement stmt = connection.createStatement();
stmt.execute("drop database if exists " + dbname);
stmt.execute("create database if not exists " + dbname);
stmt.execute("use " + dbname);
stmt.execute("create table weather(ts timestamp, f1 int) tags(loc nchar(10))");
stmt.execute("insert into t1 using weather tags('beijing') values( " + ts + ", 1)");
stmt.close();
connection.close();
} catch (SQLException e) {
e.printStackTrace();
}
}
@After
public void after() {
try {
if (connection != null)
connection.close();
} catch (SQLException e) {
e.printStackTrace();
}
}
}
......@@ -55,6 +55,8 @@ typedef struct SShellArguments {
int abort;
int port;
int pktLen;
int pktNum;
char* pktType;
char* netTestRole;
} SShellArguments;
......
......@@ -50,6 +50,8 @@ static struct argp_option options[] = {
{"timezone", 'z', "TIMEZONE", 0, "Time zone of the shell, default is local."},
{"netrole", 'n', "NETROLE", 0, "Net role when network connectivity test, default is startup, options: client|server|rpc|startup|sync|speen|fqdn."},
{"pktlen", 'l', "PKTLEN", 0, "Packet length used for net test, default is 1000 bytes."},
{"pktnum", 'N', "PKTNUM", 0, "Packet numbers used for net test, default is 100."},
{"pkttype", 'S', "PKTTYPE", 0, "Packet type used for net test, default is TCP."},
{0}};
static error_t parse_opt(int key, char *arg, struct argp_state *state) {
......@@ -106,7 +108,7 @@ static error_t parse_opt(int key, char *arg, struct argp_state *state) {
arguments->is_raw_time = true;
break;
case 'f':
if (wordexp(arg, &full_path, 0) != 0) {
if ((0 == strlen(arg)) || (wordexp(arg, &full_path, 0) != 0)) {
fprintf(stderr, "Invalid path %s\n", arg);
return -1;
}
......@@ -146,6 +148,17 @@ static error_t parse_opt(int key, char *arg, struct argp_state *state) {
return -1;
}
break;
case 'N':
if (arg) {
arguments->pktNum = atoi(arg);
} else {
fprintf(stderr, "Invalid packet number\n");
return -1;
}
break;
case 'S':
arguments->pktType = arg;
break;
case OPT_ABORT:
arguments->abort = 1;
break;
......
......@@ -85,6 +85,8 @@ SShellArguments args = {
.threadNum = 5,
.commands = NULL,
.pktLen = 1000,
.pktNum = 100,
.pktType = "TCP",
.netTestRole = NULL
};
......@@ -118,7 +120,7 @@ int main(int argc, char* argv[]) {
printf("Failed to init taos");
exit(EXIT_FAILURE);
}
taosNetTest(args.netTestRole, args.host, args.port, args.pktLen);
taosNetTest(args.netTestRole, args.host, args.port, args.pktLen, args.pktNum, args.pktType);
exit(0);
}
......
......@@ -55,9 +55,13 @@ void printHelp() {
printf("%s%s\n", indent, "-t");
printf("%s%s%s\n", indent, indent, "Time zone of the shell, default is local.");
printf("%s%s\n", indent, "-n");
printf("%s%s%s\n", indent, indent, "Net role when network connectivity test, default is startup, options: client|server|rpc|startup|sync.");
printf("%s%s%s\n", indent, indent, "Net role when network connectivity test, default is startup, options: client|server|rpc|startup|sync|speed|fqdn.");
printf("%s%s\n", indent, "-l");
printf("%s%s%s\n", indent, indent, "Packet length used for net test, default is 1000 bytes.");
printf("%s%s\n", indent, "-N");
printf("%s%s%s\n", indent, indent, "Packet numbers used for net test, default is 100.");
printf("%s%s\n", indent, "-S");
printf("%s%s%s\n", indent, indent, "Packet type used for net test, default is TCP.");
printf("%s%s\n", indent, "-V");
printf("%s%s%s\n", indent, indent, "Print program version.");
......
......@@ -443,6 +443,7 @@ typedef struct SThreadInfo_S {
uint64_t start_table_from;
uint64_t end_table_to;
int64_t ntables;
int64_t tables_created;
uint64_t data_of_rate;
int64_t start_time;
char* cols;
......@@ -639,6 +640,7 @@ SArguments g_args = {
static SDbs g_Dbs;
static int64_t g_totalChildTables = 0;
static int64_t g_actualChildTables = 0;
static SQueryMetaInfo g_queryInfo;
static FILE * g_fpOfInsertResult = NULL;
......@@ -964,6 +966,7 @@ static void parse_args(int argc, char *argv[], SArguments *arguments) {
exit(EXIT_FAILURE);
}
arguments->num_of_tables = atoi(argv[++i]);
g_totalChildTables = arguments->num_of_tables;
} else if (strcmp(argv[i], "-n") == 0) {
if ((argc == i+1) ||
(!isStringNumber(argv[i+1]))) {
......@@ -1134,7 +1137,7 @@ static void parse_args(int argc, char *argv[], SArguments *arguments) {
exit(EXIT_FAILURE);
}
} else if ((strcmp(argv[i], "--version") == 0) ||
(strcmp(argv[i], "-V") == 0)){
(strcmp(argv[i], "-V") == 0)) {
printVersion();
exit(0);
} else if (strcmp(argv[i], "--help") == 0) {
......@@ -1345,14 +1348,14 @@ static void selectAndGetResult(
}
}
static char *rand_bool_str(){
static char *rand_bool_str() {
static int cursor;
cursor++;
if (cursor > (MAX_PREPARED_RAND - 1)) cursor = 0;
return g_randbool_buff + ((cursor % MAX_PREPARED_RAND) * BOOL_BUFF_LEN);
}
static int32_t rand_bool(){
static int32_t rand_bool() {
static int cursor;
cursor++;
if (cursor > (MAX_PREPARED_RAND - 1)) cursor = 0;
......@@ -1485,7 +1488,7 @@ static char *demo_phase_float_str() {
return g_rand_phase_buff + ((cursor % MAX_PREPARED_RAND) * FLOAT_BUFF_LEN);
}
static float UNUSED_FUNC demo_phase_float(){
static float UNUSED_FUNC demo_phase_float() {
static int cursor;
cursor++;
if (cursor > (MAX_PREPARED_RAND - 1)) cursor = 0;
......@@ -1564,7 +1567,7 @@ static void init_rand_data() {
g_randdouble_buff = calloc(1, DOUBLE_BUFF_LEN * MAX_PREPARED_RAND);
assert(g_randdouble_buff);
for (int i = 0; i < MAX_PREPARED_RAND; i++){
for (int i = 0; i < MAX_PREPARED_RAND; i++) {
g_randint[i] = (int)(taosRandom() % 65535);
sprintf(g_randint_buff + i * INT_BUFF_LEN, "%d",
g_randint[i]);
......@@ -3276,6 +3279,7 @@ static void* createTable(void *sarg)
pThreadInfo->db_name,
g_args.tb_prefix, i,
pThreadInfo->cols);
batchNum ++;
} else {
if (stbInfo == NULL) {
free(pThreadInfo->buffer);
......@@ -3325,13 +3329,14 @@ static void* createTable(void *sarg)
len = 0;
if (0 != queryDbExec(pThreadInfo->taos, pThreadInfo->buffer,
NO_INSERT_TYPE, false)){
NO_INSERT_TYPE, false)) {
errorPrint2("queryDbExec() failed. buffer:\n%s\n", pThreadInfo->buffer);
free(pThreadInfo->buffer);
return NULL;
}
pThreadInfo->tables_created += batchNum;
uint64_t currentPrintTime = taosGetTimestampMs();
uint64_t currentPrintTime = taosGetTimestampMs();
if (currentPrintTime - lastPrintTime > 30*1000) {
printf("thread[%d] already create %"PRIu64" - %"PRIu64" tables\n",
pThreadInfo->threadID, pThreadInfo->start_table_from, i);
......@@ -3401,6 +3406,7 @@ static int startMultiThreadCreateChildTable(
pThreadInfo->use_metric = true;
pThreadInfo->cols = cols;
pThreadInfo->minDelay = UINT64_MAX;
pThreadInfo->tables_created = 0;
pthread_create(pids + i, NULL, createTable, pThreadInfo);
}
......@@ -3411,6 +3417,8 @@ static int startMultiThreadCreateChildTable(
for (int i = 0; i < threads; i++) {
threadInfo *pThreadInfo = infos + i;
taos_close(pThreadInfo->taos);
g_actualChildTables += pThreadInfo->tables_created;
}
free(pids);
......@@ -3437,7 +3445,6 @@ static void createChildTables() {
verbosePrint("%s() LN%d: %s\n", __func__, __LINE__,
g_Dbs.db[i].superTbls[j].colsOfCreateChildTable);
uint64_t startFrom = 0;
g_totalChildTables += g_Dbs.db[i].superTbls[j].childTblCount;
verbosePrint("%s() LN%d: create %"PRId64" child tables from %"PRIu64"\n",
__func__, __LINE__, g_totalChildTables, startFrom);
......@@ -4232,6 +4239,7 @@ static bool getMetaFromInsertJsonFile(cJSON* root) {
goto PARSE_OVER;
}
g_Dbs.db[i].superTbls[j].childTblCount = count->valueint;
g_totalChildTables += g_Dbs.db[i].superTbls[j].childTblCount;
cJSON *dataSource = cJSON_GetObjectItem(stbInfo, "data_source");
if (dataSource && dataSource->type == cJSON_String
......@@ -4936,7 +4944,7 @@ static bool getMetaFromQueryJsonFile(cJSON* root) {
cJSON *result = cJSON_GetObjectItem(sql, "result");
if (result != NULL && result->type == cJSON_String
&& result->valuestring != NULL){
&& result->valuestring != NULL) {
tstrncpy(g_queryInfo.superQueryInfo.result[j],
result->valuestring, MAX_FILE_NAME_LEN);
} else if (NULL == result) {
......@@ -7586,7 +7594,7 @@ static void startMultiThreadInsertData(int threads, char* db_name,
}
int ret = taos_stmt_prepare(pThreadInfo->stmt, stmtBuffer, 0);
if (ret != 0){
if (ret != 0) {
free(pids);
free(infos);
free(stmtBuffer);
......@@ -7932,18 +7940,30 @@ static int insertTestProcess() {
double start;
double end;
// create child tables
start = taosGetTimestampMs();
createChildTables();
end = taosGetTimestampMs();
if (g_totalChildTables > 0) {
fprintf(stderr, "Spent %.4f seconds to create %"PRId64" tables with %d thread(s)\n\n",
(end - start)/1000.0, g_totalChildTables, g_Dbs.threadCountByCreateTbl);
fprintf(stderr,
"creating %"PRId64" table(s) with %d thread(s)\n\n",
g_totalChildTables, g_Dbs.threadCountByCreateTbl);
if (g_fpOfInsertResult) {
fprintf(g_fpOfInsertResult,
"creating %"PRId64" table(s) with %d thread(s)\n\n",
g_totalChildTables, g_Dbs.threadCountByCreateTbl);
}
// create child tables
start = taosGetTimestampMs();
createChildTables();
end = taosGetTimestampMs();
fprintf(stderr,
"\nSpent %.4f seconds to create %"PRId64" table(s) with %d thread(s), actual %"PRId64" table(s) created\n\n",
(end - start)/1000.0, g_totalChildTables,
g_Dbs.threadCountByCreateTbl, g_actualChildTables);
if (g_fpOfInsertResult) {
fprintf(g_fpOfInsertResult,
"Spent %.4f seconds to create %"PRId64" tables with %d thread(s)\n\n",
(end - start)/1000.0, g_totalChildTables, g_Dbs.threadCountByCreateTbl);
"\nSpent %.4f seconds to create %"PRId64" table(s) with %d thread(s), actual %"PRId64" table(s) created\n\n",
(end - start)/1000.0, g_totalChildTables,
g_Dbs.threadCountByCreateTbl, g_actualChildTables);
}
}
......
......@@ -1572,7 +1572,7 @@ static void mergeTwoRowFromMem(STsdbQueryHandle* pQueryHandle, int32_t capacity,
int32_t numOfColsOfRow1 = 0;
if (pSchema1 == NULL) {
pSchema1 = tsdbGetTableSchemaByVersion(pTable, dataRowVersion(row1));
pSchema1 = tsdbGetTableSchemaByVersion(pTable, memRowVersion(row1));
}
if(isRow1DataRow) {
numOfColsOfRow1 = schemaNCols(pSchema1);
......@@ -1584,7 +1584,7 @@ static void mergeTwoRowFromMem(STsdbQueryHandle* pQueryHandle, int32_t capacity,
if(row2) {
isRow2DataRow = isDataRow(row2);
if (pSchema2 == NULL) {
pSchema2 = tsdbGetTableSchemaByVersion(pTable, dataRowVersion(row2));
pSchema2 = tsdbGetTableSchemaByVersion(pTable, memRowVersion(row2));
}
if(isRow2DataRow) {
numOfColsOfRow2 = schemaNCols(pSchema2);
......
......@@ -20,7 +20,7 @@
extern "C" {
#endif
void taosNetTest(char *role, char *host, int port, int pkgLen);
void taosNetTest(char *role, char *host, int32_t port, int32_t pkgLen, int32_t pkgNum, char *pkgType);
#ifdef __cplusplus
}
......
......@@ -27,6 +27,10 @@
#include "syncMsg.h"
#define MAX_PKG_LEN (64 * 1000)
#define MAX_SPEED_PKG_LEN (1024 * 1024 * 1024)
#define MIN_SPEED_PKG_LEN 1024
#define MAX_SPEED_PKG_NUM 10000
#define MIN_SPEED_PKG_NUM 1
#define BUFFER_SIZE (MAX_PKG_LEN + 1024)
extern int32_t tsRpcMaxUdpSize;
......@@ -466,6 +470,7 @@ static void taosNetTestRpc(char *host, int32_t startPort, int32_t pkgLen) {
sendpkgLen = pkgLen;
}
tsRpcForceTcp = 1;
int32_t ret = taosNetCheckRpc(host, port, sendpkgLen, spi, NULL);
if (ret < 0) {
printf("failed to test TCP port:%d\n", port);
......@@ -479,6 +484,7 @@ static void taosNetTestRpc(char *host, int32_t startPort, int32_t pkgLen) {
sendpkgLen = pkgLen;
}
tsRpcForceTcp = 0;
ret = taosNetCheckRpc(host, port, pkgLen, spi, NULL);
if (ret < 0) {
printf("failed to test UDP port:%d\n", port);
......@@ -542,12 +548,110 @@ static void taosNetTestServer(char *host, int32_t startPort, int32_t pkgLen) {
}
}
void taosNetTest(char *role, char *host, int32_t port, int32_t pkgLen) {
static void taosNetTestFqdn(char *host) {
int code = 0;
uint64_t startTime = taosGetTimestampUs();
uint32_t ip = taosGetIpv4FromFqdn(host);
if (ip == 0xffffffff) {
uError("failed to get IP address from %s since %s", host, strerror(errno));
code = -1;
}
uint64_t endTime = taosGetTimestampUs();
uint64_t el = endTime - startTime;
printf("check convert fqdn spend, status: %d\tcost: %" PRIu64 " us\n", code, el);
return;
}
static void taosNetCheckSpeed(char *host, int32_t port, int32_t pkgLen,
int32_t pkgNum, char *pkgType) {
// record config
int32_t compressTmp = tsCompressMsgSize;
int32_t maxUdpSize = tsRpcMaxUdpSize;
int32_t forceTcp = tsRpcForceTcp;
if (0 == strcmp("tcp", pkgType)){
tsRpcForceTcp = 1;
tsRpcMaxUdpSize = 0; // force tcp
} else {
tsRpcForceTcp = 0;
tsRpcMaxUdpSize = INT_MAX;
}
tsCompressMsgSize = -1;
SRpcEpSet epSet;
SRpcMsg reqMsg;
SRpcMsg rspMsg;
void * pRpcConn;
char secretEncrypt[32] = {0};
char spi = 0;
pRpcConn = taosNetInitRpc(secretEncrypt, spi);
if (NULL == pRpcConn) {
uError("failed to init client rpc");
return;
}
printf("check net spend, host:%s port:%d pkgLen:%d pkgNum:%d pkgType:%s\n\n", host, port, pkgLen, pkgNum, pkgType);
int32_t totalSucc = 0;
uint64_t startT = taosGetTimestampUs();
for (int32_t i = 1; i <= pkgNum; i++) {
uint64_t startTime = taosGetTimestampUs();
memset(&epSet, 0, sizeof(SRpcEpSet));
epSet.inUse = 0;
epSet.numOfEps = 1;
epSet.port[0] = port;
strcpy(epSet.fqdn[0], host);
reqMsg.msgType = TSDB_MSG_TYPE_NETWORK_TEST;
reqMsg.pCont = rpcMallocCont(pkgLen);
reqMsg.contLen = pkgLen;
reqMsg.code = 0;
reqMsg.handle = NULL; // rpc handle returned to app
reqMsg.ahandle = NULL; // app handle set by client
strcpy(reqMsg.pCont, "nettest speed");
rpcSendRecv(pRpcConn, &epSet, &reqMsg, &rspMsg);
int code = 0;
if ((rspMsg.code != 0) || (rspMsg.msgType != TSDB_MSG_TYPE_NETWORK_TEST + 1)) {
uError("ret code 0x%x %s", rspMsg.code, tstrerror(rspMsg.code));
code = -1;
}else{
totalSucc ++;
}
rpcFreeCont(rspMsg.pCont);
uint64_t endTime = taosGetTimestampUs();
uint64_t el = endTime - startTime;
printf("progress:%5d/%d\tstatus:%d\tcost:%8.2lf ms\tspeed:%8.2lf MB/s\n", i, pkgNum, code, el/1000.0, pkgLen/(el/1000000.0)/1024.0/1024.0);
}
int64_t endT = taosGetTimestampUs();
uint64_t elT = endT - startT;
printf("\ntotal succ:%5d/%d\tcost:%8.2lf ms\tspeed:%8.2lf MB/s\n", totalSucc, pkgNum, elT/1000.0, pkgLen/(elT/1000000.0)/1024.0/1024.0*totalSucc);
rpcClose(pRpcConn);
// return config
tsCompressMsgSize = compressTmp;
tsRpcMaxUdpSize = maxUdpSize;
tsRpcForceTcp = forceTcp;
return;
}
void taosNetTest(char *role, char *host, int32_t port, int32_t pkgLen,
int32_t pkgNum, char *pkgType) {
tscEmbedded = 1;
if (host == NULL) host = tsLocalFqdn;
if (port == 0) port = tsServerPort;
if (pkgLen <= 10) pkgLen = 1000;
if (pkgLen > MAX_PKG_LEN) pkgLen = MAX_PKG_LEN;
if (0 == strcmp("speed", role)){
if (pkgLen <= MIN_SPEED_PKG_LEN) pkgLen = MIN_SPEED_PKG_LEN;
if (pkgLen > MAX_SPEED_PKG_LEN) pkgLen = MAX_SPEED_PKG_LEN;
if (pkgNum <= MIN_SPEED_PKG_NUM) pkgNum = MIN_SPEED_PKG_NUM;
if (pkgNum > MAX_SPEED_PKG_NUM) pkgNum = MAX_SPEED_PKG_NUM;
}else{
if (pkgLen <= 10) pkgLen = 1000;
if (pkgLen > MAX_PKG_LEN) pkgLen = MAX_PKG_LEN;
}
if (0 == strcmp("client", role)) {
taosNetTestClient(host, port, pkgLen);
......@@ -560,6 +664,12 @@ void taosNetTest(char *role, char *host, int32_t port, int32_t pkgLen) {
taosNetCheckSync(host, port);
} else if (0 == strcmp("startup", role)) {
taosNetTestStartup(host, port);
} else if (0 == strcmp("speed", role)) {
tscEmbedded = 0;
char type[10] = {0};
taosNetCheckSpeed(host, port, pkgLen, pkgNum, strtolower(type, pkgType));
}else if (0 == strcmp("fqdn", role)) {
taosNetTestFqdn(host);
} else {
taosNetTestStartup(host, port);
}
......
#!/bin/bash
taos -n fqdn
#!/bin/bash
for N in -1 0 1 10000 10001
do
for l in 1023 1024 1073741824 1073741825
do
for S in udp tcp
do
taos -n speed -h BCC-2 -P 6030 -N $N -l $l -S $S 2>&1 | tee -a result.txt
done
done
done
......@@ -975,6 +975,84 @@ class TDTestCase:
tdSql.error("select ts as t, bottom(t1, 3) from t1 order by c3")
tdSql.error("select ts as t, top(c1, 0) from t1")
tdSql.query("select ts as t, top(c1, 5) from t1")
tdSql.checkRows(5)
tdSql.checkCols(3)
for i in range(5):
data=tdSql.getData(i, 0)
tdSql.checkData(i, 1, data)
tdSql.query("select ts as t, top(c1, 5) from stb1")
tdSql.checkRows(5)
tdSql.query("select ts as t, top(c1, 5) from stb1 group by tbname")
tdSql.checkRows(500)
tdSql.query("select ts as t, top(c1, 8) from t1")
tdSql.checkRows(6)
tdSql.query("select ts as t, top(c2, 8) from t1")
tdSql.checkRows(6)
tdSql.error("select ts as t, top(c3, 5) from t1")
tdSql.error("select ts as t, top(c4, 5) from t1")
tdSql.query("select ts as t, top(c5, 8) from t1")
tdSql.checkRows(6)
tdSql.error("select ts as t, top(c6, 5) from t1")
tdSql.error("select ts as t, top(c5, 8) as b from t1 order by b")
tdSql.error("select ts as t, top(t1, 1) from t1")
tdSql.error("select ts as t, top(t1, 1) from stb1")
tdSql.error("select ts as t, top(t1, 3) from stb1 order by c3")
tdSql.error("select ts as t, top(t1, 3) from t1 order by c3")
tdDnodes.stop(1)
tdDnodes.start(1)
tdSql.query("select ts as t, diff(c1) from t1")
tdSql.checkRows(5)
tdSql.checkCols(3)
for i in range(5):
data=tdSql.getData(i, 0)
tdSql.checkData(i, 1, data)
tdSql.query("select ts as t, diff(c1) from stb1 group by tbname")
tdSql.checkRows(500)
tdSql.checkCols(4)
tdSql.query("select ts as t, diff(c1) from t1")
tdSql.query("select ts as t, diff(c1) from t1")
tdSql.error("select ts as t, diff(c1) from stb1")
tdSql.query("select ts as t, diff(c2) from t1")
tdSql.checkRows(5)
tdSql.error("select ts as t, diff(c3) from t1")
tdSql.error("select ts as t, diff(c4) from t1")
tdSql.query("select ts as t, diff(c5) from t1")
tdSql.checkRows(5)
tdSql.error("select ts as t, diff(c6) from t1")
tdSql.error("select ts as t, diff(t1) from t1")
tdSql.error("select ts as t, diff(c1, c2) from t1")
tdSql.error("select ts as t, bottom(c1, 0) from t1")
tdSql.query("select ts as t, bottom(c1, 5) from t1")
tdSql.checkRows(5)
tdSql.checkCols(3)
for i in range(5):
data=tdSql.getData(i, 0)
tdSql.checkData(i, 1, data)
tdSql.query("select ts as t, bottom(c1, 5) from stb1")
tdSql.checkRows(5)
tdSql.query("select ts as t, bottom(c1, 5) from stb1 group by tbname")
tdSql.checkRows(500)
tdSql.query("select ts as t, bottom(c1, 8) from t1")
tdSql.checkRows(6)
tdSql.query("select ts as t, bottom(c2, 8) from t1")
tdSql.checkRows(6)
tdSql.error("select ts as t, bottom(c3, 5) from t1")
tdSql.error("select ts as t, bottom(c4, 5) from t1")
tdSql.query("select ts as t, bottom(c5, 8) from t1")
tdSql.checkRows(6)
tdSql.error("select ts as t, bottom(c6, 5) from t1")
tdSql.error("select ts as t, bottom(c5, 8) as b from t1 order by b")
tdSql.error("select ts as t, bottom(t1, 1) from t1")
tdSql.error("select ts as t, bottom(t1, 1) from stb1")
tdSql.error("select ts as t, bottom(t1, 3) from stb1 order by c3")
tdSql.error("select ts as t, bottom(t1, 3) from t1 order by c3")
tdSql.error("select ts as t, top(c1, 0) from t1")
tdSql.query("select ts as t, top(c1, 5) from t1")
tdSql.checkRows(5)
......
......@@ -17,6 +17,7 @@ import os
import taos
import time
import argparse
import json
class taosdemoQueryPerformace:
......@@ -48,7 +49,7 @@ class taosdemoQueryPerformace:
cursor2 = self.conn2.cursor()
cursor2.execute("create database if not exists %s" % self.dbName)
cursor2.execute("use %s" % self.dbName)
cursor2.execute("create table if not exists %s(ts timestamp, query_time float, commit_id binary(50), branch binary(50), type binary(20)) tags(query_id int, query_sql binary(300))" % self.stbName)
cursor2.execute("create table if not exists %s(ts timestamp, query_time_avg float, query_time_max float, query_time_min float, commit_id binary(50), branch binary(50), type binary(20)) tags(query_id int, query_sql binary(300))" % self.stbName)
sql = "select count(*) from test.meters"
tableid = 1
......@@ -74,7 +75,7 @@ class taosdemoQueryPerformace:
tableid = 6
cursor2.execute("create table if not exists %s%d using %s tags(%d, '%s')" % (self.tbPerfix, tableid, self.stbName, tableid, sql))
sql = "select * from meters"
sql = "select * from meters limit 10000"
tableid = 7
cursor2.execute("create table if not exists %s%d using %s tags(%d, '%s')" % (self.tbPerfix, tableid, self.stbName, tableid, sql))
......@@ -87,37 +88,96 @@ class taosdemoQueryPerformace:
cursor2.execute("create table if not exists %s%d using %s tags(%d, '%s')" % (self.tbPerfix, tableid, self.stbName, tableid, sql))
cursor2.close()
def generateQueryJson(self):
sqls = []
cursor2 = self.conn2.cursor()
cursor2.execute("select query_id, query_sql from %s.%s" % (self.dbName, self.stbName))
i = 0
for data in cursor2:
sql = {
"sql": data[1],
"result_mode": "onlyformat",
"result_file": "./query_sql_res%d.txt" % i
}
sqls.append(sql)
i += 1
query_data = {
"filetype": "query",
"cfgdir": "/etc/perf",
"host": "127.0.0.1",
"port": 6030,
"user": "root",
"password": "taosdata",
"databases": "test",
"specified_table_query": {
"query_times": 100,
"concurrent": 1,
"sqls": sqls
}
}
query_json_file = f"/tmp/query.json"
with open(query_json_file, 'w') as f:
json.dump(query_data, f)
return query_json_file
def getBuildPath(self):
selfPath = os.path.dirname(os.path.realpath(__file__))
if ("community" in selfPath):
projPath = selfPath[:selfPath.find("community")]
else:
projPath = selfPath[:selfPath.find("tests")]
for root, dirs, files in os.walk(projPath):
if ("taosdemo" in files):
rootRealPath = os.path.dirname(os.path.realpath(root))
if ("packaging" not in rootRealPath):
buildPath = root[:len(root) - len("/build/bin")]
break
return buildPath
def getCMDOutput(self, cmd):
cmd = os.popen(cmd)
output = cmd.read()
cmd.close()
return output
def query(self):
cursor = self.conn.cursor()
print("==================== query performance ====================")
buildPath = self.getBuildPath()
if (buildPath == ""):
print("taosdemo not found!")
sys.exit(1)
binPath = buildPath + "/build/bin/"
os.system(
"%sperfMonitor -f %s > query_res.txt" %
(binPath, self.generateQueryJson()))
cursor = self.conn2.cursor()
print("==================== query performance ====================")
cursor.execute("use %s" % self.dbName)
cursor.execute("select tbname, query_id, query_sql from %s" % self.stbName)
cursor.execute("select tbname, query_sql from %s" % self.stbName)
i = 0
for data in cursor:
table_name = data[0]
query_id = data[1]
sql = data[2]
sql = data[1]
self.avgDelay = self.getCMDOutput("grep 'avgDelay' query_res.txt | awk 'NR==%d{print $2}'" % (i + 1))
self.maxDelay = self.getCMDOutput("grep 'avgDelay' query_res.txt | awk 'NR==%d{print $5}'" % (i + 1))
self.minDelay = self.getCMDOutput("grep 'avgDelay' query_res.txt | awk 'NR==%d{print $8}'" % (i + 1))
i += 1
print("query time for: %s %f seconds" % (sql, float(self.avgDelay)))
c = self.conn2.cursor()
c.execute("insert into %s.%s values(now, %f, %f, %f, '%s', '%s', '%s')" % (self.dbName, table_name, float(self.avgDelay), float(self.maxDelay), float(self.minDelay), self.commitID, self.branch, self.type))
totalTime = 0
cursor2 = self.conn.cursor()
cursor2.execute("use test")
for i in range(100):
if(self.clearCache == True):
# root permission is required
os.system("echo 3 > /proc/sys/vm/drop_caches")
startTime = time.time()
cursor2.execute(sql)
totalTime += time.time() - startTime
cursor2.close()
print("query time for: %s %f seconds" % (sql, totalTime / 100))
cursor3 = self.conn2.cursor()
cursor3.execute("insert into %s.%s values(now, %f, '%s', '%s', '%s')" % (self.dbName, table_name, totalTime / 100, self.commitID, self.branch, self.type))
cursor3.close()
c.close()
cursor.close()
if __name__ == '__main__':
......@@ -174,4 +234,4 @@ if __name__ == '__main__':
args = parser.parse_args()
perftest = taosdemoQueryPerformace(args.remove_cache, args.commit_id, args.database_name, args.stable_name, args.table_perfix, args.git_branch, args.build_type)
perftest.createPerfTables()
perftest.query()
perftest.query()
\ No newline at end of file
......@@ -49,24 +49,18 @@ class taosdemoPerformace:
def generateJson(self):
db = {
"name": "%s" % self.insertDB,
"drop": "yes",
"replica": 1
"drop": "yes"
}
stb = {
"name": "meters",
"child_table_exists": "no",
"childtable_count": self.numOfTables,
"childtable_prefix": "stb_",
"auto_create_table": "no",
"data_source": "rand",
"batch_create_tbl_num": 10,
"insert_mode": "taosc",
"insert_mode": "rand",
"insert_rows": self.numOfRows,
"interlace_rows": 0,
"max_sql_len": 1024000,
"disorder_ratio": 0,
"disorder_range": 1000,
"batch_rows": 1000000,
"max_sql_len": 1048576,
"timestamp_step": 1,
"start_timestamp": "2020-10-01 00:00:00.000",
"sample_format": "csv",
......@@ -100,11 +94,8 @@ class taosdemoPerformace:
"user": "root",
"password": "taosdata",
"thread_count": 10,
"thread_count_create_tbl": 10,
"thread_count_create_tbl": 4,
"result_file": "./insert_res.txt",
"confirm_parameter_prompt": "no",
"insert_interval": 0,
"num_of_records_per_req": 30000,
"databases": [db]
}
......@@ -145,7 +136,7 @@ class taosdemoPerformace:
binPath = buildPath + "/build/bin/"
os.system(
"%staosdemo -f %s > /dev/null 2>&1" %
"%sperfMonitor -f %s > /dev/null 2>&1" %
(binPath, self.generateJson()))
self.createTableTime = self.getCMDOutput(
"grep 'Spent' insert_res.txt | awk 'NR==1{print $2}'")
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册