提交 beb74d6d 编写于 作者: Z zhaoyanggh

Merge branch 'master' of https://github.com/taosdata/TDengine into fix/TS-165

......@@ -190,6 +190,7 @@ void tscFieldInfoClear(SFieldInfo* pFieldInfo);
void tscFieldInfoCopy(SFieldInfo* pFieldInfo, const SFieldInfo* pSrc, const SArray* pExprList);
static FORCE_INLINE int32_t tscNumOfFields(SQueryInfo* pQueryInfo) { return pQueryInfo->fieldsInfo.numOfOutput; }
int32_t tscGetFirstInvisibleFieldPos(SQueryInfo* pQueryInfo);
int32_t tscFieldInfoCompare(const SFieldInfo* pFieldInfo1, const SFieldInfo* pFieldInfo2, int32_t *diffSize);
void tscInsertPrimaryTsSourceColumn(SQueryInfo* pQueryInfo, uint64_t uid);
......
......@@ -6943,9 +6943,7 @@ static int32_t doAddGroupbyColumnsOnDemand(SSqlCmd* pCmd, SQueryInfo* pQueryInfo
s = &pSchema[colIndex];
}
}
size_t size = tscNumOfExprs(pQueryInfo);
if (TSDB_COL_IS_TAG(pColIndex->flag)) {
int32_t f = TSDB_FUNC_TAG;
......@@ -6953,8 +6951,10 @@ static int32_t doAddGroupbyColumnsOnDemand(SSqlCmd* pCmd, SQueryInfo* pQueryInfo
f = TSDB_FUNC_TAGPRJ;
}
int32_t pos = tscGetFirstInvisibleFieldPos(pQueryInfo);
SColumnIndex index = {.tableIndex = pQueryInfo->groupbyExpr.tableIndex, .columnIndex = colIndex};
SExprInfo* pExpr = tscExprAppend(pQueryInfo, f, &index, s->type, s->bytes, getNewResColId(pCmd), s->bytes, true);
SExprInfo* pExpr = tscExprInsert(pQueryInfo, pos, f, &index, s->type, s->bytes, getNewResColId(pCmd), s->bytes, true);
memset(pExpr->base.aliasName, 0, sizeof(pExpr->base.aliasName));
tstrncpy(pExpr->base.aliasName, s->name, sizeof(pExpr->base.aliasName));
......@@ -6964,13 +6964,15 @@ static int32_t doAddGroupbyColumnsOnDemand(SSqlCmd* pCmd, SQueryInfo* pQueryInfo
// NOTE: tag column does not add to source column list
SColumnList ids = createColumnList(1, 0, pColIndex->colIndex);
insertResultField(pQueryInfo, (int32_t)size, &ids, s->bytes, (int8_t)s->type, s->name, pExpr);
insertResultField(pQueryInfo, pos, &ids, s->bytes, (int8_t)s->type, s->name, pExpr);
} else {
// if this query is "group by" normal column, time window query is not allowed
if (isTimeWindowQuery(pQueryInfo)) {
return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg1);
}
size_t size = tscNumOfExprs(pQueryInfo);
bool hasGroupColumn = false;
for (int32_t j = 0; j < size; ++j) {
SExprInfo* pExpr = tscExprGet(pQueryInfo, j);
......
......@@ -841,6 +841,11 @@ static int32_t tscEstimateQueryMsgSize(SSqlObj *pSql) {
tableSerialize = totalTables * sizeof(STableIdInfo);
}
SCond* pCond = &pQueryInfo->tagCond.tbnameCond;
if (pCond->len > 0) {
srcColListSize += pCond->len;
}
return MIN_QUERY_MSG_PKT_SIZE + minMsgSize() + sizeof(SQueryTableMsg) + srcColListSize + srcColFilterSize + srcTagFilterSize +
exprSize + tsBufSize + tableSerialize + sqlLen + 4096 + pQueryInfo->bufLen;
}
......
......@@ -15,8 +15,9 @@
#define _GNU_SOURCE
#include "os.h"
#include "texpr.h"
#include "tsched.h"
#include "qTsbuf.h"
#include "tcompare.h"
#include "tscLog.h"
......@@ -2425,6 +2426,26 @@ int32_t tscHandleFirstRoundStableQuery(SSqlObj *pSql) {
return terrno;
}
typedef struct SPair {
int32_t first;
int32_t second;
} SPair;
static void doSendQueryReqs(SSchedMsg* pSchedMsg) {
SSqlObj* pSql = pSchedMsg->ahandle;
SPair* p = pSchedMsg->msg;
for(int32_t i = p->first; i < p->second; ++i) {
SSqlObj* pSub = pSql->pSubs[i];
SRetrieveSupport* pSupport = pSub->param;
tscDebug("0x%"PRIx64" sub:0x%"PRIx64" launch subquery, orderOfSub:%d.", pSql->self, pSub->self, pSupport->subqueryIndex);
tscBuildAndSendRequest(pSub, NULL);
}
tfree(p);
}
int32_t tscHandleMasterSTableQuery(SSqlObj *pSql) {
SSqlRes *pRes = &pSql->res;
SSqlCmd *pCmd = &pSql->cmd;
......@@ -2547,13 +2568,33 @@ int32_t tscHandleMasterSTableQuery(SSqlObj *pSql) {
doCleanupSubqueries(pSql, i);
return pRes->code;
}
for(int32_t j = 0; j < pState->numOfSub; ++j) {
SSqlObj* pSub = pSql->pSubs[j];
SRetrieveSupport* pSupport = pSub->param;
tscDebug("0x%"PRIx64" sub:0x%"PRIx64" launch subquery, orderOfSub:%d.", pSql->self, pSub->self, pSupport->subqueryIndex);
tscBuildAndSendRequest(pSub, NULL);
// concurrently sent the query requests.
const int32_t MAX_REQUEST_PER_TASK = 8;
int32_t numOfTasks = (pState->numOfSub + MAX_REQUEST_PER_TASK - 1)/MAX_REQUEST_PER_TASK;
assert(numOfTasks >= 1);
int32_t num = (pState->numOfSub/numOfTasks) + 1;
tscDebug("0x%"PRIx64 " query will be sent by %d threads", pSql->self, numOfTasks);
for(int32_t j = 0; j < numOfTasks; ++j) {
SSchedMsg schedMsg = {0};
schedMsg.fp = doSendQueryReqs;
schedMsg.ahandle = (void*)pSql;
schedMsg.thandle = NULL;
SPair* p = calloc(1, sizeof(SPair));
p->first = j * num;
if (j == numOfTasks - 1) {
p->second = pState->numOfSub;
} else {
p->second = (j + 1) * num;
}
schedMsg.msg = p;
taosScheduleTask(tscQhandle, &schedMsg);
}
return TSDB_CODE_SUCCESS;
......
......@@ -2093,6 +2093,22 @@ TAOS_FIELD tscCreateField(int8_t type, const char* name, int16_t bytes) {
return f;
}
int32_t tscGetFirstInvisibleFieldPos(SQueryInfo* pQueryInfo) {
if (pQueryInfo->fieldsInfo.numOfOutput <= 0 || pQueryInfo->fieldsInfo.internalField == NULL) {
return 0;
}
for (int32_t i = 0; i < pQueryInfo->fieldsInfo.numOfOutput; ++i) {
SInternalField* pField = taosArrayGet(pQueryInfo->fieldsInfo.internalField, i);
if (!pField->visible) {
return i;
}
}
return pQueryInfo->fieldsInfo.numOfOutput;
}
SInternalField* tscFieldInfoAppend(SFieldInfo* pFieldInfo, TAOS_FIELD* pField) {
assert(pFieldInfo != NULL);
pFieldInfo->numOfOutput++;
......@@ -3778,6 +3794,8 @@ static void tscSubqueryCompleteCallback(void* param, TAOS_RES* tres, int code) {
tscDebug("0x%"PRIx64" all subquery response received, retry", pParentSql->self);
if (code && !((code == TSDB_CODE_TDB_INVALID_TABLE_ID || code == TSDB_CODE_VND_INVALID_VGROUP_ID) && pParentSql->retry < pParentSql->maxRetry)) {
pParentSql->res.code = code;
tscAsyncResultOnError(pParentSql);
return;
}
......@@ -3858,6 +3876,7 @@ void executeQuery(SSqlObj* pSql, SQueryInfo* pQueryInfo) {
pNew->signature = pNew;
pNew->sqlstr = strdup(pSql->sqlstr);
pNew->fp = tscSubqueryCompleteCallback;
pNew->fetchFp = tscSubqueryCompleteCallback;
pNew->maxRetry = pSql->maxRetry;
pNew->cmd.resColumnId = TSDB_RES_COL_ID;
......
......@@ -18,7 +18,7 @@ public class RestfulConnection extends AbstractConnection {
private final String url;
private final String database;
private final String token;
/******************************************************/
private boolean isClosed;
private final DatabaseMetaData metadata;
......
......@@ -88,17 +88,24 @@ public class RestfulStatement extends AbstractStatement {
}
private String getUrl() throws SQLException {
String dbname = conn.getClientInfo(TSDBDriver.PROPERTY_KEY_DBNAME);
if (dbname == null || dbname.trim().isEmpty()) {
dbname = "";
} else {
dbname = "/" + dbname.toLowerCase();
}
TimestampFormat timestampFormat = TimestampFormat.valueOf(conn.getClientInfo(TSDBDriver.PROPERTY_KEY_TIMESTAMP_FORMAT).trim().toUpperCase());
String url;
switch (timestampFormat) {
case TIMESTAMP:
url = "http://" + conn.getHost() + ":" + conn.getPort() + "/rest/sqlt";
url = "http://" + conn.getHost() + ":" + conn.getPort() + "/rest/sqlt" + dbname;
break;
case UTC:
url = "http://" + conn.getHost() + ":" + conn.getPort() + "/rest/sqlutc";
url = "http://" + conn.getHost() + ":" + conn.getPort() + "/rest/sqlutc" + dbname;
break;
default:
url = "http://" + conn.getHost() + ":" + conn.getPort() + "/rest/sql";
url = "http://" + conn.getHost() + ":" + conn.getPort() + "/rest/sql" + dbname;
}
return url;
}
......
package com.taosdata.jdbc.cases;
import org.junit.Before;
import org.junit.Test;
import java.sql.*;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
public class MultiConnectionWithDifferentDbTest {
private static String host = "127.0.0.1";
private static String db1 = "db1";
private static String db2 = "db2";
private long ts;
@Test
public void test() {
List<Thread> threads = IntStream.range(1, 3).mapToObj(i -> new Thread(new Runnable() {
@Override
public void run() {
for (int j = 0; j < 10; j++) {
queryDb();
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
private void queryDb() {
String url = "jdbc:TAOS-RS://" + host + ":6041/db" + i + "?user=root&password=taosdata";
try (Connection connection = DriverManager.getConnection(url)) {
Statement stmt = connection.createStatement();
ResultSet rs = stmt.executeQuery("select * from weather");
assertNotNull(rs);
rs.next();
long actual = rs.getTimestamp("ts").getTime();
assertEquals(ts, actual);
int f1 = rs.getInt("f1");
assertEquals(i, f1);
String loc = i == 1 ? "beijing" : "shanghai";
String loc_actual = rs.getString("loc");
assertEquals(loc, loc_actual);
stmt.close();
} catch (SQLException e) {
e.printStackTrace();
}
}
}, "thread-" + i)).collect(Collectors.toList());
threads.forEach(Thread::start);
for (Thread t : threads) {
try {
t.join();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
@Before
public void before() {
ts = System.currentTimeMillis();
try {
Connection conn = DriverManager.getConnection("jdbc:TAOS-RS://" + host + ":6041/?user=root&password=taosdata");
Statement stmt = conn.createStatement();
stmt.execute("drop database if exists " + db1);
stmt.execute("create database if not exists " + db1);
stmt.execute("use " + db1);
stmt.execute("create table weather(ts timestamp, f1 int) tags(loc nchar(10))");
stmt.execute("insert into t1 using weather tags('beijing') values(" + ts + ", 1)");
stmt.execute("drop database if exists " + db2);
stmt.execute("create database if not exists " + db2);
stmt.execute("use " + db2);
stmt.execute("create table weather(ts timestamp, f1 int) tags(loc nchar(10))");
stmt.execute("insert into t1 using weather tags('shanghai') values(" + ts + ", 2)");
conn.close();
} catch (SQLException e) {
e.printStackTrace();
}
}
}
package com.taosdata.jdbc.rs;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import java.sql.*;
import static org.junit.Assert.*;
public class DatabaseSpecifiedTest {
private static String host = "127.0.0.1";
private static String dbname = "test_db_spec";
private Connection connection;
private long ts;
@Test
public void test() throws SQLException {
// when
connection = DriverManager.getConnection("jdbc:TAOS-RS://" + host + ":6041/" + dbname + "?user=root&password=taosdata");
try (Statement stmt = connection.createStatement();) {
ResultSet rs = stmt.executeQuery("select * from weather");
//then
assertNotNull(rs);
rs.next();
long now = rs.getTimestamp("ts").getTime();
assertEquals(ts, now);
int f1 = rs.getInt(2);
assertEquals(1, f1);
String loc = rs.getString("loc");
assertEquals("beijing", loc);
}
connection.close();
}
@Before
public void before() {
ts = System.currentTimeMillis();
try {
Connection connection = DriverManager.getConnection("jdbc:TAOS-RS://" + host + ":6041/?user=root&password=taosdata");
Statement stmt = connection.createStatement();
stmt.execute("drop database if exists " + dbname);
stmt.execute("create database if not exists " + dbname);
stmt.execute("use " + dbname);
stmt.execute("create table weather(ts timestamp, f1 int) tags(loc nchar(10))");
stmt.execute("insert into t1 using weather tags('beijing') values( " + ts + ", 1)");
stmt.close();
connection.close();
} catch (SQLException e) {
e.printStackTrace();
}
}
@After
public void after() {
try {
if (connection != null)
connection.close();
} catch (SQLException e) {
e.printStackTrace();
}
}
}
......@@ -2,7 +2,7 @@ import taos
conn = taos.connect(host='127.0.0.1',
user='root',
passworkd='taodata',
password='taosdata',
database='log')
cursor = conn.cursor()
......
......@@ -268,7 +268,7 @@ def _load_taos():
try:
return load_func[platform.system()]()
except:
sys.exit('unsupported platform to TDengine connector')
raise InterfaceError('unsupported platform or failed to load taos client library')
class CTaosInterface(object):
......
......@@ -55,6 +55,8 @@ typedef struct SShellArguments {
int abort;
int port;
int pktLen;
int pktNum;
char* pktType;
char* netTestRole;
} SShellArguments;
......
......@@ -50,6 +50,8 @@ static struct argp_option options[] = {
{"timezone", 'z', "TIMEZONE", 0, "Time zone of the shell, default is local."},
{"netrole", 'n', "NETROLE", 0, "Net role when network connectivity test, default is startup, options: client|server|rpc|startup|sync|speen|fqdn."},
{"pktlen", 'l', "PKTLEN", 0, "Packet length used for net test, default is 1000 bytes."},
{"pktnum", 'N', "PKTNUM", 0, "Packet numbers used for net test, default is 100."},
{"pkttype", 'S', "PKTTYPE", 0, "Packet type used for net test, default is TCP."},
{0}};
static error_t parse_opt(int key, char *arg, struct argp_state *state) {
......@@ -106,7 +108,7 @@ static error_t parse_opt(int key, char *arg, struct argp_state *state) {
arguments->is_raw_time = true;
break;
case 'f':
if (wordexp(arg, &full_path, 0) != 0) {
if ((0 == strlen(arg)) || (wordexp(arg, &full_path, 0) != 0)) {
fprintf(stderr, "Invalid path %s\n", arg);
return -1;
}
......@@ -146,6 +148,17 @@ static error_t parse_opt(int key, char *arg, struct argp_state *state) {
return -1;
}
break;
case 'N':
if (arg) {
arguments->pktNum = atoi(arg);
} else {
fprintf(stderr, "Invalid packet number\n");
return -1;
}
break;
case 'S':
arguments->pktType = arg;
break;
case OPT_ABORT:
arguments->abort = 1;
break;
......
......@@ -85,6 +85,8 @@ SShellArguments args = {
.threadNum = 5,
.commands = NULL,
.pktLen = 1000,
.pktNum = 100,
.pktType = "TCP",
.netTestRole = NULL
};
......@@ -118,7 +120,7 @@ int main(int argc, char* argv[]) {
printf("Failed to init taos");
exit(EXIT_FAILURE);
}
taosNetTest(args.netTestRole, args.host, args.port, args.pktLen);
taosNetTest(args.netTestRole, args.host, args.port, args.pktLen, args.pktNum, args.pktType);
exit(0);
}
......
......@@ -55,9 +55,13 @@ void printHelp() {
printf("%s%s\n", indent, "-t");
printf("%s%s%s\n", indent, indent, "Time zone of the shell, default is local.");
printf("%s%s\n", indent, "-n");
printf("%s%s%s\n", indent, indent, "Net role when network connectivity test, default is startup, options: client|server|rpc|startup|sync.");
printf("%s%s%s\n", indent, indent, "Net role when network connectivity test, default is startup, options: client|server|rpc|startup|sync|speed|fqdn.");
printf("%s%s\n", indent, "-l");
printf("%s%s%s\n", indent, indent, "Packet length used for net test, default is 1000 bytes.");
printf("%s%s\n", indent, "-N");
printf("%s%s%s\n", indent, indent, "Packet numbers used for net test, default is 100.");
printf("%s%s\n", indent, "-S");
printf("%s%s%s\n", indent, indent, "Packet type used for net test, default is TCP.");
printf("%s%s\n", indent, "-V");
printf("%s%s%s\n", indent, indent, "Print program version.");
......
此差异已折叠。
......@@ -50,14 +50,20 @@ void osInit() {
char* taosGetCmdlineByPID(int pid) {
static char cmdline[1024];
sprintf(cmdline, "/proc/%d/cmdline", pid);
FILE* f = fopen(cmdline, "r");
if (f) {
size_t size;
size = fread(cmdline, sizeof(char), 1024, f);
if (size > 0) {
if ('\n' == cmdline[size - 1]) cmdline[size - 1] = '\0';
}
fclose(f);
int fd = open(cmdline, O_RDONLY);
if (fd >= 0) {
int n = read(fd, cmdline, sizeof(cmdline) - 1);
if (n < 0) n = 0;
if (n > 0 && cmdline[n - 1] == '\n') --n;
cmdline[n] = 0;
close(fd);
} else {
cmdline[0] = 0;
}
return cmdline;
}
......@@ -34,7 +34,7 @@
#define monTrace(...) { if (monDebugFlag & DEBUG_TRACE) { taosPrintLog("MON ", monDebugFlag, __VA_ARGS__); }}
#define SQL_LENGTH 1030
#define LOG_LEN_STR 100
#define LOG_LEN_STR 512
#define IP_LEN_STR TSDB_EP_LEN
#define CHECK_INTERVAL 1000
......
......@@ -4089,7 +4089,7 @@ static void mergeTableBlockDist(SResultRowCellInfo* pResInfo, const STableBlockD
} else {
pDist->maxRows = pSrc->maxRows;
pDist->minRows = pSrc->minRows;
int32_t maxSteps = TSDB_MAX_MAX_ROW_FBLOCK/TSDB_BLOCK_DIST_STEP_ROWS;
if (TSDB_MAX_MAX_ROW_FBLOCK % TSDB_BLOCK_DIST_STEP_ROWS != 0) {
++maxSteps;
......@@ -4223,7 +4223,7 @@ void blockinfo_func_finalizer(SQLFunctionCtx* pCtx) {
taosArrayDestroy(pDist->dataBlockInfos);
pDist->dataBlockInfos = NULL;
}
// cannot set the numOfIteratedElems again since it is set during previous iteration
pResInfo->numOfRes = 1;
pResInfo->hasResult = DATA_SET_FLAG;
......
......@@ -7449,10 +7449,12 @@ int32_t convertQueryMsg(SQueryTableMsg *pQueryMsg, SQueryParam* param) {
pQueryMsg->numOfOutput = htons(pQueryMsg->numOfOutput);
pQueryMsg->numOfGroupCols = htons(pQueryMsg->numOfGroupCols);
pQueryMsg->tagCondLen = htonl(pQueryMsg->tagCondLen);
pQueryMsg->tsBuf.tsOffset = htonl(pQueryMsg->tsBuf.tsOffset);
pQueryMsg->tsBuf.tsLen = htonl(pQueryMsg->tsBuf.tsLen);
pQueryMsg->tsBuf.tsNumOfBlocks = htonl(pQueryMsg->tsBuf.tsNumOfBlocks);
pQueryMsg->tsBuf.tsOrder = htonl(pQueryMsg->tsBuf.tsOrder);
pQueryMsg->numOfTags = htonl(pQueryMsg->numOfTags);
pQueryMsg->tbnameCondLen = htonl(pQueryMsg->tbnameCondLen);
pQueryMsg->secondStageOutput = htonl(pQueryMsg->secondStageOutput);
......
......@@ -1572,7 +1572,7 @@ static void mergeTwoRowFromMem(STsdbQueryHandle* pQueryHandle, int32_t capacity,
int32_t numOfColsOfRow1 = 0;
if (pSchema1 == NULL) {
pSchema1 = tsdbGetTableSchemaByVersion(pTable, dataRowVersion(row1));
pSchema1 = tsdbGetTableSchemaByVersion(pTable, memRowVersion(row1));
}
if(isRow1DataRow) {
numOfColsOfRow1 = schemaNCols(pSchema1);
......@@ -1584,7 +1584,7 @@ static void mergeTwoRowFromMem(STsdbQueryHandle* pQueryHandle, int32_t capacity,
if(row2) {
isRow2DataRow = isDataRow(row2);
if (pSchema2 == NULL) {
pSchema2 = tsdbGetTableSchemaByVersion(pTable, dataRowVersion(row2));
pSchema2 = tsdbGetTableSchemaByVersion(pTable, memRowVersion(row2));
}
if(isRow2DataRow) {
numOfColsOfRow2 = schemaNCols(pSchema2);
......@@ -2460,7 +2460,7 @@ int32_t tsdbGetFileBlocksDistInfo(TsdbQueryHandleT* queryHandle, STableBlockDist
// current file are not overlapped with query time window, ignore remain files
if ((ASCENDING_TRAVERSE(pQueryHandle->order) && win.skey > pQueryHandle->window.ekey) ||
(!ASCENDING_TRAVERSE(pQueryHandle->order) && win.ekey < pQueryHandle->window.ekey)) {
(!ASCENDING_TRAVERSE(pQueryHandle->order) && win.ekey < pQueryHandle->window.ekey)) {
tsdbUnLockFS(REPO_FS(pQueryHandle->pTsdb));
tsdbDebug("%p remain files are not qualified for qrange:%" PRId64 "-%" PRId64 ", ignore, 0x%"PRIx64, pQueryHandle,
pQueryHandle->window.skey, pQueryHandle->window.ekey, pQueryHandle->qId);
......
......@@ -20,7 +20,7 @@
extern "C" {
#endif
void taosNetTest(char *role, char *host, int port, int pkgLen);
void taosNetTest(char *role, char *host, int32_t port, int32_t pkgLen, int32_t pkgNum, char *pkgType);
#ifdef __cplusplus
}
......
......@@ -27,6 +27,10 @@
#include "syncMsg.h"
#define MAX_PKG_LEN (64 * 1000)
#define MAX_SPEED_PKG_LEN (1024 * 1024 * 1024)
#define MIN_SPEED_PKG_LEN 1024
#define MAX_SPEED_PKG_NUM 10000
#define MIN_SPEED_PKG_NUM 1
#define BUFFER_SIZE (MAX_PKG_LEN + 1024)
extern int32_t tsRpcMaxUdpSize;
......@@ -466,6 +470,7 @@ static void taosNetTestRpc(char *host, int32_t startPort, int32_t pkgLen) {
sendpkgLen = pkgLen;
}
tsRpcForceTcp = 1;
int32_t ret = taosNetCheckRpc(host, port, sendpkgLen, spi, NULL);
if (ret < 0) {
printf("failed to test TCP port:%d\n", port);
......@@ -479,6 +484,7 @@ static void taosNetTestRpc(char *host, int32_t startPort, int32_t pkgLen) {
sendpkgLen = pkgLen;
}
tsRpcForceTcp = 0;
ret = taosNetCheckRpc(host, port, pkgLen, spi, NULL);
if (ret < 0) {
printf("failed to test UDP port:%d\n", port);
......@@ -542,12 +548,110 @@ static void taosNetTestServer(char *host, int32_t startPort, int32_t pkgLen) {
}
}
void taosNetTest(char *role, char *host, int32_t port, int32_t pkgLen) {
static void taosNetTestFqdn(char *host) {
int code = 0;
uint64_t startTime = taosGetTimestampUs();
uint32_t ip = taosGetIpv4FromFqdn(host);
if (ip == 0xffffffff) {
uError("failed to get IP address from %s since %s", host, strerror(errno));
code = -1;
}
uint64_t endTime = taosGetTimestampUs();
uint64_t el = endTime - startTime;
printf("check convert fqdn spend, status: %d\tcost: %" PRIu64 " us\n", code, el);
return;
}
static void taosNetCheckSpeed(char *host, int32_t port, int32_t pkgLen,
int32_t pkgNum, char *pkgType) {
// record config
int32_t compressTmp = tsCompressMsgSize;
int32_t maxUdpSize = tsRpcMaxUdpSize;
int32_t forceTcp = tsRpcForceTcp;
if (0 == strcmp("tcp", pkgType)){
tsRpcForceTcp = 1;
tsRpcMaxUdpSize = 0; // force tcp
} else {
tsRpcForceTcp = 0;
tsRpcMaxUdpSize = INT_MAX;
}
tsCompressMsgSize = -1;
SRpcEpSet epSet;
SRpcMsg reqMsg;
SRpcMsg rspMsg;
void * pRpcConn;
char secretEncrypt[32] = {0};
char spi = 0;
pRpcConn = taosNetInitRpc(secretEncrypt, spi);
if (NULL == pRpcConn) {
uError("failed to init client rpc");
return;
}
printf("check net spend, host:%s port:%d pkgLen:%d pkgNum:%d pkgType:%s\n\n", host, port, pkgLen, pkgNum, pkgType);
int32_t totalSucc = 0;
uint64_t startT = taosGetTimestampUs();
for (int32_t i = 1; i <= pkgNum; i++) {
uint64_t startTime = taosGetTimestampUs();
memset(&epSet, 0, sizeof(SRpcEpSet));
epSet.inUse = 0;
epSet.numOfEps = 1;
epSet.port[0] = port;
strcpy(epSet.fqdn[0], host);
reqMsg.msgType = TSDB_MSG_TYPE_NETWORK_TEST;
reqMsg.pCont = rpcMallocCont(pkgLen);
reqMsg.contLen = pkgLen;
reqMsg.code = 0;
reqMsg.handle = NULL; // rpc handle returned to app
reqMsg.ahandle = NULL; // app handle set by client
strcpy(reqMsg.pCont, "nettest speed");
rpcSendRecv(pRpcConn, &epSet, &reqMsg, &rspMsg);
int code = 0;
if ((rspMsg.code != 0) || (rspMsg.msgType != TSDB_MSG_TYPE_NETWORK_TEST + 1)) {
uError("ret code 0x%x %s", rspMsg.code, tstrerror(rspMsg.code));
code = -1;
}else{
totalSucc ++;
}
rpcFreeCont(rspMsg.pCont);
uint64_t endTime = taosGetTimestampUs();
uint64_t el = endTime - startTime;
printf("progress:%5d/%d\tstatus:%d\tcost:%8.2lf ms\tspeed:%8.2lf MB/s\n", i, pkgNum, code, el/1000.0, pkgLen/(el/1000000.0)/1024.0/1024.0);
}
int64_t endT = taosGetTimestampUs();
uint64_t elT = endT - startT;
printf("\ntotal succ:%5d/%d\tcost:%8.2lf ms\tspeed:%8.2lf MB/s\n", totalSucc, pkgNum, elT/1000.0, pkgLen/(elT/1000000.0)/1024.0/1024.0*totalSucc);
rpcClose(pRpcConn);
// return config
tsCompressMsgSize = compressTmp;
tsRpcMaxUdpSize = maxUdpSize;
tsRpcForceTcp = forceTcp;
return;
}
void taosNetTest(char *role, char *host, int32_t port, int32_t pkgLen,
int32_t pkgNum, char *pkgType) {
tscEmbedded = 1;
if (host == NULL) host = tsLocalFqdn;
if (port == 0) port = tsServerPort;
if (pkgLen <= 10) pkgLen = 1000;
if (pkgLen > MAX_PKG_LEN) pkgLen = MAX_PKG_LEN;
if (0 == strcmp("speed", role)){
if (pkgLen <= MIN_SPEED_PKG_LEN) pkgLen = MIN_SPEED_PKG_LEN;
if (pkgLen > MAX_SPEED_PKG_LEN) pkgLen = MAX_SPEED_PKG_LEN;
if (pkgNum <= MIN_SPEED_PKG_NUM) pkgNum = MIN_SPEED_PKG_NUM;
if (pkgNum > MAX_SPEED_PKG_NUM) pkgNum = MAX_SPEED_PKG_NUM;
}else{
if (pkgLen <= 10) pkgLen = 1000;
if (pkgLen > MAX_PKG_LEN) pkgLen = MAX_PKG_LEN;
}
if (0 == strcmp("client", role)) {
taosNetTestClient(host, port, pkgLen);
......@@ -560,6 +664,12 @@ void taosNetTest(char *role, char *host, int32_t port, int32_t pkgLen) {
taosNetCheckSync(host, port);
} else if (0 == strcmp("startup", role)) {
taosNetTestStartup(host, port);
} else if (0 == strcmp("speed", role)) {
tscEmbedded = 0;
char type[10] = {0};
taosNetCheckSpeed(host, port, pkgLen, pkgNum, strtolower(type, pkgType));
}else if (0 == strcmp("fqdn", role)) {
taosNetTestFqdn(host);
} else {
taosNetTestStartup(host, port);
}
......
#!/bin/bash
taos -n fqdn
#!/bin/bash
for N in -1 0 1 10000 10001
do
for l in 1023 1024 1073741824 1073741825
do
for S in udp tcp
do
taos -n speed -h BCC-2 -P 6030 -N $N -l $l -S $S 2>&1 | tee -a result.txt
done
done
done
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册