vnodeFile.c 66.8 KB
Newer Older
H
hzcheng 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

S
slguan 已提交
16
#define _DEFAULT_SOURCE
17
#include "os.h"
H
hzcheng 已提交
18 19 20 21 22 23

#include "tscompression.h"
#include "tutil.h"
#include "vnode.h"
#include "vnodeFile.h"
#include "vnodeUtil.h"
H
hjxilinx 已提交
24
#include "vnodeStatus.h"
H
hzcheng 已提交
25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60

// Sentinel used by the file query code of this module (its use is not in the visible
// part of the file).
#define FILE_QUERY_NEW_BLOCK -5  // a special negative number

// Version of the vnode file format. NOTE(review): presumably written into file headers —
// confirm against vnodeCreateFileHeaderFd().
const int16_t vnodeFileVersion = 0;

// Compression routine per column type. NOTE(review): the table appears to be indexed by the
// column data-type code (slot 0 unused, then bool, tinyint, smallint, int, bigint, float,
// double, binary, timestamp, nchar); both string-like slots share tsCompressString. Confirm
// against the TSDB_DATA_TYPE_* definitions before reordering entries.
int (*pCompFunc[])(const char *const input, int inputSize, const int elements, char *const output, int outputSize,
                   char algorithm, char *const buffer, int bufferSize) = {NULL,
                                          tsCompressBool,
                                          tsCompressTinyint,
                                          tsCompressSmallint,
                                          tsCompressInt,
                                          tsCompressBigint,
                                          tsCompressFloat,
                                          tsCompressDouble,
                                          tsCompressString,
                                          tsCompressTimestamp,
                                          tsCompressString};

// Decompression routine per column type; must stay index-aligned with pCompFunc above.
int (*pDecompFunc[])(const char *const input, int compressedSize, const int elements, char *const output,
                     int outputSize, char algorithm, char *const buffer, int bufferSize) = {NULL,
                                                            tsDecompressBool,
                                                            tsDecompressTinyint,
                                                            tsDecompressSmallint,
                                                            tsDecompressInt,
                                                            tsDecompressBigint,
                                                            tsDecompressFloat,
                                                            tsDecompressDouble,
                                                            tsDecompressString,
                                                            tsDecompressTimestamp,
                                                            tsDecompressString};

int vnodeUpdateFileMagic(int vnode, int fileId);
int vnodeRecoverCompHeader(int vnode, int fileId);
int vnodeRecoverHeadFile(int vnode, int fileId);
int vnodeRecoverDataFile(int vnode, int fileId);
int vnodeForwardStartPosition(SQuery *pQuery, SCompBlock *pBlock, int32_t slotIdx, SVnodeObj *pVnode, SMeterObj *pObj);
S
slguan 已提交
61
int vnodeCheckNewHeaderFile(int fd, SVnodeObj *pVnode);
S
slguan 已提交
62 63 64 65 66 67
char* vnodeGetDataDir(int vnode, int fileId);
char* vnodeGetDiskFromHeadFile(char *headName);
void vnodeAdustVnodeFile(SVnodeObj *pVnode);
int vnodeSyncRetrieveFile(int vnode, int fd, uint32_t peerFid, uint64_t *fmagic);
int vnodeSyncRestoreFile(int vnode, int sfd);
void vnodeAdjustFileTier(int vnode);
H
hzcheng 已提交
68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98

void vnodeGetHeadDataLname(char *headName, char *dataName, char *lastName, int vnode, int fileId) {
  if (headName != NULL) sprintf(headName, "%s/vnode%d/db/v%df%d.head", tsDirectory, vnode, vnode, fileId);
  if (dataName != NULL) sprintf(dataName, "%s/vnode%d/db/v%df%d.data", tsDirectory, vnode, vnode, fileId);
  if (lastName != NULL) sprintf(lastName, "%s/vnode%d/db/v%df%d.last", tsDirectory, vnode, vnode, fileId);
}

void vnodeGetHeadDataDname(char *dHeadName, char *dDataName, char *dLastName, int vnode, int fileId, char *path) {
  if (dHeadName != NULL) sprintf(dHeadName, "%s/data/vnode%d/v%df%d.head0", path, vnode, vnode, fileId);
  if (dDataName != NULL) sprintf(dDataName, "%s/data/vnode%d/v%df%d.data", path, vnode, vnode, fileId);
  if (dLastName != NULL) sprintf(dLastName, "%s/data/vnode%d/v%df%d.last0", path, vnode, vnode, fileId);
}

/*
 * Read the target of the symbolic link `lname` into `dname` (TSDB_FILENAME_LEN bytes).
 * readlink() does not NUL-terminate, so at most TSDB_FILENAME_LEN - 1 bytes are read and
 * the terminator is added here. If the link cannot be read, `dname` becomes "".
 */
static void vnodeReadLinkTarget(const char *lname, char *dname) {
  ssize_t len = readlink(lname, dname, TSDB_FILENAME_LEN - 1);
  dname[len < 0 ? 0 : len] = '\0';
}

/*
 * Resolve the head/data/last link names into the paths they point to.
 * A NULL link name skips that file; for every non-NULL link name the matching output
 * buffer must be non-NULL and hold TSDB_FILENAME_LEN bytes.
 */
void vnodeGetDnameFromLname(char *lhead, char *ldata, char *llast, char *dhead, char *ddata, char *dlast) {
  if (lhead != NULL) {
    assert(dhead != NULL);
    vnodeReadLinkTarget(lhead, dhead);
  }

  if (ldata != NULL) {
    assert(ddata != NULL);
    vnodeReadLinkTarget(ldata, ddata);
  }

  if (llast != NULL) {
    assert(dlast != NULL);
    vnodeReadLinkTarget(llast, dlast);
  }
}

void vnodeGetHeadTname(char *nHeadName, char *nLastName, int vnode, int fileId) {
99 100
  if (nHeadName != NULL) sprintf(nHeadName, "%s/vnode%d/db/v%df%d.t", tsDirectory, vnode, vnode, fileId);
  if (nLastName != NULL) sprintf(nLastName, "%s/vnode%d/db/v%df%d.l", tsDirectory, vnode, vnode, fileId);
H
hzcheng 已提交
101 102 103 104 105 106 107 108 109 110 111 112 113 114 115
}

/*
 * Ensure the per-vnode data directory path/data/vnode<N> exists (mode 0755).
 * An already existing entry is not an error; any other mkdir() failure is logged.
 */
void vnodeCreateDataDirIfNeeded(int vnode, char *path) {
  char directory[TSDB_FILENAME_LEN] = "\0";

  snprintf(directory, sizeof(directory), "%s/data/vnode%d", path, vnode);

  // mkdir() reports an existing entry through EEXIST, which avoids the race between a
  // separate access() check and the creation.
  if (mkdir(directory, 0755) != 0 && errno != EEXIST) {
    dError("failed to create directory:%s, reason:%s", directory, strerror(errno));
  }
}

/*
 * Create the head/data/last links of `fileId` in the vnode's db directory, pointing at
 * files in the data directory chosen by vnodeGetDataDir().
 * headName/dataName/lastName receive the link names (TSDB_FILENAME_LEN bytes each).
 * The target files themselves are not created here.
 * Returns 0 on success. Returns -1 if no data directory is available or a link cannot be
 * created; in the latter case the links already created by this call are removed again,
 * so a retry does not fail with EEXIST.
 */
int vnodeCreateHeadDataFile(int vnode, int fileId, char *headName, char *dataName, char *lastName) {
  char dHeadName[TSDB_FILENAME_LEN];
  char dDataName[TSDB_FILENAME_LEN];
  char dLastName[TSDB_FILENAME_LEN];

  char *path = vnodeGetDataDir(vnode, fileId);
  if (path == NULL) {
    dError("vid:%d, fileId:%d, failed to get dataDir", vnode, fileId);
    return -1;
  }

  vnodeCreateDataDirIfNeeded(vnode, path);

  vnodeGetHeadDataLname(headName, dataName, lastName, vnode, fileId);
  vnodeGetHeadDataDname(dHeadName, dDataName, dLastName, vnode, fileId, path);

  const char *targets[] = {dHeadName, dDataName, dLastName};
  const char *links[] = {headName, dataName, lastName};
  for (int i = 0; i < 3; ++i) {
    if (symlink(targets[i], links[i]) != 0) {
      int err = errno;
      dError("vid:%d, fileId:%d, failed to create link:%s to %s, reason:%s", vnode, fileId, links[i], targets[i],
             strerror(err));
      // Roll back only the links this call created.
      for (int j = 0; j < i; ++j) remove(links[j]);
      return -1;
    }
  }

  dPrint("vid:%d, fileId:%d, empty header file:%s dataFile:%s lastFile:%s on disk:%s is created ",
          vnode, fileId, headName, dataName, lastName, path);

  return 0;
}

/*
 * Create an empty head/data/last file set for `fileId`.
 * Each file gets the standard file header (vnodeCreateFileHeaderFd). The head file is then
 * filled, at offset TSDB_FILE_HEADER_LEN, with an all-zero compHeader table (one entry per
 * session) followed by its checksum.
 * Returns 0 on success, -1 on any failure.
 */
int vnodeCreateEmptyCompFile(int vnode, int fileId) {
  char headName[TSDB_FILENAME_LEN];
  char dataName[TSDB_FILENAME_LEN];
  char lastName[TSDB_FILENAME_LEN];

  if (vnodeCreateHeadDataFile(vnode, fileId, headName, dataName, lastName) < 0) {
    dError("failed to create head data file, vnode: %d, fileId: %d", vnode, fileId);
    return -1;
  }

  int tfd = open(headName, O_WRONLY | O_CREAT | O_TRUNC, S_IRWXU | S_IRWXG | S_IRWXO);
  if (tfd < 0) {
    dError("failed to create head file:%s, reason:%s", headName, strerror(errno));
    return -1;
  }

  vnodeCreateFileHeaderFd(tfd);

  int   size = sizeof(SCompHeader) * vnodeList[vnode].cfg.maxSessions + sizeof(TSCKSUM);
  char *temp = calloc(1, size);  // zeroed compHeader table plus room for the checksum
  if (temp == NULL) {
    dError("vid:%d, fileId:%d, malloc failed", vnode, fileId);
    close(tfd);
    return -1;
  }
  taosCalcChecksumAppend(0, (uint8_t *)temp, size);

  lseek(tfd, TSDB_FILE_HEADER_LEN, SEEK_SET);
  ssize_t written = twrite(tfd, temp, size);
  int     err = errno;  // saved before free()/close() may change it
  free(temp);
  close(tfd);
  if (written != size) {
    dError("failed to write head file:%s, reason:%s", headName, strerror(err));
    return -1;
  }

  tfd = open(dataName, O_WRONLY | O_CREAT | O_TRUNC, S_IRWXU | S_IRWXG | S_IRWXO);
  if (tfd < 0) {
    dError("failed to create data file:%s, reason:%s", dataName, strerror(errno));
    return -1;
  }
  vnodeCreateFileHeaderFd(tfd);
  close(tfd);

  tfd = open(lastName, O_WRONLY | O_CREAT | O_TRUNC, S_IRWXU | S_IRWXG | S_IRWXO);
  if (tfd < 0) {
    dError("failed to create last file:%s, reason:%s", lastName, strerror(errno));
    return -1;
  }
  vnodeCreateFileHeaderFd(tfd);
  close(tfd);

  return 0;
}

H
Hongze Cheng 已提交
184 185 186 187
/*
 * Make sure the files needed by the pending commit exist, and compute this round's commit
 * window.
 *
 * - If nothing has been written yet (lastKeyOnFile == 0), daysPerFile defaults to 10, the
 *   first file id is derived from firstKey and that file is created.
 * - numOfFiles counts how many whole file periods commitFirstKey lies before the end of
 *   the newest file. It is -1 when commitFirstKey is past lastKeyOnFile.
 * - Older ("backward") files are created when the commit starts before the existing files.
 *   A single newer ("forward") file is created when it starts after them.
 *
 * On success this sets commitFileId and the window [commitFirstKey, commitLastKey], which
 * spans exactly one file period (commitFirstKey is realigned to the start of that period).
 * It also updates numOfFiles and returns 0.
 * If a file cannot be created it returns -1, or, in CLUSTER builds, the result of
 * vnodeRecoverFromPeer().
 */
int vnodeCreateNeccessaryFiles(SVnodeObj *pVnode) {
  int        numOfFiles = 0, fileId, filesAdded = 0;
  int        vnode = pVnode->vnode;
  SVnodeCfg *pCfg = &(pVnode->cfg);
H
hzcheng 已提交
188 189 190

  // First commit: no file exists yet, so derive the first file id and its key range.
  if (pVnode->lastKeyOnFile == 0) {
    if (pCfg->daysPerFile == 0) pCfg->daysPerFile = 10;
S
slguan 已提交
191 192
    pVnode->fileId = pVnode->firstKey / tsMsPerDay[(uint8_t)pVnode->cfg.precision] / pCfg->daysPerFile;
    // last key covered by that file: end of its daysPerFile-long period
    pVnode->lastKeyOnFile = (int64_t)(pVnode->fileId + 1) * pCfg->daysPerFile * tsMsPerDay[(uint8_t)pVnode->cfg.precision] - 1;
H
hzcheng 已提交
193
    pVnode->numOfFiles = 1;
H
Hongze Cheng 已提交
194
    if (vnodeCreateEmptyCompFile(vnode, pVnode->fileId) < 0) return -1;
H
hzcheng 已提交
195 196
  }

  // Number of whole file periods between commitFirstKey and the end of the newest file.
S
slguan 已提交
197
  numOfFiles = (pVnode->lastKeyOnFile - pVnode->commitFirstKey) / tsMsPerDay[(uint8_t)pVnode->cfg.precision] / pCfg->daysPerFile;
H
hzcheng 已提交
198 199
  if (pVnode->commitFirstKey > pVnode->lastKeyOnFile) numOfFiles = -1;

L
lihui 已提交
200
  dTrace("vid:%d, commitFirstKey:%" PRId64 " lastKeyOnFile:%" PRId64 " numOfFiles:%d fileId:%d vnodeNumOfFiles:%d", pVnode->vnode,
H
Hongze Cheng 已提交
201
         pVnode->commitFirstKey, pVnode->lastKeyOnFile, numOfFiles, pVnode->fileId, pVnode->numOfFiles);
H
hzcheng 已提交
202 203 204 205

  if (numOfFiles >= pVnode->numOfFiles) {
    // create empty header files backward
    filesAdded = numOfFiles - pVnode->numOfFiles + 1;
H
Hongze Cheng 已提交
206
    assert(filesAdded <= pVnode->maxFiles + 2);
H
hzcheng 已提交
207 208
    for (int i = 0; i < filesAdded; ++i) {
      // ids continue downwards from the oldest existing file
      fileId = pVnode->fileId - pVnode->numOfFiles - i;
S
slguan 已提交
209 210 211 212
      if (vnodeCreateEmptyCompFile(vnode, fileId) < 0) 
#ifdef CLUSTER	  
	    return vnodeRecoverFromPeer(pVnode, fileId);
#else
H
Hongze Cheng 已提交
213
      return -1;
S
slguan 已提交
214
#endif				
H
hzcheng 已提交
215 216 217 218
    }
  } else if (numOfFiles < 0) {
    // create empty header files forward
    // NOTE(review): only one file is added even if commitFirstKey lies several periods past
    // lastKeyOnFile — confirm callers handle that case.
    pVnode->fileId++;
S
slguan 已提交
219 220 221 222
    if (vnodeCreateEmptyCompFile(vnode, pVnode->fileId) < 0) 
#ifdef CLUSTER	  
	    return vnodeRecoverFromPeer(pVnode, pVnode->fileId);
#else
H
Hongze Cheng 已提交
223
      return -1;
S
slguan 已提交
224
#endif
S
slguan 已提交
225
    pVnode->lastKeyOnFile += (int64_t)tsMsPerDay[(uint8_t)pVnode->cfg.precision] * pCfg->daysPerFile;
H
hzcheng 已提交
226 227 228 229 230 231
    filesAdded = 1;
    // commit into the newly created newest file
    numOfFiles = 0;  // hacker way
  }

  // The commit window is one file period ending numOfFiles periods before lastKeyOnFile.
  fileId = pVnode->fileId - numOfFiles;
  pVnode->commitLastKey =
S
slguan 已提交
232 233
      pVnode->lastKeyOnFile - (int64_t)numOfFiles * tsMsPerDay[(uint8_t)pVnode->cfg.precision] * pCfg->daysPerFile;
  pVnode->commitFirstKey = pVnode->commitLastKey - (int64_t)tsMsPerDay[(uint8_t)pVnode->cfg.precision] * pCfg->daysPerFile + 1;
H
hzcheng 已提交
234 235 236
  pVnode->commitFileId = fileId;
  pVnode->numOfFiles = pVnode->numOfFiles + filesAdded;

H
Hongze Cheng 已提交
237 238 239 240 241 242 243 244 245 246 247
  return 0;
}


int vnodeOpenCommitFiles(SVnodeObj *pVnode, int noTempLast) {
  char        name[TSDB_FILENAME_LEN];
  char        dHeadName[TSDB_FILENAME_LEN] = "\0";
  char        dLastName[TSDB_FILENAME_LEN] = "\0";
  int         len = 0;
  struct stat filestat;
  int         vnode = pVnode->vnode;
S
slguan 已提交
248
  int         fileId;
H
Hongze Cheng 已提交
249 250 251

  if (vnodeCreateNeccessaryFiles(pVnode) < 0) return -1;

H
Hongze Cheng 已提交
252 253
  fileId = pVnode->commitFileId;

L
lihui 已提交
254
  dTrace("vid:%d, commit fileId:%d, commitLastKey:%" PRId64 ", vnodeLastKey:%" PRId64 ", lastKeyOnFile:%" PRId64 " numOfFiles:%d",
H
hzcheng 已提交
255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284
      vnode, fileId, pVnode->commitLastKey, pVnode->lastKey, pVnode->lastKeyOnFile, pVnode->numOfFiles);

  int minSize = sizeof(SCompHeader) * pVnode->cfg.maxSessions + sizeof(TSCKSUM) + TSDB_FILE_HEADER_LEN;

  vnodeGetHeadDataLname(pVnode->cfn, name, pVnode->lfn, vnode, fileId);
  readlink(pVnode->cfn, dHeadName, TSDB_FILENAME_LEN);
  readlink(pVnode->lfn, dLastName, TSDB_FILENAME_LEN);
  len = strlen(dHeadName);
  if (dHeadName[len - 1] == 'd') {
    dHeadName[len] = '0';
    dHeadName[len + 1] = '\0';
  } else {
    dHeadName[len - 1] = '0' + (dHeadName[len - 1] + 1 - '0') % 2;
  }
  len = strlen(dLastName);
  if (dLastName[len - 1] == 't') {
    dLastName[len] = '0';
    dLastName[len + 1] = '\0';
  } else {
    dLastName[len - 1] = '0' + (dLastName[len - 1] + 1 - '0') % 2;
  }
  vnodeGetHeadTname(pVnode->nfn, pVnode->tfn, vnode, fileId);
  symlink(dHeadName, pVnode->nfn);
  if (!noTempLast) symlink(dLastName, pVnode->tfn);

  // open head file
  pVnode->hfd = open(pVnode->cfn, O_RDONLY);
  if (pVnode->hfd < 0) {
    dError("vid:%d, failed to open head file:%s, reason:%s", vnode, pVnode->cfn, strerror(errno));
    taosLogError("vid:%d, failed to open head file:%s, reason:%s", vnode, pVnode->cfn, strerror(errno));
S
slguan 已提交
285
    vnodeRecoverFromPeer(pVnode, fileId);
H
hzcheng 已提交
286 287 288 289 290 291 292 293
    goto _error;
  }

  // verify head file, check size
  fstat(pVnode->hfd, &filestat);
  if (filestat.st_size < minSize) {
    dError("vid:%d, head file:%s corrupted", vnode, pVnode->cfn);
    taosLogError("vid:%d, head file:%s corrupted", vnode, pVnode->cfn);
S
slguan 已提交
294
    vnodeRecoverFromPeer(pVnode, fileId);
H
hzcheng 已提交
295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311
    goto _error;
  }

  // open a new header file
  pVnode->nfd = open(pVnode->nfn, O_RDWR | O_CREAT | O_TRUNC, S_IRWXU | S_IRWXG | S_IRWXO);
  if (pVnode->nfd < 0) {
    dError("vid:%d, failed to open new head file:%s, reason:%s", vnode, pVnode->nfn, strerror(errno));
    taosLogError("vid:%d, failed to open new head file:%s, reason:%s", vnode, pVnode->nfn, strerror(errno));
    goto _error;
  }
  vnodeCreateFileHeaderFd(pVnode->nfd);

  // open existing data file
  pVnode->dfd = open(name, O_WRONLY | O_CREAT, S_IRWXU | S_IRWXG | S_IRWXO);
  if (pVnode->dfd < 0) {
    dError("vid:%d, failed to open data file:%s, reason:%s", vnode, name, strerror(errno));
    taosLogError("vid:%d, failed to open data file:%s, reason:%s", vnode, name, strerror(errno));
S
slguan 已提交
312
    vnodeRecoverFromPeer(pVnode, fileId);
H
hzcheng 已提交
313 314 315 316 317 318 319 320
    goto _error;
  }

  // verify data file, check size
  fstat(pVnode->dfd, &filestat);
  if (filestat.st_size < TSDB_FILE_HEADER_LEN) {
    dError("vid:%d, data file:%s corrupted", vnode, name);
    taosLogError("vid:%d, data file:%s corrupted", vnode, name);
S
slguan 已提交
321
    vnodeRecoverFromPeer(pVnode, fileId);
H
hzcheng 已提交
322 323
    goto _error;
  } else {
S
slguan 已提交
324
    dPrint("vid:%d, data file:%s is opened to write", vnode, name);
H
hzcheng 已提交
325 326 327 328 329 330 331
  }

  // open last file
  pVnode->lfd = open(pVnode->lfn, O_RDWR);
  if (pVnode->lfd < 0) {
    dError("vid:%d, failed to open last file:%s, reason:%s", vnode, pVnode->lfn, strerror(errno));
    taosLogError("vid:%d, failed to open last file:%s, reason:%s", vnode, pVnode->lfn, strerror(errno));
S
slguan 已提交
332
    vnodeRecoverFromPeer(pVnode, fileId);
H
hzcheng 已提交
333 334 335 336 337 338 339 340
    goto _error;
  }

  // verify last file, check size
  fstat(pVnode->lfd, &filestat);
  if (filestat.st_size < TSDB_FILE_HEADER_LEN) {
    dError("vid:%d, last file:%s corrupted", vnode, pVnode->lfn);
    taosLogError("vid:%d, last file:%s corrupted", vnode, pVnode->lfn);
S
slguan 已提交
341
    vnodeRecoverFromPeer(pVnode, fileId);
H
hzcheng 已提交
342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360
    goto _error;
  }

  // open a new last file
  if (noTempLast) {
    pVnode->tfd = -1;  // do not open temporary last file
  } else {
    pVnode->tfd = open(pVnode->tfn, O_RDWR | O_CREAT | O_TRUNC, S_IRWXU | S_IRWXG | S_IRWXO);
    if (pVnode->tfd < 0) {
      dError("vid:%d, failed to open new last file:%s, reason:%s", vnode, pVnode->tfn, strerror(errno));
      taosLogError("vid:%d, failed to open new last file:%s, reason:%s", vnode, pVnode->tfn, strerror(errno));
      goto _error;
    }
    vnodeCreateFileHeaderFd(pVnode->tfd);
    pVnode->lfSize = lseek(pVnode->tfd, 0, SEEK_END);
  }

  int   size = sizeof(SCompHeader) * pVnode->cfg.maxSessions + sizeof(TSCKSUM);
  char *temp = malloc(size);
S
slguan 已提交
361 362 363 364 365 366
  if (NULL == temp) {
    dError("vid:%d, malloc failed", vnode);
    taosLogError("vid:%d, malloc failed", vnode);
    //vnodeRecoverFromPeer(pVnode, fileId);
    goto _error;
  }
H
hzcheng 已提交
367
  memset(temp, 0, size);
S
slguan 已提交
368
  
H
hzcheng 已提交
369
  taosCalcChecksumAppend(0, (uint8_t *)temp, size);
S
slguan 已提交
370
  twrite(pVnode->nfd, temp, size);
H
hzcheng 已提交
371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408
  free(temp);

  pVnode->dfSize = lseek(pVnode->dfd, 0, SEEK_END);

  return 0;

_error:
  if (pVnode->dfd > 0) close(pVnode->dfd);
  pVnode->dfd = 0;

  if (pVnode->hfd > 0) close(pVnode->hfd);
  pVnode->hfd = 0;

  if (pVnode->nfd > 0) close(pVnode->nfd);
  pVnode->nfd = 0;

  if (pVnode->lfd > 0) close(pVnode->lfd);
  pVnode->lfd = 0;

  if (pVnode->tfd > 0) close(pVnode->tfd);
  pVnode->tfd = 0;

  return -1;
}

void vnodeRemoveFile(int vnode, int fileId) {
  char           headName[TSDB_FILENAME_LEN] = "\0";
  char           dataName[TSDB_FILENAME_LEN] = "\0";
  char           lastName[TSDB_FILENAME_LEN] = "\0";
  char           dHeadName[TSDB_FILENAME_LEN] = "\0";
  char           dDataName[TSDB_FILENAME_LEN] = "\0";
  char           dLastName[TSDB_FILENAME_LEN] = "\0";
  SVnodeObj *    pVnode = NULL;
  SVnodeHeadInfo headInfo;

  pVnode = vnodeList + vnode;

  vnodeGetHeadDataLname(headName, dataName, lastName, vnode, fileId);
S
slguan 已提交
409 410 411 412
  char *path = vnodeGetDiskFromHeadFile(headName);
  if (path == NULL) {
    return ;
  }	
H
hzcheng 已提交
413 414 415 416 417
  vnodeGetDnameFromLname(headName, dataName, lastName, dHeadName, dDataName, dLastName);

  int fd = open(headName, O_RDWR | O_CREAT, S_IRWXU | S_IRWXG | S_IRWXO);
  if (fd > 0) {
    vnodeGetHeadFileHeaderInfo(fd, &headInfo);
weixin_48148422's avatar
weixin_48148422 已提交
418
    atomic_fetch_add_64(&(pVnode->vnodeStatistic.totalStorage), -headInfo.totalStorage);
H
hzcheng 已提交
419 420 421 422 423 424 425 426 427 428
    close(fd);
  }

  remove(headName);
  remove(dataName);
  remove(lastName);
  remove(dHeadName);
  remove(dDataName);
  remove(dLastName);

S
slguan 已提交
429
  dPrint("vid:%d fileId:%d on disk: %s is removed, numOfFiles:%d maxFiles:%d", vnode, fileId, path,
H
hzcheng 已提交
430 431 432 433 434 435 436
         pVnode->numOfFiles, pVnode->maxFiles);
}

/*
 * Finish a commit opened by vnodeOpenCommitFiles().
 *
 * Closes all commit descriptors. Then, under vmutex, renames the new head link (nfn) over
 * the current one (cfn), and the new last link (tfn) over lfn when a temporary last file
 * was open.
 * The previous on-disk target of a link is deleted only when its rename succeeded. If the
 * rename fails, the current link still points at that file and it must be kept.
 * Finally it adjusts the vnode's file set and saves all meter objects.
 */
void vnodeCloseCommitFiles(SVnodeObj *pVnode) {
  char dpath[TSDB_FILENAME_LEN] = "\0";
  int  hasTempLast = (pVnode->tfd > 0);

  // Optionally verify the new head file before it replaces the current one. The call is
  // kept outside assert() so that it is still made when NDEBUG is defined.
  if (tsCheckHeaderFile != 0) {
    int code = vnodeCheckNewHeaderFile(pVnode->nfd, pVnode);
    if (code != 0) {
      dError("vid:%d, new head file:%s failed the header check", pVnode->vnode, pVnode->nfn);
    }
    assert(code == 0);
  }

  close(pVnode->nfd);
  pVnode->nfd = 0;

  close(pVnode->hfd);
  pVnode->hfd = 0;

  close(pVnode->dfd);
  pVnode->dfd = 0;

  close(pVnode->lfd);
  pVnode->lfd = 0;

  if (hasTempLast) close(pVnode->tfd);

  pthread_mutex_lock(&(pVnode->vmutex));

  // readlink() gets one byte less than the zeroed buffer so dpath stays NUL-terminated.
  readlink(pVnode->cfn, dpath, TSDB_FILENAME_LEN - 1);
  if (rename(pVnode->nfn, pVnode->cfn) < 0) {
    dError("vid:%d, failed to rename:%s, reason:%s", pVnode->vnode, pVnode->nfn, strerror(errno));
  } else {
    remove(dpath);
  }

  if (hasTempLast) {
    memset(dpath, 0, TSDB_FILENAME_LEN);
    readlink(pVnode->lfn, dpath, TSDB_FILENAME_LEN - 1);
    if (rename(pVnode->tfn, pVnode->lfn) < 0) {
      dError("vid:%d, failed to rename:%s, reason:%s", pVnode->vnode, pVnode->tfn, strerror(errno));
    } else {
      remove(dpath);
    }
  }

  pthread_mutex_unlock(&(pVnode->vmutex));

  pVnode->tfd = 0;

  dTrace("vid:%d, %s and %s is saved", pVnode->vnode, pVnode->cfn, pVnode->lfn);
  vnodeAdustVnodeFile(pVnode);
  vnodeSaveAllMeterObjToFile(pVnode->vnode);

  return;
}

void vnodeBroadcastStatusToUnsyncedPeer(SVnodeObj *pVnode);

void *vnodeCommitMultiToFile(SVnodeObj *pVnode, int ssid, int esid) {
  int              vnode = pVnode->vnode;
  SData *          data[TSDB_MAX_COLUMNS], *cdata[TSDB_MAX_COLUMNS];  // first 4 bytes are length
  char *           buffer = NULL, *dmem = NULL, *cmem = NULL, *hmem = NULL, *tmem = NULL;
  SMeterObj *      pObj = NULL;
  SCompInfo        compInfo = {0};
  SCompHeader *    pHeader;
  SMeterInfo *     meterInfo = NULL, *pMeter = NULL;
  SQuery           query;
S
slguan 已提交
497
  SColumnInfoEx    colList[TSDB_MAX_COLUMNS] = {0};
H
hzcheng 已提交
498 499 500
  SSqlFunctionExpr pExprs[TSDB_MAX_COLUMNS] = {0};
  int              commitAgain;
  int              headLen, sid, col;
S
slguan 已提交
501 502
  int64_t          pointsRead;
  int64_t          pointsReadLast;
H
hzcheng 已提交
503 504 505 506 507 508
  SCompBlock *     pCompBlock = NULL;
  SVnodeCfg *      pCfg = &pVnode->cfg;
  TSCKSUM          chksum;
  SVnodeHeadInfo   headInfo;
  uint8_t *        pOldCompBlocks;

L
lihui 已提交
509
  dPrint("vid:%d, committing to file, firstKey:%" PRId64 " lastKey:%" PRId64 " ssid:%d esid:%d", vnode, pVnode->firstKey,
H
hzcheng 已提交
510 511 512
         pVnode->lastKey, ssid, esid);
  if (pVnode->lastKey == 0) goto _over;

S
slguan 已提交
513
  vnodeCloseAllSyncFds(vnode);
H
hzcheng 已提交
514 515 516 517 518 519 520 521 522 523 524 525 526 527 528 529 530 531 532 533 534 535 536 537 538 539 540 541 542 543 544 545 546 547 548 549 550 551 552 553 554 555 556 557 558 559 560 561 562 563 564 565 566 567 568 569 570 571 572 573 574 575
  vnodeRenewCommitLog(vnode);

  // get the MAX consumption buffer for this vnode
  int32_t maxBytesPerPoint = 0;
  int32_t minBytesPerPoint = INT32_MAX;
  for (sid = ssid; sid <= esid; ++sid) {
    pObj = (SMeterObj *)(pVnode->meterList[sid]);
    if ((pObj == NULL) || (pObj->pCache == NULL)) continue;

    if (maxBytesPerPoint < pObj->bytesPerPoint) {
      maxBytesPerPoint = pObj->bytesPerPoint;
    }
    if (minBytesPerPoint > pObj->bytesPerPoint) {
      minBytesPerPoint = pObj->bytesPerPoint;
    }
  }

  // buffer to hold the temp head
  int tcachblocks = pCfg->cacheBlockSize / (minBytesPerPoint * pCfg->rowsInFileBlock);

  int hmsize =
      (pCfg->cacheNumOfBlocks.totalBlocks * (MAX(tcachblocks, 1) + 1) + pCfg->maxSessions) * sizeof(SCompBlock);

  // buffer to hold the uncompressed data
  int dmsize =
      maxBytesPerPoint * pCfg->rowsInFileBlock + (sizeof(SData) + EXTRA_BYTES + sizeof(TSCKSUM)) * TSDB_MAX_COLUMNS;

  // buffer to hold the compressed data
  int cmsize =
      maxBytesPerPoint * pCfg->rowsInFileBlock + (sizeof(SData) + EXTRA_BYTES + sizeof(TSCKSUM)) * TSDB_MAX_COLUMNS;

  // buffer to hold compHeader
  int tmsize = sizeof(SCompHeader) * pCfg->maxSessions + sizeof(TSCKSUM);

  // buffer to hold meterInfo
  int misize = pVnode->cfg.maxSessions * sizeof(SMeterInfo);

  int totalSize = hmsize + dmsize + cmsize + misize + tmsize;
  buffer = malloc(totalSize);
  if (buffer == NULL) {
    dError("no enough memory for committing buffer");
    return NULL;
  }

  hmem = buffer;
  dmem = hmem + hmsize;
  cmem = dmem + dmsize;
  tmem = cmem + cmsize;
  meterInfo = (SMeterInfo *)(tmem + tmsize);

  pthread_mutex_lock(&(pVnode->vmutex));
  pVnode->commitFirstKey = pVnode->firstKey;
  pVnode->firstKey = pVnode->lastKey + 1;
  pthread_mutex_unlock(&(pVnode->vmutex));

_again:
  pVnode->commitInProcess = 1;
  commitAgain = 0;
  memset(hmem, 0, totalSize);
  memset(&query, 0, sizeof(query));

  if (vnodeOpenCommitFiles(pVnode, ssid) < 0) goto _over;
L
lihui 已提交
576
  dTrace("vid:%d, start to commit, commitFirstKey:%" PRId64 " commitLastKey:%" PRId64, vnode, pVnode->commitFirstKey,
H
hzcheng 已提交
577 578 579 580 581 582 583 584 585 586 587 588
         pVnode->commitLastKey);

  headLen = 0;
  vnodeGetHeadFileHeaderInfo(pVnode->hfd, &headInfo);
  int maxOldBlocks = 1;

  // read head info
  if (pVnode->hfd) {
    lseek(pVnode->hfd, TSDB_FILE_HEADER_LEN, SEEK_SET);
    if (read(pVnode->hfd, tmem, tmsize) <= 0) {
      dError("vid:%d, failed to read old header file:%s", vnode, pVnode->cfn);
      taosLogError("vid:%d, failed to read old header file:%s", vnode, pVnode->cfn);
S
slguan 已提交
589
      vnodeRecoverFromPeer(pVnode, pVnode->commitFileId);
H
hzcheng 已提交
590 591 592
      goto _over;
    } else {
      if (!taosCheckChecksumWhole((uint8_t *)tmem, tmsize)) {
S
slguan 已提交
593
        dError("vid:%d, failed to read old header file:%s since comp header offset is broken", vnode, pVnode->cfn);
H
hzcheng 已提交
594
        taosLogError("vid:%d, failed to read old header file:%s since comp header offset is broken",
S
slguan 已提交
595 596 597
            vnode, pVnode->cfn);

        vnodeRecoverFromPeer(pVnode, pVnode->commitFileId);
H
hzcheng 已提交
598 599 600 601 602 603 604
        goto _over;
      }
    }
  }

  // read compInfo
  for (sid = 0; sid < pCfg->maxSessions; ++sid) {
S
slguan 已提交
605 606 607 608
    if (pVnode->meterList == NULL) {  // vnode is being freed, abort
      goto _over;
    }

H
hzcheng 已提交
609
    pObj = (SMeterObj *)(pVnode->meterList[sid]);
S
slguan 已提交
610 611 612 613 614
    if (pObj == NULL) {
      continue;
    }

    // meter is going to be deleted, abort
H
hjxilinx 已提交
615
    if (vnodeIsMeterState(pObj, TSDB_METER_STATE_DROPPING)) {
S
slguan 已提交
616 617 618
      dWarn("vid:%d sid:%d is dropped, ignore this meter", vnode, sid);
      continue;
    }
H
hzcheng 已提交
619 620 621 622 623 624 625 626 627 628

    pMeter = meterInfo + sid;
    pHeader = ((SCompHeader *)tmem) + sid;

    if (pVnode->hfd > 0) {
      if (pHeader->compInfoOffset > 0) {
        lseek(pVnode->hfd, pHeader->compInfoOffset, SEEK_SET);
        if (read(pVnode->hfd, &compInfo, sizeof(compInfo)) == sizeof(compInfo)) {
          if (!taosCheckChecksumWhole((uint8_t *)(&compInfo), sizeof(SCompInfo))) {
            dError("vid:%d sid:%d id:%s, failed to read compinfo in file:%s since checksum mismatch",
S
slguan 已提交
629
                vnode, sid, pObj->meterId, pVnode->cfn);
H
hzcheng 已提交
630
            taosLogError("vid:%d sid:%d id:%s, failed to read compinfo in file:%s since checksum mismatch",
S
slguan 已提交
631 632
                vnode, sid, pObj->meterId, pVnode->cfn);
            vnodeRecoverFromPeer(pVnode, pVnode->commitFileId);
H
hzcheng 已提交
633 634 635 636 637 638 639 640 641 642 643 644
            goto _over;
          } else {
            if (pObj->uid == compInfo.uid) {
              pMeter->oldNumOfBlocks = compInfo.numOfBlocks;
              pMeter->oldCompBlockOffset = pHeader->compInfoOffset + sizeof(SCompInfo);
              pMeter->last = compInfo.last;
              if (compInfo.numOfBlocks > maxOldBlocks) maxOldBlocks = compInfo.numOfBlocks;
              if (pMeter->last) {
                lseek(pVnode->hfd, sizeof(SCompBlock) * (compInfo.numOfBlocks - 1), SEEK_CUR);
                read(pVnode->hfd, &pMeter->lastBlock, sizeof(SCompBlock));
              }
            } else {
L
lihui 已提交
645
              dTrace("vid:%d sid:%d id:%s, uid:%" PRIu64 " is not matched with old:%" PRIu64 ", old data will be thrown away",
S
slguan 已提交
646
                  vnode, sid, pObj->meterId, pObj->uid, compInfo.uid);
H
hzcheng 已提交
647 648 649 650 651
              pMeter->oldNumOfBlocks = 0;
            }
          }
        } else {
          dError("vid:%d sid:%d id:%s, failed to read compinfo in file:%s", vnode, sid, pObj->meterId, pVnode->cfn);
S
slguan 已提交
652
          vnodeRecoverFromPeer(pVnode, pVnode->commitFileId);
H
hzcheng 已提交
653 654 655 656 657 658 659 660 661 662 663 664 665 666 667 668 669 670 671 672 673 674 675 676 677 678 679 680 681 682 683 684 685
          goto _over;
        }
      }
    }
  }
  // Loop To write data to fileId
  for (sid = ssid; sid <= esid; ++sid) {
    pObj = (SMeterObj *)(pVnode->meterList[sid]);
    if ((pObj == NULL) || (pObj->pCache == NULL)) continue;

    data[0] = (SData *)dmem;
    cdata[0] = (SData *)cmem;
    for (col = 1; col < pObj->numOfColumns; ++col) {
      data[col] = (SData *)(((char *)data[col - 1]) + sizeof(SData) +
                            pObj->pointsPerFileBlock * pObj->schema[col - 1].bytes + EXTRA_BYTES + sizeof(TSCKSUM));
      cdata[col] = (SData *)(((char *)cdata[col - 1]) + sizeof(SData) +
                             pObj->pointsPerFileBlock * pObj->schema[col - 1].bytes + EXTRA_BYTES + sizeof(TSCKSUM));
    }

    pMeter = meterInfo + sid;
    pMeter->tempHeadOffset = headLen;

    memset(&query, 0, sizeof(query));
    query.colList = colList;
    query.pSelectExpr = pExprs;

    query.ekey = pVnode->commitLastKey;
    query.skey = pVnode->commitFirstKey;
    query.lastKey = query.skey;

    query.sdata = data;
    vnodeSetCommitQuery(pObj, &query);

L
lihui 已提交
686
    dTrace("vid:%d sid:%d id:%s, start to commit, startKey:%" PRId64 " slot:%d pos:%d", pObj->vnode, pObj->sid, pObj->meterId,
H
hzcheng 已提交
687 688 689 690 691 692 693
           pObj->lastKeyOnFile, query.slot, query.pos);

    pointsRead = 0;
    pointsReadLast = 0;

    // last block is at last file
    if (pMeter->last) {
S
slguan 已提交
694
      if ((pMeter->lastBlock.sversion != pObj->sversion) || (query.over)) {
H
hzcheng 已提交
695 696 697 698 699
        // TODO : Check the correctness of this code. write the last block to
        // .data file
        pCompBlock = (SCompBlock *)(hmem + headLen);
        assert(dmem - (char *)pCompBlock >= sizeof(SCompBlock));
        *pCompBlock = pMeter->lastBlock;
S
slguan 已提交
700 701 702 703 704 705 706 707 708
        if (pMeter->lastBlock.sversion != pObj->sversion) {
          pCompBlock->last = 0;
          pCompBlock->offset = lseek(pVnode->dfd, 0, SEEK_END);
          pMeter->last = 0;
          lseek(pVnode->lfd, pMeter->lastBlock.offset, SEEK_SET);
          tsendfile(pVnode->dfd, pVnode->lfd, NULL, pMeter->lastBlock.len);
          pVnode->dfSize = pCompBlock->offset + pMeter->lastBlock.len;
        } else {
          if (ssid == 0) {
S
slguan 已提交
709 710
            assert(pCompBlock->last && pVnode->tfd != -1);
            pCompBlock->offset = lseek(pVnode->tfd, 0, SEEK_END); 
S
slguan 已提交
711 712 713 714 715 716
            lseek(pVnode->lfd, pMeter->lastBlock.offset, SEEK_SET);
            tsendfile(pVnode->tfd, pVnode->lfd, NULL, pMeter->lastBlock.len);
            pVnode->lfSize = pCompBlock->offset + pMeter->lastBlock.len;
          } else {
            assert(pVnode->tfd == -1);
          }
S
slguan 已提交
717

S
slguan 已提交
718
        }
H
hzcheng 已提交
719 720 721 722 723 724

        headLen += sizeof(SCompBlock);
        pMeter->newNumOfBlocks++;
      } else {
        // read last block into memory
        if (vnodeReadLastBlockToMem(pObj, &pMeter->lastBlock, data) < 0) goto _over;
S
slguan 已提交
725
        pMeter->last = 0;
H
hzcheng 已提交
726 727 728
        pointsReadLast = pMeter->lastBlock.numOfPoints;
        query.over = 0;
        headInfo.totalStorage -= (pointsReadLast * pObj->bytesPerPoint);
S
slguan 已提交
729

H
hzcheng 已提交
730
        dTrace("vid:%d sid:%d id:%s, points:%d in last block will be merged to new block",
S
slguan 已提交
731
            pObj->vnode, pObj->sid, pObj->meterId, pointsReadLast);
H
hzcheng 已提交
732 733 734 735 736 737 738 739 740 741 742 743 744 745 746 747 748 749 750 751 752 753 754 755 756 757 758 759 760 761 762
      }

      pMeter->changed = 1;
      pMeter->oldNumOfBlocks--;
    }

    while (query.over == 0) {
      pCompBlock = (SCompBlock *)(hmem + headLen);
      assert(dmem - (char *)pCompBlock >= sizeof(SCompBlock));
      pointsRead += pointsReadLast;

      while (pointsRead < pObj->pointsPerFileBlock) {
        query.pointsToRead = pObj->pointsPerFileBlock - pointsRead;
        query.pointsOffset = pointsRead;
        pointsRead += vnodeQueryFromCache(pObj, &query);
        if (query.over) break;
      }

      if (pointsRead == 0) break;

      headInfo.totalStorage += ((pointsRead - pointsReadLast) * pObj->bytesPerPoint);
      pCompBlock->last = 1;
      if (vnodeWriteBlockToFile(pObj, pCompBlock, data, cdata, pointsRead) < 0) goto _over;
      if (pCompBlock->keyLast > pObj->lastKeyOnFile) pObj->lastKeyOnFile = pCompBlock->keyLast;
      pMeter->last = pCompBlock->last;

      // write block info into header buffer
      headLen += sizeof(SCompBlock);
      pMeter->newNumOfBlocks++;
      pMeter->committedPoints += (pointsRead - pointsReadLast);

L
lihui 已提交
763
      dTrace("vid:%d sid:%d id:%s, pointsRead:%d, pointsReadLast:%d lastKey:%" PRId64 ", "
S
slguan 已提交
764 765 766
             "slot:%d pos:%d newNumOfBlocks:%d headLen:%d",
          pObj->vnode, pObj->sid, pObj->meterId, pointsRead, pointsReadLast, pObj->lastKeyOnFile, query.slot, query.pos,
          pMeter->newNumOfBlocks, headLen);
H
hzcheng 已提交
767 768 769 770 771 772 773

      if (pointsRead < pObj->pointsPerFileBlock || query.keyIsMet) break;

      pointsRead = 0;
      pointsReadLast = 0;
    }

L
lihui 已提交
774
    dTrace("vid:%d sid:%d id:%s, %d points are committed, lastKey:%" PRId64 " slot:%d pos:%d newNumOfBlocks:%d",
S
slguan 已提交
775 776
        pObj->vnode, pObj->sid, pObj->meterId, pMeter->committedPoints, pObj->lastKeyOnFile, query.slot, query.pos,
        pMeter->newNumOfBlocks);
H
hzcheng 已提交
777 778 779 780 781 782 783 784 785 786 787 788 789 790 791 792 793 794 795 796 797 798 799 800 801 802 803 804 805 806 807 808 809 810 811 812 813 814

    if (pMeter->committedPoints > 0) {
      pMeter->commitSlot = query.slot;
      pMeter->commitPos = query.pos;
    }

    TSKEY nextKey = 0;
    if (pObj->lastKey > pVnode->commitLastKey)
      nextKey = pVnode->commitLastKey + 1;
    else if (pObj->lastKey > pObj->lastKeyOnFile)
      nextKey = pObj->lastKeyOnFile + 1;

    pthread_mutex_lock(&(pVnode->vmutex));
    if (nextKey < pVnode->firstKey && nextKey > 1) pVnode->firstKey = nextKey;
    pthread_mutex_unlock(&(pVnode->vmutex));
  }

  if (pVnode->lastKey > pVnode->commitLastKey) commitAgain = 1;

  dTrace("vid:%d, finish appending the data file", vnode);

  // calculate the new compInfoOffset
  int compInfoOffset = TSDB_FILE_HEADER_LEN + tmsize;
  for (sid = 0; sid < pCfg->maxSessions; ++sid) {
    pObj = (SMeterObj *)(pVnode->meterList[sid]);
    pHeader = ((SCompHeader *)tmem) + sid;
    if (pObj == NULL) {
      pHeader->compInfoOffset = 0;
      continue;
    }

    pMeter = meterInfo + sid;
    pMeter->compInfoOffset = compInfoOffset;
    pMeter->finalNumOfBlocks = pMeter->oldNumOfBlocks + pMeter->newNumOfBlocks;

    if (pMeter->finalNumOfBlocks > 0) {
      pHeader->compInfoOffset = pMeter->compInfoOffset;
      compInfoOffset += sizeof(SCompInfo) + pMeter->finalNumOfBlocks * sizeof(SCompBlock) + sizeof(TSCKSUM);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
815 816
    } else {
      pHeader->compInfoOffset = 0;
H
hzcheng 已提交
817
    }
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
818

H
hzcheng 已提交
819 820 821 822 823 824 825 826
    dTrace("vid:%d sid:%d id:%s, oldBlocks:%d numOfBlocks:%d compInfoOffset:%d", pObj->vnode, pObj->sid, pObj->meterId,
           pMeter->oldNumOfBlocks, pMeter->finalNumOfBlocks, compInfoOffset);
  }

  // write the comp header into new file
  vnodeUpdateHeadFileHeader(pVnode->nfd, &headInfo);
  lseek(pVnode->nfd, TSDB_FILE_HEADER_LEN, SEEK_SET);
  taosCalcChecksumAppend(0, (uint8_t *)tmem, tmsize);
S
slguan 已提交
827
  if (twrite(pVnode->nfd, tmem, tmsize) <= 0) {
H
hzcheng 已提交
828 829
    dError("vid:%d sid:%d id:%s, failed to write:%s, error:%s", vnode, sid, pObj->meterId, pVnode->nfn,
           strerror(errno));
S
slguan 已提交
830
    vnodeRecoverFromPeer(pVnode, pVnode->commitFileId);
H
hzcheng 已提交
831 832 833 834 835 836 837 838 839 840 841 842 843 844 845 846 847 848 849 850
    goto _over;
  }

  pOldCompBlocks = (uint8_t *)malloc(sizeof(SCompBlock) * maxOldBlocks);

  // write the comp block list in new file
  for (sid = 0; sid < pCfg->maxSessions; ++sid) {
    pObj = (SMeterObj *)(pVnode->meterList[sid]);
    if (pObj == NULL) continue;

    pMeter = meterInfo + sid;
    if (pMeter->finalNumOfBlocks <= 0) continue;

    compInfo.last = pMeter->last;
    compInfo.uid = pObj->uid;
    compInfo.numOfBlocks = pMeter->finalNumOfBlocks;
    /* compInfo.compBlockLen = pMeter->finalCompBlockLen; */
    compInfo.delimiter = TSDB_VNODE_DELIMITER;
    taosCalcChecksumAppend(0, (uint8_t *)(&compInfo), sizeof(SCompInfo));
    lseek(pVnode->nfd, pMeter->compInfoOffset, SEEK_SET);
S
slguan 已提交
851
    if (twrite(pVnode->nfd, &compInfo, sizeof(compInfo)) <= 0) {
H
hzcheng 已提交
852 853
      dError("vid:%d sid:%d id:%s, failed to write:%s, reason:%s", vnode, sid, pObj->meterId, pVnode->nfn,
             strerror(errno));
S
slguan 已提交
854
      vnodeRecoverFromPeer(pVnode, pVnode->commitFileId);
H
hzcheng 已提交
855 856 857 858 859 860 861 862 863 864
      goto _over;
    }

    // write the old comp blocks
    chksum = 0;
    if (pVnode->hfd && pMeter->oldNumOfBlocks) {
      lseek(pVnode->hfd, pMeter->oldCompBlockOffset, SEEK_SET);
      if (pMeter->changed) {
        int compBlockLen = pMeter->oldNumOfBlocks * sizeof(SCompBlock);
        read(pVnode->hfd, pOldCompBlocks, compBlockLen);
S
slguan 已提交
865
        twrite(pVnode->nfd, pOldCompBlocks, compBlockLen);
H
hzcheng 已提交
866 867
        chksum = taosCalcChecksum(0, pOldCompBlocks, compBlockLen);
      } else {
S
slguan 已提交
868
        tsendfile(pVnode->nfd, pVnode->hfd, NULL, pMeter->oldNumOfBlocks * sizeof(SCompBlock));
H
hzcheng 已提交
869 870 871 872 873 874 875
        read(pVnode->hfd, &chksum, sizeof(TSCKSUM));
      }
    }

    if (pMeter->newNumOfBlocks) {
      chksum = taosCalcChecksum(chksum, (uint8_t *)(hmem + pMeter->tempHeadOffset),
                                pMeter->newNumOfBlocks * sizeof(SCompBlock));
S
slguan 已提交
876
      if (twrite(pVnode->nfd, hmem + pMeter->tempHeadOffset, pMeter->newNumOfBlocks * sizeof(SCompBlock)) <= 0) {
H
hzcheng 已提交
877 878
        dError("vid:%d sid:%d id:%s, failed to write:%s, reason:%s", vnode, sid, pObj->meterId, pVnode->nfn,
               strerror(errno));
S
slguan 已提交
879
        vnodeRecoverFromPeer(pVnode, pVnode->commitFileId);
H
hzcheng 已提交
880 881 882
        goto _over;
      }
    }
S
slguan 已提交
883
    twrite(pVnode->nfd, &chksum, sizeof(TSCKSUM));
H
hzcheng 已提交
884 885 886 887 888 889 890 891 892 893 894 895 896 897 898 899 900 901 902 903 904 905 906 907 908 909 910 911 912 913 914 915
  }

  tfree(pOldCompBlocks);
  dTrace("vid:%d, finish writing the new header file:%s", vnode, pVnode->nfn);
  vnodeCloseCommitFiles(pVnode);

  for (sid = ssid; sid <= esid; ++sid) {
    pObj = (SMeterObj *)(pVnode->meterList[sid]);
    if (pObj == NULL) continue;

    pMeter = meterInfo + sid;
    if (pMeter->finalNumOfBlocks <= 0) continue;

    if (pMeter->committedPoints > 0) {
      vnodeUpdateCommitInfo(pObj, pMeter->commitSlot, pMeter->commitPos, pMeter->commitCount);
    }
  }

  if (commitAgain) {
    pVnode->commitFirstKey = pVnode->commitLastKey + 1;
    goto _again;
  }

  vnodeRemoveCommitLog(vnode);

_over:
  pVnode->commitInProcess = 0;
  vnodeCommitOver(pVnode);
  memset(&(vnodeList[vnode].commitThread), 0, sizeof(vnodeList[vnode].commitThread));
  tfree(buffer);
  tfree(pOldCompBlocks);

S
slguan 已提交
916
  vnodeBroadcastStatusToUnsyncedPeer(pVnode);
H
hzcheng 已提交
917 918 919 920 921 922 923 924 925 926 927 928 929 930 931 932 933 934 935 936 937 938 939 940 941 942 943 944 945 946 947 948 949 950 951 952
  dPrint("vid:%d, committing is over", vnode);

  return pVnode;
}

void *vnodeCommitToFile(void *param) {
  SVnodeObj *pVnode = (SVnodeObj *)param;

  return vnodeCommitMultiToFile(pVnode, 0, pVnode->cfg.maxSessions - 1);
}

/*
 * Load the comp-block list of meter pObj from data file group pQuery->fileId.
 *
 * Opens <tsDirectory>/vnode<vid>/db/v<vid>f<fileId>.head and verifies:
 *   - the comp header,
 *   - the meter's SCompInfo,
 *   - the SCompBlock list checksums.
 * It then fills pQuery->pBlock, pQuery->pFields and pQuery->numOfBlocks, and
 * opens the matching .data and .last files into pQuery->dfd and pQuery->lfd.
 *
 * Returns:
 *   - the number of blocks (> 0) on success;
 *   - 0 when the meter has no blocks in this file;
 *   - -TSDB_CODE_APP_ERROR when memory cannot be allocated;
 *   - the result of vnodeRecoverFromPeer() when a file is missing, unreadable
 *     or corrupt.
 */
int vnodeGetCompBlockInfo(SMeterObj *pObj, SQuery *pQuery) {
  char        prefix[TSDB_FILENAME_LEN];
  char        fileName[TSDB_FILENAME_LEN];
  SCompHeader compHeader;
  SCompInfo   compInfo;
  struct stat fstat;
  SVnodeObj * pVnode = &vnodeList[pObj->vnode];
  char *      buffer = NULL;
  TSCKSUM     chksum = 0;

  vnodeFreeFields(pQuery);
  tfree(pQuery->pBlock);

  pQuery->numOfBlocks = 0;
  SVnodeCfg *pCfg = &pVnode->cfg;

  if (pQuery->hfd > 0) close(pQuery->hfd);

  // snprintf keeps a long tsDirectory from overflowing the fixed-size path buffers.
  snprintf(prefix, sizeof(prefix), "%s/vnode%d/db/v%df%d", tsDirectory, pObj->vnode, pObj->vnode, pQuery->fileId);

  snprintf(fileName, sizeof(fileName), "%s.head", prefix);
  pthread_mutex_lock(&(pVnode->vmutex));
  pQuery->hfd = open(fileName, O_RDONLY);
  pthread_mutex_unlock(&(pVnode->vmutex));

  if (pQuery->hfd < 0) {
    dError("vid:%d sid:%d id:%s, failed to open head file:%s, reason:%s", pObj->vnode, pObj->sid, pObj->meterId,
           fileName, strerror(errno));
    return vnodeRecoverFromPeer(pVnode, pQuery->fileId);
  }

  // The comp header holds one SCompHeader per session slot, followed by a checksum.
  int tmsize = sizeof(SCompHeader) * pCfg->maxSessions + sizeof(TSCKSUM);
  buffer = (char *)calloc(1, tmsize);
  if (buffer == NULL) {
    dError("vid:%d sid:%d id:%s, failed to allocate memory to buffer", pObj->vnode, pObj->sid, pObj->meterId);
    return -TSDB_CODE_APP_ERROR;
  }

  lseek(pQuery->hfd, TSDB_FILE_HEADER_LEN, SEEK_SET);
  if (read(pQuery->hfd, buffer, tmsize) != tmsize) {
    dError("vid:%d sid:%d id:%s, file:%s failed to read comp header, reason:%s", pObj->vnode, pObj->sid, pObj->meterId,
           fileName, strerror(errno));
    taosLogError("vid:%d sid:%d id:%s, file:%s failed to read comp header", pObj->vnode, pObj->sid, pObj->meterId,
                 fileName);
    tfree(buffer);
    return vnodeRecoverFromPeer(pVnode, pQuery->fileId);
  }

  if (!taosCheckChecksumWhole((uint8_t *)buffer, tmsize)) {
    dError("vid:%d sid:%d id:%s, file:%s comp header offset is broken", pObj->vnode, pObj->sid, pObj->meterId,
           fileName);
    taosLogError("vid:%d sid:%d id:%s, file:%s comp header offset is broken", pObj->vnode, pObj->sid, pObj->meterId,
                 fileName);
    tfree(buffer);
    return vnodeRecoverFromPeer(pVnode, pQuery->fileId);
  }
  compHeader = ((SCompHeader *)buffer)[pObj->sid];
  tfree(buffer);
  if (compHeader.compInfoOffset == 0) return 0;

  // A short read would leave compInfo partly uninitialized, so treat it like a checksum mismatch.
  lseek(pQuery->hfd, compHeader.compInfoOffset, SEEK_SET);
  if (read(pQuery->hfd, &compInfo, sizeof(SCompInfo)) != (ssize_t)sizeof(SCompInfo) ||
      !taosCheckChecksumWhole((uint8_t *)(&compInfo), sizeof(SCompInfo))) {
    dError("vid:%d sid:%d id:%s, file:%s compInfo checksum mismatch", pObj->vnode, pObj->sid, pObj->meterId, fileName);
    taosLogError("vid:%d sid:%d id:%s, file:%s compInfo checksum mismatch", pObj->vnode, pObj->sid, pObj->meterId,
                 fileName);
    return vnodeRecoverFromPeer(pVnode, pQuery->fileId);
  }
  if (compInfo.numOfBlocks <= 0) return 0;
  if (compInfo.uid != pObj->uid) return 0;

  // One allocation holds the SCompBlock list followed by a parallel array of SField pointers.
  size_t blockListLen = sizeof(SCompBlock) * compInfo.numOfBlocks;
  pQuery->pBlock = (SCompBlock *)calloc(1, (sizeof(SCompBlock) + sizeof(SField *)) * compInfo.numOfBlocks);
  if (pQuery->pBlock == NULL) {
    dError("vid:%d sid:%d id:%s, failed to allocate memory for comp blocks", pObj->vnode, pObj->sid, pObj->meterId);
    pQuery->pFields = NULL;  // pFields pointed into the pBlock allocation freed above
    return -TSDB_CODE_APP_ERROR;
  }
  pQuery->numOfBlocks = compInfo.numOfBlocks;
  pQuery->pFields = (SField **)((char *)pQuery->pBlock + blockListLen);

  // The block list is followed on disk by its checksum; short reads count as corruption.
  if (read(pQuery->hfd, pQuery->pBlock, blockListLen) != (ssize_t)blockListLen ||
      read(pQuery->hfd, &chksum, sizeof(TSCKSUM)) != (ssize_t)sizeof(TSCKSUM) ||
      chksum != taosCalcChecksum(0, (uint8_t *)(pQuery->pBlock), blockListLen)) {
    dError("vid:%d sid:%d id:%s, head file comp block broken, fileId: %d", pObj->vnode, pObj->sid, pObj->meterId,
           pQuery->fileId);
    taosLogError("vid:%d sid:%d id:%s, head file comp block broken, fileId: %d", pObj->vnode, pObj->sid, pObj->meterId,
                 pQuery->fileId);
    return vnodeRecoverFromPeer(pVnode, pQuery->fileId);
  }

  close(pQuery->hfd);
  pQuery->hfd = -1;

  snprintf(fileName, sizeof(fileName), "%s.data", prefix);
  if (stat(fileName, &fstat) < 0) {
    dError("vid:%d sid:%d id:%s, data file:%s not there!", pObj->vnode, pObj->sid, pObj->meterId, fileName);
    return vnodeRecoverFromPeer(pVnode, pQuery->fileId);
  }

  if (pQuery->dfd > 0) close(pQuery->dfd);
  pQuery->dfd = open(fileName, O_RDONLY);
  if (pQuery->dfd < 0) {
    dError("vid:%d sid:%d id:%s, failed to open data file:%s, reason:%s", pObj->vnode, pObj->sid, pObj->meterId,
           fileName, strerror(errno));
    return vnodeRecoverFromPeer(pVnode, pQuery->fileId);
  }

  snprintf(fileName, sizeof(fileName), "%s.last", prefix);
  if (stat(fileName, &fstat) < 0) {
    dError("vid:%d sid:%d id:%s, last file:%s not there!", pObj->vnode, pObj->sid, pObj->meterId, fileName);
    return vnodeRecoverFromPeer(pVnode, pQuery->fileId);
  }

  if (pQuery->lfd > 0) close(pQuery->lfd);
  pQuery->lfd = open(fileName, O_RDONLY);
  if (pQuery->lfd < 0) {
    dError("vid:%d sid:%d id:%s, failed to open last file:%s, reason:%s", pObj->vnode, pObj->sid, pObj->meterId,
           fileName, strerror(errno));
    return vnodeRecoverFromPeer(pVnode, pQuery->fileId);
  }

  return pQuery->numOfBlocks;
}

/*
 * Read column `col` of block pBlock from file descriptor fd.
 *
 * The block's SField array (numOfCols entries plus a trailing checksum) is
 * loaded into *fields on first use and cached there; the caller frees it.
 * When `data` is NULL, only the SField array is loaded.
 *
 * Otherwise the column bytes and their trailing checksum are read and verified:
 *   - uncompressed block (algorithm == 0): bytes go straight into `data`;
 *   - compressed block: bytes are staged in `temp`, then decompressed into
 *     `data` with `buffer`/`bufferSize` as scratch space.
 *
 * Returns 0 on success, -1 on allocation, I/O, size or checksum failure.
 */
int vnodeReadColumnToMem(int fd, SCompBlock *pBlock, SField **fields, int col, char *data, int dataSize,
                         char *temp, char *buffer, int bufferSize) {
  int     len = 0, size = 0;
  SField *tfields = NULL;
  TSCKSUM chksum = 0;

  if (*fields == NULL) {
    size = sizeof(SField) * (pBlock->numOfCols) + sizeof(TSCKSUM);
    *fields = (SField *)calloc(1, size);
    if (*fields == NULL) {
      dError("failed to allocate memory for SField, col: %d", col);
      return -1;
    }

    lseek(fd, pBlock->offset, SEEK_SET);
    if (read(fd, *fields, size) != size || !taosCheckChecksumWhole((uint8_t *)(*fields), size)) {
      dError("SField checksum error, col: %d", col);
      taosLogError("SField checksum error, col: %d", col);
      // Never leave a corrupt SField array cached: the next call would skip re-reading it.
      tfree(*fields);
      return -1;
    }
  }

  tfields = *fields;

  /* If data is NULL, that means only to read SField content. So no need to read data part. */
  if (data == NULL) return 0;

  // Uncompressed columns are read straight into `data`; reject a length that would overflow it.
  if (!pBlock->algorithm && tfields[col].len > dataSize) {
    dError("col:%d length:%d exceeds buffer size:%d", col, (int32_t)(tfields[col].len), dataSize);
    return -1;
  }

  // Compressed columns are staged in `temp` and decompressed into `data` once verified.
  char *raw = pBlock->algorithm ? temp : data;

  lseek(fd, pBlock->offset + tfields[col].offset, SEEK_SET);
  len = read(fd, raw, tfields[col].len);
  if (len <= 0) {
    dError("failed to read col:%d, offset:%d, reason:%s", col, (int32_t)(tfields[col].offset), strerror(errno));
    return -1;
  }

  if (read(fd, &chksum, sizeof(TSCKSUM)) != (ssize_t)sizeof(TSCKSUM) ||
      chksum != taosCalcChecksum(0, (uint8_t *)raw, tfields[col].len)) {
    dError("data column checksum error, col: %d", col);
    taosLogError("data column checksum error, col: %d", col);
    return -1;
  }

  if (pBlock->algorithm) {
    (*pDecompFunc[tfields[col].type])(temp, tfields[col].len, pBlock->numOfPoints, data, dataSize,
                                      pBlock->algorithm, buffer, bufferSize);
  }

  return 0;
}

/*
 * Read the columns requested by pQuery from block pQuery->slot into sdata[].
 *
 * The timestamp column is always loaded into pQuery->tsData, even when it is
 * not among the queried columns. Queried columns missing from the block (colId
 * not present in its SField list) are filled with NULL values.
 *
 * Reads from pQuery->lfd when the block lives in the .last file, otherwise
 * from pQuery->dfd.
 *
 * Returns 0 on success and -TSDB_CODE_APP_ERROR when scratch memory cannot be
 * allocated. On a read/checksum failure it returns vnodeRecoverFromPeer()'s
 * result.
 */
int vnodeReadCompBlockToMem(SMeterObj *pObj, SQuery *pQuery, SData *sdata[]) {
  char *      temp = NULL;
  int         i = 0, col = 0, code = 0;
  SCompBlock *pBlock = NULL;
  SField **   pFields = NULL;
  char *      buffer = NULL;
  int         bufferSize = 0;
  int         dfd = pQuery->dfd;

  tfree(pQuery->pFields[pQuery->slot]);

  pBlock = pQuery->pBlock + pQuery->slot;
  pFields = pQuery->pFields + pQuery->slot;
  temp = malloc(pObj->bytesPerPoint * (pBlock->numOfPoints + 1));

  if (pBlock->last) dfd = pQuery->lfd;

  if (pBlock->algorithm == TWO_STAGE_COMP) {
    bufferSize = pObj->maxBytes * pBlock->numOfPoints + EXTRA_BYTES;
    buffer = (char *)calloc(1, bufferSize);
  }

  // Running out of memory is not file corruption, so report it without recovering from a peer.
  if (temp == NULL || (pBlock->algorithm == TWO_STAGE_COMP && buffer == NULL)) {
    dError("vid:%d sid:%d id:%s, failed to allocate memory to read comp block", pObj->vnode, pObj->sid,
           pObj->meterId);
    tfree(buffer);
    tfree(temp);
    return -TSDB_CODE_APP_ERROR;
  }

  if (pQuery->colList[0].colIdx != PRIMARYKEY_TIMESTAMP_COL_INDEX) {
    // load timestamp column first in any cases.
    code = vnodeReadColumnToMem(dfd, pBlock, pFields, PRIMARYKEY_TIMESTAMP_COL_INDEX,
                                pQuery->tsData->data + pQuery->pointsOffset * TSDB_KEYSIZE,
                                TSDB_KEYSIZE * pBlock->numOfPoints, temp, buffer, bufferSize);
    col = 1;
  } else {
    // Read the SField data for this block first, if timestamp column is retrieved in this query, we ignore this process
    code = vnodeReadColumnToMem(dfd, pBlock, pFields, 0, NULL, 0, NULL, buffer, bufferSize);
  }

  if (code < 0) goto _over;

  // Both the block's SField list and pQuery->colList are walked in colId order, merge-style.
  while (col < pBlock->numOfCols && i < pQuery->numOfCols) {
    SColumnInfo *pColumnInfo = &pQuery->colList[i].data;
    if ((*pFields)[col].colId < pColumnInfo->colId) {
      ++col;
    } else if ((*pFields)[col].colId == pColumnInfo->colId) {
      code = vnodeReadColumnToMem(dfd, pBlock, pFields, col, sdata[i]->data, pColumnInfo->bytes * pBlock->numOfPoints,
                                  temp, buffer, bufferSize);
      if (code < 0) goto _over;
      ++i;
      ++col;
    } else {
      /*
       * pQuery->colList[i].colIdx < (*pFields)[col].colId, this column is not existed in current block,
       * fill space with NULL value
       */
      char *  output = sdata[i]->data;
      int32_t bytes = pQuery->colList[i].data.bytes;
      int32_t type = pQuery->colList[i].data.type;

      setNullN(output, type, bytes, pBlock->numOfPoints);
      ++i;
    }
  }

  if (col >= pBlock->numOfCols && i < pQuery->numOfCols) {
    // remain columns need to set null value
    while (i < pQuery->numOfCols) {
      char *  output = sdata[i]->data;
      int32_t bytes = pQuery->colList[i].data.bytes;
      int32_t type = pQuery->colList[i].data.type;

      setNullN(output, type, bytes, pBlock->numOfPoints);
      ++i;
    }
  }

_over:
  tfree(buffer);
  tfree(temp);
  if (code < 0) code = vnodeRecoverFromPeer(vnodeList + pObj->vnode, pQuery->fileId);
  return code;
}

/*
 * Read every column of block pBlock from the vnode's .last file (pVnode->lfd)
 * into sdata[col]->data, and set sdata[col]->len to
 * schema[col].bytes * numOfPoints.
 *
 * Returns 0 on success and -TSDB_CODE_APP_ERROR when scratch memory cannot be
 * allocated. On a read/checksum failure it returns vnodeRecoverFromPeer()'s
 * result.
 */
int vnodeReadLastBlockToMem(SMeterObj *pObj, SCompBlock *pBlock, SData *sdata[]) {
  char *  temp = NULL;
  int     col = 0, code = 0;
  SField *pFields = NULL;
  char *  buffer = NULL;
  int     bufferSize = 0;

  SVnodeObj *pVnode = vnodeList + pObj->vnode;
  temp = malloc(pObj->bytesPerPoint * (pBlock->numOfPoints + 1));
  if (pBlock->algorithm == TWO_STAGE_COMP) {
    bufferSize = pObj->maxBytes * pBlock->numOfPoints + EXTRA_BYTES;
    buffer = (char *)calloc(1, bufferSize);
  }

  // Running out of memory is not file corruption, so report it without recovering from a peer.
  if (temp == NULL || (pBlock->algorithm == TWO_STAGE_COMP && buffer == NULL)) {
    dError("vid:%d sid:%d id:%s, failed to allocate memory to read last block", pObj->vnode, pObj->sid,
           pObj->meterId);
    tfree(buffer);
    tfree(temp);
    return -TSDB_CODE_APP_ERROR;
  }

  for (col = 0; col < pBlock->numOfCols; ++col) {
    code = vnodeReadColumnToMem(pVnode->lfd, pBlock, &pFields, col, sdata[col]->data,
                                pObj->pointsPerFileBlock * pObj->schema[col].bytes + EXTRA_BYTES, temp, buffer,
                                bufferSize);
    if (code < 0) break;
    sdata[col]->len = pObj->schema[col].bytes * pBlock->numOfPoints;
  }

  tfree(buffer);
  tfree(temp);
  tfree(pFields);
  // NOTE(review): the commit path's other recovery calls use pVnode->commitFileId; confirm fileId is intended here.
  if (code < 0) code = vnodeRecoverFromPeer(pVnode, pVnode->fileId);
  return code;
}

/*
 * Append `points` rows of meter pObj (columns in data[]) as one block.
 *
 * Target file:
 *   - If pCompBlock->last is set on entry and the block holds fewer than
 *     pointsPerFileBlock * tsFileBlockMinPercent rows, it goes to the last
 *     file (pVnode->tfd if open, else pVnode->lfd).
 *   - Otherwise it goes to the data file (pVnode->dfd).
 *
 * On-disk layout: an SField array (with checksum), then each column
 * (compressed into cdata[] when cfg.compression is set) followed by its own
 * checksum. pCompBlock is filled with offset, len, algorithm, numOfPoints,
 * numOfCols, keyFirst/keyLast and sversion.
 *
 * Returns 0 on success and -1 on invalid input or allocation failure. On a
 * write failure it returns -1 or vnodeRecoverFromPeer()'s result.
 */
int vnodeWriteBlockToFile(SMeterObj *pObj, SCompBlock *pCompBlock, SData *data[], SData *cdata[], int points) {
  SVnodeObj *pVnode = &vnodeList[pObj->vnode];
  SVnodeCfg *pCfg = &pVnode->cfg;
  int        wlen = 0;
  SField *   fields = NULL;
  int        size = sizeof(SField) * pObj->numOfColumns + sizeof(TSCKSUM);
  int32_t    offset = size;
  char *     buffer = NULL;
  int        bufferSize = 0;

  int dfd = pVnode->dfd;

  // keyFirst/keyLast are read from rows 0 and points - 1 of data[0]; an empty block has neither.
  if (points <= 0) {
    dError("vid:%d sid:%d id:%s, invalid number of points:%d to write", pObj->vnode, pObj->sid, pObj->meterId, points);
    return -1;
  }

  if (pCompBlock->last && (points < pObj->pointsPerFileBlock * tsFileBlockMinPercent)) {
    dTrace("vid:%d sid:%d id:%s, points:%d are written to last block, block stime: %" PRId64 ", block etime: %" PRId64,
           pObj->vnode, pObj->sid, pObj->meterId, points, *((TSKEY *)(data[0]->data)),
           *((TSKEY *)(data[0]->data + (points - 1) * pObj->schema[0].bytes)));
    pCompBlock->last = 1;
    dfd = pVnode->tfd > 0 ? pVnode->tfd : pVnode->lfd;
  } else {
    pCompBlock->last = 0;
  }

  pCompBlock->offset = lseek(dfd, 0, SEEK_END);
  pCompBlock->len = 0;

  fields = (SField *)calloc(1, size);
  if (fields == NULL) return -1;

  // Two-stage compression needs a scratch buffer; the compressors must never receive NULL.
  if (pCfg->compression == TWO_STAGE_COMP) {
    bufferSize = pObj->maxBytes * points + EXTRA_BYTES;
    buffer = (char *)malloc(bufferSize);
    if (buffer == NULL) {
      dError("vid:%d sid:%d id:%s, failed to allocate compression buffer", pObj->vnode, pObj->sid, pObj->meterId);
      tfree(fields);
      return -1;
    }
  }

  for (int i = 0; i < pObj->numOfColumns; ++i) {
    fields[i].colId = pObj->schema[i].colId;
    fields[i].type = pObj->schema[i].type;
    fields[i].bytes = pObj->schema[i].bytes;
    fields[i].offset = offset;

    if (pCfg->compression) {
      cdata[i]->len = (*pCompFunc[(uint8_t)pObj->schema[i].type])(data[i]->data, points * pObj->schema[i].bytes, points,
                                                                  cdata[i]->data,
                                                                  pObj->schema[i].bytes * pObj->pointsPerFileBlock + EXTRA_BYTES,
                                                                  pCfg->compression, buffer, bufferSize);
      fields[i].len = cdata[i]->len;
      // The checksum is appended right after the column bytes, so the buffer must have room for it.
      taosCalcChecksumAppend(0, (uint8_t *)(cdata[i]->data), cdata[i]->len + sizeof(TSCKSUM));
      offset += (cdata[i]->len + sizeof(TSCKSUM));

    } else {
      data[i]->len = pObj->schema[i].bytes * points;
      fields[i].len = data[i]->len;
      taosCalcChecksumAppend(0, (uint8_t *)(data[i]->data), data[i]->len + sizeof(TSCKSUM));
      offset += (data[i]->len + sizeof(TSCKSUM));
    }

    getStatistics(data[0]->data, data[i]->data, pObj->schema[i].bytes, points, pObj->schema[i].type, &fields[i].min,
                  &fields[i].max, &fields[i].sum, &fields[i].minIndex, &fields[i].maxIndex, &fields[i].numOfNullPoints);
  }

  tfree(buffer);

  // Write SField part
  taosCalcChecksumAppend(0, (uint8_t *)fields, size);
  wlen = twrite(dfd, fields, size);
  if (wlen <= 0) {
    tfree(fields);
    dError("vid:%d sid:%d id:%s, failed to write block, wlen:%d reason:%s", pObj->vnode, pObj->sid, pObj->meterId, wlen,
           strerror(errno));
#ifdef CLUSTER
    return vnodeRecoverFromPeer(pVnode, pVnode->commitFileId);
#else
    return -1;
#endif
  }
  pVnode->vnodeStatistic.compStorage += wlen;
  pVnode->dfSize += wlen;
  pCompBlock->len += wlen;
  tfree(fields);

  // Write data part
  for (int i = 0; i < pObj->numOfColumns; ++i) {
    if (pCfg->compression) {
      wlen = twrite(dfd, cdata[i]->data, cdata[i]->len + sizeof(TSCKSUM));
    } else {
      wlen = twrite(dfd, data[i]->data, data[i]->len + sizeof(TSCKSUM));
    }

    if (wlen <= 0) {
      dError("vid:%d sid:%d id:%s, failed to write block, wlen:%d points:%d reason:%s",
             pObj->vnode, pObj->sid, pObj->meterId, wlen, points, strerror(errno));
      return vnodeRecoverFromPeer(pVnode, pVnode->commitFileId);
    }

    pVnode->vnodeStatistic.compStorage += wlen;
    pVnode->dfSize += wlen;
    pCompBlock->len += wlen;
  }

  dTrace("vid:%d, vnode compStorage size is: %" PRId64, pObj->vnode, pVnode->vnodeStatistic.compStorage);

  pCompBlock->algorithm = pCfg->compression;
  pCompBlock->numOfPoints = points;
  pCompBlock->numOfCols = pObj->numOfColumns;
  pCompBlock->keyFirst = *((TSKEY *)(data[0]->data));  // hack way to get the key
  pCompBlock->keyLast = *((TSKEY *)(data[0]->data + (points - 1) * pObj->schema[0].bytes));
  pCompBlock->sversion = pObj->sversion;
  assert(pCompBlock->keyFirst <= pCompBlock->keyLast);

  return 0;
}

// Skips pQuery->limit.offset rows starting at block slot `midSlot` of the current file, moving by `step`
// (+1 ascending, -1 descending) and continuing into neighbouring files when the current one is exhausted.
static int forwardInFile(SQuery *pQuery, int32_t midSlot, int32_t step, SVnodeObj *pVnode, SMeterObj *pObj);

/*
 * Locate the first qualified row of a query inside the vnode's data files.
 *
 * Starting from the file that covers pQuery->skey and moving in query order,
 * it loads each file's comp-block list for pObj (vnodeGetCompBlockInfo) and
 * binary searches it for the block around skey. When the block must be
 * inspected, it searches that block's timestamp column for the start row.
 *
 * On return pQuery->fileId, slot and pos describe the start position.
 * slot/pos stay -1 when nothing qualified is on file. pQuery->skey may be
 * clamped to the range of keys held on file.
 *
 * Returns:
 *   - 0 when nothing is found;
 *   - a negative value when a file is broken or memory cannot be allocated;
 *   - otherwise the non-negative value from vnodeGetCompBlockInfo() or
 *     vnodeForwardStartPosition().
 */
int vnodeSearchPointInFile(SMeterObj *pObj, SQuery *pQuery) {
  TSKEY       latest, oldest;
  int         ret = 0;
  int64_t     delta = 0;
  int         firstSlot, lastSlot, midSlot;
  int         numOfBlocks;
  char *      temp = NULL, *data = NULL;
  SCompBlock *pBlock = NULL;
  SVnodeObj * pVnode = &vnodeList[pObj->vnode];
  int         step;
  char *      buffer = NULL;
  int         bufferSize = 0;
  int         dfd;

  // When nothing is found, pQuery->slot and pQuery->pos are left at -1.
  pQuery->slot = -1;
  pQuery->pos = -1;
  if (pVnode->numOfFiles <= 0) return 0;

  SVnodeCfg *pCfg = &pVnode->cfg;
  delta = (int64_t)pCfg->daysPerFile * tsMsPerDay[(uint8_t)pVnode->cfg.precision];
  latest = pObj->lastKeyOnFile;
  oldest = (pVnode->fileId - pVnode->numOfFiles + 1) * delta;

  if (latest < oldest) return 0;

  // Return early when the query range misses the keys on file; otherwise clamp skey into [oldest, latest].
  if (!QUERY_IS_ASC_QUERY(pQuery)) {
    if (pQuery->skey < oldest) return 0;
    if (pQuery->ekey > latest) return 0;
    if (pQuery->skey > latest) pQuery->skey = latest;
  } else {
    if (pQuery->skey > latest) return 0;
    if (pQuery->ekey < oldest) return 0;
    if (pQuery->skey < oldest) pQuery->skey = oldest;
  }

  dTrace("vid:%d sid:%d id:%s, skey:%" PRId64 " ekey:%" PRId64 " oldest:%" PRId64 " latest:%" PRId64 " fileId:%d numOfFiles:%d",
         pObj->vnode, pObj->sid, pObj->meterId, pQuery->skey, pQuery->ekey, oldest, latest, pVnode->fileId,
         pVnode->numOfFiles);

  step = QUERY_IS_ASC_QUERY(pQuery) ? 1 : -1;

  pQuery->fileId = pQuery->skey / delta;  // starting fileId
  pQuery->fileId -= step;                 // hacker way to make while loop below works

  bufferSize = pCfg->rowsInFileBlock * sizeof(TSKEY) + EXTRA_BYTES;
  buffer = (char *)calloc(1, bufferSize);
  if (buffer == NULL) {
    dError("vid:%d sid:%d id:%s, failed to allocate memory to search point in file", pObj->vnode, pObj->sid,
           pObj->meterId);
    return -TSDB_CODE_APP_ERROR;
  }

  while (1) {
    pQuery->fileId += step;

    if ((pQuery->fileId > pVnode->fileId) || (pQuery->fileId < pVnode->fileId - pVnode->numOfFiles + 1)) {
      tfree(buffer);
      return 0;
    }

    ret = vnodeGetCompBlockInfo(pObj, pQuery);
    if (ret == 0) continue;
    if (ret < 0) break;  // file broken

    pBlock = pQuery->pBlock;

    firstSlot = 0;
    lastSlot = pQuery->numOfBlocks - 1;

    // skey lies beyond every block of this file in query direction: try the next file.
    if (QUERY_IS_ASC_QUERY(pQuery) && pBlock[lastSlot].keyLast < pQuery->skey) continue;
    if (!QUERY_IS_ASC_QUERY(pQuery) && pBlock[firstSlot].keyFirst > pQuery->skey) continue;

    // Binary search for the block that contains skey, or the gap it falls into.
    while (1) {
      numOfBlocks = lastSlot - firstSlot + 1;
      midSlot = (firstSlot + (numOfBlocks >> 1));

      if (numOfBlocks == 1) break;

      if (pQuery->skey > pBlock[midSlot].keyLast) {
        if (numOfBlocks == 2) break;
        if (!QUERY_IS_ASC_QUERY(pQuery) && (pQuery->skey < pBlock[midSlot + 1].keyFirst)) break;
        firstSlot = midSlot + 1;
      } else if (pQuery->skey < pBlock[midSlot].keyFirst) {
        if (QUERY_IS_ASC_QUERY(pQuery) && (pQuery->skey > pBlock[midSlot - 1].keyLast)) break;
        lastSlot = midSlot - 1;
      } else {
        break;  // got the slot
      }
    }

    pQuery->slot = midSlot;
    if (!QUERY_IS_ASC_QUERY(pQuery)) {
      if (pQuery->skey < pBlock[midSlot].keyFirst) break;

      if (pQuery->ekey > pBlock[midSlot].keyLast) {
        pQuery->slot = midSlot + 1;
        break;
      }
    } else {
      if (pQuery->skey > pBlock[midSlot].keyLast) {
        pQuery->slot = midSlot + 1;
        break;
      }

      if (pQuery->ekey < pBlock[midSlot].keyFirst) break;
    }

    temp = malloc(pObj->pointsPerFileBlock * TSDB_KEYSIZE + EXTRA_BYTES);  // only first column
    data = malloc(pObj->pointsPerFileBlock * TSDB_KEYSIZE + EXTRA_BYTES);  // only first column
    if (temp == NULL || data == NULL) {
      dError("vid:%d sid:%d id:%s, failed to allocate memory to search point in file", pObj->vnode, pObj->sid,
             pObj->meterId);
      ret = -TSDB_CODE_APP_ERROR;
      break;
    }

    dfd = pBlock[midSlot].last ? pQuery->lfd : pQuery->dfd;
    ret = vnodeReadColumnToMem(dfd, pBlock + midSlot, pQuery->pFields + midSlot, 0, data,
                               pObj->pointsPerFileBlock * TSDB_KEYSIZE + EXTRA_BYTES,
                               temp, buffer, bufferSize);
    if (ret < 0) {
      ret = vnodeRecoverFromPeer(pVnode, pQuery->fileId);
      break;
    }  // file broken

    pQuery->pos = (*vnodeSearchKeyFunc[pObj->searchAlgorithm])(data, pBlock[midSlot].numOfPoints, pQuery->skey,
                                                               pQuery->order.order);
    pQuery->key = *((TSKEY *)(data + pObj->schema[0].bytes * pQuery->pos));

    // pos indexes into block midSlot, so pass that block (not the head of the list) so the
    // limit-offset forwarding counts the rows remaining in the right block.
    ret = vnodeForwardStartPosition(pQuery, pBlock + midSlot, midSlot, pVnode, pObj);
    break;
  }

  tfree(buffer);
  tfree(temp);
  tfree(data);

  return ret;
}

/*
 * Apply the query's pending limit.offset to the start position found in block slotIdx.
 *
 * pBlock is the base of the file's SCompBlock array and slotIdx is an absolute index into it
 * (the same convention forwardInFile() uses); pQuery->pos is the start position inside block
 * slotIdx. Forwarding is only done when limit.offset > 0 and there are no filters on ordinary
 * columns.
 *
 * Returns pQuery->numOfBlocks when no forwarding is needed or the new start position stays inside
 * block slotIdx; otherwise returns the result of forwardInFile().
 */
int vnodeForwardStartPosition(SQuery *pQuery, SCompBlock *pBlock, int32_t slotIdx, SVnodeObj *pVnode, SMeterObj *pObj) {
  int step = QUERY_IS_ASC_QUERY(pQuery) ? 1 : -1;

  if (pQuery->limit.offset > 0 && pQuery->numOfFilterCols == 0) {
    // Points left in block slotIdx from pos onward, in scan direction. The block that pos refers to
    // is pBlock[slotIdx]; pBlock itself is the first block of the array.
    int maxReads = QUERY_IS_ASC_QUERY(pQuery) ? pBlock[slotIdx].numOfPoints - pQuery->pos : pQuery->pos + 1;

    if (pQuery->limit.offset < maxReads) {  // start position in current block
      if (QUERY_IS_ASC_QUERY(pQuery)) {
        pQuery->pos += pQuery->limit.offset;
      } else {
        pQuery->pos -= pQuery->limit.offset;
      }

      pQuery->limit.offset = 0;

    } else {
      // the rest of this block is skipped entirely; continue with the neighbouring block
      pQuery->limit.offset -= maxReads;
      slotIdx += step;

      return forwardInFile(pQuery, slotIdx, step, pVnode, pObj);
    }
  }

  return pQuery->numOfBlocks;
}

/*
 * Skip pQuery->limit.offset points starting at block slotIdx, walking blocks in direction `step`
 * and moving on to neighbouring files (via vnodeGetCompBlockInfo) when the current file runs out.
 *
 * When the offset ends inside some block: pQuery->slot/pos are set to the first point after the
 * skipped ones, limit.offset is cleared and pQuery->numOfBlocks is returned.
 * When the fileId leaves the vnode's file range: slot/pos are -1, lastKey is set to
 * pObj->lastKeyOnFile, skey to lastKey + 1, and 0 is returned.
 */
int forwardInFile(SQuery *pQuery, int32_t slotIdx, int32_t step, SVnodeObj *pVnode, SMeterObj *pObj) {
  for (;;) {
    // read the block list each round, since vnodeGetCompBlockInfo() runs between rounds
    SCompBlock *blocks = pQuery->pBlock;

    // drop whole blocks while the remaining offset covers them
    while (slotIdx >= 0 && slotIdx < pQuery->numOfBlocks && pQuery->limit.offset >= blocks[slotIdx].numOfPoints) {
      pQuery->limit.offset -= blocks[slotIdx].numOfPoints;
      slotIdx += step;
    }

    if (slotIdx >= 0 && slotIdx < pQuery->numOfBlocks) {
      pQuery->pos = QUERY_IS_ASC_QUERY(pQuery) ? pQuery->limit.offset
                                               : blocks[slotIdx].numOfPoints - pQuery->limit.offset - 1;
      pQuery->slot = slotIdx;
      pQuery->limit.offset = 0;
      return pQuery->numOfBlocks;
    }

    // this file is used up: look for the next file that yields blocks, forwarding the offset there
    pQuery->slot = -1;
    pQuery->pos = -1;

    int found;
    do {
      pQuery->fileId += step;

      int beyondNewest = pQuery->fileId > pVnode->fileId;
      int beforeOldest = pQuery->fileId < pVnode->fileId - pVnode->numOfFiles + 1;
      if (beyondNewest || beforeOldest) {
        pQuery->lastKey = pObj->lastKeyOnFile;
        pQuery->skey = pQuery->lastKey + 1;
        return 0;
      }

      found = vnodeGetCompBlockInfo(pObj, pQuery);
    } while (found <= 0);  // non-positive results (0 or an error) move on to the following file

    slotIdx = QUERY_IS_ASC_QUERY(pQuery) ? 0 : pQuery->numOfBlocks - 1;
  }
}

/*
 * Return the timestamp stored at position `pos` of the primary-key buffer pQuery->tsData.
 * `factor` selects the half of the buffer in use: the base is shifted by pointsOffset rows when
 * factor is 1 and not shifted when it is 0.
 */
static FORCE_INLINE TSKEY vnodeGetTSInDataBlock(SQuery *pQuery, int32_t pos, int32_t factor) {
  int64_t rowIndex = pQuery->pointsOffset * factor + pos;
  char   *keyAddr = pQuery->tsData->data + rowIndex * TSDB_KEYSIZE;

  return *(TSKEY *)keyAddr;
}

/*
 * Read the next batch of points of meter pObj from the vnode's data files into pQuery->sdata.
 *
 * Each call handles at most one data block. Starting at pQuery->slot/pos, it copies up to
 * pQuery->pointsToRead points (bounded by pQuery->ekey, and by the ordinary-column filters when
 * numOfFilterCols > 0) into the output buffers at pQuery->pointsOffset. It then advances
 * pos/slot/fileId and lastKey/skey for the next call, and sets pQuery->over when the range or the
 * files are exhausted.
 *
 * Returns 0 on success or a negative error code. Every per-column read buffer allocated here is
 * released before returning, including on the timestamp-disorder error path.
 */
int vnodeQueryFromFile(SMeterObj *pObj, SQuery *pQuery) {
  int numOfReads = 0;

  int         lastPos = -1, startPos;
  int         col, step, code = 0;
  char *      pRead, *pData;
  SData *     sdata[TSDB_MAX_COLUMNS];
  SCompBlock *pBlock = NULL;
  SVnodeObj * pVnode = &vnodeList[pObj->vnode];
  pQuery->pointsRead = 0;
  int keyLen = TSDB_KEYSIZE;

  if (pQuery->over) return 0;

  if (pQuery->slot < 0)  // it means a new query, we need to find the point first
    code = vnodeSearchPointInFile(pObj, pQuery);

  if (code < 0 || pQuery->slot < 0 || pQuery->pos == -1) {
    pQuery->over = 1;
    return code;
  }

  // note the sign: pos and slot move by -step (forward in ascending order, backward in descending)
  step = QUERY_IS_ASC_QUERY(pQuery) ? -1 : 1;
  pBlock = pQuery->pBlock + pQuery->slot;

  if (pQuery->pos == FILE_QUERY_NEW_BLOCK) {
    // entering a new block: finish if the block's key range lies outside the query range
    if (!QUERY_IS_ASC_QUERY(pQuery)) {
      if (pQuery->ekey > pBlock->keyLast) pQuery->over = 1;
      if (pQuery->skey < pBlock->keyFirst) pQuery->over = 1;
    } else {
      if (pQuery->ekey < pBlock->keyFirst) pQuery->over = 1;
      if (pQuery->skey > pBlock->keyLast) pQuery->over = 1;
    }

    pQuery->pos = QUERY_IS_ASC_QUERY(pQuery) ? 0 : pBlock->numOfPoints - 1;
  }

  if (pQuery->over) return 0;

  // To make sure the start position of each buffer is aligned to 4bytes in 32-bit ARM system.
  for (col = 0; col < pQuery->numOfCols; ++col) {
    sdata[col] = calloc(1, sizeof(SData) + pBlock->numOfPoints * pQuery->colList[col].data.bytes + EXTRA_BYTES);
  }

  /*
   * timestamp column is fetched in any cases. Therefore, if the query does not fetch primary column,
   * we allocate tsData buffer with twice size of the other ordinary pQuery->sdata.
   * Otherwise, the query function may over-write buffer area while retrieve function has not packed the results into
   * message to send to client yet.
   *
   * So the startPositionFactor is needed to denote which half part is used to store the result, and which
   * part is available for keep data during query process.
   *
   * Note: the startPositionFactor must be used in conjunction with pQuery->pointsOffset
   */
  int32_t startPositionFactor = 1;
  if (pQuery->colList[0].colIdx == PRIMARYKEY_TIMESTAMP_COL_INDEX) {
    pQuery->tsData = sdata[0];
    startPositionFactor = 0;
  }

  code = vnodeReadCompBlockToMem(pObj, pQuery, sdata);
  if (code < 0) {
    dError("vid:%d sid:%d id:%s, failed to read block:%d numOfPoints:%d", pObj->vnode, pObj->sid, pObj->meterId,
           pQuery->slot, pBlock->numOfPoints);
    goto _next;
  }

  // points available from pos to the block end in scan direction
  int maxReads = QUERY_IS_ASC_QUERY(pQuery) ? pBlock->numOfPoints - pQuery->pos : pQuery->pos + 1;

  TSKEY startKey = vnodeGetTSInDataBlock(pQuery, 0, startPositionFactor);
  TSKEY endKey = vnodeGetTSInDataBlock(pQuery, pBlock->numOfPoints - 1, startPositionFactor);

  // limit the read to points not beyond ekey
  if (QUERY_IS_ASC_QUERY(pQuery)) {
    if (endKey < pQuery->ekey) {
      numOfReads = maxReads;
    } else {
      lastPos = (*vnodeSearchKeyFunc[pObj->searchAlgorithm])(
          pQuery->tsData->data + keyLen * (pQuery->pos + pQuery->pointsOffset * startPositionFactor), maxReads,
          pQuery->ekey, TSQL_SO_DESC);
      numOfReads = (lastPos >= 0) ? lastPos + 1 : 0;
    }
  } else {
    if (startKey > pQuery->ekey) {
      numOfReads = maxReads;
    } else {
      lastPos = (*vnodeSearchKeyFunc[pObj->searchAlgorithm])(
          pQuery->tsData->data + keyLen * pQuery->pointsOffset * startPositionFactor, maxReads, pQuery->ekey,
          TSQL_SO_ASC);
      numOfReads = (lastPos >= 0) ? pQuery->pos - lastPos + 1 : 0;
    }
  }

  if (numOfReads > pQuery->pointsToRead - pQuery->pointsRead) {
    numOfReads = pQuery->pointsToRead - pQuery->pointsRead;
  } else {
    if (lastPos >= 0 || numOfReads == 0) {
      pQuery->keyIsMet = 1;
      pQuery->over = 1;
    }
  }

  startPos = QUERY_IS_ASC_QUERY(pQuery) ? pQuery->pos : pQuery->pos - numOfReads + 1;

  int32_t numOfQualifiedPoints = 0;
  int32_t numOfActualRead = numOfReads;

  // copy data to result buffer
  if (pQuery->numOfFilterCols == 0) {
    // no filter condition on ordinary columns
    for (int32_t i = 0; i < pQuery->numOfOutputCols; ++i) {
      int16_t colBufferIndex = pQuery->pSelectExpr[i].pBase.colInfo.colIdxInBuf;
      int32_t bytes = GET_COLUMN_BYTES(pQuery, i);

      pData = pQuery->sdata[i]->data + pQuery->pointsOffset * bytes;
      pRead = sdata[colBufferIndex]->data + startPos * bytes;

      if (QUERY_IS_ASC_QUERY(pQuery)) {
        memcpy(pData, pRead, numOfReads * bytes);
      } else {  // reversed copy to output buffer
        for (int32_t j = 0; j < numOfReads; ++j) {
          memcpy(pData + bytes * j, pRead + (numOfReads - 1 - j) * bytes, bytes);
        }
      }
    }
    numOfQualifiedPoints = numOfReads;
  } else {
    // check each data one by one set the input column data
    for (int32_t k = 0; k < pQuery->numOfFilterCols; ++k) {
      struct SSingleColumnFilterInfo *pFilterInfo = &pQuery->pFilterInfo[k];
      pFilterInfo->pData = sdata[pFilterInfo->info.colIdxInBuf]->data;
    }

    // positions (within the block) of the points that pass the filters, in output order
    int32_t *ids = calloc(1, numOfReads * sizeof(int32_t));
    numOfActualRead = 0;

    if (QUERY_IS_ASC_QUERY(pQuery)) {
      for (int32_t j = startPos; j < pBlock->numOfPoints; j -= step) {
        TSKEY key = vnodeGetTSInDataBlock(pQuery, j, startPositionFactor);
        if (key < startKey || key > endKey) {
          dError("vid:%d sid:%d id:%s, timestamp in file block disordered. slot:%d, pos:%d, ts:%" PRId64 ", block "
                 "range:%" PRId64 "-%" PRId64, pObj->vnode, pObj->sid, pObj->meterId, pQuery->slot, j, key, startKey, endKey);
          tfree(ids);
          code = -TSDB_CODE_FILE_BLOCK_TS_DISORDERED;
          goto _cleanup;  // release sdata[] as well; the old early return leaked it
        }

        // out of query range, quit
        if (key > pQuery->ekey) {
          break;
        }

        if (!vnodeFilterData(pQuery, &numOfActualRead, j)) {
          continue;
        }

        ids[numOfQualifiedPoints] = j;
        if (++numOfQualifiedPoints == numOfReads) {  // qualified data are enough
          break;
        }
      }
    } else {
      for (int32_t j = pQuery->pos; j >= 0; --j) {
        TSKEY key = vnodeGetTSInDataBlock(pQuery, j, startPositionFactor);
        if (key < startKey || key > endKey) {
          dError("vid:%d sid:%d id:%s, timestamp in file block disordered. slot:%d, pos:%d, ts:%" PRId64 ", block "
                 "range:%" PRId64 "-%" PRId64, pObj->vnode, pObj->sid, pObj->meterId, pQuery->slot, j, key, startKey, endKey);
          tfree(ids);
          code = -TSDB_CODE_FILE_BLOCK_TS_DISORDERED;
          goto _cleanup;  // release sdata[] as well; the old early return leaked it
        }

        // out of query range, quit
        if (key < pQuery->ekey) {
          break;
        }

        if (!vnodeFilterData(pQuery, &numOfActualRead, j)) {
          continue;
        }

        ids[numOfQualifiedPoints] = j;
        if (++numOfQualifiedPoints == numOfReads) {  // qualified data are enough
          break;
        }
      }
    }

    for (int32_t j = 0; j < numOfQualifiedPoints; ++j) {
      for (int32_t col = 0; col < pQuery->numOfOutputCols; ++col) {
        int16_t colIndexInBuffer = pQuery->pSelectExpr[col].pBase.colInfo.colIdxInBuf;
        int32_t bytes = GET_COLUMN_BYTES(pQuery, col);
        pData = pQuery->sdata[col]->data + (pQuery->pointsOffset + j) * bytes;
        pRead = sdata[colIndexInBuffer]->data + ids[j] * bytes;

        memcpy(pData, pRead, bytes);
      }
    }

    tfree(ids);
    assert(numOfQualifiedPoints <= numOfReads);
  }

  // Note: numOfQualifiedPoints may be 0, since no data in this block are qualified
  assert(pQuery->pointsRead == 0);

  pQuery->pointsRead += numOfQualifiedPoints;
  for (col = 0; col < pQuery->numOfOutputCols; ++col) {
    int16_t bytes = GET_COLUMN_BYTES(pQuery, col);
    pQuery->sdata[col]->len = bytes * (pQuery->pointsOffset + pQuery->pointsRead);
  }
  pQuery->pos -= numOfActualRead * step;

  // Update lastKey/skey from the last accessed point. When nothing was consumed, pos + step can lie
  // outside the block (-1 at pos 0 ascending, numOfPoints at the block end descending), so lastKey
  // and skey are left unchanged in that case.
  if (numOfActualRead > 0) {
    int32_t lastAccessPos = pQuery->pos + step;
    pQuery->lastKey = vnodeGetTSInDataBlock(pQuery, lastAccessPos, startPositionFactor);
    pQuery->skey = pQuery->lastKey - step;
  }

_next:
  // current block exhausted (or unreadable): move to the neighbouring block
  if ((pQuery->pos < 0 || pQuery->pos >= pBlock->numOfPoints || numOfReads == 0) && (pQuery->over == 0)) {
    pQuery->slot = pQuery->slot - step;
    pQuery->pos = FILE_QUERY_NEW_BLOCK;
  }

  // no block left in this file: move on to the next file that has blocks for this meter
  if ((pQuery->slot < 0 || pQuery->slot >= pQuery->numOfBlocks) && (pQuery->over == 0)) {
    int ret;

    while (1) {
      ret = -1;
      pQuery->fileId -= step;  // jump to next file

      if (QUERY_IS_ASC_QUERY(pQuery)) {
        if (pQuery->fileId > pVnode->fileId) {
          // to do:
          // check if file is updated, if updated, open again and check if this Meter is updated
          // if meter is updated, read in new block info, and
          break;
        }
      } else {
        if ((pVnode->fileId - pQuery->fileId + 1) > pVnode->numOfFiles) break;
      }

      ret = vnodeGetCompBlockInfo(pObj, pQuery);
      if (ret > 0) break;
      if (ret < 0) code = ret;
    }

    if (ret <= 0) pQuery->over = 1;

    pQuery->slot = QUERY_IS_ASC_QUERY(pQuery) ? 0 : pQuery->numOfBlocks - 1;
  }

_cleanup:
  for (int32_t i = 0; i < pQuery->numOfCols; ++i) {
    tfree(sdata[i]);
  }

  return code;
}

/*
 * Recompute the "magic" of file `fileId` in vnode `vnode` and store it in
 * pVnode->fmagic[fileId % maxFiles].
 *
 * The magic is the size of the data file plus, if it exists, the size of the last file.
 * Returns -1 when the head file is missing or smaller than its fixed part, or when the data file
 * is missing. Returns 0 otherwise. In non-CLUSTER builds it also returns 0 without touching fmagic
 * when the head file is exactly its fixed-part size.
 */
int vnodeUpdateFileMagic(int vnode, int fileId) {
  SVnodeObj  *pVnode = vnodeList + vnode;
  struct stat fileStat;
  char        fileName[256];

  // head file: must exist and hold at least the header, one SCompHeader per session and a checksum
  vnodeGetHeadDataLname(fileName, NULL, NULL, vnode, fileId);
  if (stat(fileName, &fileStat) != 0) {
    dError("vid:%d, head file:%s is not there", vnode, fileName);
    return -1;
  }

  int minHeadSize = sizeof(SCompHeader) * pVnode->cfg.maxSessions + sizeof(TSCKSUM) + TSDB_FILE_HEADER_LEN;
  if (fileStat.st_size < minHeadSize) {
    dError("vid:%d, head file:%s is corrupted", vnode, fileName);
    return -1;
  }

#ifndef CLUSTER
  if (fileStat.st_size == minHeadSize) return 0;
#endif

  // data file: mandatory, its size starts the magic
  vnodeGetHeadDataLname(NULL, fileName, NULL, vnode, fileId);
  if (stat(fileName, &fileStat) != 0) {
    dError("vid:%d, data file:%s is not there", vnode, fileName);
    return -1;
  }
  uint64_t magic = (uint64_t)fileStat.st_size;

  // last file: optional, its size is added when present
  vnodeGetHeadDataLname(NULL, NULL, fileName, vnode, fileId);
  if (stat(fileName, &fileStat) == 0) {
    magic += (uint64_t)fileStat.st_size;
  }

  pVnode->fmagic[fileId % pVnode->maxFiles] = magic;
  return 0;
}

int vnodeInitFile(int vnode) {
S
slguan 已提交
1828
  int        code = TSDB_CODE_SUCCESS;
H
hzcheng 已提交
1829 1830 1831 1832 1833 1834 1835 1836
  SVnodeObj *pVnode = vnodeList + vnode;

  pVnode->maxFiles = pVnode->cfg.daysToKeep / pVnode->cfg.daysPerFile + 1;
  pVnode->maxFile1 = pVnode->cfg.daysToKeep1 / pVnode->cfg.daysPerFile;
  pVnode->maxFile2 = pVnode->cfg.daysToKeep2 / pVnode->cfg.daysPerFile;
  pVnode->fmagic = (uint64_t *)calloc(pVnode->maxFiles + 1, sizeof(uint64_t));
  int fileId = pVnode->fileId;

S
slguan 已提交
1837 1838 1839 1840 1841 1842 1843 1844 1845
  /*
   * The actual files will far exceed the files that need to exist
   */
  if (pVnode->numOfFiles > pVnode->maxFiles) {
    dError("vid:%d numOfFiles:%d should not larger than maxFiles:%d", vnode, pVnode->numOfFiles, pVnode->maxFiles);
  }

  int numOfFiles = MIN(pVnode->numOfFiles, pVnode->maxFiles);
  for (int i = 0; i < numOfFiles; ++i) {
H
hzcheng 已提交
1846 1847 1848 1849 1850 1851 1852 1853 1854 1855 1856 1857 1858 1859 1860 1861 1862
    if (vnodeUpdateFileMagic(vnode, fileId) < 0) {
      if (pVnode->cfg.replications > 1) {
        pVnode->badFileId = fileId;
      }
      dError("vid:%d fileId:%d is corrupted", vnode, fileId);
    } else {
      dTrace("vid:%d fileId:%d is checked", vnode, fileId);
    }

    fileId--;
  }

  return code;
}

/*
 * Placeholder for recovering the SCompHeader part of the head file of fileId in vnode `vnode`.
 * Not implemented: it logs the request and then trips assert(0).
 */
int vnodeRecoverCompHeader(int vnode, int fileId) {
  const int recovered = 0;  // nothing is recovered yet

  dTrace("starting to recover vnode head file comp header part, vnode: %d fileId: %d", vnode, fileId);
  assert(0);  // TODO: implement SCompHeader recovery
  return recovered;
}

/*
 * Placeholder for recovering the head file of fileId in vnode `vnode`.
 * Not implemented: it logs the request and then trips assert(0).
 */
int vnodeRecoverHeadFile(int vnode, int fileId) {
  const int recovered = 0;  // nothing is recovered yet

  dTrace("starting to recover vnode head file, vnode: %d, fileId: %d", vnode, fileId);
  assert(0);  // TODO: implement head file recovery
  return recovered;
}

/*
 * Placeholder for recovering the data file of fileId in vnode `vnode`.
 * Not implemented: it logs the request and then trips assert(0).
 */
int vnodeRecoverDataFile(int vnode, int fileId) {
  const int recovered = 0;  // nothing is recovered yet

  dTrace("starting to recover vnode data file, vnode: %d, fileId: %d", vnode, fileId);
  assert(0);  // TODO: implement data file recovery
  return recovered;
}