wal.h 6.4 KB
Newer Older
H
refact  
Hongze Cheng 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */
#ifndef _TD_WAL_H_
#define _TD_WAL_H_

L
Liu Jicong 已提交
18
#include "os.h"
L
Liu Jicong 已提交
19
#include "tarray.h"
L
Liu Jicong 已提交
20 21
#include "tdef.h"
#include "tlog.h"
L
Liu Jicong 已提交
22
#include "tmsg.h"
H
refact  
Hongze Cheng 已提交
23 24 25 26
#ifdef __cplusplus
extern "C" {
#endif

S
Shengliang Guan 已提交
27 28 29 30 31 32 33 34
// clang-format off
// WAL logging helpers. Each macro forwards its arguments to taosPrintLog()
// only when the matching DEBUG_* bit is set in wDebugFlag.
// The bodies are wrapped in do { ... } while (0) so that an invocation followed
// by ';' forms exactly one statement and stays correct inside an unbraced
// if/else (a bare `{ ... };` would detach the else branch).
#define wFatal(...) do { if (wDebugFlag & DEBUG_FATAL) { taosPrintLog("WAL FATAL ", DEBUG_FATAL, 255,        __VA_ARGS__); } } while (0)
#define wError(...) do { if (wDebugFlag & DEBUG_ERROR) { taosPrintLog("WAL ERROR ", DEBUG_ERROR, 255,        __VA_ARGS__); } } while (0)
#define wWarn(...)  do { if (wDebugFlag & DEBUG_WARN)  { taosPrintLog("WAL WARN ",  DEBUG_WARN, 255,         __VA_ARGS__); } } while (0)
#define wInfo(...)  do { if (wDebugFlag & DEBUG_INFO)  { taosPrintLog("WAL ",       DEBUG_INFO, 255,         __VA_ARGS__); } } while (0)
#define wDebug(...) do { if (wDebugFlag & DEBUG_DEBUG) { taosPrintLog("WAL ",       DEBUG_DEBUG, wDebugFlag, __VA_ARGS__); } } while (0)
#define wTrace(...) do { if (wDebugFlag & DEBUG_TRACE) { taosPrintLog("WAL ",       DEBUG_TRACE, wDebugFlag, __VA_ARGS__); } } while (0)
// clang-format on
L
Liu Jicong 已提交
35

L
Liu Jicong 已提交
36 37 38 39 40 41 42 43 44 45
// WAL compile-time constants.
// NOTE(review): this header does not show how most of these are consumed;
// confirm intended meanings against the WAL implementation files.
#define WAL_PROTO_VER     0
#define WAL_NOSUFFIX_LEN  20
// One past WAL_NOSUFFIX_LEN.
#define WAL_SUFFIX_AT     (WAL_NOSUFFIX_LEN + 1)
#define WAL_LOG_SUFFIX    "log"
#define WAL_INDEX_SUFFIX  "idx"
// NOTE(review): presumably milliseconds, judging by the _MS suffix - confirm.
#define WAL_REFRESH_MS    1000
// Size of the SWal.path buffer.
#define WAL_PATH_LEN      (TSDB_FILENAME_LEN + 12)
#define WAL_FILE_LEN      (WAL_PATH_LEN + 32)
#define WAL_MAGIC         0xFAFBFCFDF4F3F2F1ULL
// 3 * 1024 * 1024 = 3145728.
#define WAL_SCAN_BUF_SIZE (1024 * 1024 * 3)
L
Liu Jicong 已提交
46

L
Liu Jicong 已提交
47 48 49 50
// WAL level selector; this is the type of SWalCfg.level.
// NOTE(review): names suggest "write only" vs. "write + fsync" - confirm in the implementation.
typedef enum {
  TAOS_WAL_WRITE = 1,
  TAOS_WAL_FSYNC = 2,
} EWalType;
51

L
Liu Jicong 已提交
52 53 54 55 56
typedef struct {
  int32_t  vgId;
  int32_t  fsyncPeriod;      // millisecond
  int32_t  retentionPeriod;  // secs
  int32_t  rollPeriod;       // secs
L
Liu Jicong 已提交
57
  int64_t  retentionSize;
L
Liu Jicong 已提交
58
  int64_t  segSize;
L
Liu Jicong 已提交
59
  EWalType level;  // wal level
L
Liu Jicong 已提交
60 61
} SWalCfg;

L
Liu Jicong 已提交
62
// Version bookkeeping of a WAL; embedded in SWal as the `vers` field.
// NOTE(review): individual field semantics are not visible in this header -
// confirm against the implementation before relying on the names alone.
typedef struct {
  int64_t firstVer;
  int64_t verInSnapshotting;
  int64_t snapshotVer;
  int64_t commitVer;
  int64_t appliedVer;
  int64_t lastVer;
  int64_t logRetention;
} SWalVer;

L
Liu Jicong 已提交
72 73 74 75 76 77
// The three structs below are declared with 1-byte packing (no padding
// between members); the previous packing is restored by the pop at the end.
#pragma pack(push, 1)
// used by sync module
typedef struct {
  // NOTE(review): the name looks like a misspelling of "isWeak"; it is left
  // untouched because renaming a public field would break users.
  int8_t   isWeek;
  uint64_t seqNum;
  uint64_t term;
} SWalSyncInfo;

// Fixed-size part of a log entry followed by a flexible array member `body`.
// NOTE(review): presumably `bodyLen` is the number of bytes in `body` - confirm.
typedef struct {
  int64_t version;
  int64_t ingestTs;
  int32_t bodyLen;
  int16_t msgType;
  int8_t  protoVer;

  // sync meta
  SWalSyncInfo syncMeta;

  char body[];
} SWalCont;

// SWalCont prefixed with a magic number and two checksums.
// `head` must stay the last member because SWalCont ends in a flexible array.
// NOTE(review): presumably `magic` holds WAL_MAGIC and the checksums cover the
// head and the body respectively - confirm in the implementation.
typedef struct {
  uint64_t magic;
  uint32_t cksumHead;
  uint32_t cksumBody;
  SWalCont head;
} SWalCkHead;
#pragma pack(pop)

L
Liu Jicong 已提交
101
typedef struct SWal {
L
Liu Jicong 已提交
102
  // cfg
L
Liu Jicong 已提交
103
  SWalCfg cfg;
L
Liu Jicong 已提交
104
  int32_t fsyncSeq;
L
Liu Jicong 已提交
105
  // meta
L
Liu Jicong 已提交
106
  SWalVer   vers;
L
Liu Jicong 已提交
107 108
  TdFilePtr pLogFile;
  TdFilePtr pIdxFile;
L
Liu Jicong 已提交
109
  int32_t   writeCur;
L
Liu Jicong 已提交
110
  SArray   *fileInfoSet;  // SArray<SWalFileInfo>
111 112
  // gc
  SArray *toDeleteFiles;  // SArray<SWalFileInfo>
L
Liu Jicong 已提交
113
  // status
L
Liu Jicong 已提交
114 115
  int64_t totSize;
  int64_t lastRollSeq;
L
Liu Jicong 已提交
116
  // ctl
L
Liu Jicong 已提交
117
  int64_t       refId;
wafwerar's avatar
wafwerar 已提交
118
  TdThreadMutex mutex;
119
  // ref
120
  SHashObj *pRefHash;  // refId -> SWalRef
L
Liu Jicong 已提交
121
  // path
L
Liu Jicong 已提交
122
  char path[WAL_PATH_LEN];
L
Liu Jicong 已提交
123
  // reusable write head
L
Liu Jicong 已提交
124
  SWalCkHead writeHead;
125 126 127 128 129
} SWal;

typedef struct {
  int64_t refId;
  int64_t refVer;
wmmhello's avatar
wmmhello 已提交
130
//  int64_t refFile;
131 132
  SWal   *pWal;
} SWalRef;
L
Liu Jicong 已提交
133

L
Liu Jicong 已提交
134 135
// Reader options; passed to walOpenReader() and stored in SWalReader.cond.
// NOTE(review): each field looks like a boolean flag stored in an int8_t - confirm.
// "scanUncommited" is spelled as in the original; it is a public field name.
typedef struct {
  int8_t scanUncommited;
  int8_t scanNotApplied;
  int8_t scanMeta;
  int8_t enableRef;
} SWalFilterCond;

141
// todo hide this struct
142
typedef struct SWalReader {
L
Liu Jicong 已提交
143
  SWal          *pWal;
144
  int64_t        readerId;
L
Liu Jicong 已提交
145 146 147 148 149
  TdFilePtr      pLogFile;
  TdFilePtr      pIdxFile;
  int64_t        curFileFirstVer;
  int64_t        curVersion;
  int64_t        capacity;
150 151
//  int8_t         curInvalid;
//  int8_t         curStopped;
L
Liu Jicong 已提交
152 153
  TdThreadMutex  mutex;
  SWalFilterCond cond;
154 155
  // TODO remove it
  SWalCkHead *pHead;
L
Liu Jicong 已提交
156
} SWalReader;
157

S
Shengliang Guan 已提交
158 159 160
// module initialization
// Declared with (void): in C11 an empty parameter list `()` leaves the
// parameters unspecified, so calls with stray arguments would not be diagnosed.
int32_t walInit(void);
void    walCleanUp(void);
161

S
Shengliang Guan 已提交
162
// handle open and ctl
L
Liu Jicong 已提交
163
SWal   *walOpen(const char *path, SWalCfg *pCfg);
S
Shengliang Guan 已提交
164
int32_t walAlter(SWal *, SWalCfg *pCfg);
165
int32_t walPersist(SWal *);
S
Shengliang Guan 已提交
166
void    walClose(SWal *);
167

L
Liu Jicong 已提交
168 169 170
// write interfaces

// By assigning index by the caller, wal gurantees linearizability
L
Liu Jicong 已提交
171
int32_t walWrite(SWal *, int64_t index, tmsg_t msgType, const void *body, int32_t bodyLen);
L
Liu Jicong 已提交
172 173 174
int32_t walWriteWithSyncInfo(SWal *, int64_t index, tmsg_t msgType, SWalSyncInfo syncMeta, const void *body,
                             int32_t bodyLen);

175
// Assign version automatically and return to caller,
L
Liu Jicong 已提交
176
// -1 will be returned for failed writes
B
Benguang Zhao 已提交
177
int64_t walAppendLog(SWal *, int64_t index, tmsg_t msgType, SWalSyncInfo syncMeta, const void *body, int32_t bodyLen);
L
Liu Jicong 已提交
178 179

void walFsync(SWal *, bool force);
L
Liu Jicong 已提交
180

S
Shengliang Guan 已提交
181 182 183
// apis for lifecycle management
int32_t walCommit(SWal *pWal, int64_t ver);
int32_t walRollback(SWal *pWal, int64_t ver);
// notify that previous logs can be pruned safely
int32_t walBeginSnapshot(SWal *pWal, int64_t ver, int64_t logRetention);
int32_t walEndSnapshot(SWal *pWal);
int32_t walRestoreFromSnapshot(SWal *pWal, int64_t ver);
// for tq
int32_t walApplyVer(SWal *pWal, int64_t ver);

L
Liu Jicong 已提交
191
// int32_t  walDataCorrupted(SWal*);
192

S
Shengliang Guan 已提交
193
// read
L
Liu Jicong 已提交
194 195
SWalReader *walOpenReader(SWal *, SWalFilterCond *pCond);
void        walCloseReader(SWalReader *pRead);
196
void        walReadReset(SWalReader *pReader);
L
Liu Jicong 已提交
197
int32_t     walReadVer(SWalReader *pRead, int64_t ver);
L
Liu Jicong 已提交
198
int32_t     walReadSeekVer(SWalReader *pRead, int64_t ver);
L
Liu Jicong 已提交
199
int32_t     walNextValidMsg(SWalReader *pRead);
200
int64_t     walReaderGetCurrentVer(const SWalReader* pReader);
201 202

// only for tq usage
// NOTE(review): `capacity` is int32_t here while SWalReader.capacity is int64_t - confirm this is intended.
void    walSetReaderCapacity(SWalReader *pRead, int32_t capacity);
int32_t walFetchHead(SWalReader *pRead, int64_t ver, SWalCkHead *pHead);
int32_t walFetchBody(SWalReader *pRead, SWalCkHead **ppHead);
int32_t walSkipFetchBody(SWalReader *pRead, const SWalCkHead *pHead);
207

L
Liu Jicong 已提交
208
// reference (SWalRef) management
SWalRef *walRefFirstVer(SWal *pWal, SWalRef *pRef);
SWalRef *walRefCommittedVer(SWal *pWal);

SWalRef *walOpenRef(SWal *pWal);
void     walCloseRef(SWal *pWal, int64_t refId);
int32_t  walRefVer(SWalRef *pRef, int64_t ver);
void     walUnrefVer(SWalRef *pRef);
215

216
// helper function for raft
217
bool walLogExist(SWal *, int64_t ver);
218
bool walIsEmpty(SWal *);
219

S
Shengliang Guan 已提交
220
// lifecycle check
int64_t walGetFirstVer(SWal *pWal);
int64_t walGetSnapshotVer(SWal *pWal);
int64_t walGetLastVer(SWal *pWal);
int64_t walGetCommittedVer(SWal *pWal);
int64_t walGetAppliedVer(SWal *pWal);
226

H
refact  
Hongze Cheng 已提交
227 228 229 230
#ifdef __cplusplus
}
#endif

231
#endif  // _TD_WAL_H_