提交 3a4a91a0 编写于 作者: S slguan

remove unsed files

上级 021d4b42
CMAKE_MINIMUM_REQUIRED(VERSION 2.8)
PROJECT(TDengine)
IF ((TD_LINUX_64) OR (TD_LINUX_32 AND TD_ARM))
INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/inc)
INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/dnode/inc)
INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/mnode/detail/inc)
INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/vnode/detail/inc)
INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/client/inc)
INCLUDE_DIRECTORIES(${TD_OS_DIR}/inc)
INCLUDE_DIRECTORIES(inc)
AUX_SOURCE_DIRECTORY(./src SRC)
LIST(REMOVE_ITEM SRC ./src/vnodeFileUtil.c)
LIST(REMOVE_ITEM SRC ./src/taosGrant.c)
ADD_LIBRARY(vnode ${SRC})
IF (TD_CLUSTER)
TARGET_LINK_LIBRARIES(vnode vcluster)
ELSEIF (TD_LITE)
TARGET_LINK_LIBRARIES(vnode vlite)
ENDIF ()
ENDIF ()
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef TDENGINE_VNODE_H
#define TDENGINE_VNODE_H
#ifdef __cplusplus
extern "C" {
#endif
#include "os.h"
#include "tglobalcfg.h"
#include "tidpool.h"
#include "tlog.h"
#include "tmempool.h"
#include "trpc.h"
#include "tsclient.h"
#include "taosdef.h"
#include "tsocket.h"
#include "ttime.h"
#include "ttimer.h"
#include "tutil.h"
#include "vnodeCache.h"
#include "vnodeFile.h"
#include "vnodePeer.h"
#include "vnodeShell.h"
#define TSDB_FILE_HEADER_LEN 512
#define TSDB_FILE_HEADER_VERSION_SIZE 32
#define TSDB_CACHE_POS_BITS 13
#define TSDB_CACHE_POS_MASK 0x1FFF
#define TSDB_ACTION_INSERT 0
#define TSDB_ACTION_IMPORT 1
#define TSDB_ACTION_DELETE 2
#define TSDB_ACTION_UPDATE 3
#define TSDB_ACTION_MAX 4
enum _data_source {
TSDB_DATA_SOURCE_METER,
TSDB_DATA_SOURCE_VNODE,
TSDB_DATA_SOURCE_SHELL,
TSDB_DATA_SOURCE_QUEUE,
TSDB_DATA_SOURCE_LOG,
};
enum _sync_cmd {
TSDB_SYNC_CMD_FILE,
TSDB_SYNC_CMD_CACHE,
TSDB_SYNC_CMD_CREATE,
TSDB_SYNC_CMD_REMOVE,
};
typedef struct {
int64_t offset : 48;
int64_t length : 16;
} SMeterObjHeader;
typedef struct {
int64_t len;
char data[];
} SData;
#pragma pack(push, 8)
typedef struct {
SVnodeStatisticInfo vnodeStatistic;
int vnode;
SVnodeCfg cfg;
// SDiskDesc tierDisk[TSDB_MAX_TIER];
SVPeerDesc vpeers[TSDB_VNODES_SUPPORT];
SVnodePeer * peerInfo[TSDB_VNODES_SUPPORT];
char selfIndex;
char vnodeStatus;
char accessState; // Vnode access state, Readable/Writable
char syncStatus;
char commitInProcess;
pthread_t commitThread;
TSKEY firstKey; // minimum key uncommitted, it may be smaller than
// commitFirstKey
TSKEY commitFirstKey; // minimum key for a commit file, it shall be
// xxxx00000, calculated from fileId
TSKEY commitLastKey; // maximum key for a commit file, it shall be xxxx99999,
// calculated fromm fileId
int commitFileId;
TSKEY lastCreate;
TSKEY lastRemove;
TSKEY lastKey; // last key for the whole vnode, updated by every insert
// operation
uint64_t version;
int streamRole;
int numOfStreams;
void *streamTimer;
TSKEY lastKeyOnFile; // maximum key on the last file, is shall be xxxx99999
int fileId;
int badFileId;
int numOfFiles;
int maxFiles;
int maxFile1;
int maxFile2;
int nfd; // temp head file FD
int hfd; // head file FD
int lfd; // last file FD
int tfd; // temp last file FD
int dfd; // data file FD
int64_t dfSize;
int64_t lfSize;
uint64_t * fmagic; // hold magic number for each file
char cfn[TSDB_FILENAME_LEN];
char nfn[TSDB_FILENAME_LEN];
char lfn[TSDB_FILENAME_LEN]; // last file name
char tfn[TSDB_FILENAME_LEN]; // temp last file name
pthread_mutex_t vmutex;
int logFd;
char * pMem;
char * pWrite;
pthread_mutex_t logMutex;
char logFn[TSDB_FILENAME_LEN];
char logOFn[TSDB_FILENAME_LEN];
int64_t mappingSize;
int64_t mappingThreshold;
void * commitTimer;
void ** meterList;
void * pCachePool;
void * pQueue;
pthread_t thread;
int peersOnline;
int shellConns;
int meterConns;
struct _qinfo *pQInfoList;
TAOS * dbConn;
SMeterObjHeader *meterIndex;
} SVnodeObj;
#pragma pack(pop)
typedef struct SColumn {
short colId;
short bytes;
char type;
} SColumn;
typedef struct _meter_obj {
uint64_t uid;
char meterId[TSDB_TABLE_ID_LEN];
int sid;
short vnode;
short numOfColumns;
short bytesPerPoint;
short maxBytes;
int32_t pointsPerBlock;
int32_t pointsPerFileBlock;
int freePoints;
TSKEY lastKey; // updated by insert operation
TSKEY lastKeyOnFile; // last key on file, updated by commit action
TSKEY timeStamp; // delete or added time
uint64_t commitCount;
int32_t sversion;
short sqlLen;
char searchAlgorithm : 4;
char compAlgorithm : 4;
char status; // 0: ok, 1: stop stream computing
char reserved[16];
int state;
int numOfQueries;
char * pSql;
void * pStream;
void * pCache;
SColumn *schema;
} SMeterObj;
typedef struct {
char type;
char pversion; // protocol version
char action; // insert, import, delete, update
int32_t sversion; // only for insert
int32_t sid;
int32_t len;
uint64_t lastVersion; // latest version
char cont[];
} SVMsgHeader;
struct tSQLBinaryExpr;
typedef struct SColumnInfoEx {
SColumnInfo data;
int16_t colIdx;
int16_t colIdxInBuf;
/*
* 0: denotes if its is required in the first round of scan of data block
* 1: denotes if its is required in the secondary scan
*/
int16_t req[2];
} SColumnInfoEx;
struct SColumnFilterElem;
typedef bool (*__filter_func_t)(struct SColumnFilterElem *pFilter, char *val1, char *val2);
typedef struct SColumnFilterElem {
int16_t bytes; // column length
__filter_func_t fp;
SColumnFilterInfo filterInfo;
} SColumnFilterElem;
typedef struct SSingleColumnFilterInfo {
SColumnInfoEx info;
int32_t numOfFilters;
SColumnFilterElem *pFilters;
char * pData;
} SSingleColumnFilterInfo;
typedef struct SQuery {
short numOfCols;
SOrderVal order;
char keyIsMet; // if key is met, it will be set
char over;
int fileId; // only for query in file
int hfd; // only for query in file, head file handle
int dfd; // only for query in file, data file handle
int lfd; // only for query in file, last file handle
SCompBlock *pBlock; // only for query in file
SField ** pFields;
int numOfBlocks; // only for query in file
int blockBufferSize; // length of pBlock buffer
int currentSlot;
int firstSlot;
/*
* the two parameters are utilized to handle the data missing situation, caused by import operation.
* When the commit slot is the first slot, and commitPoints != 0
*/
int32_t commitSlot; // which slot is committed,
int32_t commitPoint; // starting point for next commit
int slot;
int pos;
TSKEY key;
int compBlockLen; // only for import
int64_t blockId;
TSKEY skey;
TSKEY ekey;
int64_t intervalTime;
int64_t slidingTime; // sliding time for sliding window query
char intervalTimeUnit; // interval data type, used for daytime revise
int8_t precision;
int16_t numOfOutputCols;
int16_t interpoType;
int16_t checkBufferInLoop; // check if the buffer is full during scan each block
SLimitVal limit;
int32_t rowSize;
SSqlGroupbyExpr * pGroupbyExpr;
SSqlFunctionExpr * pSelectExpr;
SColumnInfoEx * colList;
int32_t numOfFilterCols;
SSingleColumnFilterInfo *pFilterInfo;
int64_t * defaultVal;
TSKEY lastKey;
// buffer info
int64_t pointsRead; // the number of points returned
int64_t pointsToRead; // maximum number of points to read
int64_t pointsOffset; // the number of points offset to save read data
SData **sdata;
SData * tsData; // timestamp column/primary key column
} SQuery;
typedef struct {
char spi;
char encrypt;
char secret[TSDB_KEY_LEN];
char cipheringKey[TSDB_KEY_LEN];
} SConnSec;
typedef struct {
char * buffer;
char * offset;
int trans;
int bufferSize;
pthread_mutex_t qmutex;
} STranQueue;
// internal globals
extern int tsMeterSizeOnFile;
extern void * tsQueryQhandle;
extern int tsVnodePeers;
extern int tsMaxVnode;
extern int tsMaxQueues;
extern int tsOpenVnodes;
extern SVnodeObj *vnodeList;
extern void * vnodeTmrCtrl;
// read API
extern int (*vnodeSearchKeyFunc[])(char *pValue, int num, TSKEY key, int order);
void *vnodeQueryOnSingleTable(SMeterObj **pMeterObj, SSqlGroupbyExpr *pGroupbyExpr, SSqlFunctionExpr *sqlExprs,
SQueryMeterMsg *pQueryMsg, int *code);
void *vnodeQueryOnMultiMeters(SMeterObj **pMeterObj, SSqlGroupbyExpr *pGroupbyExpr, SSqlFunctionExpr *pSqlExprs,
SQueryMeterMsg *pQueryMsg, int *code);
// assistant/tool functions
SSqlGroupbyExpr *vnodeCreateGroupbyExpr(SQueryMeterMsg *pQuery, int32_t *code);
SSqlFunctionExpr *vnodeCreateSqlFunctionExpr(SQueryMeterMsg *pQuery, int32_t *code);
bool vnodeValidateExprColumnInfo(SQueryMeterMsg *pQueryMsg, SSqlFuncExprMsg *pExprMsg);
bool vnodeIsValidVnodeCfg(SVnodeCfg *pCfg);
int32_t vnodeGetResultSize(void *handle, int32_t *numOfRows);
int32_t vnodeCopyQueryResultToMsg(void *handle, char *data, int32_t numOfRows);
int64_t vnodeGetOffsetVal(void *thandle);
bool vnodeHasRemainResults(void *handle);
int vnodeRetrieveQueryResult(void *handle, int *pNum, char *argv[]);
int vnodeSaveQueryResult(void *handle, char *data, int32_t* size);
int vnodeRetrieveQueryInfo(void *handle, int *numOfRows, int *rowSize, int16_t *timePrec);
void vnodeFreeQInfo(void *, bool);
void vnodeFreeQInfoInQueue(void *param);
bool vnodeIsQInfoValid(void *param);
void vnodeDecRefCount(void *param);
void vnodeAddRefCount(void *param);
int32_t vnodeConvertQueryMeterMsg(SQueryMeterMsg *pQuery);
void vnodeQueryData(SSchedMsg *pMsg);
// meter API
int vnodeOpenMetersVnode(int vnode);
void vnodeCloseMetersVnode(int vnode);
int vnodeCreateMeterObj(SMeterObj *pNew, SConnSec *pSec);
int vnodeRemoveMeterObj(int vnode, int sid);
int vnodeInsertPoints(SMeterObj *pObj, char *cont, int contLen, char source, void *, int sversion, int *numOfPoints, TSKEY now);
int vnodeImportPoints(SMeterObj *pObj, char *cont, int contLen, char source, void *, int sversion, int *numOfPoints, TSKEY now);
int vnodeInsertBufferedPoints(int vnode);
int vnodeSaveAllMeterObjToFile(int vnode);
int vnodeSaveMeterObjToFile(SMeterObj *pObj);
int vnodeSaveVnodeCfg(int vnode, SVnodeCfg *pCfg, SVPeerDesc *pDesc);
int vnodeSaveVnodeInfo(int vnode);
// cache API
void *vnodeOpenCachePool(int vnode);
void vnodeCloseCachePool(int vnode);
void *vnodeAllocateCacheInfo(SMeterObj *pObj);
void vnodeFreeCacheInfo(SMeterObj *pObj);
void vnodeSetCommitQuery(SMeterObj *pObj, SQuery *pQuery);
int vnodeInsertPointToCache(SMeterObj *pObj, char *pData);
int vnodeQueryFromCache(SMeterObj *pObj, SQuery *pQuery);
uint64_t vnodeGetPoolCount(SVnodeObj *pVnode);
void vnodeUpdateCommitInfo(SMeterObj *pObj, int slot, int pos, uint64_t count);
void vnodeCommitOver(SVnodeObj *pVnode);
TSKEY vnodeGetFirstKey(int vnode);
int vnodeSyncRetrieveCache(int vnode, int fd);
int vnodeSyncRestoreCache(int vnode, int fd);
pthread_t vnodeCreateCommitThread(SVnodeObj *pVnode);
void vnodeCancelCommit(SVnodeObj *pVnode);
void vnodeCloseStream(SVnodeObj *pVnode);
void vnodeProcessCommitTimer(void *param, void *tmrId);
void vnodeSearchPointInCache(SMeterObj *pObj, SQuery *pQuery);
int vnodeAllocateCacheBlock(SMeterObj *pObj);
int vnodeFreeCacheBlock(SCacheBlock *pCacheBlock);
int vnodeIsCacheCommitted(SMeterObj *pObj);
// file API
int vnodeInitFile(int vnode);
int vnodeQueryFromFile(SMeterObj *pObj, SQuery *pQuery);
void *vnodeCommitToFile(void *param);
void *vnodeCommitMultiToFile(SVnodeObj *pVnode, int ssid, int esid);
int vnodeSyncRetrieveFile(int vnode, int fd, uint32_t fileId, uint64_t *fmagic);
int vnodeSyncRestoreFile(int vnode, int sfd);
int vnodeWriteBlockToFile(SMeterObj *pObj, SCompBlock *pBlock, SData *data[], SData *cdata[], int pointsRead);
int vnodeSearchPointInFile(SMeterObj *pObj, SQuery *pQuery);
int vnodeReadCompBlockToMem(SMeterObj *pObj, SQuery *pQuery, SData *sdata[]);
int vnodeOpenCommitFiles(SVnodeObj *pVnode, int noTempLast);
void vnodeCloseCommitFiles(SVnodeObj *pVnode);
int vnodeReadLastBlockToMem(SMeterObj *pObj, SCompBlock *pBlock, SData *sdata[]);
// vnode API
void vnodeUpdateStreamRole(SVnodeObj *pVnode);
int vnodeInitPeer(int numOfThreads);
void vnodeCleanUpPeer();
int vnodeOpenPeerVnode(int vnode);
void vnodeClosePeerVnode(int vnode);
void *vnodeGetMeterPeerConnection(SMeterObj *pObj, int index);
int vnodeForwardToPeer(SMeterObj *pObj, char *msg, int msgLen, char action, int sversion);
void vnodeCloseAllSyncFds(int vnode);
void vnodeConfigVPeers(int vnode, int numOfPeers, SVPeerDesc peerDesc[]);
void vnodeStartSyncProcess(SVnodeObj *pVnode);
void vnodeCancelSync(int vnode);
void vnodeListPeerStatus(char *buffer);
void vnodeCheckOwnStatus(SVnodeObj *pVnode);
int vnodeSaveMeterObjToFile(SMeterObj *pObj);
int vnodeRecoverFromPeer(SVnodeObj *pVnode, int fileId);
// vnodes API
int vnodeInitVnodes();
int vnodeInitStore();
void vnodeCleanUpVnodes();
int vnodeRemoveVnode(int vnode);
int vnodeCreateVnode(int vnode, SVnodeCfg *pCfg, SVPeerDesc *pDesc);
void vnodeOpenStreams(void *param, void *tmrId);
void vnodeCreateStream(SMeterObj *pObj);
void vnodeRemoveStream(SMeterObj *pObj);
// shell API
int vnodeInitShell();
void vnodeCleanUpShell();
int vnodeOpenShellVnode(int vnode);
void vnodeCloseShellVnode(int vnode);
// memter mgmt
int vnodeInitMeterMgmt();
void vnodeCleanUpMeterMgmt();
int vnodeOpenMeterMgmtVnode(int vnode);
int vnodeOpenMeterMgmtStoreVnode(int vnode);
void vnodeCloseMeterMgmtVnode(int vnode);
int vnodeCreateMeterMgmt(SMeterObj *pObj, SConnSec *pSec);
void vnodeRemoveMeterMgmt(SMeterObj *pObj);
SConnSec *vnodeGetMeterSec(int vnode, int sid);
int vnodeCreateMeterObjFile(int vnode);
// mgmt
void vnodeCleanUpMgmt();
int vnodeRetrieveMissedCreateMsg(int vnode, int fd, uint64_t stime);
int vnodeRestoreMissedCreateMsg(int vnode, int fd);
int vnodeRetrieveMissedRemoveMsg(int vid, int fd, uint64_t stime);
int vnodeRestoreMissedRemoveMsg(int vnode, int fd);
int vnodeProcessBufferedCreateMsgs(int vnode);
void vnodeSendVpeerCfgMsg(int vnode);
int vnodeSendMeterCfgMsg(int vnode, int sid);
int vnodeMgmtConns();
void vnodeRemoveFile(int vnode, int fileId);
// commit
int vnodeInitCommit(int vnode);
void vnodeCleanUpCommit(int vnode);
int vnodeRenewCommitLog(int vnode);
void vnodeRemoveCommitLog(int vnode);
int vnodeWriteToCommitLog(SMeterObj *pObj, char action, char *cont, int contLen, int sversion);
extern int (*vnodeProcessAction[])(SMeterObj *, char *, int, char, void *, int, int *, TSKEY);
extern int (*pCompFunc[])(const char *const input, int inputSize, const int elements, char *const output,
int outputSize, char algorithm, char *const buffer, int bufferSize);
extern int (*pDecompFunc[])(const char *const input, int compressedSize, const int elements, char *const output,
int outputSize, char algorithm, char *const buffer, int bufferSize);
// global variable and APIs provided by mgmt
extern char mgmtStatus;
extern char tsMgmtDirectory[];
extern const int16_t vnodeFileVersion;
#ifdef __cplusplus
}
#endif
#endif // TDENGINE_VNODE_H
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef TDENGINE_VNODECACHE_H
#define TDENGINE_VNODECACHE_H
#ifdef __cplusplus
extern "C" {
#endif
typedef struct {
short notFree;
short numOfPoints;
int slot;
int index;
int64_t blockId;
struct _meter_obj *pMeterObj;
char * offset[];
} SCacheBlock;
typedef struct {
int64_t blocks;
int maxBlocks;
int numOfBlocks;
int unCommittedBlocks;
int32_t currentSlot;
int32_t commitSlot; // which slot is committed
int32_t commitPoint; // starting point for next commit
SCacheBlock **cacheBlocks; // cache block list, circular list
} SCacheInfo;
typedef struct {
int vnode;
char ** pMem;
int64_t freeSlot;
pthread_mutex_t vmutex;
uint64_t count; // kind of transcation ID
int64_t notFreeSlots;
int64_t threshold;
char commitInProcess;
int cacheBlockSize;
int cacheNumOfBlocks;
} SCachePool;
#ifdef __cplusplus
}
#endif
#endif // TDENGINE_VNODECACHE_H
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef TDENGINE_VNODEDATAFILTERFUNC_H
#define TDENGINE_VNODEDATAFILTERFUNC_H
#ifdef __cplusplus
extern "C" {
#endif
#include "vnode.h"
__filter_func_t *vnodeGetRangeFilterFuncArray(int32_t type);
__filter_func_t *vnodeGetValueFilterFuncArray(int32_t type);
bool vnodeSupportPrefilter(int32_t type);
#ifdef __cplusplus
}
#endif
#endif // TDENGINE_VNODEDATAFILTERFUNC_H
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef TDENGINE_VNODEFILE_H
#define TDENGINE_VNODEFILE_H
#ifdef __cplusplus
extern "C" {
#endif
#include "tchecksum.h"
#define TSDB_VNODE_DELIMITER 0xF00AFA0F
typedef struct { int64_t compInfoOffset; } SCompHeader;
typedef struct {
short colId;
short bytes;
int32_t numOfNullPoints;
int32_t type : 8;
int32_t offset : 24;
int32_t len; // data length
int64_t sum;
int64_t max;
int64_t min;
int16_t maxIndex;
int16_t minIndex;
char reserved[20];
} SField;
typedef struct {
int64_t last : 1;
int64_t offset : 63;
int32_t algorithm : 8; // compression algorithm can be changed
int32_t numOfPoints : 24; // how many points have been written into this block
int32_t sversion;
int32_t len; // total length of this data block
uint16_t numOfCols;
char reserved[16];
TSKEY keyFirst; // time stamp for the first point
TSKEY keyLast; // time stamp for the last point
} SCompBlock;
typedef struct {
SCompBlock *compBlock;
SField * fields;
} SCompBlockFields;
typedef struct {
uint64_t uid;
int64_t last : 1;
int64_t numOfBlocks : 62;
uint32_t delimiter; // delimiter for recovery
TSCKSUM checksum;
SCompBlock compBlocks[]; // comp block list
} SCompInfo;
typedef struct {
int64_t tempHeadOffset;
int64_t compInfoOffset;
int64_t oldCompBlockOffset;
int64_t oldNumOfBlocks;
int64_t newNumOfBlocks;
int64_t finalNumOfBlocks;
int64_t oldCompBlockLen;
int64_t newCompBlockLen;
int64_t finalCompBlockLen;
int64_t committedPoints;
int commitSlot;
int32_t last : 1;
int32_t changed : 1;
int32_t commitPos : 30;
int64_t commitCount;
SCompBlock lastBlock;
} SMeterInfo;
typedef struct { int64_t totalStorage; } SVnodeHeadInfo;
#ifdef __cplusplus
}
#endif
#endif // TDENGINE_VNODEFILE_H
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef TDENGINE_VNODEQUERYIMPL_H
#define TDENGINE_VNODEQUERYIMPL_H
#ifdef __cplusplus
extern "C" {
#endif
#include "os.h"
#include "hash.h"
#include "hashfunc.h"
#define GET_QINFO_ADDR(x) ((char*)(x)-offsetof(SQInfo, query))
#define Q_STATUS_EQUAL(p, s) (((p) & (s)) != 0)
/*
* set the output buffer page size is 16k
* The page size should be sufficient for at least one output result or intermediate result.
* Some intermediate results may be extremely large, such as top/bottom(100) query.
*/
#define DEFAULT_INTERN_BUF_SIZE 16384L
#define INIT_ALLOCATE_DISK_PAGES 60L
#define DEFAULT_DATA_FILE_MAPPING_PAGES 2L
#define DEFAULT_DATA_FILE_MMAP_WINDOW_SIZE (DEFAULT_DATA_FILE_MAPPING_PAGES * DEFAULT_INTERN_BUF_SIZE)
#define IO_ENGINE_MMAP 0
#define IO_ENGINE_SYNC 1
#define DEFAULT_IO_ENGINE IO_ENGINE_SYNC
/**
* check if the primary column is load by default, otherwise, the program will
* forced to load primary column explicitly.
*/
#define PRIMARY_TSCOL_LOADED(query) ((query)->colList[0].data.colId == PRIMARYKEY_TIMESTAMP_COL_INDEX)
typedef enum {
/*
* the program will call this function again, if this status is set.
* used to transfer from QUERY_RESBUF_FULL
*/
QUERY_NOT_COMPLETED = 0x1u,
/*
* output buffer is full, so, the next query will be employed,
* in this case, we need to set the appropriated start scan point for
* the next query.
*
* this status is only exist in group-by clause and
* diff/add/division/multiply/ query.
*/
QUERY_RESBUF_FULL = 0x2u,
/*
* query is over
* 1. this status is used in one row result query process, e.g.,
* count/sum/first/last/
* avg...etc.
* 2. when the query range on timestamp is satisfied, it is also denoted as
* query_compeleted
*/
QUERY_COMPLETED = 0x4u,
/*
* all data has been scanned, so current search is stopped,
* At last, the function will transfer this status to QUERY_COMPLETED
*/
QUERY_NO_DATA_TO_CHECK = 0x8u,
} vnodeQueryStatus;
typedef struct SPointInterpoSupporter {
int32_t numOfCols;
char** pPrevPoint;
char** pNextPoint;
} SPointInterpoSupporter;
typedef struct SBlockInfo {
TSKEY keyFirst;
TSKEY keyLast;
int32_t numOfCols;
int32_t size;
} SBlockInfo;
typedef struct SMeterDataBlockInfoEx {
SCompBlockFields pBlock;
SMeterDataInfo* pMeterDataInfo;
int32_t blockIndex;
int32_t groupIdx; /* number of group is less than the total number of meters */
} SMeterDataBlockInfoEx;
typedef enum {
DISK_DATA_LOAD_FAILED = -0x1,
DISK_DATA_LOADED = 0x0,
DISK_DATA_DISCARDED = 0x01,
} vnodeDiskLoadStatus;
#define IS_MASTER_SCAN(runtime) (((runtime)->scanFlag & 1u) == MASTER_SCAN)
#define IS_SUPPLEMENT_SCAN(runtime) ((runtime)->scanFlag == SUPPLEMENTARY_SCAN)
#define SET_SUPPLEMENT_SCAN_FLAG(runtime) ((runtime)->scanFlag = SUPPLEMENTARY_SCAN)
#define SET_MASTER_SCAN_FLAG(runtime) ((runtime)->scanFlag = MASTER_SCAN)
typedef int (*__block_search_fn_t)(char* data, int num, int64_t key, int order);
static FORCE_INLINE SMeterObj* getMeterObj(void* hashHandle, int32_t sid) {
return *(SMeterObj**)taosHashGet(hashHandle, (const char*)&sid, sizeof(sid));
}
bool isQueryKilled(SQuery* pQuery);
bool isFixedOutputQuery(SQuery* pQuery);
bool isPointInterpoQuery(SQuery* pQuery);
bool isSumAvgRateQuery(SQuery *pQuery);
bool isTopBottomQuery(SQuery* pQuery);
bool isFirstLastRowQuery(SQuery* pQuery);
bool isTSCompQuery(SQuery* pQuery);
bool notHasQueryTimeRange(SQuery* pQuery);
bool needSupplementaryScan(SQuery* pQuery);
bool onDemandLoadDatablock(SQuery* pQuery, int16_t queryRangeSet);
void setQueryStatus(SQuery* pQuery, int8_t status);
bool doRevisedResultsByLimit(SQInfo* pQInfo);
void truncateResultByLimit(SQInfo* pQInfo, int64_t* final, int32_t* interpo);
void initCtxOutputBuf(SQueryRuntimeEnv* pRuntimeEnv);
void resetCtxOutputBuf(SQueryRuntimeEnv* pRuntimeEnv);
void forwardCtxOutputBuf(SQueryRuntimeEnv* pRuntimeEnv, int64_t output);
bool needPrimaryTimestampCol(SQuery* pQuery, SBlockInfo* pBlockInfo);
void vnodeScanAllData(SQueryRuntimeEnv* pRuntimeEnv);
int32_t vnodeQueryResultInterpolate(SQInfo* pQInfo, tFilePage** pDst, tFilePage** pDataSrc, int32_t numOfRows,
int32_t* numOfInterpo);
void copyResToQueryResultBuf(STableQuerySupportObj* pSupporter, SQuery* pQuery);
void doSkipResults(SQueryRuntimeEnv* pRuntimeEnv);
void doFinalizeResult(SQueryRuntimeEnv* pRuntimeEnv);
int64_t getNumOfResult(SQueryRuntimeEnv* pRuntimeEnv);
void forwardQueryStartPosition(SQueryRuntimeEnv* pRuntimeEnv);
bool normalizedFirstQueryRange(bool dataInDisk, bool dataInCache, STableQuerySupportObj* pSupporter,
SPointInterpoSupporter* pPointInterpSupporter, int64_t* key);
void pointInterpSupporterInit(SQuery* pQuery, SPointInterpoSupporter* pInterpoSupport);
void pointInterpSupporterDestroy(SPointInterpoSupporter* pPointInterpSupport);
void pointInterpSupporterSetData(SQInfo* pQInfo, SPointInterpoSupporter* pPointInterpSupport);
int64_t loadRequiredBlockIntoMem(SQueryRuntimeEnv* pRuntimeEnv, SPositionInfo* position);
void disableFunctForSuppleScan(STableQuerySupportObj* pSupporter, int32_t order);
void enableFunctForMasterScan(SQueryRuntimeEnv* pRuntimeEnv, int32_t order);
int32_t mergeMetersResultToOneGroups(STableQuerySupportObj* pSupporter);
void copyFromWindowResToSData(SQInfo* pQInfo, SWindowResult* result);
SBlockInfo getBlockInfo(SQueryRuntimeEnv *pRuntimeEnv);
SBlockInfo getBlockBasicInfo(SQueryRuntimeEnv *pRuntimeEnv, void* pBlock, int32_t type);
SCacheBlock* getCacheDataBlock(SMeterObj* pMeterObj, SQueryRuntimeEnv* pRuntimeEnv, int32_t slot);
void stableApplyFunctionsOnBlock(STableQuerySupportObj* pSupporter, SMeterDataInfo* pMeterDataInfo,
SBlockInfo* pBlockInfo, SField* pFields, __block_search_fn_t searchFn);
int32_t vnodeFilterQualifiedMeters(SQInfo* pQInfo, int32_t vid, tSidSet* pSidSet, SMeterDataInfo* pMeterDataInfo,
int32_t* numOfMeters, SMeterDataInfo*** pReqMeterDataInfo);
int32_t vnodeGetVnodeHeaderFileIndex(int32_t* fid, SQueryRuntimeEnv* pRuntimeEnv, int32_t order);
int32_t createDataBlocksInfoEx(SMeterDataInfo** pMeterDataInfo, int32_t numOfMeters,
SMeterDataBlockInfoEx** pDataBlockInfoEx, int32_t numOfCompBlocks,
int32_t* nAllocBlocksInfoSize, int64_t addr);
void freeMeterBlockInfoEx(SMeterDataBlockInfoEx* pDataBlockInfoEx, int32_t len);
void setExecutionContext(STableQuerySupportObj* pSupporter, SMeterQueryInfo* pMeterQueryInfo, int32_t meterIdx,
int32_t groupIdx, TSKEY nextKey);
int32_t setAdditionalInfo(STableQuerySupportObj *pSupporter, int32_t meterIdx, SMeterQueryInfo *pMeterQueryInfo);
void doGetAlignedIntervalQueryRangeImpl(SQuery* pQuery, int64_t pKey, int64_t keyFirst, int64_t keyLast,
int64_t* actualSkey, int64_t* actualEkey, int64_t* skey, int64_t* ekey);
int64_t getQueryStartPositionInCache(SQueryRuntimeEnv* pRuntimeEnv, int32_t* slot, int32_t* pos, bool ignoreQueryRange);
int32_t getDataBlocksForMeters(STableQuerySupportObj* pSupporter, SQuery* pQuery, int32_t numOfMeters,
const char* filePath, SMeterDataInfo** pMeterDataInfo, uint32_t* numOfBlocks);
int32_t LoadDatablockOnDemand(SCompBlock* pBlock, SField** pFields, uint8_t* blkStatus, SQueryRuntimeEnv* pRuntimeEnv,
int32_t fileIdx, int32_t slotIdx, __block_search_fn_t searchFn, bool onDemand);
int32_t vnodeGetHeaderFile(SQueryRuntimeEnv* pRuntimeEnv, int32_t fileIndex);
/**
* Create SMeterQueryInfo.
* The MeterQueryInfo is created one for each table during super table query
*
* @param skey
* @param ekey
* @return
*/
SMeterQueryInfo* createMeterQueryInfo(STableQuerySupportObj* pSupporter, int32_t sid, TSKEY skey, TSKEY ekey);
/**
* Destroy meter query info
* @param pMeterQInfo
* @param numOfCols
*/
void destroyMeterQueryInfo(SMeterQueryInfo* pMeterQueryInfo, int32_t numOfCols);
/**
* change the meter query info for supplement scan
* @param pMeterQueryInfo
* @param skey
* @param ekey
*/
void changeMeterQueryInfoForSuppleQuery(SQuery* pQuery, SMeterQueryInfo* pMeterQueryInfo,
TSKEY skey, TSKEY ekey);
/**
* add the new allocated disk page to meter query info
* the new allocated disk page is used to keep the intermediate (interval) results
* @param pQuery
* @param pMeterQueryInfo
* @param pSupporter
*/
tFilePage* addDataPageForMeterQueryInfo(SQuery* pQuery, SMeterQueryInfo* pMeterQueryInfo,
STableQuerySupportObj* pSupporter);
/**
* restore the query range data from SMeterQueryInfo to runtime environment
*
* @param pRuntimeEnv
* @param pMeterQueryInfo
*/
void restoreIntervalQueryRange(SQueryRuntimeEnv* pRuntimeEnv, SMeterQueryInfo* pMeterQueryInfo);
/**
* set the interval query range for the interval query, when handling a data(cache) block
*
* @param pMeterQueryInfo
* @param pSupporter
* @param key
*/
void setIntervalQueryRange(SMeterQueryInfo* pMeterQueryInfo, STableQuerySupportObj* pSupporter, int64_t key);
/**
* set the meter data information
* @param pMeterDataInfo
* @param pMeterObj current query meter object
* @param meterIdx meter index in the sid list
* @param groupId group index, which the meter is belonged to
*/
void setMeterDataInfo(SMeterDataInfo* pMeterDataInfo, SMeterObj* pMeterObj, int32_t meterIdx, int32_t groupId);
void vnodeSetTagValueInParam(tSidSet* pSidSet, SQueryRuntimeEnv* pRuntimeEnv, SMeterSidExtInfo* pMeterInfo);
void vnodeCheckIfDataExists(SQueryRuntimeEnv* pRuntimeEnv, SMeterObj* pMeterObj, bool* dataInDisk, bool* dataInCache);
void displayInterResult(SData** pdata, SQuery* pQuery, int32_t numOfRows);
void vnodePrintQueryStatistics(STableQuerySupportObj* pSupporter);
void clearTimeWindowResBuf(SQueryRuntimeEnv* pRuntimeEnv, SWindowResult* pOneOutputRes);
void copyTimeWindowResBuf(SQueryRuntimeEnv* pRuntimeEnv, SWindowResult* dst, const SWindowResult* src);
int32_t initWindowResInfo(SWindowResInfo* pWindowResInfo, SQueryRuntimeEnv* pRuntimeEnv, int32_t size,
int32_t threshold, int16_t type);
void cleanupTimeWindowInfo(SWindowResInfo* pWindowResInfo, SQueryRuntimeEnv* pRuntimeEnv);
void resetTimeWindowInfo(SQueryRuntimeEnv* pRuntimeEnv, SWindowResInfo* pWindowResInfo);
void clearFirstNTimeWindow(SQueryRuntimeEnv *pRuntimeEnv, int32_t num);
void clearClosedTimeWindow(SQueryRuntimeEnv* pRuntimeEnv);
int32_t numOfClosedTimeWindow(SWindowResInfo* pWindowResInfo);
void closeTimeWindow(SWindowResInfo* pWindowResInfo, int32_t slot);
void closeAllTimeWindow(SWindowResInfo* pWindowResInfo);
#ifdef __cplusplus
}
#endif
#endif // TDENGINE_VNODEQUERYIMPL_H
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef TDENGINE_VNODEREAD_H
#define TDENGINE_VNODEREAD_H
#ifdef __cplusplus
extern "C" {
#endif
#include "os.h"
#include "qresultBuf.h"
#include "qinterpolation.h"
#include "vnodeTagMgmt.h"
/*
* use to keep the first point position, consisting of position in blk and block
* id, file id
*/
typedef struct {
int32_t pos;
int32_t slot;
int32_t fileId;
} SPositionInfo;
typedef struct SLoadDataBlockInfo {
int32_t fileListIndex; /* index of this file in files list of this vnode */
int32_t fileId;
int32_t slotIdx;
int32_t sid;
bool tsLoaded; // if timestamp column of current block is loaded or not
} SLoadDataBlockInfo;
typedef struct SLoadCompBlockInfo {
int32_t sid; /* meter sid */
int32_t fileId;
int32_t fileListIndex;
} SLoadCompBlockInfo;
/*
* the header file info for one vnode
*/
typedef struct SHeaderFileInfo {
int32_t fileID; // file id
} SHeaderFileInfo;
typedef struct SQueryCostSummary {
double cacheTimeUs;
double fileTimeUs;
int64_t numOfFiles; // opened files during query
int64_t numOfTables; // num of queries tables
int64_t numOfSeek; // number of seek operation
int64_t readDiskBlocks; // accessed disk block
int64_t skippedFileBlocks; // skipped blocks
int64_t blocksInCache; // accessed cache blocks
int64_t readField; // field size
int64_t totalFieldSize; // total read fields size
double loadFieldUs; // total elapsed time to read fields info
int64_t totalBlockSize; // read data blocks
double loadBlocksUs; // total elapsed time to read data blocks
int64_t totalGenData; // in-memory generated data
int64_t readCompInfo; // read compblock info
int64_t totalCompInfoSize; // total comp block size
double loadCompInfoUs; // total elapsed time to read comp block info
int64_t tmpBufferInDisk; // size of buffer for intermediate result
} SQueryCostSummary;
typedef struct SPosInfo {
int16_t pageId;
int16_t rowId;
} SPosInfo;
typedef struct STimeWindow {
TSKEY skey;
TSKEY ekey;
} STimeWindow;
typedef struct SWindowStatus {
bool closed;
} SWindowStatus;
typedef struct SWindowResult {
uint16_t numOfRows;
SPosInfo pos; // Position of current result in disk-based output buffer
SResultInfo* resultInfo; // For each result column, there is a resultInfo
STimeWindow window; // The time window that current result covers.
SWindowStatus status;
} SWindowResult;
/*
* header files info, avoid to iterate the directory, the data is acquired
* during in query preparation function
*/
typedef struct SQueryFilesInfo {
SHeaderFileInfo* pFileInfo;
uint32_t numOfFiles; // the total available number of files for this virtual node during query execution
int32_t current; // the memory mapped header file, NOTE: only one header file can be mmap.
int32_t vnodeId;
int32_t headerFd; // header file fd
int64_t headerFileSize;
int32_t dataFd;
int32_t lastFd;
char headerFilePath[PATH_MAX]; // current opened header file name
char dataFilePath[PATH_MAX]; // current opened data file name
char lastFilePath[PATH_MAX]; // current opened last file path
char dbFilePathPrefix[PATH_MAX];
} SQueryFilesInfo;
typedef struct SWindowResInfo {
SWindowResult* pResult; // reference to SQuerySupporter->pResult
void* hashList; // hash list for quick access
int16_t type; // data type for hash key
int32_t capacity; // max capacity
int32_t curIndex; // current start active index
int32_t size;
int64_t startTime; // start time of the first time window for sliding query
int64_t prevSKey; // previous (not completed) sliding window start key
int64_t threshold; // threshold for return completed results.
} SWindowResInfo;
typedef struct SQueryRuntimeEnv {
SPositionInfo startPos; /* the start position, used for secondary/third iteration */
SPositionInfo endPos; /* the last access position in query, served as the start pos of reversed order query */
SPositionInfo nextPos; /* start position of the next scan */
SData* colDataBuffer[TSDB_MAX_COLUMNS];
SResultInfo* resultInfo; // todo refactor to merge with SWindowResInfo
uint8_t blockStatus; // Indicate if data block is loaded, the block is first/last/internal block
int32_t unzipBufSize;
SData* primaryColBuffer;
char* unzipBuffer;
char* secondaryUnzipBuffer;
SQuery* pQuery;
SMeterObj* pMeterObj;
SQLFunctionCtx* pCtx;
SLoadDataBlockInfo loadBlockInfo; /* record current block load information */
SLoadCompBlockInfo loadCompBlockInfo; /* record current compblock information in SQuery */
SQueryFilesInfo vnodeFileInfo;
int16_t numOfRowsPerPage;
int16_t offset[TSDB_MAX_COLUMNS];
uint16_t scanFlag; // denotes reversed scan of data or not
SInterpolationInfo interpoInfo;
SData** pInterpoBuf;
SWindowResInfo windowResInfo;
STSBuf* pTSBuf;
STSCursor cur;
SQueryCostSummary summary;
bool stableQuery; // is super table query or not
SDiskbasedResultBuf* pResultBuf; // query result buffer based on blocked-wised disk file
/*
* Temporarily hold the in-memory cache block info during scan cache blocks
* Here we do not use the cache block info from pMeterObj, simple because it may change anytime
* during the query by the submit/insert handling threads.
* So we keep a copy of the support structure as well as the cache block data itself.
*/
SCacheBlock cacheBlock;
} SQueryRuntimeEnv;
/* intermediate pos during multimeter query involves interval */
typedef struct SMeterQueryInfo {
int64_t lastKey;
int64_t skey;
int64_t ekey;
int32_t numOfRes;
int16_t queryRangeSet; // denote if the query range is set, only available for interval query
int64_t tag;
STSCursor cur;
int32_t sid; // for retrieve the page id list
SWindowResInfo windowResInfo;
} SMeterQueryInfo;
typedef struct SMeterDataInfo {
uint64_t offsetInHeaderFile;
int32_t numOfBlocks;
int32_t start; // start block index
SCompBlock* pBlock;
int32_t meterOrderIdx;
SMeterObj* pMeterObj;
int32_t groupIdx; // group id in meter list
SMeterQueryInfo* pMeterQInfo;
} SMeterDataInfo;
typedef struct STableQuerySupportObj {
void* pMetersHashTable; // meter table hash list
SMeterSidExtInfo** pMeterSidExtInfo;
int32_t numOfMeters;
/*
* multimeter query resultset.
* In multimeter queries, the result is temporarily stored on this structure, instead of
* directly put result into output buffer, since we have no idea how many number of
* rows may be generated by a specific subgroup. When query on all subgroups is executed,
* the result is copy to output buffer. This attribution is not used during single meter query processing.
*/
SQueryRuntimeEnv runtimeEnv;
int64_t rawSKey;
int64_t rawEKey;
int32_t subgroupIdx;
int32_t offset; /* offset in group result set of subgroup */
tSidSet* pSidSet;
/*
* the query is executed position on which meter of the whole list.
* when the index reaches the last one of the list, it means the query is completed.
* We later may refactor to remove this attribution by using another flag to denote
* whether a multimeter query is completed or not.
*/
int32_t meterIdx;
int32_t numOfGroupResultPages;
int32_t groupResultSize;
SMeterDataInfo* pMeterDataInfo;
TSKEY* tsList;
} STableQuerySupportObj;
typedef struct _qinfo {
uint64_t signature;
int32_t refCount; // QInfo reference count, when the value is 0, it can be released safely
char user[TSDB_TABLE_ID_LEN + 1];
char sql[TSDB_SHOW_SQL_LEN];
uint8_t stream;
uint16_t port;
uint32_t ip;
uint64_t startTime;
int64_t useconds;
int killed;
struct _qinfo *prev, *next;
SQuery query;
int totalPoints;
int pointsRead;
int pointsReturned;
int pointsInterpo;
int code;
char bufIndex;
char changed;
char over;
SMeterObj* pObj;
sem_t dataReady;
STableQuerySupportObj* pTableQuerySupporter;
int (*fp)(SMeterObj*, SQuery*);
} SQInfo;
int32_t vnodeQueryTablePrepare(SQInfo* pQInfo, SMeterObj* pMeterObj, STableQuerySupportObj* pSMultiMeterObj,
void* param);
void vnodeQueryFreeQInfoEx(SQInfo* pQInfo);
bool vnodeParametersSafetyCheck(SQuery* pQuery);
int32_t vnodeSTableQueryPrepare(SQInfo* pQInfo, SQuery* pQuery, void* param);
/**
* decrease the numofQuery of each table that is queried, enable the
* remove/close operation can be executed
* @param pQInfo
*/
void vnodeDecMeterRefcnt(SQInfo* pQInfo);
/* sql query handle in dnode */
void vnodeSingleTableQuery(SSchedMsg* pMsg);
/*
* handle multi-meter query process
*/
void vnodeMultiMeterQuery(SSchedMsg* pMsg);
#ifdef __cplusplus
}
#endif
#endif // TDENGINE_VNODEREAD_H
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef TDENGINE_VNODESHELL_H
#define TDENGINE_VNODESHELL_H
#ifdef __cplusplus
extern "C" {
#endif
#include "os.h"
#ifdef __cplusplus
}
#endif
#endif // TDENGINE_VNODESHELL_H
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef TDENGINE_VNODESTORE_H
#define TDENGINE_VNODESTORE_H
#ifdef __cplusplus
extern "C" {
#endif
void vnodeProcessDataFromVnode(SIntMsg *msg, void *tcpHandle);
void vnodeCalcOpenVnodes();
bool vnodeRemoveDataFileFromLinkFile(char* linkFile, char* de_name);
int vnodeInitInfo();
#ifdef __cplusplus
}
#endif
#endif // TDEGINE_VNODESTORE_H
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef TBASE_MNODE_SUPER_TABLE_QUERY_H
#define TBASE_MNODE_SUPER_TABLE_QUERY_H
#include "os.h"
#include "mnode.h"
#include "qast.h"
int32_t mgmtDoJoin(SSuperTableMetaMsg* pSuperTableMetaMsg, tQueryResultset* pRes);
void mgmtReorganizeMetersInMetricMeta(SSuperTableMetaMsg* pInfo, int32_t index, tQueryResultset* pRes);
#endif
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef TDENGINE_VNODESYSTEM_H
#define TDENGINE_VNODESYSTEM_H
#ifdef __cplusplus
extern "C" {
#endif
#ifdef __cplusplus
}
#endif
#endif // TDENGINE_VNODESYSTEM_H
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef TDENGINE_VNODETAGMGMT_H
#define TDENGINE_VNODETAGMGMT_H
#ifdef __cplusplus
extern "C" {
#endif
/*
* @version 0.1
* @date 2018/01/02
* @author liaohj
* management of the tag value of tables
* in query, client need the vnode to aggregate results according to tags
* values,
* the grouping operation is done here.
* Note:
* 1. we implement a quick sort algorithm, may remove it later.
*/
typedef int32_t (*__ext_compar_fn_t)(const void *p1, const void *p2, void *param);
tSidSet *tSidSetCreate(struct SMeterSidExtInfo **pMeterSidExtInfo, int32_t numOfMeters, SSchema *pSchema,
int32_t numOfTags, SColIndexEx *colList, int32_t numOfOrderCols);
int32_t *calculateSubGroup(void **pSids, int32_t numOfMeters, int32_t *numOfSubset, tOrderDescriptor *pOrderDesc,
__ext_compar_fn_t compareFn);
void tSidSetDestroy(tSidSet **pSets);
void tSidSetSort(tSidSet *pSets);
int32_t meterSidComparator(const void *s1, const void *s2, void *param);
int32_t doCompare(char *f1, char *f2, int32_t type, int32_t size);
void tQSortEx(void **pMeterSids, size_t size, int32_t start, int32_t end, void *param, __ext_compar_fn_t compareFn);
#ifdef __cplusplus
}
#endif
#endif // TDENGINE_VNODETAGMGMT_H
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef TDENGINE_VNODE_UTIL_H
#define TDENGINE_VNODE_UTIL_H
#ifdef __cplusplus
extern "C" {
#endif
/* get the qinfo struct address from the query struct address */
#define GET_COLUMN_BYTES(query, colidx) \
((query)->colList[(query)->pSelectExpr[colidx].pBase.colInfo.colIdxInBuf].data.bytes)
#define GET_COLUMN_TYPE(query, colidx) \
((query)->colList[(query)->pSelectExpr[colidx].pBase.colInfo.colIdxInBuf].data.type)
#define QUERY_IS_ASC_QUERY(q) (GET_FORWARD_DIRECTION_FACTOR((q)->order.order) == QUERY_ASC_FORWARD_STEP)
#define EXTRA_BYTES 2 // for possible compression deflation
#define GET_COL_DATA_POS(query, index, step) ((query)->pos + (index)*(step))
int vnodeGetEid(int days);
int vnodeCheckFileIntegrity(FILE *fp);
void vnodeCreateFileHeader(FILE *fp);
void vnodeCreateFileHeaderFd(int fd);
void vnodeGetHeadFileHeaderInfo(int fd, SVnodeHeadInfo *pHeadInfo);
void vnodeUpdateHeadFileHeader(int fd, SVnodeHeadInfo *pHeadInfo);
/**
* check if two schema is identical or not
* This function does not check if a schema is valid or not
*
* @param pSSchemaFirst
* @param numOfCols1
* @param pSSchemaSecond
* @param numOfCols2
* @return
*/
bool vnodeMeterSchemaIdentical(SColumn *pSchema1, int32_t numOfCols1, SColumn *pSchema2, int32_t numOfCols2);
/**
* free SFields in SQuery
* vnodeFreeFields must be called before free(pQuery->pBlock);
* @param pQuery
*/
void vnodeFreeFields(SQuery *pQuery);
void vnodeUpdateFilterColumnIndex(SQuery* pQuery);
void vnodeUpdateQueryColumnIndex(SQuery* pQuery, SMeterObj* pMeterObj);
int32_t vnodeCreateFilterInfo(void* pQInfo, SQuery *pQuery);
bool vnodeFilterData(SQuery* pQuery, int32_t* numOfActualRead, int32_t index);
bool vnodeDoFilterData(SQuery* pQuery, int32_t elemPos);
bool vnodeIsProjectionQuery(SSqlFunctionExpr *pExpr, int32_t numOfOutput);
int32_t vnodeIncQueryRefCount(SQueryMeterMsg *pQueryMsg, SMeterSidExtInfo **pSids, SMeterObj **pMeterObjList,
int32_t *numOfInc);
void vnodeDecQueryRefCount(SQueryMeterMsg *pQueryMsg, SMeterObj **pMeterObjList, int32_t numOfInc);
int32_t vnodeSetMeterState(SMeterObj* pMeterObj, int32_t state);
void vnodeClearMeterState(SMeterObj* pMeterObj, int32_t state);
bool vnodeIsMeterState(SMeterObj* pMeterObj, int32_t state);
void vnodeSetMeterDeleting(SMeterObj* pMeterObj);
int32_t vnodeSetMeterInsertImportStateEx(SMeterObj* pObj, int32_t st);
bool vnodeIsSafeToDeleteMeter(SVnodeObj* pVnode, int32_t sid);
void vnodeFreeColumnInfo(SColumnInfo* pColumnInfo);
bool isGroupbyNormalCol(SSqlGroupbyExpr* pExpr);
#ifdef __cplusplus
}
#endif
#endif // TDENGINE_VNODE_UTIL_H
此差异已折叠。
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#define _GNU_SOURCE /* See feature_test_macros(7) */
#include "os.h"
#include "taosdef.h"
#include "vnode.h"
#include "vnodeUtil.h"
#include "vnodeStatus.h"
typedef struct {
int sversion;
int sid;
int contLen;
int action:8;
int simpleCheck:24;
} SCommitHead;
int vnodeOpenCommitLog(int vnode, uint64_t firstV) {
SVnodeObj *pVnode = vnodeList + vnode;
char * fileName = pVnode->logFn;
pVnode->logFd = open(fileName, O_RDWR | O_CREAT, S_IRWXU | S_IRWXG | S_IRWXO);
if (pVnode->logFd < 0) {
dError("vid:%d, failed to open file:%s, reason:%s", vnode, fileName, strerror(errno));
return -1;
}
dTrace("vid:%d, logfd:%d, open file:%s success", vnode, pVnode->logFd, fileName);
if (posix_fallocate64(pVnode->logFd, 0, pVnode->mappingSize) != 0) {
dError("vid:%d, logfd:%d, failed to alloc file size:%d, reason:%s", vnode, pVnode->logFd, pVnode->mappingSize, strerror(errno));
perror("fallocate failed");
goto _err_log_open;
}
struct stat statbuf;
stat(fileName, &statbuf);
int64_t length = statbuf.st_size;
if (length != pVnode->mappingSize) {
dError("vid:%d, logfd:%d, alloc file size:%" PRId64 " not equal to mapping size:%" PRId64, vnode, pVnode->logFd, length,
pVnode->mappingSize);
goto _err_log_open;
}
pVnode->pMem = mmap(0, pVnode->mappingSize, PROT_WRITE | PROT_READ, MAP_SHARED, pVnode->logFd, 0);
if (pVnode->pMem == MAP_FAILED) {
dError("vid:%d, logfd:%d, failed to map file, reason:%s", vnode, pVnode->logFd, strerror(errno));
goto _err_log_open;
}
pVnode->pWrite = pVnode->pMem;
memcpy(pVnode->pWrite, &(firstV), sizeof(firstV));
pVnode->pWrite += sizeof(firstV);
return pVnode->logFd;
_err_log_open:
close(pVnode->logFd);
remove(fileName);
pVnode->logFd = -1;
return -1;
}
int vnodeRenewCommitLog(int vnode) {
SVnodeObj *pVnode = vnodeList + vnode;
char * fileName = pVnode->logFn;
char * oldName = pVnode->logOFn;
pthread_mutex_lock(&(pVnode->logMutex));
if (FD_VALID(pVnode->logFd)) {
munmap(pVnode->pMem, pVnode->mappingSize);
close(pVnode->logFd);
rename(fileName, oldName);
}
if (pVnode->cfg.commitLog) vnodeOpenCommitLog(vnode, vnodeList[vnode].version);
pthread_mutex_unlock(&(pVnode->logMutex));
return pVnode->logFd;
}
void vnodeRemoveCommitLog(int vnode) { remove(vnodeList[vnode].logOFn); }
size_t vnodeRestoreDataFromLog(int vnode, char *fileName, uint64_t *firstV) {
int fd, ret;
char * cont = NULL;
size_t totalLen = 0;
int actions = 0;
SVnodeObj *pVnode = vnodeList + vnode;
if (pVnode->meterList == NULL) {
dError("vid:%d, vnode is not initialized!!!", vnode);
return 0;
}
struct stat fstat;
if (stat(fileName, &fstat) < 0) {
dTrace("vid:%d, no log file:%s", vnode, fileName);
return 0;
}
dTrace("vid:%d, uncommitted data in file:%s, restore them ...", vnode, fileName);
fd = open(fileName, O_RDWR);
if (fd < 0) {
dError("vid:%d, failed to open:%s, reason:%s", vnode, fileName, strerror(errno));
goto _error;
}
ret = read(fd, firstV, sizeof(pVnode->version));
if (ret <= 0) {
dError("vid:%d, failed to read version", vnode);
goto _error;
}
pVnode->version = *firstV;
int32_t bufLen = TSDB_PAYLOAD_SIZE;
cont = calloc(1, bufLen);
if (cont == NULL) {
dError("vid:%d, out of memory", vnode);
goto _error;
}
TSKEY now = taosGetTimestamp(pVnode->cfg.precision);
SCommitHead head;
int simpleCheck = 0;
while (1) {
ret = read(fd, &head, sizeof(head));
if (ret < 0) goto _error;
if (ret == 0) break;
if (((head.sversion+head.sid+head.contLen+head.action) & 0xFFFFFF) != head.simpleCheck) break;
simpleCheck = head.simpleCheck;
// head.contLen validation is removed
if (head.sid >= pVnode->cfg.maxSessions || head.sid < 0 || head.action >= TSDB_ACTION_MAX) {
dError("vid, invalid commit head, sid:%d contLen:%d action:%d", head.sid, head.contLen, head.action);
} else {
if (head.contLen > 0) {
if (bufLen < head.contLen+sizeof(simpleCheck)) { // pre-allocated buffer is not enough
cont = realloc(cont, head.contLen+sizeof(simpleCheck));
bufLen = head.contLen+sizeof(simpleCheck);
}
if (read(fd, cont, head.contLen+sizeof(simpleCheck)) < 0) goto _error;
if (*(int *)(cont+head.contLen) != simpleCheck) break;
SMeterObj *pObj = pVnode->meterList[head.sid];
if (pObj == NULL) {
dError("vid:%d, sid:%d not exists, ignore data in commit log, contLen:%d action:%d",
vnode, head.sid, head.contLen, head.action);
continue;
}
if (vnodeIsMeterState(pObj, TSDB_METER_STATE_DROPPING)) {
dWarn("vid:%d sid:%d id:%s, meter is dropped, ignore data in commit log, contLen:%d action:%d",
vnode, head.sid, head.contLen, head.action);
continue;
}
int32_t numOfPoints = 0;
(*vnodeProcessAction[head.action])(pObj, cont, head.contLen, TSDB_DATA_SOURCE_LOG, NULL, head.sversion,
&numOfPoints, now);
actions++;
} else {
break;
}
}
totalLen += sizeof(head) + head.contLen + sizeof(simpleCheck);
}
tclose(fd);
tfree(cont);
dTrace("vid:%d, %d pieces of uncommitted data are restored", vnode, actions);
return totalLen;
_error:
tclose(fd);
tfree(cont);
dError("vid:%d, failed to restore %s, remove this node...", vnode, fileName);
// rename to error file for future process
char *f = NULL;
taosFileRename(fileName, "error", '/', &f);
free(f);
return -1;
}
int vnodeInitCommit(int vnode) {
size_t size = 0;
uint64_t firstV = 0;
SVnodeObj *pVnode = vnodeList + vnode;
pthread_mutex_init(&(pVnode->logMutex), NULL);
sprintf(pVnode->logFn, "%s/vnode%d/db/submit%d.log", tsDirectory, vnode, vnode);
sprintf(pVnode->logOFn, "%s/vnode%d/db/submit%d.olog", tsDirectory, vnode, vnode);
pVnode->mappingSize = ((int64_t)pVnode->cfg.cacheBlockSize) * pVnode->cfg.cacheNumOfBlocks.totalBlocks * 1.5;
pVnode->mappingThreshold = pVnode->mappingSize * 0.7;
// restore from .olog file and commit to file
size = vnodeRestoreDataFromLog(vnode, pVnode->logOFn, &firstV);
if (size < 0) return -1;
if (size > 0) {
if (pVnode->commitInProcess == 0) vnodeCommitToFile(pVnode);
remove(pVnode->logOFn);
}
// restore from .log file to cache
size = vnodeRestoreDataFromLog(vnode, pVnode->logFn, &firstV);
if (size < 0) return -1;
if (pVnode->cfg.commitLog == 0) return 0;
if (size == 0) firstV = pVnode->version;
if (vnodeOpenCommitLog(vnode, firstV) < 0) {
dError("vid:%d, commit log init failed", vnode);
return -1;
}
pVnode->pWrite += size;
dPrint("vid:%d, commit log is initialized", vnode);
return 0;
}
void vnodeCleanUpCommit(int vnode) {
SVnodeObj *pVnode = vnodeList + vnode;
if (FD_VALID(pVnode->logFd)) close(pVnode->logFd);
if (pVnode->cfg.commitLog && (pVnode->logFd > 0 && remove(pVnode->logFn) < 0)) {
dError("vid:%d, failed to remove:%s", vnode, pVnode->logFn);
taosLogError("vid:%d, failed to remove:%s", vnode, pVnode->logFn);
}
pthread_mutex_destroy(&(pVnode->logMutex));
}
int vnodeWriteToCommitLog(SMeterObj *pObj, char action, char *cont, int contLen, int sverion) {
SVnodeObj *pVnode = vnodeList + pObj->vnode;
if (pVnode->pWrite == NULL) return 0;
SCommitHead head;
head.sid = pObj->sid;
head.action = action;
head.sversion = pObj->sversion;
head.contLen = contLen;
head.simpleCheck = (head.sversion+head.sid+head.contLen+head.action) & 0xFFFFFF;
int simpleCheck = head.simpleCheck;
pthread_mutex_lock(&(pVnode->logMutex));
// 100 bytes redundant mem space
if (pVnode->mappingSize - (pVnode->pWrite - pVnode->pMem) < contLen + sizeof(SCommitHead) + sizeof(simpleCheck) + 100) {
pthread_mutex_unlock(&(pVnode->logMutex));
dTrace("vid:%d, mem mapping space is not enough, wait for commit", pObj->vnode);
vnodeProcessCommitTimer(pVnode, NULL);
return TSDB_CODE_ACTION_IN_PROGRESS;
}
char *pWrite = pVnode->pWrite;
pVnode->pWrite += sizeof(head) + contLen + sizeof(simpleCheck);
memcpy(pWrite, (char *)&head, sizeof(head));
memcpy(pWrite + sizeof(head), cont, contLen);
memcpy(pWrite + sizeof(head) + contLen, &simpleCheck, sizeof(simpleCheck));
pthread_mutex_unlock(&(pVnode->logMutex));
if (pVnode->pWrite - pVnode->pMem > pVnode->mappingThreshold) {
dTrace("vid:%d, mem mapping is close to limit, commit", pObj->vnode);
vnodeProcessCommitTimer(pVnode, NULL);
}
dTrace("vid:%d sid:%d, data is written to commit log", pObj->vnode, pObj->sid);
return 0;
}
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册