提交 f979b1f9 编写于 作者: S Shengliang Guan

Merge remote-tracking branch 'origin/develop' into feature/query

...@@ -32,7 +32,7 @@ ELSEIF (TD_WINDOWS) ...@@ -32,7 +32,7 @@ ELSEIF (TD_WINDOWS)
#INSTALL(TARGETS taos RUNTIME DESTINATION driver) #INSTALL(TARGETS taos RUNTIME DESTINATION driver)
#INSTALL(TARGETS shell RUNTIME DESTINATION .) #INSTALL(TARGETS shell RUNTIME DESTINATION .)
IF (TD_MVN_INSTALLED) IF (TD_MVN_INSTALLED)
INSTALL(FILES ${LIBRARY_OUTPUT_PATH}/taos-jdbcdriver-2.0.14-dist.jar DESTINATION connector/jdbc) INSTALL(FILES ${LIBRARY_OUTPUT_PATH}/taos-jdbcdriver-2.0.15-dist.jar DESTINATION connector/jdbc)
ENDIF () ENDIF ()
ELSEIF (TD_DARWIN) ELSEIF (TD_DARWIN)
SET(TD_MAKE_INSTALL_SH "${TD_COMMUNITY_DIR}/packaging/tools/make_install.sh") SET(TD_MAKE_INSTALL_SH "${TD_COMMUNITY_DIR}/packaging/tools/make_install.sh")
......
...@@ -96,7 +96,7 @@ void taos_query_a(TAOS *taos, const char *sqlstr, __async_cb_func_t fp, void *pa ...@@ -96,7 +96,7 @@ void taos_query_a(TAOS *taos, const char *sqlstr, __async_cb_func_t fp, void *pa
return; return;
} }
taosNotePrintTsc(sqlstr); nPrintTsc(sqlstr);
SSqlObj *pSql = (SSqlObj *)calloc(1, sizeof(SSqlObj)); SSqlObj *pSql = (SSqlObj *)calloc(1, sizeof(SSqlObj));
if (pSql == NULL) { if (pSql == NULL) {
......
...@@ -3282,7 +3282,12 @@ static int32_t extractColumnFilterInfo(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SC ...@@ -3282,7 +3282,12 @@ static int32_t extractColumnFilterInfo(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SC
((pSchema->type == TSDB_DATA_TYPE_BINARY || pSchema->type == TSDB_DATA_TYPE_NCHAR) ? 1 : 0); ((pSchema->type == TSDB_DATA_TYPE_BINARY || pSchema->type == TSDB_DATA_TYPE_NCHAR) ? 1 : 0);
if (pColFilter->filterstr) { if (pColFilter->filterstr) {
if (pExpr->nSQLOptr != TK_EQ && pExpr->nSQLOptr != TK_NE && pExpr->nSQLOptr != TK_LIKE) { if (pExpr->nSQLOptr != TK_EQ
&& pExpr->nSQLOptr != TK_NE
&& pExpr->nSQLOptr != TK_ISNULL
&& pExpr->nSQLOptr != TK_NOTNULL
&& pExpr->nSQLOptr != TK_LIKE
) {
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg2); return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg2);
} }
} else { } else {
......
...@@ -344,7 +344,7 @@ TAOS_RES* taos_query_c(TAOS *taos, const char *sqlstr, uint32_t sqlLen, TAOS_RES ...@@ -344,7 +344,7 @@ TAOS_RES* taos_query_c(TAOS *taos, const char *sqlstr, uint32_t sqlLen, TAOS_RES
return NULL; return NULL;
} }
taosNotePrintTsc(sqlstr); nPrintTsc(sqlstr);
SSqlObj* pSql = calloc(1, sizeof(SSqlObj)); SSqlObj* pSql = calloc(1, sizeof(SSqlObj));
if (pSql == NULL) { if (pSql == NULL) {
......
...@@ -17,6 +17,7 @@ ...@@ -17,6 +17,7 @@
#include "taosmsg.h" #include "taosmsg.h"
#include "tref.h" #include "tref.h"
#include "trpc.h" #include "trpc.h"
#include "tnote.h"
#include "tsystem.h" #include "tsystem.h"
#include "ttimer.h" #include "ttimer.h"
#include "tutil.h" #include "tutil.h"
...@@ -41,7 +42,6 @@ int tscRefId = -1; ...@@ -41,7 +42,6 @@ int tscRefId = -1;
int tscNumOfThreads; int tscNumOfThreads;
static pthread_once_t tscinit = PTHREAD_ONCE_INIT; static pthread_once_t tscinit = PTHREAD_ONCE_INIT;
void taosInitNote(int numOfNoteLines, int maxNotes, char* lable);
//void tscUpdateEpSet(void *ahandle, SRpcEpSet *pEpSet); //void tscUpdateEpSet(void *ahandle, SRpcEpSet *pEpSet);
void tscCheckDiskUsage(void *UNUSED_PARAM(para), void* UNUSED_PARAM(param)) { void tscCheckDiskUsage(void *UNUSED_PARAM(para), void* UNUSED_PARAM(param)) {
...@@ -78,7 +78,6 @@ int32_t tscInitRpc(const char *user, const char *secretEncrypt, void **pDnodeCon ...@@ -78,7 +78,6 @@ int32_t tscInitRpc(const char *user, const char *secretEncrypt, void **pDnodeCon
return 0; return 0;
} }
void taos_init_imp(void) { void taos_init_imp(void) {
char temp[128] = {0}; char temp[128] = {0};
...@@ -104,6 +103,7 @@ void taos_init_imp(void) { ...@@ -104,6 +103,7 @@ void taos_init_imp(void) {
taosReadGlobalCfg(); taosReadGlobalCfg();
taosCheckGlobalCfg(); taosCheckGlobalCfg();
taosInitNotes();
rpcInit(); rpcInit();
tscDebug("starting to initialize TAOS client ..."); tscDebug("starting to initialize TAOS client ...");
...@@ -111,11 +111,6 @@ void taos_init_imp(void) { ...@@ -111,11 +111,6 @@ void taos_init_imp(void) {
} }
taosSetCoreDump(); taosSetCoreDump();
if (tsTscEnableRecordSql != 0) {
taosInitNote(tsNumOfLogLines / 10, 1, (char*)"tsc_note");
}
tscInitMsgsFp(); tscInitMsgsFp();
int queueSize = tsMaxConnections*2; int queueSize = tsMaxConnections*2;
......
...@@ -8,7 +8,7 @@ IF (TD_MVN_INSTALLED) ...@@ -8,7 +8,7 @@ IF (TD_MVN_INSTALLED)
ADD_CUSTOM_COMMAND(OUTPUT ${JDBC_CMD_NAME} ADD_CUSTOM_COMMAND(OUTPUT ${JDBC_CMD_NAME}
POST_BUILD POST_BUILD
COMMAND mvn -Dmaven.test.skip=true install -f ${CMAKE_CURRENT_SOURCE_DIR}/pom.xml COMMAND mvn -Dmaven.test.skip=true install -f ${CMAKE_CURRENT_SOURCE_DIR}/pom.xml
COMMAND ${CMAKE_COMMAND} -E copy ${CMAKE_CURRENT_SOURCE_DIR}/target/taos-jdbcdriver-2.0.14-dist.jar ${LIBRARY_OUTPUT_PATH} COMMAND ${CMAKE_COMMAND} -E copy ${CMAKE_CURRENT_SOURCE_DIR}/target/taos-jdbcdriver-2.0.15-dist.jar ${LIBRARY_OUTPUT_PATH}
COMMAND mvn -Dmaven.test.skip=true clean -f ${CMAKE_CURRENT_SOURCE_DIR}/pom.xml COMMAND mvn -Dmaven.test.skip=true clean -f ${CMAKE_CURRENT_SOURCE_DIR}/pom.xml
COMMENT "build jdbc driver") COMMENT "build jdbc driver")
ADD_CUSTOM_TARGET(${JDBC_TARGET_NAME} ALL WORKING_DIRECTORY ${EXECUTABLE_OUTPUT_PATH} DEPENDS ${JDBC_CMD_NAME}) ADD_CUSTOM_TARGET(${JDBC_TARGET_NAME} ALL WORKING_DIRECTORY ${EXECUTABLE_OUTPUT_PATH} DEPENDS ${JDBC_CMD_NAME})
......
...@@ -5,7 +5,7 @@ ...@@ -5,7 +5,7 @@
<groupId>com.taosdata.jdbc</groupId> <groupId>com.taosdata.jdbc</groupId>
<artifactId>taos-jdbcdriver</artifactId> <artifactId>taos-jdbcdriver</artifactId>
<version>2.0.14</version> <version>2.0.15</version>
<packaging>jar</packaging> <packaging>jar</packaging>
<name>JDBCDriver</name> <name>JDBCDriver</name>
...@@ -36,7 +36,6 @@ ...@@ -36,7 +36,6 @@
</developer> </developer>
</developers> </developers>
<dependencies> <dependencies>
<dependency> <dependency>
<groupId>commons-logging</groupId> <groupId>commons-logging</groupId>
......
...@@ -3,7 +3,7 @@ ...@@ -3,7 +3,7 @@
<modelVersion>4.0.0</modelVersion> <modelVersion>4.0.0</modelVersion>
<groupId>com.taosdata.jdbc</groupId> <groupId>com.taosdata.jdbc</groupId>
<artifactId>taos-jdbcdriver</artifactId> <artifactId>taos-jdbcdriver</artifactId>
<version>2.0.14</version> <version>2.0.15</version>
<packaging>jar</packaging> <packaging>jar</packaging>
<name>JDBCDriver</name> <name>JDBCDriver</name>
<url>https://github.com/taosdata/TDengine/tree/master/src/connector/jdbc</url> <url>https://github.com/taosdata/TDengine/tree/master/src/connector/jdbc</url>
......
...@@ -81,7 +81,7 @@ public class TSDBStatement implements Statement { ...@@ -81,7 +81,7 @@ public class TSDBStatement implements Statement {
} }
if (!this.connector.isUpdateQuery(pSql)) { if (!this.connector.isUpdateQuery(pSql)) {
TSDBResultSet res = new TSDBResultSet(this.connector, resultSetPointer); TSDBResultSet res = new TSDBResultSet(this.connector, resultSetPointer);
res.setBatchFetch(this.connection.getBatchFetch()); res.setBatchFetch(this.connection.getBatchFetch());
return res; return res;
} else { } else {
...@@ -125,7 +125,8 @@ public class TSDBStatement implements Statement { ...@@ -125,7 +125,8 @@ public class TSDBStatement implements Statement {
} }
public int getMaxFieldSize() throws SQLException { public int getMaxFieldSize() throws SQLException {
throw new SQLException(TSDBConstants.UNSUPPORT_METHOD_EXCEPTIONZ_MSG); return 0;
// throw new SQLException(TSDBConstants.UNSUPPORT_METHOD_EXCEPTIONZ_MSG);
} }
public void setMaxFieldSize(int max) throws SQLException { public void setMaxFieldSize(int max) throws SQLException {
...@@ -218,7 +219,8 @@ public class TSDBStatement implements Statement { ...@@ -218,7 +219,8 @@ public class TSDBStatement implements Statement {
} }
public int getFetchDirection() throws SQLException { public int getFetchDirection() throws SQLException {
throw new SQLException(TSDBConstants.UNSUPPORT_METHOD_EXCEPTIONZ_MSG); return ResultSet.FETCH_FORWARD;
// throw new SQLException(TSDBConstants.UNSUPPORT_METHOD_EXCEPTIONZ_MSG);
} }
/* /*
......
...@@ -237,7 +237,7 @@ PRASE_EPS_OVER: ...@@ -237,7 +237,7 @@ PRASE_EPS_OVER:
dnodeUpdateEp(dnodeGetDnodeId(), tsLocalEp, tsLocalFqdn, &tsServerPort); dnodeUpdateEp(dnodeGetDnodeId(), tsLocalEp, tsLocalFqdn, &tsServerPort);
#else #else
if (dnodeCheckEpChanged(dnodeGetDnodeId(), tsLocalEp)) { if (dnodeCheckEpChanged(dnodeGetDnodeId(), tsLocalEp)) {
dError("dnode:%d, localEp is changed to %s in dnodeEps.json and need reconfigured", dnodeGetDnodeId(), tsLocalEp); dError("dnode:%d, localEp is different from %s in dnodeEps.json and need reconfigured", dnodeGetDnodeId(), tsLocalEp);
return -1; return -1;
} }
#endif #endif
......
...@@ -16,6 +16,7 @@ ...@@ -16,6 +16,7 @@
#define _DEFAULT_SOURCE #define _DEFAULT_SOURCE
#include "os.h" #include "os.h"
#include "taos.h" #include "taos.h"
#include "tnote.h"
#include "tconfig.h" #include "tconfig.h"
#include "tfile.h" #include "tfile.h"
#include "twal.h" #include "twal.h"
...@@ -98,6 +99,7 @@ int32_t dnodeInitSystem() { ...@@ -98,6 +99,7 @@ int32_t dnodeInitSystem() {
taosInitGlobalCfg(); taosInitGlobalCfg();
taosReadGlobalLogCfg(); taosReadGlobalLogCfg();
taosSetCoreDump(); taosSetCoreDump();
taosInitNotes();
signal(SIGPIPE, SIG_IGN); signal(SIGPIPE, SIG_IGN);
if (dnodeCreateDir(tsLogDir) < 0) { if (dnodeCreateDir(tsLogDir) < 0) {
......
...@@ -46,7 +46,7 @@ static struct argp_option options[] = { ...@@ -46,7 +46,7 @@ static struct argp_option options[] = {
{"thread", 'T', "THREADNUM", 0, "Number of threads when using multi-thread to import data."}, {"thread", 'T', "THREADNUM", 0, "Number of threads when using multi-thread to import data."},
{"database", 'd', "DATABASE", 0, "Database to use when connecting to the server."}, {"database", 'd', "DATABASE", 0, "Database to use when connecting to the server."},
{"timezone", 't', "TIMEZONE", 0, "Time zone of the shell, default is local."}, {"timezone", 't', "TIMEZONE", 0, "Time zone of the shell, default is local."},
{"netrole", 'n', "NETROLE", 0, "Net role when network connectivity test, default is NULL, options: client|server|rpc|startup."}, {"netrole", 'n', "NETROLE", 0, "Net role when network connectivity test, default is startup, options: client|server|rpc|startup."},
{"pktlen", 'l', "PKTLEN", 0, "Packet length used for net test, default is 1000 bytes."}, {"pktlen", 'l', "PKTLEN", 0, "Packet length used for net test, default is 1000 bytes."},
{0}}; {0}};
......
...@@ -45,6 +45,10 @@ void printHelp() { ...@@ -45,6 +45,10 @@ void printHelp() {
printf("%s%s%s\n", indent, indent, "Database to use when connecting to the server."); printf("%s%s%s\n", indent, indent, "Database to use when connecting to the server.");
printf("%s%s\n", indent, "-t"); printf("%s%s\n", indent, "-t");
printf("%s%s%s\n", indent, indent, "Time zone of the shell, default is local."); printf("%s%s%s\n", indent, indent, "Time zone of the shell, default is local.");
printf("%s%s\n", indent, "-n");
printf("%s%s%s\n", indent, indent, "Net role when network connectivity test, default is startup, options: client|server|rpc|startup.");
printf("%s%s\n", indent, "-l");
printf("%s%s%s\n", indent, indent, "Packet length used for net test, default is 1000 bytes.");
exit(EXIT_SUCCESS); exit(EXIT_SUCCESS);
} }
...@@ -137,6 +141,24 @@ void shellParseArgument(int argc, char *argv[], SShellArguments *arguments) { ...@@ -137,6 +141,24 @@ void shellParseArgument(int argc, char *argv[], SShellArguments *arguments) {
exit(EXIT_FAILURE); exit(EXIT_FAILURE);
} }
} }
// For time zone
else if (strcmp(argv[i], "-n") == 0) {
if (i < argc - 1) {
arguments->netTestRole = argv[++i];
} else {
fprintf(stderr, "option -n requires an argument\n");
exit(EXIT_FAILURE);
}
}
// For time zone
else if (strcmp(argv[i], "-l") == 0) {
if (i < argc - 1) {
arguments->pktLen = atoi(argv[++i]);
} else {
fprintf(stderr, "option -l requires an argument\n");
exit(EXIT_FAILURE);
}
}
// For temperory command TODO // For temperory command TODO
else if (strcmp(argv[i], "--help") == 0) { else if (strcmp(argv[i], "--help") == 0) {
printHelp(); printHelp();
......
...@@ -396,14 +396,15 @@ static void mnodeAddTableIntoStable(SSTableObj *pStable, SCTableObj *pCtable) { ...@@ -396,14 +396,15 @@ static void mnodeAddTableIntoStable(SSTableObj *pStable, SCTableObj *pCtable) {
atomic_add_fetch_32(&pStable->numOfTables, 1); atomic_add_fetch_32(&pStable->numOfTables, 1);
if (pStable->vgHash == NULL) { if (pStable->vgHash == NULL) {
pStable->vgHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_NO_LOCK); pStable->vgHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_ENTRY_LOCK);
mDebug("table:%s, create hash:%p", pStable->info.tableId, pStable->vgHash);
} }
if (pStable->vgHash != NULL) { if (pStable->vgHash != NULL) {
if (taosHashGet(pStable->vgHash, &pCtable->vgId, sizeof(pCtable->vgId)) == NULL) { if (taosHashGet(pStable->vgHash, &pCtable->vgId, sizeof(pCtable->vgId)) == NULL) {
taosHashPut(pStable->vgHash, &pCtable->vgId, sizeof(pCtable->vgId), &pCtable->vgId, sizeof(pCtable->vgId)); taosHashPut(pStable->vgHash, &pCtable->vgId, sizeof(pCtable->vgId), &pCtable->vgId, sizeof(pCtable->vgId));
mDebug("table:%s, vgId:%d is put into stable vgList, sizeOfVgList:%d", pStable->info.tableId, pCtable->vgId, mDebug("table:%s, vgId:%d is put into stable hash:%p, sizeOfVgList:%d", pStable->info.tableId, pCtable->vgId,
(int32_t)taosHashGetSize(pStable->vgHash)); pStable->vgHash, taosHashGetSize(pStable->vgHash));
} }
} }
} }
...@@ -416,13 +417,14 @@ static void mnodeRemoveTableFromStable(SSTableObj *pStable, SCTableObj *pCtable) ...@@ -416,13 +417,14 @@ static void mnodeRemoveTableFromStable(SSTableObj *pStable, SCTableObj *pCtable)
SVgObj *pVgroup = mnodeGetVgroup(pCtable->vgId); SVgObj *pVgroup = mnodeGetVgroup(pCtable->vgId);
if (pVgroup == NULL) { if (pVgroup == NULL) {
taosHashRemove(pStable->vgHash, &pCtable->vgId, sizeof(pCtable->vgId)); taosHashRemove(pStable->vgHash, &pCtable->vgId, sizeof(pCtable->vgId));
mDebug("table:%s, vgId:%d is remove from stable vgList, sizeOfVgList:%d", pStable->info.tableId, pCtable->vgId, mDebug("table:%s, vgId:%d is remove from stable hash:%p sizeOfVgList:%d", pStable->info.tableId, pCtable->vgId,
(int32_t)taosHashGetSize(pStable->vgHash)); pStable->vgHash, taosHashGetSize(pStable->vgHash));
} }
mnodeDecVgroupRef(pVgroup); mnodeDecVgroupRef(pVgroup);
} }
static void mnodeDestroySuperTable(SSTableObj *pStable) { static void mnodeDestroySuperTable(SSTableObj *pStable) {
mDebug("table:%s, is destroyed, stable hash:%p", pStable->info.tableId, pStable->vgHash);
if (pStable->vgHash != NULL) { if (pStable->vgHash != NULL) {
taosHashCleanup(pStable->vgHash); taosHashCleanup(pStable->vgHash);
pStable->vgHash = NULL; pStable->vgHash = NULL;
...@@ -464,6 +466,9 @@ static int32_t mnodeSuperTableActionUpdate(SSdbRow *pRow) { ...@@ -464,6 +466,9 @@ static int32_t mnodeSuperTableActionUpdate(SSdbRow *pRow) {
SSTableObj *pNew = pRow->pObj; SSTableObj *pNew = pRow->pObj;
SSTableObj *pTable = mnodeGetSuperTable(pNew->info.tableId); SSTableObj *pTable = mnodeGetSuperTable(pNew->info.tableId);
if (pTable != NULL && pTable != pNew) { if (pTable != NULL && pTable != pNew) {
mDebug("table:%s, will be updated, hash:%p sizeOfVgList:%d, new hash:%p sizeOfVgList:%d", pTable->info.tableId,
pTable->vgHash, taosHashGetSize(pTable->vgHash), pNew->vgHash, taosHashGetSize(pNew->vgHash));
void *oldTableId = pTable->info.tableId; void *oldTableId = pTable->info.tableId;
void *oldSchema = pTable->schema; void *oldSchema = pTable->schema;
void *oldVgHash = pTable->vgHash; void *oldVgHash = pTable->vgHash;
...@@ -479,6 +484,9 @@ static int32_t mnodeSuperTableActionUpdate(SSdbRow *pRow) { ...@@ -479,6 +484,9 @@ static int32_t mnodeSuperTableActionUpdate(SSdbRow *pRow) {
free(pNew); free(pNew);
free(oldTableId); free(oldTableId);
free(oldSchema); free(oldSchema);
mDebug("table:%s, update finished, hash:%p sizeOfVgList:%d", pTable->info.tableId, pTable->vgHash,
taosHashGetSize(pTable->vgHash));
} }
mnodeDecTableRef(pTable); mnodeDecTableRef(pTable);
...@@ -783,8 +791,8 @@ static int32_t mnodeProcessDropTableMsg(SMnodeMsg *pMsg) { ...@@ -783,8 +791,8 @@ static int32_t mnodeProcessDropTableMsg(SMnodeMsg *pMsg) {
if (pMsg->pTable->type == TSDB_SUPER_TABLE) { if (pMsg->pTable->type == TSDB_SUPER_TABLE) {
SSTableObj *pSTable = (SSTableObj *)pMsg->pTable; SSTableObj *pSTable = (SSTableObj *)pMsg->pTable;
mInfo("msg:%p, app:%p table:%s, start to drop stable, uid:%" PRIu64 ", numOfChildTables:%d, sizeOfVgList:%d", mInfo("msg:%p, app:%p table:%s, start to drop stable, uid:%" PRIu64 ", numOfChildTables:%d, sizeOfVgList:%d", pMsg,
pMsg, pMsg->rpcMsg.ahandle, pDrop->tableId, pSTable->uid, pSTable->numOfTables, (int32_t)taosHashGetSize(pSTable->vgHash)); pMsg->rpcMsg.ahandle, pDrop->tableId, pSTable->uid, pSTable->numOfTables, taosHashGetSize(pSTable->vgHash));
return mnodeProcessDropSuperTableMsg(pMsg); return mnodeProcessDropSuperTableMsg(pMsg);
} else { } else {
SCTableObj *pCTable = (SCTableObj *)pMsg->pTable; SCTableObj *pCTable = (SCTableObj *)pMsg->pTable;
...@@ -925,7 +933,10 @@ static int32_t mnodeProcessDropSuperTableMsg(SMnodeMsg *pMsg) { ...@@ -925,7 +933,10 @@ static int32_t mnodeProcessDropSuperTableMsg(SMnodeMsg *pMsg) {
if (pMsg == NULL) return TSDB_CODE_MND_APP_ERROR; if (pMsg == NULL) return TSDB_CODE_MND_APP_ERROR;
SSTableObj *pStable = (SSTableObj *)pMsg->pTable; SSTableObj *pStable = (SSTableObj *)pMsg->pTable;
if (pStable->vgHash != NULL /*pStable->numOfTables != 0*/) { mInfo("msg:%p, app:%p stable:%s will be dropped, hash:%p sizeOfVgList:%d", pMsg, pMsg->rpcMsg.ahandle,
pStable->info.tableId, pStable->vgHash, taosHashGetSize(pStable->vgHash));
if (pStable->vgHash != NULL /*pStable->numOfTables != 0*/) {
int32_t *pVgId = taosHashIterate(pStable->vgHash, NULL); int32_t *pVgId = taosHashIterate(pStable->vgHash, NULL);
while (pVgId) { while (pVgId) {
SVgObj *pVgroup = mnodeGetVgroup(*pVgId); SVgObj *pVgroup = mnodeGetVgroup(*pVgId);
...@@ -938,8 +949,9 @@ static int32_t mnodeProcessDropSuperTableMsg(SMnodeMsg *pMsg) { ...@@ -938,8 +949,9 @@ static int32_t mnodeProcessDropSuperTableMsg(SMnodeMsg *pMsg) {
pDrop->uid = htobe64(pStable->uid); pDrop->uid = htobe64(pStable->uid);
mnodeExtractTableName(pStable->info.tableId, pDrop->tableId); mnodeExtractTableName(pStable->info.tableId, pDrop->tableId);
mInfo("msg:%p, app:%p stable:%s, send drop stable msg to vgId:%d", pMsg, pMsg->rpcMsg.ahandle, mInfo("msg:%p, app:%p stable:%s, send drop stable msg to vgId:%d, hash:%p sizeOfVgList:%d", pMsg,
pStable->info.tableId, pVgroup->vgId); pMsg->rpcMsg.ahandle, pStable->info.tableId, pVgroup->vgId, pStable->vgHash,
taosHashGetSize(pStable->vgHash));
SRpcEpSet epSet = mnodeGetEpSetFromVgroup(pVgroup); SRpcEpSet epSet = mnodeGetEpSetFromVgroup(pVgroup);
SRpcMsg rpcMsg = {.pCont = pDrop, .contLen = sizeof(SDropSTableMsg), .msgType = TSDB_MSG_TYPE_MD_DROP_STABLE}; SRpcMsg rpcMsg = {.pCont = pDrop, .contLen = sizeof(SDropSTableMsg), .msgType = TSDB_MSG_TYPE_MD_DROP_STABLE};
dnodeSendMsgToDnode(&epSet, &rpcMsg); dnodeSendMsgToDnode(&epSet, &rpcMsg);
...@@ -1482,8 +1494,8 @@ static int32_t mnodeGetSuperTableMeta(SMnodeMsg *pMsg) { ...@@ -1482,8 +1494,8 @@ static int32_t mnodeGetSuperTableMeta(SMnodeMsg *pMsg) {
pMsg->rpcRsp.rsp = pMeta; pMsg->rpcRsp.rsp = pMeta;
mDebug("msg:%p, app:%p stable:%s, uid:%" PRIu64 " table meta is retrieved", pMsg, pMsg->rpcMsg.ahandle, mDebug("msg:%p, app:%p stable:%s, uid:%" PRIu64 " table meta is retrieved, sizeOfVgList:%d numOfTables:%d", pMsg,
pTable->info.tableId, pTable->uid); pMsg->rpcMsg.ahandle, pTable->info.tableId, pTable->uid, taosHashGetSize(pTable->vgHash), pTable->numOfTables);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
...@@ -1512,7 +1524,7 @@ static int32_t mnodeProcessSuperTableVgroupMsg(SMnodeMsg *pMsg) { ...@@ -1512,7 +1524,7 @@ static int32_t mnodeProcessSuperTableVgroupMsg(SMnodeMsg *pMsg) {
char *msg = (char *)pRsp + sizeof(SSTableVgroupRspMsg); char *msg = (char *)pRsp + sizeof(SSTableVgroupRspMsg);
for (int32_t i = 0; i < numOfTable; ++i) { for (int32_t i = 0; i < numOfTable; ++i) {
char * stableName = (char *)pInfo + sizeof(SSTableVgroupMsg) + (TSDB_TABLE_FNAME_LEN)*i; char *stableName = (char *)pInfo + sizeof(SSTableVgroupMsg) + (TSDB_TABLE_FNAME_LEN)*i;
SSTableObj *pTable = mnodeGetSuperTable(stableName); SSTableObj *pTable = mnodeGetSuperTable(stableName);
if (pTable == NULL) { if (pTable == NULL) {
mError("msg:%p, app:%p stable:%s, not exist while get stable vgroup info", pMsg, pMsg->rpcMsg.ahandle, stableName); mError("msg:%p, app:%p stable:%s, not exist while get stable vgroup info", pMsg, pMsg->rpcMsg.ahandle, stableName);
...@@ -1533,6 +1545,8 @@ static int32_t mnodeProcessSuperTableVgroupMsg(SMnodeMsg *pMsg) { ...@@ -1533,6 +1545,8 @@ static int32_t mnodeProcessSuperTableVgroupMsg(SMnodeMsg *pMsg) {
msg += sizeof(SVgroupsMsg); msg += sizeof(SVgroupsMsg);
} else { } else {
SVgroupsMsg *pVgroupMsg = (SVgroupsMsg *)msg; SVgroupsMsg *pVgroupMsg = (SVgroupsMsg *)msg;
mDebug("msg:%p, app:%p stable:%s, hash:%p sizeOfVgList:%d will be returned", pMsg, pMsg->rpcMsg.ahandle,
pTable->info.tableId, pTable->vgHash, taosHashGetSize(pTable->vgHash));
int32_t *pVgId = taosHashIterate(pTable->vgHash, NULL); int32_t *pVgId = taosHashIterate(pTable->vgHash, NULL);
int32_t vgSize = 0; int32_t vgSize = 0;
......
...@@ -79,7 +79,7 @@ bool httpInitContexts() { ...@@ -79,7 +79,7 @@ bool httpInitContexts() {
void httpCleanupContexts() { void httpCleanupContexts() {
if (tsHttpServer.contextCache != NULL) { if (tsHttpServer.contextCache != NULL) {
SCacheObj *cache = tsHttpServer.contextCache; SCacheObj *cache = tsHttpServer.contextCache;
httpInfo("context cache is cleanuping, size:%" PRIzu "", taosHashGetSize(cache->pHashTable)); httpInfo("context cache is cleanuping, size:%d", taosHashGetSize(cache->pHashTable));
taosCacheCleanup(tsHttpServer.contextCache); taosCacheCleanup(tsHttpServer.contextCache);
tsHttpServer.contextCache = NULL; tsHttpServer.contextCache = NULL;
} }
......
...@@ -107,7 +107,7 @@ static void httpDestroySession(void *data) { ...@@ -107,7 +107,7 @@ static void httpDestroySession(void *data) {
void httpCleanUpSessions() { void httpCleanUpSessions() {
if (tsHttpServer.sessionCache != NULL) { if (tsHttpServer.sessionCache != NULL) {
SCacheObj *cache = tsHttpServer.sessionCache; SCacheObj *cache = tsHttpServer.sessionCache;
httpInfo("session cache is cleanuping, size:%" PRIzu "", taosHashGetSize(cache->pHashTable)); httpInfo("session cache is cleanuping, size:%d", taosHashGetSize(cache->pHashTable));
taosCacheCleanup(tsHttpServer.sessionCache); taosCacheCleanup(tsHttpServer.sessionCache);
tsHttpServer.sessionCache = NULL; tsHttpServer.sessionCache = NULL;
} }
......
...@@ -181,7 +181,7 @@ void httpProcessMultiSql(HttpContext *pContext) { ...@@ -181,7 +181,7 @@ void httpProcessMultiSql(HttpContext *pContext) {
char *sql = httpGetCmdsString(pContext, cmd->sql); char *sql = httpGetCmdsString(pContext, cmd->sql);
httpTraceL("context:%p, fd:%d, user:%s, process pos:%d, start query, sql:%s", pContext, pContext->fd, pContext->user, httpTraceL("context:%p, fd:%d, user:%s, process pos:%d, start query, sql:%s", pContext, pContext->fd, pContext->user,
multiCmds->pos, sql); multiCmds->pos, sql);
taosNotePrintHttp(sql); nPrintHttp(sql);
taos_query_a(pContext->session->taos, sql, httpProcessMultiSqlCallBack, (void *)pContext); taos_query_a(pContext->session->taos, sql, httpProcessMultiSqlCallBack, (void *)pContext);
} }
...@@ -329,7 +329,7 @@ void httpProcessSingleSqlCmd(HttpContext *pContext) { ...@@ -329,7 +329,7 @@ void httpProcessSingleSqlCmd(HttpContext *pContext) {
} }
httpTraceL("context:%p, fd:%d, user:%s, start query, sql:%s", pContext, pContext->fd, pContext->user, sql); httpTraceL("context:%p, fd:%d, user:%s, start query, sql:%s", pContext, pContext->fd, pContext->user, sql);
taosNotePrintHttp(sql); nPrintHttp(sql);
taos_query_a(pSession->taos, sql, httpProcessSingleSqlCallBack, (void *)pContext); taos_query_a(pSession->taos, sql, httpProcessSingleSqlCallBack, (void *)pContext);
} }
......
...@@ -37,7 +37,6 @@ void opInitHandle(HttpServer* pServer) {} ...@@ -37,7 +37,6 @@ void opInitHandle(HttpServer* pServer) {}
#endif #endif
HttpServer tsHttpServer; HttpServer tsHttpServer;
void taosInitNote(int32_t numOfNoteLines, int32_t maxNotes, char* lable);
int32_t httpInitSystem() { int32_t httpInitSystem() {
strcpy(tsHttpServer.label, "rest"); strcpy(tsHttpServer.label, "rest");
...@@ -48,9 +47,6 @@ int32_t httpInitSystem() { ...@@ -48,9 +47,6 @@ int32_t httpInitSystem() {
pthread_mutex_init(&tsHttpServer.serverMutex, NULL); pthread_mutex_init(&tsHttpServer.serverMutex, NULL);
if (tsHttpEnableRecordSql != 0) {
taosInitNote(tsNumOfLogLines / 10, 1, (char*)"http_note");
}
restInitHandle(&tsHttpServer); restInitHandle(&tsHttpServer);
adminInitHandle(&tsHttpServer); adminInitHandle(&tsHttpServer);
gcInitHandle(&tsHttpServer); gcInitHandle(&tsHttpServer);
......
...@@ -82,7 +82,7 @@ SHashObj *taosHashInit(size_t capacity, _hash_fn_t fn, bool update, SHashLockTyp ...@@ -82,7 +82,7 @@ SHashObj *taosHashInit(size_t capacity, _hash_fn_t fn, bool update, SHashLockTyp
* @param pHashObj * @param pHashObj
* @return * @return
*/ */
size_t taosHashGetSize(const SHashObj *pHashObj); int32_t taosHashGetSize(const SHashObj *pHashObj);
/** /**
* put element into hash table, if the element with the same key exists, update it * put element into hash table, if the element with the same key exists, update it
......
...@@ -20,41 +20,42 @@ ...@@ -20,41 +20,42 @@
extern "C" { extern "C" {
#endif #endif
#include "os.h"
#include "tutil.h"
#include "tglobal.h"
#define MAX_NOTE_LINE_SIZE 66000 #define MAX_NOTE_LINE_SIZE 66000
#define NOTE_FILE_NAME_LEN 300 #define NOTE_FILE_NAME_LEN 300
typedef struct _taosNoteInfo { typedef struct {
int taosNoteFileNum ; int32_t fileNum;
int taosNoteMaxLines; int32_t maxLines;
int taosNoteLines; int32_t lines;
char taosNoteName[NOTE_FILE_NAME_LEN]; int32_t flag;
int taosNoteFlag; int32_t fd;
int taosNoteFd; int32_t openInProgress;
int taosNoteOpenInProgress; char name[NOTE_FILE_NAME_LEN];
pthread_mutex_t taosNoteMutex; pthread_mutex_t mutex;
}taosNoteInfo; } SNoteObj;
void taosNotePrint(taosNoteInfo * pNote, const char * const format, ...); extern SNoteObj tsHttpNote;
extern SNoteObj tsTscNote;
extern taosNoteInfo m_HttpNote; extern SNoteObj tsInfoNote;
extern taosNoteInfo m_TscNote;
void taosInitNotes();
extern int tsHttpEnableRecordSql; void taosNotePrint(SNoteObj* pNote, const char* const format, ...);
extern int tsTscEnableRecordSql; void taosNotePrintBuffer(SNoteObj *pNote, char *buffer, int32_t len);
#define taosNotePrintHttp(...) \ #define nPrintHttp(...) \
if (tsHttpEnableRecordSql) { \ if (tsHttpEnableRecordSql) { \
taosNotePrint(&m_HttpNote, __VA_ARGS__); \ taosNotePrint(&tsHttpNote, __VA_ARGS__); \
}
#define nPrintTsc(...) \
if (tsTscEnableRecordSql) { \
taosNotePrint(&tsTscNote, __VA_ARGS__); \
}
#define nInfo(buffer, len) \
if (tscEmbedded == 1) { \
taosNotePrintBuffer(&tsInfoNote, buffer, len); \
} }
#define taosNotePrintTsc(...) \
if (tsTscEnableRecordSql) { \
taosNotePrint(&m_TscNote, __VA_ARGS__); \
}
#ifdef __cplusplus #ifdef __cplusplus
} }
......
...@@ -189,7 +189,7 @@ SHashObj *taosHashInit(size_t capacity, _hash_fn_t fn, bool update, SHashLockTyp ...@@ -189,7 +189,7 @@ SHashObj *taosHashInit(size_t capacity, _hash_fn_t fn, bool update, SHashLockTyp
return pHashObj; return pHashObj;
} }
size_t taosHashGetSize(const SHashObj *pHashObj) { return (pHashObj == NULL) ? 0 : pHashObj->size; } int32_t taosHashGetSize(const SHashObj *pHashObj) { return (int32_t)((pHashObj == NULL) ? 0 : pHashObj->size); }
int32_t taosHashPut(SHashObj *pHashObj, const void *key, size_t keyLen, void *data, size_t size) { int32_t taosHashPut(SHashObj *pHashObj, const void *key, size_t keyLen, void *data, size_t size) {
uint32_t hashVal = (*pHashObj->hashFp)(key, (uint32_t)keyLen); uint32_t hashVal = (*pHashObj->hashFp)(key, (uint32_t)keyLen);
......
...@@ -17,6 +17,7 @@ ...@@ -17,6 +17,7 @@
#include "os.h" #include "os.h"
#include "tulog.h" #include "tulog.h"
#include "tlog.h" #include "tlog.h"
#include "tnote.h"
#include "tutil.h" #include "tutil.h"
#define MAX_LOGLINE_SIZE (1000) #define MAX_LOGLINE_SIZE (1000)
...@@ -287,7 +288,6 @@ static int32_t taosOpenLogFile(char *fn, int32_t maxLines, int32_t maxFileNum) { ...@@ -287,7 +288,6 @@ static int32_t taosOpenLogFile(char *fn, int32_t maxLines, int32_t maxFileNum) {
tsLogObj.fileNum = maxFileNum; tsLogObj.fileNum = maxFileNum;
taosGetLogFileName(fn); taosGetLogFileName(fn);
if (strlen(fn) < LOG_FILE_NAME_LEN + 50 - 2) { if (strlen(fn) < LOG_FILE_NAME_LEN + 50 - 2) {
strcpy(name, fn); strcpy(name, fn);
strcat(name, ".0"); strcat(name, ".0");
...@@ -401,6 +401,7 @@ void taosPrintLog(const char *flags, int32_t dflag, const char *format, ...) { ...@@ -401,6 +401,7 @@ void taosPrintLog(const char *flags, int32_t dflag, const char *format, ...) {
} }
if (dflag & DEBUG_SCREEN) taosWrite(1, buffer, (uint32_t)len); if (dflag & DEBUG_SCREEN) taosWrite(1, buffer, (uint32_t)len);
if (dflag == 255) nInfo(buffer, len);
} }
void taosDumpData(unsigned char *msg, int32_t len) { void taosDumpData(unsigned char *msg, int32_t len) {
......
...@@ -43,12 +43,13 @@ static void *taosNetBindUdpPort(void *sarg) { ...@@ -43,12 +43,13 @@ static void *taosNetBindUdpPort(void *sarg) {
char buffer[BUFFER_SIZE]; char buffer[BUFFER_SIZE];
int32_t iDataNum; int32_t iDataNum;
socklen_t sin_size; socklen_t sin_size;
int32_t bufSize = 1024000;
struct sockaddr_in server_addr; struct sockaddr_in server_addr;
struct sockaddr_in clientAddr; struct sockaddr_in clientAddr;
if ((serverSocket = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP)) < 0) { if ((serverSocket = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP)) < 0) {
uError("failed to create udp socket since %s", strerror(errno)); uError("failed to create UDP socket since %s", strerror(errno));
return NULL; return NULL;
} }
...@@ -58,11 +59,23 @@ static void *taosNetBindUdpPort(void *sarg) { ...@@ -58,11 +59,23 @@ static void *taosNetBindUdpPort(void *sarg) {
server_addr.sin_addr.s_addr = htonl(INADDR_ANY); server_addr.sin_addr.s_addr = htonl(INADDR_ANY);
if (bind(serverSocket, (struct sockaddr *)&server_addr, sizeof(server_addr)) < 0) { if (bind(serverSocket, (struct sockaddr *)&server_addr, sizeof(server_addr)) < 0) {
uError("failed to bind udp port:%d since %s", port, strerror(errno)); uError("failed to bind UDP port:%d since %s", port, strerror(errno));
return NULL; return NULL;
} }
uInfo("udp server at port:%d is listening", port); if (taosSetSockOpt(serverSocket, SOL_SOCKET, SO_SNDBUF, (void *)&bufSize, sizeof(bufSize)) != 0) {
uError("failed to set the send buffer size for UDP socket\n");
taosCloseSocket(serverSocket);
return NULL;
}
if (taosSetSockOpt(serverSocket, SOL_SOCKET, SO_RCVBUF, (void *)&bufSize, sizeof(bufSize)) != 0) {
uError("failed to set the receive buffer size for UDP socket\n");
taosCloseSocket(serverSocket);
return NULL;
}
uInfo("UDP server at port:%d is listening", port);
while (1) { while (1) {
memset(buffer, 0, BUFFER_SIZE); memset(buffer, 0, BUFFER_SIZE);
...@@ -74,10 +87,13 @@ static void *taosNetBindUdpPort(void *sarg) { ...@@ -74,10 +87,13 @@ static void *taosNetBindUdpPort(void *sarg) {
continue; continue;
} }
uInfo("UDP: recv:%d bytes from %s at %d", iDataNum, taosInetNtoa(clientAddr.sin_addr), port);
if (iDataNum > 0) { if (iDataNum > 0) {
uInfo("UDP: recv:%d bytes from %s:%d", iDataNum, taosInetNtoa(clientAddr.sin_addr), port); iDataNum = taosSendto(serverSocket, buffer, iDataNum, 0, (struct sockaddr *)&clientAddr, (int32_t)sin_size);
sendto(serverSocket, buffer, iDataNum, 0, (struct sockaddr *)&clientAddr, (int32_t)sin_size);
} }
uInfo("UDP: send:%d bytes to %s at %d", iDataNum, taosInetNtoa(clientAddr.sin_addr), port);
} }
taosCloseSocket(serverSocket); taosCloseSocket(serverSocket);
...@@ -94,10 +110,9 @@ static void *taosNetBindTcpPort(void *sarg) { ...@@ -94,10 +110,9 @@ static void *taosNetBindTcpPort(void *sarg) {
int32_t addr_len = sizeof(clientAddr); int32_t addr_len = sizeof(clientAddr);
SOCKET client; SOCKET client;
char buffer[BUFFER_SIZE]; char buffer[BUFFER_SIZE];
int32_t iDataNum = 0;
if ((serverSocket = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP)) < 0) { if ((serverSocket = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP)) < 0) {
uError("failed to create tcp socket since %s", strerror(errno)); uError("failed to create TCP socket since %s", strerror(errno));
return NULL; return NULL;
} }
...@@ -106,130 +121,103 @@ static void *taosNetBindTcpPort(void *sarg) { ...@@ -106,130 +121,103 @@ static void *taosNetBindTcpPort(void *sarg) {
server_addr.sin_port = htons(port); server_addr.sin_port = htons(port);
server_addr.sin_addr.s_addr = htonl(INADDR_ANY); server_addr.sin_addr.s_addr = htonl(INADDR_ANY);
int32_t reuse = 1;
if (taosSetSockOpt(serverSocket, SOL_SOCKET, SO_REUSEADDR, (void *)&reuse, sizeof(reuse)) < 0) {
uError("setsockopt SO_REUSEADDR failed: %d (%s)", errno, strerror(errno));
taosCloseSocket(serverSocket);
return NULL;
}
if (bind(serverSocket, (struct sockaddr *)&server_addr, sizeof(server_addr)) < 0) { if (bind(serverSocket, (struct sockaddr *)&server_addr, sizeof(server_addr)) < 0) {
uError("failed to bind tcp port:%d since %s", port, strerror(errno)); uError("failed to bind TCP port:%d since %s", port, strerror(errno));
return NULL; return NULL;
} }
if (listen(serverSocket, 5) < 0) {
uError("failed to listen tcp port:%d since %s", port, strerror(errno)); if (taosKeepTcpAlive(serverSocket) < 0) {
uError("failed to set tcp server keep-alive option since %s", strerror(errno));
taosCloseSocket(serverSocket);
return NULL; return NULL;
} }
uInfo("tcp server at port:%d is listening", port); if (listen(serverSocket, 10) < 0) {
uError("failed to listen TCP port:%d since %s", port, strerror(errno));
return NULL;
}
uInfo("TCP server at port:%d is listening", port);
while (1) { while (1) {
client = accept(serverSocket, (struct sockaddr *)&clientAddr, (socklen_t *)&addr_len); client = accept(serverSocket, (struct sockaddr *)&clientAddr, (socklen_t *)&addr_len);
if (client < 0) { if (client < 0) {
uDebug("failed to accept from tcp port:%d since %s", port, strerror(errno)); uDebug("TCP: failed to accept at port:%d since %s", port, strerror(errno));
continue; continue;
} }
iDataNum = 0; int32_t ret = taosReadMsg(client, buffer, pinfo->pktLen);
memset(buffer, 0, BUFFER_SIZE); if (ret < 0 || ret != pinfo->pktLen) {
int32_t nleft, nread; uError("TCP: failed to read %d bytes at port:%d since %s", pinfo->pktLen, port, strerror(errno));
char * ptr = buffer; taosCloseSocket(serverSocket);
nleft = pinfo->pktLen; return NULL;
while (nleft > 0) {
nread = recv(client, ptr, BUFFER_SIZE, 0);
if (nread == 0) {
break;
} else if (nread < 0) {
if (errno == EINTR) {
continue;
} else {
uError("failed to perform recv func at %d since %s", port, strerror(errno));
taosCloseSocket(serverSocket);
return NULL;
}
} else {
nleft -= nread;
ptr += nread;
iDataNum += nread;
}
} }
if (iDataNum > 0) { uInfo("TCP: read:%d bytes from %s at %d", pinfo->pktLen, taosInetNtoa(clientAddr.sin_addr), port);
uInfo("TCP: recv:%d bytes from %s:%d", iDataNum, taosInetNtoa(clientAddr.sin_addr), port);
send(client, buffer, iDataNum, 0); ret = taosWriteMsg(client, buffer, pinfo->pktLen);
if (ret < 0) {
uError("TCP: failed to write %d bytes at %d since %s", pinfo->pktLen, port, strerror(errno));
taosCloseSocket(serverSocket);
return NULL;
} }
uInfo("TCP: write:%d bytes to %s at %d", pinfo->pktLen, taosInetNtoa(clientAddr.sin_addr), port);
} }
taosCloseSocket(serverSocket); taosCloseSocket(serverSocket);
return NULL; return NULL;
} }
static int32_t taosNetCheckTcpPort(STestInfo *info) { static int32_t taosNetCheckTcpPort(STestInfo *info) {
SOCKET clientSocket; SOCKET clientSocket;
char sendbuf[BUFFER_SIZE]; char buffer[BUFFER_SIZE] = {0};
char recvbuf[BUFFER_SIZE];
int32_t iDataNum = 0;
struct sockaddr_in serverAddr;
if ((clientSocket = socket(AF_INET, SOCK_STREAM, 0)) < 0) { if ((clientSocket = socket(AF_INET, SOCK_STREAM, 0)) < 0) {
uError("failed to create tcp client socket since %s", strerror(errno)); uError("failed to create TCP client socket since %s", strerror(errno));
return -1; return -1;
} }
// set send and recv overtime int32_t reuse = 1;
struct timeval timeout; if (taosSetSockOpt(clientSocket, SOL_SOCKET, SO_REUSEADDR, (void *)&reuse, sizeof(reuse)) < 0) {
timeout.tv_sec = 2; // s uError("setsockopt SO_REUSEADDR failed: %d (%s)", errno, strerror(errno));
timeout.tv_usec = 0; // us taosCloseSocket(clientSocket);
if (setsockopt(clientSocket, SOL_SOCKET, SO_SNDTIMEO, (char *)&timeout, sizeof(struct timeval)) == -1) { return -1;
uError("failed to setsockopt send timer since %s", strerror(errno));
}
if (setsockopt(clientSocket, SOL_SOCKET, SO_RCVTIMEO, (char *)&timeout, sizeof(struct timeval)) == -1) {
uError("failed to setsockopt recv timer since %s", strerror(errno));
} }
struct sockaddr_in serverAddr;
memset((char *)&serverAddr, 0, sizeof(serverAddr));
serverAddr.sin_family = AF_INET; serverAddr.sin_family = AF_INET;
serverAddr.sin_port = htons(info->port); serverAddr.sin_port = (uint16_t)htons((uint16_t)info->port);
serverAddr.sin_addr.s_addr = info->hostIp; serverAddr.sin_addr.s_addr = info->hostIp;
if (connect(clientSocket, (struct sockaddr *)&serverAddr, sizeof(serverAddr)) < 0) { if (connect(clientSocket, (struct sockaddr *)&serverAddr, sizeof(serverAddr)) < 0) {
uError("failed to connect port:%d since %s", info->port, strerror(errno)); uError("TCP: failed to connect port %s:%d since %s", taosIpStr(info->hostIp), info->port, strerror(errno));
return -1; return -1;
} }
memset(sendbuf, 0, BUFFER_SIZE); taosKeepTcpAlive(clientSocket);
memset(recvbuf, 0, BUFFER_SIZE);
struct in_addr ipStr;
memcpy(&ipStr, &info->hostIp, 4);
sprintf(sendbuf, "client send tcp pkg to %s:%d, content: 1122334455", taosInetNtoa(ipStr), info->port);
sprintf(sendbuf + info->pktLen - 16, "1122334455667788");
send(clientSocket, sendbuf, info->pktLen, 0);
memset(recvbuf, 0, BUFFER_SIZE); sprintf(buffer, "client send TCP pkg to %s:%d, content: 1122334455", taosIpStr(info->hostIp), info->port);
int32_t nleft, nread; sprintf(buffer + info->pktLen - 16, "1122334455667788");
char * ptr = recvbuf;
nleft = info->pktLen;
while (nleft > 0) { int32_t ret = taosWriteMsg(clientSocket, buffer, info->pktLen);
nread = recv(clientSocket, ptr, BUFFER_SIZE, 0);; if (ret < 0) {
uError("TCP: failed to write msg to %s:%d since %s", taosIpStr(info->hostIp), info->port, strerror(errno));
if (nread == 0) { return -1;
break;
} else if (nread < 0) {
if (errno == EINTR) {
continue;
} else {
uError("faild to recv pkg from TCP port:%d since %s", info->port, strerror(errno));
taosCloseSocket(clientSocket);
return -1;
}
} else {
nleft -= nread;
ptr += nread;
iDataNum += nread;
}
} }
if (iDataNum < info->pktLen) { ret = taosReadMsg(clientSocket, buffer, info->pktLen);
uError("TCP: received ack:%d bytes, less than send:%d bytes from port:%d", iDataNum, info->pktLen, info->port); if (ret < 0) {
uError("TCP: failed to read msg from %s:%d since %s", taosIpStr(info->hostIp), info->port, strerror(errno));
return -1; return -1;
} }
...@@ -239,9 +227,9 @@ static int32_t taosNetCheckTcpPort(STestInfo *info) { ...@@ -239,9 +227,9 @@ static int32_t taosNetCheckTcpPort(STestInfo *info) {
static int32_t taosNetCheckUdpPort(STestInfo *info) { static int32_t taosNetCheckUdpPort(STestInfo *info) {
SOCKET clientSocket; SOCKET clientSocket;
char sendbuf[BUFFER_SIZE]; char buffer[BUFFER_SIZE] = {0};
char recvbuf[BUFFER_SIZE];
int32_t iDataNum = 0; int32_t iDataNum = 0;
int32_t bufSize = 1024000;
struct sockaddr_in serverAddr; struct sockaddr_in serverAddr;
...@@ -250,41 +238,39 @@ static int32_t taosNetCheckUdpPort(STestInfo *info) { ...@@ -250,41 +238,39 @@ static int32_t taosNetCheckUdpPort(STestInfo *info) {
return -1; return -1;
} }
// set overtime if (taosSetSockOpt(clientSocket, SOL_SOCKET, SO_SNDBUF, (void *)&bufSize, sizeof(bufSize)) != 0) {
struct timeval timeout; uError("failed to set the send buffer size for UDP socket\n");
timeout.tv_sec = 2; // s return -1;
timeout.tv_usec = 0; // us
if (setsockopt(clientSocket, SOL_SOCKET, SO_SNDTIMEO, (char *)&timeout, sizeof(struct timeval)) == -1) {
uError("failed to setsockopt send timer since %s", strerror(errno));
} }
if (setsockopt(clientSocket, SOL_SOCKET, SO_RCVTIMEO, (char *)&timeout, sizeof(struct timeval)) == -1) {
uError("failed to setsockopt recv timer since %s", strerror(errno)); if (taosSetSockOpt(clientSocket, SOL_SOCKET, SO_RCVBUF, (void *)&bufSize, sizeof(bufSize)) != 0) {
uError("failed to set the receive buffer size for UDP socket\n");
return -1;
} }
serverAddr.sin_family = AF_INET; serverAddr.sin_family = AF_INET;
serverAddr.sin_port = htons(info->port); serverAddr.sin_port = htons(info->port);
serverAddr.sin_addr.s_addr = info->hostIp; serverAddr.sin_addr.s_addr = info->hostIp;
memset(sendbuf, 0, BUFFER_SIZE);
memset(recvbuf, 0, BUFFER_SIZE);
struct in_addr ipStr; struct in_addr ipStr;
memcpy(&ipStr, &info->hostIp, 4); memcpy(&ipStr, &info->hostIp, 4);
sprintf(sendbuf, "client send udp pkg to %s:%d, content: 1122334455", taosInetNtoa(ipStr), info->port); sprintf(buffer, "client send udp pkg to %s:%d, content: 1122334455", taosInetNtoa(ipStr), info->port);
sprintf(sendbuf + info->pktLen - 16, "1122334455667788"); sprintf(buffer + info->pktLen - 16, "1122334455667788");
socklen_t sin_size = sizeof(*(struct sockaddr *)&serverAddr); socklen_t sin_size = sizeof(*(struct sockaddr *)&serverAddr);
int32_t code = sendto(clientSocket, sendbuf, info->pktLen, 0, (struct sockaddr *)&serverAddr, (int32_t)sin_size); iDataNum = taosSendto(clientSocket, buffer, info->pktLen, 0, (struct sockaddr *)&serverAddr, (int32_t)sin_size);
if (code < 0) { if (iDataNum < 0 || iDataNum != info->pktLen) {
uError("failed to perform sendto func since %s", strerror(errno)); uError("UDP: failed to perform sendto func since %s", strerror(errno));
return -1; return -1;
} }
iDataNum = recvfrom(clientSocket, recvbuf, BUFFER_SIZE, 0, (struct sockaddr *)&serverAddr, &sin_size); memset(buffer, 0, BUFFER_SIZE);
sin_size = sizeof(*(struct sockaddr *)&serverAddr);
iDataNum = recvfrom(clientSocket, buffer, BUFFER_SIZE, 0, (struct sockaddr *)&serverAddr, &sin_size);
if (iDataNum < info->pktLen) { if (iDataNum < 0 || iDataNum != info->pktLen) {
uError("UDP: received ack:%d bytes, less than send:%d bytes from port:%d", iDataNum, info->pktLen, info->port); uError("UDP: received ack:%d bytes(expect:%d) from port:%d since %s", iDataNum, info->pktLen, info->port, strerror(errno));
return -1; return -1;
} }
...@@ -304,19 +290,18 @@ static void taosNetCheckPort(uint32_t hostIp, int32_t startPort, int32_t endPort ...@@ -304,19 +290,18 @@ static void taosNetCheckPort(uint32_t hostIp, int32_t startPort, int32_t endPort
info.port = port; info.port = port;
ret = taosNetCheckTcpPort(&info); ret = taosNetCheckTcpPort(&info);
if (ret != 0) { if (ret != 0) {
uError("failed to test tcp port:%d", port); uError("failed to test TCP port:%d", port);
} else { } else {
uInfo("successed to test tcp port:%d", port); uInfo("successed to test TCP port:%d", port);
} }
ret = taosNetCheckUdpPort(&info); ret = taosNetCheckUdpPort(&info);
if (ret != 0) { if (ret != 0) {
uError("failed to test udp port:%d", port); uError("failed to test UDP port:%d", port);
} else { } else {
uInfo("successed to test udp port:%d", port); uInfo("successed to test UDP port:%d", port);
} }
} }
return;
} }
void *taosNetInitRpc(char *secretEncrypt, char spi) { void *taosNetInitRpc(char *secretEncrypt, char spi) {
...@@ -440,9 +425,9 @@ static void taosNetTestRpc(char *host, int32_t startPort, int32_t pkgLen) { ...@@ -440,9 +425,9 @@ static void taosNetTestRpc(char *host, int32_t startPort, int32_t pkgLen) {
int32_t ret = taosNetCheckRpc(host, port, sendpkgLen, spi, NULL); int32_t ret = taosNetCheckRpc(host, port, sendpkgLen, spi, NULL);
if (ret < 0) { if (ret < 0) {
uError("failed to test tcp port:%d", port); uError("failed to test TCP port:%d", port);
} else { } else {
uInfo("successed to test tcp port:%d", port); uInfo("successed to test TCP port:%d", port);
} }
if (pkgLen >= tsRpcMaxUdpSize) { if (pkgLen >= tsRpcMaxUdpSize) {
...@@ -453,9 +438,9 @@ static void taosNetTestRpc(char *host, int32_t startPort, int32_t pkgLen) { ...@@ -453,9 +438,9 @@ static void taosNetTestRpc(char *host, int32_t startPort, int32_t pkgLen) {
ret = taosNetCheckRpc(host, port, pkgLen, spi, NULL); ret = taosNetCheckRpc(host, port, pkgLen, spi, NULL);
if (ret < 0) { if (ret < 0) {
uError("failed to test udp port:%d", port); uError("failed to test UDP port:%d", port);
} else { } else {
uInfo("successed to test udp port:%d", port); uInfo("successed to test UDP port:%d", port);
} }
} }
} }
...@@ -492,14 +477,15 @@ static void taosNetTestServer(char *host, int32_t startPort, int32_t pkgLen) { ...@@ -492,14 +477,15 @@ static void taosNetTestServer(char *host, int32_t startPort, int32_t pkgLen) {
tcpInfo->pktLen = pkgLen; tcpInfo->pktLen = pkgLen;
if (pthread_create(pids + i, NULL, taosNetBindTcpPort, tcpInfo) != 0) { if (pthread_create(pids + i, NULL, taosNetBindTcpPort, tcpInfo) != 0) {
uInfo("failed to create tcp test thread, %s:%d", tcpInfo->hostFqdn, tcpInfo->port); uInfo("failed to create TCP test thread, %s:%d", tcpInfo->hostFqdn, tcpInfo->port);
exit(-1); exit(-1);
} }
STestInfo *udpInfo = uinfos + i; STestInfo *udpInfo = uinfos + i;
udpInfo->port = (uint16_t)(port + i); udpInfo->port = port + i;
tcpInfo->pktLen = pkgLen;
if (pthread_create(pids + num + i, NULL, taosNetBindUdpPort, udpInfo) != 0) { if (pthread_create(pids + num + i, NULL, taosNetBindUdpPort, udpInfo) != 0) {
uInfo("failed to create udp test thread, %s:%d", tcpInfo->hostFqdn, tcpInfo->port); uInfo("failed to create UDP test thread, %s:%d", tcpInfo->hostFqdn, tcpInfo->port);
exit(-1); exit(-1);
} }
} }
......
...@@ -13,277 +13,260 @@ ...@@ -13,277 +13,260 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>. * along with this program. If not, see <http://www.gnu.org/licenses/>.
*/ */
#define _DEFAULT_SOURCE
#include "os.h" #include "os.h"
#include "tutil.h"
#include "tglobal.h"
#include "tnote.h" #include "tnote.h"
taosNoteInfo m_HttpNote; SNoteObj tsHttpNote;
taosNoteInfo m_TscNote; SNoteObj tsTscNote;
SNoteObj tsInfoNote;
int taosOpenNoteWithMaxLines(char *fn, int maxLines, int maxNoteNum, taosNoteInfo * pNote); static int32_t taosOpenNoteWithMaxLines(char *fn, int32_t maxLines, int32_t maxNoteNum, SNoteObj *pNote);
static void taosCloseNoteByFd(int32_t oldFd, SNoteObj *pNote);
void taosInitNote(int numOfNoteLines, int maxNotes, char* lable) static void taosInitNote(int32_t numOfLines, int32_t maxNotes, SNoteObj *pNote, char *name) {
{ memset(pNote, 0, sizeof(SNoteObj));
taosNoteInfo * pNote = NULL; pNote->fileNum = 1;
char temp[128] = { 0 }; pNote->fd = -1;
if (strcasecmp(lable, "http_note") == 0) { if (taosOpenNoteWithMaxLines(name, numOfLines, maxNotes, pNote) < 0) {
pNote = &m_HttpNote; fprintf(stderr, "failed to init note file\n");
sprintf(temp, "%s/httpnote", tsLogDir); }
} else if (strcasecmp(lable, "tsc_note") == 0) {
pNote = &m_TscNote; taosNotePrint(pNote, "==================================================");
sprintf(temp, "%s/tscnote-%d", tsLogDir, getpid()); taosNotePrint(pNote, "=================== new note ===================");
} else { taosNotePrint(pNote, "==================================================");
return; }
}
memset(pNote, 0, sizeof(taosNoteInfo)); void taosInitNotes() {
pNote->taosNoteFileNum = 1; char name[TSDB_FILENAME_LEN * 2] = {0};
//pNote->taosNoteMaxLines = 0;
//pNote->taosNoteLines = 0;
//pNote->taosNoteFlag = 0;
pNote->taosNoteFd = -1;
//pNote->taosNoteOpenInProgress = 0;
if (taosOpenNoteWithMaxLines(temp, numOfNoteLines, maxNotes, pNote) < 0) if (tsTscEnableRecordSql) {
fprintf(stderr, "failed to init note file\n"); snprintf(name, TSDB_FILENAME_LEN * 2, "%s/tscsql-%d", tsLogDir, taosGetPId());
taosInitNote(tsNumOfLogLines, 1, &tsTscNote, name);
}
taosNotePrint(pNote, "=================================================="); if (tsHttpEnableRecordSql) {
taosNotePrint(pNote, "=================== new note ==================="); snprintf(name, TSDB_FILENAME_LEN * 2, "%s/httpsql", tsLogDir);
taosNotePrint(pNote, "=================================================="); taosInitNote(tsNumOfLogLines, 1, &tsHttpNote, name);
}
if (tscEmbedded == 1) {
snprintf(name, TSDB_FILENAME_LEN * 2, "%s/taosinfo", tsLogDir);
taosInitNote(tsNumOfLogLines, 1, &tsInfoNote, name);
}
} }
void taosCloseNoteByFd(int oldFd, taosNoteInfo * pNote); static bool taosLockNote(int32_t fd, SNoteObj *pNote) {
bool taosLockNote(int fd, taosNoteInfo * pNote) if (fd < 0) return false;
{
if (fd < 0) return false;
if (pNote->taosNoteFileNum > 1) { if (pNote->fileNum > 1) {
int ret = (int)(flock(fd, LOCK_EX | LOCK_NB)); int32_t ret = (int32_t)(flock(fd, LOCK_EX | LOCK_NB));
if (ret == 0) { if (ret == 0) {
return true; return true;
}
} }
}
return false; return false;
} }
void taosUnLockNote(int fd, taosNoteInfo * pNote) static void taosUnLockNote(int32_t fd, SNoteObj *pNote) {
{ if (fd < 0) return;
if (fd < 0) return;
if (pNote->taosNoteFileNum > 1) { if (pNote->fileNum > 1) {
flock(fd, LOCK_UN | LOCK_NB); flock(fd, LOCK_UN | LOCK_NB);
} }
} }
void *taosThreadToOpenNewNote(void *param) static void *taosThreadToOpenNewNote(void *param) {
{ char name[NOTE_FILE_NAME_LEN * 2];
char name[NOTE_FILE_NAME_LEN * 2]; SNoteObj *pNote = (SNoteObj *)param;
taosNoteInfo * pNote = (taosNoteInfo *)param;
pNote->taosNoteFlag ^= 1; pNote->flag ^= 1;
pNote->taosNoteLines = 0; pNote->lines = 0;
sprintf(name, "%s.%d", pNote->taosNoteName, pNote->taosNoteFlag); sprintf(name, "%s.%d", pNote->name, pNote->flag);
umask(0); umask(0);
int fd = open(name, O_WRONLY | O_CREAT | O_TRUNC, S_IRWXU | S_IRWXG | S_IRWXO); int32_t fd = open(name, O_WRONLY | O_CREAT | O_TRUNC, S_IRWXU | S_IRWXG | S_IRWXO);
if (fd < 0) { if (fd < 0) {
return NULL; return NULL;
} }
taosLockNote(fd, pNote); taosLockNote(fd, pNote);
(void)lseek(fd, 0, SEEK_SET); (void)lseek(fd, 0, SEEK_SET);
int oldFd = pNote->taosNoteFd; int32_t oldFd = pNote->fd;
pNote->taosNoteFd = fd; pNote->fd = fd;
pNote->taosNoteLines = 0; pNote->lines = 0;
pNote->taosNoteOpenInProgress = 0; pNote->openInProgress = 0;
taosNotePrint(pNote, "=============== new note is opened ============="); taosNotePrint(pNote, "=============== new note is opened =============");
taosCloseNoteByFd(oldFd, pNote); taosCloseNoteByFd(oldFd, pNote);
return NULL; return NULL;
} }
int taosOpenNewNote(taosNoteInfo * pNote) static int32_t taosOpenNewNote(SNoteObj *pNote) {
{ pthread_mutex_lock(&pNote->mutex);
pthread_mutex_lock(&pNote->taosNoteMutex);
if (pNote->taosNoteLines > pNote->taosNoteMaxLines && pNote->taosNoteOpenInProgress == 0) { if (pNote->lines > pNote->maxLines && pNote->openInProgress == 0) {
pNote->taosNoteOpenInProgress = 1; pNote->openInProgress = 1;
taosNotePrint(pNote, "=============== open new note =================="); taosNotePrint(pNote, "=============== open new note ==================");
pthread_t pattern; pthread_t pattern;
pthread_attr_t attr; pthread_attr_t attr;
pthread_attr_init(&attr); pthread_attr_init(&attr);
pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED); pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
pthread_create(&pattern, &attr, taosThreadToOpenNewNote, (void*)pNote); pthread_create(&pattern, &attr, taosThreadToOpenNewNote, (void *)pNote);
pthread_attr_destroy(&attr); pthread_attr_destroy(&attr);
} }
pthread_mutex_unlock(&pNote->taosNoteMutex); pthread_mutex_unlock(&pNote->mutex);
return pNote->taosNoteFd; return pNote->fd;
} }
bool taosCheckNoteIsOpen(char *noteName, taosNoteInfo * pNote) static bool taosCheckNoteIsOpen(char *noteName, SNoteObj *pNote) {
{ int32_t fd = open(noteName, O_WRONLY | O_CREAT, S_IRWXU | S_IRWXG | S_IRWXO);
/* if (fd < 0) {
int exist = access(noteName, F_OK); fprintf(stderr, "failed to open note:%s reason:%s\n", noteName, strerror(errno));
if (exist != 0) { return true;
return false; }
}
*/
int fd = open(noteName, O_WRONLY | O_CREAT, S_IRWXU | S_IRWXG | S_IRWXO);
if (fd < 0) {
fprintf(stderr, "failed to open note:%s reason:%s\n", noteName, strerror(errno));
return true;
}
if (taosLockNote(fd, pNote)) { if (taosLockNote(fd, pNote)) {
taosUnLockNote(fd, pNote); taosUnLockNote(fd, pNote);
close(fd); close(fd);
return false; return false;
} } else {
else { close(fd);
close(fd); return true;
return true; }
}
} }
void taosGetNoteName(char *fn, taosNoteInfo * pNote) static void taosGetNoteName(char *fn, SNoteObj *pNote) {
{ if (pNote->fileNum > 1) {
if (pNote->taosNoteFileNum > 1) { for (int32_t i = 0; i < pNote->fileNum; i++) {
for (int i = 0; i < pNote->taosNoteFileNum; i++) { char fileName[NOTE_FILE_NAME_LEN];
char fileName[NOTE_FILE_NAME_LEN];
sprintf(fileName, "%s%d.0", fn, i); sprintf(fileName, "%s%d.0", fn, i);
bool file1open = taosCheckNoteIsOpen(fileName, pNote); bool file1open = taosCheckNoteIsOpen(fileName, pNote);
sprintf(fileName, "%s%d.1", fn, i); sprintf(fileName, "%s%d.1", fn, i);
bool file2open = taosCheckNoteIsOpen(fileName, pNote); bool file2open = taosCheckNoteIsOpen(fileName, pNote);
if (!file1open && !file2open) { if (!file1open && !file2open) {
sprintf(pNote->taosNoteName, "%s%d", fn, i); sprintf(pNote->name, "%s%d", fn, i);
return; return;
} }
}
} }
}
if (strlen(fn) < NOTE_FILE_NAME_LEN) { if (strlen(fn) < NOTE_FILE_NAME_LEN) {
strcpy(pNote->taosNoteName, fn); strcpy(pNote->name, fn);
} }
} }
int taosOpenNoteWithMaxLines(char *fn, int maxLines, int maxNoteNum, taosNoteInfo * pNote) static int32_t taosOpenNoteWithMaxLines(char *fn, int32_t maxLines, int32_t maxNoteNum, SNoteObj *pNote) {
{ char name[NOTE_FILE_NAME_LEN * 2] = {0};
char name[NOTE_FILE_NAME_LEN * 2] = "\0"; int32_t size;
struct stat notestat0, notestat1; struct stat logstat0, logstat1;
int size;
pNote->taosNoteMaxLines = maxLines;
pNote->taosNoteFileNum = maxNoteNum;
taosGetNoteName(fn, pNote);
if (strlen(fn) > NOTE_FILE_NAME_LEN * 2 - 2) { pNote->maxLines = maxLines;
fprintf(stderr, "the len of file name overflow:%s\n", fn); pNote->fileNum = maxNoteNum;
return -1; taosGetNoteName(fn, pNote);
}
if (strlen(fn) < NOTE_FILE_NAME_LEN + 50 - 2) {
strcpy(name, fn); strcpy(name, fn);
strcat(name, ".0"); strcat(name, ".0");
}
bool log0Exist = stat(name, &logstat0) >= 0;
// if none of the note files exist, open 0, if both exists, open the old one if (strlen(fn) < NOTE_FILE_NAME_LEN + 50 - 2) {
if (stat(name, &notestat0) < 0) { strcpy(name, fn);
pNote->taosNoteFlag = 0; strcat(name, ".1");
} else { }
strcpy(name, fn); bool log1Exist = stat(name, &logstat1) >= 0;
strcat(name, ".1");
if (stat(name, &notestat1) < 0) { if (!log0Exist && !log1Exist) {
pNote->taosNoteFlag = 1; pNote->flag = 0;
} } else if (!log1Exist) {
else { pNote->flag = 0;
pNote->taosNoteFlag = (notestat0.st_mtime > notestat1.st_mtime) ? 0 : 1; } else if (!log0Exist) {
} pNote->flag = 1;
} } else {
pNote->flag = (logstat0.st_mtime > logstat1.st_mtime) ? 0 : 1;
char noteName[NOTE_FILE_NAME_LEN * 2] = "\0"; }
sprintf(noteName, "%s.%d", pNote->taosNoteName, pNote->taosNoteFlag);
pthread_mutex_init(&pNote->taosNoteMutex, NULL); char noteName[NOTE_FILE_NAME_LEN * 2] = {0};
sprintf(noteName, "%s.%d", pNote->name, pNote->flag);
umask(0); pthread_mutex_init(&pNote->mutex, NULL);
pNote->taosNoteFd = open(noteName, O_WRONLY | O_CREAT, S_IRWXU | S_IRWXG | S_IRWXO);
umask(0);
if (pNote->taosNoteFd < 0) { pNote->fd = open(noteName, O_WRONLY | O_CREAT, S_IRWXU | S_IRWXG | S_IRWXO);
fprintf(stderr, "failed to open note file:%s reason:%s\n", noteName, strerror(errno));
return -1; if (pNote->fd < 0) {
} fprintf(stderr, "failed to open note file:%s reason:%s\n", noteName, strerror(errno));
taosLockNote(pNote->taosNoteFd, pNote); return -1;
}
// only an estimate for number of lines taosLockNote(pNote->fd, pNote);
struct stat filestat;
if (fstat(pNote->taosNoteFd, &filestat) < 0) { // only an estimate for number of lines
fprintf(stderr, "failed to fstat note file:%s reason:%s\n", noteName, strerror(errno)); struct stat filestat;
return -1; if (fstat(pNote->fd, &filestat) < 0) {
} fprintf(stderr, "failed to fstat note file:%s reason:%s\n", noteName, strerror(errno));
size = (int)filestat.st_size; return -1;
pNote->taosNoteLines = size / 60; }
size = (int32_t)filestat.st_size;
pNote->lines = size / 60;
lseek(pNote->fd, 0, SEEK_END);
return 0;
}
lseek(pNote->taosNoteFd, 0, SEEK_END); void taosNotePrintBuffer(SNoteObj *pNote, char *buffer, int32_t len) {
if (pNote->fd <= 0) return;
taosWrite(pNote->fd, buffer, len);
return 0; if (pNote->maxLines > 0) {
pNote->lines++;
if ((pNote->lines > pNote->maxLines) && (pNote->openInProgress == 0)) taosOpenNewNote(pNote);
}
} }
void taosNotePrint(taosNoteInfo * pNote, const char * const format, ...) void taosNotePrint(SNoteObj *pNote, const char *const format, ...) {
{ va_list argpointer;
va_list argpointer; char buffer[MAX_NOTE_LINE_SIZE + 2];
char buffer[MAX_NOTE_LINE_SIZE+2]; int32_t len;
int len; struct tm Tm, *ptm;
struct tm Tm, *ptm; struct timeval timeSecs;
struct timeval timeSecs; time_t curTime;
time_t curTime;
gettimeofday(&timeSecs, NULL);
gettimeofday(&timeSecs, NULL); curTime = timeSecs.tv_sec;
curTime = timeSecs.tv_sec; ptm = localtime_r(&curTime, &Tm);
ptm = localtime_r(&curTime, &Tm); len = sprintf(buffer, "%02d/%02d %02d:%02d:%02d.%06d 0x%08" PRIx64 " ", ptm->tm_mon + 1, ptm->tm_mday, ptm->tm_hour,
#ifndef LINUX ptm->tm_min, ptm->tm_sec, (int32_t)timeSecs.tv_usec, taosGetPthreadId());
len = sprintf(buffer, "%02d/%02d %02d:%02d:%02d.%06d 0x%lld ", ptm->tm_mon + 1, ptm->tm_mday, ptm->tm_hour, va_start(argpointer, format);
ptm->tm_min, ptm->tm_sec, (int)timeSecs.tv_usec, taosGetPthreadId()); len += vsnprintf(buffer + len, MAX_NOTE_LINE_SIZE - len, format, argpointer);
#else va_end(argpointer);
len = sprintf(buffer, "%02d/%02d %02d:%02d:%02d.%06d %lx ", ptm->tm_mon + 1, ptm->tm_mday, ptm->tm_hour, ptm->tm_min,
ptm->tm_sec, (int)timeSecs.tv_usec, (unsigned long int)pthread_self()); if (len >= MAX_NOTE_LINE_SIZE) len = MAX_NOTE_LINE_SIZE - 2;
#endif
va_start(argpointer, format); buffer[len++] = '\n';
len += vsnprintf(buffer + len, MAX_NOTE_LINE_SIZE - len, format, argpointer); buffer[len] = 0;
va_end(argpointer);
taosNotePrintBuffer(pNote, buffer, len);
if (len >= MAX_NOTE_LINE_SIZE) len = MAX_NOTE_LINE_SIZE - 2;
buffer[len++] = '\n';
buffer[len] = 0;
if (pNote->taosNoteFd >= 0) {
taosWrite(pNote->taosNoteFd, buffer, (unsigned int)len);
if (pNote->taosNoteMaxLines > 0) {
pNote->taosNoteLines++;
if ((pNote->taosNoteLines > pNote->taosNoteMaxLines) && (pNote->taosNoteOpenInProgress == 0))
taosOpenNewNote(pNote);
}
}
} }
void taosCloseNote(taosNoteInfo * pNote) // static void taosCloseNote(SNoteObj *pNote) { taosCloseNoteByFd(pNote->fd, pNote); }
{
taosCloseNoteByFd(pNote->taosNoteFd, pNote);
}
void taosCloseNoteByFd(int fd, taosNoteInfo * pNote) static void taosCloseNoteByFd(int32_t fd, SNoteObj *pNote) {
{ if (fd >= 0) {
if (fd >= 0) { taosUnLockNote(fd, pNote);
taosUnLockNote(fd, pNote); close(fd);
close(fd); }
}
} }
...@@ -442,7 +442,6 @@ static int32_t vnodeProcessTsdbStatus(void *arg, int32_t status, int32_t eno) { ...@@ -442,7 +442,6 @@ static int32_t vnodeProcessTsdbStatus(void *arg, int32_t status, int32_t eno) {
if (status == TSDB_STATUS_COMMIT_START) { if (status == TSDB_STATUS_COMMIT_START) {
pVnode->isCommiting = 1; pVnode->isCommiting = 1;
pVnode->fversion = pVnode->version;
vDebug("vgId:%d, start commit, fver:%" PRIu64 " vver:%" PRIu64, pVnode->vgId, pVnode->fversion, pVnode->version); vDebug("vgId:%d, start commit, fver:%" PRIu64 " vver:%" PRIu64, pVnode->vgId, pVnode->fversion, pVnode->version);
if (!vnodeInInitStatus(pVnode)) { if (!vnodeInInitStatus(pVnode)) {
return walRenew(pVnode->wal); return walRenew(pVnode->wal);
...@@ -451,9 +450,10 @@ static int32_t vnodeProcessTsdbStatus(void *arg, int32_t status, int32_t eno) { ...@@ -451,9 +450,10 @@ static int32_t vnodeProcessTsdbStatus(void *arg, int32_t status, int32_t eno) {
} }
if (status == TSDB_STATUS_COMMIT_OVER) { if (status == TSDB_STATUS_COMMIT_OVER) {
vDebug("vgId:%d, commit over, fver:%" PRIu64 " vver:%" PRIu64, pVnode->vgId, pVnode->fversion, pVnode->version);
pVnode->isCommiting = 0; pVnode->isCommiting = 0;
pVnode->isFull = 0; pVnode->isFull = 0;
pVnode->fversion = pVnode->version;
vDebug("vgId:%d, commit over, fver:%" PRIu64 " vver:%" PRIu64, pVnode->vgId, pVnode->fversion, pVnode->version);
if (!vnodeInInitStatus(pVnode)) { if (!vnodeInInitStatus(pVnode)) {
walRemoveOneOldFile(pVnode->wal); walRemoveOneOldFile(pVnode->wal);
} }
......
properties([pipelineTriggers([githubPush()])])
node {
git url: 'https://github.com/liuyq-617/TDengine'
}
def pre_test(){
catchError(buildResult: 'SUCCESS', stageResult: 'FAILURE') {
sh '''
sudo rmtaos
'''
}
sh '''
cd ${WKC}
rm -rf *
cd ${WK}
git reset --hard
git checkout develop
git pull
cd ${WKC}
rm -rf *
mv ${WORKSPACE}/* .
cd ${WK}
export TZ=Asia/Harbin
date
rm -rf ${WK}/debug
mkdir debug
cd debug
cmake .. > /dev/null
make > /dev/null
make install > /dev/null
cd ${WKC}/tests
'''
return 1
}
pipeline {
agent none
environment{
WK = '/var/lib/jenkins/workspace/TDinternal'
WKC= '/var/lib/jenkins/workspace/TDinternal/community'
}
stages {
stage('Parallel test stage') {
parallel {
stage('python p1') {
agent{label 'p1'}
steps {
pre_test()
sh '''
cd ${WKC}/tests
./test-all.sh p1
date'''
}
}
stage('test_b1') {
agent{label 'master'}
steps {
pre_test()
sh '''
cd ${WKC}/tests
./test-all.sh b1
date'''
}
}
stage('test_crash_gen') {
agent{label "b2"}
steps {
pre_test()
catchError(buildResult: 'SUCCESS', stageResult: 'FAILURE') {
sh '''
cd ${WKC}/tests/pytest
./crash_gen.sh -a -p -t 4 -s 2000
'''
}
catchError(buildResult: 'SUCCESS', stageResult: 'FAILURE') {
sh '''
cd ${WKC}/tests/pytest
./handle_crash_gen_val_log.sh
'''
}
sh '''
date
cd ${WKC}/tests
./test-all.sh b2
date
'''
}
}
stage('test_valgrind') {
agent{label "b3"}
steps {
pre_test()
catchError(buildResult: 'SUCCESS', stageResult: 'FAILURE') {
sh '''
cd ${WKC}/tests/pytest
./valgrind-test.sh 2>&1 > mem-error-out.log
./handle_val_log.sh
'''
}
sh '''
date
cd ${WKC}/tests
./test-all.sh b3
date'''
}
}
stage('python p2'){
agent{label "p2"}
steps{
pre_test()
sh '''
date
cd ${WKC}/tests
./test-all.sh p2
date
'''
}
}
}
}
}
}
...@@ -38,11 +38,16 @@ ...@@ -38,11 +38,16 @@
<artifactId>h2</artifactId> <artifactId>h2</artifactId>
<scope>runtime</scope> <scope>runtime</scope>
</dependency> </dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>druid</artifactId>
<version>1.1.17</version>
</dependency>
<dependency> <dependency>
<groupId>com.taosdata.jdbc</groupId> <groupId>com.taosdata.jdbc</groupId>
<artifactId>taos-jdbcdriver</artifactId> <artifactId>taos-jdbcdriver</artifactId>
<version>2.0.11</version> <version>2.0.14</version>
</dependency> </dependency>
<!-- https://mvnrepository.com/artifact/mysql/mysql-connector-java --> <!-- https://mvnrepository.com/artifact/mysql/mysql-connector-java -->
......
spring: spring:
datasource: datasource:
# driver-class-name: org.h2.Driver
# schema: classpath:db/schema-mysql.sql
# data: classpath:db/data-mysql.sql
# url: jdbc:h2:mem:test
# username: root
# password: test
# driver-class-name: com.mysql.jdbc.Driver
# url: jdbc:mysql://master:3306/test?useSSL=false
# username: root
# password: 123456
driver-class-name: com.taosdata.jdbc.TSDBDriver driver-class-name: com.taosdata.jdbc.TSDBDriver
url: jdbc:TAOS://localhost:6030/mp_test url: jdbc:TAOS://localhost:6030/mp_test
user: root user: root
...@@ -20,6 +8,12 @@ spring: ...@@ -20,6 +8,12 @@ spring:
locale: en_US.UTF-8 locale: en_US.UTF-8
timezone: UTC-8 timezone: UTC-8
druid:
initial-size: 5
min-idle: 5
max-active: 5
mybatis-plus: mybatis-plus:
configuration: configuration:
map-underscore-to-camel-case: false map-underscore-to-camel-case: false
......
...@@ -65,7 +65,18 @@ function runQueryPerfTest { ...@@ -65,7 +65,18 @@ function runQueryPerfTest {
echoInfo "Run Performance Test" echoInfo "Run Performance Test"
cd $WORK_DIR/TDengine/tests/pytest cd $WORK_DIR/TDengine/tests/pytest
python3 query/queryPerformance.py 0 | tee -a $PERFORMANCE_TEST_REPORT python3 query/queryPerformance.py -c $LOCAL_COMMIT | tee -a $PERFORMANCE_TEST_REPORT
python3 insert/insertFromCSVPerformance.py -c $LOCAL_COMMIT | tee -a $PERFORMANCE_TEST_REPORT
yes | taosdemo -c /etc/taosperf/ -d taosdemo_insert_test -t 1000 -n 1000 > taosdemoperf.txt
CREATETABLETIME=`grep 'Spent' taosdemoperf.txt | awk 'NR==1{print $2}'`
INSERTRECORDSTIME=`grep 'Spent' taosdemoperf.txt | awk 'NR==2{print $2}'`
REQUESTSPERSECOND=`grep 'Spent' taosdemoperf.txt | awk 'NR==2{print $13}'`
python3 tools/taosdemoPerformance.py -c $LOCAL_COMMIT -t $CREATETABLETIME -i $INSERTRECORDSTIME -r $REQUESTSPERSECOND | tee -a $PERFORMANCE_TEST_REPORT
[ -f taosdemoperf.txt ] && rm taosdemoperf.txt
} }
......
###################################################################
# Copyright (c) 2016 by TAOS Technologies, Inc.
# All rights reserved.
#
# This file is proprietary and confidential to TAOS Technologies.
# No part of this file may be reproduced, stored, transmitted,
# disclosed or used in any form or by any means other than as
# expressly provided by the written permission from Jianhui Tao
#
###################################################################
# -*- coding: utf-8 -*-
import sys
from util.log import *
from util.cases import *
from util.sql import *
from util.dnodes import *
import taos
if __name__ == "__main__":
logSql = True
deployPath = ""
testCluster = False
valgrind = 0
print("start to execute %s" % __file__)
tdDnodes.init(deployPath)
tdDnodes.setTestCluster(testCluster)
tdDnodes.setValgrind(valgrind)
tdDnodes.stopAll()
tdDnodes.addSimExtraCfg("maxSQLLength", "1048576")
tdDnodes.deploy(1)
tdDnodes.start(1)
host = '127.0.0.1'
tdLog.info("Procedures for tdengine deployed in %s" % (host))
tdCases.logSql(logSql)
print('1')
conn = taos.connect(
host,
config=tdDnodes.getSimCfgPath())
tdSql.init(conn.cursor(), True)
print("==========step1")
print("create table ")
tdSql.execute("create database db")
tdSql.execute("use db")
tdSql.execute("create table t1 (ts timestamp, c1 int,c2 int ,c3 int)")
print("==========step2")
print("insert maxSQLLength data ")
data = 'insert into t1 values'
ts = 1604298064000
i = 0
while ((len(data)<(1024*1024)) & (i < 32767 - 1) ):
data += '(%s,%d,%d,%d)'%(ts+i,i%1000,i%1000,i%1000)
i+=1
tdSql.execute(data)
print("==========step4")
print("insert data batch larger than 32767 ")
i = 0
while ((len(data)<(1024*1024)) & (i < 32767) ):
data += '(%s,%d,%d,%d)'%(ts+i,i%1000,i%1000,i%1000)
i+=1
tdSql.error(data)
print("==========step4")
print("insert data larger than maxSQLLength ")
tdSql.execute("create table t2 (ts timestamp, c1 binary(50))")
data = 'insert into t2 values'
i = 0
while ((len(data)<(1024*1024)) & (i < 32767 - 1 ) ):
data += '(%s,%s)'%(ts+i,'a'*50)
i+=1
tdSql.error(data)
tdSql.close()
tdLog.success("%s successfully executed" % __file__)
...@@ -20,6 +20,7 @@ python3 insert/retentionpolicy.py ...@@ -20,6 +20,7 @@ python3 insert/retentionpolicy.py
python3 ./test.py -f insert/alterTableAndInsert.py python3 ./test.py -f insert/alterTableAndInsert.py
python3 ./test.py -f insert/insertIntoTwoTables.py python3 ./test.py -f insert/insertIntoTwoTables.py
python3 ./test.py -f insert/before_1970.py python3 ./test.py -f insert/before_1970.py
python3 bug2265.py
python3 ./test.py -f table/alter_wal0.py python3 ./test.py -f table/alter_wal0.py
python3 ./test.py -f table/column_name.py python3 ./test.py -f table/column_name.py
...@@ -27,6 +28,8 @@ python3 ./test.py -f table/column_num.py ...@@ -27,6 +28,8 @@ python3 ./test.py -f table/column_num.py
python3 ./test.py -f table/db_table.py python3 ./test.py -f table/db_table.py
python3 ./test.py -f table/create_sensitive.py python3 ./test.py -f table/create_sensitive.py
#python3 ./test.py -f table/tablename-boundary.py #python3 ./test.py -f table/tablename-boundary.py
python3 ./test.py -f table/max_table_length.py
# tag # tag
python3 ./test.py -f tag_lite/filter.py python3 ./test.py -f tag_lite/filter.py
...@@ -164,7 +167,8 @@ python3 ./test.py -f query/bug2117.py ...@@ -164,7 +167,8 @@ python3 ./test.py -f query/bug2117.py
python3 ./test.py -f query/bug2143.py python3 ./test.py -f query/bug2143.py
python3 ./test.py -f query/sliding.py python3 ./test.py -f query/sliding.py
python3 ./test.py -f query/unionAllTest.py python3 ./test.py -f query/unionAllTest.py
python3 ./test.py -f query/bug2281.py
python3 ./test.py -f query/bug2119.py
#stream #stream
python3 ./test.py -f stream/metric_1.py python3 ./test.py -f stream/metric_1.py
python3 ./test.py -f stream/new.py python3 ./test.py -f stream/new.py
......
###################################################################
# Copyright (c) 2016 by TAOS Technologies, Inc.
# All rights reserved.
#
# This file is proprietary and confidential to TAOS Technologies.
# No part of this file may be reproduced, stored, transmitted,
# disclosed or used in any form or by any means other than as
# expressly provided by the written permission from Jianhui Tao
#
###################################################################
# -*- coding: utf-8 -*-
import sys
import taos
import time
import datetime
import csv
import random
import pandas as pd
import argparse
import os.path
class insertFromCSVPerformace:
def __init__(self, commitID, dbName, stbName, branchName):
self.commitID = commitID
self.dbName = dbName
self.stbName = stbName
self.branchName = branchName
self.ts = 1500074556514
self.host = "127.0.0.1"
self.user = "root"
self.password = "taosdata"
self.config = "/etc/taosperf"
self.conn = taos.connect(
self.host,
self.user,
self.password,
self.config)
def writeCSV(self):
with open('test3.csv','w', encoding='utf-8', newline='') as csvFile:
writer = csv.writer(csvFile, dialect='excel')
for i in range(1000000):
newTimestamp = self.ts + random.randint(10000000, 10000000000) + random.randint(1000, 10000000) + random.randint(1, 1000)
d = datetime.datetime.fromtimestamp(newTimestamp / 1000)
dt = str(d.strftime("%Y-%m-%d %H:%M:%S.%f"))
writer.writerow(["'%s'" % dt, random.randint(1, 100), random.uniform(1, 100), random.randint(1, 100), random.randint(1, 100)])
def removCSVHeader(self):
data = pd.read_csv("ordered.csv")
data = data.drop([0])
data.to_csv("ordered.csv", header = False, index = False)
def createTables(self):
cursor = self.conn.cursor()
cursor.execute("create database if not exists %s" % self.dbName)
cursor.execute("use %s" % self.dbName)
cursor.execute("create table if not exists %s(ts timestamp, in_order_time float, out_of_order_time float, commit_id binary(50)) tags(branch binary(50))" % self.stbName)
cursor.execute("create table if not exists %s using %s tags('%s')" % (self.branchName, self.stbName, self.branchName))
cursor.execute("create table if not exists t1(ts timestamp, c1 int, c2 float, c3 int, c4 int)")
cursor.execute("create table if not exists t2(ts timestamp, c1 int, c2 float, c3 int, c4 int)")
cursor.close()
def run(self):
cursor = self.conn.cursor()
cursor.execute("use %s" % self.dbName)
print("==================== CSV insert performance ====================")
totalTime = 0
for i in range(10):
cursor.execute("create table if not exists t1(ts timestamp, c1 int, c2 float, c3 int, c4 int)")
startTime = time.time()
cursor.execute("insert into t1 file 'outoforder.csv'")
totalTime += time.time() - startTime
cursor.execute("drop table if exists t1")
out_of_order_time = (float) (totalTime / 10)
print("Out of Order - Insert time: %f" % out_of_order_time)
totalTime = 0
for i in range(10):
cursor.execute("create table if not exists t2(ts timestamp, c1 int, c2 float, c3 int, c4 int)")
startTime = time.time()
cursor.execute("insert into t2 file 'ordered.csv'")
totalTime += time.time() - startTime
cursor.execute("drop table if exists t2")
in_order_time = (float) (totalTime / 10)
print("In order - Insert time: %f" % in_order_time)
cursor.execute("insert into %s values(now, %f, %f, '%s')" % (self.branchName, in_order_time, out_of_order_time, self.commitID))
if __name__ == '__main__':
parser = argparse.ArgumentParser()
parser.add_argument(
'-c',
'--commit-id',
action='store',
default='null',
type=str,
help='git commit id (default: null)')
parser.add_argument(
'-d',
'--database-name',
action='store',
default='perf',
type=str,
help='Database name to be created (default: perf)')
parser.add_argument(
'-t',
'--stable-name',
action='store',
default='csv_insert',
type=str,
help='Database name to be created (default: csv_insert)')
parser.add_argument(
'-b',
'--branch-name',
action='store',
default='develop',
type=str,
help='branch name (default: develop)')
args = parser.parse_args()
perftest = insertFromCSVPerformace(args.commit_id, args.database_name, args.stable_name, args.branch_name)
perftest.createTables()
perftest.run()
\ No newline at end of file
...@@ -26,6 +26,7 @@ python3 ./test.py -f table/column_num.py ...@@ -26,6 +26,7 @@ python3 ./test.py -f table/column_num.py
python3 ./test.py -f table/db_table.py python3 ./test.py -f table/db_table.py
python3 ./test.py -f table/create_sensitive.py python3 ./test.py -f table/create_sensitive.py
#python3 ./test.py -f table/tablename-boundary.py #python3 ./test.py -f table/tablename-boundary.py
python3 ./test.py -f table/max_table_length.py
# tag # tag
python3 ./test.py -f tag_lite/filter.py python3 ./test.py -f tag_lite/filter.py
...@@ -159,7 +160,9 @@ python3 ./test.py -f query/bug1874.py ...@@ -159,7 +160,9 @@ python3 ./test.py -f query/bug1874.py
python3 ./test.py -f query/bug1875.py python3 ./test.py -f query/bug1875.py
python3 ./test.py -f query/bug1876.py python3 ./test.py -f query/bug1876.py
python3 ./test.py -f query/bug2218.py python3 ./test.py -f query/bug2218.py
python3 ./test.py -f query/bug2281.py
python3 ./test.py -f query/bug2119.py
python3 bug2265.py
#stream #stream
python3 ./test.py -f stream/metric_1.py python3 ./test.py -f stream/metric_1.py
python3 ./test.py -f stream/new.py python3 ./test.py -f stream/new.py
......
###################################################################
# Copyright (c) 2016 by TAOS Technologies, Inc.
# All rights reserved.
#
# This file is proprietary and confidential to TAOS Technologies.
# No part of this file may be reproduced, stored, transmitted,
# disclosed or used in any form or by any means other than as
# expressly provided by the written permission from Jianhui Tao
#
###################################################################
# -*- coding: utf-8 -*-
import sys
from util.log import *
from util.cases import *
from util.sql import *
from util.dnodes import *
class TDTestCase:
def init(self, conn, logSql):
tdLog.debug("start to execute %s" % __file__)
tdSql.init(conn.cursor(), logSql)
def run(self):
tdSql.prepare()
print("==========step1")
print("create table && insert data")
tdSql.execute("create table t1 (ts timestamp, c1 int, c2 float)")
print("==========step2")
print("query percentile from blank table")
tdSql.query('select percentile(c1,1) from t1')
def stop(self):
tdSql.close()
tdLog.success("%s successfully executed" % __file__)
tdCases.addWindows(__file__, TDTestCase())
tdCases.addLinux(__file__, TDTestCase())
\ No newline at end of file
###################################################################
# Copyright (c) 2016 by TAOS Technologies, Inc.
# All rights reserved.
#
# This file is proprietary and confidential to TAOS Technologies.
# No part of this file may be reproduced, stored, transmitted,
# disclosed or used in any form or by any means other than as
# expressly provided by the written permission from Jianhui Tao
#
###################################################################
# -*- coding: utf-8 -*-
import sys
from util.log import *
from util.cases import *
from util.sql import *
from util.dnodes import *
class TDTestCase:
def init(self, conn, logSql):
tdLog.debug("start to execute %s" % __file__)
tdSql.init(conn.cursor(), logSql)
def run(self):
tdSql.prepare()
print("==========step1")
print("create table && insert data")
tdSql.execute("create table t1 (ts timestamp, c1 int, c2 float)")
insertRows = 10
t0 = 1604298064000
tdLog.info("insert %d rows" % (insertRows))
for i in range(insertRows):
ret = tdSql.execute(
"insert into t1 values (%d , %d,%d)" %
(t0+i,i%100,i/2.0))
print("==========step2")
print("query diff && top")
tdSql.error('select diff(c1),top(c2) from t1')
def stop(self):
tdSql.close()
tdLog.success("%s successfully executed" % __file__)
tdCases.addWindows(__file__, TDTestCase())
tdCases.addLinux(__file__, TDTestCase())
\ No newline at end of file
...@@ -16,10 +16,16 @@ import sys ...@@ -16,10 +16,16 @@ import sys
import os import os
import taos import taos
import time import time
import argparse
class taosdemoQueryPerformace: class taosdemoQueryPerformace:
def initConnection(self): def __init__(self, clearCache, commitID, dbName, stbName, tbPerfix):
self.clearCache = clearCache
self.commitID = commitID
self.dbName = dbName
self.stbName = stbName
self.tbPerfix = tbPerfix
self.host = "127.0.0.1" self.host = "127.0.0.1"
self.user = "root" self.user = "root"
self.password = "taosdata" self.password = "taosdata"
...@@ -30,92 +36,109 @@ class taosdemoQueryPerformace: ...@@ -30,92 +36,109 @@ class taosdemoQueryPerformace:
self.password, self.password,
self.config) self.config)
def createPerfTables(self):
cursor = self.conn.cursor()
cursor.execute("create database if not exists %s" % self.dbName)
cursor.execute("use %s" % self.dbName)
cursor.execute("create table if not exists %s(ts timestamp, query_time float, commit_id binary(50)) tags(query_id int, query_sql binary(300))" % self.stbName)
sql = "select count(*) from test.meters"
tableid = 1
cursor.execute("create table if not exists %s%d using %s tags(%d, '%s')" % (self.tbPerfix, tableid, self.stbName, tableid, sql))
sql = "select avg(f1), max(f2), min(f3) from test.meters"
tableid = 2
cursor.execute("create table if not exists %s%d using %s tags(%d, '%s')" % (self.tbPerfix, tableid, self.stbName, tableid, sql))
sql = "select count(*) from test.meters where loc='beijing'"
tableid = 3
cursor.execute("create table if not exists %s%d using %s tags(%d, \"%s\")" % (self.tbPerfix, tableid, self.stbName, tableid, sql))
sql = "select avg(f1), max(f2), min(f3) from test.meters where areaid=10"
tableid = 4
cursor.execute("create table if not exists %s%d using %s tags(%d, '%s')" % (self.tbPerfix, tableid, self.stbName, tableid, sql))
sql = "select avg(f1), max(f2), min(f3) from test.t10 interval(10s)"
tableid = 5
cursor.execute("create table if not exists %s%d using %s tags(%d, '%s')" % (self.tbPerfix, tableid, self.stbName, tableid, sql))
sql = "select last_row(*) from meters"
tableid = 6
cursor.execute("create table if not exists %s%d using %s tags(%d, '%s')" % (self.tbPerfix, tableid, self.stbName, tableid, sql))
sql = "select * from meters"
tableid = 7
cursor.execute("create table if not exists %s%d using %s tags(%d, '%s')" % (self.tbPerfix, tableid, self.stbName, tableid, sql))
sql = "select avg(f1), max(f2), min(f3) from meters where ts <= '2017-07-15 10:40:01.000' and ts <= '2017-07-15 14:00:40.000'"
tableid = 8
cursor.execute("create table if not exists %s%d using %s tags(%d, \"%s\")" % (self.tbPerfix, tableid, self.stbName, tableid, sql))
cursor.close()
def query(self): def query(self):
cursor = self.conn.cursor() cursor = self.conn.cursor()
cursor.execute("use test") print("==================== query performance ====================")
totalTime = 0 cursor.execute("use %s" % self.dbName)
for i in range(100): cursor.execute("select tbname, query_id, query_sql from %s" % self.stbName)
if(sys.argv[1] == '1'):
# root permission is required for data in cursor:
os.system("echo 3 > /proc/sys/vm/drop_caches") table_name = data[0]
startTime = time.time() query_id = data[1]
cursor.execute("select count(*) from test.meters") sql = data[2]
totalTime += time.time() - startTime
print("query time for: select count(*) from test.meters %f seconds" % (totalTime / 100)) totalTime = 0
cursor2 = self.conn.cursor()
totalTime = 0 cursor2.execute("use test")
for i in range(100): for i in range(100):
if(sys.argv[1] == '1'): if(self.clearCache == True):
# root permission is required # root permission is required
os.system("echo 3 > /proc/sys/vm/drop_caches") os.system("echo 3 > /proc/sys/vm/drop_caches")
startTime = time.time()
cursor.execute("select avg(f1), max(f2), min(f3) from test.meters") startTime = time.time()
totalTime += time.time() - startTime cursor2.execute(sql)
print("query time for: select avg(f1), max(f2), min(f3) from test.meters %f seconds" % (totalTime / 100)) totalTime += time.time() - startTime
cursor2.close()
totalTime = 0 print("query time for: %s %f seconds" % (sql, totalTime / 100))
for i in range(100):
if(sys.argv[1] == '1'): cursor3 = self.conn.cursor()
# root permission is required cursor3.execute("insert into %s.%s values(now, %f, '%s')" % (self.dbName, table_name, totalTime / 100, self.commitID))
os.system("echo 3 > /proc/sys/vm/drop_caches")
startTime = time.time() cursor3.close()
cursor.execute("select count(*) from test.meters where loc='beijing'") cursor.close()
totalTime += time.time() - startTime
print("query time for: select count(*) from test.meters where loc='beijing' %f seconds" % (totalTime / 100))
totalTime = 0
for i in range(100):
if(sys.argv[1] == '1'):
# root permission is required
os.system("echo 3 > /proc/sys/vm/drop_caches")
startTime = time.time()
cursor.execute("select avg(f1), max(f2), min(f3) from test.meters where areaid=10")
totalTime += time.time() - startTime
print("query time for: select avg(f1), max(f2), min(f3) from test.meters where areaid=10 %f seconds" % (totalTime / 100))
totalTime = 0
for i in range(100):
if(sys.argv[1] == '1'):
# root permission is required
os.system("echo 3 > /proc/sys/vm/drop_caches")
startTime = time.time()
cursor.execute("select avg(f1), max(f2), min(f3) from test.t10 interval(10s)")
totalTime += time.time() - startTime
print("query time for: select avg(f1), max(f2), min(f3) from test.t10 interval(10s) %f seconds" % (totalTime / 100))
totalTime = 0
for i in range(100):
if(sys.argv[1] == '1'):
# root permission is required
os.system("echo 3 > /proc/sys/vm/drop_caches")
startTime = time.time()
cursor.execute("select last_row(*) from meters")
totalTime += time.time() - startTime
print("query time for: select last_row(*) from meters %f seconds" % (totalTime / 100))
totalTime = 0
for i in range(100):
if(sys.argv[1] == '1'):
# root permission is required
os.system("echo 3 > /proc/sys/vm/drop_caches")
startTime = time.time()
cursor.execute("select * from meters")
totalTime += time.time() - startTime
print("query time for: select * from meters %f seconds" % (totalTime / 100))
totalTime = 0
for i in range(100):
if(sys.argv[1] == '1'):
# root permission is required
os.system("echo 3 > /proc/sys/vm/drop_caches")
startTime = time.time()
cursor.execute("select avg(f1), max(f2), min(f3) from meters where ts <= '2017-07-15 10:40:01.000' and ts <= '2017-07-15 14:00:40.000'")
totalTime += time.time() - startTime
print("query time for: select avg(f1), max(f2), min(f3) from meters where ts <= '2017-07-15 10:40:01.000' and ts <= '2017-07-15 14:00:40.000' %f seconds" % (totalTime / 100))
if __name__ == '__main__': if __name__ == '__main__':
perftest = taosdemoQueryPerformace() parser = argparse.ArgumentParser()
perftest.initConnection() parser.add_argument(
perftest.query() '-r',
\ No newline at end of file '--remove-cache',
action='store_true',
default=False,
help='clear cache before query (default: False)')
parser.add_argument(
'-c',
'--commit-id',
action='store',
default='null',
type=str,
help='git commit id (default: null)')
parser.add_argument(
'-d',
'--database-name',
action='store',
default='perf',
type=str,
help='Database name to be created (default: perf)')
parser.add_argument(
'-t',
'--stable-name',
action='store',
default='query_tb',
type=str,
help='table name to be created (default: query_tb)')
parser.add_argument(
'-p',
'--table-perfix',
action='store',
default='q',
type=str,
help='table name perfix (default: q)')
args = parser.parse_args()
perftest = taosdemoQueryPerformace(args.remove_cache, args.commit_id, args.database_name, args.stable_name, args.table_perfix)
perftest.createPerfTables()
perftest.query()
...@@ -52,7 +52,7 @@ class TDTestCase: ...@@ -52,7 +52,7 @@ class TDTestCase:
tdSql.checkRows(5) tdSql.checkRows(5)
sql = ''' select * from st where loc = 'nchar0' limit 1 union all select * from st where loc = 'nchar1' limit 1 union all select * from st where loc = 'nchar2' limit 1 sql = ''' select * from st where loc = 'nchar0' limit 1 union all select * from st where loc = 'nchar1' limit 1 union all select * from st where loc = 'nchar2' limit 1
union all select * from st where loc = 'nchar3' limit 1 union all select * from st where loc = 'nchar4' limit 1 union all select * from st where loc = 'nchar5'''' union all select * from st where loc = 'nchar3' limit 1 union all select * from st where loc = 'nchar4' limit 1 union all select * from st where loc = 'nchar5' limit 1'''
tdSql.query(sql) tdSql.query(sql)
tdSql.checkRows(6) tdSql.checkRows(6)
......
###################################################################
# Copyright (c) 2016 by TAOS Technologies, Inc.
# All rights reserved.
#
# This file is proprietary and confidential to TAOS Technologies.
# No part of this file may be reproduced, db_test.stored, transmitted,
# disclosed or used in any form or by any means other than as
# expressly provided by the written permission from Jianhui Tao
#
###################################################################
# -*- coding: utf-8 -*-
import sys
import taos
from util.log import tdLog
from util.cases import tdCases
from util.sql import tdSql
class TDTestCase:
def init(self, conn, logSql):
tdLog.debug("start to execute %s" % __file__)
tdSql.init(conn.cursor(), logSql)
def run(self):
tdSql.prepare()
print("==============step1")
tdLog.info("check nchar")
tdSql.error("create database anal (ts timestamp ,i nchar(4094))")
tdSql.execute(
"create table anal (ts timestamp ,i nchar(4093))")
print("==============step2")
tdLog.info("check binary")
tdSql.error("create database anal (ts timestamp ,i binary(16375))")
tdSql.execute(
"create table anal1 (ts timestamp ,i binary(16374))")
print("==============step3")
tdLog.info("check int & binary")
tdSql.error("create table anal2 (ts timestamp ,i binary(16371),j int)")
tdSql.execute("create table anal2 (ts timestamp ,i binary(16370),j int)")
tdSql.execute("create table anal3 (ts timestamp ,i binary(16366), j int, k int)")
def stop(self):
tdSql.close()
tdLog.success("%s successfully executed" % __file__)
tdCases.addWindows(__file__, TDTestCase())
tdCases.addLinux(__file__, TDTestCase())
###################################################################
# Copyright (c) 2016 by TAOS Technologies, Inc.
# All rights reserved.
#
# This file is proprietary and confidential to TAOS Technologies.
# No part of this file may be reproduced, stored, transmitted,
# disclosed or used in any form or by any means other than as
# expressly provided by the written permission from Jianhui Tao
#
###################################################################
# -*- coding: utf-8 -*-
import sys
import taos
import time
import datetime
import csv
import random
import pandas as pd
import argparse
import os.path
class taosdemoPerformace:
def __init__(self, commitID, dbName, createTableTime, insertRecordsTime, recordsPerSecond):
self.commitID = commitID
self.dbName = dbName
self.createTableTime = createTableTime
self.insertRecordsTime = insertRecordsTime
self.recordsPerSecond = recordsPerSecond
self.host = "127.0.0.1"
self.user = "root"
self.password = "taosdata"
self.config = "/etc/taosperf"
self.conn = taos.connect(
self.host,
self.user,
self.password,
self.config)
def createTablesAndStoreData(self):
cursor = self.conn.cursor()
cursor.execute("create database if not exists %s" % self.dbName)
cursor.execute("use %s" % self.dbName)
cursor.execute("create table if not exists taosdemo_perf (ts timestamp, create_table_time float, insert_records_time float, records_per_second float, commit_id binary(50))")
print("==================== taosdemo performance ====================")
print("create tables time: %f" % self.createTableTime)
print("insert records time: %f" % self.insertRecordsTime)
print("records per second: %f" % self.recordsPerSecond)
cursor.execute("insert into taosdemo_perf values(now, %f, %f, %f, '%s')" % (self.createTableTime, self.insertRecordsTime, self.recordsPerSecond, self.commitID))
cursor.execute("drop database if exists taosdemo_insert_test")
cursor.close()
if __name__ == '__main__':
parser = argparse.ArgumentParser()
parser.add_argument(
'-c',
'--commit-id',
action='store',
type=str,
help='git commit id (default: null)')
parser.add_argument(
'-d',
'--database-name',
action='store',
default='perf',
type=str,
help='Database name to be created (default: perf)')
parser.add_argument(
'-t',
'--create-table',
action='store',
type=float,
help='create table time')
parser.add_argument(
'-i',
'--insert-records',
action='store',
type=float,
help='insert records time')
parser.add_argument(
'-r',
'---records-per-second',
action='store',
type=float,
help='records per request')
args = parser.parse_args()
perftest = taosdemoPerformace(args.commit_id, args.database_name, args.create_table, args.insert_records, args.records_per_second)
perftest.createTablesAndStoreData()
\ No newline at end of file
...@@ -24,15 +24,17 @@ ...@@ -24,15 +24,17 @@
#define GREEN "\033[1;32m" #define GREEN "\033[1;32m"
#define NC "\033[0m" #define NC "\033[0m"
int32_t capacity = 100000; int32_t capacity = 128;
int32_t q1Times = 1; int32_t q1Times = 10;
int32_t q2Times = 1; int32_t q2Times = 10;
int32_t keyNum = 100000; int32_t keyNum = 100000;
int32_t printInterval = 10000; int32_t printInterval = 1000;
void * hashHandle;
pthread_t thread;
typedef struct HashTestRow { typedef struct HashTestRow {
int32_t size; int32_t keySize;
void * ptr; char key[100];
} HashTestRow; } HashTestRow;
void shellParseArgument(int argc, char *argv[]); void shellParseArgument(int argc, char *argv[]);
...@@ -40,7 +42,7 @@ void shellParseArgument(int argc, char *argv[]); ...@@ -40,7 +42,7 @@ void shellParseArgument(int argc, char *argv[]);
void testHashPerformance() { void testHashPerformance() {
int64_t initialMs = taosGetTimestampMs(); int64_t initialMs = taosGetTimestampMs();
_hash_fn_t hashFp = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY); _hash_fn_t hashFp = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY);
void * hashHandle = taosHashInit(capacity, hashFp, true); hashHandle = taosHashInit(128, hashFp, true, HASH_NO_LOCK);
int64_t startMs = taosGetTimestampMs(); int64_t startMs = taosGetTimestampMs();
float seconds = (startMs - initialMs) / 1000.0; float seconds = (startMs - initialMs) / 1000.0;
...@@ -48,17 +50,25 @@ void testHashPerformance() { ...@@ -48,17 +50,25 @@ void testHashPerformance() {
for (int32_t t = 1; t <= keyNum; ++t) { for (int32_t t = 1; t <= keyNum; ++t) {
HashTestRow row = {0}; HashTestRow row = {0};
char key[100] = {0}; row.keySize = sprintf(row.key, "0.db.st%d", t);
int32_t keySize = sprintf(key, "0.db.st%d", t);
for (int32_t q = 0; q < q1Times; q++) { for (int32_t q = 0; q < q1Times; q++) {
taosHashGet(hashHandle, &key, keySize); taosHashGet(hashHandle, row.key, row.keySize);
} }
taosHashPut(hashHandle, key, keySize, &row, sizeof(HashTestRow)); taosHashPut(hashHandle, row.key, row.keySize, &row, sizeof(HashTestRow));
for (int32_t q = 0; q < q2Times; q++) { for (int32_t q = 0; q < q2Times; q++) {
taosHashGet(hashHandle, &key, keySize); taosHashGet(hashHandle, row.key, row.keySize);
}
// test iterator
{
HashTestRow *row = taosHashIterate(hashHandle, NULL);
while (row) {
taosHashGet(hashHandle, row->key, row->keySize);
row = taosHashIterate(hashHandle, row);
}
} }
if (t % printInterval == 0) { if (t % printInterval == 0) {
...@@ -80,9 +90,35 @@ void testHashPerformance() { ...@@ -80,9 +90,35 @@ void testHashPerformance() {
taosHashCleanup(hashHandle); taosHashCleanup(hashHandle);
} }
void *multiThreadFunc(void *param) {
for (int i = 0; i < 100; ++i) {
taosMsleep(1000);
HashTestRow *row = taosHashIterate(hashHandle, NULL);
while (row) {
taosHashGet(hashHandle, row->key, row->keySize);
row = taosHashIterate(hashHandle, row);
}
int64_t hashSize = taosHashGetSize(hashHandle);
pPrint("i:%d hashSize:%ld", i, hashSize);
}
return NULL;
}
void multiThreadTest() {
pthread_attr_t thattr;
pthread_attr_init(&thattr);
pthread_attr_setdetachstate(&thattr, PTHREAD_CREATE_JOINABLE);
// Start threads to write
pthread_create(&thread, &thattr, multiThreadFunc, NULL);
}
int main(int argc, char *argv[]) { int main(int argc, char *argv[]) {
shellParseArgument(argc, argv); shellParseArgument(argc, argv);
multiThreadTest();
testHashPerformance(); testHashPerformance();
pthread_join(thread, NULL);
} }
void printHelp() { void printHelp() {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册