未验证 提交 378aaf38 编写于 作者: H Haojun Liao 提交者: GitHub

Merge pull request #10259 from taosdata/feature/3.0_liaohj

Feature/3.0 liaohj
...@@ -582,9 +582,9 @@ typedef struct SOrderOperatorInfo { ...@@ -582,9 +582,9 @@ typedef struct SOrderOperatorInfo {
void appendUpstream(SOperatorInfo* p, SOperatorInfo* pUpstream); void appendUpstream(SOperatorInfo* p, SOperatorInfo* pUpstream);
SOperatorInfo* createDataBlocksOptScanInfo(void* pTsdbQueryHandle, SQueryRuntimeEnv* pRuntimeEnv, int32_t repeatTime, int32_t reverseTime); SOperatorInfo* createTableScanOperatorInfo(void* pTsdbQueryHandle, SQueryRuntimeEnv* pRuntimeEnv, int32_t repeatTime, int32_t reverseTime);
SOperatorInfo* createTableScanOperator(void* pTsdbQueryHandle, SQueryRuntimeEnv* pRuntimeEnv, int32_t repeatTime); SOperatorInfo* createTableScanOperator(void* pTsdbQueryHandle, SQueryRuntimeEnv* pRuntimeEnv, int32_t repeatTime);
SOperatorInfo* createTableSeqScanOperator(void* pTsdbQueryHandle, SQueryRuntimeEnv* pRuntimeEnv); SOperatorInfo* createTableSeqScanOperatorInfo(void* pTsdbQueryHandle, SQueryRuntimeEnv* pRuntimeEnv);
SOperatorInfo* createAggregateOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput); SOperatorInfo* createAggregateOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput);
SOperatorInfo* createProjectOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput); SOperatorInfo* createProjectOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput);
...@@ -622,7 +622,7 @@ void doCompactSDataBlock(SSDataBlock* pBlock, int32_t numOfRows, int8_t* p); ...@@ -622,7 +622,7 @@ void doCompactSDataBlock(SSDataBlock* pBlock, int32_t numOfRows, int8_t* p);
SSDataBlock* createOutputBuf(SExprInfo* pExpr, int32_t numOfOutput, int32_t numOfRows); SSDataBlock* createOutputBuf(SExprInfo* pExpr, int32_t numOfOutput, int32_t numOfRows);
void* destroyOutputBuf(SSDataBlock* pBlock); void* blockDataDestroy(SSDataBlock* pBlock);
void* doDestroyFilterInfo(SSingleColumnFilterInfo* pFilterInfo, int32_t numOfFilterCols); void* doDestroyFilterInfo(SSingleColumnFilterInfo* pFilterInfo, int32_t numOfFilterCols);
void setInputDataBlock(SOperatorInfo* pOperator, SQLFunctionCtx* pCtx, SSDataBlock* pBlock, int32_t order); void setInputDataBlock(SOperatorInfo* pOperator, SQLFunctionCtx* pCtx, SSDataBlock* pBlock, int32_t order);
......
...@@ -336,7 +336,7 @@ SSDataBlock* createOutputBuf(SExprInfo* pExpr, int32_t numOfOutput, int32_t numO ...@@ -336,7 +336,7 @@ SSDataBlock* createOutputBuf(SExprInfo* pExpr, int32_t numOfOutput, int32_t numO
return res; return res;
} }
void* destroyOutputBuf(SSDataBlock* pBlock) { void* blockDataDestroy(SSDataBlock* pBlock) {
if (pBlock == NULL) { if (pBlock == NULL) {
return NULL; return NULL;
} }
...@@ -4835,11 +4835,11 @@ int32_t doInitQInfo(SQInfo* pQInfo, STSBuf* pTsBuf, void* tsdb, void* sourceOptr ...@@ -4835,11 +4835,11 @@ int32_t doInitQInfo(SQInfo* pQInfo, STSBuf* pTsBuf, void* tsdb, void* sourceOptr
break; break;
} }
case OP_TableSeqScan: { case OP_TableSeqScan: {
pRuntimeEnv->proot = createTableSeqScanOperator(pRuntimeEnv->pTsdbReadHandle, pRuntimeEnv); pRuntimeEnv->proot = createTableSeqScanOperatorInfo(pRuntimeEnv->pTsdbReadHandle, pRuntimeEnv);
break; break;
} }
case OP_DataBlocksOptScan: { case OP_DataBlocksOptScan: {
pRuntimeEnv->proot = createDataBlocksOptScanInfo(pRuntimeEnv->pTsdbReadHandle, pRuntimeEnv, getNumOfScanTimes(pQueryAttr), pQueryAttr->needReverseScan? 1:0); pRuntimeEnv->proot = createTableScanOperatorInfo(pRuntimeEnv->pTsdbReadHandle, pRuntimeEnv, getNumOfScanTimes(pQueryAttr), pQueryAttr->needReverseScan? 1:0);
break; break;
} }
case OP_TableScan: { case OP_TableScan: {
...@@ -5162,7 +5162,7 @@ SOperatorInfo* createTableScanOperator(void* pTsdbQueryHandle, SQueryRuntimeEnv* ...@@ -5162,7 +5162,7 @@ SOperatorInfo* createTableScanOperator(void* pTsdbQueryHandle, SQueryRuntimeEnv*
return pOperator; return pOperator;
} }
SOperatorInfo* createTableSeqScanOperator(void* pTsdbQueryHandle, SQueryRuntimeEnv* pRuntimeEnv) { SOperatorInfo* createTableSeqScanOperatorInfo(void* pTsdbQueryHandle, SQueryRuntimeEnv* pRuntimeEnv) {
STableScanInfo* pInfo = calloc(1, sizeof(STableScanInfo)); STableScanInfo* pInfo = calloc(1, sizeof(STableScanInfo));
pInfo->pTsdbReadHandle = pTsdbQueryHandle; pInfo->pTsdbReadHandle = pTsdbQueryHandle;
...@@ -5267,7 +5267,7 @@ void setTableScanFilterOperatorInfo(STableScanInfo* pTableScanInfo, SOperatorInf ...@@ -5267,7 +5267,7 @@ void setTableScanFilterOperatorInfo(STableScanInfo* pTableScanInfo, SOperatorInf
} }
} }
SOperatorInfo* createDataBlocksOptScanInfo(void* pTsdbQueryHandle, SQueryRuntimeEnv* pRuntimeEnv, int32_t repeatTime, int32_t reverseTime) { SOperatorInfo* createTableScanOperatorInfo(void* pTsdbQueryHandle, SQueryRuntimeEnv* pRuntimeEnv, int32_t repeatTime, int32_t reverseTime) {
assert(repeatTime > 0); assert(repeatTime > 0);
STableScanInfo* pInfo = calloc(1, sizeof(STableScanInfo)); STableScanInfo* pInfo = calloc(1, sizeof(STableScanInfo));
...@@ -5278,7 +5278,7 @@ SOperatorInfo* createDataBlocksOptScanInfo(void* pTsdbQueryHandle, SQueryRuntime ...@@ -5278,7 +5278,7 @@ SOperatorInfo* createDataBlocksOptScanInfo(void* pTsdbQueryHandle, SQueryRuntime
pInfo->order = pRuntimeEnv->pQueryAttr->order.order; pInfo->order = pRuntimeEnv->pQueryAttr->order.order;
SOperatorInfo* pOptr = calloc(1, sizeof(SOperatorInfo)); SOperatorInfo* pOptr = calloc(1, sizeof(SOperatorInfo));
pOptr->name = "DataBlocksOptimizedScanOperator"; pOptr->name = "TableScanOperator";
pOptr->operatorType = OP_DataBlocksOptScan; pOptr->operatorType = OP_DataBlocksOptScan;
pOptr->pRuntimeEnv = pRuntimeEnv; pOptr->pRuntimeEnv = pRuntimeEnv;
pOptr->blockingOptr = false; pOptr->blockingOptr = false;
...@@ -5373,7 +5373,7 @@ static void destroyGlobalAggOperatorInfo(void* param, int32_t numOfOutput) { ...@@ -5373,7 +5373,7 @@ static void destroyGlobalAggOperatorInfo(void* param, int32_t numOfOutput) {
static void destroySlimitOperatorInfo(void* param, int32_t numOfOutput) { static void destroySlimitOperatorInfo(void* param, int32_t numOfOutput) {
SSLimitOperatorInfo *pInfo = (SSLimitOperatorInfo*) param; SSLimitOperatorInfo *pInfo = (SSLimitOperatorInfo*) param;
taosArrayDestroy(pInfo->orderColumnList); taosArrayDestroy(pInfo->orderColumnList);
pInfo->pRes = destroyOutputBuf(pInfo->pRes); pInfo->pRes = blockDataDestroy(pInfo->pRes);
tfree(pInfo->prevRow); tfree(pInfo->prevRow);
} }
...@@ -6566,7 +6566,7 @@ static void doDestroyBasicInfo(SOptrBasicInfo* pInfo, int32_t numOfOutput) { ...@@ -6566,7 +6566,7 @@ static void doDestroyBasicInfo(SOptrBasicInfo* pInfo, int32_t numOfOutput) {
tfree(pInfo->rowCellInfoOffset); tfree(pInfo->rowCellInfoOffset);
cleanupResultRowInfo(&pInfo->resultRowInfo); cleanupResultRowInfo(&pInfo->resultRowInfo);
pInfo->pRes = destroyOutputBuf(pInfo->pRes); pInfo->pRes = blockDataDestroy(pInfo->pRes);
} }
static void destroyBasicOperatorInfo(void* param, int32_t numOfOutput) { static void destroyBasicOperatorInfo(void* param, int32_t numOfOutput) {
...@@ -6590,7 +6590,7 @@ static void destroySWindowOperatorInfo(void* param, int32_t numOfOutput) { ...@@ -6590,7 +6590,7 @@ static void destroySWindowOperatorInfo(void* param, int32_t numOfOutput) {
static void destroySFillOperatorInfo(void* param, int32_t numOfOutput) { static void destroySFillOperatorInfo(void* param, int32_t numOfOutput) {
SFillOperatorInfo* pInfo = (SFillOperatorInfo*) param; SFillOperatorInfo* pInfo = (SFillOperatorInfo*) param;
pInfo->pFillInfo = taosDestroyFillInfo(pInfo->pFillInfo); pInfo->pFillInfo = taosDestroyFillInfo(pInfo->pFillInfo);
pInfo->pRes = destroyOutputBuf(pInfo->pRes); pInfo->pRes = blockDataDestroy(pInfo->pRes);
tfree(pInfo->p); tfree(pInfo->p);
} }
...@@ -6607,12 +6607,12 @@ static void destroyProjectOperatorInfo(void* param, int32_t numOfOutput) { ...@@ -6607,12 +6607,12 @@ static void destroyProjectOperatorInfo(void* param, int32_t numOfOutput) {
static void destroyTagScanOperatorInfo(void* param, int32_t numOfOutput) { static void destroyTagScanOperatorInfo(void* param, int32_t numOfOutput) {
STagScanInfo* pInfo = (STagScanInfo*) param; STagScanInfo* pInfo = (STagScanInfo*) param;
pInfo->pRes = destroyOutputBuf(pInfo->pRes); pInfo->pRes = blockDataDestroy(pInfo->pRes);
} }
static void destroyOrderOperatorInfo(void* param, int32_t numOfOutput) { static void destroyOrderOperatorInfo(void* param, int32_t numOfOutput) {
SOrderOperatorInfo* pInfo = (SOrderOperatorInfo*) param; SOrderOperatorInfo* pInfo = (SOrderOperatorInfo*) param;
pInfo->pDataBlock = destroyOutputBuf(pInfo->pDataBlock); pInfo->pDataBlock = blockDataDestroy(pInfo->pDataBlock);
} }
static void destroyConditionOperatorInfo(void* param, int32_t numOfOutput) { static void destroyConditionOperatorInfo(void* param, int32_t numOfOutput) {
...@@ -6625,7 +6625,7 @@ static void destroyDistinctOperatorInfo(void* param, int32_t numOfOutput) { ...@@ -6625,7 +6625,7 @@ static void destroyDistinctOperatorInfo(void* param, int32_t numOfOutput) {
taosHashCleanup(pInfo->pSet); taosHashCleanup(pInfo->pSet);
tfree(pInfo->buf); tfree(pInfo->buf);
taosArrayDestroy(pInfo->pDistinctDataInfo); taosArrayDestroy(pInfo->pDistinctDataInfo);
pInfo->pRes = destroyOutputBuf(pInfo->pRes); pInfo->pRes = blockDataDestroy(pInfo->pRes);
} }
SOperatorInfo* createMultiTableAggOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput) { SOperatorInfo* createMultiTableAggOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput) {
......
...@@ -13,7 +13,7 @@ ...@@ -13,7 +13,7 @@
namespace { namespace {
// simple test // simple test
void simpleTest() { void simpleTest() {
SDiskbasedResultBuf* pResultBuf = NULL; SDiskbasedBuf* pResultBuf = NULL;
int32_t ret = createDiskbasedResultBuffer(&pResultBuf, 1024, 4096, 1); int32_t ret = createDiskbasedResultBuffer(&pResultBuf, 1024, 4096, 1);
int32_t pageId = 0; int32_t pageId = 0;
...@@ -22,40 +22,40 @@ void simpleTest() { ...@@ -22,40 +22,40 @@ void simpleTest() {
tFilePage* pBufPage = getNewDataBuf(pResultBuf, groupId, &pageId); tFilePage* pBufPage = getNewDataBuf(pResultBuf, groupId, &pageId);
ASSERT_TRUE(pBufPage != NULL); ASSERT_TRUE(pBufPage != NULL);
ASSERT_EQ(getResBufSize(pResultBuf), 1024); ASSERT_EQ(getTotalBufSize(pResultBuf), 1024);
SIDList list = getDataBufPagesIdList(pResultBuf, groupId); SIDList list = getDataBufPagesIdList(pResultBuf, groupId);
ASSERT_EQ(taosArrayGetSize(list), 1); ASSERT_EQ(taosArrayGetSize(list), 1);
ASSERT_EQ(getNumOfResultBufGroupId(pResultBuf), 1); ASSERT_EQ(getNumOfResultBufGroupId(pResultBuf), 1);
releaseResBufPage(pResultBuf, pBufPage); releaseBufPage(pResultBuf, pBufPage);
tFilePage* pBufPage1 = getNewDataBuf(pResultBuf, groupId, &pageId); tFilePage* pBufPage1 = getNewDataBuf(pResultBuf, groupId, &pageId);
tFilePage* t = getResBufPage(pResultBuf, pageId); tFilePage* t = getBufPage(pResultBuf, pageId);
ASSERT_TRUE(t == pBufPage1); ASSERT_TRUE(t == pBufPage1);
tFilePage* pBufPage2 = getNewDataBuf(pResultBuf, groupId, &pageId); tFilePage* pBufPage2 = getNewDataBuf(pResultBuf, groupId, &pageId);
tFilePage* t1 = getResBufPage(pResultBuf, pageId); tFilePage* t1 = getBufPage(pResultBuf, pageId);
ASSERT_TRUE(t1 == pBufPage2); ASSERT_TRUE(t1 == pBufPage2);
tFilePage* pBufPage3 = getNewDataBuf(pResultBuf, groupId, &pageId); tFilePage* pBufPage3 = getNewDataBuf(pResultBuf, groupId, &pageId);
tFilePage* t2 = getResBufPage(pResultBuf, pageId); tFilePage* t2 = getBufPage(pResultBuf, pageId);
ASSERT_TRUE(t2 == pBufPage3); ASSERT_TRUE(t2 == pBufPage3);
tFilePage* pBufPage4 = getNewDataBuf(pResultBuf, groupId, &pageId); tFilePage* pBufPage4 = getNewDataBuf(pResultBuf, groupId, &pageId);
tFilePage* t3 = getResBufPage(pResultBuf, pageId); tFilePage* t3 = getBufPage(pResultBuf, pageId);
ASSERT_TRUE(t3 == pBufPage4); ASSERT_TRUE(t3 == pBufPage4);
tFilePage* pBufPage5 = getNewDataBuf(pResultBuf, groupId, &pageId); tFilePage* pBufPage5 = getNewDataBuf(pResultBuf, groupId, &pageId);
tFilePage* t4 = getResBufPage(pResultBuf, pageId); tFilePage* t4 = getBufPage(pResultBuf, pageId);
ASSERT_TRUE(t4 == pBufPage5); ASSERT_TRUE(t4 == pBufPage5);
destroyResultBuf(pResultBuf); destroyResultBuf(pResultBuf);
} }
void writeDownTest() { void writeDownTest() {
SDiskbasedResultBuf* pResultBuf = NULL; SDiskbasedBuf* pResultBuf = NULL;
int32_t ret = createDiskbasedResultBuffer(&pResultBuf, 1024, 4*1024, 1); int32_t ret = createDiskbasedResultBuffer(&pResultBuf, 1024, 4*1024, 1);
int32_t pageId = 0; int32_t pageId = 0;
...@@ -68,31 +68,31 @@ void writeDownTest() { ...@@ -68,31 +68,31 @@ void writeDownTest() {
*(int32_t*)(pBufPage->data) = nx; *(int32_t*)(pBufPage->data) = nx;
writePageId = pageId; writePageId = pageId;
releaseResBufPage(pResultBuf, pBufPage); releaseBufPage(pResultBuf, pBufPage);
tFilePage* pBufPage1 = getNewDataBuf(pResultBuf, groupId, &pageId); tFilePage* pBufPage1 = getNewDataBuf(pResultBuf, groupId, &pageId);
tFilePage* t1 = getResBufPage(pResultBuf, pageId); tFilePage* t1 = getBufPage(pResultBuf, pageId);
ASSERT_TRUE(t1 == pBufPage1); ASSERT_TRUE(t1 == pBufPage1);
ASSERT_TRUE(pageId == 1); ASSERT_TRUE(pageId == 1);
tFilePage* pBufPage2 = getNewDataBuf(pResultBuf, groupId, &pageId); tFilePage* pBufPage2 = getNewDataBuf(pResultBuf, groupId, &pageId);
tFilePage* t2 = getResBufPage(pResultBuf, pageId); tFilePage* t2 = getBufPage(pResultBuf, pageId);
ASSERT_TRUE(t2 == pBufPage2); ASSERT_TRUE(t2 == pBufPage2);
ASSERT_TRUE(pageId == 2); ASSERT_TRUE(pageId == 2);
tFilePage* pBufPage3 = getNewDataBuf(pResultBuf, groupId, &pageId); tFilePage* pBufPage3 = getNewDataBuf(pResultBuf, groupId, &pageId);
tFilePage* t3 = getResBufPage(pResultBuf, pageId); tFilePage* t3 = getBufPage(pResultBuf, pageId);
ASSERT_TRUE(t3 == pBufPage3); ASSERT_TRUE(t3 == pBufPage3);
ASSERT_TRUE(pageId == 3); ASSERT_TRUE(pageId == 3);
tFilePage* pBufPage4 = getNewDataBuf(pResultBuf, groupId, &pageId); tFilePage* pBufPage4 = getNewDataBuf(pResultBuf, groupId, &pageId);
tFilePage* t4 = getResBufPage(pResultBuf, pageId); tFilePage* t4 = getBufPage(pResultBuf, pageId);
ASSERT_TRUE(t4 == pBufPage4); ASSERT_TRUE(t4 == pBufPage4);
ASSERT_TRUE(pageId == 4); ASSERT_TRUE(pageId == 4);
releaseResBufPage(pResultBuf, t4); releaseBufPage(pResultBuf, t4);
// flush the written page to disk, and read it out again // flush the written page to disk, and read it out again
tFilePage* pBufPagex = getResBufPage(pResultBuf, writePageId); tFilePage* pBufPagex = getBufPage(pResultBuf, writePageId);
ASSERT_EQ(*(int32_t*)pBufPagex->data, nx); ASSERT_EQ(*(int32_t*)pBufPagex->data, nx);
SArray* pa = getDataBufPagesIdList(pResultBuf, groupId); SArray* pa = getDataBufPagesIdList(pResultBuf, groupId);
...@@ -102,7 +102,7 @@ void writeDownTest() { ...@@ -102,7 +102,7 @@ void writeDownTest() {
} }
void recyclePageTest() { void recyclePageTest() {
SDiskbasedResultBuf* pResultBuf = NULL; SDiskbasedBuf* pResultBuf = NULL;
int32_t ret = createDiskbasedResultBuffer(&pResultBuf, 1024, 4*1024, 1); int32_t ret = createDiskbasedResultBuffer(&pResultBuf, 1024, 4*1024, 1);
int32_t pageId = 0; int32_t pageId = 0;
...@@ -112,41 +112,41 @@ void recyclePageTest() { ...@@ -112,41 +112,41 @@ void recyclePageTest() {
tFilePage* pBufPage = getNewDataBuf(pResultBuf, groupId, &pageId); tFilePage* pBufPage = getNewDataBuf(pResultBuf, groupId, &pageId);
ASSERT_TRUE(pBufPage != NULL); ASSERT_TRUE(pBufPage != NULL);
releaseResBufPage(pResultBuf, pBufPage); releaseBufPage(pResultBuf, pBufPage);
tFilePage* pBufPage1 = getNewDataBuf(pResultBuf, groupId, &pageId); tFilePage* pBufPage1 = getNewDataBuf(pResultBuf, groupId, &pageId);
tFilePage* t1 = getResBufPage(pResultBuf, pageId); tFilePage* t1 = getBufPage(pResultBuf, pageId);
ASSERT_TRUE(t1 == pBufPage1); ASSERT_TRUE(t1 == pBufPage1);
ASSERT_TRUE(pageId == 1); ASSERT_TRUE(pageId == 1);
tFilePage* pBufPage2 = getNewDataBuf(pResultBuf, groupId, &pageId); tFilePage* pBufPage2 = getNewDataBuf(pResultBuf, groupId, &pageId);
tFilePage* t2 = getResBufPage(pResultBuf, pageId); tFilePage* t2 = getBufPage(pResultBuf, pageId);
ASSERT_TRUE(t2 == pBufPage2); ASSERT_TRUE(t2 == pBufPage2);
ASSERT_TRUE(pageId == 2); ASSERT_TRUE(pageId == 2);
tFilePage* pBufPage3 = getNewDataBuf(pResultBuf, groupId, &pageId); tFilePage* pBufPage3 = getNewDataBuf(pResultBuf, groupId, &pageId);
tFilePage* t3 = getResBufPage(pResultBuf, pageId); tFilePage* t3 = getBufPage(pResultBuf, pageId);
ASSERT_TRUE(t3 == pBufPage3); ASSERT_TRUE(t3 == pBufPage3);
ASSERT_TRUE(pageId == 3); ASSERT_TRUE(pageId == 3);
tFilePage* pBufPage4 = getNewDataBuf(pResultBuf, groupId, &pageId); tFilePage* pBufPage4 = getNewDataBuf(pResultBuf, groupId, &pageId);
tFilePage* t4 = getResBufPage(pResultBuf, pageId); tFilePage* t4 = getBufPage(pResultBuf, pageId);
ASSERT_TRUE(t4 == pBufPage4); ASSERT_TRUE(t4 == pBufPage4);
ASSERT_TRUE(pageId == 4); ASSERT_TRUE(pageId == 4);
releaseResBufPage(pResultBuf, t4); releaseBufPage(pResultBuf, t4);
tFilePage* pBufPage5 = getNewDataBuf(pResultBuf, groupId, &pageId); tFilePage* pBufPage5 = getNewDataBuf(pResultBuf, groupId, &pageId);
tFilePage* t5 = getResBufPage(pResultBuf, pageId); tFilePage* t5 = getBufPage(pResultBuf, pageId);
ASSERT_TRUE(t5 == pBufPage5); ASSERT_TRUE(t5 == pBufPage5);
ASSERT_TRUE(pageId == 5); ASSERT_TRUE(pageId == 5);
// flush the written page to disk, and read it out again // flush the written page to disk, and read it out again
tFilePage* pBufPagex = getResBufPage(pResultBuf, writePageId); tFilePage* pBufPagex = getBufPage(pResultBuf, writePageId);
*(int32_t*)(pBufPagex->data) = nx; *(int32_t*)(pBufPagex->data) = nx;
writePageId = pageId; // update the data writePageId = pageId; // update the data
releaseResBufPage(pResultBuf, pBufPagex); releaseBufPage(pResultBuf, pBufPagex);
tFilePage* pBufPagex1 = getResBufPage(pResultBuf, 1); tFilePage* pBufPagex1 = getBufPage(pResultBuf, 1);
SArray* pa = getDataBufPagesIdList(pResultBuf, groupId); SArray* pa = getDataBufPagesIdList(pResultBuf, groupId);
ASSERT_EQ(taosArrayGetSize(pa), 6); ASSERT_EQ(taosArrayGetSize(pa), 6);
......
...@@ -16,6 +16,11 @@ ...@@ -16,6 +16,11 @@
#ifndef TDENGINE_COMMON_H #ifndef TDENGINE_COMMON_H
#define TDENGINE_COMMON_H #define TDENGINE_COMMON_H
#ifdef __cplusplus
extern "C" {
#endif
#include "taosdef.h" #include "taosdef.h"
#include "tarray.h" #include "tarray.h"
#include "tmsg.h" #include "tmsg.h"
...@@ -44,8 +49,8 @@ ...@@ -44,8 +49,8 @@
typedef struct { typedef struct {
uint32_t numOfTables; uint32_t numOfTables;
SArray* pGroupList; SArray *pGroupList;
SHashObj* map; // speedup acquire the tableQueryInfo by table uid SHashObj *map; // speedup acquire the tableQueryInfo by table uid
} STableGroupInfo; } STableGroupInfo;
typedef struct SColumnDataAgg { typedef struct SColumnDataAgg {
...@@ -74,18 +79,28 @@ typedef struct SConstantItem { ...@@ -74,18 +79,28 @@ typedef struct SConstantItem {
// info.numOfCols = taosArrayGetSize(pDataBlock) + taosArrayGetSize(pConstantList); // info.numOfCols = taosArrayGetSize(pDataBlock) + taosArrayGetSize(pConstantList);
typedef struct SSDataBlock { typedef struct SSDataBlock {
SColumnDataAgg* pBlockAgg; SColumnDataAgg *pBlockAgg;
SArray* pDataBlock; // SArray<SColumnInfoData> SArray *pDataBlock; // SArray<SColumnInfoData>
SArray* pConstantList; // SArray<SConstantItem>, it is a constant/tags value of the corresponding result value. SArray *pConstantList; // SArray<SConstantItem>, it is a constant/tags value of the corresponding result value.
SDataBlockInfo info; SDataBlockInfo info;
} SSDataBlock; } SSDataBlock;
typedef struct SVarColAttr {
int32_t *offset; // start position for each entry in the list
uint32_t length; // used buffer size that contain the valid data
uint32_t allocLen; // allocated buffer size
} SVarColAttr;
// pBlockAgg->numOfNull == info.rows, all data are null // pBlockAgg->numOfNull == info.rows, all data are null
// pBlockAgg->numOfNull == 0, no data are null. // pBlockAgg->numOfNull == 0, no data are null.
typedef struct SColumnInfoData { typedef struct SColumnInfoData {
SColumnInfo info; // TODO filter info needs to be removed SColumnInfo info; // TODO filter info needs to be removed
char* nullbitmap; // bool hasNull;// if current column data has null value.
char* pData; // the corresponding block data in memory char *pData; // the corresponding block data in memory
union {
char *nullbitmap; // bitmap, one bit for each item in the list
SVarColAttr varmeta;
};
} SColumnInfoData; } SColumnInfoData;
static FORCE_INLINE int32_t tEncodeDataBlock(void** buf, const SSDataBlock* pBlock) { static FORCE_INLINE int32_t tEncodeDataBlock(void** buf, const SSDataBlock* pBlock) {
...@@ -235,11 +250,11 @@ typedef struct SSqlExpr { ...@@ -235,11 +250,11 @@ typedef struct SSqlExpr {
char token[TSDB_COL_NAME_LEN]; // original token char token[TSDB_COL_NAME_LEN]; // original token
SSchema resSchema; SSchema resSchema;
int32_t numOfCols; int32_t numOfCols;
SColumn* pColumns; // data columns that are required by query SColumn* pColumns; // data columns that are required by query
int32_t interBytes; // inter result buffer size int32_t interBytes; // inter result buffer size
int16_t numOfParams; // argument value of each function int16_t numOfParams; // argument value of each function
SVariant param[3]; // parameters are not more than 3 SVariant param[3]; // parameters are not more than 3
} SSqlExpr; } SSqlExpr;
typedef struct SExprInfo { typedef struct SExprInfo {
...@@ -261,4 +276,8 @@ typedef struct SSessionWindow { ...@@ -261,4 +276,8 @@ typedef struct SSessionWindow {
#define GET_FORWARD_DIRECTION_FACTOR(ord) (((ord) == TSDB_ORDER_ASC) ? QUERY_ASC_FORWARD_STEP : QUERY_DESC_FORWARD_STEP) #define GET_FORWARD_DIRECTION_FACTOR(ord) (((ord) == TSDB_ORDER_ASC) ? QUERY_ASC_FORWARD_STEP : QUERY_DESC_FORWARD_STEP)
#ifdef __cplusplus
}
#endif
#endif // TDENGINE_COMMON_H #endif // TDENGINE_COMMON_H
...@@ -7,12 +7,22 @@ extern "C" { ...@@ -7,12 +7,22 @@ extern "C" {
#include "os.h" #include "os.h"
#include "tmsg.h" #include "tmsg.h"
#include "common.h"
typedef struct SCorEpSet { typedef struct SCorEpSet {
int32_t version; int32_t version;
SEpSet epSet; SEpSet epSet;
} SCorEpSet; } SCorEpSet;
typedef struct SBlockOrderInfo {
int32_t order;
int32_t colIndex;
SColumnInfoData *pColData;
// int32_t type;
// int32_t bytes;
// bool hasNull;
} SBlockOrderInfo;
int taosGetFqdnPortFromEp(const char *ep, SEp *pEp); int taosGetFqdnPortFromEp(const char *ep, SEp *pEp);
void addEpIntoEpSet(SEpSet *pEpSet, const char *fqdn, uint16_t port); void addEpIntoEpSet(SEpSet *pEpSet, const char *fqdn, uint16_t port);
...@@ -21,6 +31,77 @@ bool isEpsetEqual(const SEpSet *s1, const SEpSet *s2); ...@@ -21,6 +31,77 @@ bool isEpsetEqual(const SEpSet *s1, const SEpSet *s2);
void updateEpSet_s(SCorEpSet *pEpSet, SEpSet *pNewEpSet); void updateEpSet_s(SCorEpSet *pEpSet, SEpSet *pNewEpSet);
SEpSet getEpSet_s(SCorEpSet *pEpSet); SEpSet getEpSet_s(SCorEpSet *pEpSet);
#define NBIT (3u)
#define BitPos(_n) ((_n) & ((1 << NBIT) - 1))
#define BMCharPos(bm_, r_) ((bm_)[(r_) >> NBIT])
#define colDataIsNull_f(bm_, r_) ((BMCharPos(bm_, r_) & (1u << (7u - BitPos(r_)))) == (1u << (7u - BitPos(r_))))
#define colDataSetNull_f(bm_, r_) \
do { \
BMCharPos(bm_, r_) |= (1u << (7u - BitPos(r_))); \
} while (0)
static FORCE_INLINE bool colDataIsNull(const SColumnInfoData* pColumnInfoData, uint32_t totalRows, uint32_t row, SColumnDataAgg* pColAgg) {
if (!pColumnInfoData->hasNull) {
return false;
}
if (pColAgg != NULL) {
if (pColAgg->numOfNull == totalRows) {
ASSERT(pColumnInfoData->nullbitmap == NULL);
return true;
} else if (pColAgg->numOfNull == 0) {
ASSERT(pColumnInfoData->nullbitmap == NULL);
return false;
}
}
if (IS_VAR_DATA_TYPE(pColumnInfoData->info.type)) {
return pColumnInfoData->varmeta.offset[row] == -1;
} else {
if (pColumnInfoData->nullbitmap == NULL) {
return false;
}
return colDataIsNull_f(pColumnInfoData->nullbitmap, row);
}
}
#define colDataGet(p1_, r_) \
((IS_VAR_DATA_TYPE((p1_)->info.type)) ? (p1_)->pData + (p1_)->varmeta.offset[(r_)] \
: (p1_)->pData + ((r_) * (p1_)->info.bytes));
int32_t colDataAppend(SColumnInfoData* pColumnInfoData, uint32_t currentRow, const char* pData, bool isNull);
int32_t colDataMergeCol(SColumnInfoData* pColumnInfoData, uint32_t numOfRow1, const SColumnInfoData* pSource, uint32_t numOfRow2);
int32_t blockDataUpdateTsWindow(SSDataBlock* pDataBlock);
int32_t colDataGetSize(const SColumnInfoData* pColumnInfoData, int32_t numOfRows);
void colDataTrim(SColumnInfoData* pColumnInfoData);
size_t colDataGetNumOfCols(const SSDataBlock* pBlock);
size_t colDataGetNumOfRows(const SSDataBlock* pBlock);
int32_t blockDataMerge(SSDataBlock* pDest, const SSDataBlock* pSrc);
int32_t blockDataSplitRows(SSDataBlock* pBlock, bool hasVarCol, int32_t startIndex, int32_t* stopIndex, int32_t pageSize);
SSDataBlock* blockDataExtractBlock(SSDataBlock* pBlock, int32_t startIndex, int32_t rowCount);
int32_t blockDataToBuf(char* buf, const SSDataBlock* pBlock);
int32_t blockDataFromBuf(SSDataBlock* pBlock, const char* buf);
size_t blockDataGetSize(const SSDataBlock* pBlock);
size_t blockDataGetRowSize(const SSDataBlock* pBlock);
double blockDataGetSerialRowSize(const SSDataBlock* pBlock);
size_t blockDataGetSerialMetaSize(const SSDataBlock* pBlock);
size_t blockDataNumOfRowsForSerialize(const SSDataBlock* pBlock, int32_t blockSize);
int32_t blockDataSort(SSDataBlock* pDataBlock, SArray* pOrderInfo, bool nullFirst);
int32_t blockDataSort_rv(SSDataBlock* pDataBlock, SArray* pOrderInfo, bool nullFirst);
int32_t blockDataEnsureCapacity(SSDataBlock* pDataBlock, uint32_t numOfRows);
void blockDataClearup(SSDataBlock* pDataBlock, bool hasVarCol);
void *blockDataDestroy(SSDataBlock *pBlock);
#ifdef __cplusplus #ifdef __cplusplus
} }
#endif #endif
......
...@@ -473,7 +473,7 @@ static int32_t tdSRowResetBuf(SRowBuilder *pBuilder, void *pBuf) { ...@@ -473,7 +473,7 @@ static int32_t tdSRowResetBuf(SRowBuilder *pBuilder, void *pBuf) {
terrno = TSDB_CODE_INVALID_PARA; terrno = TSDB_CODE_INVALID_PARA;
return terrno; return terrno;
} }
TD_ROW_SET_TYPE(pBuilder->pBuf, pBuilder->rowType); TD_ROW_SET_TYPE(pBuilder->pBuf, pBuilder->rowType);
uint32_t len = 0; uint32_t len = 0;
......
...@@ -24,7 +24,7 @@ ...@@ -24,7 +24,7 @@
#endif #endif
OP_ENUM_MACRO(StreamScan) OP_ENUM_MACRO(StreamScan)
OP_ENUM_MACRO(DataBlocksOptScan) OP_ENUM_MACRO(TableScan)
OP_ENUM_MACRO(TableSeqScan) OP_ENUM_MACRO(TableSeqScan)
OP_ENUM_MACRO(TagScan) OP_ENUM_MACRO(TagScan)
OP_ENUM_MACRO(SystemTableScan) OP_ENUM_MACRO(SystemTableScan)
......
...@@ -22,28 +22,31 @@ extern "C" { ...@@ -22,28 +22,31 @@ extern "C" {
typedef int (*__merge_compare_fn_t)(const void *, const void *, void *param); typedef int (*__merge_compare_fn_t)(const void *, const void *, void *param);
typedef struct SLoserTreeNode { typedef struct STreeNode {
int32_t index; int32_t index;
void *pData; void *pData; // TODO remove it?
} SLoserTreeNode; } STreeNode;
typedef struct SLoserTreeInfo { typedef struct SMultiwayMergeTreeInfo {
int32_t numOfEntries; int32_t numOfSources;
int32_t totalEntries; int32_t totalSources;
__merge_compare_fn_t comparFn; __merge_compare_fn_t comparFn;
void * param; void * param;
SLoserTreeNode *pNode; struct STreeNode *pNode;
} SLoserTreeInfo; } SMultiwayMergeTreeInfo;
uint32_t tLoserTreeCreate(SLoserTreeInfo **pTree, int32_t numOfEntries, void *param, __merge_compare_fn_t compareFn); #define tMergeTreeGetChosenIndex(t_) ((t_)->pNode[0].index)
#define tMergeTreeGetAdjustIndex(t_) (tMergeTreeGetChosenIndex(t_) + (t_)->numOfSources)
void tLoserTreeInit(SLoserTreeInfo *pTree); int32_t tMergeTreeCreate(SMultiwayMergeTreeInfo **pTree, uint32_t numOfEntries, void *param, __merge_compare_fn_t compareFn);
void tLoserTreeAdjust(SLoserTreeInfo *pTree, int32_t idx); void tMergeTreeDestroy(SMultiwayMergeTreeInfo* pTree);
void tLoserTreeRebuild(SLoserTreeInfo *pTree); void tMergeTreeAdjust(SMultiwayMergeTreeInfo *pTree, int32_t idx);
void tLoserTreeDisplay(SLoserTreeInfo *pTree); void tMergeTreeRebuild(SMultiwayMergeTreeInfo *pTree);
void tMergeTreePrint(const SMultiwayMergeTreeInfo *pTree);
#ifdef __cplusplus #ifdef __cplusplus
} }
......
...@@ -13,8 +13,8 @@ ...@@ -13,8 +13,8 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>. * along with this program. If not, see <http://www.gnu.org/licenses/>.
*/ */
#ifndef TDENGINE_TPAGEDFILE_H #ifndef TDENGINE_TPAGEDBUF_H
#define TDENGINE_TPAGEDFILE_H #define TDENGINE_TPAGEDBUF_H
#ifdef __cplusplus #ifdef __cplusplus
extern "C" { extern "C" {
...@@ -26,57 +26,10 @@ extern "C" { ...@@ -26,57 +26,10 @@ extern "C" {
#include "tlockfree.h" #include "tlockfree.h"
typedef struct SArray* SIDList; typedef struct SArray* SIDList;
typedef struct SPageInfo SPageInfo;
typedef struct SPageDiskInfo { typedef struct SDiskbasedBuf SDiskbasedBuf;
int32_t offset;
int32_t length;
} SPageDiskInfo;
typedef struct SPageInfo {
SListNode* pn; // point to list node
int32_t pageId;
SPageDiskInfo info;
void* pData;
bool used; // set current page is in used
} SPageInfo;
typedef struct SFreeListItem {
int32_t offset;
int32_t len;
} SFreeListItem;
typedef struct SResultBufStatis {
int32_t flushBytes;
int32_t loadBytes;
int32_t getPages;
int32_t releasePages;
int32_t flushPages;
} SResultBufStatis;
typedef struct SDiskbasedResultBuf {
int32_t numOfPages;
int64_t totalBufSize;
int64_t fileSize; // disk file size
FILE* file;
int32_t allocateId; // allocated page id
char* path; // file path
int32_t pageSize; // current used page size
int32_t inMemPages; // numOfPages that are allocated in memory
SHashObj* groupSet; // id hash table
SHashObj* all;
SList* lruList;
void* emptyDummyIdList; // dummy id list
void* assistBuf; // assistant buffer for compress/decompress data
SArray* pFree; // free area in file
bool comp; // compressed before flushed to disk
int32_t nextPos; // next page flush position
uint64_t qId; // for debug purpose
SResultBufStatis statis;
} SDiskbasedResultBuf;
#define DEFAULT_INTERN_BUF_PAGE_SIZE (1024L) // in bytes #define DEFAULT_INTERN_BUF_PAGE_SIZE (1024L) // in bytes
#define PAGE_INFO_INITIALIZER (SPageDiskInfo){-1, -1}
#define DEFAULT_PAGE_SIZE (16384L) #define DEFAULT_PAGE_SIZE (16384L)
typedef struct SFilePage { typedef struct SFilePage {
...@@ -84,76 +37,84 @@ typedef struct SFilePage { ...@@ -84,76 +37,84 @@ typedef struct SFilePage {
char data[]; char data[];
} SFilePage; } SFilePage;
typedef struct SDiskbasedBufStatis {
int64_t flushBytes;
int64_t loadBytes;
int32_t loadPages;
int32_t getPages;
int32_t releasePages;
int32_t flushPages;
} SDiskbasedBufStatis;
/** /**
* create disk-based result buffer * create disk-based result buffer
* @param pResultBuf * @param pBuf
* @param rowSize * @param rowSize
* @param pagesize * @param pagesize
* @param inMemPages * @param inMemPages
* @param handle * @param handle
* @return * @return
*/ */
int32_t createDiskbasedResultBuffer(SDiskbasedResultBuf** pResultBuf, int32_t pagesize, int32_t inMemBufSize, uint64_t qId, const char* dir); int32_t createDiskbasedBuffer(SDiskbasedBuf** pBuf, int32_t pagesize, int32_t inMemBufSize, uint64_t qId, const char* dir);
/** /**
* *
* @param pResultBuf * @param pBuf
* @param groupId * @param groupId
* @param pageId * @param pageId
* @return * @return
*/ */
SFilePage* getNewDataBuf(SDiskbasedResultBuf* pResultBuf, int32_t groupId, int32_t* pageId); SFilePage* getNewDataBuf(SDiskbasedBuf* pBuf, int32_t groupId, int32_t* pageId);
/** /**
* *
* @param pResultBuf * @param pBuf
* @param groupId * @param groupId
* @return * @return
*/ */
SIDList getDataBufPagesIdList(SDiskbasedResultBuf* pResultBuf, int32_t groupId); SIDList getDataBufPagesIdList(SDiskbasedBuf* pBuf, int32_t groupId);
/** /**
* get the specified buffer page by id * get the specified buffer page by id
* @param pResultBuf * @param pBuf
* @param id * @param id
* @return * @return
*/ */
SFilePage* getResBufPage(SDiskbasedResultBuf* pResultBuf, int32_t id); SFilePage* getBufPage(SDiskbasedBuf* pBuf, int32_t id);
/** /**
* release the referenced buf pages * release the referenced buf pages
* @param pResultBuf * @param pBuf
* @param page * @param page
*/ */
void releaseResBufPage(SDiskbasedResultBuf* pResultBuf, void* page); void releaseBufPage(SDiskbasedBuf* pBuf, void* page);
/** /**
* *
* @param pResultBuf * @param pBuf
* @param pi * @param pi
*/ */
void releaseResBufPageInfo(SDiskbasedResultBuf* pResultBuf, SPageInfo* pi); void releaseBufPageInfo(SDiskbasedBuf* pBuf, struct SPageInfo* pi);
/** /**
* get the total buffer size in the format of disk file * get the total buffer size in the format of disk file
* @param pResultBuf * @param pBuf
* @return * @return
*/ */
size_t getResBufSize(const SDiskbasedResultBuf* pResultBuf); size_t getTotalBufSize(const SDiskbasedBuf* pBuf);
/** /**
* get the number of groups in the result buffer * get the number of groups in the result buffer
* @param pResultBuf * @param pBuf
* @return * @return
*/ */
size_t getNumOfResultBufGroupId(const SDiskbasedResultBuf* pResultBuf); size_t getNumOfResultBufGroupId(const SDiskbasedBuf* pBuf);
/** /**
* destroy result buffer * destroy result buffer
* @param pResultBuf * @param pBuf
*/ */
void destroyResultBuf(SDiskbasedResultBuf* pResultBuf); void destroyResultBuf(SDiskbasedBuf* pBuf);
/** /**
* *
...@@ -162,8 +123,49 @@ void destroyResultBuf(SDiskbasedResultBuf* pResultBuf); ...@@ -162,8 +123,49 @@ void destroyResultBuf(SDiskbasedResultBuf* pResultBuf);
*/ */
SPageInfo* getLastPageInfo(SIDList pList); SPageInfo* getLastPageInfo(SIDList pList);
/**
*
* @param pPgInfo
* @return
*/
int32_t getPageId(const SPageInfo* pPgInfo);
/**
* Return the buffer page size.
* @param pBuf
* @return
*/
int32_t getBufPageSize(const SDiskbasedBuf* pBuf);
int32_t getNumOfInMemBufPages(const SDiskbasedBuf* pBuf);
/**
*
* @param pBuf
* @return
*/
bool isAllDataInMemBuf(const SDiskbasedBuf* pBuf);
/**
* Set the buffer page is dirty, and needs to be flushed to disk when swap out.
* @param pPageInfo
* @param dirty
*/
void setBufPageDirty(SFilePage* pPageInfo, bool dirty);
/**
* Print the statistics when closing this buffer
* @param pBuf
*/
void printStatisBeforeClose(SDiskbasedBuf* pBuf);
/**
* return buf statistics.
*/
SDiskbasedBufStatis getDBufStatis(const SDiskbasedBuf* pBuf);
#ifdef __cplusplus #ifdef __cplusplus
} }
#endif #endif
#endif // TDENGINE_TPAGEDFILE_H #endif // TDENGINE_TPAGEDBUF_H
...@@ -9,7 +9,7 @@ ...@@ -9,7 +9,7 @@
#include "tglobal.h" #include "tglobal.h"
#include "tmsgtype.h" #include "tmsgtype.h"
#include "tnote.h" #include "tnote.h"
#include "tpagedfile.h" #include "tpagedbuf.h"
#include "tref.h" #include "tref.h"
static int32_t initEpSetFromCfg(const char *firstEp, const char *secondEp, SCorEpSet *pEpSet); static int32_t initEpSetFromCfg(const char *firstEp, const char *secondEp, SCorEpSet *pEpSet);
......
...@@ -25,7 +25,7 @@ ...@@ -25,7 +25,7 @@
#include "tglobal.h" #include "tglobal.h"
#include "tmsgtype.h" #include "tmsgtype.h"
#include "tnote.h" #include "tnote.h"
#include "tpagedfile.h" #include "tpagedbuf.h"
#include "tref.h" #include "tref.h"
struct tmq_list_t { struct tmq_list_t {
......
此差异已折叠。
#include <common.h>
#include "os.h" #include "os.h"
#include "tutil.h" #include "tutil.h"
...@@ -268,4 +269,5 @@ SSchema createSchema(uint8_t type, int32_t bytes, int32_t colId, const char* nam ...@@ -268,4 +269,5 @@ SSchema createSchema(uint8_t type, int32_t bytes, int32_t colId, const char* nam
tstrncpy(s.name, name, tListLen(s.name)); tstrncpy(s.name, name, tListLen(s.name));
return s; return s;
} }
\ No newline at end of file
...@@ -344,8 +344,8 @@ STSBlock* readDataFromDisk(STSBuf* pTSBuf, int32_t order, bool decomp) { ...@@ -344,8 +344,8 @@ STSBlock* readDataFromDisk(STSBuf* pTSBuf, int32_t order, bool decomp) {
UNUSED(ret); UNUSED(ret);
} }
fread(&pBlock->tag.nType, sizeof(pBlock->tag.nType), 1, pTSBuf->f); int32_t ret = fread(&pBlock->tag.nType, sizeof(pBlock->tag.nType), 1, pTSBuf->f);
fread(&pBlock->tag.nLen, sizeof(pBlock->tag.nLen), 1, pTSBuf->f); ret = fread(&pBlock->tag.nLen, sizeof(pBlock->tag.nLen), 1, pTSBuf->f);
// NOTE: mix types tags are not supported // NOTE: mix types tags are not supported
size_t sz = 0; size_t sz = 0;
......
#include <common.h>
#include <gtest/gtest.h> #include <gtest/gtest.h>
#include <tep.h>
#include <iostream> #include <iostream>
#pragma GCC diagnostic push #pragma GCC diagnostic push
#pragma GCC diagnostic ignored "-Wwrite-strings" #pragma GCC diagnostic ignored "-Wwrite-strings"
#pragma GCC diagnostic ignored "-Wunused-function" #pragma GCC diagnostic ignored "-Wunused-function"
#pragma GCC diagnostic ignored "-Wunused-variable" #pragma GCC diagnostic ignored "-Wunused-variable"
#pragma GCC diagnostic ignored "-Wunused-but-set-variable"
#pragma GCC diagnostic ignored "-Wsign-compare" #pragma GCC diagnostic ignored "-Wsign-compare"
#include "os.h" #include "os.h"
...@@ -96,4 +97,199 @@ TEST(testCase, toInteger_test) { ...@@ -96,4 +97,199 @@ TEST(testCase, toInteger_test) {
ASSERT_EQ(ret, -1); ASSERT_EQ(ret, -1);
} }
TEST(testCase, Datablock_test) {
SSDataBlock* b = static_cast<SSDataBlock*>(calloc(1, sizeof(SSDataBlock)));
b->info.numOfCols = 2;
b->pDataBlock = taosArrayInit(4, sizeof(SColumnInfoData));
SColumnInfoData infoData = {0};
infoData.info.bytes = 4;
infoData.info.type = TSDB_DATA_TYPE_INT;
infoData.info.colId = 1;
infoData.pData = (char*) calloc(40, infoData.info.bytes);
infoData.nullbitmap = (char*) calloc(1, sizeof(char) * (40/8));
taosArrayPush(b->pDataBlock, &infoData);
SColumnInfoData infoData1 = {0};
infoData1.info.bytes = 40;
infoData1.info.type = TSDB_DATA_TYPE_BINARY;
infoData1.info.colId = 2;
infoData1.varmeta.offset = (int32_t*) calloc(40, sizeof(uint32_t));
taosArrayPush(b->pDataBlock, &infoData1);
char* str = "the value of: %d";
char buf[128] = {0};
char varbuf[128] = {0};
for(int32_t i = 0; i < 40; ++i) {
SColumnInfoData* p0 = (SColumnInfoData *) taosArrayGet(b->pDataBlock, 0);
SColumnInfoData* p1 = (SColumnInfoData *) taosArrayGet(b->pDataBlock, 1);
if (i&0x01) {
int32_t len = sprintf(buf, str, i);
STR_TO_VARSTR(varbuf, buf)
colDataAppend(p0, i, (const char*) &i, false);
colDataAppend(p1, i, (const char*) varbuf, false);
memset(varbuf, 0, sizeof(varbuf));
memset(buf, 0, sizeof(buf));
} else {
colDataAppend(p0, i, (const char*) &i, true);
colDataAppend(p1, i, (const char*) varbuf, true);
}
b->info.rows++;
}
SColumnInfoData* p0 = (SColumnInfoData *) taosArrayGet(b->pDataBlock, 0);
SColumnInfoData* p1 = (SColumnInfoData *) taosArrayGet(b->pDataBlock, 1);
for(int32_t i = 0; i < 40; ++i) {
if (i & 0x01) {
ASSERT_EQ(colDataIsNull_f(p0->nullbitmap, i), false);
ASSERT_EQ(colDataIsNull(p1, b->info.rows, i, nullptr), false);
} else {
ASSERT_EQ(colDataIsNull_f(p0->nullbitmap, i), true);
ASSERT_EQ(colDataIsNull(p0, b->info.rows, i, nullptr), true);
ASSERT_EQ(colDataIsNull(p1, b->info.rows, i, nullptr), true);
}
}
printf("binary column length:%d\n", *(int32_t*) p1->pData);
ASSERT_EQ(colDataGetNumOfCols(b), 2);
ASSERT_EQ(colDataGetNumOfRows(b), 40);
char* pData = colDataGet(p1, 3);
printf("the second row of binary:%s, length:%d\n", (char*)varDataVal(pData), varDataLen(pData));
SArray* pOrderInfo = taosArrayInit(3, sizeof(SBlockOrderInfo));
SBlockOrderInfo order = {.order = TSDB_ORDER_ASC, .colIndex = 0};
taosArrayPush(pOrderInfo, &order);
blockDataSort(b, pOrderInfo, true);
blockDataDestroy(b);
taosArrayDestroy(pOrderInfo);
}
#if 0
TEST(testCase, non_var_dataBlock_split_test) {
SSDataBlock* b = static_cast<SSDataBlock*>(calloc(1, sizeof(SSDataBlock)));
b->info.numOfCols = 2;
b->pDataBlock = taosArrayInit(4, sizeof(SColumnInfoData));
SColumnInfoData infoData = {0};
infoData.info.bytes = 4;
infoData.info.type = TSDB_DATA_TYPE_INT;
infoData.info.colId = 1;
int32_t numOfRows = 1000000;
infoData.pData = (char*) calloc(numOfRows, infoData.info.bytes);
infoData.nullbitmap = (char*) calloc(1, sizeof(char) * (numOfRows/8));
taosArrayPush(b->pDataBlock, &infoData);
SColumnInfoData infoData1 = {0};
infoData1.info.bytes = 1;
infoData1.info.type = TSDB_DATA_TYPE_TINYINT;
infoData1.info.colId = 2;
infoData1.pData = (char*) calloc(numOfRows, infoData.info.bytes);
infoData1.nullbitmap = (char*) calloc(1, sizeof(char) * (numOfRows/8));
taosArrayPush(b->pDataBlock, &infoData1);
for(int32_t i = 0; i < numOfRows; ++i) {
SColumnInfoData* p0 = (SColumnInfoData*)taosArrayGet(b->pDataBlock, 0);
SColumnInfoData* p1 = (SColumnInfoData*)taosArrayGet(b->pDataBlock, 1);
int8_t v = i;
colDataAppend(p0, i, (const char*)&i, false);
colDataAppend(p1, i, (const char*)&v, false);
b->info.rows++;
}
int32_t pageSize = 64 * 1024;
int32_t startIndex= 0;
int32_t stopIndex = 0;
int32_t count = 1;
while(1) {
blockDataSplitRows(b, false, startIndex, &stopIndex, pageSize);
printf("the %d split, from: %d to %d\n", count++, startIndex, stopIndex);
if (stopIndex == numOfRows - 1) {
break;
}
startIndex = stopIndex + 1;
}
}
#endif
TEST(testCase, var_dataBlock_split_test) {
SSDataBlock* b = static_cast<SSDataBlock*>(calloc(1, sizeof(SSDataBlock)));
b->info.numOfCols = 2;
b->pDataBlock = taosArrayInit(4, sizeof(SColumnInfoData));
int32_t numOfRows = 1000000;
SColumnInfoData infoData = {0};
infoData.info.bytes = 4;
infoData.info.type = TSDB_DATA_TYPE_INT;
infoData.info.colId = 1;
infoData.pData = (char*) calloc(numOfRows, infoData.info.bytes);
infoData.nullbitmap = (char*) calloc(1, sizeof(char) * (numOfRows/8));
taosArrayPush(b->pDataBlock, &infoData);
SColumnInfoData infoData1 = {0};
infoData1.info.bytes = 40;
infoData1.info.type = TSDB_DATA_TYPE_BINARY;
infoData1.info.colId = 2;
infoData1.varmeta.offset = (int32_t*) calloc(numOfRows, sizeof(uint32_t));
taosArrayPush(b->pDataBlock, &infoData1);
char buf[41] = {0};
char buf1[100] = {0};
for(int32_t i = 0; i < numOfRows; ++i) {
SColumnInfoData* p0 = (SColumnInfoData*)taosArrayGet(b->pDataBlock, 0);
SColumnInfoData* p1 = (SColumnInfoData*)taosArrayGet(b->pDataBlock, 1);
int8_t v = i;
colDataAppend(p0, i, (const char*)&i, false);
sprintf(buf, "the number of row:%d", i);
int32_t len = sprintf(buf1, buf, i);
STR_TO_VARSTR(buf1, buf)
colDataAppend(p1, i, buf1, false);
b->info.rows++;
memset(buf, 0, sizeof(buf));
memset(buf1, 0, sizeof(buf1));
}
int32_t pageSize = 64 * 1024;
int32_t startIndex= 0;
int32_t stopIndex = 0;
int32_t count = 1;
while(1) {
blockDataSplitRows(b, true, startIndex, &stopIndex, pageSize);
printf("the %d split, from: %d to %d\n", count++, startIndex, stopIndex);
if (stopIndex == numOfRows - 1) {
break;
}
startIndex = stopIndex + 1;
}
}
#pragma GCC diagnostic pop #pragma GCC diagnostic pop
\ No newline at end of file
...@@ -2174,8 +2174,8 @@ static int32_t createDataBlocksInfo(STsdbReadHandle* pTsdbReadHandle, int32_t nu ...@@ -2174,8 +2174,8 @@ static int32_t createDataBlocksInfo(STsdbReadHandle* pTsdbReadHandle, int32_t nu
assert(cnt <= numOfBlocks && numOfQualTables <= numOfTables); // the pTableQueryInfo[j]->numOfBlocks may be 0 assert(cnt <= numOfBlocks && numOfQualTables <= numOfTables); // the pTableQueryInfo[j]->numOfBlocks may be 0
sup.numOfTables = numOfQualTables; sup.numOfTables = numOfQualTables;
SLoserTreeInfo* pTree = NULL; SMultiwayMergeTreeInfo* pTree = NULL;
uint8_t ret = tLoserTreeCreate(&pTree, sup.numOfTables, &sup, dataBlockOrderCompar); uint8_t ret = tMergeTreeCreate(&pTree, sup.numOfTables, &sup, dataBlockOrderCompar);
if (ret != TSDB_CODE_SUCCESS) { if (ret != TSDB_CODE_SUCCESS) {
cleanBlockOrderSupporter(&sup, numOfTables); cleanBlockOrderSupporter(&sup, numOfTables);
return TSDB_CODE_TDB_OUT_OF_MEMORY; return TSDB_CODE_TDB_OUT_OF_MEMORY;
...@@ -2184,7 +2184,7 @@ static int32_t createDataBlocksInfo(STsdbReadHandle* pTsdbReadHandle, int32_t nu ...@@ -2184,7 +2184,7 @@ static int32_t createDataBlocksInfo(STsdbReadHandle* pTsdbReadHandle, int32_t nu
int32_t numOfTotal = 0; int32_t numOfTotal = 0;
while (numOfTotal < cnt) { while (numOfTotal < cnt) {
int32_t pos = pTree->pNode[0].index; int32_t pos = tMergeTreeGetChosenIndex(pTree);
int32_t index = sup.blockIndexArray[pos]++; int32_t index = sup.blockIndexArray[pos]++;
STableBlockInfo* pBlocksInfo = sup.pDataBlockInfo[pos]; STableBlockInfo* pBlocksInfo = sup.pDataBlockInfo[pos];
...@@ -2195,7 +2195,7 @@ static int32_t createDataBlocksInfo(STsdbReadHandle* pTsdbReadHandle, int32_t nu ...@@ -2195,7 +2195,7 @@ static int32_t createDataBlocksInfo(STsdbReadHandle* pTsdbReadHandle, int32_t nu
sup.blockIndexArray[pos] = sup.numOfBlocksPerTable[pos] + 1; sup.blockIndexArray[pos] = sup.numOfBlocksPerTable[pos] + 1;
} }
tLoserTreeAdjust(pTree, pos + sup.numOfTables); tMergeTreeAdjust(pTree, tMergeTreeGetAdjustIndex(pTree));
} }
/* /*
...@@ -3643,13 +3643,13 @@ int32_t tsdbQuerySTableByTagCond(void* pMeta, uint64_t uid, TSKEY skey, const ch ...@@ -3643,13 +3643,13 @@ int32_t tsdbQuerySTableByTagCond(void* pMeta, uint64_t uid, TSKEY skey, const ch
SColIndex* pColIndex, int32_t numOfCols, uint64_t reqId, uint64_t taskId) { SColIndex* pColIndex, int32_t numOfCols, uint64_t reqId, uint64_t taskId) {
STbCfg* pTbCfg = metaGetTbInfoByUid(pMeta, uid); STbCfg* pTbCfg = metaGetTbInfoByUid(pMeta, uid);
if (pTbCfg == NULL) { if (pTbCfg == NULL) {
// tsdbError("%p failed to get stable, uid:%"PRIu64", TID:0x%"PRIx64" QID:0x%"PRIx64, tsdb, uid, taskId, reqId); tsdbError("%p failed to get stable, uid:%"PRIu64", TID:0x%"PRIx64" QID:0x%"PRIx64, pMeta, uid, taskId, reqId);
terrno = TSDB_CODE_TDB_INVALID_TABLE_ID; terrno = TSDB_CODE_TDB_INVALID_TABLE_ID;
goto _error; goto _error;
} }
if (pTbCfg->type != META_SUPER_TABLE) { if (pTbCfg->type != META_SUPER_TABLE) {
// tsdbError("%p query normal tag not allowed, uid:%" PRIu64 ", TID:0x%"PRIx64" QID:0x%"PRIx64, tsdb, uid, taskId, reqId); tsdbError("%p query normal tag not allowed, uid:%" PRIu64 ", TID:0x%"PRIx64" QID:0x%"PRIx64, pMeta, uid, taskId, reqId);
terrno = TSDB_CODE_OPS_NOT_SUPPORT; //basically, this error is caused by invalid sql issued by client terrno = TSDB_CODE_OPS_NOT_SUPPORT; //basically, this error is caused by invalid sql issued by client
goto _error; goto _error;
} }
...@@ -3668,8 +3668,8 @@ int32_t tsdbQuerySTableByTagCond(void* pMeta, uint64_t uid, TSKEY skey, const ch ...@@ -3668,8 +3668,8 @@ int32_t tsdbQuerySTableByTagCond(void* pMeta, uint64_t uid, TSKEY skey, const ch
pGroupInfo->numOfTables = (uint32_t) taosArrayGetSize(res); pGroupInfo->numOfTables = (uint32_t) taosArrayGetSize(res);
pGroupInfo->pGroupList = createTableGroup(res, pTagSchema, pColIndex, numOfCols, skey); pGroupInfo->pGroupList = createTableGroup(res, pTagSchema, pColIndex, numOfCols, skey);
// tsdbDebug("%p no table name/tag condition, all tables qualified, numOfTables:%u, group:%zu, TID:0x%"PRIx64" QID:0x%"PRIx64, tsdb, tsdbDebug("%p no table name/tag condition, all tables qualified, numOfTables:%u, group:%zu, TID:0x%"PRIx64" QID:0x%"PRIx64, pMeta,
// pGroupInfo->numOfTables, taosArrayGetSize(pGroupInfo->pGroupList), taskId, reqId); pGroupInfo->numOfTables, taosArrayGetSize(pGroupInfo->pGroupList), taskId, reqId);
taosArrayDestroy(res); taosArrayDestroy(res);
return ret; return ret;
......
...@@ -16,8 +16,8 @@ ...@@ -16,8 +16,8 @@
#define TDENGINE_QUERYUTIL_H #define TDENGINE_QUERYUTIL_H
#include "common.h" #include "common.h"
#include "tpagedfile.h"
#include "tbuffer.h" #include "tbuffer.h"
#include "tpagedbuf.h"
#define SET_RES_WINDOW_KEY(_k, _ori, _len, _uid) \ #define SET_RES_WINDOW_KEY(_k, _ori, _len, _uid) \
do { \ do { \
...@@ -126,6 +126,13 @@ static FORCE_INLINE char* getPosInResultPage(struct STaskAttr* pQueryAttr, SFile ...@@ -126,6 +126,13 @@ static FORCE_INLINE char* getPosInResultPage(struct STaskAttr* pQueryAttr, SFile
// return ((char *)page->data) + rowOffset + offset * numOfRows; // return ((char *)page->data) + rowOffset + offset * numOfRows;
} }
static FORCE_INLINE char* getPosInResultPage_rv(SFilePage* page, int32_t rowOffset, int32_t offset) {
assert(rowOffset >= 0);
int32_t numOfRows = 1;//(int32_t)getRowNumForMultioutput(pQueryAttr, pQueryAttr->topBotQuery, pQueryAttr->stableQuery);
return ((char *)page->data) + rowOffset + offset * numOfRows;
}
//bool isNullOperator(SColumnFilterElem *pFilter, const char* minval, const char* maxval, int16_t type); //bool isNullOperator(SColumnFilterElem *pFilter, const char* minval, const char* maxval, int16_t type);
//bool notNullOperator(SColumnFilterElem *pFilter, const char* minval, const char* maxval, int16_t type); //bool notNullOperator(SColumnFilterElem *pFilter, const char* minval, const char* maxval, int16_t type);
......
...@@ -141,9 +141,9 @@ void clearResultRow(STaskRuntimeEnv *pRuntimeEnv, SResultRow *pResultRow, int16_ ...@@ -141,9 +141,9 @@ void clearResultRow(STaskRuntimeEnv *pRuntimeEnv, SResultRow *pResultRow, int16_
return; return;
} }
// the result does not put into the SDiskbasedResultBuf, ignore it. // the result does not put into the SDiskbasedBuf, ignore it.
if (pResultRow->pageId >= 0) { if (pResultRow->pageId >= 0) {
SFilePage *page = getResBufPage(pRuntimeEnv->pResultBuf, pResultRow->pageId); SFilePage *page = getBufPage(pRuntimeEnv->pResultBuf, pResultRow->pageId);
int16_t offset = 0; int16_t offset = 0;
for (int32_t i = 0; i < pRuntimeEnv->pQueryAttr->numOfOutput; ++i) { for (int32_t i = 0; i < pRuntimeEnv->pQueryAttr->numOfOutput; ++i) {
...@@ -358,7 +358,6 @@ void initGroupResInfo(SGroupResInfo* pGroupResInfo, SResultRowInfo* pResultInfo) ...@@ -358,7 +358,6 @@ void initGroupResInfo(SGroupResInfo* pGroupResInfo, SResultRowInfo* pResultInfo)
pGroupResInfo->pRows = taosArrayFromList(pResultInfo->pResult, pResultInfo->size, POINTER_BYTES); pGroupResInfo->pRows = taosArrayFromList(pResultInfo->pResult, pResultInfo->size, POINTER_BYTES);
pGroupResInfo->index = 0; pGroupResInfo->index = 0;
assert(pGroupResInfo->index <= getNumOfTotalRes(pGroupResInfo)); assert(pGroupResInfo->index <= getNumOfTotalRes(pGroupResInfo));
} }
...@@ -533,7 +532,7 @@ static UNUSED_FUNC int32_t mergeIntoGroupResultImpl(STaskRuntimeEnv *pRuntimeEnv ...@@ -533,7 +532,7 @@ static UNUSED_FUNC int32_t mergeIntoGroupResultImpl(STaskRuntimeEnv *pRuntimeEnv
int32_t code = TSDB_CODE_SUCCESS; int32_t code = TSDB_CODE_SUCCESS;
int32_t *posList = NULL; int32_t *posList = NULL;
SLoserTreeInfo *pTree = NULL; SMultiwayMergeTreeInfo *pTree = NULL;
STableQueryInfo **pTableQueryInfoList = NULL; STableQueryInfo **pTableQueryInfoList = NULL;
size_t size = taosArrayGetSize(pTableList); size_t size = taosArrayGetSize(pTableList);
...@@ -566,7 +565,7 @@ static UNUSED_FUNC int32_t mergeIntoGroupResultImpl(STaskRuntimeEnv *pRuntimeEnv ...@@ -566,7 +565,7 @@ static UNUSED_FUNC int32_t mergeIntoGroupResultImpl(STaskRuntimeEnv *pRuntimeEnv
SCompSupporter cs = {pTableQueryInfoList, posList, pRuntimeEnv->pQueryAttr->order.order}; SCompSupporter cs = {pTableQueryInfoList, posList, pRuntimeEnv->pQueryAttr->order.order};
int32_t ret = tLoserTreeCreate(&pTree, numOfTables, &cs, tableResultComparFn); int32_t ret = tMergeTreeCreate(&pTree, numOfTables, &cs, tableResultComparFn);
if (ret != TSDB_CODE_SUCCESS) { if (ret != TSDB_CODE_SUCCESS) {
code = TSDB_CODE_QRY_OUT_OF_MEMORY; code = TSDB_CODE_QRY_OUT_OF_MEMORY;
goto _end; goto _end;
...@@ -576,7 +575,7 @@ static UNUSED_FUNC int32_t mergeIntoGroupResultImpl(STaskRuntimeEnv *pRuntimeEnv ...@@ -576,7 +575,7 @@ static UNUSED_FUNC int32_t mergeIntoGroupResultImpl(STaskRuntimeEnv *pRuntimeEnv
int64_t startt = taosGetTimestampMs(); int64_t startt = taosGetTimestampMs();
while (1) { while (1) {
int32_t tableIndex = pTree->pNode[0].index; int32_t tableIndex = tMergeTreeGetChosenIndex(pTree);
SResultRowInfo *pWindowResInfo = &pTableQueryInfoList[tableIndex]->resInfo; SResultRowInfo *pWindowResInfo = &pTableQueryInfoList[tableIndex]->resInfo;
SResultRow *pWindowRes = getResultRow(pWindowResInfo, cs.rowIndex[tableIndex]); SResultRow *pWindowRes = getResultRow(pWindowResInfo, cs.rowIndex[tableIndex]);
...@@ -612,7 +611,7 @@ static UNUSED_FUNC int32_t mergeIntoGroupResultImpl(STaskRuntimeEnv *pRuntimeEnv ...@@ -612,7 +611,7 @@ static UNUSED_FUNC int32_t mergeIntoGroupResultImpl(STaskRuntimeEnv *pRuntimeEnv
} }
} }
tLoserTreeAdjust(pTree, tableIndex + pTree->numOfEntries); tMergeTreeAdjust(pTree, tMergeTreeGetAdjustIndex(pTree));
} }
int64_t endt = taosGetTimestampMs(); int64_t endt = taosGetTimestampMs();
......
...@@ -33,193 +33,312 @@ ...@@ -33,193 +33,312 @@
#include "stub.h" #include "stub.h"
#include "executor.h" #include "executor.h"
/** namespace {
{
"Id": { typedef struct SDummyInputInfo {
"QueryId": 1.3108161807422521e+19, int32_t max;
"TemplateId": 0, int32_t current;
"SubplanId": 0 int32_t startVal;
}, SSDataBlock* pBlock;
"Node": { } SDummyInputInfo;
"Name": "TableScan",
"Targets": [{ SSDataBlock* getDummyBlock(void* param, bool* newgroup) {
"Base": { SOperatorInfo* pOperator = static_cast<SOperatorInfo*>(param);
"Schema": { SDummyInputInfo* pInfo = static_cast<SDummyInputInfo*>(pOperator->info);
"Type": 9, if (pInfo->current >= pInfo->max) {
"ColId": 5000, return NULL;
"Bytes": 8 }
},
"Columns": [{ int32_t numOfRows = 1000;
"TableId": 1,
"Flag": 0, if (pInfo->pBlock == NULL) {
"Info": { pInfo->pBlock = static_cast<SSDataBlock*>(calloc(1, sizeof(SSDataBlock)));
"ColId": 1,
"Type": 9, pInfo->pBlock->pDataBlock = taosArrayInit(4, sizeof(SColumnInfoData));
"Bytes": 8
} SColumnInfoData colInfo = {0};
}], colInfo.info.type = TSDB_DATA_TYPE_INT;
"InterBytes": 0 colInfo.info.bytes = sizeof(int32_t);
}, colInfo.info.colId = 1;
"Expr": { colInfo.pData = static_cast<char*>(calloc(numOfRows, sizeof(int32_t)));
"Type": 4, colInfo.nullbitmap = static_cast<char*>(calloc(1, (numOfRows + 7) / 8));
"Column": {
"Type": 9, taosArrayPush(pInfo->pBlock->pDataBlock, &colInfo);
"ColId": 1,
"Bytes": 8 // SColumnInfoData colInfo1 = {0};
} // colInfo1.info.type = TSDB_DATA_TYPE_BINARY;
} // colInfo1.info.bytes = 40;
}, { // colInfo1.info.colId = 2;
"Base": { //
"Schema": { // colInfo1.varmeta.allocLen = 0;//numOfRows * sizeof(int32_t);
"Type": 4, // colInfo1.varmeta.length = 0;
"ColId": 5001, // colInfo1.varmeta.offset = static_cast<int32_t*>(calloc(1, numOfRows * sizeof(int32_t)));
"Bytes": 4 //
}, // taosArrayPush(pInfo->pBlock->pDataBlock, &colInfo1);
"Columns": [{ } else {
"TableId": 1, blockDataClearup(pInfo->pBlock, true);
"Flag": 0, }
"Info": {
"ColId": 2, SSDataBlock* pBlock = pInfo->pBlock;
"Type": 4,
"Bytes": 4 char buf[128] = {0};
} char b1[128] = {0};
}], for(int32_t i = 0; i < numOfRows; ++i) {
"InterBytes": 0 SColumnInfoData* pColInfo = static_cast<SColumnInfoData*>(TARRAY_GET_ELEM(pBlock->pDataBlock, 0));
},
"Expr": { int32_t v = (--pInfo->startVal);
"Type": 4, colDataAppend(pColInfo, i, reinterpret_cast<const char*>(&v), false);
"Column": {
"Type": 4, // sprintf(buf, "this is %d row", i);
"ColId": 2, // STR_TO_VARSTR(b1, buf);
"Bytes": 4 //
} // SColumnInfoData* pColInfo2 = static_cast<SColumnInfoData*>(TARRAY_GET_ELEM(pBlock->pDataBlock, 1));
} // colDataAppend(pColInfo2, i, b1, false);
}], }
"InputSchema": [{
"Type": 9, pBlock->info.rows = numOfRows;
"ColId": 5000, pBlock->info.numOfCols = 1;
"Bytes": 8
}, { pInfo->current += 1;
"Type": 4, return pBlock;
"ColId": 5001,
"Bytes": 4
}],
"TableScan": {
"TableId": 1,
"TableType": 2,
"Flag": 0,
"Window": {
"StartKey": -9.2233720368547758e+18,
"EndKey": 9.2233720368547758e+18
}
}
},
"DataSink": {
"Name": "Dispatch",
"Dispatch": {
}
}
} }
*/
SOperatorInfo* createDummyOperator(int32_t numOfBlocks) {
SOperatorInfo* pOperator = static_cast<SOperatorInfo*>(calloc(1, sizeof(SOperatorInfo)));
pOperator->name = "dummyInputOpertor4Test";
pOperator->exec = getDummyBlock;
SDummyInputInfo *pInfo = (SDummyInputInfo*) calloc(1, sizeof(SDummyInputInfo));
pInfo->max = numOfBlocks;
pInfo->startVal = 1500000;
pOperator->info = pInfo;
return pOperator;
}
}
int main(int argc, char** argv) { int main(int argc, char** argv) {
testing::InitGoogleTest(&argc, argv); testing::InitGoogleTest(&argc, argv);
return RUN_ALL_TESTS(); return RUN_ALL_TESTS();
} }
TEST(testCase, build_executor_tree_Test) { TEST(testCase, build_executor_tree_Test) {
const char* msg = "{\n" const char* msg = "{\n"
"\t\"Id\":\t{\n" "\t\"Id\":\t{\n"
"\t\t\"QueryId\":\t1.3108161807422521e+19,\n" "\t\t\"QueryId\":\t1.3108161807422521e+19,\n"
"\t\t\"TemplateId\":\t0,\n" "\t\t\"TemplateId\":\t0,\n"
"\t\t\"SubplanId\":\t0\n" "\t\t\"SubplanId\":\t0\n"
"\t},\n" "\t},\n"
"\t\"Node\":\t{\n" "\t\"Node\":\t{\n"
"\t\t\"Name\":\t\"TableScan\",\n" "\t\t\"Name\":\t\"TableScan\",\n"
"\t\t\"Targets\":\t[{\n" "\t\t\"Targets\":\t[{\n"
"\t\t\t\t\"Base\":\t{\n" "\t\t\t\t\"Base\":\t{\n"
"\t\t\t\t\t\"Schema\":\t{\n" "\t\t\t\t\t\"Schema\":\t{\n"
"\t\t\t\t\t\t\"Type\":\t9,\n" "\t\t\t\t\t\t\"Type\":\t9,\n"
"\t\t\t\t\t\t\"ColId\":\t5000,\n" "\t\t\t\t\t\t\"ColId\":\t5000,\n"
"\t\t\t\t\t\t\"Bytes\":\t8\n" "\t\t\t\t\t\t\"Bytes\":\t8\n"
"\t\t\t\t\t},\n" "\t\t\t\t\t},\n"
"\t\t\t\t\t\"Columns\":\t[{\n" "\t\t\t\t\t\"Columns\":\t[{\n"
"\t\t\t\t\t\t\t\"TableId\":\t1,\n" "\t\t\t\t\t\t\t\"TableId\":\t1,\n"
"\t\t\t\t\t\t\t\"Flag\":\t0,\n" "\t\t\t\t\t\t\t\"Flag\":\t0,\n"
"\t\t\t\t\t\t\t\"Info\":\t{\n" "\t\t\t\t\t\t\t\"Info\":\t{\n"
"\t\t\t\t\t\t\t\t\"ColId\":\t1,\n" "\t\t\t\t\t\t\t\t\"ColId\":\t1,\n"
"\t\t\t\t\t\t\t\t\"Type\":\t9,\n" "\t\t\t\t\t\t\t\t\"Type\":\t9,\n"
"\t\t\t\t\t\t\t\t\"Bytes\":\t8\n" "\t\t\t\t\t\t\t\t\"Bytes\":\t8\n"
"\t\t\t\t\t\t\t}\n" "\t\t\t\t\t\t\t}\n"
"\t\t\t\t\t\t}],\n" "\t\t\t\t\t\t}],\n"
"\t\t\t\t\t\"InterBytes\":\t0\n" "\t\t\t\t\t\"InterBytes\":\t0\n"
"\t\t\t\t},\n" "\t\t\t\t},\n"
"\t\t\t\t\"Expr\":\t{\n" "\t\t\t\t\"Expr\":\t{\n"
"\t\t\t\t\t\"Type\":\t4,\n" "\t\t\t\t\t\"Type\":\t4,\n"
"\t\t\t\t\t\"Column\":\t{\n" "\t\t\t\t\t\"Column\":\t{\n"
"\t\t\t\t\t\t\"Type\":\t9,\n" "\t\t\t\t\t\t\"Type\":\t9,\n"
"\t\t\t\t\t\t\"ColId\":\t1,\n" "\t\t\t\t\t\t\"ColId\":\t1,\n"
"\t\t\t\t\t\t\"Bytes\":\t8\n" "\t\t\t\t\t\t\"Bytes\":\t8\n"
"\t\t\t\t\t}\n" "\t\t\t\t\t}\n"
"\t\t\t\t}\n" "\t\t\t\t}\n"
"\t\t\t}, {\n" "\t\t\t}, {\n"
"\t\t\t\t\"Base\":\t{\n" "\t\t\t\t\"Base\":\t{\n"
"\t\t\t\t\t\"Schema\":\t{\n" "\t\t\t\t\t\"Schema\":\t{\n"
"\t\t\t\t\t\t\"Type\":\t4,\n" "\t\t\t\t\t\t\"Type\":\t4,\n"
"\t\t\t\t\t\t\"ColId\":\t5001,\n" "\t\t\t\t\t\t\"ColId\":\t5001,\n"
"\t\t\t\t\t\t\"Bytes\":\t4\n" "\t\t\t\t\t\t\"Bytes\":\t4\n"
"\t\t\t\t\t},\n" "\t\t\t\t\t},\n"
"\t\t\t\t\t\"Columns\":\t[{\n" "\t\t\t\t\t\"Columns\":\t[{\n"
"\t\t\t\t\t\t\t\"TableId\":\t1,\n" "\t\t\t\t\t\t\t\"TableId\":\t1,\n"
"\t\t\t\t\t\t\t\"Flag\":\t0,\n" "\t\t\t\t\t\t\t\"Flag\":\t0,\n"
"\t\t\t\t\t\t\t\"Info\":\t{\n" "\t\t\t\t\t\t\t\"Info\":\t{\n"
"\t\t\t\t\t\t\t\t\"ColId\":\t2,\n" "\t\t\t\t\t\t\t\t\"ColId\":\t2,\n"
"\t\t\t\t\t\t\t\t\"Type\":\t4,\n" "\t\t\t\t\t\t\t\t\"Type\":\t4,\n"
"\t\t\t\t\t\t\t\t\"Bytes\":\t4\n" "\t\t\t\t\t\t\t\t\"Bytes\":\t4\n"
"\t\t\t\t\t\t\t}\n" "\t\t\t\t\t\t\t}\n"
"\t\t\t\t\t\t}],\n" "\t\t\t\t\t\t}],\n"
"\t\t\t\t\t\"InterBytes\":\t0\n" "\t\t\t\t\t\"InterBytes\":\t0\n"
"\t\t\t\t},\n" "\t\t\t\t},\n"
"\t\t\t\t\"Expr\":\t{\n" "\t\t\t\t\"Expr\":\t{\n"
"\t\t\t\t\t\"Type\":\t4,\n" "\t\t\t\t\t\"Type\":\t4,\n"
"\t\t\t\t\t\"Column\":\t{\n" "\t\t\t\t\t\"Column\":\t{\n"
"\t\t\t\t\t\t\"Type\":\t4,\n" "\t\t\t\t\t\t\"Type\":\t4,\n"
"\t\t\t\t\t\t\"ColId\":\t2,\n" "\t\t\t\t\t\t\"ColId\":\t2,\n"
"\t\t\t\t\t\t\"Bytes\":\t4\n" "\t\t\t\t\t\t\"Bytes\":\t4\n"
"\t\t\t\t\t}\n" "\t\t\t\t\t}\n"
"\t\t\t\t}\n" "\t\t\t\t}\n"
"\t\t\t}],\n" "\t\t\t}],\n"
"\t\t\"InputSchema\":\t[{\n" "\t\t\"InputSchema\":\t[{\n"
"\t\t\t\t\"Type\":\t9,\n" "\t\t\t\t\"Type\":\t9,\n"
"\t\t\t\t\"ColId\":\t5000,\n" "\t\t\t\t\"ColId\":\t5000,\n"
"\t\t\t\t\"Bytes\":\t8\n" "\t\t\t\t\"Bytes\":\t8\n"
"\t\t\t}, {\n" "\t\t\t}, {\n"
"\t\t\t\t\"Type\":\t4,\n" "\t\t\t\t\"Type\":\t4,\n"
"\t\t\t\t\"ColId\":\t5001,\n" "\t\t\t\t\"ColId\":\t5001,\n"
"\t\t\t\t\"Bytes\":\t4\n" "\t\t\t\t\"Bytes\":\t4\n"
"\t\t\t}],\n" "\t\t\t}],\n"
"\t\t\"TableScan\":\t{\n" "\t\t\"TableScan\":\t{\n"
"\t\t\t\"TableId\":\t1,\n" "\t\t\t\"TableId\":\t1,\n"
"\t\t\t\"TableType\":\t2,\n" "\t\t\t\"TableType\":\t2,\n"
"\t\t\t\"Flag\":\t0,\n" "\t\t\t\"Flag\":\t0,\n"
"\t\t\t\"Window\":\t{\n" "\t\t\t\"Window\":\t{\n"
"\t\t\t\t\"StartKey\":\t-9.2233720368547758e+18,\n" "\t\t\t\t\"StartKey\":\t-9.2233720368547758e+18,\n"
"\t\t\t\t\"EndKey\":\t9.2233720368547758e+18\n" "\t\t\t\t\"EndKey\":\t9.2233720368547758e+18\n"
"\t\t\t}\n" "\t\t\t}\n"
"\t\t}\n" "\t\t}\n"
"\t},\n" "\t},\n"
"\t\"DataSink\":\t{\n" "\t\"DataSink\":\t{\n"
"\t\t\"Name\":\t\"Dispatch\",\n" "\t\t\"Name\":\t\"Dispatch\",\n"
"\t\t\"Dispatch\":\t{\n" "\t\t\"Dispatch\":\t{\n"
"\t\t}\n" "\t\t}\n"
"\t}\n" "\t}\n"
"}"; "}";
SExecTaskInfo* pTaskInfo = nullptr; SExecTaskInfo* pTaskInfo = nullptr;
DataSinkHandle sinkHandle = nullptr; DataSinkHandle sinkHandle = nullptr;
int32_t code = qCreateExecTask((SReadHandle*) 1, 2, 1, NULL, (void**) &pTaskInfo, &sinkHandle); SReadHandle handle = {.reader = reinterpret_cast<void*>(0x1), .meta = reinterpret_cast<void*>(0x1)};
// int32_t code = qCreateExecTask(&handle, 2, 1, NULL, (void**) &pTaskInfo, &sinkHandle);
} }
//TEST(testCase, inMem_sort_Test) {
// SArray* pOrderVal = taosArrayInit(4, sizeof(SOrder));
// SOrder o = {.order = TSDB_ORDER_ASC};
// o.col.info.colId = 1;
// o.col.info.type = TSDB_DATA_TYPE_INT;
// taosArrayPush(pOrderVal, &o);
//
// SArray* pExprInfo = taosArrayInit(4, sizeof(SExprInfo));
// SExprInfo *exp = static_cast<SExprInfo*>(calloc(1, sizeof(SExprInfo)));
// exp->base.resSchema = createSchema(TSDB_DATA_TYPE_INT, sizeof(int32_t), 1, "res");
// taosArrayPush(pExprInfo, &exp);
//
// SExprInfo *exp1 = static_cast<SExprInfo*>(calloc(1, sizeof(SExprInfo)));
// exp1->base.resSchema = createSchema(TSDB_DATA_TYPE_BINARY, 40, 2, "res1");
// taosArrayPush(pExprInfo, &exp1);
//
// SOperatorInfo* pOperator = createOrderOperatorInfo(createDummyOperator(5), pExprInfo, pOrderVal);
//
// bool newgroup = false;
// SSDataBlock* pRes = pOperator->exec(pOperator, &newgroup);
//
// SColumnInfoData* pCol1 = static_cast<SColumnInfoData*>(taosArrayGet(pRes->pDataBlock, 0));
// SColumnInfoData* pCol2 = static_cast<SColumnInfoData*>(taosArrayGet(pRes->pDataBlock, 1));
// for(int32_t i = 0; i < pRes->info.rows; ++i) {
// char* p = colDataGet(pCol2, i);
// printf("%d: %d, %s\n", i, ((int32_t*)pCol1->pData)[i], (char*)varDataVal(p));
// }
//}
typedef struct su {
int32_t v;
char *c;
} su;
int32_t cmp(const void* p1, const void* p2) {
su* v1 = (su*) p1;
su* v2 = (su*) p2;
int32_t x1 = *(int32_t*) v1->c;
int32_t x2 = *(int32_t*) v2->c;
if (x1 == x2) {
return 0;
} else {
return x1 < x2? -1:1;
}
}
TEST(testCase, external_sort_Test) {
#if 0
su* v = static_cast<su*>(calloc(1000000, sizeof(su)));
for(int32_t i = 0; i < 1000000; ++i) {
v[i].v = rand();
v[i].c = static_cast<char*>(malloc(4));
*(int32_t*) v[i].c = i;
}
qsort(v, 1000000, sizeof(su), cmp);
// for(int32_t i = 0; i < 1000; ++i) {
// printf("%d ", v[i]);
// }
// printf("\n");
return;
#endif
srand(time(NULL));
SArray* pOrderVal = taosArrayInit(4, sizeof(SOrder));
SOrder o = {0};
o.order = TSDB_ORDER_ASC;
o.col.info.colId = 1;
o.col.info.type = TSDB_DATA_TYPE_INT;
taosArrayPush(pOrderVal, &o);
SArray* pExprInfo = taosArrayInit(4, sizeof(SExprInfo));
SExprInfo *exp = static_cast<SExprInfo*>(calloc(1, sizeof(SExprInfo)));
exp->base.resSchema = createSchema(TSDB_DATA_TYPE_INT, sizeof(int32_t), 1, "res");
taosArrayPush(pExprInfo, &exp);
SExprInfo *exp1 = static_cast<SExprInfo*>(calloc(1, sizeof(SExprInfo)));
exp1->base.resSchema = createSchema(TSDB_DATA_TYPE_BINARY, 40, 2, "res1");
// taosArrayPush(pExprInfo, &exp1);
SOperatorInfo* pOperator = createOrderOperatorInfo(createDummyOperator(1500), pExprInfo, pOrderVal);
bool newgroup = false;
SSDataBlock* pRes = NULL;
int32_t total = 1;
int64_t s1 = taosGetTimestampUs();
int32_t t = 1;
while(1) {
int64_t s = taosGetTimestampUs();
pRes = pOperator->exec(pOperator, &newgroup);
int64_t e = taosGetTimestampUs();
if (t++ == 1) {
printf("---------------elapsed:%ld\n", e - s);
}
if (pRes == NULL) {
break;
}
SColumnInfoData* pCol1 = static_cast<SColumnInfoData*>(taosArrayGet(pRes->pDataBlock, 0));
// SColumnInfoData* pCol2 = static_cast<SColumnInfoData*>(taosArrayGet(pRes->pDataBlock, 1));
for (int32_t i = 0; i < pRes->info.rows; ++i) {
// char* p = colDataGet(pCol2, i);
printf("%d: %d\n", total++, ((int32_t*)pCol1->pData)[i]);
// printf("%d: %d, %s\n", total++, ((int32_t*)pCol1->pData)[i], (char*)varDataVal(p));
}
}
printStatisBeforeClose(((SOrderOperatorInfo*) pOperator->info)->pSortInternalBuf);
int64_t s2 = taosGetTimestampUs();
printf("total:%ld\n", s2 - s1);
pOperator->cleanupFn(pOperator->info, 2);
tfree(exp);
tfree(exp1);
taosArrayDestroy(pExprInfo);
taosArrayDestroy(pOrderVal);
}
#pragma GCC diagnostic pop #pragma GCC diagnostic pop
...@@ -49,7 +49,7 @@ typedef struct SHistogramInfo { ...@@ -49,7 +49,7 @@ typedef struct SHistogramInfo {
SHistBin* elems; SHistBin* elems;
#else #else
tSkipList* pList; tSkipList* pList;
SLoserTreeInfo* pLoserTree; SMultiwayMergeTreeInfo* pLoserTree;
int32_t maxIndex; int32_t maxIndex;
bool ordered; bool ordered;
#endif #endif
......
...@@ -20,7 +20,7 @@ ...@@ -20,7 +20,7 @@
extern "C" { extern "C" {
#endif #endif
#include "tpagedfile.h" #include "tpagedbuf.h"
#include "ttszip.h" #include "ttszip.h"
typedef struct MinMaxEntry { typedef struct MinMaxEntry {
...@@ -63,7 +63,7 @@ typedef struct tMemBucket { ...@@ -63,7 +63,7 @@ typedef struct tMemBucket {
__compar_fn_t comparFn; __compar_fn_t comparFn;
tMemBucketSlot * pSlots; tMemBucketSlot * pSlots;
SDiskbasedResultBuf *pBuffer; SDiskbasedBuf *pBuffer;
__perc_hash_func_t hashFunc; __perc_hash_func_t hashFunc;
} tMemBucket; } tMemBucket;
......
...@@ -117,14 +117,14 @@ int32_t tHistogramAdd(SHistogramInfo** pHisto, double val) { ...@@ -117,14 +117,14 @@ int32_t tHistogramAdd(SHistogramInfo** pHisto, double val) {
if ((*pHisto)->ordered) { if ((*pHisto)->ordered) {
int32_t lastIndex = (*pHisto)->maxIndex; int32_t lastIndex = (*pHisto)->maxIndex;
SLoserTreeInfo* pTree = (*pHisto)->pLoserTree; SMultiwayMergeTreeInfo* pTree = (*pHisto)->pLoserTree;
(*pHisto)->pLoserTree->pNode[lastIndex + pTree->numOfEntries].pData = pResNode; (*pHisto)->pLoserTree->pNode[lastIndex + pTree->numOfEntries].pData = pResNode;
pEntry1->index = (*pHisto)->pLoserTree->pNode[lastIndex + pTree->numOfEntries].index; pEntry1->index = (*pHisto)->pLoserTree->pNode[lastIndex + pTree->numOfEntries].index;
// update the loser tree // update the loser tree
if ((*pHisto)->ordered) { if ((*pHisto)->ordered) {
tLoserTreeAdjust(pTree, pEntry1->index + pTree->numOfEntries); tMergeTreeAdjust(pTree, pEntry1->index + pTree->numOfEntries);
} }
tSkipListKey kx = tSkipListKey kx =
...@@ -142,10 +142,10 @@ int32_t tHistogramAdd(SHistogramInfo** pHisto, double val) { ...@@ -142,10 +142,10 @@ int32_t tHistogramAdd(SHistogramInfo** pHisto, double val) {
SHistBin* pPrevEntry = (SHistBin*)pResNode->pBackward[0]->pData; SHistBin* pPrevEntry = (SHistBin*)pResNode->pBackward[0]->pData;
pPrevEntry->delta = val - pPrevEntry->val; pPrevEntry->delta = val - pPrevEntry->val;
SLoserTreeInfo* pTree = (*pHisto)->pLoserTree; SMultiwayMergeTreeInfo* pTree = (*pHisto)->pLoserTree;
if ((*pHisto)->ordered) { if ((*pHisto)->ordered) {
tLoserTreeAdjust(pTree, pPrevEntry->index + pTree->numOfEntries); tMergeTreeAdjust(pTree, pPrevEntry->index + pTree->numOfEntries);
tLoserTreeDisplay(pTree); tMergeTreePrint(pTree);
} }
} }
...@@ -155,7 +155,7 @@ int32_t tHistogramAdd(SHistogramInfo** pHisto, double val) { ...@@ -155,7 +155,7 @@ int32_t tHistogramAdd(SHistogramInfo** pHisto, double val) {
if (!(*pHisto)->ordered) { if (!(*pHisto)->ordered) {
SSkipListPrint((*pHisto)->pList, 1); SSkipListPrint((*pHisto)->pList, 1);
SLoserTreeInfo* pTree = (*pHisto)->pLoserTree; SMultiwayMergeTreeInfo* pTree = (*pHisto)->pLoserTree;
tSkipListNode* pHead = (*pHisto)->pList->pHead.pForward[0]; tSkipListNode* pHead = (*pHisto)->pList->pHead.pForward[0];
tSkipListNode* p1 = pHead; tSkipListNode* p1 = pHead;
...@@ -183,13 +183,13 @@ int32_t tHistogramAdd(SHistogramInfo** pHisto, double val) { ...@@ -183,13 +183,13 @@ int32_t tHistogramAdd(SHistogramInfo** pHisto, double val) {
pTree->pNode[i].index = -1; pTree->pNode[i].index = -1;
} }
tLoserTreeDisplay(pTree); tMergeTreePrint(pTree);
for (int32_t i = pTree->totalEntries - 1; i >= pTree->numOfEntries; i--) { for (int32_t i = pTree->totalEntries - 1; i >= pTree->numOfEntries; i--) {
tLoserTreeAdjust(pTree, i); tMergeTreeAdjust(pTree, i);
} }
tLoserTreeDisplay(pTree); tMergeTreePrint(pTree);
(*pHisto)->ordered = true; (*pHisto)->ordered = true;
} }
...@@ -219,7 +219,7 @@ int32_t tHistogramAdd(SHistogramInfo** pHisto, double val) { ...@@ -219,7 +219,7 @@ int32_t tHistogramAdd(SHistogramInfo** pHisto, double val) {
pPrevEntry->delta = pEntry->val - pPrevEntry->val; pPrevEntry->delta = pEntry->val - pPrevEntry->val;
} }
SLoserTreeInfo* pTree = (*pHisto)->pLoserTree; SMultiwayMergeTreeInfo* pTree = (*pHisto)->pLoserTree;
if (pNextEntry->index != -1) { if (pNextEntry->index != -1) {
(*pHisto)->maxIndex = pNextEntry->index; (*pHisto)->maxIndex = pNextEntry->index;
...@@ -230,12 +230,12 @@ int32_t tHistogramAdd(SHistogramInfo** pHisto, double val) { ...@@ -230,12 +230,12 @@ int32_t tHistogramAdd(SHistogramInfo** pHisto, double val) {
printf("disappear index is:%d\n", f); printf("disappear index is:%d\n", f);
} }
tLoserTreeAdjust(pTree, pEntry->index + pTree->numOfEntries); tMergeTreeAdjust(pTree, pEntry->index + pTree->numOfEntries);
// remove the next node in skiplist // remove the next node in skiplist
tSkipListRemoveNode((*pHisto)->pList, pNext); tSkipListRemoveNode((*pHisto)->pList, pNext);
SSkipListPrint((*pHisto)->pList, 1); SSkipListPrint((*pHisto)->pList, 1);
tLoserTreeDisplay((*pHisto)->pLoserTree); tMergeTreePrint((*pHisto)->pLoserTree);
} else { // add to heap } else { // add to heap
if (pResNode->pForward[0] != NULL) { if (pResNode->pForward[0] != NULL) {
pEntry1->delta = ((SHistBin*)pResNode->pForward[0]->pData)->val - val; pEntry1->delta = ((SHistBin*)pResNode->pForward[0]->pData)->val - val;
......
...@@ -15,10 +15,10 @@ ...@@ -15,10 +15,10 @@
#include <tglobal.h> #include <tglobal.h>
#include "os.h" #include "os.h"
#include "tpercentile.h"
#include "tpagedfile.h"
#include "taosdef.h" #include "taosdef.h"
#include "tcompare.h" #include "tcompare.h"
#include "tpagedbuf.h"
#include "tpercentile.h"
#include "ttypes.h" #include "ttypes.h"
#define DEFAULT_NUM_OF_SLOT 1024 #define DEFAULT_NUM_OF_SLOT 1024
...@@ -35,9 +35,9 @@ static SFilePage *loadDataFromFilePage(tMemBucket *pMemBucket, int32_t slotIdx) ...@@ -35,9 +35,9 @@ static SFilePage *loadDataFromFilePage(tMemBucket *pMemBucket, int32_t slotIdx)
int32_t offset = 0; int32_t offset = 0;
for(int32_t i = 0; i < list->size; ++i) { for(int32_t i = 0; i < list->size; ++i) {
SPageInfo* pgInfo = *(SPageInfo**) taosArrayGet(list, i); struct SPageInfo* pgInfo = *(struct SPageInfo**) taosArrayGet(list, i);
SFilePage* pg = getResBufPage(pMemBucket->pBuffer, pgInfo->pageId); SFilePage* pg = getBufPage(pMemBucket->pBuffer, getPageId(pgInfo));
memcpy(buffer->data + offset, pg->data, (size_t)(pg->num * pMemBucket->bytes)); memcpy(buffer->data + offset, pg->data, (size_t)(pg->num * pMemBucket->bytes));
offset += (int32_t)(pg->num * pMemBucket->bytes); offset += (int32_t)(pg->num * pMemBucket->bytes);
...@@ -98,8 +98,8 @@ double findOnlyResult(tMemBucket *pMemBucket) { ...@@ -98,8 +98,8 @@ double findOnlyResult(tMemBucket *pMemBucket) {
SIDList list = getDataBufPagesIdList(pMemBucket->pBuffer, groupId); SIDList list = getDataBufPagesIdList(pMemBucket->pBuffer, groupId);
assert(list->size == 1); assert(list->size == 1);
SPageInfo* pgInfo = (SPageInfo*) taosArrayGetP(list, 0); struct SPageInfo* pgInfo = (struct SPageInfo*) taosArrayGetP(list, 0);
SFilePage* pPage = getResBufPage(pMemBucket->pBuffer, pgInfo->pageId); SFilePage* pPage = getBufPage(pMemBucket->pBuffer, getPageId(pgInfo));
assert(pPage->num == 1); assert(pPage->num == 1);
double v = 0; double v = 0;
...@@ -254,7 +254,7 @@ tMemBucket *tMemBucketCreate(int16_t nElemSize, int16_t dataType, double minval, ...@@ -254,7 +254,7 @@ tMemBucket *tMemBucketCreate(int16_t nElemSize, int16_t dataType, double minval,
resetSlotInfo(pBucket); resetSlotInfo(pBucket);
int32_t ret = createDiskbasedResultBuffer(&pBucket->pBuffer, pBucket->bufPageSize, pBucket->bufPageSize * 512, 1, tsTempDir); int32_t ret = createDiskbasedBuffer(&pBucket->pBuffer, pBucket->bufPageSize, pBucket->bufPageSize * 512, 1, tsTempDir);
if (ret != 0) { if (ret != 0) {
tMemBucketDestroy(pBucket); tMemBucketDestroy(pBucket);
return NULL; return NULL;
...@@ -343,7 +343,7 @@ int32_t tMemBucketPut(tMemBucket *pBucket, const void *data, size_t size) { ...@@ -343,7 +343,7 @@ int32_t tMemBucketPut(tMemBucket *pBucket, const void *data, size_t size) {
assert(pSlot->info.data->num >= pBucket->elemPerPage && pSlot->info.size > 0); assert(pSlot->info.data->num >= pBucket->elemPerPage && pSlot->info.size > 0);
// keep the pointer in memory // keep the pointer in memory
releaseResBufPage(pBucket->pBuffer, pSlot->info.data); releaseBufPage(pBucket->pBuffer, pSlot->info.data);
pSlot->info.data = NULL; pSlot->info.data = NULL;
} }
...@@ -471,10 +471,10 @@ double getPercentileImpl(tMemBucket *pMemBucket, int32_t count, double fraction) ...@@ -471,10 +471,10 @@ double getPercentileImpl(tMemBucket *pMemBucket, int32_t count, double fraction)
for (int32_t f = 0; f < list->size; ++f) { for (int32_t f = 0; f < list->size; ++f) {
SPageInfo *pgInfo = *(SPageInfo **)taosArrayGet(list, f); SPageInfo *pgInfo = *(SPageInfo **)taosArrayGet(list, f);
SFilePage *pg = getResBufPage(pMemBucket->pBuffer, pgInfo->pageId); SFilePage *pg = getBufPage(pMemBucket->pBuffer, getPageId(pgInfo));
tMemBucketPut(pMemBucket, pg->data, (int32_t)pg->num); tMemBucketPut(pMemBucket, pg->data, (int32_t)pg->num);
releaseResBufPageInfo(pMemBucket->pBuffer, pgInfo); releaseBufPageInfo(pMemBucket->pBuffer, pgInfo);
} }
return getPercentileImpl(pMemBucket, count - num, fraction); return getPercentileImpl(pMemBucket, count - num, fraction);
......
...@@ -216,7 +216,7 @@ static SPhyNode* createMultiTableScanNode(SQueryPlanNode* pPlanNode, SQueryTable ...@@ -216,7 +216,7 @@ static SPhyNode* createMultiTableScanNode(SQueryPlanNode* pPlanNode, SQueryTable
} else if (needSeqScan(pPlanNode)) { } else if (needSeqScan(pPlanNode)) {
return createUserTableScanNode(pPlanNode, pTable, OP_TableSeqScan); return createUserTableScanNode(pPlanNode, pTable, OP_TableSeqScan);
} }
int32_t type = (pPlanNode->info.type == QNODE_TABLESCAN)? OP_DataBlocksOptScan:OP_StreamScan; int32_t type = (pPlanNode->info.type == QNODE_TABLESCAN)? OP_TableScan:OP_StreamScan;
return createUserTableScanNode(pPlanNode, pTable, type); return createUserTableScanNode(pPlanNode, pTable, type);
} }
...@@ -288,7 +288,7 @@ static bool needMultiNodeScan(SQueryTableInfo* pTable) { ...@@ -288,7 +288,7 @@ static bool needMultiNodeScan(SQueryTableInfo* pTable) {
static SPhyNode* createSingleTableScanNode(SQueryPlanNode* pPlanNode, SQueryTableInfo* pTableInfo, SSubplan* subplan) { static SPhyNode* createSingleTableScanNode(SQueryPlanNode* pPlanNode, SQueryTableInfo* pTableInfo, SSubplan* subplan) {
SVgroupsInfo* pVgroupsInfo = pTableInfo->pMeta->vgroupList; SVgroupsInfo* pVgroupsInfo = pTableInfo->pMeta->vgroupList;
vgroupInfoToNodeAddr(&(pVgroupsInfo->vgroups[0]), &subplan->execNode); vgroupInfoToNodeAddr(&(pVgroupsInfo->vgroups[0]), &subplan->execNode);
int32_t type = (pPlanNode->info.type == QNODE_TABLESCAN)? OP_DataBlocksOptScan:OP_StreamScan; int32_t type = (pPlanNode->info.type == QNODE_TABLESCAN)? OP_TableScan:OP_StreamScan;
return createUserTableScanNode(pPlanNode, pTableInfo, type); return createUserTableScanNode(pPlanNode, pTableInfo, type);
} }
......
...@@ -88,7 +88,7 @@ static const char* jkPnodeType = "Type"; ...@@ -88,7 +88,7 @@ static const char* jkPnodeType = "Type";
static int32_t getPnodeTypeSize(cJSON* json) { static int32_t getPnodeTypeSize(cJSON* json) {
switch (getNumber(json, jkPnodeType)) { switch (getNumber(json, jkPnodeType)) {
case OP_StreamScan: case OP_StreamScan:
case OP_DataBlocksOptScan: case OP_TableScan:
case OP_TableSeqScan: case OP_TableSeqScan:
return sizeof(STableScanPhyNode); return sizeof(STableScanPhyNode);
case OP_TagScan: case OP_TagScan:
...@@ -830,7 +830,7 @@ static bool specificPhyNodeToJson(const void* obj, cJSON* json) { ...@@ -830,7 +830,7 @@ static bool specificPhyNodeToJson(const void* obj, cJSON* json) {
const SPhyNode* phyNode = (const SPhyNode*)obj; const SPhyNode* phyNode = (const SPhyNode*)obj;
switch (phyNode->info.type) { switch (phyNode->info.type) {
case OP_StreamScan: case OP_StreamScan:
case OP_DataBlocksOptScan: case OP_TableScan:
case OP_TableSeqScan: case OP_TableSeqScan:
return tableScanNodeToJson(obj, json); return tableScanNodeToJson(obj, json);
case OP_TagScan: case OP_TagScan:
...@@ -868,7 +868,7 @@ static bool specificPhyNodeFromJson(const cJSON* json, void* obj) { ...@@ -868,7 +868,7 @@ static bool specificPhyNodeFromJson(const cJSON* json, void* obj) {
SPhyNode* phyNode = (SPhyNode*)obj; SPhyNode* phyNode = (SPhyNode*)obj;
switch (phyNode->info.type) { switch (phyNode->info.type) {
case OP_StreamScan: case OP_StreamScan:
case OP_DataBlocksOptScan: case OP_TableScan:
case OP_TableSeqScan: case OP_TableSeqScan:
return tableScanNodeFromJson(json, obj); return tableScanNodeFromJson(json, obj);
case OP_TagScan: case OP_TagScan:
......
...@@ -14,82 +14,85 @@ ...@@ -14,82 +14,85 @@
*/ */
#include "os.h" #include "os.h"
#include "tlosertree.h"
#include "ulog.h" #include "ulog.h"
#include "tlosertree.h"
#include "taoserror.h"
// set initial value for loser tree // Set the initial value of the multiway merge tree.
void tLoserTreeInit(SLoserTreeInfo* pTree) { static void tMergeTreeInit(SMultiwayMergeTreeInfo* pTree) {
assert((pTree->totalEntries & 0x01) == 0 && (pTree->numOfEntries << 1 == pTree->totalEntries)); assert((pTree->totalSources & 0x01) == 0 && (pTree->numOfSources << 1 == pTree->totalSources));
for (int32_t i = 0; i < pTree->totalEntries; ++i) { for (int32_t i = 0; i < pTree->totalSources; ++i) {
if (i < pTree->numOfEntries) { if (i < pTree->numOfSources) {
pTree->pNode[i].index = -1; pTree->pNode[i].index = -1;
} else { } else {
pTree->pNode[i].index = i - pTree->numOfEntries; pTree->pNode[i].index = i - pTree->numOfSources;
} }
} }
} }
/* int32_t tMergeTreeCreate(SMultiwayMergeTreeInfo** pTree, uint32_t numOfSources, void* param, __merge_compare_fn_t compareFn) {
* display whole loser tree on screen for debug purpose only. int32_t totalEntries = numOfSources << 1u;
*/
void tLoserTreeDisplay(SLoserTreeInfo* pTree) {
printf("the value of loser tree:\t");
for (int32_t i = 0; i < pTree->totalEntries; ++i) printf("%d\t", pTree->pNode[i].index);
printf("\n");
}
uint32_t tLoserTreeCreate(SLoserTreeInfo** pTree, int32_t numOfEntries, void* param, __merge_compare_fn_t compareFn) { SMultiwayMergeTreeInfo* pTreeInfo = (SMultiwayMergeTreeInfo*)calloc(1, sizeof(SMultiwayMergeTreeInfo) + sizeof(STreeNode) * totalEntries);
int32_t totalEntries = numOfEntries << 1; if (pTreeInfo == NULL) {
*pTree = (SLoserTreeInfo*)calloc(1, sizeof(SLoserTreeInfo) + sizeof(SLoserTreeNode) * totalEntries);
if ((*pTree) == NULL) {
uError("allocate memory for loser-tree failed. reason:%s", strerror(errno)); uError("allocate memory for loser-tree failed. reason:%s", strerror(errno));
return -1; return TAOS_SYSTEM_ERROR(errno);
} }
(*pTree)->pNode = (SLoserTreeNode*)(((char*)(*pTree)) + sizeof(SLoserTreeInfo)); pTreeInfo->pNode = (STreeNode*)(((char*)pTreeInfo) + sizeof(SMultiwayMergeTreeInfo));
(*pTree)->numOfEntries = numOfEntries; pTreeInfo->numOfSources = numOfSources;
(*pTree)->totalEntries = totalEntries; pTreeInfo->totalSources = totalEntries;
(*pTree)->param = param; pTreeInfo->param = param;
(*pTree)->comparFn = compareFn; pTreeInfo->comparFn = compareFn;
// set initial value for loser tree // set initial value for loser tree
tLoserTreeInit(*pTree); tMergeTreeInit(pTreeInfo);
#ifdef _DEBUG_VIEW #ifdef _DEBUG_VIEW
printf("the initial value of loser tree:\n"); printf("the initial value of loser tree:\n");
tLoserTreeDisplay(*pTree); tLoserTreeDisplaypTreeInfo;
#endif #endif
for (int32_t i = totalEntries - 1; i >= numOfEntries; i--) { for (int32_t i = totalEntries - 1; i >= numOfSources; i--) {
tLoserTreeAdjust(*pTree, i); tMergeTreeAdjust(pTreeInfo, i);
} }
#if defined(_DEBUG_VIEW) #if defined(_DEBUG_VIEW)
printf("after adjust:\n"); printf("after adjust:\n");
tLoserTreeDisplay(*pTree); tLoserTreeDisplaypTreeInfo;
printf("initialize local reducer completed!\n"); printf("initialize local reducer completed!\n");
#endif #endif
*pTree = pTreeInfo;
return 0; return 0;
} }
void tLoserTreeAdjust(SLoserTreeInfo* pTree, int32_t idx) { void tMergeTreeDestroy(SMultiwayMergeTreeInfo* pTree) {
assert(idx <= pTree->totalEntries - 1 && idx >= pTree->numOfEntries && pTree->totalEntries >= 2); if (pTree == NULL) {
return;
}
if (pTree->totalEntries == 2) { tfree(pTree);
}
void tMergeTreeAdjust(SMultiwayMergeTreeInfo* pTree, int32_t idx) {
assert(idx <= pTree->totalSources - 1 && idx >= pTree->numOfSources && pTree->totalSources >= 2);
if (pTree->totalSources == 2) {
pTree->pNode[0].index = 0; pTree->pNode[0].index = 0;
pTree->pNode[1].index = 0; pTree->pNode[1].index = 0;
return; return;
} }
int32_t parentId = idx >> 1; int32_t parentId = idx >> 1;
SLoserTreeNode kLeaf = pTree->pNode[idx]; STreeNode kLeaf = pTree->pNode[idx];
while (parentId > 0) { while (parentId > 0) {
SLoserTreeNode* pCur = &pTree->pNode[parentId]; STreeNode* pCur = &pTree->pNode[parentId];
if (pCur->index == -1) { if (pCur->index == -1) {
pTree->pNode[parentId] = kLeaf; pTree->pNode[parentId] = kLeaf;
return; return;
...@@ -97,7 +100,7 @@ void tLoserTreeAdjust(SLoserTreeInfo* pTree, int32_t idx) { ...@@ -97,7 +100,7 @@ void tLoserTreeAdjust(SLoserTreeInfo* pTree, int32_t idx) {
int32_t ret = pTree->comparFn(pCur, &kLeaf, pTree->param); int32_t ret = pTree->comparFn(pCur, &kLeaf, pTree->param);
if (ret < 0) { if (ret < 0) {
SLoserTreeNode t = pTree->pNode[parentId]; STreeNode t = pTree->pNode[parentId];
pTree->pNode[parentId] = kLeaf; pTree->pNode[parentId] = kLeaf;
kLeaf = t; kLeaf = t;
} }
...@@ -111,11 +114,23 @@ void tLoserTreeAdjust(SLoserTreeInfo* pTree, int32_t idx) { ...@@ -111,11 +114,23 @@ void tLoserTreeAdjust(SLoserTreeInfo* pTree, int32_t idx) {
} }
} }
void tLoserTreeRebuild(SLoserTreeInfo* pTree) { void tMergeTreeRebuild(SMultiwayMergeTreeInfo* pTree) {
assert((pTree->totalEntries & 0x1) == 0); assert((pTree->totalSources & 0x1) == 0);
tMergeTreeInit(pTree);
for (int32_t i = pTree->totalSources - 1; i >= pTree->numOfSources; i--) {
tMergeTreeAdjust(pTree, i);
}
}
tLoserTreeInit(pTree); /*
for (int32_t i = pTree->totalEntries - 1; i >= pTree->numOfEntries; i--) { * display whole loser tree on screen for debug purpose only.
tLoserTreeAdjust(pTree, i); */
void tMergeTreePrint(const SMultiwayMergeTreeInfo* pTree) {
printf("the value of loser tree:\t");
for (int32_t i = 0; i < pTree->totalSources; ++i) {
printf("%d\t", pTree->pNode[i].index);
} }
printf("\n");
} }
#include <gtest/gtest.h>
#include <cassert>
#include <iostream>
#include "taos.h"
#include "tpagedbuf.h"
#pragma GCC diagnostic push
#pragma GCC diagnostic ignored "-Wunused-function"
#pragma GCC diagnostic ignored "-Wunused-variable"
namespace {
// simple test
void simpleTest() {
SDiskbasedBuf* pResultBuf = NULL;
int32_t ret = createDiskbasedBuffer(&pResultBuf, 1024, 4096, 1, "/tmp/");
int32_t pageId = 0;
int32_t groupId = 0;
SFilePage* pBufPage = getNewDataBuf(pResultBuf, groupId, &pageId);
ASSERT_TRUE(pBufPage != NULL);
ASSERT_EQ(getTotalBufSize(pResultBuf), 1024);
SIDList list = getDataBufPagesIdList(pResultBuf, groupId);
ASSERT_EQ(taosArrayGetSize(list), 1);
ASSERT_EQ(getNumOfResultBufGroupId(pResultBuf), 1);
releaseBufPage(pResultBuf, pBufPage);
SFilePage* pBufPage1 = getNewDataBuf(pResultBuf, groupId, &pageId);
SFilePage* t = getBufPage(pResultBuf, pageId);
ASSERT_TRUE(t == pBufPage1);
SFilePage* pBufPage2 = getNewDataBuf(pResultBuf, groupId, &pageId);
SFilePage* t1 = getBufPage(pResultBuf, pageId);
ASSERT_TRUE(t1 == pBufPage2);
SFilePage* pBufPage3 = getNewDataBuf(pResultBuf, groupId, &pageId);
SFilePage* t2 = getBufPage(pResultBuf, pageId);
ASSERT_TRUE(t2 == pBufPage3);
SFilePage* pBufPage4 = getNewDataBuf(pResultBuf, groupId, &pageId);
SFilePage* t3 = getBufPage(pResultBuf, pageId);
ASSERT_TRUE(t3 == pBufPage4);
SFilePage* pBufPage5 = getNewDataBuf(pResultBuf, groupId, &pageId);
SFilePage* t4 = getBufPage(pResultBuf, pageId);
ASSERT_TRUE(t4 == pBufPage5);
destroyResultBuf(pResultBuf);
}
void writeDownTest() {
SDiskbasedBuf* pResultBuf = NULL;
int32_t ret = createDiskbasedBuffer(&pResultBuf, 1024, 4*1024, 1, "/tmp/");
int32_t pageId = 0;
int32_t writePageId = 0;
int32_t groupId = 0;
int32_t nx = 12345;
SFilePage* pBufPage = getNewDataBuf(pResultBuf, groupId, &pageId);
ASSERT_TRUE(pBufPage != NULL);
*(int32_t*)(pBufPage->data) = nx;
writePageId = pageId;
releaseBufPage(pResultBuf, pBufPage);
SFilePage* pBufPage1 = getNewDataBuf(pResultBuf, groupId, &pageId);
SFilePage* t1 = getBufPage(pResultBuf, pageId);
ASSERT_TRUE(t1 == pBufPage1);
ASSERT_TRUE(pageId == 1);
SFilePage* pBufPage2 = getNewDataBuf(pResultBuf, groupId, &pageId);
SFilePage* t2 = getBufPage(pResultBuf, pageId);
ASSERT_TRUE(t2 == pBufPage2);
ASSERT_TRUE(pageId == 2);
SFilePage* pBufPage3 = getNewDataBuf(pResultBuf, groupId, &pageId);
SFilePage* t3 = getBufPage(pResultBuf, pageId);
ASSERT_TRUE(t3 == pBufPage3);
ASSERT_TRUE(pageId == 3);
SFilePage* pBufPage4 = getNewDataBuf(pResultBuf, groupId, &pageId);
SFilePage* t4 = getBufPage(pResultBuf, pageId);
ASSERT_TRUE(t4 == pBufPage4);
ASSERT_TRUE(pageId == 4);
releaseBufPage(pResultBuf, t4);
// flush the written page to disk, and read it out again
SFilePage* pBufPagex = getBufPage(pResultBuf, writePageId);
ASSERT_EQ(*(int32_t*)pBufPagex->data, nx);
SArray* pa = getDataBufPagesIdList(pResultBuf, groupId);
ASSERT_EQ(taosArrayGetSize(pa), 5);
destroyResultBuf(pResultBuf);
}
void recyclePageTest() {
SDiskbasedBuf* pResultBuf = NULL;
int32_t ret = createDiskbasedBuffer(&pResultBuf, 1024, 4*1024, 1, "/tmp/");
int32_t pageId = 0;
int32_t writePageId = 0;
int32_t groupId = 0;
int32_t nx = 12345;
SFilePage* pBufPage = getNewDataBuf(pResultBuf, groupId, &pageId);
ASSERT_TRUE(pBufPage != NULL);
releaseBufPage(pResultBuf, pBufPage);
SFilePage* pBufPage1 = getNewDataBuf(pResultBuf, groupId, &pageId);
SFilePage* t1 = getBufPage(pResultBuf, pageId);
ASSERT_TRUE(t1 == pBufPage1);
ASSERT_TRUE(pageId == 1);
SFilePage* pBufPage2 = getNewDataBuf(pResultBuf, groupId, &pageId);
SFilePage* t2 = getBufPage(pResultBuf, pageId);
ASSERT_TRUE(t2 == pBufPage2);
ASSERT_TRUE(pageId == 2);
SFilePage* pBufPage3 = getNewDataBuf(pResultBuf, groupId, &pageId);
SFilePage* t3 = getBufPage(pResultBuf, pageId);
ASSERT_TRUE(t3 == pBufPage3);
ASSERT_TRUE(pageId == 3);
SFilePage* pBufPage4 = getNewDataBuf(pResultBuf, groupId, &pageId);
SFilePage* t4 = getBufPage(pResultBuf, pageId);
ASSERT_TRUE(t4 == pBufPage4);
ASSERT_TRUE(pageId == 4);
releaseBufPage(pResultBuf, t4);
SFilePage* pBufPage5 = getNewDataBuf(pResultBuf, groupId, &pageId);
SFilePage* t5 = getBufPage(pResultBuf, pageId);
ASSERT_TRUE(t5 == pBufPage5);
ASSERT_TRUE(pageId == 5);
// flush the written page to disk, and read it out again
SFilePage* pBufPagex = getBufPage(pResultBuf, writePageId);
*(int32_t*)(pBufPagex->data) = nx;
writePageId = pageId; // update the data
releaseBufPage(pResultBuf, pBufPagex);
SFilePage* pBufPagex1 = getBufPage(pResultBuf, 1);
SArray* pa = getDataBufPagesIdList(pResultBuf, groupId);
ASSERT_EQ(taosArrayGetSize(pa), 6);
destroyResultBuf(pResultBuf);
}
} // namespace
TEST(testCase, resultBufferTest) {
srand(time(NULL));
simpleTest();
writeDownTest();
recyclePageTest();
}
#pragma GCC diagnostic pop
\ No newline at end of file
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册