walWrite.c 9.7 KB
Newer Older
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

S
Shengliang Guan 已提交
16
#define _DEFAULT_SOURCE
L
Liu Jicong 已提交
17

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
18
#include "os.h"
S
Shengliang Guan 已提交
19
#include "taoserror.h"
20
#include "tchecksum.h"
S
TD-1895  
Shengliang Guan 已提交
21
#include "tfile.h"
S
Shengliang Guan 已提交
22
#include "walInt.h"
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
23

L
Liu Jicong 已提交
24
int32_t walCommit(SWal *pWal, int64_t ver) {
L
Liu Jicong 已提交
25 26 27
  ASSERT(pWal->vers.commitVer >= pWal->vers.snapshotVer);
  ASSERT(pWal->vers.commitVer <= pWal->vers.lastVer);
  if(ver < pWal->vers.commitVer || ver > pWal->vers.lastVer) {
L
Liu Jicong 已提交
28 29
    return -1;
  }
L
Liu Jicong 已提交
30
  pWal->vers.commitVer = ver;
L
Liu Jicong 已提交
31 32 33 34 35 36
  return 0;
}

int32_t walRollback(SWal *pWal, int64_t ver) {
  int code;
  char fnameStr[WAL_FILE_LEN];
L
Liu Jicong 已提交
37
  if(ver == pWal->vers.lastVer) {
L
Liu Jicong 已提交
38 39
    return 0;
  }
L
Liu Jicong 已提交
40
  if(ver > pWal->vers.lastVer || ver < pWal->vers.commitVer) {
L
Liu Jicong 已提交
41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70
    return -1;
  }
  pthread_mutex_lock(&pWal->mutex);

  //find correct file
  if(ver < walGetLastFileFirstVer(pWal)) {
    //close current files
    tfClose(pWal->writeIdxTfd);
    tfClose(pWal->writeLogTfd);
    //open old files
    code = walChangeFile(pWal, ver);
    if(code != 0) {
      return -1;
    }

    //delete files
    int fileSetSize = taosArrayGetSize(pWal->fileInfoSet);
    for(int i = pWal->writeCur; i < fileSetSize; i++) {
      walBuildLogName(pWal, ((WalFileInfo*)taosArrayGet(pWal->fileInfoSet, i))->firstVer, fnameStr);
      remove(fnameStr);
      walBuildIdxName(pWal, ((WalFileInfo*)taosArrayGet(pWal->fileInfoSet, i))->firstVer, fnameStr);
      remove(fnameStr);
    }
    //pop from fileInfoSet
    taosArraySetSize(pWal->fileInfoSet, pWal->writeCur + 1);
  }

  walBuildIdxName(pWal, walGetCurFileFirstVer(pWal), fnameStr);
  int64_t idxTfd = tfOpenReadWrite(fnameStr);

L
Liu Jicong 已提交
71
  //TODO:change to deserialize function
L
Liu Jicong 已提交
72 73 74 75
  if(idxTfd < 0) {
    pthread_mutex_unlock(&pWal->mutex);
    return -1;
  }
L
Liu Jicong 已提交
76
  int64_t idxOff = walGetVerIdxOffset(pWal, ver);
L
Liu Jicong 已提交
77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129
  code = tfLseek(idxTfd, idxOff, SEEK_SET);
  if(code < 0) {
    pthread_mutex_unlock(&pWal->mutex);
    return -1;
  }
  //read idx file and get log file pos
  //TODO:change to deserialize function
  WalIdxEntry entry;
  if(tfRead(idxTfd, &entry, sizeof(WalIdxEntry)) != sizeof(WalIdxEntry)) {
    pthread_mutex_unlock(&pWal->mutex);
    return -1;
  }
  ASSERT(entry.ver == ver);

  walBuildLogName(pWal, walGetCurFileFirstVer(pWal), fnameStr);
  int64_t logTfd = tfOpenReadWrite(fnameStr);
  if(logTfd < 0) {
    //TODO
    pthread_mutex_unlock(&pWal->mutex);
    return -1;
  }
  code = tfLseek(logTfd, entry.offset, SEEK_SET);
  if(code < 0) {
    //TODO
    pthread_mutex_unlock(&pWal->mutex);
    return -1;
  }
  //validate offset
  SWalHead head;
  ASSERT(tfValid(logTfd));
  int size = tfRead(logTfd, &head, sizeof(SWalHead));
  if(size != sizeof(SWalHead)) {
    return -1;
  }
  code = walValidHeadCksum(&head);

  ASSERT(code == 0);
  if(code != 0) {
    return -1;
  }
  if(head.head.version != ver) {
    //TODO
    return -1;
  }
  //truncate old files
  code = tfFtruncate(logTfd, entry.offset);
  if(code < 0) {
    return -1;
  }
  code = tfFtruncate(idxTfd, idxOff);
  if(code < 0) {
    return -1;
  }
L
Liu Jicong 已提交
130
  pWal->vers.lastVer = ver - 1;
L
Liu Jicong 已提交
131 132 133 134 135 136 137
  ((WalFileInfo*)taosArrayGetLast(pWal->fileInfoSet))->lastVer = ver - 1;
  ((WalFileInfo*)taosArrayGetLast(pWal->fileInfoSet))->fileSize = entry.offset;

  //unlock
  pthread_mutex_unlock(&pWal->mutex);
  return 0;
}
L
Liu Jicong 已提交
138

L
Liu Jicong 已提交
139
int32_t walBeginSnapshot(SWal* pWal, int64_t ver) {
L
Liu Jicong 已提交
140
  pWal->vers.verInSnapshotting = ver;
L
Liu Jicong 已提交
141
  //check file rolling
L
Liu Jicong 已提交
142
  if(pWal->cfg.retentionPeriod == 0) {
L
Liu Jicong 已提交
143 144 145 146 147 148
    walRoll(pWal);
  }

  return 0;
}

L
Liu Jicong 已提交
149
int32_t walEndSnapshot(SWal *pWal) {
L
Liu Jicong 已提交
150
  int64_t ver = pWal->vers.verInSnapshotting;
L
Liu Jicong 已提交
151 152
  if(ver == -1) return -1;

L
Liu Jicong 已提交
153
  pWal->vers.snapshotVer = ver;
L
Liu Jicong 已提交
154 155 156 157 158 159 160 161 162 163 164 165 166
  int ts = taosGetTimestampSec();

  int deleteCnt = 0;
  int64_t newTotSize = pWal->totSize;
  WalFileInfo tmp;
  tmp.firstVer = ver;
  //find files safe to delete
  WalFileInfo* pInfo = taosArraySearch(pWal->fileInfoSet, &tmp, compareWalFileInfo, TD_LE);
  if(ver >= pInfo->lastVer) {
    pInfo++;
  }
  //iterate files, until the searched result
  for(WalFileInfo* iter = pWal->fileInfoSet->pData; iter < pInfo; iter++) {
L
Liu Jicong 已提交
167 168
    if(pWal->totSize > pWal->cfg.retentionSize ||
        iter->closeTs + pWal->cfg.retentionPeriod > ts) {
L
Liu Jicong 已提交
169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186
      //delete according to file size or close time
      deleteCnt++;
      newTotSize -= iter->fileSize;
    }
  }
  char fnameStr[WAL_FILE_LEN];
  //remove file
  for(int i = 0; i < deleteCnt; i++) {
    WalFileInfo* pInfo = taosArrayGet(pWal->fileInfoSet, i);
    walBuildLogName(pWal, pInfo->firstVer, fnameStr); 
    remove(fnameStr);
    walBuildIdxName(pWal, pInfo->firstVer, fnameStr); 
    remove(fnameStr);
  }

  //make new array, remove files
  taosArrayPopFrontBatch(pWal->fileInfoSet, deleteCnt); 
  if(taosArrayGetSize(pWal->fileInfoSet) == 0) {
L
Liu Jicong 已提交
187
    pWal->writeCur = -1;
L
Liu Jicong 已提交
188
    pWal->vers.firstVer = -1;
L
Liu Jicong 已提交
189
  } else {
L
Liu Jicong 已提交
190
    pWal->vers.firstVer = ((WalFileInfo*)taosArrayGet(pWal->fileInfoSet, 0))->firstVer;
L
Liu Jicong 已提交
191
  }
L
Liu Jicong 已提交
192
  pWal->writeCur = taosArrayGetSize(pWal->fileInfoSet) - 1;;
L
Liu Jicong 已提交
193
  pWal->totSize = newTotSize;
L
Liu Jicong 已提交
194
  pWal->vers.verInSnapshotting = -1;
L
Liu Jicong 已提交
195 196

  //save snapshot ver, commit ver
L
Liu Jicong 已提交
197
  int code = walSaveMeta(pWal);
L
Liu Jicong 已提交
198 199 200 201 202 203 204
  if(code != 0) {
    return -1;
  }

  return 0;
}

L
Liu Jicong 已提交
205
int walRoll(SWal *pWal) {
L
Liu Jicong 已提交
206
  int code = 0;
L
Liu Jicong 已提交
207 208
  if(pWal->writeIdxTfd != -1) {
    code = tfClose(pWal->writeIdxTfd);
L
Liu Jicong 已提交
209 210 211
    if(code != 0) {
      return -1;
    }
L
Liu Jicong 已提交
212
  }
L
Liu Jicong 已提交
213 214
  if(pWal->writeLogTfd != -1) {
    code = tfClose(pWal->writeLogTfd);
L
Liu Jicong 已提交
215 216 217
    if(code != 0) {
      return -1;
    }
L
Liu Jicong 已提交
218 219 220
  }
  int64_t idxTfd, logTfd;
  //create new file
L
Liu Jicong 已提交
221
  int64_t newFileFirstVersion = pWal->vers.lastVer + 1;
L
Liu Jicong 已提交
222
  char fnameStr[WAL_FILE_LEN];
L
Liu Jicong 已提交
223
  walBuildIdxName(pWal, newFileFirstVersion, fnameStr);
L
Liu Jicong 已提交
224
  idxTfd = tfOpenCreateWriteAppend(fnameStr);
L
Liu Jicong 已提交
225 226 227 228 229
  if(idxTfd < 0) {
    ASSERT(0);
    return -1;
  }
  walBuildLogName(pWal, newFileFirstVersion, fnameStr);
L
Liu Jicong 已提交
230
  logTfd = tfOpenCreateWriteAppend(fnameStr);
L
Liu Jicong 已提交
231 232 233 234 235 236 237 238 239
  if(logTfd < 0) {
    ASSERT(0);
    return -1;
  }
  code = walRollFileInfo(pWal);
  if(code != 0) {
    ASSERT(0);
    return -1;
  }
L
Liu Jicong 已提交
240 241

  //switch file
L
Liu Jicong 已提交
242 243
  pWal->writeIdxTfd = idxTfd;
  pWal->writeLogTfd = logTfd;
L
Liu Jicong 已提交
244
  pWal->writeCur = taosArrayGetSize(pWal->fileInfoSet) - 1;
L
Liu Jicong 已提交
245 246 247 248 249

  pWal->lastRollSeq = walGetSeq();
  return 0;
}

L
Liu Jicong 已提交
250
static int walWriteIndex(SWal *pWal, int64_t ver, int64_t offset) {
L
Liu Jicong 已提交
251 252 253
  WalIdxEntry entry = { .ver = ver, .offset = offset };
  int size = tfWrite(pWal->writeIdxTfd, &entry, sizeof(WalIdxEntry));
  if(size != sizeof(WalIdxEntry)) {
L
Liu Jicong 已提交
254
    //TODO truncate
L
Liu Jicong 已提交
255
    return -1;
L
Liu Jicong 已提交
256 257 258 259
  }
  return 0;
}

L
Liu Jicong 已提交
260
int64_t walWrite(SWal *pWal, int64_t index, uint8_t msgType, const void *body, int32_t bodyLen) {
L
Liu Jicong 已提交
261
  if (pWal == NULL) return -1;
L
Liu Jicong 已提交
262
  int code = 0;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
263

S
TD-1652  
Shengliang Guan 已提交
264
  // no wal
L
Liu Jicong 已提交
265
  if (pWal->cfg.level == TAOS_WAL_NOLOG) return 0;
L
Liu Jicong 已提交
266

L
Liu Jicong 已提交
267
  if (index == pWal->vers.lastVer + 1) {
L
Liu Jicong 已提交
268
    if(taosArrayGetSize(pWal->fileInfoSet) == 0) {
L
Liu Jicong 已提交
269
      pWal->vers.firstVer = index;
L
Liu Jicong 已提交
270 271
      code = walRoll(pWal);
      ASSERT(code == 0);
L
Liu Jicong 已提交
272
    } else {
L
Liu Jicong 已提交
273
      int64_t passed = walGetSeq() - pWal->lastRollSeq;
L
Liu Jicong 已提交
274
      if(pWal->cfg.rollPeriod != -1 && pWal->cfg.rollPeriod != 0 && passed > pWal->cfg.rollPeriod) {
L
Liu Jicong 已提交
275
        walRoll(pWal);
L
Liu Jicong 已提交
276
      } else if(pWal->cfg.segSize != -1 && pWal->cfg.segSize != 0 && walGetLastFileSize(pWal) > pWal->cfg.segSize) {
L
Liu Jicong 已提交
277 278
        walRoll(pWal);
      }
L
Liu Jicong 已提交
279 280 281 282 283 284
    }
  } else {
    //reject skip log or rewrite log
    //must truncate explicitly first
    return -1;
  }
L
Liu Jicong 已提交
285
  /*if (!tfValid(pWal->writeLogTfd)) return -1;*/
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
286

287
  pthread_mutex_lock(&pWal->mutex);
L
Liu Jicong 已提交
288
  pWal->writeHead.head.version = index;
L
Liu Jicong 已提交
289

L
Liu Jicong 已提交
290
  int64_t offset = walGetCurFileOffset(pWal);
L
Liu Jicong 已提交
291 292 293 294
  pWal->writeHead.head.len = bodyLen;
  pWal->writeHead.head.msgType = msgType;
  pWal->writeHead.cksumHead = walCalcHeadCksum(&pWal->writeHead);
  pWal->writeHead.cksumBody = walCalcBodyCksum(body, bodyLen);
295

L
Liu Jicong 已提交
296
  if (tfWrite(pWal->writeLogTfd, &pWal->writeHead, sizeof(SWalHead)) != sizeof(SWalHead)) {
L
Liu Jicong 已提交
297
    //ftruncate
S
TD-1846  
Shengliang Guan 已提交
298
    code = TAOS_SYSTEM_ERROR(errno);
L
Liu Jicong 已提交
299
    wError("vgId:%d, file:%"PRId64".log, failed to write since %s", pWal->cfg.vgId, walGetLastFileFirstVer(pWal), strerror(errno));
J
Jeff Tao 已提交
300
  }
S
TD-1846  
Shengliang Guan 已提交
301

L
Liu Jicong 已提交
302
  if (tfWrite(pWal->writeLogTfd, (char*)body, bodyLen) != bodyLen) {
L
Liu Jicong 已提交
303 304
    //ftruncate
    code = TAOS_SYSTEM_ERROR(errno);
L
Liu Jicong 已提交
305
    wError("vgId:%d, file:%"PRId64".log, failed to write since %s", pWal->cfg.vgId, walGetLastFileFirstVer(pWal), strerror(errno));
L
Liu Jicong 已提交
306
  }
L
Liu Jicong 已提交
307
  code = walWriteIndex(pWal, index, offset);
L
Liu Jicong 已提交
308 309
  if(code != 0) {
    //TODO
L
Liu Jicong 已提交
310
    return -1;
L
Liu Jicong 已提交
311
  }
312

L
Liu Jicong 已提交
313
  //set status
L
Liu Jicong 已提交
314
  pWal->vers.lastVer = index;
L
Liu Jicong 已提交
315
  pWal->totSize += sizeof(SWalHead) + bodyLen;
L
Liu Jicong 已提交
316 317
  walGetCurFileInfo(pWal)->lastVer = index;
  walGetCurFileInfo(pWal)->fileSize += sizeof(SWalHead) + bodyLen;
L
Liu Jicong 已提交
318
  
L
Liu Jicong 已提交
319
  pthread_mutex_unlock(&pWal->mutex);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
320

S
TD-1846  
Shengliang Guan 已提交
321
  return code;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
322 323
}

L
Liu Jicong 已提交
324
void walFsync(SWal *pWal, bool forceFsync) {
L
Liu Jicong 已提交
325 326
  if (forceFsync || (pWal->cfg.level == TAOS_WAL_FSYNC && pWal->cfg.fsyncPeriod == 0)) {
    wTrace("vgId:%d, fileId:%"PRId64".log, do fsync", pWal->cfg.vgId, walGetCurFileFirstVer(pWal));
L
Liu Jicong 已提交
327
    if (tfFsync(pWal->writeLogTfd) < 0) {
L
Liu Jicong 已提交
328
      wError("vgId:%d, file:%"PRId64".log, fsync failed since %s", pWal->cfg.vgId, walGetCurFileFirstVer(pWal), strerror(errno));
329 330
    }
  }
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
331 332
}

L
Liu Jicong 已提交
333 334 335 336 337 338 339 340 341
/*static int walValidateOffset(SWal* pWal, int64_t ver) {*/
  /*int code = 0;*/
  /*SWalHead *pHead = NULL;*/
  /*code = (int)walRead(pWal, &pHead, ver);*/
  /*if(pHead->head.version != ver) {*/
    /*return -1;*/
  /*}*/
  /*return 0;*/
/*}*/
L
Liu Jicong 已提交
342

L
Liu Jicong 已提交
343 344 345 346 347
/*static int64_t walGetOffset(SWal* pWal, int64_t ver) {*/
  /*int code = walSeekVer(pWal, ver);*/
  /*if(code != 0) {*/
    /*return -1;*/
  /*}*/
L
Liu Jicong 已提交
348

L
Liu Jicong 已提交
349 350 351 352
  /*code = walValidateOffset(pWal, ver);*/
  /*if(code != 0) {*/
    /*return -1;*/
  /*}*/
L
Liu Jicong 已提交
353

L
Liu Jicong 已提交
354 355
  /*return 0;*/
/*}*/