提交 33617806 编写于 作者: L liuyq-617

Merge branch 'develop' into test/jenkins

......@@ -32,7 +32,7 @@ ELSEIF (TD_WINDOWS)
#INSTALL(TARGETS taos RUNTIME DESTINATION driver)
#INSTALL(TARGETS shell RUNTIME DESTINATION .)
IF (TD_MVN_INSTALLED)
INSTALL(FILES ${LIBRARY_OUTPUT_PATH}/taos-jdbcdriver-2.0.14-dist.jar DESTINATION connector/jdbc)
INSTALL(FILES ${LIBRARY_OUTPUT_PATH}/taos-jdbcdriver-2.0.15-dist.jar DESTINATION connector/jdbc)
ENDIF ()
ELSEIF (TD_DARWIN)
SET(TD_MAKE_INSTALL_SH "${TD_COMMUNITY_DIR}/packaging/tools/make_install.sh")
......
......@@ -382,6 +382,7 @@ typedef struct SSqlObj {
typedef struct SSqlStream {
SSqlObj *pSql;
const char* dstTable;
uint32_t streamId;
char listed;
bool isProject;
......@@ -408,6 +409,8 @@ typedef struct SSqlStream {
struct SSqlStream *prev, *next;
} SSqlStream;
void tscSetStreamDestTable(SSqlStream* pStream, const char* dstTable);
int32_t tscInitRpc(const char *user, const char *secret, void** pDnodeConn);
void tscInitMsgsFp();
......
......@@ -262,6 +262,11 @@ int tscBuildQueryStreamDesc(void *pMsg, STscObj *pObj) {
SSqlStream *pStream = pObj->streamList;
while (pStream) {
tstrncpy(pSdesc->sql, pStream->pSql->sqlstr, sizeof(pSdesc->sql));
if (pStream->dstTable == NULL) {
pSdesc->dstTable[0] = 0;
} else {
tstrncpy(pSdesc->dstTable, pStream->dstTable, sizeof(pSdesc->dstTable));
}
pSdesc->streamId = htonl(pStream->streamId);
pSdesc->num = htobe64(pStream->num);
......
......@@ -3282,7 +3282,12 @@ static int32_t extractColumnFilterInfo(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SC
((pSchema->type == TSDB_DATA_TYPE_BINARY || pSchema->type == TSDB_DATA_TYPE_NCHAR) ? 1 : 0);
if (pColFilter->filterstr) {
if (pExpr->nSQLOptr != TK_EQ && pExpr->nSQLOptr != TK_NE && pExpr->nSQLOptr != TK_LIKE) {
if (pExpr->nSQLOptr != TK_EQ
&& pExpr->nSQLOptr != TK_NE
&& pExpr->nSQLOptr != TK_ISNULL
&& pExpr->nSQLOptr != TK_NOTNULL
&& pExpr->nSQLOptr != TK_LIKE
) {
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg2);
}
} else {
......
......@@ -535,6 +535,10 @@ static void tscCreateStream(void *param, TAOS_RES *res, int code) {
pStream, pTableMetaInfo->name, pStream->interval.interval, pStream->interval.sliding, starttime, pSql->sqlstr);
}
void tscSetStreamDestTable(SSqlStream* pStream, const char* dstTable) {
pStream->dstTable = dstTable;
}
TAOS_STREAM *taos_open_stream(TAOS *taos, const char *sqlstr, void (*fp)(void *param, TAOS_RES *, TAOS_ROW row),
int64_t stime, void *param, void (*callback)(void *)) {
STscObj *pObj = (STscObj *)taos;
......
......@@ -203,10 +203,10 @@ int32_t tsVersion = 0;
// log
int32_t tsNumOfLogLines = 10000000;
int32_t mDebugFlag = 135;
int32_t sdbDebugFlag = 135;
int32_t mDebugFlag = 131;
int32_t sdbDebugFlag = 131;
int32_t dDebugFlag = 135;
int32_t vDebugFlag = 135;
int32_t vDebugFlag = 131;
int32_t cDebugFlag = 131;
int32_t jniDebugFlag = 131;
int32_t odbcDebugFlag = 131;
......@@ -220,7 +220,7 @@ int32_t debugFlag = 0;
int32_t sDebugFlag = 135;
int32_t wDebugFlag = 135;
int32_t tsdbDebugFlag = 131;
int32_t cqDebugFlag = 135;
int32_t cqDebugFlag = 131;
int32_t (*monStartSystemFp)() = NULL;
void (*monStopSystemFp)() = NULL;
......@@ -416,7 +416,7 @@ static void doInitGlobalConfig(void) {
cfg.option = "arbitrator";
cfg.ptr = tsArbitrator;
cfg.valType = TAOS_CFG_VTYPE_STRING;
cfg.cfgType = TSDB_CFG_CTYPE_B_CONFIG | TSDB_CFG_CTYPE_B_CLIENT;
cfg.cfgType = TSDB_CFG_CTYPE_B_CONFIG | TSDB_CFG_CTYPE_B_SHOW | TSDB_CFG_CTYPE_B_CLIENT;
cfg.minValue = 0;
cfg.maxValue = 0;
cfg.ptrLength = TSDB_EP_LEN;
......@@ -901,7 +901,7 @@ static void doInitGlobalConfig(void) {
cfg.option = "timezone";
cfg.ptr = tsTimezone;
cfg.valType = TAOS_CFG_VTYPE_STRING;
cfg.cfgType = TSDB_CFG_CTYPE_B_CONFIG | TSDB_CFG_CTYPE_B_CLIENT;
cfg.cfgType = TSDB_CFG_CTYPE_B_CONFIG | TSDB_CFG_CTYPE_B_SHOW | TSDB_CFG_CTYPE_B_CLIENT;
cfg.minValue = 0;
cfg.maxValue = 0;
cfg.ptrLength = tListLen(tsTimezone);
......@@ -911,7 +911,7 @@ static void doInitGlobalConfig(void) {
cfg.option = "locale";
cfg.ptr = tsLocale;
cfg.valType = TAOS_CFG_VTYPE_STRING;
cfg.cfgType = TSDB_CFG_CTYPE_B_CONFIG | TSDB_CFG_CTYPE_B_CLIENT;
cfg.cfgType = TSDB_CFG_CTYPE_B_CONFIG | TSDB_CFG_CTYPE_B_SHOW | TSDB_CFG_CTYPE_B_CLIENT;
cfg.minValue = 0;
cfg.maxValue = 0;
cfg.ptrLength = tListLen(tsLocale);
......@@ -921,7 +921,7 @@ static void doInitGlobalConfig(void) {
cfg.option = "charset";
cfg.ptr = tsCharset;
cfg.valType = TAOS_CFG_VTYPE_STRING;
cfg.cfgType = TSDB_CFG_CTYPE_B_CONFIG | TSDB_CFG_CTYPE_B_CLIENT;
cfg.cfgType = TSDB_CFG_CTYPE_B_CONFIG | TSDB_CFG_CTYPE_B_SHOW | TSDB_CFG_CTYPE_B_CLIENT;
cfg.minValue = 0;
cfg.maxValue = 0;
cfg.ptrLength = tListLen(tsCharset);
......
......@@ -8,7 +8,7 @@ IF (TD_MVN_INSTALLED)
ADD_CUSTOM_COMMAND(OUTPUT ${JDBC_CMD_NAME}
POST_BUILD
COMMAND mvn -Dmaven.test.skip=true install -f ${CMAKE_CURRENT_SOURCE_DIR}/pom.xml
COMMAND ${CMAKE_COMMAND} -E copy ${CMAKE_CURRENT_SOURCE_DIR}/target/taos-jdbcdriver-2.0.14-dist.jar ${LIBRARY_OUTPUT_PATH}
COMMAND ${CMAKE_COMMAND} -E copy ${CMAKE_CURRENT_SOURCE_DIR}/target/taos-jdbcdriver-2.0.15-dist.jar ${LIBRARY_OUTPUT_PATH}
COMMAND mvn -Dmaven.test.skip=true clean -f ${CMAKE_CURRENT_SOURCE_DIR}/pom.xml
COMMENT "build jdbc driver")
ADD_CUSTOM_TARGET(${JDBC_TARGET_NAME} ALL WORKING_DIRECTORY ${EXECUTABLE_OUTPUT_PATH} DEPENDS ${JDBC_CMD_NAME})
......
......@@ -5,7 +5,7 @@
<groupId>com.taosdata.jdbc</groupId>
<artifactId>taos-jdbcdriver</artifactId>
<version>2.0.14</version>
<version>2.0.15</version>
<packaging>jar</packaging>
<name>JDBCDriver</name>
......@@ -36,7 +36,6 @@
</developer>
</developers>
<dependencies>
<dependency>
<groupId>commons-logging</groupId>
......
......@@ -3,7 +3,7 @@
<modelVersion>4.0.0</modelVersion>
<groupId>com.taosdata.jdbc</groupId>
<artifactId>taos-jdbcdriver</artifactId>
<version>2.0.14</version>
<version>2.0.15</version>
<packaging>jar</packaging>
<name>JDBCDriver</name>
<url>https://github.com/taosdata/TDengine/tree/master/src/connector/jdbc</url>
......
......@@ -81,7 +81,7 @@ public class TSDBStatement implements Statement {
}
if (!this.connector.isUpdateQuery(pSql)) {
TSDBResultSet res = new TSDBResultSet(this.connector, resultSetPointer);
TSDBResultSet res = new TSDBResultSet(this.connector, resultSetPointer);
res.setBatchFetch(this.connection.getBatchFetch());
return res;
} else {
......@@ -125,7 +125,8 @@ public class TSDBStatement implements Statement {
}
public int getMaxFieldSize() throws SQLException {
throw new SQLException(TSDBConstants.UNSUPPORT_METHOD_EXCEPTIONZ_MSG);
return 0;
// throw new SQLException(TSDBConstants.UNSUPPORT_METHOD_EXCEPTIONZ_MSG);
}
public void setMaxFieldSize(int max) throws SQLException {
......@@ -218,7 +219,8 @@ public class TSDBStatement implements Statement {
}
public int getFetchDirection() throws SQLException {
throw new SQLException(TSDBConstants.UNSUPPORT_METHOD_EXCEPTIONZ_MSG);
return ResultSet.FETCH_FORWARD;
// throw new SQLException(TSDBConstants.UNSUPPORT_METHOD_EXCEPTIONZ_MSG);
}
/*
......
......@@ -57,6 +57,7 @@ typedef struct SCqObj {
uint64_t uid;
int32_t tid; // table ID
int32_t rowSize; // bytes of a row
char * dstTable;
char * sqlStr; // SQL string
STSchema * pSchema; // pointer to schema array
void * pStream;
......@@ -185,7 +186,7 @@ void cqStop(void *handle) {
pthread_mutex_unlock(&pContext->mutex);
}
void *cqCreate(void *handle, uint64_t uid, int32_t tid, char *sqlStr, STSchema *pSchema) {
void *cqCreate(void *handle, uint64_t uid, int32_t sid, const char* dstTable, char *sqlStr, STSchema *pSchema) {
if (tsEnableStream == 0) {
return NULL;
}
......@@ -195,9 +196,11 @@ void *cqCreate(void *handle, uint64_t uid, int32_t tid, char *sqlStr, STSchema *
if (pObj == NULL) return NULL;
pObj->uid = uid;
pObj->tid = tid;
pObj->sqlStr = malloc(strlen(sqlStr)+1);
strcpy(pObj->sqlStr, sqlStr);
pObj->tid = sid;
if (dstTable != NULL) {
pObj->dstTable = strdup(dstTable);
}
pObj->sqlStr = strdup(sqlStr);
pObj->pSchema = tdDupSchema(pSchema);
pObj->rowSize = schemaTLen(pSchema);
......@@ -247,6 +250,7 @@ void cqDrop(void *handle) {
cInfo("vgId:%d, id:%d CQ:%s is dropped", pContext->vgId, pObj->tid, pObj->sqlStr);
tdFreeSchema(pObj->pSchema);
free(pObj->dstTable);
free(pObj->sqlStr);
free(pObj);
......@@ -292,6 +296,7 @@ static void cqCreateStream(SCqContext *pContext, SCqObj *pObj) {
if (pObj->pStream == NULL) {
pObj->pStream = taos_open_stream(pContext->dbConn, pObj->sqlStr, cqProcessStreamRes, 0, pObj, NULL);
if (pObj->pStream) {
tscSetStreamDestTable(pObj->pStream, pObj->dstTable);
pContext->num++;
cInfo("vgId:%d, id:%d CQ:%s is openned", pContext->vgId, pObj->tid, pObj->sqlStr);
} else {
......
......@@ -70,7 +70,7 @@ int main(int argc, char *argv[]) {
tdDestroyTSchemaBuilder(&schemaBuilder);
for (int sid =1; sid<10; ++sid) {
cqCreate(pCq, sid, sid, "select avg(speed) from demo.t1 sliding(1s) interval(5s)", pSchema);
cqCreate(pCq, sid, sid, NULL, "select avg(speed) from demo.t1 sliding(1s) interval(5s)", pSchema);
}
tdFreeSchema(pSchema);
......
......@@ -36,6 +36,14 @@ extern int32_t dDebugFlag;
#define dDebug(...) { if (dDebugFlag & DEBUG_DEBUG) { taosPrintLog("DND ", dDebugFlag, __VA_ARGS__); }}
#define dTrace(...) { if (dDebugFlag & DEBUG_TRACE) { taosPrintLog("DND ", dDebugFlag, __VA_ARGS__); }}
typedef enum {
TSDB_RUN_STATUS_INITIALIZE,
TSDB_RUN_STATUS_RUNING,
TSDB_RUN_STATUS_STOPPED
} SRunStatus;
SRunStatus dnodeGetRunStatus();
#ifdef __cplusplus
}
#endif
......
......@@ -22,8 +22,8 @@ extern "C" {
#include "dnodeInt.h"
int32_t dnodeInitModules();
void dnodeStartModules();
void dnodeCleanupModules();
bool dnodeStartMnode(SMInfos *pMinfos);
void dnodeProcessModuleStatus(uint32_t moduleStatus);
#ifdef __cplusplus
......
......@@ -23,8 +23,8 @@ extern "C" {
int32_t dnodeInitVnodes();
void dnodeCleanupVnodes();
int32_t dnodeInitTimer();
void dnodeCleanupTimer();
int32_t dnodeInitStatusTimer();
void dnodeCleanupStatusTimer();
void dnodeSendStatusMsgToMnode();
#ifdef __cplusplus
......
......@@ -237,7 +237,7 @@ PRASE_EPS_OVER:
dnodeUpdateEp(dnodeGetDnodeId(), tsLocalEp, tsLocalFqdn, &tsServerPort);
#else
if (dnodeCheckEpChanged(dnodeGetDnodeId(), tsLocalEp)) {
dError("dnode:%d, localEp is changed to %s in dnodeEps.json and need reconfigured", dnodeGetDnodeId(), tsLocalEp);
dError("dnode:%d, localEp is different from %s in dnodeEps.json and need reconfigured", dnodeGetDnodeId(), tsLocalEp);
return -1;
}
#endif
......
......@@ -121,7 +121,7 @@ void dnodeDispatchToMWriteQueue(SRpcMsg *pMsg) {
dnodeSendRedirectMsg(pMsg, true);
} else {
SMnodeMsg *pWrite = mnodeCreateMsg(pMsg);
dDebug("msg:%p, app:%p type:%s is put into mwrite queue:%p", pWrite, pWrite->rpcMsg.ahandle,
dTrace("msg:%p, app:%p type:%s is put into mwrite queue:%p", pWrite, pWrite->rpcMsg.ahandle,
taosMsg[pWrite->rpcMsg.msgType], tsMWriteQueue);
taosWriteQitem(tsMWriteQueue, TAOS_QTYPE_RPC, pWrite);
}
......@@ -130,7 +130,7 @@ void dnodeDispatchToMWriteQueue(SRpcMsg *pMsg) {
}
static void dnodeFreeMWriteMsg(SMnodeMsg *pWrite) {
dDebug("msg:%p, app:%p type:%s is freed from mwrite queue:%p", pWrite, pWrite->rpcMsg.ahandle,
dTrace("msg:%p, app:%p type:%s is freed from mwrite queue:%p", pWrite, pWrite->rpcMsg.ahandle,
taosMsg[pWrite->rpcMsg.msgType], tsMWriteQueue);
mnodeCleanupMsg(pWrite);
......
......@@ -17,6 +17,7 @@
#include "os.h"
#include "taos.h"
#include "tnote.h"
#include "ttimer.h"
#include "tconfig.h"
#include "tfile.h"
#include "twal.h"
......@@ -39,6 +40,7 @@
#include "dnodeShell.h"
#include "dnodeTelemetry.h"
void *tsDnodeTmr = NULL;
static SRunStatus tsRunStatus = TSDB_RUN_STATUS_STOPPED;
static int32_t dnodeInitStorage();
......@@ -68,8 +70,8 @@ static SStep tsDnodeSteps[] = {
{"dnode-server", dnodeInitServer, dnodeCleanupServer},
{"dnode-vnodes", dnodeInitVnodes, dnodeCleanupVnodes},
{"dnode-modules", dnodeInitModules, dnodeCleanupModules},
{"dnode-tmr", dnodeInitTimer, dnodeCleanupTimer},
{"dnode-shell", dnodeInitShell, dnodeCleanupShell},
{"dnode-statustmr", dnodeInitStatusTimer,dnodeCleanupStatusTimer},
{"dnode-telemetry", dnodeInitTelemetry, dnodeCleanupTelemetry},
};
......@@ -91,6 +93,23 @@ static int32_t dnodeInitComponents() {
return dnodeStepInit(tsDnodeSteps, stepSize);
}
static int32_t dnodeInitTmr() {
tsDnodeTmr = taosTmrInit(100, 200, 60000, "DND-DM");
if (tsDnodeTmr == NULL) {
dError("failed to init dnode timer");
return -1;
}
return 0;
}
static void dnodeCleanupTmr() {
if (tsDnodeTmr != NULL) {
taosTmrCleanUp(tsDnodeTmr);
tsDnodeTmr = NULL;
}
}
int32_t dnodeInitSystem() {
dnodeSetRunStatus(TSDB_RUN_STATUS_INITIALIZE);
tscEmbedded = 1;
......@@ -100,6 +119,7 @@ int32_t dnodeInitSystem() {
taosReadGlobalLogCfg();
taosSetCoreDump();
taosInitNotes();
dnodeInitTmr();
signal(SIGPIPE, SIG_IGN);
if (dnodeCreateDir(tsLogDir) < 0) {
......@@ -125,7 +145,6 @@ int32_t dnodeInitSystem() {
return -1;
}
dnodeStartModules();
dnodeSetRunStatus(TSDB_RUN_STATUS_RUNING);
dInfo("TDengine is initialized successfully");
......@@ -136,6 +155,7 @@ int32_t dnodeInitSystem() {
void dnodeCleanUpSystem() {
if (dnodeGetRunStatus() != TSDB_RUN_STATUS_STOPPED) {
dnodeSetRunStatus(TSDB_RUN_STATUS_STOPPED);
dnodeCleanupTmr();
dnodeCleanupComponents();
taos_cleanup();
taosCloseLog();
......
......@@ -97,6 +97,20 @@ void dnodeCleanupModules() {
}
}
static int32_t dnodeStartModules() {
for (EModuleType module = 1; module < TSDB_MOD_MAX; ++module) {
if (tsModule[module].enable && tsModule[module].startFp) {
int32_t code = (*tsModule[module].startFp)();
if (code != 0) {
dError("failed to start module:%s, code:%d", tsModule[module].name, code);
return code;
}
}
}
return 0;
}
int32_t dnodeInitModules() {
dnodeAllocModules();
......@@ -110,17 +124,7 @@ int32_t dnodeInitModules() {
}
dInfo("dnode modules is initialized");
return 0;
}
void dnodeStartModules() {
for (EModuleType module = 1; module < TSDB_MOD_MAX; ++module) {
if (tsModule[module].enable && tsModule[module].startFp) {
if ((*tsModule[module].startFp)() != 0) {
dError("failed to start module:%s", tsModule[module].name);
}
}
}
return dnodeStartModules();
}
void dnodeProcessModuleStatus(uint32_t moduleStatus) {
......
......@@ -96,7 +96,7 @@ static void dnodeProcessReqMsgFromDnode(SRpcMsg *pMsg, SRpcEpSet *pEpSet) {
rspMsg.code = TSDB_CODE_APP_NOT_READY;
rpcSendResponse(&rspMsg);
rpcFreeCont(pMsg->pCont);
dDebug("RPC %p, msg:%s is ignored since dnode not running", pMsg->handle, taosMsg[pMsg->msgType]);
dTrace("RPC %p, msg:%s is ignored since dnode not running", pMsg->handle, taosMsg[pMsg->msgType]);
return;
}
......@@ -151,7 +151,7 @@ void dnodeCleanupClient() {
static void dnodeProcessRspFromDnode(SRpcMsg *pMsg, SRpcEpSet *pEpSet) {
if (dnodeGetRunStatus() == TSDB_RUN_STATUS_STOPPED) {
if (pMsg == NULL || pMsg->pCont == NULL) return;
dDebug("msg:%p is ignored since dnode is stopping", pMsg);
dTrace("msg:%p is ignored since dnode is stopping", pMsg);
rpcFreeCont(pMsg->pCont);
return;
}
......
......@@ -126,14 +126,14 @@ static void *dnodeProcessMgmtQueue(void *param) {
}
pMsg = &pMgmt->rpcMsg;
dDebug("msg:%p, ahandle:%p type:%s will be processed", pMgmt, pMsg->ahandle, taosMsg[pMsg->msgType]);
dTrace("msg:%p, ahandle:%p type:%s will be processed", pMgmt, pMsg->ahandle, taosMsg[pMsg->msgType]);
if (dnodeProcessMgmtMsgFp[pMsg->msgType]) {
rsp.code = (*dnodeProcessMgmtMsgFp[pMsg->msgType])(pMsg);
} else {
rsp.code = TSDB_CODE_DND_MSG_NOT_PROCESSED;
}
dDebug("msg:%p, is processed, code:0x%x", pMgmt, rsp.code);
dTrace("msg:%p, is processed, code:0x%x", pMgmt, rsp.code);
if (rsp.code != TSDB_CODE_DND_ACTION_IN_PROGRESS) {
rsp.handle = pMsg->handle;
rsp.pCont = NULL;
......
......@@ -30,39 +30,28 @@ typedef struct {
int32_t * vnodeList;
} SOpenVnodeThread;
void * tsDnodeTmr = NULL;
extern void * tsDnodeTmr;
static void * tsStatusTimer = NULL;
static uint32_t tsRebootTime = 0;
static void dnodeSendStatusMsg(void *handle, void *tmrId);
static void dnodeProcessStatusRsp(SRpcMsg *pMsg);
int32_t dnodeInitTimer() {
tsDnodeTmr = taosTmrInit(100, 200, 60000, "DND-DM");
if (tsDnodeTmr == NULL) {
dError("failed to init dnode timer");
return -1;
}
int32_t dnodeInitStatusTimer() {
dnodeAddClientRspHandle(TSDB_MSG_TYPE_DM_STATUS_RSP, dnodeProcessStatusRsp);
tsRebootTime = taosGetTimestampSec();
taosTmrReset(dnodeSendStatusMsg, 500, NULL, tsDnodeTmr, &tsStatusTimer);
dInfo("dnode timer is initialized");
dInfo("dnode status timer is initialized");
return TSDB_CODE_SUCCESS;
}
void dnodeCleanupTimer() {
void dnodeCleanupStatusTimer() {
if (tsStatusTimer != NULL) {
taosTmrStopA(&tsStatusTimer);
tsStatusTimer = NULL;
}
if (tsDnodeTmr != NULL) {
taosTmrCleanUp(tsDnodeTmr);
tsDnodeTmr = NULL;
}
}
static int32_t dnodeGetVnodeList(int32_t vnodeList[], int32_t *numOfVnodes) {
......
......@@ -29,13 +29,6 @@ typedef struct {
int32_t httpReqNum;
} SStatisInfo;
typedef enum {
TSDB_RUN_STATUS_INITIALIZE,
TSDB_RUN_STATUS_RUNING,
TSDB_RUN_STATUS_STOPPED
} SRunStatus;
SRunStatus dnodeGetRunStatus();
SStatisInfo dnodeGetStatisInfo();
bool dnodeIsFirstDeploy();
......
......@@ -787,6 +787,7 @@ typedef struct {
typedef struct {
char sql[TSDB_SHOW_SQL_LEN];
char dstTable[TSDB_TABLE_NAME_LEN];
uint32_t streamId;
int64_t num; // number of computing/cycles
int64_t useconds;
......
......@@ -42,7 +42,7 @@ void cqStart(void *handle);
void cqStop(void *handle);
// cqCreate is called by TSDB to start an instance of CQ
void *cqCreate(void *handle, uint64_t uid, int32_t sid, char *sqlStr, STSchema *pSchema);
void *cqCreate(void *handle, uint64_t uid, int32_t sid, const char* dstTable, char *sqlStr, STSchema *pSchema);
// cqDrop is called by TSDB to stop an instance of CQ, handle is the return value of cqCreate
void cqDrop(void *handle);
......
......@@ -48,7 +48,7 @@ typedef struct {
void *cqH;
int (*notifyStatus)(void *, int status, int eno);
int (*eventCallBack)(void *);
void *(*cqCreateFunc)(void *handle, uint64_t uid, int sid, char *sqlStr, STSchema *pSchema);
void *(*cqCreateFunc)(void *handle, uint64_t uid, int32_t sid, const char* dstTable, char *sqlStr, STSchema *pSchema);
void (*cqDropFunc)(void *handle);
} STsdbAppH;
......
......@@ -45,6 +45,10 @@ void printHelp() {
printf("%s%s%s\n", indent, indent, "Database to use when connecting to the server.");
printf("%s%s\n", indent, "-t");
printf("%s%s%s\n", indent, indent, "Time zone of the shell, default is local.");
printf("%s%s\n", indent, "-n");
printf("%s%s%s\n", indent, indent, "Net role when network connectivity test, default is startup, options: client|server|rpc|startup.");
printf("%s%s\n", indent, "-l");
printf("%s%s%s\n", indent, indent, "Packet length used for net test, default is 1000 bytes.");
exit(EXIT_SUCCESS);
}
......@@ -137,6 +141,24 @@ void shellParseArgument(int argc, char *argv[], SShellArguments *arguments) {
exit(EXIT_FAILURE);
}
}
// For time zone
else if (strcmp(argv[i], "-n") == 0) {
if (i < argc - 1) {
arguments->netTestRole = argv[++i];
} else {
fprintf(stderr, "option -n requires an argument\n");
exit(EXIT_FAILURE);
}
}
// For time zone
else if (strcmp(argv[i], "-l") == 0) {
if (i < argc - 1) {
arguments->pktLen = atoi(argv[++i]);
} else {
fprintf(stderr, "option -l requires an argument\n");
exit(EXIT_FAILURE);
}
}
// For temperory command TODO
else if (strcmp(argv[i], "--help") == 0) {
printHelp();
......
此差异已折叠。
......@@ -450,6 +450,12 @@ static int32_t mnodeGetStreamMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *p
pSchema[cols].bytes = htons(pShow->bytes[cols]);
cols++;
pShow->bytes[cols] = TSDB_TABLE_NAME_LEN + VARSTR_HEADER_SIZE;
pSchema[cols].type = TSDB_DATA_TYPE_BINARY;
strcpy(pSchema[cols].name, "dest table");
pSchema[cols].bytes = htons(pShow->bytes[cols]);
cols++;
pShow->bytes[cols] = TSDB_IPv4ADDR_LEN + 6 + VARSTR_HEADER_SIZE;
pSchema[cols].type = TSDB_DATA_TYPE_BINARY;
strcpy(pSchema[cols].name, "ip:port");
......@@ -524,6 +530,10 @@ static int32_t mnodeRetrieveStreams(SShowObj *pShow, char *data, int32_t rows, v
STR_WITH_MAXSIZE_TO_VARSTR(pWrite, pConnObj->user, pShow->bytes[cols]);
cols++;
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
STR_WITH_MAXSIZE_TO_VARSTR(pWrite, pDesc->dstTable, pShow->bytes[cols]);
cols++;
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
snprintf(ipStr, sizeof(ipStr), "%s:%u", taosIpStr(pConnObj->ip), pConnObj->port);
STR_WITH_MAXSIZE_TO_VARSTR(pWrite, ipStr, pShow->bytes[cols]);
......
......@@ -396,14 +396,15 @@ static void mnodeAddTableIntoStable(SSTableObj *pStable, SCTableObj *pCtable) {
atomic_add_fetch_32(&pStable->numOfTables, 1);
if (pStable->vgHash == NULL) {
pStable->vgHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_NO_LOCK);
pStable->vgHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_ENTRY_LOCK);
mDebug("table:%s, create hash:%p", pStable->info.tableId, pStable->vgHash);
}
if (pStable->vgHash != NULL) {
if (taosHashGet(pStable->vgHash, &pCtable->vgId, sizeof(pCtable->vgId)) == NULL) {
taosHashPut(pStable->vgHash, &pCtable->vgId, sizeof(pCtable->vgId), &pCtable->vgId, sizeof(pCtable->vgId));
mDebug("table:%s, vgId:%d is put into stable vgList, sizeOfVgList:%d", pStable->info.tableId, pCtable->vgId,
(int32_t)taosHashGetSize(pStable->vgHash));
mDebug("table:%s, vgId:%d is put into stable hash:%p, sizeOfVgList:%d", pStable->info.tableId, pCtable->vgId,
pStable->vgHash, taosHashGetSize(pStable->vgHash));
}
}
}
......@@ -416,13 +417,14 @@ static void mnodeRemoveTableFromStable(SSTableObj *pStable, SCTableObj *pCtable)
SVgObj *pVgroup = mnodeGetVgroup(pCtable->vgId);
if (pVgroup == NULL) {
taosHashRemove(pStable->vgHash, &pCtable->vgId, sizeof(pCtable->vgId));
mDebug("table:%s, vgId:%d is remove from stable vgList, sizeOfVgList:%d", pStable->info.tableId, pCtable->vgId,
(int32_t)taosHashGetSize(pStable->vgHash));
mDebug("table:%s, vgId:%d is remove from stable hash:%p sizeOfVgList:%d", pStable->info.tableId, pCtable->vgId,
pStable->vgHash, taosHashGetSize(pStable->vgHash));
}
mnodeDecVgroupRef(pVgroup);
}
static void mnodeDestroySuperTable(SSTableObj *pStable) {
mDebug("table:%s, is destroyed, stable hash:%p", pStable->info.tableId, pStable->vgHash);
if (pStable->vgHash != NULL) {
taosHashCleanup(pStable->vgHash);
pStable->vgHash = NULL;
......@@ -464,6 +466,9 @@ static int32_t mnodeSuperTableActionUpdate(SSdbRow *pRow) {
SSTableObj *pNew = pRow->pObj;
SSTableObj *pTable = mnodeGetSuperTable(pNew->info.tableId);
if (pTable != NULL && pTable != pNew) {
mDebug("table:%s, will be updated, hash:%p sizeOfVgList:%d, new hash:%p sizeOfVgList:%d", pTable->info.tableId,
pTable->vgHash, taosHashGetSize(pTable->vgHash), pNew->vgHash, taosHashGetSize(pNew->vgHash));
void *oldTableId = pTable->info.tableId;
void *oldSchema = pTable->schema;
void *oldVgHash = pTable->vgHash;
......@@ -479,6 +484,9 @@ static int32_t mnodeSuperTableActionUpdate(SSdbRow *pRow) {
free(pNew);
free(oldTableId);
free(oldSchema);
mDebug("table:%s, update finished, hash:%p sizeOfVgList:%d", pTable->info.tableId, pTable->vgHash,
taosHashGetSize(pTable->vgHash));
}
mnodeDecTableRef(pTable);
......@@ -783,8 +791,8 @@ static int32_t mnodeProcessDropTableMsg(SMnodeMsg *pMsg) {
if (pMsg->pTable->type == TSDB_SUPER_TABLE) {
SSTableObj *pSTable = (SSTableObj *)pMsg->pTable;
mInfo("msg:%p, app:%p table:%s, start to drop stable, uid:%" PRIu64 ", numOfChildTables:%d, sizeOfVgList:%d",
pMsg, pMsg->rpcMsg.ahandle, pDrop->tableId, pSTable->uid, pSTable->numOfTables, (int32_t)taosHashGetSize(pSTable->vgHash));
mInfo("msg:%p, app:%p table:%s, start to drop stable, uid:%" PRIu64 ", numOfChildTables:%d, sizeOfVgList:%d", pMsg,
pMsg->rpcMsg.ahandle, pDrop->tableId, pSTable->uid, pSTable->numOfTables, taosHashGetSize(pSTable->vgHash));
return mnodeProcessDropSuperTableMsg(pMsg);
} else {
SCTableObj *pCTable = (SCTableObj *)pMsg->pTable;
......@@ -925,7 +933,10 @@ static int32_t mnodeProcessDropSuperTableMsg(SMnodeMsg *pMsg) {
if (pMsg == NULL) return TSDB_CODE_MND_APP_ERROR;
SSTableObj *pStable = (SSTableObj *)pMsg->pTable;
if (pStable->vgHash != NULL /*pStable->numOfTables != 0*/) {
mInfo("msg:%p, app:%p stable:%s will be dropped, hash:%p sizeOfVgList:%d", pMsg, pMsg->rpcMsg.ahandle,
pStable->info.tableId, pStable->vgHash, taosHashGetSize(pStable->vgHash));
if (pStable->vgHash != NULL /*pStable->numOfTables != 0*/) {
int32_t *pVgId = taosHashIterate(pStable->vgHash, NULL);
while (pVgId) {
SVgObj *pVgroup = mnodeGetVgroup(*pVgId);
......@@ -938,8 +949,9 @@ static int32_t mnodeProcessDropSuperTableMsg(SMnodeMsg *pMsg) {
pDrop->uid = htobe64(pStable->uid);
mnodeExtractTableName(pStable->info.tableId, pDrop->tableId);
mInfo("msg:%p, app:%p stable:%s, send drop stable msg to vgId:%d", pMsg, pMsg->rpcMsg.ahandle,
pStable->info.tableId, pVgroup->vgId);
mInfo("msg:%p, app:%p stable:%s, send drop stable msg to vgId:%d, hash:%p sizeOfVgList:%d", pMsg,
pMsg->rpcMsg.ahandle, pStable->info.tableId, pVgroup->vgId, pStable->vgHash,
taosHashGetSize(pStable->vgHash));
SRpcEpSet epSet = mnodeGetEpSetFromVgroup(pVgroup);
SRpcMsg rpcMsg = {.pCont = pDrop, .contLen = sizeof(SDropSTableMsg), .msgType = TSDB_MSG_TYPE_MD_DROP_STABLE};
dnodeSendMsgToDnode(&epSet, &rpcMsg);
......@@ -1482,8 +1494,8 @@ static int32_t mnodeGetSuperTableMeta(SMnodeMsg *pMsg) {
pMsg->rpcRsp.rsp = pMeta;
mDebug("msg:%p, app:%p stable:%s, uid:%" PRIu64 " table meta is retrieved", pMsg, pMsg->rpcMsg.ahandle,
pTable->info.tableId, pTable->uid);
mDebug("msg:%p, app:%p stable:%s, uid:%" PRIu64 " table meta is retrieved, sizeOfVgList:%d numOfTables:%d", pMsg,
pMsg->rpcMsg.ahandle, pTable->info.tableId, pTable->uid, taosHashGetSize(pTable->vgHash), pTable->numOfTables);
return TSDB_CODE_SUCCESS;
}
......@@ -1512,7 +1524,7 @@ static int32_t mnodeProcessSuperTableVgroupMsg(SMnodeMsg *pMsg) {
char *msg = (char *)pRsp + sizeof(SSTableVgroupRspMsg);
for (int32_t i = 0; i < numOfTable; ++i) {
char * stableName = (char *)pInfo + sizeof(SSTableVgroupMsg) + (TSDB_TABLE_FNAME_LEN)*i;
char *stableName = (char *)pInfo + sizeof(SSTableVgroupMsg) + (TSDB_TABLE_FNAME_LEN)*i;
SSTableObj *pTable = mnodeGetSuperTable(stableName);
if (pTable == NULL) {
mError("msg:%p, app:%p stable:%s, not exist while get stable vgroup info", pMsg, pMsg->rpcMsg.ahandle, stableName);
......@@ -1533,6 +1545,8 @@ static int32_t mnodeProcessSuperTableVgroupMsg(SMnodeMsg *pMsg) {
msg += sizeof(SVgroupsMsg);
} else {
SVgroupsMsg *pVgroupMsg = (SVgroupsMsg *)msg;
mDebug("msg:%p, app:%p stable:%s, hash:%p sizeOfVgList:%d will be returned", pMsg, pMsg->rpcMsg.ahandle,
pTable->info.tableId, pTable->vgHash, taosHashGetSize(pTable->vgHash));
int32_t *pVgId = taosHashIterate(pTable->vgHash, NULL);
int32_t vgSize = 0;
......
......@@ -79,7 +79,7 @@ bool httpInitContexts() {
void httpCleanupContexts() {
if (tsHttpServer.contextCache != NULL) {
SCacheObj *cache = tsHttpServer.contextCache;
httpInfo("context cache is cleanuping, size:%" PRIzu "", taosHashGetSize(cache->pHashTable));
httpInfo("context cache is cleanuping, size:%d", taosHashGetSize(cache->pHashTable));
taosCacheCleanup(tsHttpServer.contextCache);
tsHttpServer.contextCache = NULL;
}
......
......@@ -107,7 +107,7 @@ static void httpDestroySession(void *data) {
void httpCleanUpSessions() {
if (tsHttpServer.sessionCache != NULL) {
SCacheObj *cache = tsHttpServer.sessionCache;
httpInfo("session cache is cleanuping, size:%" PRIzu "", taosHashGetSize(cache->pHashTable));
httpInfo("session cache is cleanuping, size:%d", taosHashGetSize(cache->pHashTable));
taosCacheCleanup(tsHttpServer.sessionCache);
tsHttpServer.sessionCache = NULL;
}
......
......@@ -872,7 +872,7 @@ static void tsdbStartStream(STsdbRepo *pRepo) {
for (int i = 0; i < pMeta->maxTables; i++) {
STable *pTable = pMeta->tables[i];
if (pTable && pTable->type == TSDB_STREAM_TABLE) {
pTable->cqhandle = (*pRepo->appH.cqCreateFunc)(pRepo->appH.cqH, TABLE_UID(pTable), TABLE_TID(pTable), pTable->sql,
pTable->cqhandle = (*pRepo->appH.cqCreateFunc)(pRepo->appH.cqH, TABLE_UID(pTable), TABLE_TID(pTable), TABLE_NAME(pTable)->data, pTable->sql,
tsdbGetTableSchemaImpl(pTable, false, false, -1));
}
}
......
......@@ -828,7 +828,7 @@ static int tsdbAddTableToMeta(STsdbRepo *pRepo, STable *pTable, bool addIdx, boo
if (lock && tsdbUnlockRepoMeta(pRepo) < 0) return -1;
if (TABLE_TYPE(pTable) == TSDB_STREAM_TABLE && addIdx) {
pTable->cqhandle = (*pRepo->appH.cqCreateFunc)(pRepo->appH.cqH, TABLE_UID(pTable), TABLE_TID(pTable), pTable->sql,
pTable->cqhandle = (*pRepo->appH.cqCreateFunc)(pRepo->appH.cqH, TABLE_UID(pTable), TABLE_TID(pTable), TABLE_NAME(pTable)->data, pTable->sql,
tsdbGetTableSchemaImpl(pTable, false, false, -1));
}
......
......@@ -82,7 +82,7 @@ SHashObj *taosHashInit(size_t capacity, _hash_fn_t fn, bool update, SHashLockTyp
* @param pHashObj
* @return
*/
size_t taosHashGetSize(const SHashObj *pHashObj);
int32_t taosHashGetSize(const SHashObj *pHashObj);
/**
* put element into hash table, if the element with the same key exists, update it
......
......@@ -189,7 +189,7 @@ SHashObj *taosHashInit(size_t capacity, _hash_fn_t fn, bool update, SHashLockTyp
return pHashObj;
}
size_t taosHashGetSize(const SHashObj *pHashObj) { return (pHashObj == NULL) ? 0 : pHashObj->size; }
int32_t taosHashGetSize(const SHashObj *pHashObj) { return (int32_t)((pHashObj == NULL) ? 0 : pHashObj->size); }
int32_t taosHashPut(SHashObj *pHashObj, const void *key, size_t keyLen, void *data, size_t size) {
uint32_t hashVal = (*pHashObj->hashFp)(key, (uint32_t)keyLen);
......
......@@ -240,9 +240,6 @@ void taosReadGlobalLogCfg() {
int olen, vlen;
char fileName[PATH_MAX] = {0};
mDebugFlag = 135;
sdbDebugFlag = 135;
wordexp_t full_path;
if ( 0 != wordexp(configDir, &full_path, 0)) {
printf("\nconfig file: %s wordexp fail! reason:%s\n", configDir, strerror(errno));
......
......@@ -43,12 +43,13 @@ static void *taosNetBindUdpPort(void *sarg) {
char buffer[BUFFER_SIZE];
int32_t iDataNum;
socklen_t sin_size;
int32_t bufSize = 1024000;
struct sockaddr_in server_addr;
struct sockaddr_in clientAddr;
if ((serverSocket = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP)) < 0) {
uError("failed to create udp socket since %s", strerror(errno));
uError("failed to create UDP socket since %s", strerror(errno));
return NULL;
}
......@@ -58,11 +59,23 @@ static void *taosNetBindUdpPort(void *sarg) {
server_addr.sin_addr.s_addr = htonl(INADDR_ANY);
if (bind(serverSocket, (struct sockaddr *)&server_addr, sizeof(server_addr)) < 0) {
uError("failed to bind udp port:%d since %s", port, strerror(errno));
uError("failed to bind UDP port:%d since %s", port, strerror(errno));
return NULL;
}
uInfo("udp server at port:%d is listening", port);
if (taosSetSockOpt(serverSocket, SOL_SOCKET, SO_SNDBUF, (void *)&bufSize, sizeof(bufSize)) != 0) {
uError("failed to set the send buffer size for UDP socket\n");
taosCloseSocket(serverSocket);
return NULL;
}
if (taosSetSockOpt(serverSocket, SOL_SOCKET, SO_RCVBUF, (void *)&bufSize, sizeof(bufSize)) != 0) {
uError("failed to set the receive buffer size for UDP socket\n");
taosCloseSocket(serverSocket);
return NULL;
}
uInfo("UDP server at port:%d is listening", port);
while (1) {
memset(buffer, 0, BUFFER_SIZE);
......@@ -74,10 +87,13 @@ static void *taosNetBindUdpPort(void *sarg) {
continue;
}
uInfo("UDP: recv:%d bytes from %s at %d", iDataNum, taosInetNtoa(clientAddr.sin_addr), port);
if (iDataNum > 0) {
uInfo("UDP: recv:%d bytes from %s:%d", iDataNum, taosInetNtoa(clientAddr.sin_addr), port);
sendto(serverSocket, buffer, iDataNum, 0, (struct sockaddr *)&clientAddr, (int32_t)sin_size);
iDataNum = taosSendto(serverSocket, buffer, iDataNum, 0, (struct sockaddr *)&clientAddr, (int32_t)sin_size);
}
uInfo("UDP: send:%d bytes to %s at %d", iDataNum, taosInetNtoa(clientAddr.sin_addr), port);
}
taosCloseSocket(serverSocket);
......@@ -94,10 +110,9 @@ static void *taosNetBindTcpPort(void *sarg) {
int32_t addr_len = sizeof(clientAddr);
SOCKET client;
char buffer[BUFFER_SIZE];
int32_t iDataNum = 0;
if ((serverSocket = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP)) < 0) {
uError("failed to create tcp socket since %s", strerror(errno));
uError("failed to create TCP socket since %s", strerror(errno));
return NULL;
}
......@@ -106,130 +121,103 @@ static void *taosNetBindTcpPort(void *sarg) {
server_addr.sin_port = htons(port);
server_addr.sin_addr.s_addr = htonl(INADDR_ANY);
int32_t reuse = 1;
if (taosSetSockOpt(serverSocket, SOL_SOCKET, SO_REUSEADDR, (void *)&reuse, sizeof(reuse)) < 0) {
uError("setsockopt SO_REUSEADDR failed: %d (%s)", errno, strerror(errno));
taosCloseSocket(serverSocket);
return NULL;
}
if (bind(serverSocket, (struct sockaddr *)&server_addr, sizeof(server_addr)) < 0) {
uError("failed to bind tcp port:%d since %s", port, strerror(errno));
uError("failed to bind TCP port:%d since %s", port, strerror(errno));
return NULL;
}
if (listen(serverSocket, 5) < 0) {
uError("failed to listen tcp port:%d since %s", port, strerror(errno));
if (taosKeepTcpAlive(serverSocket) < 0) {
uError("failed to set tcp server keep-alive option since %s", strerror(errno));
taosCloseSocket(serverSocket);
return NULL;
}
uInfo("tcp server at port:%d is listening", port);
if (listen(serverSocket, 10) < 0) {
uError("failed to listen TCP port:%d since %s", port, strerror(errno));
return NULL;
}
uInfo("TCP server at port:%d is listening", port);
while (1) {
client = accept(serverSocket, (struct sockaddr *)&clientAddr, (socklen_t *)&addr_len);
if (client < 0) {
uDebug("failed to accept from tcp port:%d since %s", port, strerror(errno));
uDebug("TCP: failed to accept at port:%d since %s", port, strerror(errno));
continue;
}
iDataNum = 0;
memset(buffer, 0, BUFFER_SIZE);
int32_t nleft, nread;
char * ptr = buffer;
nleft = pinfo->pktLen;
while (nleft > 0) {
nread = recv(client, ptr, BUFFER_SIZE, 0);
if (nread == 0) {
break;
} else if (nread < 0) {
if (errno == EINTR) {
continue;
} else {
uError("failed to perform recv func at %d since %s", port, strerror(errno));
taosCloseSocket(serverSocket);
return NULL;
}
} else {
nleft -= nread;
ptr += nread;
iDataNum += nread;
}
int32_t ret = taosReadMsg(client, buffer, pinfo->pktLen);
if (ret < 0 || ret != pinfo->pktLen) {
uError("TCP: failed to read %d bytes at port:%d since %s", pinfo->pktLen, port, strerror(errno));
taosCloseSocket(serverSocket);
return NULL;
}
if (iDataNum > 0) {
uInfo("TCP: recv:%d bytes from %s:%d", iDataNum, taosInetNtoa(clientAddr.sin_addr), port);
send(client, buffer, iDataNum, 0);
uInfo("TCP: read:%d bytes from %s at %d", pinfo->pktLen, taosInetNtoa(clientAddr.sin_addr), port);
ret = taosWriteMsg(client, buffer, pinfo->pktLen);
if (ret < 0) {
uError("TCP: failed to write %d bytes at %d since %s", pinfo->pktLen, port, strerror(errno));
taosCloseSocket(serverSocket);
return NULL;
}
uInfo("TCP: write:%d bytes to %s at %d", pinfo->pktLen, taosInetNtoa(clientAddr.sin_addr), port);
}
taosCloseSocket(serverSocket);
return NULL;
}
static int32_t taosNetCheckTcpPort(STestInfo *info) {
SOCKET clientSocket;
char sendbuf[BUFFER_SIZE];
char recvbuf[BUFFER_SIZE];
int32_t iDataNum = 0;
SOCKET clientSocket;
char buffer[BUFFER_SIZE] = {0};
struct sockaddr_in serverAddr;
if ((clientSocket = socket(AF_INET, SOCK_STREAM, 0)) < 0) {
uError("failed to create tcp client socket since %s", strerror(errno));
uError("failed to create TCP client socket since %s", strerror(errno));
return -1;
}
// set send and recv overtime
struct timeval timeout;
timeout.tv_sec = 2; // s
timeout.tv_usec = 0; // us
if (setsockopt(clientSocket, SOL_SOCKET, SO_SNDTIMEO, (char *)&timeout, sizeof(struct timeval)) == -1) {
uError("failed to setsockopt send timer since %s", strerror(errno));
}
if (setsockopt(clientSocket, SOL_SOCKET, SO_RCVTIMEO, (char *)&timeout, sizeof(struct timeval)) == -1) {
uError("failed to setsockopt recv timer since %s", strerror(errno));
int32_t reuse = 1;
if (taosSetSockOpt(clientSocket, SOL_SOCKET, SO_REUSEADDR, (void *)&reuse, sizeof(reuse)) < 0) {
uError("setsockopt SO_REUSEADDR failed: %d (%s)", errno, strerror(errno));
taosCloseSocket(clientSocket);
return -1;
}
struct sockaddr_in serverAddr;
memset((char *)&serverAddr, 0, sizeof(serverAddr));
serverAddr.sin_family = AF_INET;
serverAddr.sin_port = htons(info->port);
serverAddr.sin_port = (uint16_t)htons((uint16_t)info->port);
serverAddr.sin_addr.s_addr = info->hostIp;
if (connect(clientSocket, (struct sockaddr *)&serverAddr, sizeof(serverAddr)) < 0) {
uError("failed to connect port:%d since %s", info->port, strerror(errno));
uError("TCP: failed to connect port %s:%d since %s", taosIpStr(info->hostIp), info->port, strerror(errno));
return -1;
}
memset(sendbuf, 0, BUFFER_SIZE);
memset(recvbuf, 0, BUFFER_SIZE);
struct in_addr ipStr;
memcpy(&ipStr, &info->hostIp, 4);
sprintf(sendbuf, "client send tcp pkg to %s:%d, content: 1122334455", taosInetNtoa(ipStr), info->port);
sprintf(sendbuf + info->pktLen - 16, "1122334455667788");
send(clientSocket, sendbuf, info->pktLen, 0);
taosKeepTcpAlive(clientSocket);
memset(recvbuf, 0, BUFFER_SIZE);
int32_t nleft, nread;
char * ptr = recvbuf;
nleft = info->pktLen;
sprintf(buffer, "client send TCP pkg to %s:%d, content: 1122334455", taosIpStr(info->hostIp), info->port);
sprintf(buffer + info->pktLen - 16, "1122334455667788");
while (nleft > 0) {
nread = recv(clientSocket, ptr, BUFFER_SIZE, 0);;
if (nread == 0) {
break;
} else if (nread < 0) {
if (errno == EINTR) {
continue;
} else {
uError("faild to recv pkg from TCP port:%d since %s", info->port, strerror(errno));
taosCloseSocket(clientSocket);
return -1;
}
} else {
nleft -= nread;
ptr += nread;
iDataNum += nread;
}
int32_t ret = taosWriteMsg(clientSocket, buffer, info->pktLen);
if (ret < 0) {
uError("TCP: failed to write msg to %s:%d since %s", taosIpStr(info->hostIp), info->port, strerror(errno));
return -1;
}
if (iDataNum < info->pktLen) {
uError("TCP: received ack:%d bytes, less than send:%d bytes from port:%d", iDataNum, info->pktLen, info->port);
ret = taosReadMsg(clientSocket, buffer, info->pktLen);
if (ret < 0) {
uError("TCP: failed to read msg from %s:%d since %s", taosIpStr(info->hostIp), info->port, strerror(errno));
return -1;
}
......@@ -239,9 +227,9 @@ static int32_t taosNetCheckTcpPort(STestInfo *info) {
static int32_t taosNetCheckUdpPort(STestInfo *info) {
SOCKET clientSocket;
char sendbuf[BUFFER_SIZE];
char recvbuf[BUFFER_SIZE];
char buffer[BUFFER_SIZE] = {0};
int32_t iDataNum = 0;
int32_t bufSize = 1024000;
struct sockaddr_in serverAddr;
......@@ -250,41 +238,39 @@ static int32_t taosNetCheckUdpPort(STestInfo *info) {
return -1;
}
// set overtime
struct timeval timeout;
timeout.tv_sec = 2; // s
timeout.tv_usec = 0; // us
if (setsockopt(clientSocket, SOL_SOCKET, SO_SNDTIMEO, (char *)&timeout, sizeof(struct timeval)) == -1) {
uError("failed to setsockopt send timer since %s", strerror(errno));
if (taosSetSockOpt(clientSocket, SOL_SOCKET, SO_SNDBUF, (void *)&bufSize, sizeof(bufSize)) != 0) {
uError("failed to set the send buffer size for UDP socket\n");
return -1;
}
if (setsockopt(clientSocket, SOL_SOCKET, SO_RCVTIMEO, (char *)&timeout, sizeof(struct timeval)) == -1) {
uError("failed to setsockopt recv timer since %s", strerror(errno));
if (taosSetSockOpt(clientSocket, SOL_SOCKET, SO_RCVBUF, (void *)&bufSize, sizeof(bufSize)) != 0) {
uError("failed to set the receive buffer size for UDP socket\n");
return -1;
}
serverAddr.sin_family = AF_INET;
serverAddr.sin_port = htons(info->port);
serverAddr.sin_addr.s_addr = info->hostIp;
memset(sendbuf, 0, BUFFER_SIZE);
memset(recvbuf, 0, BUFFER_SIZE);
struct in_addr ipStr;
memcpy(&ipStr, &info->hostIp, 4);
sprintf(sendbuf, "client send udp pkg to %s:%d, content: 1122334455", taosInetNtoa(ipStr), info->port);
sprintf(sendbuf + info->pktLen - 16, "1122334455667788");
sprintf(buffer, "client send udp pkg to %s:%d, content: 1122334455", taosInetNtoa(ipStr), info->port);
sprintf(buffer + info->pktLen - 16, "1122334455667788");
socklen_t sin_size = sizeof(*(struct sockaddr *)&serverAddr);
int32_t code = sendto(clientSocket, sendbuf, info->pktLen, 0, (struct sockaddr *)&serverAddr, (int32_t)sin_size);
if (code < 0) {
uError("failed to perform sendto func since %s", strerror(errno));
iDataNum = taosSendto(clientSocket, buffer, info->pktLen, 0, (struct sockaddr *)&serverAddr, (int32_t)sin_size);
if (iDataNum < 0 || iDataNum != info->pktLen) {
uError("UDP: failed to perform sendto func since %s", strerror(errno));
return -1;
}
iDataNum = recvfrom(clientSocket, recvbuf, BUFFER_SIZE, 0, (struct sockaddr *)&serverAddr, &sin_size);
memset(buffer, 0, BUFFER_SIZE);
sin_size = sizeof(*(struct sockaddr *)&serverAddr);
iDataNum = recvfrom(clientSocket, buffer, BUFFER_SIZE, 0, (struct sockaddr *)&serverAddr, &sin_size);
if (iDataNum < info->pktLen) {
uError("UDP: received ack:%d bytes, less than send:%d bytes from port:%d", iDataNum, info->pktLen, info->port);
if (iDataNum < 0 || iDataNum != info->pktLen) {
uError("UDP: received ack:%d bytes(expect:%d) from port:%d since %s", iDataNum, info->pktLen, info->port, strerror(errno));
return -1;
}
......@@ -304,19 +290,18 @@ static void taosNetCheckPort(uint32_t hostIp, int32_t startPort, int32_t endPort
info.port = port;
ret = taosNetCheckTcpPort(&info);
if (ret != 0) {
uError("failed to test tcp port:%d", port);
uError("failed to test TCP port:%d", port);
} else {
uInfo("successed to test tcp port:%d", port);
uInfo("successed to test TCP port:%d", port);
}
ret = taosNetCheckUdpPort(&info);
if (ret != 0) {
uError("failed to test udp port:%d", port);
uError("failed to test UDP port:%d", port);
} else {
uInfo("successed to test udp port:%d", port);
uInfo("successed to test UDP port:%d", port);
}
}
return;
}
void *taosNetInitRpc(char *secretEncrypt, char spi) {
......@@ -440,9 +425,9 @@ static void taosNetTestRpc(char *host, int32_t startPort, int32_t pkgLen) {
int32_t ret = taosNetCheckRpc(host, port, sendpkgLen, spi, NULL);
if (ret < 0) {
uError("failed to test tcp port:%d", port);
uError("failed to test TCP port:%d", port);
} else {
uInfo("successed to test tcp port:%d", port);
uInfo("successed to test TCP port:%d", port);
}
if (pkgLen >= tsRpcMaxUdpSize) {
......@@ -453,9 +438,9 @@ static void taosNetTestRpc(char *host, int32_t startPort, int32_t pkgLen) {
ret = taosNetCheckRpc(host, port, pkgLen, spi, NULL);
if (ret < 0) {
uError("failed to test udp port:%d", port);
uError("failed to test UDP port:%d", port);
} else {
uInfo("successed to test udp port:%d", port);
uInfo("successed to test UDP port:%d", port);
}
}
}
......@@ -492,14 +477,15 @@ static void taosNetTestServer(char *host, int32_t startPort, int32_t pkgLen) {
tcpInfo->pktLen = pkgLen;
if (pthread_create(pids + i, NULL, taosNetBindTcpPort, tcpInfo) != 0) {
uInfo("failed to create tcp test thread, %s:%d", tcpInfo->hostFqdn, tcpInfo->port);
uInfo("failed to create TCP test thread, %s:%d", tcpInfo->hostFqdn, tcpInfo->port);
exit(-1);
}
STestInfo *udpInfo = uinfos + i;
udpInfo->port = (uint16_t)(port + i);
udpInfo->port = port + i;
tcpInfo->pktLen = pkgLen;
if (pthread_create(pids + num + i, NULL, taosNetBindUdpPort, udpInfo) != 0) {
uInfo("failed to create udp test thread, %s:%d", tcpInfo->hostFqdn, tcpInfo->port);
uInfo("failed to create UDP test thread, %s:%d", tcpInfo->hostFqdn, tcpInfo->port);
exit(-1);
}
}
......
......@@ -228,7 +228,7 @@ static int32_t taosOpenNoteWithMaxLines(char *fn, int32_t maxLines, int32_t maxN
}
void taosNotePrintBuffer(SNoteObj *pNote, char *buffer, int32_t len) {
if (pNote->fd < 0) return;
if (pNote->fd <= 0) return;
taosWrite(pNote->fd, buffer, len);
if (pNote->maxLines > 0) {
......
......@@ -441,7 +441,6 @@ static int32_t vnodeProcessTsdbStatus(void *arg, int32_t status, int32_t eno) {
if (status == TSDB_STATUS_COMMIT_START) {
pVnode->isCommiting = 1;
pVnode->fversion = pVnode->version;
vDebug("vgId:%d, start commit, fver:%" PRIu64 " vver:%" PRIu64, pVnode->vgId, pVnode->fversion, pVnode->version);
if (!vnodeInInitStatus(pVnode)) {
return walRenew(pVnode->wal);
......@@ -450,9 +449,10 @@ static int32_t vnodeProcessTsdbStatus(void *arg, int32_t status, int32_t eno) {
}
if (status == TSDB_STATUS_COMMIT_OVER) {
vDebug("vgId:%d, commit over, fver:%" PRIu64 " vver:%" PRIu64, pVnode->vgId, pVnode->fversion, pVnode->version);
pVnode->isCommiting = 0;
pVnode->isFull = 0;
pVnode->fversion = pVnode->version;
vDebug("vgId:%d, commit over, fver:%" PRIu64 " vver:%" PRIu64, pVnode->vgId, pVnode->fversion, pVnode->version);
if (!vnodeInInitStatus(pVnode)) {
walRemoveOneOldFile(pVnode->wal);
}
......
......@@ -166,14 +166,14 @@ int32_t walRestore(void *handle, void *pVnode, FWalWrite writeFp) {
char walName[WAL_FILE_LEN];
snprintf(walName, sizeof(pWal->name), "%s/%s%" PRId64, pWal->path, WAL_PREFIX, fileId);
wDebug("vgId:%d, file:%s, will be restored", pWal->vgId, walName);
wInfo("vgId:%d, file:%s, will be restored", pWal->vgId, walName);
int32_t code = walRestoreWalFile(pWal, pVnode, writeFp, walName, fileId);
if (code != TSDB_CODE_SUCCESS) {
wError("vgId:%d, file:%s, failed to restore since %s", pWal->vgId, walName, tstrerror(code));
continue;
}
wDebug("vgId:%d, file:%s, restore success", pWal->vgId, walName);
wInfo("vgId:%d, file:%s, restore success", pWal->vgId, walName);
count++;
}
......@@ -326,7 +326,7 @@ static int32_t walRestoreWalFile(SWal *pWal, void *pVnode, FWalWrite writeFp, ch
offset = offset + sizeof(SWalHead) + pHead->len;
wDebug("vgId:%d, restore wal, fileId:%" PRId64 " hver:%" PRIu64 " wver:%" PRIu64 " len:%d", pWal->vgId,
wTrace("vgId:%d, restore wal, fileId:%" PRId64 " hver:%" PRIu64 " wver:%" PRIu64 " len:%d", pWal->vgId,
fileId, pHead->version, pWal->version, pHead->len);
pWal->version = pHead->version;
......
......@@ -38,11 +38,16 @@
<artifactId>h2</artifactId>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>druid</artifactId>
<version>1.1.17</version>
</dependency>
<dependency>
<groupId>com.taosdata.jdbc</groupId>
<artifactId>taos-jdbcdriver</artifactId>
<version>2.0.11</version>
<version>2.0.14</version>
</dependency>
<!-- https://mvnrepository.com/artifact/mysql/mysql-connector-java -->
......
spring:
datasource:
# driver-class-name: org.h2.Driver
# schema: classpath:db/schema-mysql.sql
# data: classpath:db/data-mysql.sql
# url: jdbc:h2:mem:test
# username: root
# password: test
# driver-class-name: com.mysql.jdbc.Driver
# url: jdbc:mysql://master:3306/test?useSSL=false
# username: root
# password: 123456
driver-class-name: com.taosdata.jdbc.TSDBDriver
url: jdbc:TAOS://localhost:6030/mp_test
user: root
......@@ -20,6 +8,12 @@ spring:
locale: en_US.UTF-8
timezone: UTC-8
druid:
initial-size: 5
min-idle: 5
max-active: 5
mybatis-plus:
configuration:
map-underscore-to-camel-case: false
......
......@@ -65,7 +65,18 @@ function runQueryPerfTest {
echoInfo "Run Performance Test"
cd $WORK_DIR/TDengine/tests/pytest
python3 query/queryPerformance.py 0 | tee -a $PERFORMANCE_TEST_REPORT
python3 query/queryPerformance.py -c $LOCAL_COMMIT | tee -a $PERFORMANCE_TEST_REPORT
python3 insert/insertFromCSVPerformance.py -c $LOCAL_COMMIT | tee -a $PERFORMANCE_TEST_REPORT
yes | taosdemo -c /etc/taosperf/ -d taosdemo_insert_test -t 1000 -n 1000 > taosdemoperf.txt
CREATETABLETIME=`grep 'Spent' taosdemoperf.txt | awk 'NR==1{print $2}'`
INSERTRECORDSTIME=`grep 'Spent' taosdemoperf.txt | awk 'NR==2{print $2}'`
REQUESTSPERSECOND=`grep 'Spent' taosdemoperf.txt | awk 'NR==2{print $13}'`
python3 tools/taosdemoPerformance.py -c $LOCAL_COMMIT -t $CREATETABLETIME -i $INSERTRECORDSTIME -r $REQUESTSPERSECOND | tee -a $PERFORMANCE_TEST_REPORT
[ -f taosdemoperf.txt ] && rm taosdemoperf.txt
}
......
###################################################################
# Copyright (c) 2016 by TAOS Technologies, Inc.
# All rights reserved.
#
# This file is proprietary and confidential to TAOS Technologies.
# No part of this file may be reproduced, stored, transmitted,
# disclosed or used in any form or by any means other than as
# expressly provided by the written permission from Jianhui Tao
#
###################################################################
# -*- coding: utf-8 -*-
import sys
from util.log import *
from util.cases import *
from util.sql import *
from util.dnodes import *
import taos
if __name__ == "__main__":
logSql = True
deployPath = ""
testCluster = False
valgrind = 0
print("start to execute %s" % __file__)
tdDnodes.init(deployPath)
tdDnodes.setTestCluster(testCluster)
tdDnodes.setValgrind(valgrind)
tdDnodes.stopAll()
tdDnodes.addSimExtraCfg("maxSQLLength", "1048576")
tdDnodes.deploy(1)
tdDnodes.start(1)
host = '127.0.0.1'
tdLog.info("Procedures for tdengine deployed in %s" % (host))
tdCases.logSql(logSql)
print('1')
conn = taos.connect(
host,
config=tdDnodes.getSimCfgPath())
tdSql.init(conn.cursor(), True)
print("==========step1")
print("create table ")
tdSql.execute("create database db")
tdSql.execute("use db")
tdSql.execute("create table t1 (ts timestamp, c1 int,c2 int ,c3 int)")
print("==========step2")
print("insert maxSQLLength data ")
data = 'insert into t1 values'
ts = 1604298064000
i = 0
while ((len(data)<(1024*1024)) & (i < 32767 - 1) ):
data += '(%s,%d,%d,%d)'%(ts+i,i%1000,i%1000,i%1000)
i+=1
tdSql.execute(data)
print("==========step4")
print("insert data batch larger than 32767 ")
i = 0
while ((len(data)<(1024*1024)) & (i < 32767) ):
data += '(%s,%d,%d,%d)'%(ts+i,i%1000,i%1000,i%1000)
i+=1
tdSql.error(data)
print("==========step4")
print("insert data larger than maxSQLLength ")
tdSql.execute("create table t2 (ts timestamp, c1 binary(50))")
data = 'insert into t2 values'
i = 0
while ((len(data)<(1024*1024)) & (i < 32767 - 1 ) ):
data += '(%s,%s)'%(ts+i,'a'*50)
i+=1
tdSql.error(data)
tdSql.close()
tdLog.success("%s successfully executed" % __file__)
......@@ -20,13 +20,23 @@ python3 insert/retentionpolicy.py
python3 ./test.py -f insert/alterTableAndInsert.py
python3 ./test.py -f insert/insertIntoTwoTables.py
python3 ./test.py -f insert/before_1970.py
python3 bug2265.py
#table
python3 ./test.py -f table/alter_wal0.py
python3 ./test.py -f table/column_name.py
python3 ./test.py -f table/column_num.py
python3 ./test.py -f table/db_table.py
python3 ./test.py -f table/create_sensitive.py
#python3 ./test.py -f table/tablename-boundary.py
python3 ./test.py -f table/max_table_length.py
python3 ./test.py -f table/alter_column.py
python3 ./test.py -f table/boundary.py
python3 ./test.py -f table/create-a-lot.py
python3 ./test.py -f table/create.py
python3 ./test.py -f table/del_stable.py
python3 ./test.py -f table/queryWithTaosdKilled.py
# tag
python3 ./test.py -f tag_lite/filter.py
......@@ -135,9 +145,6 @@ python3 ./test.py -f user/pass_len.py
# stable
python3 ./test.py -f stable/query_after_reset.py
# table
python3 ./test.py -f table/del_stable.py
#query
python3 ./test.py -f query/filter.py
python3 ./test.py -f query/filterCombo.py
......@@ -164,7 +171,8 @@ python3 ./test.py -f query/bug2117.py
python3 ./test.py -f query/bug2143.py
python3 ./test.py -f query/sliding.py
python3 ./test.py -f query/unionAllTest.py
python3 ./test.py -f query/bug2281.py
python3 ./test.py -f query/bug2119.py
#stream
python3 ./test.py -f stream/metric_1.py
python3 ./test.py -f stream/new.py
......
###################################################################
# Copyright (c) 2016 by TAOS Technologies, Inc.
# All rights reserved.
#
# This file is proprietary and confidential to TAOS Technologies.
# No part of this file may be reproduced, stored, transmitted,
# disclosed or used in any form or by any means other than as
# expressly provided by the written permission from Jianhui Tao
#
###################################################################
# -*- coding: utf-8 -*-
import sys
import taos
import time
import datetime
import csv
import random
import pandas as pd
import argparse
import os.path
class insertFromCSVPerformace:
def __init__(self, commitID, dbName, stbName, branchName):
self.commitID = commitID
self.dbName = dbName
self.stbName = stbName
self.branchName = branchName
self.ts = 1500074556514
self.host = "127.0.0.1"
self.user = "root"
self.password = "taosdata"
self.config = "/etc/taosperf"
self.conn = taos.connect(
self.host,
self.user,
self.password,
self.config)
def writeCSV(self):
with open('test3.csv','w', encoding='utf-8', newline='') as csvFile:
writer = csv.writer(csvFile, dialect='excel')
for i in range(1000000):
newTimestamp = self.ts + random.randint(10000000, 10000000000) + random.randint(1000, 10000000) + random.randint(1, 1000)
d = datetime.datetime.fromtimestamp(newTimestamp / 1000)
dt = str(d.strftime("%Y-%m-%d %H:%M:%S.%f"))
writer.writerow(["'%s'" % dt, random.randint(1, 100), random.uniform(1, 100), random.randint(1, 100), random.randint(1, 100)])
def removCSVHeader(self):
data = pd.read_csv("ordered.csv")
data = data.drop([0])
data.to_csv("ordered.csv", header = False, index = False)
def createTables(self):
cursor = self.conn.cursor()
cursor.execute("create database if not exists %s" % self.dbName)
cursor.execute("use %s" % self.dbName)
cursor.execute("create table if not exists %s(ts timestamp, in_order_time float, out_of_order_time float, commit_id binary(50)) tags(branch binary(50))" % self.stbName)
cursor.execute("create table if not exists %s using %s tags('%s')" % (self.branchName, self.stbName, self.branchName))
cursor.execute("create table if not exists t1(ts timestamp, c1 int, c2 float, c3 int, c4 int)")
cursor.execute("create table if not exists t2(ts timestamp, c1 int, c2 float, c3 int, c4 int)")
cursor.close()
def run(self):
cursor = self.conn.cursor()
cursor.execute("use %s" % self.dbName)
print("==================== CSV insert performance ====================")
totalTime = 0
for i in range(10):
cursor.execute("create table if not exists t1(ts timestamp, c1 int, c2 float, c3 int, c4 int)")
startTime = time.time()
cursor.execute("insert into t1 file 'outoforder.csv'")
totalTime += time.time() - startTime
cursor.execute("drop table if exists t1")
out_of_order_time = (float) (totalTime / 10)
print("Out of Order - Insert time: %f" % out_of_order_time)
totalTime = 0
for i in range(10):
cursor.execute("create table if not exists t2(ts timestamp, c1 int, c2 float, c3 int, c4 int)")
startTime = time.time()
cursor.execute("insert into t2 file 'ordered.csv'")
totalTime += time.time() - startTime
cursor.execute("drop table if exists t2")
in_order_time = (float) (totalTime / 10)
print("In order - Insert time: %f" % in_order_time)
cursor.execute("insert into %s values(now, %f, %f, '%s')" % (self.branchName, in_order_time, out_of_order_time, self.commitID))
if __name__ == '__main__':
parser = argparse.ArgumentParser()
parser.add_argument(
'-c',
'--commit-id',
action='store',
default='null',
type=str,
help='git commit id (default: null)')
parser.add_argument(
'-d',
'--database-name',
action='store',
default='perf',
type=str,
help='Database name to be created (default: perf)')
parser.add_argument(
'-t',
'--stable-name',
action='store',
default='csv_insert',
type=str,
help='Database name to be created (default: csv_insert)')
parser.add_argument(
'-b',
'--branch-name',
action='store',
default='develop',
type=str,
help='branch name (default: develop)')
args = parser.parse_args()
perftest = insertFromCSVPerformace(args.commit_id, args.database_name, args.stable_name, args.branch_name)
perftest.createTables()
perftest.run()
\ No newline at end of file
......@@ -20,12 +20,20 @@ python3 insert/retentionpolicy.py
python3 ./test.py -f insert/alterTableAndInsert.py
python3 ./test.py -f insert/insertIntoTwoTables.py
#table
python3 ./test.py -f table/alter_wal0.py
python3 ./test.py -f table/column_name.py
python3 ./test.py -f table/column_num.py
python3 ./test.py -f table/db_table.py
python3 ./test.py -f table/create_sensitive.py
#python3 ./test.py -f table/tablename-boundary.py
python3 ./test.py -f table/max_table_length.py
python3 ./test.py -f table/alter_column.py
python3 ./test.py -f table/boundary.py
python3 ./test.py -f table/create-a-lot.py
python3 ./test.py -f table/create.py
python3 ./test.py -f table/del_stable.py
python3 ./test.py -f table/queryWithTaosdKilled.py
# tag
python3 ./test.py -f tag_lite/filter.py
......@@ -134,9 +142,6 @@ python3 ./test.py -f user/pass_len.py
# stable
python3 ./test.py -f stable/query_after_reset.py
# table
python3 ./test.py -f table/del_stable.py
#query
python3 ./test.py -f query/filter.py
python3 ./test.py -f query/filterCombo.py
......@@ -159,7 +164,9 @@ python3 ./test.py -f query/bug1874.py
python3 ./test.py -f query/bug1875.py
python3 ./test.py -f query/bug1876.py
python3 ./test.py -f query/bug2218.py
python3 ./test.py -f query/bug2281.py
python3 ./test.py -f query/bug2119.py
python3 bug2265.py
#stream
python3 ./test.py -f stream/metric_1.py
python3 ./test.py -f stream/new.py
......
###################################################################
# Copyright (c) 2016 by TAOS Technologies, Inc.
# All rights reserved.
#
# This file is proprietary and confidential to TAOS Technologies.
# No part of this file may be reproduced, stored, transmitted,
# disclosed or used in any form or by any means other than as
# expressly provided by the written permission from Jianhui Tao
#
###################################################################
# -*- coding: utf-8 -*-
import sys
from util.log import *
from util.cases import *
from util.sql import *
from util.dnodes import *
class TDTestCase:
def init(self, conn, logSql):
tdLog.debug("start to execute %s" % __file__)
tdSql.init(conn.cursor(), logSql)
def run(self):
tdSql.prepare()
print("==========step1")
print("create table && insert data")
tdSql.execute("create table t1 (ts timestamp, c1 int, c2 float)")
print("==========step2")
print("query percentile from blank table")
tdSql.query('select percentile(c1,1) from t1')
def stop(self):
tdSql.close()
tdLog.success("%s successfully executed" % __file__)
tdCases.addWindows(__file__, TDTestCase())
tdCases.addLinux(__file__, TDTestCase())
\ No newline at end of file
###################################################################
# Copyright (c) 2016 by TAOS Technologies, Inc.
# All rights reserved.
#
# This file is proprietary and confidential to TAOS Technologies.
# No part of this file may be reproduced, stored, transmitted,
# disclosed or used in any form or by any means other than as
# expressly provided by the written permission from Jianhui Tao
#
###################################################################
# -*- coding: utf-8 -*-
import sys
from util.log import *
from util.cases import *
from util.sql import *
from util.dnodes import *
class TDTestCase:
def init(self, conn, logSql):
tdLog.debug("start to execute %s" % __file__)
tdSql.init(conn.cursor(), logSql)
def run(self):
tdSql.prepare()
print("==========step1")
print("create table && insert data")
tdSql.execute("create table t1 (ts timestamp, c1 int, c2 float)")
insertRows = 10
t0 = 1604298064000
tdLog.info("insert %d rows" % (insertRows))
for i in range(insertRows):
ret = tdSql.execute(
"insert into t1 values (%d , %d,%d)" %
(t0+i,i%100,i/2.0))
print("==========step2")
print("query diff && top")
tdSql.error('select diff(c1),top(c2) from t1')
def stop(self):
tdSql.close()
tdLog.success("%s successfully executed" % __file__)
tdCases.addWindows(__file__, TDTestCase())
tdCases.addLinux(__file__, TDTestCase())
\ No newline at end of file
......@@ -16,10 +16,16 @@ import sys
import os
import taos
import time
import argparse
class taosdemoQueryPerformace:
def initConnection(self):
def __init__(self, clearCache, commitID, dbName, stbName, tbPerfix):
self.clearCache = clearCache
self.commitID = commitID
self.dbName = dbName
self.stbName = stbName
self.tbPerfix = tbPerfix
self.host = "127.0.0.1"
self.user = "root"
self.password = "taosdata"
......@@ -30,92 +36,109 @@ class taosdemoQueryPerformace:
self.password,
self.config)
def createPerfTables(self):
cursor = self.conn.cursor()
cursor.execute("create database if not exists %s" % self.dbName)
cursor.execute("use %s" % self.dbName)
cursor.execute("create table if not exists %s(ts timestamp, query_time float, commit_id binary(50)) tags(query_id int, query_sql binary(300))" % self.stbName)
sql = "select count(*) from test.meters"
tableid = 1
cursor.execute("create table if not exists %s%d using %s tags(%d, '%s')" % (self.tbPerfix, tableid, self.stbName, tableid, sql))
sql = "select avg(f1), max(f2), min(f3) from test.meters"
tableid = 2
cursor.execute("create table if not exists %s%d using %s tags(%d, '%s')" % (self.tbPerfix, tableid, self.stbName, tableid, sql))
sql = "select count(*) from test.meters where loc='beijing'"
tableid = 3
cursor.execute("create table if not exists %s%d using %s tags(%d, \"%s\")" % (self.tbPerfix, tableid, self.stbName, tableid, sql))
sql = "select avg(f1), max(f2), min(f3) from test.meters where areaid=10"
tableid = 4
cursor.execute("create table if not exists %s%d using %s tags(%d, '%s')" % (self.tbPerfix, tableid, self.stbName, tableid, sql))
sql = "select avg(f1), max(f2), min(f3) from test.t10 interval(10s)"
tableid = 5
cursor.execute("create table if not exists %s%d using %s tags(%d, '%s')" % (self.tbPerfix, tableid, self.stbName, tableid, sql))
sql = "select last_row(*) from meters"
tableid = 6
cursor.execute("create table if not exists %s%d using %s tags(%d, '%s')" % (self.tbPerfix, tableid, self.stbName, tableid, sql))
sql = "select * from meters"
tableid = 7
cursor.execute("create table if not exists %s%d using %s tags(%d, '%s')" % (self.tbPerfix, tableid, self.stbName, tableid, sql))
sql = "select avg(f1), max(f2), min(f3) from meters where ts <= '2017-07-15 10:40:01.000' and ts <= '2017-07-15 14:00:40.000'"
tableid = 8
cursor.execute("create table if not exists %s%d using %s tags(%d, \"%s\")" % (self.tbPerfix, tableid, self.stbName, tableid, sql))
cursor.close()
def query(self):
cursor = self.conn.cursor()
cursor.execute("use test")
totalTime = 0
for i in range(100):
if(sys.argv[1] == '1'):
# root permission is required
os.system("echo 3 > /proc/sys/vm/drop_caches")
startTime = time.time()
cursor.execute("select count(*) from test.meters")
totalTime += time.time() - startTime
print("query time for: select count(*) from test.meters %f seconds" % (totalTime / 100))
totalTime = 0
for i in range(100):
if(sys.argv[1] == '1'):
# root permission is required
os.system("echo 3 > /proc/sys/vm/drop_caches")
startTime = time.time()
cursor.execute("select avg(f1), max(f2), min(f3) from test.meters")
totalTime += time.time() - startTime
print("query time for: select avg(f1), max(f2), min(f3) from test.meters %f seconds" % (totalTime / 100))
totalTime = 0
for i in range(100):
if(sys.argv[1] == '1'):
# root permission is required
os.system("echo 3 > /proc/sys/vm/drop_caches")
startTime = time.time()
cursor.execute("select count(*) from test.meters where loc='beijing'")
totalTime += time.time() - startTime
print("query time for: select count(*) from test.meters where loc='beijing' %f seconds" % (totalTime / 100))
totalTime = 0
for i in range(100):
if(sys.argv[1] == '1'):
# root permission is required
os.system("echo 3 > /proc/sys/vm/drop_caches")
startTime = time.time()
cursor.execute("select avg(f1), max(f2), min(f3) from test.meters where areaid=10")
totalTime += time.time() - startTime
print("query time for: select avg(f1), max(f2), min(f3) from test.meters where areaid=10 %f seconds" % (totalTime / 100))
totalTime = 0
for i in range(100):
if(sys.argv[1] == '1'):
# root permission is required
os.system("echo 3 > /proc/sys/vm/drop_caches")
startTime = time.time()
cursor.execute("select avg(f1), max(f2), min(f3) from test.t10 interval(10s)")
totalTime += time.time() - startTime
print("query time for: select avg(f1), max(f2), min(f3) from test.t10 interval(10s) %f seconds" % (totalTime / 100))
totalTime = 0
for i in range(100):
if(sys.argv[1] == '1'):
# root permission is required
os.system("echo 3 > /proc/sys/vm/drop_caches")
startTime = time.time()
cursor.execute("select last_row(*) from meters")
totalTime += time.time() - startTime
print("query time for: select last_row(*) from meters %f seconds" % (totalTime / 100))
totalTime = 0
for i in range(100):
if(sys.argv[1] == '1'):
# root permission is required
os.system("echo 3 > /proc/sys/vm/drop_caches")
startTime = time.time()
cursor.execute("select * from meters")
totalTime += time.time() - startTime
print("query time for: select * from meters %f seconds" % (totalTime / 100))
totalTime = 0
for i in range(100):
if(sys.argv[1] == '1'):
# root permission is required
os.system("echo 3 > /proc/sys/vm/drop_caches")
startTime = time.time()
cursor.execute("select avg(f1), max(f2), min(f3) from meters where ts <= '2017-07-15 10:40:01.000' and ts <= '2017-07-15 14:00:40.000'")
totalTime += time.time() - startTime
print("query time for: select avg(f1), max(f2), min(f3) from meters where ts <= '2017-07-15 10:40:01.000' and ts <= '2017-07-15 14:00:40.000' %f seconds" % (totalTime / 100))
cursor = self.conn.cursor()
print("==================== query performance ====================")
cursor.execute("use %s" % self.dbName)
cursor.execute("select tbname, query_id, query_sql from %s" % self.stbName)
for data in cursor:
table_name = data[0]
query_id = data[1]
sql = data[2]
totalTime = 0
cursor2 = self.conn.cursor()
cursor2.execute("use test")
for i in range(100):
if(self.clearCache == True):
# root permission is required
os.system("echo 3 > /proc/sys/vm/drop_caches")
startTime = time.time()
cursor2.execute(sql)
totalTime += time.time() - startTime
cursor2.close()
print("query time for: %s %f seconds" % (sql, totalTime / 100))
cursor3 = self.conn.cursor()
cursor3.execute("insert into %s.%s values(now, %f, '%s')" % (self.dbName, table_name, totalTime / 100, self.commitID))
cursor3.close()
cursor.close()
if __name__ == '__main__':
perftest = taosdemoQueryPerformace()
perftest.initConnection()
perftest.query()
\ No newline at end of file
parser = argparse.ArgumentParser()
parser.add_argument(
'-r',
'--remove-cache',
action='store_true',
default=False,
help='clear cache before query (default: False)')
parser.add_argument(
'-c',
'--commit-id',
action='store',
default='null',
type=str,
help='git commit id (default: null)')
parser.add_argument(
'-d',
'--database-name',
action='store',
default='perf',
type=str,
help='Database name to be created (default: perf)')
parser.add_argument(
'-t',
'--stable-name',
action='store',
default='query_tb',
type=str,
help='table name to be created (default: query_tb)')
parser.add_argument(
'-p',
'--table-perfix',
action='store',
default='q',
type=str,
help='table name perfix (default: q)')
args = parser.parse_args()
perftest = taosdemoQueryPerformace(args.remove_cache, args.commit_id, args.database_name, args.stable_name, args.table_perfix)
perftest.createPerfTables()
perftest.query()
......@@ -52,7 +52,7 @@ class TDTestCase:
tdSql.checkRows(5)
sql = ''' select * from st where loc = 'nchar0' limit 1 union all select * from st where loc = 'nchar1' limit 1 union all select * from st where loc = 'nchar2' limit 1
union all select * from st where loc = 'nchar3' limit 1 union all select * from st where loc = 'nchar4' limit 1 union all select * from st where loc = 'nchar5''''
union all select * from st where loc = 'nchar3' limit 1 union all select * from st where loc = 'nchar4' limit 1 union all select * from st where loc = 'nchar5' limit 1'''
tdSql.query(sql)
tdSql.checkRows(6)
......
###################################################################
# Copyright (c) 2016 by TAOS Technologies, Inc.
# All rights reserved.
#
# This file is proprietary and confidential to TAOS Technologies.
# No part of this file may be reproduced, stored, transmitted,
# disclosed or used in any form or by any means other than as
# expressly provided by the written permission from Jianhui Tao
#
###################################################################
# -*- coding: utf-8 -*-
import sys
import taos
from util.log import tdLog
from util.cases import tdCases
from util.sql import tdSql
class TDTestCase:
def init(self, conn, logSql):
tdLog.debug("start to execute %s" % __file__)
tdSql.init(conn.cursor(), logSql)
def run(self):
tdSql.prepare()
print("==============step1")
tdSql.execute("create table db.cars(ts timestamp, c int) tags(id int);")
tdSql.execute("create database db2")
tdSql.error("create table db2.car1 using db.cars tags(1)")
tdSql.error("insert into db2.car1 using db1.cars tags(1) values(now, 1);")
def stop(self):
tdSql.close()
tdLog.success("%s successfully executed" % __file__)
tdCases.addWindows(__file__, TDTestCase())
tdCases.addLinux(__file__, TDTestCase())
###################################################################
# Copyright (c) 2016 by TAOS Technologies, Inc.
# All rights reserved.
#
# This file is proprietary and confidential to TAOS Technologies.
# No part of this file may be reproduced, db_test.stored, transmitted,
# disclosed or used in any form or by any means other than as
# expressly provided by the written permission from Jianhui Tao
#
###################################################################
# -*- coding: utf-8 -*-
import sys
import taos
from util.log import tdLog
from util.cases import tdCases
from util.sql import tdSql
class TDTestCase:
def init(self, conn, logSql):
tdLog.debug("start to execute %s" % __file__)
tdSql.init(conn.cursor(), logSql)
def run(self):
tdSql.prepare()
print("==============step1")
tdLog.info("check nchar")
tdSql.error("create database anal (ts timestamp ,i nchar(4094))")
tdSql.execute(
"create table anal (ts timestamp ,i nchar(4093))")
print("==============step2")
tdLog.info("check binary")
tdSql.error("create database anal (ts timestamp ,i binary(16375))")
tdSql.execute(
"create table anal1 (ts timestamp ,i binary(16374))")
print("==============step3")
tdLog.info("check int & binary")
tdSql.error("create table anal2 (ts timestamp ,i binary(16371),j int)")
tdSql.execute("create table anal2 (ts timestamp ,i binary(16370),j int)")
tdSql.execute("create table anal3 (ts timestamp ,i binary(16366), j int, k int)")
def stop(self):
tdSql.close()
tdLog.success("%s successfully executed" % __file__)
tdCases.addWindows(__file__, TDTestCase())
tdCases.addLinux(__file__, TDTestCase())
###################################################################
# Copyright (c) 2016 by TAOS Technologies, Inc.
# All rights reserved.
#
# This file is proprietary and confidential to TAOS Technologies.
# No part of this file may be reproduced, stored, transmitted,
# disclosed or used in any form or by any means other than as
# expressly provided by the written permission from Jianhui Tao
#
###################################################################
# -*- coding: utf-8 -*-
import sys
import taos
import time
import datetime
import csv
import random
import pandas as pd
import argparse
import os.path
class taosdemoPerformace:
def __init__(self, commitID, dbName, createTableTime, insertRecordsTime, recordsPerSecond):
self.commitID = commitID
self.dbName = dbName
self.createTableTime = createTableTime
self.insertRecordsTime = insertRecordsTime
self.recordsPerSecond = recordsPerSecond
self.host = "127.0.0.1"
self.user = "root"
self.password = "taosdata"
self.config = "/etc/taosperf"
self.conn = taos.connect(
self.host,
self.user,
self.password,
self.config)
def createTablesAndStoreData(self):
cursor = self.conn.cursor()
cursor.execute("create database if not exists %s" % self.dbName)
cursor.execute("use %s" % self.dbName)
cursor.execute("create table if not exists taosdemo_perf (ts timestamp, create_table_time float, insert_records_time float, records_per_second float, commit_id binary(50))")
print("==================== taosdemo performance ====================")
print("create tables time: %f" % self.createTableTime)
print("insert records time: %f" % self.insertRecordsTime)
print("records per second: %f" % self.recordsPerSecond)
cursor.execute("insert into taosdemo_perf values(now, %f, %f, %f, '%s')" % (self.createTableTime, self.insertRecordsTime, self.recordsPerSecond, self.commitID))
cursor.execute("drop database if exists taosdemo_insert_test")
cursor.close()
if __name__ == '__main__':
parser = argparse.ArgumentParser()
parser.add_argument(
'-c',
'--commit-id',
action='store',
type=str,
help='git commit id (default: null)')
parser.add_argument(
'-d',
'--database-name',
action='store',
default='perf',
type=str,
help='Database name to be created (default: perf)')
parser.add_argument(
'-t',
'--create-table',
action='store',
type=float,
help='create table time')
parser.add_argument(
'-i',
'--insert-records',
action='store',
type=float,
help='insert records time')
parser.add_argument(
'-r',
'---records-per-second',
action='store',
type=float,
help='records per request')
args = parser.parse_args()
perftest = taosdemoPerformace(args.commit_id, args.database_name, args.create_table, args.insert_records, args.records_per_second)
perftest.createTablesAndStoreData()
\ No newline at end of file
......@@ -24,15 +24,17 @@
#define GREEN "\033[1;32m"
#define NC "\033[0m"
int32_t capacity = 100000;
int32_t q1Times = 1;
int32_t q2Times = 1;
int32_t capacity = 128;
int32_t q1Times = 10;
int32_t q2Times = 10;
int32_t keyNum = 100000;
int32_t printInterval = 10000;
int32_t printInterval = 1000;
void * hashHandle;
pthread_t thread;
typedef struct HashTestRow {
int32_t size;
void * ptr;
int32_t keySize;
char key[100];
} HashTestRow;
void shellParseArgument(int argc, char *argv[]);
......@@ -40,7 +42,7 @@ void shellParseArgument(int argc, char *argv[]);
void testHashPerformance() {
int64_t initialMs = taosGetTimestampMs();
_hash_fn_t hashFp = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY);
void * hashHandle = taosHashInit(capacity, hashFp, true);
hashHandle = taosHashInit(128, hashFp, true, HASH_NO_LOCK);
int64_t startMs = taosGetTimestampMs();
float seconds = (startMs - initialMs) / 1000.0;
......@@ -48,17 +50,25 @@ void testHashPerformance() {
for (int32_t t = 1; t <= keyNum; ++t) {
HashTestRow row = {0};
char key[100] = {0};
int32_t keySize = sprintf(key, "0.db.st%d", t);
row.keySize = sprintf(row.key, "0.db.st%d", t);
for (int32_t q = 0; q < q1Times; q++) {
taosHashGet(hashHandle, &key, keySize);
taosHashGet(hashHandle, row.key, row.keySize);
}
taosHashPut(hashHandle, key, keySize, &row, sizeof(HashTestRow));
taosHashPut(hashHandle, row.key, row.keySize, &row, sizeof(HashTestRow));
for (int32_t q = 0; q < q2Times; q++) {
taosHashGet(hashHandle, &key, keySize);
taosHashGet(hashHandle, row.key, row.keySize);
}
// test iterator
{
HashTestRow *row = taosHashIterate(hashHandle, NULL);
while (row) {
taosHashGet(hashHandle, row->key, row->keySize);
row = taosHashIterate(hashHandle, row);
}
}
if (t % printInterval == 0) {
......@@ -80,9 +90,35 @@ void testHashPerformance() {
taosHashCleanup(hashHandle);
}
void *multiThreadFunc(void *param) {
for (int i = 0; i < 100; ++i) {
taosMsleep(1000);
HashTestRow *row = taosHashIterate(hashHandle, NULL);
while (row) {
taosHashGet(hashHandle, row->key, row->keySize);
row = taosHashIterate(hashHandle, row);
}
int64_t hashSize = taosHashGetSize(hashHandle);
pPrint("i:%d hashSize:%ld", i, hashSize);
}
return NULL;
}
void multiThreadTest() {
pthread_attr_t thattr;
pthread_attr_init(&thattr);
pthread_attr_setdetachstate(&thattr, PTHREAD_CREATE_JOINABLE);
// Start threads to write
pthread_create(&thread, &thattr, multiThreadFunc, NULL);
}
int main(int argc, char *argv[]) {
shellParseArgument(argc, argv);
multiThreadTest();
testHashPerformance();
pthread_join(thread, NULL);
}
void printHelp() {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册