提交 52457838 编写于 作者: H Haojun Liao

[td-11818] add data into sink node

上级 a2545549
...@@ -21,6 +21,9 @@ extern "C" { ...@@ -21,6 +21,9 @@ extern "C" {
#endif #endif
typedef void* qTaskInfo_t; typedef void* qTaskInfo_t;
typedef void* DataSinkHandle;
struct SSubplan;
/** /**
* Create the exec task object according to task json * Create the exec task object according to task json
...@@ -34,13 +37,14 @@ typedef void* qTaskInfo_t; ...@@ -34,13 +37,14 @@ typedef void* qTaskInfo_t;
int32_t qCreateExecTask(void* tsdb, int32_t vgId, struct SSubplan* pPlan, qTaskInfo_t* pTaskInfo); int32_t qCreateExecTask(void* tsdb, int32_t vgId, struct SSubplan* pPlan, qTaskInfo_t* pTaskInfo);
/** /**
* the main task execution function, including query on both table and multiple tables, * The main task execution function, including query on both table and multiple tables,
* which are decided according to the tag or table name query conditions * which are decided according to the tag or table name query conditions
* *
* @param qinfo * @param tinfo
* @param handle
* @return * @return
*/ */
bool qExecTask(qTaskInfo_t qTask, SSDataBlock** pRes); int32_t qExecTask(qTaskInfo_t tinfo, DataSinkHandle* handle);
/** /**
* Retrieve the produced results information, if current query is not paused or completed, * Retrieve the produced results information, if current query is not paused or completed,
...@@ -62,7 +66,7 @@ int32_t qRetrieveQueryResultInfo(qTaskInfo_t qinfo, bool* buildRes, void* pRspCo ...@@ -62,7 +66,7 @@ int32_t qRetrieveQueryResultInfo(qTaskInfo_t qinfo, bool* buildRes, void* pRspCo
* @param contLen payload length * @param contLen payload length
* @return * @return
*/ */
int32_t qDumpRetrieveResult(qTaskInfo_t qinfo, SRetrieveTableRsp** pRsp, int32_t* contLen, bool* continueExec); //int32_t qDumpRetrieveResult(qTaskInfo_t qinfo, SRetrieveTableRsp** pRsp, int32_t* contLen, bool* continueExec);
/** /**
* return the transporter context (RPC) * return the transporter context (RPC)
......
...@@ -21,6 +21,7 @@ extern "C" { ...@@ -21,6 +21,7 @@ extern "C" {
#endif #endif
#include "os.h" #include "os.h"
#include "executor.h"
#include "executorimpl.h" #include "executorimpl.h"
#define DS_CAPACITY_ENOUGH 1 #define DS_CAPACITY_ENOUGH 1
...@@ -39,8 +40,6 @@ typedef struct SDataSinkMgtCfg { ...@@ -39,8 +40,6 @@ typedef struct SDataSinkMgtCfg {
int32_t dsDataSinkMgtInit(SDataSinkMgtCfg *cfg); int32_t dsDataSinkMgtInit(SDataSinkMgtCfg *cfg);
typedef void* DataSinkHandle;
typedef struct SInputData { typedef struct SInputData {
const SSDataBlock* pData; const SSDataBlock* pData;
SHashObj* pTableRetrieveTsMap; SHashObj* pTableRetrieveTsMap;
...@@ -68,6 +67,10 @@ int32_t dsCreateDataSinker(const struct SDataSink *pDataSink, DataSinkHandle* pH ...@@ -68,6 +67,10 @@ int32_t dsCreateDataSinker(const struct SDataSink *pDataSink, DataSinkHandle* pH
*/ */
int32_t dsPutDataBlock(DataSinkHandle handle, const SInputData* pInput, int32_t* pStatus); int32_t dsPutDataBlock(DataSinkHandle handle, const SInputData* pInput, int32_t* pStatus);
/**
*
* @param handle
*/
void dsEndPut(DataSinkHandle handle); void dsEndPut(DataSinkHandle handle);
/** /**
......
...@@ -20,6 +20,7 @@ ...@@ -20,6 +20,7 @@
extern "C" { extern "C" {
#endif #endif
#ifdef __cplusplus #ifdef __cplusplus
} }
#endif #endif
......
...@@ -20,14 +20,16 @@ ...@@ -20,14 +20,16 @@
#include "ttszip.h" #include "ttszip.h"
#include "tvariant.h" #include "tvariant.h"
#include "thash.h" #include "dataSinkMgt.h"
#include "executil.h" #include "executil.h"
#include "planner.h"
#include "taosdef.h" #include "taosdef.h"
#include "tarray.h" #include "tarray.h"
#include "tfilter.h" #include "tfilter.h"
#include "thash.h"
#include "tlockfree.h" #include "tlockfree.h"
#include "tpagedfile.h" #include "tpagedfile.h"
#include "planner.h" #include "executor.h"
struct SColumnFilterElem; struct SColumnFilterElem;
...@@ -256,6 +258,7 @@ typedef struct SExecTaskInfo { ...@@ -256,6 +258,7 @@ typedef struct SExecTaskInfo {
// void* rspContext; // response context // void* rspContext; // response context
char *sql; // query sql string char *sql; // query sql string
jmp_buf env; // jmp_buf env; //
DataSinkHandle dsHandle;
struct SOperatorInfo *pRoot; struct SOperatorInfo *pRoot;
} SExecTaskInfo; } SExecTaskInfo;
......
...@@ -13,6 +13,7 @@ ...@@ -13,6 +13,7 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>. * along with this program. If not, see <http://www.gnu.org/licenses/>.
*/ */
#include "tarray.h"
#include "dataSinkMgt.h" #include "dataSinkMgt.h"
#include "dataSinkInt.h" #include "dataSinkInt.h"
#include "planner.h" #include "planner.h"
......
...@@ -13,9 +13,10 @@ ...@@ -13,9 +13,10 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>. * along with this program. If not, see <http://www.gnu.org/licenses/>.
*/ */
#include <dataSinkMgt.h>
#include "exception.h"
#include "os.h" #include "os.h"
#include "tarray.h"
#include "dataSinkMgt.h"
#include "exception.h"
#include "tcache.h" #include "tcache.h"
#include "tglobal.h" #include "tglobal.h"
#include "tmsg.h" #include "tmsg.h"
...@@ -69,8 +70,9 @@ void freeParam(STaskParam *param) { ...@@ -69,8 +70,9 @@ void freeParam(STaskParam *param) {
int32_t qCreateExecTask(void* tsdb, int32_t vgId, SSubplan* pSubplan, qTaskInfo_t* pTaskInfo) { int32_t qCreateExecTask(void* tsdb, int32_t vgId, SSubplan* pSubplan, qTaskInfo_t* pTaskInfo) {
assert(tsdb != NULL && pSubplan != NULL); assert(tsdb != NULL && pSubplan != NULL);
SExecTaskInfo** pTask = (SExecTaskInfo**)pTaskInfo;
int32_t code = doCreateExecTaskInfo(pSubplan, (SExecTaskInfo**) pTaskInfo, tsdb); int32_t code = doCreateExecTaskInfo(pSubplan, pTask, tsdb);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
goto _error; goto _error;
} }
...@@ -81,8 +83,7 @@ int32_t qCreateExecTask(void* tsdb, int32_t vgId, SSubplan* pSubplan, qTaskInfo_ ...@@ -81,8 +83,7 @@ int32_t qCreateExecTask(void* tsdb, int32_t vgId, SSubplan* pSubplan, qTaskInfo_
goto _error; goto _error;
} }
DataSinkHandle pHandle = NULL; code = dsCreateDataSinker(pSubplan->pDataSink, (*pTask)->dsHandle);
code = dsCreateDataSinker(pSubplan->pDataSink, &pHandle);
_error: _error:
// if failed to add ref for all tables in this query, abort current query // if failed to add ref for all tables in this query, abort current query
...@@ -134,64 +135,79 @@ int waitMoment(SQInfo* pQInfo){ ...@@ -134,64 +135,79 @@ int waitMoment(SQInfo* pQInfo){
} }
#endif #endif
bool qExecTask(qTaskInfo_t tinfo, SSDataBlock** pRes) { int32_t qExecTask(qTaskInfo_t tinfo, DataSinkHandle* handle) {
SExecTaskInfo *pTaskInfo = (SExecTaskInfo *) tinfo; SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
int64_t threadId = taosGetSelfPthreadId(); int64_t threadId = taosGetSelfPthreadId();
int64_t curOwner = 0; int64_t curOwner = 0;
if ((curOwner = atomic_val_compare_exchange_64(&pTaskInfo->owner, 0, threadId)) != 0) { if ((curOwner = atomic_val_compare_exchange_64(&pTaskInfo->owner, 0, threadId)) != 0) {
qError("QInfo:0x%"PRIx64"-%p qhandle is now executed by thread:%p", GET_TASKID(pTaskInfo), pTaskInfo, (void*) curOwner); qError("QInfo:0x%" PRIx64 "-%p qhandle is now executed by thread:%p", GET_TASKID(pTaskInfo), pTaskInfo,
(void*)curOwner);
pTaskInfo->code = TSDB_CODE_QRY_IN_EXEC; pTaskInfo->code = TSDB_CODE_QRY_IN_EXEC;
return false; return pTaskInfo->code;
} }
if(pTaskInfo->cost.start == 0) { if (pTaskInfo->cost.start == 0) {
pTaskInfo->cost.start = taosGetTimestampMs(); pTaskInfo->cost.start = taosGetTimestampMs();
} }
if (isTaskKilled(pTaskInfo)) { if (isTaskKilled(pTaskInfo)) {
qDebug("QInfo:0x%"PRIx64" it is already killed, abort", GET_TASKID(pTaskInfo)); qDebug("QInfo:0x%" PRIx64 " it is already killed, abort", GET_TASKID(pTaskInfo));
// return doBuildResCheck(pTaskInfo); return pTaskInfo->code;
} }
// STaskRuntimeEnv* pRuntimeEnv = &pTaskInfo->runtimeEnv; // STaskRuntimeEnv* pRuntimeEnv = &pTaskInfo->runtimeEnv;
// if (pTaskInfo->tableqinfoGroupInfo.numOfTables == 0) { // if (pTaskInfo->tableqinfoGroupInfo.numOfTables == 0) {
// qDebug("QInfo:0x%"PRIx64" no table exists for query, abort", GET_TASKID(pTaskInfo)); // qDebug("QInfo:0x%"PRIx64" no table exists for query, abort", GET_TASKID(pTaskInfo));
// setTaskStatus(pTaskInfo, TASK_COMPLETED); // setTaskStatus(pTaskInfo, TASK_COMPLETED);
// return doBuildResCheck(pTaskInfo); // return doBuildResCheck(pTaskInfo);
// } // }
// error occurs, record the error code and return to client // error occurs, record the error code and return to client
int32_t ret = setjmp(pTaskInfo->env); int32_t ret = setjmp(pTaskInfo->env);
if (ret != TSDB_CODE_SUCCESS) { if (ret != TSDB_CODE_SUCCESS) {
publishQueryAbortEvent(pTaskInfo, ret); publishQueryAbortEvent(pTaskInfo, ret);
pTaskInfo->code = ret; pTaskInfo->code = ret;
qDebug("QInfo:0x%"PRIx64" query abort due to error/cancel occurs, code:%s", GET_TASKID(pTaskInfo), tstrerror(pTaskInfo->code)); qDebug("QInfo:0x%" PRIx64 " query abort due to error/cancel occurs, code:%s", GET_TASKID(pTaskInfo), tstrerror(pTaskInfo->code));
// return doBuildResCheck(pTaskInfo); return pTaskInfo->code;
} }
qDebug("QInfo:0x%"PRIx64" query task is launched", GET_TASKID(pTaskInfo)); qDebug("QInfo:0x%" PRIx64 " query task is launched", GET_TASKID(pTaskInfo));
bool newgroup = false; bool newgroup = false;
publishOperatorProfEvent(pTaskInfo->pRoot, QUERY_PROF_BEFORE_OPERATOR_EXEC); publishOperatorProfEvent(pTaskInfo->pRoot, QUERY_PROF_BEFORE_OPERATOR_EXEC);
int64_t st = 0;
int64_t st = taosGetTimestampUs(); handle = &pTaskInfo->dsHandle;
*pRes = pTaskInfo->pRoot->exec(pTaskInfo->pRoot, &newgroup);
// todo put the result into sink node.
pTaskInfo->cost.elapsedTime += (taosGetTimestampUs() - st); while(1) {
publishOperatorProfEvent(pTaskInfo->pRoot, QUERY_PROF_AFTER_OPERATOR_EXEC); st = taosGetTimestampUs();
SSDataBlock* pRes = pTaskInfo->pRoot->exec(pTaskInfo->pRoot, &newgroup);
if (isTaskKilled(pTaskInfo)) { pTaskInfo->cost.elapsedTime += (taosGetTimestampUs() - st);
qDebug("QInfo:0x%"PRIx64" query is killed", GET_TASKID(pTaskInfo)); publishOperatorProfEvent(pTaskInfo->pRoot, QUERY_PROF_AFTER_OPERATOR_EXEC);
// } else if (GET_NUM_OF_RESULTS(pRuntimeEnv) == 0) {
// qDebug("QInfo:0x%"PRIx64" over, %u tables queried, total %"PRId64" rows returned", pTaskInfo->qId, pRuntimeEnv->tableqinfoGroupInfo.numOfTables, if (pRes == NULL) { // no results generated yet, abort
// pRuntimeEnv->resultInfo.total); return pTaskInfo->code;
} else { }
// qDebug("QInfo:0x%"PRIx64" query paused, %d rows returned, total:%" PRId64 " rows", pTaskInfo->qId,
// GET_NUM_OF_RESULTS(pRuntimeEnv), pRuntimeEnv->resultInfo.total); int32_t status = 0;
SInputData inputData = {.pData = pRes, .pTableRetrieveTsMap = NULL};
pTaskInfo->code = dsPutDataBlock(pTaskInfo->dsHandle, &inputData, &status);
if (isTaskKilled(pTaskInfo)) {
qDebug("QInfo:0x%" PRIx64 " task is killed", GET_TASKID(pTaskInfo));
// } else if (GET_NUM_OF_RESULTS(pRuntimeEnv) == 0) {
// qDebug("QInfo:0x%"PRIx64" over, %u tables queried, total %"PRId64" rows returned", pTaskInfo->qId, pRuntimeEnv->tableqinfoGroupInfo.numOfTables,
// pRuntimeEnv->resultInfo.total);
}
if (status == DS_CAPACITY_FULL) {
qDebug("QInfo:0x%"PRIx64" query paused, %d rows returned, total:%" PRId64 " rows, in sinkNode:%d", GET_TASKID(pTaskInfo),
0, 0L, 0);
return pTaskInfo->code;
}
} }
// return doBuildResCheck(pTaskInfo);
} }
int32_t qRetrieveQueryResultInfo(qTaskInfo_t qinfo, bool* buildRes, void* pRspContext) { int32_t qRetrieveQueryResultInfo(qTaskInfo_t qinfo, bool* buildRes, void* pRspContext) {
......
...@@ -7181,6 +7181,7 @@ static SExecTaskInfo* createExecTaskInfo(uint64_t queryId) { ...@@ -7181,6 +7181,7 @@ static SExecTaskInfo* createExecTaskInfo(uint64_t queryId) {
pthread_mutex_init(&pTaskInfo->lock, NULL); pthread_mutex_init(&pTaskInfo->lock, NULL);
pTaskInfo->cost.created = taosGetTimestampMs(); pTaskInfo->cost.created = taosGetTimestampMs();
pTaskInfo->id.queryId = queryId;
return pTaskInfo; return pTaskInfo;
} }
......
...@@ -1038,9 +1038,9 @@ int32_t qWorkerProcessQueryMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) { ...@@ -1038,9 +1038,9 @@ int32_t qWorkerProcessQueryMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) {
QW_ERR_JRET(qwBuildAndSendQueryRsp(pMsg, TSDB_CODE_SUCCESS)); QW_ERR_JRET(qwBuildAndSendQueryRsp(pMsg, TSDB_CODE_SUCCESS));
queryRsped = true; queryRsped = true;
SSDataBlock* pRes = NULL; DataSinkHandle handle = NULL;
code = qExecTask(pTaskInfo, &pRes); code = qExecTask(pTaskInfo, &handle);
queryDone = false; queryDone = false;
//TODO call executer to execute subquery //TODO call executer to execute subquery
...@@ -1048,7 +1048,7 @@ int32_t qWorkerProcessQueryMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) { ...@@ -1048,7 +1048,7 @@ int32_t qWorkerProcessQueryMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) {
if (code) { if (code) {
QW_ERR_JRET(code); QW_ERR_JRET(code);
} else { } else {
QW_ERR_JRET(qwAddTaskResCache(qWorkerMgmt, msg->queryId, msg->taskId, pRes)); // QW_ERR_JRET(qwAddTaskResCache(qWorkerMgmt, msg->queryId, msg->taskId, pRes));
QW_ERR_JRET(qwUpdateTaskStatus(qWorkerMgmt, msg->sId, msg->queryId, msg->taskId, JOB_TASK_STATUS_PARTIAL_SUCCEED)); QW_ERR_JRET(qwUpdateTaskStatus(qWorkerMgmt, msg->sId, msg->queryId, msg->taskId, JOB_TASK_STATUS_PARTIAL_SUCCEED));
} }
......
...@@ -274,7 +274,7 @@ TsdbQueryHandleT tsdbQueryLastRow(STsdbRepo *tsdb, STsdbQueryCond *pCond, STable ...@@ -274,7 +274,7 @@ TsdbQueryHandleT tsdbQueryLastRow(STsdbRepo *tsdb, STsdbQueryCond *pCond, STable
TsdbQueryHandleT tsdbQueryCacheLast(STsdbRepo *tsdb, STsdbQueryCond *pCond, STableGroupInfo *groupList, uint64_t qId, SMemRef* pMemRef); TsdbQueryHandleT tsdbQueryCacheLast(STsdbRepo *tsdb, STsdbQueryCond *pCond, STableGroupInfo *groupList, uint64_t qId, SMemRef* pMemRef);
bool isTsdbCacheLastRow(TsdbQueryHandleT* pQueryHandle); bool isTsdbCacheLastRow(TsdbQueryHandleT* pTsdbReadHandle);
/** /**
...@@ -308,19 +308,19 @@ int64_t tsdbGetNumOfRowsInMemTable(TsdbQueryHandleT* pHandle); ...@@ -308,19 +308,19 @@ int64_t tsdbGetNumOfRowsInMemTable(TsdbQueryHandleT* pHandle);
/** /**
* move to next block if exists * move to next block if exists
* *
* @param pQueryHandle * @param pTsdbReadHandle
* @return * @return
*/ */
bool tsdbNextDataBlock(TsdbQueryHandleT pQueryHandle); bool tsdbNextDataBlock(TsdbQueryHandleT pTsdbReadHandle);
/** /**
* Get current data block information * Get current data block information
* *
* @param pQueryHandle * @param pTsdbReadHandle
* @param pBlockInfo * @param pBlockInfo
* @return * @return
*/ */
void tsdbRetrieveDataBlockInfo(TsdbQueryHandleT *pQueryHandle, SDataBlockInfo *pBlockInfo); void tsdbRetrieveDataBlockInfo(TsdbQueryHandleT *pTsdbReadHandle, SDataBlockInfo *pBlockInfo);
/** /**
* *
...@@ -332,7 +332,7 @@ void tsdbRetrieveDataBlockInfo(TsdbQueryHandleT *pQueryHandle, SDataBlockInfo *p ...@@ -332,7 +332,7 @@ void tsdbRetrieveDataBlockInfo(TsdbQueryHandleT *pQueryHandle, SDataBlockInfo *p
* @pBlockStatis the pre-calculated value for current data blocks. if the block is a cache block, always return 0 * @pBlockStatis the pre-calculated value for current data blocks. if the block is a cache block, always return 0
* @return * @return
*/ */
int32_t tsdbRetrieveDataBlockStatisInfo(TsdbQueryHandleT *pQueryHandle, SDataStatis **pBlockStatis); int32_t tsdbRetrieveDataBlockStatisInfo(TsdbQueryHandleT *pTsdbReadHandle, SDataStatis **pBlockStatis);
/** /**
* *
...@@ -340,11 +340,11 @@ int32_t tsdbRetrieveDataBlockStatisInfo(TsdbQueryHandleT *pQueryHandle, SDataSta ...@@ -340,11 +340,11 @@ int32_t tsdbRetrieveDataBlockStatisInfo(TsdbQueryHandleT *pQueryHandle, SDataSta
* the returned data block must be satisfied with the time window condition in any cases, * the returned data block must be satisfied with the time window condition in any cases,
* which means the SData data block is not actually the completed disk data blocks. * which means the SData data block is not actually the completed disk data blocks.
* *
* @param pQueryHandle query handle * @param pTsdbReadHandle query handle
* @param pColumnIdList required data columns id list * @param pColumnIdList required data columns id list
* @return * @return
*/ */
SArray *tsdbRetrieveDataBlock(TsdbQueryHandleT *pQueryHandle, SArray *pColumnIdList); SArray *tsdbRetrieveDataBlock(TsdbQueryHandleT *pTsdbReadHandle, SArray *pColumnIdList);
/** /**
* Get the qualified table id for a super table according to the tag query expression. * Get the qualified table id for a super table according to the tag query expression.
......
...@@ -284,7 +284,7 @@ typedef struct SQueryRuntimeEnv { ...@@ -284,7 +284,7 @@ typedef struct SQueryRuntimeEnv {
uint32_t status; // query status uint32_t status; // query status
void* qinfo; void* qinfo;
uint8_t scanFlag; // denotes reversed scan of data or not uint8_t scanFlag; // denotes reversed scan of data or not
void* pQueryHandle; void* pTsdbReadHandle;
int32_t prevGroupId; // previous executed group id int32_t prevGroupId; // previous executed group id
bool enableGroupData; bool enableGroupData;
...@@ -418,7 +418,7 @@ typedef struct SQueryParam { ...@@ -418,7 +418,7 @@ typedef struct SQueryParam {
} SQueryParam; } SQueryParam;
typedef struct STableScanInfo { typedef struct STableScanInfo {
void *pQueryHandle; void *pTsdbReadHandle;
int32_t numOfBlocks; int32_t numOfBlocks;
int32_t numOfSkipped; int32_t numOfSkipped;
int32_t numOfBlockStatis; int32_t numOfBlockStatis;
......
...@@ -2326,8 +2326,8 @@ _clean: ...@@ -2326,8 +2326,8 @@ _clean:
static void doFreeQueryHandle(SQueryRuntimeEnv* pRuntimeEnv) { static void doFreeQueryHandle(SQueryRuntimeEnv* pRuntimeEnv) {
SQueryAttr* pQueryAttr = pRuntimeEnv->pQueryAttr; SQueryAttr* pQueryAttr = pRuntimeEnv->pQueryAttr;
tsdbCleanupQueryHandle(pRuntimeEnv->pQueryHandle); tsdbCleanupQueryHandle(pRuntimeEnv->pTsdbReadHandle);
pRuntimeEnv->pQueryHandle = NULL; pRuntimeEnv->pTsdbReadHandle = NULL;
SMemRef* pMemRef = &pQueryAttr->memRef; SMemRef* pMemRef = &pQueryAttr->memRef;
assert(pMemRef->ref == 0 && pMemRef->snapshot.imem == NULL && pMemRef->snapshot.mem == NULL); assert(pMemRef->ref == 0 && pMemRef->snapshot.imem == NULL && pMemRef->snapshot.mem == NULL);
...@@ -3148,10 +3148,10 @@ int32_t loadDataBlockOnDemand(SQueryRuntimeEnv* pRuntimeEnv, STableScanInfo* pTa ...@@ -3148,10 +3148,10 @@ int32_t loadDataBlockOnDemand(SQueryRuntimeEnv* pRuntimeEnv, STableScanInfo* pTa
} else if ((*status) == BLK_DATA_STATIS_NEEDED) { } else if ((*status) == BLK_DATA_STATIS_NEEDED) {
// this function never returns error? // this function never returns error?
pCost->loadBlockStatis += 1; pCost->loadBlockStatis += 1;
tsdbRetrieveDataBlockStatisInfo(pTableScanInfo->pQueryHandle, &pBlock->pBlockStatis); tsdbRetrieveDataBlockStatisInfo(pTableScanInfo->pTsdbReadHandle, &pBlock->pBlockStatis);
if (pBlock->pBlockStatis == NULL) { // data block statistics does not exist, load data block if (pBlock->pBlockStatis == NULL) { // data block statistics does not exist, load data block
pBlock->pDataBlock = tsdbRetrieveDataBlock(pTableScanInfo->pQueryHandle, NULL); pBlock->pDataBlock = tsdbRetrieveDataBlock(pTableScanInfo->pTsdbReadHandle, NULL);
pCost->totalCheckedRows += pBlock->info.rows; pCost->totalCheckedRows += pBlock->info.rows;
} }
} else { } else {
...@@ -3159,7 +3159,7 @@ int32_t loadDataBlockOnDemand(SQueryRuntimeEnv* pRuntimeEnv, STableScanInfo* pTa ...@@ -3159,7 +3159,7 @@ int32_t loadDataBlockOnDemand(SQueryRuntimeEnv* pRuntimeEnv, STableScanInfo* pTa
// load the data block statistics to perform further filter // load the data block statistics to perform further filter
pCost->loadBlockStatis += 1; pCost->loadBlockStatis += 1;
tsdbRetrieveDataBlockStatisInfo(pTableScanInfo->pQueryHandle, &pBlock->pBlockStatis); tsdbRetrieveDataBlockStatisInfo(pTableScanInfo->pTsdbReadHandle, &pBlock->pBlockStatis);
if (pQueryAttr->topBotQuery && pBlock->pBlockStatis != NULL) { if (pQueryAttr->topBotQuery && pBlock->pBlockStatis != NULL) {
{ // set previous window { // set previous window
...@@ -3205,7 +3205,7 @@ int32_t loadDataBlockOnDemand(SQueryRuntimeEnv* pRuntimeEnv, STableScanInfo* pTa ...@@ -3205,7 +3205,7 @@ int32_t loadDataBlockOnDemand(SQueryRuntimeEnv* pRuntimeEnv, STableScanInfo* pTa
pCost->totalCheckedRows += pBlockInfo->rows; pCost->totalCheckedRows += pBlockInfo->rows;
pCost->loadBlocks += 1; pCost->loadBlocks += 1;
pBlock->pDataBlock = tsdbRetrieveDataBlock(pTableScanInfo->pQueryHandle, NULL); pBlock->pDataBlock = tsdbRetrieveDataBlock(pTableScanInfo->pTsdbReadHandle, NULL);
if (pBlock->pDataBlock == NULL) { if (pBlock->pDataBlock == NULL) {
return terrno; return terrno;
} }
...@@ -4494,7 +4494,7 @@ void queryCostStatis(SQInfo *pQInfo) { ...@@ -4494,7 +4494,7 @@ void queryCostStatis(SQInfo *pQInfo) {
// //
// assert(pQueryAttr->pos >= 0 && pQueryAttr->pos <= pBlockInfo->rows - 1); // assert(pQueryAttr->pos >= 0 && pQueryAttr->pos <= pBlockInfo->rows - 1);
// //
// SArray * pDataBlock = tsdbRetrieveDataBlock(pRuntimeEnv->pQueryHandle, NULL); // SArray * pDataBlock = tsdbRetrieveDataBlock(pRuntimeEnv->pTsdbReadHandle, NULL);
// SColumnInfoData *pColInfoData = taosArrayGet(pDataBlock, 0); // SColumnInfoData *pColInfoData = taosArrayGet(pDataBlock, 0);
// //
// // update the pQueryAttr->limit.offset value, and pQueryAttr->pos value // // update the pQueryAttr->limit.offset value, and pQueryAttr->pos value
...@@ -4521,15 +4521,15 @@ void queryCostStatis(SQInfo *pQInfo) { ...@@ -4521,15 +4521,15 @@ void queryCostStatis(SQInfo *pQInfo) {
// int32_t step = GET_FORWARD_DIRECTION_FACTOR(pQueryAttr->order.order); // int32_t step = GET_FORWARD_DIRECTION_FACTOR(pQueryAttr->order.order);
// //
// STableQueryInfo* pTableQueryInfo = pRuntimeEnv->current; // STableQueryInfo* pTableQueryInfo = pRuntimeEnv->current;
// TsdbQueryHandleT pQueryHandle = pRuntimeEnv->pQueryHandle; // TsdbQueryHandleT pTsdbReadHandle = pRuntimeEnv->pTsdbReadHandle;
// //
// SDataBlockInfo blockInfo = SDATA_BLOCK_INITIALIZER; // SDataBlockInfo blockInfo = SDATA_BLOCK_INITIALIZER;
// while (tsdbNextDataBlock(pQueryHandle)) { // while (tsdbNextDataBlock(pTsdbReadHandle)) {
// if (isQueryKilled(pRuntimeEnv->qinfo)) { // if (isQueryKilled(pRuntimeEnv->qinfo)) {
// longjmp(pRuntimeEnv->env, TSDB_CODE_TSC_QUERY_CANCELLED); // longjmp(pRuntimeEnv->env, TSDB_CODE_TSC_QUERY_CANCELLED);
// } // }
// //
// tsdbRetrieveDataBlockInfo(pQueryHandle, &blockInfo); // tsdbRetrieveDataBlockInfo(pTsdbReadHandle, &blockInfo);
// //
// if (pQueryAttr->limit.offset > blockInfo.rows) { // if (pQueryAttr->limit.offset > blockInfo.rows) {
// pQueryAttr->limit.offset -= blockInfo.rows; // pQueryAttr->limit.offset -= blockInfo.rows;
...@@ -4562,7 +4562,7 @@ void queryCostStatis(SQInfo *pQInfo) { ...@@ -4562,7 +4562,7 @@ void queryCostStatis(SQInfo *pQInfo) {
// //
// // load the data block and check data remaining in current data block // // load the data block and check data remaining in current data block
// // TODO optimize performance // // TODO optimize performance
// SArray * pDataBlock = tsdbRetrieveDataBlock(pRuntimeEnv->pQueryHandle, NULL); // SArray * pDataBlock = tsdbRetrieveDataBlock(pRuntimeEnv->pTsdbReadHandle, NULL);
// SColumnInfoData *pColInfoData = taosArrayGet(pDataBlock, 0); // SColumnInfoData *pColInfoData = taosArrayGet(pDataBlock, 0);
// //
// tw = *win; // tw = *win;
...@@ -4627,8 +4627,8 @@ void queryCostStatis(SQInfo *pQInfo) { ...@@ -4627,8 +4627,8 @@ void queryCostStatis(SQInfo *pQInfo) {
// STableQueryInfo *pTableQueryInfo = pRuntimeEnv->current; // STableQueryInfo *pTableQueryInfo = pRuntimeEnv->current;
// //
// SDataBlockInfo blockInfo = SDATA_BLOCK_INITIALIZER; // SDataBlockInfo blockInfo = SDATA_BLOCK_INITIALIZER;
// while (tsdbNextDataBlock(pRuntimeEnv->pQueryHandle)) { // while (tsdbNextDataBlock(pRuntimeEnv->pTsdbReadHandle)) {
// tsdbRetrieveDataBlockInfo(pRuntimeEnv->pQueryHandle, &blockInfo); // tsdbRetrieveDataBlockInfo(pRuntimeEnv->pTsdbReadHandle, &blockInfo);
// //
// if (QUERY_IS_ASC_QUERY(pQueryAttr)) { // if (QUERY_IS_ASC_QUERY(pQueryAttr)) {
// if (pWindowResInfo->prevSKey == TSKEY_INITIAL_VAL) { // if (pWindowResInfo->prevSKey == TSKEY_INITIAL_VAL) {
...@@ -4674,7 +4674,7 @@ void queryCostStatis(SQInfo *pQInfo) { ...@@ -4674,7 +4674,7 @@ void queryCostStatis(SQInfo *pQInfo) {
// */ // */
// if ((tw.skey <= blockInfo.window.ekey && ascQuery) || (tw.ekey >= blockInfo.window.skey && !ascQuery)) { // if ((tw.skey <= blockInfo.window.ekey && ascQuery) || (tw.ekey >= blockInfo.window.skey && !ascQuery)) {
// //
// SArray *pDataBlock = tsdbRetrieveDataBlock(pRuntimeEnv->pQueryHandle, NULL); // SArray *pDataBlock = tsdbRetrieveDataBlock(pRuntimeEnv->pTsdbReadHandle, NULL);
// SColumnInfoData *pColInfoData = taosArrayGet(pDataBlock, 0); // SColumnInfoData *pColInfoData = taosArrayGet(pDataBlock, 0);
// //
// if ((win.ekey > blockInfo.window.ekey && ascQuery) || (win.ekey < blockInfo.window.skey && !ascQuery)) { // if ((win.ekey > blockInfo.window.ekey && ascQuery) || (win.ekey < blockInfo.window.skey && !ascQuery)) {
...@@ -4748,7 +4748,7 @@ static int32_t setupQueryHandle(void* tsdb, SQueryRuntimeEnv* pRuntimeEnv, int64 ...@@ -4748,7 +4748,7 @@ static int32_t setupQueryHandle(void* tsdb, SQueryRuntimeEnv* pRuntimeEnv, int64
terrno = TSDB_CODE_SUCCESS; terrno = TSDB_CODE_SUCCESS;
if (isFirstLastRowQuery(pQueryAttr)) { if (isFirstLastRowQuery(pQueryAttr)) {
pRuntimeEnv->pQueryHandle = tsdbQueryLastRow(tsdb, &cond, &pQueryAttr->tableGroupInfo, qId, &pQueryAttr->memRef); pRuntimeEnv->pTsdbReadHandle = tsdbQueryLastRow(tsdb, &cond, &pQueryAttr->tableGroupInfo, qId, &pQueryAttr->memRef);
// update the query time window // update the query time window
pQueryAttr->window = cond.twindow; pQueryAttr->window = cond.twindow;
...@@ -4769,11 +4769,11 @@ static int32_t setupQueryHandle(void* tsdb, SQueryRuntimeEnv* pRuntimeEnv, int64 ...@@ -4769,11 +4769,11 @@ static int32_t setupQueryHandle(void* tsdb, SQueryRuntimeEnv* pRuntimeEnv, int64
} }
} }
} else if (isCachedLastQuery(pQueryAttr)) { } else if (isCachedLastQuery(pQueryAttr)) {
pRuntimeEnv->pQueryHandle = tsdbQueryCacheLast(tsdb, &cond, &pQueryAttr->tableGroupInfo, qId, &pQueryAttr->memRef); pRuntimeEnv->pTsdbReadHandle = tsdbQueryCacheLast(tsdb, &cond, &pQueryAttr->tableGroupInfo, qId, &pQueryAttr->memRef);
} else if (pQueryAttr->pointInterpQuery) { } else if (pQueryAttr->pointInterpQuery) {
pRuntimeEnv->pQueryHandle = tsdbQueryRowsInExternalWindow(tsdb, &cond, &pQueryAttr->tableGroupInfo, qId, &pQueryAttr->memRef); pRuntimeEnv->pTsdbReadHandle = tsdbQueryRowsInExternalWindow(tsdb, &cond, &pQueryAttr->tableGroupInfo, qId, &pQueryAttr->memRef);
} else { } else {
pRuntimeEnv->pQueryHandle = tsdbQueryTables(tsdb, &cond, &pQueryAttr->tableGroupInfo, qId, &pQueryAttr->memRef); pRuntimeEnv->pTsdbReadHandle = tsdbQueryTables(tsdb, &cond, &pQueryAttr->tableGroupInfo, qId, &pQueryAttr->memRef);
} }
return terrno; return terrno;
...@@ -4831,19 +4831,19 @@ int32_t doInitQInfo(SQInfo* pQInfo, STSBuf* pTsBuf, void* tsdb, void* sourceOptr ...@@ -4831,19 +4831,19 @@ int32_t doInitQInfo(SQInfo* pQInfo, STSBuf* pTsBuf, void* tsdb, void* sourceOptr
switch(tbScanner) { switch(tbScanner) {
case OP_TableBlockInfoScan: { case OP_TableBlockInfoScan: {
pRuntimeEnv->proot = createTableBlockInfoScanOperator(pRuntimeEnv->pQueryHandle, pRuntimeEnv); pRuntimeEnv->proot = createTableBlockInfoScanOperator(pRuntimeEnv->pTsdbReadHandle, pRuntimeEnv);
break; break;
} }
case OP_TableSeqScan: { case OP_TableSeqScan: {
pRuntimeEnv->proot = createTableSeqScanOperator(pRuntimeEnv->pQueryHandle, pRuntimeEnv); pRuntimeEnv->proot = createTableSeqScanOperator(pRuntimeEnv->pTsdbReadHandle, pRuntimeEnv);
break; break;
} }
case OP_DataBlocksOptScan: { case OP_DataBlocksOptScan: {
pRuntimeEnv->proot = createDataBlocksOptScanInfo(pRuntimeEnv->pQueryHandle, pRuntimeEnv, getNumOfScanTimes(pQueryAttr), pQueryAttr->needReverseScan? 1:0); pRuntimeEnv->proot = createDataBlocksOptScanInfo(pRuntimeEnv->pTsdbReadHandle, pRuntimeEnv, getNumOfScanTimes(pQueryAttr), pQueryAttr->needReverseScan? 1:0);
break; break;
} }
case OP_TableScan: { case OP_TableScan: {
pRuntimeEnv->proot = createTableScanOperator(pRuntimeEnv->pQueryHandle, pRuntimeEnv, getNumOfScanTimes(pQueryAttr)); pRuntimeEnv->proot = createTableScanOperator(pRuntimeEnv->pTsdbReadHandle, pRuntimeEnv, getNumOfScanTimes(pQueryAttr));
break; break;
} }
default: { // do nothing default: { // do nothing
...@@ -4974,13 +4974,13 @@ static SSDataBlock* doTableScanImpl(void* param, bool* newgroup) { ...@@ -4974,13 +4974,13 @@ static SSDataBlock* doTableScanImpl(void* param, bool* newgroup) {
*newgroup = false; *newgroup = false;
while (tsdbNextDataBlock(pTableScanInfo->pQueryHandle)) { while (tsdbNextDataBlock(pTableScanInfo->pTsdbReadHandle)) {
if (isQueryKilled(pOperator->pRuntimeEnv->qinfo)) { if (isQueryKilled(pOperator->pRuntimeEnv->qinfo)) {
longjmp(pOperator->pRuntimeEnv->env, TSDB_CODE_TSC_QUERY_CANCELLED); longjmp(pOperator->pRuntimeEnv->env, TSDB_CODE_TSC_QUERY_CANCELLED);
} }
pTableScanInfo->numOfBlocks += 1; pTableScanInfo->numOfBlocks += 1;
tsdbRetrieveDataBlockInfo(pTableScanInfo->pQueryHandle, &pBlock->info); tsdbRetrieveDataBlockInfo(pTableScanInfo->pTsdbReadHandle, &pBlock->info);
// todo opt // todo opt
if (pTableGroupInfo->numOfTables > 1 || (pRuntimeEnv->current == NULL && pTableGroupInfo->numOfTables == 1)) { if (pTableGroupInfo->numOfTables > 1 || (pRuntimeEnv->current == NULL && pTableGroupInfo->numOfTables == 1)) {
...@@ -5037,7 +5037,7 @@ static SSDataBlock* doTableScan(void* param, bool *newgroup) { ...@@ -5037,7 +5037,7 @@ static SSDataBlock* doTableScan(void* param, bool *newgroup) {
} }
if (++pTableScanInfo->current >= pTableScanInfo->times) { if (++pTableScanInfo->current >= pTableScanInfo->times) {
if (pTableScanInfo->reverseTimes <= 0 || isTsdbCacheLastRow(pTableScanInfo->pQueryHandle)) { if (pTableScanInfo->reverseTimes <= 0 || isTsdbCacheLastRow(pTableScanInfo->pTsdbReadHandle)) {
return NULL; return NULL;
} else { } else {
break; break;
...@@ -5046,7 +5046,7 @@ static SSDataBlock* doTableScan(void* param, bool *newgroup) { ...@@ -5046,7 +5046,7 @@ static SSDataBlock* doTableScan(void* param, bool *newgroup) {
// do prepare for the next round table scan operation // do prepare for the next round table scan operation
STsdbQueryCond cond = createTsdbQueryCond(pQueryAttr, &pQueryAttr->window); STsdbQueryCond cond = createTsdbQueryCond(pQueryAttr, &pQueryAttr->window);
tsdbResetQueryHandle(pTableScanInfo->pQueryHandle, &cond); tsdbResetQueryHandle(pTableScanInfo->pTsdbReadHandle, &cond);
setQueryStatus(pRuntimeEnv, QUERY_NOT_COMPLETED); setQueryStatus(pRuntimeEnv, QUERY_NOT_COMPLETED);
pRuntimeEnv->scanFlag = REPEAT_SCAN; pRuntimeEnv->scanFlag = REPEAT_SCAN;
...@@ -5069,7 +5069,7 @@ static SSDataBlock* doTableScan(void* param, bool *newgroup) { ...@@ -5069,7 +5069,7 @@ static SSDataBlock* doTableScan(void* param, bool *newgroup) {
setupEnvForReverseScan(pRuntimeEnv, pTableScanInfo->pResultRowInfo, pTableScanInfo->pCtx, pTableScanInfo->numOfOutput); setupEnvForReverseScan(pRuntimeEnv, pTableScanInfo->pResultRowInfo, pTableScanInfo->pCtx, pTableScanInfo->numOfOutput);
STsdbQueryCond cond = createTsdbQueryCond(pQueryAttr, &pQueryAttr->window); STsdbQueryCond cond = createTsdbQueryCond(pQueryAttr, &pQueryAttr->window);
tsdbResetQueryHandle(pTableScanInfo->pQueryHandle, &cond); tsdbResetQueryHandle(pTableScanInfo->pTsdbReadHandle, &cond);
qDebug("QInfo:0x%"PRIx64" start to reverse scan data blocks due to query func required, qrange:%" PRId64 "-%" PRId64, qDebug("QInfo:0x%"PRIx64" start to reverse scan data blocks due to query func required, qrange:%" PRId64 "-%" PRId64,
GET_QID(pRuntimeEnv), cond.twindow.skey, cond.twindow.ekey); GET_QID(pRuntimeEnv), cond.twindow.skey, cond.twindow.ekey);
...@@ -5112,8 +5112,8 @@ static SSDataBlock* doBlockInfoScan(void* param, bool* newgroup) { ...@@ -5112,8 +5112,8 @@ static SSDataBlock* doBlockInfoScan(void* param, bool* newgroup) {
tableBlockDist.maxRows = INT_MIN; tableBlockDist.maxRows = INT_MIN;
tableBlockDist.minRows = INT_MAX; tableBlockDist.minRows = INT_MAX;
tsdbGetFileBlocksDistInfo(pTableScanInfo->pQueryHandle, &tableBlockDist); tsdbGetFileBlocksDistInfo(pTableScanInfo->pTsdbReadHandle, &tableBlockDist);
tableBlockDist.numOfRowsInMemTable = (int32_t) tsdbGetNumOfRowsInMemTable(pTableScanInfo->pQueryHandle); tableBlockDist.numOfRowsInMemTable = (int32_t) tsdbGetNumOfRowsInMemTable(pTableScanInfo->pTsdbReadHandle);
SSDataBlock* pBlock = &pTableScanInfo->block; SSDataBlock* pBlock = &pTableScanInfo->block;
pBlock->info.rows = 1; pBlock->info.rows = 1;
...@@ -5142,7 +5142,7 @@ SOperatorInfo* createTableScanOperator(void* pTsdbQueryHandle, SQueryRuntimeEnv* ...@@ -5142,7 +5142,7 @@ SOperatorInfo* createTableScanOperator(void* pTsdbQueryHandle, SQueryRuntimeEnv*
assert(repeatTime > 0); assert(repeatTime > 0);
STableScanInfo* pInfo = calloc(1, sizeof(STableScanInfo)); STableScanInfo* pInfo = calloc(1, sizeof(STableScanInfo));
pInfo->pQueryHandle = pTsdbQueryHandle; pInfo->pTsdbReadHandle = pTsdbQueryHandle;
pInfo->times = repeatTime; pInfo->times = repeatTime;
pInfo->reverseTimes = 0; pInfo->reverseTimes = 0;
pInfo->order = pRuntimeEnv->pQueryAttr->order.order; pInfo->order = pRuntimeEnv->pQueryAttr->order.order;
...@@ -5165,7 +5165,7 @@ SOperatorInfo* createTableScanOperator(void* pTsdbQueryHandle, SQueryRuntimeEnv* ...@@ -5165,7 +5165,7 @@ SOperatorInfo* createTableScanOperator(void* pTsdbQueryHandle, SQueryRuntimeEnv*
SOperatorInfo* createTableSeqScanOperator(void* pTsdbQueryHandle, SQueryRuntimeEnv* pRuntimeEnv) { SOperatorInfo* createTableSeqScanOperator(void* pTsdbQueryHandle, SQueryRuntimeEnv* pRuntimeEnv) {
STableScanInfo* pInfo = calloc(1, sizeof(STableScanInfo)); STableScanInfo* pInfo = calloc(1, sizeof(STableScanInfo));
pInfo->pQueryHandle = pTsdbQueryHandle; pInfo->pTsdbReadHandle = pTsdbQueryHandle;
pInfo->times = 1; pInfo->times = 1;
pInfo->reverseTimes = 0; pInfo->reverseTimes = 0;
pInfo->order = pRuntimeEnv->pQueryAttr->order.order; pInfo->order = pRuntimeEnv->pQueryAttr->order.order;
...@@ -5189,7 +5189,7 @@ SOperatorInfo* createTableSeqScanOperator(void* pTsdbQueryHandle, SQueryRuntimeE ...@@ -5189,7 +5189,7 @@ SOperatorInfo* createTableSeqScanOperator(void* pTsdbQueryHandle, SQueryRuntimeE
SOperatorInfo* createTableBlockInfoScanOperator(void* pTsdbQueryHandle, SQueryRuntimeEnv* pRuntimeEnv) { SOperatorInfo* createTableBlockInfoScanOperator(void* pTsdbQueryHandle, SQueryRuntimeEnv* pRuntimeEnv) {
STableScanInfo* pInfo = calloc(1, sizeof(STableScanInfo)); STableScanInfo* pInfo = calloc(1, sizeof(STableScanInfo));
pInfo->pQueryHandle = pTsdbQueryHandle; pInfo->pTsdbReadHandle = pTsdbQueryHandle;
pInfo->block.pDataBlock = taosArrayInit(1, sizeof(SColumnInfoData)); pInfo->block.pDataBlock = taosArrayInit(1, sizeof(SColumnInfoData));
SColumnInfoData infoData = {{0}}; SColumnInfoData infoData = {{0}};
...@@ -5271,7 +5271,7 @@ SOperatorInfo* createDataBlocksOptScanInfo(void* pTsdbQueryHandle, SQueryRuntime ...@@ -5271,7 +5271,7 @@ SOperatorInfo* createDataBlocksOptScanInfo(void* pTsdbQueryHandle, SQueryRuntime
assert(repeatTime > 0); assert(repeatTime > 0);
STableScanInfo* pInfo = calloc(1, sizeof(STableScanInfo)); STableScanInfo* pInfo = calloc(1, sizeof(STableScanInfo));
pInfo->pQueryHandle = pTsdbQueryHandle; pInfo->pTsdbReadHandle = pTsdbQueryHandle;
pInfo->times = repeatTime; pInfo->times = repeatTime;
pInfo->reverseTimes = reverseTime; pInfo->reverseTimes = reverseTime;
pInfo->current = 0; pInfo->current = 0;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册