提交 150ba212 编写于 作者: S Shengliang Guan

merge from develop

...@@ -176,8 +176,6 @@ SColumn* tscColumnListInsert(SArray* pColList, SColumnIndex* colIndex); ...@@ -176,8 +176,6 @@ SColumn* tscColumnListInsert(SArray* pColList, SColumnIndex* colIndex);
SArray* tscColumnListClone(const SArray* src, int16_t tableIndex); SArray* tscColumnListClone(const SArray* src, int16_t tableIndex);
void tscColumnListDestroy(SArray* pColList); void tscColumnListDestroy(SArray* pColList);
SColumnFilterInfo* tscFilterInfoClone(const SColumnFilterInfo* src, int32_t numOfFilters);
int32_t tscValidateName(SSQLToken* pToken); int32_t tscValidateName(SSQLToken* pToken);
void tscIncStreamExecutionCount(void* pStream); void tscIncStreamExecutionCount(void* pStream);
......
...@@ -32,8 +32,8 @@ extern "C" { ...@@ -32,8 +32,8 @@ extern "C" {
#include "qExecutor.h" #include "qExecutor.h"
#include "qsqlparser.h" #include "qsqlparser.h"
#include "qsqltype.h"
#include "qtsbuf.h" #include "qtsbuf.h"
#include "tcmdtype.h"
// forward declaration // forward declaration
struct SSqlInfo; struct SSqlInfo;
...@@ -395,7 +395,6 @@ TAOS *taos_connect_a(char *ip, char *user, char *pass, char *db, uint16_t port, ...@@ -395,7 +395,6 @@ TAOS *taos_connect_a(char *ip, char *user, char *pass, char *db, uint16_t port,
void *param, void **taos); void *param, void **taos);
void waitForQueryRsp(void *param, TAOS_RES *tres, int code) ; void waitForQueryRsp(void *param, TAOS_RES *tres, int code) ;
int doAsyncParseSql(SSqlObj* pSql);
void doAsyncQuery(STscObj *pObj, SSqlObj *pSql, void (*fp)(), void *param, const char *sqlstr, size_t sqlLen); void doAsyncQuery(STscObj *pObj, SSqlObj *pSql, void (*fp)(), void *param, const char *sqlstr, size_t sqlLen);
void tscProcessMultiVnodesImportFromFile(SSqlObj *pSql); void tscProcessMultiVnodesImportFromFile(SSqlObj *pSql);
...@@ -403,13 +402,14 @@ void tscKillSTableQuery(SSqlObj *pSql); ...@@ -403,13 +402,14 @@ void tscKillSTableQuery(SSqlObj *pSql);
void tscInitResObjForLocalQuery(SSqlObj *pObj, int32_t numOfRes, int32_t rowLen); void tscInitResObjForLocalQuery(SSqlObj *pObj, int32_t numOfRes, int32_t rowLen);
bool tscIsUpdateQuery(SSqlObj* pSql); bool tscIsUpdateQuery(SSqlObj* pSql);
bool tscHasReachLimitation(SQueryInfo *pQueryInfo, SSqlRes *pRes); bool tscHasReachLimitation(SQueryInfo *pQueryInfo, SSqlRes *pRes);
// todo remove this function.
bool tscResultsetFetchCompleted(TAOS_RES *result); bool tscResultsetFetchCompleted(TAOS_RES *result);
char *tscGetErrorMsgPayload(SSqlCmd *pCmd); char *tscGetErrorMsgPayload(SSqlCmd *pCmd);
int32_t tscInvalidSQLErrMsg(char *msg, const char *additionalInfo, const char *sql); int32_t tscInvalidSQLErrMsg(char *msg, const char *additionalInfo, const char *sql);
void tscQueueAsyncFreeResult(SSqlObj *pSql);
int32_t tscToSQLCmd(SSqlObj *pSql, struct SSqlInfo *pInfo); int32_t tscToSQLCmd(SSqlObj *pSql, struct SSqlInfo *pInfo);
void tscGetResultColumnChr(SSqlRes *pRes, SFieldInfo* pFieldInfo, int32_t column); void tscGetResultColumnChr(SSqlRes *pRes, SFieldInfo* pFieldInfo, int32_t column);
......
...@@ -213,27 +213,34 @@ void taos_fetch_rows_a(TAOS_RES *taosa, void (*fp)(void *, TAOS_RES *, int), voi ...@@ -213,27 +213,34 @@ void taos_fetch_rows_a(TAOS_RES *taosa, void (*fp)(void *, TAOS_RES *, int), voi
// handle the sub queries of join query // handle the sub queries of join query
if (pCmd->command == TSDB_SQL_TABLE_JOIN_RETRIEVE) { if (pCmd->command == TSDB_SQL_TABLE_JOIN_RETRIEVE) {
tscFetchDatablockFromSubquery(pSql); tscFetchDatablockFromSubquery(pSql);
} else if (pRes->completed && pCmd->command == TSDB_SQL_FETCH) { } else if (pRes->completed) {
if (hasMoreVnodesToTry(pSql)) { // sequentially retrieve data from remain vnodes. if(pCmd->command == TSDB_SQL_FETCH) {
tscTryQueryNextVnode(pSql, tscAsyncQueryRowsForNextVnode); if (hasMoreVnodesToTry(pSql)) { // sequentially retrieve data from remain vnodes.
return; tscTryQueryNextVnode(pSql, tscAsyncQueryRowsForNextVnode);
} else { return;
/* } else {
/*
* all available virtual node has been checked already, now we need to check * all available virtual node has been checked already, now we need to check
* for the next subclause queries * for the next subclause queries
*/ */
if (pCmd->clauseIndex < pCmd->numOfClause - 1) { if (pCmd->clauseIndex < pCmd->numOfClause - 1) {
tscTryQueryNextClause(pSql, tscAsyncQueryRowsForNextVnode); tscTryQueryNextClause(pSql, tscAsyncQueryRowsForNextVnode);
return; return;
} }
/* /*
* 1. has reach the limitation * 1. has reach the limitation
* 2. no remain virtual nodes to be retrieved anymore * 2. no remain virtual nodes to be retrieved anymore
*/ */
(*pSql->fetchFp)(param, pSql, 0);
}
return;
} else if (pCmd->command == TSDB_SQL_RETRIEVE) {
// in case of show command, return no data
(*pSql->fetchFp)(param, pSql, 0); (*pSql->fetchFp)(param, pSql, 0);
} else {
assert(0);
} }
return;
} else { // current query is not completed, continue retrieve from node } else { // current query is not completed, continue retrieve from node
if (pCmd->command != TSDB_SQL_RETRIEVE_LOCALMERGE && pCmd->command < TSDB_SQL_LOCAL) { if (pCmd->command != TSDB_SQL_RETRIEVE_LOCALMERGE && pCmd->command < TSDB_SQL_LOCAL) {
pCmd->command = (pCmd->command > TSDB_SQL_MGMT) ? TSDB_SQL_RETRIEVE : TSDB_SQL_FETCH; pCmd->command = (pCmd->command > TSDB_SQL_MGMT) ? TSDB_SQL_RETRIEVE : TSDB_SQL_FETCH;
...@@ -405,17 +412,6 @@ void tscProcessAsyncFree(SSchedMsg *pMsg) { ...@@ -405,17 +412,6 @@ void tscProcessAsyncFree(SSchedMsg *pMsg) {
taos_free_result(pSql); taos_free_result(pSql);
} }
void tscQueueAsyncFreeResult(SSqlObj *pSql) {
tscDebug("%p sqlObj put in queue to async free", pSql);
SSchedMsg schedMsg = { 0 };
schedMsg.fp = tscProcessAsyncFree;
schedMsg.ahandle = pSql;
schedMsg.thandle = (void *)1;
schedMsg.msg = NULL;
taosScheduleTask(tscQhandle, &schedMsg);
}
int tscSendMsgToServer(SSqlObj *pSql); int tscSendMsgToServer(SSqlObj *pSql);
void tscTableMetaCallBack(void *param, TAOS_RES *res, int code) { void tscTableMetaCallBack(void *param, TAOS_RES *res, int code) {
......
此差异已折叠。
...@@ -14,8 +14,8 @@ ...@@ -14,8 +14,8 @@
*/ */
#include "os.h" #include "os.h"
#include "qsqltype.h"
#include "tcache.h" #include "tcache.h"
#include "tcmdtype.h"
#include "trpc.h" #include "trpc.h"
#include "tscLocalMerge.h" #include "tscLocalMerge.h"
#include "tscLog.h" #include "tscLog.h"
......
...@@ -148,7 +148,7 @@ void taos_init_imp() { ...@@ -148,7 +148,7 @@ void taos_init_imp() {
refreshTime = refreshTime < 10 ? 10 : refreshTime; refreshTime = refreshTime < 10 ? 10 : refreshTime;
if (tscCacheHandle == NULL) { if (tscCacheHandle == NULL) {
tscCacheHandle = taosCacheInit(TSDB_DATA_TYPE_BINARY, refreshTime, false, NULL); tscCacheHandle = taosCacheInit(TSDB_DATA_TYPE_BINARY, refreshTime, false, NULL, "client");
} }
tscDebug("client is initialized successfully"); tscDebug("client is initialized successfully");
......
...@@ -1115,31 +1115,6 @@ SColumn* tscColumnListInsert(SArray* pColumnList, SColumnIndex* pColIndex) { ...@@ -1115,31 +1115,6 @@ SColumn* tscColumnListInsert(SArray* pColumnList, SColumnIndex* pColIndex) {
return taosArrayGetP(pColumnList, i); return taosArrayGetP(pColumnList, i);
} }
SColumnFilterInfo* tscFilterInfoClone(const SColumnFilterInfo* src, int32_t numOfFilters) {
if (numOfFilters == 0) {
assert(src == NULL);
return NULL;
}
SColumnFilterInfo* pFilter = calloc(1, numOfFilters * sizeof(SColumnFilterInfo));
memcpy(pFilter, src, sizeof(SColumnFilterInfo) * numOfFilters);
for (int32_t j = 0; j < numOfFilters; ++j) {
if (pFilter[j].filterstr) {
size_t len = (size_t) pFilter[j].len + 1 * TSDB_NCHAR_SIZE;
pFilter[j].pz = (int64_t) calloc(1, len);
memcpy((char*)pFilter[j].pz, (char*)src[j].pz, (size_t)len);
}
}
assert(src->filterstr == 0 || src->filterstr == 1);
assert(!(src->lowerRelOptr == TSDB_RELATION_INVALID && src->upperRelOptr == TSDB_RELATION_INVALID));
return pFilter;
}
static void destroyFilterInfo(SColumnFilterInfo* pFilterInfo, int32_t numOfFilters) { static void destroyFilterInfo(SColumnFilterInfo* pFilterInfo, int32_t numOfFilters) {
for(int32_t i = 0; i < numOfFilters; ++i) { for(int32_t i = 0; i < numOfFilters; ++i) {
if (pFilterInfo[i].filterstr) { if (pFilterInfo[i].filterstr) {
......
...@@ -13,8 +13,8 @@ ...@@ -13,8 +13,8 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>. * along with this program. If not, see <http://www.gnu.org/licenses/>.
*/ */
#ifndef TDENGINE_QSQLCMD_H #ifndef TDENGINE_TSQLMSGTYPE_H
#define TDENGINE_QSQLCMD_H #define TDENGINE_TSQLMSGTYPE_H
#ifdef __cplusplus #ifdef __cplusplus
extern "C" { extern "C" {
...@@ -109,4 +109,4 @@ extern char *sqlCmd[]; ...@@ -109,4 +109,4 @@ extern char *sqlCmd[];
} }
#endif #endif
#endif // TDENGINE_QSQLCMD_H #endif // TDENGINE_TSQLMSGTYPE_H
...@@ -50,8 +50,8 @@ extern "C" { ...@@ -50,8 +50,8 @@ extern "C" {
typedef struct { typedef struct {
int8_t type; // Column type int8_t type; // Column type
int16_t colId; // column ID int16_t colId; // column ID
int32_t bytes; // column bytes int16_t bytes; // column bytes
int32_t offset; // point offset in SDataRow after the header part int16_t offset; // point offset in SDataRow after the header part
} STColumn; } STColumn;
#define colType(col) ((col)->type) #define colType(col) ((col)->type)
...@@ -116,7 +116,7 @@ typedef struct { ...@@ -116,7 +116,7 @@ typedef struct {
int tdInitTSchemaBuilder(STSchemaBuilder *pBuilder, int32_t version); int tdInitTSchemaBuilder(STSchemaBuilder *pBuilder, int32_t version);
void tdDestroyTSchemaBuilder(STSchemaBuilder *pBuilder); void tdDestroyTSchemaBuilder(STSchemaBuilder *pBuilder);
void tdResetTSchemaBuilder(STSchemaBuilder *pBuilder, int32_t version); void tdResetTSchemaBuilder(STSchemaBuilder *pBuilder, int32_t version);
int tdAddColToSchema(STSchemaBuilder *pBuilder, int8_t type, int16_t colId, int32_t bytes); int tdAddColToSchema(STSchemaBuilder *pBuilder, int8_t type, int16_t colId, int16_t bytes);
STSchema *tdGetSchemaFromBuilder(STSchemaBuilder *pBuilder); STSchema *tdGetSchemaFromBuilder(STSchemaBuilder *pBuilder);
// ----------------- Data row structure // ----------------- Data row structure
......
...@@ -27,4 +27,6 @@ SSchema tGetTableNameColumnSchema(); ...@@ -27,4 +27,6 @@ SSchema tGetTableNameColumnSchema();
bool tscValidateTableNameLength(size_t len); bool tscValidateTableNameLength(size_t len);
SColumnFilterInfo* tscFilterInfoClone(const SColumnFilterInfo* src, int32_t numOfFilters);
#endif // TDENGINE_NAME_H #endif // TDENGINE_NAME_H
...@@ -15,4 +15,4 @@ ...@@ -15,4 +15,4 @@
#define TSDB_SQL_C #define TSDB_SQL_C
#include "qsqltype.h" #include "tcmdtype.h"
...@@ -43,7 +43,7 @@ int tdEncodeSchema(void **buf, STSchema *pSchema) { ...@@ -43,7 +43,7 @@ int tdEncodeSchema(void **buf, STSchema *pSchema) {
STColumn *pCol = schemaColAt(pSchema, i); STColumn *pCol = schemaColAt(pSchema, i);
tlen += taosEncodeFixedI8(buf, colType(pCol)); tlen += taosEncodeFixedI8(buf, colType(pCol));
tlen += taosEncodeFixedI16(buf, colColId(pCol)); tlen += taosEncodeFixedI16(buf, colColId(pCol));
tlen += taosEncodeFixedI32(buf, colBytes(pCol)); tlen += taosEncodeFixedI16(buf, colBytes(pCol));
} }
return tlen; return tlen;
...@@ -65,10 +65,10 @@ void *tdDecodeSchema(void *buf, STSchema **pRSchema) { ...@@ -65,10 +65,10 @@ void *tdDecodeSchema(void *buf, STSchema **pRSchema) {
for (int i = 0; i < numOfCols; i++) { for (int i = 0; i < numOfCols; i++) {
int8_t type = 0; int8_t type = 0;
int16_t colId = 0; int16_t colId = 0;
int32_t bytes = 0; int16_t bytes = 0;
buf = taosDecodeFixedI8(buf, &type); buf = taosDecodeFixedI8(buf, &type);
buf = taosDecodeFixedI16(buf, &colId); buf = taosDecodeFixedI16(buf, &colId);
buf = taosDecodeFixedI32(buf, &bytes); buf = taosDecodeFixedI16(buf, &bytes);
if (tdAddColToSchema(&schemaBuilder, type, colId, bytes) < 0) { if (tdAddColToSchema(&schemaBuilder, type, colId, bytes) < 0) {
tdDestroyTSchemaBuilder(&schemaBuilder); tdDestroyTSchemaBuilder(&schemaBuilder);
return NULL; return NULL;
...@@ -105,7 +105,7 @@ void tdResetTSchemaBuilder(STSchemaBuilder *pBuilder, int32_t version) { ...@@ -105,7 +105,7 @@ void tdResetTSchemaBuilder(STSchemaBuilder *pBuilder, int32_t version) {
pBuilder->version = version; pBuilder->version = version;
} }
int tdAddColToSchema(STSchemaBuilder *pBuilder, int8_t type, int16_t colId, int32_t bytes) { int tdAddColToSchema(STSchemaBuilder *pBuilder, int8_t type, int16_t colId, int16_t bytes) {
if (!isValidDataType(type)) return -1; if (!isValidDataType(type)) return -1;
if (pBuilder->nCols >= pBuilder->tCols) { if (pBuilder->nCols >= pBuilder->tCols) {
......
...@@ -49,4 +49,29 @@ SSchema tGetTableNameColumnSchema() { ...@@ -49,4 +49,29 @@ SSchema tGetTableNameColumnSchema() {
bool tscValidateTableNameLength(size_t len) { bool tscValidateTableNameLength(size_t len) {
return len < TSDB_TABLE_NAME_LEN; return len < TSDB_TABLE_NAME_LEN;
} }
\ No newline at end of file
SColumnFilterInfo* tscFilterInfoClone(const SColumnFilterInfo* src, int32_t numOfFilters) {
if (numOfFilters == 0) {
assert(src == NULL);
return NULL;
}
SColumnFilterInfo* pFilter = calloc(1, numOfFilters * sizeof(SColumnFilterInfo));
memcpy(pFilter, src, sizeof(SColumnFilterInfo) * numOfFilters);
for (int32_t j = 0; j < numOfFilters; ++j) {
if (pFilter[j].filterstr) {
size_t len = (size_t) pFilter[j].len + 1 * TSDB_NCHAR_SIZE;
pFilter[j].pz = (int64_t) calloc(1, len);
memcpy((char*)pFilter[j].pz, (char*)src[j].pz, (size_t)len);
}
}
assert(src->filterstr == 0 || src->filterstr == 1);
assert(!(src->lowerRelOptr == TSDB_RELATION_INVALID && src->upperRelOptr == TSDB_RELATION_INVALID));
return pFilter;
}
...@@ -213,6 +213,8 @@ void cqDrop(void *handle) { ...@@ -213,6 +213,8 @@ void cqDrop(void *handle) {
pObj->pStream = NULL; pObj->pStream = NULL;
cTrace("vgId:%d, id:%d CQ:%s is dropped", pContext->vgId, pObj->tid, pObj->sqlStr); cTrace("vgId:%d, id:%d CQ:%s is dropped", pContext->vgId, pObj->tid, pObj->sqlStr);
tdFreeSchema(pObj->pSchema);
free(pObj->sqlStr);
free(pObj); free(pObj);
pthread_mutex_unlock(&pContext->mutex); pthread_mutex_unlock(&pContext->mutex);
......
...@@ -44,7 +44,7 @@ void qDestroyQueryInfo(qinfo_t qinfo); ...@@ -44,7 +44,7 @@ void qDestroyQueryInfo(qinfo_t qinfo);
* @param qinfo * @param qinfo
* @return * @return
*/ */
void qTableQuery(qinfo_t qinfo, void (*fp)(void*), void* param); void qTableQuery(qinfo_t qinfo);
/** /**
* Retrieve the produced results information, if current query is not paused or completed, * Retrieve the produced results information, if current query is not paused or completed,
......
...@@ -285,9 +285,9 @@ typedef struct { ...@@ -285,9 +285,9 @@ typedef struct {
int32_t tid; int32_t tid;
int16_t tversion; int16_t tversion;
int16_t colId; int16_t colId;
int16_t type;
int16_t bytes;
int32_t tagValLen; int32_t tagValLen;
int16_t numOfTags;
int32_t schemaLen;
char data[]; char data[];
} SUpdateTableTagValMsg; } SUpdateTableTagValMsg;
......
...@@ -41,7 +41,7 @@ int32_t mnodeInitProfile(); ...@@ -41,7 +41,7 @@ int32_t mnodeInitProfile();
void mnodeCleanupProfile(); void mnodeCleanupProfile();
SConnObj *mnodeCreateConn(char *user, uint32_t ip, uint16_t port); SConnObj *mnodeCreateConn(char *user, uint32_t ip, uint16_t port);
SConnObj *mnodeAccquireConn(uint32_t connId, char *user, uint32_t ip, uint16_t port); SConnObj *mnodeAccquireConn(int32_t connId, char *user, uint32_t ip, uint16_t port);
void mnodeReleaseConn(SConnObj *pConn); void mnodeReleaseConn(SConnObj *pConn);
int32_t mnodeSaveQueryStreamList(SConnObj *pConn, SCMHeartBeatMsg *pHBMsg); int32_t mnodeSaveQueryStreamList(SConnObj *pConn, SCMHeartBeatMsg *pHBMsg);
......
...@@ -43,7 +43,7 @@ ...@@ -43,7 +43,7 @@
extern void *tsMnodeTmr; extern void *tsMnodeTmr;
static SCacheObj *tsMnodeConnCache = NULL; static SCacheObj *tsMnodeConnCache = NULL;
static uint32_t tsConnIndex = 0; static int32_t tsConnIndex = 0;
static int32_t mnodeGetQueryMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pConn); static int32_t mnodeGetQueryMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pConn);
static int32_t mnodeRetrieveQueries(SShowObj *pShow, char *data, int32_t rows, void *pConn); static int32_t mnodeRetrieveQueries(SShowObj *pShow, char *data, int32_t rows, void *pConn);
...@@ -68,7 +68,7 @@ int32_t mnodeInitProfile() { ...@@ -68,7 +68,7 @@ int32_t mnodeInitProfile() {
mnodeAddWriteMsgHandle(TSDB_MSG_TYPE_CM_KILL_STREAM, mnodeProcessKillStreamMsg); mnodeAddWriteMsgHandle(TSDB_MSG_TYPE_CM_KILL_STREAM, mnodeProcessKillStreamMsg);
mnodeAddWriteMsgHandle(TSDB_MSG_TYPE_CM_KILL_CONN, mnodeProcessKillConnectionMsg); mnodeAddWriteMsgHandle(TSDB_MSG_TYPE_CM_KILL_CONN, mnodeProcessKillConnectionMsg);
tsMnodeConnCache = taosCacheInitWithCb(TSDB_DATA_TYPE_BINARY, CONN_CHECK_TIME,false, mnodeFreeConn); tsMnodeConnCache = taosCacheInitWithCb(TSDB_DATA_TYPE_INT, CONN_CHECK_TIME, false, mnodeFreeConn, "conn");
return 0; return 0;
} }
...@@ -89,7 +89,7 @@ SConnObj *mnodeCreateConn(char *user, uint32_t ip, uint16_t port) { ...@@ -89,7 +89,7 @@ SConnObj *mnodeCreateConn(char *user, uint32_t ip, uint16_t port) {
return NULL; return NULL;
} }
uint32_t connId = atomic_add_fetch_32(&tsConnIndex, 1); int32_t connId = atomic_add_fetch_32(&tsConnIndex, 1);
if (connId == 0) atomic_add_fetch_32(&tsConnIndex, 1); if (connId == 0) atomic_add_fetch_32(&tsConnIndex, 1);
SConnObj connObj = { SConnObj connObj = {
...@@ -100,9 +100,7 @@ SConnObj *mnodeCreateConn(char *user, uint32_t ip, uint16_t port) { ...@@ -100,9 +100,7 @@ SConnObj *mnodeCreateConn(char *user, uint32_t ip, uint16_t port) {
}; };
tstrncpy(connObj.user, user, sizeof(connObj.user)); tstrncpy(connObj.user, user, sizeof(connObj.user));
char key[10]; SConnObj *pConn = taosCachePut(tsMnodeConnCache, &connId, sizeof(int32_t), &connObj, sizeof(connObj), CONN_KEEP_TIME);
int32_t len = sprintf(key, "%u", connId);
SConnObj *pConn = taosCachePut(tsMnodeConnCache, key, len, &connObj, sizeof(connObj), CONN_KEEP_TIME);
mDebug("connId:%d, is created, user:%s ip:%s:%u", connId, user, taosIpStr(ip), port); mDebug("connId:%d, is created, user:%s ip:%s:%u", connId, user, taosIpStr(ip), port);
return pConn; return pConn;
...@@ -113,12 +111,9 @@ void mnodeReleaseConn(SConnObj *pConn) { ...@@ -113,12 +111,9 @@ void mnodeReleaseConn(SConnObj *pConn) {
taosCacheRelease(tsMnodeConnCache, (void **)&pConn, false); taosCacheRelease(tsMnodeConnCache, (void **)&pConn, false);
} }
SConnObj *mnodeAccquireConn(uint32_t connId, char *user, uint32_t ip, uint16_t port) { SConnObj *mnodeAccquireConn(int32_t connId, char *user, uint32_t ip, uint16_t port) {
char key[10];
int32_t len = sprintf(key, "%u", connId);
uint64_t expireTime = CONN_KEEP_TIME * 1000 + (uint64_t)taosGetTimestampMs(); uint64_t expireTime = CONN_KEEP_TIME * 1000 + (uint64_t)taosGetTimestampMs();
SConnObj *pConn = taosCacheUpdateExpireTimeByName(tsMnodeConnCache, &connId, sizeof(int32_t), expireTime);
SConnObj *pConn = taosCacheUpdateExpireTimeByName(tsMnodeConnCache, key, len, expireTime);
if (pConn == NULL) { if (pConn == NULL) {
mError("connId:%d, is already destroyed, user:%s ip:%s:%u", connId, user, taosIpStr(ip), port); mError("connId:%d, is already destroyed, user:%s ip:%s:%u", connId, user, taosIpStr(ip), port);
return NULL; return NULL;
...@@ -547,7 +542,8 @@ static int32_t mnodeProcessKillQueryMsg(SMnodeMsg *pMsg) { ...@@ -547,7 +542,8 @@ static int32_t mnodeProcessKillQueryMsg(SMnodeMsg *pMsg) {
int32_t queryId = (int32_t)strtol(queryIdStr, NULL, 10); int32_t queryId = (int32_t)strtol(queryIdStr, NULL, 10);
SConnObj *pConn = taosCacheAcquireByKey(tsMnodeConnCache, connIdStr, strlen(connIdStr)); int32_t connId = atoi(connIdStr);
SConnObj *pConn = taosCacheAcquireByKey(tsMnodeConnCache, &connId, sizeof(int32_t));
if (pConn == NULL) { if (pConn == NULL) {
mError("connId:%s, failed to kill queryId:%d, conn not exist", connIdStr, queryId); mError("connId:%s, failed to kill queryId:%d, conn not exist", connIdStr, queryId);
return TSDB_CODE_MND_INVALID_CONN_ID; return TSDB_CODE_MND_INVALID_CONN_ID;
...@@ -576,8 +572,9 @@ static int32_t mnodeProcessKillStreamMsg(SMnodeMsg *pMsg) { ...@@ -576,8 +572,9 @@ static int32_t mnodeProcessKillStreamMsg(SMnodeMsg *pMsg) {
} }
int32_t streamId = (int32_t)strtol(streamIdStr, NULL, 10); int32_t streamId = (int32_t)strtol(streamIdStr, NULL, 10);
int32_t connId = atoi(connIdStr);
SConnObj *pConn = taosCacheAcquireByKey(tsMnodeConnCache, connIdStr, strlen(connIdStr)); SConnObj *pConn = taosCacheAcquireByKey(tsMnodeConnCache, &connId, sizeof(int32_t));
if (pConn == NULL) { if (pConn == NULL) {
mError("connId:%s, failed to kill streamId:%d, conn not exist", connIdStr, streamId); mError("connId:%s, failed to kill streamId:%d, conn not exist", connIdStr, streamId);
return TSDB_CODE_MND_INVALID_CONN_ID; return TSDB_CODE_MND_INVALID_CONN_ID;
...@@ -594,7 +591,8 @@ static int32_t mnodeProcessKillConnectionMsg(SMnodeMsg *pMsg) { ...@@ -594,7 +591,8 @@ static int32_t mnodeProcessKillConnectionMsg(SMnodeMsg *pMsg) {
if (strcmp(pUser->user, TSDB_DEFAULT_USER) != 0) return TSDB_CODE_MND_NO_RIGHTS; if (strcmp(pUser->user, TSDB_DEFAULT_USER) != 0) return TSDB_CODE_MND_NO_RIGHTS;
SCMKillConnMsg *pKill = pMsg->rpcMsg.pCont; SCMKillConnMsg *pKill = pMsg->rpcMsg.pCont;
SConnObj * pConn = taosCacheAcquireByKey(tsMnodeConnCache, pKill->queryId, strlen(pKill->queryId)); int32_t connId = atoi(pKill->queryId);
SConnObj * pConn = taosCacheAcquireByKey(tsMnodeConnCache, &connId, sizeof(int32_t));
if (pConn == NULL) { if (pConn == NULL) {
mError("connId:%s, failed to kill, conn not exist", pKill->queryId); mError("connId:%s, failed to kill, conn not exist", pKill->queryId);
return TSDB_CODE_MND_INVALID_CONN_ID; return TSDB_CODE_MND_INVALID_CONN_ID;
......
...@@ -65,7 +65,7 @@ int32_t mnodeInitShow() { ...@@ -65,7 +65,7 @@ int32_t mnodeInitShow() {
mnodeAddReadMsgHandle(TSDB_MSG_TYPE_CM_CONNECT, mnodeProcessConnectMsg); mnodeAddReadMsgHandle(TSDB_MSG_TYPE_CM_CONNECT, mnodeProcessConnectMsg);
mnodeAddReadMsgHandle(TSDB_MSG_TYPE_CM_USE_DB, mnodeProcessUseMsg); mnodeAddReadMsgHandle(TSDB_MSG_TYPE_CM_USE_DB, mnodeProcessUseMsg);
tsMnodeShowCache = taosCacheInitWithCb(TSDB_DATA_TYPE_BINARY, 5, false, mnodeFreeShowObj); tsMnodeShowCache = taosCacheInitWithCb(TSDB_DATA_TYPE_INT, 5, false, mnodeFreeShowObj, "show");
return 0; return 0;
} }
...@@ -364,10 +364,7 @@ static bool mnodeCheckShowFinished(SShowObj *pShow) { ...@@ -364,10 +364,7 @@ static bool mnodeCheckShowFinished(SShowObj *pShow) {
} }
static bool mnodeAccquireShowObj(SShowObj *pShow) { static bool mnodeAccquireShowObj(SShowObj *pShow) {
char key[10]; SShowObj *pSaved = taosCacheAcquireByKey(tsMnodeShowCache, &pShow->index, sizeof(int32_t));
int32_t len = sprintf(key, "%d", pShow->index);
SShowObj *pSaved = taosCacheAcquireByKey(tsMnodeShowCache, key, len);
if (pSaved == pShow) { if (pSaved == pShow) {
mDebug("%p, show is accquired from cache", pShow); mDebug("%p, show is accquired from cache", pShow);
return true; return true;
...@@ -378,14 +375,11 @@ static bool mnodeAccquireShowObj(SShowObj *pShow) { ...@@ -378,14 +375,11 @@ static bool mnodeAccquireShowObj(SShowObj *pShow) {
static void *mnodePutShowObj(SShowObj *pShow, int32_t size) { static void *mnodePutShowObj(SShowObj *pShow, int32_t size) {
if (tsMnodeShowCache != NULL) { if (tsMnodeShowCache != NULL) {
char key[10];
pShow->index = atomic_add_fetch_32(&tsShowObjIndex, 1); pShow->index = atomic_add_fetch_32(&tsShowObjIndex, 1);
int32_t len = sprintf(key, "%d", pShow->index); SShowObj *newQhandle = taosCachePut(tsMnodeShowCache, &pShow->index, sizeof(int32_t), pShow, size, 6);
mDebug("%p, show is put into cache, index:%d", newQhandle, pShow->index);
SShowObj *newQhandle = taosCachePut(tsMnodeShowCache, key, len, pShow, size, 6);
free(pShow); free(pShow);
mDebug("%p, show is put into cache, index:%s", newQhandle, key);
return newQhandle; return newQhandle;
} }
......
...@@ -53,12 +53,12 @@ static void httpDestroyContext(void *data) { ...@@ -53,12 +53,12 @@ static void httpDestroyContext(void *data) {
httpFreeJsonBuf(pContext); httpFreeJsonBuf(pContext);
httpFreeMultiCmds(pContext); httpFreeMultiCmds(pContext);
httpDebug("context:%p, is destroyed, refCount:%d", pContext, pContext->refCount); httpDebug("context:%p, is destroyed, refCount:%d data:%p", pContext, pContext->refCount, data);
tfree(pContext); tfree(pContext);
} }
bool httpInitContexts() { bool httpInitContexts() {
tsHttpServer.contextCache = taosCacheInitWithCb(TSDB_DATA_TYPE_BINARY, 2, false, httpDestroyContext); tsHttpServer.contextCache = taosCacheInitWithCb(TSDB_DATA_TYPE_BIGINT, 2, false, httpDestroyContext, "restc");
if (tsHttpServer.contextCache == NULL) { if (tsHttpServer.contextCache == NULL) {
httpError("failed to init context cache"); httpError("failed to init context cache");
return false; return false;
...@@ -103,17 +103,14 @@ HttpContext *httpCreateContext(int32_t fd) { ...@@ -103,17 +103,14 @@ HttpContext *httpCreateContext(int32_t fd) {
HttpContext *pContext = calloc(1, sizeof(HttpContext)); HttpContext *pContext = calloc(1, sizeof(HttpContext));
if (pContext == NULL) return NULL; if (pContext == NULL) return NULL;
char contextStr[16] = {0};
int32_t keySize = snprintf(contextStr, sizeof(contextStr), "%p", pContext);
pContext->fd = fd; pContext->fd = fd;
pContext->httpVersion = HTTP_VERSION_10; pContext->httpVersion = HTTP_VERSION_10;
pContext->lastAccessTime = taosGetTimestampSec(); pContext->lastAccessTime = taosGetTimestampSec();
pContext->state = HTTP_CONTEXT_STATE_READY; pContext->state = HTTP_CONTEXT_STATE_READY;
HttpContext **ppContext = taosCachePut(tsHttpServer.contextCache, contextStr, keySize, &pContext, sizeof(HttpContext *), 3); HttpContext **ppContext = taosCachePut(tsHttpServer.contextCache, &pContext, sizeof(void *), &pContext, sizeof(void *), 3);
pContext->ppContext = ppContext; pContext->ppContext = ppContext;
httpDebug("context:%p, fd:%d, is created, item:%p", pContext, fd, ppContext); httpDebug("context:%p, fd:%d, is created, data:%p", pContext, fd, ppContext);
// set the ref to 0 // set the ref to 0
taosCacheRelease(tsHttpServer.contextCache, (void**)&ppContext, false); taosCacheRelease(tsHttpServer.contextCache, (void**)&ppContext, false);
...@@ -122,16 +119,13 @@ HttpContext *httpCreateContext(int32_t fd) { ...@@ -122,16 +119,13 @@ HttpContext *httpCreateContext(int32_t fd) {
} }
HttpContext *httpGetContext(void *ptr) { HttpContext *httpGetContext(void *ptr) {
char contextStr[16] = {0}; HttpContext **ppContext = taosCacheAcquireByKey(tsHttpServer.contextCache, &ptr, sizeof(HttpContext *));
int32_t len = snprintf(contextStr, sizeof(contextStr), "%p", ptr);
HttpContext **ppContext = taosCacheAcquireByKey(tsHttpServer.contextCache, contextStr, len);
if (ppContext) { if (ppContext) {
HttpContext *pContext = *ppContext; HttpContext *pContext = *ppContext;
if (pContext) { if (pContext) {
int32_t refCount = atomic_add_fetch_32(&pContext->refCount, 1); int32_t refCount = atomic_add_fetch_32(&pContext->refCount, 1);
httpDebug("context:%p, fd:%d, is accquired, refCount:%d", pContext, pContext->fd, refCount); httpDebug("context:%p, fd:%d, is accquired, data:%p refCount:%d", pContext, pContext->fd, ppContext, refCount);
return pContext; return pContext;
} }
} }
...@@ -141,9 +135,10 @@ HttpContext *httpGetContext(void *ptr) { ...@@ -141,9 +135,10 @@ HttpContext *httpGetContext(void *ptr) {
void httpReleaseContext(HttpContext *pContext) { void httpReleaseContext(HttpContext *pContext) {
int32_t refCount = atomic_sub_fetch_32(&pContext->refCount, 1); int32_t refCount = atomic_sub_fetch_32(&pContext->refCount, 1);
assert(refCount >= 0); assert(refCount >= 0);
httpDebug("context:%p, is releasd, refCount:%d", pContext, refCount);
HttpContext **ppContext = pContext->ppContext; HttpContext **ppContext = pContext->ppContext;
httpDebug("context:%p, is releasd, data:%p refCount:%d", pContext, ppContext, refCount);
if (tsHttpServer.contextCache != NULL) { if (tsHttpServer.contextCache != NULL) {
taosCacheRelease(tsHttpServer.contextCache, (void **)(&ppContext), false); taosCacheRelease(tsHttpServer.contextCache, (void **)(&ppContext), false);
} else { } else {
......
...@@ -115,7 +115,7 @@ void httpCleanUpSessions() { ...@@ -115,7 +115,7 @@ void httpCleanUpSessions() {
} }
bool httpInitSessions() { bool httpInitSessions() {
tsHttpServer.sessionCache = taosCacheInitWithCb(TSDB_DATA_TYPE_BINARY, 5, false, httpDestroySession); tsHttpServer.sessionCache = taosCacheInitWithCb(TSDB_DATA_TYPE_BINARY, 5, false, httpDestroySession, "rests");
if (tsHttpServer.sessionCache == NULL) { if (tsHttpServer.sessionCache == NULL) {
httpError("failed to init session cache"); httpError("failed to init session cache");
return false; return false;
......
...@@ -95,16 +95,13 @@ typedef struct SSingleColumnFilterInfo { ...@@ -95,16 +95,13 @@ typedef struct SSingleColumnFilterInfo {
} SSingleColumnFilterInfo; } SSingleColumnFilterInfo;
typedef struct STableQueryInfo { // todo merge with the STableQueryInfo struct typedef struct STableQueryInfo { // todo merge with the STableQueryInfo struct
int32_t tableIndex;
int32_t groupIndex; // group id in table list
TSKEY lastKey; TSKEY lastKey;
int32_t numOfRes; int32_t groupIndex; // group id in table list
int16_t queryRangeSet; // denote if the query range is set, only available for interval query int16_t queryRangeSet; // denote if the query range is set, only available for interval query
int64_t tag; int64_t tag;
STimeWindow win; STimeWindow win;
STSCursor cur; STSCursor cur;
void* pTable; // for retrieve the page id list void* pTable; // for retrieve the page id list
SWindowResInfo windowResInfo; SWindowResInfo windowResInfo;
} STableQueryInfo; } STableQueryInfo;
...@@ -127,11 +124,6 @@ typedef struct SQueryCostInfo { ...@@ -127,11 +124,6 @@ typedef struct SQueryCostInfo {
uint64_t computTime; uint64_t computTime;
} SQueryCostInfo; } SQueryCostInfo;
//typedef struct SGroupItem {
// void *pTable;
// STableQueryInfo *info;
//} SGroupItem;
typedef struct SQuery { typedef struct SQuery {
int16_t numOfCols; int16_t numOfCols;
int16_t numOfTags; int16_t numOfTags;
...@@ -173,12 +165,12 @@ typedef struct SQueryRuntimeEnv { ...@@ -173,12 +165,12 @@ typedef struct SQueryRuntimeEnv {
STSBuf* pTSBuf; STSBuf* pTSBuf;
STSCursor cur; STSCursor cur;
SQueryCostInfo summary; SQueryCostInfo summary;
bool stableQuery; // super table query or not
void* pQueryHandle; void* pQueryHandle;
void* pSecQueryHandle; // another thread for void* pSecQueryHandle; // another thread for
SDiskbasedResultBuf* pResultBuf; // query result buffer based on blocked-wised disk file bool stableQuery; // super table query or not
bool topBotQuery; // false bool topBotQuery; // false
int32_t prevGroupId; // previous executed group id int32_t prevGroupId; // previous executed group id
SDiskbasedResultBuf* pResultBuf; // query result buffer based on blocked-wised disk file
} SQueryRuntimeEnv; } SQueryRuntimeEnv;
typedef struct SQInfo { typedef struct SQInfo {
...@@ -205,7 +197,8 @@ typedef struct SQInfo { ...@@ -205,7 +197,8 @@ typedef struct SQInfo {
*/ */
int32_t tableIndex; int32_t tableIndex;
int32_t numOfGroupResultPages; int32_t numOfGroupResultPages;
_qinfo_free_fn_t fn; _qinfo_free_fn_t freeFn;
jmp_buf env;
} SQInfo; } SQInfo;
#endif // TDENGINE_QUERYEXECUTOR_H #endif // TDENGINE_QUERYEXECUTOR_H
...@@ -13,6 +13,7 @@ ...@@ -13,6 +13,7 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>. * along with this program. If not, see <http://www.gnu.org/licenses/>.
*/ */
#include "os.h" #include "os.h"
#include "taosmsg.h"
#include "qfill.h" #include "qfill.h"
#include "hash.h" #include "hash.h"
...@@ -22,9 +23,8 @@ ...@@ -22,9 +23,8 @@
#include "qresultBuf.h" #include "qresultBuf.h"
#include "query.h" #include "query.h"
#include "queryLog.h" #include "queryLog.h"
#include "taosmsg.h"
#include "tlosertree.h" #include "tlosertree.h"
#include "tscUtil.h" // todo move the function to common module #include "exception.h"
#include "tscompression.h" #include "tscompression.h"
#include "ttime.h" #include "ttime.h"
...@@ -87,6 +87,17 @@ typedef struct { ...@@ -87,6 +87,17 @@ typedef struct {
STSCursor cur; STSCursor cur;
} SQueryStatusInfo; } SQueryStatusInfo;
static UNUSED_FUNC void *u_malloc (size_t __size) {
// uint32_t v = rand();
// if (v % 5 <= 1) {
// return NULL;
// } else {
return malloc(__size);
// }
}
#define malloc u_malloc
#define CLEAR_QUERY_STATUS(q, st) ((q)->status &= (~(st))) #define CLEAR_QUERY_STATUS(q, st) ((q)->status &= (~(st)))
#define GET_NUM_OF_TABLEGROUP(q) taosArrayGetSize((q)->tableqinfoGroupInfo.pGroupList) #define GET_NUM_OF_TABLEGROUP(q) taosArrayGetSize((q)->tableqinfoGroupInfo.pGroupList)
#define GET_TABLEGROUP(q, _index) ((SArray*) taosArrayGetP((q)->tableqinfoGroupInfo.pGroupList, (_index))) #define GET_TABLEGROUP(q, _index) ((SArray*) taosArrayGetP((q)->tableqinfoGroupInfo.pGroupList, (_index)))
...@@ -2586,7 +2597,6 @@ void copyResToQueryResultBuf(SQInfo *pQInfo, SQuery *pQuery) { ...@@ -2586,7 +2597,6 @@ void copyResToQueryResultBuf(SQInfo *pQInfo, SQuery *pQuery) {
} }
int64_t getNumOfResultWindowRes(SQuery *pQuery, SWindowResult *pWindowRes) { int64_t getNumOfResultWindowRes(SQuery *pQuery, SWindowResult *pWindowRes) {
// int64_t maxOutput = 0;
for (int32_t j = 0; j < pQuery->numOfOutput; ++j) { for (int32_t j = 0; j < pQuery->numOfOutput; ++j) {
int32_t functionId = pQuery->pSelectExpr[j].base.functionId; int32_t functionId = pQuery->pSelectExpr[j].base.functionId;
...@@ -2604,15 +2614,6 @@ int64_t getNumOfResultWindowRes(SQuery *pQuery, SWindowResult *pWindowRes) { ...@@ -2604,15 +2614,6 @@ int64_t getNumOfResultWindowRes(SQuery *pQuery, SWindowResult *pWindowRes) {
if (pResultInfo->numOfRes > 0) { if (pResultInfo->numOfRes > 0) {
return pResultInfo->numOfRes; return pResultInfo->numOfRes;
} }
// if (pResultInfo != NULL && maxOutput < pResultInfo->numOfRes) {
// maxOutput = pResultInfo->numOfRes;
//
// if (maxOutput > 0) {
// break;
// }
// }
//
// assert(pResultInfo != NULL);
} }
return 0; return 0;
...@@ -2623,12 +2624,19 @@ int32_t mergeIntoGroupResultImpl(SQInfo *pQInfo, SArray *pGroup) { ...@@ -2623,12 +2624,19 @@ int32_t mergeIntoGroupResultImpl(SQInfo *pQInfo, SArray *pGroup) {
SQuery * pQuery = pRuntimeEnv->pQuery; SQuery * pQuery = pRuntimeEnv->pQuery;
size_t size = taosArrayGetSize(pGroup); size_t size = taosArrayGetSize(pGroup);
tFilePage **buffer = pQuery->sdata; tFilePage **buffer = pQuery->sdata;
int32_t * posList = calloc(size, sizeof(int32_t));
int32_t* posList = calloc(size, sizeof(int32_t));
STableQueryInfo **pTableList = malloc(POINTER_BYTES * size); STableQueryInfo **pTableList = malloc(POINTER_BYTES * size);
if (pTableList == NULL || posList == NULL) {
tfree(posList);
tfree(pTableList);
qError("QInfo:%p failed alloc memory", pQInfo);
longjmp(pQInfo->env, TSDB_CODE_QRY_OUT_OF_MEMORY);
}
// todo opt for the case of one table per group // todo opt for the case of one table per group
int32_t numOfTables = 0; int32_t numOfTables = 0;
for (int32_t i = 0; i < size; ++i) { for (int32_t i = 0; i < size; ++i) {
...@@ -4069,7 +4077,7 @@ static SFillColInfo* taosCreateFillColInfo(SQuery* pQuery) { ...@@ -4069,7 +4077,7 @@ static SFillColInfo* taosCreateFillColInfo(SQuery* pQuery) {
return pFillCol; return pFillCol;
} }
int32_t doInitQInfo(SQInfo *pQInfo, STSBuf *pTsBuf, void *tsdb, int32_t vgId, bool isSTableQuery, void* freeParam, _qinfo_free_fn_t fn) { int32_t doInitQInfo(SQInfo *pQInfo, STSBuf *pTsBuf, void *tsdb, int32_t vgId, bool isSTableQuery) {
int32_t code = TSDB_CODE_SUCCESS; int32_t code = TSDB_CODE_SUCCESS;
SQueryRuntimeEnv *pRuntimeEnv = &pQInfo->runtimeEnv; SQueryRuntimeEnv *pRuntimeEnv = &pQInfo->runtimeEnv;
...@@ -4083,8 +4091,6 @@ int32_t doInitQInfo(SQInfo *pQInfo, STSBuf *pTsBuf, void *tsdb, int32_t vgId, bo ...@@ -4083,8 +4091,6 @@ int32_t doInitQInfo(SQInfo *pQInfo, STSBuf *pTsBuf, void *tsdb, int32_t vgId, bo
pQInfo->tsdb = tsdb; pQInfo->tsdb = tsdb;
pQInfo->vgId = vgId; pQInfo->vgId = vgId;
pQInfo->param = freeParam;
pQInfo->fn = fn;
pRuntimeEnv->pQuery = pQuery; pRuntimeEnv->pQuery = pQuery;
pRuntimeEnv->pTSBuf = pTsBuf; pRuntimeEnv->pTSBuf = pTsBuf;
...@@ -4333,7 +4339,9 @@ static void sequentialTableProcess(SQInfo *pQInfo) { ...@@ -4333,7 +4339,9 @@ static void sequentialTableProcess(SQInfo *pQInfo) {
taosArrayDestroy(s); taosArrayDestroy(s);
// here we simply set the first table as current table // here we simply set the first table as current table
pQuery->current = (STableQueryInfo*) GET_TABLEGROUP(pQInfo, 0); SArray* first = GET_TABLEGROUP(pQInfo, pQInfo->groupIndex);
pQuery->current = taosArrayGetP(first, 0);
scanOneTableDataBlocks(pRuntimeEnv, pQuery->current->lastKey); scanOneTableDataBlocks(pRuntimeEnv, pQuery->current->lastKey);
int64_t numOfRes = getNumOfResult(pRuntimeEnv); int64_t numOfRes = getNumOfResult(pRuntimeEnv);
...@@ -4932,14 +4940,6 @@ static void tableQueryImpl(SQInfo *pQInfo) { ...@@ -4932,14 +4940,6 @@ static void tableQueryImpl(SQInfo *pQInfo) {
// record the total elapsed time // record the total elapsed time
pRuntimeEnv->summary.elapsedTime += (taosGetTimestampUs() - st); pRuntimeEnv->summary.elapsedTime += (taosGetTimestampUs() - st);
assert(pQInfo->tableqinfoGroupInfo.numOfTables == 1); assert(pQInfo->tableqinfoGroupInfo.numOfTables == 1);
/* check if query is killed or not */
if (isQueryKilled(pQInfo)) {
qDebug("QInfo:%p query is killed", pQInfo);
} else {
qDebug("QInfo:%p query paused, %" PRId64 " rows returned, numOfTotal:%" PRId64 " rows",
pQInfo, pQuery->rec.rows, pQuery->rec.total + pQuery->rec.rows);
}
} }
static void stableQueryImpl(SQInfo *pQInfo) { static void stableQueryImpl(SQInfo *pQInfo) {
...@@ -4961,10 +4961,6 @@ static void stableQueryImpl(SQInfo *pQInfo) { ...@@ -4961,10 +4961,6 @@ static void stableQueryImpl(SQInfo *pQInfo) {
// record the total elapsed time // record the total elapsed time
pQInfo->runtimeEnv.summary.elapsedTime += (taosGetTimestampUs() - st); pQInfo->runtimeEnv.summary.elapsedTime += (taosGetTimestampUs() - st);
if (pQuery->rec.rows == 0) {
qDebug("QInfo:%p over, %zu tables queried, %"PRId64" rows are returned", pQInfo, pQInfo->tableqinfoGroupInfo.numOfTables, pQuery->rec.total);
}
} }
static int32_t getColumnIndexInSource(SQueryTableMsg *pQueryMsg, SSqlFuncMsg *pExprMsg, SColumnInfo* pTagCols) { static int32_t getColumnIndexInSource(SQueryTableMsg *pQueryMsg, SSqlFuncMsg *pExprMsg, SColumnInfo* pTagCols) {
...@@ -5076,6 +5072,8 @@ static char *createTableIdList(SQueryTableMsg *pQueryMsg, char *pMsg, SArray **p ...@@ -5076,6 +5072,8 @@ static char *createTableIdList(SQueryTableMsg *pQueryMsg, char *pMsg, SArray **p
*/ */
static int32_t convertQueryMsg(SQueryTableMsg *pQueryMsg, SArray **pTableIdList, SSqlFuncMsg ***pExpr, static int32_t convertQueryMsg(SQueryTableMsg *pQueryMsg, SArray **pTableIdList, SSqlFuncMsg ***pExpr,
char **tagCond, char** tbnameCond, SColIndex **groupbyCols, SColumnInfo** tagCols) { char **tagCond, char** tbnameCond, SColIndex **groupbyCols, SColumnInfo** tagCols) {
int32_t code = TSDB_CODE_SUCCESS;
pQueryMsg->numOfTables = htonl(pQueryMsg->numOfTables); pQueryMsg->numOfTables = htonl(pQueryMsg->numOfTables);
pQueryMsg->window.skey = htobe64(pQueryMsg->window.skey); pQueryMsg->window.skey = htobe64(pQueryMsg->window.skey);
...@@ -5102,7 +5100,8 @@ static int32_t convertQueryMsg(SQueryTableMsg *pQueryMsg, SArray **pTableIdList, ...@@ -5102,7 +5100,8 @@ static int32_t convertQueryMsg(SQueryTableMsg *pQueryMsg, SArray **pTableIdList,
// query msg safety check // query msg safety check
if (!validateQueryMsg(pQueryMsg)) { if (!validateQueryMsg(pQueryMsg)) {
return TSDB_CODE_QRY_INVALID_MSG; code = TSDB_CODE_QRY_INVALID_MSG;
goto _cleanup;
} }
char *pMsg = (char *)(pQueryMsg->colList) + sizeof(SColumnInfo) * pQueryMsg->numOfCols; char *pMsg = (char *)(pQueryMsg->colList) + sizeof(SColumnInfo) * pQueryMsg->numOfCols;
...@@ -5174,7 +5173,8 @@ static int32_t convertQueryMsg(SQueryTableMsg *pQueryMsg, SArray **pTableIdList, ...@@ -5174,7 +5173,8 @@ static int32_t convertQueryMsg(SQueryTableMsg *pQueryMsg, SArray **pTableIdList,
int16_t functionId = pExprMsg->functionId; int16_t functionId = pExprMsg->functionId;
if (functionId == TSDB_FUNC_TAG || functionId == TSDB_FUNC_TAGPRJ || functionId == TSDB_FUNC_TAG_DUMMY) { if (functionId == TSDB_FUNC_TAG || functionId == TSDB_FUNC_TAGPRJ || functionId == TSDB_FUNC_TAG_DUMMY) {
if (pExprMsg->colInfo.flag != TSDB_COL_TAG) { // ignore the column index check for arithmetic expression. if (pExprMsg->colInfo.flag != TSDB_COL_TAG) { // ignore the column index check for arithmetic expression.
return TSDB_CODE_QRY_INVALID_MSG; code = TSDB_CODE_QRY_INVALID_MSG;
goto _cleanup;
} }
} else { } else {
// if (!validateExprColumnInfo(pQueryMsg, pExprMsg)) { // if (!validateExprColumnInfo(pQueryMsg, pExprMsg)) {
...@@ -5186,6 +5186,7 @@ static int32_t convertQueryMsg(SQueryTableMsg *pQueryMsg, SArray **pTableIdList, ...@@ -5186,6 +5186,7 @@ static int32_t convertQueryMsg(SQueryTableMsg *pQueryMsg, SArray **pTableIdList,
} }
if (!validateQuerySourceCols(pQueryMsg, *pExpr)) { if (!validateQuerySourceCols(pQueryMsg, *pExpr)) {
code = TSDB_CODE_QRY_INVALID_MSG;
goto _cleanup; goto _cleanup;
} }
...@@ -5193,6 +5194,10 @@ static int32_t convertQueryMsg(SQueryTableMsg *pQueryMsg, SArray **pTableIdList, ...@@ -5193,6 +5194,10 @@ static int32_t convertQueryMsg(SQueryTableMsg *pQueryMsg, SArray **pTableIdList,
if (pQueryMsg->numOfGroupCols > 0) { // group by tag columns if (pQueryMsg->numOfGroupCols > 0) { // group by tag columns
*groupbyCols = malloc(pQueryMsg->numOfGroupCols * sizeof(SColIndex)); *groupbyCols = malloc(pQueryMsg->numOfGroupCols * sizeof(SColIndex));
if (*groupbyCols == NULL) {
code = TSDB_CODE_QRY_OUT_OF_MEMORY;
goto _cleanup;
}
for (int32_t i = 0; i < pQueryMsg->numOfGroupCols; ++i) { for (int32_t i = 0; i < pQueryMsg->numOfGroupCols; ++i) {
(*groupbyCols)[i].colId = *(int16_t *)pMsg; (*groupbyCols)[i].colId = *(int16_t *)pMsg;
...@@ -5248,7 +5253,13 @@ static int32_t convertQueryMsg(SQueryTableMsg *pQueryMsg, SArray **pTableIdList, ...@@ -5248,7 +5253,13 @@ static int32_t convertQueryMsg(SQueryTableMsg *pQueryMsg, SArray **pTableIdList,
if (*pMsg != 0) { if (*pMsg != 0) {
size_t len = strlen(pMsg) + 1; size_t len = strlen(pMsg) + 1;
*tbnameCond = malloc(len); *tbnameCond = malloc(len);
if (*tbnameCond == NULL) {
code = TSDB_CODE_QRY_OUT_OF_MEMORY;
goto _cleanup;
}
strcpy(*tbnameCond, pMsg); strcpy(*tbnameCond, pMsg);
pMsg += len; pMsg += len;
} }
...@@ -5258,7 +5269,8 @@ static int32_t convertQueryMsg(SQueryTableMsg *pQueryMsg, SArray **pTableIdList, ...@@ -5258,7 +5269,8 @@ static int32_t convertQueryMsg(SQueryTableMsg *pQueryMsg, SArray **pTableIdList,
pQueryMsg, pQueryMsg->numOfTables, pQueryMsg->queryType, pQueryMsg->window.skey, pQueryMsg->window.ekey, pQueryMsg->numOfGroupCols, pQueryMsg, pQueryMsg->numOfTables, pQueryMsg->queryType, pQueryMsg->window.skey, pQueryMsg->window.ekey, pQueryMsg->numOfGroupCols,
pQueryMsg->order, pQueryMsg->numOfOutput, pQueryMsg->numOfCols, pQueryMsg->intervalTime, pQueryMsg->order, pQueryMsg->numOfOutput, pQueryMsg->numOfCols, pQueryMsg->intervalTime,
pQueryMsg->fillType, pQueryMsg->tsLen, pQueryMsg->tsNumOfBlocks, pQueryMsg->limit, pQueryMsg->offset); pQueryMsg->fillType, pQueryMsg->tsLen, pQueryMsg->tsNumOfBlocks, pQueryMsg->limit, pQueryMsg->offset);
return 0;
return TSDB_CODE_SUCCESS;
_cleanup: _cleanup:
tfree(*pExpr); tfree(*pExpr);
...@@ -5268,7 +5280,8 @@ _cleanup: ...@@ -5268,7 +5280,8 @@ _cleanup:
tfree(*groupbyCols); tfree(*groupbyCols);
tfree(*tagCols); tfree(*tagCols);
tfree(*tagCond); tfree(*tagCond);
return TSDB_CODE_QRY_INVALID_MSG;
return code;
} }
static int32_t buildAirthmeticExprFromMsg(SExprInfo *pArithExprInfo, SQueryTableMsg *pQueryMsg) { static int32_t buildAirthmeticExprFromMsg(SExprInfo *pArithExprInfo, SQueryTableMsg *pQueryMsg) {
...@@ -5656,7 +5669,6 @@ static SQInfo *createQInfoImpl(SQueryTableMsg *pQueryMsg, SArray* pTableIdList, ...@@ -5656,7 +5669,6 @@ static SQInfo *createQInfoImpl(SQueryTableMsg *pQueryMsg, SArray* pTableIdList,
STableQueryInfo* item = createTableQueryInfo(&pQInfo->runtimeEnv, pTable, window); STableQueryInfo* item = createTableQueryInfo(&pQInfo->runtimeEnv, pTable, window);
item->groupIndex = i; item->groupIndex = i;
item->tableIndex = tableIndex++;
taosArrayPush(p1, &item); taosArrayPush(p1, &item);
taosHashPut(pQInfo->tableqinfoGroupInfo.map, &id.tid, sizeof(id.tid), &item, POINTER_BYTES); taosHashPut(pQInfo->tableqinfoGroupInfo.map, &id.tid, sizeof(id.tid), &item, POINTER_BYTES);
} }
...@@ -5670,7 +5682,8 @@ static SQInfo *createQInfoImpl(SQueryTableMsg *pQueryMsg, SArray* pTableIdList, ...@@ -5670,7 +5682,8 @@ static SQInfo *createQInfoImpl(SQueryTableMsg *pQueryMsg, SArray* pTableIdList,
pQuery->window = pQueryMsg->window; pQuery->window = pQueryMsg->window;
if (sem_init(&pQInfo->dataReady, 0, 0) != 0) { if (sem_init(&pQInfo->dataReady, 0, 0) != 0) {
qError("QInfo:%p init dataReady sem failed, reason:%s", pQInfo, strerror(errno)); int32_t code = TAOS_SYSTEM_ERROR(errno);
qError("QInfo:%p init dataReady sem failed, reason:%s", pQInfo, tstrerror(code));
goto _cleanup; goto _cleanup;
} }
...@@ -5681,7 +5694,6 @@ static SQInfo *createQInfoImpl(SQueryTableMsg *pQueryMsg, SArray* pTableIdList, ...@@ -5681,7 +5694,6 @@ static SQInfo *createQInfoImpl(SQueryTableMsg *pQueryMsg, SArray* pTableIdList,
_cleanup: _cleanup:
freeQInfo(pQInfo); freeQInfo(pQInfo);
return NULL; return NULL;
} }
...@@ -5723,6 +5735,9 @@ static int32_t initQInfo(SQueryTableMsg *pQueryMsg, void *tsdb, int32_t vgId, SQ ...@@ -5723,6 +5735,9 @@ static int32_t initQInfo(SQueryTableMsg *pQueryMsg, void *tsdb, int32_t vgId, SQ
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
pQInfo->param = param;
pQInfo->freeFn = fn;
if (pQInfo->tableqinfoGroupInfo.numOfTables == 0) { if (pQInfo->tableqinfoGroupInfo.numOfTables == 0) {
qDebug("QInfo:%p no table qualified for tag filter, abort query", pQInfo); qDebug("QInfo:%p no table qualified for tag filter, abort query", pQInfo);
setQueryStatus(pQuery, QUERY_COMPLETED); setQueryStatus(pQuery, QUERY_COMPLETED);
...@@ -5732,7 +5747,7 @@ static int32_t initQInfo(SQueryTableMsg *pQueryMsg, void *tsdb, int32_t vgId, SQ ...@@ -5732,7 +5747,7 @@ static int32_t initQInfo(SQueryTableMsg *pQueryMsg, void *tsdb, int32_t vgId, SQ
} }
// filter the qualified // filter the qualified
if ((code = doInitQInfo(pQInfo, pTSBuf, tsdb, vgId, isSTable, param, fn)) != TSDB_CODE_SUCCESS) { if ((code = doInitQInfo(pQInfo, pTSBuf, tsdb, vgId, isSTable)) != TSDB_CODE_SUCCESS) {
goto _error; goto _error;
} }
...@@ -5786,7 +5801,7 @@ static void freeQInfo(SQInfo *pQInfo) { ...@@ -5786,7 +5801,7 @@ static void freeQInfo(SQInfo *pQInfo) {
// todo refactor, extract method to destroytableDataInfo // todo refactor, extract method to destroytableDataInfo
int32_t numOfGroups = GET_NUM_OF_TABLEGROUP(pQInfo); int32_t numOfGroups = GET_NUM_OF_TABLEGROUP(pQInfo);
for (int32_t i = 0; i < numOfGroups; ++i) { for (int32_t i = 0; i < numOfGroups; ++i) {
SArray *p = GET_TABLEGROUP(pQInfo, i);; SArray *p = GET_TABLEGROUP(pQInfo, i);
size_t num = taosArrayGetSize(p); size_t num = taosArrayGetSize(p);
for(int32_t j = 0; j < num; ++j) { for(int32_t j = 0; j < num; ++j) {
...@@ -6032,19 +6047,19 @@ void qDestroyQueryInfo(qinfo_t qHandle) { ...@@ -6032,19 +6047,19 @@ void qDestroyQueryInfo(qinfo_t qHandle) {
qDebug("QInfo:%p dec refCount, value:%d", pQInfo, ref); qDebug("QInfo:%p dec refCount, value:%d", pQInfo, ref);
if (ref == 0) { if (ref == 0) {
_qinfo_free_fn_t fn = pQInfo->fn; _qinfo_free_fn_t freeFp = pQInfo->freeFn;
void* param = pQInfo->param; void* param = pQInfo->param;
doDestoryQueryInfo(pQInfo); doDestoryQueryInfo(pQInfo);
if (fn != NULL) { if (freeFp != NULL) {
assert(param != NULL); assert(param != NULL);
fn(param); freeFp(param);
} }
} }
} }
void qTableQuery(qinfo_t qinfo, void (*fp)(void*), void* param) { void qTableQuery(qinfo_t qinfo) {
SQInfo *pQInfo = (SQInfo *)qinfo; SQInfo *pQInfo = (SQInfo *)qinfo;
if (pQInfo == NULL || pQInfo->signature != pQInfo) { if (pQInfo == NULL || pQInfo->signature != pQInfo) {
...@@ -6054,17 +6069,34 @@ void qTableQuery(qinfo_t qinfo, void (*fp)(void*), void* param) { ...@@ -6054,17 +6069,34 @@ void qTableQuery(qinfo_t qinfo, void (*fp)(void*), void* param) {
if (isQueryKilled(pQInfo)) { if (isQueryKilled(pQInfo)) {
qDebug("QInfo:%p it is already killed, abort", pQInfo); qDebug("QInfo:%p it is already killed, abort", pQInfo);
sem_post(&pQInfo->dataReady);
qDestroyQueryInfo(pQInfo); qDestroyQueryInfo(pQInfo);
return; return;
} }
if (pQInfo->tableqinfoGroupInfo.numOfTables == 0) { if (pQInfo->tableqinfoGroupInfo.numOfTables == 0) {
qDebug("QInfo:%p no table exists for query, abort", pQInfo); qDebug("QInfo:%p no table exists for query, abort", pQInfo);
sem_post(&pQInfo->dataReady);
qDestroyQueryInfo(pQInfo);
return;
}
int32_t ret = setjmp(pQInfo->env);
// error occurs, record the error code and return to client
if (ret != TSDB_CODE_SUCCESS) {
pQInfo->code = ret;
qDebug("QInfo:%p query abort due to error occurs, code:%s", pQInfo, tstrerror(pQInfo->code));
sem_post(&pQInfo->dataReady);
qDestroyQueryInfo(pQInfo);
return; return;
} }
qDebug("QInfo:%p query task is launched", pQInfo); qDebug("QInfo:%p query task is launched", pQInfo);
SQueryRuntimeEnv* pRuntimeEnv = &pQInfo->runtimeEnv;
if (onlyQueryTags(pQInfo->runtimeEnv.pQuery)) { if (onlyQueryTags(pQInfo->runtimeEnv.pQuery)) {
assert(pQInfo->runtimeEnv.pQueryHandle == NULL); assert(pQInfo->runtimeEnv.pQueryHandle == NULL);
buildTagQueryResult(pQInfo); // todo support the limit/offset buildTagQueryResult(pQInfo); // todo support the limit/offset
...@@ -6074,6 +6106,16 @@ void qTableQuery(qinfo_t qinfo, void (*fp)(void*), void* param) { ...@@ -6074,6 +6106,16 @@ void qTableQuery(qinfo_t qinfo, void (*fp)(void*), void* param) {
tableQueryImpl(pQInfo); tableQueryImpl(pQInfo);
} }
SQuery* pQuery = pRuntimeEnv->pQuery;
if (isQueryKilled(pQInfo)) {
qDebug("QInfo:%p query is killed", pQInfo);
} else if (pQuery->rec.rows == 0) {
qDebug("QInfo:%p over, %zu tables queried, %"PRId64" rows are returned", pQInfo, pQInfo->tableqinfoGroupInfo.numOfTables, pQuery->rec.total);
} else {
qDebug("QInfo:%p query paused, %" PRId64 " rows returned, numOfTotal:%" PRId64 " rows",
pQInfo, pQuery->rec.rows, pQuery->rec.total + pQuery->rec.rows);
}
sem_post(&pQInfo->dataReady); sem_post(&pQInfo->dataReady);
qDestroyQueryInfo(pQInfo); qDestroyQueryInfo(pQInfo);
} }
......
...@@ -1173,9 +1173,7 @@ tExprNode* exprTreeFromTableName(const char* tbnameCond) { ...@@ -1173,9 +1173,7 @@ tExprNode* exprTreeFromTableName(const char* tbnameCond) {
size_t len = strlen(cond) + VARSTR_HEADER_SIZE; size_t len = strlen(cond) + VARSTR_HEADER_SIZE;
char* p = exception_malloc(len); char* p = exception_malloc(len);
varDataSetLen(p, len - VARSTR_HEADER_SIZE); STR_WITH_SIZE_TO_VARSTR(p, cond, len - VARSTR_HEADER_SIZE);
memcpy(varDataVal(p), cond, len);
taosArrayPush(pVal->arr, &p); taosArrayPush(pVal->arr, &p);
} }
......
...@@ -15,16 +15,16 @@ ...@@ -15,16 +15,16 @@
#include "os.h" #include "os.h"
#include "qsqlparser.h" #include "qsqlparser.h"
#include "queryLog.h"
#include "taosdef.h" #include "taosdef.h"
#include "taosmsg.h" #include "taosmsg.h"
#include "tcmdtype.h"
#include "tglobal.h" #include "tglobal.h"
#include "tstoken.h" #include "tstoken.h"
#include "tstrbuild.h"
#include "ttime.h" #include "ttime.h"
#include "ttokendef.h" #include "ttokendef.h"
#include "tutil.h" #include "tutil.h"
#include "qsqltype.h"
#include "tstrbuild.h"
#include "queryLog.h"
SSqlInfo qSQLParse(const char *pStr) { SSqlInfo qSQLParse(const char *pStr) {
void *pParser = ParseAlloc(malloc); void *pParser = ParseAlloc(malloc);
......
...@@ -25,17 +25,17 @@ ...@@ -25,17 +25,17 @@
#include <stdio.h> #include <stdio.h>
/************ Begin %include sections from the grammar ************************/ /************ Begin %include sections from the grammar ************************/
#include <assert.h>
#include <stdbool.h>
#include <stdio.h> #include <stdio.h>
#include <stdlib.h> #include <stdlib.h>
#include <string.h> #include <string.h>
#include <assert.h>
#include <stdbool.h>
#include "tutil.h"
#include "qsqlparser.h" #include "qsqlparser.h"
#include "tcmdtype.h"
#include "tstoken.h" #include "tstoken.h"
#include "tvariant.h"
#include "ttokendef.h" #include "ttokendef.h"
#include "qsqltype.h" #include "tutil.h"
#include "tvariant.h"
/**************** End of %include directives **********************************/ /**************** End of %include directives **********************************/
/* These constants specify the various numeric values for terminal symbols /* These constants specify the various numeric values for terminal symbols
** in a format understandable to "makeheaders". This section is blank unless ** in a format understandable to "makeheaders". This section is blank unless
......
...@@ -255,17 +255,46 @@ _err: ...@@ -255,17 +255,46 @@ _err:
return NULL; return NULL;
} }
static int32_t colIdCompar(const void* left, const void* right) {
int16_t colId = *(int16_t*) left;
STColumn* p2 = (STColumn*) right;
if (colId == p2->colId) {
return 0;
}
return (colId < p2->colId)? -1:1;
}
int tsdbUpdateTagValue(TSDB_REPO_T *repo, SUpdateTableTagValMsg *pMsg) { int tsdbUpdateTagValue(TSDB_REPO_T *repo, SUpdateTableTagValMsg *pMsg) {
STsdbRepo *pRepo = (STsdbRepo *)repo; STsdbRepo *pRepo = (STsdbRepo *)repo;
STsdbMeta *pMeta = pRepo->tsdbMeta; STsdbMeta *pMeta = pRepo->tsdbMeta;
int16_t tversion = htons(pMsg->tversion);
STable *pTable = tsdbGetTableByUid(pMeta, htobe64(pMsg->uid)); pMsg->uid = htobe64(pMsg->uid);
pMsg->tid = htonl(pMsg->tid);
pMsg->tversion = htons(pMsg->tversion);
pMsg->colId = htons(pMsg->colId);
pMsg->tagValLen = htonl(pMsg->tagValLen);
pMsg->numOfTags = htons(pMsg->numOfTags);
pMsg->schemaLen = htonl(pMsg->schemaLen);
assert(pMsg->schemaLen == sizeof(STColumn) * pMsg->numOfTags);
char* d = pMsg->data;
for(int32_t i = 0; i < pMsg->numOfTags; ++i) {
STColumn* pCol = (STColumn*) d;
pCol->colId = htons(pCol->colId);
pCol->bytes = htons(pCol->bytes);
pCol->offset = 0;
d += sizeof(STColumn);
}
STable *pTable = tsdbGetTableByUid(pMeta, pMsg->uid);
if (pTable == NULL) { if (pTable == NULL) {
terrno = TSDB_CODE_TDB_INVALID_TABLE_ID; terrno = TSDB_CODE_TDB_INVALID_TABLE_ID;
return -1; return -1;
} }
if (TABLE_TID(pTable) != htonl(pMsg->tid)) { if (TABLE_TID(pTable) != pMsg->tid) {
terrno = TSDB_CODE_TDB_INVALID_TABLE_ID; terrno = TSDB_CODE_TDB_INVALID_TABLE_ID;
return -1; return -1;
} }
...@@ -277,10 +306,10 @@ int tsdbUpdateTagValue(TSDB_REPO_T *repo, SUpdateTableTagValMsg *pMsg) { ...@@ -277,10 +306,10 @@ int tsdbUpdateTagValue(TSDB_REPO_T *repo, SUpdateTableTagValMsg *pMsg) {
return -1; return -1;
} }
if (schemaVersion(tsdbGetTableTagSchema(pTable)) < tversion) { if (schemaVersion(tsdbGetTableTagSchema(pTable)) < pMsg->tversion) {
tsdbDebug("vgId:%d server tag version %d is older than client tag version %d, try to config", REPO_ID(pRepo), tsdbDebug("vgId:%d server tag version %d is older than client tag version %d, try to config", REPO_ID(pRepo),
schemaVersion(tsdbGetTableTagSchema(pTable)), tversion); schemaVersion(tsdbGetTableTagSchema(pTable)), pMsg->tversion);
void *msg = (*pRepo->appH.configFunc)(pRepo->config.tsdbId, htonl(pMsg->tid)); void *msg = (*pRepo->appH.configFunc)(pRepo->config.tsdbId, pMsg->tid);
if (msg == NULL) return -1; if (msg == NULL) return -1;
// Deal with error her // Deal with error her
...@@ -299,19 +328,24 @@ int tsdbUpdateTagValue(TSDB_REPO_T *repo, SUpdateTableTagValMsg *pMsg) { ...@@ -299,19 +328,24 @@ int tsdbUpdateTagValue(TSDB_REPO_T *repo, SUpdateTableTagValMsg *pMsg) {
STSchema *pTagSchema = tsdbGetTableTagSchema(pTable); STSchema *pTagSchema = tsdbGetTableTagSchema(pTable);
if (schemaVersion(pTagSchema) > tversion) { if (schemaVersion(pTagSchema) > pMsg->tversion) {
tsdbError( tsdbError(
"vgId:%d failed to update tag value of table %s since version out of date, client tag version %d server tag " "vgId:%d failed to update tag value of table %s since version out of date, client tag version %d server tag "
"version %d", "version %d",
REPO_ID(pRepo), TABLE_CHAR_NAME(pTable), tversion, schemaVersion(pTable->tagSchema)); REPO_ID(pRepo), TABLE_CHAR_NAME(pTable), pMsg->tversion, schemaVersion(pTable->tagSchema));
return TSDB_CODE_TDB_TAG_VER_OUT_OF_DATE; return TSDB_CODE_TDB_TAG_VER_OUT_OF_DATE;
} }
if (schemaColAt(pTagSchema, DEFAULT_TAG_INDEX_COLUMN)->colId == htons(pMsg->colId)) { if (schemaColAt(pTagSchema, DEFAULT_TAG_INDEX_COLUMN)->colId == pMsg->colId) {
tsdbRemoveTableFromIndex(pMeta, pTable); tsdbRemoveTableFromIndex(pMeta, pTable);
} }
// TODO: remove table from index if it is the first column of tag // TODO: remove table from index if it is the first column of tag
tdSetKVRowDataOfCol(&pTable->tagVal, htons(pMsg->colId), htons(pMsg->type), pMsg->data);
if (schemaColAt(pTagSchema, DEFAULT_TAG_INDEX_COLUMN)->colId == htons(pMsg->colId)) { // TODO: convert the tag schema from client, and then extract the type and bytes from schema according to colId
STColumn* res = bsearch(&pMsg->colId, pMsg->data, pMsg->numOfTags, sizeof(STColumn), colIdCompar);
assert(res != NULL);
tdSetKVRowDataOfCol(&pTable->tagVal, pMsg->colId, res->type, pMsg->data + pMsg->schemaLen);
if (schemaColAt(pTagSchema, DEFAULT_TAG_INDEX_COLUMN)->colId == pMsg->colId) {
tsdbAddTableIntoIndex(pMeta, pTable); tsdbAddTableIntoIndex(pMeta, pTable);
} }
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
......
...@@ -74,9 +74,6 @@ typedef struct STableCheckInfo { ...@@ -74,9 +74,6 @@ typedef struct STableCheckInfo {
SDataCols* pDataCols; SDataCols* pDataCols;
int32_t chosen; // indicate which iterator should move forward int32_t chosen; // indicate which iterator should move forward
bool initBuf; // whether to initialize the in-memory skip list iterator or not bool initBuf; // whether to initialize the in-memory skip list iterator or not
SMemTable* mem; // in-mem buffer, hold the ref count
SMemTable* imem; // imem buffer, hold the ref count to avoid release
SSkipListIterator* iter; // mem buffer skip list iterator SSkipListIterator* iter; // mem buffer skip list iterator
SSkipListIterator* iiter; // imem buffer skip list iterator SSkipListIterator* iiter; // imem buffer skip list iterator
} STableCheckInfo; } STableCheckInfo;
...@@ -113,6 +110,8 @@ typedef struct STsdbQueryHandle { ...@@ -113,6 +110,8 @@ typedef struct STsdbQueryHandle {
SFileGroupIter fileIter; SFileGroupIter fileIter;
SRWHelper rhelper; SRWHelper rhelper;
STableBlockInfo* pDataBlockInfo; STableBlockInfo* pDataBlockInfo;
SMemTable* mem; // mem-table
SMemTable* imem; // imem-table, acquired from snapshot
SDataBlockLoadInfo dataBlockLoadInfo; /* record current block load information */ SDataBlockLoadInfo dataBlockLoadInfo; /* record current block load information */
SLoadCompBlockInfo compBlockLoadInfo; /* record current compblock information in SQuery */ SLoadCompBlockInfo compBlockLoadInfo; /* record current compblock information in SQuery */
...@@ -138,9 +137,6 @@ static void tsdbInitCompBlockLoadInfo(SLoadCompBlockInfo* pCompBlockLoadInfo) { ...@@ -138,9 +137,6 @@ static void tsdbInitCompBlockLoadInfo(SLoadCompBlockInfo* pCompBlockLoadInfo) {
} }
TsdbQueryHandleT* tsdbQueryTables(TSDB_REPO_T* tsdb, STsdbQueryCond* pCond, STableGroupInfo* groupList, void* qinfo) { TsdbQueryHandleT* tsdbQueryTables(TSDB_REPO_T* tsdb, STsdbQueryCond* pCond, STableGroupInfo* groupList, void* qinfo) {
// todo 1. filter not exist table
// todo 2. add the reference count for each table that is involved in query
STsdbQueryHandle* pQueryHandle = calloc(1, sizeof(STsdbQueryHandle)); STsdbQueryHandle* pQueryHandle = calloc(1, sizeof(STsdbQueryHandle));
pQueryHandle->order = pCond->order; pQueryHandle->order = pCond->order;
pQueryHandle->window = pCond->twindow; pQueryHandle->window = pCond->twindow;
...@@ -154,6 +150,7 @@ TsdbQueryHandleT* tsdbQueryTables(TSDB_REPO_T* tsdb, STsdbQueryCond* pCond, STab ...@@ -154,6 +150,7 @@ TsdbQueryHandleT* tsdbQueryTables(TSDB_REPO_T* tsdb, STsdbQueryCond* pCond, STab
pQueryHandle->outputCapacity = ((STsdbRepo*)tsdb)->config.maxRowsPerFileBlock; pQueryHandle->outputCapacity = ((STsdbRepo*)tsdb)->config.maxRowsPerFileBlock;
tsdbInitReadHelper(&pQueryHandle->rhelper, (STsdbRepo*) tsdb); tsdbInitReadHelper(&pQueryHandle->rhelper, (STsdbRepo*) tsdb);
tsdbTakeMemSnapshot(pQueryHandle->pTsdb, &pQueryHandle->mem, &pQueryHandle->imem);
size_t sizeOfGroup = taosArrayGetSize(groupList->pGroupList); size_t sizeOfGroup = taosArrayGetSize(groupList->pGroupList);
assert(sizeOfGroup >= 1 && pCond != NULL && pCond->numOfCols > 0); assert(sizeOfGroup >= 1 && pCond != NULL && pCond->numOfCols > 0);
...@@ -252,22 +249,22 @@ static bool initTableMemIterator(STsdbQueryHandle* pHandle, STableCheckInfo* pCh ...@@ -252,22 +249,22 @@ static bool initTableMemIterator(STsdbQueryHandle* pHandle, STableCheckInfo* pCh
pCheckInfo->initBuf = true; pCheckInfo->initBuf = true;
int32_t order = pHandle->order; int32_t order = pHandle->order;
tsdbTakeMemSnapshot(pHandle->pTsdb, &pCheckInfo->mem, &pCheckInfo->imem); // tsdbTakeMemSnapshot(pHandle->pTsdb, &pCheckInfo->mem, &pCheckInfo->imem);
// no data in buffer, abort // no data in buffer, abort
if (pCheckInfo->mem == NULL && pCheckInfo->imem == NULL) { if (pHandle->mem == NULL && pHandle->imem == NULL) {
return false; return false;
} }
assert(pCheckInfo->iter == NULL && pCheckInfo->iiter == NULL); assert(pCheckInfo->iter == NULL && pCheckInfo->iiter == NULL);
if (pCheckInfo->mem && pCheckInfo->mem->tData[pCheckInfo->tableId.tid] != NULL) { if (pHandle->mem && pHandle->mem->tData[pCheckInfo->tableId.tid] != NULL) {
pCheckInfo->iter = tSkipListCreateIterFromVal(pCheckInfo->mem->tData[pCheckInfo->tableId.tid]->pData, pCheckInfo->iter = tSkipListCreateIterFromVal(pHandle->mem->tData[pCheckInfo->tableId.tid]->pData,
(const char*) &pCheckInfo->lastKey, TSDB_DATA_TYPE_TIMESTAMP, order); (const char*) &pCheckInfo->lastKey, TSDB_DATA_TYPE_TIMESTAMP, order);
} }
if (pCheckInfo->imem && pCheckInfo->imem->tData[pCheckInfo->tableId.tid] != NULL) { if (pHandle->imem && pHandle->imem->tData[pCheckInfo->tableId.tid] != NULL) {
pCheckInfo->iiter = tSkipListCreateIterFromVal(pCheckInfo->imem->tData[pCheckInfo->tableId.tid]->pData, pCheckInfo->iiter = tSkipListCreateIterFromVal(pHandle->imem->tData[pCheckInfo->tableId.tid]->pData,
(const char*) &pCheckInfo->lastKey, TSDB_DATA_TYPE_TIMESTAMP, order); (const char*) &pCheckInfo->lastKey, TSDB_DATA_TYPE_TIMESTAMP, order);
} }
...@@ -1518,7 +1515,8 @@ bool tsdbNextDataBlock(TsdbQueryHandleT* pHandle) { ...@@ -1518,7 +1515,8 @@ bool tsdbNextDataBlock(TsdbQueryHandleT* pHandle) {
pSecQueryHandle->outputCapacity = ((STsdbRepo*)pSecQueryHandle->pTsdb)->config.maxRowsPerFileBlock; pSecQueryHandle->outputCapacity = ((STsdbRepo*)pSecQueryHandle->pTsdb)->config.maxRowsPerFileBlock;
tsdbInitReadHelper(&pSecQueryHandle->rhelper, (STsdbRepo*) pSecQueryHandle->pTsdb); tsdbInitReadHelper(&pSecQueryHandle->rhelper, (STsdbRepo*) pSecQueryHandle->pTsdb);
tsdbTakeMemSnapshot(pSecQueryHandle->pTsdb, &pSecQueryHandle->mem, &pSecQueryHandle->imem);
// allocate buffer in order to load data blocks from file // allocate buffer in order to load data blocks from file
int32_t numOfCols = QH_GET_NUM_OF_COLS(pQueryHandle); int32_t numOfCols = QH_GET_NUM_OF_COLS(pQueryHandle);
...@@ -2319,9 +2317,6 @@ void tsdbCleanupQueryHandle(TsdbQueryHandleT queryHandle) { ...@@ -2319,9 +2317,6 @@ void tsdbCleanupQueryHandle(TsdbQueryHandleT queryHandle) {
STableCheckInfo* pTableCheckInfo = taosArrayGet(pQueryHandle->pTableCheckInfo, i); STableCheckInfo* pTableCheckInfo = taosArrayGet(pQueryHandle->pTableCheckInfo, i);
tSkipListDestroyIter(pTableCheckInfo->iter); tSkipListDestroyIter(pTableCheckInfo->iter);
tsdbUnRefMemTable(pQueryHandle->pTsdb, pTableCheckInfo->mem);
tsdbUnRefMemTable(pQueryHandle->pTsdb, pTableCheckInfo->imem);
if (pTableCheckInfo->pDataCols != NULL) { if (pTableCheckInfo->pDataCols != NULL) {
tfree(pTableCheckInfo->pDataCols->buf); tfree(pTableCheckInfo->pDataCols->buf);
} }
...@@ -2341,9 +2336,12 @@ void tsdbCleanupQueryHandle(TsdbQueryHandleT queryHandle) { ...@@ -2341,9 +2336,12 @@ void tsdbCleanupQueryHandle(TsdbQueryHandleT queryHandle) {
taosArrayDestroy(pQueryHandle->pColumns); taosArrayDestroy(pQueryHandle->pColumns);
tfree(pQueryHandle->pDataBlockInfo); tfree(pQueryHandle->pDataBlockInfo);
tfree(pQueryHandle->statis); tfree(pQueryHandle->statis);
// todo check error
tsdbUnRefMemTable(pQueryHandle->pTsdb, pQueryHandle->mem);
tsdbUnRefMemTable(pQueryHandle->pTsdb, pQueryHandle->imem);
tsdbDestroyHelper(&pQueryHandle->rhelper); tsdbDestroyHelper(&pQueryHandle->rhelper);
tfree(pQueryHandle); tfree(pQueryHandle);
} }
......
...@@ -65,6 +65,7 @@ typedef struct { ...@@ -65,6 +65,7 @@ typedef struct {
int64_t totalSize; // total allocated buffer in this hash table, SCacheObj is not included. int64_t totalSize; // total allocated buffer in this hash table, SCacheObj is not included.
int64_t refreshTime; int64_t refreshTime;
STrashElem * pTrash; STrashElem * pTrash;
const char * cacheName;
// void * tmrCtrl; // void * tmrCtrl;
// void * pTimer; // void * pTimer;
SCacheStatis statistics; SCacheStatis statistics;
...@@ -90,7 +91,7 @@ typedef struct { ...@@ -90,7 +91,7 @@ typedef struct {
* @param fn free resource callback function * @param fn free resource callback function
* @return * @return
*/ */
SCacheObj *taosCacheInit(int32_t keyType, int64_t refreshTimeInSeconds, bool extendLifespan, __cache_freeres_fn_t fn); SCacheObj *taosCacheInit(int32_t keyType, int64_t refreshTimeInSeconds, bool extendLifespan, __cache_freeres_fn_t fn, const char *cacheName);
/** /**
* initialize the cache object and set the free object callback function * initialize the cache object and set the free object callback function
...@@ -98,7 +99,7 @@ SCacheObj *taosCacheInit(int32_t keyType, int64_t refreshTimeInSeconds, bool ext ...@@ -98,7 +99,7 @@ SCacheObj *taosCacheInit(int32_t keyType, int64_t refreshTimeInSeconds, bool ext
* @param freeCb * @param freeCb
* @return * @return
*/ */
SCacheObj *taosCacheInitWithCb(int32_t keyType, int64_t refreshTimeInSeconds, bool extendLifespan, __cache_freeres_fn_t fn); SCacheObj *taosCacheInitWithCb(int32_t keyType, int64_t refreshTimeInSeconds, bool extendLifespan, __cache_freeres_fn_t fn, const char *cacheName);
/** /**
* add data into cache * add data into cache
...@@ -128,7 +129,7 @@ void *taosCacheAcquireByKey(SCacheObj *pCacheObj, const void *key, size_t keyLen ...@@ -128,7 +129,7 @@ void *taosCacheAcquireByKey(SCacheObj *pCacheObj, const void *key, size_t keyLen
* @param expireTime new expire time of data * @param expireTime new expire time of data
* @return * @return
*/ */
void* taosCacheUpdateExpireTimeByName(SCacheObj *pCacheObj, const char *key, size_t keyLen, uint64_t expireTime); void* taosCacheUpdateExpireTimeByName(SCacheObj *pCacheObj, void *key, size_t keyLen, uint64_t expireTime);
/** /**
* Add one reference count for the exist data, and assign this data for a new owner. * Add one reference count for the exist data, and assign this data for a new owner.
......
...@@ -118,8 +118,10 @@ static FORCE_INLINE void taosCacheReleaseNode(SCacheObj *pCacheObj, SCacheDataNo ...@@ -118,8 +118,10 @@ static FORCE_INLINE void taosCacheReleaseNode(SCacheObj *pCacheObj, SCacheDataNo
int32_t size = pNode->size; int32_t size = pNode->size;
taosHashRemove(pCacheObj->pHashTable, pNode->key, pNode->keySize); taosHashRemove(pCacheObj->pHashTable, pNode->key, pNode->keySize);
uDebug("key:%p, is removed from cache,total:%" PRId64 ",size:%dbytes", pNode->key, pCacheObj->totalSize, size); uDebug("key:%p, %p is destroyed from cache, totalNum:%d totalSize:%" PRId64 "bytes size:%dbytes, cacheName:%s",
pNode->key, pNode->data, (int32_t)taosHashGetSize(pCacheObj->pHashTable), pCacheObj->totalSize, size,
pCacheObj->cacheName);
if (pCacheObj->freeFp) pCacheObj->freeFp(pNode->data); if (pCacheObj->freeFp) pCacheObj->freeFp(pNode->data);
free(pNode); free(pNode);
} }
...@@ -224,7 +226,7 @@ static void doCleanupDataCache(SCacheObj *pCacheObj); ...@@ -224,7 +226,7 @@ static void doCleanupDataCache(SCacheObj *pCacheObj);
*/ */
static void* taosCacheRefresh(void *handle); static void* taosCacheRefresh(void *handle);
SCacheObj *taosCacheInitWithCb(int32_t keyType, int64_t refreshTimeInSeconds, bool extendLifespan, __cache_freeres_fn_t fn) { SCacheObj *taosCacheInitWithCb(int32_t keyType, int64_t refreshTimeInSeconds, bool extendLifespan, __cache_freeres_fn_t fn, const char* cacheName) {
if (refreshTimeInSeconds <= 0) { if (refreshTimeInSeconds <= 0) {
return NULL; return NULL;
} }
...@@ -236,6 +238,7 @@ SCacheObj *taosCacheInitWithCb(int32_t keyType, int64_t refreshTimeInSeconds, bo ...@@ -236,6 +238,7 @@ SCacheObj *taosCacheInitWithCb(int32_t keyType, int64_t refreshTimeInSeconds, bo
} }
pCacheObj->pHashTable = taosHashInit(128, taosGetDefaultHashFunction(keyType), false); pCacheObj->pHashTable = taosHashInit(128, taosGetDefaultHashFunction(keyType), false);
pCacheObj->cacheName = cacheName;
if (pCacheObj->pHashTable == NULL) { if (pCacheObj->pHashTable == NULL) {
free(pCacheObj); free(pCacheObj);
uError("failed to allocate memory, reason:%s", strerror(errno)); uError("failed to allocate memory, reason:%s", strerror(errno));
...@@ -265,8 +268,8 @@ SCacheObj *taosCacheInitWithCb(int32_t keyType, int64_t refreshTimeInSeconds, bo ...@@ -265,8 +268,8 @@ SCacheObj *taosCacheInitWithCb(int32_t keyType, int64_t refreshTimeInSeconds, bo
return pCacheObj; return pCacheObj;
} }
SCacheObj *taosCacheInit(int32_t keyType, int64_t refreshTimeInSeconds, bool extendLifespan, __cache_freeres_fn_t fn) { SCacheObj *taosCacheInit(int32_t keyType, int64_t refreshTimeInSeconds, bool extendLifespan, __cache_freeres_fn_t fn, const char* cacheName) {
return taosCacheInitWithCb(keyType, refreshTimeInSeconds, extendLifespan, fn); return taosCacheInitWithCb(keyType, refreshTimeInSeconds, extendLifespan, fn, cacheName);
} }
void *taosCachePut(SCacheObj *pCacheObj, const void *key, size_t keyLen, const void *pData, size_t dataSize, int duration) { void *taosCachePut(SCacheObj *pCacheObj, const void *key, size_t keyLen, const void *pData, size_t dataSize, int duration) {
...@@ -284,19 +287,21 @@ void *taosCachePut(SCacheObj *pCacheObj, const void *key, size_t keyLen, const v ...@@ -284,19 +287,21 @@ void *taosCachePut(SCacheObj *pCacheObj, const void *key, size_t keyLen, const v
pNode = taosAddToCacheImpl(pCacheObj, key, keyLen, pData, dataSize, duration * 1000L); pNode = taosAddToCacheImpl(pCacheObj, key, keyLen, pData, dataSize, duration * 1000L);
if (NULL != pNode) { if (NULL != pNode) {
pCacheObj->totalSize += pNode->size; pCacheObj->totalSize += pNode->size;
uDebug("key:%p, %p added into cache, added:%" PRIu64 ", expire:%" PRIu64 ", total:%" PRId64 ", size:%" PRId64 " bytes", uDebug("key:%p, %p added into cache, added:%" PRIu64 ", expire:%" PRIu64 ", totalNum:%d totalSize:%" PRId64
key, pNode, pNode->addedTime, (pNode->lifespan * pNode->extendFactor + pNode->addedTime), pCacheObj->totalSize, dataSize); "bytes size:%" PRId64 "bytes, cacheName:%s",
key, pNode->data, pNode->addedTime, (pNode->lifespan * pNode->extendFactor + pNode->addedTime),
(int32_t)taosHashGetSize(pCacheObj->pHashTable), pCacheObj->totalSize, dataSize, pCacheObj->cacheName);
} else { } else {
uError("key:%p, failed to added into cache, out of memory", key); uError("key:%p, failed to added into cache, out of memory, cacheName:%s", key, pCacheObj->cacheName);
} }
} else { // old data exists, update the node } else { // old data exists, update the node
pNode = taosUpdateCacheImpl(pCacheObj, pOld, key, keyLen, pData, dataSize, duration * 1000L); pNode = taosUpdateCacheImpl(pCacheObj, pOld, key, keyLen, pData, dataSize, duration * 1000L);
uDebug("key:%p, %p exist in cache, updated", key, pNode); uDebug("key:%p, %p exist in cache, updated, cacheName:%s", key, pNode->data, pCacheObj->cacheName);
} }
__cache_unlock(pCacheObj); __cache_unlock(pCacheObj);
return (pNode != NULL) ? pNode->data : NULL; return (pNode != NULL) ? pNode->data : NULL;
} }
...@@ -327,17 +332,17 @@ void *taosCacheAcquireByKey(SCacheObj *pCacheObj, const void *key, size_t keyLen ...@@ -327,17 +332,17 @@ void *taosCacheAcquireByKey(SCacheObj *pCacheObj, const void *key, size_t keyLen
if (ptNode != NULL) { if (ptNode != NULL) {
atomic_add_fetch_32(&pCacheObj->statistics.hitCount, 1); atomic_add_fetch_32(&pCacheObj->statistics.hitCount, 1);
uDebug("key:%p, is retrieved from cache, %p refcnt:%d", key, (*ptNode), ref); uDebug("key:%p, %p is retrieved from cache, refcnt:%d, cacheName:%s", key, (*ptNode)->data, ref, pCacheObj->cacheName);
} else { } else {
atomic_add_fetch_32(&pCacheObj->statistics.missCount, 1); atomic_add_fetch_32(&pCacheObj->statistics.missCount, 1);
uDebug("key:%p, not in cache, retrieved failed", key); uDebug("key:%p, not in cache, retrieved failed, cacheName:%s", key, pCacheObj->cacheName);
} }
atomic_add_fetch_32(&pCacheObj->statistics.totalAccess, 1); atomic_add_fetch_32(&pCacheObj->statistics.totalAccess, 1);
return (ptNode != NULL) ? (*ptNode)->data : NULL; return (ptNode != NULL) ? (*ptNode)->data : NULL;
} }
void* taosCacheUpdateExpireTimeByName(SCacheObj *pCacheObj, const char *key, size_t keyLen, uint64_t expireTime) { void* taosCacheUpdateExpireTimeByName(SCacheObj *pCacheObj, void *key, size_t keyLen, uint64_t expireTime) {
if (pCacheObj == NULL || taosHashGetSize(pCacheObj->pHashTable) == 0) { if (pCacheObj == NULL || taosHashGetSize(pCacheObj->pHashTable) == 0) {
return NULL; return NULL;
} }
...@@ -350,17 +355,18 @@ void* taosCacheUpdateExpireTimeByName(SCacheObj *pCacheObj, const char *key, siz ...@@ -350,17 +355,18 @@ void* taosCacheUpdateExpireTimeByName(SCacheObj *pCacheObj, const char *key, siz
(*ptNode)->extendFactor += 1; (*ptNode)->extendFactor += 1;
// (*ptNode)->lifespan = expireTime; // (*ptNode)->lifespan = expireTime;
} }
__cache_unlock(pCacheObj); __cache_unlock(pCacheObj);
if (ptNode != NULL) { if (ptNode != NULL) {
atomic_add_fetch_32(&pCacheObj->statistics.hitCount, 1); atomic_add_fetch_32(&pCacheObj->statistics.hitCount, 1);
uDebug("key:%p, expireTime is updated in cache, %p refcnt:%d", key, (*ptNode), T_REF_VAL_GET(*ptNode)); uDebug("key:%p, %p expireTime is updated in cache, refcnt:%d, cacheName:%s", key, (*ptNode)->data,
T_REF_VAL_GET(*ptNode), pCacheObj->cacheName);
} else { } else {
atomic_add_fetch_32(&pCacheObj->statistics.missCount, 1); atomic_add_fetch_32(&pCacheObj->statistics.missCount, 1);
uDebug("key:%p, not in cache, retrieved failed", key); uDebug("key:%p, not in cache, retrieved failed, cacheName:%s", key, pCacheObj->cacheName);
} }
atomic_add_fetch_32(&pCacheObj->statistics.totalAccess, 1); atomic_add_fetch_32(&pCacheObj->statistics.totalAccess, 1);
return (ptNode != NULL) ? (*ptNode)->data : NULL; return (ptNode != NULL) ? (*ptNode)->data : NULL;
} }
...@@ -375,9 +381,9 @@ void *taosCacheAcquireByData(SCacheObj *pCacheObj, void *data) { ...@@ -375,9 +381,9 @@ void *taosCacheAcquireByData(SCacheObj *pCacheObj, void *data) {
uError("key: %p the data from cache is invalid", ptNode); uError("key: %p the data from cache is invalid", ptNode);
return NULL; return NULL;
} }
int32_t ref = T_REF_INC(ptNode); int32_t ref = T_REF_INC(ptNode);
uDebug("%p acquired by data in cache, refcnt:%d", ptNode, ref) uDebug("%p acquired by data in cache, refcnt:%d, cacheName:%s", ptNode->data, ref, pCacheObj->cacheName);
// if the remained life span is less then the (*ptNode)->lifeSpan, add up one lifespan // if the remained life span is less then the (*ptNode)->lifeSpan, add up one lifespan
if (pCacheObj->extendLifespan) { if (pCacheObj->extendLifespan) {
...@@ -385,7 +391,8 @@ void *taosCacheAcquireByData(SCacheObj *pCacheObj, void *data) { ...@@ -385,7 +391,8 @@ void *taosCacheAcquireByData(SCacheObj *pCacheObj, void *data) {
if ((now - ptNode->addedTime) < ptNode->lifespan * ptNode->extendFactor) { if ((now - ptNode->addedTime) < ptNode->lifespan * ptNode->extendFactor) {
ptNode->extendFactor += 1; ptNode->extendFactor += 1;
uDebug("key:%p extend life time to %"PRId64, ptNode, ptNode->lifespan * ptNode->extendFactor + ptNode->addedTime); uDebug("%p extend life time to %" PRId64, ptNode->data,
ptNode->lifespan * ptNode->extendFactor + ptNode->addedTime);
} }
} }
...@@ -424,14 +431,14 @@ void taosCacheRelease(SCacheObj *pCacheObj, void **data, bool _remove) { ...@@ -424,14 +431,14 @@ void taosCacheRelease(SCacheObj *pCacheObj, void **data, bool _remove) {
SCacheDataNode *pNode = (SCacheDataNode *)((char *)(*data) - offset); SCacheDataNode *pNode = (SCacheDataNode *)((char *)(*data) - offset);
if (pNode->signature != (uint64_t)pNode) { if (pNode->signature != (uint64_t)pNode) {
uError("key:%p, release invalid cache data", pNode); uError("%p, release invalid cache data", pNode);
return; return;
} }
*data = NULL; *data = NULL;
int16_t ref = T_REF_DEC(pNode); int16_t ref = T_REF_DEC(pNode);
uDebug("%p data released, refcnt:%d", pNode, ref); uDebug("key:%p, %p is released, refcnt:%d, cacheName:%s", pNode->key, pNode->data, ref, pCacheObj->cacheName);
if (_remove && (!pNode->inTrashCan)) { if (_remove && (!pNode->inTrashCan)) {
__cache_wr_lock(pCacheObj); __cache_wr_lock(pCacheObj);
...@@ -474,6 +481,7 @@ void taosCacheCleanup(SCacheObj *pCacheObj) { ...@@ -474,6 +481,7 @@ void taosCacheCleanup(SCacheObj *pCacheObj) {
pCacheObj->deleting = 1; pCacheObj->deleting = 1;
pthread_join(pCacheObj->refreshWorker, NULL); pthread_join(pCacheObj->refreshWorker, NULL);
uInfo("cacheName:%p, will be cleanuped", pCacheObj->cacheName);
doCleanupDataCache(pCacheObj); doCleanupDataCache(pCacheObj);
} }
...@@ -522,7 +530,7 @@ void taosAddToTrash(SCacheObj *pCacheObj, SCacheDataNode *pNode) { ...@@ -522,7 +530,7 @@ void taosAddToTrash(SCacheObj *pCacheObj, SCacheDataNode *pNode) {
pNode->inTrashCan = true; pNode->inTrashCan = true;
pCacheObj->numOfElemsInTrash++; pCacheObj->numOfElemsInTrash++;
uDebug("key:%p, %p move to trash, numOfElem in trash:%d", pNode->key, pNode, pCacheObj->numOfElemsInTrash); uDebug("key:%p, %p move to trash, numOfElem in trash:%d", pNode->key, pNode->data, pCacheObj->numOfElemsInTrash);
} }
void taosRemoveFromTrashCan(SCacheObj *pCacheObj, STrashElem *pElem) { void taosRemoveFromTrashCan(SCacheObj *pCacheObj, STrashElem *pElem) {
...@@ -547,7 +555,6 @@ void taosRemoveFromTrashCan(SCacheObj *pCacheObj, STrashElem *pElem) { ...@@ -547,7 +555,6 @@ void taosRemoveFromTrashCan(SCacheObj *pCacheObj, STrashElem *pElem) {
pCacheObj->freeFp(pElem->pData->data); pCacheObj->freeFp(pElem->pData->data);
} }
uError("-------------------free obj:%p", pElem->pData);
free(pElem->pData); free(pElem->pData);
free(pElem); free(pElem);
} }
...@@ -574,7 +581,7 @@ void taosTrashCanEmpty(SCacheObj *pCacheObj, bool force) { ...@@ -574,7 +581,7 @@ void taosTrashCanEmpty(SCacheObj *pCacheObj, bool force) {
} }
if (force || (T_REF_VAL_GET(pElem->pData) == 0)) { if (force || (T_REF_VAL_GET(pElem->pData) == 0)) {
uDebug("key:%p, %p removed from trash. numOfElem in trash:%d", pElem->pData->key, pElem->pData, uDebug("key:%p, %p removed from trash. numOfElem in trash:%d", pElem->pData->key, pElem->pData->data,
pCacheObj->numOfElemsInTrash - 1); pCacheObj->numOfElemsInTrash - 1);
STrashElem *p = pElem; STrashElem *p = pElem;
...@@ -598,7 +605,8 @@ void doCleanupDataCache(SCacheObj *pCacheObj) { ...@@ -598,7 +605,8 @@ void doCleanupDataCache(SCacheObj *pCacheObj) {
if (T_REF_VAL_GET(pNode) <= 0) { if (T_REF_VAL_GET(pNode) <= 0) {
taosCacheReleaseNode(pCacheObj, pNode); taosCacheReleaseNode(pCacheObj, pNode);
} else { } else {
uDebug("key:%p, will not remove from cache, refcnt:%d", pNode->key, T_REF_VAL_GET(pNode)); uDebug("key:%p, %p will not remove from cache, refcnt:%d, cacheName:%s", pNode->key, pNode->data,
T_REF_VAL_GET(pNode), pCacheObj->cacheName);
} }
} }
taosHashDestroyIter(pIter); taosHashDestroyIter(pIter);
......
...@@ -19,7 +19,7 @@ int32_t tsMaxMeterConnections = 200; ...@@ -19,7 +19,7 @@ int32_t tsMaxMeterConnections = 200;
// test cache // test cache
TEST(testCase, client_cache_test) { TEST(testCase, client_cache_test) {
const int32_t REFRESH_TIME_IN_SEC = 2; const int32_t REFRESH_TIME_IN_SEC = 2;
SCacheObj* tscCacheHandle = taosCacheInit(TSDB_DATA_TYPE_BINARY, REFRESH_TIME_IN_SEC, 0, NULL); SCacheObj* tscCacheHandle = taosCacheInit(TSDB_DATA_TYPE_BINARY, REFRESH_TIME_IN_SEC, 0, NULL, "test");
const char* key1 = "test1"; const char* key1 = "test1";
char data1[] = "test11"; char data1[] = "test11";
...@@ -105,7 +105,7 @@ TEST(testCase, client_cache_test) { ...@@ -105,7 +105,7 @@ TEST(testCase, client_cache_test) {
TEST(testCase, cache_resize_test) { TEST(testCase, cache_resize_test) {
const int32_t REFRESH_TIME_IN_SEC = 2; const int32_t REFRESH_TIME_IN_SEC = 2;
auto* pCache = taosCacheInit(TSDB_DATA_TYPE_BINARY, REFRESH_TIME_IN_SEC, false, NULL); auto* pCache = taosCacheInit(TSDB_DATA_TYPE_BINARY, REFRESH_TIME_IN_SEC, false, NULL, "test");
char key[256] = {0}; char key[256] = {0};
char data[1024] = "abcdefghijk"; char data[1024] = "abcdefghijk";
......
...@@ -284,7 +284,7 @@ int32_t vnodeOpen(int32_t vnode, char *rootDir) { ...@@ -284,7 +284,7 @@ int32_t vnodeOpen(int32_t vnode, char *rootDir) {
cqStart(pVnode->cq); cqStart(pVnode->cq);
const int32_t REFRESH_HANDLE_INTERVAL = 2; // every 2 seconds, rfresh handle pool const int32_t REFRESH_HANDLE_INTERVAL = 2; // every 2 seconds, rfresh handle pool
pVnode->qHandlePool = taosCacheInit(TSDB_DATA_TYPE_BIGINT, REFRESH_HANDLE_INTERVAL, true, vnodeFreeqHandle); pVnode->qHandlePool = taosCacheInit(TSDB_DATA_TYPE_BIGINT, REFRESH_HANDLE_INTERVAL, true, vnodeFreeqHandle, "qhandle");
pVnode->events = NULL; pVnode->events = NULL;
pVnode->status = TAOS_VN_STATUS_READY; pVnode->status = TAOS_VN_STATUS_READY;
...@@ -324,7 +324,7 @@ void vnodeRelease(void *pVnodeRaw) { ...@@ -324,7 +324,7 @@ void vnodeRelease(void *pVnodeRaw) {
assert(refCount >= 0); assert(refCount >= 0);
if (refCount > 0) { if (refCount > 0) {
vTrace("vgId:%d, release vnode, refCount:%d", vgId, refCount); vDebug("vgId:%d, release vnode, refCount:%d", vgId, refCount);
return; return;
} }
...@@ -388,7 +388,7 @@ void *vnodeAccquireVnode(int32_t vgId) { ...@@ -388,7 +388,7 @@ void *vnodeAccquireVnode(int32_t vgId) {
if (pVnode == NULL) return pVnode; if (pVnode == NULL) return pVnode;
atomic_add_fetch_32(&pVnode->refCount, 1); atomic_add_fetch_32(&pVnode->refCount, 1);
vTrace("vgId:%d, get vnode, refCount:%d", pVnode->vgId, pVnode->refCount); vDebug("vgId:%d, get vnode, refCount:%d", pVnode->vgId, pVnode->refCount);
return pVnode; return pVnode;
} }
...@@ -466,6 +466,7 @@ static void vnodeCleanUp(SVnodeObj *pVnode) { ...@@ -466,6 +466,7 @@ static void vnodeCleanUp(SVnodeObj *pVnode) {
vTrace("vgId:%d, vnode will cleanup, refCount:%d", pVnode->vgId, pVnode->refCount); vTrace("vgId:%d, vnode will cleanup, refCount:%d", pVnode->vgId, pVnode->refCount);
// release local resources only after cutting off outside connections // release local resources only after cutting off outside connections
taosCacheCleanup(pVnode->qHandlePool);
vnodeRelease(pVnode); vnodeRelease(pVnode);
} }
......
...@@ -78,15 +78,14 @@ static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) { ...@@ -78,15 +78,14 @@ static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) {
// this message arrived here by means of the *query* message, so release the vnode is necessary // this message arrived here by means of the *query* message, so release the vnode is necessary
void** qhandle = taosCacheAcquireByKey(pVnode->qHandlePool, (void*) &killQueryMsg->qhandle, sizeof(killQueryMsg->qhandle)); void** qhandle = taosCacheAcquireByKey(pVnode->qHandlePool, (void*) &killQueryMsg->qhandle, sizeof(killQueryMsg->qhandle));
if (qhandle == NULL || *qhandle == NULL) { // todo handle invalid qhandle error if (qhandle == NULL || *qhandle == NULL) {
vWarn("QInfo:%p invalid qhandle, no matched query handle, conn:%p", (void*) killQueryMsg->qhandle, pReadMsg->rpcMsg.handle);
} else { } else {
// qKillQuery((qinfo_t) killQueryMsg->qhandle);
taosCacheRelease(pVnode->qHandlePool, (void**) &qhandle, true); taosCacheRelease(pVnode->qHandlePool, (void**) &qhandle, true);
} }
vnodeRelease(pVnode); vnodeRelease(pVnode);
return TSDB_CODE_TSC_QUERY_CANCELLED; // todo change the error code return TSDB_CODE_TSC_QUERY_CANCELLED;
} }
int32_t code = TSDB_CODE_SUCCESS; int32_t code = TSDB_CODE_SUCCESS;
...@@ -97,8 +96,8 @@ static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) { ...@@ -97,8 +96,8 @@ static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) {
code = qCreateQueryInfo(pVnode->tsdb, pVnode->vgId, pQueryTableMsg, pVnode, vnodeRelease, &pQInfo); code = qCreateQueryInfo(pVnode->tsdb, pVnode->vgId, pQueryTableMsg, pVnode, vnodeRelease, &pQInfo);
SQueryTableRsp *pRsp = (SQueryTableRsp *) rpcMallocCont(sizeof(SQueryTableRsp)); SQueryTableRsp *pRsp = (SQueryTableRsp *) rpcMallocCont(sizeof(SQueryTableRsp));
pRsp->qhandle = htobe64((uint64_t) (pQInfo)); pRsp->code = code;
pRsp->code = code; pRsp->qhandle = 0;
pRet->len = sizeof(SQueryTableRsp); pRet->len = sizeof(SQueryTableRsp);
pRet->rsp = pRsp; pRet->rsp = pRsp;
...@@ -120,6 +119,7 @@ static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) { ...@@ -120,6 +119,7 @@ static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) {
handle = taosCachePut(pVnode->qHandlePool, pQInfo, sizeof(pQInfo), &pQInfo, sizeof(pQInfo), tsShellActivityTimer * 2); handle = taosCachePut(pVnode->qHandlePool, pQInfo, sizeof(pQInfo), &pQInfo, sizeof(pQInfo), tsShellActivityTimer * 2);
assert(*handle == pQInfo); assert(*handle == pQInfo);
pRsp->qhandle = htobe64((uint64_t) (handle));
} else { } else {
assert(pQInfo == NULL); assert(pQInfo == NULL);
vnodeRelease(pVnode); vnodeRelease(pVnode);
...@@ -128,13 +128,17 @@ static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) { ...@@ -128,13 +128,17 @@ static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) {
vDebug("vgId:%d, QInfo:%p, dnode query msg disposed", pVnode->vgId, pQInfo); vDebug("vgId:%d, QInfo:%p, dnode query msg disposed", pVnode->vgId, pQInfo);
} else { } else {
assert(pCont != NULL); assert(pCont != NULL);
pQInfo = pCont; pQInfo = *(void**)(pCont);
handle = pCont;
code = TSDB_CODE_VND_ACTION_IN_PROGRESS; code = TSDB_CODE_VND_ACTION_IN_PROGRESS;
vDebug("vgId:%d, QInfo:%p, dnode query msg in progress", pVnode->vgId, pQInfo); vDebug("vgId:%d, QInfo:%p, dnode query msg in progress", pVnode->vgId, pQInfo);
} }
if (pQInfo != NULL) { if (pQInfo != NULL) {
qTableQuery(pQInfo, vnodeRelease, pVnode); // do execute query qTableQuery(pQInfo); // do execute query
assert(handle != NULL);
taosCacheRelease(pVnode->qHandlePool, (void**) &handle, false); taosCacheRelease(pVnode->qHandlePool, (void**) &handle, false);
} }
...@@ -146,52 +150,54 @@ static int32_t vnodeProcessFetchMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) { ...@@ -146,52 +150,54 @@ static int32_t vnodeProcessFetchMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) {
SRspRet *pRet = &pReadMsg->rspRet; SRspRet *pRet = &pReadMsg->rspRet;
SRetrieveTableMsg *pRetrieve = pCont; SRetrieveTableMsg *pRetrieve = pCont;
void *pQInfo = (void*) htobe64(pRetrieve->qhandle); void **pQInfo = (void*) htobe64(pRetrieve->qhandle);
pRetrieve->free = htons(pRetrieve->free); pRetrieve->free = htons(pRetrieve->free);
vDebug("vgId:%d, QInfo:%p, retrieve msg is disposed", pVnode->vgId, *pQInfo);
memset(pRet, 0, sizeof(SRspRet)); memset(pRet, 0, sizeof(SRspRet));
int32_t ret = 0; int32_t ret = 0;
void** handle = taosCacheAcquireByKey(pVnode->qHandlePool, &pQInfo, sizeof(pQInfo)); void** handle = taosCacheAcquireByKey(pVnode->qHandlePool, pQInfo, sizeof(pQInfo));
if (handle == NULL || *handle != pQInfo) { if (handle == NULL || handle != pQInfo) {
ret = TSDB_CODE_QRY_INVALID_QHANDLE; ret = TSDB_CODE_QRY_INVALID_QHANDLE;
} }
if (pRetrieve->free == 1) { if (pRetrieve->free == 1) {
vDebug("vgId:%d, QInfo:%p, retrieve msg received to kill query and free qhandle", pVnode->vgId, pQInfo); if (ret == TSDB_CODE_SUCCESS) {
vDebug("vgId:%d, QInfo:%p, retrieve msg received to kill query and free qhandle", pVnode->vgId, pQInfo);
taosCacheRelease(pVnode->qHandlePool, handle, true);
// int32_t ret = qKillQuery(pQInfo);
pRet->rsp = (SRetrieveTableRsp *)rpcMallocCont(sizeof(SRetrieveTableRsp)); taosCacheRelease(pVnode->qHandlePool, (void**) &handle, true);
pRet->len = sizeof(SRetrieveTableRsp); pRet->rsp = (SRetrieveTableRsp *)rpcMallocCont(sizeof(SRetrieveTableRsp));
pRet->len = sizeof(SRetrieveTableRsp);
memset(pRet->rsp, 0, sizeof(SRetrieveTableRsp)); memset(pRet->rsp, 0, sizeof(SRetrieveTableRsp));
SRetrieveTableRsp* pRsp = pRet->rsp; SRetrieveTableRsp* pRsp = pRet->rsp;
pRsp->numOfRows = 0; pRsp->numOfRows = 0;
pRsp->completed = true; pRsp->completed = true;
pRsp->useconds = 0; pRsp->useconds = 0;
} else { // todo handle error
}
return ret; return ret;
} }
vDebug("vgId:%d, QInfo:%p, retrieve msg is received", pVnode->vgId, pQInfo); vDebug("vgId:%d, QInfo:%p, retrieve msg is received", pVnode->vgId, *pQInfo);
int32_t code = qRetrieveQueryResultInfo(pQInfo); int32_t code = qRetrieveQueryResultInfo(*pQInfo);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
//TODO //TODO
pRet->rsp = (SRetrieveTableRsp *)rpcMallocCont(sizeof(SRetrieveTableRsp)); pRet->rsp = (SRetrieveTableRsp *)rpcMallocCont(sizeof(SRetrieveTableRsp));
memset(pRet->rsp, 0, sizeof(SRetrieveTableRsp)); memset(pRet->rsp, 0, sizeof(SRetrieveTableRsp));
} else { } else {
// todo check code and handle error in build result set // todo check code and handle error in build result set
code = qDumpRetrieveResult(pQInfo, (SRetrieveTableRsp **)&pRet->rsp, &pRet->len); code = qDumpRetrieveResult(*pQInfo, (SRetrieveTableRsp **)&pRet->rsp, &pRet->len);
if (qHasMoreResultsToRetrieve(pQInfo)) { if (qHasMoreResultsToRetrieve(*pQInfo)) {
pRet->qhandle = pQInfo; pRet->qhandle = handle;
code = TSDB_CODE_VND_ACTION_NEED_REPROCESSED; code = TSDB_CODE_VND_ACTION_NEED_REPROCESSED;
} else { // no further execution invoked, release the ref to vnode } else { // no further execution invoked, release the ref to vnode
taosCacheRelease(pVnode->qHandlePool, (void**) &handle, true); taosCacheRelease(pVnode->qHandlePool, (void**) &handle, true);
// qDestroyQueryInfo(pQInfo);
} }
} }
......
###################################################################
# Copyright (c) 2016 by TAOS Technologies, Inc.
# All rights reserved.
#
# This file is proprietary and confidential to TAOS Technologies.
# No part of this file may be reproduced, stored, transmitted,
# disclosed or used in any form or by any means other than as
# expressly provided by the written permission from Jianhui Tao
#
###################################################################
# -*- coding: utf-8 -*-
import sys
import taos
import threading
import time
from datetime import datetime
import numpy as np
class MyThread(threading.Thread):
def __init__(self, func, args=()):
super(MyThread, self).__init__()
self.func = func
self.args = args
def run(self):
self.result = self.func(*self.args)
def get_result(self):
try:
return self.result # 如果子线程不使用join方法,此处可能会报没有self.result的错误
except Exception:
return None
class MetadataQuery:
def initConnection(self):
self.tables = 100
self.records = 10
self.numOfTherads =5
self.ts = 1537146000000
self.host = "127.0.0.1"
self.user = "root"
self.password = "taosdata"
self.config = "/etc/taos"
self.conn = taos.connect( self.host, self.user, self.password, self.config)
def connectDB(self):
return self.conn.cursor()
def createStable(self):
print("================= Create stable meters =================")
cursor = self.connectDB()
cursor.execute("drop database if exists test")
cursor.execute("create database test")
cursor.execute("use test")
cursor.execute('''create table if not exists meters (ts timestamp, speed int) tags(
tgcol1 tinyint, tgcol2 smallint, tgcol3 int, tgcol4 bigint, tgcol5 float, tgcol6 double, tgcol7 bool, tgcol8 binary(20), tgcol9 nchar(20),
tgcol10 tinyint, tgcol11 smallint, tgcol12 int, tgcol13 bigint, tgcol14 float, tgcol15 double, tgcol16 bool, tgcol17 binary(20), tgcol18 nchar(20),
tgcol19 tinyint, tgcol20 smallint, tgcol21 int, tgcol22 bigint, tgcol23 float, tgcol24 double, tgcol25 bool, tgcol26 binary(20), tgcol27 nchar(20),
tgcol28 tinyint, tgcol29 smallint, tgcol30 int, tgcol31 bigint, tgcol32 float, tgcol33 double, tgcol34 bool, tgcol35 binary(20), tgcol36 nchar(20),
tgcol37 tinyint, tgcol38 smallint, tgcol39 int, tgcol40 bigint, tgcol41 float, tgcol42 double, tgcol43 bool, tgcol44 binary(20), tgcol45 nchar(20),
tgcol46 tinyint, tgcol47 smallint, tgcol48 int, tgcol49 bigint, tgcol50 float, tgcol51 double, tgcol52 bool, tgcol53 binary(20), tgcol54 nchar(20))''')
cursor.close()
def createTablesAndInsertData(self, threadID):
cursor = self.connectDB()
cursor.execute("use test")
base = threadID * self.tables
tablesPerThread = int (self.tables / self.numOfTherads)
for i in range(tablesPerThread):
cursor.execute(
'''create table t%d using meters tags(
%d, %d, %d, %d, %f, %f, %d, 'taosdata%d', '涛思数据%d',
%d, %d, %d, %d, %f, %f, %d, 'taosdata%d', '涛思数据%d',
%d, %d, %d, %d, %f, %f, %d, 'taosdata%d', '涛思数据%d',
%d, %d, %d, %d, %f, %f, %d, 'taosdata%d', '涛思数据%d',
%d, %d, %d, %d, %f, %f, %d, 'taosdata%d', '涛思数据%d',
%d, %d, %d, %d, %f, %f, %d, 'taosdata%d', '涛思数据%d')''' %
(base + i + 1,
(base + i) %100, (base + i) %10000, (base + i) %1000000, (base + i) %100000000, (base + i) %100 * 1.1, (base + i) %100 * 2.3, (base + i) %2, (base + i) %100, (base + i) %100,
(base + i) %100, (base + i) %10000, (base + i) %1000000, (base + i) %100000000, (base + i) %100 * 1.1, (base + i) %100 * 2.3, (base + i) %2, (base + i) %100, (base + i) %100,
(base + i) %100, (base + i) %10000, (base + i) %1000000, (base + i) %100000000, (base + i) %100 * 1.1, (base + i) %100 * 2.3, (base + i) %2, (base + i) %100, (base + i) %100,
(base + i) %100, (base + i) %10000, (base + i) %1000000, (base + i) %100000000, (base + i) %100 * 1.1, (base + i) %100 * 2.3, (base + i) %2, (base + i) %100, (base + i) %100,
(base + i) %100, (base + i) %10000, (base + i) %1000000, (base + i) %100000000, (base + i) %100 * 1.1, (base + i) %100 * 2.3, (base + i) %2, (base + i) %100, (base + i) %100,
(base + i) %100, (base + i) %10000, (base + i) %1000000, (base + i) %100000000, (base + i) %100 * 1.1, (base + i) %100 * 2.3, (base + i) %2, (base + i) %100, (base + i) %100))
for j in range(self.records):
cursor.execute(
"insert into t%d values(%d, %d)" %
(base + i + 1, self.ts + j, j))
cursor.close()
def queryWithTagId(self, threadId, tagId, queryNum):
print("---------thread%d start-----------"%threadId)
query = '''select tgcol1, tgcol2, tgcol3, tgcol4, tgcol5, tgcol6, tgcol7, tgcol8, tgcol9,
tgcol10, tgcol11, tgcol12, tgcol13, tgcol14, tgcol15, tgcol16, tgcol17, tgcol18,
tgcol19, tgcol20, tgcol21, tgcol22, tgcol23, tgcol24, tgcol25, tgcol26, tgcol27,
tgcol28, tgcol29, tgcol30, tgcol31, tgcol32, tgcol33, tgcol34, tgcol35, tgcol36,
tgcol37, tgcol38, tgcol39, tgcol40, tgcol41, tgcol42, tgcol43, tgcol44, tgcol45,
tgcol46, tgcol47, tgcol48, tgcol49, tgcol50, tgcol51, tgcol52, tgcol53, tgcol54
from meters where tgcol{id} > {condition}'''
latancy = []
cursor = self.connectDB()
cursor.execute("use test")
for i in range(queryNum):
startTime = time.time()
cursor.execute(query.format(id = tagId, condition = i))
cursor.fetchall()
latancy.append((time.time() - startTime))
print("---------thread%d end-----------"%threadId)
return latancy
def queryData(self, query):
cursor = self.connectDB()
cursor.execute("use test")
print("================= query tag data =================")
startTime = datetime.now()
cursor.execute(query)
cursor.fetchall()
endTime = datetime.now()
print(
"Query time for the above query is %d seconds" %
(endTime - startTime).seconds)
cursor.close()
#self.conn.close()
if __name__ == '__main__':
t = MetadataQuery()
t.initConnection()
latancys = []
threads = []
tagId = 1
queryNum = 1000
for i in range(t.numOfTherads):
thread = MyThread(t.queryWithTagId, args = (i, tagId, queryNum))
threads.append(thread)
thread.start()
for i in range(t.numOfTherads):
threads[i].join()
latancys.extend(threads[i].get_result())
print("Total query: %d"%(queryNum * t.numOfTherads))
print("statistic(s): mean= %f, P50 = %f, P75 = %f, P95 = %f, P99 = %f"
%(sum(latancys)/(queryNum * t.numOfTherads), np.percentile(latancys, 50), np.percentile(latancys, 75), np.percentile(latancys, 95), np.percentile(latancys, 99)))
...@@ -114,6 +114,7 @@ echo "mDebugFlag 135" >> $TAOS_CFG ...@@ -114,6 +114,7 @@ echo "mDebugFlag 135" >> $TAOS_CFG
echo "sdbDebugFlag 135" >> $TAOS_CFG echo "sdbDebugFlag 135" >> $TAOS_CFG
echo "dDebugFlag 135" >> $TAOS_CFG echo "dDebugFlag 135" >> $TAOS_CFG
echo "vDebugFlag 135" >> $TAOS_CFG echo "vDebugFlag 135" >> $TAOS_CFG
echo "tsdbDebugFlag 135" >> $TAOS_CFG
echo "cDebugFlag 135" >> $TAOS_CFG echo "cDebugFlag 135" >> $TAOS_CFG
echo "jnidebugFlag 135" >> $TAOS_CFG echo "jnidebugFlag 135" >> $TAOS_CFG
echo "odbcdebugFlag 135" >> $TAOS_CFG echo "odbcdebugFlag 135" >> $TAOS_CFG
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册