提交 1d681a9e 编写于 作者: H Haojun Liao

enh(query): Add the group id in the serialized block format to support the...

enh(query): Add the group id in the serialized block format to support the multi-table(super table) interval query.
上级 4b367634
...@@ -238,10 +238,16 @@ static FORCE_INLINE int32_t blockCompressColData(SColumnInfoData* pColRes, int32 ...@@ -238,10 +238,16 @@ static FORCE_INLINE int32_t blockCompressColData(SColumnInfoData* pColRes, int32
static FORCE_INLINE void blockCompressEncode(const SSDataBlock* pBlock, char* data, int32_t* dataLen, int32_t numOfCols, static FORCE_INLINE void blockCompressEncode(const SSDataBlock* pBlock, char* data, int32_t* dataLen, int32_t numOfCols,
int8_t needCompress) { int8_t needCompress) {
int32_t* colSizes = (int32_t*)data; int32_t* actualLen = (int32_t*) data;
data += sizeof(int32_t);
uint64_t* groupId = (uint64_t*) data;
data += sizeof(uint64_t);
int32_t* colSizes = (int32_t*)data;
data += numOfCols * sizeof(int32_t); data += numOfCols * sizeof(int32_t);
*dataLen = (numOfCols * sizeof(int32_t));
*dataLen = (numOfCols * sizeof(int32_t) + sizeof(uint64_t) + sizeof(int32_t));
int32_t numOfRows = pBlock->info.rows; int32_t numOfRows = pBlock->info.rows;
for (int32_t col = 0; col < numOfCols; ++col) { for (int32_t col = 0; col < numOfCols; ++col) {
...@@ -273,6 +279,9 @@ static FORCE_INLINE void blockCompressEncode(const SSDataBlock* pBlock, char* da ...@@ -273,6 +279,9 @@ static FORCE_INLINE void blockCompressEncode(const SSDataBlock* pBlock, char* da
colSizes[col] = htonl(colSizes[col]); colSizes[col] = htonl(colSizes[col]);
} }
*actualLen = *dataLen;
*groupId = pBlock->info.groupId;
} }
#ifdef __cplusplus #ifdef __cplusplus
......
...@@ -835,10 +835,21 @@ int32_t setResultDataPtr(SReqResultInfo* pResultInfo, TAOS_FIELD* pFields, int32 ...@@ -835,10 +835,21 @@ int32_t setResultDataPtr(SReqResultInfo* pResultInfo, TAOS_FIELD* pFields, int32
return code; return code;
} }
int32_t* colLength = (int32_t*)pResultInfo->pData; char* p = (char*) pResultInfo->pData;
char* pStart = ((char*)pResultInfo->pData) + sizeof(int32_t) * numOfCols;
int32_t dataLen = *(int32_t*) p;
p += sizeof(int32_t);
uint64_t groupId = *(uint64_t*) p;
p += sizeof(uint64_t);
int32_t* colLength = (int32_t*)p;
p += sizeof(int32_t) * numOfCols;
char* pStart = p;
for (int32_t i = 0; i < numOfCols; ++i) { for (int32_t i = 0; i < numOfCols; ++i) {
colLength[i] = htonl(colLength[i]); colLength[i] = htonl(colLength[i]);
ASSERT(colLength[i] < dataLen);
if (IS_VAR_DATA_TYPE(pResultInfo->fields[i].type)) { if (IS_VAR_DATA_TYPE(pResultInfo->fields[i].type)) {
pResultInfo->pCol[i].offset = (int32_t*)pStart; pResultInfo->pCol[i].offset = (int32_t*)pStart;
......
...@@ -644,12 +644,12 @@ size_t blockDataGetRowSize(SSDataBlock* pBlock) { ...@@ -644,12 +644,12 @@ size_t blockDataGetRowSize(SSDataBlock* pBlock) {
/** /**
* @refitem blockDataToBuf for the meta size * @refitem blockDataToBuf for the meta size
*
* @param pBlock * @param pBlock
* @return * @return
*/ */
size_t blockDataGetSerialMetaSize(const SSDataBlock* pBlock) { size_t blockDataGetSerialMetaSize(const SSDataBlock* pBlock) {
return sizeof(int32_t) + pBlock->info.numOfCols * sizeof(int32_t); // | total rows/total length | block group id | each column length |
return sizeof(int32_t) + sizeof(uint64_t) + pBlock->info.numOfCols * sizeof(int32_t);
} }
double blockDataGetSerialRowSize(const SSDataBlock* pBlock) { double blockDataGetSerialRowSize(const SSDataBlock* pBlock) {
...@@ -1219,7 +1219,6 @@ void colDataDestroy(SColumnInfoData* pColData) { ...@@ -1219,7 +1219,6 @@ void colDataDestroy(SColumnInfoData* pColData) {
taosMemoryFree(pColData->pData); taosMemoryFree(pColData->pData);
} }
static void doShiftBitmap(char* nullBitmap, size_t n, size_t total) { static void doShiftBitmap(char* nullBitmap, size_t n, size_t total) {
int32_t len = BitmapLen(total); int32_t len = BitmapLen(total);
......
...@@ -3223,8 +3223,13 @@ void tsdbRetrieveDataBlockInfo(tsdbReaderT* pTsdbReadHandle, SDataBlockInfo* pDa ...@@ -3223,8 +3223,13 @@ void tsdbRetrieveDataBlockInfo(tsdbReaderT* pTsdbReadHandle, SDataBlockInfo* pDa
tsdbDebug("data block generated, uid:%" PRIu64 " numOfRows:%d, tsrange:%" PRId64 " - %" PRId64 " %s", uid, cur->rows, tsdbDebug("data block generated, uid:%" PRIu64 " numOfRows:%d, tsrange:%" PRId64 " - %" PRId64 " %s", uid, cur->rows,
cur->win.skey, cur->win.ekey, pHandle->idStr); cur->win.skey, cur->win.ekey, pHandle->idStr);
// pDataBlockInfo->uid = uid; // block Id may be over write by assigning uid fro this data block. Do NOT assign pDataBlockInfo->uid = uid;
// the table uid
#if 0
// for multi-group data query processing test purpose
pDataBlockInfo->groupId = uid;
#endif
pDataBlockInfo->rows = cur->rows; pDataBlockInfo->rows = cur->rows;
pDataBlockInfo->window = cur->win; pDataBlockInfo->window = cur->win;
pDataBlockInfo->numOfCols = (int32_t)(QH_GET_NUM_OF_COLS(pHandle)); pDataBlockInfo->numOfCols = (int32_t)(QH_GET_NUM_OF_COLS(pHandle));
......
...@@ -304,8 +304,8 @@ int32_t qExplainResAppendRow(SExplainCtx *ctx, char *tbuf, int32_t len, int32_t ...@@ -304,8 +304,8 @@ int32_t qExplainResAppendRow(SExplainCtx *ctx, char *tbuf, int32_t len, int32_t
memcpy(row.buf, tbuf, len); memcpy(row.buf, tbuf, len);
row.level = level; row.level = level;
row.len = len; row.len = len;
ctx->dataSize += len; ctx->dataSize += row.len;
if (NULL == taosArrayPush(ctx->rows, &row)) { if (NULL == taosArrayPush(ctx->rows, &row)) {
qError("taosArrayPush row to explain res rows failed"); qError("taosArrayPush row to explain res rows failed");
...@@ -783,7 +783,7 @@ int32_t qExplainGetRspFromCtx(void *ctx, SRetrieveTableRsp **pRsp) { ...@@ -783,7 +783,7 @@ int32_t qExplainGetRspFromCtx(void *ctx, SRetrieveTableRsp **pRsp) {
} }
int32_t colNum = 1; int32_t colNum = 1;
int32_t rspSize = sizeof(SRetrieveTableRsp) + sizeof(int32_t) * colNum + sizeof(int32_t) * rowNum + pCtx->dataSize; int32_t rspSize = sizeof(SRetrieveTableRsp) + sizeof(int32_t) + sizeof(uint64_t) + sizeof(int32_t) * colNum + sizeof(int32_t) * rowNum + pCtx->dataSize;
SRetrieveTableRsp *rsp = (SRetrieveTableRsp *)taosMemoryCalloc(1, rspSize); SRetrieveTableRsp *rsp = (SRetrieveTableRsp *)taosMemoryCalloc(1, rspSize);
if (NULL == rsp) { if (NULL == rsp) {
qError("malloc SRetrieveTableRsp failed, size:%d", rspSize); qError("malloc SRetrieveTableRsp failed, size:%d", rspSize);
...@@ -793,29 +793,38 @@ int32_t qExplainGetRspFromCtx(void *ctx, SRetrieveTableRsp **pRsp) { ...@@ -793,29 +793,38 @@ int32_t qExplainGetRspFromCtx(void *ctx, SRetrieveTableRsp **pRsp) {
rsp->completed = 1; rsp->completed = 1;
rsp->numOfRows = htonl(rowNum); rsp->numOfRows = htonl(rowNum);
*(int32_t *)rsp->data = htonl(pCtx->dataSize); // payload length
*(int32_t *)rsp->data = sizeof(int32_t) + sizeof(uint64_t) + sizeof(int32_t) * colNum + sizeof(int32_t) * rowNum + pCtx->dataSize;
int32_t *offset = (int32_t *)((char *)rsp->data + sizeof(int32_t)); // group id
*(uint64_t*)(rsp->data + sizeof(int32_t)) = 0;
// column length
int32_t* colLength = (int32_t *)(rsp->data + sizeof(int32_t) + sizeof(uint64_t));
// varchar column offset segment
int32_t *offset = (int32_t *)((char *)colLength + sizeof(int32_t));
// varchar data real payload
char *data = (char *)(offset + rowNum); char *data = (char *)(offset + rowNum);
int32_t tOffset = 0;
char* start = data;
for (int32_t i = 0; i < rowNum; ++i) { for (int32_t i = 0; i < rowNum; ++i) {
SQueryExplainRowInfo *row = taosArrayGet(pCtx->rows, i); SQueryExplainRowInfo *row = taosArrayGet(pCtx->rows, i);
*offset = tOffset; offset[i] = data - start;
tOffset += row->len;
memcpy(data, row->buf, row->len); varDataCopy(data, row->buf);
ASSERT(varDataTLen(row->buf) == row->len);
++offset;
data += row->len; data += row->len;
} }
*pRsp = rsp; *colLength = htonl(data - start);
rsp->compLen = htonl(rspSize);
*pRsp = rsp;
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t qExplainPrepareCtx(SQueryPlan *pDag, SExplainCtx **pCtx) { int32_t qExplainPrepareCtx(SQueryPlan *pDag, SExplainCtx **pCtx) {
int32_t code = 0; int32_t code = 0;
SNodeListNode *plans = NULL; SNodeListNode *plans = NULL;
...@@ -922,9 +931,7 @@ int32_t qExplainAppendPlanRows(SExplainCtx *pCtx) { ...@@ -922,9 +931,7 @@ int32_t qExplainAppendPlanRows(SExplainCtx *pCtx) {
int32_t qExplainGenerateRsp(SExplainCtx *pCtx, SRetrieveTableRsp **pRsp) { int32_t qExplainGenerateRsp(SExplainCtx *pCtx, SRetrieveTableRsp **pRsp) {
QRY_ERR_RET(qExplainAppendGroupResRows(pCtx, pCtx->rootGroupId, 0)); QRY_ERR_RET(qExplainAppendGroupResRows(pCtx, pCtx->rootGroupId, 0));
QRY_ERR_RET(qExplainAppendPlanRows(pCtx)); QRY_ERR_RET(qExplainAppendPlanRows(pCtx));
QRY_ERR_RET(qExplainGetRspFromCtx(pCtx, pRsp)); QRY_ERR_RET(qExplainGetRspFromCtx(pCtx, pRsp));
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
...@@ -994,13 +1001,10 @@ int32_t qExecStaticExplain(SQueryPlan *pDag, SRetrieveTableRsp **pRsp) { ...@@ -994,13 +1001,10 @@ int32_t qExecStaticExplain(SQueryPlan *pDag, SRetrieveTableRsp **pRsp) {
SExplainCtx *pCtx = NULL; SExplainCtx *pCtx = NULL;
QRY_ERR_RET(qExplainPrepareCtx(pDag, &pCtx)); QRY_ERR_RET(qExplainPrepareCtx(pDag, &pCtx));
QRY_ERR_JRET(qExplainGenerateRsp(pCtx, pRsp)); QRY_ERR_JRET(qExplainGenerateRsp(pCtx, pRsp));
_return: _return:
qExplainFreeCtx(pCtx); qExplainFreeCtx(pCtx);
QRY_RET(code); QRY_RET(code);
} }
......
...@@ -40,8 +40,6 @@ ...@@ -40,8 +40,6 @@
#define GET_TASKID(_t) (((SExecTaskInfo*)(_t))->id.str) #define GET_TASKID(_t) (((SExecTaskInfo*)(_t))->id.str)
#define curTimeWindowIndex(_winres) ((_winres)->curIndex)
typedef struct SGroupResInfo { typedef struct SGroupResInfo {
int32_t totalGroup; int32_t totalGroup;
int32_t currentGroup; int32_t currentGroup;
...@@ -68,11 +66,16 @@ typedef struct SResultRowPosition { ...@@ -68,11 +66,16 @@ typedef struct SResultRowPosition {
int32_t offset; int32_t offset;
} SResultRowPosition; } SResultRowPosition;
typedef struct SResKeyPos {
SResultRowPosition pos;
uint64_t groupId;
char key[];
} SResKeyPos;
typedef struct SResultRowInfo { typedef struct SResultRowInfo {
SResultRowPosition *pPosition; SResultRowPosition *pPosition;
int32_t size; // number of result set int32_t size; // number of result set
int32_t capacity; // max capacity int32_t capacity; // max capacity
// int32_t curPos; // current active result row index of pResult list
SResultRowPosition cur; SResultRowPosition cur;
} SResultRowInfo; } SResultRowInfo;
...@@ -135,7 +138,7 @@ typedef struct { ...@@ -135,7 +138,7 @@ typedef struct {
int32_t colId; int32_t colId;
} SStddevInterResult; } SStddevInterResult;
void initGroupResInfo(SGroupResInfo* pGroupResInfo, SResultRowInfo* pResultInfo); void initGroupedResultInfo(SGroupResInfo* pGroupResInfo, SHashObj* pHashmap, bool sortGroupResult);
void initMultiResInfoFromArrayList(SGroupResInfo* pGroupResInfo, SArray* pArrayList); void initMultiResInfoFromArrayList(SGroupResInfo* pGroupResInfo, SArray* pArrayList);
void cleanupGroupResInfo(SGroupResInfo* pGroupResInfo); void cleanupGroupResInfo(SGroupResInfo* pGroupResInfo);
......
...@@ -64,10 +64,10 @@ static bool needCompress(const SSDataBlock* pData, int32_t numOfCols) { ...@@ -64,10 +64,10 @@ static bool needCompress(const SSDataBlock* pData, int32_t numOfCols) {
} }
// data format: // data format:
// +----------------+--------------------------------------+-------------+-----------+-------------+-----------+ // +----------------+--------------+----------+--------------------------------------+-------------+-----------+-------------+-----------+
// |SDataCacheEntry | column#1 length, column#2 length ... | col1 bitmap | col1 data | col2 bitmap | col2 data | .... // |SDataCacheEntry | total length | group id | column#1 length, column#2 length ... | col1 bitmap | col1 data | col2 bitmap | col2 data | ....
// | | sizeof(int32_t) * numOfCols | actual size | | actual size | | // | | (4 bytes) |(8 bytes) | sizeof(int32_t) * numOfCols | actual size | | actual size | |
// +----------------+--------------------------------------+-------------+-----------+-------------+-----------+ // +----------------+--------------+----------+--------------------------------------+-------------+-----------+-------------+-----------+
// The length of bitmap is decided by number of rows of this data block, and the length of each column data is // The length of bitmap is decided by number of rows of this data block, and the length of each column data is
// recorded in the first segment, next to the struct header // recorded in the first segment, next to the struct header
static void toDataCacheEntry(const SDataDispatchHandle* pHandle, const SInputData* pInput, SDataDispatchBuf* pBuf) { static void toDataCacheEntry(const SDataDispatchHandle* pHandle, const SInputData* pInput, SDataDispatchBuf* pBuf) {
......
...@@ -186,12 +186,50 @@ void cleanupGroupResInfo(SGroupResInfo* pGroupResInfo) { ...@@ -186,12 +186,50 @@ void cleanupGroupResInfo(SGroupResInfo* pGroupResInfo) {
pGroupResInfo->index = 0; pGroupResInfo->index = 0;
} }
void initGroupResInfo(SGroupResInfo* pGroupResInfo, SResultRowInfo* pResultInfo) { static int32_t resultrowCompar1(const void* p1, const void* p2) {
SResKeyPos* pp1 = *(SResKeyPos**) p1;
SResKeyPos* pp2 = *(SResKeyPos**) p2;
if (pp1->groupId == pp2->groupId) {
int64_t pts1 = *(int64_t*) pp1->key;
int64_t pts2 = *(int64_t*) pp2->key;
if (pts1 == pts2) {
return 0;
} else {
return pts1 < pts2? -1:1;
}
} else {
return pp1->groupId < pp2->groupId? -1:1;
}
}
void initGroupedResultInfo(SGroupResInfo* pGroupResInfo, SHashObj* pHashmap, bool sortGroupResult) {
if (pGroupResInfo->pRows != NULL) { if (pGroupResInfo->pRows != NULL) {
taosArrayDestroy(pGroupResInfo->pRows); taosArrayDestroy(pGroupResInfo->pRows);
} }
pGroupResInfo->pRows = taosArrayFromList(pResultInfo->pPosition, pResultInfo->size, sizeof(SResultRowPosition)); // extract the result rows information from the hash map
void* pData = NULL;
pGroupResInfo->pRows = taosArrayInit(10, POINTER_BYTES);
size_t keyLen = 0;
while((pData = taosHashIterate(pHashmap, pData)) != NULL) {
void* key = taosHashGetKey(pData, &keyLen);
SResKeyPos* p = taosMemoryMalloc(keyLen + sizeof(SResultRowPosition));
p->groupId = *(uint64_t*) key;
p->pos = *(SResultRowPosition*) pData;
memcpy(p->key, key + sizeof(uint64_t), keyLen - sizeof(uint64_t));
taosArrayPush(pGroupResInfo->pRows, &p);
}
if (sortGroupResult) {
qsort(pGroupResInfo->pRows->pData, taosArrayGetSize(pGroupResInfo->pRows), POINTER_BYTES, resultrowCompar1);
}
pGroupResInfo->index = 0; pGroupResInfo->index = 0;
assert(pGroupResInfo->index <= getNumOfTotalRes(pGroupResInfo)); assert(pGroupResInfo->index <= getNumOfTotalRes(pGroupResInfo));
} }
......
...@@ -431,6 +431,13 @@ SResultRow* getNewResultRow_rv(SDiskbasedBuf* pResultBuf, int64_t tableGroupId, ...@@ -431,6 +431,13 @@ SResultRow* getNewResultRow_rv(SDiskbasedBuf* pResultBuf, int64_t tableGroupId,
return pResultRow; return pResultRow;
} }
/**
* the struct of key in hash table
* +----------+---------------+
* | group id | key data |
* | 8 bytes | actual length |
* +----------+---------------+
*/
static SResultRow* doSetResultOutBufByKey_rv(SDiskbasedBuf* pResultBuf, SResultRowInfo* pResultRowInfo, int64_t uid, static SResultRow* doSetResultOutBufByKey_rv(SDiskbasedBuf* pResultBuf, SResultRowInfo* pResultRowInfo, int64_t uid,
char* pData, int16_t bytes, bool masterscan, uint64_t groupId, char* pData, int16_t bytes, bool masterscan, uint64_t groupId,
SExecTaskInfo* pTaskInfo, bool isIntervalQuery, SAggSupporter* pSup) { SExecTaskInfo* pTaskInfo, bool isIntervalQuery, SAggSupporter* pSup) {
...@@ -1447,7 +1454,7 @@ static SArray* hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pRe ...@@ -1447,7 +1454,7 @@ static SArray* hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pRe
SArray* pUpdated = NULL; SArray* pUpdated = NULL;
if (pInfo->execModel == OPTR_EXEC_MODEL_STREAM) { if (pInfo->execModel == OPTR_EXEC_MODEL_STREAM) {
pUpdated = taosArrayInit(4, sizeof(SResultRowPosition)); pUpdated = taosArrayInit(4, POINTER_BYTES);
} }
int32_t step = 1; int32_t step = 1;
...@@ -1459,8 +1466,6 @@ static SArray* hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pRe ...@@ -1459,8 +1466,6 @@ static SArray* hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pRe
if (pSDataBlock->pDataBlock != NULL) { if (pSDataBlock->pDataBlock != NULL) {
SColumnInfoData* pColDataInfo = taosArrayGet(pSDataBlock->pDataBlock, pInfo->primaryTsIndex); SColumnInfoData* pColDataInfo = taosArrayGet(pSDataBlock->pDataBlock, pInfo->primaryTsIndex);
tsCols = (int64_t*)pColDataInfo->pData; tsCols = (int64_t*)pColDataInfo->pData;
// assert(tsCols[0] == pSDataBlock->info.window.skey && tsCols[pSDataBlock->info.rows - 1] ==
// pSDataBlock->info.window.ekey);
} }
int32_t startPos = ascScan? 0 : (pSDataBlock->info.rows - 1); int32_t startPos = ascScan? 0 : (pSDataBlock->info.rows - 1);
...@@ -1478,7 +1483,11 @@ static SArray* hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pRe ...@@ -1478,7 +1483,11 @@ static SArray* hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pRe
} }
if (pInfo->execModel == OPTR_EXEC_MODEL_STREAM) { if (pInfo->execModel == OPTR_EXEC_MODEL_STREAM) {
SResultRowPosition pos = {.pageId = pResult->pageId, .offset = pResult->offset}; SResKeyPos* pos = taosMemoryMalloc(sizeof(SResKeyPos) + sizeof(uint64_t));
pos->groupId = tableGroupId;
pos->pos = (SResultRowPosition) {.pageId = pResult->pageId, .offset = pResult->offset};
*(int64_t*) pos->key = pResult->win.skey;
taosArrayPush(pUpdated, &pos); taosArrayPush(pUpdated, &pos);
} }
...@@ -1550,7 +1559,11 @@ static SArray* hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pRe ...@@ -1550,7 +1559,11 @@ static SArray* hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pRe
} }
if (pInfo->execModel == OPTR_EXEC_MODEL_STREAM) { if (pInfo->execModel == OPTR_EXEC_MODEL_STREAM) {
SResultRowPosition pos = {.pageId = pResult->pageId, .offset = pResult->offset}; SResKeyPos* pos = taosMemoryMalloc(sizeof(SResKeyPos) + sizeof(uint64_t));
pos->groupId = tableGroupId;
pos->pos = (SResultRowPosition) {.pageId = pResult->pageId, .offset = pResult->offset};
*(int64_t*) pos->key = pResult->win.skey;
taosArrayPush(pUpdated, &pos); taosArrayPush(pUpdated, &pos);
} }
...@@ -2812,6 +2825,7 @@ void setTaskStatus(SExecTaskInfo* pTaskInfo, int8_t status) { ...@@ -2812,6 +2825,7 @@ void setTaskStatus(SExecTaskInfo* pTaskInfo, int8_t status) {
} }
} }
// todo merged with the build group result.
void finalizeMultiTupleQueryResult(SqlFunctionCtx* pCtx, int32_t numOfOutput, SDiskbasedBuf* pBuf, void finalizeMultiTupleQueryResult(SqlFunctionCtx* pCtx, int32_t numOfOutput, SDiskbasedBuf* pBuf,
SResultRowInfo* pResultRowInfo, int32_t* rowCellInfoOffset) { SResultRowInfo* pResultRowInfo, int32_t* rowCellInfoOffset) {
for (int32_t i = 0; i < pResultRowInfo->size; ++i) { for (int32_t i = 0; i < pResultRowInfo->size; ++i) {
...@@ -2846,40 +2860,36 @@ void finalizeMultiTupleQueryResult(SqlFunctionCtx* pCtx, int32_t numOfOutput, SD ...@@ -2846,40 +2860,36 @@ void finalizeMultiTupleQueryResult(SqlFunctionCtx* pCtx, int32_t numOfOutput, SD
} }
} }
// todo merged with the build group result.
void finalizeUpdatedResult(SqlFunctionCtx* pCtx, int32_t numOfOutput, SDiskbasedBuf* pBuf, SArray* pUpdateList, void finalizeUpdatedResult(SqlFunctionCtx* pCtx, int32_t numOfOutput, SDiskbasedBuf* pBuf, SArray* pUpdateList,
int32_t* rowCellInfoOffset) { int32_t* rowCellInfoOffset) {
size_t num = taosArrayGetSize(pUpdateList); size_t num = taosArrayGetSize(pUpdateList);
for (int32_t i = 0; i < num; ++i) { for (int32_t i = 0; i < num; ++i) {
SResultRowPosition* pPos = taosArrayGet(pUpdateList, i); SResKeyPos * pPos = taosArrayGetP(pUpdateList, i);
SFilePage* bufPage = getBufPage(pBuf, pPos->pageId);
SResultRow* pRow = (SResultRow*)((char*)bufPage + pPos->offset);
SFilePage* bufPage = getBufPage(pBuf, pPos->pos.pageId);
SResultRow* pRow = (SResultRow*)((char*)bufPage + pPos->pos.offset);
//
for (int32_t j = 0; j < numOfOutput; ++j) { for (int32_t j = 0; j < numOfOutput; ++j) {
pCtx[j].resultInfo = getResultCell(pRow, j, rowCellInfoOffset); pCtx[j].resultInfo = getResultCell(pRow, j, rowCellInfoOffset);
//
struct SResultRowEntryInfo* pResInfo = pCtx[j].resultInfo; struct SResultRowEntryInfo* pResInfo = pCtx[j].resultInfo;
if (isRowEntryCompleted(pResInfo) && isRowEntryInitialized(pResInfo)) { // if (isRowEntryCompleted(pResInfo) && isRowEntryInitialized(pResInfo)) {
continue; // continue;
} // }
//
if (pCtx[j].fpSet.process) { // TODO set the dummy function. // if (pCtx[j].fpSet.process) { // TODO set the dummy function.
// pCtx[j].fpSet.finalize(&pCtx[j]); //// pCtx[j].fpSet.finalize(&pCtx[j]);
pResInfo->initialized = true; // pResInfo->initialized = true;
} // }
//
if (pRow->numOfRows < pResInfo->numOfRes) { if (pRow->numOfRows < pResInfo->numOfRes) {
pRow->numOfRows = pResInfo->numOfRes; pRow->numOfRows = pResInfo->numOfRes;
} }
} }
releaseBufPage(pBuf, bufPage); releaseBufPage(pBuf, bufPage);
/*
* set the number of output results for group by normal columns, the number of output rows usually is 1 except
* the top and bottom query
*/
// buf->numOfRows = (uint16_t)getNumOfResult(pCtx, numOfOutput);
} }
} }
...@@ -3092,7 +3102,6 @@ void setIntervalQueryRange(STableQueryInfo* pTableQueryInfo, TSKEY key, STimeWin ...@@ -3092,7 +3102,6 @@ void setIntervalQueryRange(STableQueryInfo* pTableQueryInfo, TSKEY key, STimeWin
} }
/** /**
* copyToOutputBuf support copy data in ascending/descending order
* For interval query of both super table and table, copy the data in ascending order, since the output results are * For interval query of both super table and table, copy the data in ascending order, since the output results are
* ordered in SWindowResutl already. While handling the group by query for both table and super table, * ordered in SWindowResutl already. While handling the group by query for both table and super table,
* all group result are completed already. * all group result are completed already.
...@@ -3101,7 +3110,7 @@ void setIntervalQueryRange(STableQueryInfo* pTableQueryInfo, TSKEY key, STimeWin ...@@ -3101,7 +3110,7 @@ void setIntervalQueryRange(STableQueryInfo* pTableQueryInfo, TSKEY key, STimeWin
* @param result * @param result
*/ */
int32_t doCopyToSDataBlock(SSDataBlock* pBlock, SExprInfo* pExprInfo, SDiskbasedBuf* pBuf, SGroupResInfo* pGroupResInfo, int32_t doCopyToSDataBlock(SSDataBlock* pBlock, SExprInfo* pExprInfo, SDiskbasedBuf* pBuf, SGroupResInfo* pGroupResInfo,
int32_t orderType, int32_t* rowCellOffset, SqlFunctionCtx* pCtx) { int32_t orderType, int32_t* rowCellOffset, SqlFunctionCtx* pCtx) {
int32_t numOfRows = getNumOfTotalRes(pGroupResInfo); int32_t numOfRows = getNumOfTotalRes(pGroupResInfo);
int32_t numOfResult = pBlock->info.rows; // there are already exists result rows int32_t numOfResult = pBlock->info.rows; // there are already exists result rows
...@@ -3120,10 +3129,10 @@ int32_t doCopyToSDataBlock(SSDataBlock* pBlock, SExprInfo* pExprInfo, SDiskbased ...@@ -3120,10 +3129,10 @@ int32_t doCopyToSDataBlock(SSDataBlock* pBlock, SExprInfo* pExprInfo, SDiskbased
} }
for (int32_t i = start; (i < numOfRows) && (i >= 0); i += step) { for (int32_t i = start; (i < numOfRows) && (i >= 0); i += step) {
SResultRowPosition* pPos = taosArrayGet(pGroupResInfo->pRows, i); SResKeyPos *pPos = taosArrayGetP(pGroupResInfo->pRows, i);
SFilePage* page = getBufPage(pBuf, pPos->pageId); SFilePage* page = getBufPage(pBuf, pPos->pos.pageId);
SResultRow* pRow = (SResultRow*)((char*)page + pPos->offset); SResultRow* pRow = (SResultRow*)((char*)page + pPos->pos.offset);
if (pRow->numOfRows == 0) { if (pRow->numOfRows == 0) {
pGroupResInfo->index += 1; pGroupResInfo->index += 1;
continue; continue;
...@@ -3767,9 +3776,15 @@ int32_t setSDataBlockFromFetchRsp(SSDataBlock* pRes, SLoadRemoteDataInfo* pLoadI ...@@ -3767,9 +3776,15 @@ int32_t setSDataBlockFromFetchRsp(SSDataBlock* pRes, SLoadRemoteDataInfo* pLoadI
blockDataEnsureCapacity(pRes, numOfRows); blockDataEnsureCapacity(pRes, numOfRows);
if (pColList == NULL) { // data from other sources if (pColList == NULL) { // data from other sources
int32_t dataLen = *(int32_t*) pData;
pData += sizeof(int32_t);
pRes->info.groupId = *(uint64_t*) pData;
pData += sizeof(uint64_t);
int32_t* colLen = (int32_t*)pData; int32_t* colLen = (int32_t*)pData;
char* pStart = pData + sizeof(int32_t) * numOfOutput;
char* pStart = pData + sizeof(int32_t) * numOfOutput;
for (int32_t i = 0; i < numOfOutput; ++i) { for (int32_t i = 0; i < numOfOutput; ++i) {
colLen[i] = htonl(colLen[i]); colLen[i] = htonl(colLen[i]);
ASSERT(colLen[i] > 0); ASSERT(colLen[i] > 0);
...@@ -3822,7 +3837,11 @@ int32_t setSDataBlockFromFetchRsp(SSDataBlock* pRes, SLoadRemoteDataInfo* pLoadI ...@@ -3822,7 +3837,11 @@ int32_t setSDataBlockFromFetchRsp(SSDataBlock* pRes, SLoadRemoteDataInfo* pLoadI
blockDataEnsureCapacity(&block, numOfRows); blockDataEnsureCapacity(&block, numOfRows);
int32_t* colLen = (int32_t*) pStart; int32_t dataLen = *(int32_t*) pStart;
uint64_t groupId = *(uint64_t*) (pStart + sizeof(int32_t));
pStart += sizeof(int32_t) + sizeof(uint64_t);
int32_t* colLen = (int32_t*) (pStart);
pStart += sizeof(int32_t) * numOfCols; pStart += sizeof(int32_t) * numOfCols;
for (int32_t i = 0; i < numOfCols; ++i) { for (int32_t i = 0; i < numOfCols; ++i) {
...@@ -4277,18 +4296,6 @@ static void assignExprInfo(SExprInfo* dst, const SExprInfo* src) { ...@@ -4277,18 +4296,6 @@ static void assignExprInfo(SExprInfo* dst, const SExprInfo* src) {
// } // }
} }
static SExprInfo* exprArrayDup(SArray* pExprList) {
size_t numOfOutput = taosArrayGetSize(pExprList);
SExprInfo* p = taosMemoryCalloc(numOfOutput, sizeof(SExprInfo));
for (int32_t i = 0; i < numOfOutput; ++i) {
SExprInfo* pExpr = taosArrayGetP(pExprList, i);
assignExprInfo(&p[i], pExpr);
}
return p;
}
// TODO merge aggregate super table // TODO merge aggregate super table
static void appendOneRowToDataBlock(SSDataBlock* pBlock, STupleHandle* pTupleHandle) { static void appendOneRowToDataBlock(SSDataBlock* pBlock, STupleHandle* pTupleHandle) {
for (int32_t i = 0; i < pBlock->info.numOfCols; ++i) { for (int32_t i = 0; i < pBlock->info.numOfCols; ++i) {
...@@ -4779,7 +4786,7 @@ static int32_t doOpenAggregateOptr(SOperatorInfo* pOperator) { ...@@ -4779,7 +4786,7 @@ static int32_t doOpenAggregateOptr(SOperatorInfo* pOperator) {
finalizeMultiTupleQueryResult(pAggInfo->binfo.pCtx, pOperator->numOfOutput, pAggInfo->aggSup.pResultBuf, finalizeMultiTupleQueryResult(pAggInfo->binfo.pCtx, pOperator->numOfOutput, pAggInfo->aggSup.pResultBuf,
&pAggInfo->binfo.resultRowInfo, pAggInfo->binfo.rowCellInfoOffset); &pAggInfo->binfo.resultRowInfo, pAggInfo->binfo.rowCellInfoOffset);
initGroupResInfo(&pAggInfo->groupResInfo, &pAggInfo->binfo.resultRowInfo); initGroupedResultInfo(&pAggInfo->groupResInfo, pAggInfo->aggSup.pResultRowHashTable, false);
OPTR_SET_OPENED(pOperator); OPTR_SET_OPENED(pOperator);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
...@@ -5077,7 +5084,7 @@ static int32_t doOpenIntervalAgg(SOperatorInfo* pOperator) { ...@@ -5077,7 +5084,7 @@ static int32_t doOpenIntervalAgg(SOperatorInfo* pOperator) {
STableQueryInfo* pTableQueryInfo = pInfo->pCurrent; STableQueryInfo* pTableQueryInfo = pInfo->pCurrent;
setIntervalQueryRange(pTableQueryInfo, pBlock->info.window.skey, &pTaskInfo->window); setIntervalQueryRange(pTableQueryInfo, pBlock->info.window.skey, &pTaskInfo->window);
hashIntervalAgg(pOperator, &pInfo->binfo.resultRowInfo, pBlock, 0); hashIntervalAgg(pOperator, &pInfo->binfo.resultRowInfo, pBlock, pBlock->info.groupId);
#if 0 // test for encode/decode result info #if 0 // test for encode/decode result info
if(pOperator->encodeResultRow){ if(pOperator->encodeResultRow){
...@@ -5099,7 +5106,7 @@ static int32_t doOpenIntervalAgg(SOperatorInfo* pOperator) { ...@@ -5099,7 +5106,7 @@ static int32_t doOpenIntervalAgg(SOperatorInfo* pOperator) {
finalizeMultiTupleQueryResult(pInfo->binfo.pCtx, pOperator->numOfOutput, pInfo->aggSup.pResultBuf, finalizeMultiTupleQueryResult(pInfo->binfo.pCtx, pOperator->numOfOutput, pInfo->aggSup.pResultBuf,
&pInfo->binfo.resultRowInfo, pInfo->binfo.rowCellInfoOffset); &pInfo->binfo.resultRowInfo, pInfo->binfo.rowCellInfoOffset);
initGroupResInfo(&pInfo->groupResInfo, &pInfo->binfo.resultRowInfo); initGroupedResultInfo(&pInfo->groupResInfo, pInfo->aggSup.pResultRowHashTable, true);
OPTR_SET_OPENED(pOperator); OPTR_SET_OPENED(pOperator);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
...@@ -5224,7 +5231,7 @@ static SSDataBlock* doAllIntervalAgg(SOperatorInfo *pOperator, bool* newgroup) { ...@@ -5224,7 +5231,7 @@ static SSDataBlock* doAllIntervalAgg(SOperatorInfo *pOperator, bool* newgroup) {
setTaskStatus(pOperator->pTaskInfo, TASK_COMPLETED); setTaskStatus(pOperator->pTaskInfo, TASK_COMPLETED);
// finalizeQueryResult(pSliceInfo->binfo.pCtx, pOperator->numOfOutput); // finalizeQueryResult(pSliceInfo->binfo.pCtx, pOperator->numOfOutput);
initGroupResInfo(&pSliceInfo->groupResInfo, &pSliceInfo->binfo.resultRowInfo); // initGroupedResultInfo(&pSliceInfo->groupResInfo, &pSliceInfo->binfo.resultRowInfo);
// doBuildResultDatablock(&pRuntimeEnv->groupResInfo, pRuntimeEnv, pSliceInfo->pRes); // doBuildResultDatablock(&pRuntimeEnv->groupResInfo, pRuntimeEnv, pSliceInfo->pRes);
if (pSliceInfo->binfo.pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pSliceInfo->groupResInfo)) { if (pSliceInfo->binfo.pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pSliceInfo->groupResInfo)) {
...@@ -5275,7 +5282,7 @@ static SSDataBlock* doSTableIntervalAgg(SOperatorInfo* pOperator, bool* newgroup ...@@ -5275,7 +5282,7 @@ static SSDataBlock* doSTableIntervalAgg(SOperatorInfo* pOperator, bool* newgroup
finalizeMultiTupleQueryResult(pInfo->binfo.pCtx, pOperator->numOfOutput, pInfo->aggSup.pResultBuf, finalizeMultiTupleQueryResult(pInfo->binfo.pCtx, pOperator->numOfOutput, pInfo->aggSup.pResultBuf,
&pInfo->binfo.resultRowInfo, pInfo->binfo.rowCellInfoOffset); &pInfo->binfo.resultRowInfo, pInfo->binfo.rowCellInfoOffset);
initGroupResInfo(&pInfo->groupResInfo, &pInfo->binfo.resultRowInfo); // initGroupedResultInfo(&pInfo->groupResInfo, &pInfo->binfo.resultRowInfo);
OPTR_SET_OPENED(pOperator); OPTR_SET_OPENED(pOperator);
blockDataEnsureCapacity(pInfo->binfo.pRes, pOperator->resultInfo.capacity); blockDataEnsureCapacity(pInfo->binfo.pRes, pOperator->resultInfo.capacity);
...@@ -5400,7 +5407,7 @@ static SSDataBlock* doStateWindowAgg(SOperatorInfo* pOperator, bool* newgroup) { ...@@ -5400,7 +5407,7 @@ static SSDataBlock* doStateWindowAgg(SOperatorInfo* pOperator, bool* newgroup) {
closeAllResultRows(&pBInfo->resultRowInfo); closeAllResultRows(&pBInfo->resultRowInfo);
finalizeMultiTupleQueryResult(pBInfo->pCtx, pOperator->numOfOutput, pInfo->aggSup.pResultBuf, &pBInfo->resultRowInfo, pBInfo->rowCellInfoOffset); finalizeMultiTupleQueryResult(pBInfo->pCtx, pOperator->numOfOutput, pInfo->aggSup.pResultBuf, &pBInfo->resultRowInfo, pBInfo->rowCellInfoOffset);
initGroupResInfo(&pInfo->groupResInfo, &pBInfo->resultRowInfo); initGroupedResultInfo(&pInfo->groupResInfo, pInfo->aggSup.pResultRowHashTable, true);
blockDataEnsureCapacity(pBInfo->pRes, pOperator->resultInfo.capacity); blockDataEnsureCapacity(pBInfo->pRes, pOperator->resultInfo.capacity);
doBuildResultDatablock(pBInfo->pRes, &pInfo->groupResInfo, pOperator->pExpr, pInfo->aggSup.pResultBuf, pBInfo->rowCellInfoOffset, pInfo->binfo.pCtx); doBuildResultDatablock(pBInfo->pRes, &pInfo->groupResInfo, pOperator->pExpr, pInfo->aggSup.pResultBuf, pBInfo->rowCellInfoOffset, pInfo->binfo.pCtx);
if (pBInfo->pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pInfo->groupResInfo)) { if (pBInfo->pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pInfo->groupResInfo)) {
...@@ -5449,7 +5456,7 @@ static SSDataBlock* doSessionWindowAgg(SOperatorInfo* pOperator, bool* newgroup) ...@@ -5449,7 +5456,7 @@ static SSDataBlock* doSessionWindowAgg(SOperatorInfo* pOperator, bool* newgroup)
closeAllResultRows(&pBInfo->resultRowInfo); closeAllResultRows(&pBInfo->resultRowInfo);
finalizeMultiTupleQueryResult(pBInfo->pCtx, pOperator->numOfOutput, pInfo->aggSup.pResultBuf, &pBInfo->resultRowInfo, pBInfo->rowCellInfoOffset); finalizeMultiTupleQueryResult(pBInfo->pCtx, pOperator->numOfOutput, pInfo->aggSup.pResultBuf, &pBInfo->resultRowInfo, pBInfo->rowCellInfoOffset);
initGroupResInfo(&pInfo->groupResInfo, &pBInfo->resultRowInfo); initGroupedResultInfo(&pInfo->groupResInfo, pInfo->aggSup.pResultRowHashTable, true);
blockDataEnsureCapacity(pBInfo->pRes, pOperator->resultInfo.capacity); blockDataEnsureCapacity(pBInfo->pRes, pOperator->resultInfo.capacity);
doBuildResultDatablock(pBInfo->pRes, &pInfo->groupResInfo, pOperator->pExpr, pInfo->aggSup.pResultBuf, pBInfo->rowCellInfoOffset, pInfo->binfo.pCtx); doBuildResultDatablock(pBInfo->pRes, &pInfo->groupResInfo, pOperator->pExpr, pInfo->aggSup.pResultBuf, pBInfo->rowCellInfoOffset, pInfo->binfo.pCtx);
if (pBInfo->pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pInfo->groupResInfo)) { if (pBInfo->pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pInfo->groupResInfo)) {
...@@ -5881,7 +5888,8 @@ SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo* ...@@ -5881,7 +5888,8 @@ SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo*
pInfo->order = TSDB_ORDER_ASC; pInfo->order = TSDB_ORDER_ASC;
pInfo->interval = *pInterval; pInfo->interval = *pInterval;
pInfo->execModel = pTaskInfo->execModel; pInfo->execModel = OPTR_EXEC_MODEL_STREAM;
// pInfo->execModel = pTaskInfo->execModel;
pInfo->win = pTaskInfo->window; pInfo->win = pTaskInfo->window;
pInfo->twAggSup = *pTwAggSupp; pInfo->twAggSup = *pTwAggSupp;
pInfo->primaryTsIndex = primaryTsSlotId; pInfo->primaryTsIndex = primaryTsSlotId;
...@@ -6492,7 +6500,8 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo ...@@ -6492,7 +6500,8 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
}; };
return createTableScanOperatorInfo(pDataReader, pScanPhyNode->order, numOfCols, pTableScanNode->dataRequired, return createTableScanOperatorInfo(pDataReader, pScanPhyNode->order, numOfCols, pTableScanNode->dataRequired,
pScanPhyNode->count, pScanPhyNode->reverse, pColList, pResBlock, pScanPhyNode->node.pConditions, &interval, pTableScanNode->ratio, pTaskInfo); pScanPhyNode->count, pScanPhyNode->reverse, pColList, pResBlock, pScanPhyNode->node.pConditions, &interval, pTableScanNode->ratio,
pTaskInfo);
} else if (QUERY_NODE_PHYSICAL_PLAN_EXCHANGE == type) { } else if (QUERY_NODE_PHYSICAL_PLAN_EXCHANGE == type) {
SExchangePhysiNode* pExchange = (SExchangePhysiNode*)pPhyNode; SExchangePhysiNode* pExchange = (SExchangePhysiNode*)pPhyNode;
SSDataBlock* pResBlock = createResDataBlock(pExchange->node.pOutputDataBlockDesc); SSDataBlock* pResBlock = createResDataBlock(pExchange->node.pOutputDataBlockDesc);
...@@ -6895,8 +6904,7 @@ int32_t createExecTaskInfoImpl(SSubplan* pPlan, SExecTaskInfo** pTaskInfo, SRead ...@@ -6895,8 +6904,7 @@ int32_t createExecTaskInfoImpl(SSubplan* pPlan, SExecTaskInfo** pTaskInfo, SRead
goto _complete; goto _complete;
} }
STableGroupInfo group = {0}; (*pTaskInfo)->pRoot = createOperatorTree(pPlan->pNode, *pTaskInfo, pHandle, queryId, taskId, &(*pTaskInfo)->tableqinfoGroupInfo);
(*pTaskInfo)->pRoot = createOperatorTree(pPlan->pNode, *pTaskInfo, pHandle, queryId, taskId, &group);
if (NULL == (*pTaskInfo)->pRoot) { if (NULL == (*pTaskInfo)->pRoot) {
code = terrno; code = terrno;
goto _complete; goto _complete;
......
...@@ -308,7 +308,7 @@ static SSDataBlock* hashGroupbyAggregate(SOperatorInfo* pOperator, bool* newgrou ...@@ -308,7 +308,7 @@ static SSDataBlock* hashGroupbyAggregate(SOperatorInfo* pOperator, bool* newgrou
// } // }
blockDataEnsureCapacity(pRes, pOperator->resultInfo.capacity); blockDataEnsureCapacity(pRes, pOperator->resultInfo.capacity);
initGroupResInfo(&pInfo->groupResInfo, &pInfo->binfo.resultRowInfo); initGroupedResultInfo(&pInfo->groupResInfo, pInfo->aggSup.pResultRowHashTable, false);
while(1) { while(1) {
doBuildResultDatablock(pRes, &pInfo->groupResInfo, pOperator->pExpr, pInfo->aggSup.pResultBuf, pInfo->binfo.rowCellInfoOffset, pInfo->binfo.pCtx); doBuildResultDatablock(pRes, &pInfo->groupResInfo, pOperator->pExpr, pInfo->aggSup.pResultBuf, pInfo->binfo.rowCellInfoOffset, pInfo->binfo.pCtx);
......
...@@ -271,9 +271,7 @@ static void setupEnvForReverseScan(STableScanInfo* pTableScanInfo, SqlFunctionCt ...@@ -271,9 +271,7 @@ static void setupEnvForReverseScan(STableScanInfo* pTableScanInfo, SqlFunctionCt
static SSDataBlock* doTableScanImpl(SOperatorInfo* pOperator, bool* newgroup) { static SSDataBlock* doTableScanImpl(SOperatorInfo* pOperator, bool* newgroup) {
STableScanInfo* pTableScanInfo = pOperator->info; STableScanInfo* pTableScanInfo = pOperator->info;
SSDataBlock* pBlock = pTableScanInfo->pResBlock; SSDataBlock* pBlock = pTableScanInfo->pResBlock;
STableGroupInfo* pTableGroupInfo = &pOperator->pTaskInfo->tableqinfoGroupInfo;
*newgroup = false; *newgroup = false;
while (tsdbNextDataBlock(pTableScanInfo->dataReader)) { while (tsdbNextDataBlock(pTableScanInfo->dataReader)) {
...@@ -284,18 +282,6 @@ static SSDataBlock* doTableScanImpl(SOperatorInfo* pOperator, bool* newgroup) { ...@@ -284,18 +282,6 @@ static SSDataBlock* doTableScanImpl(SOperatorInfo* pOperator, bool* newgroup) {
pTableScanInfo->numOfBlocks += 1; pTableScanInfo->numOfBlocks += 1;
tsdbRetrieveDataBlockInfo(pTableScanInfo->dataReader, &pBlock->info); tsdbRetrieveDataBlockInfo(pTableScanInfo->dataReader, &pBlock->info);
// todo opt
// if (pTableGroupInfo->numOfTables > 1 || (pRuntimeEnv->current == NULL && pTableGroupInfo->numOfTables == 1)) {
// STableQueryInfo** pTableQueryInfo =
// (STableQueryInfo**)taosHashGet(pTableGroupInfo->map, &pBlock->info.uid, sizeof(pBlock->info.uid));
// if (pTableQueryInfo == NULL) {
// break;
// }
//
// doTableQueryInfoTimeWindowCheck(pTaskInfo, *pTableQueryInfo, pTableScanInfo->order);
// }
// this function never returns error?
uint32_t status = 0; uint32_t status = 0;
int32_t code = loadDataBlock(pOperator, pTableScanInfo, pBlock, &status); int32_t code = loadDataBlock(pOperator, pTableScanInfo, pBlock, &status);
// int32_t code = loadDataBlockOnDemand(pOperator->pRuntimeEnv, pTableScanInfo, pBlock, &status); // int32_t code = loadDataBlockOnDemand(pOperator->pRuntimeEnv, pTableScanInfo, pBlock, &status);
...@@ -308,6 +294,8 @@ static SSDataBlock* doTableScanImpl(SOperatorInfo* pOperator, bool* newgroup) { ...@@ -308,6 +294,8 @@ static SSDataBlock* doTableScanImpl(SOperatorInfo* pOperator, bool* newgroup) {
continue; continue;
} }
// reset the block to be 0 by default, this blockId is assigned by physical plan and is used by direct upstream operator.
pBlock->info.blockId = 0;
return pBlock; return pBlock;
} }
...@@ -405,11 +393,11 @@ SOperatorInfo* createTableScanOperatorInfo(void* pDataReader, int32_t order, int ...@@ -405,11 +393,11 @@ SOperatorInfo* createTableScanOperatorInfo(void* pDataReader, int32_t order, int
pOperator->name = "TableScanOperator"; pOperator->name = "TableScanOperator";
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN; pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN;
pOperator->blockingOptr = false; pOperator->blockingOptr = false;
pOperator->status = OP_NOT_OPENED; pOperator->status = OP_NOT_OPENED;
pOperator->info = pInfo; pOperator->info = pInfo;
pOperator->numOfOutput = numOfOutput; pOperator->numOfOutput = numOfOutput;
pOperator->getNextFn = doTableScan; pOperator->getNextFn = doTableScan;
pOperator->pTaskInfo = pTaskInfo; pOperator->pTaskInfo = pTaskInfo;
static int32_t cost = 0; static int32_t cost = 0;
pOperator->cost.openCost = ++cost; pOperator->cost.openCost = ++cost;
...@@ -823,12 +811,12 @@ static SSDataBlock* doSysTableScan(SOperatorInfo* pOperator, bool* newgroup) { ...@@ -823,12 +811,12 @@ static SSDataBlock* doSysTableScan(SOperatorInfo* pOperator, bool* newgroup) {
int32_t tableNameSlotId = 1; int32_t tableNameSlotId = 1;
SColumnInfoData* pTableNameCol = taosArrayGet(pInfo->pRes->pDataBlock, tableNameSlotId); SColumnInfoData* pTableNameCol = taosArrayGet(pInfo->pRes->pDataBlock, tableNameSlotId);
char* name = NULL; char* tb = NULL;
int32_t numOfRows = 0; int32_t numOfRows = 0;
char n[TSDB_TABLE_NAME_LEN] = {0}; char n[TSDB_TABLE_NAME_LEN] = {0};
while ((name = metaTbCursorNext(pInfo->pCur)) != NULL) { while ((tb = metaTbCursorNext(pInfo->pCur)) != NULL) {
STR_TO_VARSTR(n, name); STR_TO_VARSTR(n, tb);
colDataAppend(pTableNameCol, numOfRows, n, false); colDataAppend(pTableNameCol, numOfRows, n, false);
numOfRows += 1; numOfRows += 1;
if (numOfRows >= pInfo->capacity) { if (numOfRows >= pInfo->capacity) {
......
...@@ -3600,7 +3600,9 @@ static int32_t setQuery(STranslateContext* pCxt, SQuery* pQuery) { ...@@ -3600,7 +3600,9 @@ static int32_t setQuery(STranslateContext* pCxt, SQuery* pQuery) {
return TSDB_CODE_OUT_OF_MEMORY; return TSDB_CODE_OUT_OF_MEMORY;
} }
pQuery->precision = extractResultTsPrecision((SSelectStmt*)pQuery->pRoot); if (nodeType(pQuery->pRoot) == QUERY_NODE_SELECT_STMT) {
pQuery->precision = extractResultTsPrecision((SSelectStmt*)pQuery->pRoot);
}
} }
if (NULL != pCxt->pDbs) { if (NULL != pCxt->pDbs) {
......
...@@ -476,6 +476,7 @@ void* taosDecodeArray(const void* buf, SArray** pArray, FDecode decode, int32_t ...@@ -476,6 +476,7 @@ void* taosDecodeArray(const void* buf, SArray** pArray, FDecode decode, int32_t
return (void*)buf; return (void*)buf;
} }
// todo remove it
// order array<type *> // order array<type *>
void taosArraySortPWithExt(SArray* pArray, __ext_compar_fn_t fn, const void* param) { void taosArraySortPWithExt(SArray* pArray, __ext_compar_fn_t fn, const void* param) {
taosArrayGetSize(pArray) > 8 ? taosArrayQuickSort(pArray, fn, param) : taosArrayInsertSort(pArray, fn, param); taosArrayGetSize(pArray) > 8 ? taosArrayQuickSort(pArray, fn, param) : taosArrayInsertSort(pArray, fn, param);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册