提交 6cd8c49b 编写于 作者: H Hui Li 提交者: Shenglian Zhou

Merge pull request #6616 from taosdata/test/TD-4868

add test case to verify taosdump with binary and nchar data
......@@ -898,7 +898,9 @@ SSDataBlock* doGlobalAggregate(void* param, bool* newgroup) {
SSDataBlock* pBlock = NULL;
while(1) {
bool prev = *newgroup;
publishOperatorProfEvent(upstream, QUERY_PROF_BEFORE_OPERATOR_EXEC);
pBlock = upstream->exec(upstream, newgroup);
publishOperatorProfEvent(upstream, QUERY_PROF_AFTER_OPERATOR_EXEC);
if (pBlock == NULL) {
*newgroup = prev;
break;
......@@ -966,7 +968,9 @@ static SSDataBlock* skipGroupBlock(SOperatorInfo* pOperator, bool* newgroup) {
SSDataBlock* pBlock = NULL;
if (pInfo->currentGroupOffset == 0) {
publishOperatorProfEvent(pOperator->upstream[0], QUERY_PROF_BEFORE_OPERATOR_EXEC);
pBlock = pOperator->upstream[0]->exec(pOperator->upstream[0], newgroup);
publishOperatorProfEvent(pOperator->upstream[0], QUERY_PROF_AFTER_OPERATOR_EXEC);
if (pBlock == NULL) {
setQueryStatus(pOperator->pRuntimeEnv, QUERY_COMPLETED);
pOperator->status = OP_EXEC_DONE;
......@@ -974,7 +978,9 @@ static SSDataBlock* skipGroupBlock(SOperatorInfo* pOperator, bool* newgroup) {
if (*newgroup == false && pInfo->limit.limit > 0 && pInfo->rowsTotal >= pInfo->limit.limit) {
while ((*newgroup) == false) { // ignore the remain blocks
publishOperatorProfEvent(pOperator->upstream[0], QUERY_PROF_BEFORE_OPERATOR_EXEC);
pBlock = pOperator->upstream[0]->exec(pOperator->upstream[0], newgroup);
publishOperatorProfEvent(pOperator->upstream[0], QUERY_PROF_AFTER_OPERATOR_EXEC);
if (pBlock == NULL) {
setQueryStatus(pOperator->pRuntimeEnv, QUERY_COMPLETED);
pOperator->status = OP_EXEC_DONE;
......@@ -986,7 +992,10 @@ static SSDataBlock* skipGroupBlock(SOperatorInfo* pOperator, bool* newgroup) {
return pBlock;
}
publishOperatorProfEvent(pOperator->upstream[0], QUERY_PROF_BEFORE_OPERATOR_EXEC);
pBlock = pOperator->upstream[0]->exec(pOperator->upstream[0], newgroup);
publishOperatorProfEvent(pOperator->upstream[0], QUERY_PROF_AFTER_OPERATOR_EXEC);
if (pBlock == NULL) {
setQueryStatus(pOperator->pRuntimeEnv, QUERY_COMPLETED);
pOperator->status = OP_EXEC_DONE;
......@@ -1000,7 +1009,10 @@ static SSDataBlock* skipGroupBlock(SOperatorInfo* pOperator, bool* newgroup) {
}
while ((*newgroup) == false) {
publishOperatorProfEvent(pOperator->upstream[0], QUERY_PROF_BEFORE_OPERATOR_EXEC);
pBlock = pOperator->upstream[0]->exec(pOperator->upstream[0], newgroup);
publishOperatorProfEvent(pOperator->upstream[0], QUERY_PROF_AFTER_OPERATOR_EXEC);
if (pBlock == NULL) {
setQueryStatus(pOperator->pRuntimeEnv, QUERY_COMPLETED);
pOperator->status = OP_EXEC_DONE;
......
......@@ -825,7 +825,10 @@ static void fetchNextBlockIfCompleted(SOperatorInfo* pOperator, bool* newgroup)
SJoinStatus* pStatus = &pJoinInfo->status[i];
if (pStatus->pBlock == NULL || pStatus->index >= pStatus->pBlock->info.rows) {
tscDebug("Retrieve nest query result, index:%d, total:%d", i, pOperator->numOfUpstream);
publishOperatorProfEvent(pOperator->upstream[0], QUERY_PROF_BEFORE_OPERATOR_EXEC);
pStatus->pBlock = pOperator->upstream[i]->exec(pOperator->upstream[i], newgroup);
publishOperatorProfEvent(pOperator->upstream[0], QUERY_PROF_AFTER_OPERATOR_EXEC);
pStatus->index = 0;
if (pStatus->pBlock == NULL) {
......
......@@ -133,6 +133,28 @@ typedef struct STableQueryInfo {
SResultRowInfo resInfo;
} STableQueryInfo;
typedef enum {
QUERY_PROF_BEFORE_OPERATOR_EXEC = 0,
QUERY_PROF_AFTER_OPERATOR_EXEC,
QUERY_PROF_QUERY_ABORT
} EQueryProfEventType;
typedef struct {
EQueryProfEventType eventType;
int64_t eventTime;
union {
uint8_t operatorType; //for operator event
int32_t abortCode; //for query abort event
};
} SQueryProfEvent;
typedef struct {
uint8_t operatorType;
int64_t sumSelfTime;
int64_t sumRunTimes;
} SOperatorProfResult;
typedef struct SQueryCostInfo {
uint64_t loadStatisTime;
uint64_t loadFileBlockTime;
......@@ -154,6 +176,9 @@ typedef struct SQueryCostInfo {
uint64_t tableInfoSize;
uint64_t hashSize;
uint64_t numOfTimeWindows;
SArray* queryProfEvents; //SArray<SQueryProfEvent>
SHashObj* operatorProfResults; //map<operator_type, SQueryProfEvent>
} SQueryCostInfo;
typedef struct {
......@@ -586,7 +611,12 @@ int32_t doDumpQueryResult(SQInfo *pQInfo, char *data);
size_t getResultSize(SQInfo *pQInfo, int64_t *numOfRows);
void setQueryKilled(SQInfo *pQInfo);
void publishOperatorProfEvent(SOperatorInfo* operatorInfo, EQueryProfEventType eventType);
void publishQueryAbortEvent(SQInfo* pQInfo, int32_t code);
void calculateOperatorProfResults(SQInfo* pQInfo);
void queryCostStatis(SQInfo *pQInfo);
void freeQInfo(SQInfo *pQInfo);
void freeQueryAttr(SQueryAttr *pQuery);
......
......@@ -3791,6 +3791,83 @@ int32_t doFillTimeIntervalGapsInResults(SFillInfo* pFillInfo, SSDataBlock *pOutp
return pOutput->info.rows;
}
void publishOperatorProfEvent(SOperatorInfo* operatorInfo, EQueryProfEventType eventType) {
SQueryProfEvent event;
event.eventType = eventType;
event.eventTime = taosGetTimestampUs();
event.operatorType = operatorInfo->operatorType;
SQInfo* qInfo = operatorInfo->pRuntimeEnv->qinfo;
taosArrayPush(qInfo->summary.queryProfEvents, &event);
}
void publishQueryAbortEvent(SQInfo* pQInfo, int32_t code) {
SQueryProfEvent event;
event.eventType = QUERY_PROF_QUERY_ABORT;
event.eventTime = taosGetTimestampUs();
event.abortCode = code;
taosArrayPush(pQInfo->summary.queryProfEvents, &event);
}
typedef struct {
uint8_t operatorType;
int64_t beginTime;
int64_t endTime;
int64_t selfTime;
int64_t descendantsTime;
} SOperatorStackItem;
static void doOperatorExecProfOnce(SOperatorStackItem* item, SQueryProfEvent* event, SArray* opStack, SHashObj* profResults) {
item->endTime = event->eventTime;
item->selfTime = (item->endTime - item->beginTime) - (item->descendantsTime);
for (int32_t j = 0; j < taosArrayGetSize(opStack); ++j) {
SOperatorStackItem* ancestor = taosArrayGet(opStack, j);
ancestor->descendantsTime += item->selfTime;
}
uint8_t operatorType = item->operatorType;
SOperatorProfResult* result = taosHashGet(profResults, &operatorType, sizeof(operatorType));
if (result != NULL) {
result->sumRunTimes++;
result->sumSelfTime += item->selfTime;
} else {
SOperatorProfResult opResult;
opResult.operatorType = operatorType;
opResult.sumSelfTime = item->selfTime;
opResult.sumRunTimes = 1;
taosHashPut(profResults, &(operatorType), sizeof(operatorType),
&opResult, sizeof(opResult));
}
}
void calculateOperatorProfResults(SQInfo* pQInfo) {
SArray* opStack = taosArrayInit(32, sizeof(SOperatorStackItem));
size_t size = taosArrayGetSize(pQInfo->summary.queryProfEvents);
SHashObj* profResults = pQInfo->summary.operatorProfResults;
for (int i = 0; i < size; ++i) {
SQueryProfEvent* event = taosArrayGet(pQInfo->summary.queryProfEvents, i);
if (event->eventType == QUERY_PROF_BEFORE_OPERATOR_EXEC) {
SOperatorStackItem opItem;
opItem.operatorType = event->operatorType;
opItem.beginTime = event->eventTime;
opItem.descendantsTime = 0;
taosArrayPush(opStack, &opItem);
} else if (event->eventType == QUERY_PROF_AFTER_OPERATOR_EXEC) {
SOperatorStackItem* item = taosArrayPop(opStack);
assert(item->operatorType == event->operatorType);
doOperatorExecProfOnce(item, event, opStack, profResults);
} else if (event->eventType == QUERY_PROF_QUERY_ABORT) {
SOperatorStackItem* item;
while ((item = taosArrayPop(opStack)) != NULL) {
doOperatorExecProfOnce(item, event, opStack, profResults);
}
}
}
}
void queryCostStatis(SQInfo *pQInfo) {
SQueryRuntimeEnv *pRuntimeEnv = &pQInfo->runtimeEnv;
SQueryCostInfo *pSummary = &pQInfo->summary;
......@@ -3811,6 +3888,8 @@ void queryCostStatis(SQInfo *pQInfo) {
pSummary->numOfTimeWindows = 0;
}
calculateOperatorProfResults(pQInfo);
qDebug("QInfo:0x%"PRIx64" :cost summary: elapsed time:%"PRId64" us, first merge:%"PRId64" us, total blocks:%d, "
"load block statis:%d, load data block:%d, total rows:%"PRId64 ", check rows:%"PRId64,
pQInfo->qId, pSummary->elapsedTime, pSummary->firstStageMergeTime, pSummary->totalBlocks, pSummary->loadBlockStatis,
......@@ -3818,6 +3897,13 @@ void queryCostStatis(SQInfo *pQInfo) {
qDebug("QInfo:0x%"PRIx64" :cost summary: winResPool size:%.2f Kb, numOfWin:%"PRId64", tableInfoSize:%.2f Kb, hashTable:%.2f Kb", pQInfo->qId, pSummary->winInfoSize/1024.0,
pSummary->numOfTimeWindows, pSummary->tableInfoSize/1024.0, pSummary->hashSize/1024.0);
SOperatorProfResult* opRes = taosHashIterate(pSummary->operatorProfResults, NULL);
while (opRes != NULL) {
qDebug("QInfo:0x%"PRIx64" :cost summary: operator : %d, exec times: %"PRId64", self time: %"PRId64, pQInfo->qId,
opRes->operatorType, opRes->sumRunTimes, opRes->sumSelfTime );
opRes = taosHashIterate(pSummary->operatorProfResults, opRes);
}
}
//static void updateOffsetVal(SQueryRuntimeEnv *pRuntimeEnv, SDataBlockInfo *pBlockInfo) {
......@@ -4219,6 +4305,9 @@ int32_t doInitQInfo(SQInfo* pQInfo, STSBuf* pTsBuf, void* tsdb, void* sourceOptr
// create runtime environment
int32_t numOfTables = (int32_t)pQueryAttr->tableGroupInfo.numOfTables;
pQInfo->summary.tableInfoSize += (numOfTables * sizeof(STableQueryInfo));
pQInfo->summary.queryProfEvents = taosArrayInit(1024, sizeof(SQueryProfEvent));
pQInfo->summary.operatorProfResults =
taosHashInit(8, taosGetDefaultHashFunction(TSDB_DATA_TYPE_TINYINT), true, HASH_NO_LOCK);
code = setupQueryRuntimeEnv(pRuntimeEnv, (int32_t) pQueryAttr->tableGroupInfo.numOfTables, pOperator, param);
if (code != TSDB_CODE_SUCCESS) {
......@@ -4843,7 +4932,10 @@ static SSDataBlock* doAggregate(void* param, bool* newgroup) {
SOperatorInfo* upstream = pOperator->upstream[0];
while(1) {
publishOperatorProfEvent(upstream, QUERY_PROF_BEFORE_OPERATOR_EXEC);
SSDataBlock* pBlock = upstream->exec(upstream, newgroup);
publishOperatorProfEvent(upstream, QUERY_PROF_AFTER_OPERATOR_EXEC);
if (pBlock == NULL) {
break;
}
......@@ -4898,7 +4990,10 @@ static SSDataBlock* doSTableAggregate(void* param, bool* newgroup) {
SOperatorInfo* upstream = pOperator->upstream[0];
while(1) {
publishOperatorProfEvent(upstream, QUERY_PROF_BEFORE_OPERATOR_EXEC);
SSDataBlock* pBlock = upstream->exec(upstream, newgroup);
publishOperatorProfEvent(upstream, QUERY_PROF_AFTER_OPERATOR_EXEC);
if (pBlock == NULL) {
break;
}
......@@ -4978,7 +5073,10 @@ static SSDataBlock* doArithmeticOperation(void* param, bool* newgroup) {
bool prevVal = *newgroup;
// The upstream exec may change the value of the newgroup, so use a local variable instead.
publishOperatorProfEvent(pOperator->upstream[0], QUERY_PROF_BEFORE_OPERATOR_EXEC);
SSDataBlock* pBlock = pOperator->upstream[0]->exec(pOperator->upstream[0], newgroup);
publishOperatorProfEvent(pOperator->upstream[0], QUERY_PROF_AFTER_OPERATOR_EXEC);
if (pBlock == NULL) {
assert(*newgroup == false);
......@@ -5038,7 +5136,10 @@ static SSDataBlock* doLimit(void* param, bool* newgroup) {
SSDataBlock* pBlock = NULL;
while (1) {
publishOperatorProfEvent(pOperator->upstream[0], QUERY_PROF_BEFORE_OPERATOR_EXEC);
pBlock = pOperator->upstream[0]->exec(pOperator->upstream[0], newgroup);
publishOperatorProfEvent(pOperator->upstream[0], QUERY_PROF_AFTER_OPERATOR_EXEC);
if (pBlock == NULL) {
setQueryStatus(pOperator->pRuntimeEnv, QUERY_COMPLETED);
pOperator->status = OP_EXEC_DONE;
......@@ -5088,7 +5189,10 @@ static SSDataBlock* doFilter(void* param, bool* newgroup) {
SQueryRuntimeEnv* pRuntimeEnv = pOperator->pRuntimeEnv;
while (1) {
publishOperatorProfEvent(pOperator->upstream[0], QUERY_PROF_BEFORE_OPERATOR_EXEC);
SSDataBlock *pBlock = pOperator->upstream[0]->exec(pOperator->upstream[0], newgroup);
publishOperatorProfEvent(pOperator->upstream[0], QUERY_PROF_AFTER_OPERATOR_EXEC);
if (pBlock == NULL) {
break;
}
......@@ -5133,7 +5237,10 @@ static SSDataBlock* doIntervalAgg(void* param, bool* newgroup) {
SOperatorInfo* upstream = pOperator->upstream[0];
while(1) {
publishOperatorProfEvent(upstream, QUERY_PROF_BEFORE_OPERATOR_EXEC);
SSDataBlock* pBlock = upstream->exec(upstream, newgroup);
publishOperatorProfEvent(upstream, QUERY_PROF_AFTER_OPERATOR_EXEC);
if (pBlock == NULL) {
break;
}
......@@ -5186,7 +5293,10 @@ static SSDataBlock* doSTableIntervalAgg(void* param, bool* newgroup) {
SOperatorInfo* upstream = pOperator->upstream[0];
while(1) {
publishOperatorProfEvent(upstream, QUERY_PROF_BEFORE_OPERATOR_EXEC);
SSDataBlock* pBlock = upstream->exec(upstream, newgroup);
publishOperatorProfEvent(upstream, QUERY_PROF_AFTER_OPERATOR_EXEC);
if (pBlock == NULL) {
break;
}
......@@ -5314,7 +5424,10 @@ static SSDataBlock* doStateWindowAgg(void *param, bool* newgroup) {
STimeWindow win = pQueryAttr->window;
SOperatorInfo* upstream = pOperator->upstream[0];
while (1) {
publishOperatorProfEvent(upstream, QUERY_PROF_BEFORE_OPERATOR_EXEC);
SSDataBlock* pBlock = upstream->exec(upstream, newgroup);
publishOperatorProfEvent(upstream, QUERY_PROF_AFTER_OPERATOR_EXEC);
if (pBlock == NULL) {
break;
}
......@@ -5372,7 +5485,9 @@ static SSDataBlock* doSessionWindowAgg(void* param, bool* newgroup) {
SOperatorInfo* upstream = pOperator->upstream[0];
while(1) {
publishOperatorProfEvent(upstream, QUERY_PROF_BEFORE_OPERATOR_EXEC);
SSDataBlock* pBlock = upstream->exec(upstream, newgroup);
publishOperatorProfEvent(upstream, QUERY_PROF_AFTER_OPERATOR_EXEC);
if (pBlock == NULL) {
break;
}
......@@ -5423,7 +5538,9 @@ static SSDataBlock* hashGroupbyAggregate(void* param, bool* newgroup) {
SOperatorInfo* upstream = pOperator->upstream[0];
while(1) {
publishOperatorProfEvent(upstream, QUERY_PROF_BEFORE_OPERATOR_EXEC);
SSDataBlock* pBlock = upstream->exec(upstream, newgroup);
publishOperatorProfEvent(upstream, QUERY_PROF_AFTER_OPERATOR_EXEC);
if (pBlock == NULL) {
break;
}
......@@ -5489,7 +5606,10 @@ static SSDataBlock* doFill(void* param, bool* newgroup) {
}
while(1) {
publishOperatorProfEvent(pOperator->upstream[0], QUERY_PROF_BEFORE_OPERATOR_EXEC);
SSDataBlock* pBlock = pOperator->upstream[0]->exec(pOperator->upstream[0], newgroup);
publishOperatorProfEvent(pOperator->upstream[0], QUERY_PROF_AFTER_OPERATOR_EXEC);
if (*newgroup) {
assert(pBlock != NULL);
}
......@@ -6159,7 +6279,10 @@ static SSDataBlock* hashDistinct(void* param, bool* newgroup) {
pRes->info.rows = 0;
SSDataBlock* pBlock = NULL;
while(1) {
publishOperatorProfEvent(pOperator->upstream[0], QUERY_PROF_BEFORE_OPERATOR_EXEC);
pBlock = pOperator->upstream[0]->exec(pOperator->upstream[0], newgroup);
publishOperatorProfEvent(pOperator->upstream[0], QUERY_PROF_AFTER_OPERATOR_EXEC);
if (pBlock == NULL) {
setQueryStatus(pOperator->pRuntimeEnv, QUERY_COMPLETED);
pOperator->status = OP_EXEC_DONE;
......@@ -7483,6 +7606,9 @@ void freeQInfo(SQInfo *pQInfo) {
tfree(pQInfo->pBuf);
tfree(pQInfo->sql);
taosArrayDestroy(pQInfo->summary.queryProfEvents);
taosHashCleanup(pQInfo->summary.operatorProfResults);
taosArrayDestroy(pRuntimeEnv->groupResInfo.pRows);
pQInfo->signature = 0;
......
......@@ -232,6 +232,7 @@ bool qTableQuery(qinfo_t qinfo, uint64_t *qId) {
// error occurs, record the error code and return to client
int32_t ret = setjmp(pQInfo->runtimeEnv.env);
if (ret != TSDB_CODE_SUCCESS) {
publishQueryAbortEvent(pQInfo, ret);
pQInfo->code = ret;
qDebug("QInfo:0x%"PRIx64" query abort due to error/cancel occurs, code:%s", pQInfo->qId, tstrerror(pQInfo->code));
return doBuildResCheck(pQInfo);
......@@ -240,7 +241,9 @@ bool qTableQuery(qinfo_t qinfo, uint64_t *qId) {
qDebug("QInfo:0x%"PRIx64" query task is launched", pQInfo->qId);
bool newgroup = false;
publishOperatorProfEvent(pRuntimeEnv->proot, QUERY_PROF_BEFORE_OPERATOR_EXEC);
pRuntimeEnv->outputBuf = pRuntimeEnv->proot->exec(pRuntimeEnv->proot, &newgroup);
publishOperatorProfEvent(pRuntimeEnv->proot, QUERY_PROF_AFTER_OPERATOR_EXEC);
pRuntimeEnv->resultInfo.total += GET_NUM_OF_RESULTS(pRuntimeEnv);
if (isQueryKilled(pQInfo)) {
......
......@@ -46,6 +46,8 @@ class TDTestCase:
sql += "(%d, %d, 'nchar%d')" % (currts + i, i % 100, i % 100)
tdSql.execute(sql)
os.system("rm /tmp/*.sql")
os.system("taosdump --databases db -o /tmp")
tdSql.execute("drop database db")
......
###################################################################
# Copyright (c) 2016 by TAOS Technologies, Inc.
# All rights reserved.
#
# This file is proprietary and confidential to TAOS Technologies.
# No part of this file may be reproduced, stored, transmitted,
# disclosed or used in any form or by any means other than as
# expressly provided by the written permission from Jianhui Tao
#
###################################################################
# -*- coding: utf-8 -*-
import sys
import os
from util.log import *
from util.cases import *
from util.sql import *
from util.dnodes import *
class TDTestCase:
def init(self, conn, logSql):
tdLog.debug("start to execute %s" % __file__)
tdSql.init(conn.cursor(), logSql)
self.ts = 1601481600000
self.numberOfTables = 1
self.numberOfRecords = 15000
def run(self):
tdSql.prepare()
tdSql.execute("create table st(ts timestamp, c1 timestamp, c2 int, c3 bigint, c4 float, c5 double, c6 binary(8), c7 smallint, c8 tinyint, c9 bool, c10 nchar(8)) tags(t1 int)")
tdSql.execute("create table t1 using st tags(0)")
currts = self.ts
finish = 0
while(finish < self.numberOfRecords):
sql = "insert into t1 values"
for i in range(finish, self.numberOfRecords):
sql += "(%d, 1019774612, 29931, 1442173978, 165092.468750, 1128.643179, 'MOCq1pTu', 18405, 82, 0, 'g0A6S0Fu')" % (currts + i)
finish = i + 1
if (1048576 - len(sql)) < 16384:
break
tdSql.execute(sql)
os.system("rm /tmp/*.sql")
os.system("taosdump --databases db -o /tmp -B 32766 -L 1048576")
tdSql.execute("drop database db")
tdSql.query("show databases")
tdSql.checkRows(0)
os.system("taosdump -i /tmp")
tdSql.query("show databases")
tdSql.checkRows(1)
tdSql.checkData(0, 0, 'db')
tdSql.execute("use db")
tdSql.query("show stables")
tdSql.checkRows(1)
tdSql.checkData(0, 0, 'st')
tdSql.query("select count(*) from t1")
tdSql.checkData(0, 0, self.numberOfRecords)
def stop(self):
tdSql.close()
tdLog.success("%s successfully executed" % __file__)
tdCases.addWindows(__file__, TDTestCase())
tdCases.addLinux(__file__, TDTestCase())
\ No newline at end of file
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册