提交 505cbcb7 编写于 作者: P Ping Xiao

TD-780: performance testing for metadata query

......@@ -84,7 +84,7 @@ typedef struct SRetrieveSupport {
SColumnModel * pFinalColModel; // colModel for final result
SSubqueryState * pState;
int32_t subqueryIndex; // index of current vnode in vnode list
SSqlObj * pParentSqlObj;
SSqlObj * pParentSql;
tFilePage * localBuffer; // temp buffer, there is a buffer for each vnode to
uint32_t numOfRetry; // record the number of retry times
pthread_mutex_t queryMutex;
......
......@@ -36,6 +36,8 @@ int32_t tscHandleMasterSTableQuery(SSqlObj *pSql);
int32_t tscHandleMultivnodeInsert(SSqlObj *pSql);
int32_t tscHandleInsertRetry(SSqlObj* pSql);
void tscBuildResFromSubqueries(SSqlObj *pSql);
void **doSetResultRowData(SSqlObj *pSql, bool finalResult);
......
......@@ -213,8 +213,7 @@ typedef struct SQueryInfo {
typedef struct {
int command;
uint8_t msgType;
bool autoCreated; // if the table is missing, on-the-fly create it. during getmeterMeta
int8_t dataSourceType; // load data from file or not
bool autoCreated; // create table if it is not existed during retrieve table meta in mnode
union {
int32_t count;
......@@ -222,18 +221,23 @@ typedef struct {
};
int32_t insertType;
int32_t clauseIndex; // index of multiple subclause query
int32_t clauseIndex; // index of multiple subclause query
char * curSql; // current sql, resume position of sql after parsing paused
int8_t parseFinished;
short numOfCols;
uint32_t allocSize;
char * payload;
int32_t payloadLen;
SQueryInfo **pQueryInfo;
int32_t numOfClause;
char * curSql; // current sql, resume position of sql after parsing paused
void * pTableList; // referred table involved in sql
int32_t batchSize; // for parameter ('?') binding and batch processing
int32_t numOfParams;
int8_t dataSourceType; // load data from file or not
int8_t submitSchema; // submit block is built with table schema
SHashObj *pTableList; // referred table involved in sql
SArray *pDataBlocks; // SArray<STableDataBlocks*> submit data blocks after parsing sql
} SSqlCmd;
......
......@@ -431,6 +431,8 @@ void tscTableMetaCallBack(void *param, TAOS_RES *res, int code) {
return;
}
tscDebug("%p get tableMeta successfully", pSql);
if (pSql->pStream == NULL) {
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
......@@ -446,20 +448,20 @@ void tscTableMetaCallBack(void *param, TAOS_RES *res, int code) {
assert(code == TSDB_CODE_SUCCESS);
}
assert((tscGetNumOfTags(pTableMetaInfo->pTableMeta) != 0) && pTableMetaInfo->vgroupIndex >= 0 && pSql->param != NULL);
assert((tscGetNumOfTags(pTableMetaInfo->pTableMeta) != 0) && pSql->param != NULL);
SRetrieveSupport *trs = (SRetrieveSupport *)pSql->param;
SSqlObj * pParObj = trs->pParentSqlObj;
SSqlObj * pParObj = trs->pParentSql;
// NOTE: the vgroupInfo for the queried super table must be existed here.
assert(pParObj->signature == pParObj && trs->subqueryIndex == pTableMetaInfo->vgroupIndex &&
tscGetNumOfTags(pTableMetaInfo->pTableMeta) != 0);
pTableMetaInfo->vgroupIndex >= 0 && pTableMetaInfo->vgroupList != NULL);
// NOTE: the vgroupInfo for the queried super table must be existed here.
assert(pTableMetaInfo->vgroupList != NULL);
if ((code = tscProcessSql(pSql)) == TSDB_CODE_SUCCESS) {
return;
}
goto _error;
} else { // continue to process normal async query
if (pCmd->parseFinished) {
tscDebug("%p update table meta in local cache, continue to process sql and send corresponding query", pSql);
......@@ -472,18 +474,41 @@ void tscTableMetaCallBack(void *param, TAOS_RES *res, int code) {
assert(code == TSDB_CODE_SUCCESS);
}
// if failed to process sql, go to error handler
if ((code = tscProcessSql(pSql)) == TSDB_CODE_SUCCESS) {
return;
// in case of insert, redo parsing the sql string and build new submit data block for two reasons:
// 1. the table Id(tid & uid) may have been update, the submit block needs to be updated
// 2. vnode may need the schema information along with submit block to update its local table schema.
if (pCmd->command == TSDB_SQL_INSERT) {
tscDebug("%p redo parse sql string to build submit block", pSql);
pCmd->parseFinished = false;
if ((code = tsParseSql(pSql, true)) == TSDB_CODE_SUCCESS) {
/*
* Discard previous built submit blocks, and then parse the sql string again and build up all submit blocks,
* and send the required submit block according to index value in supporter to server.
*/
pSql->fp = pSql->fetchFp; // restore the fp
if ((code = tscHandleInsertRetry(pSql)) == TSDB_CODE_SUCCESS) {
return;
}
}
} else {// in case of other query type, continue
if ((code = tscProcessSql(pSql)) == TSDB_CODE_SUCCESS) {
return;
}
}
// // todo update the submit message according to the new table meta
// // 1. table uid, 2. ip address
// code = tscSendMsgToServer(pSql);
// if (code == TSDB_CODE_SUCCESS) return;
goto _error;
} else {
tscDebug("%p continue parse sql after get table meta", pSql);
code = tsParseSql(pSql, false);
if (code == TSDB_CODE_TSC_ACTION_IN_PROGRESS) {
return;
} else if (code != TSDB_CODE_SUCCESS) {
goto _error;
}
if (TSDB_QUERY_HAS_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_STMT_INSERT)) {
STableMetaInfo* pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0);
code = tscGetTableMeta(pSql, pTableMetaInfo);
......@@ -492,45 +517,49 @@ void tscTableMetaCallBack(void *param, TAOS_RES *res, int code) {
} else {
assert(code == TSDB_CODE_SUCCESS);
}
(*pSql->fp)(pSql->param, pSql, code);
return;
}
if (code == TSDB_CODE_TSC_ACTION_IN_PROGRESS) return;
// proceed to invoke the tscDoQuery();
}
}
} else { // stream computing
STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0);
code = tscGetTableMeta(pSql, pTableMetaInfo);
pRes->code = code;
if (code == TSDB_CODE_TSC_ACTION_IN_PROGRESS) return;
code = tscGetTableMeta(pSql, pTableMetaInfo);
if (code == TSDB_CODE_TSC_ACTION_IN_PROGRESS) {
return;
} else if (code != TSDB_CODE_SUCCESS) {
goto _error;
}
if (code == TSDB_CODE_SUCCESS && UTIL_TABLE_IS_SUPER_TABLE(pTableMetaInfo)) {
code = tscGetSTableVgroupInfo(pSql, pCmd->clauseIndex);
pRes->code = code;
if (code == TSDB_CODE_TSC_ACTION_IN_PROGRESS) return;
if (code == TSDB_CODE_TSC_ACTION_IN_PROGRESS) {
return;
} else if (code != TSDB_CODE_SUCCESS) {
goto _error;
}
}
}
if (code != TSDB_CODE_SUCCESS) {
pSql->res.code = code;
tscQueueAsyncRes(pSql);
return;
}
if (pSql->pStream) {
tscDebug("%p stream:%p meta is updated, start new query, command:%d", pSql, pSql->pStream, pSql->cmd.command);
if (!pSql->cmd.parseFinished) {
tsParseSql(pSql, false);
sem_post(&pSql->rspSem);
}
return;
} else {
tscDebug("%p get tableMeta successfully", pSql);
}
tscDoQuery(pSql);
return;
_error:
if (code != TSDB_CODE_SUCCESS) {
pSql->res.code = code;
tscQueueAsyncRes(pSql);
}
}
......@@ -1259,8 +1259,6 @@ int tsParseInsertSql(SSqlObj *pSql) {
if ((code = tscMergeTableDataBlocks(pSql, pCmd->pDataBlocks)) != TSDB_CODE_SUCCESS) {
goto _error;
}
} else {
pCmd->pDataBlocks = tscDestroyBlockArrayList(pCmd->pDataBlocks);
}
code = TSDB_CODE_SUCCESS;
......
......@@ -347,8 +347,7 @@ void tscProcessMsgFromServer(SRpcMsg *rpcMsg, SRpcIpSet *pIpSet) {
int doProcessSql(SSqlObj *pSql) {
SSqlCmd *pCmd = &pSql->cmd;
SSqlRes *pRes = &pSql->res;
int32_t code = TSDB_CODE_SUCCESS;
if (pCmd->command == TSDB_SQL_SELECT ||
pCmd->command == TSDB_SQL_FETCH ||
pCmd->command == TSDB_SQL_RETRIEVE ||
......@@ -365,10 +364,13 @@ int doProcessSql(SSqlObj *pSql) {
return pRes->code;
}
code = tscSendMsgToServer(pSql);
int32_t code = tscSendMsgToServer(pSql);
// NOTE: if code is TSDB_CODE_SUCCESS, pSql may have been released here already by other threads.
if (code != TSDB_CODE_SUCCESS) {
pRes->code = code;
tscQueueAsyncRes(pSql);
return pRes->code;
}
return TSDB_CODE_SUCCESS;
......
此差异已折叠。
......@@ -562,10 +562,8 @@ int32_t tscGetDataBlockFromList(void* pHashList, SArray* pDataBlockList, int64_t
return TSDB_CODE_SUCCESS;
}
static int trimDataBlock(void* pDataBlock, STableDataBlocks* pTableDataBlock) {
static int trimDataBlock(void* pDataBlock, STableDataBlocks* pTableDataBlock, bool includeSchema) {
// TODO: optimize this function, handle the case while binary is not presented
int len = 0;
STableMeta* pTableMeta = pTableDataBlock->pTableMeta;
STableComInfo tinfo = tscGetTableInfo(pTableMeta);
SSchema* pSchema = tscGetTableSchema(pTableMeta);
......@@ -575,16 +573,37 @@ static int trimDataBlock(void* pDataBlock, STableDataBlocks* pTableDataBlock) {
pDataBlock += sizeof(SSubmitBlk);
int32_t flen = 0; // original total length of row
for (int32_t i = 0; i < tinfo.numOfColumns; ++i) {
flen += TYPE_BYTES[pSchema[i].type];
// schema needs to be included into the submit data block
if (includeSchema) {
int32_t numOfCols = tscGetNumOfColumns(pTableDataBlock->pTableMeta);
for(int32_t j = 0; j < numOfCols; ++j) {
STColumn* pCol = (STColumn*) pDataBlock;
pCol->colId = pSchema[j].colId;
pCol->type = pSchema[j].type;
pCol->bytes = pSchema[j].bytes;
pCol->offset = 0;
pDataBlock += sizeof(STColumn);
flen += TYPE_BYTES[pSchema[j].type];
}
int32_t schemaSize = sizeof(STColumn) * numOfCols;
pBlock->schemaLen = schemaSize;
} else {
for (int32_t j = 0; j < tinfo.numOfColumns; ++j) {
flen += TYPE_BYTES[pSchema[j].type];
}
pBlock->schemaLen = 0;
}
char* p = pTableDataBlock->pData + sizeof(SSubmitBlk);
pBlock->len = 0;
pBlock->dataLen = 0;
int32_t numOfRows = htons(pBlock->numOfRows);
for (int32_t i = 0; i < numOfRows; ++i) {
SDataRow trow = (SDataRow)pDataBlock;
SDataRow trow = (SDataRow) pDataBlock;
dataRowSetLen(trow, TD_DATA_ROW_HEAD_SIZE + flen);
dataRowSetVersion(trow, pTableMeta->sversion);
......@@ -595,20 +614,21 @@ static int trimDataBlock(void* pDataBlock, STableDataBlocks* pTableDataBlock) {
p += pSchema[j].bytes;
}
// p += pTableDataBlock->rowSize;
pDataBlock += dataRowLen(trow);
pBlock->len += dataRowLen(trow);
pBlock->dataLen += dataRowLen(trow);
}
len = pBlock->len;
pBlock->len = htonl(pBlock->len);
int32_t len = pBlock->dataLen + pBlock->schemaLen;
pBlock->dataLen = htonl(pBlock->dataLen);
pBlock->schemaLen = htonl(pBlock->schemaLen);
return len;
}
int32_t tscMergeTableDataBlocks(SSqlObj* pSql, SArray* pTableDataBlockList) {
SSqlCmd* pCmd = &pSql->cmd;
// the expanded size when a row data is converted to SDataRow format
// the maximum expanded size in byte when a row-wise data is converted to SDataRow format
const int32_t MAX_EXPAND_SIZE = TD_DATA_ROW_HEAD_SIZE + TYPE_BYTES[TSDB_DATA_TYPE_BINARY];
void* pVnodeDataBlockHashList = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false);
......@@ -617,7 +637,6 @@ int32_t tscMergeTableDataBlocks(SSqlObj* pSql, SArray* pTableDataBlockList) {
size_t total = taosArrayGetSize(pTableDataBlockList);
for (int32_t i = 0; i < total; ++i) {
STableDataBlocks* pOneTableBlock = taosArrayGetP(pTableDataBlockList, i);
STableDataBlocks* dataBuf = NULL;
int32_t ret =
......@@ -666,16 +685,17 @@ int32_t tscMergeTableDataBlocks(SSqlObj* pSql, SArray* pTableDataBlockList) {
pBlocks->uid = htobe64(pBlocks->uid);
pBlocks->sversion = htonl(pBlocks->sversion);
pBlocks->numOfRows = htons(pBlocks->numOfRows);
pBlocks->schemaLen = 0;
// erase the empty space reserved for binary data
int32_t finalLen = trimDataBlock(dataBuf->pData + dataBuf->size, pOneTableBlock);
int32_t finalLen = trimDataBlock(dataBuf->pData + dataBuf->size, pOneTableBlock, pCmd->submitSchema);
assert(finalLen <= len);
dataBuf->size += (finalLen + sizeof(SSubmitBlk));
assert(dataBuf->size <= dataBuf->nAllocSize);
// the length does not include the SSubmitBlk structure
pBlocks->len = htonl(finalLen);
pBlocks->dataLen = htonl(finalLen);
dataBuf->numOfTables += 1;
}
......
......@@ -128,10 +128,10 @@ extern float tsTotalLogDirGB;
extern float tsTotalTmpDirGB;
extern float tsTotalDataDirGB;
extern float tsAvailLogDirGB;
extern float tsAvailTmpDirGB;
extern float tsAvailTmpDirectorySpace;
extern float tsAvailDataDirGB;
extern float tsMinimalLogDirGB;
extern float tsMinimalTmpDirGB;
extern float tsReservedTmpDirectorySpace;
extern float tsMinimalDataDirGB;
extern int32_t tsTotalMemoryMB;
extern int32_t tsVersion;
......
......@@ -170,9 +170,9 @@ int64_t tsStreamMax;
int32_t tsNumOfCores = 1;
float tsTotalTmpDirGB = 0;
float tsTotalDataDirGB = 0;
float tsAvailTmpDirGB = 0;
float tsAvailTmpDirectorySpace = 0;
float tsAvailDataDirGB = 0;
float tsMinimalTmpDirGB = 0.1;
float tsReservedTmpDirectorySpace = 0.1;
float tsMinimalDataDirGB = 0.5;
int32_t tsTotalMemoryMB = 0;
int32_t tsVersion = 0;
......@@ -807,7 +807,7 @@ static void doInitGlobalConfig() {
taosInitConfigOption(cfg);
cfg.option = "minimalTmpDirGB";
cfg.ptr = &tsMinimalTmpDirGB;
cfg.ptr = &tsReservedTmpDirectorySpace;
cfg.valType = TAOS_CFG_VTYPE_FLOAT;
cfg.cfgType = TSDB_CFG_CTYPE_B_CONFIG | TSDB_CFG_CTYPE_B_SHOW;
cfg.minValue = 0.001;
......
package com.taosdata.jdbc.utils;
public class TDNode {
private int index;
private int running;
private int deployed;
private boolean testCluster;
private int valgrind;
private String path;
public TDNode(int index) {
this.index = index;
running = 0;
deployed = 0;
testCluster = false;
valgrind = 0;
}
public void setTestCluster(boolean testCluster) {
this.testCluster = testCluster;
}
public void setValgrind(boolean valgrind) {
this.valgrind = valgrind;
}
public int getDataSize() {
totalSize = 0;
if(deployed == 1) {
}
}
def getDataSize(self):
totalSize = 0
if (self.deployed == 1):
for dirpath, dirnames, filenames in .walk(self.dataDir):
for f in filenames:
fp = os.path.join(dirpath, f)
if not os.path.islink(fp):
totalSize = totalSize + os.path.getsize(fp)
return totalSize
}
\ No newline at end of file
......@@ -283,7 +283,8 @@ static void cqProcessStreamRes(void *param, TAOS_RES *tres, TAOS_ROW row) {
}
tdAppendColVal(trow, val, c->type, c->bytes, c->offset);
}
pBlk->len = htonl(dataRowLen(trow));
pBlk->dataLen = htonl(dataRowLen(trow));
pBlk->schemaLen = 0;
pBlk->uid = htobe64(pObj->uid);
pBlk->tid = htonl(pObj->tid);
......
......@@ -153,6 +153,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_MND_TAG_ALREAY_EXIST, 0, 0x0369, "mnode tag
TAOS_DEFINE_ERROR(TSDB_CODE_MND_TAG_NOT_EXIST, 0, 0x036A, "mnode tag not exist")
TAOS_DEFINE_ERROR(TSDB_CODE_MND_FIELD_ALREAY_EXIST, 0, 0x036B, "mnode field already exist")
TAOS_DEFINE_ERROR(TSDB_CODE_MND_FIELD_NOT_EXIST, 0, 0x036C, "mnode field not exist")
TAOS_DEFINE_ERROR(TSDB_CODE_MND_INVALID_STABLE_NAME, 0, 0x036D, "mnode invalid stable name")
TAOS_DEFINE_ERROR(TSDB_CODE_MND_DB_NOT_SELECTED, 0, 0x0380, "mnode db not selected")
TAOS_DEFINE_ERROR(TSDB_CODE_MND_DB_ALREADY_EXIST, 0, 0x0381, "mnode database aleady exist")
......
......@@ -192,7 +192,8 @@ typedef struct SSubmitBlk {
int32_t tid; // table id
int32_t padding; // TODO just for padding here
int32_t sversion; // data schema version
int32_t len; // data part length, not including the SSubmitBlk head
int32_t dataLen; // data part length, not including the SSubmitBlk head
int32_t schemaLen; // schema length, if length is 0, no schema exists
int16_t numOfRows; // total number of rows in current submit block
char data[];
} SSubmitBlk;
......
......@@ -382,11 +382,13 @@ static void mnodeAddTableIntoStable(SSuperTableObj *pStable, SChildTableObj *pCt
pStable->numOfTables++;
if (pStable->vgHash == NULL) {
pStable->vgHash = taosHashInit(100000, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false);
pStable->vgHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false);
}
if (pStable->vgHash != NULL) {
taosHashPut(pStable->vgHash, (char *)&pCtable->vgId, sizeof(pCtable->vgId), &pCtable->vgId, sizeof(pCtable->vgId));
if (taosHashGet(pStable->vgHash, &pCtable->vgId, sizeof(pCtable->vgId)) == NULL) {
taosHashPut(pStable->vgHash, &pCtable->vgId, sizeof(pCtable->vgId), &pCtable->vgId, sizeof(pCtable->vgId));
}
}
}
......@@ -1964,9 +1966,15 @@ static int32_t mnodeDoGetChildTableMeta(SMnodeMsg *pMsg, STableMetaMsg *pMeta) {
static int32_t mnodeAutoCreateChildTable(SMnodeMsg *pMsg) {
SCMTableInfoMsg *pInfo = pMsg->rpcMsg.pCont;
STagData *pTag = (STagData *)pInfo->tags;
STagData *pTags = (STagData *)pInfo->tags;
int32_t tagLen = htonl(pTags->dataLen);
if (pTags->name[0] == 0) {
mError("app:%p:%p, table:%s, failed to create table on demand for stable is empty, tagLen:%d", pMsg->rpcMsg.ahandle,
pMsg, pInfo->tableId, tagLen);
return TSDB_CODE_MND_INVALID_STABLE_NAME;
}
int32_t contLen = sizeof(SCMCreateTableMsg) + offsetof(STagData, data) + htonl(pTag->dataLen);
int32_t contLen = sizeof(SCMCreateTableMsg) + offsetof(STagData, data) + tagLen;
SCMCreateTableMsg *pCreateMsg = rpcMallocCont(contLen);
if (pCreateMsg == NULL) {
mError("app:%p:%p, table:%s, failed to create table while get meta info, no enough memory", pMsg->rpcMsg.ahandle,
......@@ -1981,9 +1989,9 @@ static int32_t mnodeAutoCreateChildTable(SMnodeMsg *pMsg) {
pCreateMsg->getMeta = 1;
pCreateMsg->contLen = htonl(contLen);
memcpy(pCreateMsg->schema, pInfo->tags, contLen - sizeof(SCMCreateTableMsg));
mDebug("app:%p:%p, table:%s, start to create on demand, stable:%s", pMsg->rpcMsg.ahandle, pMsg, pInfo->tableId,
((STagData *)(pCreateMsg->schema))->name);
memcpy(pCreateMsg->schema, pTags, contLen - sizeof(SCMCreateTableMsg));
mDebug("app:%p:%p, table:%s, start to create on demand, tagLen:%d stable:%s",
pMsg->rpcMsg.ahandle, pMsg, pInfo->tableId, tagLen, pTags->name);
rpcFreeCont(pMsg->rpcMsg.pCont);
pMsg->rpcMsg.msgType = TSDB_MSG_TYPE_CM_CREATE_TABLE;
......
......@@ -326,12 +326,12 @@ bool taosGetDisk() {
if (statvfs("/tmp", &info)) {
//tsTotalTmpDirGB = 0;
//tsAvailTmpDirGB = 0;
//tsAvailTmpDirectorySpace = 0;
uError("failed to get disk size, tmpDir:/tmp errno:%s", strerror(errno));
return false;
} else {
tsTotalTmpDirGB = (float)((double)info.f_blocks * (double)info.f_frsize / unit);
tsAvailTmpDirGB = (float)((double)info.f_bavail * (double)info.f_frsize / unit);
tsAvailTmpDirectorySpace = (float)((double)info.f_bavail * (double)info.f_frsize / unit);
}
return true;
......
......@@ -359,6 +359,8 @@ void httpExecCmd(HttpContext *pContext) {
void httpProcessRequestCb(void *param, TAOS_RES *result, int code) {
HttpContext *pContext = param;
taos_free_result(result);
if (pContext == NULL) return;
if (code < 0) {
......
......@@ -161,12 +161,12 @@ typedef struct SQuery {
} SQuery;
typedef struct SQueryRuntimeEnv {
SResultInfo* resultInfo; // todo refactor to merge with SWindowResInfo
SResultInfo* resultInfo; // todo refactor to merge with SWindowResInfo
SQuery* pQuery;
SQLFunctionCtx* pCtx;
int16_t numOfRowsPerPage;
int16_t offset[TSDB_MAX_COLUMNS];
uint16_t scanFlag; // denotes reversed scan of data or not
uint16_t scanFlag; // denotes reversed scan of data or not
SFillInfo* pFillInfo;
SWindowResInfo windowResInfo;
STSBuf* pTSBuf;
......@@ -176,7 +176,8 @@ typedef struct SQueryRuntimeEnv {
void* pQueryHandle;
void* pSecQueryHandle; // another thread for
SDiskbasedResultBuf* pResultBuf; // query result buffer based on blocked-wised disk file
bool topBotQuery; // false;
bool topBotQuery; // false
int32_t prevGroupId; // previous executed group id
} SQueryRuntimeEnv;
typedef struct SQInfo {
......
......@@ -3270,6 +3270,8 @@ static bool hasMainOutput(SQuery *pQuery) {
}
static STableQueryInfo *createTableQueryInfo( SQueryRuntimeEnv *pRuntimeEnv, void* pTable, STimeWindow win) {
SQuery* pQuery = pRuntimeEnv->pQuery;
STableQueryInfo *pTableQueryInfo = calloc(1, sizeof(STableQueryInfo));
pTableQueryInfo->win = win;
......@@ -3278,7 +3280,15 @@ static STableQueryInfo *createTableQueryInfo( SQueryRuntimeEnv *pRuntimeEnv, voi
pTableQueryInfo->pTable = pTable;
pTableQueryInfo->cur.vgroupIndex = -1;
initWindowResInfo(&pTableQueryInfo->windowResInfo, pRuntimeEnv, 100, 100, TSDB_DATA_TYPE_INT);
int32_t initialSize = 1;
int32_t initialThreshold = 1;
if (isIntervalQuery(pQuery) || isGroupbyNormalCol(pQuery->pGroupbyExpr)) {
initialSize = 20;
initialThreshold = 100;
}
initWindowResInfo(&pTableQueryInfo->windowResInfo, pRuntimeEnv, initialSize, initialThreshold, TSDB_DATA_TYPE_INT);
return pTableQueryInfo;
}
......@@ -3305,13 +3315,20 @@ void destroyTableQueryInfo(STableQueryInfo *pTableQueryInfo, int32_t numOfCols)
* @param pRuntimeEnv
* @param pDataBlockInfo
*/
void setExecutionContext(SQInfo *pQInfo, void* pTable, int32_t groupIndex, TSKEY nextKey) {
void setExecutionContext(SQInfo *pQInfo, int32_t groupIndex, TSKEY nextKey) {
SQueryRuntimeEnv *pRuntimeEnv = &pQInfo->runtimeEnv;
STableQueryInfo *pTableQueryInfo = pRuntimeEnv->pQuery->current;
SWindowResInfo * pWindowResInfo = &pRuntimeEnv->windowResInfo;
int32_t GROUPRESULTID = 1;
STableQueryInfo *pTableQueryInfo = pRuntimeEnv->pQuery->current;
SWindowResInfo *pWindowResInfo = &pRuntimeEnv->windowResInfo;
// lastKey needs to be updated
pTableQueryInfo->lastKey = nextKey;
setAdditionalInfo(pQInfo, pTableQueryInfo->pTable, pTableQueryInfo);
if (pRuntimeEnv->prevGroupId != INT32_MIN && pRuntimeEnv->prevGroupId == groupIndex) {
return;
}
int32_t GROUPRESULTID = 1;
SWindowResult *pWindowRes = doSetTimeWindowFromKey(pRuntimeEnv, pWindowResInfo, (char *)&groupIndex, sizeof(groupIndex));
if (pWindowRes == NULL) {
return;
......@@ -3328,11 +3345,10 @@ void setExecutionContext(SQInfo *pQInfo, void* pTable, int32_t groupIndex, TSKEY
}
}
// record the current active group id
pRuntimeEnv->prevGroupId = groupIndex;
setWindowResOutputBuf(pRuntimeEnv, pWindowRes);
initCtxOutputBuf(pRuntimeEnv);
pTableQueryInfo->lastKey = nextKey;
setAdditionalInfo(pQInfo, pTableQueryInfo->pTable, pTableQueryInfo);
}
void setWindowResOutputBuf(SQueryRuntimeEnv *pRuntimeEnv, SWindowResult *pResult) {
......@@ -4072,6 +4088,7 @@ int32_t doInitQInfo(SQInfo *pQInfo, void *param, void *tsdb, int32_t vgId, bool
pRuntimeEnv->pTSBuf = param;
pRuntimeEnv->cur.vgroupIndex = -1;
pRuntimeEnv->stableQuery = isSTableQuery;
pRuntimeEnv->prevGroupId = INT32_MIN;
if (param != NULL) {
int16_t order = (pQuery->order.order == pRuntimeEnv->pTSBuf->tsOrder) ? TSDB_ORDER_ASC : TSDB_ORDER_DESC;
......@@ -4176,8 +4193,7 @@ static int64_t scanMultiTableDataBlocks(SQInfo *pQInfo) {
if (!isGroupbyNormalCol(pQuery->pGroupbyExpr)) {
if (!isIntervalQuery(pQuery)) {
int32_t step = QUERY_IS_ASC_QUERY(pQuery)? 1:-1;
setExecutionContext(pQInfo, (*pTableQueryInfo)->pTable, (*pTableQueryInfo)->groupIndex,
blockInfo.window.ekey + step);
setExecutionContext(pQInfo, (*pTableQueryInfo)->groupIndex, blockInfo.window.ekey + step);
} else { // interval query
TSKEY nextKey = blockInfo.window.skey;
setIntervalQueryRange(pQInfo, nextKey);
......@@ -4553,7 +4569,8 @@ static void doSaveContext(SQInfo *pQInfo) {
if (pRuntimeEnv->pSecQueryHandle != NULL) {
tsdbCleanupQueryHandle(pRuntimeEnv->pSecQueryHandle);
}
pRuntimeEnv->prevGroupId = INT32_MIN;
pRuntimeEnv->pSecQueryHandle = tsdbQueryTables(pQInfo->tsdb, &cond, &pQInfo->tableGroupInfo, pQInfo);
setQueryStatus(pQuery, QUERY_NOT_COMPLETED);
......
......@@ -631,5 +631,5 @@ void exprSerializeTest2() {
}
} // namespace
TEST(testCase, astTest) {
exprSerializeTest2();
// exprSerializeTest2();
}
\ No newline at end of file
......@@ -768,7 +768,8 @@ static SSubmitBlk *tsdbGetSubmitMsgNext(SSubmitMsgIter *pIter) {
SSubmitBlk *pBlock = pIter->pBlock;
if (pBlock == NULL) return NULL;
pBlock->len = htonl(pBlock->len);
pBlock->dataLen = htonl(pBlock->dataLen);
pBlock->schemaLen = htonl(pBlock->schemaLen);
pBlock->numOfRows = htons(pBlock->numOfRows);
pBlock->uid = htobe64(pBlock->uid);
pBlock->tid = htonl(pBlock->tid);
......@@ -776,11 +777,11 @@ static SSubmitBlk *tsdbGetSubmitMsgNext(SSubmitMsgIter *pIter) {
pBlock->sversion = htonl(pBlock->sversion);
pBlock->padding = htonl(pBlock->padding);
pIter->len = pIter->len + sizeof(SSubmitBlk) + pBlock->len;
pIter->len = pIter->len + sizeof(SSubmitBlk) + pBlock->dataLen;
if (pIter->len >= pIter->totalLen) {
pIter->pBlock = NULL;
} else {
pIter->pBlock = (SSubmitBlk *)((char *)pBlock + pBlock->len + sizeof(SSubmitBlk));
pIter->pBlock = (SSubmitBlk *)((char *)pBlock + pBlock->dataLen + sizeof(SSubmitBlk));
}
return pBlock;
......@@ -832,10 +833,10 @@ _err:
}
static int tsdbInitSubmitBlkIter(SSubmitBlk *pBlock, SSubmitBlkIter *pIter) {
if (pBlock->len <= 0) return -1;
pIter->totalLen = pBlock->len;
if (pBlock->dataLen <= 0) return -1;
pIter->totalLen = pBlock->dataLen;
pIter->len = 0;
pIter->row = (SDataRow)(pBlock->data);
pIter->row = (SDataRow)(pBlock->data+pBlock->schemaLen);
return 0;
}
......
#!/bin/bash
# Coloured Echoes
function red_echo { echo -e "\033[31m$@\033[0m"; }
function green_echo { echo -e "\033[32m$@\033[0m"; }
function yellow_echo { echo -e "\033[33m$@\033[0m"; }
function white_echo { echo -e "\033[1;37m$@\033[0m"; }
# Coloured Printfs
function red_printf { printf "\033[31m$@\033[0m"; }
function green_printf { printf "\033[32m$@\033[0m"; }
function yellow_printf { printf "\033[33m$@\033[0m"; }
function white_printf { printf "\033[1;37m$@\033[0m"; }
# Debugging Outputs
function white_brackets { local args="$@"; white_printf "["; printf "${args}"; white_printf "]"; }
function echoInfo { local args="$@"; white_brackets $(green_printf "INFO") && echo " ${args}"; }
function echoWarn { local args="$@"; echo "$(white_brackets "$(yellow_printf "WARN")" && echo " ${args}";)" 1>&2; }
function echoError { local args="$@"; echo "$(white_brackets "$(red_printf "ERROR")" && echo " ${args}";)" 1>&2; }
function restartTaosd {
systemctl stop taosd
pkill -KILL -x taosd
sleep 10
rm -rf /mnt/var/log/taos/*
rm -rf /mnt/var/lib/taos/*
taosd 2>&1 > /dev/null &
sleep 10
}
function runCreateTableThenInsert {
echoInfo "Restart Taosd"
restartTaosd
/usr/bin/time -f "Total: %e" -o totaltime.out bash -c "yes | taosdemo 2>&1 | tee -a taosdemo-$1-$today.log"
demoTableAndInsert=`grep "Total:" totaltime.out|awk '{print $2}'`
demoRPS=`grep "records\/second" taosdemo-$1-$today.log | tail -n1 | awk '{print $13}'`
}
function queryMetadata {
echo "query metadata"
cd ../pytest/query
python3 queryMetaData.py | tee -a queryResult.log
}
\ No newline at end of file
......@@ -17,16 +17,17 @@ import threading
import time
from datetime import datetime
class MetadataQuery:
def initConnection(self):
self.tables = 10000
self.tables = 100000
self.records = 10
self.numOfTherads = 10
self.ts = 1537146000000
self.host = "127.0.0.1"
self.user = "root"
self.password = "taosdata"
self.config = "/etc/taos"
self.config = "/etc/taos"
def connectDB(self):
self.conn = taos.connect(
......@@ -35,7 +36,7 @@ class MetadataQuery:
self.password,
self.config)
return self.conn.cursor()
def createStable(self):
print("================= Create stable meters =================")
cursor = self.connectDB()
......@@ -43,78 +44,88 @@ class MetadataQuery:
cursor.execute("create database test")
cursor.execute("use test")
cursor.execute('''create table if not exists meters (ts timestamp, speed int) tags(
tgcol1 tinyint, tgcol2 smallint, tgcol3 int, tgcol4 bigint, tgcol5 float, tgcol6 double, tgcol7 bool, tgcol8 binary(20), tgcol9 nchar(20),
tgcol10 tinyint, tgcol11 smallint, tgcol12 int, tgcol13 bigint, tgcol14 float, tgcol15 double, tgcol16 bool, tgcol17 binary(20), tgcol18 nchar(20),
tgcol19 tinyint, tgcol20 smallint, tgcol21 int, tgcol22 bigint, tgcol23 float, tgcol24 double, tgcol25 bool, tgcol26 binary(20), tgcol27 nchar(20),
tgcol28 tinyint, tgcol29 smallint, tgcol30 int, tgcol31 bigint, tgcol32 float, tgcol33 double, tgcol34 bool, tgcol35 binary(20), tgcol36 nchar(20),
tgcol37 tinyint, tgcol38 smallint, tgcol39 int, tgcol40 bigint, tgcol41 float, tgcol42 double, tgcol43 bool, tgcol44 binary(20), tgcol45 nchar(20),
tgcol1 tinyint, tgcol2 smallint, tgcol3 int, tgcol4 bigint, tgcol5 float, tgcol6 double, tgcol7 bool, tgcol8 binary(20), tgcol9 nchar(20),
tgcol10 tinyint, tgcol11 smallint, tgcol12 int, tgcol13 bigint, tgcol14 float, tgcol15 double, tgcol16 bool, tgcol17 binary(20), tgcol18 nchar(20),
tgcol19 tinyint, tgcol20 smallint, tgcol21 int, tgcol22 bigint, tgcol23 float, tgcol24 double, tgcol25 bool, tgcol26 binary(20), tgcol27 nchar(20),
tgcol28 tinyint, tgcol29 smallint, tgcol30 int, tgcol31 bigint, tgcol32 float, tgcol33 double, tgcol34 bool, tgcol35 binary(20), tgcol36 nchar(20),
tgcol37 tinyint, tgcol38 smallint, tgcol39 int, tgcol40 bigint, tgcol41 float, tgcol42 double, tgcol43 bool, tgcol44 binary(20), tgcol45 nchar(20),
tgcol46 tinyint, tgcol47 smallint, tgcol48 int, tgcol49 bigint, tgcol50 float, tgcol51 double, tgcol52 bool, tgcol53 binary(20), tgcol54 nchar(20))''')
cursor.close()
self.conn.close()
def queryData(self, q):
print("================= query tag data =================")
def createTablesAndInsertData(self, threadID):
cursor = self.connectDB()
cursor.execute("use test")
startTime = datetime.now()
cursor.execute(q)
cursor.fetchall()
endTime = datetime.now()
print("Query time for the above query is %d seconds" % (endTime - startTime).seconds)
cursor.execute("use test")
base = threadID * self.tables
tablesPerThread = (int) self.tables / self.numOfTherads
for i in range(tablesPerThread):
cursor.execute(
'''create table t%d using meters tags(
%d, %d, %d, %d, %f, %f, %d, 'taosdata%d', '涛思数据%d',
%d, %d, %d, %d, %f, %f, %d, 'taosdata%d', '涛思数据%d',
%d, %d, %d, %d, %f, %f, %d, 'taosdata%d', '涛思数据%d',
%d, %d, %d, %d, %f, %f, %d, 'taosdata%d', '涛思数据%d',
%d, %d, %d, %d, %f, %f, %d, 'taosdata%d', '涛思数据%d',
%d, %d, %d, %d, %f, %f, %d, 'taosdata%d', '涛思数据%d')''' %
(base + i + 1,
(base + i) %100, (base + i) %10000, (base + i) %1000000, (base + i) %100000000, (base + i) %100 * 1.1, (base + i) %100 * 2.3, (base + i) %2, (base + i) %100, (base + i) %100,
(base + i) %100, (base + i) %10000, (base + i) %1000000, (base + i) %100000000, (base + i) %100 * 1.1, (base + i) %100 * 2.3, (base + i) %2, (base + i) %100, (base + i) %100,
(base + i) %100, (base + i) %10000, (base + i) %1000000, (base + i) %100000000, (base + i) %100 * 1.1, (base + i) %100 * 2.3, (base + i) %2, (base + i) %100, (base + i) %100,
(base + i) %100, (base + i) %10000, (base + i) %1000000, (base + i) %100000000, (base + i) %100 * 1.1, (base + i) %100 * 2.3, (base + i) %2, (base + i) %100, (base + i) %100,
(base + i) %100, (base + i) %10000, (base + i) %1000000, (base + i) %100000000, (base + i) %100 * 1.1, (base + i) %100 * 2.3, (base + i) %2, (base + i) %100, (base + i) %100,
(base + i) %100, (base + i) %10000, (base + i) %1000000, (base + i) %100000000, (base + i) %100 * 1.1, (base + i) %100 * 2.3, (base + i) %2, (base + i) %100, (base + i) %100))
for j in range(self.records):
cursor.execute(
"insert into t%d values(%d, %d)" %
(base + i + 1, self.ts + j, j))
cursor.close()
self.conn.close()
def createTablesAndInsertData(self, threadID):
def queryData(self, query):
cursor = self.connectDB()
cursor.execute("use test")
base = threadID * self.tables
for i in range(self.tables):
cursor.execute('''create table t%d using meters tags(
%d, %d, %d, %d, %f, %f, %d, 'taosdata%d', '涛思数据%d',
%d, %d, %d, %d, %f, %f, %d, 'taosdata%d', '涛思数据%d',
%d, %d, %d, %d, %f, %f, %d, 'taosdata%d', '涛思数据%d',
%d, %d, %d, %d, %f, %f, %d, 'taosdata%d', '涛思数据%d',
%d, %d, %d, %d, %f, %f, %d, 'taosdata%d', '涛思数据%d',
%d, %d, %d, %d, %f, %f, %d, 'taosdata%d', '涛思数据%d')'''
% (base + i + 1,
(base + i) % 100, (base + i) % 10000, (base + i) % 1000000, (base + i) % 100000000, (base + i) % 100 * 1.1, (base + i) % 100 * 2.3, (base + i) % 2, (base + i) % 100, (base + i) % 100,
(base + i) % 100, (base + i) % 10000, (base + i) % 1000000, (base + i) % 100000000, (base + i) % 100 * 1.1, (base + i) % 100 * 2.3, (base + i) % 2, (base + i) % 100, (base + i) % 100,
(base + i) % 100, (base + i) % 10000, (base + i) % 1000000, (base + i) % 100000000, (base + i) % 100 * 1.1, (base + i) % 100 * 2.3, (base + i) % 2, (base + i) % 100, (base + i) % 100,
(base + i) % 100, (base + i) % 10000, (base + i) % 1000000, (base + i) % 100000000, (base + i) % 100 * 1.1, (base + i) % 100 * 2.3, (base + i) % 2, (base + i) % 100, (base + i) % 100,
(base + i) % 100, (base + i) % 10000, (base + i) % 1000000, (base + i) % 100000000, (base + i) % 100 * 1.1, (base + i) % 100 * 2.3, (base + i) % 2, (base + i) % 100, (base + i) % 100,
(base + i) % 100, (base + i) % 10000, (base + i) % 1000000, (base + i) % 100000000, (base + i) % 100 * 1.1, (base + i) % 100 * 2.3, (base + i) % 2, (base + i) % 100, (base + i) % 100 ))
for j in range(self.records):
cursor.execute("insert into t%d values(%d, %d)" % (base + i + 1, self.ts + j, j))
cursor.execute("use test")
print("================= query tag data =================")
startTime = datetime.now()
cursor.execute(query)
cursor.fetchall()
endTime = datetime.now()
print(
"Query time for the above query is %d seconds" %
(endTime - startTime).seconds)
cursor.close()
self.conn.close()
if __name__ == '__main__':
t = MetadataQuery()
t.initConnection()
t.createStable()
print("================= Create %d tables and insert %d records into each table =================" % (t.tables, t.records))
print(
"================= Create %d tables and insert %d records into each table =================" %
(t.tables, t.records))
startTime = datetime.now()
for i in range(t.numOfTherads):
thread = threading.Thread(target=t.createTablesAndInsertData, args=(i,))
thread.start()
thread = threading.Thread(
target=t.createTablesAndInsertData, args=(i,))
thread.start()
thread.join()
endTime = datetime.now()
diff = (endTime - startTime).seconds
print("spend %d seconds to create %d tables and insert %d records into each table" % (diff, t.tables, t.records));
# tgcol28, tgcol29, tgcol30, tgcol31, tgcol32, tgcol33, tgcol34, tgcol35, tgcol36,
# tgcol37, tgcol38, tgcol39, tgcol40, tgcol41, tgcol42, tgcol43, tgcol44, tgcol45,
# tgcol46, tgcol47, tgcol48, tgcol49, tgcol50, tgcol51, tgcol52, tgcol53, tgcol54
# tgcol19, tgcol20, tgcol21, tgcol22, tgcol23, tgcol24, tgcol25, tgcol26, tgcol27,
print(
"spend %d seconds to create %d tables and insert %d records into each table" %
(diff, t.tables, t.records))
query = '''select tgcol1, tgcol2, tgcol3, tgcol4, tgcol5, tgcol6, tgcol7, tgcol8, tgcol9,
tgcol10, tgcol11, tgcol12, tgcol13, tgcol14, tgcol15, tgcol16, tgcol17, tgcol18,
tgcol10, tgcol11, tgcol12, tgcol13, tgcol14, tgcol15, tgcol16, tgcol17, tgcol18,
tgcol19, tgcol20, tgcol21, tgcol22, tgcol23, tgcol24, tgcol25, tgcol26, tgcol27,
tgcol28, tgcol29, tgcol30, tgcol31, tgcol32, tgcol33, tgcol34, tgcol35, tgcol36,
tgcol37, tgcol38, tgcol39, tgcol40, tgcol41, tgcol42, tgcol43, tgcol44, tgcol45,
tgcol46, tgcol47, tgcol48, tgcol49, tgcol50, tgcol51, tgcol52, tgcol53, tgcol54
from meters where tgcol1 > 10 AND tgcol1 < 100 and tgcol2 > 100 and tgcol2 < 1000 or tgcol3 > 10000 or tgcol7 = true
or tgcol8 like '%2' and tgcol10 < 10'''
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册