wal.h 6.9 KB
Newer Older
H
refact  
Hongze Cheng 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */
#ifndef _TD_WAL_H_
#define _TD_WAL_H_

L
Liu Jicong 已提交
18
#include "os.h"
L
Liu Jicong 已提交
19
#include "tarray.h"
L
Liu Jicong 已提交
20 21
#include "tdef.h"
#include "tlog.h"
L
Liu Jicong 已提交
22
#include "tmsg.h"
H
refact  
Hongze Cheng 已提交
23 24 25 26
#ifdef __cplusplus
extern "C" {
#endif

S
Shengliang Guan 已提交
27 28 29 30 31 32 33 34
// clang-format off
// Level-gated WAL logging macros. A message is handed to taosPrintLog only
// when the matching DEBUG_* bit is set in wDebugFlag.
// Each macro expands to one do { ... } while (0) statement so that
//   if (cond) wError(...); else ...
// parses correctly; a bare-brace body would leave a stray ';' before 'else'.
#define wFatal(...) do { if (wDebugFlag & DEBUG_FATAL) { taosPrintLog("WAL FATAL ", DEBUG_FATAL, 255,        __VA_ARGS__); } } while (0)
#define wError(...) do { if (wDebugFlag & DEBUG_ERROR) { taosPrintLog("WAL ERROR ", DEBUG_ERROR, 255,        __VA_ARGS__); } } while (0)
#define wWarn(...)  do { if (wDebugFlag & DEBUG_WARN)  { taosPrintLog("WAL WARN ",  DEBUG_WARN, 255,         __VA_ARGS__); } } while (0)
#define wInfo(...)  do { if (wDebugFlag & DEBUG_INFO)  { taosPrintLog("WAL ",       DEBUG_INFO, 255,         __VA_ARGS__); } } while (0)
#define wDebug(...) do { if (wDebugFlag & DEBUG_DEBUG) { taosPrintLog("WAL ",       DEBUG_DEBUG, wDebugFlag, __VA_ARGS__); } } while (0)
#define wTrace(...) do { if (wDebugFlag & DEBUG_TRACE) { taosPrintLog("WAL ",       DEBUG_TRACE, wDebugFlag, __VA_ARGS__); } } while (0)
// clang-format on
L
Liu Jicong 已提交
35

L
Liu Jicong 已提交
36 37 38 39 40 41 42 43 44 45
// WAL constants: protocol version, file-name layout, buffer and path sizes.
#define WAL_PROTO_VER     0
#define WAL_NOSUFFIX_LEN  20
#define WAL_SUFFIX_AT     (1 + WAL_NOSUFFIX_LEN)
#define WAL_LOG_SUFFIX    "log"
#define WAL_INDEX_SUFFIX  "idx"
#define WAL_REFRESH_MS    1000
#define WAL_PATH_LEN      (12 + TSDB_FILENAME_LEN)
#define WAL_FILE_LEN      (32 + WAL_PATH_LEN)
#define WAL_MAGIC         0xFAFBFCFDF4F3F2F1ULL
#define WAL_SCAN_BUF_SIZE (3 * 1024 * 1024)  // 3 MiB
L
Liu Jicong 已提交
46

L
Liu Jicong 已提交
47 48 49 50
// WAL level; this is the type of SWalCfg.level.
// The numeric values are spelled out explicitly and must stay 1 and 2.
typedef enum {
  TAOS_WAL_WRITE = 1,  // presumably: write without forced fsync — TODO confirm
  TAOS_WAL_FSYNC = 2,  // presumably: write followed by fsync — TODO confirm
} EWalType;
51

L
Liu Jicong 已提交
52 53 54 55 56
typedef struct {
  int32_t  vgId;
  int32_t  fsyncPeriod;      // millisecond
  int32_t  retentionPeriod;  // secs
  int32_t  rollPeriod;       // secs
L
Liu Jicong 已提交
57
  int64_t  retentionSize;
L
Liu Jicong 已提交
58
  int64_t  segSize;
L
Liu Jicong 已提交
59
  EWalType level;  // wal level
L
Liu Jicong 已提交
60 61
} SWalCfg;

L
Liu Jicong 已提交
62
// Version bookkeeping for one WAL instance (embedded in SWal as `vers`).
// All members are 64-bit log versions except logRetention.
typedef struct {
  int64_t firstVer;
  int64_t verInSnapshotting;
  int64_t snapshotVer;
  int64_t commitVer;
  int64_t appliedVer;
  int64_t lastVer;
  int64_t logRetention;  // NOTE(review): unit not visible in this header — confirm
} SWalVer;

L
Liu Jicong 已提交
72 73 74 75 76 77
// Everything between this push and the matching pop is byte-packed: members
// are laid out back to back with no padding.
#pragma pack(push, 1)
// used by sync module
typedef struct {
  int8_t   isWeek;  // NOTE(review): possibly a misspelling of "isWeak"; not renamed because callers use this name
  uint64_t seqNum;
  uint64_t term;
} SWalSyncInfo;

// Log entry header immediately followed by a variable-length body.
typedef struct {
  int64_t version;
  int64_t ingestTs;
  int32_t bodyLen;  // presumably the byte length of body[] — TODO confirm
  int16_t msgType;
  int8_t  protoVer;

  // sync meta
  SWalSyncInfo syncMeta;

  char body[];  // flexible array member: contributes nothing to sizeof(SWalCont)
} SWalCont;

// Magic number and two checksums placed in front of an SWalCont.
typedef struct {
  uint64_t magic;  // presumably WAL_MAGIC — TODO confirm
  uint32_t cksumHead;
  uint32_t cksumBody;
  SWalCont head;  // must remain the last member: it ends in a flexible array
} SWalCkHead;
#pragma pack(pop)

L
Liu Jicong 已提交
101
typedef struct SWal {
L
Liu Jicong 已提交
102
  // cfg
L
Liu Jicong 已提交
103
  SWalCfg cfg;
L
Liu Jicong 已提交
104
  int32_t fsyncSeq;
L
Liu Jicong 已提交
105
  // meta
L
Liu Jicong 已提交
106
  SWalVer   vers;
L
Liu Jicong 已提交
107 108
  TdFilePtr pLogFile;
  TdFilePtr pIdxFile;
L
Liu Jicong 已提交
109
  int32_t   writeCur;
L
Liu Jicong 已提交
110
  SArray   *fileInfoSet;  // SArray<SWalFileInfo>
111 112
  // gc
  SArray *toDeleteFiles;  // SArray<SWalFileInfo>
L
Liu Jicong 已提交
113
  // status
L
Liu Jicong 已提交
114 115
  int64_t totSize;
  int64_t lastRollSeq;
L
Liu Jicong 已提交
116
  // ctl
L
Liu Jicong 已提交
117
  int64_t       refId;
wafwerar's avatar
wafwerar 已提交
118
  TdThreadMutex mutex;
119
  // ref
120
  SHashObj *pRefHash;  // refId -> SWalRef
L
Liu Jicong 已提交
121
  // path
L
Liu Jicong 已提交
122
  char path[WAL_PATH_LEN];
L
Liu Jicong 已提交
123
  // reusable write head
L
Liu Jicong 已提交
124
  SWalCkHead writeHead;
125 126 127 128 129
} SWal;

typedef struct {
  int64_t refId;
  int64_t refVer;
130 131
  //  int64_t refFile;
  SWal *pWal;
132
} SWalRef;
L
Liu Jicong 已提交
133

L
Liu Jicong 已提交
134 135
// Reader filter flags, passed to walOpenReader() and stored in SWalReader.cond.
// Each member is an int8_t; presumably used as a boolean — TODO confirm.
typedef struct {
  int8_t scanUncommited;
  int8_t scanNotApplied;
  int8_t scanMeta;
  int8_t deleteMsg;
  int8_t enableRef;
} SWalFilterCond;

142 143
typedef struct SWalReader SWalReader;

144
// todo hide this struct
145
struct SWalReader {
L
Liu Jicong 已提交
146
  SWal          *pWal;
147
  int64_t        readerId;
L
Liu Jicong 已提交
148 149 150 151
  TdFilePtr      pLogFile;
  TdFilePtr      pIdxFile;
  int64_t        curFileFirstVer;
  int64_t        curVersion;
152
  int64_t        skipToVersion; // skip data and jump to destination version, usually used by stream resume ignoring untreated data
L
Liu Jicong 已提交
153 154 155
  int64_t        capacity;
  TdThreadMutex  mutex;
  SWalFilterCond cond;
156 157
  // TODO remove it
  SWalCkHead *pHead;
158
};
159

S
Shengliang Guan 已提交
160 161 162
// module initialization
// Both take no arguments. They are declared with (void) because an empty
// parameter list in C declares a function without a prototype, which lets
// calls with stray arguments compile unchecked.
int32_t walInit(void);
void    walCleanUp(void);
163

S
Shengliang Guan 已提交
164
// handle open and ctl
L
Liu Jicong 已提交
165
SWal   *walOpen(const char *path, SWalCfg *pCfg);
S
Shengliang Guan 已提交
166
int32_t walAlter(SWal *, SWalCfg *pCfg);
167
int32_t walPersist(SWal *);
S
Shengliang Guan 已提交
168
void    walClose(SWal *);
169

L
Liu Jicong 已提交
170 171 172
// write interfaces

// By assigning index by the caller, wal gurantees linearizability
L
Liu Jicong 已提交
173
int32_t walWrite(SWal *, int64_t index, tmsg_t msgType, const void *body, int32_t bodyLen);
L
Liu Jicong 已提交
174 175 176
int32_t walWriteWithSyncInfo(SWal *, int64_t index, tmsg_t msgType, SWalSyncInfo syncMeta, const void *body,
                             int32_t bodyLen);

177
// Assign version automatically and return to caller,
L
Liu Jicong 已提交
178
// -1 will be returned for failed writes
B
Benguang Zhao 已提交
179
int64_t walAppendLog(SWal *, int64_t index, tmsg_t msgType, SWalSyncInfo syncMeta, const void *body, int32_t bodyLen);
L
Liu Jicong 已提交
180 181

void walFsync(SWal *, bool force);
L
Liu Jicong 已提交
182

S
Shengliang Guan 已提交
183 184 185
// apis for lifecycle management
int32_t walCommit(SWal *pWal, int64_t ver);
int32_t walRollback(SWal *pWal, int64_t ver);
// notify that previous logs can be pruned safely
int32_t walBeginSnapshot(SWal *pWal, int64_t ver, int64_t logRetention);
int32_t walEndSnapshot(SWal *pWal);
int32_t walRestoreFromSnapshot(SWal *pWal, int64_t ver);
// for tq
int32_t walApplyVer(SWal *pWal, int64_t ver);

L
Liu Jicong 已提交
193
// int32_t  walDataCorrupted(SWal*);
194

195
// wal reader
L
Liu Jicong 已提交
196 197
SWalReader *walOpenReader(SWal *, SWalFilterCond *pCond);
void        walCloseReader(SWalReader *pRead);
198
void        walReadReset(SWalReader *pReader);
L
Liu Jicong 已提交
199
int32_t     walReadVer(SWalReader *pRead, int64_t ver);
200
int32_t     walReaderSeekVer(SWalReader *pRead, int64_t ver);
L
Liu Jicong 已提交
201
int32_t     walNextValidMsg(SWalReader *pRead);
202 203
int64_t     walReaderGetCurrentVer(const SWalReader *pReader);
int64_t     walReaderGetValidFirstVer(const SWalReader *pReader);
204 205
int64_t     walReaderGetSkipToVersion(SWalReader *pReader);
void        walReaderSetSkipToVersion(SWalReader *pReader, int64_t ver);
206
void        walReaderValidVersionRange(SWalReader *pReader, int64_t *sver, int64_t *ever);
207
void        walReaderVerifyOffset(SWalReader *pWalReader, STqOffsetVal* pOffset);
208 209

// only for tq usage
// NOTE(review): capacity is int32_t here while SWalReader.capacity is int64_t.
void    walSetReaderCapacity(SWalReader *pRead, int32_t capacity);
// walFetchHead takes a caller-provided SWalCkHead. walFetchBody takes
// SWalCkHead **, so it may replace the caller's pointer (presumably to grow
// the buffer for the body — TODO confirm).
int32_t walFetchHead(SWalReader *pRead, int64_t ver, SWalCkHead *pHead);
int32_t walFetchBody(SWalReader *pRead, SWalCkHead **ppHead);
int32_t walSkipFetchBody(SWalReader *pRead, const SWalCkHead *pHead);
214

L
Liu Jicong 已提交
215
// WAL references (SWalRef).
// NOTE(review): ownership of the returned SWalRef is not visible in this
// header; walCloseRef() takes the refId rather than the pointer — confirm how
// a reference is meant to be released.
SWalRef *walRefFirstVer(SWal *pWal, SWalRef *pRef);
SWalRef *walRefCommittedVer(SWal *pWal);

SWalRef *walOpenRef(SWal *pWal);
void     walCloseRef(SWal *pWal, int64_t refId);
int32_t  walSetRefVer(SWalRef *pRef, int64_t ver);
221

222
// helper function for raft
223
bool walLogExist(SWal *, int64_t ver);
224
bool walIsEmpty(SWal *);
225

S
Shengliang Guan 已提交
226
// lifecycle check
// Version getters; the names mirror the members of SWalVer (presumably each
// returns the corresponding pWal->vers field — TODO confirm).
int64_t walGetFirstVer(SWal *pWal);
int64_t walGetSnapshotVer(SWal *pWal);
int64_t walGetLastVer(SWal *pWal);
int64_t walGetCommittedVer(SWal *pWal);
int64_t walGetAppliedVer(SWal *pWal);
232

H
refact  
Hongze Cheng 已提交
233 234 235 236
#ifdef __cplusplus
}
#endif

237
#endif  // _TD_WAL_H_