vnode.h 10.1 KB
Newer Older
H
refact  
Hongze Cheng 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#ifndef _TD_VNODE_H_
#define _TD_VNODE_H_

H
refact  
Hongze Cheng 已提交
19
#include "os.h"
S
Shengliang Guan 已提交
20
#include "tmsgcb.h"
L
Liu Jicong 已提交
21 22
#include "tqueue.h"
#include "trpc.h"
H
refact  
Hongze Cheng 已提交
23

H
refact  
Hongze Cheng 已提交
24
#include "tarray.h"
S
Shengliang Guan 已提交
25
#include "tfs.h"
H
refact  
Hongze Cheng 已提交
26
#include "wal.h"
S
Shengliang Guan 已提交
27

H
Hongze Cheng 已提交
28 29
#include "tcommon.h"
#include "tfs.h"
H
Hongze Cheng 已提交
30 31 32 33
#include "tmallocator.h"
#include "tmsg.h"
#include "trow.h"

S
Shengliang Guan 已提交
34 35 36 37
#ifdef __cplusplus
extern "C" {
#endif

H
Hongze Cheng 已提交
38
// vnode
H
Hongze Cheng 已提交
39 40 41 42 43 44
typedef struct SVnode    SVnode;
typedef struct SMetaCfg  SMetaCfg;  // todo: remove
typedef struct STsdbCfg  STsdbCfg;  // todo: remove
typedef struct STqCfg    STqCfg;    // todo: remove
typedef struct SVnodeCfg SVnodeCfg;

H
Hongze Cheng 已提交
45 46 47 48 49 50 51 52 53 54 55 56 57 58 59
int     vnodeInit();
void    vnodeCleanup();
SVnode *vnodeOpen(const char *path, const SVnodeCfg *pVnodeCfg);
void    vnodeClose(SVnode *pVnode);
void    vnodeDestroy(const char *path);
void    vnodeProcessWMsgs(SVnode *pVnode, SArray *pMsgs);
int     vnodeApplyWMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp);
int     vnodeProcessCMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp);
int     vnodeProcessSyncReq(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp);
int     vnodeProcessQueryMsg(SVnode *pVnode, SRpcMsg *pMsg);
int     vnodeProcessFetchMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo);
int32_t vnodeAlter(SVnode *pVnode, const SVnodeCfg *pCfg);
int32_t vnodeCompact(SVnode *pVnode);
int32_t vnodeSync(SVnode *pVnode);
int32_t vnodeGetLoad(SVnode *pVnode, SVnodeLoad *pLoad);
H
Hongze Cheng 已提交
60
int     vnodeValidateTableHash(SVnodeCfg *pVnodeOptions, char *tableFName);
H
Hongze Cheng 已提交
61

H
Hongze Cheng 已提交
62
// meta
H
Hongze Cheng 已提交
63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100
typedef struct SMeta       SMeta;        // todo: remove
typedef struct SMTbCursor  SMTbCursor;   // todo: remove
typedef struct SMCtbCursor SMCtbCursor;  // todo: remove
typedef struct SMSmaCursor SMSmaCursor;  // todo: remove

#define META_SUPER_TABLE  TD_SUPER_TABLE
#define META_CHILD_TABLE  TD_CHILD_TABLE
#define META_NORMAL_TABLE TD_NORMAL_TABLE

typedef SVCreateTbReq   STbCfg;
typedef SVCreateTSmaReq SSmaCfg;

SMeta          *metaOpen(const char *path, const SMetaCfg *pMetaCfg, SMemAllocatorFactory *pMAF);
void            metaClose(SMeta *pMeta);
void            metaRemove(const char *path);
int             metaCreateTable(SMeta *pMeta, STbCfg *pTbCfg);
int             metaDropTable(SMeta *pMeta, tb_uid_t uid);
int             metaCommit(SMeta *pMeta);
int32_t         metaCreateTSma(SMeta *pMeta, SSmaCfg *pCfg);
int32_t         metaDropTSma(SMeta *pMeta, int64_t indexUid);
STbCfg         *metaGetTbInfoByUid(SMeta *pMeta, tb_uid_t uid);
STbCfg         *metaGetTbInfoByName(SMeta *pMeta, char *tbname, tb_uid_t *uid);
SSchemaWrapper *metaGetTableSchema(SMeta *pMeta, tb_uid_t uid, int32_t sver, bool isinline);
STSchema       *metaGetTbTSchema(SMeta *pMeta, tb_uid_t uid, int32_t sver);
void           *metaGetSmaInfoByIndex(SMeta *pMeta, int64_t indexUid, bool isDecode);
STSmaWrapper   *metaGetSmaInfoByTable(SMeta *pMeta, tb_uid_t uid);
SArray         *metaGetSmaTbUids(SMeta *pMeta, bool isDup);
int             metaGetTbNum(SMeta *pMeta);
SMTbCursor     *metaOpenTbCursor(SMeta *pMeta);
void            metaCloseTbCursor(SMTbCursor *pTbCur);
char           *metaTbCursorNext(SMTbCursor *pTbCur);
SMCtbCursor    *metaOpenCtbCursor(SMeta *pMeta, tb_uid_t uid);
void            metaCloseCtbCurosr(SMCtbCursor *pCtbCur);
tb_uid_t        metaCtbCursorNext(SMCtbCursor *pCtbCur);

SMSmaCursor *metaOpenSmaCursor(SMeta *pMeta, tb_uid_t uid);
void         metaCloseSmaCursor(SMSmaCursor *pSmaCur);
int64_t      metaSmaCursorNext(SMSmaCursor *pSmaCur);
H
Hongze Cheng 已提交
101

H
Hongze Cheng 已提交
102
// tsdb
H
Hongze Cheng 已提交
103 104 105 106 107 108 109 110 111 112
typedef struct STsdb          STsdb;
typedef struct SDataStatis    SDataStatis;
typedef struct STsdbQueryCond STsdbQueryCond;
typedef void                 *tsdbReaderT;

#define BLOCK_LOAD_OFFSET_SEQ_ORDER 1
#define BLOCK_LOAD_TABLE_SEQ_ORDER  2
#define BLOCK_LOAD_TABLE_RR_ORDER   3
#define TABLE_TID(t)                (t)->tid
#define TABLE_UID(t)                (t)->uid
H
Hongze Cheng 已提交
113

H
Hongze Cheng 已提交
114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133
tsdbReaderT *tsdbQueryTables(STsdb *tsdb, STsdbQueryCond *pCond, STableGroupInfo *tableInfoGroup, uint64_t qId,
                             uint64_t taskId);
tsdbReaderT  tsdbQueryCacheLast(STsdb *tsdb, STsdbQueryCond *pCond, STableGroupInfo *groupList, uint64_t qId,
                                void *pMemRef);
int32_t      tsdbGetFileBlocksDistInfo(tsdbReaderT *pReader, STableBlockDistInfo *pTableBlockInfo);
bool         isTsdbCacheLastRow(tsdbReaderT *pReader);
int32_t      tsdbQuerySTableByTagCond(void *pMeta, uint64_t uid, TSKEY skey, const char *pTagCond, size_t len,
                                      int16_t tagNameRelType, const char *tbnameCond, STableGroupInfo *pGroupInfo,
                                      SColIndex *pColIndex, int32_t numOfCols, uint64_t reqId, uint64_t taskId);
int64_t      tsdbGetNumOfRowsInMemTable(tsdbReaderT *pHandle);
bool         tsdbNextDataBlock(tsdbReaderT pTsdbReadHandle);
void         tsdbRetrieveDataBlockInfo(tsdbReaderT *pTsdbReadHandle, SDataBlockInfo *pBlockInfo);
int32_t      tsdbRetrieveDataBlockStatisInfo(tsdbReaderT *pTsdbReadHandle, SDataStatis **pBlockStatis);
SArray      *tsdbRetrieveDataBlock(tsdbReaderT *pTsdbReadHandle, SArray *pColumnIdList);
void         tsdbDestroyTableGroup(STableGroupInfo *pGroupList);
int32_t      tsdbGetOneTableGroup(void *pMeta, uint64_t uid, TSKEY startKey, STableGroupInfo *pGroupInfo);
int32_t      tsdbGetTableGroupFromIdList(STsdb *tsdb, SArray *pTableIdList, STableGroupInfo *pGroupInfo);
void         tsdbCleanupReadHandle(tsdbReaderT queryHandle);

// tq
H
Hongze Cheng 已提交
134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150
enum {
  TQ_STREAM_TOKEN__DATA = 1,
  TQ_STREAM_TOKEN__WATERMARK,
  TQ_STREAM_TOKEN__CHECKPOINT,
};

typedef struct STqReadHandle STqReadHandle;

STqReadHandle *tqInitSubmitMsgScanner(SMeta *pMeta);

void    tqReadHandleSetColIdList(STqReadHandle *pReadHandle, SArray *pColIdList);
int     tqReadHandleSetTbUidList(STqReadHandle *pHandle, const SArray *tbUidList);
int     tqReadHandleAddTbUidList(STqReadHandle *pHandle, const SArray *tbUidList);
int32_t tqReadHandleSetMsg(STqReadHandle *pHandle, SSubmitReq *pMsg, int64_t ver);
bool    tqNextDataBlock(STqReadHandle *pHandle);
int     tqRetrieveDataBlockInfo(STqReadHandle *pHandle, SDataBlockInfo *pBlockInfo);
SArray *tqRetrieveDataBlock(STqReadHandle *pHandle);
H
Hongze Cheng 已提交
151 152 153

// need to remove
int32_t tdScanAndConvertSubmitMsg(SSubmitReq *pMsg);
H
Hongze Cheng 已提交
154

H
Hongze Cheng 已提交
155 156 157 158
// structs
struct SMetaCfg {
  uint64_t lruSize;
};
H
Hongze Cheng 已提交
159

H
Hongze Cheng 已提交
160
struct STsdbCfg {
H
Hongze Cheng 已提交
161 162 163 164 165 166 167 168 169 170 171
  int8_t   precision;
  int8_t   update;
  int8_t   compression;
  int32_t  daysPerFile;
  int32_t  minRowsPerFileBlock;
  int32_t  maxRowsPerFileBlock;
  int32_t  keep;
  int32_t  keep1;
  int32_t  keep2;
  uint64_t lruCacheSize;
  SArray  *retentions;
H
Hongze Cheng 已提交
172
};
H
Hongze Cheng 已提交
173

H
Hongze Cheng 已提交
174
struct STqCfg {
L
Liu Jicong 已提交
175
  int32_t reserved;
H
Hongze Cheng 已提交
176
};
L
Liu Jicong 已提交
177

H
Hongze Cheng 已提交
178
struct SVnodeCfg {
H
Hongze Cheng 已提交
179
  int32_t  vgId;
D
dapan1121 已提交
180
  uint64_t dbId;
S
Shengliang Guan 已提交
181
  STfs    *pTfs;
H
Hongze Cheng 已提交
182 183 184 185
  uint64_t wsize;
  uint64_t ssize;
  uint64_t lsize;
  bool     isHeapAllocator;
H
more  
Hongze Cheng 已提交
186 187
  uint32_t ttl;
  uint32_t keep;
L
Liu Jicong 已提交
188
  int8_t   streamMode;
H
Hongze Cheng 已提交
189
  bool     isWeak;
H
more  
Hongze Cheng 已提交
190 191
  STsdbCfg tsdbCfg;
  SMetaCfg metaCfg;
H
Hongze Cheng 已提交
192 193
  STqCfg   tqCfg;
  SWalCfg  walCfg;
S
Shengliang Guan 已提交
194
  SMsgCb   msgCb;
D
dapan1121 已提交
195 196
  uint32_t hashBegin;
  uint32_t hashEnd;
L
Liu Jicong 已提交
197
  int8_t   hashMethod;
H
Hongze Cheng 已提交
198
};
H
save  
Hongze Cheng 已提交
199

H
Hongze Cheng 已提交
200
struct STqReadHandle {
201
  int64_t           ver;
L
Liu Jicong 已提交
202
  int64_t           tbUid;
203
  SHashObj         *tbIdHash;
S
Shengliang Guan 已提交
204
  const SSubmitReq *pMsg;
205 206 207 208 209 210 211 212
  SSubmitBlk       *pBlock;
  SSubmitMsgIter    msgIter;
  SSubmitBlkIter    blkIter;
  SMeta            *pVnodeMeta;
  SArray           *pColIdList;  // SArray<int32_t>
  int32_t           sver;
  SSchemaWrapper   *pSchemaWrapper;
  STSchema         *pSchema;
H
Hongze Cheng 已提交
213
};
H
refact  
Hongze Cheng 已提交
214

H
Hongze Cheng 已提交
215 216
// ---------------------------- OLD ----------------------------
typedef struct SMgmtWrapper SMgmtWrapper;
H
refact  
Hongze Cheng 已提交
217

H
Hongze Cheng 已提交
218
// Types exported
H
refact  
Hongze Cheng 已提交
219

H
Hongze Cheng 已提交
220
struct SDataStatis {
H
Hongze Cheng 已提交
221 222 223 224 225 226 227
  int16_t colId;
  int16_t maxIndex;
  int16_t minIndex;
  int16_t numOfNull;
  int64_t sum;
  int64_t max;
  int64_t min;
H
Hongze Cheng 已提交
228
};
L
Liu Jicong 已提交
229

H
Hongze Cheng 已提交
230 231 232 233 234 235 236 237
struct STsdbQueryCond {
  STimeWindow  twindow;
  int32_t      order;  // desc|asc order to iterate the data block
  int32_t      numOfCols;
  SColumnInfo *colList;
  bool         loadExternalRows;  // load external rows or not
  int32_t      type;              // data block load type:
};
H
save  
Hongze Cheng 已提交
238

H
Hongze Cheng 已提交
239 240 241 242
typedef struct {
  TSKEY    lastKey;
  uint64_t uid;
} STableKeyInfo;
243

H
Hongze Cheng 已提交
244 245 246 247 248
typedef struct STable {
  uint64_t  tid;
  uint64_t  uid;
  STSchema *pSchema;
} STable;
249

H
refact  
Hongze Cheng 已提交
250 251
/* ------------------------ FOR COMPILE ------------------------ */

L
Liu Jicong 已提交
252 253 254 255 256 257 258 259 260 261
typedef struct {
  int8_t type;
  int8_t reserved[7];
  union {
    void   *data;
    int64_t wmTs;
    int64_t checkpointId;
  };
} STqStreamToken;

H
Hongze Cheng 已提交
262 263 264 265 266 267 268 269
// meta.h

// Options
void metaOptionsInit(SMetaCfg *pMetaCfg);
void metaOptionsClear(SMetaCfg *pMetaCfg);

// query condition to build multi-table data block iterator
// STsdb
H
Hongze Cheng 已提交
270 271
STsdb *tsdbOpen(const char *path, int32_t vgId, const STsdbCfg *pTsdbCfg, SMemAllocatorFactory *pMAF, SMeta *pMeta,
                STfs *pTfs);
H
Hongze Cheng 已提交
272 273 274 275 276 277 278 279 280 281 282 283 284
void   tsdbClose(STsdb *);
void   tsdbRemove(const char *path);
int    tsdbInsertData(STsdb *pTsdb, SSubmitReq *pMsg, SSubmitRsp *pRsp);
int    tsdbPrepareCommit(STsdb *pTsdb);
int    tsdbCommit(STsdb *pTsdb);

int32_t tsdbInitSma(STsdb *pTsdb);
int32_t tsdbCreateTSma(STsdb *pTsdb, char *pMsg);
int32_t tsdbDropTSma(STsdb *pTsdb, char *pMsg);
/**
 * @brief When submit msg received, update the relative expired window synchronously.
 *
 * @param pTsdb
285 286
 * @param pMsg
 * @param version
H
Hongze Cheng 已提交
287 288
 * @return int32_t
 */
289
int32_t tsdbUpdateSmaWindow(STsdb *pTsdb, SSubmitReq *pMsg, int64_t version);
H
Hongze Cheng 已提交
290 291 292 293 294 295 296 297 298 299 300 301 302

/**
 * @brief Insert tSma(Time-range-wise SMA) data from stream computing engine
 *
 * @param pTsdb
 * @param indexUid
 * @param msg
 * @return int32_t
 */
int32_t tsdbInsertTSmaData(STsdb *pTsdb, int64_t indexUid, const char *msg);

/**
 * @brief Drop tSma data and local cache.
H
Hongze Cheng 已提交
303 304 305 306
 *
 * @param pTsdb
 * @param indexUid
 * @return int32_t
H
Hongze Cheng 已提交
307 308 309 310 311 312 313 314 315 316 317 318
 */
int32_t tsdbDropTSmaData(STsdb *pTsdb, int64_t indexUid);

/**
 * @brief Insert RSma(Rollup SMA) data.
 *
 * @param pTsdb
 * @param msg
 * @return int32_t
 */
int32_t tsdbInsertRSmaData(STsdb *pTsdb, char *msg);

S
Shengliang Guan 已提交
319 320 321 322
#ifdef __cplusplus
}
#endif

323
#endif /*_TD_VNODE_H_*/