提交 bdbb9848 编写于 作者: H Haojun Liao

[td-13039] add new api.

上级 b771748e
...@@ -54,6 +54,16 @@ qTaskInfo_t qCreateStreamExecTaskInfo(void* msg, void* streamReadHandle); ...@@ -54,6 +54,16 @@ qTaskInfo_t qCreateStreamExecTaskInfo(void* msg, void* streamReadHandle);
*/ */
int32_t qSetStreamInput(qTaskInfo_t tinfo, const void* input, int32_t type); int32_t qSetStreamInput(qTaskInfo_t tinfo, const void* input, int32_t type);
/**
* Set multiple input data blocks for the stream scan.
* @param tinfo
* @param pBlocks
* @param numOfInputBlock
* @param type
* @return
*/
int32_t qSetMultiStreamInput(qTaskInfo_t tinfo, void** pBlocks, size_t numOfBlocks, int32_t type);
/** /**
* Update the table id list, add or remove. * Update the table id list, add or remove.
* *
...@@ -86,16 +96,6 @@ int32_t qCreateExecTask(SReadHandle* readHandle, int32_t vgId, uint64_t taskId, ...@@ -86,16 +96,6 @@ int32_t qCreateExecTask(SReadHandle* readHandle, int32_t vgId, uint64_t taskId,
*/ */
int32_t qExecTask(qTaskInfo_t tinfo, SSDataBlock** pRes, uint64_t* useconds); int32_t qExecTask(qTaskInfo_t tinfo, SSDataBlock** pRes, uint64_t* useconds);
/**
* Retrieve the produced results information, if current query is not paused or completed,
* this function will be blocked to wait for the query execution completed or paused,
* in which case enough results have been produced already.
*
* @param tinfo
* @return
*/
int32_t qRetrieveQueryResultInfo(qTaskInfo_t tinfo, bool* buildRes, void* pRspContext);
/** /**
* kill the ongoing query and free the query handle and corresponding resources automatically * kill the ongoing query and free the query handle and corresponding resources automatically
* @param tinfo qhandle * @param tinfo qhandle
...@@ -158,50 +158,6 @@ int32_t qGetQualifiedTableIdList(void* pTableList, const char* tagCond, int32_t ...@@ -158,50 +158,6 @@ int32_t qGetQualifiedTableIdList(void* pTableList, const char* tagCond, int32_t
*/ */
int32_t qUpdateQueriedTableIdList(qTaskInfo_t tinfo, int64_t uid, int32_t type); int32_t qUpdateQueriedTableIdList(qTaskInfo_t tinfo, int64_t uid, int32_t type);
//================================================================================================
// query handle management
/**
* Query handle mgmt object
* @param vgId
* @return
*/
void* qOpenTaskMgmt(int32_t vgId);
/**
* broadcast the close information and wait for all query stop.
* @param pExecutor
*/
void qTaskMgmtNotifyClosing(void* pExecutor);
/**
* Re-open the query handle management module when opening the vnode again.
* @param pExecutor
*/
void qQueryMgmtReOpen(void* pExecutor);
/**
* Close query mgmt and clean up resources.
* @param pExecutor
*/
void qCleanupTaskMgmt(void* pExecutor);
/**
* Add the query into the query mgmt object
* @param pMgmt
* @param qId
* @param qInfo
* @return
*/
void** qRegisterTask(void* pMgmt, uint64_t qId, void* qInfo);
/**
* acquire the query handle according to the key from query mgmt object.
* @param pMgmt
* @param key
* @return
*/
void** qAcquireTask(void* pMgmt, uint64_t key);
/** /**
* release the query handle and decrease the reference count in cache * release the query handle and decrease the reference count in cache
* @param pMgmt * @param pMgmt
...@@ -211,13 +167,6 @@ void** qAcquireTask(void* pMgmt, uint64_t key); ...@@ -211,13 +167,6 @@ void** qAcquireTask(void* pMgmt, uint64_t key);
*/ */
void** qReleaseTask(void* pMgmt, void* pQInfo, bool freeHandle); void** qReleaseTask(void* pMgmt, void* pQInfo, bool freeHandle);
/**
* De-register the query handle from the management module and free it immediately.
* @param pMgmt
* @param pQInfo
* @return
*/
void** qDeregisterQInfo(void* pMgmt, void* pQInfo);
void qProcessFetchRsp(void* parent, struct SRpcMsg* pMsg, struct SEpSet* pEpSet); void qProcessFetchRsp(void* parent, struct SRpcMsg* pMsg, struct SEpSet* pEpSet);
......
...@@ -430,9 +430,10 @@ typedef struct STagScanInfo { ...@@ -430,9 +430,10 @@ typedef struct STagScanInfo {
} STagScanInfo; } STagScanInfo;
typedef struct SStreamBlockScanInfo { typedef struct SStreamBlockScanInfo {
SArray* pBlockLists; // multiple SSDatablock.
SSDataBlock* pRes; // result SSDataBlock SSDataBlock* pRes; // result SSDataBlock
int32_t blockType; // current block type int32_t blockType; // current block type
bool blockValid; // Is current data has returned? int32_t validBlockIndex; // Is current data has returned?
SColumnInfo* pCols; // the output column info SColumnInfo* pCols; // the output column info
uint64_t numOfRows; // total scanned rows uint64_t numOfRows; // total scanned rows
uint64_t numOfExec; // execution times uint64_t numOfExec; // execution times
......
...@@ -14,11 +14,12 @@ ...@@ -14,11 +14,12 @@
*/ */
#include "executor.h" #include "executor.h"
#include "tdatablock.h"
#include "executorimpl.h" #include "executorimpl.h"
#include "planner.h" #include "planner.h"
#include "vnode.h" #include "vnode.h"
static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void* input, int32_t type, char* id) { static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void** input, size_t numOfBlocks, int32_t type, char* id) {
ASSERT(pOperator != NULL); ASSERT(pOperator != NULL);
if (pOperator->operatorType != QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) { if (pOperator->operatorType != QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) {
if (pOperator->numOfDownstream == 0) { if (pOperator->numOfDownstream == 0) {
...@@ -31,7 +32,7 @@ static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void* input, int32_t t ...@@ -31,7 +32,7 @@ static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void* input, int32_t t
return TSDB_CODE_QRY_APP_ERROR; return TSDB_CODE_QRY_APP_ERROR;
} }
pOperator->status = OP_NOT_OPENED; pOperator->status = OP_NOT_OPENED;
return doSetStreamBlock(pOperator->pDownstream[0], input, type, id); return doSetStreamBlock(pOperator->pDownstream[0], input, numOfBlocks, type, id);
} else { } else {
SStreamBlockScanInfo* pInfo = pOperator->info; SStreamBlockScanInfo* pInfo = pOperator->info;
...@@ -43,20 +44,20 @@ static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void* input, int32_t t ...@@ -43,20 +44,20 @@ static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void* input, int32_t t
} }
if (type == STREAM_DATA_TYPE_SUBMIT_BLOCK) { if (type == STREAM_DATA_TYPE_SUBMIT_BLOCK) {
if (tqReadHandleSetMsg(pInfo->readerHandle, input, 0) < 0) { if (tqReadHandleSetMsg(pInfo->readerHandle, input[0], 0) < 0) {
qError("submit msg messed up when initing stream block, %s" PRIx64, id); qError("submit msg messed up when initing stream block, %s" PRIx64, id);
return TSDB_CODE_QRY_APP_ERROR; return TSDB_CODE_QRY_APP_ERROR;
} }
} else { } else {
ASSERT(!pInfo->blockValid); for (int32_t i = 0; i < numOfBlocks; ++i) {
SSDataBlock* pDataBlock = input[i];
SSDataBlock* pDataBlock = input; SSDataBlock* p = createOneDataBlock(pDataBlock);
pInfo->pRes->info = pDataBlock->info; p->info = pDataBlock->info;
taosArrayClear(pInfo->pRes->pDataBlock);
taosArrayAddAll(pInfo->pRes->pDataBlock, pDataBlock->pDataBlock);
// set current block valid. taosArrayAddAll(p->pDataBlock, pDataBlock->pDataBlock);
pInfo->blockValid = true; taosArrayPush(pInfo->pBlockLists, &p);
}
} }
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
...@@ -64,17 +65,21 @@ static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void* input, int32_t t ...@@ -64,17 +65,21 @@ static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void* input, int32_t t
} }
int32_t qSetStreamInput(qTaskInfo_t tinfo, const void* input, int32_t type) { int32_t qSetStreamInput(qTaskInfo_t tinfo, const void* input, int32_t type) {
qSetMultiStreamInput(tinfo, (void**) &input, 1, type);
}
int32_t qSetMultiStreamInput(qTaskInfo_t tinfo, void** pBlocks, size_t numOfBlocks, int32_t type) {
if (tinfo == NULL) { if (tinfo == NULL) {
return TSDB_CODE_QRY_APP_ERROR; return TSDB_CODE_QRY_APP_ERROR;
} }
if (input == NULL) { if (pBlocks == NULL || numOfBlocks == 0) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo; SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
int32_t code = doSetStreamBlock(pTaskInfo->pRoot, (void*)input, type, GET_TASKID(pTaskInfo)); int32_t code = doSetStreamBlock(pTaskInfo->pRoot, (void**)pBlocks, numOfBlocks, type, GET_TASKID(pTaskInfo));
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
qError("%s failed to set the stream block data", GET_TASKID(pTaskInfo)); qError("%s failed to set the stream block data", GET_TASKID(pTaskInfo));
} else { } else {
......
...@@ -4723,6 +4723,17 @@ static SSDataBlock* doBlockInfoScan(SOperatorInfo *pOperator, bool* newgroup) { ...@@ -4723,6 +4723,17 @@ static SSDataBlock* doBlockInfoScan(SOperatorInfo *pOperator, bool* newgroup) {
#endif #endif
} }
static void doClearBufferedBlocks(SStreamBlockScanInfo* pInfo) {
size_t total = taosArrayGetSize(pInfo->pBlockLists);
pInfo->validBlockIndex = 0;
for(int32_t i = 0; i < total; ++i) {
SSDataBlock* p = taosArrayGet(pInfo->pBlockLists, i);
blockDataDestroy(p);
}
taosArrayClear(pInfo->pBlockLists);
}
static SSDataBlock* doStreamBlockScan(SOperatorInfo *pOperator, bool* newgroup) { static SSDataBlock* doStreamBlockScan(SOperatorInfo *pOperator, bool* newgroup) {
// NOTE: this operator does never check if current status is done or not // NOTE: this operator does never check if current status is done or not
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
...@@ -4735,43 +4746,45 @@ static SSDataBlock* doStreamBlockScan(SOperatorInfo *pOperator, bool* newgroup) ...@@ -4735,43 +4746,45 @@ static SSDataBlock* doStreamBlockScan(SOperatorInfo *pOperator, bool* newgroup)
} }
if (pInfo->blockType == STREAM_DATA_TYPE_SSDATA_BLOCK) { if (pInfo->blockType == STREAM_DATA_TYPE_SSDATA_BLOCK) {
if (pInfo->blockValid) { size_t total = taosArrayGetSize(pInfo->pBlockLists);
pInfo->blockValid = false; // this block can only be used once. if (pInfo->validBlockIndex >= total) {
return pInfo->pRes; doClearBufferedBlocks(pInfo);
} else {
return NULL; return NULL;
} }
}
SDataBlockInfo* pBlockInfo = &pInfo->pRes->info; int32_t current = pInfo->validBlockIndex++;
blockDataCleanup(pInfo->pRes); return taosArrayGet(pInfo->pBlockLists, current);
} else {
SDataBlockInfo* pBlockInfo = &pInfo->pRes->info;
blockDataCleanup(pInfo->pRes);
while (tqNextDataBlock(pInfo->readerHandle)) { while (tqNextDataBlock(pInfo->readerHandle)) {
pTaskInfo->code = tqRetrieveDataBlockInfo(pInfo->readerHandle, pBlockInfo); pTaskInfo->code = tqRetrieveDataBlockInfo(pInfo->readerHandle, pBlockInfo);
if (pTaskInfo->code != TSDB_CODE_SUCCESS) { if (pTaskInfo->code != TSDB_CODE_SUCCESS) {
terrno = pTaskInfo->code; terrno = pTaskInfo->code;
return NULL; return NULL;
} }
if (pBlockInfo->rows == 0) { if (pBlockInfo->rows == 0) {
return NULL; return NULL;
} }
pInfo->pRes->pDataBlock = tqRetrieveDataBlock(pInfo->readerHandle); pInfo->pRes->pDataBlock = tqRetrieveDataBlock(pInfo->readerHandle);
if (pInfo->pRes->pDataBlock == NULL) { if (pInfo->pRes->pDataBlock == NULL) {
// TODO add log // TODO add log
pTaskInfo->code = terrno; pTaskInfo->code = terrno;
return NULL; return NULL;
} }
break; break;
} }
// record the scan action. // record the scan action.
pInfo->numOfExec++; pInfo->numOfExec++;
pInfo->numOfRows += pBlockInfo->rows; pInfo->numOfRows += pBlockInfo->rows;
return (pBlockInfo->rows == 0)? NULL:pInfo->pRes; return (pBlockInfo->rows == 0) ? NULL : pInfo->pRes;
}
} }
int32_t loadRemoteDataCallback(void* param, const SDataBuf* pMsg, int32_t code) { int32_t loadRemoteDataCallback(void* param, const SDataBuf* pMsg, int32_t code) {
...@@ -5408,6 +5421,13 @@ SOperatorInfo* createStreamScanOperatorInfo(void *streamReadHandle, SSDataBlock* ...@@ -5408,6 +5421,13 @@ SOperatorInfo* createStreamScanOperatorInfo(void *streamReadHandle, SSDataBlock*
return NULL; return NULL;
} }
pInfo->pBlockLists = taosArrayInit(4, POINTER_BYTES);
if (pInfo->pBlockLists == NULL) {
taosMemoryFreeClear(pInfo);
taosMemoryFreeClear(pOperator);
return NULL;
}
pInfo->readerHandle = streamReadHandle; pInfo->readerHandle = streamReadHandle;
pInfo->pRes = pResBlock; pInfo->pRes = pResBlock;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册