提交 3b379f7b 编写于 作者: S Shengliang Guan

Merge remote-tracking branch 'origin/3.0' into feature/dnode3

...@@ -13,15 +13,23 @@ ...@@ -13,15 +13,23 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>. * along with this program. If not, see <http://www.gnu.org/licenses/>.
*/ */
#ifndef _TD_TVK_ROCKSDB_H_ #ifndef _TD_COMMON_CFG_H_
#define _TD_TVK_ROCKSDB_H_ #define _TD_COMMON_CFG_H_
#ifdef __cplusplus #ifdef __cplusplus
extern "C" { extern "C" {
#endif #endif
#include "tdef.h"
typedef struct {
char dir[TSDB_FILENAME_LEN];
int level;
int primary;
} SDiskCfg;
#ifdef __cplusplus #ifdef __cplusplus
} }
#endif #endif
#endif /*_TD_TVK_ROCKSDB_H_*/ #endif /*_TD_COMMON_CFG_H_*/
\ No newline at end of file
...@@ -21,6 +21,7 @@ extern "C" { ...@@ -21,6 +21,7 @@ extern "C" {
#endif #endif
#include "tdef.h" #include "tdef.h"
#include "tcfg.h"
// cluster // cluster
extern char tsFirst[]; extern char tsFirst[];
...@@ -105,11 +106,6 @@ extern uint32_t tsMaxRange; ...@@ -105,11 +106,6 @@ extern uint32_t tsMaxRange;
extern uint32_t tsCurRange; extern uint32_t tsCurRange;
extern char tsCompressor[]; extern char tsCompressor[];
typedef struct {
char dir[TSDB_FILENAME_LEN];
int level;
int primary;
} SDiskCfg;
extern int32_t tsDiskCfgNum; extern int32_t tsDiskCfgNum;
extern SDiskCfg tsDiskCfg[]; extern SDiskCfg tsDiskCfg[];
......
...@@ -58,6 +58,7 @@ int metaCommit(SMeta *pMeta); ...@@ -58,6 +58,7 @@ int metaCommit(SMeta *pMeta);
STbCfg * metaGetTbInfoByUid(SMeta *pMeta, tb_uid_t uid); STbCfg * metaGetTbInfoByUid(SMeta *pMeta, tb_uid_t uid);
STbCfg * metaGetTbInfoByName(SMeta *pMeta, char *tbname, tb_uid_t *uid); STbCfg * metaGetTbInfoByName(SMeta *pMeta, char *tbname, tb_uid_t *uid);
SSchemaWrapper *metaGetTableSchema(SMeta *pMeta, tb_uid_t uid, int32_t sver, bool isinline); SSchemaWrapper *metaGetTableSchema(SMeta *pMeta, tb_uid_t uid, int32_t sver, bool isinline);
STSchema * metaGetTbTSchema(SMeta *pMeta, tb_uid_t uid, int32_t sver);
SMTbCursor *metaOpenTbCursor(SMeta *pMeta); SMTbCursor *metaOpenTbCursor(SMeta *pMeta);
void metaCloseTbCursor(SMTbCursor *pTbCur); void metaCloseTbCursor(SMTbCursor *pTbCur);
......
...@@ -17,6 +17,7 @@ ...@@ -17,6 +17,7 @@
#define _TD_TSDB_H_ #define _TD_TSDB_H_
#include "mallocator.h" #include "mallocator.h"
#include "meta.h"
#ifdef __cplusplus #ifdef __cplusplus
extern "C" { extern "C" {
...@@ -33,7 +34,7 @@ typedef struct SDataStatis { ...@@ -33,7 +34,7 @@ typedef struct SDataStatis {
} SDataStatis; } SDataStatis;
typedef struct STable { typedef struct STable {
int32_t tid; uint64_t tid;
uint64_t uid; uint64_t uid;
STSchema *pSchema; STSchema *pSchema;
} STable; } STable;
...@@ -54,10 +55,11 @@ typedef struct STsdbCfg { ...@@ -54,10 +55,11 @@ typedef struct STsdbCfg {
int32_t keep1; int32_t keep1;
int32_t keep2; int32_t keep2;
int8_t update; int8_t update;
int8_t compression;
} STsdbCfg; } STsdbCfg;
// STsdb // STsdb
STsdb *tsdbOpen(const char *path, const STsdbCfg *pTsdbCfg, SMemAllocatorFactory *pMAF); STsdb *tsdbOpen(const char *path, int32_t vgId, const STsdbCfg *pTsdbCfg, SMemAllocatorFactory *pMAF, SMeta *pMeta);
void tsdbClose(STsdb *); void tsdbClose(STsdb *);
void tsdbRemove(const char *path); void tsdbRemove(const char *path);
int tsdbInsertData(STsdb *pTsdb, SSubmitMsg *pMsg, SSubmitRsp *pRsp); int tsdbInsertData(STsdb *pTsdb, SSubmitMsg *pMsg, SSubmitRsp *pRsp);
......
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef _TD_TSDB_H_
#define _TD_TSDB_H_
#include <pthread.h>
#include <stdbool.h>
#include <stdint.h>
#include "common.h"
#include "taosdef.h"
#include "tarray.h"
#include "tdataformat.h"
#include "thash.h"
#include "tlist.h"
#include "tlockfree.h"
#include "tmsg.h"
#include "tname.h"
#ifdef __cplusplus
extern "C" {
#endif
#define TSDB_VERSION_MAJOR 1
#define TSDB_VERSION_MINOR 0
#define TSDB_INVALID_SUPER_TABLE_ID -1
#define TSDB_STATUS_COMMIT_START 1
#define TSDB_STATUS_COMMIT_OVER 2
#define TSDB_STATUS_COMMIT_NOBLOCK 3 // commit no block, need to be solved
// TSDB STATE DEFINITION
#define TSDB_STATE_OK 0x0
#define TSDB_STATE_BAD_META 0x1
#define TSDB_STATE_BAD_DATA 0x2
typedef struct SDataStatis {
int16_t colId;
int64_t sum;
int64_t max;
int64_t min;
int16_t maxIndex;
int16_t minIndex;
int16_t numOfNull;
} SDataStatis;
// --------- TSDB APPLICATION HANDLE DEFINITION
// --------- TSDB REPOSITORY CONFIGURATION DEFINITION
typedef struct {
int32_t tsdbId;
int32_t cacheBlockSize;
int32_t totalBlocks;
int32_t daysPerFile; // day per file sharding policy
int32_t keep; // day of data to keep
int32_t keep1;
int32_t keep2;
int32_t lruCacheSize;
int32_t minRowsPerFileBlock; // minimum rows per file block
int32_t maxRowsPerFileBlock; // maximum rows per file block
int8_t precision;
int8_t compression;
int8_t update;
int8_t cacheLastRow; // 0:no cache, 1: cache last row, 2: cache last NULL column 3: 1&2
} STsdbCfg;
#define CACHE_NO_LAST(c) ((c)->cacheLastRow == 0)
#define CACHE_LAST_ROW(c) (((c)->cacheLastRow & 1) > 0)
#define CACHE_LAST_NULL_COLUMN(c) (((c)->cacheLastRow & 2) > 0)
// --------- TSDB REPOSITORY USAGE STATISTICS
typedef struct {
int64_t totalStorage; // total bytes occupie
int64_t compStorage;
int64_t pointsWritten; // total data points written
} STsdbStat;
typedef struct STsdb STsdb;
STsdbCfg *tsdbGetCfg(const STsdb *repo);
// --------- TSDB REPOSITORY DEFINITION
// int32_t tsdbCreateRepo(int repoid);
// int32_t tsdbDropRepo(int repoid);
STsdb * tsdbOpen(STsdbCfg *pCfg, STsdbAppH *pAppH);
int tsdbClose(STsdb *repo, int toCommit);
int32_t tsdbConfigRepo(STsdb *repo, STsdbCfg *pCfg);
int tsdbGetState(STsdb *repo);
int8_t tsdbGetCompactState(STsdb *repo);
// --------- TSDB TABLE DEFINITION
typedef struct {
uint64_t uid; // the unique table ID
int32_t tid; // the table ID in the repository.
} STableId;
// --------- TSDB TABLE configuration
typedef struct {
ETableType type;
char * name;
STableId tableId;
int32_t sversion;
char * sname; // super table name
uint64_t superUid;
STSchema * schema;
STSchema * tagSchema;
SKVRow tagValues;
char * sql;
} STableCfg;
void tsdbClearTableCfg(STableCfg *config);
void *tsdbGetTableTagVal(const void *pTable, int32_t colId, int16_t type);
char *tsdbGetTableName(void *pTable);
#define TSDB_TABLEID(_table) ((STableId *)(_table))
#define TSDB_PREV_ROW 0x1
#define TSDB_NEXT_ROW 0x2
STableCfg *tsdbCreateTableCfgFromMsg(SMDCreateTableMsg *pMsg);
int tsdbCreateTable(STsdb *repo, STableCfg *pCfg);
int tsdbDropTable(STsdb *pRepo, STableId tableId);
int tsdbUpdateTableTagValue(STsdb *repo, SUpdateTableTagValMsg *pMsg);
uint32_t tsdbGetFileInfo(STsdb *repo, char *name, uint32_t *index, uint32_t eindex, int64_t *size);
// the TSDB repository info
typedef struct STsdbRepoInfo {
STsdbCfg tsdbCfg;
uint64_t version; // version of the repository
int64_t tsdbTotalDataSize; // the original inserted data size
int64_t tsdbTotalDiskSize; // the total disk size taken by this TSDB repository
// TODO: Other informations to add
} STsdbRepoInfo;
STsdbRepoInfo *tsdbGetStatus(STsdb *pRepo);
// the meter information report structure
typedef struct {
STableCfg tableCfg;
uint64_t version;
int64_t tableTotalDataSize; // In bytes
int64_t tableTotalDiskSize; // In bytes
} STableInfo;
// -- FOR INSERT DATA
/**
* Insert data to a table in a repository
* @param pRepo the TSDB repository handle
* @param pData the data to insert (will give a more specific description)
*
* @return the number of points inserted, -1 for failure and the error number is set
*/
int32_t tsdbInsertData(STsdb *repo, SSubmitMsg *pMsg, SShellSubmitRspMsg *pRsp);
// -- FOR QUERY TIME SERIES DATA
typedef void *TsdbQueryHandleT; // Use void to hide implementation details
#define BLOCK_LOAD_OFFSET_SEQ_ORDER 1
#define BLOCK_LOAD_TABLE_SEQ_ORDER 2
#define BLOCK_LOAD_TABLE_RR_ORDER 3
// query condition to build multi-table data block iterator
typedef struct STsdbQueryCond {
STimeWindow twindow;
int32_t order; // desc|asc order to iterate the data block
int64_t offset; // skip offset put down to tsdb
int32_t numOfCols;
SColumnInfo *colList;
bool loadExternalRows; // load external rows or not
int32_t type; // data block load type:
} STsdbQueryCond;
typedef struct STableData STableData;
typedef struct {
T_REF_DECLARE()
SRWLatch latch;
TSKEY keyFirst;
TSKEY keyLast;
int64_t numOfRows;
int32_t maxTables;
STableData **tData;
SList * actList;
SList * extraBuffList;
SList * bufBlockList;
int64_t pointsAdd; // TODO
int64_t storageAdd; // TODO
} SMemTable;
typedef struct {
SMemTable *mem;
SMemTable *imem;
SMemTable mtable;
SMemTable *omem;
} SMemSnapshot;
typedef struct SMemRef {
int32_t ref;
SMemSnapshot snapshot;
} SMemRef;
#if 0
typedef struct SFileBlockInfo {
int32_t numBlocksOfStep;
} SFileBlockInfo;
typedef struct {
void *pTable;
TSKEY lastKey;
} STableKeyInfo;
typedef struct {
uint32_t numOfTables;
SArray * pGroupList;
SHashObj *map; // speedup acquire the tableQueryInfo by table uid
} STableGroupInfo;
#define TSDB_BLOCK_DIST_STEP_ROWS 16
typedef struct {
uint16_t rowSize;
uint16_t numOfFiles;
uint32_t numOfTables;
uint64_t totalSize;
uint64_t totalRows;
int32_t maxRows;
int32_t minRows;
int32_t firstSeekTimeUs;
uint32_t numOfRowsInMemTable;
uint32_t numOfSmallBlocks;
SArray * dataBlockInfos;
} STableBlockDist;
/**
* Get the data block iterator, starting from position according to the query condition
*
* @param tsdb tsdb handle
* @param pCond query condition, including time window, result set order, and basic required columns for each block
* @param tableInfoGroup table object list in the form of set, grouped into different sets according to the
* group by condition
* @param qinfo query info handle from query processor
* @return
*/
TsdbQueryHandleT *tsdbQueryTables(STsdb *tsdb, STsdbQueryCond *pCond, STableGroupInfo *tableInfoGroup, uint64_t qId,
SMemRef *pRef);
/**
* Get the last row of the given query time window for all the tables in STableGroupInfo object.
* Note that only one data block with only row will be returned while invoking retrieve data block function for
* all tables in this group.
*
* @param tsdb tsdb handle
* @param pCond query condition, including time window, result set order, and basic required columns for each block
* @param tableInfo table list.
* @return
*/
TsdbQueryHandleT tsdbQueryLastRow(STsdb *tsdb, STsdbQueryCond *pCond, STableGroupInfo *tableInfo, uint64_t qId,
SMemRef *pRef);
TsdbQueryHandleT tsdbQueryCacheLast(STsdb *tsdb, STsdbQueryCond *pCond, STableGroupInfo *groupList, uint64_t qId,
SMemRef *pMemRef);
bool isTsdbCacheLastRow(TsdbQueryHandleT *pQueryHandle);
/**
* get the queried table object list
* @param pHandle
* @return
*/
SArray *tsdbGetQueriedTableList(TsdbQueryHandleT *pHandle);
/**
* get the group list according to table id from client
* @param tsdb
* @param pCond
* @param groupList
* @param qinfo
* @return
*/
TsdbQueryHandleT tsdbQueryRowsInExternalWindow(STsdb *tsdb, STsdbQueryCond *pCond, STableGroupInfo *groupList,
uint64_t qId, SMemRef *pRef);
/**
* get num of rows in mem table
*
* @param pHandle
* @return row size
*/
int64_t tsdbGetNumOfRowsInMemTable(TsdbQueryHandleT *pHandle);
/**
* move to next block if exists
*
* @param pQueryHandle
* @return
*/
bool tsdbNextDataBlock(TsdbQueryHandleT pQueryHandle);
/**
* Get current data block information
*
* @param pQueryHandle
* @param pBlockInfo
* @return
*/
void tsdbRetrieveDataBlockInfo(TsdbQueryHandleT *pQueryHandle, SDataBlockInfo *pBlockInfo);
/**
*
* Get the pre-calculated information w.r.t. current data block.
*
* In case of data block in cache, the pBlockStatis will always be NULL.
* If a block is not completed loaded from disk, the pBlockStatis will be NULL.
* @pBlockStatis the pre-calculated value for current data blocks. if the block is a cache block, always return 0
* @return
*/
int32_t tsdbRetrieveDataBlockStatisInfo(TsdbQueryHandleT *pQueryHandle, SDataStatis **pBlockStatis);
/**
*
* The query condition with primary timestamp is passed to iterator during its constructor function,
* the returned data block must be satisfied with the time window condition in any cases,
* which means the SData data block is not actually the completed disk data blocks.
*
* @param pQueryHandle query handle
* @param pColumnIdList required data columns id list
* @return
*/
SArray *tsdbRetrieveDataBlock(TsdbQueryHandleT *pQueryHandle, SArray *pColumnIdList);
/**
* Get the qualified table id for a super table according to the tag query expression.
* @param stableid. super table sid
* @param pTagCond. tag query condition
*/
int32_t tsdbQuerySTableByTagCond(STsdb *tsdb, uint64_t uid, TSKEY key, const char *pTagCond, size_t len,
STableGroupInfo *pGroupList, SColIndex *pColIndex, int32_t numOfCols);
/**
* destroy the created table group list, which is generated by tag query
* @param pGroupList
*/
void tsdbDestroyTableGroup(STableGroupInfo *pGroupList);
/**
* create the table group result including only one table, used to handle the normal table query
*
* @param tsdb tsdbHandle
* @param uid table uid
* @param pGroupInfo the generated result
* @return
*/
int32_t tsdbGetOneTableGroup(STsdb *tsdb, uint64_t uid, TSKEY startKey, STableGroupInfo *pGroupInfo);
/**
*
* @param tsdb
* @param pTableIdList
* @param pGroupInfo
* @return
*/
int32_t tsdbGetTableGroupFromIdList(STsdb *tsdb, SArray *pTableIdList, STableGroupInfo *pGroupInfo);
/**
* clean up the query handle
* @param queryHandle
*/
void tsdbCleanupQueryHandle(TsdbQueryHandleT queryHandle);
void tsdbResetQueryHandle(TsdbQueryHandleT queryHandle, STsdbQueryCond *pCond);
void tsdbResetQueryHandleForNewTable(TsdbQueryHandleT queryHandle, STsdbQueryCond *pCond, STableGroupInfo *groupList);
int32_t tsdbGetFileBlocksDistInfo(TsdbQueryHandleT *queryHandle, STableBlockDist *pTableBlockInfo);
// obtain queryHandle attribute
int64_t tsdbSkipOffset(TsdbQueryHandleT queryHandle);
/**
* get the statistics of repo usage
* @param repo. point to the tsdbrepo
* @param totalPoints. total data point written
* @param totalStorage. total bytes took by the tsdb
* @param compStorage. total bytes took by the tsdb after compressed
*/
void tsdbReportStat(void *repo, int64_t *totalPoints, int64_t *totalStorage, int64_t *compStorage);
int tsdbInitCommitQueue();
void tsdbDestroyCommitQueue();
int tsdbSyncCommit(STsdb *repo);
void tsdbIncCommitRef(int vgId);
void tsdbDecCommitRef(int vgId);
void tsdbSwitchTable(TsdbQueryHandleT pQueryHandle);
// For TSDB file sync
int tsdbSyncSend(void *pRepo, SOCKET socketFd);
int tsdbSyncRecv(void *pRepo, SOCKET socketFd);
// // For TSDB Compact
// int tsdbCompact(STsdb *pRepo);
// For TSDB Health Monitor
// // no problem return true
// bool tsdbNoProblem(STsdb *pRepo);
// // unit of walSize: MB
// int tsdbCheckWal(STsdb *pRepo, uint32_t walSize);
// // for json tag
// void *getJsonTagValueElment(void *data, char *key, int32_t keyLen, char *out, int16_t bytes);
// void getJsonTagValueAll(void *data, void *dst, int16_t bytes);
// char *parseTagDatatoJson(void *p);
#endif
#ifdef __cplusplus
}
#endif
#endif // _TD_TSDB_H_
...@@ -89,7 +89,7 @@ void vnodeClear(); ...@@ -89,7 +89,7 @@ void vnodeClear();
* @param pVnodeCfg options of the vnode * @param pVnodeCfg options of the vnode
* @return SVnode* The vnode object * @return SVnode* The vnode object
*/ */
SVnode *vnodeOpen(const char *path, const SVnodeCfg *pVnodeCfg); SVnode *vnodeOpen(const char *path, const SVnodeCfg *pVnodeCfg, int32_t vid);
/** /**
* @brief Close a VNODE * @brief Close a VNODE
......
...@@ -16,6 +16,7 @@ ...@@ -16,6 +16,7 @@
#ifndef _TD_TKV_H_ #ifndef _TD_TKV_H_
#define _TD_TKV_H_ #define _TD_TKV_H_
#if 0
#include "os.h" #include "os.h"
#ifdef __cplusplus #ifdef __cplusplus
...@@ -59,4 +60,5 @@ void tkvWriteOptsDestroy(STkvWriteOpts *); ...@@ -59,4 +60,5 @@ void tkvWriteOptsDestroy(STkvWriteOpts *);
} }
#endif #endif
#endif
#endif /*_TD_TKV_H_*/ #endif /*_TD_TKV_H_*/
\ No newline at end of file
...@@ -54,7 +54,7 @@ int32_t taosFtruncateFile(FileFd fd, int64_t length); ...@@ -54,7 +54,7 @@ int32_t taosFtruncateFile(FileFd fd, int64_t length);
int32_t taosFsyncFile(FileFd fd); int32_t taosFsyncFile(FileFd fd);
int64_t taosReadFile(FileFd fd, void *buf, int64_t count); int64_t taosReadFile(FileFd fd, void *buf, int64_t count);
int64_t taosWriteFile(FileFd fd, void *buf, int64_t count); int64_t taosWriteFile(FileFd fd, const void *buf, int64_t count);
void taosCloseFile(FileFd fd); void taosCloseFile(FileFd fd);
......
...@@ -26,14 +26,10 @@ extern "C" { ...@@ -26,14 +26,10 @@ extern "C" {
#define TD_MOD_UNINITIALIZED 0 #define TD_MOD_UNINITIALIZED 0
#define TD_MOD_INITIALIZED 1 #define TD_MOD_INITIALIZED 1
#define TD_MOD_UNCLEARD 0
#define TD_MOD_CLEARD 1
typedef int8_t td_mode_flag_t; typedef int8_t td_mode_flag_t;
#define TD_CHECK_AND_SET_MODE_INIT(FLAG) atomic_val_compare_exchange_8((FLAG), TD_MOD_UNINITIALIZED, TD_MOD_INITIALIZED) #define TD_CHECK_AND_SET_MODE_INIT(FLAG) atomic_val_compare_exchange_8((FLAG), TD_MOD_UNINITIALIZED, TD_MOD_INITIALIZED)
#define TD_CHECK_AND_SET_MOD_CLEAR(FLAG) atomic_val_compare_exchange_8((FLAG), TD_MOD_INITIALIZED, TD_MOD_UNINITIALIZED)
#define TD_CHECK_AND_SET_MOD_CLEAR(FLAG) atomic_val_compare_exchange_8((FLAG), TD_MOD_UNCLEARD, TD_MOD_CLEARD)
#define TD_IS_NULL(PTR) ((PTR) == NULL) #define TD_IS_NULL(PTR) ((PTR) == NULL)
......
...@@ -305,6 +305,8 @@ TAOS_RES *taos_query_l(TAOS *taos, const char *sql, int sqlLen) { ...@@ -305,6 +305,8 @@ TAOS_RES *taos_query_l(TAOS *taos, const char *sql, int sqlLen) {
} else { } else {
CHECK_CODE_GOTO(getPlan(pRequest, pQuery, &pDag), _return); CHECK_CODE_GOTO(getPlan(pRequest, pQuery, &pDag), _return);
CHECK_CODE_GOTO(scheduleQuery(pRequest, pDag, &pJob), _return); CHECK_CODE_GOTO(scheduleQuery(pRequest, pDag, &pJob), _return);
pRequest->code = terrno;
return pRequest;
} }
_return: _return:
......
...@@ -11,6 +11,7 @@ target_link_libraries( ...@@ -11,6 +11,7 @@ target_link_libraries(
PUBLIC wal PUBLIC wal
PUBLIC sync PUBLIC sync
PUBLIC taos PUBLIC taos
PUBLIC tfs
) )
target_include_directories( target_include_directories(
dnode dnode
......
...@@ -21,8 +21,9 @@ extern "C" { ...@@ -21,8 +21,9 @@ extern "C" {
#endif #endif
#include "dndInt.h" #include "dndInt.h"
int32_t dndInitDnode(SDnode *pDnode); int32_t dndInitMgmt(SDnode *pDnode);
void dndCleanupDnode(SDnode *pDnode); void dndStopMgmt(SDnode *pDnode);
void dndCleanupMgmt(SDnode *pDnode);
int32_t dndGetDnodeId(SDnode *pDnode); int32_t dndGetDnodeId(SDnode *pDnode);
int64_t dndGetClusterId(SDnode *pDnode); int64_t dndGetClusterId(SDnode *pDnode);
......
...@@ -497,7 +497,7 @@ static void *dnodeThreadRoutine(void *param) { ...@@ -497,7 +497,7 @@ static void *dnodeThreadRoutine(void *param) {
} }
} }
int32_t dndInitDnode(SDnode *pDnode) { int32_t dndInitMgmt(SDnode *pDnode) {
SDnodeMgmt *pMgmt = &pDnode->dmgmt; SDnodeMgmt *pMgmt = &pDnode->dmgmt;
pMgmt->dnodeId = 0; pMgmt->dnodeId = 0;
...@@ -547,16 +547,18 @@ int32_t dndInitDnode(SDnode *pDnode) { ...@@ -547,16 +547,18 @@ int32_t dndInitDnode(SDnode *pDnode) {
return 0; return 0;
} }
void dndCleanupDnode(SDnode *pDnode) { void dndStopMgmt(SDnode *pDnode) {
SDnodeMgmt *pMgmt = &pDnode->dmgmt; SDnodeMgmt *pMgmt = &pDnode->dmgmt;
dndCleanupWorker(&pMgmt->mgmtWorker); dndCleanupWorker(&pMgmt->mgmtWorker);
if (pMgmt->threadId != NULL) { if (pMgmt->threadId != NULL) {
taosDestoryThread(pMgmt->threadId); taosDestoryThread(pMgmt->threadId);
pMgmt->threadId = NULL; pMgmt->threadId = NULL;
} }
}
void dndCleanupMgmt(SDnode *pDnode) {
SDnodeMgmt *pMgmt = &pDnode->dmgmt;
taosWLockLatch(&pMgmt->latch); taosWLockLatch(&pMgmt->latch);
if (pMgmt->dnodeEps != NULL) { if (pMgmt->dnodeEps != NULL) {
......
...@@ -33,9 +33,9 @@ typedef struct { ...@@ -33,9 +33,9 @@ typedef struct {
int8_t dropped; int8_t dropped;
int8_t accessState; int8_t accessState;
uint64_t dbUid; uint64_t dbUid;
char *db; char * db;
char *path; char * path;
SVnode *pImpl; SVnode * pImpl;
STaosQueue *pWriteQ; STaosQueue *pWriteQ;
STaosQueue *pSyncQ; STaosQueue *pSyncQ;
STaosQueue *pApplyQ; STaosQueue *pApplyQ;
...@@ -49,7 +49,7 @@ typedef struct { ...@@ -49,7 +49,7 @@ typedef struct {
int32_t failed; int32_t failed;
int32_t threadIndex; int32_t threadIndex;
pthread_t thread; pthread_t thread;
SDnode *pDnode; SDnode * pDnode;
SWrapperCfg *pCfgs; SWrapperCfg *pCfgs;
} SVnodeThread; } SVnodeThread;
...@@ -67,7 +67,7 @@ void dndProcessVnodeWriteMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pE ...@@ -67,7 +67,7 @@ void dndProcessVnodeWriteMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pE
void dndProcessVnodeSyncMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet); void dndProcessVnodeSyncMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet);
static int32_t dndPutMsgIntoVnodeApplyQueue(SDnode *pDnode, int32_t vgId, SRpcMsg *pMsg); static int32_t dndPutMsgIntoVnodeApplyQueue(SDnode *pDnode, int32_t vgId, SRpcMsg *pMsg);
static SVnodeObj *dndAcquireVnode(SDnode *pDnode, int32_t vgId); static SVnodeObj * dndAcquireVnode(SDnode *pDnode, int32_t vgId);
static void dndReleaseVnode(SDnode *pDnode, SVnodeObj *pVnode); static void dndReleaseVnode(SDnode *pDnode, SVnodeObj *pVnode);
static int32_t dndOpenVnode(SDnode *pDnode, SWrapperCfg *pCfg, SVnode *pImpl); static int32_t dndOpenVnode(SDnode *pDnode, SWrapperCfg *pCfg, SVnode *pImpl);
static void dndCloseVnode(SDnode *pDnode, SVnodeObj *pVnode); static void dndCloseVnode(SDnode *pDnode, SVnodeObj *pVnode);
...@@ -80,7 +80,7 @@ static void dndCloseVnodes(SDnode *pDnode); ...@@ -80,7 +80,7 @@ static void dndCloseVnodes(SDnode *pDnode);
static SVnodeObj *dndAcquireVnode(SDnode *pDnode, int32_t vgId) { static SVnodeObj *dndAcquireVnode(SDnode *pDnode, int32_t vgId) {
SVnodesMgmt *pMgmt = &pDnode->vmgmt; SVnodesMgmt *pMgmt = &pDnode->vmgmt;
SVnodeObj *pVnode = NULL; SVnodeObj * pVnode = NULL;
int32_t refCount = 0; int32_t refCount = 0;
taosRLockLatch(&pMgmt->latch); taosRLockLatch(&pMgmt->latch);
...@@ -111,7 +111,7 @@ static void dndReleaseVnode(SDnode *pDnode, SVnodeObj *pVnode) { ...@@ -111,7 +111,7 @@ static void dndReleaseVnode(SDnode *pDnode, SVnodeObj *pVnode) {
static int32_t dndOpenVnode(SDnode *pDnode, SWrapperCfg *pCfg, SVnode *pImpl) { static int32_t dndOpenVnode(SDnode *pDnode, SWrapperCfg *pCfg, SVnode *pImpl) {
SVnodesMgmt *pMgmt = &pDnode->vmgmt; SVnodesMgmt *pMgmt = &pDnode->vmgmt;
SVnodeObj *pVnode = calloc(1, sizeof(SVnodeObj)); SVnodeObj * pVnode = calloc(1, sizeof(SVnodeObj));
if (pVnode == NULL) { if (pVnode == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1; return -1;
...@@ -167,6 +167,11 @@ static void dndCloseVnode(SDnode *pDnode, SVnodeObj *pVnode) { ...@@ -167,6 +167,11 @@ static void dndCloseVnode(SDnode *pDnode, SVnodeObj *pVnode) {
dDebug("vgId:%d, vnode is closed", pVnode->vgId); dDebug("vgId:%d, vnode is closed", pVnode->vgId);
if (pVnode->dropped) {
dDebug("vgId:%d, vnode is destroyed for dropped:%d", pVnode->vgId, pVnode->dropped);
vnodeDestroy(pVnode->path);
}
free(pVnode->path); free(pVnode->path);
free(pVnode->db); free(pVnode->db);
free(pVnode); free(pVnode);
...@@ -183,7 +188,7 @@ static SVnodeObj **dndGetVnodesFromHash(SDnode *pDnode, int32_t *numOfVnodes) { ...@@ -183,7 +188,7 @@ static SVnodeObj **dndGetVnodesFromHash(SDnode *pDnode, int32_t *numOfVnodes) {
void *pIter = taosHashIterate(pMgmt->hash, NULL); void *pIter = taosHashIterate(pMgmt->hash, NULL);
while (pIter) { while (pIter) {
SVnodeObj **ppVnode = pIter; SVnodeObj **ppVnode = pIter;
SVnodeObj *pVnode = *ppVnode; SVnodeObj * pVnode = *ppVnode;
if (pVnode && num < size) { if (pVnode && num < size) {
int32_t refCount = atomic_add_fetch_32(&pVnode->refCount, 1); int32_t refCount = atomic_add_fetch_32(&pVnode->refCount, 1);
dTrace("vgId:%d, acquire vnode, refCount:%d", pVnode->vgId, refCount); dTrace("vgId:%d, acquire vnode, refCount:%d", pVnode->vgId, refCount);
...@@ -205,9 +210,9 @@ static int32_t dndGetVnodesFromFile(SDnode *pDnode, SWrapperCfg **ppCfgs, int32_ ...@@ -205,9 +210,9 @@ static int32_t dndGetVnodesFromFile(SDnode *pDnode, SWrapperCfg **ppCfgs, int32_
int32_t code = TSDB_CODE_DND_VNODE_READ_FILE_ERROR; int32_t code = TSDB_CODE_DND_VNODE_READ_FILE_ERROR;
int32_t len = 0; int32_t len = 0;
int32_t maxLen = 30000; int32_t maxLen = 30000;
char *content = calloc(1, maxLen + 1); char * content = calloc(1, maxLen + 1);
cJSON *root = NULL; cJSON * root = NULL;
FILE *fp = NULL; FILE * fp = NULL;
char file[PATH_MAX + 20] = {0}; char file[PATH_MAX + 20] = {0};
SWrapperCfg *pCfgs = NULL; SWrapperCfg *pCfgs = NULL;
...@@ -320,7 +325,7 @@ static int32_t dndWriteVnodesToFile(SDnode *pDnode) { ...@@ -320,7 +325,7 @@ static int32_t dndWriteVnodesToFile(SDnode *pDnode) {
int32_t len = 0; int32_t len = 0;
int32_t maxLen = 65536; int32_t maxLen = 65536;
char *content = calloc(1, maxLen + 1); char * content = calloc(1, maxLen + 1);
len += snprintf(content + len, maxLen - len, "{\n"); len += snprintf(content + len, maxLen - len, "{\n");
len += snprintf(content + len, maxLen - len, " \"vnodes\": [\n"); len += snprintf(content + len, maxLen - len, " \"vnodes\": [\n");
...@@ -362,8 +367,8 @@ static int32_t dndWriteVnodesToFile(SDnode *pDnode) { ...@@ -362,8 +367,8 @@ static int32_t dndWriteVnodesToFile(SDnode *pDnode) {
static void *dnodeOpenVnodeFunc(void *param) { static void *dnodeOpenVnodeFunc(void *param) {
SVnodeThread *pThread = param; SVnodeThread *pThread = param;
SDnode *pDnode = pThread->pDnode; SDnode * pDnode = pThread->pDnode;
SVnodesMgmt *pMgmt = &pDnode->vmgmt; SVnodesMgmt * pMgmt = &pDnode->vmgmt;
dDebug("thread:%d, start to open %d vnodes", pThread->threadIndex, pThread->vnodeNum); dDebug("thread:%d, start to open %d vnodes", pThread->threadIndex, pThread->vnodeNum);
setThreadName("open-vnodes"); setThreadName("open-vnodes");
...@@ -376,10 +381,7 @@ static void *dnodeOpenVnodeFunc(void *param) { ...@@ -376,10 +381,7 @@ static void *dnodeOpenVnodeFunc(void *param) {
pMgmt->openVnodes, pMgmt->totalVnodes); pMgmt->openVnodes, pMgmt->totalVnodes);
dndReportStartup(pDnode, "open-vnodes", stepDesc); dndReportStartup(pDnode, "open-vnodes", stepDesc);
SVnodeCfg vnodeCfg = {0}; SVnode *pImpl = vnodeOpen(pCfg->path, NULL, pCfg->vgId);
vnodeCfg.vgId = pCfg->vgId;
SVnode *pImpl = vnodeOpen(pCfg->path, &vnodeCfg);
if (pImpl == NULL) { if (pImpl == NULL) {
dError("vgId:%d, failed to open vnode by thread:%d", pCfg->vgId, pThread->threadIndex); dError("vgId:%d, failed to open vnode by thread:%d", pCfg->vgId, pThread->threadIndex);
pThread->failed++; pThread->failed++;
...@@ -469,6 +471,7 @@ static int32_t dndOpenVnodes(SDnode *pDnode) { ...@@ -469,6 +471,7 @@ static int32_t dndOpenVnodes(SDnode *pDnode) {
} }
static void dndCloseVnodes(SDnode *pDnode) { static void dndCloseVnodes(SDnode *pDnode) {
dInfo("start to close all vnodes");
SVnodesMgmt *pMgmt = &pDnode->vmgmt; SVnodesMgmt *pMgmt = &pDnode->vmgmt;
int32_t numOfVnodes = 0; int32_t numOfVnodes = 0;
...@@ -578,7 +581,7 @@ int32_t dndProcessCreateVnodeReq(SDnode *pDnode, SRpcMsg *pReq) { ...@@ -578,7 +581,7 @@ int32_t dndProcessCreateVnodeReq(SDnode *pDnode, SRpcMsg *pReq) {
return -1; return -1;
} }
SVnode *pImpl = vnodeOpen(wrapperCfg.path, &vnodeCfg); SVnode *pImpl = vnodeOpen(wrapperCfg.path, NULL /*pCfg*/, pCreate->vgId);
if (pImpl == NULL) { if (pImpl == NULL) {
dError("vgId:%d, failed to create vnode since %s", pCreate->vgId, terrstr()); dError("vgId:%d, failed to create vnode since %s", pCreate->vgId, terrstr());
return -1; return -1;
...@@ -661,8 +664,6 @@ int32_t dndProcessDropVnodeReq(SDnode *pDnode, SRpcMsg *pReq) { ...@@ -661,8 +664,6 @@ int32_t dndProcessDropVnodeReq(SDnode *pDnode, SRpcMsg *pReq) {
} }
dndCloseVnode(pDnode, pVnode); dndCloseVnode(pDnode, pVnode);
vnodeClose(pVnode->pImpl);
vnodeDestroy(pVnode->path);
dndWriteVnodesToFile(pDnode); dndWriteVnodesToFile(pDnode);
return 0; return 0;
...@@ -992,12 +993,12 @@ void dndGetVnodeLoads(SDnode *pDnode, SVnodeLoads *pLoads) { ...@@ -992,12 +993,12 @@ void dndGetVnodeLoads(SDnode *pDnode, SVnodeLoads *pLoads) {
pLoads->num = taosHashGetSize(pMgmt->hash); pLoads->num = taosHashGetSize(pMgmt->hash);
int32_t v = 0; int32_t v = 0;
void *pIter = taosHashIterate(pMgmt->hash, NULL); void * pIter = taosHashIterate(pMgmt->hash, NULL);
while (pIter) { while (pIter) {
SVnodeObj **ppVnode = pIter; SVnodeObj **ppVnode = pIter;
if (ppVnode == NULL || *ppVnode == NULL) continue; if (ppVnode == NULL || *ppVnode == NULL) continue;
SVnodeObj *pVnode = *ppVnode; SVnodeObj * pVnode = *ppVnode;
SVnodeLoad *pLoad = &pLoads->data[v++]; SVnodeLoad *pLoad = &pLoads->data[v++];
vnodeGetLoad(pVnode->pImpl, pLoad); vnodeGetLoad(pVnode->pImpl, pLoad);
......
...@@ -23,6 +23,7 @@ ...@@ -23,6 +23,7 @@
#include "dndVnodes.h" #include "dndVnodes.h"
#include "sync.h" #include "sync.h"
#include "wal.h" #include "wal.h"
#include "tfs.h"
EStat dndGetStat(SDnode *pDnode) { return pDnode->stat; } EStat dndGetStat(SDnode *pDnode) { return pDnode->stat; }
...@@ -185,13 +186,23 @@ SDnode *dndInit(SDnodeOpt *pOption) { ...@@ -185,13 +186,23 @@ SDnode *dndInit(SDnodeOpt *pOption) {
return NULL; return NULL;
} }
SDiskCfg dCfg;
strcpy(dCfg.dir, pDnode->opt.dataDir);
dCfg.level = 0;
dCfg.primary = 1;
if (tfsInit(&dCfg, 1) != 0) {
dError("failed to init tfs env");
dndCleanup(pDnode);
return NULL;
}
if (vnodeInit(pDnode->opt.numOfCommitThreads) != 0) { if (vnodeInit(pDnode->opt.numOfCommitThreads) != 0) {
dError("failed to init vnode env"); dError("failed to init vnode env");
dndCleanup(pDnode); dndCleanup(pDnode);
return NULL; return NULL;
} }
if (dndInitDnode(pDnode) != 0) { if (dndInitMgmt(pDnode) != 0) {
dError("failed to init dnode"); dError("failed to init dnode");
dndCleanup(pDnode); dndCleanup(pDnode);
return NULL; return NULL;
...@@ -252,13 +263,15 @@ void dndCleanup(SDnode *pDnode) { ...@@ -252,13 +263,15 @@ void dndCleanup(SDnode *pDnode) {
dInfo("start to cleanup TDengine"); dInfo("start to cleanup TDengine");
dndSetStat(pDnode, DND_STAT_STOPPED); dndSetStat(pDnode, DND_STAT_STOPPED);
dndCleanupTrans(pDnode); dndCleanupTrans(pDnode);
dndStopMgmt(pDnode);
dndCleanupMnode(pDnode); dndCleanupMnode(pDnode);
dndCleanupBnode(pDnode); dndCleanupBnode(pDnode);
dndCleanupSnode(pDnode); dndCleanupSnode(pDnode);
dndCleanupQnode(pDnode); dndCleanupQnode(pDnode);
dndCleanupVnodes(pDnode); dndCleanupVnodes(pDnode);
dndCleanupDnode(pDnode); dndCleanupMgmt(pDnode);
vnodeClear(); vnodeClear();
tfsDestroy();
walCleanUp(); walCleanUp();
rpcCleanup(); rpcCleanup();
......
...@@ -23,6 +23,7 @@ ...@@ -23,6 +23,7 @@
#include "tlockfree.h" #include "tlockfree.h"
#include "tmacro.h" #include "tmacro.h"
#include "wal.h" #include "wal.h"
#include "tfs.h"
#include "vnode.h" #include "vnode.h"
...@@ -48,7 +49,6 @@ typedef struct SVnodeTask { ...@@ -48,7 +49,6 @@ typedef struct SVnodeTask {
typedef struct SVnodeMgr { typedef struct SVnodeMgr {
td_mode_flag_t vnodeInitFlag; td_mode_flag_t vnodeInitFlag;
td_mode_flag_t vnodeClearFlag;
// For commit // For commit
bool stop; bool stop;
uint16_t nthreads; uint16_t nthreads;
......
...@@ -97,14 +97,17 @@ int vnodeBufPoolSwitch(SVnode *pVnode) { ...@@ -97,14 +97,17 @@ int vnodeBufPoolSwitch(SVnode *pVnode) {
pVnode->pBufPool->inuse = NULL; pVnode->pBufPool->inuse = NULL;
TD_DLIST_APPEND(&(pVnode->pBufPool->incycle), pvma); if (pvma) {
TD_DLIST_APPEND(&(pVnode->pBufPool->incycle), pvma);
}
return 0; return 0;
} }
int vnodeBufPoolRecycle(SVnode *pVnode) { int vnodeBufPoolRecycle(SVnode *pVnode) {
SVBufPool * pBufPool = pVnode->pBufPool; SVBufPool * pBufPool = pVnode->pBufPool;
SVMemAllocator *pvma = TD_DLIST_HEAD(&(pBufPool->incycle)); SVMemAllocator *pvma = TD_DLIST_HEAD(&(pBufPool->incycle));
ASSERT(pvma != NULL); if (pvma == NULL) return 0;
// ASSERT(pvma != NULL);
TD_DLIST_POP(&(pBufPool->incycle), pvma); TD_DLIST_POP(&(pBufPool->incycle), pvma);
vmaReset(pvma); vmaReset(pvma);
......
...@@ -40,6 +40,7 @@ int vnodeAsyncCommit(SVnode *pVnode) { ...@@ -40,6 +40,7 @@ int vnodeAsyncCommit(SVnode *pVnode) {
int vnodeSyncCommit(SVnode *pVnode) { int vnodeSyncCommit(SVnode *pVnode) {
vnodeAsyncCommit(pVnode); vnodeAsyncCommit(pVnode);
vnodeWaitCommit(pVnode); vnodeWaitCommit(pVnode);
tsem_post(&(pVnode->canCommit));
return 0; return 0;
} }
......
...@@ -15,12 +15,12 @@ ...@@ -15,12 +15,12 @@
#include "vnodeDef.h" #include "vnodeDef.h"
static SVnode *vnodeNew(const char *path, const SVnodeCfg *pVnodeCfg); static SVnode *vnodeNew(const char *path, const SVnodeCfg *pVnodeCfg, int32_t vid);
static void vnodeFree(SVnode *pVnode); static void vnodeFree(SVnode *pVnode);
static int vnodeOpenImpl(SVnode *pVnode); static int vnodeOpenImpl(SVnode *pVnode);
static void vnodeCloseImpl(SVnode *pVnode); static void vnodeCloseImpl(SVnode *pVnode);
SVnode *vnodeOpen(const char *path, const SVnodeCfg *pVnodeCfg) { SVnode *vnodeOpen(const char *path, const SVnodeCfg *pVnodeCfg, int32_t vid) {
SVnode *pVnode = NULL; SVnode *pVnode = NULL;
// Set default options // Set default options
...@@ -35,7 +35,7 @@ SVnode *vnodeOpen(const char *path, const SVnodeCfg *pVnodeCfg) { ...@@ -35,7 +35,7 @@ SVnode *vnodeOpen(const char *path, const SVnodeCfg *pVnodeCfg) {
} }
// Create the handle // Create the handle
pVnode = vnodeNew(path, pVnodeCfg); pVnode = vnodeNew(path, pVnodeCfg, vid);
if (pVnode == NULL) { if (pVnode == NULL) {
// TODO: handle error // TODO: handle error
return NULL; return NULL;
...@@ -62,7 +62,7 @@ void vnodeClose(SVnode *pVnode) { ...@@ -62,7 +62,7 @@ void vnodeClose(SVnode *pVnode) {
void vnodeDestroy(const char *path) { taosRemoveDir(path); } void vnodeDestroy(const char *path) { taosRemoveDir(path); }
/* ------------------------ STATIC METHODS ------------------------ */ /* ------------------------ STATIC METHODS ------------------------ */
static SVnode *vnodeNew(const char *path, const SVnodeCfg *pVnodeCfg) { static SVnode *vnodeNew(const char *path, const SVnodeCfg *pVnodeCfg, int32_t vid) {
SVnode *pVnode = NULL; SVnode *pVnode = NULL;
pVnode = (SVnode *)calloc(1, sizeof(*pVnode)); pVnode = (SVnode *)calloc(1, sizeof(*pVnode));
...@@ -71,6 +71,7 @@ static SVnode *vnodeNew(const char *path, const SVnodeCfg *pVnodeCfg) { ...@@ -71,6 +71,7 @@ static SVnode *vnodeNew(const char *path, const SVnodeCfg *pVnodeCfg) {
return NULL; return NULL;
} }
pVnode->vgId = vid;
pVnode->path = strdup(path); pVnode->path = strdup(path);
vnodeOptionsCopy(&(pVnode->config), pVnodeCfg); vnodeOptionsCopy(&(pVnode->config), pVnodeCfg);
...@@ -105,7 +106,7 @@ static int vnodeOpenImpl(SVnode *pVnode) { ...@@ -105,7 +106,7 @@ static int vnodeOpenImpl(SVnode *pVnode) {
// Open tsdb // Open tsdb
sprintf(dir, "%s/tsdb", pVnode->path); sprintf(dir, "%s/tsdb", pVnode->path);
pVnode->pTsdb = tsdbOpen(dir, &(pVnode->config.tsdbCfg), vBufPoolGetMAF(pVnode)); pVnode->pTsdb = tsdbOpen(dir, pVnode->vgId, &(pVnode->config.tsdbCfg), vBufPoolGetMAF(pVnode), pVnode->pMeta);
if (pVnode->pTsdb == NULL) { if (pVnode->pTsdb == NULL) {
// TODO: handle error // TODO: handle error
return -1; return -1;
...@@ -137,7 +138,7 @@ static int vnodeOpenImpl(SVnode *pVnode) { ...@@ -137,7 +138,7 @@ static int vnodeOpenImpl(SVnode *pVnode) {
} }
static void vnodeCloseImpl(SVnode *pVnode) { static void vnodeCloseImpl(SVnode *pVnode) {
// vnodeSyncCommit(pVnode); vnodeSyncCommit(pVnode);
if (pVnode) { if (pVnode) {
vnodeCloseBufPool(pVnode); vnodeCloseBufPool(pVnode);
metaClose(pVnode->pMeta); metaClose(pVnode->pMeta);
......
...@@ -15,7 +15,7 @@ ...@@ -15,7 +15,7 @@
#include "vnodeDef.h" #include "vnodeDef.h"
SVnodeMgr vnodeMgr = {.vnodeInitFlag = TD_MOD_UNINITIALIZED, .vnodeClearFlag = TD_MOD_UNCLEARD, .stop = false}; SVnodeMgr vnodeMgr = {.vnodeInitFlag = TD_MOD_UNINITIALIZED};
static void* loop(void* arg); static void* loop(void* arg);
...@@ -24,6 +24,8 @@ int vnodeInit(uint16_t nthreads) { ...@@ -24,6 +24,8 @@ int vnodeInit(uint16_t nthreads) {
return 0; return 0;
} }
vnodeMgr.stop = false;
// Start commit handers // Start commit handers
if (nthreads > 0) { if (nthreads > 0) {
vnodeMgr.nthreads = nthreads; vnodeMgr.nthreads = nthreads;
...@@ -38,6 +40,7 @@ int vnodeInit(uint16_t nthreads) { ...@@ -38,6 +40,7 @@ int vnodeInit(uint16_t nthreads) {
for (uint16_t i = 0; i < nthreads; i++) { for (uint16_t i = 0; i < nthreads; i++) {
pthread_create(&(vnodeMgr.threads[i]), NULL, loop, NULL); pthread_create(&(vnodeMgr.threads[i]), NULL, loop, NULL);
pthread_setname_np(vnodeMgr.threads[i], "VND Commit Thread");
} }
} else { } else {
// TODO: if no commit thread is set, then another mechanism should be // TODO: if no commit thread is set, then another mechanism should be
...@@ -53,7 +56,7 @@ int vnodeInit(uint16_t nthreads) { ...@@ -53,7 +56,7 @@ int vnodeInit(uint16_t nthreads) {
} }
void vnodeClear() { void vnodeClear() {
if (TD_CHECK_AND_SET_MOD_CLEAR(&(vnodeMgr.vnodeClearFlag)) == TD_MOD_CLEARD) { if (TD_CHECK_AND_SET_MOD_CLEAR(&(vnodeMgr.vnodeInitFlag)) == TD_MOD_UNINITIALIZED) {
return; return;
} }
......
...@@ -589,4 +589,36 @@ char *metaTbCursorNext(SMTbCursor *pTbCur) { ...@@ -589,4 +589,36 @@ char *metaTbCursorNext(SMTbCursor *pTbCur) {
return NULL; return NULL;
} }
} }
}
STSchema *metaGetTbTSchema(SMeta *pMeta, tb_uid_t uid, int32_t sver) {
STSchemaBuilder sb;
STSchema * pTSchema = NULL;
SSchema * pSchema;
SSchemaWrapper *pSW;
STbCfg * pTbCfg;
tb_uid_t quid;
pTbCfg = metaGetTbInfoByUid(pMeta, uid);
if (pTbCfg->type == META_CHILD_TABLE) {
quid = pTbCfg->ctbCfg.suid;
} else {
quid = uid;
}
pSW = metaGetTableSchema(pMeta, quid, sver, true);
if (pSW == NULL) {
return NULL;
}
// Rebuild a schema
tdInitTSchemaBuilder(&sb, 0);
for (int32_t i = 0; i < pSW->nCols; i++) {
pSchema = pSW->pSchema + i;
tdAddColToSchema(&sb, pSchema->type, pSchema->colId, pSchema->bytes);
}
pTSchema = tdGetSchemaFromBuilder(&sb);
tdDestroyTSchemaBuilder(&sb);
return pTSchema;
} }
\ No newline at end of file
...@@ -2,7 +2,7 @@ aux_source_directory(src TSDB_SRC) ...@@ -2,7 +2,7 @@ aux_source_directory(src TSDB_SRC)
if(0) if(0)
add_library(tsdb ${TSDB_SRC}) add_library(tsdb ${TSDB_SRC})
else(0) else(0)
add_library(tsdb "") add_library(tsdb STATIC "")
target_sources(tsdb target_sources(tsdb
PRIVATE PRIVATE
"src/tsdbCommit.c" "src/tsdbCommit.c"
...@@ -29,4 +29,5 @@ target_link_libraries( ...@@ -29,4 +29,5 @@ target_link_libraries(
PUBLIC common PUBLIC common
PUBLIC tkv PUBLIC tkv
PUBLIC tfs PUBLIC tfs
PUBLIC meta
) )
\ No newline at end of file
...@@ -39,6 +39,18 @@ static FORCE_INLINE int TSDB_KEY_FID(TSKEY key, int32_t days, int8_t precision) ...@@ -39,6 +39,18 @@ static FORCE_INLINE int TSDB_KEY_FID(TSKEY key, int32_t days, int8_t precision)
} }
} }
static FORCE_INLINE int tsdbGetFidLevel(int fid, SRtn *pRtn) {
if (fid >= pRtn->maxFid) {
return 0;
} else if (fid >= pRtn->midFid) {
return 1;
} else if (fid >= pRtn->minFid) {
return 2;
} else {
return -1;
}
}
#if 0 #if 0
#define TSDB_DEFAULT_BLOCK_ROWS(maxRows) ((maxRows)*4 / 5) #define TSDB_DEFAULT_BLOCK_ROWS(maxRows) ((maxRows)*4 / 5)
...@@ -52,17 +64,6 @@ int tsdbWriteBlockImpl(STsdbRepo *pRepo, STable *pTable, SDFile *pDFile, SDataCo ...@@ -52,17 +64,6 @@ int tsdbWriteBlockImpl(STsdbRepo *pRepo, STable *pTable, SDFile *pDFile, SDataCo
bool isLast, bool isSuper, void **ppBuf, void **ppCBuf); bool isLast, bool isSuper, void **ppBuf, void **ppCBuf);
int tsdbApplyRtn(STsdbRepo *pRepo); int tsdbApplyRtn(STsdbRepo *pRepo);
static FORCE_INLINE int tsdbGetFidLevel(int fid, SRtn *pRtn) {
if (fid >= pRtn->maxFid) {
return 0;
} else if (fid >= pRtn->midFid) {
return 1;
} else if (fid >= pRtn->minFid) {
return 2;
} else {
return -1;
}
}
#endif #endif
#endif /* _TD_TSDB_COMMIT_H_ */ #endif /* _TD_TSDB_COMMIT_H_ */
\ No newline at end of file
...@@ -12,6 +12,8 @@ ...@@ -12,6 +12,8 @@
* You should have received a copy of the GNU Affero General Public License * You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>. * along with this program. If not, see <http://www.gnu.org/licenses/>.
*/ */
#if 0
#ifndef _TD_TSDB_COMPACT_H_ #ifndef _TD_TSDB_COMPACT_H_
#define _TD_TSDB_COMPACT_H_ #define _TD_TSDB_COMPACT_H_
...@@ -19,14 +21,12 @@ ...@@ -19,14 +21,12 @@
extern "C" { extern "C" {
#endif #endif
#if 0
void *tsdbCompactImpl(STsdbRepo *pRepo); void *tsdbCompactImpl(STsdbRepo *pRepo);
#endif
#ifdef __cplusplus #ifdef __cplusplus
} }
#endif #endif
#endif /* _TD_TSDB_COMPACT_H_ */ #endif /* _TD_TSDB_COMPACT_H_ */
\ No newline at end of file
#endif
\ No newline at end of file
...@@ -17,6 +17,7 @@ ...@@ -17,6 +17,7 @@
#define _TD_TSDB_DEF_H_ #define _TD_TSDB_DEF_H_
#include "mallocator.h" #include "mallocator.h"
#include "meta.h"
#include "tcompression.h" #include "tcompression.h"
#include "tglobal.h" #include "tglobal.h"
#include "thash.h" #include "thash.h"
...@@ -47,12 +48,17 @@ struct STsdb { ...@@ -47,12 +48,17 @@ struct STsdb {
STsdbMemTable * imem; STsdbMemTable * imem;
SRtn rtn; SRtn rtn;
SMemAllocatorFactory *pmaf; SMemAllocatorFactory *pmaf;
STsdbFS fs; STsdbFS * fs;
SMeta * pMeta;
}; };
#define REPO_ID(r) 0 #define REPO_ID(r) ((r)->vgId)
#define REPO_CFG(r) (&(r)->config) #define REPO_CFG(r) (&(r)->config)
#define REPO_FS(r) (&(r)->fs) #define REPO_FS(r) (r)->fs
static FORCE_INLINE STSchema *tsdbGetTableSchemaImpl(STable *pTable, bool lock, bool copy, int32_t version) {
return pTable->pSchema;
}
#ifdef __cplusplus #ifdef __cplusplus
} }
......
...@@ -70,7 +70,7 @@ typedef struct { ...@@ -70,7 +70,7 @@ typedef struct {
#define TSDB_FS_ITER_FORWARD TSDB_ORDER_ASC #define TSDB_FS_ITER_FORWARD TSDB_ORDER_ASC
#define TSDB_FS_ITER_BACKWARD TSDB_ORDER_DESC #define TSDB_FS_ITER_BACKWARD TSDB_ORDER_DESC
STsdbFS *tsdbNewFS(STsdbCfg *pCfg); STsdbFS *tsdbNewFS(const STsdbCfg *pCfg);
void * tsdbFreeFS(STsdbFS *pfs); void * tsdbFreeFS(STsdbFS *pfs);
int tsdbOpenFS(STsdb *pRepo); int tsdbOpenFS(STsdb *pRepo);
void tsdbCloseFS(STsdb *pRepo); void tsdbCloseFS(STsdb *pRepo);
...@@ -79,7 +79,7 @@ int tsdbEndFSTxn(STsdb *pRepo); ...@@ -79,7 +79,7 @@ int tsdbEndFSTxn(STsdb *pRepo);
int tsdbEndFSTxnWithError(STsdbFS *pfs); int tsdbEndFSTxnWithError(STsdbFS *pfs);
void tsdbUpdateFSTxnMeta(STsdbFS *pfs, STsdbFSMeta *pMeta); void tsdbUpdateFSTxnMeta(STsdbFS *pfs, STsdbFSMeta *pMeta);
// void tsdbUpdateMFile(STsdbFS *pfs, const SMFile *pMFile); // void tsdbUpdateMFile(STsdbFS *pfs, const SMFile *pMFile);
int tsdbUpdateDFileSet(STsdbFS *pfs, const SDFileSet *pSet); int tsdbUpdateDFileSet(STsdbFS *pfs, const SDFileSet *pSet);
void tsdbFSIterInit(SFSIter *pIter, STsdbFS *pfs, int direction); void tsdbFSIterInit(SFSIter *pIter, STsdbFS *pfs, int direction);
void tsdbFSIterSeek(SFSIter *pIter, int fid); void tsdbFSIterSeek(SFSIter *pIter, int fid);
......
...@@ -22,11 +22,57 @@ ...@@ -22,11 +22,57 @@
extern "C" { extern "C" {
#endif #endif
typedef struct STsdbMemTable STsdbMemTable; typedef struct {
int rowsInserted;
int rowsUpdated;
int rowsDeleteSucceed;
int rowsDeleteFailed;
int nOperations;
TSKEY keyFirst;
TSKEY keyLast;
} SMergeInfo;
typedef struct STbData {
tb_uid_t uid;
TSKEY keyMin;
TSKEY keyMax;
int64_t nrows;
SSkipList *pData;
} STbData;
typedef struct STsdbMemTable {
T_REF_DECLARE()
SRWLatch latch;
TSKEY keyMin;
TSKEY keyMax;
uint64_t nRow;
SMemAllocator *pMA;
// Container
SSkipList *pSlIdx; // SSkiplist<STbData>
SHashObj * pHashIdx;
} STsdbMemTable;
STsdbMemTable *tsdbNewMemTable(STsdb *pTsdb); STsdbMemTable *tsdbNewMemTable(STsdb *pTsdb);
void tsdbFreeMemTable(STsdb *pTsdb, STsdbMemTable *pMemTable); void tsdbFreeMemTable(STsdb *pTsdb, STsdbMemTable *pMemTable);
int tsdbMemTableInsert(STsdb *pTsdb, STsdbMemTable *pMemTable, SSubmitMsg *pMsg, SShellSubmitRspMsg *pRsp); int tsdbMemTableInsert(STsdb *pTsdb, STsdbMemTable *pMemTable, SSubmitMsg *pMsg, SShellSubmitRspMsg *pRsp);
int tsdbLoadDataFromCache(STable *pTable, SSkipListIterator *pIter, TSKEY maxKey, int maxRowsToRead, SDataCols *pCols,
TKEY *filterKeys, int nFilterKeys, bool keepDup, SMergeInfo *pMergeInfo);
static FORCE_INLINE SMemRow tsdbNextIterRow(SSkipListIterator *pIter) {
if (pIter == NULL) return NULL;
SSkipListNode *node = tSkipListIterGet(pIter);
if (node == NULL) return NULL;
return (SMemRow)SL_GET_NODE_DATA(node);
}
static FORCE_INLINE TSKEY tsdbNextIterKey(SSkipListIterator *pIter) {
SMemRow row = tsdbNextIterRow(pIter);
if (row == NULL) return TSDB_DATA_TIMESTAMP_NULL;
return memRowKey(row);
}
#ifdef __cplusplus #ifdef __cplusplus
} }
......
...@@ -12,6 +12,7 @@ ...@@ -12,6 +12,7 @@
* You should have received a copy of the GNU Affero General Public License * You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>. * along with this program. If not, see <http://www.gnu.org/licenses/>.
*/ */
#if 0
#include "tsdbint.h" #include "tsdbint.h"
typedef struct { typedef struct {
...@@ -528,3 +529,5 @@ static int tsdbCompactMeta(STsdbRepo *pRepo) { ...@@ -528,3 +529,5 @@ static int tsdbCompactMeta(STsdbRepo *pRepo) {
return 0; return 0;
} }
#endif
...@@ -191,7 +191,7 @@ static int tsdbAddDFileSetToStatus(SFSStatus *pStatus, const SDFileSet *pSet) { ...@@ -191,7 +191,7 @@ static int tsdbAddDFileSetToStatus(SFSStatus *pStatus, const SDFileSet *pSet) {
} }
// ================== STsdbFS // ================== STsdbFS
STsdbFS *tsdbNewFS(STsdbCfg *pCfg) { STsdbFS *tsdbNewFS(const STsdbCfg *pCfg) {
int keep = pCfg->keep; int keep = pCfg->keep;
int days = pCfg->daysPerFile; int days = pCfg->daysPerFile;
int maxFSet = TSDB_MAX_FSETS(keep, days); int maxFSet = TSDB_MAX_FSETS(keep, days);
...@@ -1287,7 +1287,7 @@ static int tsdbRestoreCurrent(STsdb *pRepo) { ...@@ -1287,7 +1287,7 @@ static int tsdbRestoreCurrent(STsdb *pRepo) {
return -1; return -1;
} }
if (tsdbSaveFSStatus(pRepo->fs.cstatus, REPO_ID(pRepo)) < 0) { if (tsdbSaveFSStatus(pRepo->fs->cstatus, REPO_ID(pRepo)) < 0) {
tsdbError("vgId:%d failed to restore corrent since %s", REPO_ID(pRepo), tstrerror(terrno)); tsdbError("vgId:%d failed to restore corrent since %s", REPO_ID(pRepo), tstrerror(terrno));
return -1; return -1;
} }
......
...@@ -15,18 +15,19 @@ ...@@ -15,18 +15,19 @@
#include "tsdbDef.h" #include "tsdbDef.h"
static STsdb *tsdbNew(const char *path, const STsdbCfg *pTsdbCfg, SMemAllocatorFactory *pMAF); static STsdb *tsdbNew(const char *path, int32_t vgId, const STsdbCfg *pTsdbCfg, SMemAllocatorFactory *pMAF,
SMeta *pMeta);
static void tsdbFree(STsdb *pTsdb); static void tsdbFree(STsdb *pTsdb);
static int tsdbOpenImpl(STsdb *pTsdb); static int tsdbOpenImpl(STsdb *pTsdb);
static void tsdbCloseImpl(STsdb *pTsdb); static void tsdbCloseImpl(STsdb *pTsdb);
STsdb *tsdbOpen(const char *path, const STsdbCfg *pTsdbCfg, SMemAllocatorFactory *pMAF) { STsdb *tsdbOpen(const char *path, int32_t vgId, const STsdbCfg *pTsdbCfg, SMemAllocatorFactory *pMAF, SMeta *pMeta) {
STsdb *pTsdb = NULL; STsdb *pTsdb = NULL;
// Set default TSDB Options // Set default TSDB Options
if (pTsdbCfg == NULL) { // if (pTsdbCfg == NULL) {
pTsdbCfg = &defautlTsdbOptions; pTsdbCfg = &defautlTsdbOptions;
} // }
// Validate the options // Validate the options
if (tsdbValidateOptions(pTsdbCfg) < 0) { if (tsdbValidateOptions(pTsdbCfg) < 0) {
...@@ -35,7 +36,7 @@ STsdb *tsdbOpen(const char *path, const STsdbCfg *pTsdbCfg, SMemAllocatorFactory ...@@ -35,7 +36,7 @@ STsdb *tsdbOpen(const char *path, const STsdbCfg *pTsdbCfg, SMemAllocatorFactory
} }
// Create the handle // Create the handle
pTsdb = tsdbNew(path, pTsdbCfg, pMAF); pTsdb = tsdbNew(path, vgId, pTsdbCfg, pMAF, pMeta);
if (pTsdb == NULL) { if (pTsdb == NULL) {
// TODO: handle error // TODO: handle error
return NULL; return NULL;
...@@ -62,7 +63,8 @@ void tsdbClose(STsdb *pTsdb) { ...@@ -62,7 +63,8 @@ void tsdbClose(STsdb *pTsdb) {
void tsdbRemove(const char *path) { taosRemoveDir(path); } void tsdbRemove(const char *path) { taosRemoveDir(path); }
/* ------------------------ STATIC METHODS ------------------------ */ /* ------------------------ STATIC METHODS ------------------------ */
static STsdb *tsdbNew(const char *path, const STsdbCfg *pTsdbCfg, SMemAllocatorFactory *pMAF) { static STsdb *tsdbNew(const char *path, int32_t vgId, const STsdbCfg *pTsdbCfg, SMemAllocatorFactory *pMAF,
SMeta *pMeta) {
STsdb *pTsdb = NULL; STsdb *pTsdb = NULL;
pTsdb = (STsdb *)calloc(1, sizeof(STsdb)); pTsdb = (STsdb *)calloc(1, sizeof(STsdb));
...@@ -72,25 +74,32 @@ static STsdb *tsdbNew(const char *path, const STsdbCfg *pTsdbCfg, SMemAllocatorF ...@@ -72,25 +74,32 @@ static STsdb *tsdbNew(const char *path, const STsdbCfg *pTsdbCfg, SMemAllocatorF
} }
pTsdb->path = strdup(path); pTsdb->path = strdup(path);
pTsdb->vgId = vgId;
tsdbOptionsCopy(&(pTsdb->config), pTsdbCfg); tsdbOptionsCopy(&(pTsdb->config), pTsdbCfg);
pTsdb->pmaf = pMAF; pTsdb->pmaf = pMAF;
pTsdb->pMeta = pMeta;
pTsdb->fs = tsdbNewFS(pTsdbCfg);
return pTsdb; return pTsdb;
} }
static void tsdbFree(STsdb *pTsdb) { static void tsdbFree(STsdb *pTsdb) {
if (pTsdb) { if (pTsdb) {
tsdbFreeFS(pTsdb->fs);
tfree(pTsdb->path); tfree(pTsdb->path);
free(pTsdb); free(pTsdb);
} }
} }
static int tsdbOpenImpl(STsdb *pTsdb) { static int tsdbOpenImpl(STsdb *pTsdb) {
tsdbOpenFS(pTsdb);
// TODO // TODO
return 0; return 0;
} }
static void tsdbCloseImpl(STsdb *pTsdb) { static void tsdbCloseImpl(STsdb *pTsdb) {
tsdbCloseFS(pTsdb);
// TODO // TODO
} }
#if 0 #if 0
...@@ -112,8 +121,8 @@ static void tsdbCloseImpl(STsdb *pTsdb) { ...@@ -112,8 +121,8 @@ static void tsdbCloseImpl(STsdb *pTsdb) {
// no test file errors here // no test file errors here
#include "taosdef.h" #include "taosdef.h"
#include "tsdbint.h" #include "tsdbint.h"
#include "ttimer.h"
#include "tthread.h" #include "tthread.h"
#include "ttimer.h"
#define IS_VALID_PRECISION(precision) \ #define IS_VALID_PRECISION(precision) \
(((precision) >= TSDB_TIME_PRECISION_MILLI) && ((precision) <= TSDB_TIME_PRECISION_NANO)) (((precision) >= TSDB_TIME_PRECISION_MILLI) && ((precision) <= TSDB_TIME_PRECISION_NANO))
......
...@@ -15,30 +15,6 @@ ...@@ -15,30 +15,6 @@
#include "tsdbDef.h" #include "tsdbDef.h"
typedef struct STbData {
tb_uid_t uid;
TSKEY keyMin;
TSKEY keyMax;
int64_t nrows;
SSkipList *pData;
} STbData;
struct STsdbMemTable {
T_REF_DECLARE()
SRWLatch latch;
TSKEY keyMin;
TSKEY keyMax;
uint64_t nRow;
SMemAllocator *pMA;
// Container
#if 1
SSkipList *pSlIdx; // SSkiplist<STbData>
SHashObj * pHashIdx;
#else
TD_SLIST(STbData) list;
#endif
};
static int tsdbScanAndConvertSubmitMsg(STsdb *pTsdb, SSubmitMsg *pMsg); static int tsdbScanAndConvertSubmitMsg(STsdb *pTsdb, SSubmitMsg *pMsg);
static int tsdbMemTableInsertTbData(STsdb *pRepo, SSubmitBlk *pBlock, int32_t *pAffectedRows); static int tsdbMemTableInsertTbData(STsdb *pRepo, SSubmitBlk *pBlock, int32_t *pAffectedRows);
static STbData *tsdbNewTbData(tb_uid_t uid); static STbData *tsdbNewTbData(tb_uid_t uid);
...@@ -46,6 +22,7 @@ static void tsdbFreeTbData(STbData *pTbData); ...@@ -46,6 +22,7 @@ static void tsdbFreeTbData(STbData *pTbData);
static char * tsdbGetTsTupleKey(const void *data); static char * tsdbGetTsTupleKey(const void *data);
static int tsdbTbDataComp(const void *arg1, const void *arg2); static int tsdbTbDataComp(const void *arg1, const void *arg2);
static char * tsdbTbDataGetUid(const void *arg); static char * tsdbTbDataGetUid(const void *arg);
static int tsdbAppendTableRowToCols(STable *pTable, SDataCols *pCols, STSchema **ppSchema, SMemRow row);
STsdbMemTable *tsdbNewMemTable(STsdb *pTsdb) { STsdbMemTable *tsdbNewMemTable(STsdb *pTsdb) {
STsdbMemTable *pMemTable = (STsdbMemTable *)calloc(1, sizeof(*pMemTable)); STsdbMemTable *pMemTable = (STsdbMemTable *)calloc(1, sizeof(*pMemTable));
...@@ -127,6 +104,129 @@ int tsdbMemTableInsert(STsdb *pTsdb, STsdbMemTable *pMemTable, SSubmitMsg *pMsg, ...@@ -127,6 +104,129 @@ int tsdbMemTableInsert(STsdb *pTsdb, STsdbMemTable *pMemTable, SSubmitMsg *pMsg,
return 0; return 0;
} }
/**
* This is an important function to load data or try to load data from memory skiplist iterator.
*
* This function load memory data until:
* 1. iterator ends
* 2. data key exceeds maxKey
* 3. rowsIncreased = rowsInserted - rowsDeleteSucceed >= maxRowsToRead
* 4. operations in pCols not exceeds its max capacity if pCols is given
*
* The function tries to procceed AS MUCH AS POSSIBLE.
*/
int tsdbLoadDataFromCache(STable *pTable, SSkipListIterator *pIter, TSKEY maxKey, int maxRowsToRead, SDataCols *pCols,
TKEY *filterKeys, int nFilterKeys, bool keepDup, SMergeInfo *pMergeInfo) {
ASSERT(maxRowsToRead > 0 && nFilterKeys >= 0);
if (pIter == NULL) return 0;
STSchema * pSchema = NULL;
TSKEY rowKey = 0;
TSKEY fKey = 0;
bool isRowDel = false;
int filterIter = 0;
SMemRow row = NULL;
SMergeInfo mInfo;
if (pMergeInfo == NULL) pMergeInfo = &mInfo;
memset(pMergeInfo, 0, sizeof(*pMergeInfo));
pMergeInfo->keyFirst = INT64_MAX;
pMergeInfo->keyLast = INT64_MIN;
if (pCols) tdResetDataCols(pCols);
row = tsdbNextIterRow(pIter);
if (row == NULL || memRowKey(row) > maxKey) {
rowKey = INT64_MAX;
isRowDel = false;
} else {
rowKey = memRowKey(row);
isRowDel = memRowDeleted(row);
}
if (filterIter >= nFilterKeys) {
fKey = INT64_MAX;
} else {
fKey = tdGetKey(filterKeys[filterIter]);
}
while (true) {
if (fKey == INT64_MAX && rowKey == INT64_MAX) break;
if (fKey < rowKey) {
pMergeInfo->keyFirst = MIN(pMergeInfo->keyFirst, fKey);
pMergeInfo->keyLast = MAX(pMergeInfo->keyLast, fKey);
filterIter++;
if (filterIter >= nFilterKeys) {
fKey = INT64_MAX;
} else {
fKey = tdGetKey(filterKeys[filterIter]);
}
} else if (fKey > rowKey) {
if (isRowDel) {
pMergeInfo->rowsDeleteFailed++;
} else {
if (pMergeInfo->rowsInserted - pMergeInfo->rowsDeleteSucceed >= maxRowsToRead) break;
if (pCols && pMergeInfo->nOperations >= pCols->maxPoints) break;
pMergeInfo->rowsInserted++;
pMergeInfo->nOperations++;
pMergeInfo->keyFirst = MIN(pMergeInfo->keyFirst, rowKey);
pMergeInfo->keyLast = MAX(pMergeInfo->keyLast, rowKey);
tsdbAppendTableRowToCols(pTable, pCols, &pSchema, row);
}
tSkipListIterNext(pIter);
row = tsdbNextIterRow(pIter);
if (row == NULL || memRowKey(row) > maxKey) {
rowKey = INT64_MAX;
isRowDel = false;
} else {
rowKey = memRowKey(row);
isRowDel = memRowDeleted(row);
}
} else {
if (isRowDel) {
ASSERT(!keepDup);
if (pCols && pMergeInfo->nOperations >= pCols->maxPoints) break;
pMergeInfo->rowsDeleteSucceed++;
pMergeInfo->nOperations++;
tsdbAppendTableRowToCols(pTable, pCols, &pSchema, row);
} else {
if (keepDup) {
if (pCols && pMergeInfo->nOperations >= pCols->maxPoints) break;
pMergeInfo->rowsUpdated++;
pMergeInfo->nOperations++;
pMergeInfo->keyFirst = MIN(pMergeInfo->keyFirst, rowKey);
pMergeInfo->keyLast = MAX(pMergeInfo->keyLast, rowKey);
tsdbAppendTableRowToCols(pTable, pCols, &pSchema, row);
} else {
pMergeInfo->keyFirst = MIN(pMergeInfo->keyFirst, fKey);
pMergeInfo->keyLast = MAX(pMergeInfo->keyLast, fKey);
}
}
tSkipListIterNext(pIter);
row = tsdbNextIterRow(pIter);
if (row == NULL || memRowKey(row) > maxKey) {
rowKey = INT64_MAX;
isRowDel = false;
} else {
rowKey = memRowKey(row);
isRowDel = memRowDeleted(row);
}
filterIter++;
if (filterIter >= nFilterKeys) {
fKey = INT64_MAX;
} else {
fKey = tdGetKey(filterKeys[filterIter]);
}
}
}
return 0;
}
static int tsdbScanAndConvertSubmitMsg(STsdb *pTsdb, SSubmitMsg *pMsg) { static int tsdbScanAndConvertSubmitMsg(STsdb *pTsdb, SSubmitMsg *pMsg) {
ASSERT(pMsg != NULL); ASSERT(pMsg != NULL);
// STsdbMeta * pMeta = pTsdb->tsdbMeta; // STsdbMeta * pMeta = pTsdb->tsdbMeta;
...@@ -337,6 +437,21 @@ static char *tsdbTbDataGetUid(const void *arg) { ...@@ -337,6 +437,21 @@ static char *tsdbTbDataGetUid(const void *arg) {
STbData *pTbData = (STbData *)arg; STbData *pTbData = (STbData *)arg;
return (char *)(&(pTbData->uid)); return (char *)(&(pTbData->uid));
} }
static int tsdbAppendTableRowToCols(STable *pTable, SDataCols *pCols, STSchema **ppSchema, SMemRow row) {
if (pCols) {
if (*ppSchema == NULL || schemaVersion(*ppSchema) != memRowVersion(row)) {
*ppSchema = tsdbGetTableSchemaImpl(pTable, false, false, memRowVersion(row));
if (*ppSchema == NULL) {
ASSERT(false);
return -1;
}
}
tdAppendMemRowToDataCol(row, *ppSchema, pCols, true);
}
return 0;
}
/* ------------------------ REFACTORING ------------------------ */ /* ------------------------ REFACTORING ------------------------ */
#if 0 #if 0
...@@ -674,21 +789,6 @@ int tsdbLoadDataFromCache(STable *pTable, SSkipListIterator *pIter, TSKEY maxKey ...@@ -674,21 +789,6 @@ int tsdbLoadDataFromCache(STable *pTable, SSkipListIterator *pIter, TSKEY maxKey
} }
// ---------------- LOCAL FUNCTIONS ---------------- // ---------------- LOCAL FUNCTIONS ----------------
static int tsdbAppendTableRowToCols(STable *pTable, SDataCols *pCols, STSchema **ppSchema, SMemRow row) {
if (pCols) {
if (*ppSchema == NULL || schemaVersion(*ppSchema) != memRowVersion(row)) {
*ppSchema = tsdbGetTableSchemaImpl(pTable, false, false, memRowVersion(row), (int8_t)memRowType(row));
if (*ppSchema == NULL) {
ASSERT(false);
return -1;
}
}
tdAppendMemRowToDataCol(row, *ppSchema, pCols, true, 0);
}
return 0;
}
static FORCE_INLINE int tsdbCheckRowRange(STsdbRepo *pRepo, STable *pTable, SMemRow row, TSKEY minKey, TSKEY maxKey, static FORCE_INLINE int tsdbCheckRowRange(STsdbRepo *pRepo, STable *pTable, SMemRow row, TSKEY minKey, TSKEY maxKey,
TSKEY now) { TSKEY now) {
......
...@@ -15,7 +15,16 @@ ...@@ -15,7 +15,16 @@
#include "tsdbDef.h" #include "tsdbDef.h"
const STsdbCfg defautlTsdbOptions = {.lruCacheSize = 0}; const STsdbCfg defautlTsdbOptions = {.precision = 0,
.lruCacheSize = 0,
.daysPerFile = 10,
.minRowsPerFileBlock = 100,
.maxRowsPerFileBlock = 4096,
.keep = 3650,
.keep1 = 3650,
.keep2 = 3650,
.update = 0,
.compression = TWO_STAGE_COMP};
int tsdbOptionsInit(STsdbCfg *pTsdbOptions) { int tsdbOptionsInit(STsdbCfg *pTsdbOptions) {
// TODO // TODO
......
...@@ -25,7 +25,6 @@ static int tsdbCheckAndDecodeColumnData(SDataCol *pDataCol, void *content, int3 ...@@ -25,7 +25,6 @@ static int tsdbCheckAndDecodeColumnData(SDataCol *pDataCol, void *content, int3
static int tsdbLoadBlockDataColsImpl(SReadH *pReadh, SBlock *pBlock, SDataCols *pDataCols, int16_t *colIds, static int tsdbLoadBlockDataColsImpl(SReadH *pReadh, SBlock *pBlock, SDataCols *pDataCols, int16_t *colIds,
int numOfColIds); int numOfColIds);
static int tsdbLoadColData(SReadH *pReadh, SDFile *pDFile, SBlock *pBlock, SBlockCol *pBlockCol, SDataCol *pDataCol); static int tsdbLoadColData(SReadH *pReadh, SDFile *pDFile, SBlock *pBlock, SBlockCol *pBlockCol, SDataCol *pDataCol);
static STSchema *tsdbGetTableSchemaImpl(STable *pTable, bool lock, bool copy, int32_t version) { return NULL; }
int tsdbInitReadH(SReadH *pReadh, STsdb *pRepo) { int tsdbInitReadH(SReadH *pReadh, STsdb *pRepo) {
ASSERT(pReadh != NULL && pRepo != NULL); ASSERT(pReadh != NULL && pRepo != NULL);
...@@ -667,4 +666,3 @@ static int tsdbLoadColData(SReadH *pReadh, SDFile *pDFile, SBlock *pBlock, SBloc ...@@ -667,4 +666,3 @@ static int tsdbLoadColData(SReadH *pReadh, SDFile *pDFile, SBlock *pBlock, SBloc
return 0; return 0;
} }
aux_source_directory(src TSDB_SRC)
add_library(tsdb STATIC ${TSDB_SRC})
target_include_directories(
tsdb
PUBLIC "${CMAKE_SOURCE_DIR}/include/dnode/vnode/tsdb2"
PRIVATE "${CMAKE_CURRENT_SOURCE_DIR}/inc"
)
target_link_libraries(tsdb os util common tfs)
\ No newline at end of file
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
// #ifndef _TD_TSDB_BUFFER_H_
// #define _TD_TSDB_BUFFER_H_
// typedef struct {
// int64_t blockId;
// int offset;
// int remain;
// char data[];
// } STsdbBufBlock;
// typedef struct {
// pthread_cond_t poolNotEmpty;
// int bufBlockSize;
// int tBufBlocks;
// int nBufBlocks;
// int nRecycleBlocks;
// int nElasticBlocks;
// int64_t index;
// SList* bufBlockList;
// } STsdbBufPool;
// #define TSDB_BUFFER_RESERVE 1024 // Reseve 1K as commit threshold
// STsdbBufPool* tsdbNewBufPool();
// void tsdbFreeBufPool(STsdbBufPool* pBufPool);
// int tsdbOpenBufPool(STsdb* pRepo);
// void tsdbCloseBufPool(STsdb* pRepo);
// SListNode* tsdbAllocBufBlockFromPool(STsdb* pRepo);
// int tsdbExpandPool(STsdb* pRepo, int32_t oldTotalBlocks);
// void tsdbRecycleBufferBlock(STsdbBufPool* pPool, SListNode *pNode, bool bELastic);
// // health cite
// STsdbBufBlock *tsdbNewBufBlock(int bufBlockSize);
// void tsdbFreeBufBlock(STsdbBufBlock *pBufBlock);
// #endif /* _TD_TSDB_BUFFER_H_ */
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef _TD_TSDB_COMMIT_H_
#define _TD_TSDB_COMMIT_H_
typedef struct {
int minFid;
int midFid;
int maxFid;
TSKEY minKey;
} SRtn;
typedef struct {
uint64_t uid;
int64_t offset;
int64_t size;
} SKVRecord;
#define TSDB_DEFAULT_BLOCK_ROWS(maxRows) ((maxRows)*4 / 5)
void tsdbGetRtnSnap(STsdb *pRepo, SRtn *pRtn);
int tsdbEncodeKVRecord(void **buf, SKVRecord *pRecord);
void *tsdbDecodeKVRecord(void *buf, SKVRecord *pRecord);
void *tsdbCommitData(STsdb *pRepo);
int tsdbApplyRtnOnFSet(STsdb *pRepo, SDFileSet *pSet, SRtn *pRtn);
int tsdbWriteBlockInfoImpl(SDFile *pHeadf, STable *pTable, SArray *pSupA, SArray *pSubA, void **ppBuf, SBlockIdx *pIdx);
int tsdbWriteBlockIdx(SDFile *pHeadf, SArray *pIdxA, void **ppBuf);
int tsdbWriteBlockImpl(STsdb *pRepo, STable *pTable, SDFile *pDFile, SDFile *pDFileAggr, SDataCols *pDataCols,
SBlock *pBlock, bool isLast, bool isSuper, void **ppBuf, void **ppCBuf, void **ppExBuf);
int tsdbApplyRtn(STsdb *pRepo);
static FORCE_INLINE int tsdbGetFidLevel(int fid, SRtn *pRtn) {
if (fid >= pRtn->maxFid) {
return 0;
} else if (fid >= pRtn->midFid) {
return 1;
} else if (fid >= pRtn->minFid) {
return 2;
} else {
return -1;
}
}
#endif /* _TD_TSDB_COMMIT_H_ */
\ No newline at end of file
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef _TD_TSDB_FS_H_
#define _TD_TSDB_FS_H_
/**
* 1. The fileset .head/.data/.last use the same fver 0 before 2021.10.10.
* 2. .head fver is 1 when extract aggregate block data from .data/.last file and save to separate .smad/.smal file
* since 2021.10.10
* // TODO update date and add release version.
*/
typedef enum {
TSDB_FS_VER_0 = 0,
TSDB_FS_VER_1,
} ETsdbFsVer;
#define TSDB_FVER_TYPE uint32_t
#define TSDB_LATEST_FVER TSDB_FS_VER_1 // latest version for DFile
#define TSDB_LATEST_SFS_VER TSDB_FS_VER_1 // latest version for 'current' file
static FORCE_INLINE uint32_t tsdbGetDFSVersion(TSDB_FILE_T fType) { // latest version for DFile
switch (fType) {
case TSDB_FILE_HEAD:
return TSDB_FS_VER_1;
default:
return TSDB_FS_VER_0;
}
}
// ================== TSDB global config
extern bool tsdbForceKeepFile;
// ================== CURRENT file header info
typedef struct {
uint32_t version; // Current file system version (relating to code)
uint32_t len; // Encode content length (including checksum)
} SFSHeader;
// ================== TSDB File System Meta
typedef struct {
uint32_t version; // Commit version from 0 to increase
int64_t totalPoints; // total points
int64_t totalStorage; // Uncompressed total storage
} STsdbFSMeta;
// ==================
typedef struct {
STsdbFSMeta meta; // FS meta
SMFile* pmf; // meta file pointer
SMFile mf; // meta file
SArray* df; // data file array
} SFSStatus;
typedef struct {
pthread_rwlock_t lock;
SFSStatus* cstatus; // current status
SHashObj* metaCache; // meta cache
SHashObj* metaCacheComp; // meta cache for compact
bool intxn;
SFSStatus* nstatus; // new status
} STsdbFS;
#define FS_CURRENT_STATUS(pfs) ((pfs)->cstatus)
#define FS_NEW_STATUS(pfs) ((pfs)->nstatus)
#define FS_IN_TXN(pfs) (pfs)->intxn
#define FS_VERSION(pfs) ((pfs)->cstatus->meta.version)
#define FS_TXN_VERSION(pfs) ((pfs)->nstatus->meta.version)
typedef struct {
int direction;
uint64_t version; // current FS version
STsdbFS* pfs;
int index; // used to position next fset when version the same
int fid; // used to seek when version is changed
SDFileSet* pSet;
} SFSIter;
#define TSDB_FS_ITER_FORWARD TSDB_ORDER_ASC
#define TSDB_FS_ITER_BACKWARD TSDB_ORDER_DESC
STsdbFS *tsdbNewFS(STsdbCfg *pCfg);
void * tsdbFreeFS(STsdbFS *pfs);
int tsdbOpenFS(STsdb *pRepo);
void tsdbCloseFS(STsdb *pRepo);
void tsdbStartFSTxn(STsdb *pRepo, int64_t pointsAdd, int64_t storageAdd);
int tsdbEndFSTxn(STsdb *pRepo);
int tsdbEndFSTxnWithError(STsdbFS *pfs);
void tsdbUpdateFSTxnMeta(STsdbFS *pfs, STsdbFSMeta *pMeta);
void tsdbUpdateMFile(STsdbFS *pfs, const SMFile *pMFile);
int tsdbUpdateDFileSet(STsdbFS *pfs, const SDFileSet *pSet);
void tsdbFSIterInit(SFSIter *pIter, STsdbFS *pfs, int direction);
void tsdbFSIterSeek(SFSIter *pIter, int fid);
SDFileSet *tsdbFSIterNext(SFSIter *pIter);
int tsdbLoadMetaCache(STsdb *pRepo, bool recoverMeta);
static FORCE_INLINE int tsdbRLockFS(STsdbFS* pFs) {
int code = pthread_rwlock_rdlock(&(pFs->lock));
if (code != 0) {
terrno = TAOS_SYSTEM_ERROR(code);
return -1;
}
return 0;
}
static FORCE_INLINE int tsdbWLockFS(STsdbFS* pFs) {
int code = pthread_rwlock_wrlock(&(pFs->lock));
if (code != 0) {
terrno = TAOS_SYSTEM_ERROR(code);
return -1;
}
return 0;
}
static FORCE_INLINE int tsdbUnLockFS(STsdbFS* pFs) {
int code = pthread_rwlock_unlock(&(pFs->lock));
if (code != 0) {
terrno = TAOS_SYSTEM_ERROR(code);
return -1;
}
return 0;
}
#endif /* _TD_TSDB_FS_H_ */
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef _TS_TSDB_FILE_H_
#define _TS_TSDB_FILE_H_
#include "os.h"
#define TSDB_FILE_HEAD_SIZE 512
#define TSDB_FILE_DELIMITER 0xF00AFA0F
#define TSDB_FILE_INIT_MAGIC 0xFFFFFFFF
#define TSDB_IVLD_FID INT_MIN
#define TSDB_FILE_STATE_OK 0
#define TSDB_FILE_STATE_BAD 1
#define TSDB_FILE_INFO(tf) (&((tf)->info))
#define TSDB_FILE_F(tf) (&((tf)->f))
#define TSDB_FILE_FD(tf) ((tf)->fd)
#define TSDB_FILE_FULL_NAME(tf) TFILE_NAME(TSDB_FILE_F(tf))
#define TSDB_FILE_OPENED(tf) (TSDB_FILE_FD(tf) >= 0)
#define TSDB_FILE_CLOSED(tf) (!TSDB_FILE_OPENED(tf))
#define TSDB_FILE_SET_CLOSED(f) (TSDB_FILE_FD(f) = -1)
#define TSDB_FILE_LEVEL(tf) TFILE_LEVEL(TSDB_FILE_F(tf))
#define TSDB_FILE_ID(tf) TFILE_ID(TSDB_FILE_F(tf))
#define TSDB_FILE_FSYNC(tf) taosFsyncFile(TSDB_FILE_FD(tf))
#define TSDB_FILE_STATE(tf) ((tf)->state)
#define TSDB_FILE_SET_STATE(tf, s) ((tf)->state = (s))
#define TSDB_FILE_IS_OK(tf) (TSDB_FILE_STATE(tf) == TSDB_FILE_STATE_OK)
#define TSDB_FILE_IS_BAD(tf) (TSDB_FILE_STATE(tf) == TSDB_FILE_STATE_BAD)
#define ASSERT_TSDB_FSET_NFILES_VALID(s) \
do { \
uint8_t nDFiles = tsdbGetNFiles(s); \
ASSERT((nDFiles >= TSDB_FILE_MIN) && (nDFiles <= TSDB_FILE_MAX)); \
} while (0)
typedef enum {
TSDB_FILE_HEAD = 0,
TSDB_FILE_DATA,
TSDB_FILE_LAST,
TSDB_FILE_SMAD, // sma for .data
TSDB_FILE_SMAL, // sma for .last
TSDB_FILE_MAX,
TSDB_FILE_META
} TSDB_FILE_T;
#define TSDB_FILE_MIN 3U // min valid number of files in one DFileSet(.head/.data/.last)
// =============== SMFile
typedef struct {
int64_t size;
int64_t tombSize;
int64_t nRecords;
int64_t nDels;
uint32_t magic;
} SMFInfo;
typedef struct {
SMFInfo info;
TFILE f;
int fd;
uint8_t state;
} SMFile;
void tsdbInitMFile(SMFile* pMFile, SDiskID did, int vid, uint32_t ver);
void tsdbInitMFileEx(SMFile* pMFile, const SMFile* pOMFile);
int tsdbEncodeSMFile(void** buf, SMFile* pMFile);
void* tsdbDecodeSMFile(void* buf, SMFile* pMFile);
int tsdbEncodeSMFileEx(void** buf, SMFile* pMFile);
void* tsdbDecodeSMFileEx(void* buf, SMFile* pMFile);
int tsdbApplyMFileChange(SMFile* from, SMFile* to);
int tsdbCreateMFile(SMFile* pMFile, bool updateHeader);
int tsdbUpdateMFileHeader(SMFile* pMFile);
int tsdbLoadMFileHeader(SMFile* pMFile, SMFInfo* pInfo);
int tsdbScanAndTryFixMFile(STsdb* pRepo);
int tsdbEncodeMFInfo(void** buf, SMFInfo* pInfo);
void* tsdbDecodeMFInfo(void* buf, SMFInfo* pInfo);
static FORCE_INLINE void tsdbSetMFileInfo(SMFile* pMFile, SMFInfo* pInfo) { pMFile->info = *pInfo; }
static FORCE_INLINE int tsdbOpenMFile(SMFile* pMFile, int flags) {
ASSERT(TSDB_FILE_CLOSED(pMFile));
pMFile->fd = open(TSDB_FILE_FULL_NAME(pMFile), flags | O_BINARY);
if (pMFile->fd < 0) {
terrno = TAOS_SYSTEM_ERROR(errno);
return -1;
}
return 0;
}
static FORCE_INLINE void tsdbCloseMFile(SMFile* pMFile) {
if (TSDB_FILE_OPENED(pMFile)) {
close(pMFile->fd);
TSDB_FILE_SET_CLOSED(pMFile);
}
}
static FORCE_INLINE int64_t tsdbSeekMFile(SMFile* pMFile, int64_t offset, int whence) {
ASSERT(TSDB_FILE_OPENED(pMFile));
int64_t loffset = taosLSeekFile(TSDB_FILE_FD(pMFile), offset, whence);
if (loffset < 0) {
terrno = TAOS_SYSTEM_ERROR(errno);
return -1;
}
return loffset;
}
static FORCE_INLINE int64_t tsdbWriteMFile(SMFile* pMFile, void* buf, int64_t nbyte) {
ASSERT(TSDB_FILE_OPENED(pMFile));
int64_t nwrite = taosWriteFile(pMFile->fd, buf, nbyte);
if (nwrite < nbyte) {
terrno = TAOS_SYSTEM_ERROR(errno);
return -1;
}
return nwrite;
}
static FORCE_INLINE void tsdbUpdateMFileMagic(SMFile* pMFile, void* pCksum) {
pMFile->info.magic = taosCalcChecksum(pMFile->info.magic, (uint8_t*)(pCksum), sizeof(TSCKSUM));
}
static FORCE_INLINE int tsdbAppendMFile(SMFile* pMFile, void* buf, int64_t nbyte, int64_t* offset) {
ASSERT(TSDB_FILE_OPENED(pMFile));
int64_t toffset;
if ((toffset = tsdbSeekMFile(pMFile, 0, SEEK_END)) < 0) {
return -1;
}
ASSERT(pMFile->info.size == toffset);
if (offset) {
*offset = toffset;
}
if (tsdbWriteMFile(pMFile, buf, nbyte) < 0) {
return -1;
}
pMFile->info.size += nbyte;
return (int)nbyte;
}
static FORCE_INLINE int tsdbRemoveMFile(SMFile* pMFile) { return tfsremove(TSDB_FILE_F(pMFile)); }
static FORCE_INLINE int64_t tsdbReadMFile(SMFile* pMFile, void* buf, int64_t nbyte) {
ASSERT(TSDB_FILE_OPENED(pMFile));
int64_t nread = taosReadFile(pMFile->fd, buf, nbyte);
if (nread < 0) {
terrno = TAOS_SYSTEM_ERROR(errno);
return -1;
}
return nread;
}
// =============== SDFile
typedef struct {
uint32_t magic;
uint32_t len;
uint32_t totalBlocks;
uint32_t totalSubBlocks;
uint32_t offset;
uint64_t size;
uint64_t tombSize;
uint32_t fver;
} SDFInfo;
typedef struct {
SDFInfo info;
TFILE f;
int fd;
uint8_t state;
} SDFile;
void tsdbInitDFile(SDFile* pDFile, SDiskID did, int vid, int fid, uint32_t ver, TSDB_FILE_T ftype);
void tsdbInitDFileEx(SDFile* pDFile, SDFile* pODFile);
int tsdbEncodeSDFile(void** buf, SDFile* pDFile);
void* tsdbDecodeSDFile(void* buf, SDFile* pDFile, uint32_t sfver);
int tsdbCreateDFile(SDFile* pDFile, bool updateHeader, TSDB_FILE_T ftype);
int tsdbUpdateDFileHeader(SDFile* pDFile);
int tsdbLoadDFileHeader(SDFile* pDFile, SDFInfo* pInfo);
int tsdbParseDFilename(const char* fname, int* vid, int* fid, TSDB_FILE_T* ftype, uint32_t* version);
static FORCE_INLINE void tsdbSetDFileInfo(SDFile* pDFile, SDFInfo* pInfo) { pDFile->info = *pInfo; }
static FORCE_INLINE int tsdbOpenDFile(SDFile* pDFile, int flags) {
ASSERT(!TSDB_FILE_OPENED(pDFile));
pDFile->fd = open(TSDB_FILE_FULL_NAME(pDFile), flags | O_BINARY);
if (pDFile->fd < 0) {
terrno = TAOS_SYSTEM_ERROR(errno);
return -1;
}
return 0;
}
static FORCE_INLINE void tsdbCloseDFile(SDFile* pDFile) {
if (TSDB_FILE_OPENED(pDFile)) {
close(pDFile->fd);
TSDB_FILE_SET_CLOSED(pDFile);
}
}
static FORCE_INLINE int64_t tsdbSeekDFile(SDFile* pDFile, int64_t offset, int whence) {
ASSERT(TSDB_FILE_OPENED(pDFile));
int64_t loffset = taosLSeekFile(TSDB_FILE_FD(pDFile), offset, whence);
if (loffset < 0) {
terrno = TAOS_SYSTEM_ERROR(errno);
return -1;
}
return loffset;
}
static FORCE_INLINE int64_t tsdbWriteDFile(SDFile* pDFile, void* buf, int64_t nbyte) {
ASSERT(TSDB_FILE_OPENED(pDFile));
int64_t nwrite = taosWriteFile(pDFile->fd, buf, nbyte);
if (nwrite < nbyte) {
terrno = TAOS_SYSTEM_ERROR(errno);
return -1;
}
return nwrite;
}
static FORCE_INLINE void tsdbUpdateDFileMagic(SDFile* pDFile, void* pCksm) {
pDFile->info.magic = taosCalcChecksum(pDFile->info.magic, (uint8_t*)(pCksm), sizeof(TSCKSUM));
}
static FORCE_INLINE int tsdbAppendDFile(SDFile* pDFile, void* buf, int64_t nbyte, int64_t* offset) {
ASSERT(TSDB_FILE_OPENED(pDFile));
int64_t toffset;
if ((toffset = tsdbSeekDFile(pDFile, 0, SEEK_END)) < 0) {
return -1;
}
ASSERT(pDFile->info.size == toffset);
if (offset) {
*offset = toffset;
}
if (tsdbWriteDFile(pDFile, buf, nbyte) < 0) {
return -1;
}
pDFile->info.size += nbyte;
return (int)nbyte;
}
static FORCE_INLINE int tsdbRemoveDFile(SDFile* pDFile) { return tfsremove(TSDB_FILE_F(pDFile)); }
static FORCE_INLINE int64_t tsdbReadDFile(SDFile* pDFile, void* buf, int64_t nbyte) {
ASSERT(TSDB_FILE_OPENED(pDFile));
int64_t nread = taosReadFile(pDFile->fd, buf, nbyte);
if (nread < 0) {
terrno = TAOS_SYSTEM_ERROR(errno);
return -1;
}
return nread;
}
static FORCE_INLINE int tsdbCopyDFile(SDFile* pSrc, SDFile* pDest) {
if (tfscopy(TSDB_FILE_F(pSrc), TSDB_FILE_F(pDest)) < 0) {
terrno = TAOS_SYSTEM_ERROR(errno);
return -1;
}
tsdbSetDFileInfo(pDest, TSDB_FILE_INFO(pSrc));
return 0;
}
// =============== SDFileSet
typedef struct {
int fid;
int state;
uint16_t ver; // fset version
SDFile files[TSDB_FILE_MAX];
} SDFileSet;
typedef enum {
TSDB_FSET_VER_0 = 0, // .head/.data/.last
TSDB_FSET_VER_1, // .head/.data/.last/.smad/.smal
} ETsdbFSetVer;
#define TSDB_LATEST_FSET_VER TSDB_FSET_VER_1
// get nDFiles in SDFileSet
static FORCE_INLINE uint8_t tsdbGetNFiles(SDFileSet* pSet) {
switch (pSet->ver) {
case TSDB_FSET_VER_0:
return TSDB_FILE_MIN;
case TSDB_FSET_VER_1:
default:
return TSDB_FILE_MAX;
}
}
#define TSDB_FSET_FID(s) ((s)->fid)
#define TSDB_DFILE_IN_SET(s, t) ((s)->files + (t))
#define TSDB_FSET_LEVEL(s) TSDB_FILE_LEVEL(TSDB_DFILE_IN_SET(s, 0))
#define TSDB_FSET_ID(s) TSDB_FILE_ID(TSDB_DFILE_IN_SET(s, 0))
#define TSDB_FSET_SET_CLOSED(s) \
do { \
for (TSDB_FILE_T ftype = TSDB_FILE_HEAD; ftype < TSDB_FILE_MAX; ftype++) { \
TSDB_FILE_SET_CLOSED(TSDB_DFILE_IN_SET(s, ftype)); \
} \
} while (0);
#define TSDB_FSET_FSYNC(s) \
do { \
for (TSDB_FILE_T ftype = TSDB_FILE_HEAD; ftype < tsdbGetNFiles(s); ftype++) { \
TSDB_FILE_FSYNC(TSDB_DFILE_IN_SET(s, ftype)); \
} \
} while (0);
void tsdbInitDFileSet(SDFileSet* pSet, SDiskID did, int vid, int fid, uint32_t ver, uint16_t fsetVer);
void tsdbInitDFileSetEx(SDFileSet* pSet, SDFileSet* pOSet);
int tsdbEncodeDFileSet(void** buf, SDFileSet* pSet);
void* tsdbDecodeDFileSet(void* buf, SDFileSet* pSet, uint32_t sfver);
int tsdbEncodeDFileSetEx(void** buf, SDFileSet* pSet);
void* tsdbDecodeDFileSetEx(void* buf, SDFileSet* pSet);
int tsdbApplyDFileSetChange(SDFileSet* from, SDFileSet* to);
int tsdbCreateDFileSet(SDFileSet* pSet, bool updateHeader);
int tsdbUpdateDFileSetHeader(SDFileSet* pSet);
int tsdbScanAndTryFixDFileSet(STsdb* pRepo, SDFileSet* pSet);
static FORCE_INLINE void tsdbCloseDFileSet(SDFileSet* pSet) {
ASSERT_TSDB_FSET_NFILES_VALID(pSet);
for (TSDB_FILE_T ftype = 0; ftype < tsdbGetNFiles(pSet); ftype++) {
tsdbCloseDFile(TSDB_DFILE_IN_SET(pSet, ftype));
}
}
static FORCE_INLINE int tsdbOpenDFileSet(SDFileSet* pSet, int flags) {
ASSERT_TSDB_FSET_NFILES_VALID(pSet);
for (TSDB_FILE_T ftype = 0; ftype < tsdbGetNFiles(pSet); ftype++) {
if (tsdbOpenDFile(TSDB_DFILE_IN_SET(pSet, ftype), flags) < 0) {
tsdbCloseDFileSet(pSet);
return -1;
}
}
return 0;
}
static FORCE_INLINE void tsdbRemoveDFileSet(SDFileSet* pSet) {
ASSERT_TSDB_FSET_NFILES_VALID(pSet);
for (TSDB_FILE_T ftype = 0; ftype < tsdbGetNFiles(pSet); ftype++) {
(void)tsdbRemoveDFile(TSDB_DFILE_IN_SET(pSet, ftype));
}
}
static FORCE_INLINE int tsdbCopyDFileSet(SDFileSet* pSrc, SDFileSet* pDest) {
ASSERT_TSDB_FSET_NFILES_VALID(pSrc);
for (TSDB_FILE_T ftype = 0; ftype < tsdbGetNFiles(pSrc); ftype++) {
if (tsdbCopyDFile(TSDB_DFILE_IN_SET(pSrc, ftype), TSDB_DFILE_IN_SET(pDest, ftype)) < 0) {
tsdbRemoveDFileSet(pDest);
return -1;
}
}
return 0;
}
static FORCE_INLINE void tsdbGetFidKeyRange(int days, int8_t precision, int fid, TSKEY* minKey, TSKEY* maxKey) {
*minKey = fid * days * tsTickPerDay[precision];
*maxKey = *minKey + days * tsTickPerDay[precision] - 1;
}
static FORCE_INLINE bool tsdbFSetIsOk(SDFileSet* pSet) {
for (TSDB_FILE_T ftype = 0; ftype < TSDB_FILE_MAX; ftype++) {
if (TSDB_FILE_IS_BAD(TSDB_DFILE_IN_SET(pSet, ftype))) {
return false;
}
}
return true;
}
#endif /* _TS_TSDB_FILE_H_ */
\ No newline at end of file
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef _TD_TSDB_LOG_H_
#define _TD_TSDB_LOG_H_
extern int32_t tsdbDebugFlag;
#define tsdbFatal(...) do { if (tsdbDebugFlag & DEBUG_FATAL) { taosPrintLog("TDB FATAL ", 255, __VA_ARGS__); }} while(0)
#define tsdbError(...) do { if (tsdbDebugFlag & DEBUG_ERROR) { taosPrintLog("TDB ERROR ", 255, __VA_ARGS__); }} while(0)
#define tsdbWarn(...) do { if (tsdbDebugFlag & DEBUG_WARN) { taosPrintLog("TDB WARN ", 255, __VA_ARGS__); }} while(0)
#define tsdbInfo(...) do { if (tsdbDebugFlag & DEBUG_INFO) { taosPrintLog("TDB ", 255, __VA_ARGS__); }} while(0)
#define tsdbDebug(...) do { if (tsdbDebugFlag & DEBUG_DEBUG) { taosPrintLog("TDB ", tsdbDebugFlag, __VA_ARGS__); }} while(0)
#define tsdbTrace(...) do { if (tsdbDebugFlag & DEBUG_TRACE) { taosPrintLog("TDB ", tsdbDebugFlag, __VA_ARGS__); }} while(0)
#endif /* _TD_TSDB_LOG_H_ */
\ No newline at end of file
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef _TD_TSDB_MEMTABLE_H_
#define _TD_TSDB_MEMTABLE_H_
typedef struct {
int rowsInserted;
int rowsUpdated;
int rowsDeleteSucceed;
int rowsDeleteFailed;
int nOperations;
TSKEY keyFirst;
TSKEY keyLast;
} SMergeInfo;
typedef struct {
STable * pTable;
SSkipListIterator *pIter;
} SCommitIter;
struct STableData {
uint64_t uid;
TSKEY keyFirst;
TSKEY keyLast;
int64_t numOfRows;
SSkipList* pData;
T_REF_DECLARE()
};
enum { TSDB_UPDATE_META, TSDB_DROP_META };
#ifdef WINDOWS
#pragma pack(push ,1)
typedef struct {
#else
typedef struct __attribute__((packed)){
#endif
char act;
uint64_t uid;
} SActObj;
#ifdef WINDOWS
#pragma pack(pop)
#endif
typedef struct {
int len;
char cont[];
} SActCont;
int tsdbRefMemTable(STsdb* pRepo, SMemTable* pMemTable);
int tsdbUnRefMemTable(STsdb* pRepo, SMemTable* pMemTable);
int tsdbTakeMemSnapshot(STsdb* pRepo, SMemSnapshot* pSnapshot, SArray* pATable);
void tsdbUnTakeMemSnapShot(STsdb* pRepo, SMemSnapshot* pSnapshot);
void* tsdbAllocBytes(STsdb* pRepo, int bytes);
int tsdbAsyncCommit(STsdb* pRepo);
int tsdbSyncCommitConfig(STsdb* pRepo);
int tsdbLoadDataFromCache(STable* pTable, SSkipListIterator* pIter, TSKEY maxKey, int maxRowsToRead, SDataCols* pCols,
TKEY* filterKeys, int nFilterKeys, bool keepDup, SMergeInfo* pMergeInfo);
void* tsdbCommitData(STsdb* pRepo);
static FORCE_INLINE SMemRow tsdbNextIterRow(SSkipListIterator* pIter) {
if (pIter == NULL) return NULL;
SSkipListNode* node = tSkipListIterGet(pIter);
if (node == NULL) return NULL;
return (SMemRow)SL_GET_NODE_DATA(node);
}
static FORCE_INLINE TSKEY tsdbNextIterKey(SSkipListIterator* pIter) {
SMemRow row = tsdbNextIterRow(pIter);
if (row == NULL) return TSDB_DATA_TIMESTAMP_NULL;
return memRowKey(row);
}
static FORCE_INLINE TKEY tsdbNextIterTKey(SSkipListIterator* pIter) {
SMemRow row = tsdbNextIterRow(pIter);
if (row == NULL) return TKEY_NULL;
return memRowTKey(row);
}
#endif /* _TD_TSDB_MEMTABLE_H_ */
\ No newline at end of file
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef _TD_TSDB_MEMORY_H_
#define _TD_TSDB_MEMORY_H_
static void * taosTMalloc(size_t size);
static void * taosTCalloc(size_t nmemb, size_t size);
static void * taosTRealloc(void *ptr, size_t size);
static void * taosTZfree(void *ptr);
static size_t taosTSizeof(void *ptr);
static void taosTMemset(void *ptr, int c);
static FORCE_INLINE void *taosTMalloc(size_t size) {
if (size <= 0) return NULL;
void *ret = malloc(size + sizeof(size_t));
if (ret == NULL) return NULL;
*(size_t *)ret = size;
return (void *)((char *)ret + sizeof(size_t));
}
static FORCE_INLINE void *taosTCalloc(size_t nmemb, size_t size) {
size_t tsize = nmemb * size;
void * ret = taosTMalloc(tsize);
if (ret == NULL) return NULL;
taosTMemset(ret, 0);
return ret;
}
static FORCE_INLINE size_t taosTSizeof(void *ptr) { return (ptr) ? (*(size_t *)((char *)ptr - sizeof(size_t))) : 0; }
static FORCE_INLINE void taosTMemset(void *ptr, int c) { memset(ptr, c, taosTSizeof(ptr)); }
static FORCE_INLINE void * taosTRealloc(void *ptr, size_t size) {
if (ptr == NULL) return taosTMalloc(size);
if (size <= taosTSizeof(ptr)) return ptr;
void * tptr = (void *)((char *)ptr - sizeof(size_t));
size_t tsize = size + sizeof(size_t);
void* tptr1 = realloc(tptr, tsize);
if (tptr1 == NULL) return NULL;
tptr = tptr1;
*(size_t *)tptr = size;
return (void *)((char *)tptr + sizeof(size_t));
}
static FORCE_INLINE void* taosTZfree(void* ptr) {
if (ptr) {
free((void*)((char*)ptr - sizeof(size_t)));
}
return NULL;
}
#endif /* _TD_TSDB_MEMORY_H_ */
\ No newline at end of file
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef _TD_TSDB_META_H_
#define _TD_TSDB_META_H_
#include "tskiplist.h"
#define TSDB_MAX_TABLE_SCHEMAS 16
#pragma pack(push, 1)
typedef struct jsonMapValue {
void* table; // STable *
int16_t colId; // the json col ID.
} JsonMapValue;
#pragma pack(pop)
typedef struct STable {
STableId tableId;
ETableType type;
tstr* name; // NOTE: there a flexible string here
uint64_t suid;
struct STable* pSuper; // super table pointer
SArray* schema;
STSchema* tagSchema;
SKVRow tagVal;
SSkipList* pIndex; // For TSDB_SUPER_TABLE, it is the skiplist index
SHashObj* jsonKeyMap; // For json tag key {"key":[t1, t2, t3]}
void* eventHandler; // TODO
void* streamHandler; // TODO
TSKEY lastKey;
SMemRow lastRow;
char* sql;
void* cqhandle;
SRWLatch latch; // TODO: implementa latch functions
SDataCol* lastCols;
int16_t maxColNum;
int16_t restoreColumnNum;
bool hasRestoreLastColumn;
int lastColSVersion;
int16_t cacheLastConfigVersion;
T_REF_DECLARE()
} STable;
typedef struct {
pthread_rwlock_t rwLock;
int32_t nTables;
int32_t maxTables;
STable** tables;
SList* superList;
SHashObj* uidMap;
int maxRowBytes;
int maxCols;
} STsdbMeta;
#define TSDB_INIT_NTABLES 1024
#define TABLE_TYPE(t) (t)->type
#define TABLE_NAME(t) (t)->name
#define TABLE_CHAR_NAME(t) TABLE_NAME(t)->data
#define TABLE_UID(t) (t)->tableId.uid
#define TABLE_TID(t) (t)->tableId.tid
#define TABLE_SUID(t) (t)->suid
// #define TSDB_META_FILE_MAGIC(m) KVSTORE_MAGIC((m)->pStore)
#define TSDB_RLOCK_TABLE(t) taosRLockLatch(&((t)->latch))
#define TSDB_RUNLOCK_TABLE(t) taosRUnLockLatch(&((t)->latch))
#define TSDB_WLOCK_TABLE(t) taosWLockLatch(&((t)->latch))
#define TSDB_WUNLOCK_TABLE(t) taosWUnLockLatch(&((t)->latch))
STsdbMeta* tsdbNewMeta(STsdbCfg* pCfg);
void tsdbFreeMeta(STsdbMeta* pMeta);
int tsdbOpenMeta(STsdb* pRepo);
int tsdbCloseMeta(STsdb* pRepo);
STable* tsdbGetTableByUid(STsdbMeta* pMeta, uint64_t uid);
STSchema* tsdbGetTableSchemaByVersion(STable* pTable, int16_t _version, int8_t rowType);
int tsdbWLockRepoMeta(STsdb* pRepo);
int tsdbRLockRepoMeta(STsdb* pRepo);
int tsdbUnlockRepoMeta(STsdb* pRepo);
void tsdbRefTable(STable* pTable);
void tsdbUnRefTable(STable* pTable);
void tsdbUpdateTableSchema(STsdb* pRepo, STable* pTable, STSchema* pSchema, bool insertAct);
int tsdbRestoreTable(STsdb* pRepo, void* cont, int contLen);
void tsdbOrgMeta(STsdb* pRepo);
int tsdbInitColIdCacheWithSchema(STable* pTable, STSchema* pSchema);
int16_t tsdbGetLastColumnsIndexByColId(STable* pTable, int16_t colId);
int tsdbUpdateLastColSchema(STable* pTable, STSchema* pNewSchema);
STSchema* tsdbGetTableLatestSchema(STable* pTable);
void tsdbFreeLastColumns(STable* pTable);
int tsdbCompareJsonMapValue(const void* a, const void* b);
void* tsdbGetJsonTagValue(STable* pTable, char* key, int32_t keyLen, int16_t* colId);
static FORCE_INLINE int tsdbCompareSchemaVersion(const void* key1, const void* key2) {
if (*(int16_t*)key1 < schemaVersion(*(STSchema**)key2)) {
return -1;
} else if (*(int16_t*)key1 > schemaVersion(*(STSchema**)key2)) {
return 1;
} else {
return 0;
}
}
static FORCE_INLINE STSchema* tsdbGetTableSchemaImpl(STable* pTable, bool lock, bool copy, int16_t _version,
int8_t rowType) {
STable* pDTable = (pTable->pSuper != NULL) ? pTable->pSuper : pTable; // for performance purpose
STSchema* pSchema = NULL;
STSchema* pTSchema = NULL;
if (lock) TSDB_RLOCK_TABLE(pDTable);
if (_version < 0) { // get the latest version of schema
pTSchema = *(STSchema**)taosArrayGetLast(pDTable->schema);
} else { // get the schema with version
void* ptr = taosArraySearch(pDTable->schema, &_version, tsdbCompareSchemaVersion, TD_EQ);
if (ptr == NULL) {
if (rowType == SMEM_ROW_KV) {
ptr = taosArrayGetLast(pDTable->schema);
} else {
terrno = TSDB_CODE_TDB_IVD_TB_SCHEMA_VERSION;
goto _exit;
}
}
pTSchema = *(STSchema**)ptr;
}
ASSERT(pTSchema != NULL);
if (copy) {
if ((pSchema = tdDupSchema(pTSchema)) == NULL) terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
} else {
pSchema = pTSchema;
}
_exit:
if (lock) TSDB_RUNLOCK_TABLE(pDTable);
return pSchema;
}
static FORCE_INLINE STSchema* tsdbGetTableSchema(STable* pTable) {
return tsdbGetTableSchemaImpl(pTable, false, false, -1, -1);
}
static FORCE_INLINE STSchema* tsdbGetTableTagSchema(STable* pTable) {
if (pTable->type == TSDB_CHILD_TABLE) { // check child table first
STable* pSuper = pTable->pSuper;
if (pSuper == NULL) return NULL;
return pSuper->tagSchema;
} else if (pTable->type == TSDB_SUPER_TABLE) {
return pTable->tagSchema;
} else {
return NULL;
}
}
static FORCE_INLINE TSKEY tsdbGetTableLastKeyImpl(STable* pTable) {
ASSERT((pTable->lastRow == NULL) || (pTable->lastKey == memRowKey(pTable->lastRow)));
return pTable->lastKey;
}
#endif /* _TD_TSDB_META_H_ */
\ No newline at end of file
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef _TD_TSDB_READ_IMPL_H_
#define _TD_TSDB_READ_IMPL_H_
#include "os.h"
#include "tfs.h"
#include "tsdb.h"
#include "tsdbFile.h"
#include "tsdbMemory.h"
#include "tsdbMeta.h"
#include "tskiplist.h"
typedef struct SReadH SReadH;
typedef struct {
int32_t tid;
uint32_t len;
uint32_t offset;
uint32_t hasLast : 2;
uint32_t numOfBlocks : 30;
uint64_t uid;
TSKEY maxKey;
} SBlockIdx;
#if 0
typedef struct {
int64_t last : 1;
int64_t offset : 63;
int32_t algorithm : 8;
int32_t numOfRows : 24;
int32_t len;
int32_t keyLen; // key column length, keyOffset = offset+sizeof(SBlockData)+sizeof(SBlockCol)*numOfCols
int16_t numOfSubBlocks;
int16_t numOfCols; // not including timestamp column
TSKEY keyFirst;
TSKEY keyLast;
} SBlock;
#endif
/**
* keyLen; // key column length, keyOffset = offset+sizeof(SBlockData)+sizeof(SBlockCol)*numOfCols
* numOfCols; // not including timestamp column
*/
#define SBlockFieldsP0 \
int64_t last : 1; \
int64_t offset : 63; \
int32_t algorithm : 8; \
int32_t numOfRows : 24; \
int32_t len; \
int32_t keyLen; \
int16_t numOfSubBlocks; \
int16_t numOfCols; \
TSKEY keyFirst; \
TSKEY keyLast
/**
* aggrStat; // only valid when blkVer > 0. 0 - no aggr part in .data/.last/.smad/.smal, 1 - has aggr in .smad/.smal
* blkVer; // 0 - original block, 1 - block since importing .smad/.smal
* aggrOffset; // only valid when blkVer > 0 and aggrStat > 0
*/
#define SBlockFieldsP1 \
uint64_t aggrStat : 1; \
uint64_t blkVer : 7; \
uint64_t aggrOffset : 56
typedef struct {
SBlockFieldsP0;
} SBlockV0;
typedef struct {
SBlockFieldsP0;
SBlockFieldsP1;
} SBlockV1;
typedef enum {
TSDB_SBLK_VER_0 = 0,
TSDB_SBLK_VER_1,
} ESBlockVer;
#define SBlockVerLatest TSDB_SBLK_VER_1
#define SBlock SBlockV1 // latest SBlock definition
// lastest SBlockInfo definition
typedef struct {
int32_t delimiter; // For recovery usage
int32_t tid;
uint64_t uid;
SBlock blocks[];
} SBlockInfo;
typedef struct {
int16_t colId;
int32_t len;
uint32_t type : 8;
uint32_t offset : 24;
int64_t sum;
int64_t max;
int64_t min;
int16_t maxIndex;
int16_t minIndex;
int16_t numOfNull;
uint8_t offsetH;
char padding[1];
} SBlockColV0;
typedef struct {
int16_t colId;
uint8_t offsetH;
uint8_t reserved; // reserved field, not used
int32_t len;
uint32_t type : 8;
uint32_t offset : 24;
} SBlockColV1;
#define SBlockCol SBlockColV1 // latest SBlockCol definition
typedef struct {
int16_t colId;
int16_t maxIndex;
int16_t minIndex;
int16_t numOfNull;
int64_t sum;
int64_t max;
int64_t min;
} SAggrBlkColV1;
#define SAggrBlkCol SAggrBlkColV1 // latest SAggrBlkCol definition
// Code here just for back-ward compatibility
static FORCE_INLINE void tsdbSetBlockColOffset(SBlockCol *pBlockCol, uint32_t offset) {
pBlockCol->offset = offset & ((((uint32_t)1) << 24) - 1);
pBlockCol->offsetH = (uint8_t)(offset >> 24);
}
static FORCE_INLINE uint32_t tsdbGetBlockColOffset(SBlockCol *pBlockCol) {
uint32_t offset1 = pBlockCol->offset;
uint32_t offset2 = pBlockCol->offsetH;
return (offset1 | (offset2 << 24));
}
typedef struct {
int32_t delimiter; // For recovery usage
int32_t numOfCols; // For recovery usage
uint64_t uid; // For recovery usage
SBlockCol cols[];
} SBlockData;
typedef void SAggrBlkData; // SBlockCol cols[];
struct SReadH {
STsdb * pRepo;
SDFileSet rSet; // FSET to read
SArray * aBlkIdx; // SBlockIdx array
STable * pTable; // table to read
SBlockIdx * pBlkIdx; // current reading table SBlockIdx
int cidx;
SBlockInfo * pBlkInfo; // SBlockInfoV#
SBlockData * pBlkData; // Block info
SAggrBlkData *pAggrBlkData; // Aggregate Block info
SDataCols * pDCols[2];
void * pBuf; // buffer
void * pCBuf; // compression buffer
void * pExBuf; // extra buffer
};
#define TSDB_READ_REPO(rh) ((rh)->pRepo)
#define TSDB_READ_REPO_ID(rh) REPO_ID(TSDB_READ_REPO(rh))
#define TSDB_READ_FSET(rh) (&((rh)->rSet))
#define TSDB_READ_TABLE(rh) ((rh)->pTable)
#define TSDB_READ_HEAD_FILE(rh) TSDB_DFILE_IN_SET(TSDB_READ_FSET(rh), TSDB_FILE_HEAD)
#define TSDB_READ_DATA_FILE(rh) TSDB_DFILE_IN_SET(TSDB_READ_FSET(rh), TSDB_FILE_DATA)
#define TSDB_READ_LAST_FILE(rh) TSDB_DFILE_IN_SET(TSDB_READ_FSET(rh), TSDB_FILE_LAST)
#define TSDB_READ_SMAD_FILE(rh) TSDB_DFILE_IN_SET(TSDB_READ_FSET(rh), TSDB_FILE_SMAD)
#define TSDB_READ_SMAL_FILE(rh) TSDB_DFILE_IN_SET(TSDB_READ_FSET(rh), TSDB_FILE_SMAL)
#define TSDB_READ_BUF(rh) ((rh)->pBuf)
#define TSDB_READ_COMP_BUF(rh) ((rh)->pCBuf)
#define TSDB_READ_EXBUF(rh) ((rh)->pExBuf)
#define TSDB_BLOCK_STATIS_SIZE(ncols, blkVer) \
(sizeof(SBlockData) + sizeof(SBlockColV##blkVer) * (ncols) + sizeof(TSCKSUM))
static FORCE_INLINE size_t tsdbBlockStatisSize(int nCols, uint32_t blkVer) {
switch (blkVer) {
case TSDB_SBLK_VER_0:
return TSDB_BLOCK_STATIS_SIZE(nCols, 0);
case TSDB_SBLK_VER_1:
default:
return TSDB_BLOCK_STATIS_SIZE(nCols, 1);
}
}
#define TSDB_BLOCK_AGGR_SIZE(ncols, blkVer) (sizeof(SAggrBlkColV##blkVer) * (ncols) + sizeof(TSCKSUM))
static FORCE_INLINE size_t tsdbBlockAggrSize(int nCols, uint32_t blkVer) {
switch (blkVer) {
case TSDB_SBLK_VER_0:
ASSERT(false);
return 0;
case TSDB_SBLK_VER_1:
default:
return TSDB_BLOCK_AGGR_SIZE(nCols, 1);
}
}
int tsdbInitReadH(SReadH *pReadh, STsdb *pRepo);
void tsdbDestroyReadH(SReadH *pReadh);
int tsdbSetAndOpenReadFSet(SReadH *pReadh, SDFileSet *pSet);
void tsdbCloseAndUnsetFSet(SReadH *pReadh);
int tsdbLoadBlockIdx(SReadH *pReadh);
int tsdbSetReadTable(SReadH *pReadh, STable *pTable);
int tsdbLoadBlockInfo(SReadH *pReadh, void **pTarget, uint32_t *extendedLen);
int tsdbLoadBlockData(SReadH *pReadh, SBlock *pBlock, SBlockInfo *pBlockInfo);
int tsdbLoadBlockDataCols(SReadH *pReadh, SBlock *pBlock, SBlockInfo *pBlkInfo, int16_t *colIds, int numOfColsIds);
int tsdbLoadBlockStatis(SReadH *pReadh, SBlock *pBlock);
int tsdbLoadBlockOffset(SReadH *pReadh, SBlock *pBlock);
int tsdbEncodeSBlockIdx(void **buf, SBlockIdx *pIdx);
void *tsdbDecodeSBlockIdx(void *buf, SBlockIdx *pIdx);
void tsdbGetBlockStatis(SReadH *pReadh, SDataStatis *pStatis, int numOfCols, SBlock *pBlock);
static FORCE_INLINE int tsdbMakeRoom(void **ppBuf, size_t size) {
void * pBuf = *ppBuf;
size_t tsize = taosTSizeof(pBuf);
if (tsize < size) {
if (tsize == 0) tsize = 1024;
while (tsize < size) {
tsize *= 2;
}
*ppBuf = taosTRealloc(pBuf, tsize);
if (*ppBuf == NULL) {
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
return -1;
}
}
return 0;
}
static FORCE_INLINE SBlockCol *tsdbGetSBlockCol(SBlock *pBlock, SBlockCol **pDestBlkCol, SBlockCol *pBlkCols,
int colIdx) {
if (pBlock->blkVer == SBlockVerLatest) {
*pDestBlkCol = pBlkCols + colIdx;
return *pDestBlkCol;
}
if (pBlock->blkVer == TSDB_SBLK_VER_0) {
SBlockColV0 *pBlkCol = (SBlockColV0 *)pBlkCols + colIdx;
(*pDestBlkCol)->colId = pBlkCol->colId;
(*pDestBlkCol)->len = pBlkCol->len;
(*pDestBlkCol)->type = pBlkCol->type;
(*pDestBlkCol)->offset = pBlkCol->offset;
(*pDestBlkCol)->offsetH = pBlkCol->offsetH;
}
return *pDestBlkCol;
}
#endif /*_TD_TSDB_READ_IMPL_H_*/
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef _TD_TSDB_INT_H_
#define _TD_TSDB_INT_H_
#include "os.h"
#include "taosdef.h"
#include "taoserror.h"
#include "tarray.h"
#include "tchecksum.h"
#include "tcoding.h"
#include "tcompression.h"
#include "tdataformat.h"
#include "tfs.h"
#include "thash.h"
#include "tlist.h"
#include "tlockfree.h"
#include "tlog.h"
#include "tsdbMemory.h"
#include "tskiplist.h"
#include "tsdb.h"
#ifdef __cplusplus
extern "C" {
#endif
// Log
#include "tsdbLog.h"
// Meta
#include "tsdbMeta.h"
// // Buffer
// #include "tsdbBuffer.h"
// MemTable
#include "tsdbMemTable.h"
// File
#include "tsdbFile.h"
// FS
#include "tsdbFS.h"
// ReadImpl
#include "tsdbReadImpl.h"
// Commit
#include "tsdbCommit.h"
// Compact
#include "tsdbCompact.h"
#include "tsdbRowMergeBuf.h"
// Main definitions
struct STsdb {
uint8_t state;
STsdbCfg config;
STsdbStat stat;
STsdbMeta* tsdbMeta;
SMemTable* mem;
SMemTable* imem;
STsdbFS* fs;
SRtn rtn;
SMergeBuf mergeBuf; // used when update=2
};
#define REPO_ID(r) (r)->config.tsdbId
#define REPO_CFG(r) (&((r)->config))
#define REPO_FS(r) ((r)->fs)
#define IS_REPO_LOCKED(r) (r)->repoLocked
#define TSDB_SUBMIT_MSG_HEAD_SIZE sizeof(SSubmitMsg)
int tsdbLockRepo(STsdb* pRepo);
int tsdbUnlockRepo(STsdb* pRepo);
STsdbMeta* tsdbGetMeta(STsdb* pRepo);
int tsdbCheckCommit(STsdb* pRepo);
int tsdbRestoreInfo(STsdb* pRepo);
UNUSED_FUNC int tsdbCacheLastData(STsdb* pRepo, STsdbCfg* oldCfg);
int32_t tsdbLoadLastCache(STsdb* pRepo, STable* pTable);
void tsdbGetRootDir(int repoid, char dirName[]);
void tsdbGetDataDir(int repoid, char dirName[]);
#ifdef __cplusplus
}
#endif
#endif /* _TD_TSDB_INT_H_ */
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
\ No newline at end of file
AUX_SOURCE_DIRECTORY(${CMAKE_CURRENT_SOURCE_DIR} SOURCE_LIST)
add_executable(tsdbTests ${SOURCE_LIST})
target_link_libraries(tsdbTests gtest gtest_main pthread common tsdb tutil trpc)
add_test(NAME unit COMMAND ${CMAKE_CURRENT_BINARY_DIR}/tsdbTests)
\ No newline at end of file
此差异已折叠。
...@@ -112,6 +112,9 @@ public: ...@@ -112,6 +112,9 @@ public:
int32_t catalogGetTableHashVgroup(const SName* pTableName, SVgroupInfo* vgInfo) const { int32_t catalogGetTableHashVgroup(const SName* pTableName, SVgroupInfo* vgInfo) const {
// todo // todo
vgInfo->vgId = 1; vgInfo->vgId = 1;
vgInfo->numOfEps = 1;
vgInfo->epAddr[0].port = 6030;
strcpy(vgInfo->epAddr[0].fqdn, "node1");
return 0; return 0;
} }
......
...@@ -234,7 +234,7 @@ static SSubplan* initSubplan(SPlanContext* pCxt, int32_t type) { ...@@ -234,7 +234,7 @@ static SSubplan* initSubplan(SPlanContext* pCxt, int32_t type) {
static void vgroupInfoToEpSet(const SVgroupInfo* vg, SQueryNodeAddr* execNode) { static void vgroupInfoToEpSet(const SVgroupInfo* vg, SQueryNodeAddr* execNode) {
execNode->nodeId = vg->vgId; execNode->nodeId = vg->vgId;
execNode->inUse = 0; // todo execNode->inUse = vg->inUse;
execNode->numOfEps = vg->numOfEps; execNode->numOfEps = vg->numOfEps;
for (int8_t i = 0; i < vg->numOfEps; ++i) { for (int8_t i = 0; i < vg->numOfEps; ++i) {
execNode->epAddr[i] = vg->epAddr[i]; execNode->epAddr[i] = vg->epAddr[i];
......
...@@ -62,7 +62,7 @@ static bool fromObjectWithAlloc(const cJSON* json, const char* name, FFromJson f ...@@ -62,7 +62,7 @@ static bool fromObjectWithAlloc(const cJSON* json, const char* name, FFromJson f
return func(jObj, *obj); return func(jObj, *obj);
} }
static bool addArray(cJSON* json, const char* name, FToJson func, const SArray* array) { static bool addTarray(cJSON* json, const char* name, FToJson func, const SArray* array, bool isPoint) {
size_t size = (NULL == array) ? 0 : taosArrayGetSize(array); size_t size = (NULL == array) ? 0 : taosArrayGetSize(array);
if (size > 0) { if (size > 0) {
cJSON* jArray = cJSON_AddArrayToObject(json, name); cJSON* jArray = cJSON_AddArrayToObject(json, name);
...@@ -70,7 +70,7 @@ static bool addArray(cJSON* json, const char* name, FToJson func, const SArray* ...@@ -70,7 +70,7 @@ static bool addArray(cJSON* json, const char* name, FToJson func, const SArray*
return false; return false;
} }
for (size_t i = 0; i < size; ++i) { for (size_t i = 0; i < size; ++i) {
if (!addItem(jArray, func, taosArrayGetP(array, i))) { if (!addItem(jArray, func, isPoint ? taosArrayGetP(array, i) : taosArrayGet(array, i))) {
return false; return false;
} }
} }
...@@ -78,11 +78,19 @@ static bool addArray(cJSON* json, const char* name, FToJson func, const SArray* ...@@ -78,11 +78,19 @@ static bool addArray(cJSON* json, const char* name, FToJson func, const SArray*
return true; return true;
} }
static bool fromArray(const cJSON* json, const char* name, FFromJson func, SArray** array, int32_t itemSize) { static bool addInlineArray(cJSON* json, const char* name, FToJson func, const SArray* array) {
return addTarray(json, name, func, array, false);
}
static bool addArray(cJSON* json, const char* name, FToJson func, const SArray* array) {
return addTarray(json, name, func, array, true);
}
static bool fromTarray(const cJSON* json, const char* name, FFromJson func, SArray** array, int32_t itemSize, bool isPoint) {
const cJSON* jArray = cJSON_GetObjectItem(json, name); const cJSON* jArray = cJSON_GetObjectItem(json, name);
int32_t size = (NULL == jArray ? 0 : cJSON_GetArraySize(jArray)); int32_t size = (NULL == jArray ? 0 : cJSON_GetArraySize(jArray));
if (size > 0) { if (size > 0) {
*array = taosArrayInit(size, POINTER_BYTES); *array = taosArrayInit(size, isPoint ? POINTER_BYTES : itemSize);
if (NULL == *array) { if (NULL == *array) {
return false; return false;
} }
...@@ -92,11 +100,19 @@ static bool fromArray(const cJSON* json, const char* name, FFromJson func, SArra ...@@ -92,11 +100,19 @@ static bool fromArray(const cJSON* json, const char* name, FFromJson func, SArra
if (NULL == item || !func(cJSON_GetArrayItem(jArray, i), item)) { if (NULL == item || !func(cJSON_GetArrayItem(jArray, i), item)) {
return false; return false;
} }
taosArrayPush(*array, &item); taosArrayPush(*array, isPoint ? &item : item);
} }
return true; return true;
} }
static bool fromInlineArray(const cJSON* json, const char* name, FFromJson func, SArray** array, int32_t itemSize) {
return fromTarray(json, name, func, array, itemSize, false);
}
static bool fromArray(const cJSON* json, const char* name, FFromJson func, SArray** array, int32_t itemSize) {
return fromTarray(json, name, func, array, itemSize, true);
}
static bool addRawArray(cJSON* json, const char* name, FToJson func, const void* array, int32_t itemSize, int32_t size) { static bool addRawArray(cJSON* json, const char* name, FToJson func, const void* array, int32_t itemSize, int32_t size) {
if (size > 0) { if (size > 0) {
cJSON* jArray = cJSON_AddArrayToObject(json, name); cJSON* jArray = cJSON_AddArrayToObject(json, name);
...@@ -556,6 +572,32 @@ static bool epAddrFromJson(const cJSON* json, void* obj) { ...@@ -556,6 +572,32 @@ static bool epAddrFromJson(const cJSON* json, void* obj) {
return true; return true;
} }
static const char* jkNodeAddrId = "NodeId";
static const char* jkNodeAddrInUse = "InUse";
static const char* jkNodeAddrEpAddrs = "EpAddrs";
static bool nodeAddrToJson(const void* obj, cJSON* json) {
const SQueryNodeAddr* ep = (const SQueryNodeAddr*)obj;
bool res = cJSON_AddNumberToObject(json, jkNodeAddrId, ep->nodeId);
if (res) {
res = cJSON_AddNumberToObject(json, jkNodeAddrInUse, ep->inUse);
}
if (res) {
res = addRawArray(json, jkNodeAddrEpAddrs, epAddrToJson, ep->epAddr, ep->numOfEps, sizeof(SEpAddrMsg));
}
return res;
}
static bool nodeAddrFromJson(const cJSON* json, void* obj) {
SQueryNodeAddr* ep = (SQueryNodeAddr*)obj;
ep->nodeId = getNumber(json, jkNodeAddrId);
ep->inUse = getNumber(json, jkNodeAddrInUse);
int32_t numOfEps = 0;
bool res = fromRawArray(json, jkNodeAddrEpAddrs, nodeAddrFromJson, &ep->epAddr, sizeof(SEpAddrMsg), &numOfEps);
ep->numOfEps = numOfEps;
return res;
}
static const char* jkExchangeNodeSrcTemplateId = "SrcTemplateId"; static const char* jkExchangeNodeSrcTemplateId = "SrcTemplateId";
static const char* jkExchangeNodeSrcEndPoints = "SrcEndPoints"; static const char* jkExchangeNodeSrcEndPoints = "SrcEndPoints";
...@@ -563,7 +605,7 @@ static bool exchangeNodeToJson(const void* obj, cJSON* json) { ...@@ -563,7 +605,7 @@ static bool exchangeNodeToJson(const void* obj, cJSON* json) {
const SExchangePhyNode* exchange = (const SExchangePhyNode*)obj; const SExchangePhyNode* exchange = (const SExchangePhyNode*)obj;
bool res = cJSON_AddNumberToObject(json, jkExchangeNodeSrcTemplateId, exchange->srcTemplateId); bool res = cJSON_AddNumberToObject(json, jkExchangeNodeSrcTemplateId, exchange->srcTemplateId);
if (res) { if (res) {
res = addArray(json, jkExchangeNodeSrcEndPoints, epAddrToJson, exchange->pSrcEndPoints); res = addInlineArray(json, jkExchangeNodeSrcEndPoints, nodeAddrToJson, exchange->pSrcEndPoints);
} }
return res; return res;
} }
...@@ -571,7 +613,7 @@ static bool exchangeNodeToJson(const void* obj, cJSON* json) { ...@@ -571,7 +613,7 @@ static bool exchangeNodeToJson(const void* obj, cJSON* json) {
static bool exchangeNodeFromJson(const cJSON* json, void* obj) { static bool exchangeNodeFromJson(const cJSON* json, void* obj) {
SExchangePhyNode* exchange = (SExchangePhyNode*)obj; SExchangePhyNode* exchange = (SExchangePhyNode*)obj;
exchange->srcTemplateId = getNumber(json, jkExchangeNodeSrcTemplateId); exchange->srcTemplateId = getNumber(json, jkExchangeNodeSrcTemplateId);
return fromArray(json, jkExchangeNodeSrcEndPoints, epAddrFromJson, &exchange->pSrcEndPoints, sizeof(SEpAddrMsg)); return fromInlineArray(json, jkExchangeNodeSrcEndPoints, nodeAddrFromJson, &exchange->pSrcEndPoints, sizeof(SQueryNodeAddr));
} }
static bool specificPhyNodeToJson(const void* obj, cJSON* json) { static bool specificPhyNodeToJson(const void* obj, cJSON* json) {
...@@ -803,7 +845,6 @@ static cJSON* subplanToJson(const SSubplan* subplan) { ...@@ -803,7 +845,6 @@ static cJSON* subplanToJson(const SSubplan* subplan) {
if (res) { if (res) {
res = addObject(jSubplan, jkSubplanDataSink, dataSinkToJson, subplan->pDataSink); res = addObject(jSubplan, jkSubplanDataSink, dataSinkToJson, subplan->pDataSink);
} }
if (!res) { if (!res) {
cJSON_Delete(jSubplan); cJSON_Delete(jSubplan);
return NULL; return NULL;
......
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册