提交 335cb651 编写于 作者: H Haojun Liao

other:merge 3.0

上级 9ccf7add
......@@ -671,6 +671,7 @@ static int32_t tdRSmaExecAndSubmitResult(SSma *pSma, qTaskInfo_t taskInfo, SRSma
while (1) {
uint64_t ts;
bool hasMore = false;
// ASSERT(0);
int32_t code = qExecTaskOpt(taskInfo, pResList, &ts, &hasMore, NULL);
if (code < 0) {
if (code == TSDB_CODE_QRY_IN_EXEC) {
......
......@@ -346,6 +346,8 @@ typedef struct STableMergeScanInfo {
bool hasGroupId;
uint64_t groupId;
SArray* dataReaders; // array of tsdbReaderT*
SArray* queryConds; // array of queryTableDataCond
STsdbReader* pReader;
SReadHandle readHandle;
int32_t bufPageSize;
uint32_t sortBufSize; // max buffer size for in-memory sort
......@@ -371,7 +373,6 @@ typedef struct STableMergeScanInfo {
SQueryTableDataCond cond;
int32_t scanFlag; // table scan flag to denote if it is a repeat/reverse/main scan
int32_t dataBlockLoadFlag;
STsdbReader* pReader;
// if the upstream is an interval operator, the interval info is also kept here to get the time
// window to check if current data block needs to be loaded.
......
......@@ -1681,9 +1681,14 @@ int32_t addTableIntoTableList(STableListInfo* pTableList, uint64_t uid, uint64_t
STableKeyInfo keyInfo = {.uid = uid, .groupId = gid};
taosArrayPush(pTableList->pTableList, &keyInfo);
if (pTableList->oneTableForEachGroup || pTableList->numOfOuputGroups > 1) {
if (!pTableList->oneTableForEachGroup) {
if (pTableList->map == NULL) {
pTableList->map = taosHashInit(32, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK);
}
taosHashPut(pTableList->map, &uid, sizeof(uid), &keyInfo.groupId, sizeof(keyInfo.groupId));
}
return TSDB_CODE_SUCCESS;
}
......
......@@ -624,11 +624,6 @@ static SSDataBlock* doTableScanImpl(SOperatorInfo* pOperator) {
ASSERT(binfo.uid != 0);
pBlock->info.groupId = getTableGroupId(&pTaskInfo->tableqinfoList, pBlock->info.uid);
ASSERT(pBlock->info.groupId != 0);
// uint64_t* groupId = taosHashGet(pTaskInfo->tableqinfoList.map, &pBlock->info.uid, sizeof(int64_t));
// if (groupId) {
// pBlock->info.groupId = *groupId;
// }
uint32_t status = 0;
int32_t code = loadDataBlock(pOperator, pTableScanInfo, pBlock, &status);
......@@ -1570,6 +1565,7 @@ static int32_t setBlockIntoRes(SStreamScanInfo* pInfo, const SSDataBlock* pBlock
if (groupIdPre) {
pInfo->pRes->info.groupId = *groupIdPre;
} else {
ASSERT(0);
pInfo->pRes->info.groupId = 0;
}
......@@ -4438,6 +4434,7 @@ static int32_t loadDataBlockFromOneTable(SOperatorInfo* pOperator, STableMergeSc
typedef struct STableMergeScanSortSourceParam {
SOperatorInfo* pOperator;
int32_t readerIdx;
int64_t uid;
SSDataBlock* inputBlock;
} STableMergeScanSortSourceParam;
......@@ -4514,8 +4511,10 @@ static SSDataBlock* getTableDataBlockTemp(void* param) {
}
tsdbReaderClose(pInfo->pReader);
pInfo->pReader = NULL;
return NULL;
}
static SSDataBlock* getTableDataBlock2(void* param) {
STableMergeScanSortSourceParam* source = param;
SOperatorInfo* pOperator = source->pOperator;
......
......@@ -5384,6 +5384,8 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) {
maxTs = TMAX(maxTs, pBlock->info.window.ekey);
minTs = TMIN(minTs, pBlock->info.window.skey);
qDebug("-------------------------groupId:%ld", pBlock->info.groupId);
doStreamIntervalAggImpl(pOperator, pBlock, pBlock->info.groupId, pUpdatedMap);
}
pInfo->twAggSup.maxTs = TMAX(pInfo->twAggSup.maxTs, maxTs);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册