提交 636aa8d2 编写于 作者: S Shengliang Guan

Merge branch 'develop' into feature/query

此差异已折叠。
......@@ -148,7 +148,7 @@ void taos_init_imp() {
refreshTime = refreshTime < 10 ? 10 : refreshTime;
if (tscCacheHandle == NULL) {
tscCacheHandle = taosCacheInit(TSDB_DATA_TYPE_BINARY, refreshTime, false, NULL);
tscCacheHandle = taosCacheInit(TSDB_DATA_TYPE_BINARY, refreshTime, false, NULL, "client");
}
tscDebug("client is initialized successfully");
......
......@@ -213,6 +213,8 @@ void cqDrop(void *handle) {
pObj->pStream = NULL;
cTrace("vgId:%d, id:%d CQ:%s is dropped", pContext->vgId, pObj->tid, pObj->sqlStr);
tdFreeSchema(pObj->pSchema);
free(pObj->sqlStr);
free(pObj);
pthread_mutex_unlock(&pContext->mutex);
......
......@@ -41,7 +41,7 @@ int32_t mnodeInitProfile();
void mnodeCleanupProfile();
SConnObj *mnodeCreateConn(char *user, uint32_t ip, uint16_t port);
SConnObj *mnodeAccquireConn(uint32_t connId, char *user, uint32_t ip, uint16_t port);
SConnObj *mnodeAccquireConn(int32_t connId, char *user, uint32_t ip, uint16_t port);
void mnodeReleaseConn(SConnObj *pConn);
int32_t mnodeSaveQueryStreamList(SConnObj *pConn, SCMHeartBeatMsg *pHBMsg);
......
......@@ -43,7 +43,7 @@
extern void *tsMnodeTmr;
static SCacheObj *tsMnodeConnCache = NULL;
static uint32_t tsConnIndex = 0;
static int32_t tsConnIndex = 0;
static int32_t mnodeGetQueryMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pConn);
static int32_t mnodeRetrieveQueries(SShowObj *pShow, char *data, int32_t rows, void *pConn);
......@@ -68,7 +68,7 @@ int32_t mnodeInitProfile() {
mnodeAddWriteMsgHandle(TSDB_MSG_TYPE_CM_KILL_STREAM, mnodeProcessKillStreamMsg);
mnodeAddWriteMsgHandle(TSDB_MSG_TYPE_CM_KILL_CONN, mnodeProcessKillConnectionMsg);
tsMnodeConnCache = taosCacheInitWithCb(TSDB_DATA_TYPE_BINARY, CONN_CHECK_TIME,false, mnodeFreeConn);
tsMnodeConnCache = taosCacheInitWithCb(TSDB_DATA_TYPE_INT, CONN_CHECK_TIME, false, mnodeFreeConn, "conn");
return 0;
}
......@@ -89,7 +89,7 @@ SConnObj *mnodeCreateConn(char *user, uint32_t ip, uint16_t port) {
return NULL;
}
uint32_t connId = atomic_add_fetch_32(&tsConnIndex, 1);
int32_t connId = atomic_add_fetch_32(&tsConnIndex, 1);
if (connId == 0) atomic_add_fetch_32(&tsConnIndex, 1);
SConnObj connObj = {
......@@ -100,9 +100,7 @@ SConnObj *mnodeCreateConn(char *user, uint32_t ip, uint16_t port) {
};
tstrncpy(connObj.user, user, sizeof(connObj.user));
char key[10];
int32_t len = sprintf(key, "%u", connId);
SConnObj *pConn = taosCachePut(tsMnodeConnCache, key, len, &connObj, sizeof(connObj), CONN_KEEP_TIME);
SConnObj *pConn = taosCachePut(tsMnodeConnCache, &connId, sizeof(int32_t), &connObj, sizeof(connObj), CONN_KEEP_TIME);
mDebug("connId:%d, is created, user:%s ip:%s:%u", connId, user, taosIpStr(ip), port);
return pConn;
......@@ -113,12 +111,9 @@ void mnodeReleaseConn(SConnObj *pConn) {
taosCacheRelease(tsMnodeConnCache, (void **)&pConn, false);
}
SConnObj *mnodeAccquireConn(uint32_t connId, char *user, uint32_t ip, uint16_t port) {
char key[10];
int32_t len = sprintf(key, "%u", connId);
SConnObj *mnodeAccquireConn(int32_t connId, char *user, uint32_t ip, uint16_t port) {
uint64_t expireTime = CONN_KEEP_TIME * 1000 + (uint64_t)taosGetTimestampMs();
SConnObj *pConn = taosCacheUpdateExpireTimeByName(tsMnodeConnCache, key, len, expireTime);
SConnObj *pConn = taosCacheUpdateExpireTimeByName(tsMnodeConnCache, &connId, sizeof(int32_t), expireTime);
if (pConn == NULL) {
mError("connId:%d, is already destroyed, user:%s ip:%s:%u", connId, user, taosIpStr(ip), port);
return NULL;
......@@ -547,7 +542,8 @@ static int32_t mnodeProcessKillQueryMsg(SMnodeMsg *pMsg) {
int32_t queryId = (int32_t)strtol(queryIdStr, NULL, 10);
SConnObj *pConn = taosCacheAcquireByKey(tsMnodeConnCache, connIdStr, strlen(connIdStr));
int32_t connId = atoi(connIdStr);
SConnObj *pConn = taosCacheAcquireByKey(tsMnodeConnCache, &connId, sizeof(int32_t));
if (pConn == NULL) {
mError("connId:%s, failed to kill queryId:%d, conn not exist", connIdStr, queryId);
return TSDB_CODE_MND_INVALID_CONN_ID;
......@@ -576,8 +572,9 @@ static int32_t mnodeProcessKillStreamMsg(SMnodeMsg *pMsg) {
}
int32_t streamId = (int32_t)strtol(streamIdStr, NULL, 10);
int32_t connId = atoi(connIdStr);
SConnObj *pConn = taosCacheAcquireByKey(tsMnodeConnCache, connIdStr, strlen(connIdStr));
SConnObj *pConn = taosCacheAcquireByKey(tsMnodeConnCache, &connId, sizeof(int32_t));
if (pConn == NULL) {
mError("connId:%s, failed to kill streamId:%d, conn not exist", connIdStr, streamId);
return TSDB_CODE_MND_INVALID_CONN_ID;
......@@ -594,7 +591,8 @@ static int32_t mnodeProcessKillConnectionMsg(SMnodeMsg *pMsg) {
if (strcmp(pUser->user, TSDB_DEFAULT_USER) != 0) return TSDB_CODE_MND_NO_RIGHTS;
SCMKillConnMsg *pKill = pMsg->rpcMsg.pCont;
SConnObj * pConn = taosCacheAcquireByKey(tsMnodeConnCache, pKill->queryId, strlen(pKill->queryId));
int32_t connId = atoi(pKill->queryId);
SConnObj * pConn = taosCacheAcquireByKey(tsMnodeConnCache, &connId, sizeof(int32_t));
if (pConn == NULL) {
mError("connId:%s, failed to kill, conn not exist", pKill->queryId);
return TSDB_CODE_MND_INVALID_CONN_ID;
......
......@@ -65,7 +65,7 @@ int32_t mnodeInitShow() {
mnodeAddReadMsgHandle(TSDB_MSG_TYPE_CM_CONNECT, mnodeProcessConnectMsg);
mnodeAddReadMsgHandle(TSDB_MSG_TYPE_CM_USE_DB, mnodeProcessUseMsg);
tsMnodeShowCache = taosCacheInitWithCb(TSDB_DATA_TYPE_BINARY, 5, false, mnodeFreeShowObj);
tsMnodeShowCache = taosCacheInitWithCb(TSDB_DATA_TYPE_INT, 5, false, mnodeFreeShowObj, "show");
return 0;
}
......@@ -364,10 +364,7 @@ static bool mnodeCheckShowFinished(SShowObj *pShow) {
}
static bool mnodeAccquireShowObj(SShowObj *pShow) {
char key[10];
int32_t len = sprintf(key, "%d", pShow->index);
SShowObj *pSaved = taosCacheAcquireByKey(tsMnodeShowCache, key, len);
SShowObj *pSaved = taosCacheAcquireByKey(tsMnodeShowCache, &pShow->index, sizeof(int32_t));
if (pSaved == pShow) {
mDebug("%p, show is accquired from cache", pShow);
return true;
......@@ -378,14 +375,11 @@ static bool mnodeAccquireShowObj(SShowObj *pShow) {
static void *mnodePutShowObj(SShowObj *pShow, int32_t size) {
if (tsMnodeShowCache != NULL) {
char key[10];
pShow->index = atomic_add_fetch_32(&tsShowObjIndex, 1);
int32_t len = sprintf(key, "%d", pShow->index);
SShowObj *newQhandle = taosCachePut(tsMnodeShowCache, key, len, pShow, size, 6);
SShowObj *newQhandle = taosCachePut(tsMnodeShowCache, &pShow->index, sizeof(int32_t), pShow, size, 6);
free(pShow);
mDebug("%p, show is put into cache, index:%s", newQhandle, key);
mDebug("%p, show is put into cache, index:%d", newQhandle, pShow->index);
return newQhandle;
}
......
......@@ -53,12 +53,12 @@ static void httpDestroyContext(void *data) {
httpFreeJsonBuf(pContext);
httpFreeMultiCmds(pContext);
httpDebug("context:%p, is destroyed, refCount:%d", pContext, pContext->refCount);
httpDebug("context:%p, is destroyed, refCount:%d data:%p", pContext, pContext->refCount, data);
tfree(pContext);
}
bool httpInitContexts() {
tsHttpServer.contextCache = taosCacheInitWithCb(TSDB_DATA_TYPE_BINARY, 2, false, httpDestroyContext);
tsHttpServer.contextCache = taosCacheInitWithCb(TSDB_DATA_TYPE_BIGINT, 2, false, httpDestroyContext, "restc");
if (tsHttpServer.contextCache == NULL) {
httpError("failed to init context cache");
return false;
......@@ -103,17 +103,14 @@ HttpContext *httpCreateContext(int32_t fd) {
HttpContext *pContext = calloc(1, sizeof(HttpContext));
if (pContext == NULL) return NULL;
char contextStr[16] = {0};
int32_t keySize = snprintf(contextStr, sizeof(contextStr), "%p", pContext);
pContext->fd = fd;
pContext->httpVersion = HTTP_VERSION_10;
pContext->lastAccessTime = taosGetTimestampSec();
pContext->state = HTTP_CONTEXT_STATE_READY;
HttpContext **ppContext = taosCachePut(tsHttpServer.contextCache, contextStr, keySize, &pContext, sizeof(HttpContext *), 3);
HttpContext **ppContext = taosCachePut(tsHttpServer.contextCache, &pContext, sizeof(void *), &pContext, sizeof(void *), 3);
pContext->ppContext = ppContext;
httpDebug("context:%p, fd:%d, is created, item:%p", pContext, fd, ppContext);
httpDebug("context:%p, fd:%d, is created, data:%p", pContext, fd, ppContext);
// set the ref to 0
taosCacheRelease(tsHttpServer.contextCache, (void**)&ppContext, false);
......@@ -122,16 +119,13 @@ HttpContext *httpCreateContext(int32_t fd) {
}
HttpContext *httpGetContext(void *ptr) {
char contextStr[16] = {0};
int32_t len = snprintf(contextStr, sizeof(contextStr), "%p", ptr);
HttpContext **ppContext = taosCacheAcquireByKey(tsHttpServer.contextCache, contextStr, len);
HttpContext **ppContext = taosCacheAcquireByKey(tsHttpServer.contextCache, &ptr, sizeof(HttpContext *));
if (ppContext) {
HttpContext *pContext = *ppContext;
if (pContext) {
int32_t refCount = atomic_add_fetch_32(&pContext->refCount, 1);
httpDebug("context:%p, fd:%d, is accquired, refCount:%d", pContext, pContext->fd, refCount);
httpDebug("context:%p, fd:%d, is accquired, data:%p refCount:%d", pContext, pContext->fd, ppContext, refCount);
return pContext;
}
}
......@@ -141,9 +135,10 @@ HttpContext *httpGetContext(void *ptr) {
void httpReleaseContext(HttpContext *pContext) {
int32_t refCount = atomic_sub_fetch_32(&pContext->refCount, 1);
assert(refCount >= 0);
httpDebug("context:%p, is releasd, refCount:%d", pContext, refCount);
HttpContext **ppContext = pContext->ppContext;
httpDebug("context:%p, is releasd, data:%p refCount:%d", pContext, ppContext, refCount);
if (tsHttpServer.contextCache != NULL) {
taosCacheRelease(tsHttpServer.contextCache, (void **)(&ppContext), false);
} else {
......
......@@ -85,6 +85,7 @@ bool httpReadDataImp(HttpContext *pContext) {
} else {
httpError("context:%p, fd:%d, ip:%s, read from socket error:%d, close connect",
pContext, pContext->fd, pContext->ipstr, errno);
httpReleaseContext(pContext);
return false;
}
} else {
......@@ -153,6 +154,7 @@ static bool httpReadData(HttpContext *pContext) {
int ret = httpCheckReadCompleted(pContext);
if (ret == HTTP_CHECK_BODY_CONTINUE) {
//httpDebug("context:%p, fd:%d, ip:%s, not finished yet, wait another event", pContext, pContext->fd, pContext->ipstr);
httpReleaseContext(pContext);
return false;
} else if (ret == HTTP_CHECK_BODY_SUCCESS){
httpDebug("context:%p, fd:%d, ip:%s, thread:%s, read size:%d, dataLen:%d",
......@@ -161,11 +163,13 @@ static bool httpReadData(HttpContext *pContext) {
return true;
} else {
httpNotifyContextClose(pContext);
httpReleaseContext(pContext);
return false;
}
} else {
httpError("context:%p, fd:%d, ip:%s, failed to read http body, close connect", pContext, pContext->fd, pContext->ipstr);
httpNotifyContextClose(pContext);
httpReleaseContext(pContext);
return false;
}
}
......
......@@ -115,7 +115,7 @@ void httpCleanUpSessions() {
}
bool httpInitSessions() {
tsHttpServer.sessionCache = taosCacheInitWithCb(TSDB_DATA_TYPE_BINARY, 5, false, httpDestroySession);
tsHttpServer.sessionCache = taosCacheInitWithCb(TSDB_DATA_TYPE_BINARY, 5, false, httpDestroySession, "rests");
if (tsHttpServer.sessionCache == NULL) {
httpError("failed to init session cache");
return false;
......
......@@ -65,6 +65,7 @@ typedef struct {
int64_t totalSize; // total allocated buffer in this hash table, SCacheObj is not included.
int64_t refreshTime;
STrashElem * pTrash;
const char * cacheName;
// void * tmrCtrl;
// void * pTimer;
SCacheStatis statistics;
......@@ -90,7 +91,7 @@ typedef struct {
* @param fn free resource callback function
* @return
*/
SCacheObj *taosCacheInit(int32_t keyType, int64_t refreshTimeInSeconds, bool extendLifespan, __cache_freeres_fn_t fn);
SCacheObj *taosCacheInit(int32_t keyType, int64_t refreshTimeInSeconds, bool extendLifespan, __cache_freeres_fn_t fn, const char *cacheName);
/**
* initialize the cache object and set the free object callback function
......@@ -98,7 +99,7 @@ SCacheObj *taosCacheInit(int32_t keyType, int64_t refreshTimeInSeconds, bool ext
* @param freeCb
* @return
*/
SCacheObj *taosCacheInitWithCb(int32_t keyType, int64_t refreshTimeInSeconds, bool extendLifespan, __cache_freeres_fn_t fn);
SCacheObj *taosCacheInitWithCb(int32_t keyType, int64_t refreshTimeInSeconds, bool extendLifespan, __cache_freeres_fn_t fn, const char *cacheName);
/**
* add data into cache
......@@ -128,7 +129,7 @@ void *taosCacheAcquireByKey(SCacheObj *pCacheObj, const void *key, size_t keyLen
* @param expireTime new expire time of data
* @return
*/
void* taosCacheUpdateExpireTimeByName(SCacheObj *pCacheObj, const char *key, size_t keyLen, uint64_t expireTime);
void* taosCacheUpdateExpireTimeByName(SCacheObj *pCacheObj, void *key, size_t keyLen, uint64_t expireTime);
/**
* Add one reference count for the exist data, and assign this data for a new owner.
......
......@@ -118,8 +118,10 @@ static FORCE_INLINE void taosCacheReleaseNode(SCacheObj *pCacheObj, SCacheDataNo
int32_t size = pNode->size;
taosHashRemove(pCacheObj->pHashTable, pNode->key, pNode->keySize);
uDebug("key:%p, is removed from cache,total:%" PRId64 ",size:%dbytes", pNode->key, pCacheObj->totalSize, size);
uDebug("key:%p, %p is destroyed from cache, totalNum:%d totalSize:%" PRId64 "bytes size:%dbytes, cacheName:%s",
pNode->key, pNode->data, (int32_t)taosHashGetSize(pCacheObj->pHashTable), pCacheObj->totalSize, size,
pCacheObj->cacheName);
if (pCacheObj->freeFp) pCacheObj->freeFp(pNode->data);
free(pNode);
}
......@@ -224,7 +226,7 @@ static void doCleanupDataCache(SCacheObj *pCacheObj);
*/
static void* taosCacheRefresh(void *handle);
SCacheObj *taosCacheInitWithCb(int32_t keyType, int64_t refreshTimeInSeconds, bool extendLifespan, __cache_freeres_fn_t fn) {
SCacheObj *taosCacheInitWithCb(int32_t keyType, int64_t refreshTimeInSeconds, bool extendLifespan, __cache_freeres_fn_t fn, const char* cacheName) {
if (refreshTimeInSeconds <= 0) {
return NULL;
}
......@@ -236,6 +238,7 @@ SCacheObj *taosCacheInitWithCb(int32_t keyType, int64_t refreshTimeInSeconds, bo
}
pCacheObj->pHashTable = taosHashInit(128, taosGetDefaultHashFunction(keyType), false);
pCacheObj->cacheName = cacheName;
if (pCacheObj->pHashTable == NULL) {
free(pCacheObj);
uError("failed to allocate memory, reason:%s", strerror(errno));
......@@ -265,8 +268,8 @@ SCacheObj *taosCacheInitWithCb(int32_t keyType, int64_t refreshTimeInSeconds, bo
return pCacheObj;
}
SCacheObj *taosCacheInit(int32_t keyType, int64_t refreshTimeInSeconds, bool extendLifespan, __cache_freeres_fn_t fn) {
return taosCacheInitWithCb(keyType, refreshTimeInSeconds, extendLifespan, fn);
SCacheObj *taosCacheInit(int32_t keyType, int64_t refreshTimeInSeconds, bool extendLifespan, __cache_freeres_fn_t fn, const char* cacheName) {
return taosCacheInitWithCb(keyType, refreshTimeInSeconds, extendLifespan, fn, cacheName);
}
void *taosCachePut(SCacheObj *pCacheObj, const void *key, size_t keyLen, const void *pData, size_t dataSize, int duration) {
......@@ -284,19 +287,21 @@ void *taosCachePut(SCacheObj *pCacheObj, const void *key, size_t keyLen, const v
pNode = taosAddToCacheImpl(pCacheObj, key, keyLen, pData, dataSize, duration * 1000L);
if (NULL != pNode) {
pCacheObj->totalSize += pNode->size;
uDebug("key:%p, %p added into cache, added:%" PRIu64 ", expire:%" PRIu64 ", total:%" PRId64 ", size:%" PRId64 " bytes",
key, pNode, pNode->addedTime, (pNode->lifespan * pNode->extendFactor + pNode->addedTime), pCacheObj->totalSize, dataSize);
uDebug("key:%p, %p added into cache, added:%" PRIu64 ", expire:%" PRIu64 ", totalNum:%d totalSize:%" PRId64
"bytes size:%" PRId64 "bytes, cacheName:%s",
key, pNode->data, pNode->addedTime, (pNode->lifespan * pNode->extendFactor + pNode->addedTime),
(int32_t)taosHashGetSize(pCacheObj->pHashTable), pCacheObj->totalSize, dataSize, pCacheObj->cacheName);
} else {
uError("key:%p, failed to added into cache, out of memory", key);
uError("key:%p, failed to added into cache, out of memory, cacheName:%s", key, pCacheObj->cacheName);
}
} else { // old data exists, update the node
pNode = taosUpdateCacheImpl(pCacheObj, pOld, key, keyLen, pData, dataSize, duration * 1000L);
uDebug("key:%p, %p exist in cache, updated", key, pNode);
uDebug("key:%p, %p exist in cache, updated, cacheName:%s", key, pNode->data, pCacheObj->cacheName);
}
__cache_unlock(pCacheObj);
return (pNode != NULL) ? pNode->data : NULL;
}
......@@ -327,17 +332,17 @@ void *taosCacheAcquireByKey(SCacheObj *pCacheObj, const void *key, size_t keyLen
if (ptNode != NULL) {
atomic_add_fetch_32(&pCacheObj->statistics.hitCount, 1);
uDebug("key:%p, is retrieved from cache, %p refcnt:%d", key, (*ptNode), ref);
uDebug("key:%p, %p is retrieved from cache, refcnt:%d, cacheName:%s", key, (*ptNode)->data, ref, pCacheObj->cacheName);
} else {
atomic_add_fetch_32(&pCacheObj->statistics.missCount, 1);
uDebug("key:%p, not in cache, retrieved failed", key);
uDebug("key:%p, not in cache, retrieved failed, cacheName:%s", key, pCacheObj->cacheName);
}
atomic_add_fetch_32(&pCacheObj->statistics.totalAccess, 1);
return (ptNode != NULL) ? (*ptNode)->data : NULL;
}
void* taosCacheUpdateExpireTimeByName(SCacheObj *pCacheObj, const char *key, size_t keyLen, uint64_t expireTime) {
void* taosCacheUpdateExpireTimeByName(SCacheObj *pCacheObj, void *key, size_t keyLen, uint64_t expireTime) {
if (pCacheObj == NULL || taosHashGetSize(pCacheObj->pHashTable) == 0) {
return NULL;
}
......@@ -350,17 +355,18 @@ void* taosCacheUpdateExpireTimeByName(SCacheObj *pCacheObj, const char *key, siz
(*ptNode)->extendFactor += 1;
// (*ptNode)->lifespan = expireTime;
}
__cache_unlock(pCacheObj);
if (ptNode != NULL) {
atomic_add_fetch_32(&pCacheObj->statistics.hitCount, 1);
uDebug("key:%p, expireTime is updated in cache, %p refcnt:%d", key, (*ptNode), T_REF_VAL_GET(*ptNode));
uDebug("key:%p, %p expireTime is updated in cache, refcnt:%d, cacheName:%s", key, (*ptNode)->data,
T_REF_VAL_GET(*ptNode), pCacheObj->cacheName);
} else {
atomic_add_fetch_32(&pCacheObj->statistics.missCount, 1);
uDebug("key:%p, not in cache, retrieved failed", key);
uDebug("key:%p, not in cache, retrieved failed, cacheName:%s", key, pCacheObj->cacheName);
}
atomic_add_fetch_32(&pCacheObj->statistics.totalAccess, 1);
return (ptNode != NULL) ? (*ptNode)->data : NULL;
}
......@@ -375,9 +381,9 @@ void *taosCacheAcquireByData(SCacheObj *pCacheObj, void *data) {
uError("key: %p the data from cache is invalid", ptNode);
return NULL;
}
int32_t ref = T_REF_INC(ptNode);
uDebug("%p acquired by data in cache, refcnt:%d", ptNode, ref)
uDebug("%p acquired by data in cache, refcnt:%d, cacheName:%s", ptNode->data, ref, pCacheObj->cacheName);
// if the remained life span is less then the (*ptNode)->lifeSpan, add up one lifespan
if (pCacheObj->extendLifespan) {
......@@ -385,7 +391,8 @@ void *taosCacheAcquireByData(SCacheObj *pCacheObj, void *data) {
if ((now - ptNode->addedTime) < ptNode->lifespan * ptNode->extendFactor) {
ptNode->extendFactor += 1;
uDebug("key:%p extend life time to %"PRId64, ptNode, ptNode->lifespan * ptNode->extendFactor + ptNode->addedTime);
uDebug("%p extend life time to %" PRId64, ptNode->data,
ptNode->lifespan * ptNode->extendFactor + ptNode->addedTime);
}
}
......@@ -424,14 +431,14 @@ void taosCacheRelease(SCacheObj *pCacheObj, void **data, bool _remove) {
SCacheDataNode *pNode = (SCacheDataNode *)((char *)(*data) - offset);
if (pNode->signature != (uint64_t)pNode) {
uError("key:%p, release invalid cache data", pNode);
uError("%p, release invalid cache data", pNode);
return;
}
*data = NULL;
int16_t ref = T_REF_DEC(pNode);
uDebug("%p data released, refcnt:%d", pNode, ref);
uDebug("key:%p, %p is released, refcnt:%d, cacheName:%s", pNode->key, pNode->data, ref, pCacheObj->cacheName);
if (_remove && (!pNode->inTrashCan)) {
__cache_wr_lock(pCacheObj);
......@@ -474,6 +481,7 @@ void taosCacheCleanup(SCacheObj *pCacheObj) {
pCacheObj->deleting = 1;
pthread_join(pCacheObj->refreshWorker, NULL);
uInfo("cacheName:%p, will be cleanuped", pCacheObj->cacheName);
doCleanupDataCache(pCacheObj);
}
......@@ -522,7 +530,7 @@ void taosAddToTrash(SCacheObj *pCacheObj, SCacheDataNode *pNode) {
pNode->inTrashCan = true;
pCacheObj->numOfElemsInTrash++;
uDebug("key:%p, %p move to trash, numOfElem in trash:%d", pNode->key, pNode, pCacheObj->numOfElemsInTrash);
uDebug("key:%p, %p move to trash, numOfElem in trash:%d", pNode->key, pNode->data, pCacheObj->numOfElemsInTrash);
}
void taosRemoveFromTrashCan(SCacheObj *pCacheObj, STrashElem *pElem) {
......@@ -573,7 +581,7 @@ void taosTrashCanEmpty(SCacheObj *pCacheObj, bool force) {
}
if (force || (T_REF_VAL_GET(pElem->pData) == 0)) {
uDebug("key:%p, %p removed from trash. numOfElem in trash:%d", pElem->pData->key, pElem->pData,
uDebug("key:%p, %p removed from trash. numOfElem in trash:%d", pElem->pData->key, pElem->pData->data,
pCacheObj->numOfElemsInTrash - 1);
STrashElem *p = pElem;
......@@ -597,7 +605,8 @@ void doCleanupDataCache(SCacheObj *pCacheObj) {
if (T_REF_VAL_GET(pNode) <= 0) {
taosCacheReleaseNode(pCacheObj, pNode);
} else {
uDebug("key:%p, will not remove from cache, refcnt:%d", pNode->key, T_REF_VAL_GET(pNode));
uDebug("key:%p, %p will not remove from cache, refcnt:%d, cacheName:%s", pNode->key, pNode->data,
T_REF_VAL_GET(pNode), pCacheObj->cacheName);
}
}
taosHashDestroyIter(pIter);
......
......@@ -19,7 +19,7 @@ int32_t tsMaxMeterConnections = 200;
// test cache
TEST(testCase, client_cache_test) {
const int32_t REFRESH_TIME_IN_SEC = 2;
SCacheObj* tscCacheHandle = taosCacheInit(TSDB_DATA_TYPE_BINARY, REFRESH_TIME_IN_SEC, 0, NULL);
SCacheObj* tscCacheHandle = taosCacheInit(TSDB_DATA_TYPE_BINARY, REFRESH_TIME_IN_SEC, 0, NULL, "test");
const char* key1 = "test1";
char data1[] = "test11";
......@@ -105,7 +105,7 @@ TEST(testCase, client_cache_test) {
TEST(testCase, cache_resize_test) {
const int32_t REFRESH_TIME_IN_SEC = 2;
auto* pCache = taosCacheInit(TSDB_DATA_TYPE_BINARY, REFRESH_TIME_IN_SEC, false, NULL);
auto* pCache = taosCacheInit(TSDB_DATA_TYPE_BINARY, REFRESH_TIME_IN_SEC, false, NULL, "test");
char key[256] = {0};
char data[1024] = "abcdefghijk";
......
......@@ -284,7 +284,7 @@ int32_t vnodeOpen(int32_t vnode, char *rootDir) {
cqStart(pVnode->cq);
const int32_t REFRESH_HANDLE_INTERVAL = 2; // every 2 seconds, rfresh handle pool
pVnode->qHandlePool = taosCacheInit(TSDB_DATA_TYPE_BIGINT, REFRESH_HANDLE_INTERVAL, true, vnodeFreeqHandle);
pVnode->qHandlePool = taosCacheInit(TSDB_DATA_TYPE_BIGINT, REFRESH_HANDLE_INTERVAL, true, vnodeFreeqHandle, "qhandle");
pVnode->events = NULL;
pVnode->status = TAOS_VN_STATUS_READY;
......
......@@ -22,7 +22,7 @@ class MetadataQuery:
def initConnection(self):
self.tables = 100000
self.records = 10
self.numOfTherads = 10
self.numOfTherads = 20
self.ts = 1537146000000
self.host = "127.0.0.1"
self.user = "root"
......@@ -55,10 +55,10 @@ class MetadataQuery:
def createTablesAndInsertData(self, threadID):
cursor = self.connectDB()
cursor.execute("use test")
base = threadID * self.tables
cursor.execute("use test")
tablesPerThread = int (self.tables / self.numOfTherads)
base = threadID * tablesPerThread
for i in range(tablesPerThread):
cursor.execute(
'''create table t%d using meters tags(
......@@ -75,12 +75,11 @@ class MetadataQuery:
(base + i) %100, (base + i) %10000, (base + i) %1000000, (base + i) %100000000, (base + i) %100 * 1.1, (base + i) %100 * 2.3, (base + i) %2, (base + i) %100, (base + i) %100,
(base + i) %100, (base + i) %10000, (base + i) %1000000, (base + i) %100000000, (base + i) %100 * 1.1, (base + i) %100 * 2.3, (base + i) %2, (base + i) %100, (base + i) %100,
(base + i) %100, (base + i) %10000, (base + i) %1000000, (base + i) %100000000, (base + i) %100 * 1.1, (base + i) %100 * 2.3, (base + i) %2, (base + i) %100, (base + i) %100))
for j in range(self.records):
cursor.execute(
"insert into t%d values(%d, %d)" %
(base + i + 1, self.ts + j, j))
cursor.close()
self.conn.close()
cursor.execute(
"insert into t%d values(%d, 1) (%d, 2) (%d, 3) (%d, 4) (%d, 5)" %
(base + i + 1, self.ts + 1, self.ts + 2, self.ts + 3, self.ts + 4, self.ts + 5))
cursor.close()
def queryData(self, query):
cursor = self.connectDB()
......@@ -108,12 +107,17 @@ if __name__ == '__main__':
print(
"================= Create %d tables and insert %d records into each table =================" %
(t.tables, t.records))
startTime = datetime.now()
startTime = datetime.now()
threads = []
for i in range(t.numOfTherads):
thread = threading.Thread(
target=t.createTablesAndInsertData, args=(i,))
thread.start()
thread.join()
threads.append(thread)
for th in threads:
th.join()
endTime = datetime.now()
diff = (endTime - startTime).seconds
print(
......
###################################################################
# Copyright (c) 2016 by TAOS Technologies, Inc.
# All rights reserved.
#
# This file is proprietary and confidential to TAOS Technologies.
# No part of this file may be reproduced, stored, transmitted,
# disclosed or used in any form or by any means other than as
# expressly provided by the written permission from Jianhui Tao
#
###################################################################
# -*- coding: utf-8 -*-
import sys
import taos
import threading
import time
from datetime import datetime
import numpy as np
class MyThread(threading.Thread):
def __init__(self, func, args=()):
super(MyThread, self).__init__()
self.func = func
self.args = args
def run(self):
self.result = self.func(*self.args)
def get_result(self):
try:
return self.result # 如果子线程不使用join方法,此处可能会报没有self.result的错误
except Exception:
return None
class MetadataQuery:
def initConnection(self):
self.tables = 100
self.records = 10
self.numOfTherads =5
self.ts = 1537146000000
self.host = "127.0.0.1"
self.user = "root"
self.password = "taosdata"
self.config = "/etc/taos"
self.conn = taos.connect( self.host, self.user, self.password, self.config)
def connectDB(self):
return self.conn.cursor()
def createStable(self):
print("================= Create stable meters =================")
cursor = self.connectDB()
cursor.execute("drop database if exists test")
cursor.execute("create database test")
cursor.execute("use test")
cursor.execute('''create table if not exists meters (ts timestamp, speed int) tags(
tgcol1 tinyint, tgcol2 smallint, tgcol3 int, tgcol4 bigint, tgcol5 float, tgcol6 double, tgcol7 bool, tgcol8 binary(20), tgcol9 nchar(20),
tgcol10 tinyint, tgcol11 smallint, tgcol12 int, tgcol13 bigint, tgcol14 float, tgcol15 double, tgcol16 bool, tgcol17 binary(20), tgcol18 nchar(20),
tgcol19 tinyint, tgcol20 smallint, tgcol21 int, tgcol22 bigint, tgcol23 float, tgcol24 double, tgcol25 bool, tgcol26 binary(20), tgcol27 nchar(20),
tgcol28 tinyint, tgcol29 smallint, tgcol30 int, tgcol31 bigint, tgcol32 float, tgcol33 double, tgcol34 bool, tgcol35 binary(20), tgcol36 nchar(20),
tgcol37 tinyint, tgcol38 smallint, tgcol39 int, tgcol40 bigint, tgcol41 float, tgcol42 double, tgcol43 bool, tgcol44 binary(20), tgcol45 nchar(20),
tgcol46 tinyint, tgcol47 smallint, tgcol48 int, tgcol49 bigint, tgcol50 float, tgcol51 double, tgcol52 bool, tgcol53 binary(20), tgcol54 nchar(20))''')
cursor.close()
def createTablesAndInsertData(self, threadID):
cursor = self.connectDB()
cursor.execute("use test")
base = threadID * self.tables
tablesPerThread = int (self.tables / self.numOfTherads)
for i in range(tablesPerThread):
cursor.execute(
'''create table t%d using meters tags(
%d, %d, %d, %d, %f, %f, %d, 'taosdata%d', '涛思数据%d',
%d, %d, %d, %d, %f, %f, %d, 'taosdata%d', '涛思数据%d',
%d, %d, %d, %d, %f, %f, %d, 'taosdata%d', '涛思数据%d',
%d, %d, %d, %d, %f, %f, %d, 'taosdata%d', '涛思数据%d',
%d, %d, %d, %d, %f, %f, %d, 'taosdata%d', '涛思数据%d',
%d, %d, %d, %d, %f, %f, %d, 'taosdata%d', '涛思数据%d')''' %
(base + i + 1,
(base + i) %100, (base + i) %10000, (base + i) %1000000, (base + i) %100000000, (base + i) %100 * 1.1, (base + i) %100 * 2.3, (base + i) %2, (base + i) %100, (base + i) %100,
(base + i) %100, (base + i) %10000, (base + i) %1000000, (base + i) %100000000, (base + i) %100 * 1.1, (base + i) %100 * 2.3, (base + i) %2, (base + i) %100, (base + i) %100,
(base + i) %100, (base + i) %10000, (base + i) %1000000, (base + i) %100000000, (base + i) %100 * 1.1, (base + i) %100 * 2.3, (base + i) %2, (base + i) %100, (base + i) %100,
(base + i) %100, (base + i) %10000, (base + i) %1000000, (base + i) %100000000, (base + i) %100 * 1.1, (base + i) %100 * 2.3, (base + i) %2, (base + i) %100, (base + i) %100,
(base + i) %100, (base + i) %10000, (base + i) %1000000, (base + i) %100000000, (base + i) %100 * 1.1, (base + i) %100 * 2.3, (base + i) %2, (base + i) %100, (base + i) %100,
(base + i) %100, (base + i) %10000, (base + i) %1000000, (base + i) %100000000, (base + i) %100 * 1.1, (base + i) %100 * 2.3, (base + i) %2, (base + i) %100, (base + i) %100))
for j in range(self.records):
cursor.execute(
"insert into t%d values(%d, %d)" %
(base + i + 1, self.ts + j, j))
cursor.close()
def queryWithTagId(self, threadId, tagId, queryNum):
print("---------thread%d start-----------"%threadId)
query = '''select tgcol1, tgcol2, tgcol3, tgcol4, tgcol5, tgcol6, tgcol7, tgcol8, tgcol9,
tgcol10, tgcol11, tgcol12, tgcol13, tgcol14, tgcol15, tgcol16, tgcol17, tgcol18,
tgcol19, tgcol20, tgcol21, tgcol22, tgcol23, tgcol24, tgcol25, tgcol26, tgcol27,
tgcol28, tgcol29, tgcol30, tgcol31, tgcol32, tgcol33, tgcol34, tgcol35, tgcol36,
tgcol37, tgcol38, tgcol39, tgcol40, tgcol41, tgcol42, tgcol43, tgcol44, tgcol45,
tgcol46, tgcol47, tgcol48, tgcol49, tgcol50, tgcol51, tgcol52, tgcol53, tgcol54
from meters where tgcol{id} > {condition}'''
latancy = []
cursor = self.connectDB()
cursor.execute("use test")
for i in range(queryNum):
startTime = time.time()
cursor.execute(query.format(id = tagId, condition = i))
cursor.fetchall()
latancy.append((time.time() - startTime))
print("---------thread%d end-----------"%threadId)
return latancy
def queryData(self, query):
cursor = self.connectDB()
cursor.execute("use test")
print("================= query tag data =================")
startTime = datetime.now()
cursor.execute(query)
cursor.fetchall()
endTime = datetime.now()
print(
"Query time for the above query is %d seconds" %
(endTime - startTime).seconds)
cursor.close()
#self.conn.close()
if __name__ == '__main__':
t = MetadataQuery()
t.initConnection()
latancys = []
threads = []
tagId = 1
queryNum = 1000
for i in range(t.numOfTherads):
thread = MyThread(t.queryWithTagId, args = (i, tagId, queryNum))
threads.append(thread)
thread.start()
for i in range(t.numOfTherads):
threads[i].join()
latancys.extend(threads[i].get_result())
print("Total query: %d"%(queryNum * t.numOfTherads))
print("statistic(s): mean= %f, P50 = %f, P75 = %f, P95 = %f, P99 = %f"
%(sum(latancys)/(queryNum * t.numOfTherads), np.percentile(latancys, 50), np.percentile(latancys, 75), np.percentile(latancys, 95), np.percentile(latancys, 99)))
......@@ -137,6 +137,7 @@ python3 ./test.py -f query/filterFloatAndDouble.py
python3 ./test.py -f query/filterOtherTypes.py
python3 ./test.py -f query/queryError.py
python3 ./test.py -f query/querySort.py
python3 ./test.py -f query/queryJoin.py
#stream
python3 ./test.py -f stream/stream1.py
......
......@@ -114,10 +114,11 @@ echo "mDebugFlag 135" >> $TAOS_CFG
echo "sdbDebugFlag 135" >> $TAOS_CFG
echo "dDebugFlag 135" >> $TAOS_CFG
echo "vDebugFlag 135" >> $TAOS_CFG
echo "tsdbDebugFlag 135" >> $TAOS_CFG
echo "cDebugFlag 135" >> $TAOS_CFG
echo "jnidebugFlag 135" >> $TAOS_CFG
echo "odbcdebugFlag 135" >> $TAOS_CFG
echo "httpDebugFlag 143" >> $TAOS_CFG
echo "httpDebugFlag 135" >> $TAOS_CFG
echo "monitorDebugFlag 131" >> $TAOS_CFG
echo "mqttDebugFlag 131" >> $TAOS_CFG
echo "qdebugFlag 135" >> $TAOS_CFG
......@@ -132,7 +133,7 @@ echo "monitorInterval 1" >> $TAOS_CFG
echo "http 0" >> $TAOS_CFG
echo "numOfThreadsPerCore 2.0" >> $TAOS_CFG
echo "defaultPass taosdata" >> $TAOS_CFG
echo "numOfLogLines 100000000" >> $TAOS_CFG
echo "numOfLogLines 10000000" >> $TAOS_CFG
echo "mnodeEqualVnodeNum 0" >> $TAOS_CFG
echo "clog 2" >> $TAOS_CFG
echo "statusInterval 1" >> $TAOS_CFG
......
stress
stress.exe
cases.json
\ No newline at end of file
# STRESS
Stress test tool for TDengine. It run a set of test cases randomly and show statistics.
## COMMAND LINE
``` bash
$ ./stress [-h=<localhost>] [-P=<0>] [-d=<test>] [-u=<root>] [-p=<taosdata>] [-c=<4>] [-f=<true>] [-l=<logPath>] [path_or_sql]
```
* **-h**: host name or IP address of TDengine server (default: localhost).
* **-P**: port number of TDengine server (default: 0).
* **-u**: user name (default: root).
* **-p**: password (default: taosdata).
* **-c**: concurrency, number of concurrent goroutines for query (default: 4).
* **-f**: fetch data or not (default: true).
* **-l**: log file path (default: no log).
* **path_or_sql**: a SQL statement or path of a JSON file which contains the test cases (default: cases.json).
## TEST CASE FILE
```json
[{
"weight": 1,
"sql": "select * from meters where ts>=now+%dm and ts<=now-%dm and c1=%v and c2=%d and c3='%s' and tbname='%s'",
"args": [{
"type": "range",
"min": 30,
"max": 60
}, {
"type": "bool"
}, {
"type": "int",
"min": -10,
"max": 20
}, {
"type": "string",
"min": 0,
"max": 10,
}, {
"type": "list",
"list": [
"table1",
"table2",
"table3",
"table4"
]
}]
}]
```
The test case file is a standard JSON file which contains an array of test cases. For test cases, field `sql` is mandatory, and it can optionally include a `weight` field and an `args` field which is an array of arguments.
`sql` is a SQL statement, it can include zero or more arguments (placeholders).
`weight` defines the possibility of the case being selected, the greater value the higher possibility. It must be an non-negative integer and the default value is zero, but, if all cases have a zero weight, all the weights are regarded as 1.
Placeholders of `sql` are replaced by arguments in `args` at runtime. There are 5 types of arguments currently:
* **bool**: generate a `boolean` value randomly.
* **int**: generate an `integer` between [`min`, `max`] randomly, the default value of `min` is 0 and `max` is 100.
* **range**: generate two `integer`s between [`min`, `max`] randomly, the first is less than the second, the default value of `min` is 0 and `max` is 100.
* **string**: generate a `string` with length between [`min`, `max`] randomly, the default value of `min` is 0 and `max` is 100.
* **list**: select an item from `list` randomly.
## OUTPUT
```
00:00:08 | TOTAL REQ | TOTAL TIME(us) | TOTAL AVG(us) | REQUEST | TIME(us) | AVERAGE(us) |
TOTAL | 3027 | 26183890 | 8650.11 | 287 | 3060935 | 10665.28 |
SUCCESS | 3027 | 26183890 | 8650.11 | 287 | 3060935 | 10665.28 |
FAIL | 0 | 0 | 0.00 | 0 | 0 | 0.00 |
```
* **Col 2**: total number of request since test start.
* **Col 3**: total time of all request since test start.
* **Col 4**: average time of all request since test start.
* **Col 5**: number of request in last second.
* **Col 6**: time of all request in last second.
* **Col 7**: average time of all request in last second.
module github.com/taosdata/stress
go 1.14
require (
github.com/taosdata/driver-go v0.0.0-20200606095205-b786bac1857f
)
package main
import (
"database/sql"
"encoding/json"
"errors"
"flag"
"fmt"
"math/rand"
"os"
"os/signal"
"strings"
"sync"
"sync/atomic"
"time"
_ "github.com/taosdata/driver-go/taosSql"
)
type argument struct {
Type string `json:"type"`
Min int `json:"min"`
Max int `json:"max"`
List []interface{} `json:"list, omitempty"`
}
type testCase struct {
isQuery bool `json:"-"`
numArgs int `json:"-"`
Weight int `json:"weight"`
SQL string `json:"sql"`
Args []argument `json:"args"`
}
func (arg *argument) check() (int, error) {
if arg.Type == "list" {
if len(arg.List) == 0 {
return 0, errors.New("list cannot be empty")
}
return 1, nil
}
if arg.Max < arg.Min {
return 0, errors.New("invalid min/max value")
}
if arg.Type == "string" {
if arg.Min < 0 {
return 0, errors.New("negative string length")
}
}
if arg.Type == "int" && arg.Min == 0 && arg.Max == 0 {
arg.Max = arg.Min + 100
}
if arg.Type == "range" {
return 2, nil
}
return 1, nil
}
func (arg *argument) generate(args []interface{}) []interface{} {
const chars = "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789"
switch arg.Type {
case "bool":
if rand.Intn(2) == 1 {
args = append(args, true)
} else {
args = append(args, false)
}
case "int":
v := rand.Intn(arg.Max-arg.Min+1) + arg.Min
args = append(args, v)
case "range":
v := rand.Intn(arg.Max-arg.Min) + arg.Min
args = append(args, v)
v = rand.Intn(arg.Max-v+1) + v
args = append(args, v)
case "string":
l := rand.Intn(arg.Max-arg.Min+1) + arg.Min
sb := strings.Builder{}
for i := 0; i < l; i++ {
sb.WriteByte(chars[rand.Intn(len(chars))])
}
args = append(args, sb.String())
case "list":
v := arg.List[rand.Intn(len(arg.List))]
args = append(args, v)
}
return args
}
func (tc *testCase) buildSql() string {
args := make([]interface{}, 0, tc.numArgs)
for i := 0; i < len(tc.Args); i++ {
args = tc.Args[i].generate(args)
}
return fmt.Sprintf(tc.SQL, args...)
}
type statitics struct {
succeeded int64
failed int64
succeededDuration int64
failedDuration int64
}
var (
host string
port uint
database string
user string
password string
fetch bool
chLog chan string
wgLog sync.WaitGroup
startAt time.Time
shouldStop int64
wgTest sync.WaitGroup
stat statitics
totalWeight int
cases []testCase
)
func loadTestCaseFromFile(file *os.File) error {
if e := json.NewDecoder(file).Decode(&cases); e != nil {
return e
}
if len(cases) == 0 {
return fmt.Errorf("no test case loaded.")
}
for i := 0; i < len(cases); i++ {
c := &cases[i]
c.SQL = strings.TrimSpace(c.SQL)
c.isQuery = strings.ToLower(c.SQL[:6]) == "select"
if c.Weight < 0 {
return fmt.Errorf("test %d: negative weight", i)
}
totalWeight += c.Weight
for j := 0; j < len(c.Args); j++ {
arg := &c.Args[j]
arg.Type = strings.ToLower(arg.Type)
n, e := arg.check()
if e != nil {
return fmt.Errorf("test case %d argument %d: %s", i, j, e.Error())
}
c.numArgs += n
}
}
if totalWeight == 0 {
for i := 0; i < len(cases); i++ {
cases[i].Weight = 1
}
totalWeight = len(cases)
}
return nil
}
func loadTestCase(pathOrSQL string) error {
if f, e := os.Open(pathOrSQL); e == nil {
defer f.Close()
return loadTestCaseFromFile(f)
}
pathOrSQL = strings.TrimSpace(pathOrSQL)
if strings.ToLower(pathOrSQL[:6]) != "select" {
return fmt.Errorf("'%s' is not a valid file or SQL statement", pathOrSQL)
}
cases = append(cases, testCase{
isQuery: true,
Weight: 1,
numArgs: 0,
SQL: pathOrSQL,
})
totalWeight = 1
return nil
}
func selectTestCase() *testCase {
sum, target := 0, rand.Intn(totalWeight)
var c *testCase
for i := 0; i < len(cases); i++ {
c = &cases[i]
sum += c.Weight
if sum > target {
break
}
}
return c
}
func runTest() {
defer wgTest.Done()
db, e := sql.Open("taosSql", fmt.Sprintf("%s:%s@tcp(%s:%v)/%s", user, password, host, port, database))
if e != nil {
fmt.Printf("failed to connect to database: %s\n", e.Error())
return
}
defer db.Close()
for atomic.LoadInt64(&shouldStop) == 0 {
c := selectTestCase()
str := c.buildSql()
start := time.Now()
if c.isQuery {
var rows *sql.Rows
if rows, e = db.Query(str); rows != nil {
if fetch {
for rows.Next() {
}
}
rows.Close()
}
} else {
_, e = db.Exec(str)
}
duration := time.Now().Sub(start).Microseconds()
if e != nil {
if chLog != nil {
chLog <- str + ": " + e.Error()
}
atomic.AddInt64(&stat.failed, 1)
atomic.AddInt64(&stat.failedDuration, duration)
} else {
atomic.AddInt64(&stat.succeeded, 1)
atomic.AddInt64(&stat.succeededDuration, duration)
}
}
}
func getStatPrinter() func(tm time.Time) {
var last statitics
lastPrintAt := startAt
return func(tm time.Time) {
var current statitics
current.succeeded = atomic.LoadInt64(&stat.succeeded)
current.failed = atomic.LoadInt64(&stat.failed)
current.succeededDuration = atomic.LoadInt64(&stat.succeededDuration)
current.failedDuration = atomic.LoadInt64(&stat.failedDuration)
seconds := int64(tm.Sub(startAt).Seconds())
format := "\033[47;30m %02v:%02v:%02v | TOTAL REQ | TOTAL TIME(us) | TOTAL AVG(us) | REQUEST | TIME(us) | AVERAGE(us) |\033[0m\n"
fmt.Printf(format, seconds/3600, seconds%3600/60, seconds%60)
tr := current.succeeded + current.failed
td := current.succeededDuration + current.failedDuration
r := tr - last.succeeded - last.failed
d := td - last.succeededDuration - last.failedDuration
ta, a := 0.0, 0.0
if tr > 0 {
ta = float64(td) / float64(tr)
}
if r > 0 {
a = float64(d) / float64(r)
}
format = " TOTAL | %9v | %14v | %13.2f | %7v | %10v | % 13.2f |\n"
fmt.Printf(format, tr, td, ta, r, d, a)
tr = current.succeeded
td = current.succeededDuration
r = tr - last.succeeded
d = td - last.succeededDuration
ta, a = 0.0, 0.0
if tr > 0 {
ta = float64(td) / float64(tr)
}
if r > 0 {
a = float64(d) / float64(r)
}
format = " SUCCESS | \033[32m%9v\033[0m | \033[32m%14v\033[0m | \033[32m%13.2f\033[0m | \033[32m%7v\033[0m | \033[32m%10v\033[0m | \033[32m%13.2f\033[0m |\n"
fmt.Printf(format, tr, td, ta, r, d, a)
tr = current.failed
td = current.failedDuration
r = tr - last.failed
d = td - last.failedDuration
ta, a = 0.0, 0.0
if tr > 0 {
ta = float64(td) / float64(tr)
}
if r > 0 {
a = float64(d) / float64(r)
}
format = " FAIL | \033[31m%9v\033[0m | \033[31m%14v\033[0m | \033[31m%13.2f\033[0m | \033[31m%7v\033[0m | \033[31m%10v\033[0m | \033[31m%13.2f\033[0m |\n"
fmt.Printf(format, tr, td, ta, r, d, a)
last = current
lastPrintAt = tm
}
}
func startLogger(path string) error {
if len(path) == 0 {
return nil
}
f, e := os.Create(path)
if e != nil {
return e
}
chLog = make(chan string, 100)
wgLog.Add(1)
go func() {
for s := range chLog {
if f != nil {
f.WriteString(s)
f.WriteString("\n")
}
}
f.Close()
wgLog.Done()
}()
return nil
}
func main() {
var concurrency uint
var logPath string
flag.StringVar(&host, "h", "localhost", "host name or IP address of TDengine server")
flag.UintVar(&port, "P", 0, "port (default 0)")
flag.StringVar(&database, "d", "test", "database name")
flag.StringVar(&user, "u", "root", "user name")
flag.StringVar(&password, "p", "taosdata", "password")
flag.BoolVar(&fetch, "f", true, "fetch result or not")
flag.UintVar(&concurrency, "c", 4, "concurrency, number of goroutines for query")
flag.StringVar(&logPath, "l", "", "path of log file (default: no log)")
flag.Parse()
if e := startLogger(logPath); e != nil {
fmt.Println("failed to open log file:", e.Error())
return
}
pathOrSQL := flag.Arg(0)
if len(pathOrSQL) == 0 {
pathOrSQL = "cases.json"
}
if e := loadTestCase(pathOrSQL); e != nil {
fmt.Println("failed to load test cases:", e.Error())
return
}
rand.Seed(time.Now().UnixNano())
fmt.Printf("\nSERVER: %s DATABASE: %s CONCURRENCY: %d FETCH DATA: %v\n\n", host, database, concurrency, fetch)
startAt = time.Now()
printStat := getStatPrinter()
printStat(startAt)
for i := uint(0); i < concurrency; i++ {
wgTest.Add(1)
go runTest()
}
interrupt := make(chan os.Signal, 1)
signal.Notify(interrupt, os.Interrupt)
ticker := time.NewTicker(time.Second)
fmt.Println("Ctrl + C to exit....\033[1A")
LOOP:
for {
select {
case <-interrupt:
break LOOP
case tm := <-ticker.C:
fmt.Print("\033[4A")
printStat(tm)
}
}
atomic.StoreInt64(&shouldStop, 1)
fmt.Print("\033[100D'Ctrl + C' received, Waiting started query to stop...")
wgTest.Wait()
if chLog != nil {
close(chLog)
wgLog.Wait()
}
fmt.Print("\033[4A\033[100D")
printStat(time.Now())
fmt.Println()
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册