vnodeInt.h 10.6 KB
Newer Older
H
save  
Hongze Cheng 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

H
refact  
Hongze Cheng 已提交
16 17
#ifndef _TD_VNODE_DEF_H_
#define _TD_VNODE_DEF_H_
H
save  
Hongze Cheng 已提交
18

H
Hongze Cheng 已提交
19
#include "executor.h"
H
Hongze Cheng 已提交
20 21 22
#include "filter.h"
#include "qworker.h"
#include "sync.h"
H
Hongze Cheng 已提交
23
#include "tchecksum.h"
H
more  
Hongze Cheng 已提交
24
#include "tcoding.h"
H
Hongze Cheng 已提交
25
#include "tcompare.h"
H
Hongze Cheng 已提交
26
#include "tcompression.h"
L
Liu Jicong 已提交
27
#include "tdatablock.h"
H
Hongze Cheng 已提交
28
#include "tdb.h"
H
Hongze Cheng 已提交
29
#include "tencode.h"
30
#include "tref.h"
H
refact  
Hongze Cheng 已提交
31
#include "tfs.h"
H
Hongze Cheng 已提交
32
#include "tglobal.h"
H
Hongze Cheng 已提交
33
#include "tjson.h"
H
Hongze Cheng 已提交
34
#include "tlist.h"
H
refact  
Hongze Cheng 已提交
35
#include "tlockfree.h"
H
Hongze Cheng 已提交
36
#include "tlosertree.h"
H
Hongze Cheng 已提交
37
#include "tmallocator.h"
38
#include "tmsgcb.h"
H
Hongze Cheng 已提交
39
#include "tskiplist.h"
H
Hongze Cheng 已提交
40
#include "tstream.h"
H
Hongze Cheng 已提交
41
#include "ttime.h"
H
Hongze Cheng 已提交
42 43
#include "ttimer.h"
#include "wal.h"
H
save  
Hongze Cheng 已提交
44

H
Hongze Cheng 已提交
45 46
#include "vnode.h"

H
save  
Hongze Cheng 已提交
47 48 49 50
#ifdef __cplusplus
extern "C" {
#endif

H
Hongze Cheng 已提交
51 52 53 54 55 56 57 58 59 60 61
typedef struct SVnodeInfo          SVnodeInfo;
typedef struct SMeta               SMeta;
typedef struct SSma                SSma;
typedef struct STsdb               STsdb;
typedef struct STQ                 STQ;
typedef struct SVState             SVState;
typedef struct SVBufPool           SVBufPool;
typedef struct SQWorker            SQHandle;
typedef struct STsdbKeepCfg        STsdbKeepCfg;
typedef struct SMetaSnapshotReader SMetaSnapshotReader;
typedef struct STsdbSnapshotReader STsdbSnapshotReader;
H
refact  
Hongze Cheng 已提交
62

C
Cary Xu 已提交
63 64 65 66 67
#define VNODE_META_DIR  "meta"
#define VNODE_TSDB_DIR  "tsdb"
#define VNODE_TQ_DIR    "tq"
#define VNODE_WAL_DIR   "wal"
#define VNODE_TSMA_DIR  "tsma"
C
Cary Xu 已提交
68
#define VNODE_RSMA_DIR  "rsma"
C
Cary Xu 已提交
69
#define VNODE_RSMA0_DIR "tsdb"
C
Cary Xu 已提交
70 71
#define VNODE_RSMA1_DIR "rsma1"
#define VNODE_RSMA2_DIR "rsma2"
H
Hongze Cheng 已提交
72

H
Hongze Cheng 已提交
73
// vnd.h
H
Hongze Cheng 已提交
74 75 76 77
void*   vnodeBufPoolMalloc(SVBufPool* pPool, int size);
void    vnodeBufPoolFree(SVBufPool* pPool, void* p);
int32_t vnodeRealloc(void** pp, int32_t size);
void    vnodeFree(void* p);
H
Hongze Cheng 已提交
78 79 80

// meta
typedef struct SMCtbCursor SMCtbCursor;
C
Cary Xu 已提交
81
typedef struct SMStbCursor SMStbCursor;
H
Hongze Cheng 已提交
82 83 84 85 86 87 88
typedef struct STbUidStore STbUidStore;

int             metaOpen(SVnode* pVnode, SMeta** ppMeta);
int             metaClose(SMeta* pMeta);
int             metaBegin(SMeta* pMeta);
int             metaCommit(SMeta* pMeta);
int             metaCreateSTable(SMeta* pMeta, int64_t version, SVCreateStbReq* pReq);
H
Hongze Cheng 已提交
89
int             metaAlterSTable(SMeta* pMeta, int64_t version, SVCreateStbReq* pReq);
H
Hongze Cheng 已提交
90
int             metaDropSTable(SMeta* pMeta, int64_t verison, SVDropStbReq* pReq);
H
Hongze Cheng 已提交
91
int             metaCreateTable(SMeta* pMeta, int64_t version, SVCreateTbReq* pReq);
92
int             metaDropTable(SMeta* pMeta, int64_t version, SVDropTbReq* pReq, SArray* tbUids);
L
Liu Jicong 已提交
93
int             metaTtlDropTable(SMeta* pMeta, int64_t ttl, SArray* tbUids);
H
more  
Hongze Cheng 已提交
94
int             metaAlterTable(SMeta* pMeta, int64_t version, SVAlterTbReq* pReq, STableMetaRsp* pMetaRsp);
H
Hongze Cheng 已提交
95 96 97
SSchemaWrapper* metaGetTableSchema(SMeta* pMeta, tb_uid_t uid, int32_t sver, bool isinline);
STSchema*       metaGetTbTSchema(SMeta* pMeta, tb_uid_t uid, int32_t sver);
int             metaGetTableEntryByName(SMetaReader* pReader, const char* name);
H
Hongze Cheng 已提交
98
tb_uid_t        metaGetTableEntryUidByName(SMeta* pMeta, const char* name);
H
Hongze Cheng 已提交
99 100
int             metaGetTbNum(SMeta* pMeta);
SMCtbCursor*    metaOpenCtbCursor(SMeta* pMeta, tb_uid_t uid);
C
Cary Xu 已提交
101
void            metaCloseCtbCursor(SMCtbCursor* pCtbCur);
H
Hongze Cheng 已提交
102
tb_uid_t        metaCtbCursorNext(SMCtbCursor* pCtbCur);
C
Cary Xu 已提交
103 104 105
SMStbCursor*    metaOpenStbCursor(SMeta* pMeta, tb_uid_t uid);
void            metaCloseStbCursor(SMStbCursor* pStbCur);
tb_uid_t        metaStbCursorNext(SMStbCursor* pStbCur);
C
Cary Xu 已提交
106 107 108 109
STSma*          metaGetSmaInfoByIndex(SMeta* pMeta, int64_t indexUid);
STSmaWrapper*   metaGetSmaInfoByTable(SMeta* pMeta, tb_uid_t uid, bool deepCopy);
SArray*         metaGetSmaIdsByTable(SMeta* pMeta, tb_uid_t uid);
SArray*         metaGetSmaTbUids(SMeta* pMeta);
H
Hongze Cheng 已提交
110 111 112
int32_t         metaSnapshotReaderOpen(SMeta* pMeta, SMetaSnapshotReader** ppReader, int64_t sver, int64_t ever);
int32_t         metaSnapshotReaderClose(SMetaSnapshotReader* pReader);
int32_t         metaSnapshotRead(SMetaSnapshotReader* pReader, void** ppData, uint32_t* nData);
dengyihao's avatar
dengyihao 已提交
113
void*           metaGetIdx(SMeta* pMeta);
dengyihao's avatar
dengyihao 已提交
114
void*           metaGetIvtIdx(SMeta* pMeta);
L
Liu Jicong 已提交
115
int             metaTtlSmaller(SMeta* pMeta, uint64_t time, SArray* uidList);
C
Cary Xu 已提交
116

117 118
int32_t metaCreateTSma(SMeta* pMeta, int64_t version, SSmaCfg* pCfg);
int32_t metaDropTSma(SMeta* pMeta, int64_t indexUid);
H
Hongze Cheng 已提交
119 120

// tsdb
L
Liu Jicong 已提交
121 122 123 124 125 126 127 128 129
int         tsdbOpen(SVnode* pVnode, STsdb** ppTsdb, const char* dir, STsdbKeepCfg* pKeepCfg);
int         tsdbClose(STsdb** pTsdb);
int32_t     tsdbBegin(STsdb* pTsdb);
int32_t     tsdbCommit(STsdb* pTsdb);
int         tsdbScanAndConvertSubmitMsg(STsdb* pTsdb, SSubmitReq* pMsg);
int         tsdbInsertData(STsdb* pTsdb, int64_t version, SSubmitReq* pMsg, SSubmitRsp* pRsp);
int32_t     tsdbInsertTableData(STsdb* pTsdb, int64_t version, SSubmitMsgIter* pMsgIter, SSubmitBlk* pBlock,
                                SSubmitBlkRsp* pRsp);
int32_t     tsdbDeleteTableData(STsdb* pTsdb, int64_t version, tb_uid_t suid, tb_uid_t uid, TSKEY sKey, TSKEY eKey);
wmmhello's avatar
wmmhello 已提交
130
tsdbReaderT tsdbReaderOpen(SVnode* pVnode, SQueryTableDataCond* pCond, SArray* tableList, uint64_t qId,
L
Liu Jicong 已提交
131 132 133 134 135 136
                           uint64_t taskId);
tsdbReaderT tsdbQueryCacheLastT(STsdb* tsdb, SQueryTableDataCond* pCond, STableListInfo* tableList, uint64_t qId,
                                void* pMemRef);
int32_t     tsdbSnapshotReaderOpen(STsdb* pTsdb, STsdbSnapshotReader** ppReader, int64_t sver, int64_t ever);
int32_t     tsdbSnapshotReaderClose(STsdbSnapshotReader* pReader);
int32_t     tsdbSnapshotRead(STsdbSnapshotReader* pReader, void** ppData, uint32_t* nData);
H
Hongze Cheng 已提交
137 138

// tq
L
Liu Jicong 已提交
139 140
int     tqInit();
void    tqCleanUp();
H
Hongze Cheng 已提交
141 142 143 144
STQ*    tqOpen(const char* path, SVnode* pVnode, SWal* pWal);
void    tqClose(STQ*);
int     tqPushMsg(STQ*, void* msg, int32_t msgLen, tmsg_t msgType, int64_t ver);
int     tqCommit(STQ*);
145
int32_t tqUpdateTbUidList(STQ* pTq, const SArray* tbUidList, bool isAdd);
H
Hongze Cheng 已提交
146
int32_t tqProcessVgChangeReq(STQ* pTq, char* msg, int32_t msgLen);
L
Liu Jicong 已提交
147
int32_t tqProcessVgDeleteReq(STQ* pTq, char* msg, int32_t msgLen);
148
int32_t tqProcessOffsetCommitReq(STQ* pTq, char* msg, int32_t msgLen);
L
Liu Jicong 已提交
149
int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId);
L
Liu Jicong 已提交
150
int32_t tqProcessTaskDeployReq(STQ* pTq, char* msg, int32_t msgLen);
L
Liu Jicong 已提交
151
int32_t tqProcessTaskDropReq(STQ* pTq, char* msg, int32_t msgLen);
L
Liu Jicong 已提交
152
int32_t tqProcessStreamTrigger(STQ* pTq, SSubmitReq* data);
L
Liu Jicong 已提交
153 154 155 156 157
int32_t tqProcessTaskRunReq(STQ* pTq, SRpcMsg* pMsg);
int32_t tqProcessTaskDispatchReq(STQ* pTq, SRpcMsg* pMsg);
int32_t tqProcessTaskRecoverReq(STQ* pTq, SRpcMsg* pMsg);
int32_t tqProcessTaskDispatchRsp(STQ* pTq, SRpcMsg* pMsg);
int32_t tqProcessTaskRecoverRsp(STQ* pTq, SRpcMsg* pMsg);
L
Liu Jicong 已提交
158 159
int32_t tqProcessTaskRetrieveReq(STQ* pTq, SRpcMsg* pMsg);
int32_t tqProcessTaskRetrieveRsp(STQ* pTq, SRpcMsg* pMsg);
H
Hongze Cheng 已提交
160

C
Cary Xu 已提交
161 162 163
SSubmitReq* tdBlockToSubmit(const SArray* pBlocks, const STSchema* pSchema, bool createTb, int64_t suid,
                            const char* stbFullName, int32_t vgId);

C
Cary Xu 已提交
164
// sma
165
int32_t smaOpen(SVnode* pVnode);
C
Cary Xu 已提交
166 167
int32_t smaCloseEnv(SSma* pSma);
int32_t smaCloseEx(SSma* pSma);
C
Cary Xu 已提交
168
int32_t smaBegin(SSma* pSma);
C
Cary Xu 已提交
169 170 171
int32_t smaPreCommit(SSma* pSma);
int32_t smaCommit(SSma* pSma);
int32_t smaPostCommit(SSma* pSma);
C
Cary Xu 已提交
172

C
Cary Xu 已提交
173
int32_t tdProcessTSmaCreate(SSma* pSma, int64_t version, const char* msg);
174 175
int32_t tdProcessTSmaInsert(SSma* pSma, int64_t indexUid, const char* msg);

L
Liu Jicong 已提交
176
int32_t tdProcessRSmaCreate(SVnode* pVnode, SVCreateStbReq* pReq);
177 178 179 180 181 182
int32_t tdProcessRSmaSubmit(SSma* pSma, void* pMsg, int32_t inputType);
int32_t tdFetchTbUidList(SSma* pSma, STbUidStore** ppStore, tb_uid_t suid, tb_uid_t uid);
int32_t tdUpdateTbUidList(SSma* pSma, STbUidStore* pUidStore);
void    tdUidStoreDestory(STbUidStore* pStore);
void*   tdUidStoreFree(STbUidStore* pStore);

L
Liu Jicong 已提交
183 184 185 186 187 188 189 190 191 192 193 194 195 196 197
typedef struct {
  int8_t  streamType;  // sma or other
  int8_t  dstType;
  int16_t padding;
  int32_t smaId;
  int64_t tbUid;
  int64_t lastReceivedVer;
  int64_t lastCommittedVer;
} SStreamSinkInfo;

typedef struct {
  SVnode*   pVnode;
  SHashObj* pHash;  // streamId -> SStreamSinkInfo
} SSink;

H
refact  
Hongze Cheng 已提交
198 199
// SVState
struct SVState {
H
Hongze Cheng 已提交
200
  // int64_t processed;
H
refact  
Hongze Cheng 已提交
201 202 203 204
  int64_t committed;
  int64_t applied;
};

H
Hongze Cheng 已提交
205 206 207 208 209
struct SVnodeInfo {
  SVnodeCfg config;
  SVState   state;
};

C
Cary Xu 已提交
210 211 212 213 214 215 216 217
typedef enum {
  TSDB_TYPE_TSDB = 0,     // TSDB
  TSDB_TYPE_TSMA = 1,     // TSMA
  TSDB_TYPE_RSMA_L0 = 2,  // RSMA Level 0
  TSDB_TYPE_RSMA_L1 = 3,  // RSMA Level 1
  TSDB_TYPE_RSMA_L2 = 4,  // RSMA Level 2
} ETsdbType;

L
Liu Jicong 已提交
218
struct STsdbKeepCfg {
C
Cary Xu 已提交
219 220 221 222 223
  int8_t  precision;  // precision always be used with below keep cfgs
  int32_t days;
  int32_t keep0;
  int32_t keep1;
  int32_t keep2;
224
};
C
Cary Xu 已提交
225

H
save  
Hongze Cheng 已提交
226
struct SVnode {
H
refact  
Hongze Cheng 已提交
227 228 229
  char*      path;
  SVnodeCfg  config;
  SVState    state;
H
Hongze Cheng 已提交
230 231
  STfs*      pTfs;
  SMsgCb     msgCb;
H
Hongze Cheng 已提交
232 233 234 235
  SVBufPool* pPool;
  SVBufPool* inUse;
  SVBufPool* onCommit;
  SVBufPool* onRecycle;
H
refact  
Hongze Cheng 已提交
236
  SMeta*     pMeta;
237
  SSma*      pSma;
H
refact  
Hongze Cheng 已提交
238 239
  STsdb*     pTsdb;
  SWal*      pWal;
L
Liu Jicong 已提交
240 241
  STQ*       pTq;
  SSink*     pSink;
H
refact  
Hongze Cheng 已提交
242
  tsem_t     canCommit;
243 244
  int64_t    sync;
  int32_t    syncCount;
wafwerar's avatar
wafwerar 已提交
245
  tsem_t     syncSem;
H
refact  
Hongze Cheng 已提交
246
  SQHandle*  pQuery;
H
save  
Hongze Cheng 已提交
247 248
};

C
Cary Xu 已提交
249
#define TD_VID(PVNODE) ((PVNODE)->config.vgId)
250

C
Cary Xu 已提交
251 252
#define VND_TSDB(vnd)       ((vnd)->pTsdb)
#define VND_RSMA0(vnd)      ((vnd)->pTsdb)
253 254
#define VND_RSMA1(vnd)      ((vnd)->pSma->pRSmaTsdb1)
#define VND_RSMA2(vnd)      ((vnd)->pSma->pRSmaTsdb2)
C
Cary Xu 已提交
255
#define VND_RETENTIONS(vnd) (&(vnd)->config.tsdbCfg.retentions)
C
Cary Xu 已提交
256 257
#define VND_IS_RSMA(v)      ((v)->config.isRsma == 1)
#define VND_IS_TSMA(v)      ((v)->config.isTsma == 1)
C
Cary Xu 已提交
258

H
Hongze Cheng 已提交
259 260 261 262 263 264
struct STbUidStore {
  tb_uid_t  suid;
  SArray*   tbUids;
  SHashObj* uidHash;
};

265 266 267 268 269 270 271 272 273 274 275 276
struct SSma {
  bool          locked;
  TdThreadMutex mutex;
  SVnode*       pVnode;
  STsdb*        pRSmaTsdb1;
  STsdb*        pRSmaTsdb2;
  void*         pTSmaEnv;
  void*         pRSmaEnv;
};

#define SMA_CFG(s)        (&(s)->pVnode->config)
#define SMA_TSDB_CFG(s)   (&(s)->pVnode->config.tsdbCfg)
L
Liu Jicong 已提交
277
#define SMA_RETENTION(s)  ((SRetention*)&(s)->pVnode->config.tsdbCfg.retentions)
278 279 280 281 282 283 284 285 286
#define SMA_LOCKED(s)     ((s)->locked)
#define SMA_META(s)       ((s)->pVnode->pMeta)
#define SMA_VID(s)        TD_VID((s)->pVnode)
#define SMA_TFS(s)        ((s)->pVnode->pTfs)
#define SMA_TSMA_ENV(s)   ((s)->pTSmaEnv)
#define SMA_RSMA_ENV(s)   ((s)->pRSmaEnv)
#define SMA_RSMA_TSDB0(s) ((s)->pVnode->pTsdb)
#define SMA_RSMA_TSDB1(s) ((s)->pRSmaTsdb1)
#define SMA_RSMA_TSDB2(s) ((s)->pRSmaTsdb2)
H
refact  
Hongze Cheng 已提交
287

L
Liu Jicong 已提交
288
// sma
L
Liu Jicong 已提交
289
void smaHandleRes(void* pVnode, int64_t smaId, const SArray* data);
L
Liu Jicong 已提交
290

H
save  
Hongze Cheng 已提交
291 292 293 294
#ifdef __cplusplus
}
#endif

L
Liu Jicong 已提交
295
#endif /*_TD_VNODE_DEF_H_*/