vnodeImport.c 59.2 KB
Newer Older
H
hzcheng 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

S
slguan 已提交
16
#define _DEFAULT_SOURCE
17
#include "os.h"
H
hzcheng 已提交
18 19 20

#include "vnode.h"
#include "vnodeUtil.h"
H
hjxilinx 已提交
21
#include "vnodeStatus.h"
H
hzcheng 已提交
22

23 24 25 26 27 28 29 30
/* External vnode helpers used by the import path (declared here, defined elsewhere). */

/* File naming helpers. */
extern void vnodeGetHeadTname(char *nHeadName, char *nLastName, int vnode, int fileId);
extern void vnodeGetHeadDataLname(char *headName, char *dataName, char *lastName, int vnode, int fileId);

/* File creation / block I/O helpers. */
extern int vnodeCreateEmptyCompFile(int vnode, int fileId);
extern int vnodeCreateNeccessaryFiles(SVnodeObj *pVnode);
extern int vnodeReadColumnToMem(int fd, SCompBlock *pBlock, SField **fields, int col, char *data, int dataSize,
                                char *temp, char *buffer, int bufferSize);

/* Cache and shell helpers. */
extern int          vnodeUpdateFreeSlot(SVnodeObj *pVnode);
extern SCacheBlock *vnodeGetFreeCacheBlock(SVnodeObj *pVnode);
extern int          vnodeSendShellSubmitRspMsg(SShellObj *pObj, int code, int numOfPoints);
H
hzcheng 已提交
32

33
#define KEY_AT_INDEX(payload, step, idx) (*(TSKEY *)((char *)(payload) + (step) * (idx)))
H
hzcheng 已提交
34
typedef struct {
S
slguan 已提交
35
  void *     signature;
H
hzcheng 已提交
36 37 38 39 40 41 42 43 44 45 46 47 48 49 50
  SShellObj *pShell;
  SMeterObj *pObj;
  int        retry;
  TSKEY      firstKey;
  TSKEY      lastKey;
  int        importedRows;
  int        commit;  // start to commit if it is set to 1

  int   slot;  // slot/block to start writing the import data
  int   pos;   // pos to start writing the import data in the slot/block
  TSKEY key;

  // only for file
  int     numOfPoints;
  int64_t offset;  // offset in data file
51 52
  char *  payload;
  char *  opayload;  // allocated space for payload from client
H
hzcheng 已提交
53 54 55
  int     rows;
} SImportInfo;

56 57 58 59
typedef struct {
  // in .head file
  SCompHeader *pHeader;
  size_t       pHeaderSize;
H
hzcheng 已提交
60

61 62 63 64 65
  SCompInfo   compInfo;
  SCompBlock *pBlocks;
  // in .data file
  int     blockId;
  uint8_t blockLoadState;
H
hzcheng 已提交
66

67 68
  SField *pField;
  size_t  pFieldSize;
H
hzcheng 已提交
69

70 71
  SData *data[TSDB_MAX_COLUMNS];
  char * buffer;
H
hzcheng 已提交
72

73
  char *temp;
H
hzcheng 已提交
74

75 76 77 78 79 80 81
  char * tempBuffer;
  size_t tempBufferSize;
  // Variables for sendfile
  int64_t compInfoOffset;
  int64_t nextNo0Offset;  // next sid whose compInfoOffset > 0
  int64_t hfSize;
  int64_t driftOffset;
H
hzcheng 已提交
82

83 84 85 86
  int oldNumOfBlocks;
  int newNumOfBlocks;
  int last;
} SImportHandle;
H
hzcheng 已提交
87

88 89 90 91 92 93
/* Cursor over cache blocks. */
typedef struct {
  int   slot;     // current slot
  int   pos;      // position inside the current slot
  int   oslot;    // old slot
  TSKEY nextKey;
} SBlockIter;
H
hzcheng 已提交
94

95 96 97 98 99 100
/* Merge buffer header followed by a variable number of row pointers. */
typedef struct {
  int64_t spos;       // start position
  int64_t epos;       // end position
  int64_t totalRows;
  char   *offset[];   // flexible array of row pointers
} SMergeBuffer;
H
hzcheng 已提交
101

102
/* Forward declaration: vnodeImportPoints calls this to perform the actual import described by pImport. */
int vnodeImportData(SMeterObj *pObj, SImportInfo *pImport);
H
hzcheng 已提交
103

104 105 106 107 108
/*
 * vnodeFindKeyInCache - find where the import rows go in the meter's cache.
 *
 * Searches the cache with vnodeSearchPointInCache, starting from pImport->firstKey when order is
 * non-zero and from pImport->lastKey when order is 0, and stores the resulting position in
 * pImport->slot / pImport->pos / pImport->key.
 *
 * When nothing is found, the position is the commit slot (or the next slot once the commit slot
 * holds pointsPerBlock points), at pos 0 with key 0.
 *
 * Always returns 0.
 */
int vnodeFindKeyInCache(SImportInfo *pImport, int order) {
  SMeterObj  *pObj = pImport->pObj;
  SCacheInfo *pInfo = (SCacheInfo *)pObj->pCache;
  TSKEY       searchKey = order ? pImport->firstKey : pImport->lastKey;
  SQuery      query;

  memset(&query, 0, sizeof(query));
  query.order.order = order;
  query.skey = searchKey;
  query.ekey = order ? pImport->lastKey : pImport->firstKey;
  vnodeSearchPointInCache(pObj, &query);

  if (query.slot < 0) {
    int headSlot = pInfo->commitSlot;
    if (pInfo->commitPoint >= pObj->pointsPerBlock) headSlot = (headSlot + 1) % pInfo->maxBlocks;

    pImport->slot = headSlot;
    pImport->pos = 0;
    pImport->key = 0;
    dTrace("vid:%d sid:%d id:%s, key:%" PRId64 ", import to head of cache", pObj->vnode, pObj->sid, pObj->meterId,
           searchKey);
    return 0;
  }

  pImport->slot = query.slot;
  pImport->pos = query.pos;
  pImport->key = query.key;

  // Exact hit: the found position is used as is.
  if (searchKey == query.key) return 0;

  if (order != 0) {
    if (pImport->pos < 0) pImport->pos = 0;
    return 0;
  }

  // order == 0: pos is the position which has the smaller key, so data goes right after it,
  // wrapping to the start of the next slot when the current one is full.
  pImport->pos += 1;
  if (pImport->pos >= pObj->pointsPerBlock) {
    pImport->slot = (pImport->slot + 1) % pInfo->maxBlocks;
    pImport->pos = 0;
  }
  return 0;
}

147 148
/*
 * vnodeGetValidDataRange - compute the key window a vnode accepts at time now.
 *
 * With span = daysPerFile days (in the vnode's precision) and fid = now / span, the window covers
 * the maxFiles file periods ending at fid plus the following period:
 *   [ (fid - maxFiles + 1) * span, (fid + 2) * span - 1 ]
 */
void vnodeGetValidDataRange(int vnode, TSKEY now, TSKEY *minKey, TSKEY *maxKey) {
  SVnodeObj *pVnode = &vnodeList[vnode];
  int64_t    fileSpan = pVnode->cfg.daysPerFile * tsMsPerDay[(uint8_t)pVnode->cfg.precision];
  int        curFid = now / fileSpan;

  *minKey = (curFid - pVnode->maxFiles + 1) * fileSpan;
  *maxKey = (curFid + 2) * fileSpan - 1;
}
H
hzcheng 已提交
156

157 158 159 160
/*
 * vnodeImportPoints - handle an import submit message for one meter.
 *
 * Validates the message (row count, length, key window), forwards it to peers when the vnode is
 * replicated and a shell issued it, and appends it to the commit log unless it came from the log.
 * If every key is newer than the meter's lastKey the rows are simply inserted with
 * vnodeInsertPoints; otherwise the meter is marked IMPORTING and vnodeImportData runs with the
 * cache pool's commitInProcess flag set.
 *
 * @pObj:         target meter
 * @cont:         SSubmitMsg: row count (network byte order) followed by packed rows
 * @contLen:      length of cont in bytes
 * @source:       message origin; TSDB_DATA_SOURCE_LOG skips the commit-log write
 * @param:        issuing SShellObj, or NULL
 * @sversion:     schema version of the rows
 * @pNumOfPoints: [out] number of rows written
 * @now:          current time, used for the accepted key window
 *
 * Returns TSDB_CODE_SUCCESS or an error code.
 */
int vnodeImportPoints(SMeterObj *pObj, char *cont, int contLen, char source, void *param, int sversion,
                      int *pNumOfPoints, TSKEY now) {
  SSubmitMsg *pSubmit = (SSubmitMsg *)cont;
  SVnodeObj * pVnode = vnodeList + pObj->vnode;
  int         rows = 0;
  char *      payload = NULL;
  int         code = TSDB_CODE_SUCCESS;
  SCachePool *pPool = (SCachePool *)(pVnode->pCachePool);
  SShellObj * pShell = (SShellObj *)param;
  TSKEY       firstKey, lastKey;

  payload = pSubmit->payLoad;

  rows = htons(pSubmit->numOfRows);
  // The row count comes from the message, so an empty message is rejected explicitly rather than
  // asserted: with rows == 0 the length check below can still pass and the key reads would then
  // access memory outside the payload.
  if (rows <= 0) {
    dError("vid:%d sid:%d id:%s, invalid import, no rows in message, contLen:%d", pObj->vnode, pObj->sid,
           pObj->meterId, contLen);
    return TSDB_CODE_WRONG_MSG_SIZE;
  }

  int expectedLen = rows * pObj->bytesPerPoint + sizeof(pSubmit->numOfRows);
  if (expectedLen != contLen) {
    dError("vid:%d sid:%d id:%s, invalid import, expected:%d, contLen:%d", pObj->vnode, pObj->sid, pObj->meterId,
           expectedLen, contLen);
    return TSDB_CODE_WRONG_MSG_SIZE;
  }

  // Check timestamp context.
  TSKEY minKey = 0, maxKey = 0;
  firstKey = KEY_AT_INDEX(payload, pObj->bytesPerPoint, 0);
  lastKey = KEY_AT_INDEX(payload, pObj->bytesPerPoint, rows - 1);
  assert(firstKey <= lastKey);
  vnodeGetValidDataRange(pObj->vnode, now, &minKey, &maxKey);
  if (firstKey < minKey || firstKey > maxKey || lastKey < minKey || lastKey > maxKey) {
    dError(
        "vid:%d sid:%d id:%s, invalid timestamp to import, rows:%d firstKey: %" PRId64 " lastKey: %" PRId64 " minAllowedKey:%" PRId64 " "
        "maxAllowedKey:%" PRId64,
        pObj->vnode, pObj->sid, pObj->meterId, rows, firstKey, lastKey, minKey, maxKey);
    return TSDB_CODE_TIMESTAMP_OUT_OF_RANGE;
  }

  // forward to peers
  if (pShell && pVnode->cfg.replications > 1) {
    code = vnodeForwardToPeer(pObj, cont, contLen, TSDB_ACTION_IMPORT, sversion);
    if (code != 0) return code;
  }

  if (pVnode->cfg.commitLog && source != TSDB_DATA_SOURCE_LOG) {
    if (pVnode->logFd < 0) return TSDB_CODE_INVALID_COMMIT_LOG;
    code = vnodeWriteToCommitLog(pObj, TSDB_ACTION_IMPORT, cont, contLen, sversion);
    if (code != 0) return code;
  }

  /*
   * The timestamp of all records in a submit payload are always in ascending order, guaranteed by client, so here only
   * the first key.
   */
  if (firstKey > pObj->lastKey) {  // Just call insert
    // The message is already in the commit log (or came from it), so insert as LOG source.
    code = vnodeInsertPoints(pObj, cont, contLen, TSDB_DATA_SOURCE_LOG, NULL, sversion, pNumOfPoints, now);
  } else {  // trigger import
    if (sversion != pObj->sversion) {
      dError("vid:%d sid:%d id:%s, invalid sversion, expected:%d received:%d", pObj->vnode, pObj->sid, pObj->meterId,
             pObj->sversion, sversion);
      return TSDB_CODE_OTHERS;
    }

    // check the table status for perform import historical data
    if ((code = vnodeSetMeterInsertImportStateEx(pObj, TSDB_METER_STATE_IMPORTING)) != TSDB_CODE_SUCCESS) {
      return code;
    }

    SImportInfo import = {0};

    dTrace("vid:%d sid:%d id:%s, try to import %d rows data, firstKey:%" PRId64 ", lastKey:%" PRId64 ", object lastKey:%" PRId64,
           pObj->vnode, pObj->sid, pObj->meterId, rows, firstKey, lastKey, pObj->lastKey);

    import.firstKey = firstKey;
    import.lastKey = lastKey;
    import.pObj = pObj;
    import.pShell = pShell;
    import.payload = payload;
    import.rows = rows;

    // FIXME: mutex here seems meaningless and num here still can be changed
    // int32_t num = 0;
    // pthread_mutex_lock(&pVnode->vmutex);
    // num = pObj->numOfQueries;
    // pthread_mutex_unlock(&pVnode->vmutex);

    int32_t commitInProcess = 0;

    pthread_mutex_lock(&pPool->vmutex);
    if ((commitInProcess = pPool->commitInProcess) == 1) {
      // mutual exclusion with read (need to change here)
      pthread_mutex_unlock(&pPool->vmutex);
      vnodeClearMeterState(pObj, TSDB_METER_STATE_IMPORTING);
      return TSDB_CODE_ACTION_IN_PROGRESS;
    }

    // Wait for running queries on this meter to drain before importing.
    // NOTE(review): this spins while pPool->vmutex is held; if the code that decrements
    // numOfQueries ever takes pPool->vmutex this deadlocks — confirm against the query path.
    int loop = 0;
    while (pObj->numOfQueries > 0) {
      loop++;
      if (loop > 1000) {
        sched_yield();
        loop = 0;
      }
    }

    pPool->commitInProcess = 1;
    pthread_mutex_unlock(&pPool->vmutex);
    code = vnodeImportData(pObj, &import);
    *pNumOfPoints = import.importedRows;

    pVnode->version++;
    vnodeClearMeterState(pObj, TSDB_METER_STATE_IMPORTING);
  }

  return code;
}
H
hzcheng 已提交
270

271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288
/*
 * vnodeSearchKeyInRange - locate the payload records whose keys lie in [skey, ekey].
 *
 * payload is a packed array of rows records, each step bytes long and starting with its TSKEY.
 * Keys are assumed to be in ascending order.
 *
 * @payload: packed records
 * @step:    bytes each record takes
 * @rows:    number of records
 * @skey:    range start (included)
 * @ekey:    range end (included)
 * @srow:    [out] index of the first record whose key is >= skey
 * @nrows:   [out] number of consecutive records from *srow whose key is <= ekey
 *
 * Returns 0 when at least one record is in the range, -1 when none is.
 */
static int vnodeSearchKeyInRange(char *payload, int step, int rows, TSKEY skey, TSKEY ekey, int *srow, int *nrows) {
  if (rows <= 0 || skey > ekey) return -1;
  if (KEY_AT_INDEX(payload, step, 0) > ekey || KEY_AT_INDEX(payload, step, rows - 1) < skey) return -1;

  // Binary search towards the first key >= skey; stops early on an exact match.
  int lo = 0;
  int hi = rows - 1;
  int probe;
  for (;;) {
    probe = (lo + hi) / 2;
    TSKEY probeKey = KEY_AT_INDEX(payload, step, probe);
    if (skey == probeKey) break;

    if (skey < probeKey) {
      hi = probe;
    } else {
      lo = probe + 1;
    }
    if (lo >= hi) break;
  }

  // The probe may have stopped just below skey; the answer is then the next record.
  int first = probe;
  if (KEY_AT_INDEX(payload, step, probe) < skey) {
    if (probe + 1 >= rows) return -1;
    first = probe + 1;
  }
  *srow = first;

  assert(skey <= KEY_AT_INDEX(payload, step, *srow));

  int count = 0;
  while (first + count < rows && KEY_AT_INDEX(payload, step, first + count) <= ekey) {
    count++;
  }
  *nrows = count;

  return (count > 0) ? 0 : -1;
}
H
hzcheng 已提交
331

332 333 334 335 336
int vnodeOpenMinFilesForImport(int vnode, int fid) {
  char        dname[TSDB_FILENAME_LEN] = "\0";
  SVnodeObj * pVnode = vnodeList + vnode;
  struct stat filestat;
  int         minFileSize;
H
hzcheng 已提交
337

338
  minFileSize = TSDB_FILE_HEADER_LEN + sizeof(SCompHeader) * pVnode->cfg.maxSessions + sizeof(TSCKSUM);
H
hzcheng 已提交
339

340
  vnodeGetHeadDataLname(pVnode->cfn, dname, pVnode->lfn, vnode, fid);
H
hzcheng 已提交
341

342 343 344 345 346 347 348
  // Open .head file
  pVnode->hfd = open(pVnode->cfn, O_RDONLY);
  if (pVnode->hfd < 0) {
    dError("vid:%d, failed to open head file:%s, reason:%s", vnode, pVnode->cfn, strerror(errno));
    taosLogError("vid:%d, failed to open head file:%s, reason:%s", vnode, pVnode->cfn, strerror(errno));
    goto _error_open;
  }
H
hzcheng 已提交
349

350 351 352 353 354
  fstat(pVnode->hfd, &filestat);
  if (filestat.st_size < minFileSize) {
    dError("vid:%d, head file:%s is corrupted", vnode, pVnode->cfn);
    taosLogError("vid:%d, head file:%s corrupted", vnode, pVnode->cfn);
    goto _error_open;
H
hzcheng 已提交
355 356
  }

357 358 359 360 361 362 363
  // Open .data file
  pVnode->dfd = open(dname, O_RDWR);
  if (pVnode->dfd < 0) {
    dError("vid:%d, failed to open data file:%s, reason:%s", vnode, dname, strerror(errno));
    taosLogError("vid:%d, failed to open data file:%s, reason:%s", vnode, dname, strerror(errno));
    goto _error_open;
  }
H
hzcheng 已提交
364

365 366 367 368 369 370
  fstat(pVnode->dfd, &filestat);
  if (filestat.st_size < TSDB_FILE_HEADER_LEN) {
    dError("vid:%d, data file:%s corrupted", vnode, dname);
    taosLogError("vid:%d, data file:%s corrupted", vnode, dname);
    goto _error_open;
  }
H
hzcheng 已提交
371

372 373 374 375 376 377 378
  // Open .last file
  pVnode->lfd = open(pVnode->lfn, O_RDWR);
  if (pVnode->lfd < 0) {
    dError("vid:%d, failed to open last file:%s, reason:%s", vnode, pVnode->lfn, strerror(errno));
    taosLogError("vid:%d, failed to open last file:%s, reason:%s", vnode, pVnode->lfn, strerror(errno));
    goto _error_open;
  }
H
hzcheng 已提交
379

380 381 382 383 384
  fstat(pVnode->lfd, &filestat);
  if (filestat.st_size < TSDB_FILE_HEADER_LEN) {
    dError("vid:%d, last file:%s corrupted", vnode, pVnode->lfn);
    taosLogError("vid:%d, last file:%s corrupted", vnode, pVnode->lfn);
    goto _error_open;
H
hzcheng 已提交
385 386
  }

387
  return 0;
H
hzcheng 已提交
388

389 390 391
_error_open:
  if (pVnode->hfd > 0) close(pVnode->hfd);
  pVnode->hfd = 0;
H
hzcheng 已提交
392

393 394
  if (pVnode->dfd > 0) close(pVnode->dfd);
  pVnode->dfd = 0;
H
hzcheng 已提交
395

396 397
  if (pVnode->lfd > 0) close(pVnode->lfd);
  pVnode->lfd = 0;
H
hzcheng 已提交
398

399 400
  return -1;
}
H
hzcheng 已提交
401

402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424
/*
 * vnodeOpenTempFilesForImport - create the new .head file for an import and copy its prefix.
 *
 * pVnode->cfn is a symlink whose target name ends in '0' or '1'. A symlink pVnode->nfn (named by
 * vnodeGetHeadTname) is created to the same target with the other suffix and opened with
 * O_CREAT | O_TRUNC as pVnode->nfd. The bytes of the old head file (pVnode->hfd) before this
 * meter's SCompInfo are copied to it, and sizeof(SCompInfo) bytes are skipped after them.
 *
 * Uses pVnode->hfd and pHandle->pHeader, which must already be set up. Fills pHandle->hfSize,
 * pHandle->nextNo0Offset and pHandle->compInfoOffset.
 *
 * Returns 0 on success, -1 on failure.
 */
int vnodeOpenTempFilesForImport(SImportHandle *pHandle, SMeterObj *pObj, int fid) {
  char        dHeadName[TSDB_FILENAME_LEN] = "\0";
  SVnodeObj * pVnode = vnodeList + pObj->vnode;
  struct stat filestat;
  int         sid;

  // cfn: .head. readlink() does not NUL-terminate, and a result that fills the whole buffer may be
  // truncated, so reject that case and terminate explicitly.
  ssize_t linkLen = readlink(pVnode->cfn, dHeadName, TSDB_FILENAME_LEN);
  if (linkLen <= 0 || linkLen >= TSDB_FILENAME_LEN) return -1;
  dHeadName[linkLen] = '\0';

  size_t len = (size_t)linkLen;
  // switch head name
  switch (dHeadName[len - 1]) {
    case '0':
      dHeadName[len - 1] = '1';
      break;
    case '1':
      dHeadName[len - 1] = '0';
      break;
    default:
      dError("vid: %d, fid: %d, head target filename not end with 0 or 1", pVnode->vnode, fid);
      return -1;
  }

  vnodeGetHeadTname(pVnode->nfn, NULL, pVnode->vnode, fid);
  if (symlink(dHeadName, pVnode->nfn) < 0) return -1;

  // Opening through the symlink creates (or truncates) the new head target itself.
  pVnode->nfd = open(pVnode->nfn, O_RDWR | O_CREAT | O_TRUNC, S_IRWXU | S_IRWXG | S_IRWXO);
  if (pVnode->nfd < 0) {
    dError("vid:%d, failed to open new head file:%s, reason:%s", pVnode->vnode, pVnode->nfn, strerror(errno));
    taosLogError("vid:%d, failed to open new head file:%s, reason:%s", pVnode->vnode, pVnode->nfn, strerror(errno));
    return -1;
  }

  if (fstat(pVnode->hfd, &filestat) < 0) {
    dError("vid:%d, failed to stat head file:%s, reason:%s", pVnode->vnode, pVnode->cfn, strerror(errno));
    return -1;
  }
  pHandle->hfSize = filestat.st_size;

  // Find the next sid whose compInfoOffset > 0
  for (sid = pObj->sid + 1; sid < pVnode->cfg.maxSessions; sid++) {
    if (pHandle->pHeader[sid].compInfoOffset > 0) break;
  }

  pHandle->nextNo0Offset = (sid == pVnode->cfg.maxSessions) ? pHandle->hfSize : pHandle->pHeader[sid].compInfoOffset;

  // FIXME: sendfile the original part
  // TODO: Here, we need to take the deleted table case in consideration, this function
  // just assume the case is handled before calling this function
  if (pHandle->pHeader[pObj->sid].compInfoOffset > 0) {
    pHandle->compInfoOffset = pHandle->pHeader[pObj->sid].compInfoOffset;
  } else {
    pHandle->compInfoOffset = pHandle->nextNo0Offset;
  }

  assert(pHandle->compInfoOffset <= pHandle->hfSize);

  lseek(pVnode->hfd, 0, SEEK_SET);
  lseek(pVnode->nfd, 0, SEEK_SET);
  if (tsendfile(pVnode->nfd, pVnode->hfd, NULL, pHandle->compInfoOffset) < 0) {
    return -1;
  }

  // Leave a SCompInfo space here
  lseek(pVnode->nfd, sizeof(SCompInfo), SEEK_CUR);

  return 0;
}

470
/* Bit mask of block parts to load (also tracked in SImportHandle.blockLoadState). */
typedef enum {
  DATA_LOAD_TIMESTAMP  = 1 << 0,  // primary-key (timestamp) column
  DATA_LOAD_OTHER_DATA = 1 << 1,  // every other column
} DataLoadMod;
H
hzcheng 已提交
471

472 473
/* Function to load a block data at the requirement of mod
 */
H
Hongze Cheng 已提交
474
static int vnodeLoadNeededBlockData(SMeterObj *pObj, SImportHandle *pHandle, int blockId, uint8_t loadMod, int *code) {
475 476
  size_t      size;
  SCompBlock *pBlock = pHandle->pBlocks + blockId;
H
Hongze Cheng 已提交
477
  *code = TSDB_CODE_SUCCESS;
H
hzcheng 已提交
478

479
  SVnodeObj *pVnode = vnodeList + pObj->vnode;
H
hzcheng 已提交
480

481
  int dfd = pBlock->last ? pVnode->lfd : pVnode->dfd;
H
hzcheng 已提交
482

483 484 485
  if (pHandle->blockId != blockId) {
    pHandle->blockId = blockId;
    pHandle->blockLoadState = 0;
H
hzcheng 已提交
486 487
  }

488 489 490 491 492 493 494
  if (pHandle->blockLoadState == 0){ // Reload pField
    size = sizeof(SField) * pBlock->numOfCols + sizeof(TSCKSUM);
    if (pHandle->pFieldSize < size) {
      pHandle->pField = (SField *)realloc((void *)(pHandle->pField), size);
      if (pHandle->pField == NULL) {
        dError("vid: %d, sid: %d, meterId: %s, failed to allocate memory, size: %ul", pObj->vnode, pObj->sid,
               pObj->meterId, size);
H
Hongze Cheng 已提交
495
        *code = TSDB_CODE_SERV_OUT_OF_MEMORY;
496
        return -1;
H
hzcheng 已提交
497
      }
498
      pHandle->pFieldSize = size;
H
hzcheng 已提交
499 500
    }

501 502
    lseek(dfd, pBlock->offset, SEEK_SET);
    if (read(dfd, (void *)(pHandle->pField), pHandle->pFieldSize) < 0) {
L
lihui 已提交
503
      dError("vid:%d sid:%d meterId:%s, failed to read data file, size:%zu reason:%s", pVnode->vnode, pObj->sid,
504
             pObj->meterId, pHandle->pFieldSize, strerror(errno));
H
Hongze Cheng 已提交
505
      *code = TSDB_CODE_FILE_CORRUPTED;
506 507
      return -1;
    }
H
hzcheng 已提交
508

509 510 511
    if (!taosCheckChecksumWhole((uint8_t *)(pHandle->pField), pHandle->pFieldSize)) {
      dError("vid:%d sid:%d meterId:%s, data file %s is broken since checksum mismatch", pVnode->vnode, pObj->sid,
             pObj->meterId, pVnode->lfn);
H
Hongze Cheng 已提交
512
      *code = TSDB_CODE_FILE_CORRUPTED;
513
      return -1;
H
hzcheng 已提交
514
    }
515
  }
H
hzcheng 已提交
516

517
  {  // Allocate necessary buffer
H
Hongze Cheng 已提交
518 519
    size = pObj->bytesPerPoint * pObj->pointsPerFileBlock +
           (sizeof(SData) + EXTRA_BYTES + sizeof(TSCKSUM)) * pObj->numOfColumns;
520 521 522 523 524
    if (pHandle->buffer == NULL) {
      pHandle->buffer = malloc(size);
      if (pHandle->buffer == NULL) {
        dError("vid: %d, sid: %d, meterId: %s, failed to allocate memory, size: %ul", pObj->vnode, pObj->sid,
               pObj->meterId, size);
H
Hongze Cheng 已提交
525
        *code = TSDB_CODE_SERV_OUT_OF_MEMORY;
526 527
        return -1;
      }
H
hzcheng 已提交
528

529 530 531 532
      // TODO: Init data
      pHandle->data[0] = (SData *)(pHandle->buffer);
      for (int col = 1; col < pObj->numOfColumns; col++) {
        pHandle->data[col] = (SData *)((char *)(pHandle->data[col - 1]) + sizeof(SData) + EXTRA_BYTES +
H
Hongze Cheng 已提交
533
                                       sizeof(TSCKSUM) + pObj->pointsPerFileBlock * pObj->schema[col - 1].bytes);
H
hzcheng 已提交
534 535 536
      }
    }

537 538 539 540 541
    if (pHandle->temp == NULL) {
      pHandle->temp = malloc(size);
      if (pHandle->temp == NULL) {
        dError("vid: %d, sid: %d, meterId: %s, failed to allocate memory, size: %ul", pObj->vnode, pObj->sid,
               pObj->meterId, size);
H
Hongze Cheng 已提交
542
        *code = TSDB_CODE_SERV_OUT_OF_MEMORY;
543
        return -1;
H
hzcheng 已提交
544 545 546
      }
    }

547
    if (pHandle->tempBuffer == NULL) {
H
Hongze Cheng 已提交
548
      pHandle->tempBufferSize = pObj->maxBytes * pObj->pointsPerFileBlock + EXTRA_BYTES + sizeof(TSCKSUM);
549 550 551 552
      pHandle->tempBuffer = malloc(pHandle->tempBufferSize);
      if (pHandle->tempBuffer == NULL) {
        dError("vid: %d, sid: %d, meterId: %s, failed to allocate memory, size: %ul", pObj->vnode, pObj->sid,
               pObj->meterId, pHandle->tempBufferSize);
H
Hongze Cheng 已提交
553
        *code = TSDB_CODE_SERV_OUT_OF_MEMORY;
554
        return -1;
H
hzcheng 已提交
555 556 557 558
      }
    }
  }

559 560
  if ((loadMod & DATA_LOAD_TIMESTAMP) &&
      (~(pHandle->blockLoadState & DATA_LOAD_TIMESTAMP))) {  // load only timestamp part
H
Hongze Cheng 已提交
561
    if (vnodeReadColumnToMem(dfd, pBlock, &(pHandle->pField), PRIMARYKEY_TIMESTAMP_COL_INDEX,
562
                             pHandle->data[PRIMARYKEY_TIMESTAMP_COL_INDEX]->data, sizeof(TSKEY) * pBlock->numOfPoints,
H
Hongze Cheng 已提交
563 564 565 566
                             pHandle->temp, pHandle->tempBuffer, pHandle->tempBufferSize) < 0) {
      *code = TSDB_CODE_FILE_CORRUPTED;
      return -1;
    }
H
hzcheng 已提交
567

568 569
    pHandle->blockLoadState |= DATA_LOAD_TIMESTAMP;
  }
H
hzcheng 已提交
570

571 572
  if ((loadMod & DATA_LOAD_OTHER_DATA) && (~(pHandle->blockLoadState & DATA_LOAD_OTHER_DATA))) {  // load other columns
    for (int col = 1; col < pBlock->numOfCols; col++) {
H
Hongze Cheng 已提交
573 574 575 576 577 578
      if (vnodeReadColumnToMem(dfd, pBlock, &(pHandle->pField), col, pHandle->data[col]->data,
                               pBlock->numOfPoints * pObj->schema[col].bytes, pHandle->temp, pHandle->tempBuffer,
                               pHandle->tempBufferSize) < 0) {
        *code = TSDB_CODE_FILE_CORRUPTED;
        return -1;
      }
579
    }
H
hzcheng 已提交
580

581
    pHandle->blockLoadState |= DATA_LOAD_OTHER_DATA;
H
hzcheng 已提交
582 583
  }

584
  return 0;
H
hzcheng 已提交
585 586
}

587 588 589 590
/*
 * vnodeCloseImportFiles - finish the new .head file (if one was opened) and close all import files.
 *
 * When pVnode->nfd is open:
 *   1. writes this meter's SCompInfo (uid, last, block count, checksum) at compInfoOffset;
 *   2. appends the old head file from nextNo0Offset to its end;
 *   3. shifts later sids' compInfoOffset by driftOffset and rewrites the SCompHeader array;
 *   4. after closing, renames nfn over cfn and removes the old head target.
 * Then closes dfd, hfd, lfd (and nfd), resetting each to 0.
 *
 * Returns 0 on success, -1 on a write/sendfile failure.
 * NOTE(review): the -1 paths return before any descriptor is closed; confirm callers clean up.
 */
static int vnodeCloseImportFiles(SMeterObj *pObj, SImportHandle *pHandle) {
  SVnodeObj *pVnode = vnodeList + pObj->vnode;
  char       dpath[TSDB_FILENAME_LEN] = "\0";
  SCompInfo  compInfo;
  off_t      offset = 0;  // POSIX off_t is available on glibc and musl alike; no need for glibc's __off_t

  if (pVnode->nfd > 0) {
    offset = lseek(pVnode->nfd, 0, SEEK_CUR);
    assert(offset == pHandle->nextNo0Offset + pHandle->driftOffset);
    (void)offset;  // only checked by assert()

    {  // Write the SCompInfo part
      compInfo.uid = pObj->uid;
      compInfo.last = pHandle->last;
      compInfo.numOfBlocks = pHandle->newNumOfBlocks + pHandle->oldNumOfBlocks;
      compInfo.delimiter = TSDB_VNODE_DELIMITER;
      taosCalcChecksumAppend(0, (uint8_t *)(&compInfo), sizeof(SCompInfo));

      lseek(pVnode->nfd, pHandle->compInfoOffset, SEEK_SET);
      if (twrite(pVnode->nfd, (void *)(&compInfo), sizeof(SCompInfo)) < 0) {
        dError("vid:%d sid:%d meterId:%s, failed to wirte SCompInfo, reason:%s", pObj->vnode, pObj->sid, pObj->meterId,
               strerror(errno));
        return -1;
      }
    }

    // Write the rest of the SCompBlock part
    if (pHandle->hfSize > pHandle->nextNo0Offset) {
      lseek(pVnode->nfd, 0, SEEK_END);
      lseek(pVnode->hfd, pHandle->nextNo0Offset, SEEK_SET);
      if (tsendfile(pVnode->nfd, pVnode->hfd, NULL, pHandle->hfSize - pHandle->nextNo0Offset) < 0) {
        dError("vid:%d sid:%d meterId:%s, failed to sendfile, size:%" PRId64 ", reason:%s", pObj->vnode, pObj->sid,
               pObj->meterId, pHandle->hfSize - pHandle->nextNo0Offset, strerror(errno));
        return -1;
      }
    }

    // Write SCompHeader part
    pHandle->pHeader[pObj->sid].compInfoOffset = pHandle->compInfoOffset;
    for (int sid = pObj->sid + 1; sid < pVnode->cfg.maxSessions; ++sid) {
      if (pHandle->pHeader[sid].compInfoOffset > 0) {
        pHandle->pHeader[sid].compInfoOffset += pHandle->driftOffset;
      }
    }

    taosCalcChecksumAppend(0, (uint8_t *)(pHandle->pHeader), pHandle->pHeaderSize);
    lseek(pVnode->nfd, TSDB_FILE_HEADER_LEN, SEEK_SET);
    if (twrite(pVnode->nfd, (void *)(pHandle->pHeader), pHandle->pHeaderSize) < 0) {
      dError("vid:%d sid:%d meterId:%s, failed to wirte SCompHeader part, size:%zu, reason:%s", pObj->vnode, pObj->sid,
             pObj->meterId, pHandle->pHeaderSize, strerror(errno));
      return -1;
    }
  }

  // Close opened files
  close(pVnode->dfd);
  pVnode->dfd = 0;

  close(pVnode->hfd);
  pVnode->hfd = 0;

  close(pVnode->lfd);
  pVnode->lfd = 0;

  if (pVnode->nfd > 0) {
    close(pVnode->nfd);
    pVnode->nfd = 0;

    // Resolve the old head target before the symlink is replaced. readlink() does not
    // NUL-terminate, and a result filling the whole buffer may be truncated; never pass a
    // truncated or unterminated path to remove().
    ssize_t linkLen = readlink(pVnode->cfn, dpath, TSDB_FILENAME_LEN);
    rename(pVnode->nfn, pVnode->cfn);
    if (linkLen > 0 && linkLen < TSDB_FILENAME_LEN) {
      dpath[linkLen] = '\0';
      remove(dpath);
    } else {
      dError("vid:%d sid:%d meterId:%s, failed to resolve old head file of %s, it is not removed", pObj->vnode,
             pObj->sid, pObj->meterId, pVnode->cfn);
    }
  }

  return 0;
}

H
Hongze Cheng 已提交
667
/*
 * vnodeConvertRowsToCols - scatter packed rows into per-column buffers.
 *
 * Row r of payload (bytesPerPoint bytes, columns laid out back to back in schema order) is copied
 * into row (rowOffset + r) of data[col]->data for every column of the meter.
 */
static void vnodeConvertRowsToCols(SMeterObj *pObj, const char *payload, int rows, SData *data[], int rowOffset) {
  for (int row = 0; row < rows; ++row) {
    const char *src = payload + pObj->bytesPerPoint * row;
    int         dstRow = rowOffset + row;

    for (int col = 0; col < pObj->numOfColumns; ++col) {
      int colBytes = pObj->schema[col].bytes;
      memcpy(data[col]->data + dstRow * colBytes, src, colBytes);
      src += colBytes;
    }
  }
}
H
hzcheng 已提交
682

683 684 685 686 687 688 689 690 691 692 693 694
static int vnodeMergeDataIntoFile(SImportInfo *pImport, const char *payload, int rows, int fid) {
  SMeterObj *   pObj = (SMeterObj *)(pImport->pObj);
  SVnodeObj *   pVnode = vnodeList + pObj->vnode;
  SImportHandle importHandle;
  size_t        size = 0;
  SData *       data[TSDB_MAX_COLUMNS];
  char *        buffer = NULL;
  SData *       cdata[TSDB_MAX_COLUMNS];
  char *        cbuffer = NULL;
  SCompBlock    compBlock;
  TSCKSUM       checksum = 0;
  int           pointsImported = 0;
H
Hongze Cheng 已提交
695
  int           code = TSDB_CODE_SUCCESS;
H
Hongze Cheng 已提交
696 697 698
  SCachePool *  pPool = (SCachePool *)pVnode->pCachePool;
  SCacheInfo *  pInfo = (SCacheInfo *)(pObj->pCache);
  TSKEY         lastKeyImported = 0;
699

S
slguan 已提交
700
  TSKEY delta = pVnode->cfg.daysPerFile * tsMsPerDay[(uint8_t)pVnode->cfg.precision];
701 702 703 704 705 706 707 708 709
  TSKEY minFileKey = fid * delta;
  TSKEY maxFileKey = minFileKey + delta - 1;
  TSKEY firstKey = KEY_AT_INDEX(payload, pObj->bytesPerPoint, 0);
  TSKEY lastKey = KEY_AT_INDEX(payload, pObj->bytesPerPoint, rows - 1);

  assert(firstKey >= minFileKey && firstKey <= maxFileKey && lastKey >= minFileKey && lastKey <= maxFileKey);

  // create neccessary files
  pVnode->commitFirstKey = firstKey;
H
Hongze Cheng 已提交
710
  if (vnodeCreateNeccessaryFiles(pVnode) < 0) return TSDB_CODE_OTHERS;
711 712 713 714

  assert(pVnode->commitFileId == fid);

  // Open least files to import .head(hfd) .data(dfd) .last(lfd)
H
Hongze Cheng 已提交
715
  if (vnodeOpenMinFilesForImport(pObj->vnode, fid) < 0) return TSDB_CODE_FILE_CORRUPTED;
716 717 718 719 720 721 722 723 724

  memset(&importHandle, 0, sizeof(SImportHandle));

  {  // Load SCompHeader part from .head file
    importHandle.pHeaderSize = sizeof(SCompHeader) * pVnode->cfg.maxSessions + sizeof(TSCKSUM);
    importHandle.pHeader = (SCompHeader *)malloc(importHandle.pHeaderSize);
    if (importHandle.pHeader == NULL) {
      dError("vid: %d, sid: %d, meterId: %s, failed to allocate memory, size: %ul", pObj->vnode, pObj->sid,
             pObj->meterId, importHandle.pHeaderSize);
H
Hongze Cheng 已提交
725
      code = TSDB_CODE_SERV_OUT_OF_MEMORY;
726 727
      goto _error_merge;
    }
H
hzcheng 已提交
728

729 730 731 732
    lseek(pVnode->hfd, TSDB_FILE_HEADER_LEN, SEEK_SET);
    if (read(pVnode->hfd, (void *)(importHandle.pHeader), importHandle.pHeaderSize) < importHandle.pHeaderSize) {
      dError("vid: %d, sid: %d, meterId: %s, fid: %d failed to read SCompHeader part, reason:%s", pObj->vnode,
             pObj->sid, pObj->meterId, fid, strerror(errno));
H
Hongze Cheng 已提交
733
      code = TSDB_CODE_FILE_CORRUPTED;
734 735
      goto _error_merge;
    }
H
hzcheng 已提交
736

737 738 739
    if (!taosCheckChecksumWhole((uint8_t *)(importHandle.pHeader), importHandle.pHeaderSize)) {
      dError("vid: %d, sid: %d, meterId: %s, fid: %d SCompHeader part is broken", pObj->vnode, pObj->sid, pObj->meterId,
             fid);
H
Hongze Cheng 已提交
740
      code = TSDB_CODE_FILE_CORRUPTED;
741 742
      goto _error_merge;
    }
H
hzcheng 已提交
743 744
  }

745
  {  // Initialize data[] and cdata[], which is used to hold data to write to data file
H
Hongze Cheng 已提交
746
    size = pObj->bytesPerPoint * pVnode->cfg.rowsInFileBlock + (sizeof(SData) + EXTRA_BYTES + sizeof(TSCKSUM)) * pObj->numOfColumns;
747 748 749 750 751

    buffer = (char *)malloc(size);
    if (buffer == NULL) {
      dError("vid: %d, sid: %d, meterId: %s, failed to allocate memory, size: %ul", pObj->vnode, pObj->sid,
             pObj->meterId, size);
H
Hongze Cheng 已提交
752
      code = TSDB_CODE_SERV_OUT_OF_MEMORY;
753
      goto _error_merge;
H
hzcheng 已提交
754 755
    }

756 757 758 759
    cbuffer = (char *)malloc(size);
    if (cbuffer == NULL) {
      dError("vid: %d, sid: %d, meterId: %s, failed to allocate memory, size: %ul", pObj->vnode, pObj->sid,
             pObj->meterId, size);
H
Hongze Cheng 已提交
760
      code = TSDB_CODE_SERV_OUT_OF_MEMORY;
761
      goto _error_merge;
H
hzcheng 已提交
762 763
    }

764 765
    data[0] = (SData *)buffer;
    cdata[0] = (SData *)cbuffer;
H
hzcheng 已提交
766

767
    for (int col = 1; col < pObj->numOfColumns; col++) {
H
Hongze Cheng 已提交
768
      data[col] = (SData *)((char *)data[col - 1] + sizeof(SData) + EXTRA_BYTES + sizeof(TSCKSUM) +
769
                            pObj->pointsPerFileBlock * pObj->schema[col - 1].bytes);
H
Hongze Cheng 已提交
770
      cdata[col] = (SData *)((char *)cdata[col - 1] + sizeof(SData) + EXTRA_BYTES + sizeof(TSCKSUM) +
771 772
                             pObj->pointsPerFileBlock * pObj->schema[col - 1].bytes);
    }
H
hzcheng 已提交
773 774
  }

775 776 777
  if (importHandle.pHeader[pObj->sid].compInfoOffset == 0) {  // No data in this file, just write it
  _write_empty_point:
    if (vnodeOpenTempFilesForImport(&importHandle, pObj, fid) < 0) {
H
Hongze Cheng 已提交
778
      code = TSDB_CODE_OTHERS;
779
      goto _error_merge;
H
hzcheng 已提交
780
    }
781 782
    importHandle.oldNumOfBlocks = 0;
    importHandle.driftOffset += sizeof(SCompInfo);
H
Hongze Cheng 已提交
783
    lastKeyImported = lastKey;
784 785 786 787 788 789 790 791 792

    for (int rowsWritten = 0; rowsWritten < rows;) {
      int rowsToWrite = MIN(pVnode->cfg.rowsInFileBlock, (rows - rowsWritten) /* the rows left */);
      vnodeConvertRowsToCols(pObj, payload + rowsWritten * pObj->bytesPerPoint, rowsToWrite, data, 0);
      pointsImported += rowsToWrite;

      compBlock.last = 1;
      if (vnodeWriteBlockToFile(pObj, &compBlock, data, cdata, rowsToWrite) < 0) {
        // TODO: deal with ERROR here
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
793 794
      }

795 796 797 798 799 800 801 802
      importHandle.last = compBlock.last;

      checksum = taosCalcChecksum(checksum, (uint8_t *)(&compBlock), sizeof(SCompBlock));
      twrite(pVnode->nfd, &compBlock, sizeof(SCompBlock));
      importHandle.newNumOfBlocks++;
      importHandle.driftOffset += sizeof(SCompBlock);

      rowsWritten += rowsToWrite;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
803
    }
804 805 806 807 808 809 810 811
    twrite(pVnode->nfd, &checksum, sizeof(TSCKSUM));
    importHandle.driftOffset += sizeof(TSCKSUM);
  } else {  // Else if there are old data in this file.
    {       // load SCompInfo and SCompBlock part
      lseek(pVnode->hfd, importHandle.pHeader[pObj->sid].compInfoOffset, SEEK_SET);
      if (read(pVnode->hfd, (void *)(&(importHandle.compInfo)), sizeof(SCompInfo)) < sizeof(SCompInfo)) {
        dError("vid:%d sid:%d meterId:%s, failed to read .head file, reason:%s", pVnode->vnode, pObj->sid,
               pObj->meterId, strerror(errno));
H
Hongze Cheng 已提交
812
        code = TSDB_CODE_FILE_CORRUPTED;
813 814
        goto _error_merge;
      }
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
815

816 817 818 819
      if ((importHandle.compInfo.delimiter != TSDB_VNODE_DELIMITER) ||
          (!taosCheckChecksumWhole((uint8_t *)(&(importHandle.compInfo)), sizeof(SCompInfo)))) {
        dError("vid:%d sid:%d meterId:%s, .head file %s is broken, delemeter:%x", pVnode->vnode, pObj->sid,
               pObj->meterId, pVnode->cfn, importHandle.compInfo.delimiter);
H
Hongze Cheng 已提交
820
        code = TSDB_CODE_FILE_CORRUPTED;
821 822
        goto _error_merge;
      }
H
hzcheng 已提交
823

H
Hongze Cheng 已提交
824 825 826
      // Check the context of SCompInfo part
      if (importHandle.compInfo.uid != pObj->uid) {  // The data belongs to the other meter
        goto _write_empty_point;
827 828 829 830 831 832 833 834 835 836
      }

      importHandle.oldNumOfBlocks = importHandle.compInfo.numOfBlocks;
      importHandle.last = importHandle.compInfo.last;

      size = sizeof(SCompBlock) * importHandle.compInfo.numOfBlocks + sizeof(TSCKSUM);
      importHandle.pBlocks = (SCompBlock *)malloc(size);
      if (importHandle.pBlocks == NULL) {
        dError("vid:%d sid:%d meterId:%s, failed to allocate importHandle.pBlock, size:%ul", pVnode->vnode, pObj->sid,
               pObj->meterId, size);
H
Hongze Cheng 已提交
837
        code = TSDB_CODE_SERV_OUT_OF_MEMORY;
838 839 840 841 842 843
        goto _error_merge;
      }

      if (read(pVnode->hfd, (void *)(importHandle.pBlocks), size) < size) {
        dError("vid:%d sid:%d meterId:%s, failed to read importHandle.pBlock, reason:%s", pVnode->vnode, pObj->sid,
               pObj->meterId, strerror(errno));
H
Hongze Cheng 已提交
844
        code = TSDB_CODE_FILE_CORRUPTED;
845 846 847 848 849 850
        goto _error_merge;
      }

      if (!taosCheckChecksumWhole((uint8_t *)(importHandle.pBlocks), size)) {
        dError("vid:%d sid:%d meterId:%s, pBlock part is broken in %s", pVnode->vnode, pObj->sid, pObj->meterId,
               pVnode->cfn);
H
Hongze Cheng 已提交
851
        code = TSDB_CODE_FILE_CORRUPTED;
852 853
        goto _error_merge;
      }
H
hzcheng 已提交
854 855
    }

856 857 858 859 860 861 862 863 864 865 866 867 868 869 870 871 872 873 874 875
    /* Now we have _payload_, we have _importHandle.pBlocks_, just merge payload into the importHandle.pBlocks
     *
     * Input: payload, pObj->bytesPerBlock, rows, importHandle.pBlocks
     */
    {
      int        payloadIter = 0;
      SBlockIter blockIter = {0, 0, 0, 0};

      while (1) {
        if (payloadIter >= rows) {  // payload end, break
          // write the remaining blocks to the file
          if (pVnode->nfd > 0) {
            int blocksLeft = importHandle.compInfo.numOfBlocks - blockIter.oslot;
            if (blocksLeft > 0) {
              checksum = taosCalcChecksum(checksum, (uint8_t *)(importHandle.pBlocks + blockIter.oslot),
                                          sizeof(SCompBlock) * blocksLeft);
              if (twrite(pVnode->nfd, (void *)(importHandle.pBlocks + blockIter.oslot),
                         sizeof(SCompBlock) * blocksLeft) < 0) {
                dError("vid:%d sid:%d meterId:%s, failed to write %s file, size:%ul, reason:%s", pVnode->vnode,
                       pObj->sid, pObj->meterId, pVnode->nfn, sizeof(SCompBlock) * blocksLeft, strerror(errno));
H
Hongze Cheng 已提交
876
                code = TSDB_CODE_OTHERS;
877 878 879 880 881 882 883
                goto _error_merge;
              }
            }

            if (twrite(pVnode->nfd, (void *)(&checksum), sizeof(TSCKSUM)) < 0) {
              dError("vid:%d sid:%d meterId:%s, failed to write %s file, size:%ul, reason:%s", pVnode->vnode, pObj->sid,
                     pObj->meterId, pVnode->nfn, sizeof(TSCKSUM), strerror(errno));
H
Hongze Cheng 已提交
884
              code = TSDB_CODE_OTHERS;
885 886 887 888 889
              goto _error_merge;
            }
          }
          break;
        }
H
hzcheng 已提交
890

891 892
        if (blockIter.slot >= importHandle.compInfo.numOfBlocks) {  // blocks end, break
          // Should never come here
H
Hongze Cheng 已提交
893
          assert(false);
894
        }
H
hzcheng 已提交
895

896 897 898 899 900 901 902 903 904 905 906 907 908 909 910 911 912 913 914 915 916 917 918 919 920 921 922
        TSKEY key = KEY_AT_INDEX(payload, pObj->bytesPerPoint, payloadIter);

        {  // Binary search the (slot, pos) which is >= key as well as nextKey
          int   left = blockIter.slot;
          int   right = importHandle.compInfo.numOfBlocks - 1;
          TSKEY minKey = importHandle.pBlocks[left].keyFirst;
          TSKEY maxKey = importHandle.pBlocks[right].keyLast;

          assert(minKey <= maxKey);

          if (key < minKey) {  // Case 1. write just ahead the blockIter.slot
            blockIter.slot = left;
            blockIter.pos = 0;
            blockIter.nextKey = minKey;
          } else if (key > maxKey) {                 // Case 2. write to the end
            if (importHandle.pBlocks[right].last) {  // Case 2.1 last block in .last file, need to merge
              assert(importHandle.last != 0);
              importHandle.last = 0;
              blockIter.slot = right;
              blockIter.pos = importHandle.pBlocks[right].numOfPoints;
            } else {  // Case 2.2 just write after the last block
              blockIter.slot = right + 1;
              blockIter.pos = 0;
            }
            blockIter.nextKey = maxFileKey + 1;
          } else {  // Case 3. need to search the block for slot and pos
            if (key == minKey || key == maxKey) {
H
Hongze Cheng 已提交
923
              if (tsAffectedRowsMod) pointsImported++;
924 925 926 927 928 929 930 931 932 933 934 935 936 937 938 939 940 941 942 943 944 945 946 947 948 949 950 951
              payloadIter++;
              continue;
            }

            // Here: minKey < key < maxKey

            int   mid;
            TSKEY blockMinKey;
            TSKEY blockMaxKey;

            // Binary search the slot
            do {
              mid = (left + right) / 2;
              blockMinKey = importHandle.pBlocks[mid].keyFirst;
              blockMaxKey = importHandle.pBlocks[mid].keyLast;

              assert(blockMinKey <= blockMaxKey);

              if (key < blockMinKey) {
                right = mid;
              } else if (key > blockMaxKey) {
                left = mid + 1;
              } else { /* blockMinKey <= key <= blockMaxKey */
                break;
              }
            } while (left < right);

            if (key == blockMinKey || key == blockMaxKey) {  // duplicate key
H
Hongze Cheng 已提交
952
              if (tsAffectedRowsMod) pointsImported++;
953 954 955 956 957 958 959 960 961 962 963 964 965 966 967 968
              payloadIter++;
              continue;
            }

            // Get the slot
            if (key > blockMaxKey) { /* pos = 0 or pos = ? */
              blockIter.slot = mid + 1;
            } else { /* key < blockMinKey (pos = 0) || (key > blockMinKey && key < blockMaxKey) (pos=?) */
              blockIter.slot = mid;
            }

            // Get the pos
            assert(blockIter.slot < importHandle.compInfo.numOfBlocks);

            if (key == importHandle.pBlocks[blockIter.slot].keyFirst ||
                key == importHandle.pBlocks[blockIter.slot].keyLast) {
H
Hongze Cheng 已提交
969
              if (tsAffectedRowsMod) pointsImported++;
970 971 972 973 974 975 976 977 978 979 980 981 982 983
              payloadIter++;
              continue;
            }

            assert(key < importHandle.pBlocks[blockIter.slot].keyLast);

            /* */
            if (key < importHandle.pBlocks[blockIter.slot].keyFirst) {
              blockIter.pos = 0;
              blockIter.nextKey = importHandle.pBlocks[blockIter.slot].keyFirst;
            } else {
              SCompBlock *pBlock = importHandle.pBlocks + blockIter.slot;
              if (pBlock->sversion != pObj->sversion) { /*TODO*/
              }
H
Hongze Cheng 已提交
984 985
              if (vnodeLoadNeededBlockData(pObj, &importHandle, blockIter.slot, DATA_LOAD_TIMESTAMP, &code) < 0) {
                goto _error_merge;
986 987 988 989 990
              }
              int pos = (*vnodeSearchKeyFunc[pObj->searchAlgorithm])(
                  importHandle.data[PRIMARYKEY_TIMESTAMP_COL_INDEX]->data, pBlock->numOfPoints, key, TSQL_SO_ASC);
              assert(pos != 0);
              if (KEY_AT_INDEX(importHandle.data[PRIMARYKEY_TIMESTAMP_COL_INDEX]->data, sizeof(TSKEY), pos) == key) {
H
Hongze Cheng 已提交
991
                if (tsAffectedRowsMod) pointsImported++;
992 993 994 995 996 997 998 999 1000 1001 1002 1003 1004 1005 1006 1007
                payloadIter++;
                continue;
              }

              blockIter.pos = pos;
              blockIter.nextKey = (blockIter.slot + 1 < importHandle.compInfo.numOfBlocks)
                                      ? importHandle.pBlocks[blockIter.slot + 1].keyFirst
                                      : maxFileKey + 1;
              // Need to merge with this block
              if (importHandle.pBlocks[blockIter.slot].last) {  // this is to merge with the last block
                assert((blockIter.slot == (importHandle.compInfo.numOfBlocks - 1)));
                importHandle.last = 0;
              }
            }
          }
        }
H
hzcheng 已提交
1008

H
Hongze Cheng 已提交
1009 1010 1011 1012 1013 1014 1015
        int aslot = MIN(blockIter.slot, importHandle.compInfo.numOfBlocks - 1);
        int64_t sversion = importHandle.pBlocks[aslot].sversion;
        if (sversion != pObj->sversion) {
          code = TSDB_CODE_OTHERS;
          goto _error_merge;
        }

1016 1017 1018
        // Open the new .t file if not opened yet.
        if (pVnode->nfd <= 0) {
          if (vnodeOpenTempFilesForImport(&importHandle, pObj, fid) < 0) {
H
Hongze Cheng 已提交
1019
            code = TSDB_CODE_OTHERS;
1020
            goto _error_merge;
H
hzcheng 已提交
1021 1022 1023
          }
        }

1024 1025 1026 1027 1028 1029 1030 1031
        if (blockIter.slot > blockIter.oslot) {  // write blocks in range [blockIter.oslot, blockIter.slot) to .t file
          checksum = taosCalcChecksum(checksum, (uint8_t *)(importHandle.pBlocks + blockIter.oslot),
                                      sizeof(SCompBlock) * (blockIter.slot - blockIter.oslot));
          if (twrite(pVnode->nfd, (void *)(importHandle.pBlocks + blockIter.oslot),
                     sizeof(SCompBlock) * (blockIter.slot - blockIter.oslot)) < 0) {
            dError("vid:%d sid:%d meterId:%s, failed to write %s file, size:%ul, reason:%s", pVnode->vnode, pObj->sid,
                   pObj->meterId, pVnode->nfn, sizeof(SCompBlock) * (blockIter.slot - blockIter.oslot),
                   strerror(errno));
H
Hongze Cheng 已提交
1032
            code = TSDB_CODE_OTHERS;
1033
            goto _error_merge;
H
hzcheng 已提交
1034
          }
1035 1036

          blockIter.oslot = blockIter.slot;
H
hzcheng 已提交
1037 1038
        }

1039 1040 1041 1042 1043
        if (blockIter.pos == 0) {  // No need to merge
          // copy payload part to data
          int rowOffset = 0;
          for (; payloadIter < rows; rowOffset++) {
            if (KEY_AT_INDEX(payload, pObj->bytesPerPoint, payloadIter) >= blockIter.nextKey) break;
H
hzcheng 已提交
1044

1045 1046
            vnodeConvertRowsToCols(pObj, payload + pObj->bytesPerPoint * payloadIter, 1, data, rowOffset);
            pointsImported++;
H
Hongze Cheng 已提交
1047
            lastKeyImported = KEY_AT_INDEX(payload, pObj->bytesPerPoint, payloadIter);
1048 1049
            payloadIter++;
          }
H
hzcheng 已提交
1050

1051 1052 1053 1054 1055
          // write directly to .data file
          compBlock.last = 0;
          if (vnodeWriteBlockToFile(pObj, &compBlock, data, cdata, rowOffset) < 0) {
            // TODO: Deal with the ERROR here
          }
H
hzcheng 已提交
1056

1057 1058 1059 1060 1061 1062 1063
          checksum = taosCalcChecksum(checksum, (uint8_t *)(&compBlock), sizeof(SCompBlock));
          if (twrite(pVnode->nfd, &compBlock, sizeof(SCompBlock)) < 0) {
            // TODO : deal with the ERROR here
          }
          importHandle.newNumOfBlocks++;
          importHandle.driftOffset += sizeof(SCompBlock);
        } else {  // Merge block and payload from payloadIter
H
hzcheng 已提交
1064

1065
          if (vnodeLoadNeededBlockData(pObj, &importHandle, blockIter.slot,
H
Hongze Cheng 已提交
1066
                                       DATA_LOAD_TIMESTAMP | DATA_LOAD_OTHER_DATA, &code) < 0) {  // Load neccessary blocks
1067 1068
            goto _error_merge;
          }
H
hzcheng 已提交
1069

1070 1071 1072 1073 1074 1075 1076 1077 1078 1079 1080 1081 1082 1083 1084 1085 1086 1087 1088 1089 1090 1091 1092 1093 1094 1095 1096 1097 1098 1099 1100 1101 1102 1103 1104 1105 1106 1107 1108 1109 1110 1111 1112 1113 1114
          importHandle.oldNumOfBlocks--;
          importHandle.driftOffset -= sizeof(SCompBlock);

          int rowOffset = blockIter.pos;  // counter for data

          // Copy the front part
          for (int col = 0; col < pObj->numOfColumns; col++) {
            memcpy((void *)(data[col]->data), (void *)(importHandle.data[col]->data),
                   pObj->schema[col].bytes * blockIter.pos);
          }

          // Merge part
          while (1) {
            if (rowOffset >= pVnode->cfg.rowsInFileBlock) {  // data full in a block to commit
              compBlock.last = 0;
              if (vnodeWriteBlockToFile(pObj, &compBlock, data, cdata, rowOffset) < 0) {
                // TODO : deal with the ERROR here
              }

              checksum = taosCalcChecksum(checksum, (uint8_t *)(&compBlock), sizeof(SCompBlock));
              if (twrite(pVnode->nfd, (void *)(&compBlock), sizeof(SCompBlock)) < 0) {
                dError("vid:%d sid:%d meterId:%s, failed to write %s file, size:%ul, reason:%s", pVnode->vnode,
                       pObj->sid, pObj->meterId, pVnode->nfn, sizeof(SCompBlock), strerror(errno));
                goto _error_merge;
              }
              importHandle.newNumOfBlocks++;
              importHandle.driftOffset += sizeof(SCompBlock);
              rowOffset = 0;
            }

            if ((payloadIter >= rows || KEY_AT_INDEX(payload, pObj->bytesPerPoint, payloadIter) >= blockIter.nextKey) &&
                blockIter.pos >= importHandle.pBlocks[blockIter.slot].numOfPoints)
              break;

            if (payloadIter >= rows ||
                KEY_AT_INDEX(payload, pObj->bytesPerPoint, payloadIter) >= blockIter.nextKey) {  // payload end
              for (int col = 0; col < pObj->numOfColumns; col++) {
                memcpy(data[col]->data + rowOffset * pObj->schema[col].bytes,
                       importHandle.data[col]->data + pObj->schema[col].bytes * blockIter.pos, pObj->schema[col].bytes);
              }
              blockIter.pos++;
              rowOffset++;
            } else if (blockIter.pos >= importHandle.pBlocks[blockIter.slot].numOfPoints) {  // block end
              vnodeConvertRowsToCols(pObj, payload + pObj->bytesPerPoint * payloadIter, 1, data, rowOffset);
              pointsImported++;
H
Hongze Cheng 已提交
1115
              lastKeyImported = KEY_AT_INDEX(payload, pObj->bytesPerPoint, payloadIter);
1116 1117 1118 1119 1120 1121
              payloadIter++;
              rowOffset++;
            } else {
              if (KEY_AT_INDEX(payload, pObj->bytesPerPoint, payloadIter) ==
                  KEY_AT_INDEX(importHandle.data[PRIMARYKEY_TIMESTAMP_COL_INDEX]->data, sizeof(TSKEY),
                               blockIter.pos)) {  // duplicate key
H
Hongze Cheng 已提交
1122
                if (tsAffectedRowsMod) pointsImported++;
1123 1124 1125 1126 1127 1128 1129
                payloadIter++;
                continue;
              } else if (KEY_AT_INDEX(payload, pObj->bytesPerPoint, payloadIter) <
                         KEY_AT_INDEX(importHandle.data[PRIMARYKEY_TIMESTAMP_COL_INDEX]->data, sizeof(TSKEY),
                                      blockIter.pos)) {
                vnodeConvertRowsToCols(pObj, payload + pObj->bytesPerPoint * payloadIter, 1, data, rowOffset);
                pointsImported++;
H
Hongze Cheng 已提交
1130
                lastKeyImported = KEY_AT_INDEX(payload, pObj->bytesPerPoint, payloadIter);
1131 1132 1133 1134 1135 1136 1137 1138 1139 1140 1141 1142 1143 1144 1145 1146 1147 1148 1149 1150 1151 1152 1153 1154 1155 1156 1157 1158 1159 1160 1161 1162
                payloadIter++;
                rowOffset++;
              } else {
                for (int col = 0; col < pObj->numOfColumns; col++) {
                  memcpy(data[col]->data + rowOffset * pObj->schema[col].bytes,
                         importHandle.data[col]->data + pObj->schema[col].bytes * blockIter.pos,
                         pObj->schema[col].bytes);
                }
                blockIter.pos++;
                rowOffset++;
              }
            }
          }
          if (rowOffset > 0) {  // data full in a block to commit
            compBlock.last = 0;
            if (vnodeWriteBlockToFile(pObj, &compBlock, data, cdata, rowOffset) < 0) {
              // TODO : deal with the ERROR here
            }

            checksum = taosCalcChecksum(checksum, (uint8_t *)(&compBlock), sizeof(SCompBlock));
            if (twrite(pVnode->nfd, (void *)(&compBlock), sizeof(SCompBlock)) < 0) {
              dError("vid:%d sid:%d meterId:%s, failed to write %s file, size:%ul, reason:%s", pVnode->vnode, pObj->sid,
                     pObj->meterId, pVnode->nfn, sizeof(SCompBlock), strerror(errno));
              goto _error_merge;
            }
            importHandle.newNumOfBlocks++;
            importHandle.driftOffset += sizeof(SCompBlock);
            rowOffset = 0;
          }

          blockIter.slot++;
          blockIter.oslot = blockIter.slot;
H
hzcheng 已提交
1163 1164 1165 1166 1167
        }
      }
    }
  }

1168 1169
  // Write the SCompInfo part
  if (vnodeCloseImportFiles(pObj, &importHandle) < 0) {
H
Hongze Cheng 已提交
1170
    code = TSDB_CODE_OTHERS;
1171
    goto _error_merge;
S
slguan 已提交
1172
  }
H
hzcheng 已提交
1173

1174
  pImport->importedRows += pointsImported;
H
hzcheng 已提交
1175

H
Hongze Cheng 已提交
1176 1177 1178 1179
  pthread_mutex_lock(&(pPool->vmutex));
  if (pInfo->numOfBlocks > 0) {
    int   slot = (pInfo->currentSlot - pInfo->numOfBlocks + 1 + pInfo->maxBlocks) % pInfo->maxBlocks;
    TSKEY firstKeyInCache = *((TSKEY *)(pInfo->cacheBlocks[slot]->offset[0]));
H
hzcheng 已提交
1180

H
Hongze Cheng 已提交
1181 1182 1183 1184 1185 1186 1187 1188 1189 1190 1191 1192 1193 1194
    // data may be in commited cache, cache shall be released
    if (lastKeyImported > firstKeyInCache) {
      while (slot != pInfo->commitSlot) {
        SCacheBlock *pCacheBlock = pInfo->cacheBlocks[slot];
        vnodeFreeCacheBlock(pCacheBlock);
        slot = (slot + 1 + pInfo->maxBlocks) % pInfo->maxBlocks;
      }

      if (pInfo->commitPoint == pObj->pointsPerBlock) {
        if (pInfo->cacheBlocks[pInfo->commitSlot]->pMeterObj == pObj) {
          vnodeFreeCacheBlock(pInfo->cacheBlocks[pInfo->commitSlot]);
        }
      }
    }
H
hzcheng 已提交
1195
  }
H
Hongze Cheng 已提交
1196 1197
  pthread_mutex_unlock(&(pPool->vmutex));

1198 1199 1200 1201 1202 1203 1204 1205 1206
  // TODO: free the allocated memory
  tfree(buffer);
  tfree(cbuffer);
  tfree(importHandle.pHeader);
  tfree(importHandle.pBlocks);
  tfree(importHandle.pField);
  tfree(importHandle.buffer);
  tfree(importHandle.temp);
  tfree(importHandle.tempBuffer);
H
hzcheng 已提交
1207 1208 1209

  return code;

1210 1211 1212 1213 1214 1215 1216 1217 1218 1219 1220 1221 1222 1223 1224 1225 1226 1227 1228 1229 1230 1231 1232
_error_merge:
  tfree(buffer);
  tfree(cbuffer);
  tfree(importHandle.pHeader);
  tfree(importHandle.pBlocks);
  tfree(importHandle.pField);
  tfree(importHandle.buffer);
  tfree(importHandle.temp);
  tfree(importHandle.tempBuffer);

  close(pVnode->dfd);
  pVnode->dfd = 0;

  close(pVnode->hfd);
  pVnode->hfd = 0;

  close(pVnode->lfd);
  pVnode->lfd = 0;

  if (pVnode->nfd > 0) {
    close(pVnode->nfd);
    pVnode->nfd = 0;
    remove(pVnode->nfn);
H
hzcheng 已提交
1233 1234 1235 1236 1237
  }

  return code;
}

1238 1239 1240 1241 1242 1243 1244 1245 1246
// Advance a block iterator `iter` (any lvalue with .slot/.pos members) by `step` positions inside a
// block of `posLimit` points. When the step would reach or pass posLimit, move to position 0 of the
// next slot instead, wrapping around at `slotLimit`; the remainder of `step` is not carried over.
// Wrapped in do/while(0) so it behaves as a single statement (safe inside if/else).
#define FORWARD_ITER(iter, step, slotLimit, posLimit)  \
  do {                                                 \
    if ((iter).pos + (step) < (posLimit)) {            \
      (iter).pos = (iter).pos + (step);                \
    } else {                                           \
      (iter).pos = 0;                                  \
      (iter).slot = ((iter).slot + 1) % (slotLimit);   \
    }                                                  \
  } while (0)
H
hzcheng 已提交
1247

1248 1249 1250 1251
/*
 * Return non-zero when `iter` sits exactly at the cache's write frontier for pMeter.
 *
 * The frontier is one past the last point of the current cache block. When that block is full
 * (numOfPoints == pointsPerBlock), the frontier is position 0 of the next slot, modulo maxBlocks.
 */
int isCacheEnd(SBlockIter iter, SMeterObj *pMeter) {
  SCacheInfo * pCache = (SCacheInfo *)(pMeter->pCache);
  SCacheBlock *pCurrent = pCache->cacheBlocks[pCache->currentSlot];
  int          endSlot;
  int          endPos;

  if (pCurrent->numOfPoints != pMeter->pointsPerBlock) {
    endSlot = pCache->currentSlot;
    endPos = pCurrent->numOfPoints;
  } else {
    endSlot = (pCache->currentSlot + 1) % (pCache->maxBlocks);
    endPos = 0;
  }

  return (iter.slot == endSlot) && (iter.pos == endPos);
}

H
Hongze Cheng 已提交
1263 1264 1265 1266 1267 1268 1269
/*
 * Copy rows from the circular merge buffer (from pBuffer->spos up to, not including, pBuffer->epos)
 * into the cache blocks, starting at *pWriteIter.
 *
 * pBuffer->spos and *pWriteIter are advanced as rows are copied. Whenever a cache block is filled,
 * its numOfPoints is set to the full count and writing continues at position 0 of the next slot.
 * With checkBound set, copying stops as soon as the write position reaches *pCacheIter. Without
 * checkBound, a trailing partially filled block gets its numOfPoints set to the rows written into it.
 */
static void vnodeFlushMergeBuffer(SMergeBuffer *pBuffer, SBlockIter *pWriteIter, SBlockIter *pCacheIter,
                                  SMeterObj *pObj, SCacheInfo *pInfo, int checkBound) {
  // A write position one past a full block really means the start of the next slot.
  if (pWriteIter->pos == pObj->pointsPerBlock) {
    pWriteIter->slot = (pWriteIter->slot + 1) % pInfo->maxBlocks;
    pWriteIter->pos = 0;
  }

  for (; pBuffer->spos != pBuffer->epos; pBuffer->spos = (pBuffer->spos + 1) % pBuffer->totalRows) {
    if (checkBound && pWriteIter->slot == pCacheIter->slot && pWriteIter->pos == pCacheIter->pos) break;

    SCacheBlock *pDst = pInfo->cacheBlocks[pWriteIter->slot];
    for (int col = 0; col < pObj->numOfColumns; col++) {
      int bytes = pObj->schema[col].bytes;
      memcpy(pDst->offset[col] + bytes * pWriteIter->pos, pBuffer->offset[col] + bytes * pBuffer->spos, bytes);
    }

    if (pWriteIter->pos + 1 >= pObj->pointsPerBlock) {
      // This row filled the block: record its size and move to the next slot.
      pDst->numOfPoints = pWriteIter->pos + 1;
      pWriteIter->slot = (pWriteIter->slot + 1) % pInfo->maxBlocks;
      pWriteIter->pos = 0;
    } else {
      pWriteIter->pos++;
    }
  }

  if (!checkBound && pWriteIter->pos != 0) {
    pInfo->cacheBlocks[pWriteIter->slot]->numOfPoints = pWriteIter->pos;
  }
}

1294 1295 1296 1297 1298 1299
/*
 * Merge `rows` payload records into the meter's in-memory cache blocks.
 *
 * payload : rows of pObj->bytesPerPoint bytes each; the first field of every row is its TSKEY.
 *           The function asserts firstKey <= lastKey and firstKey > pObj->lastKeyOnFile.
 *           NOTE(review): the merge logic presumably relies on keys being strictly ascending —
 *           confirm with callers.
 *
 * Rows whose key already exists (equal to pObj->lastKey or found in cache) are skipped. With
 * tsAffectedRowsMod they are still counted in pImport->importedRows, but they never consume cache
 * space.
 *
 * Returns:
 *   TSDB_CODE_ACTION_IN_PROGRESS with pImport->commit = 1 when there is no room (insufficient
 *     freePoints, or no cache block could be allocated). Rows merged before a mid-merge allocation
 *     failure are kept.
 *   -1 when vnodeFindKeyInCache fails.
 *   TSDB_CODE_SERV_OUT_OF_MEMORY when the merge buffer cannot be allocated.
 *   0 / TSDB_CODE_SUCCESS otherwise.
 */
int vnodeImportDataToCache(SImportInfo *pImport, const char *payload, const int rows) {
  SMeterObj *   pObj = pImport->pObj;
  SVnodeObj *   pVnode = vnodeList + pObj->vnode;
  int           code = -1;
  SCacheInfo *  pInfo = (SCacheInfo *)(pObj->pCache);
  int           payloadIter;
  SCachePool *  pPool = (SCachePool *)(pVnode->pCachePool);
  int           isCacheIterEnd = 0;
  int           spayloadIter = 0;
  int           isAppendData = 0;
  int           rowsImported = 0;  // rows reported to the caller (includes skipped duplicates if tsAffectedRowsMod)
  int           rowsInserted = 0;  // rows physically written into the cache (drives freePoints accounting)
  int           totalRows = 0;
  size_t        size = 0;
  SMergeBuffer *pBuffer = NULL;

  TSKEY firstKey = KEY_AT_INDEX(payload, pObj->bytesPerPoint, 0);
  TSKEY lastKey = KEY_AT_INDEX(payload, pObj->bytesPerPoint, rows - 1);

  assert(firstKey <= lastKey && firstKey > pObj->lastKeyOnFile);

  // TODO: make this condition less strict
  if (pObj->freePoints < rows || pObj->freePoints < (pObj->pointsPerBlock << 1)) {  // No free room to hold the data
    dError("vid:%d sid:%d id:%s, import failed, cache is full, freePoints:%d", pObj->vnode, pObj->sid, pObj->meterId,
           pObj->freePoints);
    pImport->importedRows = 0;
    pImport->commit = 1;
    code = TSDB_CODE_ACTION_IN_PROGRESS;
    return code;
  }

  if (pInfo->numOfBlocks == 0) {
    if (vnodeAllocateCacheBlock(pObj) < 0) {
      pImport->importedRows = 0;
      pImport->commit = 1;
      code = TSDB_CODE_ACTION_IN_PROGRESS;
      return code;
    }
  }

  // Find the first importable record from payload
  pImport->lastKey = lastKey;
  for (payloadIter = 0; payloadIter < rows; payloadIter++) {
    TSKEY key = KEY_AT_INDEX(payload, pObj->bytesPerPoint, payloadIter);
    if (key == pObj->lastKey) {
      if (tsAffectedRowsMod) rowsImported++;
      continue;
    }
    if (key > pObj->lastKey) {  // Just as insert
      pImport->slot = pInfo->currentSlot;
      pImport->pos = pInfo->cacheBlocks[pImport->slot]->numOfPoints;
      isCacheIterEnd = 1;
      break;
    } else {
      pImport->firstKey = key;
      if (vnodeFindKeyInCache(pImport, 1) < 0) {
        goto _exit;
      }

      if (pImport->firstKey != pImport->key) break;
      if (tsAffectedRowsMod) rowsImported++;
    }
  }

  if (payloadIter == rows) {  // every row duplicates existing data: nothing to write
    pImport->importedRows += rowsImported;
    code = 0;
    goto _exit;
  }

  spayloadIter = payloadIter;
  if (pImport->pos == pObj->pointsPerBlock) assert(isCacheIterEnd);

  // Allocate a new merge buffer work as buffer
  totalRows = pObj->pointsPerBlock + rows - payloadIter + 1;
  size = sizeof(SMergeBuffer) + sizeof(char *) * pObj->numOfColumns + pObj->bytesPerPoint * totalRows;
  pBuffer = (SMergeBuffer *)malloc(size);
  if (pBuffer == NULL) {
    dError("vid:%d sid:%d meterId:%s, failed to allocate memory, size:%zu", pObj->vnode, pObj->sid, pObj->meterId, size);
    code = TSDB_CODE_SERV_OUT_OF_MEMORY;
    goto _exit;
  }
  pBuffer->spos = 0;
  pBuffer->epos = 0;
  pBuffer->totalRows = totalRows;
  pBuffer->offset[0] = (char *)pBuffer + sizeof(SMergeBuffer) + sizeof(char *) * pObj->numOfColumns;
  for (int col = 1; col < pObj->numOfColumns; col++) {
    pBuffer->offset[col] = pBuffer->offset[col - 1] + pObj->schema[col - 1].bytes * totalRows;
  }

  // TODO: take pImport->pos = pObj->pointsPerBlock into consideration
  {                                                              // Do the merge staff
    SBlockIter cacheIter = {pImport->slot, pImport->pos, 0, 0};  // Iter to traverse old cache data
    SBlockIter writeIter = {pImport->slot, pImport->pos, 0, 0};  // Iter to write data to cache
    int        availPoints = pObj->pointsPerBlock - pInfo->cacheBlocks[pInfo->currentSlot]->numOfPoints;

    assert(availPoints >= 0);

    while (1) {
      if ((payloadIter >= rows) && isCacheIterEnd) break;

      if ((pBuffer->epos + 1) % pBuffer->totalRows == pBuffer->spos) {  // merge buffer is full, flush
        vnodeFlushMergeBuffer(pBuffer, &writeIter, &cacheIter, pObj, pInfo, 1);
      }

      TSKEY payloadKey = (payloadIter < rows) ? KEY_AT_INDEX(payload, pObj->bytesPerPoint, payloadIter) : INT64_MAX;
      TSKEY cacheKey = (isCacheIterEnd) ? INT64_MAX : KEY_AT_INDEX(pInfo->cacheBlocks[cacheIter.slot]->offset[0], sizeof(TSKEY), cacheIter.pos);

      if (cacheKey < payloadKey) {  // if (payload end || (cacheIter not end && payloadKey > blockKey)), consume cache
        for (int col = 0; col < pObj->numOfColumns; col++) {
          memcpy(pBuffer->offset[col] + pObj->schema[col].bytes * pBuffer->epos,
                 pInfo->cacheBlocks[cacheIter.slot]->offset[col] + pObj->schema[col].bytes * cacheIter.pos,
                 pObj->schema[col].bytes);
        }
        FORWARD_ITER(cacheIter, 1, pInfo->maxBlocks, pObj->pointsPerBlock);
        isCacheIterEnd = isCacheEnd(cacheIter, pObj);
      } else if (cacheKey > payloadKey) {  // cacheIter end || (payloadIter not end && payloadKey < blockKey), consume payload
        if (availPoints == 0) {  // Need to allocate a new cache block
          pthread_mutex_lock(&(pPool->vmutex));
          // TODO: Need to check if there are enough slots to hold a new one
          SCacheBlock *pNewBlock = vnodeGetFreeCacheBlock(pVnode);
          if (pNewBlock == NULL) {  // Failed to allocate a new cache block, need to commit and loop over the remaining cache records
            pthread_mutex_unlock(&(pPool->vmutex));
            payloadIter = rows;
            code = TSDB_CODE_ACTION_IN_PROGRESS;
            pImport->commit = 1;
            continue;
          }

          assert(pInfo->numOfBlocks <= pInfo->maxBlocks);
          if (pInfo->numOfBlocks == pInfo->maxBlocks) {
            vnodeFreeCacheBlock(pInfo->cacheBlocks[(pInfo->currentSlot + 1) % pInfo->maxBlocks]);
          }

          pNewBlock->pMeterObj = pObj;
          pNewBlock->offset[0] = (char *)pNewBlock + sizeof(SCacheBlock) + sizeof(char *) * pObj->numOfColumns;
          for (int col = 1; col < pObj->numOfColumns; col++)
            pNewBlock->offset[col] = pNewBlock->offset[col - 1] + pObj->schema[col - 1].bytes * pObj->pointsPerBlock;

          int newSlot = writeIter.slot;
          if (newSlot != ((pInfo->currentSlot + 1) % pInfo->maxBlocks)) {
            newSlot = (newSlot + 1) % pInfo->maxBlocks;
          }
          pInfo->blocks++;
          int tblockId = pInfo->blocks;

          // Shift the blocks after writeIter.slot one slot forward to open a gap for the new block.
          if ((writeIter.slot != pInfo->currentSlot) && (writeIter.slot != ((pInfo->currentSlot + 1) % pInfo->maxBlocks))) {
            for (int tslot = pInfo->currentSlot; tslot != writeIter.slot;) {
              int nextSlot = (tslot + 1) % pInfo->maxBlocks;
              pInfo->cacheBlocks[nextSlot] = pInfo->cacheBlocks[tslot];
              pInfo->cacheBlocks[nextSlot]->slot = nextSlot;
              pInfo->cacheBlocks[nextSlot]->blockId = tblockId--;
              tslot = (tslot - 1 + pInfo->maxBlocks) % pInfo->maxBlocks;
            }
          }

          int index = pNewBlock->index;
          // If the unread cache rows share the block being written, move them into the new block.
          if (cacheIter.slot == writeIter.slot && cacheIter.slot != ((pInfo->currentSlot + 1) % pInfo->maxBlocks)) {
            pNewBlock->numOfPoints = pInfo->cacheBlocks[cacheIter.slot]->numOfPoints;
            int pointsLeft = pInfo->cacheBlocks[cacheIter.slot]->numOfPoints - cacheIter.pos;
            if (pointsLeft > 0) {
              for (int col = 0; col < pObj->numOfColumns; col++) {
                memcpy((void *)(pNewBlock->offset[col] + pObj->schema[col].bytes * cacheIter.pos),
                       pInfo->cacheBlocks[cacheIter.slot]->offset[col] + pObj->schema[col].bytes * cacheIter.pos,
                       pObj->schema[col].bytes * pointsLeft);
              }
            }
          }
          pNewBlock->blockId = tblockId;
          pNewBlock->slot = newSlot;
          pNewBlock->index = index;
          pInfo->cacheBlocks[newSlot] = pNewBlock;
          pInfo->numOfBlocks++;
          pInfo->unCommittedBlocks++;
          pInfo->currentSlot = (pInfo->currentSlot + 1) % pInfo->maxBlocks;
          pthread_mutex_unlock(&(pPool->vmutex));
          cacheIter.slot = (cacheIter.slot + 1) % pInfo->maxBlocks;
          // move a cache of data forward
          availPoints = pObj->pointsPerBlock;
        }

        int offset = 0;
        for (int col = 0; col < pObj->numOfColumns; col++) {
          memcpy(pBuffer->offset[col] + pObj->schema[col].bytes * pBuffer->epos,
                 payload + pObj->bytesPerPoint * payloadIter + offset, pObj->schema[col].bytes);
          offset += pObj->schema[col].bytes;
        }
        if (spayloadIter == payloadIter) {  // update pVnode->firstKey with the first row actually imported
          pthread_mutex_lock(&(pVnode->vmutex));
          if (payloadKey < pVnode->firstKey) pVnode->firstKey = payloadKey;
          pthread_mutex_unlock(&(pVnode->vmutex));
        }
        if (isCacheIterEnd) {
          pObj->lastKey = payloadKey;
          if (!isAppendData) isAppendData = 1;
        }

        rowsImported++;
        rowsInserted++;
        availPoints--;
        payloadIter++;

      } else {  // duplicate key: keep the cached row, skip the payload row
        if (tsAffectedRowsMod) rowsImported++;
        payloadIter++;
        continue;
      }
      pBuffer->epos = (pBuffer->epos + 1) % pBuffer->totalRows;
    }

    if (pBuffer->spos != pBuffer->epos) {  // Flush the remaining data in the merge buffer
      vnodeFlushMergeBuffer(pBuffer, &writeIter, &cacheIter, pObj, pInfo, 0);
    } else {
      // Nothing buffered: only possible when a cache block could not be allocated before any row
      // was merged (append into a full block), in which case a commit has been requested.
      assert(pImport->commit);
    }

    if (isAppendData) {
      pthread_mutex_lock(&(pVnode->vmutex));
      if (pObj->lastKey > pVnode->lastKey) pVnode->lastKey = pObj->lastKey;
      pthread_mutex_unlock(&(pVnode->vmutex));
    }
  }
  pImport->importedRows += rowsImported;
  // Only rows actually written to cache consume cache space; skipped duplicates do not.
  atomic_fetch_sub_32(&(pObj->freePoints), rowsInserted);

  // Preserve the "commit and retry" signal set when a cache block could not be allocated.
  if (code != TSDB_CODE_ACTION_IN_PROGRESS) code = TSDB_CODE_SUCCESS;

_exit:
  tfree(pBuffer);
  return code;
}

/*
 * Import `rows` payload records into the on-disk data files.
 *
 * The key range of the payload is split into per-file time windows of
 * cfg.daysPerFile * tsMsPerDay[precision] each. Window `fid` covers
 * [fid * span, (fid + 1) * span - 1]. The rows falling into each window are merged into that file
 * with vnodeMergeDataIntoFile. Windows containing no rows are skipped.
 *
 * Returns 0 when every non-empty window merged successfully. Otherwise it stops at the first
 * failing window and returns that error code.
 */
int vnodeImportDataToFiles(SImportInfo *pImport, char *payload, const int rows) {
  // TODO : Check the correctness of pObj and pVnode
  SMeterObj *pObj = (SMeterObj *)(pImport->pObj);
  SVnodeObj *pVnode = vnodeList + pObj->vnode;
  const int  step = pObj->bytesPerPoint;
  int        code = 0;

  int64_t fileSpan = pVnode->cfg.daysPerFile * tsMsPerDay[(uint8_t)pVnode->cfg.precision];
  int     firstFid = KEY_AT_INDEX(payload, step, 0) / fileSpan;
  int     lastFid = KEY_AT_INDEX(payload, step, rows - 1) / fileSpan;

  int fid = firstFid;
  while (fid <= lastFid) {
    TSKEY windowStart = fid * fileSpan;
    TSKEY windowEnd = windowStart + fileSpan - 1;
    int   startRow = 0;
    int   numRows = 0;

    if (vnodeSearchKeyInRange(payload, step, rows, windowStart, windowEnd, &startRow, &numRows) >= 0) {
      assert(numRows > 0);

      dTrace("vid:%d sid:%d meterId:%s, %d rows of data will be imported to file %d, srow:%d firstKey:%" PRId64 " lastKey:%" PRId64,
             pObj->vnode, pObj->sid, pObj->meterId, numRows, fid, startRow, KEY_AT_INDEX(payload, step, startRow),
             KEY_AT_INDEX(payload, step, (startRow + numRows - 1)));

      code = vnodeMergeDataIntoFile(pImport, payload + (startRow * step), numRows, fid);
      if (code != TSDB_CODE_SUCCESS) break;
    }
    fid++;
  }

  return code;
}

1554
// TODO : add offset in pShell to make it avoid repeatedly deal with messages
H
hzcheng 已提交
1555
int vnodeImportData(SMeterObj *pObj, SImportInfo *pImport) {
1556 1557 1558 1559 1560 1561 1562 1563
  int         code = 0;
  int         srow = 0, nrows = 0;
  SVnodeObj * pVnode = vnodeList + pObj->vnode;
  SCachePool *pPool = (SCachePool *)(pVnode->pCachePool);

  // 1. import data in range (pObj->lastKeyOnFile, INT64_MAX) into cache
  if (vnodeSearchKeyInRange(pImport->payload, pObj->bytesPerPoint, pImport->rows, pObj->lastKeyOnFile + 1, INT64_MAX,
                            &srow, &nrows) >= 0) {
H
Hongze Cheng 已提交
1564
    assert(nrows > 0);
1565 1566 1567 1568 1569 1570
    code = vnodeImportDataToCache(pImport, pImport->payload + pObj->bytesPerPoint * srow, nrows);
    if (pImport->commit) {  // Need to commit now
      pPool->commitInProcess = 0;
      vnodeProcessCommitTimer(pVnode, NULL);
      return code;
    }
H
hzcheng 已提交
1571

H
Hongze Cheng 已提交
1572
    if (code != TSDB_CODE_SUCCESS) return code;
H
hzcheng 已提交
1573 1574
  }

1575 1576 1577
  // 2. import data (0, pObj->lastKeyOnFile) into files
  if (vnodeSearchKeyInRange(pImport->payload, pObj->bytesPerPoint, pImport->rows, 0, pObj->lastKeyOnFile - 1, &srow,
                            &nrows) >= 0) {
H
Hongze Cheng 已提交
1578
    assert(nrows > 0);
1579 1580
    code = vnodeImportDataToFiles(pImport, pImport->payload + pObj->bytesPerPoint * srow, nrows);
  }
H
hzcheng 已提交
1581

1582
  pPool->commitInProcess = 0;
H
hzcheng 已提交
1583 1584 1585

  return code;
}