提交 9241af22 编写于 作者: H Hongze Cheng

tsdb integration

上级 e818e4aa
......@@ -19,14 +19,15 @@
#include <stdbool.h>
#include <stdint.h>
#include "common.h"
#include "taosdef.h"
#include "tmsg.h"
#include "tarray.h"
#include "tdataformat.h"
#include "tname.h"
#include "thash.h"
#include "tlockfree.h"
#include "tlist.h"
#include "tlockfree.h"
#include "tmsg.h"
#include "tname.h"
#ifdef __cplusplus
extern "C" {
......@@ -39,7 +40,7 @@ extern "C" {
#define TSDB_STATUS_COMMIT_START 1
#define TSDB_STATUS_COMMIT_OVER 2
#define TSDB_STATUS_COMMIT_NOBLOCK 3 //commit no block, need to be solved
#define TSDB_STATUS_COMMIT_NOBLOCK 3 // commit no block, need to be solved
// TSDB STATE DEFINITION
#define TSDB_STATE_OK 0x0
......@@ -62,7 +63,8 @@ typedef struct {
void *cqH;
int (*notifyStatus)(void *, int status, int eno);
int (*eventCallBack)(void *);
void *(*cqCreateFunc)(void *handle, uint64_t uid, int32_t sid, const char *dstTable, char *sqlStr, STSchema *pSchema, int start);
void *(*cqCreateFunc)(void *handle, uint64_t uid, int32_t sid, const char *dstTable, char *sqlStr, STSchema *pSchema,
int start);
void (*cqDropFunc)(void *handle);
} STsdbAppH;
......@@ -75,16 +77,17 @@ typedef struct {
int32_t keep; // day of data to keep
int32_t keep1;
int32_t keep2;
int32_t lruCacheSize;
int32_t minRowsPerFileBlock; // minimum rows per file block
int32_t maxRowsPerFileBlock; // maximum rows per file block
int8_t precision;
int8_t compression;
int8_t update;
int8_t cacheLastRow; // 0:no cache, 1: cache last row, 2: cache last NULL column 3: 1&2
int8_t cacheLastRow; // 0:no cache, 1: cache last row, 2: cache last NULL column 3: 1&2
} STsdbCfg;
#define CACHE_NO_LAST(c) ((c)->cacheLastRow == 0)
#define CACHE_LAST_ROW(c) (((c)->cacheLastRow & 1) > 0)
#define CACHE_NO_LAST(c) ((c)->cacheLastRow == 0)
#define CACHE_LAST_ROW(c) (((c)->cacheLastRow & 1) > 0)
#define CACHE_LAST_NULL_COLUMN(c) (((c)->cacheLastRow & 2) > 0)
// --------- TSDB REPOSITORY USAGE STATISTICS
......@@ -94,18 +97,18 @@ typedef struct {
int64_t pointsWritten; // total data points written
} STsdbStat;
typedef struct STsdbRepo STsdbRepo;
typedef struct STsdb STsdb;
STsdbCfg *tsdbGetCfg(const STsdbRepo *repo);
STsdbCfg *tsdbGetCfg(const STsdb *repo);
// --------- TSDB REPOSITORY DEFINITION
int32_t tsdbCreateRepo(int repoid);
int32_t tsdbDropRepo(int repoid);
STsdbRepo *tsdbOpenRepo(STsdbCfg *pCfg, STsdbAppH *pAppH);
int tsdbCloseRepo(STsdbRepo *repo, int toCommit);
int32_t tsdbConfigRepo(STsdbRepo *repo, STsdbCfg *pCfg);
int tsdbGetState(STsdbRepo *repo);
int8_t tsdbGetCompactState(STsdbRepo *repo);
int32_t tsdbCreateRepo(int repoid);
int32_t tsdbDropRepo(int repoid);
STsdb * tsdbOpenRepo(STsdbCfg *pCfg, STsdbAppH *pAppH);
int tsdbCloseRepo(STsdb *repo, int toCommit);
int32_t tsdbConfigRepo(STsdb *repo, STsdbCfg *pCfg);
int tsdbGetState(STsdb *repo);
int8_t tsdbGetCompactState(STsdb *repo);
// --------- TSDB TABLE DEFINITION
typedef struct {
uint64_t uid; // the unique table ID
......@@ -131,17 +134,17 @@ void tsdbClearTableCfg(STableCfg *config);
void *tsdbGetTableTagVal(const void *pTable, int32_t colId, int16_t type);
char *tsdbGetTableName(void *pTable);
#define TSDB_TABLEID(_table) ((STableId*) (_table))
#define TSDB_PREV_ROW 0x1
#define TSDB_NEXT_ROW 0x2
#define TSDB_TABLEID(_table) ((STableId *)(_table))
#define TSDB_PREV_ROW 0x1
#define TSDB_NEXT_ROW 0x2
STableCfg *tsdbCreateTableCfgFromMsg(SMDCreateTableMsg *pMsg);
int tsdbCreateTable(STsdbRepo *repo, STableCfg *pCfg);
int tsdbDropTable(STsdbRepo *pRepo, STableId tableId);
int tsdbUpdateTableTagValue(STsdbRepo *repo, SUpdateTableTagValMsg *pMsg);
int tsdbCreateTable(STsdb *repo, STableCfg *pCfg);
int tsdbDropTable(STsdb *pRepo, STableId tableId);
int tsdbUpdateTableTagValue(STsdb *repo, SUpdateTableTagValMsg *pMsg);
uint32_t tsdbGetFileInfo(STsdbRepo *repo, char *name, uint32_t *index, uint32_t eindex, int64_t *size);
uint32_t tsdbGetFileInfo(STsdb *repo, char *name, uint32_t *index, uint32_t eindex, int64_t *size);
// the TSDB repository info
typedef struct STsdbRepoInfo {
......@@ -151,7 +154,7 @@ typedef struct STsdbRepoInfo {
int64_t tsdbTotalDiskSize; // the total disk size taken by this TSDB repository
// TODO: Other informations to add
} STsdbRepoInfo;
STsdbRepoInfo *tsdbGetStatus(STsdbRepo *pRepo);
STsdbRepoInfo *tsdbGetStatus(STsdb *pRepo);
// the meter information report structure
typedef struct {
......@@ -169,21 +172,21 @@ typedef struct {
*
* @return the number of points inserted, -1 for failure and the error number is set
*/
int32_t tsdbInsertData(STsdbRepo *repo, SSubmitMsg *pMsg, SShellSubmitRspMsg *pRsp);
int32_t tsdbInsertData(STsdb *repo, SSubmitMsg *pMsg, SShellSubmitRspMsg *pRsp);
// -- FOR QUERY TIME SERIES DATA
typedef void *TsdbQueryHandleT; // Use void to hide implementation details
#define BLOCK_LOAD_OFFSET_SEQ_ORDER 1
#define BLOCK_LOAD_TABLE_SEQ_ORDER 2
#define BLOCK_LOAD_TABLE_RR_ORDER 3
#define BLOCK_LOAD_OFFSET_SEQ_ORDER 1
#define BLOCK_LOAD_TABLE_SEQ_ORDER 2
#define BLOCK_LOAD_TABLE_RR_ORDER 3
// query condition to build multi-table data block iterator
typedef struct STsdbQueryCond {
STimeWindow twindow;
int32_t order; // desc|asc order to iterate the data block
int64_t offset; // skip offset put down to tsdb
int32_t order; // desc|asc order to iterate the data block
int64_t offset; // skip offset put down to tsdb
int32_t numOfCols;
SColumnInfo *colList;
bool loadExternalRows; // load external rows or not
......@@ -207,10 +210,10 @@ typedef struct {
} SMemTable;
typedef struct {
SMemTable* mem;
SMemTable* imem;
SMemTable *mem;
SMemTable *imem;
SMemTable mtable;
SMemTable* omem;
SMemTable *omem;
} SMemSnapshot;
typedef struct SMemRef {
......@@ -218,14 +221,6 @@ typedef struct SMemRef {
SMemSnapshot snapshot;
} SMemRef;
typedef struct SDataBlockInfo {
STimeWindow window;
int32_t rows;
int32_t numOfCols;
int64_t uid;
int32_t tid;
} SDataBlockInfo;
typedef struct SFileBlockInfo {
int32_t numBlocksOfStep;
} SFileBlockInfo;
......@@ -237,23 +232,23 @@ typedef struct {
typedef struct {
uint32_t numOfTables;
SArray *pGroupList;
SArray * pGroupList;
SHashObj *map; // speedup acquire the tableQueryInfo by table uid
} STableGroupInfo;
#define TSDB_BLOCK_DIST_STEP_ROWS 16
typedef struct {
uint16_t rowSize;
uint16_t numOfFiles;
uint32_t numOfTables;
uint64_t totalSize;
uint64_t totalRows;
int32_t maxRows;
int32_t minRows;
int32_t firstSeekTimeUs;
uint32_t numOfRowsInMemTable;
uint32_t numOfSmallBlocks;
SArray *dataBlockInfos;
uint16_t rowSize;
uint16_t numOfFiles;
uint32_t numOfTables;
uint64_t totalSize;
uint64_t totalRows;
int32_t maxRows;
int32_t minRows;
int32_t firstSeekTimeUs;
uint32_t numOfRowsInMemTable;
uint32_t numOfSmallBlocks;
SArray * dataBlockInfos;
} STableBlockDist;
/**
......@@ -266,7 +261,7 @@ typedef struct {
* @param qinfo query info handle from query processor
* @return
*/
TsdbQueryHandleT *tsdbQueryTables(STsdbRepo *tsdb, STsdbQueryCond *pCond, STableGroupInfo *tableInfoGroup, uint64_t qId,
TsdbQueryHandleT *tsdbQueryTables(STsdb *tsdb, STsdbQueryCond *pCond, STableGroupInfo *tableInfoGroup, uint64_t qId,
SMemRef *pRef);
/**
......@@ -279,14 +274,13 @@ TsdbQueryHandleT *tsdbQueryTables(STsdbRepo *tsdb, STsdbQueryCond *pCond, STable
* @param tableInfo table list.
* @return
*/
TsdbQueryHandleT tsdbQueryLastRow(STsdbRepo *tsdb, STsdbQueryCond *pCond, STableGroupInfo *tableInfo, uint64_t qId,
TsdbQueryHandleT tsdbQueryLastRow(STsdb *tsdb, STsdbQueryCond *pCond, STableGroupInfo *tableInfo, uint64_t qId,
SMemRef *pRef);
TsdbQueryHandleT tsdbQueryCacheLast(STsdb *tsdb, STsdbQueryCond *pCond, STableGroupInfo *groupList, uint64_t qId,
SMemRef *pMemRef);
TsdbQueryHandleT tsdbQueryCacheLast(STsdbRepo *tsdb, STsdbQueryCond *pCond, STableGroupInfo *groupList, uint64_t qId, SMemRef* pMemRef);
bool isTsdbCacheLastRow(TsdbQueryHandleT* pQueryHandle);
bool isTsdbCacheLastRow(TsdbQueryHandleT *pQueryHandle);
/**
* get the queried table object list
......@@ -303,21 +297,20 @@ SArray *tsdbGetQueriedTableList(TsdbQueryHandleT *pHandle);
* @param qinfo
* @return
*/
TsdbQueryHandleT tsdbQueryRowsInExternalWindow(STsdbRepo *tsdb, STsdbQueryCond *pCond, STableGroupInfo *groupList,
TsdbQueryHandleT tsdbQueryRowsInExternalWindow(STsdb *tsdb, STsdbQueryCond *pCond, STableGroupInfo *groupList,
uint64_t qId, SMemRef *pRef);
/**
* get num of rows in mem table
* get num of rows in mem table
*
* @param pHandle
* @return row size
*/
int64_t tsdbGetNumOfRowsInMemTable(TsdbQueryHandleT* pHandle);
int64_t tsdbGetNumOfRowsInMemTable(TsdbQueryHandleT *pHandle);
/**
* move to next block if exists
* move to next block if exists
*
* @param pQueryHandle
* @return
......@@ -362,7 +355,7 @@ SArray *tsdbRetrieveDataBlock(TsdbQueryHandleT *pQueryHandle, SArray *pColumnIdL
* @param stableid. super table sid
* @param pTagCond. tag query condition
*/
int32_t tsdbQuerySTableByTagCond(STsdbRepo *tsdb, uint64_t uid, TSKEY key, const char *pTagCond, size_t len,
int32_t tsdbQuerySTableByTagCond(STsdb *tsdb, uint64_t uid, TSKEY key, const char *pTagCond, size_t len,
STableGroupInfo *pGroupList, SColIndex *pColIndex, int32_t numOfCols);
/**
......@@ -379,7 +372,7 @@ void tsdbDestroyTableGroup(STableGroupInfo *pGroupList);
* @param pGroupInfo the generated result
* @return
*/
int32_t tsdbGetOneTableGroup(STsdbRepo *tsdb, uint64_t uid, TSKEY startKey, STableGroupInfo *pGroupInfo);
int32_t tsdbGetOneTableGroup(STsdb *tsdb, uint64_t uid, TSKEY startKey, STableGroupInfo *pGroupInfo);
/**
*
......@@ -388,7 +381,7 @@ int32_t tsdbGetOneTableGroup(STsdbRepo *tsdb, uint64_t uid, TSKEY startKey, STab
* @param pGroupInfo
* @return
*/
int32_t tsdbGetTableGroupFromIdList(STsdbRepo *tsdb, SArray *pTableIdList, STableGroupInfo *pGroupInfo);
int32_t tsdbGetTableGroupFromIdList(STsdb *tsdb, SArray *pTableIdList, STableGroupInfo *pGroupInfo);
/**
* clean up the query handle
......@@ -398,9 +391,9 @@ void tsdbCleanupQueryHandle(TsdbQueryHandleT queryHandle);
void tsdbResetQueryHandle(TsdbQueryHandleT queryHandle, STsdbQueryCond *pCond);
void tsdbResetQueryHandleForNewTable(TsdbQueryHandleT queryHandle, STsdbQueryCond *pCond, STableGroupInfo* groupList);
void tsdbResetQueryHandleForNewTable(TsdbQueryHandleT queryHandle, STsdbQueryCond *pCond, STableGroupInfo *groupList);
int32_t tsdbGetFileBlocksDistInfo(TsdbQueryHandleT* queryHandle, STableBlockDist* pTableBlockInfo);
int32_t tsdbGetFileBlocksDistInfo(TsdbQueryHandleT *queryHandle, STableBlockDist *pTableBlockInfo);
// obtain queryHandle attribute
int64_t tsdbSkipOffset(TsdbQueryHandleT queryHandle);
......@@ -416,7 +409,7 @@ void tsdbReportStat(void *repo, int64_t *totalPoints, int64_t *totalStorage, int
int tsdbInitCommitQueue();
void tsdbDestroyCommitQueue();
int tsdbSyncCommit(STsdbRepo *repo);
int tsdbSyncCommit(STsdb *repo);
void tsdbIncCommitRef(int vgId);
void tsdbDecCommitRef(int vgId);
void tsdbSwitchTable(TsdbQueryHandleT pQueryHandle);
......@@ -426,19 +419,19 @@ int tsdbSyncSend(void *pRepo, SOCKET socketFd);
int tsdbSyncRecv(void *pRepo, SOCKET socketFd);
// For TSDB Compact
int tsdbCompact(STsdbRepo *pRepo);
int tsdbCompact(STsdb *pRepo);
// For TSDB Health Monitor
// no problem return true
bool tsdbNoProblem(STsdbRepo* pRepo);
bool tsdbNoProblem(STsdb *pRepo);
// unit of walSize: MB
int tsdbCheckWal(STsdbRepo *pRepo, uint32_t walSize);
int tsdbCheckWal(STsdb *pRepo, uint32_t walSize);
// for json tag
void* getJsonTagValueElment(void* data, char* key, int32_t keyLen, char* out, int16_t bytes);
void getJsonTagValueAll(void* data, void* dst, int16_t bytes);
char* parseTagDatatoJson(void *p);
void *getJsonTagValueElment(void *data, char *key, int32_t keyLen, char *out, int16_t bytes);
void getJsonTagValueAll(void *data, void *dst, int16_t bytes);
char *parseTagDatatoJson(void *p);
#ifdef __cplusplus
}
......
......@@ -92,7 +92,7 @@ int vnodeApplyWMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
// }
break;
case TDMT_VND_SUBMIT:
if (tsdbInsertData(pVnode->pTsdb, (SSubmitMsg *)ptr) < 0) {
if (tsdbInsertData(pVnode->pTsdb, (SSubmitMsg *)ptr, NULL) < 0) {
// TODO: handle error
}
break;
......
......@@ -38,10 +38,10 @@ typedef struct {
STsdbBufPool* tsdbNewBufPool();
void tsdbFreeBufPool(STsdbBufPool* pBufPool);
int tsdbOpenBufPool(STsdbRepo* pRepo);
void tsdbCloseBufPool(STsdbRepo* pRepo);
SListNode* tsdbAllocBufBlockFromPool(STsdbRepo* pRepo);
int tsdbExpandPool(STsdbRepo* pRepo, int32_t oldTotalBlocks);
int tsdbOpenBufPool(STsdb* pRepo);
void tsdbCloseBufPool(STsdb* pRepo);
SListNode* tsdbAllocBufBlockFromPool(STsdb* pRepo);
int tsdbExpandPool(STsdb* pRepo, int32_t oldTotalBlocks);
void tsdbRecycleBufferBlock(STsdbBufPool* pPool, SListNode *pNode, bool bELastic);
// health cite
......
......@@ -31,16 +31,16 @@ typedef struct {
#define TSDB_DEFAULT_BLOCK_ROWS(maxRows) ((maxRows)*4 / 5)
void tsdbGetRtnSnap(STsdbRepo *pRepo, SRtn *pRtn);
void tsdbGetRtnSnap(STsdb *pRepo, SRtn *pRtn);
int tsdbEncodeKVRecord(void **buf, SKVRecord *pRecord);
void *tsdbDecodeKVRecord(void *buf, SKVRecord *pRecord);
void *tsdbCommitData(STsdbRepo *pRepo);
int tsdbApplyRtnOnFSet(STsdbRepo *pRepo, SDFileSet *pSet, SRtn *pRtn);
void *tsdbCommitData(STsdb *pRepo);
int tsdbApplyRtnOnFSet(STsdb *pRepo, SDFileSet *pSet, SRtn *pRtn);
int tsdbWriteBlockInfoImpl(SDFile *pHeadf, STable *pTable, SArray *pSupA, SArray *pSubA, void **ppBuf, SBlockIdx *pIdx);
int tsdbWriteBlockIdx(SDFile *pHeadf, SArray *pIdxA, void **ppBuf);
int tsdbWriteBlockImpl(STsdbRepo *pRepo, STable *pTable, SDFile *pDFile, SDFile *pDFileAggr, SDataCols *pDataCols,
int tsdbWriteBlockImpl(STsdb *pRepo, STable *pTable, SDFile *pDFile, SDFile *pDFileAggr, SDataCols *pDataCols,
SBlock *pBlock, bool isLast, bool isSuper, void **ppBuf, void **ppCBuf, void **ppExBuf);
int tsdbApplyRtn(STsdbRepo *pRepo);
int tsdbApplyRtn(STsdb *pRepo);
static FORCE_INLINE int tsdbGetFidLevel(int fid, SRtn *pRtn) {
if (fid >= pRtn->maxFid) {
......
......@@ -18,6 +18,6 @@
typedef enum { COMMIT_REQ, COMPACT_REQ,COMMIT_CONFIG_REQ } TSDB_REQ_T;
int tsdbScheduleCommit(STsdbRepo *pRepo, TSDB_REQ_T req);
int tsdbScheduleCommit(STsdb *pRepo, TSDB_REQ_T req);
#endif /* _TD_TSDB_COMMIT_QUEUE_H_ */
\ No newline at end of file
......@@ -19,7 +19,7 @@
extern "C" {
#endif
void *tsdbCompactImpl(STsdbRepo *pRepo);
void *tsdbCompactImpl(STsdb *pRepo);
#ifdef __cplusplus
}
......
......@@ -94,10 +94,10 @@ typedef struct {
STsdbFS *tsdbNewFS(STsdbCfg *pCfg);
void * tsdbFreeFS(STsdbFS *pfs);
int tsdbOpenFS(STsdbRepo *pRepo);
void tsdbCloseFS(STsdbRepo *pRepo);
void tsdbStartFSTxn(STsdbRepo *pRepo, int64_t pointsAdd, int64_t storageAdd);
int tsdbEndFSTxn(STsdbRepo *pRepo);
int tsdbOpenFS(STsdb *pRepo);
void tsdbCloseFS(STsdb *pRepo);
void tsdbStartFSTxn(STsdb *pRepo, int64_t pointsAdd, int64_t storageAdd);
int tsdbEndFSTxn(STsdb *pRepo);
int tsdbEndFSTxnWithError(STsdbFS *pfs);
void tsdbUpdateFSTxnMeta(STsdbFS *pfs, STsdbFSMeta *pMeta);
void tsdbUpdateMFile(STsdbFS *pfs, const SMFile *pMFile);
......@@ -106,7 +106,7 @@ int tsdbUpdateDFileSet(STsdbFS *pfs, const SDFileSet *pSet);
void tsdbFSIterInit(SFSIter *pIter, STsdbFS *pfs, int direction);
void tsdbFSIterSeek(SFSIter *pIter, int fid);
SDFileSet *tsdbFSIterNext(SFSIter *pIter);
int tsdbLoadMetaCache(STsdbRepo *pRepo, bool recoverMeta);
int tsdbLoadMetaCache(STsdb *pRepo, bool recoverMeta);
static FORCE_INLINE int tsdbRLockFS(STsdbFS* pFs) {
int code = pthread_rwlock_rdlock(&(pFs->lock));
......
......@@ -82,7 +82,7 @@ int tsdbApplyMFileChange(SMFile* from, SMFile* to);
int tsdbCreateMFile(SMFile* pMFile, bool updateHeader);
int tsdbUpdateMFileHeader(SMFile* pMFile);
int tsdbLoadMFileHeader(SMFile* pMFile, SMFInfo* pInfo);
int tsdbScanAndTryFixMFile(STsdbRepo* pRepo);
int tsdbScanAndTryFixMFile(STsdb* pRepo);
int tsdbEncodeMFInfo(void** buf, SMFInfo* pInfo);
void* tsdbDecodeMFInfo(void* buf, SMFInfo* pInfo);
......@@ -349,7 +349,7 @@ void* tsdbDecodeDFileSetEx(void* buf, SDFileSet* pSet);
int tsdbApplyDFileSetChange(SDFileSet* from, SDFileSet* to);
int tsdbCreateDFileSet(SDFileSet* pSet, bool updateHeader);
int tsdbUpdateDFileSetHeader(SDFileSet* pSet);
int tsdbScanAndTryFixDFileSet(STsdbRepo* pRepo, SDFileSet* pSet);
int tsdbScanAndTryFixDFileSet(STsdb* pRepo, SDFileSet* pSet);
static FORCE_INLINE void tsdbCloseDFileSet(SDFileSet* pSet) {
ASSERT_TSDB_FSET_NFILES_VALID(pSet);
......
......@@ -60,16 +60,16 @@ typedef struct {
char cont[];
} SActCont;
int tsdbRefMemTable(STsdbRepo* pRepo, SMemTable* pMemTable);
int tsdbUnRefMemTable(STsdbRepo* pRepo, SMemTable* pMemTable);
int tsdbTakeMemSnapshot(STsdbRepo* pRepo, SMemSnapshot* pSnapshot, SArray* pATable);
void tsdbUnTakeMemSnapShot(STsdbRepo* pRepo, SMemSnapshot* pSnapshot);
void* tsdbAllocBytes(STsdbRepo* pRepo, int bytes);
int tsdbAsyncCommit(STsdbRepo* pRepo);
int tsdbSyncCommitConfig(STsdbRepo* pRepo);
int tsdbRefMemTable(STsdb* pRepo, SMemTable* pMemTable);
int tsdbUnRefMemTable(STsdb* pRepo, SMemTable* pMemTable);
int tsdbTakeMemSnapshot(STsdb* pRepo, SMemSnapshot* pSnapshot, SArray* pATable);
void tsdbUnTakeMemSnapShot(STsdb* pRepo, SMemSnapshot* pSnapshot);
void* tsdbAllocBytes(STsdb* pRepo, int bytes);
int tsdbAsyncCommit(STsdb* pRepo);
int tsdbSyncCommitConfig(STsdb* pRepo);
int tsdbLoadDataFromCache(STable* pTable, SSkipListIterator* pIter, TSKEY maxKey, int maxRowsToRead, SDataCols* pCols,
TKEY* filterKeys, int nFilterKeys, bool keepDup, SMergeInfo* pMergeInfo);
void* tsdbCommitData(STsdbRepo* pRepo);
void* tsdbCommitData(STsdb* pRepo);
static FORCE_INLINE SMemRow tsdbNextIterRow(SSkipListIterator* pIter) {
if (pIter == NULL) return NULL;
......
......@@ -16,15 +16,17 @@
#ifndef _TD_TSDB_META_H_
#define _TD_TSDB_META_H_
#include "tskiplist.h"
#define TSDB_MAX_TABLE_SCHEMAS 16
#pragma pack (push,1)
typedef struct jsonMapValue{
void* table; // STable *
int16_t colId; // the json col ID.
}JsonMapValue;
#pragma pack(push, 1)
typedef struct jsonMapValue {
void* table; // STable *
int16_t colId; // the json col ID.
} JsonMapValue;
#pragma pack (pop)
#pragma pack(pop)
typedef struct STable {
STableId tableId;
......@@ -44,8 +46,7 @@ typedef struct STable {
char* sql;
void* cqhandle;
SRWLatch latch; // TODO: implementa latch functions
SDataCol *lastCols;
SDataCol* lastCols;
int16_t maxColNum;
int16_t restoreColumnNum;
bool hasRestoreLastColumn;
......@@ -81,44 +82,45 @@ typedef struct {
STsdbMeta* tsdbNewMeta(STsdbCfg* pCfg);
void tsdbFreeMeta(STsdbMeta* pMeta);
int tsdbOpenMeta(STsdbRepo* pRepo);
int tsdbCloseMeta(STsdbRepo* pRepo);
int tsdbOpenMeta(STsdb* pRepo);
int tsdbCloseMeta(STsdb* pRepo);
STable* tsdbGetTableByUid(STsdbMeta* pMeta, uint64_t uid);
STSchema* tsdbGetTableSchemaByVersion(STable* pTable, int16_t _version, int8_t rowType);
int tsdbWLockRepoMeta(STsdbRepo* pRepo);
int tsdbRLockRepoMeta(STsdbRepo* pRepo);
int tsdbUnlockRepoMeta(STsdbRepo* pRepo);
int tsdbWLockRepoMeta(STsdb* pRepo);
int tsdbRLockRepoMeta(STsdb* pRepo);
int tsdbUnlockRepoMeta(STsdb* pRepo);
void tsdbRefTable(STable* pTable);
void tsdbUnRefTable(STable* pTable);
void tsdbUpdateTableSchema(STsdbRepo* pRepo, STable* pTable, STSchema* pSchema, bool insertAct);
int tsdbRestoreTable(STsdbRepo* pRepo, void* cont, int contLen);
void tsdbOrgMeta(STsdbRepo* pRepo);
void tsdbUpdateTableSchema(STsdb* pRepo, STable* pTable, STSchema* pSchema, bool insertAct);
int tsdbRestoreTable(STsdb* pRepo, void* cont, int contLen);
void tsdbOrgMeta(STsdb* pRepo);
int tsdbInitColIdCacheWithSchema(STable* pTable, STSchema* pSchema);
int16_t tsdbGetLastColumnsIndexByColId(STable* pTable, int16_t colId);
int tsdbUpdateLastColSchema(STable *pTable, STSchema *pNewSchema);
STSchema* tsdbGetTableLatestSchema(STable *pTable);
int tsdbUpdateLastColSchema(STable* pTable, STSchema* pNewSchema);
STSchema* tsdbGetTableLatestSchema(STable* pTable);
void tsdbFreeLastColumns(STable* pTable);
int tsdbCompareJsonMapValue(const void* a, const void* b);
void* tsdbGetJsonTagValue(STable* pTable, char* key, int32_t keyLen, int16_t* colId);
static FORCE_INLINE int tsdbCompareSchemaVersion(const void *key1, const void *key2) {
if (*(int16_t *)key1 < schemaVersion(*(STSchema **)key2)) {
static FORCE_INLINE int tsdbCompareSchemaVersion(const void* key1, const void* key2) {
if (*(int16_t*)key1 < schemaVersion(*(STSchema**)key2)) {
return -1;
} else if (*(int16_t *)key1 > schemaVersion(*(STSchema **)key2)) {
} else if (*(int16_t*)key1 > schemaVersion(*(STSchema**)key2)) {
return 1;
} else {
return 0;
}
}
static FORCE_INLINE STSchema* tsdbGetTableSchemaImpl(STable* pTable, bool lock, bool copy, int16_t _version, int8_t rowType) {
static FORCE_INLINE STSchema* tsdbGetTableSchemaImpl(STable* pTable, bool lock, bool copy, int16_t _version,
int8_t rowType) {
STable* pDTable = (pTable->pSuper != NULL) ? pTable->pSuper : pTable; // for performance purpose
STSchema* pSchema = NULL;
STSchema* pTSchema = NULL;
if (lock) TSDB_RLOCK_TABLE(pDTable);
if (_version < 0) { // get the latest version of schema
pTSchema = *(STSchema **)taosArrayGetLast(pDTable->schema);
pTSchema = *(STSchema**)taosArrayGetLast(pDTable->schema);
} else { // get the schema with version
void* ptr = taosArraySearch(pDTable->schema, &_version, tsdbCompareSchemaVersion, TD_EQ);
if (ptr == NULL) {
......@@ -149,9 +151,9 @@ static FORCE_INLINE STSchema* tsdbGetTableSchema(STable* pTable) {
return tsdbGetTableSchemaImpl(pTable, false, false, -1, -1);
}
static FORCE_INLINE STSchema *tsdbGetTableTagSchema(STable *pTable) {
static FORCE_INLINE STSchema* tsdbGetTableTagSchema(STable* pTable) {
if (pTable->type == TSDB_CHILD_TABLE) { // check child table first
STable *pSuper = pTable->pSuper;
STable* pSuper = pTable->pSuper;
if (pSuper == NULL) return NULL;
return pSuper->tagSchema;
} else if (pTable->type == TSDB_SUPER_TABLE) {
......
......@@ -16,12 +16,13 @@
#ifndef _TD_TSDB_READ_IMPL_H_
#define _TD_TSDB_READ_IMPL_H_
#include "os.h"
#include "tfs.h"
#include "tsdb.h"
#include "os.h"
#include "tsdbFile.h"
#include "tskiplist.h"
#include "tsdbMemory.h"
#include "tsdbMeta.h"
#include "tskiplist.h"
typedef struct SReadH SReadH;
......@@ -92,7 +93,7 @@ typedef enum {
#define SBlockVerLatest TSDB_SBLK_VER_1
#define SBlock SBlockV1 // latest SBlock definition
#define SBlock SBlockV1 // latest SBlock definition
// lastest SBlockInfo definition
typedef struct {
......@@ -126,7 +127,7 @@ typedef struct {
uint32_t offset : 24;
} SBlockColV1;
#define SBlockCol SBlockColV1 // latest SBlockCol definition
#define SBlockCol SBlockColV1 // latest SBlockCol definition
typedef struct {
int16_t colId;
......@@ -162,19 +163,19 @@ typedef struct {
typedef void SAggrBlkData; // SBlockCol cols[];
struct SReadH {
STsdbRepo * pRepo;
SDFileSet rSet; // FSET to read
SArray * aBlkIdx; // SBlockIdx array
STable * pTable; // table to read
SBlockIdx * pBlkIdx; // current reading table SBlockIdx
int cidx;
SBlockInfo * pBlkInfo; // SBlockInfoV#
SBlockData *pBlkData; // Block info
STsdb * pRepo;
SDFileSet rSet; // FSET to read
SArray * aBlkIdx; // SBlockIdx array
STable * pTable; // table to read
SBlockIdx * pBlkIdx; // current reading table SBlockIdx
int cidx;
SBlockInfo * pBlkInfo; // SBlockInfoV#
SBlockData * pBlkData; // Block info
SAggrBlkData *pAggrBlkData; // Aggregate Block info
SDataCols * pDCols[2];
void * pBuf; // buffer
void * pCBuf; // compression buffer
void * pExBuf; // extra buffer
SDataCols * pDCols[2];
void * pBuf; // buffer
void * pCBuf; // compression buffer
void * pExBuf; // extra buffer
};
#define TSDB_READ_REPO(rh) ((rh)->pRepo)
......@@ -216,7 +217,7 @@ static FORCE_INLINE size_t tsdbBlockAggrSize(int nCols, uint32_t blkVer) {
}
}
int tsdbInitReadH(SReadH *pReadh, STsdbRepo *pRepo);
int tsdbInitReadH(SReadH *pReadh, STsdb *pRepo);
void tsdbDestroyReadH(SReadH *pReadh);
int tsdbSetAndOpenReadFSet(SReadH *pReadh, SDFileSet *pSet);
void tsdbCloseAndUnsetFSet(SReadH *pReadh);
......
......@@ -62,7 +62,7 @@ extern "C" {
#include "tsdbRowMergeBuf.h"
// Main definitions
struct STsdbRepo {
struct STsdb {
uint8_t state;
STsdbCfg config;
......@@ -97,17 +97,17 @@ struct STsdbRepo {
#define IS_REPO_LOCKED(r) (r)->repoLocked
#define TSDB_SUBMIT_MSG_HEAD_SIZE sizeof(SSubmitMsg)
int tsdbLockRepo(STsdbRepo* pRepo);
int tsdbUnlockRepo(STsdbRepo* pRepo);
STsdbMeta* tsdbGetMeta(STsdbRepo* pRepo);
int tsdbCheckCommit(STsdbRepo* pRepo);
int tsdbRestoreInfo(STsdbRepo* pRepo);
UNUSED_FUNC int tsdbCacheLastData(STsdbRepo *pRepo, STsdbCfg* oldCfg);
int32_t tsdbLoadLastCache(STsdbRepo *pRepo, STable* pTable);
int tsdbLockRepo(STsdb* pRepo);
int tsdbUnlockRepo(STsdb* pRepo);
STsdbMeta* tsdbGetMeta(STsdb* pRepo);
int tsdbCheckCommit(STsdb* pRepo);
int tsdbRestoreInfo(STsdb* pRepo);
UNUSED_FUNC int tsdbCacheLastData(STsdb *pRepo, STsdbCfg* oldCfg);
int32_t tsdbLoadLastCache(STsdb *pRepo, STable* pTable);
void tsdbGetRootDir(int repoid, char dirName[]);
void tsdbGetDataDir(int repoid, char dirName[]);
static FORCE_INLINE STsdbBufBlock* tsdbGetCurrBufBlock(STsdbRepo* pRepo) {
static FORCE_INLINE STsdbBufBlock* tsdbGetCurrBufBlock(STsdb* pRepo) {
ASSERT(pRepo != NULL);
if (pRepo->mem == NULL) return NULL;
......
......@@ -58,7 +58,7 @@ void tsdbFreeBufPool(STsdbBufPool *pBufPool) {
}
}
int tsdbOpenBufPool(STsdbRepo *pRepo) {
int tsdbOpenBufPool(STsdb *pRepo) {
STsdbCfg * pCfg = &(pRepo->config);
STsdbBufPool *pPool = pRepo->pPool;
......@@ -93,7 +93,7 @@ _err:
return -1;
}
void tsdbCloseBufPool(STsdbRepo *pRepo) {
void tsdbCloseBufPool(STsdb *pRepo) {
if (pRepo == NULL) return;
STsdbBufPool * pBufPool = pRepo->pPool;
......@@ -111,7 +111,7 @@ void tsdbCloseBufPool(STsdbRepo *pRepo) {
tsdbDebug("vgId:%d, buffer pool is closed", REPO_ID(pRepo));
}
SListNode *tsdbAllocBufBlockFromPool(STsdbRepo *pRepo) {
SListNode *tsdbAllocBufBlockFromPool(STsdb *pRepo) {
ASSERT(pRepo != NULL && pRepo->pPool != NULL);
ASSERT(IS_REPO_LOCKED(pRepo));
......@@ -165,7 +165,7 @@ STsdbBufBlock *tsdbNewBufBlock(int bufBlockSize) {
void tsdbFreeBufBlock(STsdbBufBlock *pBufBlock) { tfree(pBufBlock); }
int tsdbExpandPool(STsdbRepo *pRepo, int32_t oldTotalBlocks) {
int tsdbExpandPool(STsdb *pRepo, int32_t oldTotalBlocks) {
if (oldTotalBlocks == pRepo->config.totalBlocks) {
return TSDB_CODE_SUCCESS;
}
......
......@@ -59,18 +59,18 @@ typedef struct {
#define TSDB_COMMIT_DEFAULT_ROWS(ch) TSDB_DEFAULT_BLOCK_ROWS(TSDB_COMMIT_REPO(ch)->config.maxRowsPerFileBlock)
#define TSDB_COMMIT_TXN_VERSION(ch) FS_TXN_VERSION(REPO_FS(TSDB_COMMIT_REPO(ch)))
static int tsdbCommitMeta(STsdbRepo *pRepo);
static int tsdbCommitMeta(STsdb *pRepo);
static int tsdbUpdateMetaRecord(STsdbFS *pfs, SMFile *pMFile, uint64_t uid, void *cont, int contLen, bool compact);
static int tsdbDropMetaRecord(STsdbFS *pfs, SMFile *pMFile, uint64_t uid);
static int tsdbCompactMetaFile(STsdbRepo *pRepo, STsdbFS *pfs, SMFile *pMFile);
static int tsdbCommitTSData(STsdbRepo *pRepo);
static void tsdbStartCommit(STsdbRepo *pRepo);
static void tsdbEndCommit(STsdbRepo *pRepo, int eno);
static int tsdbCompactMetaFile(STsdb *pRepo, STsdbFS *pfs, SMFile *pMFile);
static int tsdbCommitTSData(STsdb *pRepo);
static void tsdbStartCommit(STsdb *pRepo);
static void tsdbEndCommit(STsdb *pRepo, int eno);
static int tsdbCommitToFile(SCommitH *pCommith, SDFileSet *pSet, int fid);
static int tsdbCreateCommitIters(SCommitH *pCommith);
static void tsdbDestroyCommitIters(SCommitH *pCommith);
static void tsdbSeekCommitIter(SCommitH *pCommith, TSKEY key);
static int tsdbInitCommitH(SCommitH *pCommith, STsdbRepo *pRepo);
static int tsdbInitCommitH(SCommitH *pCommith, STsdb *pRepo);
static void tsdbDestroyCommitH(SCommitH *pCommith);
static int tsdbGetFidLevel(int fid, SRtn *pRtn);
static int tsdbNextCommitFid(SCommitH *pCommith);
......@@ -92,7 +92,7 @@ static bool tsdbCanAddSubBlock(SCommitH *pCommith, SBlock *pBlock, SMergeInfo *p
static void tsdbLoadAndMergeFromCache(SDataCols *pDataCols, int *iter, SCommitIter *pCommitIter, SDataCols *pTarget,
TSKEY maxKey, int maxRows, int8_t update);
void *tsdbCommitData(STsdbRepo *pRepo) {
void *tsdbCommitData(STsdb *pRepo) {
if (pRepo->imem == NULL) {
return NULL;
}
......@@ -121,7 +121,7 @@ _err:
return NULL;
}
int tsdbApplyRtnOnFSet(STsdbRepo *pRepo, SDFileSet *pSet, SRtn *pRtn) {
int tsdbApplyRtnOnFSet(STsdb *pRepo, SDFileSet *pSet, SRtn *pRtn) {
SDiskID did;
SDFileSet nSet;
STsdbFS * pfs = REPO_FS(pRepo);
......@@ -266,7 +266,7 @@ int tsdbWriteBlockIdx(SDFile *pHeadf, SArray *pIdxA, void **ppBuf) {
}
// =================== Commit Meta Data
static int tsdbInitCommitMetaFile(STsdbRepo *pRepo, SMFile *pMf, bool open) {
static int tsdbInitCommitMetaFile(STsdb *pRepo, SMFile *pMf, bool open) {
STsdbFS *pfs = REPO_FS(pRepo);
SMFile * pOMFile = pfs->cstatus->pmf;
SDiskID did;
......@@ -295,7 +295,7 @@ static int tsdbInitCommitMetaFile(STsdbRepo *pRepo, SMFile *pMf, bool open) {
return 0;
}
static int tsdbCommitMeta(STsdbRepo *pRepo) {
static int tsdbCommitMeta(STsdb *pRepo) {
STsdbFS * pfs = REPO_FS(pRepo);
SMemTable *pMem = pRepo->imem;
SMFile * pOMFile = pfs->cstatus->pmf;
......@@ -387,7 +387,7 @@ void *tsdbDecodeKVRecord(void *buf, SKVRecord *pRecord) {
return buf;
}
void tsdbGetRtnSnap(STsdbRepo *pRepo, SRtn *pRtn) {
void tsdbGetRtnSnap(STsdb *pRepo, SRtn *pRtn) {
STsdbCfg *pCfg = REPO_CFG(pRepo);
TSKEY minKey, midKey, maxKey, now;
......@@ -476,7 +476,7 @@ static int tsdbDropMetaRecord(STsdbFS *pfs, SMFile *pMFile, uint64_t uid) {
return 0;
}
static int tsdbCompactMetaFile(STsdbRepo *pRepo, STsdbFS *pfs, SMFile *pMFile) {
static int tsdbCompactMetaFile(STsdb *pRepo, STsdbFS *pfs, SMFile *pMFile) {
float delPercent = (float)(pMFile->info.nDels) / (float)(pMFile->info.nRecords);
float tombPercent = (float)(pMFile->info.tombSize) / (float)(pMFile->info.size);
float compactRatio = (float)(tsTsdbMetaCompactRatio) / 100;
......@@ -602,7 +602,7 @@ _err:
}
// =================== Commit Time-Series Data
static int tsdbCommitTSData(STsdbRepo *pRepo) {
static int tsdbCommitTSData(STsdb *pRepo) {
SMemTable *pMem = pRepo->imem;
SCommitH commith;
SDFileSet *pSet = NULL;
......@@ -678,7 +678,7 @@ static int tsdbCommitTSData(STsdbRepo *pRepo) {
return 0;
}
static void tsdbStartCommit(STsdbRepo *pRepo) {
static void tsdbStartCommit(STsdb *pRepo) {
SMemTable *pMem = pRepo->imem;
ASSERT(pMem->numOfRows > 0 || listNEles(pMem->actList) > 0);
......@@ -691,7 +691,7 @@ static void tsdbStartCommit(STsdbRepo *pRepo) {
pRepo->code = TSDB_CODE_SUCCESS;
}
static void tsdbEndCommit(STsdbRepo *pRepo, int eno) {
static void tsdbEndCommit(STsdb *pRepo, int eno) {
if (eno != TSDB_CODE_SUCCESS) {
tsdbEndFSTxnWithError(REPO_FS(pRepo));
} else {
......@@ -721,7 +721,7 @@ static bool tsdbHasDataToCommit(SCommitIter *iters, int nIters, TSKEY minKey, TS
#endif
static int tsdbCommitToFile(SCommitH *pCommith, SDFileSet *pSet, int fid) {
STsdbRepo *pRepo = TSDB_COMMIT_REPO(pCommith);
STsdb *pRepo = TSDB_COMMIT_REPO(pCommith);
STsdbCfg * pCfg = REPO_CFG(pRepo);
ASSERT(pSet == NULL || pSet->fid == fid);
......@@ -776,7 +776,7 @@ static int tsdbCommitToFile(SCommitH *pCommith, SDFileSet *pSet, int fid) {
}
static int tsdbCreateCommitIters(SCommitH *pCommith) {
STsdbRepo *pRepo = TSDB_COMMIT_REPO(pCommith);
STsdb *pRepo = TSDB_COMMIT_REPO(pCommith);
SMemTable *pMem = pRepo->imem;
STsdbMeta *pMeta = pRepo->tsdbMeta;
......@@ -839,7 +839,7 @@ static void tsdbSeekCommitIter(SCommitH *pCommith, TSKEY key) {
}
}
static int tsdbInitCommitH(SCommitH *pCommith, STsdbRepo *pRepo) {
static int tsdbInitCommitH(SCommitH *pCommith, STsdb *pRepo) {
STsdbCfg *pCfg = REPO_CFG(pRepo);
memset(pCommith, 0, sizeof(*pCommith));
......@@ -902,7 +902,7 @@ static void tsdbDestroyCommitH(SCommitH *pCommith) {
}
static int tsdbNextCommitFid(SCommitH *pCommith) {
STsdbRepo *pRepo = TSDB_COMMIT_REPO(pCommith);
STsdb *pRepo = TSDB_COMMIT_REPO(pCommith);
STsdbCfg * pCfg = REPO_CFG(pRepo);
int fid = TSDB_IVLD_FID;
......@@ -1057,7 +1057,7 @@ static int tsdbComparKeyBlock(const void *arg1, const void *arg2) {
}
}
int tsdbWriteBlockImpl(STsdbRepo *pRepo, STable *pTable, SDFile *pDFile, SDFile *pDFileAggr, SDataCols *pDataCols,
int tsdbWriteBlockImpl(STsdb *pRepo, STable *pTable, SDFile *pDFile, SDFile *pDFileAggr, SDataCols *pDataCols,
SBlock *pBlock, bool isLast, bool isSuper, void **ppBuf, void **ppCBuf, void **ppExBuf) {
STsdbCfg * pCfg = REPO_CFG(pRepo);
SBlockData * pBlockData;
......@@ -1252,7 +1252,7 @@ static int tsdbWriteBlockInfo(SCommitH *pCommih) {
}
static int tsdbCommitMemData(SCommitH *pCommith, SCommitIter *pIter, TSKEY keyLimit, bool toData) {
STsdbRepo *pRepo = TSDB_COMMIT_REPO(pCommith);
STsdb *pRepo = TSDB_COMMIT_REPO(pCommith);
STsdbCfg * pCfg = REPO_CFG(pRepo);
SMergeInfo mInfo;
int32_t defaultRows = TSDB_COMMIT_DEFAULT_ROWS(pCommith);
......@@ -1285,7 +1285,7 @@ static int tsdbCommitMemData(SCommitH *pCommith, SCommitIter *pIter, TSKEY keyLi
}
static int tsdbMergeMemData(SCommitH *pCommith, SCommitIter *pIter, int bidx) {
STsdbRepo *pRepo = TSDB_COMMIT_REPO(pCommith);
STsdb *pRepo = TSDB_COMMIT_REPO(pCommith);
STsdbCfg * pCfg = REPO_CFG(pRepo);
int nBlocks = pCommith->readh.pBlkIdx->numOfBlocks;
SBlock * pBlock = pCommith->readh.pBlkInfo->blocks + bidx;
......@@ -1410,7 +1410,7 @@ static int tsdbCommitAddBlock(SCommitH *pCommith, const SBlock *pSupBlock, const
static int tsdbMergeBlockData(SCommitH *pCommith, SCommitIter *pIter, SDataCols *pDataCols, TSKEY keyLimit,
bool isLastOneBlock) {
STsdbRepo *pRepo = TSDB_COMMIT_REPO(pCommith);
STsdb *pRepo = TSDB_COMMIT_REPO(pCommith);
STsdbCfg * pCfg = REPO_CFG(pRepo);
SBlock block;
SDFile * pDFile;
......@@ -1527,7 +1527,7 @@ static void tsdbResetCommitTable(SCommitH *pCommith) {
static int tsdbSetAndOpenCommitFile(SCommitH *pCommith, SDFileSet *pSet, int fid) {
SDiskID did;
STsdbRepo *pRepo = TSDB_COMMIT_REPO(pCommith);
STsdb *pRepo = TSDB_COMMIT_REPO(pCommith);
SDFileSet *pWSet = TSDB_COMMIT_WRITE_FSET(pCommith);
tfsAllocDisk(tsdbGetFidLevel(fid, &(pCommith->rtn)), &(did.level), &(did.id));
......@@ -1732,7 +1732,7 @@ static void tsdbCloseCommitFile(SCommitH *pCommith, bool hasError) {
}
static bool tsdbCanAddSubBlock(SCommitH *pCommith, SBlock *pBlock, SMergeInfo *pInfo) {
STsdbRepo *pRepo = TSDB_COMMIT_REPO(pCommith);
STsdb *pRepo = TSDB_COMMIT_REPO(pCommith);
STsdbCfg * pCfg = REPO_CFG(pRepo);
int mergeRows = pBlock->numOfRows + pInfo->rowsInserted - pInfo->rowsDeleteSucceed;
......@@ -1749,7 +1749,7 @@ static bool tsdbCanAddSubBlock(SCommitH *pCommith, SBlock *pBlock, SMergeInfo *p
return false;
}
int tsdbApplyRtn(STsdbRepo *pRepo) {
int tsdbApplyRtn(STsdb *pRepo) {
SRtn rtn;
SFSIter fsiter;
STsdbFS * pfs = REPO_FS(pRepo);
......
......@@ -27,7 +27,7 @@ typedef struct {
typedef struct {
TSDB_REQ_T req;
STsdbRepo *pRepo;
STsdb *pRepo;
} SReq;
static void *tsdbLoopCommit(void *arg);
......@@ -91,7 +91,7 @@ void tsdbDestroyCommitQueue() {
pthread_mutex_destroy(&(pQueue->lock));
}
int tsdbScheduleCommit(STsdbRepo *pRepo, TSDB_REQ_T req) {
int tsdbScheduleCommit(STsdb *pRepo, TSDB_REQ_T req) {
SCommitQueue *pQueue = &tsCommitQueue;
SListNode *pNode = (SListNode *)calloc(1, sizeof(SListNode) + sizeof(SReq));
......@@ -114,7 +114,7 @@ int tsdbScheduleCommit(STsdbRepo *pRepo, TSDB_REQ_T req) {
return 0;
}
static void tsdbApplyRepoConfig(STsdbRepo *pRepo) {
static void tsdbApplyRepoConfig(STsdb *pRepo) {
pthread_mutex_lock(&pRepo->save_mutex);
pRepo->config_changed = false;
......@@ -157,7 +157,7 @@ static void tsdbApplyRepoConfig(STsdbRepo *pRepo) {
static void *tsdbLoopCommit(void *arg) {
SCommitQueue *pQueue = &tsCommitQueue;
SListNode * pNode = NULL;
STsdbRepo * pRepo = NULL;
STsdb * pRepo = NULL;
TSDB_REQ_T req;
setThreadName("tsdbCommit");
......
......@@ -43,14 +43,14 @@ typedef struct {
#define TSDB_COMPACT_COMP_BUF(pComph) TSDB_READ_COMP_BUF(&((pComph)->readh))
#define TSDB_COMPACT_EXBUF(pComph) TSDB_READ_EXBUF(&((pComph)->readh))
static int tsdbAsyncCompact(STsdbRepo *pRepo);
static void tsdbStartCompact(STsdbRepo *pRepo);
static void tsdbEndCompact(STsdbRepo *pRepo, int eno);
static int tsdbCompactMeta(STsdbRepo *pRepo);
static int tsdbCompactTSData(STsdbRepo *pRepo);
static int tsdbAsyncCompact(STsdb *pRepo);
static void tsdbStartCompact(STsdb *pRepo);
static void tsdbEndCompact(STsdb *pRepo, int eno);
static int tsdbCompactMeta(STsdb *pRepo);
static int tsdbCompactTSData(STsdb *pRepo);
static int tsdbCompactFSet(SCompactH *pComph, SDFileSet *pSet);
static bool tsdbShouldCompact(SCompactH *pComph);
static int tsdbInitCompactH(SCompactH *pComph, STsdbRepo *pRepo);
static int tsdbInitCompactH(SCompactH *pComph, STsdb *pRepo);
static void tsdbDestroyCompactH(SCompactH *pComph);
static int tsdbInitCompTbArray(SCompactH *pComph);
static void tsdbDestroyCompTbArray(SCompactH *pComph);
......@@ -62,9 +62,9 @@ static int tsdbWriteBlockToRightFile(SCompactH *pComph, STable *pTable, SDataCo
void **ppCBuf, void **ppExBuf);
enum { TSDB_NO_COMPACT, TSDB_IN_COMPACT, TSDB_WAITING_COMPACT};
int tsdbCompact(STsdbRepo *pRepo) { return tsdbAsyncCompact(pRepo); }
int tsdbCompact(STsdb *pRepo) { return tsdbAsyncCompact(pRepo); }
void *tsdbCompactImpl(STsdbRepo *pRepo) {
void *tsdbCompactImpl(STsdb *pRepo) {
// Check if there are files in TSDB FS to compact
if (REPO_FS(pRepo)->cstatus->pmf == NULL) {
pRepo->compactState = TSDB_NO_COMPACT;
......@@ -94,7 +94,7 @@ _err:
return NULL;
}
static int tsdbAsyncCompact(STsdbRepo *pRepo) {
static int tsdbAsyncCompact(STsdb *pRepo) {
if (pRepo->compactState != TSDB_NO_COMPACT) {
tsdbInfo("vgId:%d not compact tsdb again ", REPO_ID(pRepo));
return 0;
......@@ -104,7 +104,7 @@ static int tsdbAsyncCompact(STsdbRepo *pRepo) {
return tsdbScheduleCommit(pRepo, COMPACT_REQ);
}
static void tsdbStartCompact(STsdbRepo *pRepo) {
static void tsdbStartCompact(STsdb *pRepo) {
assert(pRepo->compactState != TSDB_IN_COMPACT);
tsdbInfo("vgId:%d start to compact!", REPO_ID(pRepo));
tsdbStartFSTxn(pRepo, 0, 0);
......@@ -112,7 +112,7 @@ static void tsdbStartCompact(STsdbRepo *pRepo) {
pRepo->compactState = TSDB_IN_COMPACT;
}
static void tsdbEndCompact(STsdbRepo *pRepo, int eno) {
static void tsdbEndCompact(STsdb *pRepo, int eno) {
if (eno != TSDB_CODE_SUCCESS) {
tsdbEndFSTxnWithError(REPO_FS(pRepo));
} else {
......@@ -123,13 +123,13 @@ static void tsdbEndCompact(STsdbRepo *pRepo, int eno) {
tsem_post(&(pRepo->readyToCommit));
}
static int tsdbCompactMeta(STsdbRepo *pRepo) {
static int tsdbCompactMeta(STsdb *pRepo) {
STsdbFS *pfs = REPO_FS(pRepo);
tsdbUpdateMFile(pfs, pfs->cstatus->pmf);
return 0;
}
static int tsdbCompactTSData(STsdbRepo *pRepo) {
static int tsdbCompactTSData(STsdb *pRepo) {
SCompactH compactH;
SDFileSet *pSet = NULL;
......@@ -172,7 +172,7 @@ static int tsdbCompactMeta(STsdbRepo *pRepo) {
}
static int tsdbCompactFSet(SCompactH *pComph, SDFileSet *pSet) {
STsdbRepo *pRepo = TSDB_COMPACT_REPO(pComph);
STsdb *pRepo = TSDB_COMPACT_REPO(pComph);
SDiskID did;
tsdbDebug("vgId:%d start to compact FSET %d on level %d id %d", REPO_ID(pRepo), pSet->fid, TSDB_FSET_LEVEL(pSet),
......@@ -226,7 +226,7 @@ static int tsdbCompactMeta(STsdbRepo *pRepo) {
// if (tsdbForceCompactFile) {
// return true;
// }
STsdbRepo * pRepo = TSDB_COMPACT_REPO(pComph);
STsdb * pRepo = TSDB_COMPACT_REPO(pComph);
STsdbCfg * pCfg = REPO_CFG(pRepo);
SReadH * pReadh = &(pComph->readh);
STableCompactH *pTh;
......@@ -271,7 +271,7 @@ static int tsdbCompactMeta(STsdbRepo *pRepo) {
(tsize * 1.0 / (pDataF->info.size + pLastF->info.size - 2 * TSDB_FILE_HEAD_SIZE) < 0.85));
}
static int tsdbInitCompactH(SCompactH *pComph, STsdbRepo *pRepo) {
static int tsdbInitCompactH(SCompactH *pComph, STsdb *pRepo) {
STsdbCfg *pCfg = REPO_CFG(pRepo);
memset(pComph, 0, sizeof(*pComph));
......@@ -324,7 +324,7 @@ static int tsdbCompactMeta(STsdbRepo *pRepo) {
}
static int tsdbInitCompTbArray(SCompactH *pComph) { // Init pComp->tbArray
STsdbRepo *pRepo = TSDB_COMPACT_REPO(pComph);
STsdb *pRepo = TSDB_COMPACT_REPO(pComph);
STsdbMeta *pMeta = pRepo->tsdbMeta;
if (tsdbRLockRepoMeta(pRepo) < 0) return -1;
......@@ -421,7 +421,7 @@ static int tsdbCompactMeta(STsdbRepo *pRepo) {
static void tsdbCompactFSetEnd(SCompactH *pComph) { tsdbCloseAndUnsetFSet(&(pComph->readh)); }
static int tsdbCompactFSetImpl(SCompactH *pComph) {
STsdbRepo *pRepo = TSDB_COMPACT_REPO(pComph);
STsdb *pRepo = TSDB_COMPACT_REPO(pComph);
STsdbCfg * pCfg = REPO_CFG(pRepo);
SReadH * pReadh = &(pComph->readh);
SBlockIdx blkIdx;
......@@ -508,7 +508,7 @@ static int tsdbCompactMeta(STsdbRepo *pRepo) {
static int tsdbWriteBlockToRightFile(SCompactH *pComph, STable *pTable, SDataCols *pDataCols, void **ppBuf,
void **ppCBuf, void **ppExBuf) {
STsdbRepo *pRepo = TSDB_COMPACT_REPO(pComph);
STsdb *pRepo = TSDB_COMPACT_REPO(pComph);
STsdbCfg * pCfg = REPO_CFG(pRepo);
SDFile * pDFile;
bool isLast;
......
......@@ -26,17 +26,17 @@ static void tsdbResetFSStatus(SFSStatus *pStatus);
static int tsdbSaveFSStatus(SFSStatus *pStatus, int vid);
static void tsdbApplyFSTxnOnDisk(SFSStatus *pFrom, SFSStatus *pTo);
static void tsdbGetTxnFname(int repoid, TSDB_TXN_FILE_T ftype, char fname[]);
static int tsdbOpenFSFromCurrent(STsdbRepo *pRepo);
static int tsdbScanAndTryFixFS(STsdbRepo *pRepo);
static int tsdbScanRootDir(STsdbRepo *pRepo);
static int tsdbScanDataDir(STsdbRepo *pRepo);
static int tsdbOpenFSFromCurrent(STsdb *pRepo);
static int tsdbScanAndTryFixFS(STsdb *pRepo);
static int tsdbScanRootDir(STsdb *pRepo);
static int tsdbScanDataDir(STsdb *pRepo);
static bool tsdbIsTFileInFS(STsdbFS *pfs, const TFILE *pf);
static int tsdbRestoreCurrent(STsdbRepo *pRepo);
static int tsdbRestoreCurrent(STsdb *pRepo);
static int tsdbComparTFILE(const void *arg1, const void *arg2);
static void tsdbScanAndTryFixDFilesHeader(STsdbRepo *pRepo, int32_t *nExpired);
static int tsdbProcessExpiredFS(STsdbRepo *pRepo);
static int tsdbCreateMeta(STsdbRepo *pRepo);
static int tsdbFetchTFileSet(STsdbRepo *pRepo, SArray **fArray);
static void tsdbScanAndTryFixDFilesHeader(STsdb *pRepo, int32_t *nExpired);
static int tsdbProcessExpiredFS(STsdb *pRepo);
static int tsdbCreateMeta(STsdb *pRepo);
static int tsdbFetchTFileSet(STsdb *pRepo, SArray **fArray);
// For backward compatibility
// ================== CURRENT file header info
......@@ -159,7 +159,7 @@ static SFSStatus *tsdbNewFSStatus(int maxFSet) {
static SFSStatus *tsdbFreeFSStatus(SFSStatus *pStatus) {
if (pStatus) {
pStatus->df = taosArrayDestroy(&pStatus->df);
pStatus->df = taosArrayDestroy(pStatus->df);
free(pStatus);
}
......@@ -253,7 +253,7 @@ void *tsdbFreeFS(STsdbFS *pfs) {
return NULL;
}
static int tsdbProcessExpiredFS(STsdbRepo *pRepo) {
static int tsdbProcessExpiredFS(STsdb *pRepo) {
tsdbStartFSTxn(pRepo, 0, 0);
if (tsdbCreateMeta(pRepo) < 0) {
tsdbError("vgId:%d failed to create meta since %s", REPO_ID(pRepo), tstrerror(terrno));
......@@ -272,7 +272,7 @@ static int tsdbProcessExpiredFS(STsdbRepo *pRepo) {
return 0;
}
static int tsdbCreateMeta(STsdbRepo *pRepo) {
static int tsdbCreateMeta(STsdb *pRepo) {
STsdbFS *pfs = REPO_FS(pRepo);
SMFile * pOMFile = pfs->cstatus->pmf;
SMFile mf;
......@@ -309,7 +309,7 @@ static int tsdbCreateMeta(STsdbRepo *pRepo) {
return 0;
}
int tsdbOpenFS(STsdbRepo *pRepo) {
int tsdbOpenFS(STsdb *pRepo) {
STsdbFS *pfs = REPO_FS(pRepo);
char current[TSDB_FILENAME_LEN] = "\0";
int nExpired = 0;
......@@ -351,12 +351,12 @@ int tsdbOpenFS(STsdbRepo *pRepo) {
return 0;
}
void tsdbCloseFS(STsdbRepo *pRepo) {
void tsdbCloseFS(STsdb *pRepo) {
// Do nothing
}
// Start a new transaction to modify the file system
void tsdbStartFSTxn(STsdbRepo *pRepo, int64_t pointsAdd, int64_t storageAdd) {
void tsdbStartFSTxn(STsdb *pRepo, int64_t pointsAdd, int64_t storageAdd) {
STsdbFS *pfs = REPO_FS(pRepo);
ASSERT(pfs->intxn == false);
......@@ -374,7 +374,7 @@ void tsdbStartFSTxn(STsdbRepo *pRepo, int64_t pointsAdd, int64_t storageAdd) {
void tsdbUpdateFSTxnMeta(STsdbFS *pfs, STsdbFSMeta *pMeta) { pfs->nstatus->meta = *pMeta; }
int tsdbEndFSTxn(STsdbRepo *pRepo) {
int tsdbEndFSTxn(STsdb *pRepo) {
STsdbFS *pfs = REPO_FS(pRepo);
ASSERT(FS_IN_TXN(pfs));
SFSStatus *pStatus;
......@@ -655,7 +655,7 @@ static void tsdbGetTxnFname(int repoid, TSDB_TXN_FILE_T ftype, char fname[]) {
snprintf(fname, TSDB_FILENAME_LEN, "%s/vnode/vnode%d/tsdb/%s", TFS_PRIMARY_PATH(), repoid, tsdbTxnFname[ftype]);
}
static int tsdbOpenFSFromCurrent(STsdbRepo *pRepo) {
static int tsdbOpenFSFromCurrent(STsdb *pRepo) {
STsdbFS * pfs = REPO_FS(pRepo);
int fd = -1;
void * buffer = NULL;
......@@ -752,7 +752,7 @@ _err:
}
// Scan and try to fix incorrect files
static int tsdbScanAndTryFixFS(STsdbRepo *pRepo) {
static int tsdbScanAndTryFixFS(STsdb *pRepo) {
STsdbFS * pfs = REPO_FS(pRepo);
SFSStatus *pStatus = pfs->cstatus;
......@@ -778,7 +778,7 @@ static int tsdbScanAndTryFixFS(STsdbRepo *pRepo) {
return 0;
}
int tsdbLoadMetaCache(STsdbRepo *pRepo, bool recoverMeta) {
int tsdbLoadMetaCache(STsdb *pRepo, bool recoverMeta) {
char tbuf[128];
STsdbFS * pfs = REPO_FS(pRepo);
SMFile mf;
......@@ -914,7 +914,7 @@ int tsdbLoadMetaCache(STsdbRepo *pRepo, bool recoverMeta) {
return 0;
}
static int tsdbScanRootDir(STsdbRepo *pRepo) {
static int tsdbScanRootDir(STsdb *pRepo) {
char rootDir[TSDB_FILENAME_LEN];
char bname[TSDB_FILENAME_LEN];
STsdbFS * pfs = REPO_FS(pRepo);
......@@ -948,7 +948,7 @@ static int tsdbScanRootDir(STsdbRepo *pRepo) {
return 0;
}
static int tsdbScanDataDir(STsdbRepo *pRepo) {
static int tsdbScanDataDir(STsdb *pRepo) {
char dataDir[TSDB_FILENAME_LEN];
char bname[TSDB_FILENAME_LEN];
STsdbFS * pfs = REPO_FS(pRepo);
......@@ -992,7 +992,7 @@ static bool tsdbIsTFileInFS(STsdbFS *pfs, const TFILE *pf) {
return false;
}
static int tsdbRestoreMeta(STsdbRepo *pRepo) {
static int tsdbRestoreMeta(STsdb *pRepo) {
char rootDir[TSDB_FILENAME_LEN];
char bname[TSDB_FILENAME_LEN];
TDIR * tdir = NULL;
......@@ -1113,7 +1113,7 @@ static int tsdbRestoreMeta(STsdbRepo *pRepo) {
return 0;
}
static int tsdbFetchTFileSet(STsdbRepo *pRepo, SArray **fArray) {
static int tsdbFetchTFileSet(STsdb *pRepo, SArray **fArray) {
char dataDir[TSDB_FILENAME_LEN];
char bname[TSDB_FILENAME_LEN];
TDIR * tdir = NULL;
......@@ -1139,7 +1139,7 @@ static int tsdbFetchTFileSet(STsdbRepo *pRepo, SArray **fArray) {
if (tdir == NULL) {
tsdbError("vgId:%d failed to fetch TFileSet while open directory %s since %s", REPO_ID(pRepo), dataDir,
tstrerror(terrno));
taosArrayDestroy(fArray);
taosArrayDestroy(*fArray);
regfree(&regex);
return -1;
}
......@@ -1152,7 +1152,7 @@ static int tsdbFetchTFileSet(STsdbRepo *pRepo, SArray **fArray) {
if (taosArrayPush(*fArray, (void *)pf) == NULL) {
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
tfsClosedir(tdir);
taosArrayDestroy(fArray);
taosArrayDestroy(*fArray);
regfree(&regex);
return -1;
}
......@@ -1166,7 +1166,7 @@ static int tsdbFetchTFileSet(STsdbRepo *pRepo, SArray **fArray) {
tsdbError("vgId:%d failed to fetch TFileSet Array while run regexec since %s", REPO_ID(pRepo), strerror(code));
terrno = TAOS_SYSTEM_ERROR(code);
tfsClosedir(tdir);
taosArrayDestroy(fArray);
taosArrayDestroy(*fArray);
regfree(&regex);
return -1;
}
......@@ -1191,7 +1191,7 @@ static bool tsdbIsDFileSetValid(int nFiles) {
}
}
static int tsdbRestoreDFileSet(STsdbRepo *pRepo) {
static int tsdbRestoreDFileSet(STsdb *pRepo) {
const TFILE *pf = NULL;
SArray * fArray = NULL;
STsdbFS * pfs = REPO_FS(pRepo);
......@@ -1351,7 +1351,7 @@ static int tsdbRestoreDFileSet(STsdbRepo *pRepo) {
return 0;
}
static int tsdbRestoreCurrent(STsdbRepo *pRepo) {
static int tsdbRestoreCurrent(STsdb *pRepo) {
// Loop to recover mfile
if (tsdbRestoreMeta(pRepo) < 0) {
tsdbError("vgId:%d failed to restore current since %s", REPO_ID(pRepo), tstrerror(terrno));
......@@ -1408,7 +1408,7 @@ static int tsdbComparTFILE(const void *arg1, const void *arg2) {
}
}
static void tsdbScanAndTryFixDFilesHeader(STsdbRepo *pRepo, int32_t *nExpired) {
static void tsdbScanAndTryFixDFilesHeader(STsdb *pRepo, int32_t *nExpired) {
STsdbFS * pfs = REPO_FS(pRepo);
SFSStatus *pStatus = pfs->cstatus;
SDFInfo info;
......
......@@ -187,7 +187,7 @@ int tsdbLoadMFileHeader(SMFile *pMFile, SMFInfo *pInfo) {
return 0;
}
int tsdbScanAndTryFixMFile(STsdbRepo *pRepo) {
int tsdbScanAndTryFixMFile(STsdb *pRepo) {
SMFile * pMFile = pRepo->fs->cstatus->pmf;
struct stat mfstat;
SMFile mf;
......@@ -435,7 +435,7 @@ int tsdbLoadDFileHeader(SDFile *pDFile, SDFInfo *pInfo) {
return 0;
}
static int tsdbScanAndTryFixDFile(STsdbRepo *pRepo, SDFile *pDFile) {
static int tsdbScanAndTryFixDFile(STsdb *pRepo, SDFile *pDFile) {
struct stat dfstat;
SDFile df;
......@@ -545,7 +545,7 @@ static int tsdbRollBackDFile(SDFile *pDFile) {
return -1;
}
if (taosFtruncate(TSDB_FILE_FD(&df), pDFile->info.size) < 0) {
if (taosFtruncateFile(TSDB_FILE_FD(&df), pDFile->info.size) < 0) {
terrno = TAOS_SYSTEM_ERROR(errno);
tsdbCloseDFile(&df);
return -1;
......@@ -672,7 +672,7 @@ int tsdbUpdateDFileSetHeader(SDFileSet *pSet) {
return 0;
}
int tsdbScanAndTryFixDFileSet(STsdbRepo *pRepo, SDFileSet *pSet) {
int tsdbScanAndTryFixDFileSet(STsdb *pRepo, SDFileSet *pSet) {
ASSERT_TSDB_FSET_NFILES_VALID(pSet);
for (TSDB_FILE_T ftype = 0; ftype < tsdbGetNFiles(pSet); ftype++) {
if (tsdbScanAndTryFixDFile(pRepo, TSDB_DFILE_IN_SET(pSet, ftype)) < 0) {
......
......@@ -25,12 +25,12 @@
#define IS_VALID_COMPRESSION(compression) (((compression) >= NO_COMPRESSION) && ((compression) <= TWO_STAGE_COMP))
static int32_t tsdbCheckAndSetDefaultCfg(STsdbCfg *pCfg);
static STsdbRepo *tsdbNewRepo(STsdbCfg *pCfg, STsdbAppH *pAppH);
static void tsdbFreeRepo(STsdbRepo *pRepo);
static void tsdbStartStream(STsdbRepo *pRepo);
static void tsdbStopStream(STsdbRepo *pRepo);
static int tsdbRestoreLastColumns(STsdbRepo *pRepo, STable *pTable, SReadH* pReadh);
static int tsdbRestoreLastRow(STsdbRepo *pRepo, STable *pTable, SReadH* pReadh, SBlockIdx *pIdx);
static STsdb *tsdbNewRepo(STsdbCfg *pCfg, STsdbAppH *pAppH);
static void tsdbFreeRepo(STsdb *pRepo);
static void tsdbStartStream(STsdb *pRepo);
static void tsdbStopStream(STsdb *pRepo);
static int tsdbRestoreLastColumns(STsdb *pRepo, STable *pTable, SReadH* pReadh);
static int tsdbRestoreLastRow(STsdb *pRepo, STable *pTable, SReadH* pReadh, SBlockIdx *pIdx);
// Function declaration
int32_t tsdbCreateRepo(int repoid) {
......@@ -63,8 +63,8 @@ int32_t tsdbDropRepo(int repoid) {
return tfsRmdir(tsdbDir);
}
STsdbRepo *tsdbOpenRepo(STsdbCfg *pCfg, STsdbAppH *pAppH) {
STsdbRepo *pRepo;
STsdb *tsdbOpenRepo(STsdbCfg *pCfg, STsdbAppH *pAppH) {
STsdb *pRepo;
STsdbCfg config = *pCfg;
terrno = TSDB_CODE_SUCCESS;
......@@ -119,10 +119,10 @@ STsdbRepo *tsdbOpenRepo(STsdbCfg *pCfg, STsdbAppH *pAppH) {
}
// Note: all working thread and query thread must stopped when calling this function
int tsdbCloseRepo(STsdbRepo *repo, int toCommit) {
int tsdbCloseRepo(STsdb *repo, int toCommit) {
if (repo == NULL) return 0;
STsdbRepo *pRepo = repo;
STsdb *pRepo = repo;
int vgId = REPO_ID(pRepo);
terrno = TSDB_CODE_SUCCESS;
......@@ -157,12 +157,12 @@ int tsdbCloseRepo(STsdbRepo *repo, int toCommit) {
}
}
STsdbCfg *tsdbGetCfg(const STsdbRepo *repo) {
STsdbCfg *tsdbGetCfg(const STsdb *repo) {
ASSERT(repo != NULL);
return &((STsdbRepo *)repo)->config;
return &((STsdb *)repo)->config;
}
int tsdbLockRepo(STsdbRepo *pRepo) {
int tsdbLockRepo(STsdb *pRepo) {
int code = pthread_mutex_lock(&pRepo->mutex);
if (code != 0) {
tsdbError("vgId:%d failed to lock tsdb since %s", REPO_ID(pRepo), strerror(errno));
......@@ -173,7 +173,7 @@ int tsdbLockRepo(STsdbRepo *pRepo) {
return 0;
}
int tsdbUnlockRepo(STsdbRepo *pRepo) {
int tsdbUnlockRepo(STsdb *pRepo) {
ASSERT(IS_REPO_LOCKED(pRepo));
pRepo->repoLocked = false;
int code = pthread_mutex_unlock(&pRepo->mutex);
......@@ -193,7 +193,7 @@ int tsdbUnlockRepo(STsdbRepo *pRepo) {
// return 0;
// }
int tsdbCheckCommit(STsdbRepo *pRepo) {
int tsdbCheckCommit(STsdb *pRepo) {
ASSERT(pRepo->mem != NULL);
STsdbCfg *pCfg = &(pRepo->config);
......@@ -207,23 +207,23 @@ int tsdbCheckCommit(STsdbRepo *pRepo) {
return 0;
}
STsdbMeta *tsdbGetMeta(STsdbRepo *pRepo) { return pRepo->tsdbMeta; }
STsdbMeta *tsdbGetMeta(STsdb *pRepo) { return pRepo->tsdbMeta; }
STsdbRepoInfo *tsdbGetStatus(STsdbRepo *pRepo) { return NULL; }
STsdbRepoInfo *tsdbGetStatus(STsdb *pRepo) { return NULL; }
int tsdbGetState(STsdbRepo *repo) { return repo->state; }
int tsdbGetState(STsdb *repo) { return repo->state; }
int8_t tsdbGetCompactState(STsdbRepo *repo) { return (int8_t)(repo->compactState); }
int8_t tsdbGetCompactState(STsdb *repo) { return (int8_t)(repo->compactState); }
void tsdbReportStat(void *repo, int64_t *totalPoints, int64_t *totalStorage, int64_t *compStorage) {
ASSERT(repo != NULL);
STsdbRepo *pRepo = repo;
STsdb *pRepo = repo;
*totalPoints = pRepo->stat.pointsWritten;
*totalStorage = pRepo->stat.totalStorage;
*compStorage = pRepo->stat.compStorage;
}
int32_t tsdbConfigRepo(STsdbRepo *repo, STsdbCfg *pCfg) {
int32_t tsdbConfigRepo(STsdb *repo, STsdbCfg *pCfg) {
// TODO: think about multithread cases
if (tsdbCheckAndSetDefaultCfg(pCfg) < 0) return -1;
......@@ -343,7 +343,7 @@ int32_t tsdbConfigRepo(STsdbRepo *repo, STsdbCfg *pCfg) {
#endif
}
uint32_t tsdbGetFileInfo(STsdbRepo *repo, char *name, uint32_t *index, uint32_t eindex, int64_t *size) {
uint32_t tsdbGetFileInfo(STsdb *repo, char *name, uint32_t *index, uint32_t eindex, int64_t *size) {
// TODO
return 0;
#if 0
......@@ -564,8 +564,8 @@ static int32_t tsdbCheckAndSetDefaultCfg(STsdbCfg *pCfg) {
return 0;
}
static STsdbRepo *tsdbNewRepo(STsdbCfg *pCfg, STsdbAppH *pAppH) {
STsdbRepo *pRepo = (STsdbRepo *)calloc(1, sizeof(*pRepo));
static STsdb *tsdbNewRepo(STsdbCfg *pCfg, STsdbAppH *pAppH) {
STsdb *pRepo = (STsdb *)calloc(1, sizeof(*pRepo));
if (pRepo == NULL) {
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
return NULL;
......@@ -629,7 +629,7 @@ static STsdbRepo *tsdbNewRepo(STsdbCfg *pCfg, STsdbAppH *pAppH) {
return pRepo;
}
static void tsdbFreeRepo(STsdbRepo *pRepo) {
static void tsdbFreeRepo(STsdb *pRepo) {
if (pRepo) {
tsdbFreeFS(pRepo->fs);
tsdbFreeBufPool(pRepo->pPool);
......@@ -643,7 +643,7 @@ static void tsdbFreeRepo(STsdbRepo *pRepo) {
}
}
static void tsdbStartStream(STsdbRepo *pRepo) {
static void tsdbStartStream(STsdb *pRepo) {
STsdbMeta *pMeta = pRepo->tsdbMeta;
for (int i = 0; i < pMeta->maxTables; i++) {
......@@ -655,7 +655,7 @@ static void tsdbStartStream(STsdbRepo *pRepo) {
}
}
static void tsdbStopStream(STsdbRepo *pRepo) {
static void tsdbStopStream(STsdb *pRepo) {
STsdbMeta *pMeta = pRepo->tsdbMeta;
for (int i = 0; i < pMeta->maxTables; i++) {
......@@ -666,7 +666,7 @@ static void tsdbStopStream(STsdbRepo *pRepo) {
}
}
static int tsdbRestoreLastColumns(STsdbRepo *pRepo, STable *pTable, SReadH* pReadh) {
static int tsdbRestoreLastColumns(STsdb *pRepo, STable *pTable, SReadH* pReadh) {
//tsdbInfo("tsdbRestoreLastColumns of table %s", pTable->name->data);
STSchema *pSchema = tsdbGetTableLatestSchema(pTable);
......@@ -811,7 +811,7 @@ out:
return err;
}
static int tsdbRestoreLastRow(STsdbRepo *pRepo, STable *pTable, SReadH* pReadh, SBlockIdx *pIdx) {
static int tsdbRestoreLastRow(STsdb *pRepo, STable *pTable, SReadH* pReadh, SBlockIdx *pIdx) {
ASSERT(pTable->lastRow == NULL);
if (tsdbLoadBlockInfo(pReadh, NULL, NULL) < 0) {
return -1;
......@@ -856,7 +856,7 @@ static int tsdbRestoreLastRow(STsdbRepo *pRepo, STable *pTable, SReadH* pReadh,
return 0;
}
int tsdbRestoreInfo(STsdbRepo *pRepo) {
int tsdbRestoreInfo(STsdb *pRepo) {
SFSIter fsiter;
SReadH readh;
SDFileSet *pSet;
......@@ -930,7 +930,7 @@ int tsdbRestoreInfo(STsdbRepo *pRepo) {
return 0;
}
int32_t tsdbLoadLastCache(STsdbRepo *pRepo, STable *pTable) {
int32_t tsdbLoadLastCache(STsdb *pRepo, STable *pTable) {
SFSIter fsiter;
SReadH readh;
SDFileSet *pSet;
......@@ -1021,7 +1021,7 @@ int32_t tsdbLoadLastCache(STsdbRepo *pRepo, STable *pTable) {
return 0;
}
UNUSED_FUNC int tsdbCacheLastData(STsdbRepo *pRepo, STsdbCfg* oldCfg) {
UNUSED_FUNC int tsdbCacheLastData(STsdb *pRepo, STsdbCfg* oldCfg) {
bool cacheLastRow = false, cacheLastCol = false;
SFSIter fsiter;
SReadH readh;
......
......@@ -15,31 +15,31 @@
#include "tdataformat.h"
#include "tfunctional.h"
#include "tsdbRowMergeBuf.h"
#include "tsdbint.h"
#include "tskiplist.h"
#include "tsdbRowMergeBuf.h"
#include "ttime.h"
#define TSDB_DATA_SKIPLIST_LEVEL 5
#define TSDB_MAX_INSERT_BATCH 512
static SMemTable * tsdbNewMemTable(STsdbRepo *pRepo);
static void tsdbFreeMemTable(SMemTable *pMemTable);
static STableData* tsdbNewTableData(STsdbCfg *pCfg, STable *pTable);
static void tsdbFreeTableData(STableData *pTableData);
static char * tsdbGetTsTupleKey(const void *data);
static int tsdbAdjustMemMaxTables(SMemTable *pMemTable, int maxTables);
static int tsdbAppendTableRowToCols(STable *pTable, SDataCols *pCols, STSchema **ppSchema, SMemRow row);
static int tsdbScanAndConvertSubmitMsg(STsdbRepo *pRepo, SSubmitMsg *pMsg);
static int tsdbInsertDataToTable(STsdbRepo *pRepo, SSubmitBlk *pBlock, int32_t *affectedrows);
static int tsdbCheckTableSchema(STsdbRepo *pRepo, SSubmitBlk *pBlock, STable *pTable);
static int tsdbUpdateTableLatestInfo(STsdbRepo *pRepo, STable *pTable, SMemRow row);
static FORCE_INLINE int tsdbCheckRowRange(STsdbRepo *pRepo, STable *pTable, SMemRow row, TSKEY minKey, TSKEY maxKey,
static SMemTable * tsdbNewMemTable(STsdb *pRepo);
static void tsdbFreeMemTable(SMemTable *pMemTable);
static STableData *tsdbNewTableData(STsdbCfg *pCfg, STable *pTable);
static void tsdbFreeTableData(STableData *pTableData);
static char * tsdbGetTsTupleKey(const void *data);
static int tsdbAdjustMemMaxTables(SMemTable *pMemTable, int maxTables);
static int tsdbAppendTableRowToCols(STable *pTable, SDataCols *pCols, STSchema **ppSchema, SMemRow row);
static int tsdbScanAndConvertSubmitMsg(STsdb *pRepo, SSubmitMsg *pMsg);
static int tsdbInsertDataToTable(STsdb *pRepo, SSubmitBlk *pBlock, int32_t *affectedrows);
static int tsdbCheckTableSchema(STsdb *pRepo, SSubmitBlk *pBlock, STable *pTable);
static int tsdbUpdateTableLatestInfo(STsdb *pRepo, STable *pTable, SMemRow row);
static FORCE_INLINE int tsdbCheckRowRange(STsdb *pRepo, STable *pTable, SMemRow row, TSKEY minKey, TSKEY maxKey,
TSKEY now);
int32_t tsdbInsertData(STsdbRepo *repo, SSubmitMsg *pMsg, SShellSubmitRspMsg *pRsp) {
STsdbRepo * pRepo = repo;
int32_t tsdbInsertData(STsdb *repo, SSubmitMsg *pMsg, SShellSubmitRspMsg *pRsp) {
STsdb * pRepo = repo;
SSubmitMsgIter msgIter = {0};
SSubmitBlk * pBlock = NULL;
int32_t affectedrows = 0, numOfRows = 0;
......@@ -51,9 +51,9 @@ int32_t tsdbInsertData(STsdbRepo *repo, SSubmitMsg *pMsg, SShellSubmitRspMsg *pR
return -1;
}
tsdbInitSubmitMsgIter(pMsg, &msgIter);
tInitSubmitMsgIter(pMsg, &msgIter);
while (true) {
tsdbGetSubmitMsgNext(&msgIter, &pBlock);
tGetSubmitMsgNext(&msgIter, &pBlock);
if (pBlock == NULL) break;
if (tsdbInsertDataToTable(pRepo, pBlock, &affectedrows) < 0) {
return -1;
......@@ -71,37 +71,37 @@ int32_t tsdbInsertData(STsdbRepo *repo, SSubmitMsg *pMsg, SShellSubmitRspMsg *pR
}
// ---------------- INTERNAL FUNCTIONS ----------------
int tsdbRefMemTable(STsdbRepo *pRepo, SMemTable *pMemTable) {
int tsdbRefMemTable(STsdb *pRepo, SMemTable *pMemTable) {
if (pMemTable == NULL) return 0;
int ref = T_REF_INC(pMemTable);
tsdbDebug("vgId:%d ref memtable %p ref %d", REPO_ID(pRepo), pMemTable, ref);
tsdbDebug("vgId:%d ref memtable %p ref %d", REPO_ID(pRepo), pMemTable, ref);
return 0;
}
// Need to lock the repository
int tsdbUnRefMemTable(STsdbRepo *pRepo, SMemTable *pMemTable) {
int tsdbUnRefMemTable(STsdb *pRepo, SMemTable *pMemTable) {
if (pMemTable == NULL) return 0;
int ref = T_REF_DEC(pMemTable);
tsdbDebug("vgId:%d unref memtable %p ref %d", REPO_ID(pRepo), pMemTable, ref);
int ref = T_REF_DEC(pMemTable);
tsdbDebug("vgId:%d unref memtable %p ref %d", REPO_ID(pRepo), pMemTable, ref);
if (ref == 0) {
STsdbBufPool *pBufPool = pRepo->pPool;
SListNode *pNode = NULL;
bool addNew = false;
bool addNew = false;
if (tsdbLockRepo(pRepo) < 0) return -1;
while ((pNode = tdListPopHead(pMemTable->bufBlockList)) != NULL) {
if (pBufPool->nRecycleBlocks > 0) {
tsdbRecycleBufferBlock(pBufPool, pNode, false);
pBufPool->nRecycleBlocks -= 1;
} else {
if(pBufPool->nElasticBlocks > 0 && listNEles(pBufPool->bufBlockList) > 2) {
if (pBufPool->nElasticBlocks > 0 && listNEles(pBufPool->bufBlockList) > 2) {
tsdbRecycleBufferBlock(pBufPool, pNode, true);
} else {
tdListAppendNode(pBufPool->bufBlockList, pNode);
addNew = true;
}
}
}
}
if (addNew) {
int code = pthread_cond_signal(&pBufPool->poolNotEmpty);
......@@ -128,7 +128,7 @@ int tsdbUnRefMemTable(STsdbRepo *pRepo, SMemTable *pMemTable) {
return 0;
}
int tsdbTakeMemSnapshot(STsdbRepo *pRepo, SMemSnapshot *pSnapshot, SArray *pATable) {
int tsdbTakeMemSnapshot(STsdb *pRepo, SMemSnapshot *pSnapshot, SArray *pATable) {
memset(pSnapshot, 0, sizeof(*pSnapshot));
if (tsdbLockRepo(pRepo) < 0) return -1;
......@@ -180,7 +180,7 @@ int tsdbTakeMemSnapshot(STsdbRepo *pRepo, SMemSnapshot *pSnapshot, SArray *pATab
return 0;
}
void tsdbUnTakeMemSnapShot(STsdbRepo *pRepo, SMemSnapshot *pSnapshot) {
void tsdbUnTakeMemSnapShot(STsdb *pRepo, SMemSnapshot *pSnapshot) {
tsdbDebug("vgId:%d untake memory snapshot, pMem %p pIMem %p", REPO_ID(pRepo), pSnapshot->omem, pSnapshot->imem);
if (pSnapshot->mem) {
......@@ -204,7 +204,7 @@ void tsdbUnTakeMemSnapShot(STsdbRepo *pRepo, SMemSnapshot *pSnapshot) {
pSnapshot->omem = NULL;
}
void *tsdbAllocBytes(STsdbRepo *pRepo, int bytes) {
void *tsdbAllocBytes(STsdb *pRepo, int bytes) {
STsdbCfg * pCfg = &pRepo->config;
STsdbBufBlock *pBufBlock = NULL;
void * ptr = NULL;
......@@ -266,7 +266,7 @@ void *tsdbAllocBytes(STsdbRepo *pRepo, int bytes) {
return ptr;
}
int tsdbSyncCommitConfig(STsdbRepo* pRepo) {
int tsdbSyncCommitConfig(STsdb *pRepo) {
ASSERT(pRepo->config_changed == true);
tsem_wait(&(pRepo->readyToCommit));
......@@ -290,7 +290,7 @@ int tsdbSyncCommitConfig(STsdbRepo* pRepo) {
return 0;
}
int tsdbAsyncCommit(STsdbRepo *pRepo) {
int tsdbAsyncCommit(STsdb *pRepo) {
tsem_wait(&(pRepo->readyToCommit));
ASSERT(pRepo->imem == NULL);
......@@ -313,8 +313,8 @@ int tsdbAsyncCommit(STsdbRepo *pRepo) {
return 0;
}
int tsdbSyncCommit(STsdbRepo *repo) {
STsdbRepo *pRepo = repo;
int tsdbSyncCommit(STsdb *repo) {
STsdb *pRepo = repo;
tsdbAsyncCommit(pRepo);
tsem_wait(&(pRepo->readyToCommit));
......@@ -331,13 +331,13 @@ int tsdbSyncCommit(STsdbRepo *repo) {
/**
* This is an important function to load data or try to load data from memory skiplist iterator.
*
*
* This function load memory data until:
* 1. iterator ends
* 2. data key exceeds maxKey
* 3. rowsIncreased = rowsInserted - rowsDeleteSucceed >= maxRowsToRead
* 4. operations in pCols not exceeds its max capacity if pCols is given
*
*
* The function tries to procceed AS MUCH AS POSSIBLE.
*/
int tsdbLoadDataFromCache(STable *pTable, SSkipListIterator *pIter, TSKEY maxKey, int maxRowsToRead, SDataCols *pCols,
......@@ -453,7 +453,7 @@ int tsdbLoadDataFromCache(STable *pTable, SSkipListIterator *pIter, TSKEY maxKey
}
// ---------------- LOCAL FUNCTIONS ----------------
static SMemTable* tsdbNewMemTable(STsdbRepo *pRepo) {
static SMemTable *tsdbNewMemTable(STsdb *pRepo) {
STsdbMeta *pMeta = pRepo->tsdbMeta;
SMemTable *pMemTable = (SMemTable *)calloc(1, sizeof(*pMemTable));
......@@ -479,7 +479,7 @@ static SMemTable* tsdbNewMemTable(STsdbRepo *pRepo) {
goto _err;
}
pMemTable->bufBlockList = tdListNew(sizeof(STsdbBufBlock*));
pMemTable->bufBlockList = tdListNew(sizeof(STsdbBufBlock *));
if (pMemTable->bufBlockList == NULL) {
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
goto _err;
......@@ -494,7 +494,7 @@ _err:
return NULL;
}
static void tsdbFreeMemTable(SMemTable* pMemTable) {
static void tsdbFreeMemTable(SMemTable *pMemTable) {
if (pMemTable) {
ASSERT((pMemTable->bufBlockList == NULL) ? true : (listNEles(pMemTable->bufBlockList) == 0));
ASSERT((pMemTable->actList == NULL) ? true : (listNEles(pMemTable->actList) == 0));
......@@ -520,7 +520,7 @@ static STableData *tsdbNewTableData(STsdbCfg *pCfg, STable *pTable) {
pTableData->numOfRows = 0;
uint8_t skipListCreateFlags;
if(pCfg->update == TD_ROW_DISCARD_UPDATE)
if (pCfg->update == TD_ROW_DISCARD_UPDATE)
skipListCreateFlags = SL_DISCARD_DUP_KEY;
else
skipListCreateFlags = SL_UPDATE_DUP_KEY;
......@@ -589,7 +589,7 @@ static int tsdbAppendTableRowToCols(STable *pTable, SDataCols *pCols, STSchema *
return 0;
}
static FORCE_INLINE int tsdbCheckRowRange(STsdbRepo *pRepo, STable *pTable, SMemRow row, TSKEY minKey, TSKEY maxKey,
static FORCE_INLINE int tsdbCheckRowRange(STsdb *pRepo, STable *pTable, SMemRow row, TSKEY minKey, TSKEY maxKey,
TSKEY now) {
TSKEY rowKey = memRowKey(row);
if (rowKey < minKey || rowKey > maxKey) {
......@@ -604,7 +604,7 @@ static FORCE_INLINE int tsdbCheckRowRange(STsdbRepo *pRepo, STable *pTable, SMem
return 0;
}
static int tsdbScanAndConvertSubmitMsg(STsdbRepo *pRepo, SSubmitMsg *pMsg) {
static int tsdbScanAndConvertSubmitMsg(STsdb *pRepo, SSubmitMsg *pMsg) {
ASSERT(pMsg != NULL);
STsdbMeta * pMeta = pRepo->tsdbMeta;
SSubmitMsgIter msgIter = {0};
......@@ -614,14 +614,14 @@ static int tsdbScanAndConvertSubmitMsg(STsdbRepo *pRepo, SSubmitMsg *pMsg) {
TSKEY now = taosGetTimestamp(pRepo->config.precision);
TSKEY minKey = now - tsTickPerDay[pRepo->config.precision] * pRepo->config.keep;
TSKEY maxKey = now + tsTickPerDay[pRepo->config.precision] * pRepo->config.daysPerFile;
terrno = TSDB_CODE_SUCCESS;
pMsg->length = htonl(pMsg->length);
pMsg->numOfBlocks = htonl(pMsg->numOfBlocks);
if (tsdbInitSubmitMsgIter(pMsg, &msgIter) < 0) return -1;
if (tInitSubmitMsgIter(pMsg, &msgIter) < 0) return -1;
while (true) {
if (tsdbGetSubmitMsgNext(&msgIter, &pBlock) < 0) return -1;
if (tGetSubmitMsgNext(&msgIter, &pBlock) < 0) return -1;
if (pBlock == NULL) break;
pBlock->uid = htobe64(pBlock->uid);
......@@ -661,8 +661,8 @@ static int tsdbScanAndConvertSubmitMsg(STsdbRepo *pRepo, SSubmitMsg *pMsg) {
}
}
tsdbInitSubmitBlkIter(pBlock, &blkIter);
while ((row = tsdbGetSubmitBlkNext(&blkIter)) != NULL) {
tInitSubmitBlkIter(pBlock, &blkIter);
while ((row = tGetSubmitBlkNext(&blkIter)) != NULL) {
if (tsdbCheckRowRange(pRepo, pTable, row, minKey, maxKey, now) < 0) {
return -1;
}
......@@ -673,37 +673,34 @@ static int tsdbScanAndConvertSubmitMsg(STsdbRepo *pRepo, SSubmitMsg *pMsg) {
return 0;
}
//row1 has higher priority
static SMemRow tsdbInsertDupKeyMerge(SMemRow row1, SMemRow row2, STsdbRepo* pRepo,
STSchema **ppSchema1, STSchema **ppSchema2,
STable* pTable, int32_t* pPoints, SMemRow* pLastRow) {
//for compatiblity, duplicate key inserted when update=0 should be also calculated as affected rows!
if(row1 == NULL && row2 == NULL && pRepo->config.update == TD_ROW_DISCARD_UPDATE) {
// row1 has higher priority
static SMemRow tsdbInsertDupKeyMerge(SMemRow row1, SMemRow row2, STsdb *pRepo, STSchema **ppSchema1,
STSchema **ppSchema2, STable *pTable, int32_t *pPoints, SMemRow *pLastRow) {
// for compatiblity, duplicate key inserted when update=0 should be also calculated as affected rows!
if (row1 == NULL && row2 == NULL && pRepo->config.update == TD_ROW_DISCARD_UPDATE) {
(*pPoints)++;
return NULL;
}
tsdbTrace("vgId:%d a row is %s table %s tid %d uid %" PRIu64 " key %" PRIu64, REPO_ID(pRepo),
"updated in", TABLE_CHAR_NAME(pTable), TABLE_TID(pTable), TABLE_UID(pTable),
memRowKey(row1));
tsdbTrace("vgId:%d a row is %s table %s tid %d uid %" PRIu64 " key %" PRIu64, REPO_ID(pRepo), "updated in",
TABLE_CHAR_NAME(pTable), TABLE_TID(pTable), TABLE_UID(pTable), memRowKey(row1));
if(row2 == NULL || pRepo->config.update != TD_ROW_PARTIAL_UPDATE) {
void* pMem = tsdbAllocBytes(pRepo, memRowTLen(row1));
if(pMem == NULL) return NULL;
if (row2 == NULL || pRepo->config.update != TD_ROW_PARTIAL_UPDATE) {
void *pMem = tsdbAllocBytes(pRepo, memRowTLen(row1));
if (pMem == NULL) return NULL;
memRowCpy(pMem, row1);
(*pPoints)++;
*pLastRow = pMem;
return pMem;
}
STSchema *pSchema1 = *ppSchema1;
STSchema *pSchema2 = *ppSchema2;
SMergeBuf * pBuf = &pRepo->mergeBuf;
int dv1 = memRowVersion(row1);
int dv2 = memRowVersion(row2);
if(pSchema1 == NULL || schemaVersion(pSchema1) != dv1) {
if(pSchema2 != NULL && schemaVersion(pSchema2) == dv1) {
STSchema * pSchema1 = *ppSchema1;
STSchema * pSchema2 = *ppSchema2;
SMergeBuf *pBuf = &pRepo->mergeBuf;
int dv1 = memRowVersion(row1);
int dv2 = memRowVersion(row2);
if (pSchema1 == NULL || schemaVersion(pSchema1) != dv1) {
if (pSchema2 != NULL && schemaVersion(pSchema2) == dv1) {
*ppSchema1 = pSchema2;
} else {
*ppSchema1 = tsdbGetTableSchemaImpl(pTable, false, false, memRowVersion(row1), (int8_t)memRowType(row1));
......@@ -711,8 +708,8 @@ static SMemRow tsdbInsertDupKeyMerge(SMemRow row1, SMemRow row2, STsdbRepo* pRep
pSchema1 = *ppSchema1;
}
if(pSchema2 == NULL || schemaVersion(pSchema2) != dv2) {
if(schemaVersion(pSchema1) == dv2) {
if (pSchema2 == NULL || schemaVersion(pSchema2) != dv2) {
if (schemaVersion(pSchema1) == dv2) {
pSchema2 = pSchema1;
} else {
*ppSchema2 = tsdbGetTableSchemaImpl(pTable, false, false, memRowVersion(row2), (int8_t)memRowType(row2));
......@@ -722,8 +719,8 @@ static SMemRow tsdbInsertDupKeyMerge(SMemRow row1, SMemRow row2, STsdbRepo* pRep
SMemRow tmp = tsdbMergeTwoRows(pBuf, row1, row2, pSchema1, pSchema2);
void* pMem = tsdbAllocBytes(pRepo, memRowTLen(tmp));
if(pMem == NULL) return NULL;
void *pMem = tsdbAllocBytes(pRepo, memRowTLen(tmp));
if (pMem == NULL) return NULL;
memRowCpy(pMem, tmp);
(*pPoints)++;
......@@ -731,13 +728,14 @@ static SMemRow tsdbInsertDupKeyMerge(SMemRow row1, SMemRow row2, STsdbRepo* pRep
return pMem;
}
static void* tsdbInsertDupKeyMergePacked(void** args) {
return tsdbInsertDupKeyMerge(args[0], args[1], args[2], (STSchema**)&args[3], (STSchema**)&args[4], args[5], args[6], args[7]);
static void *tsdbInsertDupKeyMergePacked(void **args) {
return tsdbInsertDupKeyMerge(args[0], args[1], args[2], (STSchema **)&args[3], (STSchema **)&args[4], args[5],
args[6], args[7]);
}
static void tsdbSetupSkipListHookFns(SSkipList* pSkipList, STsdbRepo *pRepo, STable *pTable, int32_t* pPoints, SMemRow* pLastRow) {
if(pSkipList->insertHandleFn == NULL) {
static void tsdbSetupSkipListHookFns(SSkipList *pSkipList, STsdb *pRepo, STable *pTable, int32_t *pPoints,
SMemRow *pLastRow) {
if (pSkipList->insertHandleFn == NULL) {
tGenericSavedFunc *dupHandleSavedFunc = genericSavedFuncInit((GenericVaFunc)&tsdbInsertDupKeyMergePacked, 9);
dupHandleSavedFunc->args[2] = pRepo;
dupHandleSavedFunc->args[3] = NULL;
......@@ -749,18 +747,17 @@ static void tsdbSetupSkipListHookFns(SSkipList* pSkipList, STsdbRepo *pRepo, STa
pSkipList->insertHandleFn->args[7] = pLastRow;
}
static int tsdbInsertDataToTable(STsdbRepo* pRepo, SSubmitBlk* pBlock, int32_t *pAffectedRows) {
STsdbMeta *pMeta = pRepo->tsdbMeta;
int32_t points = 0;
STable *pTable = NULL;
SSubmitBlkIter blkIter = {0};
SMemTable *pMemTable = NULL;
STableData *pTableData = NULL;
STsdbCfg *pCfg = &(pRepo->config);
static int tsdbInsertDataToTable(STsdb *pRepo, SSubmitBlk *pBlock, int32_t *pAffectedRows) {
STsdbMeta * pMeta = pRepo->tsdbMeta;
int32_t points = 0;
STable * pTable = NULL;
SSubmitBlkIter blkIter = {0};
SMemTable * pMemTable = NULL;
STableData * pTableData = NULL;
STsdbCfg * pCfg = &(pRepo->config);
tsdbInitSubmitBlkIter(pBlock, &blkIter);
if(blkIter.row == NULL) return 0;
tInitSubmitBlkIter(pBlock, &blkIter);
if (blkIter.row == NULL) return 0;
TSKEY firstRowKey = memRowKey(blkIter.row);
tsdbAllocBytes(pRepo, 0);
......@@ -773,7 +770,6 @@ static int tsdbInsertDataToTable(STsdbRepo* pRepo, SSubmitBlk* pBlock, int32_t *
ASSERT(pTable != NULL && TABLE_UID(pTable) == pBlock->uid);
if (TABLE_TID(pTable) >= pMemTable->maxTables) {
if (tsdbAdjustMemMaxTables(pMemTable, pMeta->maxTables) < 0) {
return -1;
......@@ -808,7 +804,7 @@ static int tsdbInsertDataToTable(STsdbRepo* pRepo, SSubmitBlk* pBlock, int32_t *
int64_t dsize = SL_SIZE(pTableData->pData) - osize;
(*pAffectedRows) += points;
if(lastRow != NULL) {
if (lastRow != NULL) {
TSKEY lastRowKey = memRowKey(lastRow);
if (pMemTable->keyFirst > firstRowKey) pMemTable->keyFirst = firstRowKey;
pMemTable->numOfRows += dsize;
......@@ -829,7 +825,7 @@ static int tsdbInsertDataToTable(STsdbRepo* pRepo, SSubmitBlk* pBlock, int32_t *
return 0;
}
static int tsdbCheckTableSchema(STsdbRepo *pRepo, SSubmitBlk *pBlock, STable *pTable) {
static int tsdbCheckTableSchema(STsdb *pRepo, SSubmitBlk *pBlock, STable *pTable) {
ASSERT(pTable != NULL);
STSchema *pSchema = tsdbGetTableSchemaImpl(pTable, false, false, -1, -1);
......@@ -900,12 +896,11 @@ static int tsdbCheckTableSchema(STsdbRepo *pRepo, SSubmitBlk *pBlock, STable *pT
return 0;
}
static void updateTableLatestColumn(STsdbRepo *pRepo, STable *pTable, SMemRow row) {
static void updateTableLatestColumn(STsdb *pRepo, STable *pTable, SMemRow row) {
tsdbDebug("vgId:%d updateTableLatestColumn, %s row version:%d", REPO_ID(pRepo), pTable->name->data,
memRowVersion(row));
STSchema* pSchema = tsdbGetTableLatestSchema(pTable);
STSchema *pSchema = tsdbGetTableLatestSchema(pTable);
if (tsdbUpdateLastColSchema(pTable, pSchema) < 0) {
return;
}
......@@ -916,7 +911,7 @@ static void updateTableLatestColumn(STsdbRepo *pRepo, STable *pTable, SMemRow ro
}
SDataCol *pLatestCols = pTable->lastCols;
int32_t kvIdx = 0;
int32_t kvIdx = 0;
for (int16_t j = 0; j < schemaNCols(pSchema); j++) {
STColumn *pTCol = schemaColAt(pSchema, j);
......@@ -935,7 +930,7 @@ static void updateTableLatestColumn(STsdbRepo *pRepo, STable *pTable, SMemRow ro
continue;
}
// lock
TSDB_WLOCK_TABLE(pTable);
TSDB_WLOCK_TABLE(pTable);
SDataCol *pDataCol = &(pLatestCols[idx]);
if (pDataCol->pData == NULL) {
pDataCol->pData = malloc(pTCol->bytes);
......@@ -949,14 +944,15 @@ static void updateTableLatestColumn(STsdbRepo *pRepo, STable *pTable, SMemRow ro
// the actual data size CANNOT larger than column size
assert(pTCol->bytes >= bytes);
memcpy(pDataCol->pData, value, bytes);
//tsdbInfo("updateTableLatestColumn vgId:%d cache column %d for %d,%s", REPO_ID(pRepo), j, pDataCol->bytes, (char*)pDataCol->pData);
// tsdbInfo("updateTableLatestColumn vgId:%d cache column %d for %d,%s", REPO_ID(pRepo), j, pDataCol->bytes,
// (char*)pDataCol->pData);
pDataCol->ts = memRowKey(row);
// unlock
TSDB_WUNLOCK_TABLE(pTable);
TSDB_WUNLOCK_TABLE(pTable);
}
}
static int tsdbUpdateTableLatestInfo(STsdbRepo *pRepo, STable *pTable, SMemRow row) {
static int tsdbUpdateTableLatestInfo(STsdb *pRepo, STable *pTable, SMemRow row) {
STsdbCfg *pCfg = &pRepo->config;
// if cacheLastRow config has been reset, free the lastRow
......
......@@ -12,6 +12,8 @@
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#if 0
#include "tcompare.h"
#include "tsdbint.h"
#include "tutil.h"
......@@ -1690,3 +1692,4 @@ static void tsdbFreeTableSchema(STable *pTable) {
taosArrayDestroy(pTable->schema);
}
}
#endif
\ No newline at end of file
......@@ -13,6 +13,8 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#if 0
#include "os.h"
#include "tdataformat.h"
#include "tskiplist.h"
......@@ -4574,4 +4576,5 @@ int64_t tsdbSkipOffset(TsdbQueryHandleT queryHandle) {
return pQueryHandle->srows;
}
return 0;
}
\ No newline at end of file
}
#endif
\ No newline at end of file
......@@ -28,7 +28,7 @@ static int tsdbLoadColData(SReadH *pReadh, SDFile *pDFile, SBlock *pBlock, SBlo
static int tsdbLoadBlockStatisFromDFile(SReadH *pReadh, SBlock *pBlock);
static int tsdbLoadBlockStatisFromAggr(SReadH *pReadh, SBlock *pBlock);
int tsdbInitReadH(SReadH *pReadh, STsdbRepo *pRepo) {
int tsdbInitReadH(SReadH *pReadh, STsdb *pRepo) {
ASSERT(pReadh != NULL && pRepo != NULL);
STsdbCfg *pCfg = REPO_CFG(pRepo);
......@@ -74,7 +74,7 @@ void tsdbDestroyReadH(SReadH *pReadh) {
pReadh->cidx = 0;
pReadh->pBlkIdx = NULL;
pReadh->pTable = NULL;
pReadh->aBlkIdx = taosArrayDestroy(&pReadh->aBlkIdx);
pReadh->aBlkIdx = taosArrayDestroy(pReadh->aBlkIdx);
tsdbCloseDFileSet(TSDB_READ_FSET(pReadh));
pReadh->pRepo = NULL;
}
......@@ -837,7 +837,7 @@ static int tsdbLoadBlockDataColsImpl(SReadH *pReadh, SBlock *pBlock, SDataCols *
static int tsdbLoadColData(SReadH *pReadh, SDFile *pDFile, SBlock *pBlock, SBlockCol *pBlockCol, SDataCol *pDataCol) {
ASSERT(pDataCol->colId == pBlockCol->colId);
STsdbRepo *pRepo = TSDB_READ_REPO(pReadh);
STsdb *pRepo = TSDB_READ_REPO(pReadh);
STsdbCfg * pCfg = REPO_CFG(pRepo);
int tsize = pDataCol->bytes * pBlock->numOfRows + COMP_OVERFLOW_BYTES;
......
......@@ -20,7 +20,7 @@
// Sync handle
typedef struct {
STsdbRepo *pRepo;
STsdb *pRepo;
SRtn rtn;
SOCKET socketFd;
void * pBuf;
......@@ -33,7 +33,7 @@ typedef struct {
#define SYNC_BUFFER(sh) ((sh)->pBuf)
static void tsdbInitSyncH(SSyncH *pSyncH, STsdbRepo *pRepo, SOCKET socketFd);
static void tsdbInitSyncH(SSyncH *pSyncH, STsdb *pRepo, SOCKET socketFd);
static void tsdbDestroySyncH(SSyncH *pSyncH);
static int32_t tsdbSyncSendMeta(SSyncH *pSynch);
static int32_t tsdbSyncRecvMeta(SSyncH *pSynch);
......@@ -47,10 +47,10 @@ static bool tsdbIsTowFSetSame(SDFileSet *pSet1, SDFileSet *pSet2);
static int32_t tsdbSyncSendDFileSet(SSyncH *pSynch, SDFileSet *pSet);
static int32_t tsdbSendDFileSetInfo(SSyncH *pSynch, SDFileSet *pSet);
static int32_t tsdbRecvDFileSetInfo(SSyncH *pSynch);
static int tsdbReload(STsdbRepo *pRepo, bool isMfChanged);
static int tsdbReload(STsdb *pRepo, bool isMfChanged);
int32_t tsdbSyncSend(void *tsdb, SOCKET socketFd) {
STsdbRepo *pRepo = (STsdbRepo *)tsdb;
STsdb *pRepo = (STsdb *)tsdb;
SSyncH synch = {0};
tsdbInitSyncH(&synch, pRepo, socketFd);
......@@ -79,7 +79,7 @@ _err:
}
int32_t tsdbSyncRecv(void *tsdb, SOCKET socketFd) {
STsdbRepo *pRepo = (STsdbRepo *)tsdb;
STsdb *pRepo = (STsdb *)tsdb;
SSyncH synch = {0};
pRepo->state = TSDB_STATE_OK;
......@@ -114,7 +114,7 @@ _err:
return -1;
}
static void tsdbInitSyncH(SSyncH *pSyncH, STsdbRepo *pRepo, SOCKET socketFd) {
static void tsdbInitSyncH(SSyncH *pSyncH, STsdb *pRepo, SOCKET socketFd) {
pSyncH->pRepo = pRepo;
pSyncH->socketFd = socketFd;
tsdbGetRtnSnap(pRepo, &(pSyncH->rtn));
......@@ -123,7 +123,7 @@ static void tsdbInitSyncH(SSyncH *pSyncH, STsdbRepo *pRepo, SOCKET socketFd) {
static void tsdbDestroySyncH(SSyncH *pSyncH) { taosTZfree(pSyncH->pBuf); }
static int32_t tsdbSyncSendMeta(SSyncH *pSynch) {
STsdbRepo *pRepo = pSynch->pRepo;
STsdb *pRepo = pSynch->pRepo;
bool toSendMeta = false;
SMFile mf;
......@@ -174,7 +174,7 @@ static int32_t tsdbSyncSendMeta(SSyncH *pSynch) {
}
static int32_t tsdbSyncRecvMeta(SSyncH *pSynch) {
STsdbRepo *pRepo = pSynch->pRepo;
STsdb *pRepo = pSynch->pRepo;
SMFile * pLMFile = pRepo->fs->cstatus->pmf;
// Recv meta info from remote
......@@ -247,7 +247,7 @@ static int32_t tsdbSyncRecvMeta(SSyncH *pSynch) {
}
static int32_t tsdbSendMetaInfo(SSyncH *pSynch) {
STsdbRepo *pRepo = pSynch->pRepo;
STsdb *pRepo = pSynch->pRepo;
uint32_t tlen = 0;
SMFile * pMFile = pRepo->fs->cstatus->pmf;
......@@ -282,7 +282,7 @@ static int32_t tsdbSendMetaInfo(SSyncH *pSynch) {
}
static int32_t tsdbRecvMetaInfo(SSyncH *pSynch) {
STsdbRepo *pRepo = pSynch->pRepo;
STsdb *pRepo = pSynch->pRepo;
uint32_t tlen = 0;
char buf[64] = {0};
......@@ -328,7 +328,7 @@ static int32_t tsdbRecvMetaInfo(SSyncH *pSynch) {
}
static int32_t tsdbSendDecision(SSyncH *pSynch, bool toSend) {
STsdbRepo *pRepo = pSynch->pRepo;
STsdb *pRepo = pSynch->pRepo;
uint8_t decision = toSend;
int32_t writeLen = sizeof(uint8_t);
......@@ -343,7 +343,7 @@ static int32_t tsdbSendDecision(SSyncH *pSynch, bool toSend) {
}
static int32_t tsdbRecvDecision(SSyncH *pSynch, bool *toSend) {
STsdbRepo *pRepo = pSynch->pRepo;
STsdb *pRepo = pSynch->pRepo;
uint8_t decision = 0;
int32_t readLen = sizeof(uint8_t);
......@@ -359,7 +359,7 @@ static int32_t tsdbRecvDecision(SSyncH *pSynch, bool *toSend) {
}
static int32_t tsdbSyncSendDFileSetArray(SSyncH *pSynch) {
STsdbRepo *pRepo = pSynch->pRepo;
STsdb *pRepo = pSynch->pRepo;
STsdbFS * pfs = REPO_FS(pRepo);
SFSIter fsiter;
SDFileSet *pSet;
......@@ -385,7 +385,7 @@ static int32_t tsdbSyncSendDFileSetArray(SSyncH *pSynch) {
}
static int32_t tsdbSyncRecvDFileSetArray(SSyncH *pSynch) {
STsdbRepo *pRepo = pSynch->pRepo;
STsdb *pRepo = pSynch->pRepo;
STsdbFS * pfs = REPO_FS(pRepo);
SFSIter fsiter;
SDFileSet *pLSet; // Local file set
......@@ -566,7 +566,7 @@ static bool tsdbIsTowFSetSame(SDFileSet *pSet1, SDFileSet *pSet2) {
}
static int32_t tsdbSyncSendDFileSet(SSyncH *pSynch, SDFileSet *pSet) {
STsdbRepo *pRepo = pSynch->pRepo;
STsdb *pRepo = pSynch->pRepo;
bool toSend = false;
// skip expired fileset
......@@ -628,7 +628,7 @@ static int32_t tsdbSyncSendDFileSet(SSyncH *pSynch, SDFileSet *pSet) {
}
static int32_t tsdbSendDFileSetInfo(SSyncH *pSynch, SDFileSet *pSet) {
STsdbRepo *pRepo = pSynch->pRepo;
STsdb *pRepo = pSynch->pRepo;
uint32_t tlen = 0;
if (pSet) {
......@@ -660,7 +660,7 @@ static int32_t tsdbSendDFileSetInfo(SSyncH *pSynch, SDFileSet *pSet) {
}
static int32_t tsdbRecvDFileSetInfo(SSyncH *pSynch) {
STsdbRepo *pRepo = pSynch->pRepo;
STsdb *pRepo = pSynch->pRepo;
uint32_t tlen;
char buf[64] = {0};
......@@ -703,7 +703,7 @@ static int32_t tsdbRecvDFileSetInfo(SSyncH *pSynch) {
return 0;
}
static int tsdbReload(STsdbRepo *pRepo, bool isMfChanged) {
static int tsdbReload(STsdb *pRepo, bool isMfChanged) {
// TODO: may need to stop and restart stream
// if (isMfChanged) {
tsdbCloseMeta(pRepo);
......
......@@ -335,7 +335,7 @@ int tfsRename(char *orname, char *nrname) {
snprintf(oaname, TMPNAME_LEN, "%s/%s", DISK_DIR(pDisk), orname);
snprintf(naname, TMPNAME_LEN, "%s/%s", DISK_DIR(pDisk), nrname);
taosRename(oaname, naname);
taosRenameFile(oaname, naname);
}
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册