未验证 提交 c35a3fa3 编写于 作者: H Haojun Liao 提交者: GitHub

Merge pull request #10388 from taosdata/feature/3.0_liaohj

Feature/3.0 liaohj
...@@ -589,7 +589,7 @@ SOperatorInfo* createTableSeqScanOperatorInfo(void* pTsdbQueryHandle, SQueryRunt ...@@ -589,7 +589,7 @@ SOperatorInfo* createTableSeqScanOperatorInfo(void* pTsdbQueryHandle, SQueryRunt
SOperatorInfo* createAggregateOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput); SOperatorInfo* createAggregateOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput);
SOperatorInfo* createProjectOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput); SOperatorInfo* createProjectOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput);
SOperatorInfo* createLimitOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream); SOperatorInfo* createLimitOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream);
SOperatorInfo* createTimeIntervalOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput); SOperatorInfo* createIntervalOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput);
SOperatorInfo* createAllTimeIntervalOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput); SOperatorInfo* createAllTimeIntervalOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput);
SOperatorInfo* createSWindowOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput); SOperatorInfo* createSWindowOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput);
SOperatorInfo* createFillOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput, bool multigroupResult); SOperatorInfo* createFillOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput, bool multigroupResult);
......
...@@ -2166,7 +2166,7 @@ static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int32_t numOf ...@@ -2166,7 +2166,7 @@ static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int32_t numOf
} }
case OP_TimeWindow: { case OP_TimeWindow: {
pRuntimeEnv->proot = pRuntimeEnv->proot =
createTimeIntervalOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot, pQueryAttr->pExpr1, pQueryAttr->numOfOutput); createIntervalOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot, pQueryAttr->pExpr1, pQueryAttr->numOfOutput);
int32_t opType = pRuntimeEnv->proot->upstream[0]->operatorType; int32_t opType = pRuntimeEnv->proot->upstream[0]->operatorType;
if (opType != OP_DummyInput && opType != OP_Join) { if (opType != OP_DummyInput && opType != OP_Join) {
setTableScanFilterOperatorInfo(pRuntimeEnv->proot->upstream[0]->info, pRuntimeEnv->proot); setTableScanFilterOperatorInfo(pRuntimeEnv->proot->upstream[0]->info, pRuntimeEnv->proot);
...@@ -6756,7 +6756,7 @@ SOperatorInfo* createLimitOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorI ...@@ -6756,7 +6756,7 @@ SOperatorInfo* createLimitOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorI
return pOperator; return pOperator;
} }
SOperatorInfo* createTimeIntervalOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput) { SOperatorInfo* createIntervalOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput) {
STableIntervalOperatorInfo* pInfo = calloc(1, sizeof(STableIntervalOperatorInfo)); STableIntervalOperatorInfo* pInfo = calloc(1, sizeof(STableIntervalOperatorInfo));
pInfo->pCtx = createSQLFunctionCtx(pRuntimeEnv, pExpr, numOfOutput, &pInfo->rowCellInfoOffset); pInfo->pCtx = createSQLFunctionCtx(pRuntimeEnv, pExpr, numOfOutput, &pInfo->rowCellInfoOffset);
......
...@@ -18,9 +18,6 @@ typedef struct SBlockOrderInfo { ...@@ -18,9 +18,6 @@ typedef struct SBlockOrderInfo {
int32_t order; int32_t order;
int32_t colIndex; int32_t colIndex;
SColumnInfoData *pColData; SColumnInfoData *pColData;
// int32_t type;
// int32_t bytes;
// bool hasNull;
} SBlockOrderInfo; } SBlockOrderInfo;
int taosGetFqdnPortFromEp(const char *ep, SEp *pEp); int taosGetFqdnPortFromEp(const char *ep, SEp *pEp);
...@@ -67,16 +64,16 @@ static FORCE_INLINE bool colDataIsNull(const SColumnInfoData* pColumnInfoData, u ...@@ -67,16 +64,16 @@ static FORCE_INLINE bool colDataIsNull(const SColumnInfoData* pColumnInfoData, u
} }
} }
#define colDataGet(p1_, r_) \ #define colDataGetData(p1_, r_) \
((IS_VAR_DATA_TYPE((p1_)->info.type)) ? ((p1_)->pData + (p1_)->varmeta.offset[(r_)]) \ ((IS_VAR_DATA_TYPE((p1_)->info.type)) ? (p1_)->pData + (p1_)->varmeta.offset[(r_)] \
: ((p1_)->pData + ((r_) * (p1_)->info.bytes))) : (p1_)->pData + ((r_) * (p1_)->info.bytes))
int32_t colDataAppend(SColumnInfoData* pColumnInfoData, uint32_t currentRow, const char* pData, bool isNull); int32_t colDataAppend(SColumnInfoData* pColumnInfoData, uint32_t currentRow, const char* pData, bool isNull);
int32_t colDataMergeCol(SColumnInfoData* pColumnInfoData, uint32_t numOfRow1, const SColumnInfoData* pSource, uint32_t numOfRow2); int32_t colDataMergeCol(SColumnInfoData* pColumnInfoData, uint32_t numOfRow1, const SColumnInfoData* pSource, uint32_t numOfRow2);
int32_t blockDataUpdateTsWindow(SSDataBlock* pDataBlock); int32_t blockDataUpdateTsWindow(SSDataBlock* pDataBlock);
int32_t colDataGetSize(const SColumnInfoData* pColumnInfoData, int32_t numOfRows); int32_t colDataGetLength(const SColumnInfoData* pColumnInfoData, int32_t numOfRows);
void colDataTrim(SColumnInfoData* pColumnInfoData); void colDataTrim(SColumnInfoData* pColumnInfoData);
size_t colDataGetNumOfCols(const SSDataBlock* pBlock); size_t colDataGetNumOfCols(const SSDataBlock* pBlock);
size_t colDataGetNumOfRows(const SSDataBlock* pBlock); size_t colDataGetNumOfRows(const SSDataBlock* pBlock);
...@@ -93,13 +90,15 @@ size_t blockDataGetRowSize(const SSDataBlock* pBlock); ...@@ -93,13 +90,15 @@ size_t blockDataGetRowSize(const SSDataBlock* pBlock);
double blockDataGetSerialRowSize(const SSDataBlock* pBlock); double blockDataGetSerialRowSize(const SSDataBlock* pBlock);
size_t blockDataGetSerialMetaSize(const SSDataBlock* pBlock); size_t blockDataGetSerialMetaSize(const SSDataBlock* pBlock);
size_t blockDataNumOfRowsForSerialize(const SSDataBlock* pBlock, int32_t blockSize); SSchema* blockDataExtractSchema(const SSDataBlock* pBlock, int32_t* numOfCols);
int32_t blockDataSort(SSDataBlock* pDataBlock, SArray* pOrderInfo, bool nullFirst); int32_t blockDataSort(SSDataBlock* pDataBlock, SArray* pOrderInfo, bool nullFirst);
int32_t blockDataSort_rv(SSDataBlock* pDataBlock, SArray* pOrderInfo, bool nullFirst); int32_t blockDataSort_rv(SSDataBlock* pDataBlock, SArray* pOrderInfo, bool nullFirst);
int32_t blockDataEnsureCapacity(SSDataBlock* pDataBlock, uint32_t numOfRows); int32_t blockDataEnsureCapacity(SSDataBlock* pDataBlock, uint32_t numOfRows);
void blockDataClearup(SSDataBlock* pDataBlock, bool hasVarCol); void blockDataClearup(SSDataBlock* pDataBlock, bool hasVarCol);
SSDataBlock* createOneDataBlock(const SSDataBlock* pDataBlock);
size_t blockDataGetCapacityInRow(const SSDataBlock* pBlock, size_t pageSize);
void *blockDataDestroy(SSDataBlock *pBlock); void *blockDataDestroy(SSDataBlock *pBlock);
#ifdef __cplusplus #ifdef __cplusplus
......
...@@ -31,6 +31,7 @@ typedef struct SReadHandle { ...@@ -31,6 +31,7 @@ typedef struct SReadHandle {
void* reader; void* reader;
void* meta; void* meta;
} SReadHandle; } SReadHandle;
/** /**
* Create the exec task for streaming mode * Create the exec task for streaming mode
* @param pMsg * @param pMsg
...@@ -40,12 +41,22 @@ typedef struct SReadHandle { ...@@ -40,12 +41,22 @@ typedef struct SReadHandle {
qTaskInfo_t qCreateStreamExecTaskInfo(void *msg, void* streamReadHandle); qTaskInfo_t qCreateStreamExecTaskInfo(void *msg, void* streamReadHandle);
/** /**
* * Set the input data block for the stream scan.
* @param tinfo * @param tinfo
* @param input * @param input
* @return * @return
*/ */
int32_t qSetStreamInput(qTaskInfo_t tinfo, const void* input); int32_t qSetStreamInput(qTaskInfo_t tinfo, const void* input);
/**
* Update the table id list, add or remove.
*
* @param tinfo
* @param id
* @param isAdd
* @return
*/
int32_t qUpdateQualifiedTableId(qTaskInfo_t tinfo, SArray* tableIdList, bool isAdd);
/** /**
* Create the exec task object according to task json * Create the exec task object according to task json
......
...@@ -138,8 +138,10 @@ extern SFunctionFpSet fpSet[1]; ...@@ -138,8 +138,10 @@ extern SFunctionFpSet fpSet[1];
// sql function runtime context // sql function runtime context
typedef struct SqlFunctionCtx { typedef struct SqlFunctionCtx {
int32_t startRow;
int32_t size; // number of rows int32_t size; // number of rows
void * pInput; // input data buffer SColumnInfoData* pInput;
uint32_t order; // asc|desc uint32_t order; // asc|desc
int16_t inputType; int16_t inputType;
int16_t inputBytes; int16_t inputBytes;
......
...@@ -47,5 +47,6 @@ OP_ENUM_MACRO(AllTimeWindow) ...@@ -47,5 +47,6 @@ OP_ENUM_MACRO(AllTimeWindow)
OP_ENUM_MACRO(AllMultiTableTimeInterval) OP_ENUM_MACRO(AllMultiTableTimeInterval)
OP_ENUM_MACRO(Order) OP_ENUM_MACRO(Order)
OP_ENUM_MACRO(Exchange) OP_ENUM_MACRO(Exchange)
OP_ENUM_MACRO(SortedMerge)
//OP_ENUM_MACRO(TableScan) //OP_ENUM_MACRO(TableScan)
...@@ -43,8 +43,8 @@ extern "C" { ...@@ -43,8 +43,8 @@ extern "C" {
typedef struct SArray { typedef struct SArray {
size_t size; size_t size;
size_t capacity; uint32_t capacity;
size_t elemSize; uint32_t elemSize;
void* pData; void* pData;
} SArray; } SArray;
......
...@@ -30,10 +30,9 @@ typedef struct SPageInfo SPageInfo; ...@@ -30,10 +30,9 @@ typedef struct SPageInfo SPageInfo;
typedef struct SDiskbasedBuf SDiskbasedBuf; typedef struct SDiskbasedBuf SDiskbasedBuf;
#define DEFAULT_INTERN_BUF_PAGE_SIZE (1024L) // in bytes #define DEFAULT_INTERN_BUF_PAGE_SIZE (1024L) // in bytes
#define DEFAULT_PAGE_SIZE (16384L)
typedef struct SFilePage { typedef struct SFilePage {
int64_t num; int32_t num;
char data[]; char data[];
} SFilePage; } SFilePage;
...@@ -55,7 +54,7 @@ typedef struct SDiskbasedBufStatis { ...@@ -55,7 +54,7 @@ typedef struct SDiskbasedBufStatis {
* @param handle * @param handle
* @return * @return
*/ */
int32_t createDiskbasedBuffer(SDiskbasedBuf** pBuf, int32_t pagesize, int32_t inMemBufSize, uint64_t qId, const char* dir); int32_t createDiskbasedBuf(SDiskbasedBuf** pBuf, int32_t pagesize, int32_t inMemBufSize, uint64_t qId, const char* dir);
/** /**
* *
...@@ -64,7 +63,7 @@ int32_t createDiskbasedBuffer(SDiskbasedBuf** pBuf, int32_t pagesize, int32_t in ...@@ -64,7 +63,7 @@ int32_t createDiskbasedBuffer(SDiskbasedBuf** pBuf, int32_t pagesize, int32_t in
* @param pageId * @param pageId
* @return * @return
*/ */
SFilePage* getNewDataBuf(SDiskbasedBuf* pBuf, int32_t groupId, int32_t* pageId); void* getNewBufPage(SDiskbasedBuf* pBuf, int32_t groupId, int32_t* pageId);
/** /**
* *
...@@ -80,7 +79,7 @@ SIDList getDataBufPagesIdList(SDiskbasedBuf* pBuf, int32_t groupId); ...@@ -80,7 +79,7 @@ SIDList getDataBufPagesIdList(SDiskbasedBuf* pBuf, int32_t groupId);
* @param id * @param id
* @return * @return
*/ */
SFilePage* getBufPage(SDiskbasedBuf* pBuf, int32_t id); void* getBufPage(SDiskbasedBuf* pBuf, int32_t id);
/** /**
* release the referenced buf pages * release the referenced buf pages
...@@ -108,13 +107,13 @@ size_t getTotalBufSize(const SDiskbasedBuf* pBuf); ...@@ -108,13 +107,13 @@ size_t getTotalBufSize(const SDiskbasedBuf* pBuf);
* @param pBuf * @param pBuf
* @return * @return
*/ */
size_t getNumOfResultBufGroupId(const SDiskbasedBuf* pBuf); size_t getNumOfBufGroupId(const SDiskbasedBuf* pBuf);
/** /**
* destroy result buffer * destroy result buffer
* @param pBuf * @param pBuf
*/ */
void destroyResultBuf(SDiskbasedBuf* pBuf); void destroyDiskbasedBuf(SDiskbasedBuf* pBuf);
/** /**
* *
...@@ -137,6 +136,11 @@ int32_t getPageId(const SPageInfo* pPgInfo); ...@@ -137,6 +136,11 @@ int32_t getPageId(const SPageInfo* pPgInfo);
*/ */
int32_t getBufPageSize(const SDiskbasedBuf* pBuf); int32_t getBufPageSize(const SDiskbasedBuf* pBuf);
/**
*
* @param pBuf
* @return
*/
int32_t getNumOfInMemBufPages(const SDiskbasedBuf* pBuf); int32_t getNumOfInMemBufPages(const SDiskbasedBuf* pBuf);
/** /**
...@@ -148,22 +152,43 @@ bool isAllDataInMemBuf(const SDiskbasedBuf* pBuf); ...@@ -148,22 +152,43 @@ bool isAllDataInMemBuf(const SDiskbasedBuf* pBuf);
/** /**
* Set the buffer page is dirty, and needs to be flushed to disk when swap out. * Set the buffer page is dirty, and needs to be flushed to disk when swap out.
* @param pPageInfo * @param pPage
* @param dirty * @param dirty
*/ */
void setBufPageDirty(SFilePage* pPageInfo, bool dirty); void setBufPageDirty(void* pPage, bool dirty);
/**
* Set the compress/ no-compress flag for paged buffer, when flushing data in disk.
* @param pBuf
*/
void setBufPageCompressOnDisk(SDiskbasedBuf* pBuf, bool comp);
/**
* Set the pageId page buffer is not need
* @param pBuf
* @param pageId
*/
void dBufSetBufPageRecycled(SDiskbasedBuf *pBuf, void* pPage);
/** /**
* Print the statistics when closing this buffer * Print the statistics when closing this buffer
* @param pBuf * @param pBuf
*/ */
void printStatisBeforeClose(SDiskbasedBuf* pBuf); void dBufSetPrintInfo(SDiskbasedBuf* pBuf);
/** /**
* return buf statistics. * Return buf statistics.
* @param pBuf
* @return
*/ */
SDiskbasedBufStatis getDBufStatis(const SDiskbasedBuf* pBuf); SDiskbasedBufStatis getDBufStatis(const SDiskbasedBuf* pBuf);
/**
* Print the buffer statistics information
* @param pBuf
*/
void dBufPrintStatis(const SDiskbasedBuf* pBuf);
#ifdef __cplusplus #ifdef __cplusplus
} }
#endif #endif
......
...@@ -53,8 +53,10 @@ TEST(testCase, driverInit_Test) { ...@@ -53,8 +53,10 @@ TEST(testCase, driverInit_Test) {
// taos_init(); // taos_init();
} }
#if 0 #if 1
TEST(testCase, connect_Test) { TEST(testCase, connect_Test) {
// taos_options(TSDB_OPTION_CONFIGDIR, "/home/ubuntu/first/cfg");
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
if (pConn == NULL) { if (pConn == NULL) {
printf("failed to connect to server, reason:%s\n", taos_errstr(NULL)); printf("failed to connect to server, reason:%s\n", taos_errstr(NULL));
......
...@@ -63,7 +63,7 @@ SEpSet getEpSet_s(SCorEpSet *pEpSet) { ...@@ -63,7 +63,7 @@ SEpSet getEpSet_s(SCorEpSet *pEpSet) {
#define BitmapLen(_n) (((_n) + ((1<<NBIT)-1)) >> NBIT) #define BitmapLen(_n) (((_n) + ((1<<NBIT)-1)) >> NBIT)
int32_t colDataGetSize(const SColumnInfoData* pColumnInfoData, int32_t numOfRows) { int32_t colDataGetLength(const SColumnInfoData* pColumnInfoData, int32_t numOfRows) {
ASSERT(pColumnInfoData != NULL); ASSERT(pColumnInfoData != NULL);
if (IS_VAR_DATA_TYPE(pColumnInfoData->info.type)) { if (IS_VAR_DATA_TYPE(pColumnInfoData->info.type)) {
return pColumnInfoData->varmeta.length; return pColumnInfoData->varmeta.length;
...@@ -127,6 +127,7 @@ int32_t colDataAppend(SColumnInfoData* pColumnInfoData, uint32_t currentRow, con ...@@ -127,6 +127,7 @@ int32_t colDataAppend(SColumnInfoData* pColumnInfoData, uint32_t currentRow, con
case TSDB_DATA_TYPE_USMALLINT: {*(int16_t*) p = *(int16_t*) pData;break;} case TSDB_DATA_TYPE_USMALLINT: {*(int16_t*) p = *(int16_t*) pData;break;}
case TSDB_DATA_TYPE_INT: case TSDB_DATA_TYPE_INT:
case TSDB_DATA_TYPE_UINT: {*(int32_t*) p = *(int32_t*) pData;break;} case TSDB_DATA_TYPE_UINT: {*(int32_t*) p = *(int32_t*) pData;break;}
case TSDB_DATA_TYPE_TIMESTAMP:
case TSDB_DATA_TYPE_BIGINT: case TSDB_DATA_TYPE_BIGINT:
case TSDB_DATA_TYPE_UBIGINT: {*(int64_t*) p = *(int64_t*) pData;break;} case TSDB_DATA_TYPE_UBIGINT: {*(int64_t*) p = *(int64_t*) pData;break;}
default: default:
...@@ -249,8 +250,8 @@ int32_t blockDataUpdateTsWindow(SSDataBlock* pDataBlock) { ...@@ -249,8 +250,8 @@ int32_t blockDataUpdateTsWindow(SSDataBlock* pDataBlock) {
} }
ASSERT(pColInfoData->nullbitmap == NULL); ASSERT(pColInfoData->nullbitmap == NULL);
pDataBlock->info.window.skey = *(TSKEY*) colDataGet(pColInfoData, 0); pDataBlock->info.window.skey = *(TSKEY*) colDataGetData(pColInfoData, 0);
pDataBlock->info.window.ekey = *(TSKEY*) colDataGet(pColInfoData, (pDataBlock->info.rows - 1)); pDataBlock->info.window.ekey = *(TSKEY*) colDataGetData(pColInfoData, (pDataBlock->info.rows - 1));
return 0; return 0;
} }
...@@ -262,8 +263,8 @@ int32_t blockDataMerge(SSDataBlock* pDest, const SSDataBlock* pSrc) { ...@@ -262,8 +263,8 @@ int32_t blockDataMerge(SSDataBlock* pDest, const SSDataBlock* pSrc) {
SColumnInfoData* pCol2 = taosArrayGet(pDest->pDataBlock, i); SColumnInfoData* pCol2 = taosArrayGet(pDest->pDataBlock, i);
SColumnInfoData* pCol1 = taosArrayGet(pSrc->pDataBlock, i); SColumnInfoData* pCol1 = taosArrayGet(pSrc->pDataBlock, i);
uint32_t oldLen = colDataGetSize(pCol2, pDest->info.rows); uint32_t oldLen = colDataGetLength(pCol2, pDest->info.rows);
uint32_t newLen = colDataGetSize(pCol1, pSrc->info.rows); uint32_t newLen = colDataGetLength(pCol1, pSrc->info.rows);
int32_t newSize = oldLen + newLen; int32_t newSize = oldLen + newLen;
char* tmp = realloc(pCol2->pData, newSize); char* tmp = realloc(pCol2->pData, newSize);
...@@ -287,7 +288,7 @@ size_t blockDataGetSize(const SSDataBlock* pBlock) { ...@@ -287,7 +288,7 @@ size_t blockDataGetSize(const SSDataBlock* pBlock) {
for(int32_t i = 0; i < numOfCols; ++i) { for(int32_t i = 0; i < numOfCols; ++i) {
SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, i); SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, i);
total += colDataGetSize(pColInfoData, pBlock->info.rows); total += colDataGetLength(pColInfoData, pBlock->info.rows);
if (IS_VAR_DATA_TYPE(pColInfoData->info.type)) { if (IS_VAR_DATA_TYPE(pColInfoData->info.type)) {
total += sizeof(int32_t) * pBlock->info.rows; total += sizeof(int32_t) * pBlock->info.rows;
...@@ -336,7 +337,7 @@ int32_t blockDataSplitRows(SSDataBlock* pBlock, bool hasVarCol, int32_t startInd ...@@ -336,7 +337,7 @@ int32_t blockDataSplitRows(SSDataBlock* pBlock, bool hasVarCol, int32_t startInd
if (isNull) { if (isNull) {
// do nothing // do nothing
} else { } else {
char* p = colDataGet(pColInfoData, j); char* p = colDataGetData(pColInfoData, j);
size += varDataTLen(p); size += varDataTLen(p);
} }
...@@ -401,7 +402,7 @@ SSDataBlock* blockDataExtractBlock(SSDataBlock* pBlock, int32_t startIndex, int3 ...@@ -401,7 +402,7 @@ SSDataBlock* blockDataExtractBlock(SSDataBlock* pBlock, int32_t startIndex, int3
for (int32_t j = startIndex; j < (startIndex + rowCount); ++j) { for (int32_t j = startIndex; j < (startIndex + rowCount); ++j) {
bool isNull = colDataIsNull(pColData, pBlock->info.rows, j, pBlock->pBlockAgg); bool isNull = colDataIsNull(pColData, pBlock->info.rows, j, pBlock->pBlockAgg);
char* p = colDataGet(pColData, j); char* p = colDataGetData(pColData, j);
colDataAppend(pDstCol, j - startIndex, p, isNull); colDataAppend(pDstCol, j - startIndex, p, isNull);
} }
...@@ -411,7 +412,6 @@ SSDataBlock* blockDataExtractBlock(SSDataBlock* pBlock, int32_t startIndex, int3 ...@@ -411,7 +412,6 @@ SSDataBlock* blockDataExtractBlock(SSDataBlock* pBlock, int32_t startIndex, int3
return pDst; return pDst;
} }
/** /**
* *
* +------------------+---------------+--------------------+ * +------------------+---------------+--------------------+
...@@ -444,7 +444,7 @@ int32_t blockDataToBuf(char* buf, const SSDataBlock* pBlock) { ...@@ -444,7 +444,7 @@ int32_t blockDataToBuf(char* buf, const SSDataBlock* pBlock) {
pStart += BitmapLen(pBlock->info.rows); pStart += BitmapLen(pBlock->info.rows);
} }
uint32_t dataSize = colDataGetSize(pCol, numOfRows); uint32_t dataSize = colDataGetLength(pCol, numOfRows);
*(int32_t*) pStart = dataSize; *(int32_t*) pStart = dataSize;
pStart += sizeof(int32_t); pStart += sizeof(int32_t);
...@@ -522,6 +522,22 @@ size_t blockDataGetSerialMetaSize(const SSDataBlock* pBlock) { ...@@ -522,6 +522,22 @@ size_t blockDataGetSerialMetaSize(const SSDataBlock* pBlock) {
return sizeof(int32_t) + pBlock->info.numOfCols * sizeof(int32_t); return sizeof(int32_t) + pBlock->info.numOfCols * sizeof(int32_t);
} }
SSchema* blockDataExtractSchema(const SSDataBlock* pBlock, int32_t* numOfCols) {
SSchema* pSchema = calloc(pBlock->info.numOfCols, sizeof(SSchema));
for(int32_t i = 0; i < pBlock->info.numOfCols; ++i) {
SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, i);
pSchema[i].bytes = pColInfoData->info.bytes;
pSchema[i].type = pColInfoData->info.type;
pSchema[i].colId = pColInfoData->info.colId;
}
if (numOfCols != NULL) {
*numOfCols = pBlock->info.numOfCols;
}
return pSchema;
}
double blockDataGetSerialRowSize(const SSDataBlock* pBlock) { double blockDataGetSerialRowSize(const SSDataBlock* pBlock) {
ASSERT(pBlock != NULL); ASSERT(pBlock != NULL);
double rowSize = 0; double rowSize = 0;
...@@ -577,8 +593,8 @@ int32_t dataBlockCompar(const void* p1, const void* p2, const void* param) { ...@@ -577,8 +593,8 @@ int32_t dataBlockCompar(const void* p1, const void* p2, const void* param) {
} }
} }
void* left1 = colDataGet(pColInfoData, left); void* left1 = colDataGetData(pColInfoData, left);
void* right1 = colDataGet(pColInfoData, right); void* right1 = colDataGetData(pColInfoData, right);
switch(pColInfoData->info.type) { switch(pColInfoData->info.type) {
case TSDB_DATA_TYPE_INT: { case TSDB_DATA_TYPE_INT: {
...@@ -617,7 +633,7 @@ static int32_t doAssignOneTuple(SColumnInfoData* pDstCols, int32_t numOfRows, co ...@@ -617,7 +633,7 @@ static int32_t doAssignOneTuple(SColumnInfoData* pDstCols, int32_t numOfRows, co
return code; return code;
} }
} else { } else {
char* p = colDataGet(pSrc, tupleIndex); char* p = colDataGetData(pSrc, tupleIndex);
code = colDataAppend(pDst, numOfRows, p, false); code = colDataAppend(pDst, numOfRows, p, false);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
return code; return code;
...@@ -956,8 +972,8 @@ int32_t dataBlockCompar_rv(const void* p1, const void* p2, const void* param) { ...@@ -956,8 +972,8 @@ int32_t dataBlockCompar_rv(const void* p1, const void* p2, const void* param) {
// } // }
// } // }
// void* left1 = colDataGet(pColInfoData, left); // void* left1 = colDataGetData(pColInfoData, left);
// void* right1 = colDataGet(pColInfoData, right); // void* right1 = colDataGetData(pColInfoData, right);
// switch(pColInfoData->info.type) { // switch(pColInfoData->info.type) {
// case TSDB_DATA_TYPE_INT: { // case TSDB_DATA_TYPE_INT: {
...@@ -1098,4 +1114,25 @@ void* blockDataDestroy(SSDataBlock* pBlock) { ...@@ -1098,4 +1114,25 @@ void* blockDataDestroy(SSDataBlock* pBlock) {
tfree(pBlock->pBlockAgg); tfree(pBlock->pBlockAgg);
tfree(pBlock); tfree(pBlock);
return NULL; return NULL;
}
SSDataBlock* createOneDataBlock(const SSDataBlock* pDataBlock) {
int32_t numOfCols = pDataBlock->info.numOfCols;
SSDataBlock* pBlock = calloc(1, sizeof(SSDataBlock));
pBlock->pDataBlock = taosArrayInit(numOfCols, sizeof(SColumnInfoData));
pBlock->info.numOfCols = numOfCols;
for(int32_t i = 0; i < numOfCols; ++i) {
SColumnInfoData colInfo = {0};
SColumnInfoData* p = taosArrayGet(pDataBlock->pDataBlock, i);
colInfo.info = p->info;
taosArrayPush(pBlock->pDataBlock, &colInfo);
}
return pBlock;
}
size_t blockDataGetCapacityInRow(const SSDataBlock* pBlock, size_t pageSize) {
return pageSize / (blockDataGetSerialRowSize(pBlock) + blockDataGetSerialMetaSize(pBlock));
} }
\ No newline at end of file
...@@ -162,7 +162,7 @@ TEST(testCase, Datablock_test) { ...@@ -162,7 +162,7 @@ TEST(testCase, Datablock_test) {
ASSERT_EQ(colDataGetNumOfCols(b), 2); ASSERT_EQ(colDataGetNumOfCols(b), 2);
ASSERT_EQ(colDataGetNumOfRows(b), 40); ASSERT_EQ(colDataGetNumOfRows(b), 40);
char* pData = colDataGet(p1, 3); char* pData = colDataGetData(p1, 3);
printf("the second row of binary:%s, length:%d\n", (char*)varDataVal(pData), varDataLen(pData)); printf("the second row of binary:%s, length:%d\n", (char*)varDataVal(pData), varDataLen(pData));
SArray* pOrderInfo = taosArrayInit(3, sizeof(SBlockOrderInfo)); SArray* pOrderInfo = taosArrayInit(3, sizeof(SBlockOrderInfo));
......
...@@ -216,13 +216,15 @@ static FORCE_INLINE void tqReadHandleSetColIdList(STqReadHandle *pReadHandle, SA ...@@ -216,13 +216,15 @@ static FORCE_INLINE void tqReadHandleSetColIdList(STqReadHandle *pReadHandle, SA
static FORCE_INLINE int tqReadHandleSetTbUidList(STqReadHandle *pHandle, const SArray *tbUidList) { static FORCE_INLINE int tqReadHandleSetTbUidList(STqReadHandle *pHandle, const SArray *tbUidList) {
pHandle->tbIdHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_NO_LOCK); pHandle->tbIdHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_NO_LOCK);
if (pHandle->tbIdHash == NULL) { if (pHandle->tbIdHash == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1; return -1;
} }
for (int i = 0; i < taosArrayGetSize(tbUidList); i++) { for (int i = 0; i < taosArrayGetSize(tbUidList); i++) {
int64_t *pKey = (int64_t *)taosArrayGet(tbUidList, i); int64_t *pKey = (int64_t *)taosArrayGet(tbUidList, i);
taosHashPut(pHandle->tbIdHash, pKey, sizeof(int64_t), NULL, 0); taosHashPut(pHandle->tbIdHash, pKey, sizeof(int64_t), NULL, 0);
// pHandle->tbUid = tbUid;
} }
return 0; return 0;
} }
......
...@@ -88,7 +88,7 @@ typedef struct STableCheckInfo { ...@@ -88,7 +88,7 @@ typedef struct STableCheckInfo {
int32_t compSize; int32_t compSize;
int32_t numOfBlocks:29; // number of qualified data blocks not the original blocks int32_t numOfBlocks:29; // number of qualified data blocks not the original blocks
uint8_t chosen:2; // indicate which iterator should move forward uint8_t chosen:2; // indicate which iterator should move forward
bool initBuf; // whether to initialize the in-memory skip list iterator or not bool initBuf:1; // whether to initialize the in-memory skip list iterator or not
SSkipListIterator* iter; // mem buffer skip list iterator SSkipListIterator* iter; // mem buffer skip list iterator
SSkipListIterator* iiter; // imem buffer skip list iterator SSkipListIterator* iiter; // imem buffer skip list iterator
} STableCheckInfo; } STableCheckInfo;
......
...@@ -68,9 +68,10 @@ typedef struct SResultRow { ...@@ -68,9 +68,10 @@ typedef struct SResultRow {
} SResultRow; } SResultRow;
typedef struct SResultRowInfo { typedef struct SResultRowInfo {
SResultRow *pCurResult; // current active result row info
SResultRow** pResult; // result list SResultRow** pResult; // result list
int16_t type:8; // data type for hash key // int16_t type:8; // data type for hash key
int32_t size:24; // number of result set int32_t size; // number of result set
int32_t capacity; // max capacity int32_t capacity; // max capacity
int32_t curPos; // current active result row index of pResult list int32_t curPos; // current active result row index of pResult list
} SResultRowInfo; } SResultRowInfo;
...@@ -95,7 +96,7 @@ struct SUdfInfo; ...@@ -95,7 +96,7 @@ struct SUdfInfo;
int32_t getOutputInterResultBufSize(struct STaskAttr* pQueryAttr); int32_t getOutputInterResultBufSize(struct STaskAttr* pQueryAttr);
size_t getResultRowSize(SArray* pExprInfo); size_t getResultRowSize(SArray* pExprInfo);
int32_t initResultRowInfo(SResultRowInfo* pResultRowInfo, int32_t size, int16_t type); int32_t initResultRowInfo(SResultRowInfo* pResultRowInfo, int32_t size);
void cleanupResultRowInfo(SResultRowInfo* pResultRowInfo); void cleanupResultRowInfo(SResultRowInfo* pResultRowInfo);
void resetResultRowInfo(struct STaskRuntimeEnv* pRuntimeEnv, SResultRowInfo* pResultRowInfo); void resetResultRowInfo(struct STaskRuntimeEnv* pRuntimeEnv, SResultRowInfo* pResultRowInfo);
...@@ -105,7 +106,7 @@ void closeAllResultRows(SResultRowInfo* pResultRowInfo); ...@@ -105,7 +106,7 @@ void closeAllResultRows(SResultRowInfo* pResultRowInfo);
int32_t initResultRow(SResultRow *pResultRow); int32_t initResultRow(SResultRow *pResultRow);
void closeResultRow(SResultRowInfo* pResultRowInfo, int32_t slot); void closeResultRow(SResultRowInfo* pResultRowInfo, int32_t slot);
bool isResultRowClosed(SResultRowInfo *pResultRowInfo, int32_t slot); bool isResultRowClosed(SResultRowInfo *pResultRowInfo, int32_t slot);
void clearResultRow(struct STaskRuntimeEnv* pRuntimeEnv, SResultRow* pResultRow, int16_t type); void clearResultRow(struct STaskRuntimeEnv* pRuntimeEnv, SResultRow* pResultRow);
struct SResultRowEntryInfo* getResultCell(const SResultRow* pRow, int32_t index, int32_t* offset); struct SResultRowEntryInfo* getResultCell(const SResultRow* pRow, int32_t index, int32_t* offset);
......
...@@ -15,6 +15,7 @@ ...@@ -15,6 +15,7 @@
#ifndef TDENGINE_EXECUTORIMPL_H #ifndef TDENGINE_EXECUTORIMPL_H
#define TDENGINE_EXECUTORIMPL_H #define TDENGINE_EXECUTORIMPL_H
#include "tsort.h"
#ifdef __cplusplus #ifdef __cplusplus
extern "C" { extern "C" {
#endif #endif
...@@ -444,16 +445,32 @@ typedef struct SOptrBasicInfo { ...@@ -444,16 +445,32 @@ typedef struct SOptrBasicInfo {
int32_t capacity; int32_t capacity;
} SOptrBasicInfo; } SOptrBasicInfo;
typedef struct SOptrBasicInfo STableIntervalOperatorInfo; typedef struct SAggSupporter {
typedef struct SAggOperatorInfo {
SOptrBasicInfo binfo;
SDiskbasedBuf *pResultBuf; // query result buffer based on blocked-wised disk file
SHashObj* pResultRowHashTable; // quick locate the window object for each result SHashObj* pResultRowHashTable; // quick locate the window object for each result
SHashObj* pResultRowListSet; // used to check if current ResultRowInfo has ResultRow object or not SHashObj* pResultRowListSet; // used to check if current ResultRowInfo has ResultRow object or not
SArray* pResultRowArrayList; // The array list that contains the Result rows SArray* pResultRowArrayList; // The array list that contains the Result rows
char* keyBuf; // window key buffer char* keyBuf; // window key buffer
SResultRowPool *pool; // The window result objects pool, all the resultRow Objects are allocated and managed by this object. SResultRowPool *pool; // The window result objects pool, all the resultRow Objects are allocated and managed by this object.
} SAggSupporter;
typedef struct STableIntervalOperatorInfo {
SOptrBasicInfo binfo;
SDiskbasedBuf *pResultBuf; // query result buffer based on blocked-wised disk file
SGroupResInfo groupResInfo;
SInterval interval;
STimeWindow win;
int32_t precision;
bool timeWindowInterpo;
char **pRow;
SAggSupporter aggSup;
STableQueryInfo *pCurrent;
int32_t order;
} STableIntervalOperatorInfo;
typedef struct SAggOperatorInfo {
SOptrBasicInfo binfo;
SDiskbasedBuf *pResultBuf; // query result buffer based on blocked-wised disk file
SAggSupporter aggSup;
STableQueryInfo *current; STableQueryInfo *current;
uint32_t groupId; uint32_t groupId;
SGroupResInfo groupResInfo; SGroupResInfo groupResInfo;
...@@ -549,49 +566,42 @@ typedef struct SDistinctOperatorInfo { ...@@ -549,49 +566,42 @@ typedef struct SDistinctOperatorInfo {
SArray* pDistinctDataInfo; SArray* pDistinctDataInfo;
} SDistinctOperatorInfo; } SDistinctOperatorInfo;
struct SGlobalMerger; typedef struct SSortedMergeOperatorInfo {
SOptrBasicInfo binfo;
typedef struct SMultiwayMergeInfo { bool hasVarCol;
struct SGlobalMerger* pMerge;
SOptrBasicInfo binfo; SArray *orderInfo; // SArray<SBlockOrderInfo>
int32_t bufCapacity; bool nullFirst;
int64_t seed; int32_t numOfSources;
char** prevRow;
SArray* orderColumnList; SSortHandle *pSortHandle;
int32_t resultRowFactor;
int32_t bufPageSize;
bool hasGroupColData; uint32_t sortBufSize; // max buffer size for in-memory sort
char** currentGroupColData;
SArray* groupColumnList; int32_t resultRowFactor;
bool hasDataBlockForNewGroup; bool hasGroupVal;
SSDataBlock* pExistBlock;
SDiskbasedBuf *pTupleStore; // keep the final results
SArray* udfInfo; int32_t numOfResPerPage;
bool hasPrev;
bool multiGroupResults; char** groupVal;
} SMultiwayMergeInfo; SArray *groupInfo;
SAggSupporter aggSup;
typedef struct SMsortComparParam { } SSortedMergeOperatorInfo;
struct SExternalMemSource **pSources;
int32_t numOfSources;
SArray *orderInfo; // SArray<SBlockOrderInfo>
bool nullFirst;
} SMsortComparParam;
typedef struct SOrderOperatorInfo { typedef struct SOrderOperatorInfo {
int32_t sourceId;
uint32_t sortBufSize; // max buffer size for in-memory sort uint32_t sortBufSize; // max buffer size for in-memory sort
SSDataBlock *pDataBlock; SSDataBlock *pDataBlock;
bool hasVarCol; // has variable length column, such as binary/varchar/nchar bool hasVarCol; // has variable length column, such as binary/varchar/nchar
int32_t numOfCompleted; SArray *orderInfo;
SDiskbasedBuf *pSortInternalBuf; bool nullFirst;
SMultiwayMergeTreeInfo *pMergeTree; SSortHandle *pSortHandle;
SArray *pSources; // SArray<SExternalMemSource*>
int32_t bufPageSize; int32_t bufPageSize;
int32_t numOfRowsInRes; int32_t numOfRowsInRes;
SMsortComparParam cmpParam; // TODO extact struct
int64_t startTs; // sort start time int64_t startTs; // sort start time
uint64_t sortElapsed; // sort elapsed time, time to flush to disk not included. uint64_t sortElapsed; // sort elapsed time, time to flush to disk not included.
uint64_t totalSize; // total load bytes from remote uint64_t totalSize; // total load bytes from remote
...@@ -608,8 +618,8 @@ SOperatorInfo* createMultiTableAggOperatorInfo(SOperatorInfo* downstream, SArray ...@@ -608,8 +618,8 @@ SOperatorInfo* createMultiTableAggOperatorInfo(SOperatorInfo* downstream, SArray
SOperatorInfo* createProjectOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* downstream, SExprInfo* pExpr, SOperatorInfo* createProjectOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* downstream, SExprInfo* pExpr,
int32_t numOfOutput); int32_t numOfOutput);
SOperatorInfo* createLimitOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* downstream); SOperatorInfo* createLimitOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* downstream);
SOperatorInfo* createTimeIntervalOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* downstream, SExprInfo* pExpr, SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SArray* pExprInfo, SExecTaskInfo* pTaskInfo);
int32_t numOfOutput);
SOperatorInfo* createAllTimeIntervalOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* downstream, SOperatorInfo* createAllTimeIntervalOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* downstream,
SExprInfo* pExpr, int32_t numOfOutput); SExprInfo* pExpr, int32_t numOfOutput);
SOperatorInfo* createSWindowOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* downstream, SExprInfo* pExpr, SOperatorInfo* createSWindowOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* downstream, SExprInfo* pExpr,
...@@ -641,9 +651,8 @@ SOperatorInfo* createFilterOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorI ...@@ -641,9 +651,8 @@ SOperatorInfo* createFilterOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorI
SOperatorInfo* createJoinOperatorInfo(SOperatorInfo** pdownstream, int32_t numOfDownstream, SSchema* pSchema, SOperatorInfo* createJoinOperatorInfo(SOperatorInfo** pdownstream, int32_t numOfDownstream, SSchema* pSchema,
int32_t numOfOutput); int32_t numOfOutput);
SOperatorInfo* createOrderOperatorInfo(SOperatorInfo* downstream, SArray* pExprInfo, SArray* pOrderVal); SOperatorInfo* createOrderOperatorInfo(SOperatorInfo* downstream, SArray* pExprInfo, SArray* pOrderVal, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createMergeSortOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExpr, int32_t numOfOutput, SOperatorInfo* createSortedMergeOperatorInfo(SOperatorInfo** downstream, int32_t numOfDownstream, SArray* pExprInfo, SArray* pOrderVal, SArray* pGroupInfo, SExecTaskInfo* pTaskInfo);
SOrder* pOrderVal);
// SSDataBlock* doGlobalAggregate(void* param, bool* newgroup); // SSDataBlock* doGlobalAggregate(void* param, bool* newgroup);
// SSDataBlock* doMultiwayMergeSort(void* param, bool* newgroup); // SSDataBlock* doMultiwayMergeSort(void* param, bool* newgroup);
...@@ -691,9 +700,6 @@ int32_t checkForQueryBuf(size_t numOfTables); ...@@ -691,9 +700,6 @@ int32_t checkForQueryBuf(size_t numOfTables);
bool checkNeedToCompressQueryCol(SQInfo* pQInfo); bool checkNeedToCompressQueryCol(SQInfo* pQInfo);
void setQueryStatus(STaskRuntimeEnv* pRuntimeEnv, int8_t status); void setQueryStatus(STaskRuntimeEnv* pRuntimeEnv, int8_t status);
bool onlyQueryTags(STaskAttr* pQueryAttr);
// void destroyUdfInfo(struct SUdfInfo* pUdfInfo);
int32_t doDumpQueryResult(SQInfo* pQInfo, char* data, int8_t compressed, int32_t* compLen); int32_t doDumpQueryResult(SQInfo* pQInfo, char* data, int8_t compressed, int32_t* compLen);
size_t getResultSize(SQInfo* pQInfo, int64_t* numOfRows); size_t getResultSize(SQInfo* pQInfo, int64_t* numOfRows);
......
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef TDENGINE_TLINEARHASH_H
#define TDENGINE_TLINEARHASH_H
#ifdef __cplusplus
extern "C" {
#endif
#include "thash.h"
enum {
LINEAR_HASH_STATIS = 0x1,
LINEAR_HASH_DATA = 0x2,
};
typedef struct SLHashObj SLHashObj;
SLHashObj* tHashInit(int32_t inMemPages, int32_t pageSize, _hash_fn_t fn, int32_t numOfTuplePerPage);
void* tHashCleanup(SLHashObj* pHashObj);
int32_t tHashPut(SLHashObj* pHashObj, const void *key, size_t keyLen, void *data, size_t size);
char* tHashGet(SLHashObj* pHashObj, const void *key, size_t keyLen);
int32_t tHashRemove(SLHashObj* pHashObj, const void *key, size_t keyLen);
void tHashPrint(const SLHashObj* pHashObj, int32_t type);
#ifdef __cplusplus
}
#endif
#endif // TDENGINE_TLINEARHASH_H
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef TDENGINE_TSORT_H
#define TDENGINE_TSORT_H
#ifdef __cplusplus
extern "C" {
#endif
#include "common.h"
#include "os.h"
enum {
SORT_MULTISOURCE_MERGE = 0x1,
SORT_SINGLESOURCE_SORT = 0x2,
};
typedef struct SMultiMergeSource {
int32_t type;
int32_t rowIndex;
SSDataBlock *pBlock;
} SMultiMergeSource;
typedef struct SExternalMemSource {
SMultiMergeSource src;
SArray* pageIdList;
int32_t pageIndex;
} SExternalMemSource;
typedef struct SGenericSource {
SMultiMergeSource src;
void *param;
} SGenericSource;
typedef struct SMsortComparParam {
void **pSources;
int32_t numOfSources;
SArray *orderInfo; // SArray<SBlockOrderInfo>
bool nullFirst;
} SMsortComparParam;
typedef struct SSortHandle SSortHandle;
typedef struct STupleHandle STupleHandle;
typedef SSDataBlock* (*_sort_fetch_block_fn_t)(void* param);
typedef int32_t (*_sort_merge_compar_fn_t)(const void* p1, const void* p2, void* param);
/**
*
* @param type
* @return
*/
SSortHandle* tsortCreateSortHandle(SArray* pOrderInfo, bool nullFirst, int32_t type, int32_t pageSize, int32_t numOfPages, SSchema* pSchema, int32_t numOfCols, const char* idstr);
/**
*
* @param pSortHandle
*/
void tsortDestroySortHandle(SSortHandle* pSortHandle);
/**
*
* @param pHandle
* @return
*/
int32_t tsortOpen(SSortHandle* pHandle);
/**
*
* @param pHandle
* @return
*/
int32_t tsortClose(SSortHandle* pHandle);
/**
*
* @return
*/
int32_t tsortSetFetchRawDataFp(SSortHandle* pHandle, _sort_fetch_block_fn_t fp);
/**
*
* @param pHandle
* @param fp
* @return
*/
int32_t tsortSetComparFp(SSortHandle* pHandle, _sort_merge_compar_fn_t fp);
/**
*
* @param pHandle
* @param pSource
* @return success or failed
*/
int32_t tsortAddSource(SSortHandle* pSortHandle, void* pSource);
/**
*
* @param pHandle
* @return
*/
STupleHandle* tsortNextTuple(SSortHandle* pHandle);
/**
*
* @param pHandle
* @param colIndex
* @return
*/
bool tsortIsNullVal(STupleHandle* pVHandle, int32_t colIndex);
/**
*
* @param pHandle
* @param colIndex
* @return
*/
void* tsortGetValue(STupleHandle* pVHandle, int32_t colIndex);
#ifdef __cplusplus
}
#endif
#endif // TDENGINE_TSORT_H
...@@ -53,8 +53,7 @@ int32_t getOutputInterResultBufSize(STaskAttr* pQueryAttr) { ...@@ -53,8 +53,7 @@ int32_t getOutputInterResultBufSize(STaskAttr* pQueryAttr) {
return size; return size;
} }
int32_t initResultRowInfo(SResultRowInfo *pResultRowInfo, int32_t size, int16_t type) { int32_t initResultRowInfo(SResultRowInfo *pResultRowInfo, int32_t size) {
pResultRowInfo->type = type;
pResultRowInfo->size = 0; pResultRowInfo->size = 0;
pResultRowInfo->curPos = -1; pResultRowInfo->curPos = -1;
pResultRowInfo->capacity = size; pResultRowInfo->capacity = size;
...@@ -93,7 +92,7 @@ void resetResultRowInfo(STaskRuntimeEnv *pRuntimeEnv, SResultRowInfo *pResultRow ...@@ -93,7 +92,7 @@ void resetResultRowInfo(STaskRuntimeEnv *pRuntimeEnv, SResultRowInfo *pResultRow
for (int32_t i = 0; i < pResultRowInfo->size; ++i) { for (int32_t i = 0; i < pResultRowInfo->size; ++i) {
SResultRow *pWindowRes = pResultRowInfo->pResult[i]; SResultRow *pWindowRes = pResultRowInfo->pResult[i];
clearResultRow(pRuntimeEnv, pWindowRes, pResultRowInfo->type); clearResultRow(pRuntimeEnv, pWindowRes);
int32_t groupIndex = 0; int32_t groupIndex = 0;
int64_t uid = 0; int64_t uid = 0;
...@@ -136,7 +135,7 @@ void closeResultRow(SResultRowInfo *pResultRowInfo, int32_t slot) { ...@@ -136,7 +135,7 @@ void closeResultRow(SResultRowInfo *pResultRowInfo, int32_t slot) {
getResultRow(pResultRowInfo, slot)->closed = true; getResultRow(pResultRowInfo, slot)->closed = true;
} }
void clearResultRow(STaskRuntimeEnv *pRuntimeEnv, SResultRow *pResultRow, int16_t type) { void clearResultRow(STaskRuntimeEnv *pRuntimeEnv, SResultRow *pResultRow) {
if (pResultRow == NULL) { if (pResultRow == NULL) {
return; return;
} }
......
...@@ -93,3 +93,25 @@ qTaskInfo_t qCreateStreamExecTaskInfo(void* msg, void* streamReadHandle) { ...@@ -93,3 +93,25 @@ qTaskInfo_t qCreateStreamExecTaskInfo(void* msg, void* streamReadHandle) {
return pTaskInfo; return pTaskInfo;
} }
int32_t qUpdateQualifiedTableId(qTaskInfo_t tinfo, SArray* tableIdList, bool isAdd) {
SExecTaskInfo* pTaskInfo = (SExecTaskInfo* )tinfo;
// traverse to the streamscan node to add this table id
SOperatorInfo* pInfo = pTaskInfo->pRoot;
while(pInfo->operatorType != OP_StreamScan) {
pInfo = pInfo->pDownstream[0];
}
SStreamBlockScanInfo* pScanInfo = pInfo->info;
if (isAdd) {
int32_t code = tqReadHandleSetTbUidList(pScanInfo->readerHandle, tableIdList);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
} else {
assert(0);
}
return TSDB_CODE_SUCCESS;
}
\ No newline at end of file
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "tlinearhash.h"
#include "tcfg.h"
#include "taoserror.h"
#include "tpagedbuf.h"
#define LHASH_CAP_RATIO 0.85
// Always located in memory
typedef struct SLHashBucket {
SArray *pPageIdList;
int32_t size; // the number of element in this entry
} SLHashBucket;
typedef struct SLHashObj {
SDiskbasedBuf *pBuf;
_hash_fn_t hashFn;
int32_t tuplesPerPage;
SLHashBucket **pBucket; // entry list
int32_t numOfAlloc; // number of allocated bucket ptr slot
int32_t bits; // the number of bits used in hash
int32_t numOfBuckets; // the number of buckets
int64_t size; // the number of total items
} SLHashObj;
/**
* the data struct for each hash node
* +-----------+-------+--------+
* | SLHashNode| key | data |
* +-----------+-------+--------+
*/
typedef struct SLHashNode {
uint16_t keyLen;
uint16_t dataLen;
} SLHashNode;
#define GET_LHASH_NODE_KEY(_n) (((char*)(_n)) + sizeof(SLHashNode))
#define GET_LHASH_NODE_DATA(_n) ((char*)(_n) + sizeof(SLHashNode) + ((SLHashNode*)(_n))->keyLen)
#define GET_LHASH_NODE_LEN(_n) (sizeof(SLHashNode) + ((SLHashNode*)(_n))->keyLen + ((SLHashNode*)(_n))->dataLen)
static int32_t doAddNewBucket(SLHashObj* pHashObj);
static int32_t doGetBucketIdFromHashVal(int32_t hashv, int32_t bits) {
return hashv & ((1ul << (bits)) - 1);
}
static int32_t doGetAlternativeBucketId(int32_t bucketId, int32_t bits, int32_t numOfBuckets) {
int32_t v = bucketId - (1ul << (bits - 1));
ASSERT(v < numOfBuckets);
return v;
}
static int32_t doGetRelatedSplitBucketId(int32_t bucketId, int32_t bits) {
int32_t splitBucketId = (1ul << (bits - 1)) ^ bucketId;
return splitBucketId;
}
static void doCopyObject(char* p, const void* key, int32_t keyLen, const void* data, int32_t size) {
*(uint16_t*) p = keyLen;
p += sizeof(uint16_t);
*(uint16_t*) p = size;
p += sizeof(uint16_t);
memcpy(p, key, keyLen);
p += keyLen;
memcpy(p, data, size);
}
static int32_t doAddToBucket(SLHashObj* pHashObj, SLHashBucket* pBucket, int32_t index, const void* key, int32_t keyLen,
const void* data, int32_t size) {
int32_t pageId = *(int32_t*)taosArrayGetLast(pBucket->pPageIdList);
SFilePage* pPage = getBufPage(pHashObj->pBuf, pageId);
ASSERT (pPage != NULL);
// put to current buf page
size_t nodeSize = sizeof(SLHashNode) + keyLen + size;
ASSERT(nodeSize + sizeof(SFilePage) <= getBufPageSize(pHashObj->pBuf));
if (pPage->num + nodeSize > getBufPageSize(pHashObj->pBuf)) {
releaseBufPage(pHashObj->pBuf, pPage);
// allocate the overflow buffer page to hold this k/v.
int32_t newPageId = -1;
SFilePage* pNewPage = getNewBufPage(pHashObj->pBuf, 0, &newPageId);
if (pNewPage == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
}
taosArrayPush(pBucket->pPageIdList, &newPageId);
doCopyObject(pNewPage->data, key, keyLen, data, size);
pNewPage->num = sizeof(SFilePage) + nodeSize;
setBufPageDirty(pNewPage, true);
releaseBufPage(pHashObj->pBuf, pNewPage);
} else {
char* p = (char*) pPage + pPage->num;
doCopyObject(p, key, keyLen, data, size);
pPage->num += nodeSize;
setBufPageDirty(pPage, true);
releaseBufPage(pHashObj->pBuf, pPage);
}
pBucket->size += 1;
// printf("===> add to bucket:0x%x, num:%d, key:%d\n", index, pBucket->size, *(int*) key);
return TSDB_CODE_SUCCESS;
}
static void doRemoveFromBucket(SFilePage* pPage, SLHashNode* pNode, SLHashBucket* pBucket) {
ASSERT(pPage != NULL && pNode != NULL && pBucket->size >= 1);
int32_t len = GET_LHASH_NODE_LEN(pNode);
char* p = (char*) pNode + len;
char* pEnd = (char*)pPage + pPage->num;
memmove(pNode, p, (pEnd - p));
pPage->num -= len;
if (pPage->num == 0) {
// this page is empty, could be recycle in the future.
}
setBufPageDirty(pPage, true);
pBucket->size -= 1;
}
static void doCompressBucketPages(SLHashObj *pHashObj, SLHashBucket* pBucket) {
size_t numOfPages = taosArrayGetSize(pBucket->pPageIdList);
if (numOfPages <= 1) {
return;
}
int32_t* firstPage = taosArrayGet(pBucket->pPageIdList, 0);
SFilePage* pFirst = getBufPage(pHashObj->pBuf, *firstPage);
int32_t* pageId = taosArrayGetLast(pBucket->pPageIdList);
SFilePage* pLast = getBufPage(pHashObj->pBuf, *pageId);
if (pLast->num <= sizeof(SFilePage)) {
// this is empty
dBufSetBufPageRecycled(pHashObj->pBuf, pLast);
releaseBufPage(pHashObj->pBuf, pFirst);
taosArrayRemove(pBucket->pPageIdList, numOfPages - 1);
return;
}
char* pStart = pLast->data;
int32_t nodeSize = GET_LHASH_NODE_LEN(pStart);
while (1) {
if (pFirst->num + nodeSize < getBufPageSize(pHashObj->pBuf)) {
char* p = ((char*)pFirst) + pFirst->num;
SLHashNode* pNode = (SLHashNode*)pStart;
doCopyObject(p, GET_LHASH_NODE_KEY(pStart), pNode->keyLen, GET_LHASH_NODE_DATA(pStart), pNode->dataLen);
setBufPageDirty(pFirst, true);
setBufPageDirty(pLast, true);
ASSERT(pLast->num >= nodeSize + sizeof(SFilePage));
pFirst->num += nodeSize;
pLast->num -= nodeSize;
pStart += nodeSize;
if (pLast->num <= sizeof(SFilePage)) {
// this is empty
dBufSetBufPageRecycled(pHashObj->pBuf, pLast);
releaseBufPage(pHashObj->pBuf, pFirst);
taosArrayRemove(pBucket->pPageIdList, numOfPages - 1);
break;
}
nodeSize = GET_LHASH_NODE_LEN(pStart);
} else { // move to the front of pLast page
if (pStart != pLast->data) {
memmove(pLast->data, pStart, (((char*)pLast) + pLast->num - pStart));
setBufPageDirty(pLast, true);
}
releaseBufPage(pHashObj->pBuf, pLast);
releaseBufPage(pHashObj->pBuf, pFirst);
break;
}
}
}
static int32_t doAddNewBucket(SLHashObj* pHashObj) {
if (pHashObj->numOfBuckets + 1 > pHashObj->numOfAlloc) {
int32_t newLen = pHashObj->numOfAlloc * 1.25;
if (newLen == pHashObj->numOfAlloc) {
newLen += 4;
}
char* p = realloc(pHashObj->pBucket, POINTER_BYTES * newLen);
if (p == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
}
memset(p + POINTER_BYTES * pHashObj->numOfBuckets, 0, newLen - pHashObj->numOfBuckets);
pHashObj->pBucket = (SLHashBucket**) p;
pHashObj->numOfAlloc = newLen;
}
SLHashBucket* pBucket = calloc(1, sizeof(SLHashBucket));
pHashObj->pBucket[pHashObj->numOfBuckets] = pBucket;
pBucket->pPageIdList = taosArrayInit(2, sizeof(int32_t));
if (pBucket->pPageIdList == NULL || pBucket == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
}
int32_t pageId = -1;
SFilePage* p = getNewBufPage(pHashObj->pBuf, 0, &pageId);
p->num = sizeof(SFilePage);
setBufPageDirty(p, true);
releaseBufPage(pHashObj->pBuf, p);
taosArrayPush(pBucket->pPageIdList, &pageId);
pHashObj->numOfBuckets += 1;
// printf("---------------add new bucket, id:0x%x, total:%d\n", pHashObj->numOfBuckets - 1, pHashObj->numOfBuckets);
return TSDB_CODE_SUCCESS;
}
SLHashObj* tHashInit(int32_t inMemPages, int32_t pageSize, _hash_fn_t fn, int32_t numOfTuplePerPage) {
SLHashObj* pHashObj = calloc(1, sizeof(SLHashObj));
if (pHashObj == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return NULL;
}
int32_t code = createDiskbasedBuf(&pHashObj->pBuf, pageSize, inMemPages * pageSize, 0, "/tmp");
if (code != 0) {
terrno = code;
return NULL;
}
setBufPageCompressOnDisk(pHashObj->pBuf, false);
/**
* The number of bits in the hash value, which is used to decide the exact bucket where the object should be located in.
* The initial value is 0.
*/
pHashObj->bits = 0;
pHashObj->hashFn = fn;
pHashObj->tuplesPerPage = numOfTuplePerPage;
pHashObj->numOfAlloc = 4; // initial allocated array list
pHashObj->pBucket = calloc(pHashObj->numOfAlloc, POINTER_BYTES);
code = doAddNewBucket(pHashObj);
if (code != TSDB_CODE_SUCCESS) {
destroyDiskbasedBuf(pHashObj->pBuf);
tfree(pHashObj);
terrno = code;
return NULL;
}
return pHashObj;
}
void* tHashCleanup(SLHashObj* pHashObj) {
destroyDiskbasedBuf(pHashObj->pBuf);
for(int32_t i = 0; i < pHashObj->numOfBuckets; ++i) {
taosArrayDestroy(pHashObj->pBucket[i]->pPageIdList);
tfree(pHashObj->pBucket[i]);
}
tfree(pHashObj->pBucket);
tfree(pHashObj);
return NULL;
}
int32_t tHashPut(SLHashObj* pHashObj, const void *key, size_t keyLen, void *data, size_t size) {
ASSERT(pHashObj != NULL && key != NULL);
if (pHashObj->bits == 0) {
SLHashBucket* pBucket = pHashObj->pBucket[0];
doAddToBucket(pHashObj, pBucket, 0, key, keyLen, data, size);
} else {
int32_t hashVal = pHashObj->hashFn(key, keyLen);
int32_t v = doGetBucketIdFromHashVal(hashVal, pHashObj->bits);
if (v >= pHashObj->numOfBuckets) {
int32_t newBucketId = doGetAlternativeBucketId(v, pHashObj->bits, pHashObj->numOfBuckets);
// printf("bucketId: 0x%x not exists, put it into 0x%x instead\n", v, newBucketId);
v = newBucketId;
}
SLHashBucket* pBucket = pHashObj->pBucket[v];
int32_t code = doAddToBucket(pHashObj, pBucket, v, key, keyLen, data, size);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
}
pHashObj->size += 1;
// Too many records, needs to bucket split
if ((pHashObj->numOfBuckets * LHASH_CAP_RATIO * pHashObj->tuplesPerPage) < pHashObj->size) {
int32_t newBucketId = pHashObj->numOfBuckets;
int32_t code = doAddNewBucket(pHashObj);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
int32_t numOfBits = ceil(log(pHashObj->numOfBuckets) / log(2));
if (numOfBits > pHashObj->bits) {
// printf("extend the bits from %d to %d, new bucket:%d\n", pHashObj->bits, numOfBits, newBucketId);
ASSERT(numOfBits == pHashObj->bits + 1);
pHashObj->bits = numOfBits;
}
int32_t splitBucketId = doGetRelatedSplitBucketId(newBucketId, pHashObj->bits);
// load all data in this bucket and check if the data needs to relocated into the new bucket
SLHashBucket* pBucket = pHashObj->pBucket[splitBucketId];
// printf("split %d items' bucket:0x%x to new bucket:0x%x\n", pBucket->size, splitBucketId, newBucketId);
for (int32_t i = 0; i < taosArrayGetSize(pBucket->pPageIdList); ++i) {
int32_t pageId = *(int32_t*)taosArrayGet(pBucket->pPageIdList, i);
SFilePage* p = getBufPage(pHashObj->pBuf, pageId);
char* pStart = p->data;
while (pStart - ((char*) p) < p->num) {
SLHashNode* pNode = (SLHashNode*)pStart;
ASSERT(pNode->keyLen > 0 && pNode->dataLen >= 0);
char* k = GET_LHASH_NODE_KEY(pNode);
int32_t hashv = pHashObj->hashFn(k, pNode->keyLen);
int32_t v1 = doGetBucketIdFromHashVal(hashv, pHashObj->bits);
if (v1 != splitBucketId) { // place it into the new bucket
ASSERT(v1 == newBucketId);
// printf("move key:%d to 0x%x bucket, remain items:%d\n", *(int32_t*)k, v1, pBucket->size - 1);
SLHashBucket* pNewBucket = pHashObj->pBucket[newBucketId];
doAddToBucket(pHashObj, pNewBucket, newBucketId, (void*)GET_LHASH_NODE_KEY(pNode), pNode->keyLen,
GET_LHASH_NODE_KEY(pNode), pNode->dataLen);
doRemoveFromBucket(p, pNode, pBucket);
} else {
// printf("check key:%d, located into: %d, skip it\n", *(int*) k, v1);
int32_t nodeSize = GET_LHASH_NODE_LEN(pStart);
pStart += nodeSize;
}
}
releaseBufPage(pHashObj->pBuf, p);
}
doCompressBucketPages(pHashObj, pBucket);
}
return TSDB_CODE_SUCCESS;
}
char* tHashGet(SLHashObj* pHashObj, const void *key, size_t keyLen) {
ASSERT(pHashObj != NULL && key != NULL && keyLen > 0);
int32_t hashv = pHashObj->hashFn(key, keyLen);
int32_t bucketId = doGetBucketIdFromHashVal(hashv, pHashObj->bits);
if (bucketId >= pHashObj->numOfBuckets) {
bucketId = doGetAlternativeBucketId(bucketId, pHashObj->bits, pHashObj->numOfBuckets);
}
SLHashBucket* pBucket = pHashObj->pBucket[bucketId];
for (int32_t i = 0; i < taosArrayGetSize(pBucket->pPageIdList); ++i) {
int32_t pageId = *(int32_t*)taosArrayGet(pBucket->pPageIdList, i);
SFilePage* p = getBufPage(pHashObj->pBuf, pageId);
char* pStart = p->data;
while (pStart - p->data < p->num) {
SLHashNode* pNode = (SLHashNode*)pStart;
char* k = GET_LHASH_NODE_KEY(pNode);
if (pNode->keyLen == keyLen && (memcmp(key, k, keyLen) == 0)) {
releaseBufPage(pHashObj->pBuf, p);
return GET_LHASH_NODE_DATA(pNode);
} else {
pStart += GET_LHASH_NODE_LEN(pStart);
}
}
releaseBufPage(pHashObj->pBuf, p);
}
return NULL;
}
int32_t tHashRemove(SLHashObj* pHashObj, const void *key, size_t keyLen) {
// todo
}
void tHashPrint(const SLHashObj* pHashObj, int32_t type) {
printf("==================== linear hash ====================\n");
printf("total bucket:%d, size:%ld, ratio:%.2f\n", pHashObj->numOfBuckets, pHashObj->size, LHASH_CAP_RATIO);
dBufSetPrintInfo(pHashObj->pBuf);
if (type == LINEAR_HASH_DATA) {
for (int32_t i = 0; i < pHashObj->numOfBuckets; ++i) {
// printf("bucket: 0x%x, obj:%d, page:%d\n", i, pHashObj->pBucket[i]->size,
// (int)taosArrayGetSize(pHashObj->pBucket[i]->pPageIdList));
}
} else {
dBufPrintStatis(pHashObj->pBuf);
}
}
\ No newline at end of file
此差异已折叠。
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include <gtest/gtest.h>
#include <iostream>
#include "executorimpl.h"
#include "tlinearhash.h"
#pragma GCC diagnostic push
#pragma GCC diagnostic ignored "-Wwrite-strings"
#pragma GCC diagnostic ignored "-Wunused-function"
#pragma GCC diagnostic ignored "-Wunused-variable"
#pragma GCC diagnostic ignored "-Wsign-compare"
TEST(testCase, linear_hash_Tests) {
srand(time(NULL));
_hash_fn_t fn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT);
#if 0
SLHashObj* pHashObj = tHashInit(256, 4096, fn, 320);
for(int32_t i = 0; i < 5000000; ++i) {
int32_t code = tHashPut(pHashObj, &i, sizeof(i), &i, sizeof(i));
assert(code == 0);
}
// tHashPrint(pHashObj, LINEAR_HASH_STATIS);
// for(int32_t i = 0; i < 10000; ++i) {
// char* v = tHashGet(pHashObj, &i, sizeof(i));
// if (v != NULL) {
//// printf("find value: %d, key:%d\n", *(int32_t*) v, i);
// } else {
// printf("failed to found key:%d in hash\n", i);
// }
// }
tHashPrint(pHashObj, LINEAR_HASH_STATIS);
tHashCleanup(pHashObj);
#endif
#if 0
SHashObj* pHashObj = taosHashInit(1000, fn, false, HASH_NO_LOCK);
for(int32_t i = 0; i < 1000000; ++i) {
taosHashPut(pHashObj, &i, sizeof(i), &i, sizeof(i));
}
for(int32_t i = 0; i < 10000; ++i) {
void* v = taosHashGet(pHashObj, &i, sizeof(i));
}
taosHashCleanup(pHashObj);
#endif
}
\ No newline at end of file
此差异已折叠。
...@@ -29,9 +29,10 @@ ...@@ -29,9 +29,10 @@
#include "tcompression.h" #include "tcompression.h"
//#include "queryLog.h" //#include "queryLog.h"
#include "tudf.h" #include "tudf.h"
#include "tep.h"
#define GET_INPUT_DATA_LIST(x) ((char *)((x)->pInput)) #define GET_INPUT_DATA_LIST(x) ((char *)((x)->pInput))
#define GET_INPUT_DATA(x, y) (GET_INPUT_DATA_LIST(x) + (y) * (x)->inputBytes) #define GET_INPUT_DATA(x, y) ((char*) colDataGetData((x)->pInput, (y)))
#define GET_TS_LIST(x) ((TSKEY*)((x)->ptsList)) #define GET_TS_LIST(x) ((TSKEY*)((x)->ptsList))
#define GET_TS_DATA(x, y) (GET_TS_LIST(x)[(y)]) #define GET_TS_DATA(x, y) (GET_TS_LIST(x)[(y)])
...@@ -3818,7 +3819,7 @@ static void interp_function_impl(SqlFunctionCtx *pCtx) { ...@@ -3818,7 +3819,7 @@ static void interp_function_impl(SqlFunctionCtx *pCtx) {
skey = ekey; skey = ekey;
} }
} }
assignVal(pCtx->pOutput, pCtx->pInput, pCtx->resDataInfo.bytes, pCtx->inputType); // assignVal(pCtx->pOutput, pCtx->pInput, pCtx->resDataInfo.bytes, pCtx->inputType);
} else if (type == TSDB_FILL_NEXT) { } else if (type == TSDB_FILL_NEXT) {
TSKEY ekey = skey; TSKEY ekey = skey;
char* val = NULL; char* val = NULL;
...@@ -4395,7 +4396,7 @@ SFunctionFpSet fpSet[1] = { ...@@ -4395,7 +4396,7 @@ SFunctionFpSet fpSet[1] = {
.addInput = count_function, .addInput = count_function,
.finalize = doFinalizer, .finalize = doFinalizer,
.combine = count_func_merge, .combine = count_func_merge,
} },
}; };
SAggFunctionInfo aggFunc[35] = {{ SAggFunctionInfo aggFunc[35] = {{
......
...@@ -221,7 +221,7 @@ tMemBucket *tMemBucketCreate(int16_t nElemSize, int16_t dataType, double minval, ...@@ -221,7 +221,7 @@ tMemBucket *tMemBucketCreate(int16_t nElemSize, int16_t dataType, double minval,
} }
pBucket->numOfSlots = DEFAULT_NUM_OF_SLOT; pBucket->numOfSlots = DEFAULT_NUM_OF_SLOT;
pBucket->bufPageSize = DEFAULT_PAGE_SIZE * 4; // 4k per page pBucket->bufPageSize = 16384 * 4; // 16k per page
pBucket->type = dataType; pBucket->type = dataType;
pBucket->bytes = nElemSize; pBucket->bytes = nElemSize;
...@@ -254,7 +254,7 @@ tMemBucket *tMemBucketCreate(int16_t nElemSize, int16_t dataType, double minval, ...@@ -254,7 +254,7 @@ tMemBucket *tMemBucketCreate(int16_t nElemSize, int16_t dataType, double minval,
resetSlotInfo(pBucket); resetSlotInfo(pBucket);
int32_t ret = createDiskbasedBuffer(&pBucket->pBuffer, pBucket->bufPageSize, pBucket->bufPageSize * 512, 1, tsTempDir); int32_t ret = createDiskbasedBuf(&pBucket->pBuffer, pBucket->bufPageSize, pBucket->bufPageSize * 512, 1, "/tmp");
if (ret != 0) { if (ret != 0) {
tMemBucketDestroy(pBucket); tMemBucketDestroy(pBucket);
return NULL; return NULL;
...@@ -269,7 +269,7 @@ void tMemBucketDestroy(tMemBucket *pBucket) { ...@@ -269,7 +269,7 @@ void tMemBucketDestroy(tMemBucket *pBucket) {
return; return;
} }
destroyResultBuf(pBucket->pBuffer); destroyDiskbasedBuf(pBucket->pBuffer);
tfree(pBucket->pSlots); tfree(pBucket->pSlots);
tfree(pBucket); tfree(pBucket);
} }
...@@ -347,7 +347,7 @@ int32_t tMemBucketPut(tMemBucket *pBucket, const void *data, size_t size) { ...@@ -347,7 +347,7 @@ int32_t tMemBucketPut(tMemBucket *pBucket, const void *data, size_t size) {
pSlot->info.data = NULL; pSlot->info.data = NULL;
} }
pSlot->info.data = getNewDataBuf(pBucket->pBuffer, groupId, &pageId); pSlot->info.data = getNewBufPage(pBucket->pBuffer, groupId, &pageId);
pSlot->info.pageId = pageId; pSlot->info.pageId = pageId;
} }
......
...@@ -230,7 +230,7 @@ int32_t getExprFunctionId(SExprInfo *pExprInfo) { ...@@ -230,7 +230,7 @@ int32_t getExprFunctionId(SExprInfo *pExprInfo) {
} }
void assignExprInfo(SExprInfo* dst, const SExprInfo* src) { void assignExprInfo(SExprInfo* dst, const SExprInfo* src) {
assert(dst != NULL && src != NULL); assert(dst != NULL && src != NULL/* && src->base.numOfCols > 0*/);
*dst = *src; *dst = *src;
#if 0 #if 0
...@@ -241,8 +241,12 @@ void assignExprInfo(SExprInfo* dst, const SExprInfo* src) { ...@@ -241,8 +241,12 @@ void assignExprInfo(SExprInfo* dst, const SExprInfo* src) {
#endif #endif
dst->pExpr = exprdup(src->pExpr); dst->pExpr = exprdup(src->pExpr);
dst->base.pColumns = calloc(src->base.numOfCols, sizeof(SColumn)); if (src->base.numOfCols > 0) {
memcpy(dst->base.pColumns, src->base.pColumns, sizeof(SColumn) * src->base.numOfCols); dst->base.pColumns = calloc(src->base.numOfCols, sizeof(SColumn));
memcpy(dst->base.pColumns, src->base.pColumns, sizeof(SColumn) * src->base.numOfCols);
} else {
dst->base.pColumns = NULL;
}
memset(dst->base.param, 0, sizeof(SVariant) * tListLen(dst->base.param)); memset(dst->base.param, 0, sizeof(SVariant) * tListLen(dst->base.param));
for (int32_t j = 0; j < src->base.numOfParams; ++j) { for (int32_t j = 0; j < src->base.numOfParams; ++j) {
......
...@@ -306,7 +306,7 @@ typedef struct SFilterInfo { ...@@ -306,7 +306,7 @@ typedef struct SFilterInfo {
#define FILTER_GET_COL_FIELD_ID(fi) (((SColumnNode *)((fi)->desc))->colId) #define FILTER_GET_COL_FIELD_ID(fi) (((SColumnNode *)((fi)->desc))->colId)
#define FILTER_GET_COL_FIELD_SLOT_ID(fi) (((SColumnNode *)((fi)->desc))->slotId) #define FILTER_GET_COL_FIELD_SLOT_ID(fi) (((SColumnNode *)((fi)->desc))->slotId)
#define FILTER_GET_COL_FIELD_DESC(fi) ((SColumnNode *)((fi)->desc)) #define FILTER_GET_COL_FIELD_DESC(fi) ((SColumnNode *)((fi)->desc))
#define FILTER_GET_COL_FIELD_DATA(fi, ri) (colDataGet(((SColumnInfoData *)(fi)->data), (ri))) #define FILTER_GET_COL_FIELD_DATA(fi, ri) (colDataGetData(((SColumnInfoData *)(fi)->data), (ri)))
#define FILTER_GET_VAL_FIELD_TYPE(fi) (((SValueNode *)((fi)->desc))->node.resType.type) #define FILTER_GET_VAL_FIELD_TYPE(fi) (((SValueNode *)((fi)->desc))->node.resType.type)
#define FILTER_GET_VAL_FIELD_DATA(fi) ((char *)(fi)->data) #define FILTER_GET_VAL_FIELD_DATA(fi) ((char *)(fi)->data)
#define FILTER_GET_JSON_VAL_FIELD_DATA(fi) ((char *)(fi)->desc) #define FILTER_GET_JSON_VAL_FIELD_DATA(fi) ((char *)(fi)->desc)
......
...@@ -19,6 +19,7 @@ ...@@ -19,6 +19,7 @@
#include "tcompare.h" #include "tcompare.h"
#include "filterInt.h" #include "filterInt.h"
#include "filter.h" #include "filter.h"
#include "tep.h"
OptrStr gOptrStr[] = { OptrStr gOptrStr[] = {
{0, "invalid"}, {0, "invalid"},
...@@ -2776,7 +2777,7 @@ bool filterExecuteBasedOnStatisImpl(void *pinfo, int32_t numOfRows, int8_t** p, ...@@ -2776,7 +2777,7 @@ bool filterExecuteBasedOnStatisImpl(void *pinfo, int32_t numOfRows, int8_t** p,
uint32_t unitNum = *(unitIdx++); uint32_t unitNum = *(unitIdx++);
for (uint32_t u = 0; u < unitNum; ++u) { for (uint32_t u = 0; u < unitNum; ++u) {
SFilterComUnit *cunit = &info->cunits[*(unitIdx + u)]; SFilterComUnit *cunit = &info->cunits[*(unitIdx + u)];
void *colData = colDataGet((SColumnInfoData *)cunit->colData, i); void *colData = colDataGetData((SColumnInfoData *)cunit->colData, i);
//if (FILTER_UNIT_GET_F(info, uidx)) { //if (FILTER_UNIT_GET_F(info, uidx)) {
// p[i] = FILTER_UNIT_GET_R(info, uidx); // p[i] = FILTER_UNIT_GET_R(info, uidx);
...@@ -2874,7 +2875,7 @@ static FORCE_INLINE bool filterExecuteImplIsNull(void *pinfo, int32_t numOfRows, ...@@ -2874,7 +2875,7 @@ static FORCE_INLINE bool filterExecuteImplIsNull(void *pinfo, int32_t numOfRows,
for (int32_t i = 0; i < numOfRows; ++i) { for (int32_t i = 0; i < numOfRows; ++i) {
uint32_t uidx = info->groups[0].unitIdxs[0]; uint32_t uidx = info->groups[0].unitIdxs[0];
void *colData = colDataGet((SColumnInfoData *)info->cunits[uidx].colData, i); void *colData = colDataGetData((SColumnInfoData *)info->cunits[uidx].colData, i);
if(info->cunits[uidx].dataType == TSDB_DATA_TYPE_JSON){ if(info->cunits[uidx].dataType == TSDB_DATA_TYPE_JSON){
if (!colData){ // for json->'key' is null if (!colData){ // for json->'key' is null
(*p)[i] = 1; (*p)[i] = 1;
...@@ -2908,7 +2909,7 @@ static FORCE_INLINE bool filterExecuteImplNotNull(void *pinfo, int32_t numOfRows ...@@ -2908,7 +2909,7 @@ static FORCE_INLINE bool filterExecuteImplNotNull(void *pinfo, int32_t numOfRows
for (int32_t i = 0; i < numOfRows; ++i) { for (int32_t i = 0; i < numOfRows; ++i) {
uint32_t uidx = info->groups[0].unitIdxs[0]; uint32_t uidx = info->groups[0].unitIdxs[0];
void *colData = colDataGet((SColumnInfoData *)info->cunits[uidx].colData, i); void *colData = colDataGetData((SColumnInfoData *)info->cunits[uidx].colData, i);
if(info->cunits[uidx].dataType == TSDB_DATA_TYPE_JSON){ if(info->cunits[uidx].dataType == TSDB_DATA_TYPE_JSON){
if (!colData) { // for json->'key' is not null if (!colData) { // for json->'key' is not null
...@@ -2949,7 +2950,7 @@ bool filterExecuteImplRange(void *pinfo, int32_t numOfRows, int8_t** p, SColumnD ...@@ -2949,7 +2950,7 @@ bool filterExecuteImplRange(void *pinfo, int32_t numOfRows, int8_t** p, SColumnD
} }
for (int32_t i = 0; i < numOfRows; ++i) { for (int32_t i = 0; i < numOfRows; ++i) {
void *colData = colDataGet((SColumnInfoData *)info->cunits[0].colData, i); void *colData = colDataGetData((SColumnInfoData *)info->cunits[0].colData, i);
if (colData == NULL || isNull(colData, info->cunits[0].dataType)) { if (colData == NULL || isNull(colData, info->cunits[0].dataType)) {
all = false; all = false;
...@@ -2980,7 +2981,7 @@ bool filterExecuteImplMisc(void *pinfo, int32_t numOfRows, int8_t** p, SColumnDa ...@@ -2980,7 +2981,7 @@ bool filterExecuteImplMisc(void *pinfo, int32_t numOfRows, int8_t** p, SColumnDa
for (int32_t i = 0; i < numOfRows; ++i) { for (int32_t i = 0; i < numOfRows; ++i) {
uint32_t uidx = info->groups[0].unitIdxs[0]; uint32_t uidx = info->groups[0].unitIdxs[0];
void *colData = colDataGet((SColumnInfoData *)info->cunits[uidx].colData, i); void *colData = colDataGetData((SColumnInfoData *)info->cunits[uidx].colData, i);
if (colData == NULL || isNull(colData, info->cunits[uidx].dataType)) { if (colData == NULL || isNull(colData, info->cunits[uidx].dataType)) {
(*p)[i] = 0; (*p)[i] = 0;
all = false; all = false;
...@@ -3031,7 +3032,7 @@ bool filterExecuteImpl(void *pinfo, int32_t numOfRows, int8_t** p, SColumnDataAg ...@@ -3031,7 +3032,7 @@ bool filterExecuteImpl(void *pinfo, int32_t numOfRows, int8_t** p, SColumnDataAg
for (uint32_t u = 0; u < group->unitNum; ++u) { for (uint32_t u = 0; u < group->unitNum; ++u) {
uint32_t uidx = group->unitIdxs[u]; uint32_t uidx = group->unitIdxs[u];
SFilterComUnit *cunit = &info->cunits[uidx]; SFilterComUnit *cunit = &info->cunits[uidx];
void *colData = colDataGet((SColumnInfoData *)(cunit->colData), i); void *colData = colDataGetData((SColumnInfoData *)(cunit->colData), i);
//if (FILTER_UNIT_GET_F(info, uidx)) { //if (FILTER_UNIT_GET_F(info, uidx)) {
// p[i] = FILTER_UNIT_GET_R(info, uidx); // p[i] = FILTER_UNIT_GET_R(info, uidx);
......
...@@ -226,7 +226,7 @@ void* getVectorValueAddr_default(void *src, int32_t index) { ...@@ -226,7 +226,7 @@ void* getVectorValueAddr_default(void *src, int32_t index) {
return src; return src;
} }
void* getVectorValueAddr_VAR(void *src, int32_t index) { void* getVectorValueAddr_VAR(void *src, int32_t index) {
return colDataGet((SColumnInfoData *)src, index); return colDataGetData((SColumnInfoData *)src, index);
} }
_getValueAddr_fn_t getVectorValueAddrFn(int32_t srcType) { _getValueAddr_fn_t getVectorValueAddrFn(int32_t srcType) {
......
此差异已折叠。
此差异已折叠。
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册