提交 9a0e9df5 编写于 作者: S shenglian zhou

enhance: block is ts sorted and each book is a source

上级 f93af4d2
...@@ -53,6 +53,7 @@ typedef struct STableMergeScanSortSourceParam { ...@@ -53,6 +53,7 @@ typedef struct STableMergeScanSortSourceParam {
SOperatorInfo* pOperator; SOperatorInfo* pOperator;
int32_t readerIdx; int32_t readerIdx;
uint64_t uid; uint64_t uid;
STsdbReader* reader;
} STableMergeScanSortSourceParam; } STableMergeScanSortSourceParam;
typedef struct STableCountScanOperatorInfo { typedef struct STableCountScanOperatorInfo {
...@@ -2733,28 +2734,12 @@ static SSDataBlock* getTableDataBlockImpl(void* param) { ...@@ -2733,28 +2734,12 @@ static SSDataBlock* getTableDataBlockImpl(void* param) {
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
SStorageAPI* pAPI = &pTaskInfo->storageAPI; SStorageAPI* pAPI = &pTaskInfo->storageAPI;
int32_t readIdx = source->readerIdx;
SSDataBlock* pBlock = pInfo->pReaderBlock; SSDataBlock* pBlock = pInfo->pReaderBlock;
int32_t code = 0; int32_t code = 0;
int64_t st = taosGetTimestampUs(); int64_t st = taosGetTimestampUs();
void* p = tableListGetInfo(pInfo->base.pTableListInfo, readIdx + pInfo->tableStartIndex);
SReadHandle* pHandle = &pInfo->base.readHandle;
bool hasNext = false; bool hasNext = false;
if (NULL == pInfo->base.dataReader) {
code = pAPI->tsdReader.tsdReaderOpen(pHandle->vnode, &pInfo->base.cond, p, 1, pBlock, (void**)&pInfo->base.dataReader, GET_TASKID(pTaskInfo), false, NULL);
if (code != 0) {
T_LONG_JMP(pTaskInfo->env, code);
}
pInfo->readIdx = readIdx + pInfo->tableStartIndex ;
} else if (pInfo->readIdx != readIdx + pInfo->tableStartIndex) {
pAPI->tsdReader.tsdSetQueryTableList(pInfo->base.dataReader, p, 1);
pAPI->tsdReader.tsdReaderResetStatus(pInfo->base.dataReader, &pInfo->base.cond);
pInfo->readIdx = readIdx + pInfo->tableStartIndex ;
}
STsdbReader* reader = pInfo->base.dataReader; STsdbReader* reader = pInfo->base.dataReader;
while (true) { while (true) {
code = pAPI->tsdReader.tsdNextDataBlock(reader, &hasNext); code = pAPI->tsdReader.tsdNextDataBlock(reader, &hasNext);
...@@ -2837,6 +2822,8 @@ int32_t dumpQueryTableCond(const SQueryTableDataCond* src, SQueryTableDataCond* ...@@ -2837,6 +2822,8 @@ int32_t dumpQueryTableCond(const SQueryTableDataCond* src, SQueryTableDataCond*
int32_t startGroupTableMergeScan(SOperatorInfo* pOperator) { int32_t startGroupTableMergeScan(SOperatorInfo* pOperator) {
STableMergeScanInfo* pInfo = pOperator->info; STableMergeScanInfo* pInfo = pOperator->info;
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
SReadHandle* pHandle = &pInfo->base.readHandle;
SStorageAPI* pAPI = &pTaskInfo->storageAPI;
{ {
size_t numOfTables = tableListGetSize(pInfo->base.pTableListInfo); size_t numOfTables = tableListGetSize(pInfo->base.pTableListInfo);
...@@ -2866,21 +2853,15 @@ int32_t startGroupTableMergeScan(SOperatorInfo* pOperator) { ...@@ -2866,21 +2853,15 @@ int32_t startGroupTableMergeScan(SOperatorInfo* pOperator) {
// one table has one data block // one table has one data block
int32_t numOfTable = tableEndIdx - tableStartIdx + 1; int32_t numOfTable = tableEndIdx - tableStartIdx + 1;
for (int32_t i = 0; i < numOfTable; ++i) {
STableMergeScanSortSourceParam param = {0}; STableMergeScanSortSourceParam param = {0};
param.readerIdx = i;
param.pOperator = pOperator; param.pOperator = pOperator;
STableKeyInfo* startKeyInfo = tableListGetInfo(pInfo->base.pTableListInfo, tableStartIdx);
pAPI->tsdReader.tsdReaderOpen(pHandle->vnode, &pInfo->base.cond, startKeyInfo, numOfTable, pInfo->pReaderBlock, (void**)&pInfo->base.dataReader, GET_TASKID(pTaskInfo), false, NULL);
taosArrayPush(pInfo->sortSourceParams, &param);
}
for (int32_t i = 0; i < numOfTable; ++i) {
SSortSource* ps = taosMemoryCalloc(1, sizeof(SSortSource)); SSortSource* ps = taosMemoryCalloc(1, sizeof(SSortSource));
STableMergeScanSortSourceParam* param = taosArrayGet(pInfo->sortSourceParams, i); ps->param = &param;
ps->param = param;
ps->onlyRef = true; ps->onlyRef = true;
tsortAddSource(pInfo->pSortHandle, ps); tsortAddSource(pInfo->pSortHandle, ps);
}
int32_t code = tsortOpen(pInfo->pSortHandle); int32_t code = tsortOpen(pInfo->pSortHandle);
...@@ -2903,7 +2884,10 @@ int32_t stopGroupTableMergeScan(SOperatorInfo* pOperator) { ...@@ -2903,7 +2884,10 @@ int32_t stopGroupTableMergeScan(SOperatorInfo* pOperator) {
pInfo->sortExecInfo.readBytes += sortExecInfo.readBytes; pInfo->sortExecInfo.readBytes += sortExecInfo.readBytes;
pInfo->sortExecInfo.writeBytes += sortExecInfo.writeBytes; pInfo->sortExecInfo.writeBytes += sortExecInfo.writeBytes;
taosArrayClear(pInfo->sortSourceParams); if (pInfo->base.dataReader != NULL) {
pAPI->tsdReader.tsdReaderClose(pInfo->base.dataReader);
pInfo->base.dataReader = NULL;
}
tsortDestroySortHandle(pInfo->pSortHandle); tsortDestroySortHandle(pInfo->pSortHandle);
pInfo->pSortHandle = NULL; pInfo->pSortHandle = NULL;
......
...@@ -937,22 +937,20 @@ static int32_t createInitialSources(SSortHandle* pHandle) { ...@@ -937,22 +937,20 @@ static int32_t createInitialSources(SSortHandle* pHandle) {
// pHandle->numOfPages = 1024; //todo check sortbufsize // pHandle->numOfPages = 1024; //todo check sortbufsize
createPageBuf(pHandle); createPageBuf(pHandle);
for (int i = 0; i < nSrc; ++i) { SSortSource* pSrc = taosArrayGetP(pHandle->pOrderedSource, 0);
SArray* aPgId = taosArrayInit(8, sizeof(int32_t));
SSortSource* pSrc = taosArrayGetP(pHandle->pOrderedSource, i);
SSDataBlock* pBlk = pHandle->fetchfp(pSrc->param); SSDataBlock* pBlk = pHandle->fetchfp(pSrc->param);
while (pBlk != NULL) { while (pBlk != NULL) {
SArray* aPgId = taosArrayInit(8, sizeof(int32_t));
addDataBlockToPageBuf(pHandle, pBlk, aPgId); addDataBlockToPageBuf(pHandle, pBlk, aPgId);
pBlk = pHandle->fetchfp(pSrc->param); SSDataBlock* pMemSrcBlk = createOneDataBlock(pHandle->pDataBlock, false);
} code = doAddNewExternalMemSource(pHandle->pBuf, aExtSrc, pMemSrcBlk, &pHandle->sourceId, aPgId);
SSDataBlock* pBlock = createOneDataBlock(pHandle->pDataBlock, false);
code = doAddNewExternalMemSource(pHandle->pBuf, aExtSrc, pBlock, &pHandle->sourceId, aPgId);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
taosArrayDestroy(aExtSrc); taosArrayDestroy(aExtSrc);
return code; return code;
} }
pBlk = pHandle->fetchfp(pSrc->param);
} }
tsortClearOrderdSource(pHandle->pOrderedSource, NULL, NULL); tsortClearOrderdSource(pHandle->pOrderedSource, NULL, NULL);
taosArrayAddAll(pHandle->pOrderedSource, aExtSrc); taosArrayAddAll(pHandle->pOrderedSource, aExtSrc);
taosArrayDestroy(aExtSrc); taosArrayDestroy(aExtSrc);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册