提交 d490ca75 编写于 作者: wmmhello's avatar wmmhello

Merge branch '3.0' of https://github.com/taosdata/TDengine into feature/TD-19042

......@@ -123,8 +123,8 @@ ELSE ()
SET(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -Werror -Werror=return-type -fPIC -O3 -Wformat=2 -Wno-format-nonliteral -Wno-format-truncation -Wno-format-y2k")
SET(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Werror -Wno-reserved-user-defined-literal -Wno-literal-suffix -Werror=return-type -fPIC -O3 -Wformat=2 -Wno-format-nonliteral -Wno-format-truncation -Wno-format-y2k")
ELSE ()
SET(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -Werror -Werror=return-type -fPIC -gdwarf-2 -g3 -Wformat=2 -Wno-format-nonliteral -Wno-format-truncation -Wno-format-y2k")
SET(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Wno-reserved-user-defined-literal -Wno-literal-suffix -Werror=return-type -fPIC -gdwarf-2 -g3 -Wformat=2 -Wno-format-nonliteral -Wno-format-truncation -Wno-format-y2k")
SET(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -Werror -Werror=return-type -fPIC -g3 -gdwarf-2 -Wformat=2 -Wno-format-nonliteral -Wno-format-truncation -Wno-format-y2k")
SET(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Wno-reserved-user-defined-literal -g3 -Wno-literal-suffix -Werror=return-type -fPIC -gdwarf-2 -Wformat=2 -Wno-format-nonliteral -Wno-format-truncation -Wno-format-y2k")
ENDIF ()
# disable all assert
......
......@@ -77,6 +77,12 @@ ELSEIF (TD_DARWIN_64)
ENDIF ()
ENDIF ()
option(
BUILD_GEOS
"If build geos on Windows"
OFF
)
option(
BUILD_SHARED_LIBS
""
......
......@@ -56,9 +56,17 @@ IF (${CMAKE_SYSTEM_NAME} MATCHES "Linux" OR ${CMAKE_SYSTEM_NAME} MATCHES "Darwin
SET(TD_DARWIN TRUE)
SET(OSTYPE "macOS")
execute_process(COMMAND geos-config --cflags OUTPUT_VARIABLE GEOS_CFLAGS)
execute_process(COMMAND geos-config --ldflags OUTPUT_VARIABLE GEOS_LDFLAGS)
string(SUBSTRING ${GEOS_CFLAGS} 2 -1 GEOS_CFLAGS)
string(REGEX REPLACE "\n" "" GEOS_CFLAGS ${GEOS_CFLAGS})
string(SUBSTRING ${GEOS_LDFLAGS} 2 -1 GEOS_LDFLAGS)
string(REGEX REPLACE "\n" "" GEOS_LDFLAGS ${GEOS_LDFLAGS})
MESSAGE("GEOS_CFLAGS "${GEOS_CFLAGS})
MESSAGE("GEOS_LDFLAGS "${GEOS_LDFLAGS})
ADD_DEFINITIONS("-DDARWIN -Wno-tautological-pointer-compare")
INCLUDE_DIRECTORIES(/usr/local/include)
LINK_DIRECTORIES(/usr/local/lib)
INCLUDE_DIRECTORIES(${GEOS_CFLAGS})
LINK_DIRECTORIES(${GEOS_LDFLAGS})
IF (${CMAKE_SYSTEM_PROCESSOR} MATCHES "arm64")
MESSAGE("Current system arch is arm64")
......
......@@ -231,11 +231,16 @@ if(${BUILD_WITH_ROCKSDB})
if(${TD_LINUX})
SET(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Wno-error=maybe-uninitialized -Wno-error=unused-but-set-variable -Wno-error=unused-variable -Wno-error=unused-function -Wno-errno=unused-private-field -Wno-error=unused-result")
endif(${TD_LINUX})
MESSAGE(STATUS "CXXXX STATUS CONFIG: " ${CMAKE_CXX_FLAGS})
if(${TD_DARWIN})
SET(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Wno-error=maybe-uninitialized")
endif(${TD_DARWIN})
if (${TD_DARWIN_ARM64})
set(HAS_ARMV8_CRC true)
endif(${TD_DARWIN_ARM64})
if (${TD_WINDOWS})
SET(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} /wd4244 /wd4819")
endif(${TD_WINDOWS})
......@@ -248,7 +253,7 @@ if(${BUILD_WITH_ROCKSDB})
endif(${TD_DARWIN})
if(${TD_WINDOWS})
option(WITH_JNI "" ON)
option(WITH_JNI "" OFF)
endif(${TD_WINDOWS})
if(${TD_WINDOWS})
......@@ -260,7 +265,7 @@ if(${BUILD_WITH_ROCKSDB})
option(WITH_FALLOCATE "" OFF)
option(WITH_JEMALLOC "" OFF)
option(WITH_GFLAGS "" OFF)
option(PORTABLE "" ON)
option(PORTABLE "" OFF)
option(WITH_LIBURING "" OFF)
option(FAIL_ON_WARNINGS OFF)
......@@ -268,8 +273,11 @@ if(${BUILD_WITH_ROCKSDB})
option(WITH_BENCHMARK_TOOLS "" OFF)
option(WITH_TOOLS "" OFF)
option(WITH_LIBURING "" OFF)
IF (TD_LINUX)
option(ROCKSDB_BUILD_SHARED "Build shared versions of the RocksDB libraries" ON)
ELSE()
option(ROCKSDB_BUILD_SHARED "Build shared versions of the RocksDB libraries" OFF)
ENDIF()
add_subdirectory(rocksdb EXCLUDE_FROM_ALL)
target_include_directories(
rocksdb
......
......@@ -121,6 +121,8 @@ alter_database_option: {
| WAL_LEVEL value
| WAL_FSYNC_PERIOD value
| KEEP value
| WAL_RETENTION_PERIOD value
| WAL_RETENTION_SIZE value
}
```
......
......@@ -103,63 +103,6 @@ typedef struct SRowBuffPos {
bool beUsed;
} SRowBuffPos;
// int32_t tsdbReuseCacherowsReader(void* pReader, void* pTableIdList, int32_t numOfTables);
// int32_t tsdbCacherowsReaderOpen(void *pVnode, int32_t type, void *pTableIdList, int32_t numOfTables, int32_t numOfCols,
// SArray *pCidList, int32_t *pSlotIds, uint64_t suid, void **pReader, const char *idstr);
// int32_t tsdbRetrieveCacheRows(void *pReader, SSDataBlock *pResBlock, const int32_t *slotIds, const int32_t *dstSlotIds,
// SArray *pTableUids);
// void *tsdbCacherowsReaderClose(void *pReader);
// int32_t tsdbGetTableSchema(void *pVnode, int64_t uid, STSchema **pSchema, int64_t *suid);
// int32_t tsdbReaderOpen(void *pVnode, SQueryTableDataCond *pCond, void *pTableList, int32_t numOfTables,
// SSDataBlock *pResBlock, STsdbReader **ppReader, const char *idstr, bool countOnly, SHashObj** pIgnoreTables);
// int32_t tsdbSetTableList(STsdbReader *pReader, const void *pTableList, int32_t num);
// void tsdbReaderSetId(STsdbReader *pReader, const char *idstr);
// void tsdbReaderClose(STsdbReader *pReader);
// int32_t tsdbNextDataBlock(STsdbReader *pReader, bool *hasNext);
// int32_t tsdbRetrieveDatablockSMA(STsdbReader *pReader, SSDataBlock *pDataBlock, bool *allHave);
// void tsdbReleaseDataBlock(STsdbReader *pReader);
// SSDataBlock *tsdbRetrieveDataBlock(STsdbReader *pTsdbReadHandle, SArray *pColumnIdList);
// int32_t tsdbReaderReset(STsdbReader *pReader, SQueryTableDataCond *pCond);
// int32_t tsdbGetFileBlocksDistInfo(STsdbReader *pReader, STableBlockDistInfo *pTableBlockInfo);
// int64_t tsdbGetNumOfRowsInMemTable(STsdbReader *pHandle);
// void *tsdbGetIdx(void *pMeta);
// void *tsdbGetIvtIdx(void *pMeta);
// uint64_t tsdbGetReaderMaxVersion(STsdbReader *pReader);
// void tsdbReaderSetCloseFlag(STsdbReader *pReader);
// int64_t tsdbGetLastTimestamp(void* pVnode, void* pTableList, int32_t numOfTables, const char* pIdStr);
// int32_t vnodeGetCtbIdList(void *pVnode, int64_t suid, SArray *list);
// int32_t vnodeGetCtbIdListByFilter(void *pVnode, int64_t suid, SArray *list, bool (*filter)(void *arg), void *arg);
// int32_t vnodeGetStbIdList(void *pVnode, int64_t suid, SArray *list);
// void *vnodeGetIdx(void *pVnode);
// void *vnodeGetIvtIdx(void *pVnode);
// void metaReaderInit(SMetaReader *pReader, SMeta *pMeta, int32_t flags);
// void metaReaderReleaseLock(SMetaReader *pReader);
// void metaReaderClear(SMetaReader *pReader);
// int32_t metaReaderGetTableEntryByUid(SMetaReader *pReader, tb_uid_t uid);
// int32_t metaReaderGetTableEntryByUidCache(SMetaReader *pReader, tb_uid_t uid);
// int metaGetTableEntryByName(SMetaReader *pReader, const char *name);
// int32_t metaGetTableTags(SMeta *pMeta, uint64_t suid, SArray *uidList);
// int32_t metaGetTableTagsByUids(SMeta *pMeta, int64_t suid, SArray *uidList);
// const void *metaGetTableTagVal(void *tag, int16_t type, STagVal *tagVal);
// int metaGetTableNameByUid(void *meta, uint64_t uid, char *tbName);
//
// int metaGetTableSzNameByUid(void *meta, uint64_t uid, char *tbName);
// int metaGetTableUidByName(void *meta, char *tbName, uint64_t *uid);
// int metaGetTableTypeByName(void *meta, char *tbName, ETableType *tbType);
// bool metaIsTableExist(SMeta *pMeta, tb_uid_t uid);
// int32_t metaGetCachedTableUidList(SMeta *pMeta, tb_uid_t suid, const uint8_t *key, int32_t keyLen, SArray *pList,
// bool *acquired);
// int32_t metaUidFilterCachePut(SMeta *pMeta, uint64_t suid, const void *pKey, int32_t keyLen, void *pPayload,
// int32_t payloadLen, double selectivityRatio);
// int32_t metaUidCacheClear(SMeta *pMeta, uint64_t suid);
// tb_uid_t metaGetTableEntryUidByName(SMeta *pMeta, const char *name);
// int32_t metaGetCachedTbGroup(SMeta* pMeta, tb_uid_t suid, const uint8_t* pKey, int32_t keyLen, SArray** pList);
// int32_t metaPutTbGroupToCache(SMeta* pMeta, uint64_t suid, const void* pKey, int32_t keyLen, void* pPayload,
// int32_t payloadLen);
// tq
typedef struct SMetaTableInfo {
int64_t suid;
......@@ -201,64 +144,39 @@ typedef struct {
// int32_t setForSnapShot(SSnapContext *ctx, int64_t uid);
// int32_t destroySnapContext(SSnapContext *ctx);
// clang-format off
/*-------------------------------------------------new api format---------------------------------------------------*/
// typedef int32_t (*__store_reader_(STsdbReader *pReader, const void *pTableList, int32_t num);
// typedef void (*tsdbReaderSetId(STsdbReader *pReader, const char *idstr);
// typedef void (*tsdbReaderClose(STsdbReader *pReader);
// typedef int32_t (*tsdbNextDataBlock(STsdbReader *pReader, bool *hasNext);
// typedef int32_t (*tsdbRetrieveDatablockSMA(STsdbReader *pReader, SSDataBlock *pDataBlock, bool *allHave);
// typedef void (*tsdbReleaseDataBlock(STsdbReader *pReader);
// typedef SSDataBlock * (*tsdbRetrieveDataBlock(STsdbReader *pTsdbReadHandle, SArray *pColumnIdList);
// typedef int32_t (*tsdbReaderReset(STsdbReader *pReader, SQueryTableDataCond *pCond);
// typedef int32_t (*tsdbGetFileBlocksDistInfo(STsdbReader *pReader, STableBlockDistInfo *pTableBlockInfo);
// typedef int64_t (*tsdbGetNumOfRowsInMemTable(STsdbReader *pHandle);
// typedef void * (*tsdbGetIdx(void *pMeta);
// typedef void * (*tsdbGetIvtIdx(void *pMeta);
// typedef uint64_t (*tsdbGetReaderMaxVersion(STsdbReader *pReader);
// typedef void (*tsdbReaderSetCloseFlag(STsdbReader *pReader);
// typedef int64_t (*tsdbGetLastTimestamp(void* pVnode, void* pTableList, int32_t numOfTables, const char* pIdStr);
typedef int32_t (*__store_reader_open_fn_t)(void *pVnode, SQueryTableDataCond *pCond, void *pTableList,
int32_t numOfTables, SSDataBlock *pResBlock, void **ppReader,
const char *idstr, bool countOnly, SHashObj **pIgnoreTables);
typedef struct TsdReader {
__store_reader_open_fn_t tsdReaderOpen;
void (*tsdReaderClose)();
void (*tsdSetReaderTaskId)(void *pReader, const char *pId);
int32_t (*tsdSetQueryTableList)();
int32_t (*tsdNextDataBlock)();
int32_t (*tsdReaderRetrieveBlockSMAInfo)();
int32_t (*tsdReaderOpen)(void* pVnode, SQueryTableDataCond* pCond, void* pTableList, int32_t numOfTables,
SSDataBlock* pResBlock, void** ppReader, const char* idstr, bool countOnly,
SHashObj** pIgnoreTables);
void (*tsdReaderClose)();
void (*tsdSetReaderTaskId)(void *pReader, const char *pId);
int32_t (*tsdSetQueryTableList)();
int32_t (*tsdNextDataBlock)();
int32_t (*tsdReaderRetrieveBlockSMAInfo)();
SSDataBlock *(*tsdReaderRetrieveDataBlock)();
void (*tsdReaderReleaseDataBlock)();
void (*tsdReaderReleaseDataBlock)();
int32_t (*tsdReaderResetStatus)();
int32_t (*tsdReaderGetDataBlockDistInfo)();
int64_t (*tsdReaderGetNumOfInMemRows)();
void (*tsdReaderNotifyClosing)();
int32_t (*tsdReaderResetStatus)();
int32_t (*tsdReaderGetDataBlockDistInfo)();
int64_t (*tsdReaderGetNumOfInMemRows)();
void (*tsdReaderNotifyClosing)();
} TsdReader;
/**
* int32_t tsdbReuseCacherowsReader(void* pReader, void* pTableIdList, int32_t numOfTables);
int32_t tsdbCacherowsReaderOpen(void *pVnode, int32_t type, void *pTableIdList, int32_t numOfTables, int32_t numOfCols,
SArray *pCidList, int32_t *pSlotIds, uint64_t suid, void **pReader, const char *idstr);
int32_t tsdbRetrieveCacheRows(void *pReader, SSDataBlock *pResBlock, const int32_t *slotIds, const int32_t *dstSlotIds,
SArray *pTableUids);
void *tsdbCacherowsReaderClose(void *pReader);
*/
typedef struct SStoreCacheReader {
int32_t (*openReader)(void *pVnode, int32_t type, void *pTableIdList, int32_t numOfTables, int32_t numOfCols,
SArray *pCidList, int32_t *pSlotIds, uint64_t suid, void **pReader, const char *idstr);
void *(*closeReader)(void *pReader);
int32_t (*retrieveRows)(void *pReader, SSDataBlock *pResBlock, const int32_t *slotIds, const int32_t *dstSlotIds,
SArray *pTableUidList);
int32_t (*reuseReader)(void *pReader, void *pTableIdList, int32_t numOfTables);
int32_t (*openReader)(void *pVnode, int32_t type, void *pTableIdList, int32_t numOfTables, int32_t numOfCols,
SArray *pCidList, int32_t *pSlotIds, uint64_t suid, void **pReader, const char *idstr);
void *(*closeReader)(void *pReader);
int32_t (*retrieveRows)(void *pReader, SSDataBlock *pResBlock, const int32_t *slotIds, const int32_t *dstSlotIds,
SArray *pTableUidList);
int32_t (*reuseReader)(void *pReader, void *pTableIdList, int32_t numOfTables);
} SStoreCacheReader;
// clang-format on
/*------------------------------------------------------------------------------------------------------------------*/
/*
void tqReaderSetColIdList(STqReader *pReader, SArray *pColIdList);
......@@ -309,16 +227,10 @@ typedef struct SStoreTqReader {
} SStoreTqReader;
typedef struct SStoreSnapshotFn {
/*
int32_t getTableInfoFromSnapshot(SSnapContext *ctx, void **pBuf, int32_t *contLen, int16_t *type, int64_t *uid);
SMetaTableInfo getMetaTableInfoFromSnapshot(SSnapContext *ctx);
int32_t setForSnapShot(SSnapContext *ctx, int64_t uid);
int32_t destroySnapContext(SSnapContext *ctx);
*/
int32_t (*createSnapshot)();
int32_t (*destroySnapshot)();
SMetaTableInfo (*getMetaTableInfoFromSnapshot)();
int32_t (*getTableInfoFromSnapshot)();
int32_t (*createSnapshot)(SSnapContext *ctx, int64_t uid);
int32_t (*destroySnapshot)(SSnapContext *ctx);
SMetaTableInfo (*getMetaTableInfoFromSnapshot)(SSnapContext* ctx);
int32_t (*getTableInfoFromSnapshot)(SSnapContext* ctx, void** pBuf, int32_t* contLen, int16_t* type, int64_t* uid);
} SStoreSnapshotFn;
/**
......@@ -327,12 +239,10 @@ void metaReaderReleaseLock(SMetaReader *pReader);
void metaReaderClear(SMetaReader *pReader);
int32_t metaReaderGetTableEntryByUid(SMetaReader *pReader, tb_uid_t uid);
int32_t metaReaderGetTableEntryByUidCache(SMetaReader *pReader, tb_uid_t uid);
int metaGetTableEntryByName(SMetaReader *pReader, const char *name);
int32_t metaGetTableTags(SMeta *pMeta, uint64_t suid, SArray *uidList);
const void *metaGetTableTagVal(void *tag, int16_t type, STagVal *tagVal);
int metaGetTableNameByUid(void *meta, uint64_t uid, char *tbName);
int metaGetTableSzNameByUid(void *meta, uint64_t uid, char *tbName);
int metaGetTableUidByName(void *meta, char *tbName, uint64_t *uid);
int metaGetTableTypeByName(void *meta, char *tbName, ETableType *tbType);
bool metaIsTableExist(SMeta *pMeta, tb_uid_t uid);
......@@ -340,51 +250,40 @@ int32_t metaGetCachedTableUidList(SMeta *pMeta, tb_uid_t suid, const uint8_t *k
bool *acquired);
int32_t metaUidFilterCachePut(SMeta *pMeta, uint64_t suid, const void *pKey, int32_t keyLen, void *pPayload,
int32_t payloadLen, double selectivityRatio);
int32_t metaUidCacheClear(SMeta *pMeta, uint64_t suid);
tb_uid_t metaGetTableEntryUidByName(SMeta *pMeta, const char *name);
int32_t metaGetCachedTbGroup(SMeta* pMeta, tb_uid_t suid, const uint8_t* pKey, int32_t keyLen, SArray** pList);
int32_t metaPutTbGroupToCache(SMeta* pMeta, uint64_t suid, const void* pKey, int32_t keyLen, void* pPayload,
int32_t payloadLen);
int32_t metaPutTbGroupToCache(SMeta* pMeta, uint64_t suid, const void* pKey, int32_t keyLen, void* pPayload, int32_t payloadLen);
*/
typedef struct SStoreMeta {
SMTbCursor *(*openTableMetaCursor)(void *pVnode); // metaOpenTbCursor
void (*closeTableMetaCursor)(SMTbCursor *pTbCur); // metaCloseTbCursor
int32_t (*cursorNext)(); // metaTbCursorNext
int32_t (*cursorPrev)(); // metaTbCursorPrev
SMTbCursor *(*openTableMetaCursor)(void *pVnode); // metaOpenTbCursor
void (*closeTableMetaCursor)(SMTbCursor *pTbCur); // metaCloseTbCursor
int32_t (*cursorNext)(SMTbCursor *pTbCur, ETableType jumpTableType); // metaTbCursorNext
int32_t (*cursorPrev)(SMTbCursor *pTbCur, ETableType jumpTableType); // metaTbCursorPrev
int32_t (*getTableTags)(void *pVnode, uint64_t suid, SArray *uidList);
int32_t (*getTableTagsByUid)(void *pVnode, int64_t suid, SArray *uidList);
int32_t (*getTableTags)(void *pVnode, uint64_t suid, SArray *uidList);
int32_t (*getTableTagsByUid)(void *pVnode, int64_t suid, SArray *uidList);
const void *(*extractTagVal)(const void *tag, int16_t type, STagVal *tagVal); // todo remove it
int32_t (*getTableUidByName)(void *pVnode, char *tbName, uint64_t *uid);
int32_t (*getTableTypeByName)(void *pVnode, char *tbName, ETableType *tbType);
int32_t (*getTableNameByUid)(void *pVnode, uint64_t uid, char *tbName);
bool (*isTableExisted)(void *pVnode, tb_uid_t uid);
int32_t (*getTableUidByName)(void *pVnode, char *tbName, uint64_t *uid);
int32_t (*getTableTypeByName)(void *pVnode, char *tbName, ETableType *tbType);
int32_t (*getTableNameByUid)(void *pVnode, uint64_t uid, char *tbName);
bool (*isTableExisted)(void *pVnode, tb_uid_t uid);
/**
* int32_t metaUidFilterCachePut(SMeta *pMeta, uint64_t suid, const void *pKey, int32_t keyLen, void *pPayload,
int32_t payloadLen, double selectivityRatio);
int32_t metaUidCacheClear(SMeta *pMeta, uint64_t suid);
tb_uid_t metaGetTableEntryUidByName(SMeta *pMeta, const char *name);
int32_t metaGetCachedTbGroup(SMeta* pMeta, tb_uid_t suid, const uint8_t* pKey, int32_t keyLen, SArray** pList);
int32_t metaPutTbGroupToCache(SMeta* pMeta, uint64_t suid, const void* pKey, int32_t keyLen, void* pPayload,
int32_t payloadLen);
*/
void (*getCachedTableList)();
void (*putTableListIntoCache)();
int32_t (*metaGetCachedTbGroup)(void* pVnode, tb_uid_t suid, const uint8_t* pKey, int32_t keyLen, SArray** pList);
int32_t (*metaPutTbGroupToCache)(void* pVnode, uint64_t suid, const void* pKey, int32_t keyLen, void* pPayload, int32_t payloadLen);
/**
*
*/
void *(*storeGetIndexInfo)();
void *(*getInvertIndex)(void* pVnode);
int32_t (*getChildTableList)(void *pVnode, int64_t suid, SArray *list); // support filter and non-filter cases. [vnodeGetCtbIdList & vnodeGetCtbIdListByFilter]
int32_t (*storeGetTableList)(void* pVnode, int8_t type, SArray* pList); // vnodeGetStbIdList & vnodeGetAllTableList
void *storeGetVersionRange;
void *storeGetLastTimestamp;
int32_t (*getTableSchema)(void *pVnode, int64_t uid, STSchema **pSchema, int64_t *suid); // tsdbGetTableSchema
int32_t (*getCachedTableList)(void* pVnode, tb_uid_t suid, const uint8_t* pKey, int32_t keyLen, SArray* pList1, bool* acquireRes);
int32_t (*putCachedTableList)(void* pVnode, uint64_t suid, const void* pKey, int32_t keyLen, void* pPayload, int32_t payloadLen, double selectivityRatio);
void *(*storeGetIndexInfo)();
void *(*getInvertIndex)(void* pVnode);
int32_t (*getChildTableList)(void *pVnode, int64_t suid, SArray *list); // support filter and non-filter cases. [vnodeGetCtbIdList & vnodeGetCtbIdListByFilter]
int32_t (*storeGetTableList)(void* pVnode, int8_t type, SArray* pList); // vnodeGetStbIdList & vnodeGetAllTableList
void *storeGetVersionRange;
void *storeGetLastTimestamp;
int32_t (*getTableSchema)(void *pVnode, int64_t uid, STSchema **pSchema, int64_t *suid); // tsdbGetTableSchema
// db name, vgId, numOfTables, numOfSTables
int32_t (*getNumOfChildTables)(void* pVnode, int64_t uid, int64_t* numOfTables); // int32_t metaGetStbStats(SMeta *pMeta, int64_t uid, SMetaStbStats *pInfo);
......
......@@ -21,8 +21,8 @@ extern "C" {
#endif
#include "tcommon.h"
#include "tvariant.h"
#include "tsimplehash.h"
#include "tvariant.h"
struct SqlFunctionCtx;
struct SResultRowEntryInfo;
......@@ -77,7 +77,7 @@ enum {
enum {
MAIN_SCAN = 0x0u,
REVERSE_SCAN = 0x1u, // todo remove it
PRE_SCAN = 0x2u, // pre-scan belongs to the main scan and occurs before main scan
PRE_SCAN = 0x2u, // pre-scan belongs to the main scan and occurs before main scan
};
typedef struct SPoint1 {
......@@ -130,43 +130,44 @@ typedef struct SSerializeDataHandle {
// incremental state storage
typedef struct STdbState {
void* rocksdb;
void** pHandle;
void* writeOpts;
void* readOpts;
void** cfOpts;
void* dbOpt;
struct SStreamTask* pOwner;
void* param;
void* env;
SListNode* pComparNode;
void* pBackendHandle;
void *rocksdb;
void **pHandle;
void *writeOpts;
void *readOpts;
void **cfOpts;
void *dbOpt;
struct SStreamTask *pOwner;
void *param;
void *env;
SListNode *pComparNode;
void *pBackend;
char idstr[64];
void* compactFactory;
void* db;
void* pStateDb;
void* pFuncStateDb;
void* pFillStateDb; // todo refactor
void* pSessionStateDb;
void* pParNameDb;
void* pParTagDb;
void* txn;
void *compactFactory;
TdThreadRwlock rwLock;
void *db;
void *pStateDb;
void *pFuncStateDb;
void *pFillStateDb; // todo refactor
void *pSessionStateDb;
void *pParNameDb;
void *pParTagDb;
void *txn;
} STdbState;
typedef struct {
STdbState* pTdbState;
struct SStreamFileState* pFileState;
int32_t number;
SSHashObj* parNameMap;
int64_t checkPointId;
int32_t taskId;
int64_t streamId;
STdbState *pTdbState;
struct SStreamFileState *pFileState;
int32_t number;
SSHashObj *parNameMap;
int64_t checkPointId;
int32_t taskId;
int64_t streamId;
} SStreamState;
typedef struct SFunctionStateStore {
int32_t (*streamStateFuncPut)(SStreamState* pState, const SWinKey* key, const void* value, int32_t vLen);
int32_t (*streamStateFuncGet)(SStreamState* pState, const SWinKey* key, void** ppVal, int32_t* pVLen);
int32_t (*streamStateFuncPut)(SStreamState *pState, const SWinKey *key, const void *value, int32_t vLen);
int32_t (*streamStateFuncGet)(SStreamState *pState, const SWinKey *key, void **ppVal, int32_t *pVLen);
} SFunctionStateStore;
// sql function runtime context
......@@ -180,7 +181,7 @@ typedef struct SqlFunctionCtx {
int16_t functionId; // function id
char *pOutput; // final result output buffer, point to sdata->data
// input parameter, e.g., top(k, 20), the number of results of top query is kept in param
SFunctParam *param;
SFunctParam *param;
// corresponding output buffer for timestamp of each result, e.g., diff/csum
SColumnInfoData *pTsOutput;
int32_t numOfParams;
......
......@@ -34,7 +34,24 @@ extern "C" {
// SListNode* streamBackendAddCompare(void* backend, void* arg);
// void streamBackendDelCompare(void* backend, void* arg);
//typedef struct STdbState {
// <<<<<<< HEAD
// typedef struct STdbState {
// rocksdb_t* rocksdb;
// rocksdb_column_family_handle_t** pHandle;
// rocksdb_writeoptions_t* writeOpts;
// rocksdb_readoptions_t* readOpts;
// rocksdb_options_t** cfOpts;
// rocksdb_options_t* dbOpt;
// struct SStreamTask* pOwner;
// void* param;
// void* env;
// SListNode* pComparNode;
// void* pBackend;
// char idstr[64];
// void* compactFactory;
// TdThreadRwlock rwLock;
// =======
// typedef struct STdbState {
// rocksdb_t* rocksdb;
// rocksdb_column_family_handle_t** pHandle;
// rocksdb_writeoptions_t* writeOpts;
......@@ -58,6 +75,7 @@ extern "C" {
// TTB* pParTagDb;
// TXN* txn;
//} STdbState;
//>>>>>>> enh/dev3.0
SStreamState* streamStateOpen(char* path, void* pTask, bool specPath, int32_t szPage, int32_t pages);
void streamStateClose(SStreamState* pState, bool remove);
......
......@@ -78,11 +78,11 @@ enum {
TASK_TRIGGER_STATUS__ACTIVE,
};
enum {
typedef enum {
TASK_LEVEL__SOURCE = 1,
TASK_LEVEL__AGG,
TASK_LEVEL__SINK,
};
} ETASK_LEVEL;
enum {
TASK_OUTPUT__FIXED_DISPATCH = 1,
......@@ -284,13 +284,13 @@ struct SStreamTask {
int16_t dispatchMsgType;
SStreamStatus status;
int32_t selfChildId;
int32_t nodeId;
int32_t nodeId; // vgroup id
SEpSet epSet;
SCheckpointInfo chkInfo;
STaskExec exec;
// fill history
int8_t fillHistory;
int8_t fillHistory; // fill history
int64_t ekey; // end ts key
int64_t endVer; // end version
// children info
SArray* childEpInfo; // SArray<SStreamChildEpInfo*>
......@@ -351,7 +351,7 @@ typedef struct SStreamMeta {
int32_t tEncodeStreamEpInfo(SEncoder* pEncoder, const SStreamChildEpInfo* pInfo);
int32_t tDecodeStreamEpInfo(SDecoder* pDecoder, SStreamChildEpInfo* pInfo);
SStreamTask* tNewStreamTask(int64_t streamId);
SStreamTask* tNewStreamTask(int64_t streamId, int8_t taskLevel, int8_t fillHistory, int64_t triggerParam, SArray* pTaskList);
int32_t tEncodeStreamTask(SEncoder* pEncoder, const SStreamTask* pTask);
int32_t tDecodeStreamTask(SDecoder* pDecoder, SStreamTask* pTask);
void tFreeStreamTask(SStreamTask* pTask);
......
......@@ -663,9 +663,10 @@ typedef struct {
char targetDb[TSDB_DB_FNAME_LEN];
char targetSTbName[TSDB_TABLE_FNAME_LEN];
int64_t targetStbUid;
int32_t fixedSinkVgId; // 0 for shuffle
// fixedSinkVg is not applicable for encode and decode
SVgObj fixedSinkVg;
int32_t fixedSinkVgId; // 0 for shuffle
// transformation
char* sql;
......
......@@ -986,20 +986,20 @@ static int32_t mndProcessDropDnodeReq(SRpcMsg *pReq) {
}
int32_t numOfVnodes = mndGetVnodesNum(pMnode, pDnode->id);
if (numOfVnodes > 0 || pMObj != NULL || pSObj != NULL || pQObj != NULL) {
bool isonline = mndIsDnodeOnline(pDnode, taosGetTimestampMs());
if (isonline && force) {
terrno = TSDB_CODE_DNODE_ONLY_USE_WHEN_OFFLINE;
mError("dnode:%d, failed to drop since %s, vnodes:%d mnode:%d qnode:%d snode:%d", pDnode->id, terrstr(),
numOfVnodes, pMObj != NULL, pQObj != NULL, pSObj != NULL);
goto _OVER;
}
if (!isonline && !force) {
terrno = TSDB_CODE_DNODE_OFFLINE;
mError("dnode:%d, failed to drop since %s, vnodes:%d mnode:%d qnode:%d snode:%d", pDnode->id, terrstr(),
numOfVnodes, pMObj != NULL, pQObj != NULL, pSObj != NULL);
goto _OVER;
}
bool isonline = mndIsDnodeOnline(pDnode, taosGetTimestampMs());
if (isonline && force) {
terrno = TSDB_CODE_DNODE_ONLY_USE_WHEN_OFFLINE;
mError("dnode:%d, failed to drop since %s, vnodes:%d mnode:%d qnode:%d snode:%d", pDnode->id, terrstr(),
numOfVnodes, pMObj != NULL, pQObj != NULL, pSObj != NULL);
goto _OVER;
}
if (!isonline && !force) {
terrno = TSDB_CODE_DNODE_OFFLINE;
mError("dnode:%d, failed to drop since %s, vnodes:%d mnode:%d qnode:%d snode:%d", pDnode->id, terrstr(),
numOfVnodes, pMObj != NULL, pQObj != NULL, pSObj != NULL);
goto _OVER;
}
code = mndDropDnode(pMnode, pReq, pDnode, pMObj, pQObj, pSObj, numOfVnodes, force, dropReq.unsafe);
......
......@@ -14,18 +14,8 @@
*/
#include "mndScheduler.h"
#include "mndConsumer.h"
#include "mndDb.h"
#include "mndDnode.h"
#include "mndMnode.h"
#include "mndShow.h"
#include "mndSnode.h"
#include "mndStb.h"
#include "mndStream.h"
#include "mndSubscribe.h"
#include "mndTopic.h"
#include "mndTrans.h"
#include "mndUser.h"
#include "mndVgroup.h"
#include "parser.h"
#include "tcompare.h"
......@@ -34,12 +24,8 @@
extern bool tsDeployOnSnode;
static int32_t mndAddTaskToTaskSet(SArray* pArray, SStreamTask* pTask) {
int32_t childId = taosArrayGetSize(pArray);
pTask->selfChildId = childId;
taosArrayPush(pArray, &pTask);
return 0;
}
static int32_t mndAddSinkTaskToStream(SStreamObj* pStream, SMnode* pMnode, int32_t vgId, SVgObj* pVgroup);
static void setFixedDownstreamEpInfo(SStreamTask* pDstTask, const SStreamTask* pTask);
int32_t mndConvertRsmaTask(char** pDst, int32_t* pDstLen, const char* ast, int64_t uid, int8_t triggerType,
int64_t watermark, int64_t deleteMark) {
......@@ -97,7 +83,7 @@ END:
return terrno;
}
int32_t mndAddSinkToTask(SMnode* pMnode, SStreamObj* pStream, SStreamTask* pTask) {
int32_t mndSetSinkTaskInfo(SStreamObj* pStream, SStreamTask* pTask) {
if (pStream->smaId != 0) {
pTask->outputType = TASK_OUTPUT__SMA;
pTask->smaSink.smaId = pStream->smaId;
......@@ -106,16 +92,23 @@ int32_t mndAddSinkToTask(SMnode* pMnode, SStreamObj* pStream, SStreamTask* pTask
pTask->tbSink.stbUid = pStream->targetStbUid;
memcpy(pTask->tbSink.stbFullName, pStream->targetSTbName, TSDB_TABLE_FNAME_LEN);
pTask->tbSink.pSchemaWrapper = tCloneSSchemaWrapper(&pStream->outputSchema);
if (pTask->tbSink.pSchemaWrapper == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
}
}
return 0;
}
int32_t mndAddDispatcherToInnerTask(SMnode* pMnode, SStreamObj* pStream, SStreamTask* pTask) {
#define SINK_NODE_LEVEL (0)
int32_t mndAddDispatcherForInnerTask(SMnode* pMnode, SStreamObj* pStream, SStreamTask* pTask) {
bool isShuffle = false;
if (pStream->fixedSinkVgId == 0) {
SDbObj* pDb = mndAcquireDb(pMnode, pStream->targetDb);
if (pDb != NULL && pDb->cfg.numOfVgroups > 1) {
isShuffle = true;
pTask->outputType = TASK_OUTPUT__SHUFFLE_DISPATCH;
pTask->dispatchMsgType = TDMT_STREAM_TASK_DISPATCH;
......@@ -127,47 +120,46 @@ int32_t mndAddDispatcherToInnerTask(SMnode* pMnode, SStreamObj* pStream, SStream
sdbRelease(pMnode->pSdb, pDb);
}
SArray* pSinkNodeList = taosArrayGetP(pStream->tasks, SINK_NODE_LEVEL);
int32_t numOfSinkNodes = taosArrayGetSize(pSinkNodeList);
if (isShuffle) {
memcpy(pTask->shuffleDispatcher.stbFullName, pStream->targetSTbName, TSDB_TABLE_FNAME_LEN);
SArray* pVgs = pTask->shuffleDispatcher.dbInfo.pVgroupInfos;
int32_t sz = taosArrayGetSize(pVgs);
SArray* sinkLv = taosArrayGetP(pStream->tasks, 0);
int32_t sinkLvSize = taosArrayGetSize(sinkLv);
for (int32_t i = 0; i < sz; i++) {
int32_t numOfVgroups = taosArrayGetSize(pVgs);
for (int32_t i = 0; i < numOfVgroups; i++) {
SVgroupInfo* pVgInfo = taosArrayGet(pVgs, i);
for (int32_t j = 0; j < sinkLvSize; j++) {
SStreamTask* pLastLevelTask = taosArrayGetP(sinkLv, j);
if (pLastLevelTask->nodeId == pVgInfo->vgId) {
pVgInfo->taskId = pLastLevelTask->id.taskId;
for (int32_t j = 0; j < numOfSinkNodes; j++) {
SStreamTask* pSinkTask = taosArrayGetP(pSinkNodeList, j);
if (pSinkTask->nodeId == pVgInfo->vgId) {
pVgInfo->taskId = pSinkTask->id.taskId;
break;
}
}
}
} else {
pTask->outputType = TASK_OUTPUT__FIXED_DISPATCH;
pTask->dispatchMsgType = TDMT_STREAM_TASK_DISPATCH;
SArray* pArray = taosArrayGetP(pStream->tasks, 0);
// one sink only
SStreamTask* lastLevelTask = taosArrayGetP(pArray, 0);
pTask->fixedEpDispatcher.taskId = lastLevelTask->id.taskId;
pTask->fixedEpDispatcher.nodeId = lastLevelTask->nodeId;
pTask->fixedEpDispatcher.epSet = lastLevelTask->epSet;
SStreamTask* pOneSinkTask = taosArrayGetP(pSinkNodeList, 0);
setFixedDownstreamEpInfo(pTask, pOneSinkTask);
}
return 0;
}
int32_t mndAssignTaskToVg(SMnode* pMnode, SStreamTask* pTask, SSubplan* plan, const SVgObj* pVgroup) {
int32_t mndAssignStreamTaskToVgroup(SMnode* pMnode, SStreamTask* pTask, SSubplan* plan, const SVgObj* pVgroup) {
int32_t msgLen;
pTask->nodeId = pVgroup->vgId;
pTask->epSet = mndGetVgroupEpset(pMnode, pVgroup);
plan->execNode.nodeId = pVgroup->vgId;
plan->execNode.nodeId = pTask->nodeId;
plan->execNode.epSet = pTask->epSet;
if (qSubPlanToString(plan, &pTask->exec.qmsg, &msgLen) < 0) {
terrno = TSDB_CODE_QRY_INVALID_INPUT;
return -1;
}
return 0;
}
......@@ -210,100 +202,121 @@ SVgObj* mndSchedFetchOneVg(SMnode* pMnode, int64_t dbUid) {
return pVgroup;
}
// create sink node for each vgroup.
int32_t mndAddShuffleSinkTasksToStream(SMnode* pMnode, SStreamObj* pStream) {
SSdb* pSdb = pMnode->pSdb;
void* pIter = NULL;
SArray* tasks = taosArrayGetP(pStream->tasks, 0);
while (1) {
SVgObj* pVgroup = NULL;
pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void**)&pVgroup);
if (pIter == NULL) break;
if (!mndVgroupInDb(pVgroup, pStream->targetDbUid)) {
sdbRelease(pSdb, pVgroup);
continue;
if (pIter == NULL) {
break;
}
SStreamTask* pTask = tNewStreamTask(pStream->uid);
if (pTask == NULL) {
if (!mndVgroupInDb(pVgroup, pStream->targetDbUid)) {
sdbRelease(pSdb, pVgroup);
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
continue;
}
pTask->fillHistory = pStream->fillHistory;
mndAddTaskToTaskSet(tasks, pTask);
pTask->nodeId = pVgroup->vgId;
pTask->epSet = mndGetVgroupEpset(pMnode, pVgroup);
// type
pTask->taskLevel = TASK_LEVEL__SINK;
// sink
if (pStream->smaId != 0) {
pTask->outputType = TASK_OUTPUT__SMA;
pTask->smaSink.smaId = pStream->smaId;
} else {
pTask->outputType = TASK_OUTPUT__TABLE;
pTask->tbSink.stbUid = pStream->targetStbUid;
memcpy(pTask->tbSink.stbFullName, pStream->targetSTbName, TSDB_TABLE_FNAME_LEN);
pTask->tbSink.pSchemaWrapper = tCloneSSchemaWrapper(&pStream->outputSchema);
if (pTask->tbSink.pSchemaWrapper == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
}
mndAddSinkTaskToStream(pStream, pMnode, pVgroup->vgId, pVgroup);
sdbRelease(pSdb, pVgroup);
}
return 0;
}
int32_t mndAddFixedSinkTaskToStream(SMnode* pMnode, SStreamObj* pStream) {
SArray* tasks = taosArrayGetP(pStream->tasks, 0);
SStreamTask* pTask = tNewStreamTask(pStream->uid);
int32_t mndAddSinkTaskToStream(SStreamObj* pStream, SMnode* pMnode, int32_t vgId, SVgObj* pVgroup) {
SArray* pTaskList = taosArrayGetP(pStream->tasks, SINK_NODE_LEVEL);
SStreamTask* pTask = tNewStreamTask(pStream->uid, TASK_LEVEL__SINK, pStream->fillHistory, 0, pTaskList);
if (pTask == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
pTask->fillHistory = pStream->fillHistory;
mndAddTaskToTaskSet(tasks, pTask);
pTask->nodeId = pStream->fixedSinkVgId;
#if 0
SVgObj* pVgroup = mndAcquireVgroup(pMnode, pStream->fixedSinkVgId);
if (pVgroup == NULL) {
return -1;
}
pTask->nodeId = vgId;
pTask->epSet = mndGetVgroupEpset(pMnode, pVgroup);
#endif
pTask->epSet = mndGetVgroupEpset(pMnode, &pStream->fixedSinkVg);
mndSetSinkTaskInfo(pStream, pTask);
return 0;
}
pTask->taskLevel = TASK_LEVEL__SINK;
static int32_t mndScheduleFillHistoryStreamTask(SMnode* pMnode, SStreamObj* pStream) {
return 0;
}
// sink
if (pStream->smaId != 0) {
pTask->outputType = TASK_OUTPUT__SMA;
pTask->smaSink.smaId = pStream->smaId;
static int32_t addSourceStreamTask(SMnode* pMnode, SVgObj* pVgroup, SArray* pTaskList, SStreamObj* pStream,
SSubplan* plan, uint64_t uid, int8_t taskLevel, int8_t fillHistory,
bool hasExtraSink) {
SStreamTask* pTask = tNewStreamTask(uid, taskLevel, fillHistory, pStream->triggerParam, pTaskList);
if (pTask == NULL) {
return terrno;
}
// sink or dispatch
if (hasExtraSink) {
mndAddDispatcherForInnerTask(pMnode, pStream, pTask);
} else {
pTask->outputType = TASK_OUTPUT__TABLE;
pTask->tbSink.stbUid = pStream->targetStbUid;
memcpy(pTask->tbSink.stbFullName, pStream->targetSTbName, TSDB_TABLE_FNAME_LEN);
pTask->tbSink.pSchemaWrapper = tCloneSSchemaWrapper(&pStream->outputSchema);
mndSetSinkTaskInfo(pStream, pTask);
}
return 0;
if (mndAssignStreamTaskToVgroup(pMnode, pTask, plan, pVgroup) < 0) {
return terrno;
}
return TSDB_CODE_SUCCESS;
}
static SStreamChildEpInfo* createStreamTaskEpInfo(SStreamTask* pTask) {
SStreamChildEpInfo* pEpInfo = taosMemoryMalloc(sizeof(SStreamChildEpInfo));
if (pEpInfo == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return NULL;
}
pEpInfo->childId = pTask->selfChildId;
pEpInfo->epSet = pTask->epSet;
pEpInfo->nodeId = pTask->nodeId;
pEpInfo->taskId = pTask->id.taskId;
return pEpInfo;
}
void setFixedDownstreamEpInfo(SStreamTask* pDstTask, const SStreamTask* pTask) {
STaskDispatcherFixedEp* pDispatcher = &pDstTask->fixedEpDispatcher;
pDispatcher->taskId = pTask->id.taskId;
pDispatcher->nodeId = pTask->nodeId;
pDispatcher->epSet = pTask->epSet;
pDstTask->outputType = TASK_OUTPUT__FIXED_DISPATCH;
pDstTask->dispatchMsgType = TDMT_STREAM_TASK_DISPATCH;
}
int32_t appendToUpstream(SStreamTask* pTask, SStreamTask* pUpstream) {
SStreamChildEpInfo* pEpInfo = createStreamTaskEpInfo(pTask);
if (pEpInfo == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
}
if(pUpstream->childEpInfo == NULL) {
pUpstream->childEpInfo = taosArrayInit(4, POINTER_BYTES);
}
taosArrayPush(pUpstream->childEpInfo, &pEpInfo);
return TSDB_CODE_SUCCESS;
}
int32_t mndScheduleStream(SMnode* pMnode, SStreamObj* pStream) {
SSdb* pSdb = pMnode->pSdb;
SSdb* pSdb = pMnode->pSdb;
SQueryPlan* pPlan = qStringToQueryPlan(pStream->physicalPlan);
if (pPlan == NULL) {
terrno = TSDB_CODE_QRY_INVALID_INPUT;
return -1;
}
int32_t planTotLevel = LIST_LENGTH(pPlan->pSubplans);
pStream->tasks = taosArrayInit(planTotLevel, sizeof(void*));
int32_t planTotLevel = LIST_LENGTH(pPlan->pSubplans);
pStream->tasks = taosArrayInit(planTotLevel, POINTER_BYTES);
bool hasExtraSink = false;
bool externalTargetDB = strcmp(pStream->sourceDb, pStream->targetDb) != 0;
......@@ -313,13 +326,13 @@ int32_t mndScheduleStream(SMnode* pMnode, SStreamObj* pStream) {
return -1;
}
bool multiTarget = pDbObj->cfg.numOfVgroups > 1;
bool multiTarget = (pDbObj->cfg.numOfVgroups > 1);
sdbRelease(pSdb, pDbObj);
if (planTotLevel == 2 || externalTargetDB || multiTarget || pStream->fixedSinkVgId) {
/*if (true) {*/
SArray* taskOneLevel = taosArrayInit(0, sizeof(void*));
SArray* taskOneLevel = taosArrayInit(0, POINTER_BYTES);
taosArrayPush(pStream->tasks, &taskOneLevel);
// add extra sink
hasExtraSink = true;
if (pStream->fixedSinkVgId == 0) {
......@@ -328,19 +341,20 @@ int32_t mndScheduleStream(SMnode* pMnode, SStreamObj* pStream) {
return -1;
}
} else {
if (mndAddFixedSinkTaskToStream(pMnode, pStream) < 0) {
if (mndAddSinkTaskToStream(pStream, pMnode, pStream->fixedSinkVgId, &pStream->fixedSinkVg) < 0) {
// TODO free
return -1;
}
}
}
pStream->totalLevel = planTotLevel + hasExtraSink;
if (planTotLevel > 1) {
SStreamTask* pInnerTask;
// inner level
{
SArray* taskInnerLevel = taosArrayInit(0, sizeof(void*));
SArray* taskInnerLevel = taosArrayInit(0, POINTER_BYTES);
taosArrayPush(pStream->tasks, &taskInnerLevel);
SNodeListNode* inner = (SNodeListNode*)nodesListGetNode(pPlan->pSubplans, 0);
......@@ -350,25 +364,15 @@ int32_t mndScheduleStream(SMnode* pMnode, SStreamObj* pStream) {
return -1;
}
pInnerTask = tNewStreamTask(pStream->uid);
pInnerTask = tNewStreamTask(pStream->uid, TASK_LEVEL__AGG, pStream->fillHistory, pStream->triggerParam, taskInnerLevel);
if (pInnerTask == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
qDestroyQueryPlan(pPlan);
return -1;
}
pInnerTask->fillHistory = pStream->fillHistory;
mndAddTaskToTaskSet(taskInnerLevel, pInnerTask);
pInnerTask->childEpInfo = taosArrayInit(0, sizeof(void*));
pInnerTask->taskLevel = TASK_LEVEL__AGG;
// trigger
pInnerTask->triggerParam = pStream->triggerParam;
// dispatch
if (mndAddDispatcherToInnerTask(pMnode, pStream, pInnerTask) < 0) {
if (mndAddDispatcherForInnerTask(pMnode, pStream, pInnerTask) < 0) {
qDestroyQueryPlan(pPlan);
return -1;
}
......@@ -377,7 +381,7 @@ int32_t mndScheduleStream(SMnode* pMnode, SStreamObj* pStream) {
SSnodeObj* pSnode = mndSchedFetchOneSnode(pMnode);
if (pSnode == NULL) {
SVgObj* pVgroup = mndSchedFetchOneVg(pMnode, pStream->sourceDbUid);
if (mndAssignTaskToVg(pMnode, pInnerTask, plan, pVgroup) < 0) {
if (mndAssignStreamTaskToVgroup(pMnode, pInnerTask, plan, pVgroup) < 0) {
sdbRelease(pSdb, pVgroup);
qDestroyQueryPlan(pPlan);
return -1;
......@@ -392,17 +396,18 @@ int32_t mndScheduleStream(SMnode* pMnode, SStreamObj* pStream) {
}
} else {
SVgObj* pVgroup = mndSchedFetchOneVg(pMnode, pStream->sourceDbUid);
if (mndAssignTaskToVg(pMnode, pInnerTask, plan, pVgroup) < 0) {
if (mndAssignStreamTaskToVgroup(pMnode, pInnerTask, plan, pVgroup) < 0) {
sdbRelease(pSdb, pVgroup);
qDestroyQueryPlan(pPlan);
return -1;
}
sdbRelease(pSdb, pVgroup);
}
}
// source level
SArray* taskSourceLevel = taosArrayInit(0, sizeof(void*));
SArray* taskSourceLevel = taosArrayInit(0, POINTER_BYTES);
taosArrayPush(pStream->tasks, &taskSourceLevel);
SNodeListNode* inner = (SNodeListNode*)nodesListGetNode(pPlan->pSubplans, 1);
......@@ -416,66 +421,52 @@ int32_t mndScheduleStream(SMnode* pMnode, SStreamObj* pStream) {
while (1) {
SVgObj* pVgroup;
pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void**)&pVgroup);
if (pIter == NULL) break;
if (pIter == NULL) {
break;
}
if (!mndVgroupInDb(pVgroup, pStream->sourceDbUid)) {
sdbRelease(pSdb, pVgroup);
continue;
}
SStreamTask* pTask = tNewStreamTask(pStream->uid);
SStreamTask* pTask = tNewStreamTask(pStream->uid, TASK_LEVEL__SOURCE, pStream->fillHistory, 0, taskSourceLevel);
if (pTask == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
sdbRelease(pSdb, pVgroup);
qDestroyQueryPlan(pPlan);
return -1;
}
pTask->fillHistory = pStream->fillHistory;
mndAddTaskToTaskSet(taskSourceLevel, pTask);
pTask->triggerParam = 0;
// source
pTask->taskLevel = TASK_LEVEL__SOURCE;
// add fixed vg dispatch
pTask->dispatchMsgType = TDMT_STREAM_TASK_DISPATCH;
pTask->outputType = TASK_OUTPUT__FIXED_DISPATCH;
// all the source tasks dispatch result to a single agg node.
setFixedDownstreamEpInfo(pTask, pInnerTask);
pTask->fixedEpDispatcher.taskId = pInnerTask->id.taskId;
pTask->fixedEpDispatcher.nodeId = pInnerTask->nodeId;
pTask->fixedEpDispatcher.epSet = pInnerTask->epSet;
if (mndAssignTaskToVg(pMnode, pTask, plan, pVgroup) < 0) {
if (mndAssignStreamTaskToVgroup(pMnode, pTask, plan, pVgroup) < 0) {
sdbRelease(pSdb, pVgroup);
qDestroyQueryPlan(pPlan);
return -1;
}
SStreamChildEpInfo* pEpInfo = taosMemoryMalloc(sizeof(SStreamChildEpInfo));
if (pEpInfo == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
sdbRelease(pSdb, pVgroup);
int32_t code = appendToUpstream(pTask, pInnerTask);
sdbRelease(pSdb, pVgroup);
if (code != TSDB_CODE_SUCCESS) {
terrno = code;
qDestroyQueryPlan(pPlan);
return -1;
}
pEpInfo->childId = pTask->selfChildId;
pEpInfo->epSet = pTask->epSet;
pEpInfo->nodeId = pTask->nodeId;
pEpInfo->taskId = pTask->id.taskId;
taosArrayPush(pInnerTask->childEpInfo, &pEpInfo);
sdbRelease(pSdb, pVgroup);
}
}
if (planTotLevel == 1) {
SArray* taskOneLevel = taosArrayInit(0, sizeof(void*));
taosArrayPush(pStream->tasks, &taskOneLevel);
} else if (planTotLevel == 1) {
// create exec stream task, since only one level, the exec task is also the source task
SArray* pTaskList = taosArrayInit(0, POINTER_BYTES);
taosArrayPush(pStream->tasks, &pTaskList);
SNodeListNode* inner = (SNodeListNode*)nodesListGetNode(pPlan->pSubplans, 0);
if (LIST_LENGTH(inner->pNodeList) != 1) {
terrno = TSDB_CODE_QRY_INVALID_INPUT;
return -1;
}
SSubplan* plan = (SSubplan*)nodesListGetNode(inner->pNodeList, 0);
if (plan->subplanType != SUBPLAN_TYPE_SCAN) {
terrno = TSDB_CODE_QRY_INVALID_INPUT;
......@@ -486,42 +477,26 @@ int32_t mndScheduleStream(SMnode* pMnode, SStreamObj* pStream) {
while (1) {
SVgObj* pVgroup;
pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void**)&pVgroup);
if (pIter == NULL) break;
if (!mndVgroupInDb(pVgroup, pStream->sourceDbUid)) {
sdbRelease(pSdb, pVgroup);
continue;
if (pIter == NULL) {
break;
}
SStreamTask* pTask = tNewStreamTask(pStream->uid);
if (pTask == NULL) {
if (!mndVgroupInDb(pVgroup, pStream->sourceDbUid)) {
sdbRelease(pSdb, pVgroup);
qDestroyQueryPlan(pPlan);
return -1;
continue;
}
pTask->fillHistory = pStream->fillHistory;
mndAddTaskToTaskSet(taskOneLevel, pTask);
// source
pTask->taskLevel = TASK_LEVEL__SOURCE;
// trigger
pTask->triggerParam = pStream->triggerParam;
// sink or dispatch
if (hasExtraSink) {
mndAddDispatcherToInnerTask(pMnode, pStream, pTask);
} else {
mndAddSinkToTask(pMnode, pStream, pTask);
}
// new stream task
int32_t code = addSourceStreamTask(pMnode, pVgroup, pTaskList, pStream, plan, pStream->uid, TASK_LEVEL__SOURCE, pStream->fillHistory, hasExtraSink);
sdbRelease(pSdb, pVgroup);
if (mndAssignTaskToVg(pMnode, pTask, plan, pVgroup) < 0) {
sdbRelease(pSdb, pVgroup);
if (code != TSDB_CODE_SUCCESS) {
qDestroyQueryPlan(pPlan);
return -1;
}
sdbRelease(pSdb, pVgroup);
}
}
qDestroyQueryPlan(pPlan);
return 0;
}
......
......@@ -700,6 +700,7 @@ static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq) {
if (pStream->sourceDbUid == streamObj.sourceDbUid) {
++numOfStream;
}
sdbRelease(pMnode->pSdb, pStream);
if (numOfStream > MND_STREAM_MAX_NUM) {
mError("too many streams, no more than %d for each database", MND_STREAM_MAX_NUM);
......@@ -723,6 +724,7 @@ static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq) {
pDb = NULL;
goto _OVER;
}
mndReleaseDb(pMnode, pDb);
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_DB_INSIDE, pReq, "create-stream");
......
......@@ -87,6 +87,28 @@ target_include_directories(
PUBLIC "${TD_SOURCE_DIR}/include/libs/scalar"
PUBLIC "${TD_SOURCE_DIR}/contrib/rocksdb/include"
)
IF (TD_LINUX)
target_link_libraries(
vnode
PUBLIC os
PUBLIC util
PUBLIC common
PUBLIC tfs
PUBLIC wal
PUBLIC qworker
PUBLIC sync
PUBLIC executor
PUBLIC scheduler
PUBLIC tdb
# PUBLIC bdb
# PUBLIC scalar
PUBLIC rocksdb-shared
PUBLIC transport
PUBLIC stream
PUBLIC index
)
ELSE()
target_link_libraries(
vnode
PUBLIC os
......@@ -107,6 +129,7 @@ target_link_libraries(
PUBLIC stream
PUBLIC index
)
ENDIF()
IF (TD_GRANT)
TARGET_LINK_LIBRARIES(vnode PUBLIC grant)
......
......@@ -100,18 +100,11 @@ void vnodeApplyWriteMsg(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs
void vnodeProposeCommitOnNeed(SVnode *pVnode, bool atExit);
// meta
typedef struct SMeta SMeta; // todo: remove
typedef struct SMetaReader SMetaReader;
typedef struct SMetaEntry SMetaEntry;
#define META_READER_NOLOCK 0x1
void _metaReaderInit(SMetaReader *pReader, void *pVnode, int32_t flags, SStoreMeta* pAPI);
void metaReaderReleaseLock(SMetaReader *pReader);
void metaReaderClear(SMetaReader *pReader);
int32_t metaReaderGetTableEntryByUid(SMetaReader *pReader, tb_uid_t uid);
int32_t metaReaderGetTableEntryByUidCache(SMetaReader *pReader, tb_uid_t uid);
int metaGetTableEntryByName(SMetaReader *pReader, const char *name);
int32_t metaGetTableTags(void *pVnode, uint64_t suid, SArray *uidList);
int32_t metaGetTableTagsByUids(void* pVnode, int64_t suid, SArray *uidList);
int32_t metaReadNext(SMetaReader *pReader);
......@@ -122,37 +115,18 @@ int metaGetTableSzNameByUid(void *meta, uint64_t uid, char *tbName);
int metaGetTableUidByName(void *pVnode, char *tbName, uint64_t *uid);
int metaGetTableTypeByName(void *meta, char *tbName, ETableType *tbType);
bool metaIsTableExist(void* pVnode, tb_uid_t uid);
int32_t metaGetCachedTableUidList(SMeta *pMeta, tb_uid_t suid, const uint8_t *key, int32_t keyLen, SArray *pList,
int32_t metaGetCachedTableUidList(void *pVnode, tb_uid_t suid, const uint8_t *key, int32_t keyLen, SArray *pList,
bool *acquired);
int32_t metaUidFilterCachePut(SMeta *pMeta, uint64_t suid, const void *pKey, int32_t keyLen, void *pPayload,
int32_t metaUidFilterCachePut(void *pVnode, uint64_t suid, const void *pKey, int32_t keyLen, void *pPayload,
int32_t payloadLen, double selectivityRatio);
int32_t metaUidCacheClear(SMeta *pMeta, uint64_t suid);
tb_uid_t metaGetTableEntryUidByName(SMeta *pMeta, const char *name);
int32_t metaTbGroupCacheClear(SMeta *pMeta, uint64_t suid);
int32_t metaGetCachedTbGroup(SMeta *pMeta, tb_uid_t suid, const uint8_t *pKey, int32_t keyLen, SArray **pList);
int32_t metaPutTbGroupToCache(SMeta *pMeta, uint64_t suid, const void *pKey, int32_t keyLen, void *pPayload,
int32_t metaGetCachedTbGroup(void *pVnode, tb_uid_t suid, const uint8_t *pKey, int32_t keyLen, SArray **pList);
int32_t metaPutTbGroupToCache(void* pVnode, uint64_t suid, const void *pKey, int32_t keyLen, void *pPayload,
int32_t payloadLen);
int64_t metaGetTbNum(SMeta *pMeta);
int64_t metaGetNtbNum(SMeta *pMeta);
//typedef struct {
// int64_t uid;
// int64_t ctbNum;
//} SMetaStbStats;
int32_t metaGetStbStats(void *pVnode, int64_t uid, int64_t *numOfTables);
#if 1 // refact APIs below (TODO)
typedef SVCreateTbReq STbCfg;
typedef SVCreateTSmaReq SSmaCfg;
typedef struct SMTbCursor SMTbCursor;
SMTbCursor *metaOpenTbCursor(void *pVnode);
void metaCloseTbCursor(SMTbCursor *pTbCur);
int32_t metaTbCursorNext(SMTbCursor *pTbCur, ETableType jumpTableType);
int32_t metaTbCursorPrev(SMTbCursor *pTbCur, ETableType jumpTableType);
#endif
int32_t metaGetStbStats(void *pVnode, int64_t uid, int64_t *numOfTables);
// tsdb
typedef struct STsdbReader STsdbReader;
......@@ -168,8 +142,8 @@ typedef struct STsdbReader STsdbReader;
#define CACHESCAN_RETRIEVE_LAST_ROW 0x4
#define CACHESCAN_RETRIEVE_LAST 0x8
int32_t tsdbReaderOpen(SVnode *pVnode, SQueryTableDataCond *pCond, void *pTableList, int32_t numOfTables,
SSDataBlock *pResBlock, STsdbReader **ppReader, const char *idstr, bool countOnly,
int32_t tsdbReaderOpen(void *pVnode, SQueryTableDataCond *pCond, void *pTableList, int32_t numOfTables,
SSDataBlock *pResBlock, void **ppReader, const char *idstr, bool countOnly,
SHashObj **pIgnoreTables);
int32_t tsdbSetTableList(STsdbReader *pReader, const void *pTableList, int32_t num);
void tsdbReaderSetId(STsdbReader *pReader, const char *idstr);
......@@ -201,32 +175,11 @@ size_t tsdbCacheGetUsage(SVnode *pVnode);
int32_t tsdbCacheGetElems(SVnode *pVnode);
//// tq
//typedef struct SMetaTableInfo {
// int64_t suid;
// int64_t uid;
// SSchemaWrapper *schema;
// char tbName[TSDB_TABLE_NAME_LEN];
//} SMetaTableInfo;
typedef struct SIdInfo {
int64_t version;
int32_t index;
} SIdInfo;
//typedef struct SSnapContext {
// SMeta *pMeta;
// int64_t snapVersion;
// TBC *pCur;
// int64_t suid;
// int8_t subType;
// SHashObj *idVersion;
// SHashObj *suidInfo;
// SArray *idList;
// int32_t index;
// bool withMeta;
// bool queryMeta; // true-get meta, false-get data
//} SSnapContext;
typedef struct STqReader {
SPackedData msg;
SSubmitReq2 submit;
......@@ -345,59 +298,6 @@ struct SVnodeCfg {
#define TABLE_ROLLUP_ON ((int8_t)0x1)
#define TABLE_IS_ROLLUP(FLG) (((FLG) & (TABLE_ROLLUP_ON)) != 0)
#define TABLE_SET_ROLLUP(FLG) ((FLG) |= TABLE_ROLLUP_ON)
//struct SMetaEntry {
// int64_t version;
// int8_t type;
// int8_t flags; // TODO: need refactor?
// tb_uid_t uid;
// char *name;
// union {
// struct {
// SSchemaWrapper schemaRow;
// SSchemaWrapper schemaTag;
// SRSmaParam rsmaParam;
// } stbEntry;
// struct {
// int64_t ctime;
// int32_t ttlDays;
// int32_t commentLen;
// char *comment;
// tb_uid_t suid;
// uint8_t *pTags;
// } ctbEntry;
// struct {
// int64_t ctime;
// int32_t ttlDays;
// int32_t commentLen;
// char *comment;
// int32_t ncid; // next column id
// SSchemaWrapper schemaRow;
// } ntbEntry;
// struct {
// STSma *tsma;
// } smaEntry;
// };
//
// uint8_t *pBuf;
//};
//struct SMetaReader {
// int32_t flags;
// SMeta *pMeta;
// SDecoder coder;
// SMetaEntry me;
// void *pBuf;
// int32_t szBuf;
//};
//struct SMTbCursor {
// TBC *pDbc;
// void *pKey;
// void *pVal;
// int32_t kLen;
// int32_t vLen;
// SMetaReader mr;
//};
#ifdef __cplusplus
}
......
......@@ -103,6 +103,17 @@ struct SQueryNode {
_query_reseek_func_t reseek;
};
#if 1 // refact APIs below (TODO)
typedef SVCreateTbReq STbCfg;
typedef SVCreateTSmaReq SSmaCfg;
SMTbCursor *metaOpenTbCursor(void *pVnode);
void metaCloseTbCursor(SMTbCursor *pTbCur);
int32_t metaTbCursorNext(SMTbCursor *pTbCur, ETableType jumpTableType);
int32_t metaTbCursorPrev(SMTbCursor *pTbCur, ETableType jumpTableType);
#endif
void* vnodeBufPoolMalloc(SVBufPool* pPool, int size);
void* vnodeBufPoolMallocAligned(SVBufPool* pPool, int size);
void vnodeBufPoolFree(SVBufPool* pPool, void* p);
......@@ -143,6 +154,9 @@ int32_t metaGetTbTSchemaEx(SMeta* pMeta, tb_uid_t suid, tb_uid_t uid, in
int metaGetTableEntryByName(SMetaReader* pReader, const char* name);
int metaAlterCache(SMeta* pMeta, int32_t nPage);
int32_t metaUidCacheClear(SMeta* pMeta, uint64_t suid);
int32_t metaTbGroupCacheClear(SMeta *pMeta, uint64_t suid);
int metaAddIndexToSTable(SMeta* pMeta, int64_t version, SVCreateStbReq* pReq);
int metaDropIndexFromSTable(SMeta* pMeta, int64_t version, SDropIndexReq* pReq);
......@@ -477,6 +491,7 @@ struct SCompactInfo {
void initStorageAPI(SStorageAPI* pAPI);
#ifdef __cplusplus
}
#endif
......
......@@ -499,8 +499,9 @@ static void initCacheKey(uint64_t* buf, const SHashObj* pHashMap, uint64_t suid,
ASSERT(keyLen == sizeof(uint64_t) * 2);
}
int32_t metaGetCachedTableUidList(SMeta* pMeta, tb_uid_t suid, const uint8_t* pKey, int32_t keyLen, SArray* pList1,
int32_t metaGetCachedTableUidList(void* pVnode, tb_uid_t suid, const uint8_t* pKey, int32_t keyLen, SArray* pList1,
bool* acquireRes) {
SMeta* pMeta = ((SVnode*)pVnode)->pMeta;
int32_t vgId = TD_VID(pMeta->pVnode);
// generate the composed key for LRU cache
......@@ -603,9 +604,10 @@ static int32_t addNewEntry(SHashObj* pTableEntry, const void* pKey, int32_t keyL
}
// check both the payload size and selectivity ratio
int32_t metaUidFilterCachePut(SMeta* pMeta, uint64_t suid, const void* pKey, int32_t keyLen, void* pPayload,
int32_t metaUidFilterCachePut(void* pVnode, uint64_t suid, const void* pKey, int32_t keyLen, void* pPayload,
int32_t payloadLen, double selectivityRatio) {
int32_t code = 0;
SMeta* pMeta = ((SVnode*)pVnode)->pMeta;
int32_t vgId = TD_VID(pMeta->pVnode);
if (selectivityRatio > tsSelectivityRatio) {
......@@ -702,7 +704,8 @@ int32_t metaUidCacheClear(SMeta* pMeta, uint64_t suid) {
return TSDB_CODE_SUCCESS;
}
int32_t metaGetCachedTbGroup(SMeta* pMeta, tb_uid_t suid, const uint8_t* pKey, int32_t keyLen, SArray** pList) {
int32_t metaGetCachedTbGroup(void* pVnode, tb_uid_t suid, const uint8_t* pKey, int32_t keyLen, SArray** pList) {
SMeta* pMeta = ((SVnode*)pVnode)->pMeta;
int32_t vgId = TD_VID(pMeta->pVnode);
// generate the composed key for LRU cache
......@@ -786,9 +789,10 @@ static void freeTbGroupCachePayload(const void* key, size_t keyLen, void* value)
}
int32_t metaPutTbGroupToCache(SMeta* pMeta, uint64_t suid, const void* pKey, int32_t keyLen, void* pPayload,
int32_t metaPutTbGroupToCache(void* pVnode, uint64_t suid, const void* pKey, int32_t keyLen, void* pPayload,
int32_t payloadLen) {
int32_t code = 0;
SMeta* pMeta = ((SVnode*)pVnode)->pMeta;
int32_t vgId = TD_VID(pMeta->pVnode);
if (payloadLen > tsTagFilterResCacheSize) {
......
......@@ -700,8 +700,6 @@ int64_t metaGetTimeSeriesNum(SMeta *pMeta) {
return pMeta->pVnode->config.vndStats.numOfTimeSeries + pMeta->pVnode->config.vndStats.numOfNTimeSeries;
}
int64_t metaGetNtbNum(SMeta *pMeta) { return pMeta->pVnode->config.vndStats.numOfNTables; }
typedef struct {
SMeta *pMeta;
TBC *pCur;
......
......@@ -13,8 +13,10 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "vnodeInt.h"
#include "meta.h"
static int metaHandleSmaEntry(SMeta *pMeta, const SMetaEntry *pME);
static int metaSaveSmaToDB(SMeta *pMeta, const SMetaEntry *pME);
......
......@@ -128,6 +128,7 @@ static int32_t tsdbOpenRocksCache(STsdb *pTsdb) {
rocksdb_options_set_comparator(options, cmp);
rocksdb_block_based_options_set_block_cache(tableoptions, cache);
rocksdb_options_set_block_based_table_factory(options, tableoptions);
rocksdb_options_set_info_log_level(options, 2); // WARN_LEVEL
// rocksdb_options_set_inplace_update_support(options, 1);
// rocksdb_options_set_allow_concurrent_memtable_write(options, 0);
......
......@@ -754,7 +754,7 @@ static int32_t initResBlockInfo(SResultBlockInfo* pResBlockInfo, int64_t capacit
return terrno;
}
static int32_t tsdbReaderCreate(SVnode* pVnode, SQueryTableDataCond* pCond, STsdbReader** ppReader, int32_t capacity,
static int32_t tsdbReaderCreate(SVnode* pVnode, SQueryTableDataCond* pCond, void** ppReader, int32_t capacity,
SSDataBlock* pResBlock, const char* idstr) {
int32_t code = 0;
int8_t level = 0;
......@@ -4396,11 +4396,12 @@ static void freeSchemaFunc(void* param) {
}
// ====================================== EXPOSED APIs ======================================
int32_t tsdbReaderOpen(SVnode* pVnode, SQueryTableDataCond* pCond, void* pTableList, int32_t numOfTables,
SSDataBlock* pResBlock, STsdbReader** ppReader, const char* idstr, bool countOnly, SHashObj** pIgnoreTables) {
int32_t tsdbReaderOpen(void* pVnode, SQueryTableDataCond* pCond, void* pTableList, int32_t numOfTables,
SSDataBlock* pResBlock, void** ppReader, const char* idstr, bool countOnly, SHashObj** pIgnoreTables) {
STimeWindow window = pCond->twindows;
SVnodeCfg* pConf = &(((SVnode*)pVnode)->config);
int32_t capacity = pVnode->config.tsdbCfg.maxRows;
int32_t capacity = pConf->tsdbCfg.maxRows;
if (pResBlock != NULL) {
blockDataEnsureCapacity(pResBlock, capacity);
}
......@@ -4431,7 +4432,7 @@ int32_t tsdbReaderOpen(SVnode* pVnode, SQueryTableDataCond* pCond, void* pTableL
}
// here we only need one more row, so the capacity is set to be ONE.
code = tsdbReaderCreate(pVnode, pCond, &pReader->innerReader[0], 1, pResBlock, idstr);
code = tsdbReaderCreate(pVnode, pCond, (void**)&((STsdbReader*)pReader)->innerReader[0], 1, pResBlock, idstr);
if (code != TSDB_CODE_SUCCESS) {
goto _err;
}
......@@ -4445,7 +4446,7 @@ int32_t tsdbReaderOpen(SVnode* pVnode, SQueryTableDataCond* pCond, void* pTableL
}
pCond->order = order;
code = tsdbReaderCreate(pVnode, pCond, &pReader->innerReader[1], 1, pResBlock, idstr);
code = tsdbReaderCreate(pVnode, pCond, (void**)&((STsdbReader*)pReader)->innerReader[1], 1, pResBlock, idstr);
if (code != TSDB_CODE_SUCCESS) {
goto _err;
}
......@@ -4495,7 +4496,7 @@ int32_t tsdbReaderOpen(SVnode* pVnode, SQueryTableDataCond* pCond, void* pTableL
goto _err;
}
pReader->status.pLDataIter = taosMemoryCalloc(pVnode->config.sttTrigger, sizeof(SLDataIter));
pReader->status.pLDataIter = taosMemoryCalloc(pConf->sttTrigger, sizeof(SLDataIter));
if (pReader->status.pLDataIter == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
goto _err;
......@@ -5546,7 +5547,7 @@ int64_t tsdbGetLastTimestamp(SVnode* pVnode, void* pTableList, int32_t numOfTabl
int64_t key = INT64_MIN;
for(int32_t i = 0; i < numOfTables; ++i) {
int32_t code = tsdbReaderOpen(pVnode, &cond, &pTableKeyInfo[i], 1, pBlock, &pReader, pIdStr, false, NULL);
int32_t code = tsdbReaderOpen(pVnode, &cond, &pTableKeyInfo[i], 1, pBlock, (void**)&pReader, pIdStr, false, NULL);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
......
......@@ -31,17 +31,18 @@ static void initSnapshotFn(SStoreSnapshotFn* pSnapshot);
void initStorageAPI(SStorageAPI* pAPI) {
initTsdbReaderAPI(&pAPI->tsdReader);
initMetadataAPI(&pAPI->metaFn);
initTqAPI(&pAPI->tqReaderFn);
initStateStoreAPI(&pAPI->stateStore);
initMetaReaderAPI(&pAPI->metaReaderFn);
initMetaFilterAPI(&pAPI->metaFilter);
initTqAPI(&pAPI->tqReaderFn);
initFunctionStateStore(&pAPI->functionStore);
initCacheFn(&pAPI->cacheFn);
initSnapshotFn(&pAPI->snapshotFn);
}
void initTsdbReaderAPI(TsdReader* pReader) {
pReader->tsdReaderOpen = (__store_reader_open_fn_t)tsdbReaderOpen;
pReader->tsdReaderOpen = (int32_t(*)(void*, SQueryTableDataCond*, void*, int32_t, SSDataBlock*, void**, const char*,
bool, SHashObj**))tsdbReaderOpen;
pReader->tsdReaderClose = tsdbReaderClose;
pReader->tsdNextDataBlock = tsdbNextDataBlock;
......@@ -87,6 +88,12 @@ void initMetadataAPI(SStoreMeta* pMeta) {
pMeta->getTableSchema = tsdbGetTableSchema; // todo refactor
pMeta->storeGetTableList = vnodeGetTableList;
pMeta->getCachedTableList = metaGetCachedTableUidList;
pMeta->putCachedTableList = metaUidFilterCachePut;
pMeta->metaGetCachedTbGroup = metaGetCachedTbGroup;
pMeta->metaPutTbGroupToCache = metaPutTbGroupToCache;
}
void initTqAPI(SStoreTqReader* pTq) {
......
......@@ -495,7 +495,7 @@ int32_t getColInfoResultForGroupby(void* pVnode, SNodeList* group, STableListInf
genTbGroupDigest((SNode*)listNode, digest, &context);
nodesFree(listNode);
pAPI->metaFn.getCachedTableList(pVnode, pTableListInfo->idInfo.suid, context.digest, tListLen(context.digest), &tableList);
pAPI->metaFn.metaGetCachedTbGroup(pVnode, pTableListInfo->idInfo.suid, context.digest, tListLen(context.digest), &tableList);
if (tableList) {
taosArrayDestroy(pTableListInfo->pTableList);
pTableListInfo->pTableList = tableList;
......@@ -632,7 +632,7 @@ int32_t getColInfoResultForGroupby(void* pVnode, SNodeList* group, STableListInf
if (tsTagFilterCache) {
tableList = taosArrayDup(pTableListInfo->pTableList, NULL);
pAPI->metaFn.putTableListIntoCache(pVnode, pTableListInfo->idInfo.suid, context.digest, tListLen(context.digest), tableList, taosArrayGetSize(tableList) * sizeof(STableKeyInfo));
pAPI->metaFn.metaPutTbGroupToCache(pVnode, pTableListInfo->idInfo.suid, context.digest, tListLen(context.digest), tableList, taosArrayGetSize(tableList) * sizeof(STableKeyInfo));
}
// int64_t st2 = taosGetTimestampUs();
......
......@@ -3446,7 +3446,7 @@ static void buildVnodeGroupedNtbTableCount(STableCountScanOperatorInfo* pInfo, S
uint64_t groupId = calcGroupId(fullStbName, strlen(fullStbName));
pRes->info.id.groupId = groupId;
int64_t numOfTables = 0;//metaGetNtbNum(pInfo->readHandle.vnode);
int64_t numOfTables = 0;
pAPI->metaFn.getBasicInfo(pInfo->readHandle.vnode, NULL, NULL, NULL, &numOfTables);
if (numOfTables != 0) {
......
......@@ -6,13 +6,23 @@ target_include_directories(
PRIVATE "${CMAKE_CURRENT_SOURCE_DIR}/inc"
)
if(${BUILD_WITH_ROCKSDB})
IF (TD_LINUX)
target_link_libraries(
stream
PUBLIC rocksdb-shared tdb
PRIVATE os util transport qcom executor wal index
)
ELSE()
target_link_libraries(
stream
PUBLIC rocksdb tdb
PRIVATE os util transport qcom executor wal index
)
ENDIF()
target_include_directories(
stream
PUBLIC "${TD_SOURCE_DIR}/contrib/rocksdb/include"
......
......@@ -17,14 +17,25 @@
#include "tstream.h"
#include "wal.h"
SStreamTask* tNewStreamTask(int64_t streamId) {
static int32_t mndAddToTaskset(SArray* pArray, SStreamTask* pTask) {
int32_t childId = taosArrayGetSize(pArray);
pTask->selfChildId = childId;
taosArrayPush(pArray, &pTask);
return 0;
}
SStreamTask* tNewStreamTask(int64_t streamId, int8_t taskLevel, int8_t fillHistory, int64_t triggerParam, SArray* pTaskList) {
SStreamTask* pTask = (SStreamTask*)taosMemoryCalloc(1, sizeof(SStreamTask));
if (pTask == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return NULL;
}
pTask->id.taskId = tGenIdPI32();
pTask->id.streamId = streamId;
pTask->taskLevel = taskLevel;
pTask->fillHistory = fillHistory;
pTask->triggerParam = triggerParam;
char buf[128] = {0};
sprintf(buf, "0x%" PRIx64 "-%d", pTask->id.streamId, pTask->id.taskId);
......@@ -34,6 +45,7 @@ SStreamTask* tNewStreamTask(int64_t streamId) {
pTask->inputStatus = TASK_INPUT_STATUS__NORMAL;
pTask->outputStatus = TASK_OUTPUT_STATUS__NORMAL;
mndAddToTaskset(pTaskList, pTask);
return pTask;
}
......
......@@ -166,7 +166,7 @@ static char* doFlushBufPage(SDiskbasedBuf* pBuf, SPageInfo* pg) {
char* t = NULL;
if ((!HAS_DATA_IN_DISK(pg)) || pg->dirty) {
void* payload = GET_PAYLOAD_DATA(pg);
t = doCompressData(payload, pBuf->pageSize, &size, pBuf);
t = doCompressData(payload, pBuf->pageSize + sizeof(SFilePage), &size, pBuf);
if (size < 0) {
uError("failed to compress data when flushing data to disk, %s", pBuf->id);
terrno = TSDB_CODE_INVALID_PARA;
......
......@@ -75,4 +75,12 @@ target_link_libraries(rbtreeTest os util gtest_main)
add_test(
NAME rbtreeTest
COMMAND rbtreeTest
)
\ No newline at end of file
)
# pageBufferTest
add_executable(pageBufferTest "pageBufferTest.cpp")
target_link_libraries(pageBufferTest os util gtest_main)
add_test(
NAME pageBufferTest
COMMAND pageBufferTest
)
......@@ -157,6 +157,68 @@ void recyclePageTest() {
destroyDiskbasedBuf(pBuf);
}
int saveDataToPage(SFilePage* pPg, const char* data, uint32_t len) {
memcpy(pPg->data + pPg->num, data, len);
pPg->num += len;
setBufPageDirty(pPg, true);
return 0;
}
bool checkBufVarData(SFilePage* pPg, const char* varData, uint32_t varLen) {
const char* start = pPg->data + sizeof(SFilePage);
for (uint32_t i = 0; i < (pPg->num - sizeof(SFilePage)) / varLen; ++i) {
if (0 != strncmp(start + 6 * i + 3, varData, varLen - 3)) {
using namespace std;
cout << "pos: " << sizeof(SFilePage) + 6 * i + 3 << " should be " << varData << " but is: " << start + 6 * i + 3
<< endl;
return false;
}
}
return true;
}
// SPageInfo.pData: | sizeof(void*) 8 bytes | sizeof(SFilePage) 4 bytes| 4096 bytes |
// ^
// |
// SFilePage: flush to disk from here
void testFlushAndReadBackBuffer() {
SDiskbasedBuf* pBuf = NULL;
uint32_t totalLen = 4096;
auto code = createDiskbasedBuf(&pBuf, totalLen, totalLen * 2, "1", TD_TMP_DIR_PATH);
int32_t pageId = -1;
auto* pPg = (SFilePage*)getNewBufPage(pBuf, &pageId);
ASSERT_TRUE(pPg != nullptr);
pPg->num = sizeof(SFilePage);
// save data into page
uint32_t len = 6; // sizeof(SFilePage) + 6 * 682 = 4096
// nullbitmap(1) + len(2) + AA\0(3)
char* rowData = (char*)taosMemoryCalloc(1, len);
*(uint16_t*)(rowData + 2) = (uint16_t)2;
rowData[3] = 'A';
rowData[4] = 'A';
while (pPg->num + len <= getBufPageSize(pBuf)) {
saveDataToPage(pPg, rowData, len);
}
ASSERT_EQ(pPg->num, totalLen);
ASSERT_TRUE(checkBufVarData(pPg, rowData + 3, len));
releaseBufPage(pBuf, pPg);
// flush to disk
int32_t newPgId = -1;
pPg = (SFilePage*)getNewBufPage(pBuf, &newPgId);
releaseBufPage(pBuf, pPg);
pPg = (SFilePage*)getNewBufPage(pBuf, &newPgId);
releaseBufPage(pBuf, pPg);
// reload it from disk
pPg = (SFilePage*)getBufPage(pBuf, pageId);
ASSERT_TRUE(checkBufVarData(pPg, rowData + 3, len));
destroyDiskbasedBuf(pBuf);
}
} // namespace
TEST(testCase, resultBufferTest) {
......@@ -164,6 +226,7 @@ TEST(testCase, resultBufferTest) {
simpleTest();
writeDownTest();
recyclePageTest();
testFlushAndReadBackBuffer();
}
#pragma GCC diagnostic pop
\ No newline at end of file
#pragma GCC diagnostic pop
......@@ -35,7 +35,7 @@ endi
print =============== step2 drop dnode 3
sql_error drop dnode 1
sql drop dnode 3
sql drop dnode 3 force
sql select * from information_schema.ins_dnodes
print ===> $data00 $data01 $data02 $data03 $data04 $data05
......
......@@ -57,7 +57,7 @@ if $data(2)[7] != @status msg timeout@ then
endi
print ========== step4
sql drop dnode 2
sql drop dnode 2 force
sql select * from information_schema.ins_dnodes
if $rows != 1 then
return -1
......
......@@ -20,7 +20,7 @@ sql_error alter user u2 sysinfo 0
print =============== step2 create drop dnode
sql create dnode $hostname port 7200
sql create dnode $hostname port 7300
sql drop dnode 3
sql drop dnode 3 force
sql alter dnode 1 'debugflag 131'
print =============== step3: select * from information_schema.ins_dnodes
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册