提交 91a837ed 编写于 作者: G gccgdb1234

Merge branch '3.0' of github.com:taosdata/TDengine into 3.0

...@@ -42,7 +42,7 @@ To do so, run the following command: ...@@ -42,7 +42,7 @@ To do so, run the following command:
``` ```
This command creates the `meters` supertable in the `test` database. In the `meters` supertable, it then creates 10,000 subtables named `d0` to `d9999`. Each table has 10,000 rows and each row has four columns: `ts`, `current`, `voltage`, and `phase`. The timestamps of the data in these columns range from 2017-07-14 10:40:00 000 to 2017-07-14 10:40:09 999. Each table is randomly assigned a `groupId` tag from 1 to ten and a `location` tag of either `California.SanFrancisco` or `California.SanDiego`. This command creates the `meters` supertable in the `test` database. In the `meters` supertable, it then creates 10,000 subtables named `d0` to `d9999`. Each table has 10,000 rows and each row has four columns: `ts`, `current`, `voltage`, and `phase`. The timestamps of the data in these columns range from 2017-07-14 10:40:00 000 to 2017-07-14 10:40:09 999. Each table is randomly assigned a `groupId` tag from 1 to 10 and a `location` tag of either `Campbell`, `Cupertino`, `Los Angeles`, `Mountain View`, `Palo Alto`, `San Diego`, `San Francisco`, `San Jose`, `Santa Clara` or `Sunnyvale`.
The `taosBenchmark` command creates a deployment with 100 million data points that you can use for testing purposes. The time required depends on the hardware specifications of the local system. The `taosBenchmark` command creates a deployment with 100 million data points that you can use for testing purposes. The time required depends on the hardware specifications of the local system.
......
...@@ -96,7 +96,12 @@ typedef struct { ...@@ -96,7 +96,12 @@ typedef struct {
typedef struct SQueryExecMetric { typedef struct SQueryExecMetric {
int64_t start; // start timestamp, us int64_t start; // start timestamp, us
int64_t parsed; // start to parse, us int64_t syntaxStart; // start to parse, us
int64_t syntaxEnd; // end to parse, us
int64_t ctgStart; // start to parse, us
int64_t ctgEnd; // end to parse, us
int64_t semanticEnd;
int64_t execEnd;
int64_t send; // start to send to server, us int64_t send; // start to send to server, us
int64_t rsp; // receive response from server, us int64_t rsp; // receive response from server, us
} SQueryExecMetric; } SQueryExecMetric;
......
...@@ -29,6 +29,7 @@ extern "C" { ...@@ -29,6 +29,7 @@ extern "C" {
#define tscDebug(...) do { if (cDebugFlag & DEBUG_DEBUG) { taosPrintLog("TSC ", DEBUG_DEBUG, cDebugFlag, __VA_ARGS__); }} while(0) #define tscDebug(...) do { if (cDebugFlag & DEBUG_DEBUG) { taosPrintLog("TSC ", DEBUG_DEBUG, cDebugFlag, __VA_ARGS__); }} while(0)
#define tscTrace(...) do { if (cDebugFlag & DEBUG_TRACE) { taosPrintLog("TSC ", DEBUG_TRACE, cDebugFlag, __VA_ARGS__); }} while(0) #define tscTrace(...) do { if (cDebugFlag & DEBUG_TRACE) { taosPrintLog("TSC ", DEBUG_TRACE, cDebugFlag, __VA_ARGS__); }} while(0)
#define tscDebugL(...) do { if (cDebugFlag & DEBUG_DEBUG) { taosPrintLongString("TSC ", DEBUG_DEBUG, cDebugFlag, __VA_ARGS__); }} while(0) #define tscDebugL(...) do { if (cDebugFlag & DEBUG_DEBUG) { taosPrintLongString("TSC ", DEBUG_DEBUG, cDebugFlag, __VA_ARGS__); }} while(0)
#define tscPerf(...) do { taosPrintLog("TSC ", 0, cDebugFlag, __VA_ARGS__); } while(0)
#ifdef __cplusplus #ifdef __cplusplus
} }
......
...@@ -69,14 +69,25 @@ static void deregisterRequest(SRequestObj *pRequest) { ...@@ -69,14 +69,25 @@ static void deregisterRequest(SRequestObj *pRequest) {
int32_t currentInst = atomic_sub_fetch_64((int64_t *)&pActivity->currentRequests, 1); int32_t currentInst = atomic_sub_fetch_64((int64_t *)&pActivity->currentRequests, 1);
int32_t num = atomic_sub_fetch_32(&pTscObj->numOfReqs, 1); int32_t num = atomic_sub_fetch_32(&pTscObj->numOfReqs, 1);
int64_t duration = taosGetTimestampUs() - pRequest->metric.start; int64_t nowUs = taosGetTimestampUs();
int64_t duration = nowUs - pRequest->metric.start;
tscDebug("0x%" PRIx64 " free Request from connObj: 0x%" PRIx64 ", reqId:0x%" PRIx64 " elapsed:%" PRIu64 tscDebug("0x%" PRIx64 " free Request from connObj: 0x%" PRIx64 ", reqId:0x%" PRIx64 " elapsed:%" PRIu64
" ms, current:%d, app current:%d", " ms, current:%d, app current:%d",
pRequest->self, pTscObj->id, pRequest->requestId, duration / 1000, num, currentInst); pRequest->self, pTscObj->id, pRequest->requestId, duration / 1000, num, currentInst);
if (QUERY_NODE_VNODE_MODIF_STMT == pRequest->stmtType) { if (QUERY_NODE_VNODE_MODIF_STMT == pRequest->stmtType) {
tscPerf("insert duration %" PRId64 "us: syntax:%" PRId64 "us, ctg:%" PRId64 "us, semantic:%" PRId64 "us, exec:%" PRId64 "us",
duration, pRequest->metric.syntaxEnd - pRequest->metric.syntaxStart,
pRequest->metric.ctgEnd - pRequest->metric.ctgStart,
pRequest->metric.semanticEnd - pRequest->metric.ctgEnd,
pRequest->metric.execEnd - pRequest->metric.semanticEnd);
atomic_add_fetch_64((int64_t *)&pActivity->insertElapsedTime, duration); atomic_add_fetch_64((int64_t *)&pActivity->insertElapsedTime, duration);
} else if (QUERY_NODE_SELECT_STMT == pRequest->stmtType) { } else if (QUERY_NODE_SELECT_STMT == pRequest->stmtType) {
tscPerf("select duration %" PRId64 "us: syntax:%" PRId64 "us, ctg:%" PRId64 "us, semantic:%" PRId64 "us, exec:%" PRId64 "us",
duration, pRequest->metric.syntaxEnd - pRequest->metric.syntaxStart,
pRequest->metric.ctgEnd - pRequest->metric.ctgStart,
pRequest->metric.semanticEnd - pRequest->metric.ctgEnd,
pRequest->metric.execEnd - pRequest->metric.semanticEnd);
atomic_add_fetch_64((int64_t *)&pActivity->queryElapsedTime, duration); atomic_add_fetch_64((int64_t *)&pActivity->queryElapsedTime, duration);
} }
...@@ -330,7 +341,6 @@ void doDestroyRequest(void *p) { ...@@ -330,7 +341,6 @@ void doDestroyRequest(void *p) {
schedulerFreeJob(&pRequest->body.queryJob, 0); schedulerFreeJob(&pRequest->body.queryJob, 0);
taosMemoryFreeClear(pRequest->msgBuf); taosMemoryFreeClear(pRequest->msgBuf);
taosMemoryFreeClear(pRequest->sqlstr);
taosMemoryFreeClear(pRequest->pDb); taosMemoryFreeClear(pRequest->pDb);
doFreeReqResultInfo(&pRequest->body.resInfo); doFreeReqResultInfo(&pRequest->body.resInfo);
...@@ -349,6 +359,7 @@ void doDestroyRequest(void *p) { ...@@ -349,6 +359,7 @@ void doDestroyRequest(void *p) {
taosMemoryFree(pRequest->body.param); taosMemoryFree(pRequest->body.param);
} }
taosMemoryFreeClear(pRequest->sqlstr);
taosMemoryFree(pRequest); taosMemoryFree(pRequest);
tscTrace("end to destroy request %" PRIx64 " p:%p", reqId, pRequest); tscTrace("end to destroy request %" PRIx64 " p:%p", reqId, pRequest);
} }
......
...@@ -842,6 +842,8 @@ void schedulerExecCb(SExecResult* pResult, void* param, int32_t code) { ...@@ -842,6 +842,8 @@ void schedulerExecCb(SExecResult* pResult, void* param, int32_t code) {
} }
schedulerFreeJob(&pRequest->body.queryJob, 0); schedulerFreeJob(&pRequest->body.queryJob, 0);
pRequest->metric.execEnd = taosGetTimestampUs();
} }
taosMemoryFree(pResult); taosMemoryFree(pResult);
......
...@@ -685,6 +685,8 @@ void retrieveMetaCallback(SMetaData *pResultMeta, void *param, int32_t code) { ...@@ -685,6 +685,8 @@ void retrieveMetaCallback(SMetaData *pResultMeta, void *param, int32_t code) {
SQuery *pQuery = pWrapper->pQuery; SQuery *pQuery = pWrapper->pQuery;
SRequestObj *pRequest = pWrapper->pRequest; SRequestObj *pRequest = pWrapper->pRequest;
pRequest->metric.ctgEnd = taosGetTimestampUs();
if (code == TSDB_CODE_SUCCESS) { if (code == TSDB_CODE_SUCCESS) {
code = qAnalyseSqlSemantic(pWrapper->pCtx, &pWrapper->catalogReq, pResultMeta, pQuery); code = qAnalyseSqlSemantic(pWrapper->pCtx, &pWrapper->catalogReq, pResultMeta, pQuery);
pRequest->stableQuery = pQuery->stableQuery; pRequest->stableQuery = pQuery->stableQuery;
...@@ -693,6 +695,8 @@ void retrieveMetaCallback(SMetaData *pResultMeta, void *param, int32_t code) { ...@@ -693,6 +695,8 @@ void retrieveMetaCallback(SMetaData *pResultMeta, void *param, int32_t code) {
} }
} }
pRequest->metric.semanticEnd = taosGetTimestampUs();
if (code == TSDB_CODE_SUCCESS) { if (code == TSDB_CODE_SUCCESS) {
if (pQuery->haveResultSet) { if (pQuery->haveResultSet) {
setResSchemaInfo(&pRequest->body.resInfo, pQuery->pResSchema, pQuery->numOfResCols); setResSchemaInfo(&pRequest->body.resInfo, pQuery->pResSchema, pQuery->numOfResCols);
...@@ -784,12 +788,16 @@ void doAsyncQuery(SRequestObj *pRequest, bool updateMetaForce) { ...@@ -784,12 +788,16 @@ void doAsyncQuery(SRequestObj *pRequest, bool updateMetaForce) {
SQuery *pQuery = NULL; SQuery *pQuery = NULL;
pRequest->metric.syntaxStart = taosGetTimestampUs();
SCatalogReq catalogReq = {.forceUpdate = updateMetaForce, .qNodeRequired = qnodeRequired(pRequest)}; SCatalogReq catalogReq = {.forceUpdate = updateMetaForce, .qNodeRequired = qnodeRequired(pRequest)};
code = qParseSqlSyntax(pCxt, &pQuery, &catalogReq); code = qParseSqlSyntax(pCxt, &pQuery, &catalogReq);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
goto _error; goto _error;
} }
pRequest->metric.syntaxEnd = taosGetTimestampUs();
if (!updateMetaForce) { if (!updateMetaForce) {
STscObj *pTscObj = pRequest->pTscObj; STscObj *pTscObj = pRequest->pTscObj;
SAppClusterSummary *pActivity = &pTscObj->pAppInfo->summary; SAppClusterSummary *pActivity = &pTscObj->pAppInfo->summary;
...@@ -816,6 +824,8 @@ void doAsyncQuery(SRequestObj *pRequest, bool updateMetaForce) { ...@@ -816,6 +824,8 @@ void doAsyncQuery(SRequestObj *pRequest, bool updateMetaForce) {
.requestObjRefId = pCxt->requestRid, .requestObjRefId = pCxt->requestRid,
.mgmtEps = pCxt->mgmtEpSet}; .mgmtEps = pCxt->mgmtEpSet};
pRequest->metric.ctgStart = taosGetTimestampUs();
code = catalogAsyncGetAllMeta(pCxt->pCatalog, &conn, &catalogReq, retrieveMetaCallback, pWrapper, code = catalogAsyncGetAllMeta(pCxt->pCatalog, &conn, &catalogReq, retrieveMetaCallback, pWrapper,
&pRequest->body.queryJob); &pRequest->body.queryJob);
pCxt = NULL; pCxt = NULL;
......
...@@ -1416,7 +1416,7 @@ static int32_t doMergeBufAndFileRows_Rv(STsdbReader* pReader, STableBlockScanInf ...@@ -1416,7 +1416,7 @@ static int32_t doMergeBufAndFileRows_Rv(STsdbReader* pReader, STableBlockScanInf
int64_t minKey = 0; int64_t minKey = 0;
if (pReader->order == TSDB_ORDER_ASC) { if (pReader->order == TSDB_ORDER_ASC) {
minKey = INT64_MAX; // chosen the minimum value minKey = INT64_MAX; // chosen the minimum value
if (minKey > tsLast && pLastBlockReader->lastBlockData.nRow > 0) { if (minKey > tsLast && hasDataInLastBlock(pLastBlockReader)) {
minKey = tsLast; minKey = tsLast;
} }
...@@ -1429,7 +1429,7 @@ static int32_t doMergeBufAndFileRows_Rv(STsdbReader* pReader, STableBlockScanInf ...@@ -1429,7 +1429,7 @@ static int32_t doMergeBufAndFileRows_Rv(STsdbReader* pReader, STableBlockScanInf
} }
} else { } else {
minKey = INT64_MIN; minKey = INT64_MIN;
if (minKey < tsLast && pLastBlockReader->lastBlockData.nRow > 0) { if (minKey < tsLast && hasDataInLastBlock(pLastBlockReader)) {
minKey = tsLast; minKey = tsLast;
} }
......
...@@ -469,6 +469,7 @@ static SColumnInfoData* getColInfoResult(void* metaHandle, uint64_t suid, SArray ...@@ -469,6 +469,7 @@ static SColumnInfoData* getColInfoResult(void* metaHandle, uint64_t suid, SArray
SDataType type = {.type = TSDB_DATA_TYPE_BOOL, .bytes = sizeof(bool)}; SDataType type = {.type = TSDB_DATA_TYPE_BOOL, .bytes = sizeof(bool)};
code = createResultData(&type, rows, &output); code = createResultData(&type, rows, &output);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
terrno = code;
qError("failed to create result, reason:%s", tstrerror(code)); qError("failed to create result, reason:%s", tstrerror(code));
goto end; goto end;
} }
...@@ -477,6 +478,7 @@ static SColumnInfoData* getColInfoResult(void* metaHandle, uint64_t suid, SArray ...@@ -477,6 +478,7 @@ static SColumnInfoData* getColInfoResult(void* metaHandle, uint64_t suid, SArray
if(code != TSDB_CODE_SUCCESS){ if(code != TSDB_CODE_SUCCESS){
qError("failed to calculate scalar, reason:%s", tstrerror(code)); qError("failed to calculate scalar, reason:%s", tstrerror(code));
terrno = code; terrno = code;
goto end;
} }
// int64_t st2 = taosGetTimestampUs(); // int64_t st2 = taosGetTimestampUs();
// qDebug("calculate tag block rows:%d, cost:%ld us", rows, st2-st1); // qDebug("calculate tag block rows:%d, cost:%ld us", rows, st2-st1);
...@@ -763,6 +765,7 @@ int32_t getTableList(void* metaHandle, void* pVnode, SScanPhysiNode* pScanNode, ...@@ -763,6 +765,7 @@ int32_t getTableList(void* metaHandle, void* pVnode, SScanPhysiNode* pScanNode,
} }
if (pTagCond) { if (pTagCond) {
terrno = TDB_CODE_SUCCESS;
SColumnInfoData* pColInfoData = getColInfoResult(metaHandle, pListInfo->suid, res, pTagCond); SColumnInfoData* pColInfoData = getColInfoResult(metaHandle, pListInfo->suid, res, pTagCond);
if(terrno != TDB_CODE_SUCCESS){ if(terrno != TDB_CODE_SUCCESS){
colDataDestroy(pColInfoData); colDataDestroy(pColInfoData);
......
...@@ -283,7 +283,7 @@ typedef struct SSchJob { ...@@ -283,7 +283,7 @@ typedef struct SSchJob {
} SSchJob; } SSchJob;
typedef struct SSchTaskCtx { typedef struct SSchTaskCtx {
SSchJob *pJob; int64_t jobRid;
SSchTask *pTask; SSchTask *pTask;
} SSchTaskCtx; } SSchTaskCtx;
......
...@@ -821,7 +821,13 @@ int32_t schProcessOnTaskStatusRsp(SQueryNodeEpId *pEpId, SArray *pStatusList) { ...@@ -821,7 +821,13 @@ int32_t schProcessOnTaskStatusRsp(SQueryNodeEpId *pEpId, SArray *pStatusList) {
int32_t schLaunchTaskImpl(void *param) { int32_t schLaunchTaskImpl(void *param) {
SSchTaskCtx *pCtx = (SSchTaskCtx *)param; SSchTaskCtx *pCtx = (SSchTaskCtx *)param;
SSchJob *pJob = pCtx->pJob; SSchJob *pJob = schAcquireJob(pCtx->jobRid);
if (NULL == pJob) {
taosMemoryFree(param);
qDebug("job refId 0x%" PRIx64 " already not exist", pCtx->jobRid);
SCH_RET(TSDB_CODE_SCH_JOB_IS_DROPPING);
}
SSchTask *pTask = pCtx->pTask; SSchTask *pTask = pCtx->pTask;
int8_t status = 0; int8_t status = 0;
int32_t code = 0; int32_t code = 0;
...@@ -880,6 +886,8 @@ _return: ...@@ -880,6 +886,8 @@ _return:
} }
} }
schReleaseJob(pJob->refId);
SCH_RET(code); SCH_RET(code);
} }
...@@ -890,7 +898,7 @@ int32_t schAsyncLaunchTaskImpl(SSchJob *pJob, SSchTask *pTask) { ...@@ -890,7 +898,7 @@ int32_t schAsyncLaunchTaskImpl(SSchJob *pJob, SSchTask *pTask) {
SCH_ERR_RET(TSDB_CODE_OUT_OF_MEMORY); SCH_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
} }
param->pJob = pJob; param->jobRid = pJob->refId;
param->pTask = pTask; param->pTask = pTask;
if (pJob->taskNum >= SCH_MIN_AYSNC_EXEC_NUM) { if (pJob->taskNum >= SCH_MIN_AYSNC_EXEC_NUM) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册