提交 8e9f5cd9 编写于 作者: H Hongze Cheng

Merge branch 'refact/tsdb_optimize' of https://github.com/taosdata/TDengine...

Merge branch 'refact/tsdb_optimize' of https://github.com/taosdata/TDengine into refact/tsdb_new_format
......@@ -34,7 +34,7 @@ endif(${BUILD_TEST})
add_subdirectory(source)
add_subdirectory(tools)
add_subdirectory(tests)
add_subdirectory(utils)
add_subdirectory(examples/c)
# docs
......
......@@ -2,7 +2,7 @@
# taos-tools
ExternalProject_Add(taos-tools
GIT_REPOSITORY https://github.com/taosdata/taos-tools.git
GIT_TAG f169c0f
GIT_TAG a4d9b92
SOURCE_DIR "${TD_SOURCE_DIR}/tools/taos-tools"
BINARY_DIR ""
#BUILD_IN_SOURCE TRUE
......
......@@ -184,7 +184,8 @@ static FORCE_INLINE void colDataAppendDouble(SColumnInfoData* pColumnInfoData, u
int32_t getJsonValueLen(const char* data);
int32_t colDataAppend(SColumnInfoData* pColumnInfoData, uint32_t currentRow, const char* pData, bool isNull);
int32_t colDataAppendNItems(SColumnInfoData* pColumnInfoData, uint32_t currentRow, const char* pData, uint32_t numOfRows);
int32_t colDataAppendNItems(SColumnInfoData* pColumnInfoData, uint32_t currentRow, const char* pData,
uint32_t numOfRows);
int32_t colDataMergeCol(SColumnInfoData* pColumnInfoData, int32_t numOfRow1, int32_t* capacity,
const SColumnInfoData* pSource, int32_t numOfRow2);
int32_t colDataAssign(SColumnInfoData* pColumnInfoData, const SColumnInfoData* pSource, int32_t numOfRows,
......@@ -225,15 +226,16 @@ size_t blockDataGetCapacityInRow(const SSDataBlock* pBlock, size_t pageSize);
int32_t blockDataTrimFirstNRows(SSDataBlock* pBlock, size_t n);
int32_t blockDataKeepFirstNRows(SSDataBlock* pBlock, size_t n);
int32_t assignOneDataBlock(SSDataBlock* dst, const SSDataBlock* src);
int32_t copyDataBlock(SSDataBlock* dst, const SSDataBlock* src);
int32_t assignOneDataBlock(SSDataBlock* dst, const SSDataBlock* src);
int32_t copyDataBlock(SSDataBlock* dst, const SSDataBlock* src);
SSDataBlock* createDataBlock();
void* blockDataDestroy(SSDataBlock* pBlock);
void blockDataFreeRes(SSDataBlock* pBlock);
SSDataBlock* createOneDataBlock(const SSDataBlock* pDataBlock, bool copyData);
SSDataBlock* createSpecialDataBlock(EStreamType type);
int32_t blockDataAppendColInfo(SSDataBlock* pBlock, SColumnInfoData* pColInfoData);
int32_t blockDataAppendColInfo(SSDataBlock* pBlock, SColumnInfoData* pColInfoData);
SColumnInfoData createColumnInfoData(int16_t type, int32_t bytes, int16_t colId);
SColumnInfoData* bdGetColumnInfoData(const SSDataBlock* pBlock, int32_t index);
......@@ -249,7 +251,6 @@ char* dumpBlockData(SSDataBlock* pDataBlock, const char* flag, char** dumpBuf);
int32_t buildSubmitReqFromDataBlock(SSubmitReq** pReq, const SSDataBlock* pDataBlocks, STSchema* pTSchema, int32_t vgId,
tb_uid_t suid);
char* buildCtbNameByGroupId(const char* stbName, uint64_t groupId);
static FORCE_INLINE int32_t blockGetEncodeSize(const SSDataBlock* pBlock) {
......
......@@ -785,6 +785,8 @@ typedef struct {
int32_t walRollPeriod;
int64_t walSegmentSize;
int32_t sstTrigger;
int16_t hashPrefix;
int16_t hashSuffix;
} SCreateDbReq;
int32_t tSerializeSCreateDbReq(void* buf, int32_t bufLen, SCreateDbReq* pReq);
......@@ -1194,6 +1196,8 @@ typedef struct {
int32_t walRollPeriod;
int64_t walSegmentSize;
int16_t sstTrigger;
int16_t hashPrefix;
int16_t hashSuffix;
} SCreateVnodeReq;
int32_t tSerializeSCreateVnodeReq(void* buf, int32_t bufLen, SCreateVnodeReq* pReq);
......
......@@ -362,6 +362,12 @@ typedef enum ELogicConditionType {
#define TSDB_MIN_SST_TRIGGER 1
#define TSDB_MAX_SST_TRIGGER 128
#define TSDB_DEFAULT_SST_TRIGGER 8
#define TSDB_MIN_HASH_PREFIX 0
#define TSDB_MAX_HASH_PREFIX 128
#define TSDB_DEFAULT_HASH_PREFIX 0
#define TSDB_MIN_HASH_SUFFIX 0
#define TSDB_MAX_HASH_SUFFIX 128
#define TSDB_DEFAULT_HASH_SUFFIX 0
#define TSDB_DB_MIN_WAL_RETENTION_PERIOD -1
#define TSDB_REP_DEF_DB_WAL_RET_PERIOD 0
......
......@@ -51,9 +51,10 @@ struct SRBTreeNode {
SRBTreeNode *parent;
SRBTreeNode *left;
SRBTreeNode *right;
uint8_t payload[0];
};
#define RBTREE_NODE_PAYLOAD(N) ((const void *)&(N)[1])
struct SRBTree {
tRBTreeCmprFn cmprFn;
int64_t n;
......
......@@ -20,6 +20,7 @@
#include "tcrc32c.h"
#include "tdef.h"
#include "tmd5.h"
#include "thash.h"
#ifdef __cplusplus
extern "C" {
......@@ -68,6 +69,8 @@ static FORCE_INLINE void taosEncryptPass_c(uint8_t *inBuf, size_t len, char *tar
memcpy(target, buf, TSDB_PASSWORD_LEN);
}
#define taosGetTbHashVal(tbname, tblen, method, prefix, suffix) MurmurHash3_32((tbname), (tblen))
#ifdef __cplusplus
}
#endif
......
......@@ -547,6 +547,8 @@ static int32_t smlModifyDBSchemas(SSmlHandle *info) {
goto end;
}
needCheckMeta = true;
taosHashCleanup(hashTmp);
hashTmp = NULL;
} else {
uError("SML:0x%" PRIx64 " load table meta error: %s", info->id, tstrerror(code));
goto end;
......@@ -576,7 +578,6 @@ static int32_t smlModifyDBSchemas(SSmlHandle *info) {
sTableData->tableMeta = pTableMeta;
tableMetaSml = (SSmlSTableMeta **)taosHashIterate(info->superTables, tableMetaSml);
taosHashCleanup(hashTmp);
}
return 0;
......
......@@ -90,7 +90,7 @@ static const SSysDbTableSchema userDBSchema[] = {
{.name = "minrows", .bytes = 4, .type = TSDB_DATA_TYPE_INT, .sysInfo = true},
{.name = "maxrows", .bytes = 4, .type = TSDB_DATA_TYPE_INT, .sysInfo = true},
{.name = "comp", .bytes = 1, .type = TSDB_DATA_TYPE_TINYINT, .sysInfo = true},
{.name = "precision", .bytes = 2 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = true},
{.name = "precision", .bytes = 2 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false},
{.name = "status", .bytes = 10 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false},
{.name = "retentions", .bytes = 60 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = true},
{.name = "single_stable", .bytes = 1, .type = TSDB_DATA_TYPE_BOOL, .sysInfo = true},
......
......@@ -140,7 +140,8 @@ int32_t colDataReserve(SColumnInfoData* pColumnInfoData, size_t newSize) {
return TSDB_CODE_SUCCESS;
}
static void doCopyNItems(struct SColumnInfoData* pColumnInfoData, int32_t currentRow, const char* pData, int32_t itemLen, int32_t numOfRows) {
static void doCopyNItems(struct SColumnInfoData* pColumnInfoData, int32_t currentRow, const char* pData,
int32_t itemLen, int32_t numOfRows) {
ASSERT(pColumnInfoData->info.bytes >= itemLen);
size_t start = 1;
......@@ -148,21 +149,23 @@ static void doCopyNItems(struct SColumnInfoData* pColumnInfoData, int32_t curren
memcpy(pColumnInfoData->pData, pData, itemLen);
int32_t t = 0;
int32_t count = log(numOfRows)/log(2);
while(t < count) {
int32_t count = log(numOfRows) / log(2);
while (t < count) {
int32_t xlen = 1 << t;
memcpy(pColumnInfoData->pData + start * itemLen + pColumnInfoData->varmeta.length, pColumnInfoData->pData, xlen * itemLen);
memcpy(pColumnInfoData->pData + start * itemLen + pColumnInfoData->varmeta.length, pColumnInfoData->pData,
xlen * itemLen);
t += 1;
start += xlen;
}
// the tail part
if (numOfRows > start) {
memcpy(pColumnInfoData->pData + start * itemLen + currentRow * itemLen, pColumnInfoData->pData, (numOfRows - start) * itemLen);
memcpy(pColumnInfoData->pData + start * itemLen + currentRow * itemLen, pColumnInfoData->pData,
(numOfRows - start) * itemLen);
}
if (IS_VAR_DATA_TYPE(pColumnInfoData->info.type)) {
for(int32_t i = 0; i < numOfRows; ++i) {
for (int32_t i = 0; i < numOfRows; ++i) {
pColumnInfoData->varmeta.offset[i + currentRow] = pColumnInfoData->varmeta.length + i * itemLen;
}
......@@ -170,7 +173,8 @@ static void doCopyNItems(struct SColumnInfoData* pColumnInfoData, int32_t curren
}
}
int32_t colDataAppendNItems(SColumnInfoData* pColumnInfoData, uint32_t currentRow, const char* pData, uint32_t numOfRows) {
int32_t colDataAppendNItems(SColumnInfoData* pColumnInfoData, uint32_t currentRow, const char* pData,
uint32_t numOfRows) {
ASSERT(pData != NULL && pColumnInfoData != NULL);
int32_t len = pColumnInfoData->info.bytes;
......@@ -278,7 +282,7 @@ int32_t colDataMergeCol(SColumnInfoData* pColumnInfoData, int32_t numOfRow1, int
} else {
if (finalNumOfRows > *capacity || (numOfRow1 == 0 && pColumnInfoData->info.bytes != 0)) {
// all data may be null, when the pColumnInfoData->info.type == 0, bytes == 0;
// ASSERT(finalNumOfRows * pColumnInfoData->info.bytes);
// ASSERT(finalNumOfRows * pColumnInfoData->info.bytes);
char* tmp = taosMemoryRealloc(pColumnInfoData->pData, finalNumOfRows * pColumnInfoData->info.bytes);
if (tmp == NULL) {
return TSDB_CODE_VND_OUT_OF_MEMORY;
......@@ -557,7 +561,7 @@ int32_t blockDataToBuf(char* buf, const SSDataBlock* pBlock) {
}
int32_t blockDataFromBuf(SSDataBlock* pBlock, const char* buf) {
int32_t numOfRows = *(int32_t*) buf;
int32_t numOfRows = *(int32_t*)buf;
blockDataEnsureCapacity(pBlock, numOfRows);
pBlock->info.rows = numOfRows;
......@@ -676,7 +680,8 @@ size_t blockDataGetRowSize(SSDataBlock* pBlock) {
* @return
*/
size_t blockDataGetSerialMetaSize(uint32_t numOfCols) {
// | version | total length | total rows | total columns | flag seg| block group id | column schema | each column length |
// | version | total length | total rows | total columns | flag seg| block group id | column schema | each column
// length |
return sizeof(int32_t) + sizeof(int32_t) + sizeof(int32_t) + sizeof(int32_t) + sizeof(int32_t) + sizeof(uint64_t) +
numOfCols * (sizeof(int8_t) + sizeof(int32_t)) + numOfCols * sizeof(int32_t);
}
......@@ -1302,6 +1307,40 @@ int32_t copyDataBlock(SSDataBlock* dst, const SSDataBlock* src) {
return TSDB_CODE_SUCCESS;
}
SSDataBlock* createSpecialDataBlock(EStreamType type) {
SSDataBlock* pBlock = taosMemoryCalloc(1, sizeof(SSDataBlock));
pBlock->info.hasVarCol = false;
pBlock->info.groupId = 0;
pBlock->info.rows = 0;
pBlock->info.type = type;
pBlock->info.rowSize =
sizeof(TSKEY) + sizeof(TSKEY) + sizeof(uint64_t) + sizeof(uint64_t) + sizeof(TSKEY) + sizeof(TSKEY);
pBlock->info.watermark = INT64_MIN;
pBlock->pDataBlock = taosArrayInit(6, sizeof(SColumnInfoData));
SColumnInfoData infoData = {0};
infoData.info.type = TSDB_DATA_TYPE_TIMESTAMP;
infoData.info.bytes = sizeof(TSKEY);
// window start ts
taosArrayPush(pBlock->pDataBlock, &infoData);
// window end ts
taosArrayPush(pBlock->pDataBlock, &infoData);
infoData.info.type = TSDB_DATA_TYPE_UBIGINT;
infoData.info.bytes = sizeof(uint64_t);
// uid
taosArrayPush(pBlock->pDataBlock, &infoData);
// group id
taosArrayPush(pBlock->pDataBlock, &infoData);
// calculate start ts
taosArrayPush(pBlock->pDataBlock, &infoData);
// calculate end ts
taosArrayPush(pBlock->pDataBlock, &infoData);
return pBlock;
}
SSDataBlock* createOneDataBlock(const SSDataBlock* pDataBlock, bool copyData) {
if (pDataBlock == NULL) {
return NULL;
......@@ -1426,7 +1465,7 @@ size_t blockDataGetCapacityInRow(const SSDataBlock* pBlock, size_t pageSize) {
}
void colDataDestroy(SColumnInfoData* pColData) {
if(!pColData) return;
if (!pColData) return;
if (IS_VAR_DATA_TYPE(pColData->info.type)) {
taosMemoryFreeClear(pColData->varmeta.offset);
} else {
......@@ -1693,7 +1732,7 @@ static char* formatTimestamp(char* buf, int64_t val, int precision) {
}
struct tm ptm = {0};
taosLocalTime(&tt, &ptm);
size_t pos = strftime(buf, 35, "%Y-%m-%d %H:%M:%S", &ptm);
size_t pos = strftime(buf, 35, "%Y-%m-%d %H:%M:%S", &ptm);
if (precision == TSDB_TIME_PRECISION_NANO) {
sprintf(buf + pos, ".%09d", ms);
......@@ -1847,20 +1886,20 @@ char* dumpBlockData(SSDataBlock* pDataBlock, const char* flag, char** pDataBuf)
break;
case TSDB_DATA_TYPE_VARCHAR: {
memset(pBuf, 0, sizeof(pBuf));
char* pData = colDataGetVarData(pColInfoData, j);
char* pData = colDataGetVarData(pColInfoData, j);
int32_t dataSize = TMIN(sizeof(pBuf), varDataLen(pData));
memcpy(pBuf, varDataVal(pData), dataSize);
len += snprintf(dumpBuf + len, size - len, " %15s |", pBuf);
if (len >= size - 1) return dumpBuf;
} break;
} break;
case TSDB_DATA_TYPE_NCHAR: {
char* pData = colDataGetVarData(pColInfoData, j);
char* pData = colDataGetVarData(pColInfoData, j);
int32_t dataSize = TMIN(sizeof(pBuf), varDataLen(pData));
memset(pBuf, 0, sizeof(pBuf));
taosUcs4ToMbs((TdUcs4 *)varDataVal(pData), dataSize, pBuf);
taosUcs4ToMbs((TdUcs4*)varDataVal(pData), dataSize, pBuf);
len += snprintf(dumpBuf + len, size - len, " %15s |", pBuf);
if (len >= size - 1) return dumpBuf;
} break;
} break;
}
}
len += snprintf(dumpBuf + len, size - len, "\n");
......@@ -1877,7 +1916,7 @@ char* dumpBlockData(SSDataBlock* pDataBlock, const char* flag, char** pDataBuf)
* @param pDataBlocks
* @param vgId
* @param suid
*
*
*/
int32_t buildSubmitReqFromDataBlock(SSubmitReq** pReq, const SSDataBlock* pDataBlock, STSchema* pTSchema, int32_t vgId,
tb_uid_t suid) {
......@@ -1904,8 +1943,8 @@ int32_t buildSubmitReqFromDataBlock(SSubmitReq** pReq, const SSDataBlock* pDataB
tdSRowInit(&rb, pTSchema->version);
for (int32_t i = 0; i < sz; ++i) {
int32_t colNum = taosArrayGetSize(pDataBlock->pDataBlock);
int32_t rows = pDataBlock->info.rows;
int32_t colNum = taosArrayGetSize(pDataBlock->pDataBlock);
int32_t rows = pDataBlock->info.rows;
// int32_t rowSize = pDataBlock->info.rowSize;
// int64_t groupId = pDataBlock->info.groupId;
......@@ -1926,7 +1965,7 @@ int32_t buildSubmitReqFromDataBlock(SSubmitReq** pReq, const SSDataBlock* pDataB
msgLen += sizeof(SSubmitBlk);
int32_t dataLen = 0;
for (int32_t j = 0; j < rows; ++j) { // iterate by row
for (int32_t j = 0; j < rows; ++j) { // iterate by row
tdSRowResetBuf(&rb, POINTER_SHIFT(pDataBuf, msgLen + dataLen)); // set row buf
bool isStartKey = false;
int32_t offset = 0;
......@@ -2089,7 +2128,7 @@ void blockEncode(const SSDataBlock* pBlock, char* data, int32_t* dataLen, int32_
// flag segment.
// the inital bit is for column info
int32_t* flagSegment = (int32_t*)data;
*flagSegment = (1<<31);
*flagSegment = (1 << 31);
data += sizeof(int32_t);
......@@ -2149,7 +2188,7 @@ void blockEncode(const SSDataBlock* pBlock, char* data, int32_t* dataLen, int32_
const char* blockDecode(SSDataBlock* pBlock, const char* pData) {
const char* pStart = pData;
int32_t version = *(int32_t*) pStart;
int32_t version = *(int32_t*)pStart;
pStart += sizeof(int32_t);
ASSERT(version == 1);
......@@ -2158,7 +2197,7 @@ const char* blockDecode(SSDataBlock* pBlock, const char* pData) {
pStart += sizeof(int32_t);
// total rows sizeof(int32_t)
int32_t numOfRows = *(int32_t*)pStart;
int32_t numOfRows = *(int32_t*)pStart;
pStart += sizeof(int32_t);
// total columns sizeof(int32_t)
......
......@@ -2027,6 +2027,8 @@ int32_t tSerializeSCreateDbReq(void *buf, int32_t bufLen, SCreateDbReq *pReq) {
if (tEncodeI32(&encoder, pReq->walRollPeriod) < 0) return -1;
if (tEncodeI64(&encoder, pReq->walSegmentSize) < 0) return -1;
if (tEncodeI32(&encoder, pReq->sstTrigger) < 0) return -1;
if (tEncodeI16(&encoder, pReq->hashPrefix) < 0) return -1;
if (tEncodeI16(&encoder, pReq->hashSuffix) < 0) return -1;
if (tEncodeI8(&encoder, pReq->ignoreExist) < 0) return -1;
if (tEncodeI32(&encoder, pReq->numOfRetensions) < 0) return -1;
for (int32_t i = 0; i < pReq->numOfRetensions; ++i) {
......@@ -2074,6 +2076,8 @@ int32_t tDeserializeSCreateDbReq(void *buf, int32_t bufLen, SCreateDbReq *pReq)
if (tDecodeI32(&decoder, &pReq->walRollPeriod) < 0) return -1;
if (tDecodeI64(&decoder, &pReq->walSegmentSize) < 0) return -1;
if (tDecodeI32(&decoder, &pReq->sstTrigger) < 0) return -1;
if (tDecodeI16(&decoder, &pReq->hashPrefix) < 0) return -1;
if (tDecodeI16(&decoder, &pReq->hashSuffix) < 0) return -1;
if (tDecodeI8(&decoder, &pReq->ignoreExist) < 0) return -1;
if (tDecodeI32(&decoder, &pReq->numOfRetensions) < 0) return -1;
pReq->pRetensions = taosArrayInit(pReq->numOfRetensions, sizeof(SRetention));
......@@ -3769,6 +3773,8 @@ int32_t tSerializeSCreateVnodeReq(void *buf, int32_t bufLen, SCreateVnodeReq *pR
if (tEncodeI32(&encoder, pReq->walRollPeriod) < 0) return -1;
if (tEncodeI64(&encoder, pReq->walSegmentSize) < 0) return -1;
if (tEncodeI16(&encoder, pReq->sstTrigger) < 0) return -1;
if (tEncodeI16(&encoder, pReq->hashPrefix) < 0) return -1;
if (tEncodeI16(&encoder, pReq->hashSuffix) < 0) return -1;
tEndEncode(&encoder);
......@@ -3842,6 +3848,8 @@ int32_t tDeserializeSCreateVnodeReq(void *buf, int32_t bufLen, SCreateVnodeReq *
if (tDecodeI32(&decoder, &pReq->walRollPeriod) < 0) return -1;
if (tDecodeI64(&decoder, &pReq->walSegmentSize) < 0) return -1;
if (tDecodeI16(&decoder, &pReq->sstTrigger) < 0) return -1;
if (tDecodeI16(&decoder, &pReq->hashPrefix) < 0) return -1;
if (tDecodeI16(&decoder, &pReq->hashSuffix) < 0) return -1;
tEndDecode(&decoder);
tDecoderClear(&decoder);
......
......@@ -171,6 +171,8 @@ static void vmGenerateVnodeCfg(SCreateVnodeReq *pCreate, SVnodeCfg *pCfg) {
pCfg->hashBegin = pCreate->hashBegin;
pCfg->hashEnd = pCreate->hashEnd;
pCfg->hashMethod = pCreate->hashMethod;
pCfg->hashPrefix = pCreate->hashPrefix;
pCfg->hashSuffix = pCreate->hashSuffix;
pCfg->standby = pCfg->standby;
pCfg->syncCfg.myIndex = pCreate->selfIndex;
......@@ -220,9 +222,11 @@ int32_t vmProcessCreateVnodeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
return -1;
}
dDebug("vgId:%d, start to create vnode, tsma:%d standby:%d cacheLast:%d cacheLastSize:%d sstTrigger:%d",
dInfo("vgId:%d, start to create vnode, tsma:%d standby:%d cacheLast:%d cacheLastSize:%d sstTrigger:%d",
createReq.vgId, createReq.isTsma, createReq.standby, createReq.cacheLast, createReq.cacheLastSize,
createReq.sstTrigger);
dInfo("vgId:%d, hashMethod:%d begin:%u end:%u prefix:%d surfix:%d", createReq.vgId, createReq.hashMethod,
createReq.hashBegin, createReq.hashEnd, createReq.hashPrefix, createReq.hashSuffix);
vmGenerateVnodeCfg(&createReq, &vnodeCfg);
if (vmTsmaAdjustDays(&vnodeCfg, &createReq) < 0) {
......
......@@ -305,13 +305,15 @@ typedef struct {
int8_t hashMethod; // default is 1
int8_t cacheLast;
int8_t schemaless;
int16_t hashPrefix;
int16_t hashSuffix;
int16_t sstTrigger;
int32_t numOfRetensions;
SArray* pRetensions;
int32_t walRetentionPeriod;
int64_t walRetentionSize;
int32_t walRollPeriod;
int64_t walRetentionSize;
int64_t walSegmentSize;
int16_t sstTrigger;
} SDbCfg;
typedef struct {
......
......@@ -30,7 +30,7 @@
#include "systable.h"
#define DB_VER_NUMBER 1
#define DB_RESERVE_SIZE 62
#define DB_RESERVE_SIZE 58
static SSdbRaw *mndDbActionEncode(SDbObj *pDb);
static SSdbRow *mndDbActionDecode(SSdbRaw *pRaw);
......@@ -125,6 +125,8 @@ static SSdbRaw *mndDbActionEncode(SDbObj *pDb) {
SDB_SET_INT32(pRaw, dataPos, pDb->cfg.walRollPeriod, _OVER)
SDB_SET_INT64(pRaw, dataPos, pDb->cfg.walSegmentSize, _OVER)
SDB_SET_INT16(pRaw, dataPos, pDb->cfg.sstTrigger, _OVER)
SDB_SET_INT16(pRaw, dataPos, pDb->cfg.hashPrefix, _OVER)
SDB_SET_INT16(pRaw, dataPos, pDb->cfg.hashSuffix, _OVER)
SDB_SET_RESERVE(pRaw, dataPos, DB_RESERVE_SIZE, _OVER)
SDB_SET_DATALEN(pRaw, dataPos, _OVER)
......@@ -209,6 +211,8 @@ static SSdbRow *mndDbActionDecode(SSdbRaw *pRaw) {
SDB_GET_INT32(pRaw, dataPos, &pDb->cfg.walRollPeriod, _OVER)
SDB_GET_INT64(pRaw, dataPos, &pDb->cfg.walSegmentSize, _OVER)
SDB_GET_INT16(pRaw, dataPos, &pDb->cfg.sstTrigger, _OVER)
SDB_GET_INT16(pRaw, dataPos, &pDb->cfg.hashPrefix, _OVER)
SDB_GET_INT16(pRaw, dataPos, &pDb->cfg.hashSuffix, _OVER)
SDB_GET_RESERVE(pRaw, dataPos, DB_RESERVE_SIZE, _OVER)
taosInitRWLatch(&pDb->lock);
......@@ -334,6 +338,8 @@ static int32_t mndCheckDbCfg(SMnode *pMnode, SDbCfg *pCfg) {
if (pCfg->walRollPeriod < TSDB_DB_MIN_WAL_ROLL_PERIOD) return -1;
if (pCfg->walSegmentSize < TSDB_DB_MIN_WAL_SEGMENT_SIZE) return -1;
if (pCfg->sstTrigger < TSDB_MIN_SST_TRIGGER || pCfg->sstTrigger > TSDB_MAX_SST_TRIGGER) return -1;
if (pCfg->hashPrefix < TSDB_MIN_HASH_PREFIX || pCfg->hashPrefix > TSDB_MAX_HASH_PREFIX) return -1;
if (pCfg->hashSuffix < TSDB_MIN_HASH_SUFFIX || pCfg->hashSuffix > TSDB_MAX_HASH_SUFFIX) return -1;
terrno = 0;
return terrno;
......@@ -368,6 +374,8 @@ static void mndSetDefaultDbCfg(SDbCfg *pCfg) {
if (pCfg->walRollPeriod < 0) pCfg->walRollPeriod = TSDB_REPS_DEF_DB_WAL_ROLL_PERIOD;
if (pCfg->walSegmentSize < 0) pCfg->walSegmentSize = TSDB_DEFAULT_DB_WAL_SEGMENT_SIZE;
if (pCfg->sstTrigger <= 0) pCfg->sstTrigger = TSDB_DEFAULT_SST_TRIGGER;
if (pCfg->hashPrefix < 0) pCfg->hashPrefix = TSDB_DEFAULT_HASH_PREFIX;
if (pCfg->hashSuffix < 0) pCfg->hashSuffix = TSDB_DEFAULT_HASH_SUFFIX;
}
static int32_t mndSetCreateDbRedoLogs(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroups) {
......@@ -485,6 +493,8 @@ static int32_t mndCreateDb(SMnode *pMnode, SRpcMsg *pReq, SCreateDbReq *pCreate,
.walRollPeriod = pCreate->walRollPeriod,
.walSegmentSize = pCreate->walSegmentSize,
.sstTrigger = pCreate->sstTrigger,
.hashPrefix = pCreate->hashPrefix,
.hashSuffix = pCreate->hashSuffix,
};
dbObj.cfg.numOfRetensions = pCreate->numOfRetensions;
......
......@@ -235,6 +235,8 @@ void *mndBuildCreateVnodeReq(SMnode *pMnode, SDnodeObj *pDnode, SDbObj *pDb, SVg
createReq.walRollPeriod = pDb->cfg.walRollPeriod;
createReq.walSegmentSize = pDb->cfg.walSegmentSize;
createReq.sstTrigger = pDb->cfg.sstTrigger;
createReq.hashPrefix = pDb->cfg.hashPrefix;
createReq.hashSuffix = pDb->cfg.hashSuffix;
for (int32_t v = 0; v < pVgroup->replica; ++v) {
SReplica *pReplica = &createReq.replicas[v];
......
......@@ -289,6 +289,8 @@ struct SVnodeCfg {
uint32_t hashBegin;
uint32_t hashEnd;
int16_t sstTrigger;
int16_t hashPrefix;
int16_t hashSuffix;
};
typedef struct {
......
......@@ -103,7 +103,7 @@ int metaCommit(SMeta* pMeta);
int metaCreateSTable(SMeta* pMeta, int64_t version, SVCreateStbReq* pReq);
int metaAlterSTable(SMeta* pMeta, int64_t version, SVCreateStbReq* pReq);
int metaDropSTable(SMeta* pMeta, int64_t verison, SVDropStbReq* pReq, SArray* tbUidList);
int metaCreateTable(SMeta* pMeta, int64_t version, SVCreateTbReq* pReq, STableMetaRsp **pMetaRsp);
int metaCreateTable(SMeta* pMeta, int64_t version, SVCreateTbReq* pReq, STableMetaRsp** pMetaRsp);
int metaDropTable(SMeta* pMeta, int64_t version, SVDropTbReq* pReq, SArray* tbUids);
int metaTtlDropTable(SMeta* pMeta, int64_t ttl, SArray* tbUids);
int metaAlterTable(SMeta* pMeta, int64_t version, SVAlterTbReq* pReq, STableMetaRsp* pMetaRsp);
......@@ -174,7 +174,8 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg);
// tq-stream
int32_t tqProcessTaskDeployReq(STQ* pTq, int64_t version, char* msg, int32_t msgLen);
int32_t tqProcessTaskDropReq(STQ* pTq, int64_t version, char* msg, int32_t msgLen);
int32_t tqProcessStreamTrigger(STQ* pTq, SSubmitReq* data, int64_t ver);
int32_t tqProcessSubmitReq(STQ* pTq, SSubmitReq* data, int64_t ver);
int32_t tqProcessDelReq(STQ* pTq, void* pReq, int32_t len, int64_t ver);
int32_t tqProcessTaskRunReq(STQ* pTq, SRpcMsg* pMsg);
int32_t tqProcessTaskDispatchReq(STQ* pTq, SRpcMsg* pMsg, bool exec);
int32_t tqProcessTaskRecoverReq(STQ* pTq, SRpcMsg* pMsg);
......
......@@ -129,10 +129,16 @@ _err:
bool metaIsTableExist(SMeta *pMeta, tb_uid_t uid) {
// query uid.idx
metaRLock(pMeta);
if (tdbTbGet(pMeta->pUidIdx, &uid, sizeof(uid), NULL, NULL) < 0) {
metaULock(pMeta);
return false;
}
metaULock(pMeta);
return true;
}
......@@ -182,9 +188,14 @@ tb_uid_t metaGetTableEntryUidByName(SMeta *pMeta, const char *name) {
}
int metaGetTableNameByUid(void *meta, uint64_t uid, char *tbName) {
int code = 0;
SMetaReader mr = {0};
metaReaderInit(&mr, (SMeta *)meta, 0);
metaGetTableEntryByUid(&mr, uid);
code = metaGetTableEntryByUid(&mr, uid);
if (code < 0) {
metaReaderClear(&mr);
return -1;
}
STR_TO_VARSTR(tbName, mr.me.name);
metaReaderClear(&mr);
......
......@@ -816,7 +816,86 @@ int32_t tqProcessTaskDeployReq(STQ* pTq, int64_t version, char* msg, int32_t msg
return streamMetaAddSerializedTask(pTq->pStreamMeta, version, msg, msgLen);
}
int32_t tqProcessStreamTrigger(STQ* pTq, SSubmitReq* pReq, int64_t ver) {
int32_t tqProcessDelReq(STQ* pTq, void* pReq, int32_t len, int64_t ver) {
bool failed = false;
SDecoder* pCoder = &(SDecoder){0};
SDeleteRes* pRes = &(SDeleteRes){0};
pRes->uidList = taosArrayInit(0, sizeof(tb_uid_t));
if (pRes->uidList == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
failed = true;
}
tDecoderInit(pCoder, pReq, len);
tDecodeDeleteRes(pCoder, pRes);
tDecoderClear(pCoder);
int32_t sz = taosArrayGetSize(pRes->uidList);
if (sz == 0) {
taosArrayDestroy(pRes->uidList);
return 0;
}
SSDataBlock* pDelBlock = createSpecialDataBlock(STREAM_DELETE_DATA);
blockDataEnsureCapacity(pDelBlock, sz);
pDelBlock->info.rows = sz;
pDelBlock->info.version = ver;
for (int32_t i = 0; i < sz; i++) {
// start key column
SColumnInfoData* pStartCol = taosArrayGet(pDelBlock->pDataBlock, START_TS_COLUMN_INDEX);
colDataAppend(pStartCol, i, (const char*)&pRes->skey, false); // end key column
SColumnInfoData* pEndCol = taosArrayGet(pDelBlock->pDataBlock, END_TS_COLUMN_INDEX);
colDataAppend(pEndCol, i, (const char*)&pRes->ekey, false);
// uid column
SColumnInfoData* pUidCol = taosArrayGet(pDelBlock->pDataBlock, UID_COLUMN_INDEX);
int64_t* pUid = taosArrayGet(pRes->uidList, i);
colDataAppend(pUidCol, i, (const char*)pUid, false);
colDataAppendNULL(taosArrayGet(pDelBlock->pDataBlock, GROUPID_COLUMN_INDEX), i);
colDataAppendNULL(taosArrayGet(pDelBlock->pDataBlock, CALCULATE_START_TS_COLUMN_INDEX), i);
colDataAppendNULL(taosArrayGet(pDelBlock->pDataBlock, CALCULATE_END_TS_COLUMN_INDEX), i);
}
taosArrayDestroy(pRes->uidList);
void* pIter = NULL;
while (1) {
pIter = taosHashIterate(pTq->pStreamMeta->pTasks, pIter);
if (pIter == NULL) break;
SStreamTask* pTask = *(SStreamTask**)pIter;
if (pTask->taskLevel != TASK_LEVEL__SOURCE) continue;
qDebug("delete req enqueue stream task: %d, ver: %" PRId64, pTask->taskId, ver);
SStreamDataBlock* pStreamBlock = taosAllocateQitem(sizeof(SStreamDataBlock), DEF_QITEM);
pStreamBlock->type = STREAM_INPUT__DATA_BLOCK;
pStreamBlock->blocks = taosArrayInit(0, sizeof(SSDataBlock));
SSDataBlock block = {0};
assignOneDataBlock(&block, pDelBlock);
block.info.type = STREAM_DELETE_DATA;
taosArrayPush(pStreamBlock->blocks, &block);
if (!failed) {
if (streamTaskInput(pTask, (SStreamQueueItem*)pStreamBlock) < 0) {
qError("stream task input del failed, task id %d", pTask->taskId);
continue;
}
if (streamSchedExec(pTask) < 0) {
qError("stream task launch failed, task id %d", pTask->taskId);
continue;
}
} else {
streamTaskInputFail(pTask);
}
}
blockDataDestroy(pDelBlock);
return 0;
}
int32_t tqProcessSubmitReq(STQ* pTq, SSubmitReq* pReq, int64_t ver) {
void* pIter = NULL;
bool failed = false;
SStreamDataSubmit* pSubmit = NULL;
......
......@@ -213,20 +213,25 @@ int32_t tqPushMsgNew(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_
#endif
int tqPushMsg(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_t ver) {
if (vnodeIsRoleLeader(pTq->pVnode) && msgType == TDMT_VND_SUBMIT) {
if (taosHashGetSize(pTq->pStreamMeta->pTasks) == 0) return 0;
void* data = taosMemoryMalloc(msgLen);
if (data == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
tqError("failed to copy data for stream since out of memory");
return -1;
if (vnodeIsRoleLeader(pTq->pVnode)) {
if (msgType == TDMT_VND_SUBMIT) {
if (taosHashGetSize(pTq->pStreamMeta->pTasks) == 0) return 0;
void* data = taosMemoryMalloc(msgLen);
if (data == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
tqError("failed to copy data for stream since out of memory");
return -1;
}
memcpy(data, msg, msgLen);
SSubmitReq* pReq = (SSubmitReq*)data;
pReq->version = ver;
tqProcessSubmitReq(pTq, data, ver);
}
if (msgType == TDMT_VND_DELETE) {
tqProcessDelReq(pTq, POINTER_SHIFT(msg, sizeof(SMsgHead)), msgLen - sizeof(SMsgHead), ver);
}
memcpy(data, msg, msgLen);
SSubmitReq* pReq = (SSubmitReq*)data;
pReq->version = ver;
tqProcessStreamTrigger(pTq, data, ver);
}
return 0;
......
......@@ -252,8 +252,8 @@ SRowInfo *tLDataIterGet(SLDataIter *pIter) { return &pIter->rInfo; }
// SMergeTree =================================================
static FORCE_INLINE int32_t tLDataIterCmprFn(const void *p1, const void *p2) {
SLDataIter *pIter1 = (SLDataIter *)(p1 - sizeof(SRBTreeNode));
SLDataIter *pIter2 = (SLDataIter *)(p2 - sizeof(SRBTreeNode));
SLDataIter *pIter1 = (SLDataIter *)(((uint8_t *)p1) - sizeof(SRBTreeNode));
SLDataIter *pIter2 = (SLDataIter *)(((uint8_t *)p2) - sizeof(SRBTreeNode));
TSDBKEY key1 = TSDBROW_KEY(&pIter1->rInfo.row);
TSDBKEY key2 = TSDBROW_KEY(&pIter2->rInfo.row);
......@@ -285,7 +285,7 @@ int32_t tMergeTreeOpen(SMergeTree *pMTree, int8_t backward, SDataFReader *pFRead
tRBTreeCreate(&pMTree->rbt, tLDataIterCmprFn);
int32_t code = TSDB_CODE_OUT_OF_MEMORY;
struct SLDataIter *pIterList[TSDB_DEFAULT_SST_FILE] = {0};
struct SLDataIter *pIterList[TSDB_DEFAULT_LAST_FILE] = {0};
for (int32_t i = 0; i < pFReader->pSet->nSstF; ++i) { // open all last file
code = tLDataIterOpen(&pIterList[i], pFReader, i, pMTree->backward, uid, pTimeWindow, pVerRange);
if (code != TSDB_CODE_SUCCESS) {
......@@ -323,7 +323,7 @@ bool tMergeTreeNext(SMergeTree *pMTree) {
// compare with min in RB Tree
pIter = (SLDataIter *)tRBTreeMin(&pMTree->rbt);
if (pMTree->pIter && pIter) {
int32_t c = pMTree->rbt.cmprFn(pMTree->pIter->node.payload, &pIter->node.payload);
int32_t c = pMTree->rbt.cmprFn(RBTREE_NODE_PAYLOAD(&pMTree->pIter->node), RBTREE_NODE_PAYLOAD(&pIter->node));
if (c > 0) {
tRBTreePut(&pMTree->rbt, (SRBTreeNode *)pMTree->pIter);
pMTree->pIter = NULL;
......
......@@ -353,12 +353,13 @@ static bool filesetIteratorNext(SFilesetIter* pIter, STsdbReader* pReader) {
int32_t step = asc ? 1 : -1;
pIter->index += step;
pIter->pLastBlockReader->uid = 0;
tMergeTreeClose(&pIter->pLastBlockReader->mergeTree);
if ((asc && pIter->index >= pIter->numOfFiles) || ((!asc) && pIter->index < 0)) {
return false;
}
pIter->pLastBlockReader->uid = 0;
tMergeTreeClose(&pIter->pLastBlockReader->mergeTree);
// check file the time range of coverage
STimeWindow win = {0};
......@@ -2161,8 +2162,6 @@ _err:
static TSDBKEY getCurrentKeyInBuf(STableBlockScanInfo* pScanInfo, STsdbReader* pReader) {
TSDBKEY key = {.ts = TSKEY_INITIAL_VAL};
initMemDataIterator(pScanInfo, pReader);
TSDBROW* pRow = getValidMemRow(&pScanInfo->iter, pScanInfo->delSkyline, pReader);
if (pRow != NULL) {
key = TSDBROW_KEY(pRow);
......@@ -2356,6 +2355,7 @@ static int32_t doBuildDataBlock(STsdbReader* pReader) {
pBlock = getCurrentBlock(pBlockIter);
}
initLastBlockReader(pLastBlockReader, pScanInfo, pReader);
TSDBKEY key = getCurrentKeyInBuf(pScanInfo, pReader);
if (pBlockInfo == NULL) { // build data block from last data file
......
......@@ -14,6 +14,7 @@
*/
#include "vnd.h"
#include "tutil.h"
const SVnodeCfg vnodeCfgDefault = {.vgId = -1,
.dbname = "",
......@@ -110,6 +111,8 @@ int vnodeEncodeConfig(const void *pObj, SJson *pJson) {
if (tjsonAddIntegerToObject(pJson, "hashBegin", pCfg->hashBegin) < 0) return -1;
if (tjsonAddIntegerToObject(pJson, "hashEnd", pCfg->hashEnd) < 0) return -1;
if (tjsonAddIntegerToObject(pJson, "hashMethod", pCfg->hashMethod) < 0) return -1;
if (tjsonAddIntegerToObject(pJson, "hashPrefix", pCfg->hashPrefix) < 0) return -1;
if (tjsonAddIntegerToObject(pJson, "hashSuffix", pCfg->hashSuffix) < 0) return -1;
if (tjsonAddIntegerToObject(pJson, "syncCfg.replicaNum", pCfg->syncCfg.replicaNum) < 0) return -1;
if (tjsonAddIntegerToObject(pJson, "syncCfg.myIndex", pCfg->syncCfg.myIndex) < 0) return -1;
......@@ -214,6 +217,10 @@ int vnodeDecodeConfig(const SJson *pJson, void *pObj) {
if (code < 0) return -1;
tjsonGetNumberValue(pJson, "hashMethod", pCfg->hashMethod, code);
if (code < 0) return -1;
tjsonGetNumberValue(pJson, "hashPrefix", pCfg->hashPrefix, code);
if (code < 0) return -1;
tjsonGetNumberValue(pJson, "hashSuffix", pCfg->hashSuffix, code);
if (code < 0) return -1;
tjsonGetNumberValue(pJson, "syncCfg.replicaNum", pCfg->syncCfg.replicaNum, code);
if (code < 0) return -1;
......@@ -250,7 +257,8 @@ int vnodeValidateTableHash(SVnode *pVnode, char *tableFName) {
switch (pVnode->config.hashMethod) {
default:
hashValue = MurmurHash3_32(tableFName, strlen(tableFName));
hashValue = taosGetTbHashVal(tableFName, strlen(tableFName), pVnode->config.hashMethod, pVnode->config.hashPrefix,
pVnode->config.hashSuffix);
break;
}
......
......@@ -371,7 +371,7 @@ void vnodeUpdateMetaRsp(SVnode *pVnode, STableMetaRsp *pMetaRsp) {
if (NULL == pMetaRsp) {
return;
}
strcpy(pMetaRsp->dbFName, pVnode->config.dbname);
pMetaRsp->dbId = pVnode->config.dbId;
pMetaRsp->vgId = TD_VID(pVnode);
......@@ -527,7 +527,7 @@ static int32_t vnodeProcessCreateTbReq(SVnode *pVnode, int64_t version, void *pR
cRsp.code = TSDB_CODE_SUCCESS;
tdFetchTbUidList(pVnode->pSma, &pStore, pCreateReq->ctb.suid, pCreateReq->uid);
taosArrayPush(tbUids, &pCreateReq->uid);
vnodeUpdateMetaRsp(pVnode, cRsp.pMeta);
vnodeUpdateMetaRsp(pVnode, cRsp.pMeta);
}
taosArrayPush(rsp.pArray, &cRsp);
......@@ -1107,6 +1107,7 @@ static int32_t vnodeProcessDeleteReq(SVnode *pVnode, int64_t version, void *pReq
tDecoderInit(pCoder, pReq, len);
tDecodeDeleteRes(pCoder, pRes);
ASSERT(taosArrayGetSize(pRes->uidList) == 0 || (pRes->skey != 0 && pRes->ekey != 0));
for (int32_t iUid = 0; iUid < taosArrayGetSize(pRes->uidList); iUid++) {
code = tsdbDeleteTableData(pVnode->pTsdb, version, pRes->suid, *(uint64_t *)taosArrayGet(pRes->uidList, iUid),
......
......@@ -38,11 +38,11 @@ extern "C" {
#include "tlockfree.h"
#include "tmsg.h"
#include "tpagedbuf.h"
#include "tstreamUpdate.h"
#include "tstream.h"
#include "tstreamUpdate.h"
#include "vnode.h"
#include "executorInt.h"
#include "vnode.h"
typedef int32_t (*__block_search_fn_t)(char* data, int32_t num, int64_t key, int32_t order);
......@@ -139,23 +139,23 @@ enum {
};
typedef struct {
//TODO remove prepareStatus
STqOffsetVal prepareStatus; // for tmq
STqOffsetVal lastStatus; // for tmq
SMqMetaRsp metaRsp; // for tmq fetching meta
int8_t returned;
int64_t snapshotVer;
SSchemaWrapper *schema;
char tbName[TSDB_TABLE_NAME_LEN];
SSDataBlock* pullOverBlk; // for streaming
SWalFilterCond cond;
int64_t lastScanUid;
int8_t recoverStep;
// TODO remove prepareStatus
STqOffsetVal prepareStatus; // for tmq
STqOffsetVal lastStatus; // for tmq
SMqMetaRsp metaRsp; // for tmq fetching meta
int8_t returned;
int64_t snapshotVer;
SSchemaWrapper* schema;
char tbName[TSDB_TABLE_NAME_LEN];
SSDataBlock* pullOverBlk; // for streaming
SWalFilterCond cond;
int64_t lastScanUid;
int8_t recoverStep;
SQueryTableDataCond tableCond;
int64_t recoverStartVer;
int64_t recoverEndVer;
SStreamState* pState;
int64_t recoverStartVer;
int64_t recoverEndVer;
SStreamState* pState;
} SStreamTaskInfo;
typedef struct {
......@@ -167,29 +167,29 @@ typedef struct {
} SSchemaInfo;
typedef struct SExecTaskInfo {
STaskIdInfo id;
uint32_t status;
STimeWindow window;
STaskCostInfo cost;
int64_t owner; // if it is in execution
int32_t code;
int64_t version; // used for stream to record wal version
SStreamTaskInfo streamInfo;
SSchemaInfo schemaInfo;
STableListInfo tableqinfoList; // this is a table list
const char* sql; // query sql string
jmp_buf env; // jump to this position when error happens.
EOPTR_EXEC_MODEL execModel; // operator execution model [batch model|stream model]
SSubplan* pSubplan;
STaskIdInfo id;
uint32_t status;
STimeWindow window;
STaskCostInfo cost;
int64_t owner; // if it is in execution
int32_t code;
int64_t version; // used for stream to record wal version
SStreamTaskInfo streamInfo;
SSchemaInfo schemaInfo;
STableListInfo tableqinfoList; // this is a table list
const char* sql; // query sql string
jmp_buf env; // jump to this position when error happens.
EOPTR_EXEC_MODEL execModel; // operator execution model [batch model|stream model]
SSubplan* pSubplan;
struct SOperatorInfo* pRoot;
} SExecTaskInfo;
enum {
OP_NOT_OPENED = 0x0,
OP_OPENED = 0x1,
OP_NOT_OPENED = 0x0,
OP_OPENED = 0x1,
OP_RES_TO_RETURN = 0x5,
OP_EXEC_DONE = 0x9,
OP_EXEC_DONE = 0x9,
};
typedef struct SOperatorFpSet {
......@@ -228,7 +228,7 @@ typedef struct SOperatorInfo {
typedef enum {
EX_SOURCE_DATA_NOT_READY = 0x1,
EX_SOURCE_DATA_READY = 0x2,
EX_SOURCE_DATA_READY = 0x2,
EX_SOURCE_DATA_EXHAUSTED = 0x3,
} EX_SOURCE_STATUS;
......@@ -251,26 +251,26 @@ typedef struct SLoadRemoteDataInfo {
} SLoadRemoteDataInfo;
typedef struct SLimitInfo {
SLimit limit;
SLimit slimit;
uint64_t currentGroupId;
int64_t remainGroupOffset;
int64_t numOfOutputGroups;
int64_t remainOffset;
int64_t numOfOutputRows;
SLimit limit;
SLimit slimit;
uint64_t currentGroupId;
int64_t remainGroupOffset;
int64_t numOfOutputGroups;
int64_t remainOffset;
int64_t numOfOutputRows;
} SLimitInfo;
typedef struct SExchangeInfo {
SArray* pSources;
SArray* pSourceDataInfo;
tsem_t ready;
void* pTransporter;
SArray* pSources;
SArray* pSourceDataInfo;
tsem_t ready;
void* pTransporter;
// SArray<SSDataBlock*>, result block list, used to keep the multi-block that
// passed by downstream operator
SArray* pResultBlockList;
int32_t rspBlockIndex; // indicate the return block index in pResultBlockList
SSDataBlock* pDummyBlock; // dummy block, not keep data
bool seqLoadData; // sequential load data or not, false by default
int32_t rspBlockIndex; // indicate the return block index in pResultBlockList
SSDataBlock* pDummyBlock; // dummy block, not keep data
bool seqLoadData; // sequential load data or not, false by default
int32_t current;
SLoadRemoteDataInfo loadInfo;
uint64_t self;
......@@ -278,22 +278,22 @@ typedef struct SExchangeInfo {
} SExchangeInfo;
typedef struct SColMatchInfo {
int32_t srcSlotId; // source slot id
int32_t srcSlotId; // source slot id
int32_t colId;
int32_t targetSlotId;
bool output; // todo remove this?
bool output; // todo remove this?
bool reserved;
int32_t matchType; // determinate the source according to col id or slot id
int32_t matchType; // determinate the source according to col id or slot id
} SColMatchInfo;
typedef struct SScanInfo {
int32_t numOfAsc;
int32_t numOfDesc;
int32_t numOfAsc;
int32_t numOfDesc;
} SScanInfo;
typedef struct SSampleExecInfo {
double sampleRatio; // data block sample ratio, 1 by default
uint32_t seed; // random seed value
double sampleRatio; // data block sample ratio, 1 by default
uint32_t seed; // random seed value
} SSampleExecInfo;
enum {
......@@ -305,39 +305,40 @@ typedef struct SAggSupporter {
SSHashObj* pResultRowHashTable; // quick locate the window object for each result
char* keyBuf; // window key buffer
SDiskbasedBuf* pResultBuf; // query result buffer based on blocked-wised disk file
int32_t resultRowSize; // the result buffer size for each result row, with the meta data size for each row
int32_t currentPageId; // current write page id
int32_t resultRowSize; // the result buffer size for each result row, with the meta data size for each row
int32_t currentPageId; // current write page id
} SAggSupporter;
typedef struct {
// if the upstream is an interval operator, the interval info is also kept here to get the time window to check if current data block needs to be loaded.
SInterval interval;
SAggSupporter *pAggSup;
SExprSupp *pExprSup; // expr supporter of aggregate operator
// if the upstream is an interval operator, the interval info is also kept here to get the time window to check if
// current data block needs to be loaded.
SInterval interval;
SAggSupporter* pAggSup;
SExprSupp* pExprSup; // expr supporter of aggregate operator
} SAggOptrPushDownInfo;
typedef struct STableScanInfo {
STsdbReader* dataReader;
SReadHandle readHandle;
STsdbReader* dataReader;
SReadHandle readHandle;
SFileBlockLoadRecorder readRecorder;
SScanInfo scanInfo;
int32_t scanTimes;
SNode* pFilterNode; // filter info, which is push down by optimizer
SScanInfo scanInfo;
int32_t scanTimes;
SNode* pFilterNode; // filter info, which is push down by optimizer
SSDataBlock* pResBlock;
SArray* pColMatchInfo;
SExprSupp pseudoSup;
SQueryTableDataCond cond;
int32_t scanFlag; // table scan flag to denote if it is a repeat/reverse/main scan
int32_t dataBlockLoadFlag;
SSampleExecInfo sample; // sample execution info
int32_t currentGroupId;
int32_t currentTable;
int8_t scanMode;
int8_t noTable;
SSDataBlock* pResBlock;
SArray* pColMatchInfo;
SExprSupp pseudoSup;
SQueryTableDataCond cond;
int32_t scanFlag; // table scan flag to denote if it is a repeat/reverse/main scan
int32_t dataBlockLoadFlag;
SSampleExecInfo sample; // sample execution info
int32_t currentGroupId;
int32_t currentTable;
int8_t scanMode;
int8_t noTable;
SAggOptrPushDownInfo pdInfo;
int8_t assignBlockUid;
int8_t assignBlockUid;
} STableScanInfo;
typedef struct STableMergeScanInfo {
......@@ -370,7 +371,7 @@ typedef struct STableMergeScanInfo {
SArray* pColMatchInfo;
int32_t numOfOutput;
SExprSupp pseudoSup;
SExprSupp pseudoSup;
SQueryTableDataCond cond;
int32_t scanFlag; // table scan flag to denote if it is a repeat/reverse/main scan
......@@ -384,26 +385,26 @@ typedef struct STableMergeScanInfo {
} STableMergeScanInfo;
typedef struct STagScanInfo {
SColumnInfo *pCols;
SSDataBlock *pRes;
SArray *pColMatchInfo;
int32_t curPos;
SReadHandle readHandle;
STableListInfo *pTableList;
SColumnInfo* pCols;
SSDataBlock* pRes;
SArray* pColMatchInfo;
int32_t curPos;
SReadHandle readHandle;
STableListInfo* pTableList;
} STagScanInfo;
typedef struct SLastrowScanInfo {
SSDataBlock *pRes;
SReadHandle readHandle;
void *pLastrowReader;
SArray *pColMatchInfo;
int32_t *pSlotIds;
SExprSupp pseudoExprSup;
int32_t retrieveType;
int32_t currentGroupIndex;
SSDataBlock *pBufferredRes;
SArray *pUidList;
int32_t indexOfBufferedRes;
SSDataBlock* pRes;
SReadHandle readHandle;
void* pLastrowReader;
SArray* pColMatchInfo;
int32_t* pSlotIds;
SExprSupp pseudoExprSup;
int32_t retrieveType;
int32_t currentGroupIndex;
SSDataBlock* pBufferredRes;
SArray* pUidList;
int32_t indexOfBufferedRes;
} SLastrowScanInfo;
typedef enum EStreamScanMode {
......@@ -432,10 +433,10 @@ typedef struct SStreamAggSupporter {
SArray* pCurWins;
int32_t valueSize;
int32_t keySize;
char* pKeyBuf; // window key buffer
SDiskbasedBuf* pResultBuf; // query result buffer based on blocked-wised disk file
int32_t resultRowSize; // the result buffer size for each result row, with the meta data size for each row
int32_t currentPageId; // buffer page that is active
char* pKeyBuf; // window key buffer
SDiskbasedBuf* pResultBuf; // query result buffer based on blocked-wised disk file
int32_t resultRowSize; // the result buffer size for each result row, with the meta data size for each row
int32_t currentPageId; // buffer page that is active
SSDataBlock* pScanBlock;
} SStreamAggSupporter;
......@@ -447,22 +448,22 @@ typedef struct SWindowSupporter {
} SWindowSupporter;
typedef struct SPartitionBySupporter {
SArray* pGroupCols; // group by columns, SArray<SColumn>
SArray* pGroupColVals; // current group column values, SArray<SGroupKeys>
char* keyBuf; // group by keys for hash
bool needCalc; // partition by column
SArray* pGroupCols; // group by columns, SArray<SColumn>
SArray* pGroupColVals; // current group column values, SArray<SGroupKeys>
char* keyBuf; // group by keys for hash
bool needCalc; // partition by column
} SPartitionBySupporter;
typedef struct SPartitionDataInfo {
uint64_t groupId;
SArray* rowIds;
uint64_t groupId;
SArray* rowIds;
} SPartitionDataInfo;
typedef struct STimeWindowSupp {
int8_t calTrigger;
int64_t waterMark;
TSKEY maxTs;
SColumnInfoData timeWindowData; // query time window info for scalar function execution.
int8_t calTrigger;
int64_t waterMark;
TSKEY maxTs;
SColumnInfoData timeWindowData; // query time window info for scalar function execution.
} STimeWindowAggSupp;
typedef struct SStreamScanInfo {
......@@ -487,22 +488,22 @@ typedef struct SStreamScanInfo {
uint64_t groupId;
SUpdateInfo* pUpdateInfo;
EStreamScanMode scanMode;
SOperatorInfo* pStreamScanOp;
SOperatorInfo* pTableScanOp;
SArray* childIds;
SWindowSupporter windowSup;
SPartitionBySupporter partitionSup;
SExprSupp* pPartScalarSup;
bool assignBlockUid; // assign block uid to groupId, temporarily used for generating rollup SMA.
int32_t scanWinIndex; // for state operator
int32_t pullDataResIndex;
SSDataBlock* pPullDataRes; // pull data SSDataBlock
SSDataBlock* pDeleteDataRes; // delete data SSDataBlock
int32_t deleteDataIndex;
STimeWindow updateWin;
STimeWindowAggSupp twAggSup;
SSDataBlock* pUpdateDataRes;
EStreamScanMode scanMode;
SOperatorInfo* pStreamScanOp;
SOperatorInfo* pTableScanOp;
SArray* childIds;
SWindowSupporter windowSup;
SPartitionBySupporter partitionSup;
SExprSupp* pPartScalarSup;
bool assignBlockUid; // assign block uid to groupId, temporarily used for generating rollup SMA.
int32_t scanWinIndex; // for state operator
int32_t pullDataResIndex;
SSDataBlock* pPullDataRes; // pull data SSDataBlock
SSDataBlock* pDeleteDataRes; // delete data SSDataBlock
int32_t deleteDataIndex;
STimeWindow updateWin;
STimeWindowAggSupp twAggSup;
SSDataBlock* pUpdateDataRes;
// status for tmq
SNodeList* pGroupTags;
SNode* pTagCond;
......@@ -571,17 +572,17 @@ typedef struct SIntervalAggOperatorInfo {
EOPTR_EXEC_MODEL execModel; // operator execution model [batch model|stream model]
STimeWindowAggSupp twAggSup;
bool invertible;
SArray* pPrevValues; // SArray<SGroupKeys> used to keep the previous not null value for interpolation.
SArray* pPrevValues; // SArray<SGroupKeys> used to keep the previous not null value for interpolation.
bool ignoreExpiredData;
SArray* pRecycledPages;
SArray* pDelWins; // SWinRes
SArray* pDelWins; // SWinRes
int32_t delIndex;
SSDataBlock* pDelRes;
SNode* pCondition;
} SIntervalAggOperatorInfo;
typedef struct SMergeAlignedIntervalAggOperatorInfo {
SIntervalAggOperatorInfo *intervalAggOperatorInfo;
SIntervalAggOperatorInfo* intervalAggOperatorInfo;
bool hasGroupId;
uint64_t groupId; // current groupId
......@@ -592,63 +593,63 @@ typedef struct SMergeAlignedIntervalAggOperatorInfo {
typedef struct SStreamFinalIntervalOperatorInfo {
// SOptrBasicInfo should be first, SAggSupporter should be second for stream encode
SOptrBasicInfo binfo; // basic info
SAggSupporter aggSup; // aggregate supporter
SExprSupp scalarSupp; // supporter for perform scalar function
SGroupResInfo groupResInfo; // multiple results build supporter
SInterval interval; // interval info
int32_t primaryTsIndex; // primary time stamp slot id from result of downstream operator.
int32_t order; // current SSDataBlock scan order
SOptrBasicInfo binfo; // basic info
SAggSupporter aggSup; // aggregate supporter
SExprSupp scalarSupp; // supporter for perform scalar function
SGroupResInfo groupResInfo; // multiple results build supporter
SInterval interval; // interval info
int32_t primaryTsIndex; // primary time stamp slot id from result of downstream operator.
int32_t order; // current SSDataBlock scan order
STimeWindowAggSupp twAggSup;
SArray* pChildren;
SSDataBlock* pUpdateRes;
bool returnUpdate;
SPhysiNode* pPhyNode; // create new child
SPhysiNode* pPhyNode; // create new child
bool isFinal;
SHashObj* pPullDataMap;
SArray* pPullWins; // SPullWindowInfo
SArray* pPullWins; // SPullWindowInfo
int32_t pullIndex;
SSDataBlock* pPullDataRes;
bool ignoreExpiredData;
SArray* pRecycledPages;
SArray* pDelWins; // SWinRes
SArray* pDelWins; // SWinRes
int32_t delIndex;
SSDataBlock* pDelRes;
} SStreamFinalIntervalOperatorInfo;
typedef struct SAggOperatorInfo {
// SOptrBasicInfo should be first, SAggSupporter should be second for stream encode
SOptrBasicInfo binfo;
SAggSupporter aggSup;
SOptrBasicInfo binfo;
SAggSupporter aggSup;
STableQueryInfo *current;
uint64_t groupId;
SGroupResInfo groupResInfo;
SExprSupp scalarExprSup;
SNode *pCondition;
STableQueryInfo* current;
uint64_t groupId;
SGroupResInfo groupResInfo;
SExprSupp scalarExprSup;
SNode* pCondition;
} SAggOperatorInfo;
typedef struct SProjectOperatorInfo {
// SOptrBasicInfo should be first, SAggSupporter should be second for stream encode
SOptrBasicInfo binfo;
SAggSupporter aggSup;
SNode* pFilterNode; // filter info, which is push down by optimizer
SArray* pPseudoColInfo;
SLimitInfo limitInfo;
bool mergeDataBlocks;
SSDataBlock* pFinalRes;
SNode* pCondition;
SOptrBasicInfo binfo;
SAggSupporter aggSup;
SNode* pFilterNode; // filter info, which is push down by optimizer
SArray* pPseudoColInfo;
SLimitInfo limitInfo;
bool mergeDataBlocks;
SSDataBlock* pFinalRes;
SNode* pCondition;
} SProjectOperatorInfo;
typedef struct SIndefOperatorInfo {
SOptrBasicInfo binfo;
SAggSupporter aggSup;
SArray* pPseudoColInfo;
SExprSupp scalarSup;
SNode* pCondition;
uint64_t groupId;
SOptrBasicInfo binfo;
SAggSupporter aggSup;
SArray* pPseudoColInfo;
SExprSupp scalarSup;
SNode* pCondition;
uint64_t groupId;
SSDataBlock* pNextGroupRes;
SSDataBlock* pNextGroupRes;
} SIndefOperatorInfo;
typedef struct SFillOperatorInfo {
......@@ -663,7 +664,7 @@ typedef struct SFillOperatorInfo {
SArray* pColMatchColInfo;
int32_t primaryTsCol;
int32_t primarySrcSlotId;
uint64_t curGroupId; // current handled group id
uint64_t curGroupId; // current handled group id
SExprInfo* pExprInfo;
int32_t numOfExpr;
SExprInfo* pNotFillExprInfo;
......@@ -672,23 +673,23 @@ typedef struct SFillOperatorInfo {
typedef struct SGroupbyOperatorInfo {
// SOptrBasicInfo should be first, SAggSupporter should be second for stream encode
SOptrBasicInfo binfo;
SAggSupporter aggSup;
SArray* pGroupCols; // group by columns, SArray<SColumn>
SArray* pGroupColVals; // current group column values, SArray<SGroupKeys>
SNode* pCondition;
bool isInit; // denote if current val is initialized or not
char* keyBuf; // group by keys for hash
int32_t groupKeyLen; // total group by column width
SGroupResInfo groupResInfo;
SExprSupp scalarSup;
SOptrBasicInfo binfo;
SAggSupporter aggSup;
SArray* pGroupCols; // group by columns, SArray<SColumn>
SArray* pGroupColVals; // current group column values, SArray<SGroupKeys>
SNode* pCondition;
bool isInit; // denote if current val is initialized or not
char* keyBuf; // group by keys for hash
int32_t groupKeyLen; // total group by column width
SGroupResInfo groupResInfo;
SExprSupp scalarSup;
} SGroupbyOperatorInfo;
typedef struct SDataGroupInfo {
uint64_t groupId;
int64_t numOfRows;
SArray* pPageList;
uint64_t groupId;
int64_t numOfRows;
SArray* pPageList;
} SDataGroupInfo;
// The sort in partition may be needed later.
......@@ -700,12 +701,12 @@ typedef struct SPartitionOperatorInfo {
int32_t groupKeyLen; // total group by column width
SHashObj* pGroupSet; // quick locate the window object for each result
SDiskbasedBuf* pBuf; // query result buffer based on blocked-wised disk file
int32_t rowCapacity; // maximum number of rows for each buffer page
int32_t* columnOffset; // start position for each column data
SArray* sortedGroupArray; // SDataGroupInfo sorted by group id
int32_t groupIndex; // group index
int32_t pageIndex; // page index of current group
SDiskbasedBuf* pBuf; // query result buffer based on blocked-wised disk file
int32_t rowCapacity; // maximum number of rows for each buffer page
int32_t* columnOffset; // start position for each column data
SArray* sortedGroupArray; // SDataGroupInfo sorted by group id
int32_t groupIndex; // group index
int32_t pageIndex; // page index of current group
SExprSupp scalarSup;
} SPartitionOperatorInfo;
......@@ -719,52 +720,52 @@ typedef struct SWindowRowsSup {
typedef struct SSessionAggOperatorInfo {
// SOptrBasicInfo should be first, SAggSupporter should be second for stream encode
SOptrBasicInfo binfo;
SAggSupporter aggSup;
SOptrBasicInfo binfo;
SAggSupporter aggSup;
SGroupResInfo groupResInfo;
SWindowRowsSup winSup;
bool reptScan; // next round scan
int64_t gap; // session window gap
int32_t tsSlotId; // primary timestamp slot id
bool reptScan; // next round scan
int64_t gap; // session window gap
int32_t tsSlotId; // primary timestamp slot id
STimeWindowAggSupp twAggSup;
const SNode* pCondition;
const SNode* pCondition;
} SSessionAggOperatorInfo;
typedef struct SResultWindowInfo {
SResultRowPosition pos;
STimeWindow win;
uint64_t groupId;
bool isOutput;
bool isClosed;
STimeWindow win;
uint64_t groupId;
bool isOutput;
bool isClosed;
} SResultWindowInfo;
typedef struct SStateWindowInfo {
SResultWindowInfo winInfo;
SStateKeys stateKey;
SStateKeys stateKey;
} SStateWindowInfo;
typedef struct SStreamSessionAggOperatorInfo {
SOptrBasicInfo binfo;
SStreamAggSupporter streamAggSup;
SExprSupp scalarSupp; // supporter for perform scalar function
SGroupResInfo groupResInfo;
int64_t gap; // session window gap
int32_t primaryTsIndex; // primary timestamp slot id
int32_t endTsIndex; // window end timestamp slot id
int32_t order; // current SSDataBlock scan order
STimeWindowAggSupp twAggSup;
SSDataBlock* pWinBlock; // window result
SqlFunctionCtx* pDummyCtx; // for combine
SSDataBlock* pDelRes; // delete result
SSDataBlock* pUpdateRes; // update window
bool returnUpdate;
SHashObj* pStDeleted;
void* pDelIterator;
SArray* pChildren; // cache for children's result; final stream operator
SPhysiNode* pPhyNode; // create new child
bool isFinal;
bool ignoreExpiredData;
SOptrBasicInfo binfo;
SStreamAggSupporter streamAggSup;
SExprSupp scalarSupp; // supporter for perform scalar function
SGroupResInfo groupResInfo;
int64_t gap; // session window gap
int32_t primaryTsIndex; // primary timestamp slot id
int32_t endTsIndex; // window end timestamp slot id
int32_t order; // current SSDataBlock scan order
STimeWindowAggSupp twAggSup;
SSDataBlock* pWinBlock; // window result
SqlFunctionCtx* pDummyCtx; // for combine
SSDataBlock* pDelRes; // delete result
SSDataBlock* pUpdateRes; // update window
bool returnUpdate;
SHashObj* pStDeleted;
void* pDelIterator;
SArray* pChildren; // cache for children's result; final stream operator
SPhysiNode* pPhyNode; // create new child
bool isFinal;
bool ignoreExpiredData;
} SStreamSessionAggOperatorInfo;
typedef struct SStreamPartitionOperatorInfo {
......@@ -778,26 +779,26 @@ typedef struct SStreamPartitionOperatorInfo {
} SStreamPartitionOperatorInfo;
typedef struct STimeSliceOperatorInfo {
SSDataBlock* pRes;
STimeWindow win;
SInterval interval;
int64_t current;
SArray* pPrevRow; // SArray<SGroupValue>
SArray* pNextRow; // SArray<SGroupValue>
SArray* pLinearInfo; // SArray<SFillLinearInfo>
bool fillLastPoint;
bool isPrevRowSet;
bool isNextRowSet;
int32_t fillType; // fill type
SColumn tsCol; // primary timestamp column
SExprSupp scalarSup; // scalar calculation
struct SFillColInfo* pFillColInfo; // fill column info
SSDataBlock* pRes;
STimeWindow win;
SInterval interval;
int64_t current;
SArray* pPrevRow; // SArray<SGroupValue>
SArray* pNextRow; // SArray<SGroupValue>
SArray* pLinearInfo; // SArray<SFillLinearInfo>
bool fillLastPoint;
bool isPrevRowSet;
bool isNextRowSet;
int32_t fillType; // fill type
SColumn tsCol; // primary timestamp column
SExprSupp scalarSup; // scalar calculation
struct SFillColInfo* pFillColInfo; // fill column info
} STimeSliceOperatorInfo;
typedef struct SStateWindowOperatorInfo {
// SOptrBasicInfo should be first, SAggSupporter should be second for stream encode
SOptrBasicInfo binfo;
SAggSupporter aggSup;
SOptrBasicInfo binfo;
SAggSupporter aggSup;
SGroupResInfo groupResInfo;
SWindowRowsSup winSup;
......@@ -811,52 +812,52 @@ typedef struct SStateWindowOperatorInfo {
} SStateWindowOperatorInfo;
typedef struct SStreamStateAggOperatorInfo {
SOptrBasicInfo binfo;
SStreamAggSupporter streamAggSup;
SExprSupp scalarSupp; // supporter for perform scalar function
SGroupResInfo groupResInfo;
int32_t primaryTsIndex; // primary timestamp slot id
int32_t order; // current SSDataBlock scan order
STimeWindowAggSupp twAggSup;
SColumn stateCol;
SqlFunctionCtx* pDummyCtx; // for combine
SSDataBlock* pDelRes;
SHashObj* pSeDeleted;
void* pDelIterator;
SArray* pChildren; // cache for children's result;
bool ignoreExpiredData;
SOptrBasicInfo binfo;
SStreamAggSupporter streamAggSup;
SExprSupp scalarSupp; // supporter for perform scalar function
SGroupResInfo groupResInfo;
int32_t primaryTsIndex; // primary timestamp slot id
int32_t order; // current SSDataBlock scan order
STimeWindowAggSupp twAggSup;
SColumn stateCol;
SqlFunctionCtx* pDummyCtx; // for combine
SSDataBlock* pDelRes;
SHashObj* pSeDeleted;
void* pDelIterator;
SArray* pChildren; // cache for children's result;
bool ignoreExpiredData;
} SStreamStateAggOperatorInfo;
typedef struct SSortedMergeOperatorInfo {
// SOptrBasicInfo should be first, SAggSupporter should be second for stream encode
SOptrBasicInfo binfo;
SAggSupporter aggSup;
SArray* pSortInfo;
int32_t numOfSources;
SSortHandle *pSortHandle;
int32_t bufPageSize;
uint32_t sortBufSize; // max buffer size for in-memory sort
int32_t resultRowFactor;
bool hasGroupVal;
SDiskbasedBuf *pTupleStore; // keep the final results
int32_t numOfResPerPage;
char** groupVal;
SArray *groupInfo;
SOptrBasicInfo binfo;
SAggSupporter aggSup;
SArray* pSortInfo;
int32_t numOfSources;
SSortHandle* pSortHandle;
int32_t bufPageSize;
uint32_t sortBufSize; // max buffer size for in-memory sort
int32_t resultRowFactor;
bool hasGroupVal;
SDiskbasedBuf* pTupleStore; // keep the final results
int32_t numOfResPerPage;
char** groupVal;
SArray* groupInfo;
} SSortedMergeOperatorInfo;
typedef struct SSortOperatorInfo {
SOptrBasicInfo binfo;
uint32_t sortBufSize; // max buffer size for in-memory sort
SArray* pSortInfo;
SSortHandle* pSortHandle;
SArray* pColMatchInfo; // for index map from table scan output
int32_t bufPageSize;
int64_t startTs; // sort start time
uint64_t sortElapsed; // sort elapsed time, time to flush to disk not included.
SLimitInfo limitInfo;
SNode* pCondition;
uint32_t sortBufSize; // max buffer size for in-memory sort
SArray* pSortInfo;
SSortHandle* pSortHandle;
SArray* pColMatchInfo; // for index map from table scan output
int32_t bufPageSize;
int64_t startTs; // sort start time
uint64_t sortElapsed; // sort elapsed time, time to flush to disk not included.
SLimitInfo limitInfo;
SNode* pCondition;
} SSortOperatorInfo;
typedef struct STagFilterOperatorInfo {
......@@ -864,18 +865,18 @@ typedef struct STagFilterOperatorInfo {
} STagFilterOperatorInfo;
typedef struct SJoinOperatorInfo {
SSDataBlock *pRes;
int32_t joinType;
int32_t inputOrder;
SSDataBlock *pLeft;
int32_t leftPos;
SColumnInfo leftCol;
SSDataBlock *pRight;
int32_t rightPos;
SColumnInfo rightCol;
SNode *pCondAfterMerge;
SSDataBlock* pRes;
int32_t joinType;
int32_t inputOrder;
SSDataBlock* pLeft;
int32_t leftPos;
SColumnInfo leftCol;
SSDataBlock* pRight;
int32_t rightPos;
SColumnInfo rightCol;
SNode* pCondAfterMerge;
} SJoinOperatorInfo;
#define OPTR_IS_OPENED(_optr) (((_optr)->status & OP_OPENED) == OP_OPENED)
......@@ -884,8 +885,8 @@ typedef struct SJoinOperatorInfo {
void doDestroyExchangeOperatorInfo(void* param);
SOperatorFpSet createOperatorFpSet(__optr_open_fn_t openFn, __optr_fn_t nextFn, __optr_fn_t streamFn,
__optr_fn_t cleanup, __optr_close_fn_t closeFn, __optr_encode_fn_t encode,
__optr_decode_fn_t decode, __optr_explain_fn_t explain);
__optr_fn_t cleanup, __optr_close_fn_t closeFn, __optr_encode_fn_t encode,
__optr_decode_fn_t decode, __optr_explain_fn_t explain);
int32_t operatorDummyOpenFn(SOperatorInfo* pOperator);
void operatorDummyCloseFn(void* param, int32_t numOfCols);
......@@ -896,24 +897,26 @@ void cleanupBasicInfo(SOptrBasicInfo* pInfo);
int32_t initExprSupp(SExprSupp* pSup, SExprInfo* pExprInfo, int32_t numOfExpr);
void cleanupExprSupp(SExprSupp* pSup);
void destroyExprInfo(SExprInfo* pExpr, int32_t numOfExprs);
int32_t initAggInfo(SExprSupp *pSup, SAggSupporter* pAggSup, SExprInfo* pExprInfo, int32_t numOfCols, size_t keyBufSize,
int32_t initAggInfo(SExprSupp* pSup, SAggSupporter* pAggSup, SExprInfo* pExprInfo, int32_t numOfCols, size_t keyBufSize,
const char* pkey);
void initResultSizeInfo(SResultInfo * pResultInfo, int32_t numOfRows);
void doBuildResultDatablock(SOperatorInfo* pOperator, SOptrBasicInfo* pbInfo, SGroupResInfo* pGroupResInfo, SDiskbasedBuf* pBuf);
int32_t handleLimitOffset(SOperatorInfo *pOperator, SLimitInfo* pLimitInfo, SSDataBlock* pBlock, bool holdDataInBuf);
void initResultSizeInfo(SResultInfo* pResultInfo, int32_t numOfRows);
void doBuildResultDatablock(SOperatorInfo* pOperator, SOptrBasicInfo* pbInfo, SGroupResInfo* pGroupResInfo,
SDiskbasedBuf* pBuf);
int32_t handleLimitOffset(SOperatorInfo* pOperator, SLimitInfo* pLimitInfo, SSDataBlock* pBlock, bool holdDataInBuf);
bool hasLimitOffsetInfo(SLimitInfo* pLimitInfo);
void initLimitInfo(const SNode* pLimit, const SNode* pSLimit, SLimitInfo* pLimitInfo);
void doApplyFunctions(SExecTaskInfo* taskInfo, SqlFunctionCtx* pCtx, SColumnInfoData* pTimeWindowData, int32_t offset,
int32_t forwardStep, int32_t numOfTotal, int32_t numOfOutput);
void doApplyFunctions(SExecTaskInfo* taskInfo, SqlFunctionCtx* pCtx, SColumnInfoData* pTimeWindowData, int32_t offset,
int32_t forwardStep, int32_t numOfTotal, int32_t numOfOutput);
int32_t extractDataBlockFromFetchRsp(SSDataBlock* pRes, char* pData, int32_t numOfOutput, SArray* pColList, char** pNextStart);
void updateLoadRemoteInfo(SLoadRemoteDataInfo *pInfo, int32_t numOfRows, int32_t dataLen, int64_t startTs,
SOperatorInfo* pOperator);
int32_t extractDataBlockFromFetchRsp(SSDataBlock* pRes, char* pData, int32_t numOfOutput, SArray* pColList,
char** pNextStart);
void updateLoadRemoteInfo(SLoadRemoteDataInfo* pInfo, int32_t numOfRows, int32_t dataLen, int64_t startTs,
SOperatorInfo* pOperator);
STimeWindow getFirstQualifiedTimeWindow(int64_t ts, STimeWindow* pWindow, SInterval* pInterval, int32_t order);
int32_t getTableScanInfo(SOperatorInfo* pOperator, int32_t *order, int32_t* scanFlag);
int32_t getTableScanInfo(SOperatorInfo* pOperator, int32_t* order, int32_t* scanFlag);
int32_t getBufferPgSize(int32_t rowSize, uint32_t* defaultPgsz, uint32_t* defaultBufsz);
void doSetOperatorCompleted(SOperatorInfo* pOperator);
......@@ -921,10 +924,10 @@ void doFilter(const SNode* pFilterNode, SSDataBlock* pBlock, const SArray* pC
int32_t addTagPseudoColumnData(SReadHandle* pHandle, SExprInfo* pPseudoExpr, int32_t numOfPseudoExpr,
SSDataBlock* pBlock, const char* idStr);
void cleanupAggSup(SAggSupporter* pAggSup);
void destroyBasicOperatorInfo(void* param, int32_t numOfOutput);
void appendOneRowToDataBlock(SSDataBlock* pBlock, STupleHandle* pTupleHandle);
void setTbNameColData(void* pMeta, const SSDataBlock* pBlock, SColumnInfoData* pColInfoData, int32_t functionId);
void cleanupAggSup(SAggSupporter* pAggSup);
void destroyBasicOperatorInfo(void* param, int32_t numOfOutput);
void appendOneRowToDataBlock(SSDataBlock* pBlock, STupleHandle* pTupleHandle);
void setTbNameColData(void* pMeta, const SSDataBlock* pBlock, SColumnInfoData* pColInfoData, int32_t functionId);
int32_t doPrepareScan(SOperatorInfo* pOperator, uint64_t uid, int64_t ts);
int32_t doGetScanStatus(SOperatorInfo* pOperator, uint64_t* uid, int64_t* ts);
......@@ -933,29 +936,37 @@ SSDataBlock* loadNextDataBlock(void* param);
void setResultRowInitCtx(SResultRow* pResult, SqlFunctionCtx* pCtx, int32_t numOfOutput, int32_t* rowEntryInfoOffset);
SResultRow* doSetResultOutBufByKey(SDiskbasedBuf* pResultBuf, SResultRowInfo* pResultRowInfo,
char* pData, int16_t bytes, bool masterscan, uint64_t groupId,
SExecTaskInfo* pTaskInfo, bool isIntervalQuery, SAggSupporter* pSup);
SResultRow* doSetResultOutBufByKey(SDiskbasedBuf* pResultBuf, SResultRowInfo* pResultRowInfo, char* pData,
int16_t bytes, bool masterscan, uint64_t groupId, SExecTaskInfo* pTaskInfo,
bool isIntervalQuery, SAggSupporter* pSup);
SOperatorInfo* createExchangeOperatorInfo(void* pTransporter, SExchangePhysiNode* pExNode, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createTableScanOperatorInfo(STableScanPhysiNode* pTableScanNode, SReadHandle* pHandle, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createTableScanOperatorInfo(STableScanPhysiNode* pTableScanNode, SReadHandle* pHandle,
SExecTaskInfo* pTaskInfo);
SOperatorInfo* createTagScanOperatorInfo(SReadHandle* pReadHandle, STagScanPhysiNode* pPhyNode,
STableListInfo* pTableListInfo, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createSysTableScanOperatorInfo(void* readHandle, SSystemTableScanPhysiNode *pScanPhyNode, const char* pUser, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createSysTableScanOperatorInfo(void* readHandle, SSystemTableScanPhysiNode* pScanPhyNode,
const char* pUser, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResultBlock, SNode* pCondition, SExprInfo* pScalarExprInfo,
SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols,
SSDataBlock* pResultBlock, SNode* pCondition, SExprInfo* pScalarExprInfo,
int32_t numOfScalarExpr, bool mergeResult, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createIndefinitOutputOperatorInfo(SOperatorInfo* downstream, SPhysiNode *pNode, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createProjectOperatorInfo(SOperatorInfo* downstream, SProjectPhysiNode* pProjPhyNode, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createIndefinitOutputOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pNode,
SExecTaskInfo* pTaskInfo);
SOperatorInfo* createProjectOperatorInfo(SOperatorInfo* downstream, SProjectPhysiNode* pProjPhyNode,
SExecTaskInfo* pTaskInfo);
SOperatorInfo* createSortOperatorInfo(SOperatorInfo* downstream, SSortPhysiNode* pSortNode, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createMultiwayMergeOperatorInfo(SOperatorInfo** dowStreams, size_t numStreams, SMergePhysiNode* pMergePhysiNode, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createCacherowsScanOperator(SLastRowScanPhysiNode* pTableScanNode, SReadHandle* readHandle, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createMultiwayMergeOperatorInfo(SOperatorInfo** dowStreams, size_t numStreams,
SMergePhysiNode* pMergePhysiNode, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createCacherowsScanOperator(SLastRowScanPhysiNode* pTableScanNode, SReadHandle* readHandle,
SExecTaskInfo* pTaskInfo);
SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols,
SSDataBlock* pResBlock, SInterval* pInterval, int32_t primaryTsSlotId,
STimeWindowAggSupp* pTwAggSupp, SIntervalPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, bool isStream);
STimeWindowAggSupp* pTwAggSupp, SIntervalPhysiNode* pPhyNode,
SExecTaskInfo* pTaskInfo, bool isStream);
SOperatorInfo* createMergeIntervalOperatorInfo(SOperatorInfo* downstream, SMergeIntervalPhysiNode* pIntervalPhyNode,
SExecTaskInfo* pTaskInfo);
SOperatorInfo* createMergeAlignedIntervalOperatorInfo(SOperatorInfo* downstream, SMergeAlignedIntervalPhysiNode* pNode,
......@@ -975,33 +986,39 @@ SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhys
SOperatorInfo* createRawScanOperatorInfo(SReadHandle* pHandle, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createFillOperatorInfo(SOperatorInfo* downstream, SFillPhysiNode* pPhyFillNode, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createStatewindowOperatorInfo(SOperatorInfo* downstream, SStateWinodwPhysiNode* pStateNode, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createPartitionOperatorInfo(SOperatorInfo* downstream, SPartitionPhysiNode* pPartNode, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createFillOperatorInfo(SOperatorInfo* downstream, SFillPhysiNode* pPhyFillNode,
SExecTaskInfo* pTaskInfo);
SOperatorInfo* createStatewindowOperatorInfo(SOperatorInfo* downstream, SStateWinodwPhysiNode* pStateNode,
SExecTaskInfo* pTaskInfo);
SOperatorInfo* createPartitionOperatorInfo(SOperatorInfo* downstream, SPartitionPhysiNode* pPartNode,
SExecTaskInfo* pTaskInfo);
SOperatorInfo* createStreamPartitionOperatorInfo(SOperatorInfo* downstream, SPartitionPhysiNode* pPartNode, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createStreamPartitionOperatorInfo(SOperatorInfo* downstream, SPartitionPhysiNode* pPartNode,
SExecTaskInfo* pTaskInfo);
SOperatorInfo* createTimeSliceOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pNode, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createMergeJoinOperatorInfo(SOperatorInfo** pDownstream, int32_t numOfDownstream, SSortMergeJoinPhysiNode* pJoinNode,
SExecTaskInfo* pTaskInfo);
SOperatorInfo* createMergeJoinOperatorInfo(SOperatorInfo** pDownstream, int32_t numOfDownstream,
SSortMergeJoinPhysiNode* pJoinNode, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createStreamSessionAggOperatorInfo(SOperatorInfo* downstream,
SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createStreamFinalSessionAggOperatorInfo(SOperatorInfo* downstream,
SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, int32_t numOfChild);
SOperatorInfo* createStreamSessionAggOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyNode,
SExecTaskInfo* pTaskInfo);
SOperatorInfo* createStreamFinalSessionAggOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyNode,
SExecTaskInfo* pTaskInfo, int32_t numOfChild);
SOperatorInfo* createStreamStateAggOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createStreamStateAggOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyNode,
SExecTaskInfo* pTaskInfo);
int32_t projectApplyFunctions(SExprInfo* pExpr, SSDataBlock* pResult, SSDataBlock* pSrcBlock, SqlFunctionCtx* pCtx,
int32_t numOfOutput, SArray* pPseudoList);
int32_t numOfOutput, SArray* pPseudoList);
void setInputDataBlock(SOperatorInfo* pOperator, SqlFunctionCtx* pCtx, SSDataBlock* pBlock, int32_t order, int32_t scanFlag, bool createDummyCol);
void setInputDataBlock(SOperatorInfo* pOperator, SqlFunctionCtx* pCtx, SSDataBlock* pBlock, int32_t order,
int32_t scanFlag, bool createDummyCol);
bool isTaskKilled(SExecTaskInfo* pTaskInfo);
int32_t checkForQueryBuf(size_t numOfTables);
void setTaskKilled(SExecTaskInfo* pTaskInfo);
void queryCostStatis(SExecTaskInfo* pTaskInfo);
void setTaskKilled(SExecTaskInfo* pTaskInfo);
void queryCostStatis(SExecTaskInfo* pTaskInfo);
void doDestroyTask(SExecTaskInfo* pTaskInfo);
int32_t getMaximumIdleDurationSec();
......@@ -1013,7 +1030,7 @@ int32_t getMaximumIdleDurationSec();
* nOptrWithVal: *nOptrWithVal save the number of optr with value
* return: result code, 0 means success
*/
int32_t encodeOperator(SOperatorInfo* ops, char** data, int32_t *length, int32_t *nOptrWithVal);
int32_t encodeOperator(SOperatorInfo* ops, char** data, int32_t* length, int32_t* nOptrWithVal);
/*
* ops: root operator, created by caller
......@@ -1026,7 +1043,7 @@ int32_t decodeOperator(SOperatorInfo* ops, const char* data, int32_t length);
void setTaskStatus(SExecTaskInfo* pTaskInfo, int8_t status);
int32_t createExecTaskInfoImpl(SSubplan* pPlan, SExecTaskInfo** pTaskInfo, SReadHandle* pHandle, uint64_t taskId,
char* sql, EOPTR_EXEC_MODEL model);
int32_t createDataSinkParam(SDataSinkNode *pNode, void **pParam, qTaskInfo_t* pTaskInfo, SReadHandle* readHandle);
int32_t createDataSinkParam(SDataSinkNode* pNode, void** pParam, qTaskInfo_t* pTaskInfo, SReadHandle* readHandle);
int32_t getOperatorExplainExecInfo(SOperatorInfo* operatorInfo, SArray* pExecInfoList);
int32_t aggDecodeResultRow(SOperatorInfo* pOperator, char* result);
......@@ -1035,42 +1052,43 @@ int32_t aggEncodeResultRow(SOperatorInfo* pOperator, char** result, int32_t* len
STimeWindow getActiveTimeWindow(SDiskbasedBuf* pBuf, SResultRowInfo* pResultRowInfo, int64_t ts, SInterval* pInterval,
int32_t order);
int32_t getNumOfRowsInTimeWindow(SDataBlockInfo* pDataBlockInfo, TSKEY* pPrimaryColumn, int32_t startPos, TSKEY ekey,
__block_search_fn_t searchFn, STableQueryInfo* item, int32_t order);
__block_search_fn_t searchFn, STableQueryInfo* item, int32_t order);
int32_t binarySearchForKey(char* pValue, int num, TSKEY key, int order);
int32_t initStreamAggSupporter(SStreamAggSupporter* pSup, const char* pKey, SqlFunctionCtx* pCtx, int32_t numOfOutput,
int32_t size);
SResultRow* getNewResultRow(SDiskbasedBuf* pResultBuf, int32_t* currentPageId, int32_t interBufSize);
SResultWindowInfo* getSessionTimeWindow(SStreamAggSupporter* pAggSup, TSKEY startTs,
TSKEY endTs, uint64_t groupId, int64_t gap, int32_t* pIndex);
SResultWindowInfo* getCurSessionWindow(SStreamAggSupporter* pAggSup, TSKEY startTs,
TSKEY endTs, uint64_t groupId, int64_t gap, int32_t* pIndex);
bool isInTimeWindow(STimeWindow* pWin, TSKEY ts, int64_t gap);
bool functionNeedToExecute(SqlFunctionCtx* pCtx);
bool isOverdue(TSKEY ts, STimeWindowAggSupp* pSup);
bool isCloseWindow(STimeWindow* pWin, STimeWindowAggSupp* pSup);
bool isDeletedWindow(STimeWindow* pWin, uint64_t groupId, SAggSupporter* pSup);
void appendOneRow(SSDataBlock* pBlock, TSKEY* pStartTs, TSKEY* pEndTs, uint64_t* pUid, uint64_t* pGp);
void printDataBlock(SSDataBlock* pBlock, const char* flag);
int32_t size);
SResultRow* getNewResultRow(SDiskbasedBuf* pResultBuf, int32_t* currentPageId, int32_t interBufSize);
SResultWindowInfo* getSessionTimeWindow(SStreamAggSupporter* pAggSup, TSKEY startTs, TSKEY endTs, uint64_t groupId,
int64_t gap, int32_t* pIndex);
SResultWindowInfo* getCurSessionWindow(SStreamAggSupporter* pAggSup, TSKEY startTs, TSKEY endTs, uint64_t groupId,
int64_t gap, int32_t* pIndex);
bool isInTimeWindow(STimeWindow* pWin, TSKEY ts, int64_t gap);
bool functionNeedToExecute(SqlFunctionCtx* pCtx);
bool isOverdue(TSKEY ts, STimeWindowAggSupp* pSup);
bool isCloseWindow(STimeWindow* pWin, STimeWindowAggSupp* pSup);
bool isDeletedWindow(STimeWindow* pWin, uint64_t groupId, SAggSupporter* pSup);
void appendOneRow(SSDataBlock* pBlock, TSKEY* pStartTs, TSKEY* pEndTs, uint64_t* pUid, uint64_t* pGp);
void printDataBlock(SSDataBlock* pBlock, const char* flag);
uint64_t calGroupIdByData(SPartitionBySupporter* pParSup, SExprSupp* pExprSup, SSDataBlock* pBlock, int32_t rowId);
int32_t finalizeResultRowIntoResultDataBlock(SDiskbasedBuf* pBuf, SResultRowPosition* resultRowPosition,
SqlFunctionCtx* pCtx, SExprInfo* pExprInfo, int32_t numOfExprs, const int32_t* rowCellOffset,
SSDataBlock* pBlock, SExecTaskInfo* pTaskInfo);
SqlFunctionCtx* pCtx, SExprInfo* pExprInfo, int32_t numOfExprs,
const int32_t* rowCellOffset, SSDataBlock* pBlock,
SExecTaskInfo* pTaskInfo);
int32_t createScanTableListInfo(SScanPhysiNode *pScanNode, SNodeList* pGroupTags, bool groupSort, SReadHandle* pHandle,
STableListInfo* pTableListInfo, SNode* pTagCond, SNode* pTagIndexCond, const char* idstr);
int32_t createScanTableListInfo(SScanPhysiNode* pScanNode, SNodeList* pGroupTags, bool groupSort, SReadHandle* pHandle,
STableListInfo* pTableListInfo, SNode* pTagCond, SNode* pTagIndexCond,
const char* idstr);
SOperatorInfo* createGroupSortOperatorInfo(SOperatorInfo* downstream, SGroupSortPhysiNode* pSortPhyNode,
SExecTaskInfo* pTaskInfo);
SOperatorInfo* createTableMergeScanOperatorInfo(STableScanPhysiNode* pTableScanNode, STableListInfo *pTableListInfo,
SOperatorInfo* createTableMergeScanOperatorInfo(STableScanPhysiNode* pTableScanNode, STableListInfo* pTableListInfo,
SReadHandle* readHandle, SExecTaskInfo* pTaskInfo);
void copyUpdateDataBlock(SSDataBlock* pDest, SSDataBlock* pSource, int32_t tsColIndex);
bool groupbyTbname(SNodeList* pGroupList);
bool groupbyTbname(SNodeList* pGroupList);
int32_t generateGroupIdMap(STableListInfo* pTableListInfo, SReadHandle* pHandle, SNodeList* groupKey);
SSDataBlock* createSpecialDataBlock(EStreamType type);
void* destroySqlFunctionCtx(SqlFunctionCtx* pCtx, int32_t numOfOutput);
void* destroySqlFunctionCtx(SqlFunctionCtx* pCtx, int32_t numOfOutput);
#ifdef __cplusplus
}
......
......@@ -52,7 +52,11 @@ static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void* input, size_t nu
// TODO: if a block was set but not consumed,
// prevent setting a different type of block
pInfo->validBlockIndex = 0;
taosArrayClear(pInfo->pBlockLists);
if (pInfo->blockType == STREAM_INPUT__DATA_BLOCK) {
taosArrayClearP(pInfo->pBlockLists, taosMemoryFree);
} else {
taosArrayClear(pInfo->pBlockLists);
}
if (type == STREAM_INPUT__MERGED_SUBMIT) {
// ASSERT(numOfBlocks > 1);
......
......@@ -956,9 +956,7 @@ void resetTableScanInfo(STableScanInfo* pTableScanInfo, STimeWindow* pWin) {
pTableScanInfo->currentGroupId = -1;
}
static void freeArray(void* array) {
taosArrayDestroy(array);
}
static void freeArray(void* array) { taosArrayDestroy(array); }
static void resetTableScanOperator(SOperatorInfo* pTableScanOp) {
STableScanInfo* pTableScanInfo = pTableScanOp->info;
......@@ -972,15 +970,16 @@ static void resetTableScanOperator(SOperatorInfo* pTableScanOp) {
resetTableScanInfo(pTableScanOp->info, &win);
}
static SSDataBlock* readPreVersionData(SOperatorInfo* pTableScanOp, uint64_t tbUid, TSKEY startTs, TSKEY endTs, int64_t maxVersion) {
static SSDataBlock* readPreVersionData(SOperatorInfo* pTableScanOp, uint64_t tbUid, TSKEY startTs, TSKEY endTs,
int64_t maxVersion) {
SArray* gpTbls = pTableScanOp->pTaskInfo->tableqinfoList.pGroupList;
taosArrayClear(gpTbls);
STableKeyInfo tblInfo = {.uid = tbUid, .groupId = 0};
SArray* tbls = taosArrayInit(1, sizeof(STableKeyInfo));
SArray* tbls = taosArrayInit(1, sizeof(STableKeyInfo));
taosArrayPush(tbls, &tblInfo);
taosArrayPush(gpTbls, &tbls);
STimeWindow win = {.skey = startTs, .ekey = endTs};
STimeWindow win = {.skey = startTs, .ekey = endTs};
STableScanInfo* pTableScanInfo = pTableScanOp->info;
pTableScanInfo->cond.startVersion = -1;
pTableScanInfo->cond.endVersion = maxVersion;
......@@ -1069,8 +1068,8 @@ static STimeWindow getSlidingWindow(TSKEY* tsCol, SInterval* pInterval, SDataBlo
if (hasGroup) {
(*pRowIndex) += 1;
} else {
(*pRowIndex) += getNumOfRowsInTimeWindow(pDataBlockInfo, tsCol, *pRowIndex, endWin.ekey,
binarySearchForKey, NULL, TSDB_ORDER_ASC);
(*pRowIndex) += getNumOfRowsInTimeWindow(pDataBlockInfo, tsCol, *pRowIndex, endWin.ekey, binarySearchForKey, NULL,
TSDB_ORDER_ASC);
}
do {
preWin = endWin;
......@@ -1111,8 +1110,8 @@ static SSDataBlock* doRangeScan(SStreamScanInfo* pInfo, SSDataBlock* pSDB, int32
for (int32_t j = 0; j < pInfo->pTableScanOp->exprSupp.numOfExprs; j++) {
SColumnInfoData* pSrcCol = taosArrayGet(tmpBlock->pDataBlock, j);
SColumnInfoData* pDestCol = taosArrayGet(pResult->pDataBlock, j);
bool isNull = colDataIsNull(pSrcCol, tmpBlock->info.rows, i, NULL);
char* pSrcData = colDataGetData(pSrcCol, i);
bool isNull = colDataIsNull(pSrcCol, tmpBlock->info.rows, i, NULL);
char* pSrcData = colDataGetData(pSrcCol, i);
colDataAppend(pDestCol, pResult->info.rows, pSrcData, isNull);
}
pResult->info.rows++;
......@@ -1153,7 +1152,7 @@ static int32_t generateSessionScanRange(SStreamScanInfo* pInfo, SSDataBlock* pSr
SColumnInfoData* pDestCalStartTsCol = taosArrayGet(pDestBlock->pDataBlock, CALCULATE_START_TS_COLUMN_INDEX);
SColumnInfoData* pDestCalEndTsCol = taosArrayGet(pDestBlock->pDataBlock, CALCULATE_END_TS_COLUMN_INDEX);
int32_t dummy = 0;
int64_t version = pSrcBlock->info.version - 1;
int64_t version = pSrcBlock->info.version - 1;
for (int32_t i = 0; i < pSrcBlock->info.rows; i++) {
uint64_t groupId = getGroupIdByData(pInfo, uidCol[i], startData[i], version);
// gap must be 0.
......@@ -1202,15 +1201,15 @@ static int32_t generateIntervalScanRange(SStreamScanInfo* pInfo, SSDataBlock* pS
SColumnInfoData* pGpCol = taosArrayGet(pDestBlock->pDataBlock, GROUPID_COLUMN_INDEX);
SColumnInfoData* pCalStartTsCol = taosArrayGet(pDestBlock->pDataBlock, CALCULATE_START_TS_COLUMN_INDEX);
SColumnInfoData* pCalEndTsCol = taosArrayGet(pDestBlock->pDataBlock, CALCULATE_END_TS_COLUMN_INDEX);
int64_t version = pSrcBlock->info.version - 1;
int64_t version = pSrcBlock->info.version - 1;
for (int32_t i = 0; i < rows;) {
uint64_t srcUid = srcUidData[i];
uint64_t groupId = getGroupIdByData(pInfo, srcUid, tsCol[i], version);
uint64_t srcGpId = srcGp[i];
TSKEY calStartTs = tsCol[i];
TSKEY calStartTs = tsCol[i];
colDataAppend(pCalStartTsCol, pDestBlock->info.rows, (const char*)(&calStartTs), false);
STimeWindow win = getSlidingWindow(tsCol, &pInfo->interval, &pSrcBlock->info, &i, pInfo->partitionSup.needCalc);
TSKEY calEndTs = tsCol[i - 1];
TSKEY calEndTs = tsCol[i - 1];
colDataAppend(pCalEndTsCol, pDestBlock->info.rows, (const char*)(&calEndTs), false);
colDataAppend(pDeUidCol, pDestBlock->info.rows, (const char*)(&srcUid), false);
colDataAppend(pStartTsCol, pDestBlock->info.rows, (const char*)(&win.skey), false);
......@@ -1278,10 +1277,10 @@ static void checkUpdateData(SStreamScanInfo* pInfo, bool invertible, SSDataBlock
bool closedWin = isClosed && isSignleIntervalWindow(pInfo) &&
isDeletedWindow(&win, pBlock->info.groupId, pInfo->windowSup.pIntervalAggSup);
if ((update || closedWin) && out) {
uint64_t gpId = closedWin&&pInfo->partitionSup.needCalc ?
calGroupIdByData(&pInfo->partitionSup, pInfo->pPartScalarSup, pBlock, rowId) : 0;
appendOneRow(pInfo->pUpdateDataRes, tsCol + rowId, tsCol + rowId, &pBlock->info.uid,
&gpId);
uint64_t gpId = closedWin && pInfo->partitionSup.needCalc
? calGroupIdByData(&pInfo->partitionSup, pInfo->pPartScalarSup, pBlock, rowId)
: 0;
appendOneRow(pInfo->pUpdateDataRes, tsCol + rowId, tsCol + rowId, &pBlock->info.uid, &gpId);
}
}
if (out && pInfo->pUpdateDataRes->info.rows > 0) {
......@@ -1516,6 +1515,7 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) {
generateScanRange(pInfo, pBlock, pInfo->pUpdateRes);
prepareRangeScan(pInfo, pInfo->pUpdateRes, &pInfo->updateResIndex);
copyDataBlock(pInfo->pDeleteDataRes, pInfo->pUpdateRes);
pInfo->pDeleteDataRes->info.type = STREAM_DELETE_DATA;
pInfo->scanMode = STREAM_SCAN_FROM_DATAREADER_RANGE;
return pInfo->pDeleteDataRes;
} break;
......@@ -1916,7 +1916,7 @@ SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhys
ASSERT(pHandle->tqReader);
pInfo->tqReader = pHandle->tqReader;
}
pInfo->pUpdateInfo = NULL;
pInfo->pTableScanOp = pTableScanOp;
pInfo->interval = pTSInfo->pdInfo.interval;
......@@ -1948,8 +1948,7 @@ SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhys
pInfo->pUpdateRes = createSpecialDataBlock(STREAM_CLEAR);
pInfo->pCondition = pScanPhyNode->node.pConditions;
pInfo->scanMode = STREAM_SCAN_FROM_READERHANDLE;
pInfo->windowSup =
(SWindowSupporter){.pStreamAggSup = NULL, .gap = -1, .parentType = QUERY_NODE_PHYSICAL_PLAN};
pInfo->windowSup = (SWindowSupporter){.pStreamAggSup = NULL, .gap = -1, .parentType = QUERY_NODE_PHYSICAL_PLAN};
pInfo->groupId = 0;
pInfo->pPullDataRes = createSpecialDataBlock(STREAM_RETRIEVE);
pInfo->pStreamScanOp = pOperator;
......
......@@ -35,14 +35,15 @@ typedef struct SPullWindowInfo {
typedef struct SOpenWindowInfo {
SResultRowPosition pos;
uint64_t groupId;
uint64_t groupId;
} SOpenWindowInfo;
static SSDataBlock* doStreamSessionAgg(SOperatorInfo* pOperator);
static int64_t* extractTsCol(SSDataBlock* pBlock, const SIntervalAggOperatorInfo* pInfo);
static SResultRowPosition addToOpenWindowList(SResultRowInfo* pResultRowInfo, const SResultRow* pResult, uint64_t groupId);
static SResultRowPosition addToOpenWindowList(SResultRowInfo* pResultRowInfo, const SResultRow* pResult,
uint64_t groupId);
static void doCloseWindow(SResultRowInfo* pResultRowInfo, const SIntervalAggOperatorInfo* pInfo, SResultRow* pResult);
///*
......@@ -601,16 +602,15 @@ static void doInterpUnclosedTimeWindow(SOperatorInfo* pOperatorInfo, int32_t num
SIntervalAggOperatorInfo* pInfo = (SIntervalAggOperatorInfo*)pOperatorInfo->info;
SExprSupp* pSup = &pOperatorInfo->exprSupp;
int32_t startPos = 0;
int32_t numOfOutput = pSup->numOfExprs;
int32_t startPos = 0;
int32_t numOfOutput = pSup->numOfExprs;
SResultRow* pResult = NULL;
while (1) {
SListNode* pn = tdListGetHead(pResultRowInfo->openWindow);
SOpenWindowInfo* pOpenWin = (SOpenWindowInfo *)pn->data;
uint64_t groupId = pOpenWin->groupId;
SListNode* pn = tdListGetHead(pResultRowInfo->openWindow);
SOpenWindowInfo* pOpenWin = (SOpenWindowInfo*)pn->data;
uint64_t groupId = pOpenWin->groupId;
SResultRowPosition* p1 = &pOpenWin->pos;
if (p->pageId == p1->pageId && p->offset == p1->offset) {
break;
......@@ -1031,7 +1031,7 @@ static void hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResul
getNumOfRowsInTimeWindow(&pBlock->info, tsCols, startPos, ekey, binarySearchForKey, NULL, pInfo->inputOrder);
// window start(end) key interpolation
doWindowBorderInterpolation(pInfo, pBlock, pResult, &nextWin, startPos, forwardRows, pSup);
//TODO: add to open window? how to close the open windows after input blocks exhausted?
// TODO: add to open window? how to close the open windows after input blocks exhausted?
#if 0
if ((ascScan && ekey <= pBlock->info.window.ekey) ||
(!ascScan && ekey >= pBlock->info.window.skey)) {
......@@ -1065,13 +1065,13 @@ SResultRowPosition addToOpenWindowList(SResultRowInfo* pResultRowInfo, const SRe
openWin.pos.pageId = pResult->pageId;
openWin.pos.offset = pResult->offset;
openWin.groupId = groupId;
SListNode* pn = tdListGetTail(pResultRowInfo->openWindow);
SListNode* pn = tdListGetTail(pResultRowInfo->openWindow);
if (pn == NULL) {
tdListAppend(pResultRowInfo->openWindow, &openWin);
return openWin.pos;
}
SOpenWindowInfo * px = (SOpenWindowInfo *)pn->data;
SOpenWindowInfo* px = (SOpenWindowInfo*)pn->data;
if (px->pos.pageId != openWin.pos.pageId || px->pos.offset != openWin.pos.offset || px->groupId != openWin.groupId) {
tdListAppend(pResultRowInfo->openWindow, &openWin);
}
......@@ -1427,7 +1427,8 @@ bool doDeleteIntervalWindow(SAggSupporter* pAggSup, TSKEY ts, uint64_t groupId)
return true;
}
void doDeleteSpecifyIntervalWindow(SAggSupporter* pAggSup, SSDataBlock* pBlock, SArray* pDelWins, SInterval* pInterval, SHashObj* pUpdatedMap) {
void doDeleteSpecifyIntervalWindow(SAggSupporter* pAggSup, SSDataBlock* pBlock, SArray* pDelWins, SInterval* pInterval,
SHashObj* pUpdatedMap) {
SColumnInfoData* pStartCol = taosArrayGet(pBlock->pDataBlock, START_TS_COLUMN_INDEX);
TSKEY* tsStarts = (TSKEY*)pStartCol->pData;
SColumnInfoData* pGroupCol = taosArrayGet(pBlock->pDataBlock, GROUPID_COLUMN_INDEX);
......@@ -1472,7 +1473,6 @@ static void doClearWindows(SAggSupporter* pAggSup, SExprSupp* pSup1, SInterval*
}
static int32_t getAllIntervalWindow(SSHashObj* pHashMap, SHashObj* resWins) {
void* pIte = NULL;
size_t keyLen = 0;
int32_t iter = 0;
......@@ -1833,7 +1833,8 @@ static bool timeWindowinterpNeeded(SqlFunctionCtx* pCtx, int32_t numOfCols, SInt
return needed;
}
void initIntervalDownStream(SOperatorInfo* downstream, uint16_t type, SAggSupporter* pSup, SInterval* pInterval, int64_t waterMark) {
void initIntervalDownStream(SOperatorInfo* downstream, uint16_t type, SAggSupporter* pSup, SInterval* pInterval,
int64_t waterMark) {
if (downstream->operatorType != QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) {
initIntervalDownStream(downstream->pDownstream[0], type, pSup, pInterval, waterMark);
return;
......@@ -1928,7 +1929,8 @@ SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo*
destroyIntervalOperatorInfo, aggEncodeResultRow, aggDecodeResultRow, NULL);
if (nodeType(pPhyNode) == QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL) {
initIntervalDownStream(downstream, QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL, &pInfo->aggSup, &pInfo->interval, pInfo->twAggSup.waterMark);
initIntervalDownStream(downstream, QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL, &pInfo->aggSup, &pInfo->interval,
pInfo->twAggSup.waterMark);
}
code = appendDownstream(pOperator, &downstream, 1);
......@@ -2255,7 +2257,6 @@ static void genInterpolationResult(STimeSliceOperatorInfo* pSliceInfo, SExprSupp
if (hasInterp) {
pResBlock->info.rows += 1;
}
}
static int32_t initPrevRowsKeeper(STimeSliceOperatorInfo* pInfo, SSDataBlock* pBlock) {
......@@ -2728,7 +2729,7 @@ SOperatorInfo* createStatewindowOperatorInfo(SOperatorInfo* downstream, SStateWi
goto _error;
}
int32_t num = 0;
int32_t num = 0;
SExprInfo* pExprInfo = createExprInfo(pStateNode->window.pFuncs, NULL, &num);
SSDataBlock* pResBlock = createResDataBlock(pStateNode->window.node.pOutputDataBlockDesc);
int32_t tsSlotId = ((SColumnNode*)pStateNode->window.pTspk)->slotId;
......@@ -2755,7 +2756,9 @@ SOperatorInfo* createStatewindowOperatorInfo(SOperatorInfo* downstream, SStateWi
initBasicInfo(&pInfo->binfo, pResBlock);
initResultRowInfo(&pInfo->binfo.resultRowInfo);
pInfo->twAggSup = (STimeWindowAggSupp){.waterMark = pStateNode->window.watermark, .calTrigger = pStateNode->window.triggerType};;
pInfo->twAggSup =
(STimeWindowAggSupp){.waterMark = pStateNode->window.watermark, .calTrigger = pStateNode->window.triggerType};
;
initExecTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pTaskInfo->window);
pInfo->tsSlotId = tsSlotId;
......@@ -2864,8 +2867,8 @@ void compactFunctions(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx, int3
}
SResultRowEntryInfo* pEntryInfo = GET_RES_INFO(&pDestCtx[k]);
char* p = GET_ROWCELL_INTERBUF(pEntryInfo);
SColumnInfoData idata = {0};
char* p = GET_ROWCELL_INTERBUF(pEntryInfo);
SColumnInfoData idata = {0};
idata.info.type = TSDB_DATA_TYPE_BIGINT;
idata.info.bytes = tDataTypes[TSDB_DATA_TYPE_BIGINT].bytes;
idata.pData = p;
......@@ -2874,7 +2877,7 @@ void compactFunctions(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx, int3
SScalarParam tw = {.numOfRows = 5, .columnData = pTimeWindowData};
pDestCtx[k].sfp.process(&tw, 1, &out);
pEntryInfo->numOfRes = 1;
}else if (functionNeedToExecute(&pDestCtx[k]) && pDestCtx[k].fpSet.combine != NULL) {
} else if (functionNeedToExecute(&pDestCtx[k]) && pDestCtx[k].fpSet.combine != NULL) {
int32_t code = pDestCtx[k].fpSet.combine(&pDestCtx[k], &pSourceCtx[k]);
if (code != TSDB_CODE_SUCCESS) {
qError("%s apply functions error, code: %s", GET_TASKID(pTaskInfo), tstrerror(code));
......@@ -2900,7 +2903,8 @@ STimeWindow getFinalTimeWindow(int64_t ts, SInterval* pInterval) {
}
static void rebuildIntervalWindow(SStreamFinalIntervalOperatorInfo* pInfo, SExprSupp* pSup, SArray* pWinArray,
int32_t groupId, int32_t numOfOutput, SExecTaskInfo* pTaskInfo, SHashObj* pUpdatedMap) {
int32_t groupId, int32_t numOfOutput, SExecTaskInfo* pTaskInfo,
SHashObj* pUpdatedMap) {
int32_t size = taosArrayGetSize(pWinArray);
if (!pInfo->pChildren) {
return;
......@@ -2941,7 +2945,7 @@ static void rebuildIntervalWindow(SStreamFinalIntervalOperatorInfo* pInfo, SExpr
bool isDeletedWindow(STimeWindow* pWin, uint64_t groupId, SAggSupporter* pSup) {
SET_RES_WINDOW_KEY(pSup->keyBuf, &pWin->skey, sizeof(int64_t), groupId);
SResultRowPosition* p1 = (SResultRowPosition*)tSimpleHashGet(pSup->pResultRowHashTable, pSup->keyBuf,
GET_RES_WINDOW_KEY_LEN(sizeof(int64_t)));
GET_RES_WINDOW_KEY_LEN(sizeof(int64_t)));
return p1 == NULL;
}
......@@ -3066,7 +3070,7 @@ static void clearStreamIntervalOperator(SStreamFinalIntervalOperatorInfo* pInfo)
tSimpleHashClear(pInfo->aggSup.pResultRowHashTable);
clearDiskbasedBuf(pInfo->aggSup.pResultBuf);
initResultRowInfo(&pInfo->binfo.resultRowInfo);
pInfo->aggSup.currentPageId = -1;
pInfo->aggSup.currentPageId = -1;
}
static void clearSpecialDataBlock(SSDataBlock* pBlock) {
......@@ -3152,7 +3156,7 @@ void processPullOver(SSDataBlock* pBlock, SHashObj* pMap) {
static void addRetriveWindow(SArray* wins, SStreamFinalIntervalOperatorInfo* pInfo) {
int32_t size = taosArrayGetSize(wins);
for (int32_t i = 0; i < size; i++) {
SWinKey* winKey = taosArrayGet(wins, i);
SWinKey* winKey = taosArrayGet(wins, i);
STimeWindow nextWin = getFinalTimeWindow(winKey->ts, &pInfo->interval);
if (isCloseWindow(&nextWin, &pInfo->twAggSup) && !pInfo->ignoreExpiredData) {
void* chIds = taosHashGet(pInfo->pPullDataMap, winKey, sizeof(SWinKey));
......@@ -3168,6 +3172,12 @@ static void addRetriveWindow(SArray* wins, SStreamFinalIntervalOperatorInfo* pIn
}
}
static void clearFunctionContext(SExprSupp* pSup) {
for (int32_t i = 0; i < pSup->numOfExprs; i++) {
pSup->pCtx[i].saveHandle.currentPage = -1;
}
}
static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) {
SStreamFinalIntervalOperatorInfo* pInfo = pOperator->info;
......@@ -3203,6 +3213,7 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) {
if (pInfo->binfo.pRes->info.rows == 0) {
pOperator->status = OP_EXEC_DONE;
if (!IS_FINAL_OP(pInfo)) {
clearFunctionContext(&pOperator->exprSupp);
// semi interval operator clear disk buffer
clearStreamIntervalOperator(pInfo);
qDebug("===stream===clear semi operator");
......@@ -3281,8 +3292,8 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) {
SStreamFinalIntervalOperatorInfo* pChildInfo = pChildOp->info;
SExprSupp* pChildSup = &pChildOp->exprSupp;
doDeleteSpecifyIntervalWindow(&pChildInfo->aggSup, pBlock, NULL, &pChildInfo->interval, NULL);
rebuildIntervalWindow(pInfo, pSup, delWins, pInfo->binfo.pRes->info.groupId,
pOperator->exprSupp.numOfExprs, pOperator->pTaskInfo, pUpdatedMap);
rebuildIntervalWindow(pInfo, pSup, delWins, pInfo->binfo.pRes->info.groupId, pOperator->exprSupp.numOfExprs,
pOperator->pTaskInfo, pUpdatedMap);
addRetriveWindow(delWins, pInfo);
taosArrayAddAll(pInfo->pDelWins, delWins);
taosArrayDestroy(delWins);
......@@ -3388,40 +3399,6 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) {
return NULL;
}
SSDataBlock* createSpecialDataBlock(EStreamType type) {
SSDataBlock* pBlock = taosMemoryCalloc(1, sizeof(SSDataBlock));
pBlock->info.hasVarCol = false;
pBlock->info.groupId = 0;
pBlock->info.rows = 0;
pBlock->info.type = type;
pBlock->info.rowSize =
sizeof(TSKEY) + sizeof(TSKEY) + sizeof(uint64_t) + sizeof(uint64_t) + sizeof(TSKEY) + sizeof(TSKEY);
pBlock->info.watermark = INT64_MIN;
pBlock->pDataBlock = taosArrayInit(6, sizeof(SColumnInfoData));
SColumnInfoData infoData = {0};
infoData.info.type = TSDB_DATA_TYPE_TIMESTAMP;
infoData.info.bytes = sizeof(TSKEY);
// window start ts
taosArrayPush(pBlock->pDataBlock, &infoData);
// window end ts
taosArrayPush(pBlock->pDataBlock, &infoData);
infoData.info.type = TSDB_DATA_TYPE_UBIGINT;
infoData.info.bytes = sizeof(uint64_t);
// uid
taosArrayPush(pBlock->pDataBlock, &infoData);
// group id
taosArrayPush(pBlock->pDataBlock, &infoData);
// calculate start ts
taosArrayPush(pBlock->pDataBlock, &infoData);
// calculate end ts
taosArrayPush(pBlock->pDataBlock, &infoData);
return pBlock;
}
SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyNode,
SExecTaskInfo* pTaskInfo, int32_t numOfChild) {
SIntervalPhysiNode* pIntervalPhyNode = (SIntervalPhysiNode*)pPhyNode;
......@@ -3716,7 +3693,8 @@ SOperatorInfo* createStreamSessionAggOperatorInfo(SOperatorInfo* downstream, SPh
createOperatorFpSet(operatorDummyOpenFn, doStreamSessionAgg, NULL, NULL, destroyStreamSessionAggOperatorInfo,
aggEncodeResultRow, aggDecodeResultRow, NULL);
if (downstream) {
initDownStream(downstream, &pInfo->streamAggSup, pInfo->gap, pInfo->twAggSup.waterMark, pOperator->operatorType, pInfo->primaryTsIndex);
initDownStream(downstream, &pInfo->streamAggSup, pInfo->gap, pInfo->twAggSup.waterMark, pOperator->operatorType,
pInfo->primaryTsIndex);
code = appendDownstream(pOperator, &downstream, 1);
}
return pOperator;
......@@ -3747,12 +3725,14 @@ bool isInTimeWindow(STimeWindow* pWin, TSKEY ts, int64_t gap) {
bool isInWindow(SResultWindowInfo* pWinInfo, TSKEY ts, int64_t gap) { return isInTimeWindow(&pWinInfo->win, ts, gap); }
static SResultWindowInfo* insertNewSessionWindow(SArray* pWinInfos, TSKEY startTs, TSKEY endTs, int32_t index) {
SResultWindowInfo win = {.pos.offset = -1, .pos.pageId = -1, .win.skey = startTs, .win.ekey = endTs, .isOutput = false};
SResultWindowInfo win = {
.pos.offset = -1, .pos.pageId = -1, .win.skey = startTs, .win.ekey = endTs, .isOutput = false};
return taosArrayInsert(pWinInfos, index, &win);
}
static SResultWindowInfo* addNewSessionWindow(SArray* pWinInfos, TSKEY startTs, TSKEY endTs) {
SResultWindowInfo win = {.pos.offset = -1, .pos.pageId = -1, .win.skey = startTs, .win.ekey = endTs, .isOutput = false};
SResultWindowInfo win = {
.pos.offset = -1, .pos.pageId = -1, .win.skey = startTs, .win.ekey = endTs, .isOutput = false};
return taosArrayPush(pWinInfos, &win);
}
......@@ -4154,8 +4134,8 @@ void doBuildDeleteDataBlock(SHashObj* pStDeleted, SSDataBlock* pBlock, void** It
}
}
static void rebuildTimeWindow(SStreamSessionAggOperatorInfo* pInfo, SArray* pWinArray,
int32_t numOfOutput, SOperatorInfo* pOperator, SHashObj* pStUpdated, bool needCreate) {
static void rebuildTimeWindow(SStreamSessionAggOperatorInfo* pInfo, SArray* pWinArray, int32_t numOfOutput,
SOperatorInfo* pOperator, SHashObj* pStUpdated, bool needCreate) {
SExprSupp* pSup = &pOperator->exprSupp;
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
......@@ -4168,7 +4148,8 @@ static void rebuildTimeWindow(SStreamSessionAggOperatorInfo* pInfo, SArray* pWin
uint64_t groupId = pParentWin->groupId;
int32_t winIndex = 0;
if (needCreate) {
pParentWin = getSessionTimeWindow(&pInfo->streamAggSup, pParentWin->win.skey, pParentWin->win.ekey, groupId, 0, &winIndex);
pParentWin =
getSessionTimeWindow(&pInfo->streamAggSup, pParentWin->win.skey, pParentWin->win.ekey, groupId, 0, &winIndex);
}
setWindowOutputBuf(pParentWin, &pCurResult, pSup->pCtx, groupId, numOfOutput, pSup->rowEntryInfoOffset,
&pInfo->streamAggSup, pTaskInfo);
......@@ -4299,9 +4280,9 @@ static void removeSessionResults(SHashObj* pHashMap, SArray* pWins) {
}
int32_t compareWinKey(void* pKey, void* data, int32_t index) {
SArray* res = (SArray*)data;
SArray* res = (SArray*)data;
SResKeyPos* pos = taosArrayGetP(res, index);
SWinKey* pData = (SWinKey*)pKey;
SWinKey* pData = (SWinKey*)pKey;
if (pData->ts == *(int64_t*)pos->key) {
if (pData->groupId > pos->groupId) {
return 1;
......@@ -4324,7 +4305,7 @@ static void removeSessionDeleteResults(SArray* update, SHashObj* pStDeleted) {
int32_t num = taosArrayGetSize(update);
for (int32_t i = 0; i < num; i++) {
SResKeyPos* pos = taosArrayGetP(update, i);
SWinKey winKey = {.ts = *(int64_t*)pos->key, .groupId = pos->groupId};
SWinKey winKey = {.ts = *(int64_t*)pos->key, .groupId = pos->groupId};
taosHashRemove(pStDeleted, &winKey, sizeof(SWinKey));
}
}
......@@ -4353,7 +4334,7 @@ static SSDataBlock* doStreamSessionAgg(SOperatorInfo* pOperator) {
_hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY);
SHashObj* pStUpdated = taosHashInit(64, hashFn, true, HASH_NO_LOCK);
SOperatorInfo* downstream = pOperator->pDownstream[0];
SArray* pUpdated = taosArrayInit(16, POINTER_BYTES); // SResKeyPos
SArray* pUpdated = taosArrayInit(16, POINTER_BYTES); // SResKeyPos
while (1) {
SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream);
if (pBlock == NULL) {
......@@ -4369,8 +4350,8 @@ static SSDataBlock* doStreamSessionAgg(SOperatorInfo* pOperator) {
int32_t childIndex = getChildIndex(pBlock);
SOperatorInfo* pChildOp = taosArrayGetP(pInfo->pChildren, childIndex);
SStreamSessionAggOperatorInfo* pChildInfo = pChildOp->info;
doClearSessionWindows(&pChildInfo->streamAggSup, &pChildOp->exprSupp, pBlock, START_TS_COLUMN_INDEX, pChildOp->exprSupp.numOfExprs,
0, NULL);
doClearSessionWindows(&pChildInfo->streamAggSup, &pChildOp->exprSupp, pBlock, START_TS_COLUMN_INDEX,
pChildOp->exprSupp.numOfExprs, 0, NULL);
rebuildTimeWindow(pInfo, pWins, pOperator->exprSupp.numOfExprs, pOperator, NULL, false);
}
taosArrayDestroy(pWins);
......@@ -4471,7 +4452,7 @@ static SSDataBlock* doStreamSessionSemiAgg(SOperatorInfo* pOperator) {
if (pOperator->status == OP_EXEC_DONE) {
return NULL;
}
{
doBuildResultDatablock(pOperator, pBInfo, &pInfo->groupResInfo, pInfo->streamAggSup.pResultBuf);
if (pBInfo->pRes->info.rows > 0) {
......@@ -4493,6 +4474,7 @@ static SSDataBlock* doStreamSessionSemiAgg(SOperatorInfo* pOperator) {
}
if (pOperator->status == OP_RES_TO_RETURN) {
clearFunctionContext(&pOperator->exprSupp);
// semi interval operator clear disk buffer
clearStreamSessionOperator(pInfo);
pOperator->status = OP_EXEC_DONE;
......@@ -4863,8 +4845,7 @@ static void doStreamStateAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSDataBl
i, &allEqual, pStDeleted);
if (!allEqual) {
uint64_t uid = 0;
appendOneRow(pAggSup->pScanBlock, &pCurWin->winInfo.win.skey, &pCurWin->winInfo.win.ekey,
&uid, &groupId);
appendOneRow(pAggSup->pScanBlock, &pCurWin->winInfo.win.skey, &pCurWin->winInfo.win.ekey, &uid, &groupId);
taosHashRemove(pSeUpdated, &pCurWin->winInfo.pos, sizeof(SResultRowPosition));
deleteWindow(pAggSup->pCurWins, winIndex, destroyStateWinInfo);
continue;
......@@ -4893,7 +4874,7 @@ static SSDataBlock* doStreamStateAgg(SOperatorInfo* pOperator) {
SExprSupp* pSup = &pOperator->exprSupp;
SStreamStateAggOperatorInfo* pInfo = pOperator->info;
SOptrBasicInfo* pBInfo = &pInfo->binfo;
int64_t maxTs = INT64_MIN;
int64_t maxTs = INT64_MIN;
if (pOperator->status == OP_RES_TO_RETURN) {
doBuildDeleteDataBlock(pInfo->pSeDeleted, pInfo->pDelRes, &pInfo->pDelIterator);
if (pInfo->pDelRes->info.rows > 0) {
......@@ -5042,7 +5023,8 @@ SOperatorInfo* createStreamStateAggOperatorInfo(SOperatorInfo* downstream, SPhys
pOperator->info = pInfo;
pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doStreamStateAgg, NULL, NULL,
destroyStreamStateOperatorInfo, aggEncodeResultRow, aggDecodeResultRow, NULL);
initDownStream(downstream, &pInfo->streamAggSup, 0, pInfo->twAggSup.waterMark, pOperator->operatorType, pInfo->primaryTsIndex);
initDownStream(downstream, &pInfo->streamAggSup, 0, pInfo->twAggSup.waterMark, pOperator->operatorType,
pInfo->primaryTsIndex);
code = appendDownstream(pOperator, &downstream, 1);
if (code != TSDB_CODE_SUCCESS) {
goto _error;
......@@ -5071,8 +5053,8 @@ static int32_t outputMergeAlignedIntervalResult(SOperatorInfo* pOperatorInfo, ui
SExprSupp* pSup = &pOperatorInfo->exprSupp;
SET_RES_WINDOW_KEY(iaInfo->aggSup.keyBuf, &wstartTs, TSDB_KEYSIZE, tableGroupId);
SResultRowPosition* p1 = (SResultRowPosition*)tSimpleHashGet(iaInfo->aggSup.pResultRowHashTable, iaInfo->aggSup.keyBuf,
GET_RES_WINDOW_KEY_LEN(TSDB_KEYSIZE));
SResultRowPosition* p1 = (SResultRowPosition*)tSimpleHashGet(
iaInfo->aggSup.pResultRowHashTable, iaInfo->aggSup.keyBuf, GET_RES_WINDOW_KEY_LEN(TSDB_KEYSIZE));
ASSERT(p1 != NULL);
finalizeResultRowIntoResultDataBlock(iaInfo->aggSup.pResultBuf, p1, pSup->pCtx, pSup->pExprInfo, pSup->numOfExprs,
......@@ -5281,11 +5263,11 @@ SOperatorInfo* createMergeAlignedIntervalOperatorInfo(SOperatorInfo* downstream,
SExprSupp* pSup = &pOperator->exprSupp;
miaInfo->pCondition = pNode->window.node.pConditions;
miaInfo->curTs = INT64_MIN;
iaInfo->win = pTaskInfo->window;
iaInfo->inputOrder = TSDB_ORDER_ASC;
iaInfo->interval = interval;
iaInfo->execModel = pTaskInfo->execModel;
miaInfo->curTs = INT64_MIN;
iaInfo->win = pTaskInfo->window;
iaInfo->inputOrder = TSDB_ORDER_ASC;
iaInfo->interval = interval;
iaInfo->execModel = pTaskInfo->execModel;
iaInfo->primaryTsIndex = ((SColumnNode*)pNode->window.pTspk)->slotId;
iaInfo->binfo.mergeResultBlock = pNode->window.mergeDataBlock;
......@@ -5308,12 +5290,12 @@ SOperatorInfo* createMergeAlignedIntervalOperatorInfo(SOperatorInfo* downstream,
initResultRowInfo(&iaInfo->binfo.resultRowInfo);
blockDataEnsureCapacity(iaInfo->binfo.pRes, pOperator->resultInfo.capacity);
pOperator->name = "TimeMergeAlignedIntervalAggOperator";
pOperator->name = "TimeMergeAlignedIntervalAggOperator";
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_MERGE_ALIGNED_INTERVAL;
pOperator->blocking = false;
pOperator->status = OP_NOT_OPENED;
pOperator->pTaskInfo = pTaskInfo;
pOperator->info = miaInfo;
pOperator->blocking = false;
pOperator->status = OP_NOT_OPENED;
pOperator->pTaskInfo = pTaskInfo;
pOperator->info = miaInfo;
pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, mergeAlignedIntervalAgg, NULL, NULL,
destroyMergeAlignedIntervalOperatorInfo, NULL, NULL, NULL);
......@@ -5366,8 +5348,8 @@ static int32_t finalizeWindowResult(SOperatorInfo* pOperatorInfo, uint64_t table
SExprSupp* pExprSup = &pOperatorInfo->exprSupp;
SET_RES_WINDOW_KEY(iaInfo->aggSup.keyBuf, &win->skey, TSDB_KEYSIZE, tableGroupId);
SResultRowPosition* p1 = (SResultRowPosition*)tSimpleHashGet(iaInfo->aggSup.pResultRowHashTable, iaInfo->aggSup.keyBuf,
GET_RES_WINDOW_KEY_LEN(TSDB_KEYSIZE));
SResultRowPosition* p1 = (SResultRowPosition*)tSimpleHashGet(
iaInfo->aggSup.pResultRowHashTable, iaInfo->aggSup.keyBuf, GET_RES_WINDOW_KEY_LEN(TSDB_KEYSIZE));
ASSERT(p1 != NULL);
finalizeResultRowIntoResultDataBlock(iaInfo->aggSup.pResultBuf, p1, pExprSup->pCtx, pExprSup->pExprInfo,
pExprSup->numOfExprs, pExprSup->rowEntryInfoOffset, pResultBlock, pTaskInfo);
......@@ -5592,10 +5574,10 @@ SOperatorInfo* createMergeIntervalOperatorInfo(SOperatorInfo* downstream, SMerge
pMergeIntervalInfo->groupIntervals = tdListNew(sizeof(SGroupTimeWindow));
SIntervalAggOperatorInfo* pIntervalInfo = &pMergeIntervalInfo->intervalAggOperatorInfo;
pIntervalInfo->win = pTaskInfo->window;
pIntervalInfo->win = pTaskInfo->window;
pIntervalInfo->inputOrder = TSDB_ORDER_ASC;
pIntervalInfo->interval = interval;
pIntervalInfo->execModel = pTaskInfo->execModel;
pIntervalInfo->interval = interval;
pIntervalInfo->execModel = pTaskInfo->execModel;
pIntervalInfo->binfo.mergeResultBlock = pIntervalPhyNode->window.mergeDataBlock;
pIntervalInfo->primaryTsIndex = ((SColumnNode*)pIntervalPhyNode->window.pTspk)->slotId;
......@@ -5612,7 +5594,6 @@ SOperatorInfo* createMergeIntervalOperatorInfo(SOperatorInfo* downstream, SMerge
initBasicInfo(&pIntervalInfo->binfo, pResBlock);
initExecTimeWindowInfo(&pIntervalInfo->twAggSup.timeWindowData, &pIntervalInfo->win);
pIntervalInfo->timeWindowInterpo = timeWindowinterpNeeded(pExprSupp->pCtx, num, pIntervalInfo);
if (pIntervalInfo->timeWindowInterpo) {
pIntervalInfo->binfo.resultRowInfo.openWindow = tdListNew(sizeof(SOpenWindowInfo));
......@@ -5623,12 +5604,12 @@ SOperatorInfo* createMergeIntervalOperatorInfo(SOperatorInfo* downstream, SMerge
initResultRowInfo(&pIntervalInfo->binfo.resultRowInfo);
pOperator->name = "TimeMergeIntervalAggOperator";
pOperator->name = "TimeMergeIntervalAggOperator";
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_MERGE_INTERVAL;
pOperator->blocking = false;
pOperator->status = OP_NOT_OPENED;
pOperator->pTaskInfo = pTaskInfo;
pOperator->info = pMergeIntervalInfo;
pOperator->blocking = false;
pOperator->status = OP_NOT_OPENED;
pOperator->pTaskInfo = pTaskInfo;
pOperator->info = pMergeIntervalInfo;
pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doMergeIntervalAgg, NULL, NULL,
destroyMergeIntervalOperatorInfo, NULL, NULL, NULL);
......
......@@ -76,11 +76,11 @@ typedef struct STopBotResItem {
} STopBotResItem;
typedef struct STopBotRes {
int32_t maxSize;
int16_t type;
int32_t maxSize;
int16_t type;
STuplePos nullTuplePos;
bool nullTupleSaved;
STuplePos nullTuplePos;
bool nullTupleSaved;
STopBotResItem* pItems;
} STopBotRes;
......@@ -223,14 +223,14 @@ typedef struct SMavgInfo {
} SMavgInfo;
typedef struct SSampleInfo {
int32_t samples;
int32_t totalPoints;
int32_t numSampled;
uint8_t colType;
int16_t colBytes;
int32_t samples;
int32_t totalPoints;
int32_t numSampled;
uint8_t colType;
int16_t colBytes;
STuplePos nullTuplePos;
bool nullTupleSaved;
STuplePos nullTuplePos;
bool nullTupleSaved;
char* data;
STuplePos* tuplePos;
......@@ -1147,7 +1147,7 @@ bool getMinmaxFuncEnv(SFunctionNode* UNUSED_PARAM(pFunc), SFuncExecEnv* pEnv) {
}
static STuplePos saveTupleData(SqlFunctionCtx* pCtx, int32_t rowIndex, const SSDataBlock* pSrcBlock);
static int32_t updateTupleData(SqlFunctionCtx* pCtx, int32_t rowIndex, const SSDataBlock* pSrcBlock, STuplePos* pPos);
static int32_t updateTupleData(SqlFunctionCtx* pCtx, int32_t rowIndex, const SSDataBlock* pSrcBlock, STuplePos* pPos);
static const char* loadTupleData(SqlFunctionCtx* pCtx, const STuplePos* pPos);
static int32_t findRowIndex(int32_t start, int32_t num, SColumnInfoData* pCol, const char* tval) {
......@@ -1357,8 +1357,7 @@ int32_t doMinMaxHelper(SqlFunctionCtx* pCtx, int32_t isMinFunc) {
numOfElems += 1;
}
} else if (type == TSDB_DATA_TYPE_BIGINT ||
type == TSDB_DATA_TYPE_TIMESTAMP) {
} else if (type == TSDB_DATA_TYPE_BIGINT || type == TSDB_DATA_TYPE_TIMESTAMP) {
int64_t* pData = (int64_t*)pCol->pData;
int64_t* val = (int64_t*)&pBuf->v;
......@@ -1581,7 +1580,7 @@ int32_t doMinMaxHelper(SqlFunctionCtx* pCtx, int32_t isMinFunc) {
}
_min_max_over:
if (numOfElems == 0 && pCtx->subsidiaries.num > 0 && !pBuf->nullTupleSaved ) {
if (numOfElems == 0 && pCtx->subsidiaries.num > 0 && !pBuf->nullTupleSaved) {
pBuf->nullTuplePos = saveTupleData(pCtx, pInput->startRowIndex, pCtx->pSrcBlock);
pBuf->nullTupleSaved = true;
}
......@@ -1601,7 +1600,8 @@ int32_t maxFunction(SqlFunctionCtx* pCtx) {
}
static void setNullSelectivityValue(SqlFunctionCtx* pCtx, SSDataBlock* pBlock, int32_t rowIndex);
static void setSelectivityValue(SqlFunctionCtx* pCtx, SSDataBlock* pBlock, const STuplePos* pTuplePos, int32_t rowIndex);
static void setSelectivityValue(SqlFunctionCtx* pCtx, SSDataBlock* pBlock, const STuplePos* pTuplePos,
int32_t rowIndex);
int32_t minmaxFunctionFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
SResultRowEntryInfo* pEntryInfo = GET_RES_INFO(pCtx);
......@@ -1651,7 +1651,7 @@ void setSelectivityValue(SqlFunctionCtx* pCtx, SSDataBlock* pBlock, const STuple
if (pCtx->saveHandle.pBuf != NULL) {
if (pTuplePos->pageId != -1) {
int32_t numOfCols = pCtx->subsidiaries.num;
int32_t numOfCols = pCtx->subsidiaries.num;
const char* p = loadTupleData(pCtx, pTuplePos);
bool* nullList = (bool*)p;
......@@ -1660,7 +1660,7 @@ void setSelectivityValue(SqlFunctionCtx* pCtx, SSDataBlock* pBlock, const STuple
// todo set the offset value to optimize the performance.
for (int32_t j = 0; j < numOfCols; ++j) {
SqlFunctionCtx* pc = pCtx->subsidiaries.pCtx[j];
int32_t dstSlotId = pc->pExpr->base.resSchema.slotId;
int32_t dstSlotId = pc->pExpr->base.resSchema.slotId;
SColumnInfoData* pDstCol = taosArrayGet(pBlock->pDataBlock, dstSlotId);
ASSERT(pc->pExpr->base.resSchema.bytes == pDstCol->info.bytes);
......@@ -1701,7 +1701,7 @@ void appendSelectivityValue(SqlFunctionCtx* pCtx, int32_t rowIndex, int32_t pos)
char* pData = colDataGetData(pSrcCol, rowIndex);
// append to dest col
int32_t dstSlotId = pc->pExpr->base.resSchema.slotId;
int32_t dstSlotId = pc->pExpr->base.resSchema.slotId;
SColumnInfoData* pDstCol = taosArrayGet(pCtx->pDstBlock->pDataBlock, dstSlotId);
ASSERT(pc->pExpr->base.resSchema.bytes == pDstCol->info.bytes);
......@@ -1712,7 +1712,6 @@ void appendSelectivityValue(SqlFunctionCtx* pCtx, int32_t rowIndex, int32_t pos)
colDataAppend(pDstCol, pos, pData, false);
}
}
}
void replaceTupleData(STuplePos* pDestPos, STuplePos* pSourcePos) {
......@@ -2590,8 +2589,8 @@ static void apercentileTransferInfo(SAPercentileInfo* pInput, SAPercentileInfo*
memcpy(pHisto, pInput->pHisto, sizeof(SHistogramInfo) + sizeof(SHistBin) * (MAX_HISTOGRAM_BIN + 1));
pHisto->elems = (SHistBin*)((char*)pHisto + sizeof(SHistogramInfo));
qDebug("%s merge histo, total:%" PRId64 ", entry:%d, %p", __FUNCTION__, pHisto->numOfElems,
pHisto->numOfEntries, pHisto);
qDebug("%s merge histo, total:%" PRId64 ", entry:%d, %p", __FUNCTION__, pHisto->numOfElems, pHisto->numOfEntries,
pHisto);
} else {
pHisto->elems = (SHistBin*)((char*)pHisto + sizeof(SHistogramInfo));
qDebug("%s input histogram, elem:%" PRId64 ", entry:%d, %p", __FUNCTION__, pHisto->numOfElems,
......@@ -2601,8 +2600,8 @@ static void apercentileTransferInfo(SAPercentileInfo* pInput, SAPercentileInfo*
memcpy(pHisto, pRes, sizeof(SHistogramInfo) + sizeof(SHistBin) * MAX_HISTOGRAM_BIN);
pHisto->elems = (SHistBin*)((char*)pHisto + sizeof(SHistogramInfo));
qDebug("%s merge histo, total:%" PRId64 ", entry:%d, %p", __FUNCTION__, pHisto->numOfElems,
pHisto->numOfEntries, pHisto);
qDebug("%s merge histo, total:%" PRId64 ", entry:%d, %p", __FUNCTION__, pHisto->numOfElems, pHisto->numOfEntries,
pHisto);
tHistogramDestroy(&pRes);
}
}
......@@ -2629,8 +2628,8 @@ int32_t apercentileFunctionMerge(SqlFunctionCtx* pCtx) {
}
if (pInfo->algo != APERCT_ALGO_TDIGEST) {
qDebug("%s after merge, total:%d, numOfEntry:%d, %p", __FUNCTION__, pInfo->pHisto->numOfElems, pInfo->pHisto->numOfEntries,
pInfo->pHisto);
qDebug("%s after merge, total:%d, numOfEntry:%d, %p", __FUNCTION__, pInfo->pHisto->numOfElems,
pInfo->pHisto->numOfEntries, pInfo->pHisto);
}
SET_VAL(pResInfo, 1, 1);
......@@ -2709,7 +2708,7 @@ int32_t apercentileCombine(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx)
}
EFuncDataRequired lastDynDataReq(void* pRes, STimeWindow* pTimeWindow) {
SResultRowEntryInfo* pEntry = (SResultRowEntryInfo*) pRes;
SResultRowEntryInfo* pEntry = (SResultRowEntryInfo*)pRes;
// not initialized yet, data is required
if (pEntry == NULL) {
......@@ -2752,7 +2751,8 @@ static FORCE_INLINE TSKEY getRowPTs(SColumnInfoData* pTsColInfo, int32_t rowInde
return *(TSKEY*)colDataGetData(pTsColInfo, rowIndex);
}
static void firstlastSaveTupleData(const SSDataBlock* pSrcBlock, int32_t rowIndex, SqlFunctionCtx* pCtx, SFirstLastRes* pInfo) {
static void firstlastSaveTupleData(const SSDataBlock* pSrcBlock, int32_t rowIndex, SqlFunctionCtx* pCtx,
SFirstLastRes* pInfo) {
if (pCtx->subsidiaries.num <= 0) {
return;
}
......@@ -3176,7 +3176,7 @@ bool diffFunctionSetup(SqlFunctionCtx* pCtx, SResultRowEntryInfo* pResInfo) {
static void doSetPrevVal(SDiffInfo* pDiffInfo, int32_t type, const char* pv) {
switch (type) {
case TSDB_DATA_TYPE_BOOL:
pDiffInfo->prev.i64 = *(bool*)pv? 1:0;
pDiffInfo->prev.i64 = *(bool*)pv ? 1 : 0;
break;
case TSDB_DATA_TYPE_TINYINT:
pDiffInfo->prev.i64 = *(int8_t*)pv;
......@@ -3537,7 +3537,8 @@ void doAddIntoResult(SqlFunctionCtx* pCtx, void* pData, int32_t rowIndex, SSData
* |(n columns, one bit for each column)| src column #1| src column #2|
* +------------------------------------+--------------+--------------+
*/
void* serializeTupleData(const SSDataBlock* pSrcBlock, int32_t rowIndex, SSubsidiaryResInfo* pSubsidiaryies, char* buf) {
void* serializeTupleData(const SSDataBlock* pSrcBlock, int32_t rowIndex, SSubsidiaryResInfo* pSubsidiaryies,
char* buf) {
char* nullList = buf;
char* pStart = (char*)(nullList + sizeof(bool) * pSubsidiaryies->num);
......@@ -3585,7 +3586,7 @@ static STuplePos doSaveTupleData(SSerializeDataHandle* pHandle, const void* pBuf
}
}
p = (STuplePos) {.pageId = pHandle->currentPage, .offset = pPage->num};
p = (STuplePos){.pageId = pHandle->currentPage, .offset = pPage->num};
memcpy(pPage->data + pPage->num, pBuf, length);
pPage->num += length;
......@@ -3621,7 +3622,6 @@ static int32_t doUpdateTupleData(SSerializeDataHandle* pHandle, const void* pBuf
setBufPageDirty(pPage, true);
releaseBufPage(pHandle->pBuf, pPage);
} else {
}
return TSDB_CODE_SUCCESS;
......@@ -3636,7 +3636,7 @@ static int32_t updateTupleData(SqlFunctionCtx* pCtx, int32_t rowIndex, const SSD
static char* doLoadTupleData(SSerializeDataHandle* pHandle, const STuplePos* pPos) {
if (pHandle->pBuf != NULL) {
SFilePage* pPage = getBufPage(pHandle->pBuf, pPos->pageId);
char* p = pPage->data + pPos->offset;
char* p = pPage->data + pPos->offset;
releaseBufPage(pHandle->pBuf, pPage);
return p;
} else {
......@@ -3980,8 +3980,8 @@ int32_t elapsedFunction(SqlFunctionCtx* pCtx) {
}
if (pCtx->end.key == INT64_MIN) {
pInfo->min = (pInfo->min > ptsList[start + pInput->numOfRows - 1]) ?
ptsList[start + pInput->numOfRows - 1] : pInfo->min;
pInfo->min =
(pInfo->min > ptsList[start + pInput->numOfRows - 1]) ? ptsList[start + pInput->numOfRows - 1] : pInfo->min;
} else {
pInfo->min = pCtx->end.key;
}
......@@ -3993,8 +3993,8 @@ int32_t elapsedFunction(SqlFunctionCtx* pCtx) {
}
if (pCtx->end.key == INT64_MIN) {
pInfo->max = (pInfo->max < ptsList[start + pInput->numOfRows - 1]) ?
ptsList[start + pInput->numOfRows - 1] : pInfo->max;
pInfo->max =
(pInfo->max < ptsList[start + pInput->numOfRows - 1]) ? ptsList[start + pInput->numOfRows - 1] : pInfo->max;
} else {
pInfo->max = pCtx->end.key + 1;
}
......
......@@ -243,6 +243,36 @@ FAIL:
return 0;
}
int32_t streamSearchAndAddBlock(SStreamTask* pTask, SStreamDispatchReq* pReqs, SSDataBlock* pDataBlock, int32_t vgSz,
int64_t groupId) {
char* ctbName = buildCtbNameByGroupId(pTask->shuffleDispatcher.stbFullName, groupId);
SArray* vgInfo = pTask->shuffleDispatcher.dbInfo.pVgroupInfos;
// TODO: get hash function by hashMethod
uint32_t hashValue = MurmurHash3_32(ctbName, strlen(ctbName));
taosMemoryFree(ctbName);
bool found = false;
// TODO: optimize search
int32_t j;
for (j = 0; j < vgSz; j++) {
SVgroupInfo* pVgInfo = taosArrayGet(vgInfo, j);
ASSERT(pVgInfo->vgId > 0);
if (hashValue >= pVgInfo->hashBegin && hashValue <= pVgInfo->hashEnd) {
if (streamAddBlockToDispatchMsg(pDataBlock, &pReqs[j]) < 0) {
return -1;
}
if (pReqs[j].blockNum == 0) {
atomic_add_fetch_32(&pTask->shuffleDispatcher.waitingRspCnt, 1);
}
pReqs[j].blockNum++;
found = true;
break;
}
}
ASSERT(found);
return 0;
}
int32_t streamDispatchAllBlocks(SStreamTask* pTask, const SStreamDataBlock* pData) {
int32_t code = -1;
int32_t blockNum = taosArrayGetSize(pData->blocks);
......@@ -317,20 +347,10 @@ int32_t streamDispatchAllBlocks(SStreamTask* pTask, const SStreamDataBlock* pDat
for (int32_t i = 0; i < blockNum; i++) {
SSDataBlock* pDataBlock = taosArrayGet(pData->blocks, i);
char* ctbName = buildCtbNameByGroupId(pTask->shuffleDispatcher.stbFullName, pDataBlock->info.groupId);
// TODO: get hash function by hashMethod
uint32_t hashValue = MurmurHash3_32(ctbName, strlen(ctbName));
taosMemoryFree(ctbName);
bool found = false;
// TODO: optimize search
int32_t j;
for (j = 0; j < vgSz; j++) {
SVgroupInfo* pVgInfo = taosArrayGet(vgInfo, j);
ASSERT(pVgInfo->vgId > 0);
if (hashValue >= pVgInfo->hashBegin && hashValue <= pVgInfo->hashEnd) {
// TODO: do not use broadcast
if (pDataBlock->info.type == STREAM_DELETE_RESULT) {
for (int32_t j = 0; j < vgSz; j++) {
if (streamAddBlockToDispatchMsg(pDataBlock, &pReqs[j]) < 0) {
goto FAIL_SHUFFLE_DISPATCH;
}
......@@ -338,11 +358,13 @@ int32_t streamDispatchAllBlocks(SStreamTask* pTask, const SStreamDataBlock* pDat
atomic_add_fetch_32(&pTask->shuffleDispatcher.waitingRspCnt, 1);
}
pReqs[j].blockNum++;
found = true;
break;
}
continue;
}
if (streamSearchAndAddBlock(pTask, pReqs, pDataBlock, vgSz, pDataBlock->info.groupId) < 0) {
goto FAIL_SHUFFLE_DISPATCH;
}
ASSERT(found);
}
for (int32_t i = 0; i < vgSz; i++) {
......
......@@ -219,7 +219,7 @@ SRBTreeNode *tRBTreePut(SRBTree *pTree, SRBTreeNode *z) {
while (temp != pTree->NIL) {
y = temp;
int32_t c = pTree->cmprFn(z->payload, temp->payload);
int32_t c = pTree->cmprFn(RBTREE_NODE_PAYLOAD(z), RBTREE_NODE_PAYLOAD(temp));
if (c < 0) {
temp = temp->left;
} else if (c > 0) {
......@@ -232,7 +232,7 @@ SRBTreeNode *tRBTreePut(SRBTree *pTree, SRBTreeNode *z) {
if (y == pTree->NIL) {
pTree->root = z;
} else if (pTree->cmprFn(z->payload, y->payload) < 0) {
} else if (pTree->cmprFn(RBTREE_NODE_PAYLOAD(z), RBTREE_NODE_PAYLOAD(y)) < 0) {
y->left = z;
} else {
y->right = z;
......@@ -245,10 +245,10 @@ SRBTreeNode *tRBTreePut(SRBTree *pTree, SRBTreeNode *z) {
tRBTreePutFix(pTree, z);
// update min/max node
if (pTree->min == pTree->NIL || pTree->cmprFn(pTree->min->payload, z->payload) > 0) {
if (pTree->min == pTree->NIL || pTree->cmprFn(RBTREE_NODE_PAYLOAD(pTree->min), RBTREE_NODE_PAYLOAD(z)) > 0) {
pTree->min = z;
}
if (pTree->max == pTree->NIL || pTree->cmprFn(pTree->max->payload, z->payload) < 0) {
if (pTree->max == pTree->NIL || pTree->cmprFn(RBTREE_NODE_PAYLOAD(pTree->max), RBTREE_NODE_PAYLOAD(z)) < 0) {
pTree->max = z;
}
pTree->n++;
......@@ -313,7 +313,7 @@ SRBTreeNode *tRBTreeGet(SRBTree *pTree, void *pKey) {
SRBTreeNode *pNode = pTree->root;
while (pNode != pTree->NIL) {
int32_t c = pTree->cmprFn(pKey, pNode->payload);
int32_t c = pTree->cmprFn(pKey, RBTREE_NODE_PAYLOAD(pNode));
if (c < 0) {
pNode = pNode->left;
......
......@@ -158,6 +158,8 @@ print =============== restart
system sh/exec.sh -n dnode1 -s stop -x SIGINT
system sh/exec.sh -n dnode1 -s start -v
sleep 1000
sql select avg(tbcol) as c from stb
sql select avg(tbcol) as c from stb where ts <= 1601481840000
sql select avg(tbcol) as c from stb where tgcol < 5 and ts <= 1601481840000
......
#ADD_SUBDIRECTORY(examples/c)
ADD_SUBDIRECTORY(tsim)
ADD_SUBDIRECTORY(test/c)
#ADD_SUBDIRECTORY(comparisonTest/tdengine)
add_executable(tmq_demo tmqDemo.c)
add_dependencies(tmq_demo taos)
add_executable(tmq_sim tmqSim.c)
add_executable(create_table createTable.c)
add_executable(tmq_taosx_ci tmq_taosx_ci.c)
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册